
Commit 5b2df2b

[Feature][Connector-V2][Kafka] Add support for Kafka message header

1 parent 81d8733 commit 5b2df2b

File tree

8 files changed: +608 −7 lines changed


docs/en/connectors/sink/Kafka.md

Lines changed: 62 additions & 0 deletions
@@ -39,6 +39,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
 | kafka.config | Map | No | - | In addition to the above parameters that must be specified by the `Kafka producer` client, the user can also specify multiple non-mandatory parameters for the `producer` client, covering [all the producer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#producerconfigs). |
 | semantics | String | No | NON | Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON. |
 | partition_key_fields | Array | No | - | Configure which fields are used as the key of the kafka message. |
+| kafka_headers_fields | Array | No | - | Configure which fields are used as the headers of the kafka message. The field value will be converted to a string and used as the header value. |
 | partition | Int | No | - | We can specify the partition, all messages will be sent to this partition. |
 | assign_partitions | Array | No | - | We can decide which partition to send based on the content of the message. The function of this parameter is to distribute information. |
 | transaction_prefix | String | No | - | If semantic is specified as EXACTLY_ONCE, the producer will write all messages in a Kafka transaction,kafka distinguishes different transactions by different transactionId. This parameter is prefix of kafka transactionId, make sure different job use different prefix. |
@@ -90,6 +91,23 @@ If not set partition key fields, the null message key will be sent to.
 The format of the message key is json, If name is set as the key, for example '{"name":"Jack"}'.
 The selected field must be an existing field in the upstream.
 
+### Kafka Headers Fields
+
+For example, if you want to use the values of fields from the upstream data as kafka message headers, you can assign the field names to this property.
+
+Upstream data is the following:
+
+| name | age | data          | source | traceId   |
+|------|-----|---------------|--------|-----------|
+| Jack | 16  | data-example1 | web    | trace-123 |
+| Mary | 23  | data-example2 | mobile | trace-456 |
+
+If source and traceId are set as the kafka headers fields, then these field values will be added as headers to the kafka message.
+For example, the first row will have the headers `source=web` and `traceId=trace-123`.
+The field values will be converted to strings and used as the header values.
+If a field value is null, the string `null` is written as the header value (consistent with the partition_key_fields behavior).
+The selected fields must be existing fields in the upstream.
+
 ### Assign Partitions
 
 For example, there are five partitions in total, and the assign_partitions field in config is as follows:
@@ -140,6 +158,50 @@ sink {
 }
 ```
 
+### Using Kafka Headers
+
+This example shows how to use kafka_headers_fields to set Kafka message headers:
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    parallelism = 1
+    plugin_output = "fake"
+    row.num = 16
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+        source = "string"
+        traceId = "string"
+      }
+    }
+  }
+}
+
+sink {
+  kafka {
+    topic = "test_topic"
+    bootstrap.servers = "localhost:9092"
+    format = json
+    partition_key_fields = ["name"]
+    kafka_headers_fields = ["source", "traceId"]
+    kafka.request.timeout.ms = 60000
+    semantics = EXACTLY_ONCE
+    kafka.config = {
+      acks = "all"
+      request.timeout.ms = 60000
+      buffer.memory = 33554432
+    }
+  }
+}
+```
+
 ### AWS MSK SASL/SCRAM
 
 Replace the following `${username}` and `${password}` with the configuration values in AWS MSK.
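
A consumer-side sketch (not part of this commit) may help show what the kafka_headers_fields example above produces on the wire. It uses only the standard Kafka consumer API; the topic name comes from the example config, while the group id, deserializers, and offset reset policy are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

public class HeaderReadExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "header-read-example");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test_topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                // Each header key is a configured upstream field name; the value is the
                // UTF-8 string form of that field, e.g. source=web, traceId=trace-123.
                for (Header header : record.headers()) {
                    System.out.printf(
                            "%s=%s%n",
                            header.key(), new String(header.value(), StandardCharsets.UTF_8));
                }
            }
        }
    }
}
```

For each row produced by the FakeSource example, this prints one line per configured header field, e.g. `source=...` and `traceId=...`.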

docs/zh/connectors/sink/Kafka.md

Lines changed: 18 additions & 0 deletions
@@ -39,6 +39,7 @@ import ChangeLog from '../changelog/connector-kafka.md';
 | kafka.config | Map | No | - | In addition to the above parameters that must be specified by the Kafka Producer client, the user can also specify multiple non-mandatory parameters for the Producer client, covering [all the producer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#producerconfigs) |
 | semantics | String | No | NON | Semantics that can be chosen: EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON |
 | partition_key_fields | Array | No | - | Configure which fields are used as the key of the kafka message |
+| kafka_headers_fields | Array | No | - | Configure which fields are used as the headers of the kafka message. The field value will be converted to a string and used as the header value |
 | partition | Int | No | - | The partition can be specified; all messages will be sent to this partition |
 | assign_partitions | Array | No | - | Which partition to send to can be decided based on the content of the message. The function of this parameter is to distribute information |
 | transaction_prefix | String | No | - | If the semantic is specified as EXACTLY_ONCE, the producer will write all messages in one Kafka transaction; kafka distinguishes different transactions by different transactionIds. This parameter is the prefix of the kafka transactionId; make sure different jobs use different prefixes |
@@ -89,6 +90,23 @@ NON does not provide any guarantee: if there is a problem with the Kafka broker, messages may be lost
 The format of the message key is json. If name is set as the key, for example `{"name":"Jack"}`
 The selected fields must be existing fields in the upstream data.
 
+### Kafka Headers Fields
+
+For example, if you want to use the values of fields from the upstream data as the headers of the kafka message, you can assign these field names to this property.
+
+The upstream data is as follows:
+
+| name | age | data          | source | traceId   |
+|------|-----|---------------|--------|-----------|
+| Jack | 16  | data-example1 | web    | trace-123 |
+| Mary | 23  | data-example2 | mobile | trace-456 |
+
+If source and traceId are set as the kafka headers fields, these field values will be added to the kafka message as headers.
+For example, the first row will have the headers `source=web` and `traceId=trace-123`.
+The field values will be converted to strings and used as the header values.
+If a field value is null, the string `null` is written as the header value.
+The selected fields must be existing fields in the upstream data.
+
 ### Assign Partitions
 
 Suppose there are five partitions in total, and the assign_partitions field in the config is set as follows:

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSinkOptions.java

Lines changed: 8 additions & 0 deletions
@@ -46,6 +46,14 @@ public class KafkaSinkOptions extends KafkaBaseOptions {
                 .withDescription(
                         "Configure which fields are used as the key of the kafka message.");
 
+    public static final Option<List<String>> KAFKA_HEADERS_FIELDS =
+            Options.key("kafka_headers_fields")
+                    .listType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Configure which fields are used as the headers of the kafka message. "
+                                    + "The field value will be converted to a string and used as the header value.");
+
     public static final Option<KafkaSemantics> SEMANTICS =
             Options.key("semantics")
                     .enumType(KafkaSemantics.class)
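
A minimal sketch (not part of this commit) of how the new option resolves from a user configuration. It assumes SeaTunnel's `ReadonlyConfig.fromMap` / `get(Option)` API; the class and config map here are purely illustrative.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions;

public class HeadersOptionSketch {
    public static void main(String[] args) {
        // Mimics the sink block of the HOCON example: kafka_headers_fields = ["source", "traceId"]
        Map<String, Object> userConfig = new HashMap<>();
        userConfig.put("kafka_headers_fields", Arrays.asList("source", "traceId"));

        ReadonlyConfig config = ReadonlyConfig.fromMap(userConfig);
        // Expected to return ["source", "traceId"]; since the option has no default value,
        // it would resolve to null when the user does not configure it.
        List<String> headerFields = config.get(KafkaSinkOptions.KAFKA_HEADERS_FIELDS);
        System.out.println(headerFields);
    }
}
```

The serializer changes in the next file accept this list through the new `create(...)` overloads and fall back to the existing header-less behaviour when it is null or empty.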

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java

Lines changed: 106 additions & 5 deletions
@@ -119,12 +119,23 @@ public static DefaultSeaTunnelRowSerializer create(
                 timestampExtractor(),
                 keyExtractor(null, rowType, format, delimiter, pluginConfig),
                 valueExtractor(rowType, format, delimiter, pluginConfig),
-                headersExtractor());
+                headersExtractor(null, rowType));
+    }
+
+    public static DefaultSeaTunnelRowSerializer create(
+            String topic,
+            Integer partition,
+            SeaTunnelRowType rowType,
+            MessageFormat format,
+            String delimiter,
+            ReadonlyConfig pluginConfig) {
+        return create(topic, partition, null, rowType, format, delimiter, pluginConfig);
     }
 
     public static DefaultSeaTunnelRowSerializer create(
             String topic,
             Integer partition,
+            List<String> headerFields,
             SeaTunnelRowType rowType,
             MessageFormat format,
             String delimiter,
@@ -134,13 +145,24 @@ public static DefaultSeaTunnelRowSerializer create(
                 partitionExtractor(partition),
                 timestampExtractor(),
                 keyExtractor(null, rowType, format, delimiter, pluginConfig),
-                valueExtractor(rowType, format, delimiter, pluginConfig),
-                headersExtractor());
+                valueExtractor(headerFields, rowType, format, delimiter, pluginConfig),
+                headersExtractor(headerFields, rowType));
+    }
+
+    public static DefaultSeaTunnelRowSerializer create(
+            String topic,
+            List<String> keyFields,
+            SeaTunnelRowType rowType,
+            MessageFormat format,
+            String delimiter,
+            ReadonlyConfig pluginConfig) {
+        return create(topic, keyFields, null, rowType, format, delimiter, pluginConfig);
     }
 
     public static DefaultSeaTunnelRowSerializer create(
             String topic,
             List<String> keyFields,
+            List<String> headerFields,
             SeaTunnelRowType rowType,
             MessageFormat format,
             String delimiter,
@@ -150,8 +172,8 @@ public static DefaultSeaTunnelRowSerializer create(
                 partitionExtractor(null),
                 timestampExtractor(),
                 keyExtractor(keyFields, rowType, format, delimiter, pluginConfig),
-                valueExtractor(rowType, format, delimiter, pluginConfig),
-                headersExtractor());
+                valueExtractor(headerFields, rowType, format, delimiter, pluginConfig),
+                headersExtractor(headerFields, rowType));
     }
 
     private static Function<SeaTunnelRow, Integer> partitionNativeExtractor(
@@ -182,6 +204,32 @@ private static Function<SeaTunnelRow, Iterable<Header>> headersExtractor(
                 convertToKafkaHeaders((Map<String, String>) row.getField(rowType.indexOf(HEADERS)));
     }
 
+    private static Function<SeaTunnelRow, Iterable<Header>> headersExtractor(
+            List<String> headerFields, SeaTunnelRowType rowType) {
+        if (headerFields == null || headerFields.isEmpty()) {
+            return row -> null;
+        }
+
+        int[] headerFieldIndexes = new int[headerFields.size()];
+        for (int i = 0; i < headerFields.size(); i++) {
+            headerFieldIndexes[i] = rowType.indexOf(headerFields.get(i));
+        }
+
+        return row -> {
+            RecordHeaders kafkaHeaders = new RecordHeaders();
+            for (int i = 0; i < headerFields.size(); i++) {
+                String headerName = headerFields.get(i);
+                Object headerValue = row.getField(headerFieldIndexes[i]);
+                // Write "null" string for null values to keep fields in headers
+                // (consistent with partition_key_fields behavior)
+                String valueStr = headerValue != null ? headerValue.toString() : "null";
+                kafkaHeaders.add(
+                        new RecordHeader(headerName, valueStr.getBytes(StandardCharsets.UTF_8)));
+            }
+            return kafkaHeaders.toArray().length > 0 ? kafkaHeaders : null;
+        };
+    }
+
     private static Function<SeaTunnelRow, String> topicExtractor(
             String topic, SeaTunnelRowType rowType, MessageFormat format) {
         if ((MessageFormat.COMPATIBLE_DEBEZIUM_JSON.equals(format)
@@ -256,6 +304,25 @@ private static Function<SeaTunnelRow, byte[]> valueExtractor(
         return row -> serializationSchema.serialize(row);
     }
 
+    private static Function<SeaTunnelRow, byte[]> valueExtractor(
+            List<String> headerFields,
+            SeaTunnelRowType rowType,
+            MessageFormat format,
+            String delimiter,
+            ReadonlyConfig pluginConfig) {
+        if (headerFields == null || headerFields.isEmpty()) {
+            return valueExtractor(rowType, format, delimiter, pluginConfig);
+        }
+
+        // Create a new row type excluding header fields
+        SeaTunnelRowType valueRowType = createValueRowType(headerFields, rowType);
+        Function<SeaTunnelRow, SeaTunnelRow> valueRowExtractor =
+                createValueRowExtractor(valueRowType, headerFields, rowType);
+        SerializationSchema serializationSchema =
+                createSerializationSchema(valueRowType, format, delimiter, false, pluginConfig);
+        return row -> serializationSchema.serialize(valueRowExtractor.apply(row));
+    }
+
     private static Function<SeaTunnelRow, byte[]> valueExtractor(SeaTunnelRowType rowType) {
         return row -> (byte[]) row.getField(rowType.indexOf(VALUE));
     }
@@ -273,6 +340,25 @@ private static SeaTunnelRowType createKeyType(
         return new SeaTunnelRowType(keyFieldNames.toArray(new String[0]), keyFieldDataTypeArr);
     }
 
+    private static SeaTunnelRowType createValueRowType(
+            List<String> headerFieldNames, SeaTunnelRowType rowType) {
+        // Create a row type excluding header fields
+        List<String> valueFieldNames = new java.util.ArrayList<>();
+        List<SeaTunnelDataType> valueFieldTypes = new java.util.ArrayList<>();
+
+        for (int i = 0; i < rowType.getTotalFields(); i++) {
+            String fieldName = rowType.getFieldName(i);
+            if (!headerFieldNames.contains(fieldName)) {
+                valueFieldNames.add(fieldName);
+                valueFieldTypes.add(rowType.getFieldType(i));
+            }
+        }
+
+        return new SeaTunnelRowType(
+                valueFieldNames.toArray(new String[0]),
+                valueFieldTypes.toArray(new SeaTunnelDataType[0]));
+    }
+
     private static Function<SeaTunnelRow, SeaTunnelRow> createKeyRowExtractor(
             SeaTunnelRowType keyType, SeaTunnelRowType rowType) {
         int[] keyIndex = new int[keyType.getTotalFields()];
@@ -288,6 +374,21 @@ private static Function<SeaTunnelRow, SeaTunnelRow> createKeyRowExtractor(
         };
     }
 
+    private static Function<SeaTunnelRow, SeaTunnelRow> createValueRowExtractor(
+            SeaTunnelRowType valueType, List<String> headerFieldNames, SeaTunnelRowType rowType) {
+        int[] valueIndex = new int[valueType.getTotalFields()];
+        for (int i = 0; i < valueType.getTotalFields(); i++) {
+            valueIndex[i] = rowType.indexOf(valueType.getFieldName(i));
+        }
+        return row -> {
+            Object[] fields = new Object[valueType.getTotalFields()];
+            for (int i = 0; i < valueIndex.length; i++) {
+                fields[i] = row.getField(valueIndex[i]);
+            }
+            return new SeaTunnelRow(fields);
+        };
+    }
+
     private static SerializationSchema createSerializationSchema(
             SeaTunnelRowType rowType,
             MessageFormat format,
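
Taken together, the changes above mean that when header fields are configured, headersExtractor writes each configured field as a UTF-8 header (a null field value becomes the literal string "null"), while the new valueExtractor overload serializes only the remaining fields, so the header fields are excluded from the message value. The standalone sketch below mirrors that behaviour outside the connector; the kafka-clients dependency is assumed on the classpath, and a plain Map stands in for SeaTunnelRow.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class HeadersExtractorSketch {
    public static void main(String[] args) {
        // Stand-in for an upstream row; traceId is null to show the "null" string behaviour.
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("name", "Jack");
        row.put("age", 16);
        row.put("source", "web");
        row.put("traceId", null);

        List<String> headerFields = Arrays.asList("source", "traceId");

        // Mirrors headersExtractor: one UTF-8 header per configured field.
        RecordHeaders headers = new RecordHeaders();
        for (String field : headerFields) {
            Object value = row.get(field);
            String valueStr = value != null ? value.toString() : "null";
            headers.add(new RecordHeader(field, valueStr.getBytes(StandardCharsets.UTF_8)));
        }

        // Mirrors createValueRowType/createValueRowExtractor: header fields are dropped
        // from the row that gets serialized as the message value.
        Map<String, Object> valueRow = new LinkedHashMap<>(row);
        valueRow.keySet().removeAll(headerFields);

        for (Header h : headers) {
            System.out.println(h.key() + "=" + new String(h.value(), StandardCharsets.UTF_8));
        }
        System.out.println("value row: " + valueRow);
    }
}
```

Running it prints `source=web`, `traceId=null`, and `value row: {name=Jack, age=16}`.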
