ct: introduce producer queue #29084
Conversation
Force-pushed from c315223 to f4623c8
Pull request overview
This PR introduces a producer queue mechanism to enforce ordering of concurrent produce requests within cloud topics. The change allows multiple concurrent requests per producer while maintaining the correct order when committing to Raft, preserving ordering even when uploads to object storage happen concurrently.
Key Changes:
- Introduced the producer_queue class to enforce per-producer ordering via tickets (see the usage sketch below)
- Wired the producer queue into the cloud topics STM layer
- Refactored the frontend to use ticket-based ordering instead of background task promises
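For orientation, here is a minimal usage sketch of the ticket idea. The API shape (`acquire`, `redeem`, the destructor releasing the slot) and the helpers `upload_to_object_storage`/`replicate_to_raft` are assumptions for illustration; the real declarations live in producer_queue.h.

```cpp
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>

// Hypothetical helpers, not real frontend functions.
ss::future<> upload_to_object_storage();
ss::future<> replicate_to_raft();

// Sketch only: assumed producer_queue API, not the actual class.
ss::future<> replicate_in_order(producer_queue& queue, model::producer_id pid) {
    // Take a ticket up front: this pins the request's position in the
    // per-producer order even though the upload below runs concurrently
    // with other requests from the same producer.
    auto ticket = queue.acquire(pid);

    // Concurrent stage: upload to object storage.
    co_await upload_to_object_storage();

    // Ordered stage: wait for every earlier ticket for this producer before
    // committing to Raft, so the commit order matches the produce order.
    co_await ticket.redeem();
    co_await replicate_to_raft();
    // The ticket's destructor releases the slot for the next request.
}
```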
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| src/v/cloud_topics/level_zero/common/producer_queue.h | Defines the producer queue API and ticket interface |
| src/v/cloud_topics/level_zero/common/producer_queue.cc | Implements producer queue with future chaining and state management |
| src/v/cloud_topics/level_zero/common/tests/producer_queue_test.cc | Comprehensive test suite validating queue ordering and concurrency |
| src/v/cloud_topics/level_zero/stm/ctp_stm.h | Adds producer queue member to the STM |
| src/v/cloud_topics/level_zero/stm/ctp_stm.cc | Integrates producer queue lifecycle (stop) and accessor |
| src/v/cloud_topics/level_zero/stm/ctp_stm_api.h | Exposes producer queue through STM API |
| src/v/cloud_topics/level_zero/stm/ctp_stm_api.cc | Implements producer queue accessor wrapper |
| src/v/cloud_topics/frontend/frontend.cc | Refactors replicate flow to use producer tickets and restructures upload/replicate logic |
| BUILD files | Adds necessary dependencies for producer queue integration |
Force-pushed from e3e2d50 to 367b95d
Retry command for Build #78277: please wait until all jobs are finished before running the slash command
wdberkeley left a comment:
I like it, seems elegant to me. Looking forward to seeing how it works in a benchmark!
| op->replicate_finished.set_value(raft::errc::timeout);
| });
|
| // The default errc that will cause the client to retry to operation
Lazin left a comment:
Looks good. Needs back-pressure propagation.
| wait_on = it->second->promise.get_future();
| }
|
| auto new_state = ss::make_lw_shared<chain_state>();
It feels like this could be done using a map of semaphores with count set to 1. Is it done this way to simplify the implementation of the 'release' method?
I guess it will also require storing a future inside a ticket. The future will be awaited in the redeem method and just discarded in the release method. I have a small gripe with storing futures in data structures but in this case I think it's not an issue.
Yeah I agree futures stored in data structures are awkward and not good, but I think this is the simplest way to accomplish this?
I had to rework this a bit based on @bharathv's feedback if you want to take another look; I ended up going down the semaphore route.
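For reference, a minimal sketch of the map-of-semaphores idea discussed here, assuming Seastar's `ss::semaphore`/`ss::get_units` and a plain hash map keyed by producer id; the real producer_queue also handles gates, cleanup, and shutdown, which this ignores.

```cpp
#include <seastar/core/semaphore.hh>
#include <seastar/core/shared_ptr.hh>

#include <absl/container/flat_hash_map.h>

// Sketch only: one single-unit semaphore per producer. Waiters on a Seastar
// semaphore are served in FIFO order, so each producer's requests are
// redeemed in arrival order, and dropping the units unblocks the next one.
class naive_producer_queue {
public:
    ss::future<ss::semaphore_units<>> acquire(model::producer_id pid) {
        auto it = _producers.find(pid);
        if (it == _producers.end()) {
            it = _producers.emplace(pid, ss::make_lw_shared<ss::semaphore>(1))
                   .first;
        }
        return ss::get_units(*it->second, 1);
    }

private:
    // Note: never erases entries; a real implementation needs cleanup.
    absl::flat_hash_map<model::producer_id, ss::lw_shared_ptr<ss::semaphore>>
      _producers;
};
```

Because `ss::semaphore_units` release themselves on destruction, a ticket that is dropped without an explicit redeem or release still unblocks the next request, which is roughly why the semaphore route simplifies the release path.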
| auto it = _producer_states.find(pid);
| ss::future<> wait_on = ss::now();
| if (it != _producer_states.end()) {
|     wait_on = it->second->promise.get_future();
get_future() asserts on a double get(), right? Is that a problem?
ticket0
ticket1 <chained to ticket0>
ticket1 is destroyed without redeem/release (say some exception)
ticket2 -- tries to chain to ticket1 (since it is the last member) -- calls ticket0.promise.get_future()?
ticket1 is destroyed without redeem/release (say some exception)
The dtor for tickets always calls release; I'm not seeing how a double get_future is possible, as we always replace the state whenever get_future is called. It is possible to have issues if redeem is called twice, but I don't think that is what you're getting at?
Ah, I had a typo in my comment (sorry for the confusion).
I meant (replace ticket1 -> ticket0).
ticket2 -- tries to chain to ticket0 (since it is the last member) -- calls ticket0.promise.get_future()?
In that case, ticket0.state.get_future() is called twice IIUC?
Hrm, I guess that's an ordering violation rather than a double get_future, because we always replace the current state right after we grab its future.
Okay good catch, I've fixed this case :)
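To make that consume-once invariant concrete, here is a simplified sketch of the promise-chaining scheme being discussed; `chain_state` and `chain_tail` are stand-ins for illustration, not the real types.

```cpp
#include <seastar/core/future.hh>
#include <seastar/core/shared_ptr.hh>

#include <utility>

// Stand-in for the real chain_state: completed when its request finishes.
struct chain_state {
    ss::promise<> done;
};

class chain_tail {
public:
    // Returns {future to wait on, state this request must complete later}.
    // get_future() is called on the old tail exactly once, right here,
    // because the tail is swapped out before returning; every later
    // enqueue() chains to the freshly installed state instead.
    std::pair<ss::future<>, ss::lw_shared_ptr<chain_state>> enqueue() {
        auto prev = std::exchange(_tail, ss::make_lw_shared<chain_state>());
        auto wait_on = prev ? prev->done.get_future()
                            : ss::make_ready_future<>();
        return {std::move(wait_on), _tail};
    }

private:
    ss::lw_shared_ptr<chain_state> _tail;
};
```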
Force-pushed from 367b95d to 38fb918
Force push: address review feedback and add backpressure from write pipeline units into the stages.
Force-pushed from 38fb918 to b99a6a7
| out.request_enqueued = _data_plane->reserve_write(std::move(batch_vec))
|                          .then_wrapped([this,
|                                         p = std::move(result),
|                                         cloned = std::move(to_cache),
|                                         batch_id,
Do you think I should add back the struct to hold all the stuff? It feels very messy, I agree.
Doesn't seem worth it. We're just spoiled with coroutines most of the time.
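Purely for illustration, the struct route being debated would look roughly like this. The field names mirror the captures above, but the concrete types are placeholder assumptions, and `enqueue_write` is a hypothetical wrapper, not the frontend code.

```cpp
#include <seastar/core/future.hh>
#include <seastar/core/semaphore.hh>

#include <cstdint>
#include <memory>
#include <vector>

// Placeholder types standing in for whatever `result`, `to_cache`, and
// `batch_id` really are in frontend.cc -- the shape, not the substance.
struct enqueued_write_ctx {
    ss::promise<bool> result;   // stands in for `p = std::move(result)`
    std::vector<char> to_cache; // stands in for `cloned = std::move(to_cache)`
    int64_t batch_id{0};
};

// The continuation then carries one movable object instead of five captures.
ss::future<> enqueue_write(
  ss::future<ss::semaphore_units<>> reserved,
  std::unique_ptr<enqueued_write_ctx> ctx) {
    return reserved.then_wrapped(
      [ctx = std::move(ctx)](ss::future<ss::semaphore_units<>> units) mutable {
          if (units.failed()) {
              ctx->result.set_exception(units.get_exception());
              return;
          }
          // A real implementation would forward these units into the later
          // stages as backpressure; here we simply let them go out of scope.
          auto held = units.get();
          ctx->result.set_value(true);
      });
}
```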
| ss::semaphore_units<> _units;
| ss::future<ss::semaphore_units<>> _wait_on;
| ss::lw_shared_ptr<chain_state> _state;
| model::producer_id _pid;
| producer_state_map* _map;
| ss::gate::holder _gate_holder;
is this the state we maintain per request?
Yes, we might be able to consolidate a little by moving the gate into the shared state, but this is the simplest
Ended up simplifying it
#29084 (comment)
Force-pushed from b99a6a7 to 2cab834
Force push: fixed the build by doing the subclass trick instead of trying to only forward declare. Also pushed a commit to get better error messages when there are stale epochs.
Right now we get unhelpful log messages about empty epochs; with this change we at least get a better error message about what the latest value in the STM is.
Force-pushed from 3ae3891 to 95b8aa0
/ci-repeat 1
Retry command for Build #78497: please wait until all jobs are finished before running the slash command
Okay, the debug failures are all shutdown hangs. The one cloud topics failure is the reconciler, which is unrelated to this PR.
/ci-repeat 1
Retry command for Build #78501: please wait until all jobs are finished before running the slash command
/ci-repeat 1
Retry command for Build #78506: please wait until all jobs are finished before running the slash command
Hrm, I wonder if increased traffic to the ctp_stm is causing the shutdown delay... Anyway, I suspect it's due to the lock in the epoch fencing, as everything else looks like it shuts down OK.
Okay so it seems reasonable that the issue here is the epoch locking timing out on shutdown, as there are lots of partition moves and the cluster epoch is probably out of sync on these different nodes.
We see shutdown hangs in the ctp_stm, likely because of outstanding units in the epoch fencing. Also add a watchdog for shutdown to better identify what is taking so long in stop. We switch from ss::rwlock to ss::semaphore so that we can log the number of read locks held when shutdown hangs.
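A minimal sketch of that rwlock-to-semaphore swap, assuming Seastar's `ss::semaphore`; the unit count, names, and watchdog wiring are illustrative, not the actual ctp_stm code.

```cpp
#include <seastar/core/semaphore.hh>

#include <cstddef>

// Sketch: emulate the read side of an rwlock with a large counting
// semaphore so that, when stop() hangs, we can report how many readers
// (epoch-fence holders) are still outstanding.
class observable_read_lock {
public:
    static constexpr size_t max_readers = 1024;

    // "Read lock": take one unit; it is returned when the units are dropped.
    ss::future<ss::semaphore_units<>> hold_read_lock() {
        return ss::get_units(_sem, 1);
    }

    // Number of read locks currently held -- the value a shutdown watchdog
    // would log periodically while stop() waits.
    size_t readers_held() const {
        return max_readers - static_cast<size_t>(_sem.available_units());
    }

private:
    ss::semaphore _sem{max_readers};
};
```

The watchdog itself could then be a periodic `ss::timer` that logs `readers_held()` while stop() is still in flight, making a hang attributable to outstanding epoch-fence units.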
Force-pushed from d30095b to b4ad1b4
Retry command for Build #78512: please wait until all jobs are finished before running the slash command
The only test failure is now a crash that Evgeny has a fix for:
/ci-repeat 1
Retry command for Build #78513: please wait until all jobs are finished before running the slash command
This time I got the same assertion again and
I think I'm ready to force merge at this point, @dotnwat; this PR is less flaky than CI on mainline wrt cloud topics.
That sounds good to me. |
Backports Required
Release Notes