Skip to content

Commit 066bee0

Browse files
authored
feat: Add incremental scan API (#559)
1 parent 8bf089f commit 066bee0

File tree

6 files changed

+271
-95
lines changed

6 files changed

+271
-95
lines changed

src/iceberg/table.cc

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,18 @@ Result<std::unique_ptr<LocationProvider>> Table::location_provider() const {
149149
return LocationProvider::Make(metadata_->location, metadata_->properties);
150150
}
151151

152-
Result<std::unique_ptr<TableScanBuilder>> Table::NewScan() const {
153-
return TableScanBuilder::Make(metadata_, io_);
152+
Result<std::unique_ptr<DataTableScanBuilder>> Table::NewScan() const {
153+
return DataTableScanBuilder::Make(metadata_, io_);
154+
}
155+
156+
Result<std::unique_ptr<IncrementalAppendScanBuilder>> Table::NewIncrementalAppendScan()
157+
const {
158+
return IncrementalAppendScanBuilder::Make(metadata_, io_);
159+
}
160+
161+
Result<std::unique_ptr<IncrementalChangelogScanBuilder>>
162+
Table::NewIncrementalChangelogScan() const {
163+
return IncrementalChangelogScanBuilder::Make(metadata_, io_);
154164
}
155165

156166
Result<std::shared_ptr<Transaction>> Table::NewTransaction() {
@@ -247,7 +257,7 @@ Result<std::shared_ptr<StagedTable>> StagedTable::Make(
247257

248258
StagedTable::~StagedTable() = default;
249259

250-
Result<std::unique_ptr<TableScanBuilder>> StagedTable::NewScan() const {
260+
Result<std::unique_ptr<DataTableScanBuilder>> StagedTable::NewScan() const {
251261
return NotSupported("Cannot scan a staged table");
252262
}
253263

src/iceberg/table.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,15 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
127127
///
128128
/// Once a table scan builder is created, it can be refined to project columns and
129129
/// filter data.
130-
virtual Result<std::unique_ptr<TableScanBuilder>> NewScan() const;
130+
virtual Result<std::unique_ptr<DataTableScanBuilder>> NewScan() const;
131+
132+
/// \brief Create a new incremental append scan builder for this table
133+
virtual Result<std::unique_ptr<IncrementalAppendScanBuilder>> NewIncrementalAppendScan()
134+
const;
135+
136+
/// \brief Create a new incremental changelog scan builder for this table
137+
virtual Result<std::unique_ptr<IncrementalChangelogScanBuilder>>
138+
NewIncrementalChangelogScan() const;
131139

132140
/// \brief Create a new Transaction to commit multiple table operations at once.
133141
virtual Result<std::shared_ptr<Transaction>> NewTransaction();
@@ -196,7 +204,7 @@ class ICEBERG_EXPORT StagedTable final : public Table {
196204

197205
Status Refresh() override { return {}; }
198206

199-
Result<std::unique_ptr<TableScanBuilder>> NewScan() const override;
207+
Result<std::unique_ptr<DataTableScanBuilder>> NewScan() const override;
200208

201209
private:
202210
using Table::Table;

src/iceberg/table_scan.cc

Lines changed: 112 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -210,39 +210,50 @@ Result<ArrowArrayStream> FileScanTask::ToArrow(
210210
return MakeArrowArrayStream(std::move(reader));
211211
}
212212

213-
Result<std::unique_ptr<TableScanBuilder>> TableScanBuilder::Make(
213+
// Generic template implementation for Make
214+
template <typename ScanType>
215+
Result<std::unique_ptr<TableScanBuilder<ScanType>>> TableScanBuilder<ScanType>::Make(
214216
std::shared_ptr<TableMetadata> metadata, std::shared_ptr<FileIO> io) {
215217
ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
216218
ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
217-
return std::unique_ptr<TableScanBuilder>(
218-
new TableScanBuilder(std::move(metadata), std::move(io)));
219+
return std::unique_ptr<TableScanBuilder<ScanType>>(
220+
new TableScanBuilder<ScanType>(std::move(metadata), std::move(io)));
219221
}
220222

221-
TableScanBuilder::TableScanBuilder(std::shared_ptr<TableMetadata> table_metadata,
222-
std::shared_ptr<FileIO> file_io)
223+
template <typename ScanType>
224+
TableScanBuilder<ScanType>::TableScanBuilder(
225+
std::shared_ptr<TableMetadata> table_metadata, std::shared_ptr<FileIO> file_io)
223226
: metadata_(std::move(table_metadata)), io_(std::move(file_io)) {}
224227

225-
TableScanBuilder& TableScanBuilder::Option(std::string key, std::string value) {
228+
template <typename ScanType>
229+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::Option(std::string key,
230+
std::string value) {
226231
context_.options[std::move(key)] = std::move(value);
227232
return *this;
228233
}
229234

230-
TableScanBuilder& TableScanBuilder::Project(std::shared_ptr<Schema> schema) {
235+
template <typename ScanType>
236+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::Project(
237+
std::shared_ptr<Schema> schema) {
231238
context_.projected_schema = std::move(schema);
232239
return *this;
233240
}
234241

235-
TableScanBuilder& TableScanBuilder::CaseSensitive(bool case_sensitive) {
242+
template <typename ScanType>
243+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::CaseSensitive(
244+
bool case_sensitive) {
236245
context_.case_sensitive = case_sensitive;
237246
return *this;
238247
}
239248

240-
TableScanBuilder& TableScanBuilder::IncludeColumnStats() {
249+
template <typename ScanType>
250+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::IncludeColumnStats() {
241251
context_.return_column_stats = true;
242252
return *this;
243253
}
244254

245-
TableScanBuilder& TableScanBuilder::IncludeColumnStats(
255+
template <typename ScanType>
256+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::IncludeColumnStats(
246257
const std::vector<std::string>& requested_columns) {
247258
context_.return_column_stats = true;
248259
context_.columns_to_keep_stats.clear();
@@ -260,27 +271,35 @@ TableScanBuilder& TableScanBuilder::IncludeColumnStats(
260271
return *this;
261272
}
262273

263-
TableScanBuilder& TableScanBuilder::Select(const std::vector<std::string>& column_names) {
274+
template <typename ScanType>
275+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::Select(
276+
const std::vector<std::string>& column_names) {
264277
context_.selected_columns = column_names;
265278
return *this;
266279
}
267280

268-
TableScanBuilder& TableScanBuilder::Filter(std::shared_ptr<Expression> filter) {
281+
template <typename ScanType>
282+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::Filter(
283+
std::shared_ptr<Expression> filter) {
269284
context_.filter = std::move(filter);
270285
return *this;
271286
}
272287

273-
TableScanBuilder& TableScanBuilder::IgnoreResiduals() {
288+
template <typename ScanType>
289+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::IgnoreResiduals() {
274290
context_.ignore_residuals = true;
275291
return *this;
276292
}
277293

278-
TableScanBuilder& TableScanBuilder::MinRowsRequested(int64_t num_rows) {
294+
template <typename ScanType>
295+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::MinRowsRequested(
296+
int64_t num_rows) {
279297
context_.min_rows_requested = num_rows;
280298
return *this;
281299
}
282300

283-
TableScanBuilder& TableScanBuilder::UseSnapshot(int64_t snapshot_id) {
301+
template <typename ScanType>
302+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::UseSnapshot(int64_t snapshot_id) {
284303
ICEBERG_BUILDER_CHECK(!context_.snapshot_id.has_value(),
285304
"Cannot override snapshot, already set snapshot id={}",
286305
context_.snapshot_id.value());
@@ -289,7 +308,8 @@ TableScanBuilder& TableScanBuilder::UseSnapshot(int64_t snapshot_id) {
289308
return *this;
290309
}
291310

292-
TableScanBuilder& TableScanBuilder::UseRef(const std::string& ref) {
311+
template <typename ScanType>
312+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::UseRef(const std::string& ref) {
293313
if (ref == SnapshotRef::kMainBranch) {
294314
snapshot_schema_ = nullptr;
295315
context_.snapshot_id.reset();
@@ -309,38 +329,61 @@ TableScanBuilder& TableScanBuilder::UseRef(const std::string& ref) {
309329
return *this;
310330
}
311331

312-
TableScanBuilder& TableScanBuilder::AsOfTime(int64_t timestamp_millis) {
332+
template <typename ScanType>
333+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::AsOfTime(
334+
int64_t timestamp_millis) {
313335
auto time_point_ms = TimePointMsFromUnixMs(timestamp_millis);
314336
ICEBERG_BUILDER_ASSIGN_OR_RETURN(
315337
auto snapshot_id, SnapshotUtil::SnapshotIdAsOfTime(*metadata_, time_point_ms));
316338
return UseSnapshot(snapshot_id);
317339
}
318340

319-
TableScanBuilder& TableScanBuilder::FromSnapshot(
320-
[[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive) {
321-
return AddError(NotImplemented("Incremental scan is not implemented"));
341+
template <typename ScanType>
342+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::FromSnapshot(
343+
[[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive)
344+
requires IsIncrementalScan<ScanType>
345+
{
346+
AddError(NotImplemented("Incremental scan is not implemented"));
347+
return *this;
322348
}
323349

324-
TableScanBuilder& TableScanBuilder::FromSnapshot([[maybe_unused]] const std::string& ref,
325-
[[maybe_unused]] bool inclusive) {
326-
return AddError(NotImplemented("Incremental scan is not implemented"));
350+
template <typename ScanType>
351+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::FromSnapshot(
352+
const std::string& ref, bool inclusive)
353+
requires IsIncrementalScan<ScanType>
354+
{
355+
AddError(NotImplemented("Incremental scan is not implemented"));
356+
return *this;
327357
}
328358

329-
TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] int64_t to_snapshot_id) {
330-
return AddError(NotImplemented("Incremental scan is not implemented"));
359+
template <typename ScanType>
360+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::ToSnapshot(int64_t to_snapshot_id)
361+
requires IsIncrementalScan<ScanType>
362+
{
363+
AddError(NotImplemented("Incremental scan is not implemented"));
364+
return *this;
331365
}
332366

333-
TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] const std::string& ref) {
334-
return AddError(NotImplemented("Incremental scan is not implemented"));
367+
template <typename ScanType>
368+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::ToSnapshot(const std::string& ref)
369+
requires IsIncrementalScan<ScanType>
370+
{
371+
AddError(NotImplemented("Incremental scan is not implemented"));
372+
return *this;
335373
}
336374

337-
TableScanBuilder& TableScanBuilder::UseBranch(const std::string& branch) {
375+
template <typename ScanType>
376+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::UseBranch(
377+
const std::string& branch)
378+
requires IsIncrementalScan<ScanType>
379+
{
338380
context_.branch = branch;
339381
return *this;
340382
}
341383

384+
template <typename ScanType>
342385
Result<std::reference_wrapper<const std::shared_ptr<Schema>>>
343-
TableScanBuilder::ResolveSnapshotSchema() {
386+
TableScanBuilder<ScanType>::ResolveSnapshotSchema() {
344387
if (snapshot_schema_ == nullptr) {
345388
if (context_.snapshot_id.has_value()) {
346389
ICEBERG_ASSIGN_OR_RAISE(auto snapshot,
@@ -355,22 +398,20 @@ TableScanBuilder::ResolveSnapshotSchema() {
355398
return snapshot_schema_;
356399
}
357400

358-
bool TableScanBuilder::IsIncrementalScan() const {
359-
return context_.from_snapshot_id.has_value() || context_.to_snapshot_id.has_value();
360-
}
361-
362-
Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
401+
template <typename ScanType>
402+
Result<std::unique_ptr<ScanType>> TableScanBuilder<ScanType>::Build() {
363403
ICEBERG_RETURN_UNEXPECTED(CheckErrors());
364404
ICEBERG_RETURN_UNEXPECTED(context_.Validate());
365405

366-
if (IsIncrementalScan()) {
367-
return NotImplemented("Incremental scan is not yet implemented");
368-
}
369-
370406
ICEBERG_ASSIGN_OR_RAISE(auto schema, ResolveSnapshotSchema());
371-
return DataTableScan::Make(metadata_, schema.get(), io_, std::move(context_));
407+
return ScanType::Make(metadata_, schema.get(), io_, std::move(context_));
372408
}
373409

410+
// Explicit template instantiations
411+
template class TableScanBuilder<DataTableScan>;
412+
template class TableScanBuilder<IncrementalAppendScan>;
413+
template class TableScanBuilder<IncrementalChangelogScan>;
414+
374415
TableScan::TableScan(std::shared_ptr<TableMetadata> metadata,
375416
std::shared_ptr<Schema> schema, std::shared_ptr<FileIO> file_io,
376417
internal::TableScanContext context)
@@ -466,12 +507,6 @@ Result<std::unique_ptr<DataTableScan>> DataTableScan::Make(
466507
std::move(metadata), std::move(schema), std::move(io), std::move(context)));
467508
}
468509

469-
DataTableScan::DataTableScan(std::shared_ptr<TableMetadata> metadata,
470-
std::shared_ptr<Schema> schema, std::shared_ptr<FileIO> io,
471-
internal::TableScanContext context)
472-
: TableScan(std::move(metadata), std::move(schema), std::move(io),
473-
std::move(context)) {}
474-
475510
Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() const {
476511
ICEBERG_ASSIGN_OR_RAISE(auto snapshot, this->snapshot());
477512
if (!snapshot) {
@@ -501,4 +536,36 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
501536
return manifest_group->PlanFiles();
502537
}
503538

539+
// IncrementalAppendScan implementation
540+
541+
Result<std::unique_ptr<IncrementalAppendScan>> IncrementalAppendScan::Make(
542+
[[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
543+
[[maybe_unused]] std::shared_ptr<Schema> schema,
544+
[[maybe_unused]] std::shared_ptr<FileIO> io,
545+
[[maybe_unused]] internal::TableScanContext context) {
546+
return NotImplemented("IncrementalAppendScan is not implemented");
547+
}
548+
549+
Result<std::vector<std::shared_ptr<FileScanTask>>> IncrementalAppendScan::PlanFiles(
550+
std::optional<int64_t> from_snapshot_id_exclusive,
551+
int64_t to_snapshot_id_inclusive) const {
552+
return NotImplemented("IncrementalAppendScan::PlanFiles is not implemented");
553+
}
554+
555+
// IncrementalChangelogScan implementation
556+
557+
Result<std::unique_ptr<IncrementalChangelogScan>> IncrementalChangelogScan::Make(
558+
[[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
559+
[[maybe_unused]] std::shared_ptr<Schema> schema,
560+
[[maybe_unused]] std::shared_ptr<FileIO> io,
561+
[[maybe_unused]] internal::TableScanContext context) {
562+
return NotImplemented("IncrementalChangelogScan is not implemented");
563+
}
564+
565+
Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
566+
IncrementalChangelogScan::PlanFiles(std::optional<int64_t> from_snapshot_id_exclusive,
567+
int64_t to_snapshot_id_inclusive) const {
568+
return NotImplemented("IncrementalChangelogScan::PlanFiles is not implemented");
569+
}
570+
504571
} // namespace iceberg

0 commit comments

Comments
 (0)