Skip to content

Commit a457099

Browse files
authored
feat: add FastAppend (#516)
1 parent bc2e026 commit a457099

24 files changed

+823
-40
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ set(ICEBERG_SOURCES
4949
manifest/manifest_group.cc
5050
manifest/manifest_list.cc
5151
manifest/manifest_reader.cc
52+
manifest/manifest_util.cc
5253
manifest/manifest_writer.cc
5354
manifest/rolling_manifest_writer.cc
5455
manifest/v1_metadata.cc
@@ -85,6 +86,7 @@ set(ICEBERG_SOURCES
8586
transform_function.cc
8687
type.cc
8788
update/expire_snapshots.cc
89+
update/fast_append.cc
8890
update/pending_update.cc
8991
update/snapshot_update.cc
9092
update/update_location.cc

src/iceberg/constants.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ namespace iceberg {
3232

3333
constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";
3434
constexpr int64_t kInvalidSnapshotId = -1;
35+
constexpr int64_t kInvalidSequenceNumber = -1;
3536
/// \brief Stand-in for the current sequence number that will be assigned when the commit
3637
/// is successful. This is replaced when writing a manifest list by the ManifestFile
3738
/// adapter.

src/iceberg/inheritable_metadata.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,16 @@
2121

2222
#include <utility>
2323

24-
#include <iceberg/result.h>
25-
2624
#include "iceberg/manifest/manifest_entry.h"
2725
#include "iceberg/manifest/manifest_list.h"
28-
#include "iceberg/snapshot.h"
2926

3027
namespace iceberg {
3128

29+
InheritableMetadata::~InheritableMetadata() = default;
30+
BaseInheritableMetadata::~BaseInheritableMetadata() = default;
31+
CopyInheritableMetadata::~CopyInheritableMetadata() = default;
32+
EmptyInheritableMetadata::~EmptyInheritableMetadata() = default;
33+
3234
BaseInheritableMetadata::BaseInheritableMetadata(int32_t spec_id, int64_t snapshot_id,
3335
int64_t sequence_number,
3436
std::string manifest_location)

src/iceberg/inheritable_metadata.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ namespace iceberg {
3939
/// from the manifest file. This interface provides a way to apply such inheritance rules.
4040
class ICEBERG_EXPORT InheritableMetadata {
4141
public:
42-
virtual ~InheritableMetadata() = default;
42+
virtual ~InheritableMetadata();
4343

4444
/// \brief Apply inheritable metadata to a manifest entry.
4545
/// \param entry The manifest entry to modify.
@@ -61,6 +61,8 @@ class ICEBERG_EXPORT BaseInheritableMetadata : public InheritableMetadata {
6161

6262
Status Apply(ManifestEntry& entry) override;
6363

64+
~BaseInheritableMetadata() override;
65+
6466
private:
6567
int32_t spec_id_;
6668
int64_t snapshot_id_;
@@ -72,6 +74,8 @@ class ICEBERG_EXPORT BaseInheritableMetadata : public InheritableMetadata {
7274
class ICEBERG_EXPORT EmptyInheritableMetadata : public InheritableMetadata {
7375
public:
7476
Status Apply(ManifestEntry& entry) override;
77+
78+
~EmptyInheritableMetadata() override;
7579
};
7680

7781
/// \brief Metadata inheritance for copying manifests before commit.
@@ -83,6 +87,8 @@ class ICEBERG_EXPORT CopyInheritableMetadata : public InheritableMetadata {
8387

8488
Status Apply(ManifestEntry& entry) override;
8589

90+
~CopyInheritableMetadata() override;
91+
8692
private:
8793
int64_t snapshot_id_;
8894
};

src/iceberg/manifest/manifest_reader.cc

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "iceberg/expression/expression.h"
3333
#include "iceberg/expression/projections.h"
3434
#include "iceberg/file_format.h"
35+
#include "iceberg/inheritable_metadata.h"
3536
#include "iceberg/manifest/manifest_entry.h"
3637
#include "iceberg/manifest/manifest_list.h"
3738
#include "iceberg/manifest/manifest_reader_internal.h"
@@ -998,18 +999,22 @@ Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
998999
}
9991000

10001001
Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
1001-
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
1002-
std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec) {
1003-
if (file_io == nullptr || schema == nullptr || spec == nullptr) {
1004-
return InvalidArgument(
1005-
"FileIO, Schema, and PartitionSpec cannot be null to create ManifestReader");
1002+
std::string_view manifest_location, std::optional<int64_t> manifest_length,
1003+
std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> schema,
1004+
std::shared_ptr<PartitionSpec> spec,
1005+
std::unique_ptr<InheritableMetadata> inheritable_metadata,
1006+
std::optional<int64_t> first_row_id) {
1007+
ICEBERG_PRECHECK(file_io != nullptr, "FileIO cannot be null to read manifest");
1008+
ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null to read manifest");
1009+
ICEBERG_PRECHECK(spec != nullptr, "PartitionSpec cannot be null to read manifest");
1010+
1011+
if (inheritable_metadata == nullptr) {
1012+
ICEBERG_ASSIGN_OR_RAISE(inheritable_metadata, InheritableMetadataFactory::Empty());
10061013
}
10071014

1008-
// No metadata to inherit in this case.
1009-
ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata, InheritableMetadataFactory::Empty());
10101015
return std::make_unique<ManifestReaderImpl>(
1011-
std::string(manifest_location), std::nullopt, std::move(file_io), std::move(schema),
1012-
std::move(spec), std::move(inheritable_metadata), std::nullopt);
1016+
std::string(manifest_location), manifest_length, std::move(file_io),
1017+
std::move(schema), std::move(spec), std::move(inheritable_metadata), first_row_id);
10131018
}
10141019

10151020
Result<std::unique_ptr<ManifestListReader>> ManifestListReader::Make(

src/iceberg/manifest/manifest_reader.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
/// \file iceberg/manifest/manifest_reader.h
2323
/// Data reader interface for manifest files.
2424

25+
#include <cstdint>
2526
#include <memory>
27+
#include <optional>
2628
#include <string>
2729
#include <unordered_map>
2830
#include <vector>
@@ -92,13 +94,19 @@ class ICEBERG_EXPORT ManifestReader {
9294

9395
/// \brief Creates a reader for a manifest file.
9496
/// \param manifest_location Path to the manifest file.
97+
/// \param manifest_length Length of the manifest file.
9598
/// \param file_io File IO implementation to use.
9699
/// \param schema Schema used to bind the partition type.
97100
/// \param spec Partition spec used for this manifest file.
101+
/// \param inheritable_metadata Inheritable metadata.
102+
/// \param first_row_id First row ID to use for the manifest entries.
98103
/// \return A Result containing the reader or an error.
99104
static Result<std::unique_ptr<ManifestReader>> Make(
100-
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
101-
std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec);
105+
std::string_view manifest_location, std::optional<int64_t> manifest_length,
106+
std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> schema,
107+
std::shared_ptr<PartitionSpec> spec,
108+
std::unique_ptr<InheritableMetadata> inheritable_metadata,
109+
std::optional<int64_t> first_row_id = std::nullopt);
102110

103111
/// \brief Add stats columns to the column list if needed.
104112
static std::vector<std::string> WithStatsColumns(
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <memory>
21+
#include <optional>
22+
23+
#include "iceberg/inheritable_metadata.h"
24+
#include "iceberg/manifest/manifest_entry.h"
25+
#include "iceberg/manifest/manifest_reader.h"
26+
#include "iceberg/manifest/manifest_util_internal.h"
27+
#include "iceberg/manifest/manifest_writer.h"
28+
#include "iceberg/result.h"
29+
#include "iceberg/schema.h"
30+
#include "iceberg/snapshot.h"
31+
#include "iceberg/util/macros.h"
32+
33+
namespace iceberg {
34+
35+
Result<ManifestFile> CopyAppendManifest(
36+
const ManifestFile& manifest, const std::shared_ptr<FileIO>& file_io,
37+
const std::shared_ptr<Schema>& schema, const std::shared_ptr<PartitionSpec>& spec,
38+
int64_t snapshot_id, const std::string& output_path, int8_t format_version,
39+
SnapshotSummaryBuilder* summary_builder) {
40+
// use metadata that will add the current snapshot's ID for the rewrite
41+
// read first_row_id as null because this copies the incoming manifest before commit
42+
ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata,
43+
InheritableMetadataFactory::ForCopy(snapshot_id));
44+
ICEBERG_ASSIGN_OR_RAISE(
45+
auto reader,
46+
ManifestReader::Make(manifest.manifest_path, manifest.manifest_length, file_io,
47+
schema, spec, std::move(inheritable_metadata),
48+
/*first_row_id=*/std::nullopt));
49+
ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
50+
51+
// do not produce row IDs for the copy
52+
ICEBERG_ASSIGN_OR_RAISE(
53+
auto writer, ManifestWriter::MakeWriter(
54+
format_version, snapshot_id, output_path, file_io, spec, schema,
55+
ManifestContent::kData, /*first_row_id=*/std::nullopt));
56+
57+
for (auto& entry : entries) {
58+
ICEBERG_CHECK(entry.status == ManifestStatus::kAdded,
59+
"Manifest to copy must only contain added entries");
60+
if (summary_builder != nullptr && entry.data_file != nullptr) {
61+
ICEBERG_RETURN_UNEXPECTED(summary_builder->AddedFile(*spec, *entry.data_file));
62+
}
63+
64+
ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry));
65+
}
66+
67+
ICEBERG_RETURN_UNEXPECTED(writer->Close());
68+
return writer->ToManifestFile();
69+
}
70+
71+
} // namespace iceberg
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/manifest/manifest_util_internal.h
23+
/// Internal utility functions for manifest operations.
24+
25+
#include <cstdint>
26+
#include <memory>
27+
#include <string>
28+
29+
#include "iceberg/iceberg_export.h"
30+
#include "iceberg/result.h"
31+
#include "iceberg/type_fwd.h"
32+
33+
namespace iceberg {
34+
35+
/// \brief Copy an append manifest with a new snapshot ID.
36+
///
37+
/// This function copies a manifest file that contains only ADDED entries,
38+
/// rewriting it with a new snapshot ID. This is similar to Java's
39+
/// ManifestFiles.copyAppendManifest.
40+
///
41+
/// \param manifest The manifest file to copy
42+
/// \param file_io File IO implementation to use
43+
/// \param schema Table schema
44+
/// \param spec Partition spec for the manifest
45+
/// \param snapshot_id The new snapshot ID to assign to entries
46+
/// \param output_path Path where the new manifest will be written
47+
/// \param format_version Table format version
48+
/// \param summary_builder Optional summary builder to update with file metrics
49+
/// \return The copied manifest file, or an error
50+
ICEBERG_EXPORT Result<ManifestFile> CopyAppendManifest(
51+
const ManifestFile& manifest, const std::shared_ptr<FileIO>& file_io,
52+
const std::shared_ptr<Schema>& schema, const std::shared_ptr<PartitionSpec>& spec,
53+
int64_t snapshot_id, const std::string& output_path, int8_t format_version,
54+
SnapshotSummaryBuilder* summary_builder = nullptr);
55+
56+
} // namespace iceberg

src/iceberg/meson.build

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ iceberg_sources = files(
6767
'manifest/manifest_group.cc',
6868
'manifest/manifest_list.cc',
6969
'manifest/manifest_reader.cc',
70+
'manifest/manifest_util.cc',
7071
'manifest/manifest_writer.cc',
7172
'manifest/rolling_manifest_writer.cc',
7273
'manifest/v1_metadata.cc',
@@ -103,6 +104,7 @@ iceberg_sources = files(
103104
'transform_function.cc',
104105
'type.cc',
105106
'update/expire_snapshots.cc',
107+
'update/fast_append.cc',
106108
'update/pending_update.cc',
107109
'update/snapshot_update.cc',
108110
'update/update_location.cc',

src/iceberg/table.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,13 @@ Result<std::shared_ptr<UpdateLocation>> Table::NewUpdateLocation() {
199199
return transaction->NewUpdateLocation();
200200
}
201201

202+
Result<std::shared_ptr<FastAppend>> Table::NewFastAppend() {
203+
ICEBERG_ASSIGN_OR_RAISE(
204+
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
205+
/*auto_commit=*/true));
206+
return transaction->NewFastAppend();
207+
}
208+
202209
Result<std::shared_ptr<StagedTable>> StagedTable::Make(
203210
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
204211
std::string metadata_location, std::shared_ptr<FileIO> io,

0 commit comments

Comments
 (0)