@@ -171,6 +171,89 @@ Status TableScanContext::Validate() const {
171171 return {};
172172}
173173
174+ bool TableScanContext::IsScanCurrentLineage () const {
175+ return !from_snapshot_id.has_value () && !to_snapshot_id.has_value ();
176+ }
177+
178+ Result<int64_t > TableScanContext::ToSnapshotIdInclusive (
179+ const TableMetadata& metadata) const {
180+ // Get the branch's current snapshot ID if branch is set
181+ std::shared_ptr<Snapshot> branch_snapshot;
182+ if (!branch.empty ()) {
183+ auto iter = metadata.refs .find (branch);
184+ ICEBERG_CHECK (iter != metadata.refs .end () && iter->second != nullptr ,
185+ " Cannot find branch: {}" , branch);
186+ ICEBERG_ASSIGN_OR_RAISE (branch_snapshot,
187+ metadata.SnapshotById (iter->second ->snapshot_id ));
188+ }
189+
190+ if (to_snapshot_id.has_value ()) {
191+ int64_t to_snapshot_id_value = to_snapshot_id.value ();
192+
193+ if (branch_snapshot != nullptr ) {
194+ // Validate `to_snapshot_id` is on the current branch
195+ ICEBERG_ASSIGN_OR_RAISE (
196+ bool is_ancestor,
197+ SnapshotUtil::IsAncestorOf (metadata, branch_snapshot->snapshot_id ,
198+ to_snapshot_id_value));
199+ ICEBERG_CHECK (is_ancestor,
200+ " End snapshot is not a valid snapshot on the current branch: {}" ,
201+ branch);
202+ }
203+
204+ return to_snapshot_id_value;
205+ }
206+
207+ // If to_snapshot_id is not set, use branch's current snapshot if branch is set
208+ if (branch_snapshot != nullptr ) {
209+ return branch_snapshot->snapshot_id ;
210+ }
211+
212+ // Get current snapshot from table's current snapshot
213+ std::shared_ptr<Snapshot> current_snapshot;
214+ ICEBERG_ASSIGN_OR_RAISE (current_snapshot, metadata.Snapshot ());
215+ ICEBERG_CHECK (current_snapshot != nullptr ,
216+ " End snapshot is not set and table has no current snapshot" );
217+ return current_snapshot->snapshot_id ;
218+ }
219+
220+ Result<std::optional<int64_t >> TableScanContext::FromSnapshotIdExclusive (
221+ const TableMetadata& metadata, int64_t to_snapshot_id_inclusive) const {
222+ if (!from_snapshot_id.has_value ()) {
223+ return std::nullopt ;
224+ }
225+
226+ int64_t from_snapshot_id_value = from_snapshot_id.value ();
227+
228+ // Validate `from_snapshot_id` is an ancestor of `to_snapshot_id_inclusive`
229+ if (from_snapshot_id_inclusive) {
230+ ICEBERG_ASSIGN_OR_RAISE (bool is_ancestor,
231+ SnapshotUtil::IsAncestorOf (metadata, to_snapshot_id_inclusive,
232+ from_snapshot_id_value));
233+ ICEBERG_CHECK (
234+ is_ancestor,
235+ " Starting snapshot (inclusive) {} is not an ancestor of end snapshot {}" ,
236+ from_snapshot_id_value, to_snapshot_id_inclusive);
237+
238+ // For inclusive behavior, return the parent snapshot ID (can be nullopt)
239+ ICEBERG_ASSIGN_OR_RAISE (auto from_snapshot,
240+ metadata.SnapshotById (from_snapshot_id_value));
241+ return from_snapshot->parent_snapshot_id ;
242+ }
243+
244+ // Validate there is an ancestor of `to_snapshot_id_inclusive` where parent is
245+ // `from_snapshot_id`
246+ ICEBERG_ASSIGN_OR_RAISE (bool is_parent_ancestor, SnapshotUtil::IsParentAncestorOf (
247+ metadata, to_snapshot_id_inclusive,
248+ from_snapshot_id_value));
249+ ICEBERG_CHECK (
250+ is_parent_ancestor,
251+ " Starting snapshot (exclusive) {} is not a parent ancestor of end snapshot {}" ,
252+ from_snapshot_id_value, to_snapshot_id_inclusive);
253+
254+ return from_snapshot_id_value;
255+ }
256+
174257} // namespace internal
175258
176259ScanTask::~ScanTask () = default ;
@@ -340,10 +423,15 @@ TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::AsOfTime(
340423
341424template <typename ScanType>
342425TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::FromSnapshot(
343- [[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive)
426+ int64_t from_snapshot_id, bool inclusive)
344427 requires IsIncrementalScan<ScanType>
345428{
346- AddError (NotImplemented (" Incremental scan is not implemented" ));
429+ if (inclusive) {
430+ ICEBERG_BUILDER_ASSIGN_OR_RETURN (std::ignore,
431+ metadata_->SnapshotById (from_snapshot_id));
432+ }
433+ this ->context_ .from_snapshot_id = from_snapshot_id;
434+ this ->context_ .from_snapshot_id_inclusive = inclusive;
347435 return *this ;
348436}
349437
@@ -352,31 +440,45 @@ TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::FromSnapshot(
352440 const std::string& ref, bool inclusive)
353441 requires IsIncrementalScan<ScanType>
354442{
355- AddError (NotImplemented (" Incremental scan is not implemented" ));
356- return *this ;
443+ auto iter = metadata_->refs .find (ref);
444+ ICEBERG_BUILDER_CHECK (iter != metadata_->refs .end (), " Cannot find ref: {}" , ref);
445+ ICEBERG_BUILDER_CHECK (iter->second != nullptr , " Ref {} is null" , ref);
446+ ICEBERG_BUILDER_CHECK (iter->second ->type () == SnapshotRefType::kTag ,
447+ " Ref {} is not a tag" , ref);
448+ return FromSnapshot (iter->second ->snapshot_id , inclusive);
357449}
358450
359451template <typename ScanType>
360452TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::ToSnapshot(int64_t to_snapshot_id)
361453 requires IsIncrementalScan<ScanType>
362454{
363- AddError (NotImplemented (" Incremental scan is not implemented" ));
455+ ICEBERG_BUILDER_ASSIGN_OR_RETURN (std::ignore, metadata_->SnapshotById (to_snapshot_id));
456+ context_.to_snapshot_id = to_snapshot_id;
364457 return *this ;
365458}
366459
367460template <typename ScanType>
368461TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::ToSnapshot(const std::string& ref)
369462 requires IsIncrementalScan<ScanType>
370463{
371- AddError (NotImplemented (" Incremental scan is not implemented" ));
372- return *this ;
464+ auto iter = metadata_->refs .find (ref);
465+ ICEBERG_BUILDER_CHECK (iter != metadata_->refs .end (), " Cannot find ref: {}" , ref);
466+ ICEBERG_BUILDER_CHECK (iter->second != nullptr , " Ref {} is null" , ref);
467+ ICEBERG_BUILDER_CHECK (iter->second ->type () == SnapshotRefType::kTag ,
468+ " Ref {} is not a tag" , ref);
469+ return ToSnapshot (iter->second ->snapshot_id );
373470}
374471
375472template <typename ScanType>
376473TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::UseBranch(
377474 const std::string& branch)
378475 requires IsIncrementalScan<ScanType>
379476{
477+ auto iter = metadata_->refs .find (branch);
478+ ICEBERG_BUILDER_CHECK (iter != metadata_->refs .end (), " Cannot find ref: {}" , branch);
479+ ICEBERG_BUILDER_CHECK (iter->second != nullptr , " Ref {} is null" , branch);
480+ ICEBERG_BUILDER_CHECK (iter->second ->type () == SnapshotRefType::kBranch ,
481+ " Ref {} is not a branch" , branch);
380482 context_.branch = branch;
381483 return *this ;
382484}
@@ -536,20 +638,100 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
536638 return manifest_group->PlanFiles ();
537639}
538640
641+ template <typename ScanTaskType>
642+ Result<std::vector<std::shared_ptr<ScanTaskType>>>
643+ IncrementalScan<ScanTaskType>::PlanFiles() const {
644+ if (context_.IsScanCurrentLineage ()) {
645+ ICEBERG_ASSIGN_OR_RAISE (auto current_snapshot, metadata_->Snapshot ());
646+ if (current_snapshot == nullptr ) {
647+ return std::vector<std::shared_ptr<ScanTaskType>>{};
648+ }
649+ }
650+
651+ ICEBERG_ASSIGN_OR_RAISE (int64_t to_snapshot_id_inclusive,
652+ context_.ToSnapshotIdInclusive (*metadata_));
653+ ICEBERG_ASSIGN_OR_RAISE (
654+ std::optional<int64_t > from_snapshot_id_exclusive,
655+ context_.FromSnapshotIdExclusive (*metadata_, to_snapshot_id_inclusive));
656+
657+ return PlanFiles (from_snapshot_id_exclusive, to_snapshot_id_inclusive);
658+ }
659+
660+ template class IncrementalScan <FileScanTask>;
661+ template class IncrementalScan <ChangelogScanTask>;
662+
539663// IncrementalAppendScan implementation
540664
541665Result<std::unique_ptr<IncrementalAppendScan>> IncrementalAppendScan::Make (
542- [[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
543- [[maybe_unused]] std::shared_ptr<Schema> schema,
544- [[maybe_unused]] std::shared_ptr<FileIO> io,
545- [[maybe_unused]] internal::TableScanContext context) {
546- return NotImplemented (" IncrementalAppendScan is not implemented" );
666+ std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
667+ std::shared_ptr<FileIO> io, internal::TableScanContext context) {
668+ ICEBERG_PRECHECK (metadata != nullptr , " Table metadata cannot be null" );
669+ ICEBERG_PRECHECK (schema != nullptr , " Schema cannot be null" );
670+ ICEBERG_PRECHECK (io != nullptr , " FileIO cannot be null" );
671+ return std::unique_ptr<IncrementalAppendScan>(new IncrementalAppendScan (
672+ std::move (metadata), std::move (schema), std::move (io), std::move (context)));
547673}
548674
549675Result<std::vector<std::shared_ptr<FileScanTask>>> IncrementalAppendScan::PlanFiles (
550676 std::optional<int64_t > from_snapshot_id_exclusive,
551677 int64_t to_snapshot_id_inclusive) const {
552- return NotImplemented (" IncrementalAppendScan::PlanFiles is not implemented" );
678+ ICEBERG_ASSIGN_OR_RAISE (
679+ auto ancestors_snapshots,
680+ SnapshotUtil::AncestorsBetween (*metadata_, to_snapshot_id_inclusive,
681+ from_snapshot_id_exclusive));
682+
683+ std::vector<std::shared_ptr<Snapshot>> append_snapshots;
684+ std::ranges::copy_if (ancestors_snapshots, std::back_inserter (append_snapshots),
685+ [](const auto & snapshot) {
686+ return snapshot != nullptr &&
687+ snapshot->Operation ().has_value () &&
688+ snapshot->Operation ().value () == DataOperation::kAppend ;
689+ });
690+ if (append_snapshots.empty ()) {
691+ return std::vector<std::shared_ptr<FileScanTask>>{};
692+ }
693+
694+ std::unordered_set<int64_t > snapshot_ids;
695+ std::ranges::transform (append_snapshots,
696+ std::inserter (snapshot_ids, snapshot_ids.end ()),
697+ [](const auto & snapshot) { return snapshot->snapshot_id ; });
698+
699+ std::vector<ManifestFile> data_manifests;
700+ for (const auto & snapshot : append_snapshots) {
701+ SnapshotCache snapshot_cache (snapshot.get ());
702+ ICEBERG_ASSIGN_OR_RAISE (auto manifests, snapshot_cache.DataManifests (io_));
703+ std::ranges::copy_if (manifests, std::back_inserter (data_manifests),
704+ [&snapshot_ids](const ManifestFile& manifest) {
705+ return snapshot_ids.contains (manifest.added_snapshot_id );
706+ });
707+ }
708+ if (data_manifests.empty ()) {
709+ return std::vector<std::shared_ptr<FileScanTask>>{};
710+ }
711+
712+ TableMetadataCache metadata_cache (metadata_.get ());
713+ ICEBERG_ASSIGN_OR_RAISE (auto specs_by_id, metadata_cache.GetPartitionSpecsById ());
714+
715+ ICEBERG_ASSIGN_OR_RAISE (
716+ auto manifest_group,
717+ ManifestGroup::Make (io_, schema_, specs_by_id, std::move (data_manifests), {}));
718+
719+ manifest_group->CaseSensitive (context_.case_sensitive )
720+ .Select (ScanColumns ())
721+ .FilterData (filter ())
722+ .FilterManifestEntries ([&snapshot_ids](const ManifestEntry& entry) {
723+ return entry.snapshot_id .has_value () &&
724+ snapshot_ids.contains (entry.snapshot_id .value ()) &&
725+ entry.status == ManifestStatus::kAdded ;
726+ })
727+ .IgnoreDeleted ()
728+ .ColumnsToKeepStats (context_.columns_to_keep_stats );
729+
730+ if (context_.ignore_residuals ) {
731+ manifest_group->IgnoreResiduals ();
732+ }
733+
734+ return manifest_group->PlanFiles ();
553735}
554736
555737// IncrementalChangelogScan implementation
0 commit comments