Bug Report: PostgreSQL CDC with decoderbufs Plugin Missing before Data in UPDATE Events #4186
liuyq2step asked this question in Q&A
Summary
When using Flink CDC with PostgreSQL 9.5 and the decoderbufs logical decoding plugin, UPDATE events are missing the before field in the CDC stream, causing a NullPointerException in DebeziumEventDeserializationSchema.
Environment
Flink CDC Version: 3.5.x
PostgreSQL Version: 9.5
Logical Decoding Plugin: decoderbufs (required for PostgreSQL 9.5)
REPLICA IDENTITY: DEFAULT or USING INDEX
Table: Has PRIMARY KEY constraint
Problem Description
When an UPDATE operation occurs on a table with REPLICA IDENTITY DEFAULT or USING INDEX, the CDC events generated by decoderbufs do not include the before field for UPDATE operations. This causes a NullPointerException in Flink CDC when it tries to process these events.
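For context, PostgreSQL's replica identity setting controls how much of the old row is written to the WAL on an UPDATE, which bounds what any logical decoding plugin could place in before. The sketch below summarizes that documented behavior per `pg_class.relreplident` code. The names `OldRowImage`, `ReplicaIdentity`, and `oldImageOnUpdate` are hypothetical helpers for illustration and are not part of Flink CDC, Debezium, or decoderbufs.

```java
/** Hypothetical summary of what PostgreSQL logs about the old row on UPDATE. */
enum OldRowImage {
    NONE,                    // nothing about the old row is logged
    KEY_COLUMNS_IF_CHANGED,  // old key columns, and only when a key column changed
    FULL_ROW                 // the complete old row
}

class ReplicaIdentity {
    /**
     * Maps a pg_class.relreplident code to the old-row image logged for an UPDATE.
     * 'd' = DEFAULT (primary key), 'i' = USING INDEX, 'f' = FULL, 'n' = NOTHING.
     */
    static OldRowImage oldImageOnUpdate(char relreplident) {
        switch (relreplident) {
            case 'f':
                return OldRowImage.FULL_ROW;
            case 'd':
            case 'i':
                return OldRowImage.KEY_COLUMNS_IF_CHANGED;
            case 'n':
                return OldRowImage.NONE;
            default:
                throw new IllegalArgumentException(
                        "Unknown relreplident code: " + relreplident);
        }
    }
}
```

Under this reading, an UPDATE that leaves the primary key untouched on a table with REPLICA IDENTITY DEFAULT or USING INDEX carries no old-row data at all, which matches the events shown in this report. Whether switching the table to REPLICA IDENTITY FULL avoids the exception with decoderbufs has not been verified here.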
Evidence
json
{
  "after": {
    "id": 1,
    "txt": "123",
    "dt": 1764770838000000,
    "content": 1,
    "jsonbcol": 1,
    "testev": 1,
    "nub2": 1.0000
  },
  "source": {
    "version": "1.9.8.Final",
    "connector": "postgresql",
    "name": "postgres_cdc_source",
    "ts_ms": 1764743691547,
    "db": "bgtest",
    "sequence": ["5292988268984", "5292988269040"],
    "schema": "public",
    "table": "t_flinktest",
    "txId": 61772951,
    "lsn": 5292988269040
  },
  "op": "u", // UPDATE operation
  "ts_ms": 1764743831030
  // ❌ MISSING: "before" field!
}
json
{
  "before": {
    "id": 1 // Contains primary key
  },
  "source": { ... },
  "op": "d", // DELETE operation
  "ts_ms": 1764741493890
}
json
{
  "after": { ...full data... },
  "source": { ... },
  "op": "c", // CREATE/INSERT operation
  "ts_ms": 1764741943221
}
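The three events above differ only in which row images are present: the UPDATE has after but no before, the DELETE has before with the key only, and the INSERT has after only. A minimal sketch of a check that flags the problematic shape follows. It models the envelope as a plain `Map` instead of the Kafka Connect `Struct` that Debezium actually produces, purely to stay self-contained; `EnvelopeCheck` and `isUpdateWithoutBefore` are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class EnvelopeCheck {
    /** True when the event is an UPDATE ("op" = "u") whose "before" is absent or null. */
    static boolean isUpdateWithoutBefore(Map<String, Object> envelope) {
        return "u".equals(envelope.get("op")) && envelope.get("before") == null;
    }

    public static void main(String[] args) {
        // Shape of the UPDATE event from this report: "after" only.
        Map<String, Object> update = new HashMap<>();
        update.put("op", "u");
        update.put("after", Collections.singletonMap("id", 1));

        // Shape of the DELETE event from this report: "before" with the key only.
        Map<String, Object> delete = new HashMap<>();
        delete.put("op", "d");
        delete.put("before", Collections.singletonMap("id", 1));

        System.out.println(isUpdateWithoutBefore(update)); // prints true
        System.out.println(isUpdateWithoutBefore(delete)); // prints false
    }
}
```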
Impact
java
// In DebeziumEventDeserializationSchema.java
private RecordData extractBeforeDataRecord(Struct value, Schema valueSchema) throws Exception {
    Schema beforeSchema = fieldSchema(valueSchema, Envelope.FieldName.BEFORE);
    Struct beforeValue = fieldStruct(value, Envelope.FieldName.BEFORE); // Returns null!
    return extractDataRecord(beforeValue, beforeSchema); // NPE when beforeValue is null
}
text
java.lang.NullPointerException
    at org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference.lambda$inferStruct$1(DebeziumSchemaDataTypeInference.java:209)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    ...
    at org.apache.flink.cdc.connectors.postgres.source.reader.PostgresPipelineRecordEmitter.processElement(PostgresPipelineRecordEmitter.java:114)
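One possible guard, sketched under simplifying assumptions, is to treat a null before image as "absent" rather than passing it on to the extraction step. The code below is not the Flink CDC implementation: row images are modelled as plain `Map` values in place of `Struct`, and `NullSafeBefore` and both methods are hypothetical stand-ins that only mirror the names in the snippet above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class NullSafeBefore {
    /**
     * Stand-in for extractDataRecord: copies the row image.
     * Like the original, it throws a NullPointerException when given null.
     */
    static Map<String, Object> extractDataRecord(Map<String, Object> image) {
        return Collections.unmodifiableMap(new HashMap<>(image));
    }

    /**
     * Null-safe variant of extractBeforeDataRecord: returns an empty Optional
     * when the event carries no "before" image instead of dereferencing null.
     */
    static Optional<Map<String, Object>> extractBeforeDataRecord(Map<String, Object> beforeValue) {
        if (beforeValue == null) {
            return Optional.empty();
        }
        return Optional.of(extractDataRecord(beforeValue));
    }
}
```

With this shape the caller has to decide explicitly what an UPDATE without a before image means, for example emitting the after image alone or failing with a descriptive message. Which of these is appropriate for Flink CDC pipelines is a design question for the maintainers.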