DENG-10803: Send events for curated_corpus_api #380

Closed
BenWu wants to merge 1 commit into main from benwu/gcp-events

@BenWu BenWu commented Mar 17, 2026

Goal

This adds a call in the curated_corpus_api event emitter that sends events to a Pub/Sub topic in GCP for further transformation and loading into BigQuery (the GCP side is set up in https://github.com/mozilla-services/data-sandbox-terraform/pull/121/changes). The code so far is a simple proof of concept to validate this approach. It works locally when triggering an event via GraphQL.

I'd love feedback/perspectives on:

  • feasibility of this approach, any alternatives
  • organization of the code and where things go

Implementation Decisions

  • events are sent to GCP instead of pulled from AWS (e.g. from an SQS queue) since it's easier for me to manage all the infra in GCP
  • events are sent one at a time instead of buffering, for simplicity

Deployment steps

  • Database migrations?
  • Deployed to dev?
  • Secrets?

References

JIRA ticket:

claude bot commented Mar 17, 2026

Claude finished @BenWu's task in 1m 11s

Code Review

  • Gather context and read changed files
  • Review code changes
  • Post review feedback

Overall this is a clean and straightforward PoC. The approach (fire-and-forget publish to GCP Pub/Sub within the existing event emitter) is reasonable, and the gRPC retry handling is a good callout. A few issues to address, some before shipping to prod:

Issues

1. GCP_EVENT_PROJECT hardcoded to dev project in infra (likely a bug)

infrastructure/curated-corpus-api/src/main.ts:262:

{
  name: 'GCP_EVENT_PROJECT',
  value: 'pocket-snowplow-shim-dev',  // never changes for prod!
},

Unlike GCP_EVENT_TOPIC which uses config.isDev to switch between dev/prod, GCP_EVENT_PROJECT is always set to the dev project. If pocket-snowplow-shim-dev and pocket-snowplow-shim-prod are different GCP projects, prod events would land in the dev project.
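
A minimal sketch of the fix described above, switching both values on the environment. The helper name `gcpEventEnvVars` and the `EnvVar` shape are hypothetical, and the prod project name `pocket-snowplow-shim-prod` is an assumption that should be confirmed against the actual GCP setup:

```typescript
// Sketch only: gcpEventEnvVars is a hypothetical helper, not code from this PR.
interface EnvVar {
  name: string;
  value: string;
}

function gcpEventEnvVars(isDev: boolean): EnvVar[] {
  // Assumption: a separate prod project named 'pocket-snowplow-shim-prod' exists.
  const suffix = isDev ? 'dev' : 'prod';
  return [
    { name: 'GCP_EVENT_PROJECT', value: `pocket-snowplow-shim-${suffix}` },
    { name: 'GCP_EVENT_TOPIC', value: `snowplow-events-${suffix}` },
  ];
}
```

Deriving both values from the same flag keeps the project and topic from drifting apart between environments.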

2. Unguarded JSON.parse crashes the server on startup

servers/curated-corpus-api/src/events/curatedCorpusEventEmitter.ts:14-16:

const gcp_credentials = config.gcp.serviceAccountKey
  ? JSON.parse(config.gcp.serviceAccountKey)  // throws if malformed JSON
  : undefined;

If GCP_EVENT_SERVICE_ACCOUNT_KEY is set but contains malformed JSON (e.g., truncated secret, copy-paste error), this throws at module import time and takes the server down. Wrap in try/catch and log an error instead.
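
One way the guard could look, as a sketch. `parseServiceAccountKey` is a hypothetical helper name, and the logger parameter stands in for whatever logger the server already uses:

```typescript
type ServiceAccountCredentials = Record<string, unknown>;

// Hypothetical helper: returns parsed credentials, or undefined when the key
// is unset or unusable. It never throws, so a bad secret cannot crash startup.
function parseServiceAccountKey(
  raw: string | undefined,
  logError: (msg: string) => void = console.error,
): ServiceAccountCredentials | undefined {
  if (!raw) return undefined;
  try {
    const parsed: unknown = JSON.parse(raw);
    if (typeof parsed === 'object' && parsed !== null && !Array.isArray(parsed)) {
      return parsed as ServiceAccountCredentials;
    }
    // Deliberately do not log the raw value: it is a secret.
    logError('GCP_EVENT_SERVICE_ACCOUNT_KEY is valid JSON but not an object; ignoring it');
  } catch {
    logError('GCP_EVENT_SERVICE_ACCOUNT_KEY contains malformed JSON; ignoring it');
  }
  return undefined;
}
```

Returning `undefined` on failure means the caller takes the same path as when no key is configured at all. Whether that fallback is acceptable in prod is a judgment call.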

3. Config defaults silently fall through to dev values in prod

servers/curated-corpus-api/src/config/index.ts:55-57:

projectId: process.env.GCP_EVENT_PROJECT || 'pocket-snowplow-shim-dev',
topicName: process.env.GCP_EVENT_TOPIC || 'snowplow-events-dev',

If either env var is accidentally unset in prod, events will silently publish to the dev GCP project/topic with no error. Since these are set explicitly via the infra config, the fallback values aren't needed in practice — but if you want to keep them, consider naming them to make the dev intent clear (or at minimum document that they're dev-only defaults).
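
If the fallbacks are dropped, one option is to fail loudly when a variable is missing. The sketch below assumes that behavior is wanted; `requireEnv` and `loadGcpConfig` are hypothetical names, and the environment is passed in as a plain object so the function is easy to test:

```typescript
type Env = Record<string, string | undefined>;

// Hypothetical helper: throws instead of silently substituting a dev value.
function requireEnv(env: Env, name: string): string {
  const value = env[name];
  if (value === undefined || value === '') {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

function loadGcpConfig(env: Env) {
  return {
    projectId: requireEnv(env, 'GCP_EVENT_PROJECT'),
    topicName: requireEnv(env, 'GCP_EVENT_TOPIC'),
    // Optional: when unset, the client falls back to ADC.
    serviceAccountKey: env['GCP_EVENT_SERVICE_ACCOUNT_KEY'],
  };
}
```

In the server this would be called as `loadGcpConfig(process.env)`. The trade-off is that local development then needs both variables set, which makes the missing `GCP_EVENT_TOPIC` entry in example.env more important. A softer alternative is to disable GCP publishing when they are unset.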

Minor

4. gcp_credentials uses snake_case, while the rest of the codebase uses camelCase. A small inconsistency, but worth fixing.

5. GCP_EVENT_TOPIC missing from example.env: GCP_EVENT_PROJECT and GCP_EVENT_SERVICE_ACCOUNT_KEY are documented there but GCP_EVENT_TOPIC isn't. Developers setting up locally might be confused since the config falls back to snowplow-events-dev silently.

6. PubSub/topic initialized at module level — This works fine, but it does make unit testing harder (you can't easily mock the client per test). Not a blocker for a PoC, but worth noting if you add tests later. Moving initialization into the class (or injecting the topic via constructor) would make the emitter more testable.
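
A sketch of the constructor-injection option. The class name `GcpEventPublisher` and the `EventTopic` interface are hypothetical; the interface is assumed to mirror the part of `Topic.publishMessage` from @google-cloud/pubsub that takes a `json` payload, so a real topic is expected to satisfy it:

```typescript
// Minimal shape of what the emitter needs from a topic (assumption, see above).
interface EventTopic {
  publishMessage(message: { json: unknown }): Promise<string>;
}

// Hypothetical class; the actual emitter in this PR is not reproduced here.
class GcpEventPublisher {
  private readonly topic: EventTopic;
  private readonly logError: (msg: string, err: unknown) => void;

  constructor(
    topic: EventTopic,
    logError: (msg: string, err: unknown) => void = console.error,
  ) {
    this.topic = topic;
    this.logError = logError;
  }

  // Publishes one event; failures are logged and never rethrown, so a
  // Pub/Sub outage cannot break the request that triggered the event.
  async publish(event: unknown): Promise<void> {
    try {
      await this.topic.publishMessage({ json: event });
    } catch (err) {
      this.logError('Failed to publish event to GCP Pub/Sub', err);
    }
  }
}
```

With this shape a unit test can pass in an object literal that records messages, with no module-level mocking needed.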

What looks good

  • Error handling: catching publish failures and logging without crashing the app is the right approach for a non-critical side-effect.
  • The conditional credential spreading ...(gcp_credentials && { credentials: gcp_credentials }) correctly falls back to ADC when no key is configured — good for running in GCP environments without an explicit service account key.
  • Infra secret management via AWS Secrets Manager for the GCP service account key is solid.
  • The config.isDev topic switching in infra is correct.
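
For readers unfamiliar with the conditional spread pattern mentioned above, here is a small standalone sketch. `buildClientOptions` and `PubSubClientOptions` are hypothetical names; the assumption is that the resulting object is what gets passed to the Pub/Sub client constructor:

```typescript
type ServiceAccountCredentials = Record<string, unknown>;

interface PubSubClientOptions {
  projectId: string;
  credentials?: ServiceAccountCredentials;
}

function buildClientOptions(
  projectId: string,
  credentials?: ServiceAccountCredentials,
): PubSubClientOptions {
  return {
    projectId,
    // Spreading `undefined` adds nothing, so the `credentials` key is
    // omitted entirely when no service account key is configured.
    ...(credentials && { credentials }),
  };
}
```

Omitting the key, rather than setting it to `undefined`, leaves the choice of credentials to the client library's default lookup.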

@github-actions

Plan Result (corpus-scheduler-lambda-cdk-production)

Plan: 0 to add, 1 to change, 0 to destroy.
  • Update
    • aws_lambda_function.corpus-scheduler-sqs-lambda_F2ECDF9F
Change Result
  # aws_lambda_function.corpus-scheduler-sqs-lambda_F2ECDF9F will be updated in-place
  ~ resource "aws_lambda_function" "corpus-scheduler-sqs-lambda_F2ECDF9F" {
        id                             = "CorpusSchedulerLambda-Prod-SQS-Function"
      ~ qualified_arn                  = "arn:aws:lambda:us-east-1:996905175585:function:CorpusSchedulerLambda-Prod-SQS-Function:290" -> (known after apply)
      ~ qualified_invoke_arn           = "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east-1:996905175585:function:CorpusSchedulerLambda-Prod-SQS-Function:290/invocations" -> (known after apply)
        tags                           = {
            "app_code"       = "content"
            "component_code" = "content-corpusschedulerlambda"
            "env_code"       = "prod"
            "environment"    = "Prod"
            "service"        = "CorpusSchedulerLambda"
        }
      ~ version                        = "290" -> (known after apply)
        # (20 unchanged attributes hidden)

      ~ environment {
          ~ variables = {
              ~ "GIT_SHA"                          = (sensitive value)
                # (7 unchanged elements hidden)
            }
        }

        # (4 unchanged blocks hidden)
    }

Plan: 0 to add, 1 to change, 0 to destroy.

⚠️ Errors

  • failed to add a label corpus-scheduler-lambda-cdk-production/add-or-update: label name is too long (max: 50)

@github-actions

Plan Result (prospect-translation-lambda-cdk-production)

Plan: 0 to add, 1 to change, 0 to destroy.
  • Update
    • aws_lambda_function.translation-lambda_translation-sqs-lambda_B9BDF6BA
Change Result
  # aws_lambda_function.translation-lambda_translation-sqs-lambda_B9BDF6BA will be updated in-place
  ~ resource "aws_lambda_function" "translation-lambda_translation-sqs-lambda_B9BDF6BA" {
        id                             = "ProspectAPI-Prod-Sqs-Translation-Function"
        tags                           = {
            "app_code"       = "content"
            "component_code" = "content-prospectapi"
            "env_code"       = "prod"
            "environment"    = "Prod"
            "service"        = "ProspectAPI-Sqs-Translation"
        }
        # (22 unchanged attributes hidden)

      ~ environment {
          ~ variables = {
              ~ "GIT_SHA"                      = (sensitive value)
                # (5 unchanged elements hidden)
            }
        }

        # (4 unchanged blocks hidden)
    }

Plan: 0 to add, 1 to change, 0 to destroy.

⚠️ Errors

  • failed to add a label prospect-translation-lambda-cdk-production/add-or-update: label name is too long (max: 50)

@github-actions

Plan Result (section-manager-lambda-cdk-production)

Plan: 0 to add, 1 to change, 0 to destroy.
  • Update
    • aws_lambda_function.section-manager-sqs-lambda_D7365DAE
Change Result
  # aws_lambda_function.section-manager-sqs-lambda_D7365DAE will be updated in-place
  ~ resource "aws_lambda_function" "section-manager-sqs-lambda_D7365DAE" {
        id                             = "SectionManagerLambda-Prod-SQS-Function"
      ~ qualified_arn                  = "arn:aws:lambda:us-east-1:996905175585:function:SectionManagerLambda-Prod-SQS-Function:86" -> (known after apply)
      ~ qualified_invoke_arn           = "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east-1:996905175585:function:SectionManagerLambda-Prod-SQS-Function:86/invocations" -> (known after apply)
        tags                           = {
            "app_code"       = "content"
            "component_code" = "content-sectionmanagerlambda"
            "env_code"       = "prod"
            "environment"    = "Prod"
            "service"        = "SectionManagerLambda"
        }
      ~ version                        = "86" -> (known after apply)
        # (20 unchanged attributes hidden)

      ~ environment {
          ~ variables = {
              - "GIT_SHA"     = "829a2457a67a8e774bc9e284cd4b7a490c879f84" -> null
                # (5 unchanged elements hidden)
            }
        }

        # (4 unchanged blocks hidden)
    }

Plan: 0 to add, 1 to change, 0 to destroy.

⚠️ Errors

  • failed to add a label section-manager-lambda-cdk-production/add-or-update: label name is too long (max: 50)

@github-actions

Plan Result (curated-corpus-api-cdk-production)

⚠️ Resource Deletion will happen

This plan contains resource delete operation. Please check the plan result very carefully!

Plan: 1 to add, 0 to change, 1 to destroy.
  • Replace
    • aws_ecs_task_definition.application_ecs_service_ecs-task_461CC9D4
Change Result
  # aws_ecs_task_definition.application_ecs_service_ecs-task_461CC9D4 must be replaced
-/+ resource "aws_ecs_task_definition" "application_ecs_service_ecs-task_461CC9D4" {
      ~ arn                      = "arn:aws:ecs:us-east-1:996905175585:task-definition/CuratedCorpusAPI-Prod:858" -> (known after apply)
      ~ arn_without_revision     = "arn:aws:ecs:us-east-1:996905175585:task-definition/CuratedCorpusAPI-Prod" -> (known after apply)
      ~ container_definitions    = jsonencode(
          ~ [
              ~ {
                  ~ environment            = [
                        # (1 unchanged element hidden)
                        {
                            name  = "AWS_S3_BUCKET"
                            value = "pocket-curatedcorpusapi-prod-images"
                        },
                      + {
                          + name  = "GCP_EVENT_PROJECT"
                          + value = "pocket-snowplow-shim-dev"
                        },
                      + {
                          + name  = "GCP_EVENT_TOPIC"
                          + value = "snowplow-events-prod"
                        },
                        {
                            name  = "LOG_LEVEL"
                            value = "info"
                        },
                        # (4 unchanged elements hidden)
                    ]
                    name                   = "app"
                  ~ secrets                = [
                        {
                            name      = "DATABASE_URL"
                            valueFrom = "arn:aws:secretsmanager:us-east-1:996905175585:secret:CuratedCorpusAPI-Prod/curatedcorpusapi-prod20210927235441510200000001-9IpWYR:database_url::"
                        },
                      + {
                          + name      = "GCP_EVENT_SERVICE_ACCOUNT_KEY"
                          + valueFrom = "arn:aws:secretsmanager:us-east-1:996905175585:secret:CuratedCorpusAPI/Prod/GCP_EVENT_SERVICE_ACCOUNT_KEY"
                        },
                        {
                            name      = "SENTRY_DSN"
                            valueFrom = "arn:aws:ssm:us-east-1:996905175585:parameter/CuratedCorpusAPI/Prod/SENTRY_DSN"
                        },
                        # (3 unchanged elements hidden)
                    ]
                    # (9 unchanged attributes hidden)
                },
            ] # forces replacement
        )
      ~ id                       = "CuratedCorpusAPI-Prod" -> (known after apply)
      ~ revision                 = 858 -> (known after apply)
        tags                     = {
            "app_code"       = "pocket-content-shared"
            "component_code" = "content-curatedcorpusapi"
            "env_code"       = "prod"
            "environment"    = "Prod"
            "service"        = "CuratedCorpusAPI"
        }
        # (10 unchanged attributes hidden)
    }

Plan: 1 to add, 0 to change, 1 to destroy.

Changes to Outputs:
  ~ ecs-task-arn           = "arn:aws:ecs:us-east-1:996905175585:task-definition/CuratedCorpusAPI-Prod:858" -> (known after apply)

@github-actions

Plan Result (prospect-api-cdk-production)

Plan: 0 to add, 2 to change, 0 to destroy.
  • Update
    • aws_dynamodb_table.dynamodb_prospects_dynamodb_table_9854E41E
    • aws_iam_policy.application_ecs_service_ecs-iam_ecs-task-role-policy_6FC89FB6
Change Result
  # data.aws_iam_policy_document.application_ecs_service_ecs-iam_data-ecs-task-role-policy_090CC3AD will be read during apply
  # (depends on a resource or a module with changes pending)
 <= data "aws_iam_policy_document" "application_ecs_service_ecs-iam_data-ecs-task-role-policy_090CC3AD" {
      + id            = (known after apply)
      + json          = (known after apply)
      + minified_json = (known after apply)
      + version       = "2012-10-17"

      + statement {
          + actions   = [
              + "dynamodb:BatchGet*",
              + "dynamodb:DescribeTable",
              + "dynamodb:Get*",
              + "dynamodb:Query",
              + "dynamodb:Scan",
              + "dynamodb:UpdateItem",
            ]
          + effect    = "Allow"
          + resources = [
              + "arn:aws:dynamodb:us-east-1:996905175585:table/PROAPI-Prod-Prospects",
              + "arn:aws:dynamodb:us-east-1:996905175585:table/PROAPI-Prod-Prospects/*",
            ]
        }
      + statement {
          + actions   = [
              + "s3:*",
            ]
          + effect    = "Allow"
          + resources = [
              + "arn:aws:s3:::pocket-prospectapi-prod-images",
              + "arn:aws:s3:::pocket-prospectapi-prod-images/*",
            ]
        }
      + statement {
          + actions   = [
              + "logs:CreateLogGroup",
              + "logs:CreateLogStream",
              + "logs:DescribeLogGroups",
              + "logs:DescribeLogStreams",
              + "logs:PutLogEvents",
            ]
          + effect    = "Allow"
          + resources = [
              + "*",
            ]
        }
    }

  # aws_dynamodb_table.dynamodb_prospects_dynamodb_table_9854E41E will be updated in-place
  ~ resource "aws_dynamodb_table" "dynamodb_prospects_dynamodb_table_9854E41E" {
        id                          = "PROAPI-Prod-Prospects"
        name                        = "PROAPI-Prod-Prospects"
        tags                        = {
            "app_code"       = "content"
            "component_code" = "content-prospectapi"
            "env_code"       = "prod"
            "environment"    = "Prod"
            "service"        = "ProspectAPI"
        }
        # (9 unchanged attributes hidden)

      - global_secondary_index {
          - hash_key           = "scheduledSurfaceGuid" -> null
          - name               = "scheduledSurfaceGuid-prospectType" -> null
          - non_key_attributes = [] -> null
          - projection_type    = "ALL" -> null
          - range_key          = "prospectType" -> null
          - read_capacity      = 0 -> null
          - write_capacity     = 0 -> null
        }
      + global_secondary_index {
          + hash_key           = "scheduledSurfaceGuid"
          + name               = "scheduledSurfaceGuid-prospectType"
          + non_key_attributes = []
          + projection_type    = "ALL"
          + range_key          = "prospectType"
          + read_capacity      = 5
          + write_capacity     = 5
        }

        # (5 unchanged blocks hidden)
    }

  # aws_iam_policy.application_ecs_service_ecs-iam_ecs-task-role-policy_6FC89FB6 will be updated in-place
  ~ resource "aws_iam_policy" "application_ecs_service_ecs-iam_ecs-task-role-policy_6FC89FB6" {
        id               = "arn:aws:iam::996905175585:policy/ProspectAPI-Prod-TaskRolePolicy"
        name             = "ProspectAPI-Prod-TaskRolePolicy"
      ~ policy           = jsonencode(
            {
              - Statement = [
                  - {
                      - Action   = [
                          - "dynamodb:UpdateItem",
                          - "dynamodb:Scan",
                          - "dynamodb:Query",
                          - "dynamodb:Get*",
                          - "dynamodb:DescribeTable",
                          - "dynamodb:BatchGet*",
                        ]
                      - Effect   = "Allow"
                      - Resource = [
                          - "arn:aws:dynamodb:us-east-1:996905175585:table/PROAPI-Prod-Prospects/*",
                          - "arn:aws:dynamodb:us-east-1:996905175585:table/PROAPI-Prod-Prospects",
                        ]
                    },
                  - {
                      - Action   = "s3:*"
                      - Effect   = "Allow"
                      - Resource = [
                          - "arn:aws:s3:::pocket-prospectapi-prod-images/*",
                          - "arn:aws:s3:::pocket-prospectapi-prod-images",
                        ]
                    },
                  - {
                      - Action   = [
                          - "logs:PutLogEvents",
                          - "logs:DescribeLogStreams",
                          - "logs:DescribeLogGroups",
                          - "logs:CreateLogStream",
                          - "logs:CreateLogGroup",
                        ]
                      - Effect   = "Allow"
                      - Resource = "*"
                    },
                ]
              - Version   = "2012-10-17"
            }
        ) -> (known after apply)
        tags             = {
            "app_code"       = "content"
            "component_code" = "content-prospectapi"
            "env_code"       = "prod"
            "environment"    = "Prod"
            "service"        = "ProspectAPI"
        }
        # (5 unchanged attributes hidden)
    }

Plan: 0 to add, 2 to change, 0 to destroy.

@mmiermans mmiermans left a comment

I'm curious to understand the different approaches we could take. Let's meet live to discuss.

"@aws-sdk/client-s3": "3.529.1",
"@aws-sdk/lib-storage": "3.529.1",
"@devoxa/prisma-relay-cursor-connection": "3.1.0",
"@google-cloud/pubsub": "^5.3.0",

What are the pros/cons of using pub/sub directly versus using @mozilla/glean with a mozlog configuration? I created an experimental PR using Glean in this repo a year ago.

BenWu commented Mar 18, 2026

No longer needed. I'll add details in https://mozilla-hub.atlassian.net/browse/DENG-10803

@BenWu BenWu closed this Mar 18, 2026
@BenWu BenWu deleted the benwu/gcp-events branch March 18, 2026 16:05