Skip to content
31 changes: 28 additions & 3 deletions src/v/cloud_topics/data_plane_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,27 @@
#include <seastar/core/future.hh>
#include <seastar/core/lowres_clock.hh>

#include <expected>
#include <memory>
#include <system_error>

namespace cloud_topics {

// staged_write is a write operation that has been reserved in the pipeline.
// it is decoupled from uploading so that we can provide backpressure before
// accepting more batches into the pipeline
struct staged_write {
    // Opaque, implementation-defined payload carried between stage_write()
    // and execute_write(). Polymorphic so each data-plane implementation can
    // attach its own reservation state.
    struct batch_data {
        batch_data() = default;
        // Suppress copy and move: batch_data is a polymorphic base owned via
        // unique_ptr; copying a polymorphic object by value risks slicing
        // (C++ Core Guidelines C.67). The original defaulted copy while
        // deleting move, which allowed exactly the slicing copies.
        batch_data(const batch_data&) = delete;
        batch_data(batch_data&&) = delete;
        batch_data& operator=(const batch_data&) = delete;
        batch_data& operator=(batch_data&&) = delete;
        virtual ~batch_data() = default;
    };

    // Owning handle to the implementation's reservation state; null for a
    // default-constructed (empty) staged_write.
    std::unique_ptr<batch_data> staged;
};

/// Dataplane API
class data_plane_api {
public:
Expand All @@ -37,11 +56,17 @@ class data_plane_api {
virtual ss::future<> start() = 0;
virtual ss::future<> stop() = 0;

/// Reserve the space needed for this write.
///
/// On success returns an opaque staged_write reservation that must later be
/// passed to execute_write(); on failure (e.g. backpressure) returns the
/// pipeline's error code.
virtual ss::future<std::expected<staged_write, std::error_code>>
stage_write(chunked_vector<model::record_batch> batches) = 0;

/// Execute this write using the reservation.
///
/// The reservation must have been obtained from stage_write() on the same
/// data plane instance.
virtual ss::future<
  std::expected<chunked_vector<extent_meta>, std::error_code>>
execute_write(
  model::ntp ntp,
  cluster_epoch min_epoch,
  staged_write reservation,
  model::timeout_clock::time_point deadline)
  = 0;

Expand Down
33 changes: 24 additions & 9 deletions src/v/cloud_topics/data_plane_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@

namespace cloud_topics {

// Concrete reservation payload for this data-plane implementation: wraps the
// write pipeline's prepared_data so it can travel opaquely inside
// staged_write::batch_data between stage_write() and execute_write().
struct staged_pipeline_write : public staged_write::batch_data {
l0::write_pipeline<>::prepared_data data;
};

class impl
: public data_plane_api
, public ssx::sharded_service_container {
Expand Down Expand Up @@ -132,17 +136,28 @@ class impl
co_return;
}

// Reserve pipeline capacity for `batches`.
//
// Wraps the pipeline's prepared_data in an opaque staged_write reservation
// which must later be handed back to execute_write(). On failure the
// pipeline's error code is propagated unchanged.
ss::future<std::expected<staged_write, std::error_code>>
stage_write(chunked_vector<model::record_batch> batches) override {
    auto reservation = co_await _write_pipeline.local().prepare_write(
      std::move(batches));
    if (!reservation.has_value()) {
        co_return std::unexpected(reservation.error());
    }
    auto staged = std::make_unique<staged_pipeline_write>();
    staged->data = std::move(reservation.value());
    co_return staged_write{.staged = std::move(staged)};
}

// Execute a write previously staged via stage_write().
//
// Reclaims ownership of the opaque reservation payload and forwards it to
// the write pipeline. The downcast is only valid for reservations produced
// by this implementation's stage_write(); an empty (moved-from or
// default-constructed) reservation is rejected with an error instead of
// dereferencing a null pointer.
ss::future<std::expected<chunked_vector<extent_meta>, std::error_code>>
execute_write(
  model::ntp ntp,
  cluster_epoch min_epoch,
  staged_write reservation,
  model::timeout_clock::time_point deadline) override {
    if (!reservation.staged) {
        // Guard against a null payload rather than crashing below.
        co_return std::unexpected(
          std::make_error_code(std::errc::invalid_argument));
    }
    // Take back ownership as the concrete type created in stage_write().
    auto staged = std::unique_ptr<staged_pipeline_write>(
      static_cast<staged_pipeline_write*>(reservation.staged.release()));
    co_return co_await _write_pipeline.local().execute_write(
      std::move(ntp), min_epoch, std::move(staged->data), deadline);
}

ss::future<result<chunked_vector<model::record_batch>>> materialize(
Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_topics/frontend/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ redpanda_cc_library(
"//src/v/cloud_storage:types",
"//src/v/cloud_topics:logger",
"//src/v/cloud_topics/level_one/metastore",
"//src/v/cloud_topics/level_zero/common:producer_queue",
"//src/v/cloud_topics/level_zero/stm:ctp_stm",
"//src/v/cloud_topics/level_zero/stm:placeholder",
"//src/v/cluster",
Expand Down
Loading