Skip to content

Commit 0255b8b

Browse files
authored
Merge pull request #29112 from joe-redpanda/dc_fix_err_log
invalid lso fix
2 parents 3b3976e + faf1868 commit 0255b8b

File tree

9 files changed

+12
-14
lines changed

9 files changed

+12
-14
lines changed

src/v/cluster/rm_stm.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1339,7 +1339,7 @@ model::offset rm_stm::last_stable_offset() {
13391339
}
13401340

13411341
auto synced_leader = _raft->is_leader() && _raft->term() == _insync_term;
1342-
model::offset lso{-1};
1342+
model::offset lso{model::invalid_lso};
13431343
auto last_visible_index = _raft->last_visible_index();
13441344
auto next_to_apply = model::next_offset(last_applied);
13451345
if (first_tx_start <= last_visible_index) {
@@ -1464,7 +1464,7 @@ ss::future<bool> rm_stm::sync(model::timeout_clock::duration timeout) {
14641464
auto ready = co_await raft::persisted_stm<>::sync(timeout);
14651465
if (ready) {
14661466
if (current_insync_term != _insync_term) {
1467-
_last_known_lso = model::offset{-1};
1467+
_last_known_lso = model::invalid_lso;
14681468
vlog(
14691469
_ctx_log.trace,
14701470
"garbage collecting requests from terms < {}",

src/v/cluster/rm_stm.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ class rm_stm final : public raft::persisted_stm<> {
435435
// Highest producer ID applied to this stm.
436436
model::producer_id _highest_producer_id;
437437
// for monotonicity of computed LSO.
438-
model::offset _last_known_lso{-1};
438+
model::offset _last_known_lso{model::invalid_lso};
439439
/**
440440
* LSO lock protects the LSO from being exposed before transaction begin
441441
* batch is applied.

src/v/cluster_link/model/types.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1109,7 +1109,7 @@ struct shadow_topic_partition_leader_report
11091109
::model::partition_id partition;
11101110
kafka::offset source_partition_start_offset{-1};
11111111
kafka::offset source_partition_high_watermark{-1};
1112-
kafka::offset source_partition_last_stable_offset{-1};
1112+
kafka::offset source_partition_last_stable_offset{::model::invalid_lso};
11131113
std::chrono::milliseconds last_update_time{0};
11141114
kafka::offset shadow_partition_high_watermark{-1};
11151115

src/v/cluster_link/replication/types.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ namespace cluster_link::replication {
2222
struct partition_offsets_report {
2323
kafka::offset source_start_offset{-1};
2424
kafka::offset source_hwm{-1};
25-
kafka::offset source_lso{-1};
25+
kafka::offset source_lso{::model::invalid_lso};
2626
ss::lowres_clock::time_point update_time{};
2727
kafka::offset shadow_hwm{-1};
2828

src/v/kafka/client/direct_consumer/api_types.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ struct fetched_partition_data {
6464
kafka::leader_epoch leader_epoch{-1};
6565
kafka::offset start_offset{-1};
6666
kafka::offset high_watermark{-1};
67-
kafka::offset last_stable_offset{-1};
67+
kafka::offset last_stable_offset{model::invalid_lso};
6868

6969
// the following have reasonable defaults
7070
chunked_vector<model::record_batch> data{};
@@ -92,7 +92,7 @@ struct source_partition_offsets {
9292
// The source partition's log high watermark
9393
kafka::offset high_watermark{-1};
9494
// The source partition's log last stable offset
95-
kafka::offset last_stable_offset{-1};
95+
kafka::offset last_stable_offset{model::offset_cast(model::invalid_lso)};
9696
// The timestamp that the fetch response was received by the client
9797
ss::lowres_clock::time_point last_offset_update_timestamp{};
9898

src/v/kafka/client/fetcher.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ make_fetch_response(const model::topic_partition& tp, std::exception_ptr ex) {
7272
.partition_index{tp.partition},
7373
.error_code = error,
7474
.high_watermark{model::offset{-1}},
75-
.last_stable_offset{model::offset{-1}},
75+
.last_stable_offset{model::invalid_lso},
7676
.log_start_offset{model::offset{-1}},
7777
.aborted_transactions{},
7878
.records{}};

src/v/kafka/server/fetch_session.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ struct fetch_session_partition {
4242
, start_offset(p.log_start_offset)
4343
, fetch_offset(p.fetch_offset)
4444
, high_watermark(model::offset(-1))
45-
, last_stable_offset(model::offset(-1))
45+
, last_stable_offset(model::invalid_lso)
4646
, current_leader_epoch(p.current_leader_epoch) {}
4747
};
4848
/**

src/v/kafka/server/handlers/fetch.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,13 +275,13 @@ struct read_result {
275275
explicit read_result(error_code e)
276276
: start_offset(-1)
277277
, high_watermark(-1)
278-
, last_stable_offset(-1)
278+
, last_stable_offset(model::invalid_lso)
279279
, error(e) {}
280280

281281
read_result(error_code e, leader_id_and_epoch leader)
282282
: start_offset(-1)
283283
, high_watermark(-1)
284-
, last_stable_offset(-1)
284+
, last_stable_offset(model::invalid_lso)
285285
, current_leader(std::move(leader))
286286
, error(e) {}
287287

src/v/model/fundamental.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -306,9 +306,7 @@ inline constexpr model::offset prev_offset(model::offset o) {
306306
return o - model::offset{1};
307307
}
308308

309-
// An invalid offset indicating that actual LSO is not yet ready to be returned.
310-
// Follows the policy that LSO is the next offset of the decided offset.
311-
inline constexpr model::offset invalid_lso{next_offset(model::offset::min())};
309+
inline constexpr model::offset invalid_lso{-1};
312310

313311
struct topic_partition_view {
314312
topic_partition_view(model::topic_view tp, model::partition_id p)

0 commit comments

Comments
 (0)