diff --git a/.gitignore b/.gitignore index ab67db31..99f6e128 100644 --- a/.gitignore +++ b/.gitignore @@ -28,3 +28,6 @@ Dockerfile.cross charts *.tgz +*.tar.gz + +dist \ No newline at end of file diff --git a/Justfile b/Justfile index 4f6519a9..857034df 100644 --- a/Justfile +++ b/Justfile @@ -99,5 +99,8 @@ generate-docs: --templates-dir=./crd-doc-templates \ --config=./docs.config.yaml +install-kubectl-stacks: + cd tools/kubectl-stacks && go build -o {{env('GOPATH', `go env GOPATH`)}}/bin/kubectl-stacks . + deploy: helm-update earthly +deploy diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index fcd0e7ca..b7b295bc 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -42,6 +42,18 @@ rules: - patch - update - watch +- apiGroups: + - apps + resources: + - statefulsets + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - batch resources: diff --git a/docs/04-Modules/03-Ledger.md b/docs/04-Modules/03-Ledger.md index 9bbaea5b..9accfcab 100644 --- a/docs/04-Modules/03-Ledger.md +++ b/docs/04-Modules/03-Ledger.md @@ -100,3 +100,148 @@ Available fields: - `push-retry-period`: Retry period for failed pushes - `sync-period`: Synchronization period - `logs-page-size`: Number of logs per page + +## Ledger v3 Mirror + +Ledger v3 can run alongside an existing v2 deployment as a **mirror**: it continuously replicates v2 ledger data into its own Raft-based storage. This allows gradual migration or read-offloading without disrupting v2. + +When the `modules.ledger.v3-mirror` setting is present, the operator: +1. Deploys v2 normally (database, migrations, Deployment) +2. Deploys a v3 Raft StatefulSet in parallel +3. Runs a provisioning Job that creates mirror ledgers in v3, each sourcing data from the v2 PostgreSQL database + +### Enabling v3 Mirror + +Create a Settings resource with the key `modules.ledger.v3-mirror`. The value format is: + +``` +<v3-image-tag>:<ledger-1>,<ledger-2>,... 
+``` + +- **v3-image-tag**: The container image tag of the ledger v3 binary (e.g. `v3.0.0-alpha.1`) +- **ledger names**: Comma-separated list of v2 ledger names to mirror + +```yaml +apiVersion: formance.com/v1beta1 +kind: Settings +metadata: + name: ledger-v3-mirror +spec: + stacks: ["my-stack"] + key: modules.ledger.v3-mirror + value: "v3.0.0-alpha.1:default,payments" +``` + +This example deploys a v3 cluster using image tag `v3.0.0-alpha.1` and creates two mirror ledgers (`default` and `payments`) that replicate from the v2 PostgreSQL database. + +### How It Works + +The provisioning Job connects to the v3 cluster's gRPC endpoint and calls `ledgerctl ledgers create` for each listed ledger with: +- `--mode mirror` — marks the ledger as a mirror (read-only, no direct writes) +- `--mirror-source-type postgres` — uses direct PostgreSQL access for replication +- `--mirror-dsn` — the PostgreSQL DSN of the v2 database (derived automatically from the Database resource) + +The Job is idempotent: if a mirror ledger already exists, the error is ignored. It retries on failure (e.g. if the v3 cluster is not yet ready). + +### Architecture + +The operator creates the following resources for v3: + +| Resource | Purpose | +|----------|---------| +| `StatefulSet/ledger` | Raft cluster nodes with `OrderedReady` pod management | +| `Service/ledger-raft` (headless) | DNS-based peer discovery for Raft consensus | +| `Job/v3-mirror-provision` | Creates mirror ledgers in the v3 cluster | +| 3 PVCs per pod | `wal`, `data`, `cold-cache` | + +### Requirements + +Ledger v3 does **not** require its own PostgreSQL or message broker. Storage is fully embedded (Pebble LSM). However, the v3 pods need network access to the v2 PostgreSQL database for mirror replication. 
+ +### Cluster Settings + +```yaml +apiVersion: formance.com/v1beta1 +kind: Settings +metadata: + name: ledger-v3-replicas +spec: + stacks: ["*"] + key: module.ledger.v3.replicas + value: "3" +``` + +- `module.ledger.v3.replicas`: Number of Raft nodes. **Must be odd** for quorum (default: 3). + +The Raft cluster ID is automatically set to the stack name. + +### Persistence Settings + +Each pod gets three PVCs. Size and storage class are configurable: + +```yaml +apiVersion: formance.com/v1beta1 +kind: Settings +metadata: + name: ledger-v3-persistence +spec: + stacks: ["*"] + key: module.ledger.v3.persistence.wal.size + value: "5Gi" +--- +apiVersion: formance.com/v1beta1 +kind: Settings +metadata: + name: ledger-v3-data-size +spec: + stacks: ["*"] + key: module.ledger.v3.persistence.data.size + value: "10Gi" +--- +apiVersion: formance.com/v1beta1 +kind: Settings +metadata: + name: ledger-v3-cold-cache-size +spec: + stacks: ["*"] + key: module.ledger.v3.persistence.cold-cache.size + value: "10Gi" +``` + +| Key | Default | Description | +|-----|---------|-------------| +| `module.ledger.v3.persistence.wal.size` | 5Gi | WAL PVC size | +| `module.ledger.v3.persistence.wal.storage-class` | (cluster default) | WAL storage class | +| `module.ledger.v3.persistence.data.size` | 10Gi | Pebble data PVC size | +| `module.ledger.v3.persistence.data.storage-class` | (cluster default) | Data storage class | +| `module.ledger.v3.persistence.cold-cache.size` | 10Gi | Cold cache PVC size | +| `module.ledger.v3.persistence.cold-cache.storage-class` | (cluster default) | Cold cache storage class | + +### Pebble Tunables + +All Pebble settings are optional. When unset, the ledger binary defaults apply. 
+ +| Key | Example | Description | +|-----|---------|-------------| +| `module.ledger.v3.pebble.cache-size` | 1073741824 | Block cache size in bytes | +| `module.ledger.v3.pebble.memtable-size` | 268435456 | Memtable size in bytes | +| `module.ledger.v3.pebble.memtable-stop-writes-threshold` | 2 | Memtable count before stopping writes | +| `module.ledger.v3.pebble.l0-compaction-threshold` | 4 | L0 files to trigger compaction | +| `module.ledger.v3.pebble.l0-stop-writes-threshold` | 12 | L0 files before stopping writes | +| `module.ledger.v3.pebble.lbase-max-bytes` | 67108864 | L1 max size in bytes | +| `module.ledger.v3.pebble.target-file-size` | 67108864 | SST file target size | +| `module.ledger.v3.pebble.max-concurrent-compactions` | 2 | Compaction parallelism | + +### Raft Tunables + +All Raft settings are optional. When unset, the ledger binary defaults apply. + +| Key | Example | Description | +|-----|---------|-------------| +| `module.ledger.v3.raft.snapshot-threshold` | 5000 | Log entries before snapshot | +| `module.ledger.v3.raft.election-tick` | 10 | Election timeout in ticks | +| `module.ledger.v3.raft.heartbeat-tick` | 1 | Heartbeat interval in ticks | +| `module.ledger.v3.raft.tick-interval` | 100ms | Duration of one tick | +| `module.ledger.v3.raft.max-size-per-msg` | 1048576 | Max message size in bytes | +| `module.ledger.v3.raft.max-inflight-msgs` | 256 | Max in-flight messages | +| `module.ledger.v3.raft.compaction-margin` | 1000 | Log retention after snapshot | diff --git a/docs/09-Configuration reference/01-Settings.md b/docs/09-Configuration reference/01-Settings.md index 6d56972a..be2e14fe 100644 --- a/docs/09-Configuration reference/01-Settings.md +++ b/docs/09-Configuration reference/01-Settings.md @@ -32,6 +32,30 @@ While we have some basic types (string, number, bool ...), we also have some com | ledger.worker.async-block-hasher | Map | max-block-size=1000, schedule="0 * * * * *" | Configure async block hasher for the Ledger worker 
(v2.3+). Fields: `max-block-size`, `schedule` | | ledger.worker.bucket-cleanup | Map | retention-period=720h, schedule="0 0 * * *" | Configure bucket cleanup for the Ledger worker (v2.4+). Fields: `retention-period`, `schedule` | | ledger.worker.pipelines | Map | pull-interval=5s, push-retry-period=10s, sync-period=1m, logs-page-size=100 | Configure pipelines for the Ledger worker (v2.3+). Fields: `pull-interval`, `push-retry-period`, `sync-period`, `logs-page-size` | +| modules.ledger.v3-mirror | String | v3.0.0-alpha.1:ledger1,ledger2 | Configure mirror mode for the ledger: v3 image tag, then comma-separated v2 ledger names to mirror | +| module.ledger.v3.replicas | Int | 3 | Raft cluster node count (v3+). Must be odd for quorum. Default: 3 | +| module.ledger.v3.cluster-id | String | my-stack | Raft cluster ID (v3+). Currently always set to the stack name | +| module.ledger.v3.persistence.wal.size | String | 5Gi | PVC size for the Raft write-ahead log (v3+). Default: 5Gi | +| module.ledger.v3.persistence.wal.storage-class | String | | Storage class for the WAL PVC (v3+). Empty = cluster default | +| module.ledger.v3.persistence.data.size | String | 10Gi | PVC size for the Pebble data directory (v3+). Default: 10Gi | +| module.ledger.v3.persistence.data.storage-class | String | | Storage class for the data PVC (v3+). Empty = cluster default | +| module.ledger.v3.persistence.cold-cache.size | String | 10Gi | PVC size for the cold storage cache (v3+). Default: 10Gi | +| module.ledger.v3.persistence.cold-cache.storage-class | String | | Storage class for the cold cache PVC (v3+). 
Empty = cluster default | +| module.ledger.v3.pebble.cache-size | String | 1073741824 | Pebble block cache size in bytes (v3+) | +| module.ledger.v3.pebble.memtable-size | String | 268435456 | Pebble memtable size in bytes (v3+) | +| module.ledger.v3.pebble.memtable-stop-writes-threshold | String | 2 | Pebble memtable count before stopping writes (v3+) | +| module.ledger.v3.pebble.l0-compaction-threshold | String | 4 | L0 file count to trigger compaction (v3+) | +| module.ledger.v3.pebble.l0-stop-writes-threshold | String | 12 | L0 file count before stopping writes (v3+) | +| module.ledger.v3.pebble.lbase-max-bytes | String | 67108864 | L1 max size in bytes (v3+) | +| module.ledger.v3.pebble.target-file-size | String | 67108864 | SST file target size in bytes (v3+) | +| module.ledger.v3.pebble.max-concurrent-compactions | String | 2 | Compaction parallelism (v3+) | +| module.ledger.v3.raft.snapshot-threshold | String | 5000 | Log entries before taking a snapshot (v3+) | +| module.ledger.v3.raft.election-tick | String | 10 | Election timeout in ticks (v3+) | +| module.ledger.v3.raft.heartbeat-tick | String | 1 | Heartbeat interval in ticks (v3+) | +| module.ledger.v3.raft.tick-interval | String | 100ms | Duration of one Raft tick (v3+) | +| module.ledger.v3.raft.max-size-per-msg | String | 1048576 | Max Raft message size in bytes (v3+) | +| module.ledger.v3.raft.max-inflight-msgs | String | 256 | Max in-flight Raft messages (v3+) | +| module.ledger.v3.raft.compaction-margin | String | 1000 | Raft log retention after snapshot (v3+) | | payments.encryption-key | string | | Payments data encryption key | | payments.worker.temporal-max-concurrent-workflow-task-pollers | Int | | Payments worker max concurrent workflow task pollers configuration | | payments.worker.temporal-max-concurrent-activity-task-pollers | Int | | Payments worker max concurrent activity task pollers configuration | diff --git a/helm/crds/.helmignore b/helm/crds/.helmignore index 9e8e34ff..eb562196 
100644 --- a/helm/crds/.helmignore +++ b/helm/crds/.helmignore @@ -21,6 +21,7 @@ .idea/ *.tmproj .vscode/ +./*.tgz # ignore kustomization.yaml files templates/rbac/kustomization.yaml diff --git a/helm/operator/.helmignore b/helm/operator/.helmignore index 9e8e34ff..eb562196 100644 --- a/helm/operator/.helmignore +++ b/helm/operator/.helmignore @@ -21,6 +21,7 @@ .idea/ *.tmproj .vscode/ +./*.tgz # ignore kustomization.yaml files templates/rbac/kustomization.yaml diff --git a/helm/operator/templates/gen/rbac.authorization.k8s.io_v1_clusterrole_formance-manager-role.yaml b/helm/operator/templates/gen/rbac.authorization.k8s.io_v1_clusterrole_formance-manager-role.yaml index 8bb7f718..df20410f 100644 --- a/helm/operator/templates/gen/rbac.authorization.k8s.io_v1_clusterrole_formance-manager-role.yaml +++ b/helm/operator/templates/gen/rbac.authorization.k8s.io_v1_clusterrole_formance-manager-role.yaml @@ -41,6 +41,18 @@ rules: - patch - update - watch +- apiGroups: + - apps + resources: + - statefulsets + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - batch resources: diff --git a/internal/resources/ledgers/init.go b/internal/resources/ledgers/init.go index 53c6e661..7f57f99c 100644 --- a/internal/resources/ledgers/init.go +++ b/internal/resources/ledgers/init.go @@ -18,6 +18,8 @@ package ledgers import ( _ "embed" + "fmt" + "strings" "github.com/pkg/errors" "golang.org/x/mod/semver" @@ -32,12 +34,14 @@ import ( "github.com/formancehq/operator/v3/internal/resources/gatewayhttpapis" "github.com/formancehq/operator/v3/internal/resources/jobs" "github.com/formancehq/operator/v3/internal/resources/registries" + "github.com/formancehq/operator/v3/internal/resources/settings" ) //+kubebuilder:rbac:groups=formance.com,resources=ledgers,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=formance.com,resources=ledgers/status,verbs=get;update;patch 
//+kubebuilder:rbac:groups=formance.com,resources=ledgers/finalizers,verbs=update //+kubebuilder:rbac:groups=batch,resources=cronjobs,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete func Reconcile(ctx Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, version string) error { database, err := databases.Create(ctx, stack, ledger) @@ -104,13 +108,59 @@ func Reconcile(ctx Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, versio } } - return installLedger(ctx, stack, ledger, database, imageConfiguration, version) + if err := installLedger(ctx, stack, ledger, database, imageConfiguration, version); err != nil { + return err + } + + // If v3 mirror is configured, deploy the v3 StatefulSet and provision mirror ledgers. + // The setting value is "image:ledger1,ledger2,..." (e.g. "v3.0.0-alpha.1:default,payments"). + v3MirrorSetting, err := settings.GetString(ctx, stack.Name, "modules", "ledger", "v3-mirror") + if err != nil { + return err + } + if v3MirrorSetting != nil { + v3Image, mirrorLedgers, err := parseV3MirrorSetting(*v3MirrorSetting) + if err != nil { + return err + } + if err := reconcileV3(ctx, stack, ledger, database, v3Image, mirrorLedgers); err != nil { + return err + } + } + + return nil +} + +// parseV3MirrorSetting parses the setting value "image:ledger1,ledger2,...". +// Returns the image tag and the list of ledger names to mirror. 
+func parseV3MirrorSetting(value string) (string, []string, error) { + parts := strings.SplitN(value, ":", 2) + if len(parts) != 2 || parts[0] == "" || parts[1] == "" { + return "", nil, fmt.Errorf("invalid v3-mirror setting %q: expected format \"tag:ledger1,ledger2,...\"", value) + } + + image := parts[0] + + var ledgers []string + for _, l := range strings.Split(parts[1], ",") { + l = strings.TrimSpace(l) + if l != "" { + ledgers = append(ledgers, l) + } + } + + if len(ledgers) == 0 { + return "", nil, fmt.Errorf("invalid v3-mirror setting %q: no ledger names specified", value) + } + + return image, ledgers, nil } func init() { Init( WithModuleReconciler(Reconcile, WithOwn[*v1beta1.Ledger](&appsv1.Deployment{}), + WithOwn[*v1beta1.Ledger](&appsv1.StatefulSet{}), WithOwn[*v1beta1.Ledger](&batchv1.Job{}), WithOwn[*v1beta1.Ledger](&corev1.Service{}), WithOwn[*v1beta1.Ledger](&v1beta1.GatewayHTTPAPI{}), diff --git a/internal/resources/ledgers/v3.go b/internal/resources/ledgers/v3.go new file mode 100644 index 00000000..21815dee --- /dev/null +++ b/internal/resources/ledgers/v3.go @@ -0,0 +1,476 @@ +package ledgers + +import ( + "fmt" + "strings" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + + "github.com/formancehq/operator/v3/api/formance.com/v1beta1" + "github.com/formancehq/operator/v3/internal/core" + "github.com/formancehq/operator/v3/internal/resources/databases" + "github.com/formancehq/operator/v3/internal/resources/jobs" + "github.com/formancehq/operator/v3/internal/resources/registries" + "github.com/formancehq/operator/v3/internal/resources/settings" +) + +const ( + v3PortHTTP = int32(9000) + v3PortGRPC = int32(8888) + v3PortRaft = int32(7777) +) + +func reconcileV3(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, database *v1beta1.Database, version string, 
mirrorLedgers []string) error { + imageConfiguration, err := registries.GetImageConfiguration(ctx, stack.Name, fmt.Sprintf("ghcr.io/formancehq/ledger-v3:%s", version)) + if err != nil { + return err + } + + if err := createV3HeadlessService(ctx, stack, ledger); err != nil { + return err + } + + if err := installV3StatefulSet(ctx, stack, ledger, imageConfiguration); err != nil { + return err + } + + // Build postgres env vars from the v2 database for the mirror source. + postgresEnvVars, err := buildV2PostgresEnvVars(ctx, stack, database) + if err != nil { + return err + } + + if err := createV3MirrorProvisioningJob(ctx, stack, ledger, imageConfiguration, postgresEnvVars, mirrorLedgers); err != nil { + return err + } + + return nil +} + +func createV3HeadlessService(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger) error { + headlessSvcName := "ledger-raft" + + _, _, err := core.CreateOrUpdate[*corev1.Service](ctx, types.NamespacedName{ + Name: headlessSvcName, + Namespace: stack.Name, + }, + func(t *corev1.Service) error { + t.Spec = corev1.ServiceSpec{ + ClusterIP: "None", + PublishNotReadyAddresses: true, + Ports: []corev1.ServicePort{ + { + Name: "raft", + Port: v3PortRaft, + Protocol: "TCP", + TargetPort: intstr.FromString("raft"), + }, + { + Name: "grpc", + Port: v3PortGRPC, + Protocol: "TCP", + TargetPort: intstr.FromString("grpc"), + }, + }, + Selector: map[string]string{ + "app.kubernetes.io/name": "ledger-v3", + }, + } + return nil + }, + core.WithController[*corev1.Service](ctx.GetScheme(), ledger), + ) + return err +} + +func installV3StatefulSet(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, image *registries.ImageConfiguration) error { + stackName := stack.Name + + replicas, err := settings.GetInt32OrDefault(ctx, stackName, 3, "module", "ledger", "v3", "replicas") + if err != nil { + return err + } + if replicas%2 == 0 { + return fmt.Errorf("module.ledger.v3.replicas must be odd, got %d", replicas) + } + + volumeClaims, 
err := buildV3VolumeClaimTemplates(ctx, stackName) + if err != nil { + return err + } + + podTemplate, err := buildV3PodTemplate(ctx, stack, ledger, image) + if err != nil { + return err + } + + headlessSvcName := "ledger-raft" + stsName := "ledger" + + _, _, err = core.CreateOrUpdate[*appsv1.StatefulSet](ctx, types.NamespacedName{ + Name: stsName, + Namespace: stackName, + }, + func(t *appsv1.StatefulSet) error { + // VolumeClaimTemplates are immutable after creation. + // Only set them when the StatefulSet is new (no UID yet). + if t.UID == "" { + t.Spec.VolumeClaimTemplates = volumeClaims + } + + t.Spec.Replicas = &replicas + t.Spec.ServiceName = headlessSvcName + t.Spec.PodManagementPolicy = appsv1.OrderedReadyPodManagement + t.Spec.Selector = &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app.kubernetes.io/name": "ledger-v3", + }, + } + t.Spec.Template = *podTemplate + return nil + }, + core.WithController[*appsv1.StatefulSet](ctx.GetScheme(), ledger), + ) + return err +} + +func buildV3PodTemplate(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, image *registries.ImageConfiguration) (*corev1.PodTemplateSpec, error) { + stackName := stack.Name + + otlpEnv, err := settings.GetOTELEnvVars(ctx, stackName, core.LowerCamelCaseKind(ctx, ledger), " ") + if err != nil { + return nil, err + } + + clusterID := stackName + + dataDir := "/data/app" + walDir := "/data/raft" + + env := []corev1.EnvVar{ + core.Env("BIND_ADDR", fmt.Sprintf("0.0.0.0:%d", v3PortRaft)), + core.Env("CLUSTER_ID", clusterID), + core.Env("GRPC_PORT", fmt.Sprint(v3PortGRPC)), + core.Env("HTTP_PORT", fmt.Sprint(v3PortHTTP)), + core.Env("WAL_DIR", walDir), + core.Env("DATA_DIR", dataDir), + { + Name: "POD_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"}, + }, + }, + { + Name: "POD_NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.namespace"}, + }, + }, + } + 
env = append(env, otlpEnv...) + env = append(env, core.GetDevEnvVars(stack, ledger)...) + + // Add pebble settings + pebbleEnv, err := buildV3PebbleEnvVars(ctx, stackName) + if err != nil { + return nil, err + } + env = append(env, pebbleEnv...) + + // Add raft settings + raftEnv, err := buildV3RaftEnvVars(ctx, stackName) + if err != nil { + return nil, err + } + env = append(env, raftEnv...) + + headlessSvcName := "ledger-raft" + command := buildV3Command(headlessSvcName, dataDir) + + container := corev1.Container{ + Name: "ledger", + Image: image.GetFullImageName(), + Command: []string{"/bin/sh", "-c"}, + Args: []string{command}, + Env: env, + Ports: []corev1.ContainerPort{ + {Name: "http", ContainerPort: v3PortHTTP}, + {Name: "grpc", ContainerPort: v3PortGRPC}, + {Name: "raft", ContainerPort: v3PortRaft}, + }, + VolumeMounts: []corev1.VolumeMount{ + {Name: "wal", MountPath: walDir}, + {Name: "data", MountPath: dataDir}, + {Name: "cold-cache", MountPath: "/data/cold-cache"}, + }, + LivenessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/livez", + Port: intstr.FromString("http"), + }, + }, + FailureThreshold: 20, + }, + StartupProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/livez", + Port: intstr.FromString("http"), + }, + }, + FailureThreshold: 30, + PeriodSeconds: 10, + }, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/readyz", + Port: intstr.FromString("http"), + }, + }, + }, + Lifecycle: &corev1.Lifecycle{ + PreStop: &corev1.LifecycleHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", buildV3PreStopScript(walDir)}, + }, + }, + }, + } + + return &corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app.kubernetes.io/name": "ledger-v3", + }, + }, + Spec: corev1.PodSpec{ + ImagePullSecrets: image.PullSecrets, + Containers: 
[]corev1.Container{container}, + }, + }, nil +} + +func buildV3Command(headlessSvc, dataDir string) string { + // Shell script that computes node-id from the StatefulSet ordinal index, + // builds the advertise-addr from the pod's DNS name within the headless service, + // and decides whether to --bootstrap or --join depending on ordinal. + // + // POD_NAME is like "ledger-0", POD_INDEX is extracted from the suffix. + // pod-0 bootstraps (if no existing state), other pods join pod-0. + lines := []string{ + // Extract the ordinal index from the pod name (e.g. "ledger-2" → "2") + `POD_INDEX=${POD_NAME##*-}`, + // Raft node IDs must be >= 1 + `NODE_ID=$((POD_INDEX + 1))`, + // FQDN within the headless service + fmt.Sprintf(`ADVERTISE_ADDR="${POD_NAME}.%s.${POD_NAMESPACE}.svc.cluster.local:%d"`, headlessSvc, v3PortRaft), + // First pod (ordinal 0) bootstraps if no checkpoint exists yet, otherwise normal start. + // Other pods join pod-0. + fmt.Sprintf(`BOOTSTRAP_ADDR="ledger-0.%s.${POD_NAMESPACE}.svc.cluster.local:%d"`, headlessSvc, v3PortGRPC), + `CLUSTER_FLAG=""`, + fmt.Sprintf(`if [ "$POD_INDEX" = "0" ]; then + if [ ! -d "%s/pebble" ]; then + CLUSTER_FLAG="--bootstrap" + fi +else + CLUSTER_FLAG="--join $BOOTSTRAP_ADDR" +fi`, dataDir), + // Exec into the ledger binary + `exec ./ledger run \`, + ` --node-id "$NODE_ID" \`, + ` --advertise-addr "$ADVERTISE_ADDR" \`, + ` $CLUSTER_FLAG`, + } + + return strings.Join(lines, "\n") +} + +// buildV3PreStopScript returns a shell script executed by the Kubernetes preStop +// lifecycle hook before a pod is terminated. It deregisters the local node from +// the Raft cluster and cleans the WAL directory so that a future re-join (after +// scale-up) starts as a fresh learner. +func buildV3PreStopScript(walDir string) string { + lines := []string{ + // Best-effort deregister: call the admin endpoint to remove this node + // from the Raft cluster. Ignore errors (e.g., last node, or already removed). 
+ fmt.Sprintf(`wget --post-data='' -q -O- http://localhost:%d/_admin/deregister || true`, v3PortHTTP), + // Clean WAL so that if this pod restarts (scale-up), it joins as a fresh learner. + fmt.Sprintf(`rm -rf %s/* || true`, walDir), + } + + return strings.Join(lines, "\n") +} + +func buildV3VolumeClaimTemplates(ctx core.Context, stackName string) ([]corev1.PersistentVolumeClaim, error) { + type volumeSpec struct { + name string + sizeKey string + defaultSize string + storageClassKey string + } + + specs := []volumeSpec{ + {"wal", "module.ledger.v3.persistence.wal.size", "5Gi", "module.ledger.v3.persistence.wal.storage-class"}, + {"data", "module.ledger.v3.persistence.data.size", "10Gi", "module.ledger.v3.persistence.data.storage-class"}, + {"cold-cache", "module.ledger.v3.persistence.cold-cache.size", "10Gi", "module.ledger.v3.persistence.cold-cache.storage-class"}, + } + + var claims []corev1.PersistentVolumeClaim + for _, s := range specs { + sizeStr, err := settings.GetStringOrDefault(ctx, stackName, s.defaultSize, strings.Split(s.sizeKey, ".")...) + if err != nil { + return nil, err + } + + storageClass, err := settings.GetStringOrEmpty(ctx, stackName, strings.Split(s.storageClassKey, ".")...) 
+ if err != nil { + return nil, err + } + + pvc := corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: s.name, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}, + Resources: corev1.VolumeResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceStorage: resource.MustParse(sizeStr), + }, + }, + }, + } + if storageClass != "" { + pvc.Spec.StorageClassName = &storageClass + } + claims = append(claims, pvc) + } + + return claims, nil +} + +// pebble setting key → env var name +var v3PebbleSettings = []struct { + key string + envVar string +}{ + {"cache-size", "PEBBLE_CACHE_SIZE"}, + {"memtable-size", "PEBBLE_MEMTABLE_SIZE"}, + {"memtable-stop-writes-threshold", "PEBBLE_MEMTABLE_STOP_WRITES_THRESHOLD"}, + {"l0-compaction-threshold", "PEBBLE_L0_COMPACTION_THRESHOLD"}, + {"l0-stop-writes-threshold", "PEBBLE_L0_STOP_WRITES_THRESHOLD"}, + {"lbase-max-bytes", "PEBBLE_LBASE_MAX_BYTES"}, + {"target-file-size", "PEBBLE_TARGET_FILE_SIZE"}, + {"max-concurrent-compactions", "PEBBLE_MAX_CONCURRENT_COMPACTIONS"}, +} + +func buildV3PebbleEnvVars(ctx core.Context, stackName string) ([]corev1.EnvVar, error) { + var envVars []corev1.EnvVar + for _, s := range v3PebbleSettings { + val, err := settings.GetStringOrEmpty(ctx, stackName, "module", "ledger", "v3", "pebble", s.key) + if err != nil { + return nil, err + } + if val != "" { + envVars = append(envVars, core.Env(s.envVar, val)) + } + } + return envVars, nil +} + +var v3RaftSettings = []struct { + key string + envVar string +}{ + {"snapshot-threshold", "RAFT_SNAPSHOT_THRESHOLD"}, + {"election-tick", "RAFT_ELECTION_TICK"}, + {"heartbeat-tick", "RAFT_HEARTBEAT_TICK"}, + {"tick-interval", "RAFT_TICK_INTERVAL"}, + {"max-size-per-msg", "RAFT_MAX_SIZE_PER_MSG"}, + {"max-inflight-msgs", "RAFT_MAX_INFLIGHT_MSGS"}, + {"compaction-margin", "RAFT_COMPACTION_MARGIN"}, +} + +func buildV3RaftEnvVars(ctx core.Context, stackName string) 
([]corev1.EnvVar, error) { + var envVars []corev1.EnvVar + for _, s := range v3RaftSettings { + val, err := settings.GetStringOrEmpty(ctx, stackName, "module", "ledger", "v3", "raft", s.key) + if err != nil { + return nil, err + } + if val != "" { + envVars = append(envVars, core.Env(s.envVar, val)) + } + } + return envVars, nil +} + +// buildV2PostgresEnvVars builds env vars that resolve to the v2 postgres DSN at runtime, +// using the same secret-based credential resolution as GetPostgresEnvVars. +func buildV2PostgresEnvVars(ctx core.Context, stack *v1beta1.Stack, database *v1beta1.Database) ([]corev1.EnvVar, error) { + pgEnvVars, err := databases.GetPostgresEnvVars(ctx, stack, database) + if err != nil { + return nil, err + } + + // POSTGRES_URI is already computed by GetPostgresEnvVars. + // Alias it as MIRROR_POSTGRES_DSN for the provisioning job. + pgEnvVars = append(pgEnvVars, core.Env("MIRROR_POSTGRES_DSN", core.EnvVarPlaceholder("POSTGRES_URI"))) + + return pgEnvVars, nil +} + +// createV3MirrorProvisioningJob creates a Job that provisions mirror ledgers in the v3 cluster. +// It uses ledgerctl to create each mirror ledger with the postgres source pointing at the v2 database. +func createV3MirrorProvisioningJob( + ctx core.Context, + stack *v1beta1.Stack, + ledger *v1beta1.Ledger, + image *registries.ImageConfiguration, + postgresEnvVars []corev1.EnvVar, + mirrorLedgers []string, +) error { + headlessSvcName := "ledger-raft" + grpcAddr := fmt.Sprintf("ledger-0.%s.%s.svc.cluster.local:%d", headlessSvcName, stack.Name, v3PortGRPC) + + // Build a shell script that creates each mirror ledger. + // "already exists" errors are ignored (idempotent), all others are fatal. + var scriptLines []string + scriptLines = append(scriptLines, `set -e`) + for _, name := range mirrorLedgers { + scriptLines = append(scriptLines, fmt.Sprintf( + `echo "Creating mirror ledger %s..." 
+OUT=$(./ledgerctl ledgers create --name %s --mode mirror --mirror-source-type postgres --mirror-dsn "$MIRROR_POSTGRES_DSN" --server %s --insecure 2>&1) || { + if echo "$OUT" | grep -qi "already exists"; then + echo "Ledger %s already exists, skipping." + else + echo "$OUT" >&2 + exit 1 + fi +}`, + name, name, grpcAddr, name, + )) + } + scriptLines = append(scriptLines, `echo "All mirror ledgers provisioned."`) + + script := strings.Join(scriptLines, "\n") + + container := corev1.Container{ + Name: "provision-mirrors", + Image: image.GetFullImageName(), + Command: []string{"/bin/sh", "-c"}, + Args: []string{script}, + Env: postgresEnvVars, + } + + return jobs.Handle(ctx, ledger, "v3-mirror-provision", container, + jobs.WithImagePullSecrets(image.PullSecrets), + ) +} diff --git a/internal/tests/ledger_v3_controller_test.go b/internal/tests/ledger_v3_controller_test.go new file mode 100644 index 00000000..16923244 --- /dev/null +++ b/internal/tests/ledger_v3_controller_test.go @@ -0,0 +1,331 @@ +package tests_test + +import ( + "github.com/google/uuid" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + + "github.com/formancehq/operator/v3/api/formance.com/v1beta1" + "github.com/formancehq/operator/v3/internal/core" + "github.com/formancehq/operator/v3/internal/resources/settings" + . 
"github.com/formancehq/operator/v3/internal/tests/internal" +) + +var _ = Describe("LedgerV3Controller", func() { + Context("When creating a Ledger with v3-mirror setting", func() { + var ( + stack *v1beta1.Stack + ledger *v1beta1.Ledger + databaseSettings *v1beta1.Settings + v3MirrorSetting *v1beta1.Settings + ) + BeforeEach(func() { + stack = &v1beta1.Stack{ + ObjectMeta: RandObjectMeta(), + Spec: v1beta1.StackSpec{Version: "v99.0.0"}, + } + databaseSettings = settings.New(uuid.NewString(), "postgres.*.uri", "postgresql://localhost", stack.Name) + v3MirrorSetting = settings.New(uuid.NewString(), "modules.ledger.v3-mirror", "v3.0.0:default,payments", stack.Name) + ledger = &v1beta1.Ledger{ + ObjectMeta: RandObjectMeta(), + Spec: v1beta1.LedgerSpec{ + StackDependency: v1beta1.StackDependency{ + Stack: stack.Name, + }, + }, + } + }) + JustBeforeEach(func() { + Expect(Create(stack)).To(Succeed()) + Expect(Create(databaseSettings)).To(Succeed()) + Expect(Create(v3MirrorSetting)).To(Succeed()) + Expect(Create(ledger)).To(Succeed()) + }) + AfterEach(func() { + Expect(Delete(ledger)).To(Succeed()) + Expect(Delete(v3MirrorSetting)).To(Succeed()) + Expect(Delete(databaseSettings)).To(Succeed()) + Expect(Delete(stack)).To(Succeed()) + }) + + It("Should create a StatefulSet", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + Expect(sts).To(BeControlledBy(ledger)) + }) + + It("Should create a StatefulSet with 3 replicas by default", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + Expect(*sts.Spec.Replicas).To(Equal(int32(3))) + }) + + It("Should create a StatefulSet with OrderedReady pod management", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + 
Expect(sts.Spec.PodManagementPolicy).To(Equal(appsv1.OrderedReadyPodManagement)) + }) + + It("Should create a StatefulSet using the headless service", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + Expect(sts.Spec.ServiceName).To(Equal("ledger-raft")) + }) + + It("Should create 3 volume claim templates (wal, data, cold-cache)", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + Expect(sts.Spec.VolumeClaimTemplates).To(HaveLen(3)) + Expect(sts.Spec.VolumeClaimTemplates[0].Name).To(Equal("wal")) + Expect(sts.Spec.VolumeClaimTemplates[1].Name).To(Equal("data")) + Expect(sts.Spec.VolumeClaimTemplates[2].Name).To(Equal("cold-cache")) + }) + + It("Should configure the container with 3 ports (http, grpc, raft)", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + container := sts.Spec.Template.Spec.Containers[0] + Expect(container.Ports).To(HaveLen(3)) + Expect(container.Ports).To(ContainElements( + HaveField("Name", "http"), + HaveField("Name", "grpc"), + HaveField("Name", "raft"), + )) + }) + + It("Should configure 3 volume mounts", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + container := sts.Spec.Template.Spec.Containers[0] + Expect(container.VolumeMounts).To(ConsistOf( + corev1.VolumeMount{Name: "wal", MountPath: "/data/raft"}, + corev1.VolumeMount{Name: "data", MountPath: "/data/app"}, + corev1.VolumeMount{Name: "cold-cache", MountPath: "/data/cold-cache"}, + )) + }) + + It("Should configure liveness, readiness, and startup probes", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + container := 
sts.Spec.Template.Spec.Containers[0] + Expect(container.LivenessProbe).NotTo(BeNil()) + Expect(container.LivenessProbe.HTTPGet.Path).To(Equal("/livez")) + Expect(container.ReadinessProbe).NotTo(BeNil()) + Expect(container.ReadinessProbe.HTTPGet.Path).To(Equal("/readyz")) + Expect(container.StartupProbe).NotTo(BeNil()) + Expect(container.StartupProbe.HTTPGet.Path).To(Equal("/livez")) + }) + + It("Should configure a preStop lifecycle hook for Raft deregistration", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + container := sts.Spec.Template.Spec.Containers[0] + Expect(container.Lifecycle).NotTo(BeNil()) + Expect(container.Lifecycle.PreStop).NotTo(BeNil()) + Expect(container.Lifecycle.PreStop.Exec).NotTo(BeNil()) + Expect(container.Lifecycle.PreStop.Exec.Command).To(HaveLen(3)) + Expect(container.Lifecycle.PreStop.Exec.Command[2]).To(ContainSubstring("/_admin/deregister")) + Expect(container.Lifecycle.PreStop.Exec.Command[2]).To(ContainSubstring("rm -rf")) + }) + + It("Should set CLUSTER_ID env var to the stack name", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + Expect(sts.Spec.Template.Spec.Containers[0].Env).To( + ContainElement(core.Env("CLUSTER_ID", stack.Name)), + ) + }) + + It("Should set downward API env vars (POD_NAME, POD_NAMESPACE)", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + env := sts.Spec.Template.Spec.Containers[0].Env + Expect(env).To(ContainElement(HaveField("Name", "POD_NAME"))) + Expect(env).To(ContainElement(HaveField("Name", "POD_NAMESPACE"))) + }) + + It("Should create a headless service for Raft peer discovery", func() { + svc := &corev1.Service{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger-raft", svc) + }).Should(Succeed()) + 
Expect(svc).To(BeControlledBy(ledger)) + Expect(svc.Spec.ClusterIP).To(Equal("None")) + Expect(svc.Spec.PublishNotReadyAddresses).To(BeTrue()) + Expect(svc.Spec.Ports).To(ContainElements( + HaveField("Name", "raft"), + HaveField("Name", "grpc"), + )) + }) + + It("Should also create a v2 GatewayHTTPAPI with _healthcheck endpoint", func() { + httpAPI := &v1beta1.GatewayHTTPAPI{} + Eventually(func() error { + return LoadResource("", core.GetObjectName(stack.Name, "ledger"), httpAPI) + }).Should(Succeed()) + Expect(httpAPI.Spec.HealthCheckEndpoint).To(Equal("_healthcheck")) + }) + + It("Should also create a Database object for the v2 path", func() { + database := &v1beta1.Database{} + Eventually(func() error { + return LoadResource("", core.GetObjectName(stack.Name, "ledger"), database) + }).Should(Succeed()) + }) + + It("Should use the correct v3 image", func() { + sts := &appsv1.StatefulSet{} + Eventually(func() error { + return LoadResource(stack.Name, "ledger", sts) + }).Should(Succeed()) + Expect(sts.Spec.Template.Spec.Containers[0].Image).To(ContainSubstring("ledger-v3")) + Expect(sts.Spec.Template.Spec.Containers[0].Image).To(ContainSubstring("v3.0.0")) + }) + + It("Should create a mirror provisioning job", func() { + jobList := &batchv1.JobList{} + Eventually(func(g Gomega) { + g.Expect(List(jobList)).To(Succeed()) + found := false + for _, j := range jobList.Items { + if j.Namespace == stack.Name { + for _, c := range j.Spec.Template.Spec.Containers { + if c.Name == "provision-mirrors" { + found = true + g.Expect(c.Image).To(ContainSubstring("ledger-v3")) + g.Expect(c.Args[0]).To(ContainSubstring("default")) + g.Expect(c.Args[0]).To(ContainSubstring("payments")) + } + } + } + } + g.Expect(found).To(BeTrue()) + }).Should(Succeed()) + }) + + Context("with custom replicas setting", func() { + var replicasSetting *v1beta1.Settings + BeforeEach(func() { + replicasSetting = settings.New(uuid.NewString(), "module.ledger.v3.replicas", "5", stack.Name) + }) + 
JustBeforeEach(func() { + Expect(Create(replicasSetting)).To(Succeed()) + }) + AfterEach(func() { + Expect(Delete(replicasSetting)).To(Succeed()) + }) + It("Should create a StatefulSet with 5 replicas", func() { + sts := &appsv1.StatefulSet{} + Eventually(func(g Gomega) int32 { + g.Expect(LoadResource(stack.Name, "ledger", sts)).To(Succeed()) + return *sts.Spec.Replicas + }).Should(Equal(int32(5))) + }) + }) + + Context("with custom persistence sizes", func() { + var walSizeSetting *v1beta1.Settings + BeforeEach(func() { + walSizeSetting = settings.New(uuid.NewString(), "module.ledger.v3.persistence.wal.size", "20Gi", stack.Name) + }) + JustBeforeEach(func() { + Expect(Create(walSizeSetting)).To(Succeed()) + }) + AfterEach(func() { + Expect(Delete(walSizeSetting)).To(Succeed()) + }) + It("Should create WAL PVC with custom size", func() { + sts := &appsv1.StatefulSet{} + Eventually(func(g Gomega) string { + g.Expect(LoadResource(stack.Name, "ledger", sts)).To(Succeed()) + return sts.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests.Storage().String() + }).Should(Equal("20Gi")) + }) + }) + + Context("with pebble settings", func() { + var cacheSizeSetting *v1beta1.Settings + BeforeEach(func() { + cacheSizeSetting = settings.New(uuid.NewString(), "module.ledger.v3.pebble.cache-size", "2147483648", stack.Name) + }) + JustBeforeEach(func() { + Expect(Create(cacheSizeSetting)).To(Succeed()) + }) + AfterEach(func() { + Expect(Delete(cacheSizeSetting)).To(Succeed()) + }) + It("Should set PEBBLE_CACHE_SIZE env var", func() { + sts := &appsv1.StatefulSet{} + Eventually(func(g Gomega) []corev1.EnvVar { + g.Expect(LoadResource(stack.Name, "ledger", sts)).To(Succeed()) + return sts.Spec.Template.Spec.Containers[0].Env + }).Should(ContainElement(core.Env("PEBBLE_CACHE_SIZE", "2147483648"))) + }) + }) + + Context("with raft settings", func() { + var snapshotSetting *v1beta1.Settings + BeforeEach(func() { + snapshotSetting = settings.New(uuid.NewString(), 
"module.ledger.v3.raft.snapshot-threshold", "10000", stack.Name) + }) + JustBeforeEach(func() { + Expect(Create(snapshotSetting)).To(Succeed()) + }) + AfterEach(func() { + Expect(Delete(snapshotSetting)).To(Succeed()) + }) + It("Should set RAFT_SNAPSHOT_THRESHOLD env var", func() { + sts := &appsv1.StatefulSet{} + Eventually(func(g Gomega) []corev1.EnvVar { + g.Expect(LoadResource(stack.Name, "ledger", sts)).To(Succeed()) + return sts.Spec.Template.Spec.Containers[0].Env + }).Should(ContainElement(core.Env("RAFT_SNAPSHOT_THRESHOLD", "10000"))) + }) + }) + + Context("with monitoring enabled", func() { + var otelTracesDSNSetting *v1beta1.Settings + BeforeEach(func() { + otelTracesDSNSetting = settings.New(uuid.NewString(), "opentelemetry.traces.dsn", "grpc://collector", stack.Name) + }) + JustBeforeEach(func() { + Expect(Create(otelTracesDSNSetting)).To(Succeed()) + }) + AfterEach(func() { + Expect(Delete(otelTracesDSNSetting)).To(Succeed()) + }) + It("Should add OTEL env vars to the StatefulSet", func() { + sts := &appsv1.StatefulSet{} + Eventually(func(g Gomega) []corev1.EnvVar { + g.Expect(LoadResource(stack.Name, "ledger", sts)).To(Succeed()) + return sts.Spec.Template.Spec.Containers[0].Env + }).Should(ContainElement(HaveField("Name", "OTEL_SERVICE_NAME"))) + }) + }) + }) +}) diff --git a/tools/kubectl-stacks/apiextensions.go b/tools/kubectl-stacks/apiextensions.go new file mode 100644 index 00000000..4854f3a8 --- /dev/null +++ b/tools/kubectl-stacks/apiextensions.go @@ -0,0 +1,30 @@ +package main + +import ( + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" +) + +var apiExtensionsGV = schema.GroupVersion{ + Group: "apiextensions.k8s.io", + Version: "v1", +} + +// unstructuredNegotiator implements runtime.NegotiatedSerializer for raw JSON +// responses (used to query CRDs without importing apiextensions types). 
+type unstructuredNegotiator struct{} + +func (unstructuredNegotiator) EncoderForVersion(e runtime.Encoder, _ runtime.GroupVersioner) runtime.Encoder { + return e +} + +func (unstructuredNegotiator) DecoderToVersion(d runtime.Decoder, _ runtime.GroupVersioner) runtime.Decoder { + return d +} + +func (u unstructuredNegotiator) SupportedMediaTypes() []runtime.SerializerInfo { + scheme := runtime.NewScheme() + codecs := serializer.NewCodecFactory(scheme) + return codecs.SupportedMediaTypes() +} diff --git a/tools/kubectl-stacks/create.go b/tools/kubectl-stacks/create.go new file mode 100644 index 00000000..3e3e49d4 --- /dev/null +++ b/tools/kubectl-stacks/create.go @@ -0,0 +1,39 @@ +package main + +import ( + "fmt" + + "github.com/spf13/cobra" + "k8s.io/cli-runtime/pkg/genericclioptions" + "k8s.io/client-go/rest" + + "github.com/formancehq/operator/v3/api/formance.com/v1beta1" +) + +func NewCreateCommand(configFlags *genericclioptions.ConfigFlags) *cobra.Command { + return &cobra.Command{ + Use: "create <name>", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + client, err := getRestClient(configFlags) + if err != nil { + return err + } + + return create(cmd, client, args[0]) + }, + } +} + +func create(cmd *cobra.Command, client *rest.RESTClient, name string) error { + _, _ = fmt.Fprintf(cmd.OutOrStdout(), "Creating stack '%s'...\r\n", name) + + stack := &v1beta1.Stack{} + stack.SetName(name) + + return client.Post(). + Resource("stacks"). + Body(stack). + Do(cmd.Context()). 
+ Error() +} diff --git a/tools/kubectl-stacks/enable_module.go b/tools/kubectl-stacks/enable_module.go new file mode 100644 index 00000000..4f613925 --- /dev/null +++ b/tools/kubectl-stacks/enable_module.go @@ -0,0 +1,151 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/spf13/cobra" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/cli-runtime/pkg/genericclioptions" + "k8s.io/client-go/rest" + + "github.com/formancehq/operator/v3/api/formance.com/v1beta1" +) + +// moduleCRD holds the kind and plural resource name extracted from a CRD. +type moduleCRD struct { + Kind string + Plural string +} + +// discoverModules lists CRDs with label formance.com/kind=module and returns +// the available module kinds with their plural resource names. +func discoverModules(ctx context.Context, configFlags *genericclioptions.ConfigFlags) ([]moduleCRD, error) { + restConfig, err := configFlags.ToRESTConfig() + if err != nil { + return nil, err + } + + restConfig.APIPath = "/apis" + restConfig.GroupVersion = &apiExtensionsGV + restConfig.NegotiatedSerializer = unstructuredNegotiator{} + + client, err := rest.RESTClientFor(restConfig) + if err != nil { + return nil, err + } + + raw, err := client.Get(). + Resource("customresourcedefinitions"). + Param("labelSelector", "formance.com/kind=module"). + Do(ctx). 
+ Raw() + if err != nil { + return nil, fmt.Errorf("failed to list CRDs: %w", err) + } + + var crdList struct { + Items []struct { + Spec struct { + Names struct { + Kind string `json:"kind"` + Plural string `json:"plural"` + } `json:"names"` + } `json:"spec"` + } `json:"items"` + } + if err := json.Unmarshal(raw, &crdList); err != nil { + return nil, fmt.Errorf("failed to parse CRD list: %w", err) + } + + modules := make([]moduleCRD, 0, len(crdList.Items)) + for _, item := range crdList.Items { + modules = append(modules, moduleCRD{ + Kind: item.Spec.Names.Kind, + Plural: item.Spec.Names.Plural, + }) + } + + return modules, nil +} + +// resolveModule does a case-insensitive lookup of the input against discovered modules. +func resolveModule(input string, modules []moduleCRD) (moduleCRD, error) { + lower := strings.ToLower(input) + for _, m := range modules { + if strings.ToLower(m.Kind) == lower { + return m, nil + } + } + kinds := make([]string, len(modules)) + for i, m := range modules { + kinds[i] = m.Kind + } + return moduleCRD{}, fmt.Errorf("unknown module %q, available modules: %v", input, kinds) +} + +func NewEnableModuleCommand(configFlags *genericclioptions.ConfigFlags) *cobra.Command { + return &cobra.Command{ + Use: "enable-module <stack-name> <module>", + Short: "Enable a module on a stack", + Args: cobra.ExactArgs(2), + RunE: func(cmd *cobra.Command, args []string) error { + client, err := getRestClient(configFlags) + if err != nil { + return err + } + + modules, err := discoverModules(cmd.Context(), configFlags) + if err != nil { + return err + } + + return enableModule(cmd, client, modules, args[0], args[1]) + }, + } +} + +func enableModule(cmd *cobra.Command, client *rest.RESTClient, modules []moduleCRD, stackName, moduleInput string) error { + mod, err := resolveModule(moduleInput, modules) + if err != nil { + return err + } + + _, _ = fmt.Fprintf(cmd.OutOrStdout(), "Enabling module '%s' on stack '%s'...\r\n", mod.Kind, stackName) + + obj = 
&unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": v1beta1.GroupVersion.String(), + "kind": mod.Kind, + "metadata": map[string]any{ + "name": stackName + "-" + toLowerKebab(mod.Kind), + }, + "spec": map[string]any{ + "stack": stackName, + }, + }, + } + + return client.Post(). + Resource(mod.Plural). + Body(obj). + Do(cmd.Context()). + Error() +} + +func toLowerKebab(s string) string { + var result []byte + for i, c := range s { + if c >= 'A' && c <= 'Z' { + if i > 0 { + result = append(result, '-') + } + result = append(result, byte(c)+32) + } else { + result = append(result, byte(c)) + } + } + return string(result) +} diff --git a/tools/kubectl-stacks/main.go b/tools/kubectl-stacks/main.go index a078af91..4bbcfffc 100644 --- a/tools/kubectl-stacks/main.go +++ b/tools/kubectl-stacks/main.go @@ -22,12 +22,14 @@ func NewRootCommand() *cobra.Command { configFlags := genericclioptions.NewConfigFlags(true) configFlags.AddFlags(cmd.PersistentFlags()) cmd.AddCommand( + NewCreateCommand(configFlags), NewLockCommand(configFlags), NewUnlockCommand(configFlags), NewListCommand(configFlags), NewSetDebugCommand(configFlags), NewDisableCommand(configFlags), NewEnableCommand(configFlags), + NewEnableModuleCommand(configFlags), NewUpgradeCommand(configFlags), NewSettingsCommand(configFlags), )