Skip to content

Commit f4623c8

Browse files
committed
ct/frontend: allow concurrent requests per producer
Currently, we can only upload a single batch from a producer at a time in order to preserve the ordering of requests into rm_stm for idempotency. That is undesirable for both latency and cost (of S3 uploads). To fix this, mark requests as enqueued before they are uploaded to S3, but record their order. Once a request's upload completes, we re-establish the ordering of batches so that rm_stm still sees requests in the order required for idempotency.
1 parent 06f0d15 commit f4623c8

File tree

1 file changed

+19
-10
lines changed

1 file changed

+19
-10
lines changed

src/v/cloud_topics/frontend/frontend.cc

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -542,11 +542,16 @@ ss::future<> bg_upload_and_replicate(
542542
bool cache_enabled) {
543543
vassert(api != nullptr, "cloud topics api is not initialized");
544544

545+
auto ticket = op->ctp_stm_api->producer_queue().reserve(
546+
op->batch_id.pid.get_id());
547+
// Now that we've acquired our ticket and determined our ordering, we can
548+
// say that the request is enqueued and get more produce requests.
549+
op->request_enqueued.set_value();
550+
545551
auto fallback = ss::defer([op] {
546552
// This guarantees that the promises are set.
547553
// The error code used here does not represent the
548554
// actual error.
549-
op->request_enqueued.set_value();
550555
op->replicate_finished.set_value(raft::errc::timeout);
551556
});
552557

@@ -568,6 +573,7 @@ ss::future<> bg_upload_and_replicate(
568573
if (cache_enabled) {
569574
rb_copy = clone_batches(op->batches);
570575
}
576+
571577
auto timeout = op->timeout == 0ms ? L0_upload_default_timeout : op->timeout;
572578
auto res = co_await api->write_and_debounce(
573579
op->ntp,
@@ -614,26 +620,29 @@ ss::future<> bg_upload_and_replicate(
614620

615621
chunked_vector<model::record_batch_header> headers;
616622
headers.push_back(header);
617-
auto placeholders = co_await convert_to_placeholders(
618-
res.value(), std::move(headers));
623+
auto placeholders = co_await convert_to_placeholders(res.value(), headers);
619624

620625
vassert(
621626
placeholders.batches.size() == 1,
622627
"Expected single batch, got {}",
623628
placeholders.batches.size());
624-
625-
// Replicate
629+
// Wait for all previous requests from this producer to be processed
630+
co_await ticket.redeem();
631+
// Replicate now that our ticket is redeemed
626632
op->opts = update_replicate_options(op->opts, fence.term);
627633
auto replicate_stages = partition->replicate_in_stages(
628634
op->batch_id, std::move(placeholders.batches.front()), op->opts);
629635

630636
fallback.cancel();
631637

632-
// Forward future result to the 'op'. The expectation is that at this point
633-
// the target promises (inside 'op') are used to generate futures and these
634-
// futures are awaited.
635-
replicate_stages.request_enqueued.forward_to(
636-
std::move(op->request_enqueued));
638+
// Once the request is enqueued in raft and our order is guaranteed we can
639+
// release our ticket and further requests can be enqueued into the raft
640+
// layer.
641+
ssx::background = replicate_stages.request_enqueued.then_wrapped(
642+
[t = std::move(ticket)](ss::future<> fut) mutable {
643+
t.release();
644+
fut.ignore_ready_future();
645+
});
637646

638647
auto replicate_fut
639648
= std::move(replicate_stages.replicate_finished)

0 commit comments

Comments
 (0)