Skip to content

Commit d180768

Browse files
authored
feat: add UpdateSnapshotReference (#512)
1 parent 0862d64 commit d180768

File tree

12 files changed

+457
-6
lines changed

12 files changed

+457
-6
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ set(ICEBERG_SOURCES
9494
update/update_partition_spec.cc
9595
update/update_properties.cc
9696
update/update_schema.cc
97+
update/update_snapshot_reference.cc
9798
update/update_sort_order.cc
9899
update/update_statistics.cc
99100
util/bucket_util.cc

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ iceberg_sources = files(
112112
'update/update_partition_spec.cc',
113113
'update/update_properties.cc',
114114
'update/update_schema.cc',
115+
'update/update_snapshot_reference.cc',
115116
'update/update_sort_order.cc',
116117
'update/update_statistics.cc',
117118
'util/bucket_util.cc',

src/iceberg/parquet/parquet_data_util.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ Result<std::shared_ptr<::arrow::Array>> ProjectStructArray(
148148
return output_array;
149149
}
150150

151-
/// Templated implementation for projecting list arrays.
151+
/// \brief Templated implementation for projecting list arrays.
152152
/// Works with both ListArray/ListType (32-bit offsets) and
153153
/// LargeListArray/LargeListType (64-bit offsets).
154154
template <typename ArrowListArrayType, typename ArrowListType>

src/iceberg/transaction.cc

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include "iceberg/update/update_partition_spec.h"
4242
#include "iceberg/update/update_properties.h"
4343
#include "iceberg/update/update_schema.h"
44+
#include "iceberg/update/update_snapshot_reference.h"
4445
#include "iceberg/update/update_sort_order.h"
4546
#include "iceberg/update/update_statistics.h"
4647
#include "iceberg/util/checked_cast.h"
@@ -159,11 +160,6 @@ Status Transaction::Apply(PendingUpdate& update) {
159160
metadata_builder_->SetCurrentSchema(std::move(result.schema),
160161
result.new_last_column_id);
161162
} break;
162-
case PendingUpdate::Kind::kUpdateSortOrder: {
163-
auto& update_sort_order = internal::checked_cast<UpdateSortOrder&>(update);
164-
ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update_sort_order.Apply());
165-
metadata_builder_->SetDefaultSortOrder(std::move(sort_order));
166-
} break;
167163
case PendingUpdate::Kind::kUpdateSnapshot: {
168164
const auto& base = metadata_builder_->current();
169165

@@ -200,6 +196,21 @@ Status Transaction::Apply(PendingUpdate& update) {
200196
metadata_builder_->AssignUUID();
201197
}
202198
} break;
199+
case PendingUpdate::Kind::kUpdateSnapshotReference: {
200+
auto& update_ref = internal::checked_cast<UpdateSnapshotReference&>(update);
201+
ICEBERG_ASSIGN_OR_RAISE(auto result, update_ref.Apply());
202+
for (const auto& name : result.to_remove) {
203+
metadata_builder_->RemoveRef(name);
204+
}
205+
for (auto&& [name, ref] : result.to_set) {
206+
metadata_builder_->SetRef(std::move(name), std::move(ref));
207+
}
208+
} break;
209+
case PendingUpdate::Kind::kUpdateSortOrder: {
210+
auto& update_sort_order = internal::checked_cast<UpdateSortOrder&>(update);
211+
ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update_sort_order.Apply());
212+
metadata_builder_->SetDefaultSortOrder(std::move(sort_order));
213+
} break;
203214
case PendingUpdate::Kind::kUpdateStatistics: {
204215
auto& update_statistics = internal::checked_cast<UpdateStatistics&>(update);
205216
ICEBERG_ASSIGN_OR_RAISE(auto result, update_statistics.Apply());
@@ -335,4 +346,12 @@ Result<std::shared_ptr<UpdateStatistics>> Transaction::NewUpdateStatistics() {
335346
return update_statistics;
336347
}
337348

349+
/// \brief Create an UpdateSnapshotReference bound to this transaction and register it
/// as a pending update.
///
/// \return The newly created update, or the error produced by
/// UpdateSnapshotReference::Make() or AddUpdate().
Result<std::shared_ptr<UpdateSnapshotReference>>
Transaction::NewUpdateSnapshotReference() {
  ICEBERG_ASSIGN_OR_RAISE(auto pending_ref_update,
                          UpdateSnapshotReference::Make(shared_from_this()));
  // Registration must succeed before the update is handed back to the caller.
  ICEBERG_RETURN_UNEXPECTED(AddUpdate(pending_ref_update));
  return pending_ref_update;
}
356+
338357
} // namespace iceberg

src/iceberg/transaction.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
9797
/// \brief Create a new FastAppend to append data files and commit the changes.
9898
Result<std::shared_ptr<FastAppend>> NewFastAppend();
9999

100+
/// \brief Create a new UpdateSnapshotReference to update snapshot references (branches
101+
/// and tags) and commit the changes.
102+
Result<std::shared_ptr<UpdateSnapshotReference>> NewUpdateSnapshotReference();
103+
100104
private:
101105
Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
102106
std::unique_ptr<TableMetadataBuilder> metadata_builder);

src/iceberg/type_fwd.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ class UpdateLocation;
198198
class UpdatePartitionSpec;
199199
class UpdateProperties;
200200
class UpdateSchema;
201+
class UpdateSnapshotReference;
201202
class UpdateSortOrder;
202203
class UpdateStatistics;
203204

src/iceberg/update/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ install_headers(
2525
'update_location.h',
2626
'update_partition_spec.h',
2727
'update_schema.h',
28+
'update_snapshot_reference.h',
2829
'update_sort_order.h',
2930
'update_properties.h',
3031
'update_statistics.h',

src/iceberg/update/pending_update.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
4949
kUpdateProperties,
5050
kUpdateSchema,
5151
kUpdateSnapshot,
52+
kUpdateSnapshotReference,
5253
kUpdateSortOrder,
5354
kUpdateStatistics,
5455
};
Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/update/update_snapshot_reference.h"
21+
22+
#include <memory>
23+
#include <optional>
24+
#include <string>
25+
#include <unordered_map>
26+
27+
#include "iceberg/snapshot.h"
28+
#include "iceberg/table_metadata.h"
29+
#include "iceberg/transaction.h"
30+
#include "iceberg/util/macros.h"
31+
#include "iceberg/util/snapshot_util_internal.h"
32+
33+
namespace iceberg {
34+
35+
/// \brief Factory for UpdateSnapshotReference.
///
/// \param transaction Transaction the update belongs to; must not be null.
/// \return The new update, or the ICEBERG_PRECHECK failure when `transaction` is null.
Result<std::shared_ptr<UpdateSnapshotReference>> UpdateSnapshotReference::Make(
    std::shared_ptr<Transaction> transaction) {
  ICEBERG_PRECHECK(transaction != nullptr,
                   "Cannot create UpdateSnapshotReference without a transaction");
  // Constructed with `new` rather than std::make_shared, as in the original
  // (presumably because the constructor is not publicly accessible - confirm in header).
  std::shared_ptr<UpdateSnapshotReference> instance(
      new UpdateSnapshotReference(std::move(transaction)));
  return instance;
}
42+
43+
/// \brief Construct the update and seed the working set of refs with a copy of the
/// refs found in base() at construction time.
UpdateSnapshotReference::UpdateSnapshotReference(std::shared_ptr<Transaction> transaction)
    : PendingUpdate(std::move(transaction)),
      // Staged edits are applied to this copy; Apply() later diffs it against base().refs.
      updated_refs_(base().refs) {}

/// \brief Defaulted destructor, defined out of line.
UpdateSnapshotReference::~UpdateSnapshotReference() = default;
47+
48+
/// \brief Stage the creation of a branch called `name` that points at `snapshot_id`.
///
/// The builder checks fail when `name` is empty, when SnapshotRef::MakeBranch() fails,
/// or when a ref with the same name is already present in the staged refs.
///
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::CreateBranch(const std::string& name,
                                                               int64_t snapshot_id) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");
  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto new_branch, SnapshotRef::MakeBranch(snapshot_id));
  // emplace() leaves an existing entry untouched and reports the collision via `.second`.
  const bool was_added = updated_refs_.emplace(name, std::move(new_branch)).second;
  ICEBERG_BUILDER_CHECK(was_added, "Ref '{}' already exists", name);
  return *this;
}
56+
57+
/// \brief Stage the creation of a tag called `name` that points at `snapshot_id`.
///
/// The builder checks fail when `name` is empty, when SnapshotRef::MakeTag() fails,
/// or when a ref with the same name is already present in the staged refs.
///
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::CreateTag(const std::string& name,
                                                            int64_t snapshot_id) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty");
  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto new_tag, SnapshotRef::MakeTag(snapshot_id));
  // emplace() leaves an existing entry untouched and reports the collision via `.second`.
  const bool was_added = updated_refs_.emplace(name, std::move(new_tag)).second;
  ICEBERG_BUILDER_CHECK(was_added, "Ref '{}' already exists", name);
  return *this;
}
65+
66+
/// \brief Stage the removal of the branch called `name`.
///
/// The builder checks fail when `name` is empty, names the main branch, is not present
/// in the staged refs, or refers to a tag.
///
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::RemoveBranch(const std::string& name) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");
  ICEBERG_BUILDER_CHECK(name != SnapshotRef::kMainBranch, "Cannot remove main branch");

  auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Branch does not exist: {}", name);
  ICEBERG_BUILDER_CHECK(entry->second->type() == SnapshotRefType::kBranch,
                        "Ref '{}' is a tag not a branch", name);

  updated_refs_.erase(entry);
  return *this;
}
76+
77+
/// \brief Stage the removal of the tag called `name`.
///
/// The builder checks fail when `name` is empty, is not present in the staged refs,
/// or refers to a branch.
///
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::RemoveTag(const std::string& name) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty");

  auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Tag does not exist: {}", name);
  ICEBERG_BUILDER_CHECK(entry->second->type() == SnapshotRefType::kTag,
                        "Ref '{}' is a branch not a tag", name);

  updated_refs_.erase(entry);
  return *this;
}
86+
87+
/// \brief Stage renaming the branch `name` to `new_name`.
///
/// The builder checks fail when either name is empty, when `name` is the main branch,
/// when `name` is not present in the staged refs, when it refers to a tag, or when a
/// ref called `new_name` already exists (this includes `name == new_name`).
///
/// \param name Current name of the branch.
/// \param new_name Name the branch should have afterwards.
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::RenameBranch(
    const std::string& name, const std::string& new_name) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch to rename cannot be empty");
  ICEBERG_BUILDER_CHECK(!new_name.empty(), "New branch name cannot be empty");
  ICEBERG_BUILDER_CHECK(name != SnapshotRef::kMainBranch, "Cannot rename main branch");
  auto it = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name);
  ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch,
                        "Ref '{}' is a tag not a branch", name);
  ICEBERG_BUILDER_CHECK(!updated_refs_.contains(new_name), "Ref '{}' already exists",
                        new_name);

  // Re-key the existing node instead of emplacing a copy and then erasing through
  // `it`: an insertion into an unordered map may rehash and invalidate `it`, which
  // made the subsequent erase(it) undefined behaviour. After extract() no iterator
  // is used again, so a rehash during insert() is harmless.
  auto node = updated_refs_.extract(it);
  node.key() = new_name;
  updated_refs_.insert(std::move(node));
  return *this;
}
101+
102+
/// \brief Stage pointing the existing branch `name` at `snapshot_id`.
///
/// The builder checks fail when `name` is empty, is not present in the staged refs,
/// or refers to a tag.
///
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranch(const std::string& name,
                                                                int64_t snapshot_id) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");

  auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Branch does not exist: {}", name);

  auto& branch_ref = entry->second;
  ICEBERG_BUILDER_CHECK(branch_ref->type() == SnapshotRefType::kBranch,
                        "Ref '{}' is a tag not a branch", name);

  // Store a clone carrying the new snapshot id rather than mutating the ref in place.
  branch_ref = branch_ref->Clone(snapshot_id);
  return *this;
}
112+
113+
/// \brief Stage pointing branch `from` at the snapshot currently referenced by `to`,
/// without requiring an ancestor relationship between the two.
///
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranch(const std::string& from,
                                                                const std::string& to) {
  constexpr bool kRequireFastForward = false;
  return ReplaceBranchInternal(from, to, kRequireFastForward);
}
117+
118+
/// \brief Stage fast-forwarding branch `from` to the snapshot currently referenced by
/// `to`; delegates to ReplaceBranchInternal() with the ancestor check enabled.
///
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::FastForward(const std::string& from,
                                                              const std::string& to) {
  constexpr bool kRequireFastForward = true;
  return ReplaceBranchInternal(from, to, kRequireFastForward);
}
122+
123+
/// \brief Shared implementation of ReplaceBranch(from, to) and FastForward(from, to).
///
/// Points branch `from` at the snapshot referenced by `to`. When `from` does not exist
/// yet it is created via CreateBranch(). When both refs already reference the same
/// snapshot nothing changes.
///
/// \param from Branch to update (or create).
/// \param to Existing ref whose snapshot becomes the new head of `from`.
/// \param fast_forward When true, the update is only accepted if
/// SnapshotUtil::IsAncestorOf() reports the current snapshot of `from` as an ancestor
/// of the snapshot of `to`.
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranchInternal(
    const std::string& from, const std::string& to, bool fast_forward) {
  ICEBERG_BUILDER_CHECK(!from.empty(), "Branch to update cannot be empty");
  ICEBERG_BUILDER_CHECK(!to.empty(), "Destination ref cannot be empty");

  auto target = updated_refs_.find(to);
  ICEBERG_BUILDER_CHECK(target != updated_refs_.end(), "Ref does not exist: {}", to);

  auto source = updated_refs_.find(from);
  if (source == updated_refs_.end()) {
    // Unknown branch: create it directly at the target snapshot.
    return CreateBranch(from, target->second->snapshot_id);
  }

  ICEBERG_BUILDER_CHECK(source->second->type() == SnapshotRefType::kBranch,
                        "Ref '{}' is a tag not a branch", from);

  const auto target_snapshot_id = target->second->snapshot_id;
  const auto current_snapshot_id = source->second->snapshot_id;

  // Both refs already agree, so there is nothing to replace.
  if (target_snapshot_id == current_snapshot_id) {
    return *this;
  }

  if (fast_forward) {
    // A fast-forward only moves forward in history: the branch's current snapshot
    // has to be an ancestor of the target snapshot.
    const auto& base_metadata = transaction_->current();
    auto lookup_snapshot = [&base_metadata](int64_t id) {
      return base_metadata.SnapshotById(id);
    };
    ICEBERG_BUILDER_ASSIGN_OR_RETURN(
        auto from_is_ancestor_of_to,
        SnapshotUtil::IsAncestorOf(target_snapshot_id, current_snapshot_id,
                                   lookup_snapshot));

    ICEBERG_BUILDER_CHECK(from_is_ancestor_of_to,
                          "Cannot fast-forward: {} is not an ancestor of {}", from, to);
  }

  source->second = source->second->Clone(target_snapshot_id);
  return *this;
}
160+
161+
/// \brief Stage pointing the existing tag `name` at `snapshot_id`.
///
/// The builder checks fail when `name` is empty, is not present in the staged refs,
/// or refers to a branch.
///
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::ReplaceTag(const std::string& name,
                                                             int64_t snapshot_id) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty");

  auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Tag does not exist: {}", name);

  auto& tag_ref = entry->second;
  ICEBERG_BUILDER_CHECK(tag_ref->type() == SnapshotRefType::kTag,
                        "Ref '{}' is a branch not a tag", name);

  // Store a clone carrying the new snapshot id rather than mutating the ref in place.
  tag_ref = tag_ref->Clone(snapshot_id);
  return *this;
}
171+
172+
/// \brief Stage a new `min_snapshots_to_keep` retention value for branch `name`.
///
/// The builder checks fail when `name` is empty, is not present in the staged refs,
/// refers to a tag, or when the updated ref does not pass SnapshotRef::Validate().
///
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::SetMinSnapshotsToKeep(
    const std::string& name, int32_t min_snapshots_to_keep) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");

  auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Branch does not exist: {}", name);

  auto& branch_ref = entry->second;
  ICEBERG_BUILDER_CHECK(branch_ref->type() == SnapshotRefType::kBranch,
                        "Ref '{}' is a tag not a branch", name);

  // Edit a clone so the ref object previously stored under this name is not modified.
  branch_ref = branch_ref->Clone();
  auto& branch_retention = std::get<SnapshotRef::Branch>(branch_ref->retention);
  branch_retention.min_snapshots_to_keep = min_snapshots_to_keep;

  ICEBERG_BUILDER_CHECK(branch_ref->Validate(),
                        "Invalid min_snapshots_to_keep {} for branch '{}'",
                        min_snapshots_to_keep, name);
  return *this;
}
187+
188+
/// \brief Stage a new `max_snapshot_age_ms` retention value for branch `name`.
///
/// The builder checks fail when `name` is empty, is not present in the staged refs,
/// refers to a tag, or when the updated ref does not pass SnapshotRef::Validate().
///
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::SetMaxSnapshotAgeMs(
    const std::string& name, int64_t max_snapshot_age_ms) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");

  auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Branch does not exist: {}", name);

  auto& branch_ref = entry->second;
  ICEBERG_BUILDER_CHECK(branch_ref->type() == SnapshotRefType::kBranch,
                        "Ref '{}' is a tag not a branch", name);

  // Edit a clone so the ref object previously stored under this name is not modified.
  branch_ref = branch_ref->Clone();
  auto& branch_retention = std::get<SnapshotRef::Branch>(branch_ref->retention);
  branch_retention.max_snapshot_age_ms = max_snapshot_age_ms;

  ICEBERG_BUILDER_CHECK(branch_ref->Validate(),
                        "Invalid max_snapshot_age_ms {} for branch '{}'",
                        max_snapshot_age_ms, name);
  return *this;
}
203+
204+
/// \brief Stage a new `max_ref_age_ms` retention value for the ref `name`, which may
/// be either a branch or a tag.
///
/// The builder checks fail when `name` is empty, is not present in the staged refs,
/// or when the updated ref does not pass SnapshotRef::Validate().
///
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::SetMaxRefAgeMs(const std::string& name,
                                                                 int64_t max_ref_age_ms) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Reference name cannot be empty");

  auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Ref does not exist: {}", name);

  // Edit a clone so the ref object previously stored under this name is not modified.
  auto& ref = entry->second;
  ref = ref->Clone();

  // The retention variant holds a different alternative for branches and tags.
  const bool is_branch = ref->type() == SnapshotRefType::kBranch;
  if (is_branch) {
    std::get<SnapshotRef::Branch>(ref->retention).max_ref_age_ms = max_ref_age_ms;
  } else {
    std::get<SnapshotRef::Tag>(ref->retention).max_ref_age_ms = max_ref_age_ms;
  }

  ICEBERG_BUILDER_CHECK(ref->Validate(), "Invalid max_ref_age_ms {} for ref '{}'",
                        max_ref_age_ms, name);
  return *this;
}
219+
220+
/// \brief Compute the difference between the staged refs and the refs in base().
///
/// \return An ApplyResult whose `to_remove` lists the names present in base().refs but
/// absent from the staged refs, and whose `to_set` lists every staged ref that is new
/// or compares unequal to its base counterpart. Returns the error from CheckErrors()
/// when that check fails.
Result<UpdateSnapshotReference::ApplyResult> UpdateSnapshotReference::Apply() {
  ICEBERG_RETURN_UNEXPECTED(CheckErrors());

  const auto& base_refs = base().refs;
  ApplyResult changes;

  // Removed refs: known to the base metadata but no longer staged.
  for (const auto& base_entry : base_refs) {
    if (updated_refs_.find(base_entry.first) == updated_refs_.end()) {
      changes.to_remove.push_back(base_entry.first);
    }
  }

  // Created or updated refs: staged, and either new or different from the base ref.
  for (const auto& [name, ref] : updated_refs_) {
    const auto existing = base_refs.find(name);
    const bool unchanged =
        existing != base_refs.end() && !(*existing->second != *ref);
    if (unchanged) {
      continue;
    }
    changes.to_set.emplace_back(name, ref);
  }

  return changes;
}
243+
244+
} // namespace iceberg

0 commit comments

Comments
 (0)