Skip to content

Commit 6de9b4f

Browse files
committed
attempt to fix inconsistency
1 parent 83940f2 commit 6de9b4f

File tree

11 files changed

+276
-326
lines changed

11 files changed

+276
-326
lines changed

src/iceberg/snapshot.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,19 @@ SnapshotRefType SnapshotRef::type() const noexcept {
5252
retention);
5353
}
5454

55+
std::optional<int64_t> SnapshotRef::max_ref_age_ms() const noexcept {
56+
return std::visit(
57+
[&](const auto& retention) -> std::optional<int64_t> {
58+
using T = std::remove_cvref_t<decltype(retention)>;
59+
if constexpr (std::is_same_v<T, Branch>) {
60+
return retention.max_ref_age_ms;
61+
} else {
62+
return retention.max_ref_age_ms;
63+
}
64+
},
65+
retention);
66+
}
67+
5568
Status SnapshotRef::Validate() const {
5669
if (type() == SnapshotRefType::kBranch) {
5770
const auto& branch = std::get<Branch>(this->retention);

src/iceberg/snapshot.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ struct ICEBERG_EXPORT SnapshotRef {
113113

114114
SnapshotRefType type() const noexcept;
115115

116+
std::optional<int64_t> max_ref_age_ms() const noexcept;
117+
116118
/// \brief Create a branch reference
117119
///
118120
/// \param snapshot_id The snapshot ID for the branch

src/iceberg/table_metadata.cc

Lines changed: 25 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,6 @@ class TableMetadataBuilder::Impl {
617617
Status SetBranchSnapshot(int64_t snapshot_id, const std::string& branch);
618618
Status SetBranchSnapshot(std::shared_ptr<Snapshot> snapshot, const std::string& branch);
619619
Status SetRef(const std::string& name, std::shared_ptr<SnapshotRef> ref);
620-
621620
Status RemoveRef(const std::string& name);
622621
Status RemoveSnapshots(const std::vector<int64_t>& snapshot_ids);
623622
Status RemovePartitionSpecs(const std::vector<int32_t>& spec_ids);
@@ -1339,15 +1338,11 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId(
13391338
}
13401339

13411340
Status TableMetadataBuilder::Impl::RemoveRef(const std::string& name) {
1342-
// Handle main branch specially
13431341
if (name == SnapshotRef::kMainBranch) {
13441342
metadata_.current_snapshot_id = kInvalidSnapshotId;
13451343
}
13461344

1347-
// Remove the ref from the map
1348-
auto it = metadata_.refs.find(name);
1349-
if (it != metadata_.refs.end()) {
1350-
metadata_.refs.erase(it);
1345+
if (metadata_.refs.erase(name) != 0) {
13511346
changes_.push_back(std::make_unique<table::RemoveSnapshotRef>(name));
13521347
}
13531348

@@ -1360,47 +1355,39 @@ Status TableMetadataBuilder::Impl::RemoveSnapshots(
13601355
return {};
13611356
}
13621357

1363-
std::unordered_set<int64_t> snapshot_ids_set(snapshot_ids.begin(), snapshot_ids.end());
1364-
1365-
// Build a map of snapshot IDs for quick lookup
1366-
std::unordered_map<int64_t, std::shared_ptr<Snapshot>> snapshots_by_id;
1367-
for (const auto& snapshot : metadata_.snapshots) {
1368-
if (snapshot) {
1369-
snapshots_by_id[snapshot->snapshot_id] = snapshot;
1370-
}
1371-
}
1372-
1373-
// Filter snapshots to retain
1358+
std::unordered_set<int64_t> ids_to_remove(snapshot_ids.begin(), snapshot_ids.end());
13741359
std::vector<std::shared_ptr<Snapshot>> retained_snapshots;
1375-
retained_snapshots.reserve(metadata_.snapshots.size());
1376-
1377-
for (const auto& snapshot : metadata_.snapshots) {
1378-
if (!snapshot) continue;
1379-
1380-
int64_t snapshot_id = snapshot->snapshot_id;
1381-
if (snapshot_ids_set.contains(snapshot_id)) {
1382-
// Remove from the map
1383-
snapshots_by_id.erase(snapshot_id);
1384-
// Record the removal
1385-
changes_.push_back(
1386-
std::make_unique<table::RemoveSnapshots>(std::vector<int64_t>{snapshot_id}));
1387-
// Note: Statistics and partition statistics removal would be handled here
1388-
// if those features were implemented
1360+
retained_snapshots.reserve(metadata_.snapshots.size() - snapshot_ids.size());
1361+
std::vector<int64_t> snapshot_ids_to_remove;
1362+
snapshot_ids_to_remove.reserve(snapshot_ids.size());
1363+
1364+
for (auto& snapshot : metadata_.snapshots) {
1365+
ICEBERG_CHECK(snapshot != nullptr, "Encountered null snapshot in metadata");
1366+
const int64_t snapshot_id = snapshot->snapshot_id;
1367+
if (ids_to_remove.contains(snapshot_id)) {
1368+
snapshots_by_id_.erase(snapshot_id);
1369+
snapshot_ids_to_remove.push_back(snapshot_id);
1370+
// FIXME: implement statistics removal and uncomment below
1371+
// ICEBERG_RETURN_UNEXPECTED(RemoveStatistics(snapshot_id));
1372+
// ICEBERG_RETURN_UNEXPECTED(RemovePartitionStatistics(snapshot_id));
13891373
} else {
1390-
retained_snapshots.push_back(snapshot);
1374+
retained_snapshots.push_back(std::move(snapshot));
13911375
}
13921376
}
13931377

1378+
if (!snapshot_ids_to_remove.empty()) {
1379+
changes_.push_back(std::make_unique<table::RemoveSnapshots>(snapshot_ids_to_remove));
1380+
}
1381+
13941382
metadata_.snapshots = std::move(retained_snapshots);
13951383

13961384
// Remove any refs that are no longer valid (dangling refs)
13971385
std::vector<std::string> dangling_refs;
13981386
for (const auto& [ref_name, ref] : metadata_.refs) {
1399-
if (!snapshots_by_id.contains(ref->snapshot_id)) {
1387+
if (!snapshots_by_id_.contains(ref->snapshot_id)) {
14001388
dangling_refs.push_back(ref_name);
14011389
}
14021390
}
1403-
14041391
for (const auto& ref_name : dangling_refs) {
14051392
ICEBERG_RETURN_UNEXPECTED(RemoveRef(ref_name));
14061393
}
@@ -1414,25 +1401,15 @@ Status TableMetadataBuilder::Impl::RemovePartitionSpecs(
14141401
return {};
14151402
}
14161403

1417-
std::unordered_set<int32_t> spec_ids_set(spec_ids.begin(), spec_ids.end());
1418-
1419-
// Validate that we're not removing the default spec
1420-
ICEBERG_PRECHECK(!spec_ids_set.contains(metadata_.default_spec_id),
1404+
std::unordered_set<int32_t> spec_ids_to_remove(spec_ids.begin(), spec_ids.end());
1405+
ICEBERG_PRECHECK(!spec_ids_to_remove.contains(metadata_.default_spec_id),
14211406
"Cannot remove the default partition spec");
14221407

1423-
// Filter partition specs to retain
14241408
metadata_.partition_specs =
14251409
metadata_.partition_specs | std::views::filter([&](const auto& spec) {
1426-
return !spec_ids_set.contains(spec->spec_id());
1410+
return !spec_ids_to_remove.contains(spec->spec_id());
14271411
}) |
1428-
std::ranges::to<std::vector<std::shared_ptr<iceberg::PartitionSpec>>>();
1429-
1430-
// Update the specs_by_id_ index
1431-
for (int32_t spec_id : spec_ids) {
1432-
specs_by_id_.erase(spec_id);
1433-
}
1434-
1435-
// Record the change
1412+
std::ranges::to<std::vector<std::shared_ptr<PartitionSpec>>>();
14361413
changes_.push_back(std::make_unique<table::RemovePartitionSpecs>(spec_ids));
14371414

14381415
return {};

src/iceberg/test/expire_snapshots_test.cc

Lines changed: 21 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -24,71 +24,43 @@
2424

2525
namespace iceberg {
2626

27-
class ExpireSnapshotsTest : public UpdateTestBase {
28-
protected:
29-
};
27+
class ExpireSnapshotsTest : public UpdateTestBase {};
3028

31-
TEST_F(ExpireSnapshotsTest, Empty) {
29+
TEST_F(ExpireSnapshotsTest, DefaultExpireByAge) {
3230
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
3331
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
34-
EXPECT_THAT(result.snapshot_ids_to_remove.size(), 1);
35-
EXPECT_THAT(result.snapshot_ids_to_remove.at(0), 3051729675574597004);
36-
EXPECT_THAT(result.ref_to_remove.empty(), true);
37-
EXPECT_THAT(result.schema_ids_to_remove.empty(), true);
38-
EXPECT_THAT(result.partition_spec_ids_to_remove.empty(), true);
32+
EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
33+
EXPECT_EQ(result.snapshot_ids_to_remove.at(0), 3051729675574597004);
3934
}
4035

41-
TEST_F(ExpireSnapshotsTest, Keep2) {
36+
TEST_F(ExpireSnapshotsTest, KeepAll) {
4237
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
4338
update->RetainLast(2);
4439
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
45-
EXPECT_THAT(result.snapshot_ids_to_remove.empty(), true);
46-
EXPECT_THAT(result.ref_to_remove.empty(), true);
47-
EXPECT_THAT(result.schema_ids_to_remove.empty(), true);
48-
EXPECT_THAT(result.partition_spec_ids_to_remove.empty(), true);
40+
EXPECT_TRUE(result.snapshot_ids_to_remove.empty());
41+
EXPECT_TRUE(result.refs_to_remove.empty());
4942
}
5043

5144
TEST_F(ExpireSnapshotsTest, ExpireById) {
5245
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
5346
update->ExpireSnapshotId(3051729675574597004);
54-
update->RetainLast(2);
5547
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
56-
EXPECT_THAT(result.snapshot_ids_to_remove.size(), 1);
57-
EXPECT_THAT(result.snapshot_ids_to_remove.at(0), 3051729675574597004);
58-
EXPECT_THAT(result.ref_to_remove.empty(), true);
59-
EXPECT_THAT(result.schema_ids_to_remove.empty(), true);
60-
EXPECT_THAT(result.partition_spec_ids_to_remove.empty(), true);
61-
}
62-
63-
TEST_F(ExpireSnapshotsTest, ExpireByIdNotExist) {
64-
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
65-
update->ExpireSnapshotId(3055729675574597003);
66-
update->RetainLast(2);
67-
auto result = update->Apply();
68-
EXPECT_THAT(result.has_value(), false);
69-
EXPECT_THAT(result.error().message.contains("Snapshot:3055729675574597003 not exist"),
70-
true);
48+
EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
49+
EXPECT_EQ(result.snapshot_ids_to_remove.at(0), 3051729675574597004);
7150
}
7251

73-
TEST_F(ExpireSnapshotsTest, ExpireOlderThan1) {
74-
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
75-
update->ExpireOlderThan(1515100955770 - 1);
76-
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
77-
EXPECT_THAT(result.snapshot_ids_to_remove.empty(), true);
78-
EXPECT_THAT(result.ref_to_remove.empty(), true);
79-
EXPECT_THAT(result.schema_ids_to_remove.empty(), true);
80-
EXPECT_THAT(result.partition_spec_ids_to_remove.empty(), true);
81-
}
82-
83-
TEST_F(ExpireSnapshotsTest, ExpireOlderThan2) {
84-
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
85-
update->ExpireOlderThan(1515100955770 + 1);
86-
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
87-
EXPECT_THAT(result.snapshot_ids_to_remove.size(), 1);
88-
EXPECT_THAT(result.snapshot_ids_to_remove.at(0), 3051729675574597004);
89-
EXPECT_THAT(result.ref_to_remove.empty(), true);
90-
EXPECT_THAT(result.schema_ids_to_remove.empty(), true);
91-
EXPECT_THAT(result.partition_spec_ids_to_remove.empty(), true);
52+
TEST_F(ExpireSnapshotsTest, ExpireOlderThan) {
53+
struct TestCase {
54+
int64_t expire_older_than;
55+
size_t expected_num_expired;
56+
};
57+
std::vector<TestCase> test_cases = {{1515100955770 - 1, 0}, {1515100955770 + 1, 1}};
58+
for (const auto& test_case : test_cases) {
59+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
60+
update->ExpireOlderThan(test_case.expire_older_than);
61+
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
62+
EXPECT_EQ(result.snapshot_ids_to_remove.size(), test_case.expected_num_expired);
63+
}
9264
}
9365

9466
} // namespace iceberg

src/iceberg/test/table_metadata_builder_test.cc

Lines changed: 19 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1133,28 +1133,19 @@ TEST(TableMetadataBuilderTest, RemoveSchemasAfterSchemaChange) {
11331133
ASSERT_THAT(builder->Build(), HasErrorMessage("Cannot remove current schema: 1"));
11341134
}
11351135

1136-
// Test RemoveSnapshotRef
1137-
TEST(TableMetadataBuilderTest, RemoveSnapshotRefBasic) {
1136+
TEST(TableMetadataBuilderTest, RemoveSnapshotRef) {
11381137
auto base = CreateBaseMetadata();
11391138
auto builder = TableMetadataBuilder::BuildFrom(base.get());
11401139

11411140
// Add multiple snapshots
1142-
auto snapshot1 = std::make_shared<Snapshot>();
1143-
snapshot1->snapshot_id = 1;
1144-
builder->AddSnapshot(snapshot1);
1145-
auto snapshot2 = std::make_shared<Snapshot>();
1146-
snapshot2->snapshot_id = 2;
1147-
builder->AddSnapshot(snapshot2);
1141+
builder->AddSnapshot(std::make_shared<Snapshot>(Snapshot{.snapshot_id = 1}));
1142+
builder->AddSnapshot(std::make_shared<Snapshot>(Snapshot{.snapshot_id = 2}));
11481143

11491144
// Add multiple refs
1150-
auto ref1 = std::make_shared<SnapshotRef>();
1151-
ref1->snapshot_id = 1;
1152-
ref1->retention = SnapshotRef::Branch{};
1153-
builder->SetRef("ref1", ref1);
1154-
auto ref2 = std::make_shared<SnapshotRef>();
1155-
ref2->snapshot_id = 2;
1156-
ref2->retention = SnapshotRef::Tag{};
1157-
builder->SetRef("ref2", ref2);
1145+
ICEBERG_UNWRAP_OR_FAIL(auto ref1, SnapshotRef::MakeBranch(1));
1146+
ICEBERG_UNWRAP_OR_FAIL(auto ref2, SnapshotRef::MakeBranch(2));
1147+
builder->SetRef("ref1", std::move(ref1));
1148+
builder->SetRef("ref2", std::move(ref2));
11581149

11591150
ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
11601151
ASSERT_EQ(metadata->refs.size(), 2);
@@ -1167,68 +1158,50 @@ TEST(TableMetadataBuilderTest, RemoveSnapshotRefBasic) {
11671158
EXPECT_TRUE(metadata->refs.contains("ref1"));
11681159
}
11691160

1170-
// Test RemoveSnapshot
1171-
TEST(TableMetadataBuilderTest, RemoveSnapshotBasic) {
1161+
TEST(TableMetadataBuilderTest, RemoveSnapshot) {
11721162
auto base = CreateBaseMetadata();
11731163
auto builder = TableMetadataBuilder::BuildFrom(base.get());
11741164

11751165
// Add multiple snapshots
1176-
auto snapshot1 = std::make_shared<Snapshot>();
1177-
snapshot1->snapshot_id = 1;
1178-
builder->AddSnapshot(snapshot1);
1179-
auto snapshot2 = std::make_shared<Snapshot>();
1180-
snapshot2->snapshot_id = 2;
1181-
builder->AddSnapshot(snapshot2);
1166+
builder->AddSnapshot(std::make_shared<Snapshot>(Snapshot{.snapshot_id = 1}));
1167+
builder->AddSnapshot(std::make_shared<Snapshot>(Snapshot{.snapshot_id = 2}));
11821168

11831169
ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
11841170
ASSERT_EQ(metadata->snapshots.size(), 2);
11851171

11861172
// Remove one snapshot
11871173
builder = TableMetadataBuilder::BuildFrom(metadata.get());
1188-
std::vector<int64_t> to_remove{snapshot2->snapshot_id};
1174+
std::vector<int64_t> to_remove{2};
11891175
builder->RemoveSnapshots(to_remove);
11901176
ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
11911177
ASSERT_EQ(metadata->snapshots.size(), 1);
1192-
for (const auto& s : metadata->snapshots) {
1193-
std::cout << s->snapshot_id << std::endl;
1194-
}
1195-
EXPECT_TRUE(
1196-
std::ranges::find_if(metadata->snapshots, [&](const std::shared_ptr<Snapshot>& s) {
1197-
return s->snapshot_id == snapshot1->snapshot_id;
1198-
}) != metadata->snapshots.end());
1178+
ASSERT_THAT(metadata->SnapshotById(2), IsError(ErrorKind::kNotFound));
11991179
}
12001180

12011181
TEST(TableMetadataBuilderTest, RemoveSnapshotNotExist) {
12021182
auto base = CreateBaseMetadata();
12031183
auto builder = TableMetadataBuilder::BuildFrom(base.get());
12041184

12051185
// Add multiple snapshots
1206-
auto snapshot1 = std::make_shared<Snapshot>();
1207-
snapshot1->snapshot_id = 1;
1208-
builder->AddSnapshot(snapshot1);
1209-
auto snapshot2 = std::make_shared<Snapshot>();
1210-
snapshot2->snapshot_id = 2;
1211-
builder->AddSnapshot(snapshot2);
1186+
builder->AddSnapshot(std::make_shared<Snapshot>(Snapshot{.snapshot_id = 1}));
1187+
builder->AddSnapshot(std::make_shared<Snapshot>(Snapshot{.snapshot_id = 2}));
12121188

12131189
ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
12141190
ASSERT_EQ(metadata->snapshots.size(), 2);
12151191

12161192
// Remove one snapshot
12171193
builder = TableMetadataBuilder::BuildFrom(metadata.get());
1218-
std::vector<int64_t> to_remove{3};
1219-
builder->RemoveSnapshots(to_remove);
1194+
builder->RemoveSnapshots(std::vector<int64_t>{3});
12201195
ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
12211196
ASSERT_EQ(metadata->snapshots.size(), 2);
12221197

12231198
builder = TableMetadataBuilder::BuildFrom(metadata.get());
1224-
to_remove = {1, 2};
1225-
builder->RemoveSnapshots(to_remove);
1199+
builder->RemoveSnapshots(std::vector<int64_t>{1, 2});
12261200
ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
12271201
ASSERT_EQ(metadata->snapshots.size(), 0);
12281202
}
12291203

1230-
// Test RemovePartitionSpec
1231-
TEST(TableMetadataBuilderTest, RemovePartitionSpecBasic) {
1204+
TEST(TableMetadataBuilderTest, RemovePartitionSpec) {
12321205
// Add multiple specs
12331206
PartitionField field1(2, 4, "field1", Transform::Identity());
12341207
PartitionField field2(3, 5, "field2", Transform::Identity());
@@ -1248,10 +1221,7 @@ TEST(TableMetadataBuilderTest, RemovePartitionSpecBasic) {
12481221
builder->RemovePartitionSpecs({2});
12491222
ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
12501223
ASSERT_EQ(metadata->partition_specs.size(), 1);
1251-
EXPECT_TRUE(std::ranges::find_if(metadata->partition_specs,
1252-
[&](const std::shared_ptr<PartitionSpec>& s) {
1253-
return s->spec_id() == 1;
1254-
}) != metadata->partition_specs.end());
1224+
ASSERT_THAT(metadata->PartitionSpecById(2), IsError(ErrorKind::kNotFound));
12551225
}
12561226

12571227
TEST(TableMetadataBuilderTest, RemovePartitionSpecNotExist) {
@@ -1269,7 +1239,7 @@ TEST(TableMetadataBuilderTest, RemovePartitionSpecNotExist) {
12691239
ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
12701240
ASSERT_EQ(metadata->partition_specs.size(), 2);
12711241

1272-
// Remove one not exist spec
1242+
// Remove one non-existing spec
12731243
builder = TableMetadataBuilder::BuildFrom(metadata.get());
12741244
builder->RemovePartitionSpecs({3});
12751245
ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());

src/iceberg/transaction.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,11 @@ Status Transaction::Apply(PendingUpdate& update) {
170170
if (!result.snapshot_ids_to_remove.empty()) {
171171
metadata_builder_->RemoveSnapshots(std::move(result.snapshot_ids_to_remove));
172172
}
173+
if (!result.refs_to_remove.empty()) {
174+
for (const auto& ref_name : result.refs_to_remove) {
175+
metadata_builder_->RemoveRef(ref_name);
176+
}
177+
}
173178
if (!result.partition_spec_ids_to_remove.empty()) {
174179
metadata_builder_->RemovePartitionSpecs(
175180
std::move(result.partition_spec_ids_to_remove));

0 commit comments

Comments
 (0)