@thehkkim thehkkim commented Jan 13, 2026

Purpose of this pull request

This PR adds support for Kafka message headers in the Kafka sink connector through a new kafka_headers_fields configuration option.

Users can now specify which fields should be set as Kafka message headers, enabling better message routing, filtering, and metadata management in Kafka consumers without parsing the message body.

Does this PR introduce any user-facing change?

Yes. This PR introduces a new configuration option kafka_headers_fields for the Kafka sink connector.

New Feature:

  • Added kafka_headers_fields configuration option to specify which fields should be added as Kafka message headers
  • Fields specified as headers are automatically excluded from the message value to avoid data duplication

Usage Example:

Configuration:

sink {
  kafka {
    topic = "test_topic"
    bootstrap.servers = "localhost:9092"
    format = json
    partition_key_fields = ["id"]
    kafka_headers_fields = ["source", "traceId"]  # New option
  }
}

Input data:

{"id": 1, "name": "test", "source": "web", "traceId": "trace-123"}

Output to Kafka:

  • Headers: source=web, traceId=trace-123
  • Key: {"id": 1}
  • Value: {"id": 1, "name": "test"} (header fields excluded)
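The header/value split shown above can be sketched as a small, self-contained Java helper. The class and method names here (HeaderFieldSplit, extractHeaders, valueWithoutHeaders) are hypothetical and only illustrate the documented behavior; the actual logic lives in DefaultSeaTunnelRowSerializer.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HeaderFieldSplit {

    // Stringify the configured header fields, mirroring the documented
    // behavior that field values are converted to strings as header values.
    static Map<String, String> extractHeaders(Map<String, Object> row, List<String> headerFields) {
        Map<String, String> headers = new LinkedHashMap<>();
        for (String field : headerFields) {
            Object value = row.get(field);
            if (value != null) {
                headers.put(field, String.valueOf(value));
            }
        }
        return headers;
    }

    // Header fields are excluded from the message value to avoid duplication.
    static Map<String, Object> valueWithoutHeaders(Map<String, Object> row, List<String> headerFields) {
        Map<String, Object> value = new LinkedHashMap<>(row);
        value.keySet().removeAll(headerFields);
        return value;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1);
        row.put("name", "test");
        row.put("source", "web");
        row.put("traceId", "trace-123");
        List<String> headerFields = Arrays.asList("source", "traceId");
        System.out.println(extractHeaders(row, headerFields));      // {source=web, traceId=trace-123}
        System.out.println(valueWithoutHeaders(row, headerFields)); // {id=1, name=test}
    }
}
```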

Benefits:

  • Enables header-based routing and filtering in Kafka consumers
  • Supports distributed tracing by passing trace IDs via headers
  • Allows metadata tagging without parsing message body
  • Follows standard Kafka pattern where headers contain metadata

Backward Compatibility:

  • Fully backward compatible: kafka_headers_fields is a new, optional configuration
  • Existing configurations continue to work without any changes

How was this patch tested?

1. Unit Tests:

Added comprehensive test coverage in DefaultSeaTunnelRowSerializerTest:

  • testKafkaHeaders() - Verifies fields are correctly added to Kafka message headers
  • testKafkaHeadersWithNullValue() - Ensures null values are properly handled (not added to headers)
  • testHeaderFieldsExcludedFromValue() - Confirms header fields are excluded from message value

2. Test Results:

[INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0

All tests pass successfully, verifying:

  • ✅ Headers are correctly populated with specified field values
  • ✅ Header fields are automatically excluded from the serialized value
  • ✅ Null values are handled properly (skipped in headers)
  • ✅ Multiple header fields work correctly
  • ✅ Field values are converted to strings as header values

3. Test Coverage:

  • Tested with JSON format (primary use case)
  • Verified serialization logic for both headers and values
  • Confirmed backward compatibility when kafka_headers_fields is not configured

4. Code Quality:

  • ✅ Passed Apache SeaTunnel Spotless formatting checks
  • ✅ Follows existing code patterns in the Kafka connector
  • ✅ Consistent with partition_key_fields implementation pattern

Check list

  • Code changes are covered with comprehensive unit tests
  • Documentation updated to describe the new feature
    • Updated docs/en/connectors/sink/Kafka.md with feature description and usage example
    • Updated docs/zh/connectors/sink/Kafka.md with Chinese translation
  • Added configuration examples in documentation
  • No new Jar binary packages added
  • Not a new connector (enhancement to existing Kafka connector)
  • Code follows Apache SeaTunnel coding standards and passes all checks

@zhangshenghang (Member) commented:

Thanks for the contribution! However, there are critical issues that must be addressed before merge.

[P0] Header fields with null values are permanently lost (COR-001)

  • DefaultSeaTunnelRowSerializer.java:223 skips null values when writing headers
  • DefaultSeaTunnelRowSerializer.java:350-355 unconditionally excludes all headerFields from value regardless of null values
  • Impact: Fields with null values disappear from both headers and value with no warning. Consider writing "null" string (consistent with partition_key_fields behavior), or retaining null fields in value.

[MAJOR] No overlap validation between partition_key_fields and kafka_headers_fields (GEN-001)

  • KafkaSinkWriter.java:252-290 validates both configs independently without checking for intersection
  • Impact: Same field can appear in both key and headers, then be excluded from value, causing data loss and confusion. Add validation to reject overlapping fields (similar to existing L197-201 pattern).
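A minimal sketch of the requested overlap check, using plain Java collections (the class and method names are assumptions for illustration, not the actual KafkaSinkWriter code):

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class OverlapCheck {

    // Reject configurations where a field appears in both
    // partition_key_fields and kafka_headers_fields, since such a field
    // would be excluded from the value while also serving as key material.
    static void validateNoOverlap(List<String> partitionKeyFields, List<String> headerFields) {
        Set<String> overlap = new LinkedHashSet<>(partitionKeyFields);
        overlap.retainAll(headerFields);
        if (!overlap.isEmpty()) {
            throw new IllegalArgumentException(
                    "Fields must not appear in both partition_key_fields and "
                            + "kafka_headers_fields: " + overlap);
        }
    }
}
```

With this check, a config listing "id" in both options would fail fast at writer construction instead of silently dropping data.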

[MINOR] NATIVE format silently ignores kafka_headers_fields (GEN-002)

  • KafkaSinkWriter.java:187-190 calls a create method that doesn't accept a headerFields parameter
  • Impact: Configuration takes effect with no error but headers are not written. Either add validation to reject the config for NATIVE format, or document this limitation clearly.

Please fix at least the P0 and MAJOR issues before merging.

@thehkkim (Author) commented:

@zhangshenghang
Thank you for the detailed review! I've fixed all the critical issues:

[P0] Header fields with null values are permanently lost (COR-001)

  • ✅ Fixed by writing "null" string instead of skipping null values
  • This is now consistent with partition_key_fields behavior
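The fixed null handling can be sketched as follows (a hypothetical helper; the real change is in DefaultSeaTunnelRowSerializer). Java's String.valueOf(Object) turns a null field into the literal string "null", which is the behavior described above:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NullHeaderValue {

    // After the fix, null field values are written as the string "null"
    // rather than being skipped, so the field is never silently lost.
    static Map<String, String> toHeaders(Map<String, Object> row, List<String> headerFields) {
        Map<String, String> headers = new LinkedHashMap<>();
        for (String field : headerFields) {
            headers.put(field, String.valueOf(row.get(field))); // null -> "null"
        }
        return headers;
    }
}
```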

[MAJOR] No overlap validation between partition_key_fields and kafka_headers_fields (GEN-001)

  • ✅ Added validation to reject overlapping fields
  • Implemented check in KafkaSinkWriter.java that throws exception if same field appears in both configs

[MINOR] NATIVE format silently ignores kafka_headers_fields (GEN-002)

  • ✅ Added validation to reject kafka_headers_fields config for NATIVE format
  • Now throws clear error message: "kafka_headers_fields is not supported with NATIVE format"
  • Users will see error instead of silent failure
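A sketch of what such a format guard could look like (FormatCheck and validate are illustrative names, not the actual KafkaSinkWriter code):

```java
import java.util.List;

public class FormatCheck {

    // Fail fast when kafka_headers_fields is configured together with the
    // NATIVE format, which per the review above does not support header
    // fields, instead of silently ignoring the option.
    static void validate(String format, List<String> headerFields) {
        if ("NATIVE".equalsIgnoreCase(format) && headerFields != null && !headerFields.isEmpty()) {
            throw new IllegalArgumentException(
                    "kafka_headers_fields is not supported with NATIVE format");
        }
    }
}
```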

All changes have been tested. Ready for re-review!

@och5351 commented Jan 22, 2026

Hi, @thehkkim !
LGTM +1

However, it appears that the commits should be squashed and the commit messages
should align with your PR title. PTAL.


I hope this feature can be merged, as it would be beneficial for our requirements.

@dybyte (Collaborator) left a comment:

Would it be worth adding an e2e test in KafkaIT to verify headers and payload fields?

@thehkkim force-pushed the feature/add-kafka-headers branch from 7ae6d89 to 34f16e4 (January 25, 2026 12:03)
github-actions bot added the e2e label (Jan 25, 2026)
@thehkkim force-pushed the feature/add-kafka-headers branch from 34f16e4 to 5b2df2b (January 25, 2026 12:21)
@thehkkim (Author) commented:

@och5351
Done! I've squashed all commits into a single commit with the title
matching the PR title: [Feature][Connector-V2][Kafka] Add support for Kafka message header

@dybyte
Good suggestion! I've added an e2e test in KafkaIT to verify the headers functionality.

The test (testSinkKafkaWithHeaders) verifies:

  • Fields specified in kafka_headers_fields are correctly sent as Kafka message headers
  • These fields are excluded from the message payload
  • Non-header fields remain in the payload as expected

Test file: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java:299-336
Config: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_sink_with_headers.conf

