diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c96dcdd2..b727f60cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ - **Gateway OTLP metrics in the per-pod sidecar**: when `spec.monitoring.enabled=true`, the OTel Collector sidecar now exposes an OTLP/gRPC receiver on `127.0.0.1:4317` and the documentdb-gateway is configured (via `OTEL_EXPORTER_OTLP_ENDPOINT` and `OTEL_METRICS_ENABLED`) to push its `db_client_*` metrics there. The sidecar's existing prometheus exporter re-exports them alongside the existing `documentdb.postgres.up` sqlquery output, with per-pod attribution added by the collector's resource processor. No new CRD fields; this turns on automatically wherever monitoring was already enabled. - **Two-Phase Extension Upgrade**: New `spec.schemaVersion` field separates binary upgrades (`spec.documentDBVersion`) from irreversible schema migrations (`ALTER EXTENSION UPDATE`). The default behavior gives you a rollback-safe window — update the binary first, validate, then finalize the schema. Set `schemaVersion: "auto"` for single-step upgrades in development environments. See the [upgrade guide](docs/operator-public-documentation/preview/operations/upgrades.md) for details. +### Behavioral Changes +- **Sidecar memory & CPU isolation**: `spec.resource.memory` and `spec.resource.cpu` are now treated as the total pod resource envelope. The operator reserves resources for the documentdb-gateway sidecar (memory default 18.75% of the envelope, capped at 32Gi) and, when `spec.monitoring.enabled` is true, the OTel collector sidecar (default memory limit 128Mi, CPU request 50m / limit 200m), then gives PostgreSQL the remainder and recomputes its memory-aware parameters (`shared_buffers`, etc.) accordingly. This isolates a gateway/collector leak so it is OOM-killed in its own container instead of crowding out PostgreSQL. The split is configurable per component via `spec.resource.{gateway,database,otel}` and fleet-wide via operator Helm values. The envelope is **optional**: you may omit `spec.resource.memory`/`cpu` for a dimension when it is set explicitly on both the gateway and the database (the effective envelope is then the sum); a partially specified dimension without an envelope is rejected by the validating webhook. Existing clusters adopt the new split (and a one-time rolling restart) on their next reconcile. + ### Breaking Changes - **CRD restructure into domain-grouped stanzas**: image, postgres and plugin fields have moved into dedicated groups. Migrate as follows: `spec.documentDBImage` → `spec.image.documentDB`, `spec.gatewayImage` → `spec.image.gateway`, `spec.postgresImage` → `spec.image.postgres`, `spec.sidecarInjectorPluginName` → `spec.plugins.sidecarInjectorName`. A new `spec.postgres` group exposes `uid`, `gid` and `postInitSQL` (the operator's mandatory bootstrap statements always run first; user statements are appended after). A new root-level `spec.imagePullSecrets` is propagated to the underlying CNPG cluster. - **Validating webhook added**: A new `ValidatingWebhookConfiguration` enforces that `spec.schemaVersion` never exceeds the binary version and blocks `spec.documentDBVersion` rollbacks below the committed schema version. This requires [cert-manager](https://cert-manager.io/) to be installed in the cluster (it is already a prerequisite for the sidecar injector). Existing clusters upgrading to this release will have the webhook activated automatically via `helm upgrade`. diff --git a/docs/operator-public-documentation/postgresql-tuning.md b/docs/operator-public-documentation/postgresql-tuning.md index 7ff4472c4..f7be1327b 100644 --- a/docs/operator-public-documentation/postgresql-tuning.md +++ b/docs/operator-public-documentation/postgresql-tuning.md @@ -15,7 +15,7 @@ The operator manages PostgreSQL parameters through a layered merge system with c ## Resource Configuration -Configure CPU and memory for your DocumentDB pods using the `spec.resource` section: +Configure CPU and memory for your DocumentDB pods using the `spec.resource` section. The `memory` and `cpu` values are total pod envelopes, not only PostgreSQL resources: ```yaml apiVersion: documentdb.io/preview @@ -26,13 +26,13 @@ spec: resource: storage: pvcSize: "50Gi" - memory: "8Gi" # Pod memory limit (Guaranteed QoS) - cpu: "4" # Pod CPU limit (Guaranteed QoS) + memory: "8Gi" # Total pod memory envelope + cpu: "4" # Total pod CPU envelope (carved like memory) ``` -When `memory` is set, the operator uses **Guaranteed QoS** (requests = limits), as recommended by CloudNative-PG for database workloads. This ensures predictable performance and stable memory for PostgreSQL buffer management. +When top-level `memory` or `cpu` is set, the operator allocates that envelope across the PostgreSQL container, the documentdb-gateway sidecar, and, when monitoring is enabled, the OTel Collector sidecar. Each component gets its own container resource settings so sidecars reserve their memory/CPU and a sidecar memory leak is OOM-killed in that sidecar instead of crowding out PostgreSQL. -If `memory` is not specified (or set to `"0"`), no resource limits are applied and static fallback values are used for memory-sensitive parameters. +If neither an envelope nor any per-container value is specified for a dimension, no limits are applied for that dimension and static fallback values are used for memory-sensitive parameters when memory is unmanaged. See [The envelope is optional](#the-envelope-is-optional) below for omitting the envelope while still sizing containers. !!! note Changing `memory` (or `cpu`) triggers a rolling restart of the DocumentDB pods, @@ -40,11 +40,86 @@ If `memory` is not specified (or set to `"0"`), no resource limits are applied a memory-aware PostgreSQL parameters (`shared_buffers`, `effective_cache_size`, `work_mem`, `maintenance_work_mem`) are recomputed and applied at the same time. +## Sidecar Memory Isolation + +The operator treats `spec.resource.memory` and `spec.resource.cpu` as total pod envelopes and carves out sidecar reservations before computing PostgreSQL settings: + +- **documentdb-gateway**: by default, reserves 18.75% of the total memory envelope, capped at 32Gi; its configured CPU reservation is carved from the CPU envelope. +- **OTel Collector**: when `spec.monitoring.enabled` is true, defaults to a 48Mi memory request, a 128Mi memory limit, a 50m CPU request, and a 200m CPU limit (Burstable — the requests are reserved and the limits cap a telemetry burst). +- **PostgreSQL**: receives the remaining memory and CPU, and memory-aware parameters such as `shared_buffers` are recomputed from that database allocation. + +Override individual containers with `spec.resource.gateway`, `spec.resource.database`, and `spec.resource.otel` when a cluster needs explicit sizing: + +```yaml +apiVersion: documentdb.io/preview +kind: DocumentDB +metadata: + name: sized-cluster +spec: + monitoring: + enabled: true + resource: + memory: "8Gi" # Total pod memory envelope + cpu: "4" # Total pod CPU envelope + gateway: + memory: "1Gi" + cpu: "500m" + database: + memory: "6Gi" + cpu: "3" + otel: + memory: "128Mi" + cpu: "50m" +``` + +Each per-component value is a Kubernetes quantity string and, when set, overrides the automatic split for that container. + +### The envelope is optional + +`spec.resource.memory` and `spec.resource.cpu` (the pod envelope) are optional. For each dimension independently: + +- **Set the envelope** and let the operator divide it (gateway and OTel reserved, PostgreSQL gets the remainder). +- **Omit the envelope** and instead set that dimension on **both** `spec.resource.gateway` and `spec.resource.database` — the effective envelope is the sum of the containers (the OTel collector uses its default if you do not set it). For example: + +```yaml +spec: + resource: + storage: + pvcSize: "50Gi" + # no top-level memory/cpu — derived from the containers below + gateway: + memory: "1Gi" + cpu: "500m" + database: + memory: "6Gi" + cpu: "3" +``` + +- **Omit the envelope and all container values** for a dimension to leave it unmanaged (no limits). + +If you omit the envelope but only partially specify the containers (for example, you set `gateway.memory` but not `database.memory`), the resource is **rejected** by the validating webhook, because the sidecar reservation and PostgreSQL remainder for that dimension cannot be derived without the envelope. Likewise, an explicit envelope that the sidecar reservations exhaust — or that explicit per-container values exceed — is rejected. + +Cluster-wide defaults are configured with the operator Helm chart: + +```yaml +operator: + sidecarResources: + gatewayMemoryFraction: "0.1875" + gatewayMemoryCap: "32Gi" + gatewayCpuLimit: "" # optional; bounds gateway async worker threads + otelMemoryRequest: "48Mi" + otelMemoryLimit: "128Mi" + otelCpuRequest: "50m" + otelCpuLimit: "200m" # ceiling on the collector's CPU burst +``` + +Use per-cluster `spec.resource` overrides for individual workload needs; use Helm values to change fleet-wide defaults for clusters managed by the operator. + ## Memory-Aware Defaults -When a memory limit is configured, these parameters are automatically computed: +When PostgreSQL has an effective database memory allocation, these parameters are automatically computed from that allocation: -| Parameter | Formula | Example (8Gi) | +| Parameter | Formula | Example (8Gi database allocation) | |-----------|---------|---------------| | `shared_buffers` | 25% of memory | 2GB | | `effective_cache_size` | 75% of memory | 6GB | @@ -53,7 +128,7 @@ When a memory limit is configured, these parameters are automatically computed: ### Sizing Reference -| Pod Memory | shared_buffers | effective_cache_size | work_mem | maintenance_work_mem | +| Database Memory | shared_buffers | effective_cache_size | work_mem | maintenance_work_mem | |-----------|----------------|---------------------|----------|---------------------| | (not set) | 256MB | 512MB | 16MB | 128MB | | 2Gi | 512MB | 1536MB | 4MB | 204MB | @@ -138,8 +213,8 @@ spec: This configuration will produce the following effective parameters (among others): -- `shared_buffers`: 4GB (auto-computed from 16Gi) -- `effective_cache_size`: 12GB (auto-computed) +- `shared_buffers`: auto-computed from the PostgreSQL memory remaining after sidecar reservations +- `effective_cache_size`: auto-computed from the same effective database allocation - `max_connections`: 500 (user override) - `wal_level`: logical (protected, from ChangeStreams gate) - `cron.database_name`: postgres (protected) diff --git a/operator/cnpg-plugins/sidecar-injector/internal/config/config.go b/operator/cnpg-plugins/sidecar-injector/internal/config/config.go index a2903fb17..c2df32128 100644 --- a/operator/cnpg-plugins/sidecar-injector/internal/config/config.go +++ b/operator/cnpg-plugins/sidecar-injector/internal/config/config.go @@ -12,6 +12,7 @@ import ( "github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/validation" "github.com/cloudnative-pg/cnpg-i/pkg/operator" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" ) const ( @@ -19,9 +20,17 @@ const ( annotationParameter = "annotations" gatewayImageParameter = "gatewayImage" gatewayImagePullPolicyParameter = "gatewayImagePullPolicy" + gatewayMemoryRequestParameter = "gatewayMemoryRequest" + gatewayMemoryLimitParameter = "gatewayMemoryLimit" + gatewayCPURequestParameter = "gatewayCpuRequest" + gatewayCPULimitParameter = "gatewayCpuLimit" documentDbCredentialSecretParameter = "documentDbCredentialSecret" otelCollectorImageParameter = "otelCollectorImage" otelConfigMapNameParameter = "otelConfigMapName" + otelMemoryRequestParameter = "otelMemoryRequest" + otelMemoryLimitParameter = "otelMemoryLimit" + otelCPURequestParameter = "otelCpuRequest" + otelCPULimitParameter = "otelCpuLimit" prometheusPortParameter = "prometheusPort" ) @@ -31,9 +40,17 @@ type Configuration struct { Annotations map[string]string GatewayImage string GatewayImagePullPolicy corev1.PullPolicy + GatewayMemoryRequest string + GatewayMemoryLimit string + GatewayCPURequest string + GatewayCPULimit string DocumentDbCredentialSecret string OtelCollectorImage string OtelConfigMapName string + OTelMemoryRequest string + OTelMemoryLimit string + OTelCPURequest string + OTelCPULimit string PrometheusPort int32 } @@ -67,6 +84,16 @@ func FromParameters( gatewayImage := helper.Parameters[gatewayImageParameter] credentialSecret := helper.Parameters[documentDbCredentialSecretParameter] pullPolicy := parsePullPolicy(helper.Parameters[gatewayImagePullPolicyParameter]) + validateQuantityParameters(helper, &validationErrors, + gatewayMemoryRequestParameter, + gatewayMemoryLimitParameter, + gatewayCPURequestParameter, + gatewayCPULimitParameter, + otelMemoryRequestParameter, + otelMemoryLimitParameter, + otelCPURequestParameter, + otelCPULimitParameter, + ) var prometheusPort int32 if portStr := helper.Parameters[prometheusPortParameter]; portStr != "" { @@ -86,9 +113,17 @@ func FromParameters( Annotations: annotations, GatewayImage: gatewayImage, GatewayImagePullPolicy: pullPolicy, + GatewayMemoryRequest: helper.Parameters[gatewayMemoryRequestParameter], + GatewayMemoryLimit: helper.Parameters[gatewayMemoryLimitParameter], + GatewayCPURequest: helper.Parameters[gatewayCPURequestParameter], + GatewayCPULimit: helper.Parameters[gatewayCPULimitParameter], DocumentDbCredentialSecret: credentialSecret, OtelCollectorImage: helper.Parameters[otelCollectorImageParameter], OtelConfigMapName: helper.Parameters[otelConfigMapNameParameter], + OTelMemoryRequest: helper.Parameters[otelMemoryRequestParameter], + OTelMemoryLimit: helper.Parameters[otelMemoryLimitParameter], + OTelCPURequest: helper.Parameters[otelCPURequestParameter], + OTelCPULimit: helper.Parameters[otelCPULimitParameter], PrometheusPort: prometheusPort, } @@ -115,6 +150,25 @@ func ValidateChanges( return validationErrors } +func validateQuantityParameters( + helper *common.Plugin, + validationErrors *[]*operator.ValidationError, + parameters ...string, +) { + for _, parameter := range parameters { + value := helper.Parameters[parameter] + if value == "" { + continue + } + if _, err := resource.ParseQuantity(value); err != nil { + *validationErrors = append( + *validationErrors, + validation.BuildErrorForParameter(helper, parameter, "invalid resource quantity: "+err.Error()), + ) + } + } +} + // applyDefaults fills the configuration with the defaults func (config *Configuration) applyDefaults() { if len(config.Labels) == 0 { @@ -166,7 +220,21 @@ func (config *Configuration) ToParameters() (map[string]string, error) { result[annotationParameter] = string(serializedAnnotations) result[gatewayImageParameter] = config.GatewayImage result[gatewayImagePullPolicyParameter] = string(config.GatewayImagePullPolicy) + // Omit empty optional resource params to avoid noisy defaulting diffs. + setIfNotEmpty := func(key, val string) { + if val != "" { + result[key] = val + } + } + setIfNotEmpty(gatewayMemoryRequestParameter, config.GatewayMemoryRequest) + setIfNotEmpty(gatewayMemoryLimitParameter, config.GatewayMemoryLimit) + setIfNotEmpty(gatewayCPURequestParameter, config.GatewayCPURequest) + setIfNotEmpty(gatewayCPULimitParameter, config.GatewayCPULimit) result[documentDbCredentialSecretParameter] = config.DocumentDbCredentialSecret + setIfNotEmpty(otelMemoryRequestParameter, config.OTelMemoryRequest) + setIfNotEmpty(otelMemoryLimitParameter, config.OTelMemoryLimit) + setIfNotEmpty(otelCPURequestParameter, config.OTelCPURequest) + setIfNotEmpty(otelCPULimitParameter, config.OTelCPULimit) return result, nil } diff --git a/operator/cnpg-plugins/sidecar-injector/internal/config/config_test.go b/operator/cnpg-plugins/sidecar-injector/internal/config/config_test.go index 3e8f3205a..1712b6038 100644 --- a/operator/cnpg-plugins/sidecar-injector/internal/config/config_test.go +++ b/operator/cnpg-plugins/sidecar-injector/internal/config/config_test.go @@ -83,12 +83,56 @@ func TestFromParameters(t *testing.T) { t.Errorf("GatewayImagePullPolicy = %q, want IfNotPresent", config.GatewayImagePullPolicy) } }) + + t.Run("resource parameters from parameters", func(t *testing.T) { + helper := &common.Plugin{Parameters: map[string]string{ + "gatewayMemoryRequest": "768Mi", + "gatewayMemoryLimit": "3Gi", + "gatewayCpuRequest": "500m", + "gatewayCpuLimit": "2", + "otelMemoryRequest": "64Mi", + "otelMemoryLimit": "128Mi", + "otelCpuRequest": "100m", + }} + config, errs := FromParameters(helper) + if len(errs) != 0 { + t.Fatalf("unexpected validation errors: %v", errs) + } + if config.GatewayMemoryRequest != "768Mi" { + t.Errorf("GatewayMemoryRequest = %q, want 768Mi", config.GatewayMemoryRequest) + } + if config.GatewayMemoryLimit != "3Gi" { + t.Errorf("GatewayMemoryLimit = %q, want 3Gi", config.GatewayMemoryLimit) + } + if config.GatewayCPURequest != "500m" { + t.Errorf("GatewayCPURequest = %q, want 500m", config.GatewayCPURequest) + } + if config.GatewayCPULimit != "2" { + t.Errorf("GatewayCPULimit = %q, want 2", config.GatewayCPULimit) + } + if config.OTelMemoryRequest != "64Mi" { + t.Errorf("OTelMemoryRequest = %q, want 64Mi", config.OTelMemoryRequest) + } + if config.OTelMemoryLimit != "128Mi" { + t.Errorf("OTelMemoryLimit = %q, want 128Mi", config.OTelMemoryLimit) + } + if config.OTelCPURequest != "100m" { + t.Errorf("OTelCPURequest = %q, want 100m", config.OTelCPURequest) + } + }) } func TestToParametersRoundTrip(t *testing.T) { original := &Configuration{ GatewayImage: "my-image:latest", GatewayImagePullPolicy: corev1.PullNever, + GatewayMemoryRequest: "768Mi", + GatewayMemoryLimit: "3Gi", + GatewayCPURequest: "500m", + GatewayCPULimit: "2", + OTelMemoryRequest: "64Mi", + OTelMemoryLimit: "128Mi", + OTelCPURequest: "100m", } original.applyDefaults() @@ -108,4 +152,25 @@ func TestToParametersRoundTrip(t *testing.T) { if restored.GatewayImage != original.GatewayImage { t.Errorf("round-trip gateway image = %q, want %q", restored.GatewayImage, original.GatewayImage) } + if restored.GatewayMemoryRequest != original.GatewayMemoryRequest { + t.Errorf("round-trip gateway memory request = %q, want %q", restored.GatewayMemoryRequest, original.GatewayMemoryRequest) + } + if restored.GatewayMemoryLimit != original.GatewayMemoryLimit { + t.Errorf("round-trip gateway memory limit = %q, want %q", restored.GatewayMemoryLimit, original.GatewayMemoryLimit) + } + if restored.GatewayCPURequest != original.GatewayCPURequest { + t.Errorf("round-trip gateway cpu request = %q, want %q", restored.GatewayCPURequest, original.GatewayCPURequest) + } + if restored.GatewayCPULimit != original.GatewayCPULimit { + t.Errorf("round-trip gateway cpu limit = %q, want %q", restored.GatewayCPULimit, original.GatewayCPULimit) + } + if restored.OTelMemoryRequest != original.OTelMemoryRequest { + t.Errorf("round-trip otel memory request = %q, want %q", restored.OTelMemoryRequest, original.OTelMemoryRequest) + } + if restored.OTelMemoryLimit != original.OTelMemoryLimit { + t.Errorf("round-trip otel memory limit = %q, want %q", restored.OTelMemoryLimit, original.OTelMemoryLimit) + } + if restored.OTelCPURequest != original.OTelCPURequest { + t.Errorf("round-trip otel cpu request = %q, want %q", restored.OTelCPURequest, original.OTelCPURequest) + } } diff --git a/operator/cnpg-plugins/sidecar-injector/internal/lifecycle/lifecycle.go b/operator/cnpg-plugins/sidecar-injector/internal/lifecycle/lifecycle.go index 709829e59..90bef7cff 100644 --- a/operator/cnpg-plugins/sidecar-injector/internal/lifecycle/lifecycle.go +++ b/operator/cnpg-plugins/sidecar-injector/internal/lifecycle/lifecycle.go @@ -9,12 +9,14 @@ import ( "errors" "fmt" "log" + "strconv" "github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/common" "github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/decoder" "github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/object" "github.com/cloudnative-pg/cnpg-i/pkg/lifecycle" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/utils/pointer" @@ -178,6 +180,14 @@ func (impl Implementation) reconcileMetadata( RunAsGroup: pointer.Int64(1000), }, } + if resources := buildResources( + configuration.GatewayCPURequest, + configuration.GatewayCPULimit, + configuration.GatewayMemoryRequest, + configuration.GatewayMemoryLimit, + ); hasResourceRequirements(resources) { + sidecar.Resources = resources + } // If TLS secret parameter provided, mount it at /tls // Track whether TLS secret is configured to augment container args later @@ -314,6 +324,17 @@ func (impl Implementation) reconcileMetadata( }, }, } + if resources := buildResources( + configuration.OTelCPURequest, + configuration.OTelCPULimit, + configuration.OTelMemoryRequest, + configuration.OTelMemoryLimit, + ); hasResourceRequirements(resources) { + otelSidecar.Resources = resources + } + if goMemLimitEnv, ok := buildGoMemLimitEnv(configuration.OTelMemoryLimit); ok { + otelSidecar.Env = append(otelSidecar.Env, goMemLimitEnv) + } // Expose Prometheus metrics port when configured if configuration.PrometheusPort > 0 { @@ -382,6 +403,56 @@ func (impl Implementation) reconcileMetadata( // Kept as a package-level constant so tests can reference it. const gatewayContainerName = "documentdb-gateway" +func buildResources(cpuReq, cpuLim, memReq, memLim string) corev1.ResourceRequirements { + var requests corev1.ResourceList + var limits corev1.ResourceList + + addQuantity := func(list corev1.ResourceList, name corev1.ResourceName, value string) corev1.ResourceList { + if value == "" { + return list + } + quantity, err := resource.ParseQuantity(value) + if err != nil { + return list + } + if list == nil { + list = corev1.ResourceList{} + } + list[name] = quantity + return list + } + + requests = addQuantity(requests, corev1.ResourceCPU, cpuReq) + requests = addQuantity(requests, corev1.ResourceMemory, memReq) + limits = addQuantity(limits, corev1.ResourceCPU, cpuLim) + limits = addQuantity(limits, corev1.ResourceMemory, memLim) + + return corev1.ResourceRequirements{ + Requests: requests, + Limits: limits, + } +} + +func hasResourceRequirements(resources corev1.ResourceRequirements) bool { + return len(resources.Requests) > 0 || len(resources.Limits) > 0 +} + +func buildGoMemLimitEnv(memoryLimit string) (corev1.EnvVar, bool) { + if memoryLimit == "" { + return corev1.EnvVar{}, false + } + + quantity, err := resource.ParseQuantity(memoryLimit) + if err != nil { + return corev1.EnvVar{}, false + } + + return corev1.EnvVar{ + Name: "GOMEMLIMIT", + Value: strconv.FormatInt(quantity.Value()*80/100, 10), + }, true +} + // gatewayOTelEnvVars returns the OTel-related env vars that the sidecar // injector adds to the gateway container so it can push metrics to the // co-located OTel Collector sidecar. diff --git a/operator/cnpg-plugins/sidecar-injector/internal/lifecycle/lifecycle_test.go b/operator/cnpg-plugins/sidecar-injector/internal/lifecycle/lifecycle_test.go index c722ef1fb..fd3d22753 100644 --- a/operator/cnpg-plugins/sidecar-injector/internal/lifecycle/lifecycle_test.go +++ b/operator/cnpg-plugins/sidecar-injector/internal/lifecycle/lifecycle_test.go @@ -4,10 +4,19 @@ package lifecycle import ( + "context" + "encoding/json" "reflect" + "strings" "testing" + apiv1 "github.com/cloudnative-pg/api/pkg/api/v1" + cnpglifecycle "github.com/cloudnative-pg/cnpg-i/pkg/lifecycle" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + pluginmetadata "github.com/documentdb/cnpg-i-sidecar-injector/pkg/metadata" ) func gatewayContainer(env ...corev1.EnvVar) *corev1.Pod { @@ -110,3 +119,160 @@ func TestInjectGatewayOTelEnv_NoGatewayContainer(t *testing.T) { t.Errorf("expected no envs on non-gateway container, got %v", envNames(pod.Spec.Containers[0].Env)) } } + +func TestLifecycleHookInjectsContainerResourcesAndGoMemLimit(t *testing.T) { + enabled := true + cluster := &apiv1.Cluster{ + TypeMeta: metav1.TypeMeta{ + APIVersion: apiv1.SchemeGroupVersion.String(), + Kind: apiv1.ClusterKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster", + Namespace: "default", + }, + Spec: apiv1.ClusterSpec{ + Plugins: []apiv1.PluginConfiguration{ + { + Name: pluginmetadata.PluginName, + Enabled: &enabled, + Parameters: map[string]string{ + "gatewayImage": "gateway:latest", + "gatewayMemoryRequest": "768Mi", + "gatewayMemoryLimit": "3Gi", + "gatewayCpuRequest": "500m", + "gatewayCpuLimit": "2", + "otelCollectorImage": "otel:latest", + "otelConfigMapName": "otel-config", + "otelMemoryRequest": "64Mi", + "otelMemoryLimit": "128Mi", + "otelCpuRequest": "100m", + "otelCpuLimit": "300m", + "documentDbCredentialSecret": "documentdb-credentials", + }, + }, + }, + }, + Status: apiv1.ClusterStatus{ + TargetPrimary: "cluster-1", + }, + } + pod := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster-1", + Namespace: "default", + Labels: map[string]string{"cnpg.io/cluster": "cluster"}, + Annotations: map[string]string{"cnpg.io/operatorVersion": "test"}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "postgres"}, + }, + }, + } + + response, err := Implementation{}.LifecycleHook(context.Background(), &cnpglifecycle.OperatorLifecycleRequest{ + OperationType: &cnpglifecycle.OperatorOperationType{ + Type: cnpglifecycle.OperatorOperationType_TYPE_CREATE, + }, + ClusterDefinition: mustMarshal(t, cluster), + ObjectDefinition: mustMarshal(t, pod), + }) + if err != nil { + t.Fatalf("LifecycleHook() error: %v", err) + } + + containers := addedContainersFromPatch(t, response.JsonPatch) + gateway, ok := containers[gatewayContainerName] + if !ok { + t.Fatalf("gateway container missing from patch; patch=%s", string(response.JsonPatch)) + } + assertResourceQuantity(t, gateway.Resources.Requests, corev1.ResourceCPU, "500m") + assertResourceQuantity(t, gateway.Resources.Requests, corev1.ResourceMemory, "768Mi") + assertResourceQuantity(t, gateway.Resources.Limits, corev1.ResourceCPU, "2") + assertResourceQuantity(t, gateway.Resources.Limits, corev1.ResourceMemory, "3Gi") + + otel, ok := containers["otel-collector"] + if !ok { + t.Fatalf("otel container missing from patch; patch=%s", string(response.JsonPatch)) + } + assertResourceQuantity(t, otel.Resources.Requests, corev1.ResourceCPU, "100m") + assertResourceQuantity(t, otel.Resources.Requests, corev1.ResourceMemory, "64Mi") + assertResourceQuantity(t, otel.Resources.Limits, corev1.ResourceMemory, "128Mi") + assertResourceQuantity(t, otel.Resources.Limits, corev1.ResourceCPU, "300m") + assertEnvValue(t, otel.Env, "GOMEMLIMIT", "107374182") +} + +func mustMarshal(t *testing.T, value any) []byte { + t.Helper() + data, err := json.Marshal(value) + if err != nil { + t.Fatalf("json.Marshal() error: %v", err) + } + return data +} + +func addedContainersFromPatch(t *testing.T, patch []byte) map[string]corev1.Container { + t.Helper() + var operations []struct { + Path string `json:"path"` + Value json.RawMessage `json:"value"` + } + if err := json.Unmarshal(patch, &operations); err != nil { + t.Fatalf("json.Unmarshal(patch) error: %v", err) + } + + containers := map[string]corev1.Container{} + for _, operation := range operations { + if operation.Path == "/spec/containers" { + var added []corev1.Container + if err := json.Unmarshal(operation.Value, &added); err != nil { + continue + } + for _, container := range added { + containers[container.Name] = container + } + continue + } + if !strings.HasPrefix(operation.Path, "/spec/containers/") { + continue + } + var container corev1.Container + if err := json.Unmarshal(operation.Value, &container); err != nil { + continue + } + if container.Name != "" { + containers[container.Name] = container + } + } + return containers +} + +func assertResourceQuantity(t *testing.T, resources corev1.ResourceList, name corev1.ResourceName, want string) { + t.Helper() + got, ok := resources[name] + if !ok { + t.Fatalf("resource %s missing, want %s", name, want) + } + wantQuantity := resource.MustParse(want) + if got.Cmp(wantQuantity) != 0 { + t.Errorf("resource %s = %s, want %s", name, got.String(), want) + } +} + +func assertEnvValue(t *testing.T, env []corev1.EnvVar, name, want string) { + t.Helper() + for _, envVar := range env { + if envVar.Name == name { + if envVar.Value != want { + t.Errorf("%s = %q, want %q", name, envVar.Value, want) + } + return + } + } + t.Fatalf("%s env var missing", name) +} diff --git a/operator/documentdb-helm-chart/crds/documentdb.io_dbs.yaml b/operator/documentdb-helm-chart/crds/documentdb.io_dbs.yaml index 425d33b25..5ff7352a2 100644 --- a/operator/documentdb-helm-chart/crds/documentdb.io_dbs.yaml +++ b/operator/documentdb-helm-chart/crds/documentdb.io_dbs.yaml @@ -1361,12 +1361,51 @@ spec: properties: cpu: description: |- - CPU specifies the CPU limit for each DocumentDB instance pod. - This value is passed to the CNPG Cluster's spec.resources.limits.cpu - and spec.resources.requests.cpu (Guaranteed QoS). - If not specified or set to "0", no CPU limit is applied. + CPU specifies the total CPU envelope for each DocumentDB instance pod. + The operator divides this envelope across PostgreSQL, the documentdb-gateway + sidecar, and, when monitoring is enabled, the OTel collector sidecar. + PostgreSQL receives the remainder after gateway and OTel CPU reservations; + an explicit per-container CPU override wins over the automatic carve-out. + If not specified or set to "0", no CPU envelope is applied. Examples: "2", "4", "500m" type: string + database: + description: |- + Database optionally overrides the resources allocated to the PostgreSQL + container. When unset, PostgreSQL receives the pod memory and CPU envelopes + minus the gateway and (when monitoring is enabled) OTel collector carve-outs. + properties: + cpu: + description: CPU is the CPU request=limit for the container + (e.g. "500m", "2"). + pattern: ^([0-9]+(\.[0-9]+)?(m|Ki|Mi|Gi|Ti|Pi|Ei|k|M|G|T|P|E)?)?$ + type: string + memory: + description: Memory is the memory request=limit for the container + (e.g. "512Mi", "2Gi"). + pattern: ^([0-9]+(\.[0-9]+)?(m|Ki|Mi|Gi|Ti|Pi|Ei|k|M|G|T|P|E)?)?$ + type: string + type: object + gateway: + description: |- + Gateway optionally overrides the resources allocated to the + documentdb-gateway sidecar container. When unset, the operator derives the + gateway's memory as min(gatewayMemoryFraction × memory, gatewayMemoryCap) + and carves it out of the pod memory envelope. The value is applied as both + the request and the limit (Guaranteed-class) so a gateway leak is + OOM-isolated and cannot crowd out PostgreSQL. + properties: + cpu: + description: CPU is the CPU request=limit for the container + (e.g. "500m", "2"). + pattern: ^([0-9]+(\.[0-9]+)?(m|Ki|Mi|Gi|Ti|Pi|Ei|k|M|G|T|P|E)?)?$ + type: string + memory: + description: Memory is the memory request=limit for the container + (e.g. "512Mi", "2Gi"). + pattern: ^([0-9]+(\.[0-9]+)?(m|Ki|Mi|Gi|Ti|Pi|Ei|k|M|G|T|P|E)?)?$ + type: string + type: object memory: description: |- Memory specifies the memory limit for each DocumentDB instance pod. @@ -1378,6 +1417,26 @@ spec: defaults are used for memory-aware parameters. Examples: "2Gi", "4Gi", "8Gi" type: string + otel: + description: |- + OTel optionally overrides the resources allocated to the otel-collector + sidecar container (only present when spec.monitoring.enabled is true). + When unset, the operator applies built-in defaults: memory request 48Mi / + limit 128Mi and CPU request 50m / limit 200m (Burstable — the requests are + the reserved floor and the limits cap a telemetry burst). Setting otel.cpu + or otel.memory pins that dimension to request == limit (Guaranteed). + properties: + cpu: + description: CPU is the CPU request=limit for the container + (e.g. "500m", "2"). + pattern: ^([0-9]+(\.[0-9]+)?(m|Ki|Mi|Gi|Ti|Pi|Ei|k|M|G|T|P|E)?)?$ + type: string + memory: + description: Memory is the memory request=limit for the container + (e.g. "512Mi", "2Gi"). + pattern: ^([0-9]+(\.[0-9]+)?(m|Ki|Mi|Gi|Ti|Pi|Ei|k|M|G|T|P|E)?)?$ + type: string + type: object storage: description: Storage configuration for DocumentDB persistent volumes. properties: diff --git a/operator/documentdb-helm-chart/templates/09_documentdb_operator.yaml b/operator/documentdb-helm-chart/templates/09_documentdb_operator.yaml index 80422be25..f458eb057 100644 --- a/operator/documentdb-helm-chart/templates/09_documentdb_operator.yaml +++ b/operator/documentdb-helm-chart/templates/09_documentdb_operator.yaml @@ -93,6 +93,24 @@ spec: env: - name: GATEWAY_PORT value: "10260" + - name: DOCUMENTDB_GATEWAY_MEMORY_FRACTION + value: "{{ .Values.operator.sidecarResources.gatewayMemoryFraction }}" + - name: DOCUMENTDB_GATEWAY_MEMORY_CAP + value: "{{ .Values.operator.sidecarResources.gatewayMemoryCap }}" + {{- if .Values.operator.sidecarResources.gatewayCpuLimit }} + - name: DOCUMENTDB_GATEWAY_CPU_LIMIT + value: "{{ .Values.operator.sidecarResources.gatewayCpuLimit }}" + {{- end }} + - name: DOCUMENTDB_OTEL_MEMORY_REQUEST + value: "{{ .Values.operator.sidecarResources.otelMemoryRequest }}" + - name: DOCUMENTDB_OTEL_MEMORY_LIMIT + value: "{{ .Values.operator.sidecarResources.otelMemoryLimit }}" + - name: DOCUMENTDB_OTEL_CPU_REQUEST + value: "{{ .Values.operator.sidecarResources.otelCpuRequest }}" + {{- if .Values.operator.sidecarResources.otelCpuLimit }} + - name: DOCUMENTDB_OTEL_CPU_LIMIT + value: "{{ .Values.operator.sidecarResources.otelCpuLimit }}" + {{- end }} {{- if .Values.documentDbVersion }} - name: DOCUMENTDB_VERSION value: "{{ .Values.documentDbVersion }}" diff --git a/operator/documentdb-helm-chart/tests/09_operator_deployment_test.yaml b/operator/documentdb-helm-chart/tests/09_operator_deployment_test.yaml index 8e175b977..38cf337de 100644 --- a/operator/documentdb-helm-chart/tests/09_operator_deployment_test.yaml +++ b/operator/documentdb-helm-chart/tests/09_operator_deployment_test.yaml @@ -127,6 +127,67 @@ tests: name: GATEWAY_PORT value: "10260" + - it: should set default sidecar resource isolation env vars + asserts: + - contains: + path: spec.template.spec.containers[0].env + content: + name: DOCUMENTDB_GATEWAY_MEMORY_FRACTION + value: "0.1875" + - contains: + path: spec.template.spec.containers[0].env + content: + name: DOCUMENTDB_GATEWAY_MEMORY_CAP + value: "32Gi" + - contains: + path: spec.template.spec.containers[0].env + content: + name: DOCUMENTDB_OTEL_MEMORY_REQUEST + value: "48Mi" + - contains: + path: spec.template.spec.containers[0].env + content: + name: DOCUMENTDB_OTEL_MEMORY_LIMIT + value: "128Mi" + - contains: + path: spec.template.spec.containers[0].env + content: + name: DOCUMENTDB_OTEL_CPU_REQUEST + value: "50m" + - contains: + path: spec.template.spec.containers[0].env + content: + name: DOCUMENTDB_OTEL_CPU_LIMIT + value: "200m" + + - it: should omit DOCUMENTDB_GATEWAY_CPU_LIMIT when empty + asserts: + - notContains: + path: spec.template.spec.containers[0].env + content: + name: DOCUMENTDB_GATEWAY_CPU_LIMIT + any: true + + - it: should set DOCUMENTDB_GATEWAY_CPU_LIMIT when configured + set: + operator.sidecarResources.gatewayCpuLimit: "500m" + asserts: + - contains: + path: spec.template.spec.containers[0].env + content: + name: DOCUMENTDB_GATEWAY_CPU_LIMIT + value: "500m" + + - it: should omit DOCUMENTDB_OTEL_CPU_LIMIT when empty + set: + operator.sidecarResources.otelCpuLimit: "" + asserts: + - notContains: + path: spec.template.spec.containers[0].env + content: + name: DOCUMENTDB_OTEL_CPU_LIMIT + any: true + # ------------------------------------------------------------------- # Service account # ------------------------------------------------------------------- diff --git a/operator/documentdb-helm-chart/values.yaml b/operator/documentdb-helm-chart/values.yaml index 27128577f..fabf3bccb 100644 --- a/operator/documentdb-helm-chart/values.yaml +++ b/operator/documentdb-helm-chart/values.yaml @@ -78,6 +78,19 @@ certManager: # Defaults are conservative and aim to be compatible with Pod Security Admission's # `restricted` profile. Override any field per component as needed. operator: + # Sidecar resource isolation defaults. spec.resource.memory on a DocumentDB + # cluster is the total pod memory envelope; the operator reserves memory for + # the gateway sidecar (gatewayMemoryFraction of the envelope, capped at + # gatewayMemoryCap) and, when monitoring is enabled, the OTel collector + # (otelMemoryLimit), then gives PostgreSQL the remainder. + sidecarResources: + gatewayMemoryFraction: "0.1875" + gatewayMemoryCap: "32Gi" + gatewayCpuLimit: "" # optional; bounds gateway async worker threads + otelMemoryRequest: "48Mi" + otelMemoryLimit: "128Mi" + otelCpuRequest: "50m" + otelCpuLimit: "200m" # bounds the collector's CPU burst (ceiling) # Requests-only by convention: scheduler reserves capacity for the # operator, but no memory ceiling so a single operator can manage # fleets of any size without OOMKill. Set limits explicitly if your diff --git a/operator/src/api/preview/documentdb_types.go b/operator/src/api/preview/documentdb_types.go index ec69f59e7..edbf7e40e 100644 --- a/operator/src/api/preview/documentdb_types.go +++ b/operator/src/api/preview/documentdb_types.go @@ -269,13 +269,71 @@ type Resource struct { // +optional Memory string `json:"memory,omitempty"` - // CPU specifies the CPU limit for each DocumentDB instance pod. - // This value is passed to the CNPG Cluster's spec.resources.limits.cpu - // and spec.resources.requests.cpu (Guaranteed QoS). - // If not specified or set to "0", no CPU limit is applied. + // CPU specifies the total CPU envelope for each DocumentDB instance pod. + // The operator divides this envelope across PostgreSQL, the documentdb-gateway + // sidecar, and, when monitoring is enabled, the OTel collector sidecar. + // PostgreSQL receives the remainder after gateway and OTel CPU reservations; + // an explicit per-container CPU override wins over the automatic carve-out. + // If not specified or set to "0", no CPU envelope is applied. // Examples: "2", "4", "500m" // +optional CPU string `json:"cpu,omitempty"` + + // Memory and CPU above describe the TOTAL pod envelope, but are OPTIONAL. + // When a sidecar (the gateway, and — with monitoring enabled — the OTel + // collector) shares the pod, the operator carves its memory and CPU out of + // the envelope and gives PostgreSQL the remainder, recomputing PostgreSQL's + // memory-aware parameters from that reduced value. The optional per-component + // overrides below let you size each container independently; an explicit + // override always wins over the automatic carve-out. + // + // The envelope (Memory/CPU above) may be omitted for a dimension when that + // dimension is set explicitly on BOTH the gateway and the database — the + // effective envelope is then the sum of the containers. If you omit the + // envelope without fully specifying the containers, the resource is rejected; + // if you omit both the envelope and all container values, that dimension is + // left unmanaged (no limits). + + // Gateway optionally overrides the resources allocated to the + // documentdb-gateway sidecar container. When unset, the operator derives the + // gateway's memory as min(gatewayMemoryFraction × memory, gatewayMemoryCap) + // and carves it out of the pod memory envelope. The value is applied as both + // the request and the limit (Guaranteed-class) so a gateway leak is + // OOM-isolated and cannot crowd out PostgreSQL. + // +optional + Gateway *ComponentResources `json:"gateway,omitempty"` + + // Database optionally overrides the resources allocated to the PostgreSQL + // container. When unset, PostgreSQL receives the pod memory and CPU envelopes + // minus the gateway and (when monitoring is enabled) OTel collector carve-outs. + // +optional + Database *ComponentResources `json:"database,omitempty"` + + // OTel optionally overrides the resources allocated to the otel-collector + // sidecar container (only present when spec.monitoring.enabled is true). + // When unset, the operator applies built-in defaults: memory request 48Mi / + // limit 128Mi and CPU request 50m / limit 200m (Burstable — the requests are + // the reserved floor and the limits cap a telemetry burst). Setting otel.cpu + // or otel.memory pins that dimension to request == limit (Guaranteed). + // +optional + OTel *ComponentResources `json:"otel,omitempty"` +} + +// ComponentResources overrides the CPU and/or memory allocated to an individual +// container in the DocumentDB pod (PostgreSQL, the gateway, or the OTel +// collector). Each field is a Kubernetes quantity string; when set it is applied +// as both the request and the limit for that container (Guaranteed-class) and +// overrides the automatic carve-out derived from spec.resource.memory. +type ComponentResources struct { + // Memory is the memory request=limit for the container (e.g. "512Mi", "2Gi"). + // +kubebuilder:validation:Pattern=`^([0-9]+(\.[0-9]+)?(m|Ki|Mi|Gi|Ti|Pi|Ei|k|M|G|T|P|E)?)?$` + // +optional + Memory string `json:"memory,omitempty"` + + // CPU is the CPU request=limit for the container (e.g. "500m", "2"). + // +kubebuilder:validation:Pattern=`^([0-9]+(\.[0-9]+)?(m|Ki|Mi|Gi|Ti|Pi|Ei|k|M|G|T|P|E)?)?$` + // +optional + CPU string `json:"cpu,omitempty"` } type StorageConfiguration struct { diff --git a/operator/src/api/preview/zz_generated.deepcopy.go b/operator/src/api/preview/zz_generated.deepcopy.go index 804104c90..81ca7a057 100644 --- a/operator/src/api/preview/zz_generated.deepcopy.go +++ b/operator/src/api/preview/zz_generated.deepcopy.go @@ -195,6 +195,21 @@ func (in *ClusterReplication) DeepCopy() *ClusterReplication { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ComponentResources) DeepCopyInto(out *ComponentResources) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ComponentResources. +func (in *ComponentResources) DeepCopy() *ComponentResources { + if in == nil { + return nil + } + out := new(ComponentResources) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DocumentDB) DeepCopyInto(out *DocumentDB) { *out = *in @@ -257,7 +272,7 @@ func (in *DocumentDBList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DocumentDBSpec) DeepCopyInto(out *DocumentDBSpec) { *out = *in - out.Resource = in.Resource + in.Resource.DeepCopyInto(&out.Resource) if in.Image != nil { in, out := &in.Image, &out.Image *out = new(ImageSpec) @@ -642,6 +657,21 @@ func (in *RecoveryConfiguration) DeepCopy() *RecoveryConfiguration { func (in *Resource) DeepCopyInto(out *Resource) { *out = *in out.Storage = in.Storage + if in.Gateway != nil { + in, out := &in.Gateway, &out.Gateway + *out = new(ComponentResources) + **out = **in + } + if in.Database != nil { + in, out := &in.Database, &out.Database + *out = new(ComponentResources) + **out = **in + } + if in.OTel != nil { + in, out := &in.OTel, &out.OTel + *out = new(ComponentResources) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Resource. diff --git a/operator/src/config/crd/bases/documentdb.io_dbs.yaml b/operator/src/config/crd/bases/documentdb.io_dbs.yaml index 425d33b25..5ff7352a2 100644 --- a/operator/src/config/crd/bases/documentdb.io_dbs.yaml +++ b/operator/src/config/crd/bases/documentdb.io_dbs.yaml @@ -1361,12 +1361,51 @@ spec: properties: cpu: description: |- - CPU specifies the CPU limit for each DocumentDB instance pod. - This value is passed to the CNPG Cluster's spec.resources.limits.cpu - and spec.resources.requests.cpu (Guaranteed QoS). - If not specified or set to "0", no CPU limit is applied. + CPU specifies the total CPU envelope for each DocumentDB instance pod. + The operator divides this envelope across PostgreSQL, the documentdb-gateway + sidecar, and, when monitoring is enabled, the OTel collector sidecar. + PostgreSQL receives the remainder after gateway and OTel CPU reservations; + an explicit per-container CPU override wins over the automatic carve-out. + If not specified or set to "0", no CPU envelope is applied. Examples: "2", "4", "500m" type: string + database: + description: |- + Database optionally overrides the resources allocated to the PostgreSQL + container. When unset, PostgreSQL receives the pod memory and CPU envelopes + minus the gateway and (when monitoring is enabled) OTel collector carve-outs. + properties: + cpu: + description: CPU is the CPU request=limit for the container + (e.g. "500m", "2"). + pattern: ^([0-9]+(\.[0-9]+)?(m|Ki|Mi|Gi|Ti|Pi|Ei|k|M|G|T|P|E)?)?$ + type: string + memory: + description: Memory is the memory request=limit for the container + (e.g. "512Mi", "2Gi"). + pattern: ^([0-9]+(\.[0-9]+)?(m|Ki|Mi|Gi|Ti|Pi|Ei|k|M|G|T|P|E)?)?$ + type: string + type: object + gateway: + description: |- + Gateway optionally overrides the resources allocated to the + documentdb-gateway sidecar container. When unset, the operator derives the + gateway's memory as min(gatewayMemoryFraction × memory, gatewayMemoryCap) + and carves it out of the pod memory envelope. The value is applied as both + the request and the limit (Guaranteed-class) so a gateway leak is + OOM-isolated and cannot crowd out PostgreSQL. + properties: + cpu: + description: CPU is the CPU request=limit for the container + (e.g. "500m", "2"). + pattern: ^([0-9]+(\.[0-9]+)?(m|Ki|Mi|Gi|Ti|Pi|Ei|k|M|G|T|P|E)?)?$ + type: string + memory: + description: Memory is the memory request=limit for the container + (e.g. "512Mi", "2Gi"). + pattern: ^([0-9]+(\.[0-9]+)?(m|Ki|Mi|Gi|Ti|Pi|Ei|k|M|G|T|P|E)?)?$ + type: string + type: object memory: description: |- Memory specifies the memory limit for each DocumentDB instance pod. @@ -1378,6 +1417,26 @@ spec: defaults are used for memory-aware parameters. Examples: "2Gi", "4Gi", "8Gi" type: string + otel: + description: |- + OTel optionally overrides the resources allocated to the otel-collector + sidecar container (only present when spec.monitoring.enabled is true). + When unset, the operator applies built-in defaults: memory request 48Mi / + limit 128Mi and CPU request 50m / limit 200m (Burstable — the requests are + the reserved floor and the limits cap a telemetry burst). Setting otel.cpu + or otel.memory pins that dimension to request == limit (Guaranteed). + properties: + cpu: + description: CPU is the CPU request=limit for the container + (e.g. "500m", "2"). + pattern: ^([0-9]+(\.[0-9]+)?(m|Ki|Mi|Gi|Ti|Pi|Ei|k|M|G|T|P|E)?)?$ + type: string + memory: + description: Memory is the memory request=limit for the container + (e.g. "512Mi", "2Gi"). + pattern: ^([0-9]+(\.[0-9]+)?(m|Ki|Mi|Gi|Ti|Pi|Ei|k|M|G|T|P|E)?)?$ + type: string + type: object storage: description: Storage configuration for DocumentDB persistent volumes. properties: diff --git a/operator/src/internal/cnpg/cnpg_cluster.go b/operator/src/internal/cnpg/cnpg_cluster.go index bc8923993..893a72b6d 100644 --- a/operator/src/internal/cnpg/cnpg_cluster.go +++ b/operator/src/internal/cnpg/cnpg_cluster.go @@ -22,6 +22,8 @@ import ( ) func GetCnpgClusterSpec(req ctrl.Request, documentdb *dbpreview.DocumentDB, documentdbImage, serviceAccountName, storageClass string, isPrimaryRegion bool, log logr.Logger) *cnpgv1.Cluster { + split := ComputeResourceSplit(documentdb, DefaultSplitConfig()) + sidecarPluginName := pluginsSidecarInjectorName(documentdb) if sidecarPluginName == "" { sidecarPluginName = util.DEFAULT_SIDECAR_INJECTOR_PLUGIN @@ -85,6 +87,10 @@ func GetCnpgClusterSpec(req ctrl.Request, documentdb *dbpreview.DocumentDB, docu if pullPolicy := os.Getenv(util.GATEWAY_IMAGE_PULL_POLICY_ENV); pullPolicy != "" { params["gatewayImagePullPolicy"] = pullPolicy } + addPluginParamIfSet(params, util.PLUGIN_PARAM_GATEWAY_MEMORY_REQUEST, split.Gateway.MemoryRequest) + addPluginParamIfSet(params, util.PLUGIN_PARAM_GATEWAY_MEMORY_LIMIT, split.Gateway.MemoryLimit) + addPluginParamIfSet(params, util.PLUGIN_PARAM_GATEWAY_CPU_REQUEST, split.Gateway.CPURequest) + addPluginParamIfSet(params, util.PLUGIN_PARAM_GATEWAY_CPU_LIMIT, split.Gateway.CPULimit) // If TLS is ready, surface secret name to plugin so it can mount certs. if documentdb.Status.TLS != nil && documentdb.Status.TLS.Ready && documentdb.Status.TLS.SecretName != "" { params["gatewayTLSSecret"] = documentdb.Status.TLS.SecretName @@ -92,9 +98,13 @@ func GetCnpgClusterSpec(req ctrl.Request, documentdb *dbpreview.DocumentDB, docu // Pass monitoring parameters to plugin for OTel sidecar injection. // Sidecar is only injected when monitoring is enabled. // Config hash triggers operator-initiated rolling restart on config changes. - if documentdb.Spec.Monitoring != nil && documentdb.Spec.Monitoring.Enabled { + if split.MonitoringEnabled { params["otelCollectorImage"] = util.DEFAULT_OTEL_COLLECTOR_IMAGE params["otelConfigMapName"] = otelcfg.ConfigMapName(documentdb.Name) + addPluginParamIfSet(params, util.PLUGIN_PARAM_OTEL_MEMORY_REQUEST, split.OTel.MemoryRequest) + addPluginParamIfSet(params, util.PLUGIN_PARAM_OTEL_MEMORY_LIMIT, split.OTel.MemoryLimit) + addPluginParamIfSet(params, util.PLUGIN_PARAM_OTEL_CPU_REQUEST, split.OTel.CPURequest) + addPluginParamIfSet(params, util.PLUGIN_PARAM_OTEL_CPU_LIMIT, split.OTel.CPULimit) if promPort := otelcfg.ResolvePrometheusPort(documentdb.Spec.Monitoring); promPort > 0 { params["prometheusPort"] = fmt.Sprintf("%d", promPort) } @@ -113,7 +123,7 @@ func GetCnpgClusterSpec(req ctrl.Request, documentdb *dbpreview.DocumentDB, docu Parameters: params, }} }(), - PostgresConfiguration: buildPostgresConfiguration(documentdb, extensionImageSource), + PostgresConfiguration: buildPostgresConfiguration(documentdb, extensionImageSource, split.PostgresMemoryBytes), Bootstrap: getBootstrapConfiguration(documentdb, isPrimaryRegion, log), LogLevel: cmp.Or(documentdb.Spec.LogLevel, "info"), Backup: &cnpgv1.BackupConfiguration{ @@ -123,7 +133,7 @@ func GetCnpgClusterSpec(req ctrl.Request, documentdb *dbpreview.DocumentDB, docu Target: cnpgv1.BackupTarget("primary"), }, Affinity: documentdb.Spec.Affinity, - Resources: buildResourceRequirements(documentdb), + Resources: buildResourceRequirements(split.Postgres), } spec.MaxStopDelay = getMaxStopDelayOrDefault(documentdb) applyPostgresProcessIdentity(&spec, documentdb) @@ -133,6 +143,12 @@ func GetCnpgClusterSpec(req ctrl.Request, documentdb *dbpreview.DocumentDB, docu } } +func addPluginParamIfSet(params map[string]string, key, value string) { + if value != "" { + params[key] = value + } +} + func getInheritedMetadataLabels(appName string) *cnpgv1.EmbeddedObjectMetadata { return &cnpgv1.EmbeddedObjectMetadata{ Labels: map[string]string{ @@ -218,40 +234,48 @@ func parseMemoryToBytes(memoryStr string) int64 { return qty.Value() } -// buildResourceRequirements constructs corev1.ResourceRequirements from the -// DocumentDB Resource spec. Uses Guaranteed QoS (requests == limits) as -// recommended by CNPG. Returns empty requirements if neither Memory nor CPU is set. -func buildResourceRequirements(documentdb *dbpreview.DocumentDB) corev1.ResourceRequirements { +// buildResourceRequirements constructs corev1.ResourceRequirements from a +// resolved component resource split. Returns empty requirements if nothing is set. +func buildResourceRequirements(component ComponentResource) corev1.ResourceRequirements { reqs := corev1.ResourceRequirements{} - mem := documentdb.Spec.Resource.Memory - cpu := documentdb.Spec.Resource.CPU - if (mem == "" || mem == "0") && (cpu == "" || cpu == "0") { - return reqs + requests := corev1.ResourceList{} + if quantity, ok := parseResourceQuantity(component.MemoryRequest); ok { + requests[corev1.ResourceMemory] = quantity + } + if quantity, ok := parseResourceQuantity(component.CPURequest); ok { + requests[corev1.ResourceCPU] = quantity } limits := corev1.ResourceList{} - if mem != "" && mem != "0" { - if quantity, err := resource.ParseQuantity(mem); err == nil { - limits[corev1.ResourceMemory] = quantity - } + if quantity, ok := parseResourceQuantity(component.MemoryLimit); ok { + limits[corev1.ResourceMemory] = quantity } - if cpu != "" && cpu != "0" { - if quantity, err := resource.ParseQuantity(cpu); err == nil { - limits[corev1.ResourceCPU] = quantity - } + if quantity, ok := parseResourceQuantity(component.CPULimit); ok { + limits[corev1.ResourceCPU] = quantity } - if len(limits) == 0 { - return reqs + if len(requests) > 0 { + reqs.Requests = requests + } + if len(limits) > 0 { + reqs.Limits = limits } - // Guaranteed QoS: requests == limits - reqs.Limits = limits - reqs.Requests = limits.DeepCopy() return reqs } +func parseResourceQuantity(value string) (resource.Quantity, bool) { + if value == "" || value == "0" { + return resource.Quantity{}, false + } + quantity, err := resource.ParseQuantity(value) + if err != nil { + return resource.Quantity{}, false + } + return quantity, true +} + // parsePullPolicy converts a string to a corev1.PullPolicy. // Returns empty string for unrecognized values. func parsePullPolicy(value string) corev1.PullPolicy { @@ -331,7 +355,7 @@ func applyPostgresProcessIdentity(spec *cnpgv1.ClusterSpec, documentdb *dbprevie // stanza (mounted from spec.image.documentDB as an ImageVolumeSource), // sets a fixed AdditionalLibraries list, and applies a small set of // operator-managed GUCs. -func buildPostgresConfiguration(documentdb *dbpreview.DocumentDB, extensionImageSource corev1.ImageVolumeSource) cnpgv1.PostgresConfiguration { +func buildPostgresConfiguration(documentdb *dbpreview.DocumentDB, extensionImageSource corev1.ImageVolumeSource, pgMemoryBytes int64) cnpgv1.PostgresConfiguration { pgHBA := []string{ "host all all 0.0.0.0/0 trust", "host all all ::0/0 trust", @@ -349,7 +373,7 @@ func buildPostgresConfiguration(documentdb *dbpreview.DocumentDB, extensionImage }, }, AdditionalLibraries: []string{"pg_cron", "pg_documentdb_core", "pg_documentdb"}, - Parameters: MergeParameters(documentdb, parseMemoryToBytes(documentdb.Spec.Resource.Memory)), + Parameters: MergeParameters(documentdb, pgMemoryBytes), PgHBA: pgHBA, } } diff --git a/operator/src/internal/cnpg/cnpg_cluster_test.go b/operator/src/internal/cnpg/cnpg_cluster_test.go index 880786ca6..0c132a1df 100644 --- a/operator/src/internal/cnpg/cnpg_cluster_test.go +++ b/operator/src/internal/cnpg/cnpg_cluster_test.go @@ -194,6 +194,15 @@ var _ = Describe("getDefaultBootstrapConfiguration", func() { var _ = Describe("GetCnpgClusterSpec", func() { var log = zap.New(zap.WriteTo(GinkgoWriter)) + setProdSplitEnv := func() { + GinkgoT().Setenv(util.GATEWAY_MEMORY_FRACTION_ENV, "0.1875") + GinkgoT().Setenv(util.GATEWAY_MEMORY_CAP_ENV, "32Gi") + GinkgoT().Setenv(util.GATEWAY_CPU_LIMIT_ENV, "") + GinkgoT().Setenv(util.OTEL_MEMORY_REQUEST_ENV, "48Mi") + GinkgoT().Setenv(util.OTEL_MEMORY_LIMIT_ENV, "128Mi") + GinkgoT().Setenv(util.OTEL_CPU_REQUEST_ENV, "50m") + } + It("creates a CNPG cluster spec with default bootstrap", func() { req := ctrl.Request{} req.Name = "test-cluster" @@ -544,6 +553,70 @@ var _ = Describe("GetCnpgClusterSpec", func() { Expect(params).To(HaveKeyWithValue("max_wal_senders", "10")) }) + It("uses carved postgres resources and gateway plugin params when monitoring is disabled", func() { + setProdSplitEnv() + req := ctrl.Request{} + req.Name = "test-cluster" + req.Namespace = "default" + + documentdb := &dbpreview.DocumentDB{ + Spec: dbpreview.DocumentDBSpec{ + InstancesPerNode: 1, + Resource: dbpreview.Resource{ + Storage: dbpreview.StorageConfiguration{ + PvcSize: "10Gi", + }, + Memory: "16Gi", + }, + }, + } + + cluster := GetCnpgClusterSpec(req, documentdb, "test-image:latest", "test-sa", "", true, log) + expectedPostgresMemory := resource.MustParse("13Gi") + Expect(cluster.Spec.Resources.Limits[corev1.ResourceMemory]).To(Equal(expectedPostgresMemory)) + Expect(cluster.Spec.Resources.Requests[corev1.ResourceMemory]).To(Equal(expectedPostgresMemory)) + Expect(cluster.Spec.Plugins[0].Parameters).To(HaveKeyWithValue(util.PLUGIN_PARAM_GATEWAY_MEMORY_REQUEST, "3Gi")) + Expect(cluster.Spec.Plugins[0].Parameters).To(HaveKeyWithValue(util.PLUGIN_PARAM_GATEWAY_MEMORY_LIMIT, "3Gi")) + Expect(cluster.Spec.Plugins[0].Parameters).NotTo(HaveKey(util.PLUGIN_PARAM_OTEL_MEMORY_REQUEST)) + Expect(cluster.Spec.Plugins[0].Parameters).NotTo(HaveKey(util.PLUGIN_PARAM_OTEL_MEMORY_LIMIT)) + Expect(cluster.Spec.PostgresConfiguration.Parameters).To(HaveKeyWithValue("shared_buffers", "3328MB")) + }) + + It("passes OTel resource plugin params and carves OTel memory when monitoring is enabled", func() { + setProdSplitEnv() + req := ctrl.Request{} + req.Name = "test-cluster" + req.Namespace = "default" + + documentdb := &dbpreview.DocumentDB{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: dbpreview.DocumentDBSpec{ + InstancesPerNode: 1, + Resource: dbpreview.Resource{ + Storage: dbpreview.StorageConfiguration{ + PvcSize: "10Gi", + }, + Memory: "16Gi", + }, + Monitoring: &dbpreview.MonitoringSpec{ + Enabled: true, + }, + }, + } + + cluster := GetCnpgClusterSpec(req, documentdb, "test-image:latest", "test-sa", "", true, log) + expectedPostgresMemory := resource.MustParse("13184Mi") + Expect(cluster.Spec.Resources.Limits[corev1.ResourceMemory]).To(Equal(expectedPostgresMemory)) + Expect(cluster.Spec.Resources.Requests[corev1.ResourceMemory]).To(Equal(expectedPostgresMemory)) + Expect(cluster.Spec.Plugins[0].Parameters).To(HaveKeyWithValue(util.PLUGIN_PARAM_GATEWAY_MEMORY_LIMIT, "3Gi")) + Expect(cluster.Spec.Plugins[0].Parameters).To(HaveKeyWithValue(util.PLUGIN_PARAM_OTEL_MEMORY_REQUEST, "48Mi")) + Expect(cluster.Spec.Plugins[0].Parameters).To(HaveKeyWithValue(util.PLUGIN_PARAM_OTEL_MEMORY_LIMIT, "128Mi")) + Expect(cluster.Spec.Plugins[0].Parameters).To(HaveKeyWithValue(util.PLUGIN_PARAM_OTEL_CPU_REQUEST, "50m")) + }) + It("passes monitoring parameters to plugin when monitoring is enabled", func() { req := ctrl.Request{} req.Name = "test-cluster" @@ -830,123 +903,101 @@ var _ = Describe("parseMemoryToBytes", func() { var _ = Describe("buildResourceRequirements", func() { It("returns empty requirements when both memory and cpu are empty", func() { - documentdb := &dbpreview.DocumentDB{ - Spec: dbpreview.DocumentDBSpec{ - Resource: dbpreview.Resource{ - Storage: dbpreview.StorageConfiguration{PvcSize: "10Gi"}, - }, - }, - } - result := buildResourceRequirements(documentdb) + result := buildResourceRequirements(ComponentResource{}) Expect(result.Limits).To(BeNil()) Expect(result.Requests).To(BeNil()) }) It("returns empty requirements when both memory and cpu are '0'", func() { - documentdb := &dbpreview.DocumentDB{ - Spec: dbpreview.DocumentDBSpec{ - Resource: dbpreview.Resource{ - Storage: dbpreview.StorageConfiguration{PvcSize: "10Gi"}, - Memory: "0", - CPU: "0", - }, - }, - } - result := buildResourceRequirements(documentdb) + result := buildResourceRequirements(ComponentResource{ + MemoryRequest: "0", + MemoryLimit: "0", + CPURequest: "0", + CPULimit: "0", + }) Expect(result.Limits).To(BeNil()) Expect(result.Requests).To(BeNil()) }) It("sets memory limits and requests with Guaranteed QoS", func() { - documentdb := &dbpreview.DocumentDB{ - Spec: dbpreview.DocumentDBSpec{ - Resource: dbpreview.Resource{ - Storage: dbpreview.StorageConfiguration{PvcSize: "10Gi"}, - Memory: "4Gi", - }, - }, - } - result := buildResourceRequirements(documentdb) + result := buildResourceRequirements(ComponentResource{ + MemoryRequest: "4Gi", + MemoryLimit: "4Gi", + }) expectedMem := resource.MustParse("4Gi") Expect(result.Limits[corev1.ResourceMemory]).To(Equal(expectedMem)) Expect(result.Requests[corev1.ResourceMemory]).To(Equal(expectedMem)) }) It("sets cpu limits and requests with Guaranteed QoS", func() { - documentdb := &dbpreview.DocumentDB{ - Spec: dbpreview.DocumentDBSpec{ - Resource: dbpreview.Resource{ - Storage: dbpreview.StorageConfiguration{PvcSize: "10Gi"}, - CPU: "2", - }, - }, - } - result := buildResourceRequirements(documentdb) + result := buildResourceRequirements(ComponentResource{ + CPURequest: "2", + CPULimit: "2", + }) expectedCPU := resource.MustParse("2") Expect(result.Limits[corev1.ResourceCPU]).To(Equal(expectedCPU)) Expect(result.Requests[corev1.ResourceCPU]).To(Equal(expectedCPU)) }) It("sets both memory and cpu", func() { - documentdb := &dbpreview.DocumentDB{ - Spec: dbpreview.DocumentDBSpec{ - Resource: dbpreview.Resource{ - Storage: dbpreview.StorageConfiguration{PvcSize: "10Gi"}, - Memory: "8Gi", - CPU: "4", - }, - }, - } - result := buildResourceRequirements(documentdb) + result := buildResourceRequirements(ComponentResource{ + MemoryRequest: "8Gi", + MemoryLimit: "8Gi", + CPURequest: "4", + CPULimit: "4", + }) Expect(result.Limits[corev1.ResourceMemory]).To(Equal(resource.MustParse("8Gi"))) Expect(result.Limits[corev1.ResourceCPU]).To(Equal(resource.MustParse("4"))) Expect(result.Requests[corev1.ResourceMemory]).To(Equal(resource.MustParse("8Gi"))) Expect(result.Requests[corev1.ResourceCPU]).To(Equal(resource.MustParse("4"))) }) + It("sets requests and limits independently", func() { + result := buildResourceRequirements(ComponentResource{ + MemoryRequest: "6Gi", + MemoryLimit: "8Gi", + CPURequest: "1500m", + CPULimit: "2", + }) + Expect(result.Requests[corev1.ResourceMemory]).To(Equal(resource.MustParse("6Gi"))) + Expect(result.Limits[corev1.ResourceMemory]).To(Equal(resource.MustParse("8Gi"))) + Expect(result.Requests[corev1.ResourceCPU]).To(Equal(resource.MustParse("1500m"))) + Expect(result.Limits[corev1.ResourceCPU]).To(Equal(resource.MustParse("2"))) + }) + It("ignores invalid memory values gracefully", func() { - documentdb := &dbpreview.DocumentDB{ - Spec: dbpreview.DocumentDBSpec{ - Resource: dbpreview.Resource{ - Storage: dbpreview.StorageConfiguration{PvcSize: "10Gi"}, - Memory: "notvalid", - CPU: "2", - }, - }, - } - result := buildResourceRequirements(documentdb) + result := buildResourceRequirements(ComponentResource{ + MemoryRequest: "notvalid", + MemoryLimit: "notvalid", + CPURequest: "2", + CPULimit: "2", + }) _, hasMem := result.Limits[corev1.ResourceMemory] Expect(hasMem).To(BeFalse()) Expect(result.Limits[corev1.ResourceCPU]).To(Equal(resource.MustParse("2"))) + Expect(result.Requests[corev1.ResourceCPU]).To(Equal(resource.MustParse("2"))) }) It("ignores invalid cpu values gracefully", func() { - documentdb := &dbpreview.DocumentDB{ - Spec: dbpreview.DocumentDBSpec{ - Resource: dbpreview.Resource{ - Storage: dbpreview.StorageConfiguration{PvcSize: "10Gi"}, - Memory: "4Gi", - CPU: "notvalid", - }, - }, - } - result := buildResourceRequirements(documentdb) + result := buildResourceRequirements(ComponentResource{ + MemoryRequest: "4Gi", + MemoryLimit: "4Gi", + CPURequest: "notvalid", + CPULimit: "notvalid", + }) _, hasCPU := result.Limits[corev1.ResourceCPU] Expect(hasCPU).To(BeFalse()) Expect(result.Limits[corev1.ResourceMemory]).To(Equal(resource.MustParse("4Gi"))) + Expect(result.Requests[corev1.ResourceMemory]).To(Equal(resource.MustParse("4Gi"))) }) It("returns empty requirements when all values are invalid", func() { - documentdb := &dbpreview.DocumentDB{ - Spec: dbpreview.DocumentDBSpec{ - Resource: dbpreview.Resource{ - Storage: dbpreview.StorageConfiguration{PvcSize: "10Gi"}, - Memory: "notvalid", - CPU: "alsonotvalid", - }, - }, - } - result := buildResourceRequirements(documentdb) + result := buildResourceRequirements(ComponentResource{ + MemoryRequest: "notvalid", + MemoryLimit: "notvalid", + CPURequest: "alsonotvalid", + CPULimit: "alsonotvalid", + }) Expect(result.Limits).To(BeNil()) Expect(result.Requests).To(BeNil()) }) diff --git a/operator/src/internal/cnpg/cnpg_sync.go b/operator/src/internal/cnpg/cnpg_sync.go index 4d32f66e3..2b5a3791c 100644 --- a/operator/src/internal/cnpg/cnpg_sync.go +++ b/operator/src/internal/cnpg/cnpg_sync.go @@ -14,14 +14,17 @@ import ( "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" + + util "github.com/documentdb/documentdb-operator/internal/utils" ) // SyncCnpgCluster compares the current and desired CNPG Cluster specs and patches // all fields in a single atomic JSON Patch operation. This is the single entry point // for ALL CNPG spec mutations (images + plugin params + replication). // -// Mutable plugin parameters synced: gatewayImage, gatewayTLSSecret, and OTel sidecar -// params (otelCollectorImage, otelConfigMapName, prometheusPort, otelConfigHash). +// Mutable plugin parameters synced: gatewayImage, gatewayTLSSecret, sidecar +// resource params, and OTel sidecar params (otelCollectorImage, +// otelConfigMapName, prometheusPort, otelConfigHash). // Other parameters (e.g., documentDbCredentialSecret) are set at cluster creation // and do not change during the lifecycle of a DocumentDB resource. // @@ -94,8 +97,8 @@ func SyncCnpgCluster( pluginParamsChanged = true } - // OTel sidecar parameters: add/update when monitoring is enabled, - // remove when monitoring is disabled. + // Sidecar parameters: add/update when desired is set, + // remove when desired is unset. // TODO(otel): Currently, changing OTel params triggers a rolling restart // (the operator adds a restart annotation) because the sidecar-injector // plugin reads params at pod creation time. @@ -103,8 +106,20 @@ func SyncCnpgCluster( // (e.g. Prometheus port, collector image) can take effect without restarting // database pods — for example, by updating the ConfigMap in-place and // signalling the OTel Collector to reload its configuration. - otelKeys := []string{"otelCollectorImage", "otelConfigMapName", "prometheusPort", "otelConfigHash"} - for _, key := range otelKeys { + sidecarParamKeys := []string{ + util.PLUGIN_PARAM_GATEWAY_MEMORY_REQUEST, + util.PLUGIN_PARAM_GATEWAY_MEMORY_LIMIT, + util.PLUGIN_PARAM_GATEWAY_CPU_REQUEST, + util.PLUGIN_PARAM_GATEWAY_CPU_LIMIT, + "otelCollectorImage", + "otelConfigMapName", + "prometheusPort", + "otelConfigHash", + util.PLUGIN_PARAM_OTEL_MEMORY_REQUEST, + util.PLUGIN_PARAM_OTEL_MEMORY_LIMIT, + util.PLUGIN_PARAM_OTEL_CPU_REQUEST, + } + for _, key := range sidecarParamKeys { desiredVal := getParam(desiredPlugin.Parameters, key) currentVal := getParam(currentPlugin.Parameters, key) if desiredVal != "" && currentVal != desiredVal { diff --git a/operator/src/internal/cnpg/resource_split.go b/operator/src/internal/cnpg/resource_split.go new file mode 100644 index 000000000..961695f1c --- /dev/null +++ b/operator/src/internal/cnpg/resource_split.go @@ -0,0 +1,293 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package cnpg + +import ( + "os" + "strconv" + + "k8s.io/apimachinery/pkg/api/resource" + + dbpreview "github.com/documentdb/documentdb-operator/api/preview" + util "github.com/documentdb/documentdb-operator/internal/utils" +) + +// SplitConfig holds the operator-level (Helm/env configurable) knobs that drive +// the pod memory carve-out between PostgreSQL and the sidecars. +type SplitConfig struct { + // GatewayMemoryFraction is the fraction (0..1) of the pod memory envelope + // reserved for the gateway sidecar when no explicit override is given. + GatewayMemoryFraction float64 + // GatewayMemoryCapBytes caps the derived gateway memory (bytes). 0 = no cap. + GatewayMemoryCapBytes int64 + // GatewayCPULimit optionally pins a CPU limit on the gateway (quantity + // string, e.g. "2"). Empty leaves the gateway CPU unbounded. + GatewayCPULimit string + // OTelMemoryRequest / OTelMemoryLimit size the OTel collector sidecar + // (quantity strings). The limit is what gets carved from the envelope. + OTelMemoryRequest string + OTelMemoryLimit string + // OTelCPURequest is the OTel collector CPU request (quantity string). + OTelCPURequest string + // OTelCPULimit bounds the OTel collector CPU (a ceiling on burst). Empty + // leaves the collector's CPU unbounded. + OTelCPULimit string +} + +// ComponentResource is a resolved per-container request/limit pair. Empty +// strings mean "unset" (the operator omits that request/limit). +type ComponentResource struct { + MemoryRequest string + MemoryLimit string + CPURequest string + CPULimit string +} + +// setMemory pins the container's memory request and limit to q (Guaranteed). +func (c *ComponentResource) setMemory(q string) { + c.MemoryRequest = q + c.MemoryLimit = q +} + +// setCPU pins the container's CPU request and limit to q (Guaranteed). +func (c *ComponentResource) setCPU(q string) { + c.CPURequest = q + c.CPULimit = q +} + +// ResourceSplit is the resolved allocation across the pod's containers. +type ResourceSplit struct { + Postgres ComponentResource + Gateway ComponentResource + // OTel is only populated when monitoring is enabled. + OTel ComponentResource + MonitoringEnabled bool + // PostgresMemoryBytes is the memory limit (bytes) PostgreSQL receives after + // the carve-out. Used to compute memory-aware GUCs. 0 means unset/unlimited. + PostgresMemoryBytes int64 +} + +// DefaultSplitConfig loads the carve-out configuration from the operator +// environment, falling back to the documented production defaults. +func DefaultSplitConfig() SplitConfig { + frac := parseFloatOr(os.Getenv(util.GATEWAY_MEMORY_FRACTION_ENV), util.DEFAULT_GATEWAY_MEMORY_FRACTION) + capBytes := parseQuantityOr(os.Getenv(util.GATEWAY_MEMORY_CAP_ENV), util.DEFAULT_GATEWAY_MEMORY_CAP) + return SplitConfig{ + GatewayMemoryFraction: frac, + GatewayMemoryCapBytes: capBytes, + GatewayCPULimit: os.Getenv(util.GATEWAY_CPU_LIMIT_ENV), + OTelMemoryRequest: envOr(util.OTEL_MEMORY_REQUEST_ENV, util.DEFAULT_OTEL_MEMORY_REQUEST), + OTelMemoryLimit: envOr(util.OTEL_MEMORY_LIMIT_ENV, util.DEFAULT_OTEL_MEMORY_LIMIT), + OTelCPURequest: envOr(util.OTEL_CPU_REQUEST_ENV, util.DEFAULT_OTEL_CPU_REQUEST), + OTelCPULimit: envOr(util.OTEL_CPU_LIMIT_ENV, util.DEFAULT_OTEL_CPU_LIMIT), + } +} + +// ComputeResourceSplit resolves how the pod memory and CPU envelopes +// (spec.resource.memory / spec.resource.cpu) are divided across the PostgreSQL, +// gateway, and (when monitoring is enabled) OTel collector containers. +// +// The envelope is OPTIONAL. For each dimension: +// - If the envelope is set, the operator carves it: the gateway and OTel +// collector reservations are subtracted and PostgreSQL (the sink) gets the +// remainder. Unset reservations fall back to defaults (gateway memory = +// min(fraction × envelope, cap); OTel memory = limit default; OTel cpu = +// request default). +// - If the envelope is omitted, every container that has an explicit value +// keeps it; the effective envelope is the sum of the resolved containers. +// Containers whose default depends on the envelope (gateway memory fraction, +// PostgreSQL remainder) can only be derived when the envelope is set, so the +// omitted-envelope path requires those to be explicit — see ValidateResources. +// +// Legacy behavior is preserved: when neither the envelope nor any per-container +// value is set for a dimension, that dimension is left unmanaged (no limits). +func ComputeResourceSplit(documentdb *dbpreview.DocumentDB, cfg SplitConfig) ResourceSplit { + res := documentdb.Spec.Resource + monitoring := documentdb.Spec.Monitoring != nil && documentdb.Spec.Monitoring.Enabled + + envelopeBytes := parseMemoryToBytes(res.Memory) + split := ResourceSplit{MonitoringEnabled: monitoring} + + // --- OTel collector (memory) --- + var otelBytes int64 + if monitoring { + if componentMemSet(res.OTel) { + // Explicit override: requests == limits (Guaranteed). + split.OTel.setMemory(res.OTel.Memory) + otelBytes = parseMemoryToBytes(res.OTel.Memory) + } else { + split.OTel.MemoryRequest = cfg.OTelMemoryRequest + split.OTel.MemoryLimit = cfg.OTelMemoryLimit + // Carve the LIMIT (worst case) from the envelope so it is not + // oversubscribed. + otelBytes = parseMemoryToBytes(cfg.OTelMemoryLimit) + } + // OTel CPU: an explicit override pins request == limit (Guaranteed); + // otherwise the collector keeps its Burstable default (request floor + + // a bounded limit ceiling). CPU is compressible, so the carve-out below + // only reserves the request from the envelope — the limit just caps burst. + if cpu := componentCPU(res.OTel); cpu != "" { + split.OTel.setCPU(cpu) + } else { + split.OTel.CPURequest = cfg.OTelCPURequest + split.OTel.CPULimit = cfg.OTelCPULimit + } + } + + // --- Gateway (memory) --- + var gatewayBytes int64 + if componentMemSet(res.Gateway) { + split.Gateway.setMemory(res.Gateway.Memory) + gatewayBytes = parseMemoryToBytes(res.Gateway.Memory) + } else if envelopeBytes > 0 { + gatewayBytes = gatewayMemoryReservationBytes(envelopeBytes, cfg) + split.Gateway.setMemory(bytesToQuantity(gatewayBytes)) + } + + // Gateway CPU: explicit override wins, else operator-level limit (request + // mirrors the limit so the container is Guaranteed on CPU when bounded). + if cpu := componentCPU(res.Gateway); cpu != "" { + split.Gateway.setCPU(cpu) + } else if cfg.GatewayCPULimit != "" { + split.Gateway.setCPU(cfg.GatewayCPULimit) + } + + // --- PostgreSQL (remainder) --- + if componentMemSet(res.Database) { + split.Postgres.setMemory(res.Database.Memory) + split.PostgresMemoryBytes = parseMemoryToBytes(res.Database.Memory) + } else if envelopeBytes > 0 { + dbBytes := envelopeBytes - gatewayBytes - otelBytes + if dbBytes < 0 { + dbBytes = 0 + } + split.PostgresMemoryBytes = dbBytes + if dbBytes > 0 { + split.Postgres.setMemory(bytesToQuantity(dbBytes)) + } + } + + // PostgreSQL CPU (sink): database override wins; otherwise the pod CPU + // envelope minus the gateway and OTel CPU reservations, symmetric with the + // memory carve-out so the resolved container CPUs sum to the envelope. + if cpu := componentCPU(res.Database); cpu != "" { + split.Postgres.setCPU(cpu) + } else if env := normalizeCPU(res.CPU); env != "" { + pgCPU := subtractCPU(env, split.Gateway.CPURequest, split.OTel.CPURequest) + if pgCPU != "" { + split.Postgres.setCPU(pgCPU) + } + } + + return split +} + +// gatewayMemoryReservationBytes returns the gateway's memory reservation derived +// from the pod memory envelope: min(fraction × envelope, cap). +func gatewayMemoryReservationBytes(envelopeBytes int64, cfg SplitConfig) int64 { + b := int64(float64(envelopeBytes) * cfg.GatewayMemoryFraction) + if cfg.GatewayMemoryCapBytes > 0 && b > cfg.GatewayMemoryCapBytes { + b = cfg.GatewayMemoryCapBytes + } + return b +} + +// subtractCPU returns (envelope − Σ reserved) as a milli-CPU quantity string. +// Empty reservations are ignored; a non-positive remainder yields "". +func subtractCPU(envelope string, reserved ...string) string { + env, err := resource.ParseQuantity(envelope) + if err != nil { + return "" + } + milli := env.MilliValue() + for _, r := range reserved { + if r == "" || r == "0" { + continue + } + q, err := resource.ParseQuantity(r) + if err != nil { + continue + } + milli -= q.MilliValue() + } + if milli <= 0 { + return "" + } + return resource.NewMilliQuantity(milli, resource.DecimalSI).String() +} + +// --- helpers --- + +// componentCPU returns the component's CPU override, or "" when unset/zero. +func componentCPU(c *dbpreview.ComponentResources) string { + if c == nil { + return "" + } + return normalizeCPU(c.CPU) +} + +// normalizeCPU returns cpu unless it is unset/zero, in which case "". +func normalizeCPU(cpu string) string { + if !isSet(cpu) { + return "" + } + return cpu +} + +func envOr(envKey, fallback string) string { + if v := os.Getenv(envKey); v != "" { + return v + } + return fallback +} + +// parseFloatOr parses value as a float, falling back to fallback when value is +// empty or unparseable. The result is clamped to [0, 1]: 0 disables the gateway +// carve-out, and values above 1 would otherwise reserve more than the envelope. +func parseFloatOr(value, fallback string) float64 { + s := value + if s == "" { + s = fallback + } + f, err := strconv.ParseFloat(s, 64) + if err != nil { + // Fall back to the documented default (a constant, so ignore its error). + f, _ = strconv.ParseFloat(fallback, 64) + } + if f < 0 { + return 0 + } + if f > 1 { + return 1 + } + return f +} + +// parseQuantityOr parses value as a Kubernetes resource quantity and returns its +// byte value. It falls back to fallback when value is empty or unparseable (so a +// typo'd fleet-wide knob does not silently disable the cap) and clamps negatives +// to 0. +func parseQuantityOr(value, fallback string) int64 { + s := value + if s == "" { + s = fallback + } + q, err := resource.ParseQuantity(s) + if err != nil { + if q, err = resource.ParseQuantity(fallback); err != nil { + return 0 + } + } + if v := q.Value(); v > 0 { + return v + } + return 0 +} + +// bytesToQuantity renders a byte count as a binary-SI Kubernetes quantity string +// (e.g. 6442450944 -> "6Gi"). Values that are not a clean Ki/Mi/Gi multiple fall +// back to the smallest exact binary unit the quantity package can express. +func bytesToQuantity(bytes int64) string { + return resource.NewQuantity(bytes, resource.BinarySI).String() +} diff --git a/operator/src/internal/cnpg/resource_split_test.go b/operator/src/internal/cnpg/resource_split_test.go new file mode 100644 index 000000000..0a25b828c --- /dev/null +++ b/operator/src/internal/cnpg/resource_split_test.go @@ -0,0 +1,252 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package cnpg + +import ( + "testing" + + dbpreview "github.com/documentdb/documentdb-operator/api/preview" +) + +// prodSplitConfig mirrors the documented production defaults (18.75%, cap 32Gi, +// otel 48Mi/128Mi) without depending on environment variables. +func prodSplitConfig() SplitConfig { + return SplitConfig{ + GatewayMemoryFraction: 0.1875, + GatewayMemoryCapBytes: 32 * 1024 * 1024 * 1024, // 32Gi + OTelMemoryRequest: "48Mi", + OTelMemoryLimit: "128Mi", + OTelCPURequest: "50m", + OTelCPULimit: "200m", + } +} + +func ddbWithMemory(mem string, monitoring bool) *dbpreview.DocumentDB { + d := &dbpreview.DocumentDB{} + d.Spec.Resource.Memory = mem + if monitoring { + d.Spec.Monitoring = &dbpreview.MonitoringSpec{Enabled: true} + } + return d +} + +func TestComputeResourceSplit_ProductionRows(t *testing.T) { + // (SKU, envelope, expected gateway, expected db remainder) — monitoring OFF. + cases := []struct { + name string + envelope string + wantGW string + wantDB string + }{ + {"M20", "4Gi", "768Mi", "3328Mi"}, + {"M50", "32Gi", "6Gi", "26Gi"}, + {"M60", "64Gi", "12Gi", "52Gi"}, + {"M200-capped", "256Gi", "32Gi", "224Gi"}, + } + cfg := prodSplitConfig() + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + s := ComputeResourceSplit(ddbWithMemory(tc.envelope, false), cfg) + if s.Gateway.MemoryLimit != tc.wantGW { + t.Errorf("gateway memory = %q, want %q", s.Gateway.MemoryLimit, tc.wantGW) + } + if s.Gateway.MemoryRequest != s.Gateway.MemoryLimit { + t.Errorf("gateway request %q != limit %q (want Guaranteed)", s.Gateway.MemoryRequest, s.Gateway.MemoryLimit) + } + if s.Postgres.MemoryLimit != tc.wantDB { + t.Errorf("postgres memory = %q, want %q", s.Postgres.MemoryLimit, tc.wantDB) + } + if s.MonitoringEnabled { + t.Errorf("monitoring should be disabled") + } + if s.OTel.MemoryLimit != "" { + t.Errorf("otel should be empty when monitoring disabled, got %q", s.OTel.MemoryLimit) + } + }) + } +} + +func TestComputeResourceSplit_MonitoringCarvesOTel(t *testing.T) { + cfg := prodSplitConfig() + s := ComputeResourceSplit(ddbWithMemory("16Gi", true), cfg) + + if !s.MonitoringEnabled { + t.Fatalf("monitoring should be enabled") + } + // 16Gi gateway = 18.75% = 3Gi. + if s.Gateway.MemoryLimit != "3Gi" { + t.Errorf("gateway = %q, want 3Gi", s.Gateway.MemoryLimit) + } + if s.OTel.MemoryRequest != "48Mi" || s.OTel.MemoryLimit != "128Mi" { + t.Errorf("otel req/limit = %q/%q, want 48Mi/128Mi", s.OTel.MemoryRequest, s.OTel.MemoryLimit) + } + // db = 16Gi - 3Gi - 128Mi = 13Gi - 128Mi = 13312Mi - 128Mi = 13184Mi. + if s.Postgres.MemoryLimit != "13184Mi" { + t.Errorf("postgres = %q, want 13184Mi", s.Postgres.MemoryLimit) + } + wantBytes := int64(13184) * 1024 * 1024 + if s.PostgresMemoryBytes != wantBytes { + t.Errorf("PostgresMemoryBytes = %d, want %d", s.PostgresMemoryBytes, wantBytes) + } +} + +func TestComputeResourceSplit_ExplicitOverridesWin(t *testing.T) { + cfg := prodSplitConfig() + d := ddbWithMemory("16Gi", true) + d.Spec.Resource.Gateway = &dbpreview.ComponentResources{Memory: "2Gi", CPU: "1"} + d.Spec.Resource.Database = &dbpreview.ComponentResources{Memory: "10Gi"} + d.Spec.Resource.OTel = &dbpreview.ComponentResources{Memory: "256Mi", CPU: "150m"} + + s := ComputeResourceSplit(d, cfg) + + if s.Gateway.MemoryLimit != "2Gi" || s.Gateway.MemoryRequest != "2Gi" { + t.Errorf("gateway override not applied: %+v", s.Gateway) + } + if s.Gateway.CPULimit != "1" || s.Gateway.CPURequest != "1" { + t.Errorf("gateway cpu override not applied: %+v", s.Gateway) + } + if s.OTel.MemoryLimit != "256Mi" || s.OTel.MemoryRequest != "256Mi" { + t.Errorf("otel override not applied: %+v", s.OTel) + } + // An explicit otel.cpu override pins request == limit (Guaranteed), not the + // Burstable 50m/200m default. + if s.OTel.CPULimit != "150m" || s.OTel.CPURequest != "150m" { + t.Errorf("otel cpu override not applied: %+v", s.OTel) + } + if s.Postgres.MemoryLimit != "10Gi" { + t.Errorf("database override not applied: %q", s.Postgres.MemoryLimit) + } +} + +func TestComputeResourceSplit_UnsetMemoryNoCarveOut(t *testing.T) { + cfg := prodSplitConfig() + // No envelope memory set -> no automatic carve-out (legacy behavior). + s := ComputeResourceSplit(ddbWithMemory("", false), cfg) + if s.Gateway.MemoryLimit != "" || s.Postgres.MemoryLimit != "" { + t.Errorf("expected no memory set, got gw=%q pg=%q", s.Gateway.MemoryLimit, s.Postgres.MemoryLimit) + } + if s.PostgresMemoryBytes != 0 { + t.Errorf("expected 0 postgres bytes, got %d", s.PostgresMemoryBytes) + } +} + +func TestComputeResourceSplit_CPUFromEnvelope(t *testing.T) { + cfg := prodSplitConfig() + d := ddbWithMemory("8Gi", false) + d.Spec.Resource.CPU = "4" + s := ComputeResourceSplit(d, cfg) + if s.Postgres.CPULimit != "4" || s.Postgres.CPURequest != "4" { + t.Errorf("postgres cpu = %q/%q, want 4/4", s.Postgres.CPURequest, s.Postgres.CPULimit) + } +} + +func TestComputeResourceSplit_GatewayCPULimitDefault(t *testing.T) { + cfg := prodSplitConfig() + cfg.GatewayCPULimit = "2" + d := ddbWithMemory("8Gi", false) + s := ComputeResourceSplit(d, cfg) + if s.Gateway.CPULimit != "2" || s.Gateway.CPURequest != "2" { + t.Errorf("gateway cpu = %q/%q, want 2/2", s.Gateway.CPURequest, s.Gateway.CPULimit) + } +} + +func TestComputeResourceSplit_EnvelopeOmittedAllExplicit(t *testing.T) { + cfg := prodSplitConfig() + // No envelope; gateway + database fully specified for both dims. + d := ddbWithMemory("", false) + d.Spec.Resource.CPU = "" + d.Spec.Resource.Gateway = &dbpreview.ComponentResources{Memory: "512Mi", CPU: "500m"} + d.Spec.Resource.Database = &dbpreview.ComponentResources{Memory: "4Gi", CPU: "3"} + + s := ComputeResourceSplit(d, cfg) + if s.Gateway.MemoryLimit != "512Mi" || s.Gateway.CPULimit != "500m" { + t.Errorf("gateway = %+v, want 512Mi/500m", s.Gateway) + } + if s.Postgres.MemoryLimit != "4Gi" || s.Postgres.CPULimit != "3" { + t.Errorf("postgres = %+v, want 4Gi/3", s.Postgres) + } + if s.PostgresMemoryBytes != 4*1024*1024*1024 { + t.Errorf("postgres bytes = %d, want 4Gi", s.PostgresMemoryBytes) + } +} + +func TestComputeResourceSplit_CPUCarvedWithMonitoring(t *testing.T) { + cfg := prodSplitConfig() + d := ddbWithMemory("8Gi", true) + d.Spec.Resource.CPU = "4" + s := ComputeResourceSplit(d, cfg) + // otel cpu reservation defaults to 50m request / 200m limit (Burstable); + // only the request is carved from the envelope, so postgres = 4 - 50m = 3950m. + if s.OTel.CPURequest != "50m" { + t.Errorf("otel cpu request = %q, want 50m", s.OTel.CPURequest) + } + if s.OTel.CPULimit != "200m" { + t.Errorf("otel cpu limit = %q, want 200m", s.OTel.CPULimit) + } + if s.Postgres.CPULimit != "3950m" { + t.Errorf("postgres cpu = %q, want 3950m (4 - 50m otel)", s.Postgres.CPULimit) + } +} + +func TestValidateResources(t *testing.T) { + cfg := prodSplitConfig() + mk := func(memEnv, cpuEnv string, monitoring bool) *dbpreview.DocumentDB { + d := ddbWithMemory(memEnv, monitoring) + d.Spec.Resource.CPU = cpuEnv + return d + } + + // 1. Envelope set, valid -> no errors. + if errs := ValidateResources(mk("16Gi", "", false), cfg); len(errs) != 0 { + t.Errorf("envelope-set valid: unexpected errors %v", errs) + } + + // 2. Envelope omitted, gateway+database memory set -> ok. + d := mk("", "", false) + d.Spec.Resource.Gateway = &dbpreview.ComponentResources{Memory: "512Mi"} + d.Spec.Resource.Database = &dbpreview.ComponentResources{Memory: "4Gi"} + if errs := ValidateResources(d, cfg); len(errs) != 0 { + t.Errorf("omitted+fully-specified memory: unexpected errors %v", errs) + } + + // 3. Envelope omitted, only gateway memory set -> error. + d = mk("", "", false) + d.Spec.Resource.Gateway = &dbpreview.ComponentResources{Memory: "512Mi"} + if errs := ValidateResources(d, cfg); len(errs) == 0 { + t.Errorf("omitted+partial memory: expected an error") + } + + // 4. Nothing set -> no error (unmanaged). + if errs := ValidateResources(mk("", "", false), cfg); len(errs) != 0 { + t.Errorf("unmanaged: unexpected errors %v", errs) + } + + // 5. Envelope set but explicit database memory exceeds it -> error. + d = mk("4Gi", "", false) + d.Spec.Resource.Database = &dbpreview.ComponentResources{Memory: "8Gi"} + if errs := ValidateResources(d, cfg); len(errs) == 0 { + t.Errorf("oversubscribed memory: expected an error") + } + + // 6. Tiny envelope: gateway(18.75% of 100Mi) + otel(128Mi) reservations + // exceed the 100Mi envelope, leaving nothing for postgres -> error. + if errs := ValidateResources(mk("100Mi", "", true), cfg); len(errs) == 0 { + t.Errorf("reservations exceed envelope: expected an error") + } + + // 7. CPU omitted + only database.cpu set (gateway.cpu unset) -> error (symmetric rule). + d = mk("", "", false) + d.Spec.Resource.Database = &dbpreview.ComponentResources{CPU: "2"} + if errs := ValidateResources(d, cfg); len(errs) == 0 { + t.Errorf("omitted+partial cpu: expected an error") + } + + // 8. CPU omitted + gateway.cpu + database.cpu set -> ok. + d = mk("", "", false) + d.Spec.Resource.Gateway = &dbpreview.ComponentResources{CPU: "500m"} + d.Spec.Resource.Database = &dbpreview.ComponentResources{CPU: "2"} + if errs := ValidateResources(d, cfg); len(errs) != 0 { + t.Errorf("omitted+fully-specified cpu: unexpected errors %v", errs) + } +} diff --git a/operator/src/internal/cnpg/resource_validation.go b/operator/src/internal/cnpg/resource_validation.go new file mode 100644 index 000000000..55b169ab6 --- /dev/null +++ b/operator/src/internal/cnpg/resource_validation.go @@ -0,0 +1,188 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package cnpg + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/validation/field" + + dbpreview "github.com/documentdb/documentdb-operator/api/preview" +) + +// ValidateResources checks that spec.resource is internally consistent under the +// envelope-optional model. It returns a field.ErrorList suitable for the +// validating webhook. +// +// For each dimension (memory, cpu) the rule is: +// - If the pod envelope (spec.resource.) is set, the gateway + OTel +// reservations must leave room for PostgreSQL, and any explicit per-container +// values must not sum beyond the envelope. +// - If the envelope is omitted but at least one container sets the dimension, +// both the gateway and the database must set it explicitly (the gateway +// memory default is a fraction of the envelope and PostgreSQL is the +// remainder, so neither can be derived without the envelope). OTel may be +// omitted; it falls back to an envelope-independent default. +// - If neither the envelope nor any container sets the dimension, it is left +// unmanaged (no error). +func ValidateResources(documentdb *dbpreview.DocumentDB, cfg SplitConfig) field.ErrorList { + res := documentdb.Spec.Resource + monitoring := documentdb.Spec.Monitoring != nil && documentdb.Spec.Monitoring.Enabled + base := field.NewPath("spec", "resource") + + // Memory reservations, with defaults applied when a container omits the + // value (gateway always defaults to its fraction of the envelope). + memEnv := parseMemoryToBytes(res.Memory) + memGw := componentMemBytes(res.Gateway) + if memGw == 0 { + memGw = gatewayMemoryReservationBytes(memEnv, cfg) + } + var memOTel int64 + if monitoring { + if memOTel = componentMemBytes(res.OTel); memOTel == 0 { + memOTel = parseMemoryToBytes(cfg.OTelMemoryLimit) + } + } + + // CPU reservations. Unlike memory, the gateway only reserves CPU when an + // operator-level limit is configured. + cpuEnv := cpuMilli(res.CPU) + cpuGw := componentCPUMilli(res.Gateway) + if cpuGw == 0 && cfg.GatewayCPULimit != "" { + cpuGw = cpuMilli(cfg.GatewayCPULimit) + } + var cpuOTel int64 + if monitoring { + if cpuOTel = componentCPUMilli(res.OTel); cpuOTel == 0 { + cpuOTel = cpuMilli(cfg.OTelCPURequest) + } + } + + errs := validateDimension(base, dimension{ + noun: "memory", + envSet: isSet(res.Memory), + gwSet: componentMemSet(res.Gateway), + dbSet: componentMemSet(res.Database), + otelSet: monitoring && componentMemSet(res.OTel), + envValue: res.Memory, + dbValue: componentMemoryValue(res.Database), + envQty: memEnv, + gwReserved: memGw, + otelReserved: memOTel, + dbQty: componentMemBytes(res.Database), + format: bytesToQuantity, + }) + errs = append(errs, validateDimension(base, dimension{ + noun: "cpu", + envSet: isSet(res.CPU), + gwSet: componentCPUSet(res.Gateway), + dbSet: componentCPUSet(res.Database), + otelSet: monitoring && componentCPUSet(res.OTel), + envValue: res.CPU, + dbValue: componentCPUValue(res.Database), + envQty: cpuEnv, + gwReserved: cpuGw, + otelReserved: cpuOTel, + dbQty: componentCPUMilli(res.Database), + format: milliCPUString, + })...) + return errs +} + +// dimension is a fully resolved view of one resource dimension (memory or cpu) +// used by validateDimension. Quantities are in the dimension's native unit +// (bytes for memory, milli-CPU for cpu); format renders that unit for messages. +type dimension struct { + noun string // "memory" or "cpu", used in paths and messages + envSet, gwSet, dbSet, otelSet bool + envValue, dbValue string // raw quantity strings for error display + envQty, gwReserved, otelReserved int64 + dbQty int64 + format func(int64) string +} + +// validateDimension enforces the envelope-optional rules for a single dimension. +// Memory and CPU share this logic; only the unit and noun differ. +func validateDimension(base *field.Path, d dimension) field.ErrorList { + var errs field.ErrorList + if !d.envSet { + // Envelope omitted: it can only be derived when both the gateway and the + // database pin the dimension; any other partial configuration is invalid. + if (d.gwSet || d.dbSet || d.otelSet) && !(d.gwSet && d.dbSet) { + errs = append(errs, field.Required(base.Child(d.noun), + fmt.Sprintf("pod %s envelope is required unless %s is set on both spec.resource.gateway and spec.resource.database", d.noun, d.noun))) + } + return errs + } + reserved := d.gwReserved + d.otelReserved + switch { + case reserved >= d.envQty: + errs = append(errs, field.Invalid(base.Child(d.noun), d.envValue, + fmt.Sprintf("gateway and OTel %s reservations (%s) leave no %s for PostgreSQL within the pod %s envelope (%s)", + d.noun, d.format(reserved), d.noun, d.noun, d.envValue))) + case d.dbSet: + if total := reserved + d.dbQty; total > d.envQty { + errs = append(errs, field.Invalid(base.Child("database", d.noun), d.dbValue, + fmt.Sprintf("sum of gateway + OTel + database %s (%s) exceeds the pod %s envelope (%s)", + d.noun, d.format(total), d.noun, d.envValue))) + } + } + return errs +} + +// --- small accessors --- + +func isSet(s string) bool { return s != "" && s != "0" } + +func componentMemSet(c *dbpreview.ComponentResources) bool { + return c != nil && isSet(c.Memory) +} + +func componentCPUSet(c *dbpreview.ComponentResources) bool { + return c != nil && isSet(c.CPU) +} + +func componentMemBytes(c *dbpreview.ComponentResources) int64 { + if c == nil { + return 0 + } + return parseMemoryToBytes(c.Memory) +} + +func componentMemoryValue(c *dbpreview.ComponentResources) string { + if c == nil { + return "" + } + return c.Memory +} + +func componentCPUValue(c *dbpreview.ComponentResources) string { + if c == nil { + return "" + } + return c.CPU +} + +// milliCPUString renders a milli-CPU count as a Kubernetes quantity (e.g. "500m"). +func milliCPUString(m int64) string { return fmt.Sprintf("%dm", m) } + +func componentCPUMilli(c *dbpreview.ComponentResources) int64 { + if c == nil { + return 0 + } + return cpuMilli(c.CPU) +} + +// cpuMilli parses a CPU quantity string to milli-CPU; returns 0 on empty/invalid. +func cpuMilli(s string) int64 { + if !isSet(s) { + return 0 + } + q, err := resource.ParseQuantity(s) + if err != nil { + return 0 + } + return q.MilliValue() +} diff --git a/operator/src/internal/otel/base_config.yaml b/operator/src/internal/otel/base_config.yaml index d4fe2d529..0b82af385 100644 --- a/operator/src/internal/otel/base_config.yaml +++ b/operator/src/internal/otel/base_config.yaml @@ -27,6 +27,10 @@ receivers: endpoint: 127.0.0.1:4317 processors: + memory_limiter: + check_interval: 1s + limit_percentage: 80 + spike_limit_percentage: 25 batch: send_batch_size: 1024 timeout: "5s" diff --git a/operator/src/internal/otel/config.go b/operator/src/internal/otel/config.go index 59c1584d7..c76bebc81 100644 --- a/operator/src/internal/otel/config.go +++ b/operator/src/internal/otel/config.go @@ -54,7 +54,7 @@ func ConfigMapName(clusterName string) string { // GenerateConfigMapData returns the ConfigMap data entries for the OTel Collector. // The config is split into two files that the collector merges at startup: -// - static.yaml: receivers and batch processor from the embedded base_config.yaml. +// - static.yaml: receivers and static processors from the embedded base_config.yaml. // Add new SQL metric queries to base_config.yaml — no Go code changes needed. // - dynamic.yaml: per-cluster resource attributes, exporters, and pipeline wiring. // @@ -139,7 +139,8 @@ func generateDynamicConfig(clusterName, namespace string, spec *dbpreview.Monito } } - // Wire pipeline: receivers + batch from static.yaml, resource from this dynamic config. + // Wire pipeline: receivers + memory_limiter/batch from static.yaml, + // resource from this dynamic config. // Disable the collector's internal telemetry to avoid port conflicts with the // Prometheus exporter (both default to 8888). if len(exporterNames) > 0 { @@ -152,7 +153,7 @@ func generateDynamicConfig(clusterName, namespace string, spec *dbpreview.Monito Pipelines: map[string]pipelineConfig{ "metrics": { Receivers: receiverNames, - Processors: []string{"resource", "batch"}, + Processors: []string{"memory_limiter", "resource", "batch"}, Exporters: exporterNames, }, }, diff --git a/operator/src/internal/otel/config_test.go b/operator/src/internal/otel/config_test.go index e44502650..24546ae33 100644 --- a/operator/src/internal/otel/config_test.go +++ b/operator/src/internal/otel/config_test.go @@ -38,11 +38,23 @@ var _ = Describe("base_config.yaml embed", func() { Expect(yaml.Unmarshal(baseConfigYAML, &cfg)).To(Succeed()) Expect(cfg.Receivers).To(HaveKey("sqlquery")) Expect(cfg.Receivers).To(HaveKey("otlp")) + Expect(cfg.Processors).To(HaveKey("memory_limiter")) Expect(cfg.Processors).To(HaveKey("batch")) // Static config should NOT have exporters or service (those are dynamic) Expect(cfg.Exporters).To(BeEmpty()) }) + It("declares a cgroup-aware memory_limiter processor", func() { + var cfg collectorConfig + Expect(yaml.Unmarshal(baseConfigYAML, &cfg)).To(Succeed()) + + memLimiter, ok := cfg.Processors["memory_limiter"].(map[string]any) + Expect(ok).To(BeTrue(), "memory_limiter processor must be a map") + Expect(memLimiter["check_interval"]).To(Equal("1s")) + Expect(memLimiter["limit_percentage"]).To(Equal(80)) + Expect(memLimiter["spike_limit_percentage"]).To(Equal(25)) + }) + It("declares an OTLP gRPC receiver on port 4317", func() { var cfg collectorConfig Expect(yaml.Unmarshal(baseConfigYAML, &cfg)).To(Succeed()) @@ -71,6 +83,7 @@ var _ = Describe("GenerateConfigMapData", func() { // static.yaml should contain the embedded base config staticCfg := parseCfg(data["static.yaml"]) Expect(staticCfg.Receivers).To(HaveKey("sqlquery")) + Expect(staticCfg.Processors).To(HaveKey("memory_limiter")) Expect(staticCfg.Processors).To(HaveKey("batch")) }) @@ -99,7 +112,7 @@ var _ = Describe("GenerateConfigMapData", func() { // Pipeline wiring references receivers from static config Expect(dynCfg.Service.Pipelines["metrics"].Receivers).To(ConsistOf("sqlquery", "otlp")) - Expect(dynCfg.Service.Pipelines["metrics"].Processors).To(ConsistOf("resource", "batch")) + Expect(dynCfg.Service.Pipelines["metrics"].Processors).To(Equal([]string{"memory_limiter", "resource", "batch"})) Expect(dynCfg.Service.Pipelines["metrics"].Exporters).To(ConsistOf("prometheus")) }) diff --git a/operator/src/internal/utils/constants.go b/operator/src/internal/utils/constants.go index e47d66385..a28481433 100644 --- a/operator/src/internal/utils/constants.go +++ b/operator/src/internal/utils/constants.go @@ -32,6 +32,58 @@ const ( DEFAULT_DOCUMENTDB_CREDENTIALS_SECRET = "documentdb-credentials" DEFAULT_OTEL_COLLECTOR_IMAGE = "otel/opentelemetry-collector-contrib:0.149.0" + // --- Sidecar resource isolation (memory carve-out) --- + // spec.resource.memory is the TOTAL pod envelope. The operator carves the + // gateway (and, when monitoring is enabled, the OTel collector) memory out of + // it and gives PostgreSQL the remainder. These operator-level defaults are + // overridable via Helm values wired to the env vars below. + + // GATEWAY_MEMORY_FRACTION_ENV overrides the fraction of the pod memory + // envelope reserved for the gateway sidecar (default DEFAULT_GATEWAY_MEMORY_FRACTION). + GATEWAY_MEMORY_FRACTION_ENV = "DOCUMENTDB_GATEWAY_MEMORY_FRACTION" + // GATEWAY_MEMORY_CAP_ENV overrides the absolute cap on gateway memory + // (default DEFAULT_GATEWAY_MEMORY_CAP). + GATEWAY_MEMORY_CAP_ENV = "DOCUMENTDB_GATEWAY_MEMORY_CAP" + // GATEWAY_CPU_LIMIT_ENV optionally pins a CPU limit on the gateway sidecar to + // bound its async-runtime worker threads. Empty (default) leaves CPU unbounded. + GATEWAY_CPU_LIMIT_ENV = "DOCUMENTDB_GATEWAY_CPU_LIMIT" + // OTEL_MEMORY_REQUEST_ENV / OTEL_MEMORY_LIMIT_ENV override the OTel collector + // sidecar memory request/limit (defaults DEFAULT_OTEL_MEMORY_REQUEST / _LIMIT). + OTEL_MEMORY_REQUEST_ENV = "DOCUMENTDB_OTEL_MEMORY_REQUEST" + OTEL_MEMORY_LIMIT_ENV = "DOCUMENTDB_OTEL_MEMORY_LIMIT" + // OTEL_CPU_REQUEST_ENV optionally sets the OTel collector CPU request. + OTEL_CPU_REQUEST_ENV = "DOCUMENTDB_OTEL_CPU_REQUEST" + // OTEL_CPU_LIMIT_ENV optionally bounds the OTel collector CPU (a ceiling on + // burst; CPU is compressible so this throttles rather than OOM-kills). + OTEL_CPU_LIMIT_ENV = "DOCUMENTDB_OTEL_CPU_LIMIT" + + // DEFAULT_GATEWAY_MEMORY_FRACTION reserves 18.75% (3/16) of the pod memory + // envelope for the gateway, matching the production sizing model. + DEFAULT_GATEWAY_MEMORY_FRACTION = "0.1875" + // DEFAULT_GATEWAY_MEMORY_CAP caps gateway memory at 32Gi (production model). + DEFAULT_GATEWAY_MEMORY_CAP = "32Gi" + // DEFAULT_OTEL_MEMORY_REQUEST / _LIMIT size the (tiny) metrics-only collector. + DEFAULT_OTEL_MEMORY_REQUEST = "48Mi" + DEFAULT_OTEL_MEMORY_LIMIT = "128Mi" + // DEFAULT_OTEL_CPU_REQUEST is the collector CPU request. + DEFAULT_OTEL_CPU_REQUEST = "50m" + // DEFAULT_OTEL_CPU_LIMIT bounds the collector's CPU burst (Burstable: the + // 50m request above is the reserved floor, this is the hard ceiling). + DEFAULT_OTEL_CPU_LIMIT = "200m" + + // --- Sidecar-injector plugin parameter names for component resources --- + // The operator passes the resolved per-container requests/limits to the + // sidecar-injector plugin via these CNPG plugin parameters; the plugin sets + // them on the gateway and otel-collector container Resources. + PLUGIN_PARAM_GATEWAY_MEMORY_REQUEST = "gatewayMemoryRequest" + PLUGIN_PARAM_GATEWAY_MEMORY_LIMIT = "gatewayMemoryLimit" + PLUGIN_PARAM_GATEWAY_CPU_REQUEST = "gatewayCpuRequest" + PLUGIN_PARAM_GATEWAY_CPU_LIMIT = "gatewayCpuLimit" + PLUGIN_PARAM_OTEL_MEMORY_REQUEST = "otelMemoryRequest" + PLUGIN_PARAM_OTEL_MEMORY_LIMIT = "otelMemoryLimit" + PLUGIN_PARAM_OTEL_CPU_REQUEST = "otelCpuRequest" + PLUGIN_PARAM_OTEL_CPU_LIMIT = "otelCpuLimit" + // TODO: remove these constants once change stream support is included in the official images. CHANGESTREAM_DOCUMENTDB_IMAGE_REPOSITORY = "ghcr.io/wentingwu666666/documentdb-kubernetes-operator" CHANGESTREAM_DOCUMENTDB_IMAGE = CHANGESTREAM_DOCUMENTDB_IMAGE_REPOSITORY + "/documentdb-oss:16-changestream" diff --git a/operator/src/internal/webhook/documentdb_webhook.go b/operator/src/internal/webhook/documentdb_webhook.go index 7f13809cf..49e7cbfe6 100644 --- a/operator/src/internal/webhook/documentdb_webhook.go +++ b/operator/src/internal/webhook/documentdb_webhook.go @@ -22,6 +22,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook/admission" dbpreview "github.com/documentdb/documentdb-operator/api/preview" + "github.com/documentdb/documentdb-operator/internal/cnpg" util "github.com/documentdb/documentdb-operator/internal/utils" ) @@ -103,6 +104,7 @@ func (v *DocumentDBValidator) validate(db *dbpreview.DocumentDB) (allErrs field. type validationFunc func(*dbpreview.DocumentDB) field.ErrorList validations := []validationFunc{ v.validateSchemaVersionNotExceedsBinary, + v.validateResources, // Add new spec-level validations here. } for _, fn := range validations { @@ -111,6 +113,14 @@ func (v *DocumentDBValidator) validate(db *dbpreview.DocumentDB) (allErrs field. return allErrs } +// validateResources ensures spec.resource is consistent under the +// envelope-optional model: the pod memory/cpu envelope may be omitted only when +// the gateway and database both specify that dimension, and an explicit envelope +// must leave room for PostgreSQL after the sidecar reservations. +func (v *DocumentDBValidator) validateResources(db *dbpreview.DocumentDB) field.ErrorList { + return cnpg.ValidateResources(db, cnpg.DefaultSplitConfig()) +} + // validateSchemaVersionNotExceedsBinary ensures spec.schemaVersion <= binary version. func (v *DocumentDBValidator) validateSchemaVersionNotExceedsBinary(db *dbpreview.DocumentDB) field.ErrorList { if db.Spec.SchemaVersion == "" || db.Spec.SchemaVersion == "auto" { diff --git a/operator/src/internal/webhook/documentdb_webhook_test.go b/operator/src/internal/webhook/documentdb_webhook_test.go index a869aa54f..1bf3ea2ad 100644 --- a/operator/src/internal/webhook/documentdb_webhook_test.go +++ b/operator/src/internal/webhook/documentdb_webhook_test.go @@ -480,3 +480,49 @@ var _ = Describe("validateStorageResize", func() { Expect(errs[0].Detail).To(ContainSubstring("pvcSize must be a valid resource quantity")) }) }) + +var _ = Describe("resource envelope validation", func() { + var v *DocumentDBValidator + + BeforeEach(func() { v = &DocumentDBValidator{} }) + + newDB := func() *dbpreview.DocumentDB { + return newTestDocumentDB("", "", "") + } + + It("allows an explicit pod memory envelope", func() { + db := newDB() + db.Spec.Resource.Memory = "8Gi" + Expect(v.validateResources(db)).To(BeEmpty()) + }) + + It("allows omitting the envelope when gateway and database memory are both set", func() { + db := newDB() + db.Spec.Resource.Gateway = &dbpreview.ComponentResources{Memory: "512Mi"} + db.Spec.Resource.Database = &dbpreview.ComponentResources{Memory: "4Gi"} + Expect(v.validateResources(db)).To(BeEmpty()) + }) + + It("rejects a partially specified memory split with no envelope", func() { + db := newDB() + db.Spec.Resource.Gateway = &dbpreview.ComponentResources{Memory: "512Mi"} + Expect(v.validateResources(db)).ToNot(BeEmpty()) + }) + + It("allows leaving a dimension entirely unmanaged", func() { + Expect(v.validateResources(newDB())).To(BeEmpty()) + }) + + It("rejects explicit database memory exceeding the envelope", func() { + db := newDB() + db.Spec.Resource.Memory = "4Gi" + db.Spec.Resource.Database = &dbpreview.ComponentResources{Memory: "8Gi"} + Expect(v.validateResources(db)).ToNot(BeEmpty()) + }) + + It("rejects a partially specified cpu split with no envelope", func() { + db := newDB() + db.Spec.Resource.Database = &dbpreview.ComponentResources{CPU: "2"} + Expect(v.validateResources(db)).ToNot(BeEmpty()) + }) +}) diff --git a/test/e2e/labels.go b/test/e2e/labels.go index bbf21a25f..d89461947 100644 --- a/test/e2e/labels.go +++ b/test/e2e/labels.go @@ -13,17 +13,18 @@ import "github.com/onsi/ginkgo/v2" // Keep these in sync with the design document. const ( // Area labels — one per test area (tests//). - LifecycleLabel = "lifecycle" - ScaleLabel = "scale" - DataLabel = "data" - PerformanceLabel = "performance" - BackupLabel = "backup" - RecoveryLabel = "recovery" - TLSLabel = "tls" - FeatureLabel = "feature-gates" - ExposureLabel = "exposure" - StatusLabel = "status" - UpgradeLabel = "upgrade" + LifecycleLabel = "lifecycle" + ScaleLabel = "scale" + DataLabel = "data" + PerformanceLabel = "performance" + BackupLabel = "backup" + RecoveryLabel = "recovery" + TLSLabel = "tls" + FeatureLabel = "feature-gates" + ExposureLabel = "exposure" + StatusLabel = "status" + UpgradeLabel = "upgrade" + ResourcesLabel = "resources" ClusterReplicationLabel = "cluster-replication" // Cross-cutting selectors. diff --git a/test/e2e/manifests/mixins/sidecar_resources.yaml.template b/test/e2e/manifests/mixins/sidecar_resources.yaml.template new file mode 100644 index 000000000..acdfbec3a --- /dev/null +++ b/test/e2e/manifests/mixins/sidecar_resources.yaml.template @@ -0,0 +1,17 @@ +apiVersion: documentdb.io/preview +kind: DocumentDB +metadata: + name: ${NAME} + namespace: ${NAMESPACE} +spec: + resource: + memory: ${POD_MEMORY} + monitoring: + enabled: ${MONITORING_ENABLED} + # A Prometheus exporter is configured so the OTel collector has a valid + # pipeline when monitoring is enabled. When monitoring is disabled the + # collector sidecar is not injected and this block is ignored. + # Port 9464 avoids CNPG's built-in metrics port (9187) and the gateway (10260). + exporter: + prometheus: + port: 9464 diff --git a/test/e2e/manifests/mixins/sidecar_resources_explicit.yaml.template b/test/e2e/manifests/mixins/sidecar_resources_explicit.yaml.template new file mode 100644 index 000000000..ede642477 --- /dev/null +++ b/test/e2e/manifests/mixins/sidecar_resources_explicit.yaml.template @@ -0,0 +1,13 @@ +apiVersion: documentdb.io/preview +kind: DocumentDB +metadata: + name: ${NAME} + namespace: ${NAMESPACE} +spec: + # No top-level resource.memory/cpu — the envelope is derived from the + # per-container values below (gateway + database fully specified). + resource: + gateway: + memory: ${GW_MEMORY} + database: + memory: ${DB_MEMORY} diff --git a/test/e2e/tests/resources/helpers_test.go b/test/e2e/tests/resources/helpers_test.go new file mode 100644 index 000000000..c299e4505 --- /dev/null +++ b/test/e2e/tests/resources/helpers_test.go @@ -0,0 +1,193 @@ +package resources + +import ( + "context" + "os" + "time" + + . "github.com/onsi/gomega" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + previewv1 "github.com/documentdb/documentdb-operator/api/preview" + "github.com/documentdb/documentdb-operator/test/e2e" + documentdbutil "github.com/documentdb/documentdb-operator/test/e2e/pkg/e2eutils/documentdb" + "github.com/documentdb/documentdb-operator/test/e2e/pkg/e2eutils/fixtures" + "github.com/documentdb/documentdb-operator/test/e2e/pkg/e2eutils/namespaces" + "github.com/documentdb/documentdb-operator/test/e2e/pkg/e2eutils/timeouts" + shareddb "github.com/documentdb/documentdb-operator/test/shared/documentdb" +) + +const credSecretName = fixtures.DefaultCredentialSecretName + +const ( + defaultDocDBImage = "" + defaultGatewayImage = "" + + gatewayContainerName = "documentdb-gateway" + postgresContainerName = "postgres" + otelContainerName = "otel-collector" +) + +// baseVars builds the envsubst map the base/documentdb.yaml.template plus the +// sidecar_resources mixin expect. +func baseVars(ns, name, podMemory, monitoring string) map[string]string { + docdbImg := defaultDocDBImage + if v := os.Getenv("DOCUMENTDB_IMAGE"); v != "" { + docdbImg = v + } + gwImg := defaultGatewayImage + if v := os.Getenv("GATEWAY_IMAGE"); v != "" { + gwImg = v + } + sSize := "1Gi" + if v := os.Getenv("E2E_STORAGE_SIZE"); v != "" { + sSize = v + } + sClass := "standard" + if v := os.Getenv("E2E_STORAGE_CLASS"); v != "" { + sClass = v + } + return map[string]string{ + "NAMESPACE": ns, + "NAME": name, + "INSTANCES": "1", + "STORAGE_SIZE": sSize, + "STORAGE_CLASS": sClass, + "DOCUMENTDB_IMAGE": docdbImg, + "GATEWAY_IMAGE": gwImg, + "CREDENTIAL_SECRET": credSecretName, + "EXPOSURE_TYPE": "ClusterIP", + "LOG_LEVEL": "info", + "POD_MEMORY": podMemory, + "MONITORING_ENABLED": monitoring, + } +} + +func manifestsRoot() string { + // tests/resources/ → ../../manifests + return "../../manifests" +} + +// setupFreshCluster creates a namespace, credential secret, and a DocumentDB CR +// (base + sidecar_resources mixin) with the given pod memory envelope and +// monitoring flag, then waits for it to become healthy. Returns the live CR and +// a cleanup func that deletes the namespace. +func setupFreshCluster( + ctx context.Context, + c client.Client, + name, podMemory string, + monitoringEnabled bool, +) (*previewv1.DocumentDB, func()) { + ns := namespaces.NamespaceForSpec(e2e.ResourcesLabel) + Expect(fixtures.CreateLabeledNamespace(ctx, c, ns, e2e.ResourcesLabel)).To(Succeed()) + Expect(fixtures.CreateLabeledCredentialSecret(ctx, c, ns)).To(Succeed()) + + monitoring := "false" + if monitoringEnabled { + monitoring = "true" + } + vars := baseVars(ns, name, podMemory, monitoring) + + return createAndWait(ctx, c, ns, name, []string{"sidecar_resources"}, vars) +} + +// setupExplicitCluster creates a cluster with NO top-level resource.memory +// envelope, sizing the gateway and database memory explicitly so the operator +// derives the effective envelope as their sum. +func setupExplicitCluster( + ctx context.Context, + c client.Client, + name, gatewayMemory, databaseMemory string, +) (*previewv1.DocumentDB, func()) { + ns := namespaces.NamespaceForSpec(e2e.ResourcesLabel) + Expect(fixtures.CreateLabeledNamespace(ctx, c, ns, e2e.ResourcesLabel)).To(Succeed()) + Expect(fixtures.CreateLabeledCredentialSecret(ctx, c, ns)).To(Succeed()) + + vars := baseVars(ns, name, "", "false") + vars["GW_MEMORY"] = gatewayMemory + vars["DB_MEMORY"] = databaseMemory + + return createAndWait(ctx, c, ns, name, []string{"sidecar_resources_explicit"}, vars) +} + +// createAndWait renders base+mixins, creates the CR, waits for health, and +// returns the live object plus a namespace-cleanup func. +func createAndWait( + ctx context.Context, + c client.Client, + ns, name string, + mixins []string, + vars map[string]string, +) (*previewv1.DocumentDB, func()) { + _, err := documentdbutil.Create(ctx, c, ns, name, documentdbutil.CreateOptions{ + Base: "documentdb", + Mixins: mixins, + Vars: vars, + ManifestsRoot: manifestsRoot(), + }) + Expect(err).ToNot(HaveOccurred(), "create DocumentDB") + + Eventually(func() error { + return shareddb.WaitHealthy(ctx, c, + types.NamespacedName{Namespace: ns, Name: name}, + timeouts.For(timeouts.DocumentDBReady)) + }, timeouts.For(timeouts.DocumentDBReady)+30*time.Second, 10*time.Second). + Should(Succeed(), "DocumentDB %s/%s did not become healthy", ns, name) + + live, err := shareddb.Get(ctx, c, client.ObjectKey{Namespace: ns, Name: name}) + Expect(err).ToNot(HaveOccurred()) + + cleanup := func() { + delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + _ = c.Delete(delCtx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns}}) + } + return live, cleanup +} + +// getInstancePod returns the first CNPG-owned pod for the cluster. +func getInstancePod(ctx context.Context, c client.Client, ns, name string) (*corev1.Pod, error) { + pods := &corev1.PodList{} + if err := c.List(ctx, pods, + client.InNamespace(ns), + client.MatchingLabels{"cnpg.io/cluster": name}, + ); err != nil { + return nil, err + } + if len(pods.Items) == 0 { + return nil, &noPodErr{ns: ns, name: name} + } + return &pods.Items[0], nil +} + +func containerByName(pod *corev1.Pod, name string) *corev1.Container { + for i := range pod.Spec.Containers { + if pod.Spec.Containers[i].Name == name { + return &pod.Spec.Containers[i] + } + } + return nil +} + +// quantityEqual compares a resource.Quantity against an expected quantity string. +func quantityEqual(got *resource.Quantity, want string) bool { + if got == nil { + return false + } + wantQ, err := resource.ParseQuantity(want) + if err != nil { + return false + } + return got.Cmp(wantQ) == 0 +} + +type noPodErr struct{ ns, name string } + +func (e *noPodErr) Error() string { + return "no CNPG pod labelled cnpg.io/cluster=" + e.name + " in " + e.ns +} diff --git a/test/e2e/tests/resources/resources_suite_test.go b/test/e2e/tests/resources/resources_suite_test.go new file mode 100644 index 000000000..5a06779d9 --- /dev/null +++ b/test/e2e/tests/resources/resources_suite_test.go @@ -0,0 +1,53 @@ +// Package resources hosts the DocumentDB E2E resources area, validating the +// pod memory carve-out between the PostgreSQL, gateway, and OTel collector +// containers (sidecar resource isolation). This file is the Ginkgo root for +// the area binary and shares bootstrap with the other area binaries via the +// exported helpers in package e2e. +package resources + +import ( + "context" + "fmt" + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/documentdb/documentdb-operator/test/e2e" +) + +const operatorReadyTimeout = 2 * time.Minute + +func TestResources(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "DocumentDB E2E - Resources", Label(e2e.ResourcesLabel)) +} + +var _ = SynchronizedBeforeSuite( + func(ctx SpecContext) []byte { + if err := e2e.SetupSuite(ctx, operatorReadyTimeout); err != nil { + Fail(fmt.Sprintf("resources bootstrap: %v", err)) + } + return []byte{} + }, + func(_ SpecContext, _ []byte) { + if err := e2e.SetupSuite(context.Background(), operatorReadyTimeout); err != nil { + Fail(fmt.Sprintf("resources worker bootstrap: %v", err)) + } + }, +) + +var _ = SynchronizedAfterSuite( + func(_ SpecContext) {}, + func(ctx SpecContext) { + if err := e2e.TeardownSuite(ctx); err != nil { + fmt.Fprintf(GinkgoWriter, "resources teardown: %v\n", err) + } + }, +) + +var _ = BeforeEach(func() { + Expect(e2e.CheckOperatorUnchanged()).To(Succeed(), + "operator health check failed — a previous spec or reconciler likely restarted the operator") +}) diff --git a/test/e2e/tests/resources/sidecar_resources_test.go b/test/e2e/tests/resources/sidecar_resources_test.go new file mode 100644 index 000000000..ec65e612f --- /dev/null +++ b/test/e2e/tests/resources/sidecar_resources_test.go @@ -0,0 +1,151 @@ +package resources + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + corev1 "k8s.io/api/core/v1" + + "github.com/documentdb/documentdb-operator/test/e2e" +) + +// These specs validate the pod memory carve-out (sidecar resource isolation). +// With spec.resource.memory treated as the total pod envelope, the operator +// reserves memory for the gateway (default 18.75%, here 192Mi of a 1Gi envelope) +// and — when monitoring is enabled — the OTel collector (default memory limit +// 128Mi, CPU 50m request / 200m limit), and gives PostgreSQL the remainder. Each sidecar is Guaranteed (request==limit) +// so a leak is OOM-isolated to its own container. +// +// For a 1Gi (1024Mi) envelope: +// +// gateway = 18.75% × 1024Mi = 192Mi (request == limit) +// otel = 48Mi request / 128Mi limit; cpu 50m req / 200m limit (monitoring on) +// postgres = 1024 − 192 = 832Mi (monitoring off) +// postgres = 1024 − 192 − 128 = 704Mi (monitoring on) +const ( + podEnvelope = "1Gi" + wantGatewayMem = "192Mi" + wantPostgresNoMon = "832Mi" + wantPostgresWithMon = "704Mi" + wantOTelMemRequest = "48Mi" + wantOTelMemLimit = "128Mi" + wantOTelCPURequest = "50m" + wantOTelCPULimit = "200m" +) + +var _ = Describe("Sidecar memory carve-out", + Label(e2e.ResourcesLabel), e2e.MediumLevelLabel, + func() { + BeforeEach(func() { e2e.SkipUnlessLevel(e2e.Medium) }) + + It("reserves gateway memory and gives PostgreSQL the remainder (monitoring off)", + func() { + env := e2e.SuiteEnv() + Expect(env).ToNot(BeNil()) + c := env.Client + + ctx, cancel := context.WithTimeout(context.Background(), 12*time.Minute) + DeferCleanup(cancel) + + cr, cleanup := setupFreshCluster(ctx, c, "carveout-nomon", podEnvelope, false) + DeferCleanup(cleanup) + + pod, err := getInstancePod(ctx, c, cr.Namespace, cr.Name) + Expect(err).ToNot(HaveOccurred()) + + gw := containerByName(pod, gatewayContainerName) + Expect(gw).ToNot(BeNil(), "gateway container present") + assertGuaranteedMemory(gw, wantGatewayMem) + + pg := containerByName(pod, postgresContainerName) + Expect(pg).ToNot(BeNil(), "postgres container present") + assertGuaranteedMemory(pg, wantPostgresNoMon) + + Expect(containerByName(pod, otelContainerName)).To(BeNil(), + "otel collector should be absent when monitoring is disabled") + }) + + It("additionally reserves OTel collector memory when monitoring is enabled", + func() { + env := e2e.SuiteEnv() + Expect(env).ToNot(BeNil()) + c := env.Client + + ctx, cancel := context.WithTimeout(context.Background(), 12*time.Minute) + DeferCleanup(cancel) + + cr, cleanup := setupFreshCluster(ctx, c, "carveout-mon", podEnvelope, true) + DeferCleanup(cleanup) + + pod, err := getInstancePod(ctx, c, cr.Namespace, cr.Name) + Expect(err).ToNot(HaveOccurred()) + + gw := containerByName(pod, gatewayContainerName) + Expect(gw).ToNot(BeNil(), "gateway container present") + assertGuaranteedMemory(gw, wantGatewayMem) + + otel := containerByName(pod, otelContainerName) + Expect(otel).ToNot(BeNil(), "otel collector container present") + Expect(quantityEqual(otel.Resources.Requests.Memory(), wantOTelMemRequest)).To(BeTrue(), + "otel memory request = %s, want %s", otel.Resources.Requests.Memory(), wantOTelMemRequest) + Expect(quantityEqual(otel.Resources.Limits.Memory(), wantOTelMemLimit)).To(BeTrue(), + "otel memory limit = %s, want %s", otel.Resources.Limits.Memory(), wantOTelMemLimit) + Expect(quantityEqual(otel.Resources.Requests.Cpu(), wantOTelCPURequest)).To(BeTrue(), + "otel cpu request = %s, want %s", otel.Resources.Requests.Cpu(), wantOTelCPURequest) + Expect(quantityEqual(otel.Resources.Limits.Cpu(), wantOTelCPULimit)).To(BeTrue(), + "otel cpu limit = %s, want %s", otel.Resources.Limits.Cpu(), wantOTelCPULimit) + Expect(hasEnv(otel, "GOMEMLIMIT")).To(BeTrue(), + "otel collector should have a GOMEMLIMIT env var") + + pg := containerByName(pod, postgresContainerName) + Expect(pg).ToNot(BeNil(), "postgres container present") + assertGuaranteedMemory(pg, wantPostgresWithMon) + }) + + It("derives the envelope from per-container memory when the envelope is omitted", + func() { + env := e2e.SuiteEnv() + Expect(env).ToNot(BeNil()) + c := env.Client + + ctx, cancel := context.WithTimeout(context.Background(), 12*time.Minute) + DeferCleanup(cancel) + + // No spec.resource.memory; gateway + database memory set explicitly. + cr, cleanup := setupExplicitCluster(ctx, c, "carveout-explicit", "256Mi", "1Gi") + DeferCleanup(cleanup) + + pod, err := getInstancePod(ctx, c, cr.Namespace, cr.Name) + Expect(err).ToNot(HaveOccurred()) + + gw := containerByName(pod, gatewayContainerName) + Expect(gw).ToNot(BeNil(), "gateway container present") + assertGuaranteedMemory(gw, "256Mi") + + pg := containerByName(pod, postgresContainerName) + Expect(pg).ToNot(BeNil(), "postgres container present") + assertGuaranteedMemory(pg, "1Gi") + }) + }) + +// assertGuaranteedMemory asserts the container's memory request and limit both +// equal want (Guaranteed-class for memory). +func assertGuaranteedMemory(ctr *corev1.Container, want string) { + GinkgoHelper() + Expect(quantityEqual(ctr.Resources.Limits.Memory(), want)).To(BeTrue(), + "%s memory limit = %s, want %s", ctr.Name, ctr.Resources.Limits.Memory(), want) + Expect(quantityEqual(ctr.Resources.Requests.Memory(), want)).To(BeTrue(), + "%s memory request = %s, want %s", ctr.Name, ctr.Resources.Requests.Memory(), want) +} + +func hasEnv(ctr *corev1.Container, name string) bool { + for _, e := range ctr.Env { + if e.Name == name { + return true + } + } + return false +}