[OpenFGA] syncing with bento/redpanda connect #9

@swelborn

Problem

We have a generic problem of syncing state between data sources. It would be nice to avoid writing potentially brittle Python scripts for this and instead use standard tooling that does it very well.

I am thinking of this in the context of a future OpenFGA deployment, but it could be done more generically: grab data from various sources, restructure it in a processing pipeline, and send it to data sinks. Both Bento and Redpanda Connect ship many such inputs and outputs.

Solution

Many tools do something like this, but the one that seems to be most widely used is Redpanda Connect (formerly Benthos). Some of its connectors, like mongodb_cdc, sit behind Redpanda's enterprise license. There is also a fork of the original Benthos called Bento, which is MIT licensed. If we go with one of these tools, we could start with Bento and, if it turns out to be less well supported than the Redpanda version, switch over without a significant lift, since they use the same YAML format as far as I can tell.

Example

There are a couple of options for connecting to MongoDB. Unfortunately, as mentioned above, the MongoDB change streams connector is behind a paywall, but we could get around this by (1) polling the db for what we need or (2) writing a custom plugin for redpanda that would grab change stream data and send it to a Benthos endpoint (you can set up an HTTP server easily). For the first option, this is roughly what I see it looking like for OpenFGA. This was generated by an agent, so treat it as pseudocode, though it should be pretty close. I added my own comments with ###:

input:
  mongodb:
    ### NOTE: we could run this on a timer every minute/5 minutes or something
    # Standard Bento/Redpanda Connect MongoDB input (polling/querying).
    # Note: 'mongodb_cdc' would require the Redpanda enterprise license.
    url: mongodb://localhost:27017
    database: my_database
    collection: user_roles
    operation: find
    query: '{}' # You can filter this down to specific documents
    
pipeline:
  processors:
    # Bloblang is the native mapping language used to restructure the data.
    # We need to transform the Mongo JSON into the OpenFGA tuple Write API format.
    - mapping: |
        # Example MongoDB input document:
        # { "_id": "alice", "role": "viewer", "document_id": "doc-123" }
        
        # We construct the payload expected by OpenFGA's /write endpoint:
        root.writes = {
          "tuple_keys": [
            {
              "user": "user:" + this._id.string(),
              "relation": this.role,
              "object": "document:" + this.document_id
            }
          ]
        }

output:
  # We use the HTTP Client to send the constructed payload to OpenFGA
  http_client:
    url: "http://<openfga-host>:8080/stores/${OPENFGA_STORE_ID}/write"
    verb: POST
    headers:
      Content-Type: application/json
      # Authorization: Bearer ${OPENFGA_API_TOKEN} ### NOTE: this would be injected by redpanda's secrets
    
    # Redpanda Connect handles retries automatically based on HTTP status codes.
    max_in_flight: 64
    retries: 3
    retry_period: 1s
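
Since the mapping above is agent-generated pseudocode, one way to check it before pointing it at a real OpenFGA store is the config unit-test support that Benthos-derived tools provide. The block below is a minimal sketch, assuming the config above is saved as config.yaml with this `tests` section appended, and that the test schema is the same in Bento and Redpanda Connect. The input document is the example from the mapping's own comment.

```yaml
tests:
  - name: maps a user_roles document to an OpenFGA write payload
    # Only the mapping is exercised; the MongoDB input and the HTTP output
    # are not contacted during the test.
    target_processors: /pipeline/processors
    input_batch:
      - json_content:
          _id: alice
          role: viewer
          document_id: doc-123
    output_batch:
      - json_equals:
          writes:
            tuple_keys:
              - user: "user:alice"
                relation: viewer
                object: "document:doc-123"
```

This should be runnable with `bento test config.yaml` (or `rpk connect test config.yaml` on the Redpanda side), assuming the test subcommand behaves the same in both tools.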

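For the second option mentioned above, the receiving side could be sketched with the `http_server` input. This is only a sketch under several assumptions: the custom change-stream forwarder does not exist yet, the port and path are hypothetical, and the forwarder is assumed to POST raw MongoDB change events (with `operationType` and `fullDocument`) one per request.

```yaml
input:
  http_server:
    # Hypothetical address and path that the custom forwarder would POST to.
    # A port other than the service's own HTTP port is used to avoid a clash.
    address: 0.0.0.0:8081
    path: /mongo-events
    allowed_verbs:
      - POST

pipeline:
  processors:
    # Assumed event shape:
    # { "operationType": "insert",
    #   "fullDocument": { "_id": "alice", "role": "viewer", "document_id": "doc-123" } }
    # Only inserts are handled here; other events are dropped. Updates and
    # deletes would need their own mapping to the "deletes" field of /write.
    - mapping: |
        let doc = this.fullDocument
        root = if this.operationType == "insert" {
          {
            "writes": {
              "tuple_keys": [
                {
                  "user": "user:" + $doc._id.string(),
                  "relation": $doc.role,
                  "object": "document:" + $doc.document_id
                }
              ]
            }
          }
        } else {
          deleted()
        }

output:
  http_client:
    url: "http://<openfga-host>:8080/stores/${OPENFGA_STORE_ID}/write"
    verb: POST
    headers:
      Content-Type: application/json
```

Compared with polling, this would push each change as it happens, at the cost of maintaining the forwarder ourselves.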