@@ -32,6 +32,7 @@ level_zero_log_reader_impl::level_zero_log_reader_impl(
3232 ss::lw_shared_ptr<cluster::partition> ctp,
3333 data_plane_api* ct_api)
3434 : _config(cfg)
35+ , _next_offset(_config.start_offset)
3536 , _ctp(std::move(ctp))
3637 , _ct_api(ct_api)
3738 , _log(cd_log, fmt::format(" [{}/{}]" , fmt::ptr(this ), _ctp->ntp ())) {
@@ -52,10 +53,8 @@ level_zero_log_reader_impl::do_load_slice(
5253 // the 'empty' state. It doesn't make any difference if the reader is in
5354 // the 'materialized' state. If we're in 'ready' state we risk to go out
5455 // of sync with cached metadata so it's safer to hydrate.
55- if (cache_enabled ()) {
56- if (auto cached = maybe_load_slices_from_cache ()) {
57- co_return std::move (cached.value ());
58- }
56+ if (auto cached = maybe_read_batches_from_cache (); !cached.empty ()) {
57+ co_return cached;
5958 }
6059
6160 chunked_circular_buffer<model::record_batch> res;
@@ -88,63 +87,54 @@ level_zero_log_reader_impl::do_load_slice(
8887 co_return res;
8988}
9089
91- std::optional< chunked_circular_buffer<model::record_batch> >
92- level_zero_log_reader_impl::maybe_load_slices_from_cache () {
90+ chunked_circular_buffer<model::record_batch>
91+ level_zero_log_reader_impl::maybe_read_batches_from_cache () {
9392 chunked_circular_buffer<model::record_batch> ret;
94- auto current = _config.start_offset ;
95- while (current <= _config.max_offset ) {
93+ if (!cache_enabled ()) {
94+ return ret;
95+ }
96+
97+ /*
98+ * Fetch batches from the cache starting at `_next_offset` until we hit a
99+ * gap or a control batch and must then fetch the data from object storage.
100+ */
101+ while (_next_offset <= _config.max_offset ) {
96102 auto batch = _ct_api->cache_get (
97- _ctp->ntp (), kafka::offset_cast (current ));
103+ _ctp->ntp (), kafka::offset_cast (_next_offset ));
98104 if (!batch.has_value ()) {
99- // We hit a gap in the cache and have to download objects
100- // from S3.
101- //
102- // NOTE: this can also happen when transactions are used and
103- // we would encounter reading a control batch from the local log.
104105 break ;
105106 }
107+
106108 vlog (
107109 _log.trace ,
108110 " Loaded batch from cache for {}: {} @ term {}" ,
109- current ,
111+ _next_offset ,
110112 batch.value ().base_offset (),
111113 batch.value ().term ());
112- vassert (
113- batch.value ().term () > model::term_id{-1 },
114- " Batch without term in the cache: {}" ,
115- batch.value ().header ());
114+
116115 auto batch_size = batch.value ().size_bytes ();
117116 if (is_over_limit (batch_size)) {
118117 break ;
119118 }
120- vassert (
121- batch->base_offset () <= kafka::offset_cast (current)
122- && kafka::offset_cast (current) <= batch->last_offset (),
123- " Unexpected batch for {}, got range: [{},{}] for offset {}" ,
124- _ctp->ntp (),
125- batch->base_offset (),
126- batch->last_offset (),
127- current);
119+
128120 ret.push_back (std::move (batch.value ()));
129- _config. bytes_consumed += batch_size;
130- current = model::offset_cast (
121+ _bytes_consumed += batch_size;
122+ _next_offset = model::offset_cast (
131123 model::next_offset (ret.back ().last_offset ()));
132124 }
133- _config. start_offset = current;
134- if (_config. start_offset > _config.max_offset ) {
125+
126+ if (_next_offset > _config.max_offset ) {
135127 vlog (
136128 _log.debug ,
137129 " reached end of stream, start offset: {}, max offset: {}, "
138- " current : {}" ,
130+ " next offset : {}" ,
139131 _config.start_offset ,
140132 _config.max_offset ,
141- current );
133+ _next_offset );
142134 _current = state::end_of_stream_state;
143135 }
144- if (!ret.empty ()) {
145- return ret;
146- }
147- return std::nullopt ;
136+
137+ return ret;
148138}
149139
150140storage::local_log_reader_config level_zero_log_reader_impl::ctp_read_config () {
@@ -155,7 +145,7 @@ storage::local_log_reader_config level_zero_log_reader_impl::ctp_read_config() {
155145 */
156146 auto ot_state = _ctp->get_offset_translator_state ();
157147 auto start_offset = ot_state->to_log_offset (
158- kafka::offset_cast (_config. start_offset ));
148+ kafka::offset_cast (_next_offset ));
159149 auto max_offset = ot_state->to_log_offset (
160150 kafka::offset_cast (_config.max_offset ));
161151
@@ -324,7 +314,7 @@ ss::future<> level_zero_log_reader_impl::materialize_batches(
324314 hydrated_batch_size);
325315 break ;
326316 }
327- _config. bytes_consumed += hydrated_batch_size;
317+ _bytes_consumed += hydrated_batch_size;
328318 if (
329319 auto * meta = std::get_if<cloud_topics::extent_meta>(
330320 &unhydrated_it->data )) {
@@ -453,7 +443,7 @@ void level_zero_log_reader_impl::consume_materialized_batches(
453443 _hydrated.size (),
454444 _unhydrated.size ());
455445 *dest = std::exchange (_hydrated, {});
456- _config. start_offset = model::offset_cast (
446+ _next_offset = model::offset_cast (
457447 model::next_offset (dest->back ().last_offset ()));
458448 _current = _unhydrated.empty () ? state::empty_state : state::ready_state;
459449}
@@ -467,7 +457,7 @@ bool level_zero_log_reader_impl::is_end_of_stream() const {
467457}
468458
469459bool level_zero_log_reader_impl::is_over_limit (size_t size) const {
470- return (_config.strict_max_bytes || _config. bytes_consumed > 0 )
471- && (_config. bytes_consumed + size) > _config.max_bytes ;
460+ return (_config.strict_max_bytes || _bytes_consumed > 0 )
461+ && (_bytes_consumed + size) > _config.max_bytes ;
472462}
473463} // namespace cloud_topics
0 commit comments