fix: add timeout and cleanup for inflight nodes in MessagesStream #251
Merged
ShogunPanda merged 8 commits into platformatic:main on Mar 18, 2026
If a fetch callback is never invoked, the broker's node ID stays in inflightNodes permanently, starving all partitions led by that broker. Change inflightNodes from a Set to a Map tracking timestamps, sweep entries older than 2 minutes on each fetch cycle, and clear the map on consumer:group:join so stale entries from a previous assignment cannot block brokers after a rebalance. Signed-off-by: Tristan Burch <tristan@day.ai>
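The timestamped map and the sweep described in this commit could look roughly like the sketch below. It is not the code from the PR: `sweepStaleInflight`, `STALE_INFLIGHT_MS`, and the injectable `now` argument are hypothetical names chosen for illustration; only the Map-of-timestamps idea and the 2-minute threshold come from the commit message.

```javascript
// Hypothetical constant: the 2-minute threshold named in the commit message.
const STALE_INFLIGHT_MS = 120_000

// inflightNodes maps a broker node ID to the time (ms) its fetch was started.
// Removes entries older than maxAgeMs and returns the node IDs that were dropped.
function sweepStaleInflight (inflightNodes, now = Date.now(), maxAgeMs = STALE_INFLIGHT_MS) {
  const removed = []

  // Deleting from a Map while iterating over it is safe in JavaScript.
  for (const [nodeId, startedAt] of inflightNodes) {
    if (now - startedAt > maxAgeMs) {
      inflightNodes.delete(nodeId)
      removed.push(nodeId)
    }
  }

  return removed
}

// Usage: broker 1 has been inflight for 130 s, broker 2 for 30 s.
const inflightNodes = new Map([[1, 0], [2, 100_000]])
const dropped = sweepStaleInflight(inflightNodes, 130_000)
console.log(dropped, [...inflightNodes.keys()])
```

Passing `now` explicitly keeps the sweep deterministic in tests; in a real fetch cycle it would presumably default to `Date.now()`.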
When all partition leaders are already in the inflight map, the requests map is empty after building. Since no fetch callbacks are registered and no data is pushed, the Node.js Readable stream contract means _read() won't be called again. This prevents the 120-second stale inflight cleanup sweep from ever running, causing the stream to stall indefinitely. Add a 1-second delayed retry when requests.size === 0 and there are inflight nodes, so the cleanup sweep gets a chance to run. Also clear partitionsEpochs on consumer:group:join to prevent stale leader epochs from persisting across rebalances. Signed-off-by: Tristan Burch <tristan@day.ai>
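The stall and the delayed retry described in this commit can be reproduced in isolation with the sketch below. `SketchStream`, `fetch`, `fetchCalls`, and `retryDelayMs` are hypothetical stand-ins, not the MessagesStream implementation; the only behavior relied on is that a Readable does not call `_read()` again until something is pushed.

```javascript
const { Readable } = require('node:stream')

// Hypothetical stand-in for a stream whose only broker is already inflight.
class SketchStream extends Readable {
  constructor (retryDelayMs) {
    super({ objectMode: true })
    this.retryDelayMs = retryDelayMs // null disables the retry
    this.fetchCalls = 0
    this.inflightNodes = new Map([[1, Date.now()]])
  }

  _read () {
    this.fetch()
  }

  fetch () {
    if (this.destroyed) return
    this.fetchCalls++

    // Every leader is inflight, so no request is built and nothing is pushed.
    const requests = new Map()

    if (requests.size === 0 && this.inflightNodes.size > 0 && this.retryDelayMs !== null) {
      // Without this timer nothing would ever call fetch() again.
      setTimeout(() => this.fetch(), this.retryDelayMs)
    }
  }
}

// Usage: one stream without the retry, one with a short retry delay.
const stalled = new SketchStream(null)
const retrying = new SketchStream(5)
stalled.on('data', () => {})
retrying.on('data', () => {})

setTimeout(() => {
  console.log({ stalled: stalled.fetchCalls, retrying: retrying.fetchCalls })
  stalled.destroy()
  retrying.destroy()
}, 100)
```

The stream without the retry should report a single `fetch()` call, while the retrying one keeps cycling, which is what would give a periodic sweep the chance to run.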
Emitting 'fetch' on the empty-request path inflates the fetch counter used by consumers to control pausing, causing getLag tests to see incorrect lag values. Signed-off-by: Tristan Burch <tristan@day.ai>
ShogunPanda
requested changes
Mar 17, 2026
Replace the 1-second setTimeout with process.nextTick when retrying fetch due to all partition leaders being inflight. This avoids an unnecessary delay and lets the retry fire as soon as the current event loop tick completes. Adds a test that verifies the retry path is exercised when a fetch callback is delayed. Signed-off-by: Tristan Burch <tristan@day.ai>
Signed-off-by: Tristan Burch <tristan@day.ai>
If I use `nextTick`, then whenever `requests.size === 0` and inflight nodes exist, the code does this: `process.nextTick` --> `#fetch()` --> `metadata()` --> no requests --> `process.nextTick` --> ... Each iteration sends a metadata request, so the retries form a tight metadata loop that overwhelms the broker and starves I/O. Signed-off-by: Tristan Burch <tristan@day.ai>
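The starvation effect can be shown with a bounded version of that loop. The sketch below is illustrative only: `runTickChain` is a hypothetical helper, and the chain is capped at a fixed number of iterations so that it terminates. Node.js drains the whole `process.nextTick` queue before the event loop moves on, so a callback scheduled with `setImmediate` (standing in here for pending I/O work) cannot run until the chain ends.

```javascript
// Chains `iterations` nextTick callbacks and reports how many of them ran
// before a setImmediate callback scheduled up front got its turn.
function runTickChain (iterations, onDone) {
  let ticks = 0

  // Scheduled first, but it cannot run until the nextTick queue is empty.
  setImmediate(() => onDone({ ticks, iterations }))

  function loop () {
    ticks++
    if (ticks < iterations) process.nextTick(loop)
  }

  process.nextTick(loop)
}

// Usage: the immediate callback only runs once the whole chain has finished.
runTickChain(1000, result => console.log(result))
```

An unbounded chain, as in the retry described above, would never yield to the event loop at all, whereas a timer-based retry leaves room for socket callbacks between attempts.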
The once('fetch') listener fired synchronously during emit('fetch'),
before #pushRecords updated #offsetsToFetch. This caused #fetch() to
build requests with stale offsets, re-fetching the same messages.
The retry is unnecessary: when inflight fetches complete, #pushRecords
already calls process.nextTick(() => #fetch()) which resumes the cycle
with correct offsets. The connection layer's requestTimeout guarantees
callbacks are always invoked, so the stale cleanup can always execute.
Signed-off-by: Tristan Burch <tristan@day.ai>
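The ordering problem comes from `EventEmitter#emit` invoking listeners synchronously. The sketch below isolates that behavior; `offsetSeenByListener` and `offsetToFetch` are hypothetical names, and the reordering variant is shown only for contrast (the commit itself removes the retry rather than reordering anything).

```javascript
const { EventEmitter } = require('node:events')

// Returns the offset a once('fetch') listener observes, depending on whether
// the state update happens before or after emit('fetch').
function offsetSeenByListener (updateBeforeEmit) {
  const stream = new EventEmitter()
  let offsetToFetch = 0 // stands in for the offsets updated after a fetch
  let seen = null

  stream.once('fetch', () => {
    // Runs inside emit(), before the emitting code continues.
    seen = offsetToFetch
  })

  if (updateBeforeEmit) {
    offsetToFetch = 10
    stream.emit('fetch')
  } else {
    stream.emit('fetch')
    offsetToFetch = 10 // too late: the listener has already run
  }

  return seen
}

// Usage: compare the two orderings.
console.log(offsetSeenByListener(false), offsetSeenByListener(true))
```

A listener that builds the next request inside `emit('fetch')` therefore works from the pre-update offset, which matches the re-fetching described in this commit message.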
…fetches Clearing inflightNodes on consumer:group:join caused a race where an in-flight fetch callback would delete the entry added by a newer fetch, producing extra fetch events and flaky test failures in getLag. The 120-second stale sweep already handles permanently stuck entries. Signed-off-by: Tristan Burch <tristan@day.ai>
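The race can be replayed synchronously with a small model. Everything in the sketch below is hypothetical (`simulateRebalance`, `tryFetch`, the counters); it only mirrors the sequence described in this commit message: an old callback deletes the map entry that a newer fetch added after the map was cleared.

```javascript
// Replays: fetch starts, rebalance happens, a second fetch is attempted,
// the first callback arrives, and a third fetch is attempted.
function simulateRebalance (clearOnJoin) {
  const inflightNodes = new Map()
  let clock = 0
  let fetchesStarted = 0
  let pending = 0
  let maxConcurrent = 0

  // Starts a fetch unless the node is marked inflight; returns its callback.
  function tryFetch (nodeId) {
    if (inflightNodes.has(nodeId)) return null

    inflightNodes.set(nodeId, clock++)
    fetchesStarted++
    pending++
    maxConcurrent = Math.max(maxConcurrent, pending)

    return () => {
      pending--
      inflightNodes.delete(nodeId) // deletes whatever entry is there now
    }
  }

  const firstDone = tryFetch(1)
  if (clearOnJoin) inflightNodes.clear() // consumer:group:join
  tryFetch(1)                            // only starts if the map was cleared
  firstDone()                            // old callback removes the newer entry
  tryFetch(1)

  return { fetchesStarted, maxConcurrent }
}

// Usage: compare clearing on join with leaving the map alone.
console.log(simulateRebalance(true), simulateRebalance(false))
```

In this model, clearing on join lets two fetches for the same broker overlap and starts an extra one, while leaving the map untouched keeps at most one fetch per broker in flight.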
ShogunPanda
approved these changes
Mar 18, 2026
If a fetch callback is never invoked, the broker's node ID stays in `inflightNodes` permanently, starving all partitions led by that broker.

Change `inflightNodes` from a Set to a Map tracking timestamps, sweep entries older than 2 minutes on each fetch cycle, and clear the map on `consumer:group:join` so stale entries from a previous assignment cannot block brokers after a rebalance.