 #include "cloud_topics/level_one/frontend_reader/level_one_reader.h"
 #include "cloud_topics/level_one/metastore/metastore.h"
 #include "cloud_topics/level_zero/common/extent_meta.h"
+#include "cloud_topics/level_zero/common/producer_queue.h"
 #include "cloud_topics/level_zero/frontend_reader/level_zero_reader.h"
 #include "cloud_topics/level_zero/stm/ctp_stm.h"
 #include "cloud_topics/level_zero/stm/placeholder.h"
@@ -529,32 +530,17 @@ struct upload_and_replicate_stages {
   , batch_id(batch_id)
   , opts(opts)
   , timeout(timeout) {}
-
-    ss::promise<> request_enqueued;
-    ss::promise<result<raft::replicate_result>> replicate_finished;
 };

-ss::future<> bg_upload_and_replicate(
+ss::future<result<raft::replicate_result>> do_upload_and_replicate(
   data_plane_api* api,
   ss::lw_shared_ptr<cluster::partition> partition,
+  l0::producer_ticket ticket,
   model::record_batch_header header,
   ss::lw_shared_ptr<upload_and_replicate_stages> op,
   bool cache_enabled) {
-    vassert(api != nullptr, "cloud topics api is not initialized");
-
-    auto ticket = op->ctp_stm_api->producer_queue().reserve(
-      op->batch_id.pid.get_id());
-    // Now that we've acquired our ticket and determined our ordering, we can
-    // say that the request is enqueued and get more produce requests.
-    op->request_enqueued.set_value();
-
-    auto fallback = ss::defer([op] {
-        // This guarantees that the promises are set.
-        // The error code used here does not represent the
-        // actual error.
-        op->replicate_finished.set_value(raft::errc::timeout);
-    });
-
+    // The default errc that will cause the client to retry the operation.
+    constexpr auto default_errc = raft::errc::timeout;
     /*
      * L0 GC relies on a minimum epoch associated with each NTP for calculating
      * the name of an L0 object. The minimum is based on the topic revision, but
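
The promise pair deleted above, together with the ss::defer fallback, existed only to guarantee that replicate_finished was eventually set on every early-exit path of the detached fiber. Once the body is a coroutine returning ss::future<result<raft::replicate_result>>, every co_return resolves the caller's future, so no safety net is needed. A minimal sketch of the two shapes, assuming only Seastar; op_state, run_in_background and run_as_coroutine are illustrative names, not from this patch:

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/util/defer.hh>

namespace ss = seastar;

// Before: the caller holds the promise, so every exit path of the detached
// fiber must remember to set it; the deferred action is the safety net.
struct op_state {
    ss::promise<int> finished;
};

ss::future<> run_in_background(ss::lw_shared_ptr<op_state> op) {
    auto fallback = ss::defer([op]() noexcept { op->finished.set_value(-1); });
    // ... do the work; on success:
    fallback.cancel();
    op->finished.set_value(42);
    co_return;
}

// After: the coroutine's return channel is the future itself, so an early
// co_return of an error code cannot leave the caller hanging.
ss::future<int> run_as_coroutine(bool fail_early) {
    if (fail_early) {
        co_return -1; // resolves the future; no fallback needed
    }
    co_return 42;
}
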
@@ -568,59 +554,66 @@ ss::future<> bg_upload_and_replicate(
568554 " Unexpected invalid min epoch {} for {}" ,
569555 min_epoch,
570556 op->ntp );
571-
572557 chunked_vector<model::record_batch> rb_copy;
573558 if (cache_enabled) {
574559 rb_copy = clone_batches (op->batches );
575560 }
576561
577562 auto timeout = op->timeout == 0ms ? L0_upload_default_timeout : op->timeout ;
578- auto res = co_await api->write_and_debounce (
563+ auto upload_fut = co_await ss::coroutine::as_future ( api->write_and_debounce (
579564 op->ntp ,
580565 min_epoch,
581566 std::move (op->batches ),
582- model::timeout_clock::now () + timeout);
567+ model::timeout_clock::now () + timeout)) ;
583568
584- if (res.has_error ()) {
569+ if (upload_fut.failed ()) {
570+ auto ex = upload_fut.get_exception ();
571+ vlog (cd_log.debug , " LO object upload has failed: {}" , ex);
572+ co_return default_errc;
573+ }
574+
575+ auto upload_res = upload_fut.get ();
576+
577+ if (upload_res.has_error ()) {
585578 vlog (
586579 cd_log.debug ,
587- " LO object upload has failed : {}" ,
588- res .error ().message ());
589- co_return ;
580+ " LO object upload has errored : {}" ,
581+ upload_res .error ().message ());
582+ co_return default_errc ;
590583 }
591-
592- if (res.value ().empty ()) {
584+ if (upload_res.value ().empty ()) {
593585 vlog (
594586 cd_log.warn ,
595587 " LO object upload returned empty result, nothing to replicate" );
596- co_return ;
588+ co_return default_errc ;
597589 }
598590
599591 auto fence_fut = co_await ss::coroutine::as_future (
600- op->ctp_stm_api ->fence_epoch (res .value ().front ().id .epoch ));
592+ op->ctp_stm_api ->fence_epoch (upload_res .value ().front ().id .epoch ));
601593 if (fence_fut.failed ()) {
602594 auto e = fence_fut.get_exception ();
603595 vlog (
604596 cd_log.warn ,
605597 " Failed to fence epoch {} for ntp {}, error: {}" ,
606- res .value ().front ().id .epoch ,
598+ upload_res .value ().front ().id .epoch ,
607599 op->ntp ,
608600 e);
609- co_return ;
601+ co_return default_errc ;
610602 }
611603 auto fence = std::move (fence_fut.get ());
612604 if (!fence.unit .has_value ()) {
613605 vlog (
614606 cd_log.warn ,
615607 " Failed to fence epoch {} for ntp {}, fence unit is empty" ,
616- res .value ().front ().id .epoch ,
608+ upload_res .value ().front ().id .epoch ,
617609 op->ntp );
618- co_return ;
610+ co_return default_errc ;
619611 }
620612
621613 chunked_vector<model::record_batch_header> headers;
622614 headers.push_back (header);
623- auto placeholders = co_await convert_to_placeholders (res.value (), headers);
615+ auto placeholders = co_await convert_to_placeholders (
616+ upload_res.value (), headers);
624617
625618 vassert (
626619 placeholders.batches .size () == 1 ,
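
Both the upload and the fence calls above funnel through ss::coroutine::as_future, which materializes an exceptional future as an inspectable value instead of throwing out of the co_await; exceptions and result-carried errors are then both mapped onto the single retryable default_errc. A compressed sketch of the pattern, assuming Seastar's <seastar/coroutine/as_future.hh>; do_work() and my_log are stand-in names, while result<> and raft::errc come from the surrounding codebase:

// Sketch of the error funnel used above; do_work()/my_log are hypothetical.
ss::future<result<int>> guarded_call() {
    // as_future never throws at the co_await; a failure travels inside
    // the returned (ready) future instead.
    auto fut = co_await ss::coroutine::as_future(do_work());
    if (fut.failed()) {
        vlog(my_log.debug, "work threw: {}", fut.get_exception());
        co_return raft::errc::timeout; // exception -> retryable errc
    }
    auto res = fut.get();
    if (res.has_error()) {
        co_return raft::errc::timeout; // result error -> same retryable errc
    }
    co_return res.value();
}
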
@@ -632,66 +625,61 @@ ss::future<> bg_upload_and_replicate(
     op->opts = update_replicate_options(op->opts, fence.term);
     auto replicate_stages = partition->replicate_in_stages(
       op->batch_id, std::move(placeholders.batches.front()), op->opts);
-
-    fallback.cancel();
-
     // Once the request is enqueued in raft and our order is guaranteed we can
     // release our ticket and further requests can be enqueued into the raft
     // layer.
-    ssx::background = replicate_stages.request_enqueued.then_wrapped(
-      [t = std::move(ticket)](ss::future<> fut) mutable {
-          t.release();
-          fut.ignore_ready_future();
-      });
+    auto enqueued_fut = co_await ss::coroutine::as_future(
+      std::move(replicate_stages.request_enqueued));
+
+    ticket.release(); // always release the ticket

-    auto replicate_fut
-      = std::move(replicate_stages.replicate_finished)
-          .then(
-            [api,
-             cache_enabled,
-             inp = std::move(rb_copy),
-             ntp = partition->ntp(),
-             fence_unit = std::move(fence.unit)](
-              result<cluster::kafka_result> res) mutable
-              -> result<raft::replicate_result> {
-                if (res.has_error()) {
-                    return res.error();
-                }
-                if (cache_enabled) {
-                    // The term_id is not guaranteed to be set if the request
-                    // was served from the list of finished requests. This might
-                    // happen if the request is coming from the snapshot (in
-                    // which case it's not stored) or from the log replay. The
-                    // simplest solution in this case is to skip caching.
-                    if (res.value().last_term >= model::term_id{0}) {
-                        update_batches(
-                          inp,
-                          kafka::offset_cast(res.value().last_offset),
-                          res.value().last_term);
-                        for (const auto& b : inp) {
-                            vlog(
-                              cd_log.trace,
-                              "Putting batch to cache: {}, term: {}",
-                              b.base_offset(),
-                              b.term());
-                            api->cache_put(ntp, b);
-                        }
-                    } else {
-                        vlog(
-                          cd_log.debug,
-                          "Skipping cache put for ntp {} at offset {} with "
-                          "unset term",
-                          ntp,
-                          res.value().last_offset);
-                    }
-                }
-                return raft::replicate_result{
-                  .last_offset = kafka::offset_cast(res.value().last_offset),
-                  .last_term = res.value().last_term,
-                };
-            });
-
-    replicate_fut.forward_to(std::move(op->replicate_finished));
+    if (enqueued_fut.failed()) {
+        auto ex = enqueued_fut.get_exception();
+        vlog(
+          cd_log.trace,
+          "failed to enqueue replicate request into raft ({}): {}",
+          op->ntp,
+          ex);
+        // Fall through: we expect replicate_finished to fail too if this
+        // stage did, and we don't want to abandon that future.
+    }
+
+    auto res = co_await std::move(replicate_stages.replicate_finished);
+    if (res.has_error()) {
+        co_return res.error();
+    }
+    if (cache_enabled) {
+        // The term_id is not guaranteed to be set if the request
+        // was served from the list of finished requests. This might
+        // happen if the request is coming from the snapshot (in
+        // which case it's not stored) or from the log replay. The
+        // simplest solution in this case is to skip caching.
+        if (res.value().last_term >= model::term_id{0}) {
+            update_batches(
+              rb_copy,
+              kafka::offset_cast(res.value().last_offset),
+              res.value().last_term);
+            for (const auto& b : rb_copy) {
+                vlog(
+                  cd_log.trace,
+                  "Putting batch to cache: {}, term: {}",
+                  b.base_offset(),
+                  b.term());
+                api->cache_put(op->ntp, b);
+            }
+        } else {
+            vlog(
+              cd_log.debug,
+              "Skipping cache put for ntp {} at offset {} with "
+              "unset term",
+              op->ntp,
+              res.value().last_offset);
+        }
+    }
+    co_return raft::replicate_result{
+      .last_offset = kafka::offset_cast(res.value().last_offset),
+      .last_term = res.value().last_term,
+    };
 }
 } // namespace

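The ticket handling above preserves the per-producer ordering contract: the order is fixed when the ticket is reserved (now done synchronously in frontend::replicate, below) and the ticket is released only once raft has accepted the request, whether or not the enqueue succeeded. A condensed, hypothetical view of the lifecycle; the producer_queue()/reserve()/release() semantics are inferred from this patch rather than from the queue's own documentation, and stm_api, pid, b and opts stand in for the real state:

// Hypothetical condensed lifecycle of one produce request.
ss::future<result<raft::replicate_result>> lifecycle() {
    auto ticket = stm_api->producer_queue().reserve(pid); // order fixed here
    // ... upload the L0 object, fence the epoch, build the placeholder ...
    auto stages = partition->replicate_in_stages(batch_id, std::move(b), opts);
    co_await ss::coroutine::as_future(std::move(stages.request_enqueued));
    ticket.release(); // raft owns the order now; release even on failure
    co_return co_await std::move(stages.replicate_finished);
}
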
@@ -805,10 +793,18 @@ raft::replicate_stages frontend::replicate(
       opts.timeout.value_or(L0_replicate_default_timeout));

     raft::replicate_stages out(raft::errc::success);
-    out.request_enqueued = op_state->request_enqueued.get_future();
-    out.replicate_finished = op_state->replicate_finished.get_future();
-    ssx::background = bg_upload_and_replicate(
-      _data_plane, _partition, header, op_state, cache_enabled());
+    auto ticket = op_state->ctp_stm_api->producer_queue().reserve(
+      batch_id.pid.get_id());
+    // Now that we've acquired our ticket and determined our ordering, we can
+    // say that the request is enqueued and get more produce requests.
+    out.request_enqueued = ss::now();
+    out.replicate_finished = do_upload_and_replicate(
+      _data_plane,
+      _partition,
+      std::move(ticket),
+      header,
+      op_state,
+      cache_enabled());
     return out;
 }

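Because the ticket reservation is now synchronous inside frontend::replicate, the request_enqueued stage is already satisfied by the time replicate returns, so it can be an immediately ready future (ss::now()). The other consequence is that do_upload_and_replicate is no longer detached via ssx::background: the whole operation lives in the replicate_finished future, which the caller must therefore not drop. Roughly, for a hypothetical caller whose argument list is illustrative and where only the stage names follow the patch:

// Hypothetical caller of the new wiring.
ss::future<> produce_one(frontend& fe) {
    auto stages = fe.replicate(batch_id, std::move(batch), opts);
    co_await std::move(stages.request_enqueued); // ready at once (ss::now())
    auto res = co_await std::move(stages.replicate_finished); // the whole op
    // ... translate res into the produce response ...
}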