Skip to content

Commit 4c4ffff

Browse files
committed
feat: Impl IncrementalAppendScan
1 parent 066bee0 commit 4c4ffff

File tree

6 files changed

+1096
-140
lines changed

6 files changed

+1096
-140
lines changed

src/iceberg/table_scan.cc

Lines changed: 173 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,89 @@ Status TableScanContext::Validate() const {
171171
return {};
172172
}
173173

174+
bool TableScanContext::IsScanCurrentLineage() const {
175+
return !from_snapshot_id.has_value() && !to_snapshot_id.has_value();
176+
}
177+
178+
// Resolves the inclusive end snapshot ID of the scan.
//
// Resolution order:
//   1. `to_snapshot_id`, when set. If a branch is also configured, the ID must
//      be an ancestor of that branch's snapshot, otherwise an error is returned.
//   2. The snapshot referenced by `branch`, when a branch is configured.
//   3. The snapshot returned by `metadata.Snapshot()`; a null result is an error.
Result<int64_t> TableScanContext::ToSnapshotIdInclusive(
    const TableMetadata& metadata) const {
  // Look up the snapshot the configured branch points at (stays null when no
  // branch was requested).
  std::shared_ptr<Snapshot> branch_head;
  if (!branch.empty()) {
    auto ref_it = metadata.refs.find(branch);
    ICEBERG_CHECK(ref_it != metadata.refs.end() && ref_it->second != nullptr,
                  "Cannot find branch: {}", branch);
    ICEBERG_ASSIGN_OR_RAISE(branch_head,
                            metadata.SnapshotById(ref_it->second->snapshot_id));
  }

  // No explicit end snapshot: fall back to the branch head, then to the
  // table's current snapshot.
  if (!to_snapshot_id.has_value()) {
    if (branch_head != nullptr) {
      return branch_head->snapshot_id;
    }

    std::shared_ptr<Snapshot> table_head;
    ICEBERG_ASSIGN_OR_RAISE(table_head, metadata.Snapshot());
    ICEBERG_CHECK(table_head != nullptr,
                  "End snapshot is not set and table has no current snapshot");
    return table_head->snapshot_id;
  }

  int64_t end_snapshot_id = *to_snapshot_id;
  if (branch_head != nullptr) {
    // The explicit end snapshot has to lie on the selected branch.
    ICEBERG_ASSIGN_OR_RAISE(
        bool on_branch,
        SnapshotUtil::IsAncestorOf(metadata, branch_head->snapshot_id,
                                   end_snapshot_id));
    ICEBERG_CHECK(on_branch,
                  "End snapshot is not a valid snapshot on the current branch: {}",
                  branch);
  }
  return end_snapshot_id;
}
219+
220+
// Resolves the exclusive start snapshot ID of the scan.
//
// Returns std::nullopt when `from_snapshot_id` is unset. Otherwise the start
// snapshot is validated against `to_snapshot_id_inclusive` and:
//   - exclusive start: `from_snapshot_id` itself is returned;
//   - inclusive start: the parent ID of the start snapshot is returned, which
//     may itself be std::nullopt.
Result<std::optional<int64_t>> TableScanContext::FromSnapshotIdExclusive(
    const TableMetadata& metadata, int64_t to_snapshot_id_inclusive) const {
  if (!from_snapshot_id.has_value()) {
    return std::nullopt;
  }
  int64_t start_snapshot_id = *from_snapshot_id;

  if (!from_snapshot_id_inclusive) {
    // Exclusive start: validated via SnapshotUtil::IsParentAncestorOf, i.e.
    // some ancestor of the end snapshot must have the start snapshot as parent.
    ICEBERG_ASSIGN_OR_RAISE(
        bool parent_of_ancestor,
        SnapshotUtil::IsParentAncestorOf(metadata, to_snapshot_id_inclusive,
                                         start_snapshot_id));
    ICEBERG_CHECK(
        parent_of_ancestor,
        "Starting snapshot (exclusive) {} is not a parent ancestor of end snapshot {}",
        start_snapshot_id, to_snapshot_id_inclusive);
    return start_snapshot_id;
  }

  // Inclusive start: the start snapshot must be an ancestor of the end snapshot.
  ICEBERG_ASSIGN_OR_RAISE(
      bool start_is_ancestor,
      SnapshotUtil::IsAncestorOf(metadata, to_snapshot_id_inclusive,
                                 start_snapshot_id));
  ICEBERG_CHECK(
      start_is_ancestor,
      "Starting snapshot (inclusive) {} is not an ancestor of end snapshot {}",
      start_snapshot_id, to_snapshot_id_inclusive);

  // Turn the inclusive bound into an exclusive one by stepping to the parent.
  ICEBERG_ASSIGN_OR_RAISE(auto start_snapshot,
                          metadata.SnapshotById(start_snapshot_id));
  return start_snapshot->parent_snapshot_id;
}
256+
174257
} // namespace internal
175258

176259
ScanTask::~ScanTask() = default;
@@ -340,10 +423,15 @@ TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::AsOfTime(
340423

341424
template <typename ScanType>
342425
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::FromSnapshot(
343-
[[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive)
426+
int64_t from_snapshot_id, bool inclusive)
344427
requires IsIncrementalScan<ScanType>
345428
{
346-
AddError(NotImplemented("Incremental scan is not implemented"));
429+
if (inclusive) {
430+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore,
431+
metadata_->SnapshotById(from_snapshot_id));
432+
}
433+
this->context_.from_snapshot_id = from_snapshot_id;
434+
this->context_.from_snapshot_id_inclusive = inclusive;
347435
return *this;
348436
}
349437

@@ -352,31 +440,45 @@ TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::FromSnapshot(
352440
const std::string& ref, bool inclusive)
353441
requires IsIncrementalScan<ScanType>
354442
{
355-
AddError(NotImplemented("Incremental scan is not implemented"));
356-
return *this;
443+
auto iter = metadata_->refs.find(ref);
444+
ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref: {}", ref);
445+
ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", ref);
446+
ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kTag,
447+
"Ref {} is not a tag", ref);
448+
return FromSnapshot(iter->second->snapshot_id, inclusive);
357449
}
358450

359451
template <typename ScanType>
360452
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::ToSnapshot(int64_t to_snapshot_id)
361453
requires IsIncrementalScan<ScanType>
362454
{
363-
AddError(NotImplemented("Incremental scan is not implemented"));
455+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, metadata_->SnapshotById(to_snapshot_id));
456+
context_.to_snapshot_id = to_snapshot_id;
364457
return *this;
365458
}
366459

367460
template <typename ScanType>
368461
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::ToSnapshot(const std::string& ref)
369462
requires IsIncrementalScan<ScanType>
370463
{
371-
AddError(NotImplemented("Incremental scan is not implemented"));
372-
return *this;
464+
auto iter = metadata_->refs.find(ref);
465+
ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref: {}", ref);
466+
ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", ref);
467+
ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kTag,
468+
"Ref {} is not a tag", ref);
469+
return ToSnapshot(iter->second->snapshot_id);
373470
}
374471

375472
template <typename ScanType>
376473
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::UseBranch(
377474
const std::string& branch)
378475
requires IsIncrementalScan<ScanType>
379476
{
477+
auto iter = metadata_->refs.find(branch);
478+
ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref: {}", branch);
479+
ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", branch);
480+
ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kBranch,
481+
"Ref {} is not a branch", branch);
380482
context_.branch = branch;
381483
return *this;
382484
}
@@ -539,17 +641,75 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
539641
// IncrementalAppendScan implementation
540642

541643
// Factory for IncrementalAppendScan.
//
// `metadata`, `schema` and `io` are checked for null with ICEBERG_PRECHECK, in
// that order; all four arguments are then moved into the new scan object.
Result<std::unique_ptr<IncrementalAppendScan>> IncrementalAppendScan::Make(
    std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
    std::shared_ptr<FileIO> io, internal::TableScanContext context) {
  ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
  ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null");
  ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");

  // Constructed with a plain `new` expression, as in the original code.
  // NOTE(review): presumably because the constructor is not publicly
  // accessible — confirm against the class declaration.
  using ScanPtr = std::unique_ptr<IncrementalAppendScan>;
  return ScanPtr(new IncrementalAppendScan(std::move(metadata), std::move(schema),
                                           std::move(io), std::move(context)));
}
548652

549653
// Plans scan tasks for data files added by append snapshots in the range
// (from_snapshot_id_exclusive, to_snapshot_id_inclusive].
//
// Steps:
//   1. Collect the snapshots returned by SnapshotUtil::AncestorsBetween and
//      keep only those whose operation is DataOperation::kAppend.
//   2. Gather the data manifests of those snapshots that were added by one of
//      them, each manifest at most once.
//   3. Plan files through a ManifestGroup that only accepts entries with
//      status kAdded whose snapshot ID belongs to the kept snapshots.
//
// Returns an empty task list when the range contains no append snapshot or no
// matching manifest.
Result<std::vector<std::shared_ptr<FileScanTask>>> IncrementalAppendScan::PlanFiles(
    std::optional<int64_t> from_snapshot_id_exclusive,
    int64_t to_snapshot_id_inclusive) const {
  ICEBERG_ASSIGN_OR_RAISE(
      auto ancestors_snapshots,
      SnapshotUtil::AncestorsBetween(*metadata_, to_snapshot_id_inclusive,
                                     from_snapshot_id_exclusive));

  std::vector<std::shared_ptr<Snapshot>> append_snapshots;
  std::ranges::copy_if(ancestors_snapshots, std::back_inserter(append_snapshots),
                       [](const auto& snapshot) {
                         return snapshot != nullptr &&
                                snapshot->Operation().has_value() &&
                                snapshot->Operation().value() == DataOperation::kAppend;
                       });
  if (append_snapshots.empty()) {
    return std::vector<std::shared_ptr<FileScanTask>>{};
  }

  std::unordered_set<int64_t> snapshot_ids;
  std::ranges::transform(append_snapshots,
                         std::inserter(snapshot_ids, snapshot_ids.end()),
                         [](const auto& snapshot) { return snapshot->snapshot_id; });

  // The same manifest can be listed by more than one snapshot in the range
  // (presumably a later snapshot's manifest list still carries manifests added
  // by earlier ones). Collecting it once per listing would plan its data files
  // repeatedly, so keep only the first occurrence of each manifest.
  // NOTE(review): assumes ManifestFile::manifest_path uniquely identifies a
  // manifest — confirm against the ManifestFile definition.
  std::vector<ManifestFile> data_manifests;
  std::unordered_set<std::string> seen_manifest_paths;
  for (const auto& snapshot : append_snapshots) {
    SnapshotCache snapshot_cache(snapshot.get());
    ICEBERG_ASSIGN_OR_RAISE(auto manifests, snapshot_cache.DataManifests(io_));
    for (const auto& manifest : manifests) {
      if (!snapshot_ids.contains(manifest.added_snapshot_id)) {
        continue;
      }
      if (seen_manifest_paths.insert(manifest.manifest_path).second) {
        data_manifests.push_back(manifest);
      }
    }
  }
  if (data_manifests.empty()) {
    return std::vector<std::shared_ptr<FileScanTask>>{};
  }

  TableMetadataCache metadata_cache(metadata_.get());
  ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, metadata_cache.GetPartitionSpecsById());

  ICEBERG_ASSIGN_OR_RAISE(
      auto manifest_group,
      ManifestGroup::Make(io_, schema_, specs_by_id, std::move(data_manifests), {}));

  // `snapshot_ids` is captured by reference; this is safe because the manifest
  // group is only used by the PlanFiles() call at the end of this function.
  manifest_group->CaseSensitive(context_.case_sensitive)
      .Select(ScanColumns())
      .FilterData(filter())
      .FilterManifestEntries([&snapshot_ids](const ManifestEntry& entry) {
        return entry.snapshot_id.has_value() &&
               snapshot_ids.contains(entry.snapshot_id.value()) &&
               entry.status == ManifestStatus::kAdded;
      })
      .IgnoreDeleted()
      .ColumnsToKeepStats(context_.columns_to_keep_stats);

  if (context_.ignore_residuals) {
    manifest_group->IgnoreResiduals();
  }

  return manifest_group->PlanFiles();
}
554714

555715
// IncrementalChangelogScan implementation

src/iceberg/table_scan.h

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
#include "iceberg/arrow_c_data.h"
3131
#include "iceberg/result.h"
32+
#include "iceberg/table_metadata.h"
3233
#include "iceberg/type_fwd.h"
3334
#include "iceberg/util/error_collector.h"
3435

@@ -132,6 +133,17 @@ struct TableScanContext {
132133

133134
// Validate the context parameters to see if they have conflicts.
134135
[[nodiscard]] Status Validate() const;
136+
137+
/// \brief Returns true if this scan is a current lineage scan, which means it does not
138+
/// specify from/to snapshot IDs.
139+
bool IsScanCurrentLineage() const;
140+
141+
/// \brief Get the snapshot ID to scan up to (inclusive) based on the context.
142+
Result<int64_t> ToSnapshotIdInclusive(const TableMetadata& metadata) const;
143+
144+
/// \brief Get the snapshot ID to scan from (exclusive) based on the context.
145+
Result<std::optional<int64_t>> FromSnapshotIdExclusive(
146+
const TableMetadata& metadata, int64_t to_snapshot_id_inclusive) const;
135147
};
136148

137149
} // namespace internal
@@ -361,9 +373,7 @@ class ICEBERG_EXPORT IncrementalScan : public TableScan {
361373

362374
/// \brief Plans the scan tasks by resolving manifests and data files.
363375
/// \return A Result containing scan tasks or an error.
364-
Result<std::vector<std::shared_ptr<ScanTaskType>>> PlanFiles() const {
365-
return NotImplemented("IncrementalScan::PlanFiles is not implemented");
366-
}
376+
Result<std::vector<std::shared_ptr<ScanTaskType>>> PlanFiles() const;
367377

368378
protected:
369379
virtual Result<std::vector<std::shared_ptr<ScanTaskType>>> PlanFiles(
@@ -373,6 +383,26 @@ class ICEBERG_EXPORT IncrementalScan : public TableScan {
373383
using TableScan::TableScan;
374384
};
375385

386+
// Template method implementation (must be in header for MSVC)
387+
template <typename ScanTaskType>
388+
Result<std::vector<std::shared_ptr<ScanTaskType>>>
389+
IncrementalScan<ScanTaskType>::PlanFiles() const {
390+
if (context_.IsScanCurrentLineage()) {
391+
ICEBERG_ASSIGN_OR_RAISE(auto current_snapshot, metadata_->Snapshot());
392+
if (current_snapshot == nullptr) {
393+
return std::vector<std::shared_ptr<ScanTaskType>>{};
394+
}
395+
}
396+
397+
ICEBERG_ASSIGN_OR_RAISE(int64_t to_snapshot_id_inclusive,
398+
context_.ToSnapshotIdInclusive(*metadata_));
399+
ICEBERG_ASSIGN_OR_RAISE(
400+
std::optional<int64_t> from_snapshot_id_exclusive,
401+
context_.FromSnapshotIdExclusive(*metadata_, to_snapshot_id_inclusive));
402+
403+
return PlanFiles(from_snapshot_id_exclusive, to_snapshot_id_inclusive);
404+
}
405+
376406
/// \brief A scan that reads data files added between snapshots (incremental appends).
377407
class ICEBERG_EXPORT IncrementalAppendScan : public IncrementalScan<FileScanTask> {
378408
public:
@@ -383,6 +413,9 @@ class ICEBERG_EXPORT IncrementalAppendScan : public IncrementalScan<FileScanTask
383413

384414
~IncrementalAppendScan() override = default;
385415

416+
// Bring the public PlanFiles() from base class into scope
417+
using IncrementalScan<FileScanTask>::PlanFiles;
418+
386419
protected:
387420
Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles(
388421
std::optional<int64_t> from_snapshot_id_exclusive,
@@ -402,6 +435,9 @@ class ICEBERG_EXPORT IncrementalChangelogScan
402435

403436
~IncrementalChangelogScan() override = default;
404437

438+
// Bring the public PlanFiles() from base class into scope
439+
using IncrementalScan<ChangelogScanTask>::PlanFiles;
440+
405441
protected:
406442
Result<std::vector<std::shared_ptr<ChangelogScanTask>>> PlanFiles(
407443
std::optional<int64_t> from_snapshot_id_exclusive,

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ if(ICEBERG_BUILD_BUNDLE)
172172
USE_BUNDLE
173173
SOURCES
174174
file_scan_task_test.cc
175+
incremental_append_scan_test.cc
175176
table_scan_test.cc)
176177

177178
add_iceberg_test(table_update_test

0 commit comments

Comments
 (0)