Problem Description
In librdkafka v2.13.0, when Kafka returns a MetadataResponse that simultaneously satisfies `broker_cnt == 0` and `topic_cnt > 0`, the client treats the metadata as successfully parsed and executes a series of erroneous state transitions, ultimately resulting in:
- All known brokers being decommissioned
- The rebootstrap (reconnection) timer being stopped
- The client ceasing all reconnection attempts
- Consumers remaining permanently offline, requiring a process restart to recover
This behavior does not exist in librdkafka v2.11.1.
Impact Scope
- Consumers remain offline even after Kafka cluster rolling upgrade completes
- No automatic recovery mechanism exists
- Severe impact on production systems (permanent unavailability)
- Manual client process restart is the only remediation
Environment Information
- librdkafka: v2.13.0
- Kafka: 3.8.0 → 3.9.1 (rolling upgrade)
- Deployment: Strimzi Kafka Operator
- metadata.recovery.strategy: rebootstrap
Reproduction Steps
1. Deploy a Kafka cluster using the Strimzi Kafka Operator:
   - Kafka replicas = 3
   - ZooKeeper replicas = 1
2. Perform a Kafka 3.8.0 → 3.9.1 rolling upgrade.
3. During the upgrade, Kafka intermittently returns a MetadataResponse with:
   - `broker_cnt = 0`
   - `topic_cnt = 1`
   - Current topic replication factor = 1
4. librdkafka parses this response as a normal (successful) response.
5. The client subsequently stops reconnecting and the consumer remains offline.
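The rolling upgrade above can be triggered by bumping `spec.kafka.version` in the Kafka custom resource; Strimzi then restarts the brokers one by one. A sketch of the change (abridged to the relevant field):

```yaml
# Sketch: editing the Kafka CR triggers Strimzi's rolling upgrade.
# Apply with e.g.: kubectl apply -f kafka.yaml -n kafka
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: test
  namespace: kafka
spec:
  kafka:
    version: 3.9.1   # changed from 3.8.0; Strimzi rolls brokers individually
```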
Actual Behavior
1. `handle_Metadata()` treats this MetadataResponse as successfully parsed and immediately calls `rd_kafka_rebootstrap_tmr_stop()`, stopping the rebootstrap timer.
2. `rd_kafka_parse_Metadata0()` returns success for the "0 brokers + has topics" case; it only returns `RD_KAFKA_RESP_ERR__PARTIAL` for "0 brokers + 0 topics":

   ```c
   if (md->broker_cnt == 0 && md->topic_cnt == 0) {
           rd_rkb_dbg(rkb, METADATA, "METADATA",
                      "No brokers or topics in metadata: should retry");
           err = RD_KAFKA_RESP_ERR__PARTIAL;
           goto err;
   }
   ```

3. On the success path, `rd_kafka_metadata_decommission_unavailable_brokers()` is called. Since the metadata contains no brokers, all known brokers are marked and enter the decommission state.
4. Because `ALL_BROKERS_DOWN` had previously triggered `rd_kafka_rebootstrap()`, this "successful" MetadataResponse then stops the rebootstrap timer, interrupting the reconnection flow already in progress.
5. Final result:
   - Rebootstrap timer stopped
   - Broker list cleared
   - No further rebootstrap triggered
   - Client permanently stuck in the offline state
Version Comparison
| librdkafka Version | Behavior |
|---|---|
| v2.11.1 | Reconnects normally, no permanent disconnection |
| v2.13.0 | Permanent disconnection issue occurs |
Reproduction Environment
Kafka Cluster (Strimzi)
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: test
namespace: kafka
spec:
kafka:
version: 3.8.0
replicas: 3
resources:
requests:
memory: 512Mi
cpu: 50m
limits:
memory: 2Gi
cpu: 1
listeners:
- name: plain
port: 9092
type: internal
tls: false
config:
auto.create.topics.enable: "false"
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
default.replication.factor: 1
readinessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
livenessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
class: longhorn-nvme
size: 30Gi
kraftMetadata: shared
deleteClaim: false
zookeeper:
replicas: 1
resources:
requests:
memory: 256Mi
cpu: 20m
storage:
type: persistent-claim
class: longhorn-nvme
size: 30Gi
deleteClaim: false
entityOperator:
topicOperator: {}
    userOperator: {}

Strimzi Operator Installation
helm repo add strimzi https://strimzi.io/charts/
helm install strimzi-kafka-operator strimzi/strimzi-kafka-operator \
-n infra \
--version 0.45.1 \
  -f strimzi-kafka-operator-values.yaml

# strimzi-kafka-operator-values.yaml
watchAnyNamespace: true

Reproduction Code
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <inttypes.h>
#include <rdkafka.h>
#include <unistd.h>
#include <time.h>
#include <sys/time.h>   /* for gettimeofday() used in log_cb() */
static volatile sig_atomic_t run = 1;
static void stop(int sig) { run = 0; }
static void get_time_str(char *timebuf, size_t size) {
time_t now = time(NULL);
struct tm *t = localtime(&now);
strftime(timebuf, size, "%Y-%m-%d %H:%M:%S", t);
}
static void log_cb(const rd_kafka_t *rk, int level,
const char *fac, const char *buf) {
(void)rk;
struct timeval tv;
char timebuf[64];
gettimeofday(&tv, NULL);
strftime(timebuf, sizeof(timebuf),
"%Y-%m-%d %H:%M:%S", localtime(&tv.tv_sec));
fprintf(stderr, "[%s.%03ld] LOG_CB level=%d fac=%s: %s\n",
timebuf, tv.tv_usec / 1000, level, fac, buf);
}
static void error_cb(rd_kafka_t *rk, int err,
const char *reason, void *opaque) {
(void)rk; (void)opaque;
char timebuf[64];
get_time_str(timebuf, sizeof(timebuf));
fprintf(stderr, "[%s] ERROR_CB: %s: %s\n",
timebuf, rd_kafka_err2str(err), reason);
}
static void rebalance_cb(rd_kafka_t *rk,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *partitions,
void *opaque) {
(void)opaque;
char timebuf[64];
get_time_str(timebuf, sizeof(timebuf));
fprintf(stderr, "[%s] REBALANCE_CB: %s, partitions=%d\n",
timebuf, rd_kafka_err2str(err),
partitions ? partitions->cnt : -1);
if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
rd_kafka_assign(rk, partitions);
else
rd_kafka_assign(rk, NULL);
}
int main() {
signal(SIGINT, stop);
char *brokers = getenv("BOOTSTRAP_SERVERS");
char *topic = getenv("TOPIC");
char *group = getenv("GROUP_ID");
if (!brokers || !topic || !group) {
fprintf(stderr, "set BOOTSTRAP_SERVERS, TOPIC, GROUP_ID\n");
return 1;
}
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", brokers, NULL, 0);
rd_kafka_conf_set(conf, "group.id", group, NULL, 0);
rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", NULL, 0);
rd_kafka_conf_set(conf, "metadata.recovery.strategy",
"rebootstrap", NULL, 0);
rd_kafka_conf_set(conf, "debug", "all", NULL, 0);
rd_kafka_conf_set(conf, "log_level", "7", NULL, 0);
rd_kafka_conf_set_log_cb(conf, log_cb);
rd_kafka_conf_set_error_cb(conf, error_cb);
rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
rd_kafka_t *rk =
rd_kafka_new(RD_KAFKA_CONSUMER, conf, NULL, 0);
rd_kafka_poll_set_consumer(rk);
rd_kafka_topic_partition_list_t *subs =
rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(subs, topic, -1);
    rd_kafka_subscribe(rk, subs);
    rd_kafka_topic_partition_list_destroy(subs); /* subscribe() copies the list */
while (run) {
rd_kafka_message_t *msg =
rd_kafka_consumer_poll(rk, 1000);
if (!msg) {
fprintf(stderr, "poll timeout\n");
continue;
}
if (msg->err)
fprintf(stderr, "msg error: %s\n",
rd_kafka_message_errstr(msg));
rd_kafka_message_destroy(msg);
sleep(1);
}
rd_kafka_consumer_close(rk);
rd_kafka_destroy(rk);
return 0;
}

Reproduction Supplement: Kafka Broker Rolling Upgrade Pod Timeline
⏱ Kafka Rolling Upgrade Timeline (Two Rounds)
Environment Details
- Kafka: 3.9.1 (Strimzi Operator 0.49.1)
- Brokers: 3 (test-kafka-0 / 1 / 2)
- Upgrade Method: Rolling upgrade (individual broker recreation)
- Characteristics:
  - Pod recreation changes the broker's IP address
  - Between the Running and Ready states there is a 20–30 second unavailability window
  - During this window, clients may receive a `broker_cnt = 0 && topic_cnt > 0` MetadataResponse
🔁 First Round (Rolling Upgrade Start)
Broker 0 (test-kafka-0)
16:01:28 Terminating
16:01:29 Completed
16:01:29 Pending → ContainerCreating
16:01:37 Running IP=10.244.36.143
16:01:58 Ready
Broker 1 (test-kafka-1)
16:01:59 Terminating
16:02:00 Completed
16:02:01 Pending → ContainerCreating
16:02:10 Running IP=10.244.195.199
16:02:31 Ready
Broker 2 (test-kafka-2)
16:02:31 Terminating
16:02:32 Completed
16:02:33 Pending → ContainerCreating
16:02:50 Running IP=10.244.122.106
16:03:11 Ready
🔁 Second Round (Rolling Upgrade Continues)
Broker 1 (test-kafka-1)
16:03:29 Terminating
16:03:30 Completed
16:03:31 Pending → ContainerCreating
16:03:43 Running IP=10.244.195.233
16:04:04 Ready
Broker 2 (test-kafka-2)
16:04:05 Terminating
16:04:06 Completed
16:04:07 Pending → ContainerCreating
16:04:10 Running IP=10.244.122.104
16:04:31 Ready
Broker 0 (test-kafka-0)
16:04:31 Terminating
16:04:32 Completed
16:04:33 Pending → ContainerCreating
16:04:41 Running IP=10.244.36.162
16:05:02 Ready
Log Examples
[2026-02-06 16:03:17.662] LOG_CB level=7 fac=METADATA: [thrd:main]: test-kafka-2.test-kafka-brokers.kafka.svc:9092/2: ===== Received metadata (for 1 requested topics): partition leader query =====
[2026-02-06 16:03:17.663] LOG_CB level=7 fac=METADATA: [thrd:main]: test-kafka-2.test-kafka-brokers.kafka.svc:9092/2: ClusterId: jeQY4BWARHeTFHSxxSXPwA, ControllerId: -1
[2026-02-06 16:03:17.663] LOG_CB level=7 fac=METADATA: [thrd:main]: test-kafka-2.test-kafka-brokers.kafka.svc:9092/2: 0 brokers, 1 topics
[2026-02-06 16:03:17] ERROR_CB: Local: All broker connections are down: 3/3 brokers are down
[2026-02-06 16:17:34.229] LOG_CB level=7 fac=METADATA: [thrd:main]: Hinted cache of 1/1 topic(s) being queried
[2026-02-06 16:17:34.229] LOG_CB level=7 fac=METADATA: [thrd:main]: Skipping metadata refresh of 1 topic(s): refresh unavailable topics: no usable brokers