Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ Phase 2 — full apply to clean up old S3 objects and apply remaining changes:

### Changed

- **CUMULUS-4788**
- split replication service into multiple services, one for each replication table group
- **CUMULUS-4534**
- collection translate functions pass cmr_provider/cmrProvider back and forth
- sf-scheduler lambda function uses collection cmr_provider to fill provider in cmr section of message template meta
Expand Down
46 changes: 25 additions & 21 deletions example/rds-iceberg-replication-tf/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ data "aws_secretsmanager_secret_version" "db_credentials" {
}

locals {
tags = merge(var.tags, { Deployment = var.prefix })
rds_security_group = lookup(data.terraform_remote_state.data_persistence.outputs, "rds_security_group", "")
rds_endpoint = lookup(data.terraform_remote_state.rds_cluster.outputs, "rds_endpoint")
admin_db_login_secret_arn = lookup(data.terraform_remote_state.rds_cluster.outputs, "admin_db_login_secret_arn")
db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)
tags = merge(var.tags, { Deployment = var.prefix })
rds_security_group = lookup(data.terraform_remote_state.data_persistence.outputs, "rds_security_group", "")
rds_endpoint = lookup(data.terraform_remote_state.rds_cluster.outputs, "rds_endpoint")
admin_db_login_secret_arn = lookup(data.terraform_remote_state.rds_cluster.outputs, "admin_db_login_secret_arn")
db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)
}

data "terraform_remote_state" "data_persistence" {
Expand All @@ -43,20 +43,24 @@ data "terraform_remote_state" "rds_cluster" {
}

module "rds_iceberg_replication" {
source = "../../tf-modules/rds-iceberg-replication"
prefix = var.prefix
db_admin_username = local.db_credentials.username
db_admin_password = local.db_credentials.password
region = var.region
vpc_id = var.vpc_id
subnets = var.subnets
rds_security_group = local.rds_security_group
rds_endpoint = local.rds_endpoint
force_new_deployment = var.force_new_deployment
cpu = var.cpu
cpu_architecture = var.cpu_architecture
volume_size_in_gb = var.volume_size_in_gb
kafka_image = var.kafka_image
connect_image = var.connect_image
tags = merge(var.tags, { Deployment = var.prefix })
source = "../../tf-modules/rds-iceberg-replication"
prefix = var.prefix
db_admin_username = local.db_credentials.username
db_admin_password = local.db_credentials.password
region = var.region
vpc_id = var.vpc_id
subnet = var.subnet
rds_security_group = local.rds_security_group
rds_endpoint = local.rds_endpoint
force_new_deployment = var.force_new_deployment
cpu = var.cpu
cpu_architecture = var.cpu_architecture
volume_size_in_gb = var.volume_size_in_gb
kafka_image = var.kafka_image
connect_image = var.connect_image
bootstrap_image = var.bootstrap_image
pg_db = "postgres"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the db name should be coming from an output of the RDS module.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it can? The Terraform is not setting up the DB, is it? That's some script running migrations, I think

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case we should make it a variable to be passed in instead of hardcoding to postgres.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

iceberg_namespace = var.iceberg_namespace
iceberg_s3_bucket = var.iceberg_s3_bucket
tags = merge(var.tags, { Deployment = var.prefix })
}
1 change: 1 addition & 0 deletions example/rds-iceberg-replication-tf/outputs.tf
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
output "iceberg_replication_cluster_arn" {
description = "The ARN of the ECS cluster created by this module"
value = module.rds_iceberg_replication.iceberg_replication_cluster_arn
}
5 changes: 5 additions & 0 deletions example/rds-iceberg-replication-tf/terraform.tfvars.example
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ tags = { "Deployment" = "my_prefix" }
rds_endpoint = "prefix-rds-cluster-v2.cluster-12345.us-east-1.rds.amazonaws.com"
connect_image = "<account ID>.dkr.ecr.us-east-1.amazonaws.com/cumulus/debezium/connect:0.3"
kafka_image = "<account ID>.dkr.ecr.us-east-1.amazonaws.com/cumulus/debezium/kafka:3.4"
bootstrap_image = "<account ID>.dkr.ecr.us-east-1.amazonaws.com/cumulus/replication-bootstrap:0.1"
data_persistence_remote_state_config = {
bucket = "PREFIX-tf-state"
key = "PREFIX/data-persistence/terraform.tfstate"
Expand All @@ -18,3 +19,7 @@ rds_cluster_remote_state_config = {
key = "PREFIX/rds-cluster/terraform.tfstate"
region = "us-east-1"
}
iceberg_namespace = "SOME_NAMESPACE"
iceberg_s3_bucket = "YOUR_BUCKET"
pg_db = "postgres"
pg_schema = "public"
32 changes: 29 additions & 3 deletions example/rds-iceberg-replication-tf/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ variable "region" {
default = "us-east-1"
}

variable "subnets" {
description = "Subnets for database cluster. Requires at least 2 across multiple AZs"
type = list(string)
variable "subnet" {
description = "Subnet for Fargate tasks"
type = string
}

variable "tags" {
Expand Down Expand Up @@ -76,10 +76,36 @@ variable "connect_image" {
type = string
}

variable "bootstrap_image" {
description = "Image used to start the bootstrap container. See https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_ContainerDefinition.html#ECS-Type-ContainerDefinition-image"
type = string
}

variable "data_persistence_remote_state_config" {
type = object({ bucket = string, key = string, region = string })
}

variable "rds_cluster_remote_state_config" {
type = object({ bucket = string, key = string, region = string })
}

variable "iceberg_s3_bucket" {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we aren't creating these buckets in Terraform - is the plan to create them via the script that sets up the Iceberg tables?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that was what you said, but we can create them in the terraform if you want.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I think a terraform variable here and created by the loading script is probably the most flexible - we can always change our minds later.

description = "S3 bucket where iceberg tables are stored"
type = string
}

variable "iceberg_namespace" {
description = "iceberg namespace (same as glue database)"
type = string
}

variable "pg_db" {
description = "postgres database"
type = string
}

variable "pg_schema" {
description = "The name of the schema in the postgres database that contains the tables"
type = string
default = "public"
}
103 changes: 0 additions & 103 deletions tf-modules/rds-iceberg-replication/kafka-connect.tf

This file was deleted.

87 changes: 52 additions & 35 deletions tf-modules/rds-iceberg-replication/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -7,53 +7,70 @@ terraform {
}
}
provider "aws" {
region = var.region
region = var.region

ignore_tags {
key_prefixes = ["gsfc-ngap"]
}
}

locals {
full_name = "${var.prefix}-replication"
}

resource "aws_security_group" "no_ingress_all_egress" {

name = "${var.prefix}-replication-ecs-no-ingress-all-egress"
vpc_id = var.vpc_id

egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}

lifecycle {
# This prevents the "In Use" error by creating a new one
# before trying to kill the old one during updates
create_before_destroy = true
replication_services = {
small-tables = {
slot_name = "${var.prefix}-small-tables"
table_include_list = "${var.pg_schema}.collections,${var.pg_schema}.async_operations,${var.pg_schema}.providers,${var.pg_schema}.pdrs,${var.pg_schema}.reconciliation_reports,${var.pg_schema}.rules"
}
executions = {
slot_name = "${var.prefix}-executions"
table_include_list = "${var.pg_schema}.executions"
column_exclude_list = "${var.pg_schema}.executions.original_payload,${var.pg_schema}.executions.final_payload"
}
granules = {
slot_name = "${var.prefix}-granules"
table_include_list = "${var.pg_schema}.granules"
}
files = {
slot_name = "${var.prefix}-files"
table_include_list = "${var.pg_schema}.files"
}
}

tags = var.tags
}

resource "aws_ecs_cluster" "default" {
name = "${var.prefix}-CumulusIcebergReplicationECSCluster"
tags = var.tags
module "cluster" {
source = "./modules/cluster"
prefix = var.prefix
vpc_id = var.vpc_id
iceberg_s3_bucket = var.iceberg_s3_bucket
tags = var.tags
}

resource "aws_cloudwatch_log_group" "kafka-logs" {
name = "/aws/ecs/cluster/${local.full_name}/kafka"
retention_in_days = 1
}
module "replication_services" {
for_each = local.replication_services
source = "./modules/replication-service"

resource "aws_cloudwatch_log_group" "kafka-connect-logs" {
name = "/aws/ecs/cluster/${local.full_name}/kafka-connect"
retention_in_days = 1
}
slot_name = each.value.slot_name
table_include_list = each.value.table_include_list

data "aws_iam_policy" "ECSInfrastructure" {
arn = "arn:aws:iam::aws:policy/service-role/AmazonECSInfrastructureRolePolicyForVolumes"
prefix = var.prefix
vpc_id = var.vpc_id
subnet = var.subnet
rds_endpoint = var.rds_endpoint
rds_port = var.rds_port
iceberg_s3_bucket = var.iceberg_s3_bucket
iceberg_namespace = var.iceberg_namespace
kafka_image = var.kafka_image
connect_image = var.connect_image
bootstrap_image = var.bootstrap_image
cpu = var.cpu
cpu_architecture = var.cpu_architecture
volume_size_in_gb = var.volume_size_in_gb
db_admin_password = var.db_admin_password
db_admin_username = var.db_admin_username
pg_db = var.pg_db
ecs_infrastructure_role = module.cluster.ecs_infrastructure_role
ecs_task_execution_role = module.cluster.task_execution_role
fargate_task_role = module.cluster.task_execution_role
rds_security_group = var.rds_security_group
task_security_group_id = module.cluster.no_ingress_all_egress_security_group.id
ecs_cluster = module.cluster.replication_ecs_cluster
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ resource "aws_iam_policy" "s3_access_policy" {
"s3:DeleteObject"
],
Resource = [
"arn:aws:s3:::your-bucket-name",
"arn:aws:s3:::your-bucket-name/*"
"arn:aws:s3:::${var.iceberg_s3_bucket}",
"arn:aws:s3:::${var.iceberg_s3_bucket}/*"
]
}
]
Expand All @@ -74,7 +74,7 @@ resource "aws_iam_policy" "glue_access_policy" {
Action = [
"glue:*"
],
Resource = "*" # TODO: Scope down to specific ARNs if possible
Resource = "*"
}
]
})
Expand Down
Loading
Loading