Skip to content

Commit 9daffe3

Browse files
authored
Reproduction and fix for backpressure processing stalling (#254)
* Reproduction and fix for backpressure processing stalling --- Signed-off-by: Igor Savin <iselwin@gmail.com> * Cleanup --- Signed-off-by: Igor Savin <iselwin@gmail.com> * Fix linting --- Signed-off-by: Igor Savin <iselwin@gmail.com> * Fix formatting --- Signed-off-by: Igor Savin <iselwin@gmail.com> * Fix formatting --- Signed-off-by: Igor Savin <iselwin@gmail.com> * Reproduction test --- Signed-off-by: Igor Savin <iselwin@gmail.com> * Add memory test --- Signed-off-by: Igor Savin <iselwin@gmail.com> * Revise memory test --- Signed-off-by: Igor Savin <iselwin@gmail.com> * Finalize tests --- Signed-off-by: Igor Savin <iselwin@gmail.com> * Fix CI --- Signed-off-by: Igor Savin <iselwin@gmail.com> * Remove memory test from CI --- Signed-off-by: Igor Savin <iselwin@gmail.com> * Apply the patch --- Signed-off-by: Igor Savin <iselwin@gmail.com>
1 parent 33ea831 commit 9daffe3

File tree

15 files changed

+1188
-28
lines changed

15 files changed

+1188
-28
lines changed

CLAUDE.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,17 @@ c8 -c test/config/c8-local.json node --test --test 'test/path/to/file.test.ts'
1717
1818
# Lint the code
1919
npm run lint
20+
21+
# Run memory tests (manual — not part of CI)
22+
# Requires Docker with 3-broker cluster running (docker compose up -d --wait)
23+
npm run test:memory
2024
```
2125

26+
Memory tests (`test/memory/*.memory-test.ts`) use `--expose-gc` and a 3-broker cluster
27+
with sustained backpressure to detect heap leaks. They are excluded from CI due to resource
28+
requirements but should be run manually when modifying the consumer stream, fetch loop, or
29+
backpressure handling. Use the `.memory-test.ts` suffix for new memory tests.
30+
2231
## Code Style Guidelines
2332

2433
- **TypeScript**: Strict typing with explicit type imports `import type { X }`. Avoid `any` at all times. Ensure type compliance.

CONTRIBUTING.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,17 @@ Then you run the test normally:
6868
pnpm test
6969
```
7070

71+
#### Memory Tests
72+
73+
Memory tests (`test/memory/*.memory-test.ts`) are not part of the regular test suite or CI.
74+
They use `--expose-gc` and a 3-broker cluster with sustained backpressure to detect heap
75+
leaks in the consumer stream. Run them manually when modifying the consumer stream, fetch
76+
loop, or backpressure handling:
77+
78+
```bash
79+
pnpm run test:memory
80+
```
81+
7182
## Pull Request Process
7283

7384
### Before Submitting

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
"format": "prettier -w benchmarks playground src test",
3333
"test": "c8 -c test/config/c8-local.json ./scripts/node --test --test-reporter=cleaner-spec-reporter 'test/*.test.ts' 'test/*/*.test.ts' 'test/*/*/*.test.ts'",
3434
"test:ci": "c8 -c test/config/c8-ci.json ./scripts/node --test --test-reporter=cleaner-spec-reporter 'test/*.test.ts' 'test/*/*.test.ts' 'test/*/*/*.test.ts'",
35+
"test:memory": "NODE_OPTIONS='--expose-gc' ./scripts/node --test --test-reporter=cleaner-spec-reporter 'test/**/*.memory-test.ts'",
3536
"test:docker:up": "docker-compose up -d --wait",
3637
"test:docker:down": "docker-compose down",
3738
"ci": "npm run build && npm run lint && npm run test:ci",
Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
/**
2+
* Backpressure stall reproduction test.
3+
*
4+
* Reproduces the fetch loop stall that occurs when MessagesStream is piped
5+
* through a batch-accumulating Duplex (like MQT's KafkaMessageBatchStream)
6+
* with async handler processing.
7+
*
8+
* The stall happens because:
9+
* 1. pushRecords pushes messages → push() returns false → canPush=false
10+
* → process.nextTick(#fetch) is NOT scheduled (canPush gate in #pushRecords)
11+
* 2. _read() fires → #fetch() → but pipeline already called pause() → #paused=true → returns
12+
* 3. resume() → #paused=false → super.resume() → _read() doesn't reliably fire
13+
* because state.reading is already true from the failed _read() in step 2
14+
* 4. Fetch loop is dead. Messages remain unconsumed in Kafka.
15+
*
16+
* Setup mirrors a real-world production scenario:
17+
* - 15 topics, 1000 messages per topic (15,000 total)
18+
* - Consumer starts BEFORE publishing (fetches partial results as messages arrive)
19+
* - Messages published interleaved across topics (round-robin), in parallel batches of 500
20+
* - Small messages (~150 bytes, matching real CDC payloads — no padding)
21+
* - pipeline(consumerStream, batchStream) — same as MQT's AbstractKafkaConsumer
22+
* - for-await on batchStream with async handler processing per batch
23+
* - Single broker (all partitions on one node → single fetch returns all topics)
24+
*/
25+
26+
import { randomUUID } from 'node:crypto'
27+
import { pipeline } from 'node:stream/promises'
28+
import { setTimeout as sleep } from 'node:timers/promises'
29+
import {
30+
Admin,
31+
Consumer,
32+
MessagesStreamModes,
33+
Producer,
34+
jsonSerializer,
35+
stringDeserializer,
36+
stringSerializer,
37+
type Message
38+
} from '../../src/index.ts'
39+
import { config } from './config.ts'
40+
import { MessageBatchStream } from './message-batch-stream.ts'
41+
42+
/** Tunables for a single backpressure stall reproduction run. */
export interface BackpressureStallTestOptions {
  /** Number of topics to create; messages are interleaved round-robin across all of them. */
  topicCount: number
  /** Messages published per topic; total volume is topicCount * messagesPerTopic. */
  messagesPerTopic: number
  /** Batch size passed to the downstream MessageBatchStream. */
  batchSize: number
  /** Simulated async handler work per batch, in milliseconds (awaited after each batch). */
  handlerDelayMs: number
  /** Overall deadline; the run is declared a timeout once this many seconds elapse. */
  timeoutSeconds: number
  /** highWaterMark forwarded to consumer.consume() — controls when the stream signals backpressure. */
  consumerHighWaterMark: number
}
50+
51+
// Message shape after deserialization: string key, JSON-decoded object value, string header key/value.
type DeserializedMessage = Message<string, Record<string, unknown>, string, string>
52+
53+
/**
 * Runs the backpressure stall reproduction end-to-end against a live Kafka cluster:
 * creates topics, starts a consumer piped through a batch-accumulating stream,
 * publishes interleaved messages, then monitors consumption with stall detection.
 *
 * Exits the process (0 on success, 1 on stall/timeout), so it is meant to be run
 * as a standalone script rather than imported into other tests.
 *
 * @param options - Volume, batching, handler-delay and timeout tunables.
 */
export async function runBackpressureStallTest (options: BackpressureStallTestOptions): Promise<void> {
  const { topicCount, messagesPerTopic, batchSize, handlerDelayMs, timeoutSeconds, consumerHighWaterMark } = options
  const totalMessages = topicCount * messagesPerTopic

  console.log('\n=== Backpressure Stall Test ===')
  console.log(` Topics: ${topicCount}`)
  console.log(` Messages per topic: ${messagesPerTopic}`)
  console.log(` Total: ${totalMessages}`)
  console.log(` Batch size: ${batchSize}`)
  console.log(` Handler delay: ${handlerDelayMs}ms`)
  console.log(` Consumer highWaterMark: ${consumerHighWaterMark}`)
  console.log(` Timeout: ${timeoutSeconds}s\n`)

  // Create topics (one createTopics call per topic; failures are tolerated below)
  const admin = new Admin({
    clientId: 'backpressure-test-admin',
    bootstrapBrokers: config.kafka.bootstrapBrokers
  })

  const topicPrefix = `bp-stall-${randomUUID().slice(0, 8)}`
  const topics: string[] = []
  for (let i = 0; i < topicCount; i++) {
    topics.push(`${topicPrefix}-${i}`)
  }

  for (const topic of topics) {
    try {
      await admin.createTopics({ topics: [topic], partitions: 1, replicas: 1 })
    } catch {
      // May already exist
    }
  }
  await admin.close()
  await sleep(500) // Wait for topics to propagate

  // Producer setup
  const producer = new Producer<string, object, string, string>({
    clientId: `bp-producer-${randomUUID()}`,
    bootstrapBrokers: config.kafka.bootstrapBrokers,
    serializers: {
      key: stringSerializer,
      value: jsonSerializer,
      headerKey: stringSerializer,
      headerValue: stringSerializer
    }
  })

  // Small CDC-like messages (no padding). With small messages (~150 bytes),
  // all 15000 fit in maxBytes (10MB). The consumer gets partial fetches
  // because it's already running while messages are still being published —
  // each fetch returns whatever's available at that moment (~300 msgs/topic).
  // With batchSize=500 and ~300 msgs/topic, batches span multiple topics →
  // multiple readable entries per flush → backpressure triggers.
  const messageFactories = [
    (id: number) => ({ op: 'c', after: { id }, before: null }),
    (id: number) => ({ op: 'c', after: { entry_id: id }, before: null }),
    (id: number) => ({ op: 'u', after: { entry_id: id }, before: null }),
    (id: number) => ({ op: 'c', after: { key_id: id }, before: null }),
    (id: number) => ({ op: 'c', after: { form_id: id }, before: null }),
    (id: number) => ({ op: 'c', after: { id, entry_id: id }, before: null }),
    (id: number) => ({ op: 'c', after: { trans_id: id }, before: null }),
    (id: number) => ({ op: 'u', after: { id: String(id), team_id: 1 }, before: { id: String(id), team_id: 0 } }),
    (id: number) => ({
      op: 'u',
      after: { project_id: String(id), is_default: 1 },
      before: { project_id: String(id), is_default: 0 }
    }),
    (id: number) => ({ op: 'c', after: { entry_id: id, form_id: id }, before: null }),
    (id: number) => ({ op: 'c', after: { from_trans_id: id, to_trans_id: id + 100_000 }, before: null }),
    (id: number) => ({ op: 'c', after: { key_id: id, tag_name: `tag-${id}` }, before: null }),
    (id: number) => ({ op: 'c', after: { id, status: 'active' }, before: null }),
    (id: number) => ({ op: 'c', after: { id, score: id * 0.1 }, before: null }),
    (id: number) => ({ op: 'c', after: { id, name: `entity-${id}` }, before: null })
  ]

  // Build all messages upfront, interleaved across topics (round-robin).
  // Each batch of 500 contains messages from all 15 topics.
  const PUBLISH_BATCH = 500
  const allMessages: { topic: string; key: string; value: object }[] = []
  const idBase = Math.round(Math.random() * 1_000_000)

  for (let i = 0; i < messagesPerTopic; i++) {
    for (let t = 0; t < topics.length; t++) {
      const factory = messageFactories[t % messageFactories.length]!
      allMessages.push({
        topic: topics[t]!,
        key: `key-${idBase + i}`,
        value: factory(idBase + i)
      })
    }
  }

  // ── Start consumer BEFORE publishing ──
  // The consumer must be running when messages start arriving so that it
  // fetches partial results as they appear, creating the conditions for
  // backpressure (cross-topic batches from partial fetches).
  const groupId = `bp-stall-test-${randomUUID()}`
  const consumer = new Consumer<string, Record<string, unknown>, string, string>({
    clientId: `bp-consumer-${randomUUID()}`,
    groupId,
    bootstrapBrokers: config.kafka.bootstrapBrokers,
    deserializers: {
      key: stringDeserializer,
      // Lenient JSON value deserializer: empty or malformed payloads become {} instead of throwing
      value: data => {
        if (!data) return {} as Record<string, unknown>
        try {
          return JSON.parse(Buffer.isBuffer(data) ? data.toString() : String(data))
        } catch {
          return {} as Record<string, unknown>
        }
      },
      headerKey: stringDeserializer,
      headerValue: stringDeserializer
    },
    sessionTimeout: 20_000,
    rebalanceTimeout: 40_000,
    heartbeatInterval: 3_000
  })

  for (const topic of topics) {
    await consumer.topics.trackAll(topic)
  }
  await consumer.joinGroup()

  // Consumer stream config matching production defaults:
  // - highWaterMark: 1024 (default) — push() returns false when buffer exceeds this
  // - maxWaitTime: 1000ms — broker accumulates messages for up to 1s before responding
  const consumerStream = await consumer.consume({
    topics,
    mode: MessagesStreamModes.EARLIEST,
    maxWaitTime: 1000,
    autocommit: true,
    highWaterMark: consumerHighWaterMark
  })

  // Batch stream config matching typical production setup:
  // batchSize=500, timeoutMilliseconds=2000, readableHighWaterMark=32
  const batchStream = new MessageBatchStream<DeserializedMessage>({
    batchSize,
    timeoutMilliseconds: 2000,
    readableHighWaterMark: 32
  })

  // pipeline(consumerStream, batchStream) — same as MQT's AbstractKafkaConsumer.start()
  const pipelinePromise = pipeline(consumerStream, batchStream).catch(error => {
    console.error('Pipeline error:', error)
  })

  // Consume with async handler — same as MQT's handleSyncStreamBatch
  let consumed = 0
  let batchesProcessed = 0
  const consumePromise = (async () => {
    for await (const messageBatch of batchStream) {
      const batch = messageBatch as DeserializedMessage[]
      batchesProcessed++

      // Matches real handler: iterate messages synchronously
      for (const message of batch) {
        consumed++
        // Sync work: schema validation (simulated by object access)
        // eslint-disable-next-line no-void
        void message.value
      }

      // Async flush: 3 parallel Redis SADD calls (simulated with sleep)
      await sleep(handlerDelayMs)

      // Commit last message in batch
      const lastMessage = batch[batch.length - 1]!
      await lastMessage.commit()
    }
  })()

  console.log('Consumer started, now publishing...')

  // ── Publish AFTER consumer is running ──
  // Fire all batches concurrently (Promise.all) so messages arrive gradually
  // while the consumer is already fetching.
  const publishStart = performance.now()
  const publishPromises: Promise<unknown>[] = []

  for (let i = 0; i < allMessages.length; i += PUBLISH_BATCH) {
    const batch = allMessages.slice(i, i + PUBLISH_BATCH)
    publishPromises.push(producer.send({ messages: batch }))
  }

  await Promise.all(publishPromises)
  const publishMs = performance.now() - publishStart
  console.log(`Published ${totalMessages} messages in ${publishMs.toFixed(0)}ms`)

  // Monitor progress with stall detection
  const consumeStart = performance.now()
  const timeoutMs = timeoutSeconds * 1000
  let lastConsumed = 0
  let stallTicks = 0 // consecutive 500ms ticks with zero progress

  const result = await new Promise<'ok' | 'stall' | 'timeout'>(resolve => {
    const check = setInterval(() => {
      const elapsed = performance.now() - consumeStart
      const pct = ((consumed / totalMessages) * 100).toFixed(1)
      const rate = elapsed > 0 ? Math.round((consumed / elapsed) * 1000) : 0

      if (consumed >= totalMessages) {
        clearInterval(check)
        resolve('ok')
        return
      }

      if (elapsed > timeoutMs) {
        clearInterval(check)
        resolve('timeout')
        return
      }

      // Stall detection: no progress for 5 seconds (10 ticks × 500ms)
      if (consumed === lastConsumed && consumed > 0) {
        stallTicks++
        if (stallTicks >= 10) {
          clearInterval(check)
          console.log(`\n !!! STALL DETECTED at ${consumed}/${totalMessages} (${pct}%) — no progress for 5s`)
          resolve('stall')
          return
        }
      } else {
        stallTicks = 0
      }
      lastConsumed = consumed

      process.stdout.write(
        `\r Consumed: ${consumed}/${totalMessages} (${pct}%) ${rate} msg/s batches=${batchesProcessed}`
      )
    }, 500)
  })

  const totalMs = performance.now() - consumeStart
  console.log(`\n\n Result: ${result.toUpperCase()}`)
  console.log(` Consumed: ${consumed}/${totalMessages} (${((consumed / totalMessages) * 100).toFixed(1)}%)`)
  console.log(` Duration: ${(totalMs / 1000).toFixed(1)}s`)
  console.log(` Batches: ${batchesProcessed}`)
  if (consumed > 0) {
    console.log(` Throughput: ${Math.round((consumed / totalMs) * 1000)} msg/s`)
  }

  // Cleanup: close the source stream first so the pipeline and consume loop can wind down
  await consumerStream.close()
  await Promise.all([pipelinePromise, consumePromise]).catch(() => {})
  await producer.close()
  await consumer.close()

  if (result === 'stall' || result === 'timeout') {
    console.log('\n FAILED — fetch loop stalled under backpressure')
    process.exit(1)
  } else {
    console.log('\n PASSED — all messages consumed')
    process.exit(0)
  }
}

0 commit comments

Comments
 (0)