Skip to content

Commit 5839cf8

Browse files
committed
fix: add a as as_span method for DataFileSet
1 parent e94fcc8 commit 5839cf8

File tree

7 files changed

+97
-67
lines changed

7 files changed

+97
-67
lines changed

src/iceberg/test/data_file_set_test.cc

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ TEST_F(DataFileSetTest, EmptySet) {
4545
EXPECT_TRUE(set.empty());
4646
EXPECT_EQ(set.size(), 0);
4747
EXPECT_EQ(set.begin(), set.end());
48+
EXPECT_TRUE(set.as_span().empty());
4849
}
4950

5051
TEST_F(DataFileSetTest, InsertSingleFile) {
@@ -108,6 +109,24 @@ TEST_F(DataFileSetTest, InsertionOrderPreserved) {
108109
EXPECT_EQ(paths[2], "/path/to/file3.parquet");
109110
}
110111

112+
TEST_F(DataFileSetTest, AsSpan) {
113+
DataFileSet set;
114+
EXPECT_TRUE(set.as_span().empty());
115+
116+
auto file1 = CreateDataFile("/path/to/file1.parquet");
117+
auto file2 = CreateDataFile("/path/to/file2.parquet");
118+
set.insert(file1);
119+
set.insert(file2);
120+
121+
auto span = set.as_span();
122+
EXPECT_EQ(span.size(), 2);
123+
EXPECT_EQ(span[0]->file_path, "/path/to/file1.parquet");
124+
EXPECT_EQ(span[1]->file_path, "/path/to/file2.parquet");
125+
126+
set.clear();
127+
EXPECT_TRUE(set.as_span().empty());
128+
}
129+
111130
TEST_F(DataFileSetTest, InsertDuplicatePreservesOrder) {
112131
DataFileSet set;
113132
auto file1 = CreateDataFile("/path/to/file1.parquet");

src/iceberg/test/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ iceberg_tests = {
8484
'sources': files(
8585
'bucket_util_test.cc',
8686
'config_test.cc',
87+
'data_file_set_test.cc',
8788
'decimal_test.cc',
8889
'endian_test.cc',
8990
'formatter_test.cc',

src/iceberg/update/fast_append.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,9 +197,8 @@ Result<std::vector<ManifestFile>> FastAppend::WriteNewManifests() {
197197
if (new_manifests_.empty() && !new_data_files_by_spec_.empty()) {
198198
for (const auto& [spec_id, data_files] : new_data_files_by_spec_) {
199199
ICEBERG_ASSIGN_OR_RAISE(auto spec, Spec(spec_id));
200-
ICEBERG_ASSIGN_OR_RAISE(
201-
auto written_manifests,
202-
WriteDataManifests(data_files.begin(), data_files.end(), spec));
200+
ICEBERG_ASSIGN_OR_RAISE(auto written_manifests,
201+
WriteDataManifests(data_files.as_span(), spec));
203202
new_manifests_.insert(new_manifests_.end(),
204203
std::make_move_iterator(written_manifests.begin()),
205204
std::make_move_iterator(written_manifests.end()));

src/iceberg/update/snapshot_update.cc

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,11 @@
2727
#include "iceberg/manifest/manifest_entry.h"
2828
#include "iceberg/manifest/manifest_list.h"
2929
#include "iceberg/manifest/manifest_reader.h"
30+
#include "iceberg/manifest/manifest_writer.h"
31+
#include "iceberg/manifest/rolling_manifest_writer.h"
3032
#include "iceberg/partition_summary_internal.h"
33+
#include "iceberg/table.h"
34+
#include "iceberg/transaction.h"
3135
#include "iceberg/util/macros.h"
3236
#include "iceberg/util/snapshot_util_internal.h"
3337
#include "iceberg/util/string_util.h"
@@ -159,6 +163,62 @@ SnapshotUpdate::SnapshotUpdate(std::shared_ptr<Transaction> transaction)
159163
target_manifest_size_bytes_(
160164
base().properties.Get(TableProperties::kManifestTargetSizeBytes)) {}
161165

166+
// TODO(xxx): write manifests in parallel
167+
Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDataManifests(
168+
std::span<const std::shared_ptr<DataFile>> files,
169+
const std::shared_ptr<PartitionSpec>& spec,
170+
std::optional<int64_t> data_sequence_number) {
171+
if (files.empty()) {
172+
return std::vector<ManifestFile>{};
173+
}
174+
175+
ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema());
176+
RollingManifestWriter rolling_writer(
177+
[this, spec, schema = std::move(current_schema),
178+
snapshot_id = SnapshotId()]() -> Result<std::unique_ptr<ManifestWriter>> {
179+
return ManifestWriter::MakeWriter(base().format_version, snapshot_id,
180+
ManifestPath(), transaction_->table()->io(),
181+
std::move(spec), std::move(schema),
182+
ManifestContent::kData,
183+
/*first_row_id=*/base().next_row_id);
184+
},
185+
target_manifest_size_bytes_);
186+
187+
for (const auto& file : files) {
188+
ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(file, data_sequence_number));
189+
}
190+
ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close());
191+
return rolling_writer.ToManifestFiles();
192+
}
193+
194+
// TODO(xxx): write manifests in parallel
195+
Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDeleteManifests(
196+
std::span<const std::shared_ptr<DataFile>> files,
197+
const std::shared_ptr<PartitionSpec>& spec) {
198+
if (files.empty()) {
199+
return std::vector<ManifestFile>{};
200+
}
201+
202+
ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema());
203+
RollingManifestWriter rolling_writer(
204+
[this, spec, schema = std::move(current_schema),
205+
snapshot_id = SnapshotId()]() -> Result<std::unique_ptr<ManifestWriter>> {
206+
return ManifestWriter::MakeWriter(base().format_version, snapshot_id,
207+
ManifestPath(), transaction_->table()->io(),
208+
std::move(spec), std::move(schema),
209+
ManifestContent::kDeletes);
210+
},
211+
target_manifest_size_bytes_);
212+
213+
for (const auto& file : files) {
214+
// FIXME: Java impl wrap it with `PendingDeleteFile` and deals with
215+
// file->data_sequence_number
216+
ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(file));
217+
}
218+
ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close());
219+
return rolling_writer.ToManifestFiles();
220+
}
221+
162222
int64_t SnapshotUpdate::SnapshotId() {
163223
if (!snapshot_id_.has_value()) {
164224
snapshot_id_ = SnapshotUtil::GenerateSnapshotId(base());

src/iceberg/update/snapshot_update.h

Lines changed: 8 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,15 @@
2222
#include <functional>
2323
#include <memory>
2424
#include <optional>
25+
#include <span>
2526
#include <string>
2627
#include <unordered_map>
2728
#include <unordered_set>
2829
#include <vector>
2930

3031
#include "iceberg/iceberg_export.h"
31-
#include "iceberg/manifest/manifest_list.h"
32-
#include "iceberg/manifest/manifest_writer.h"
33-
#include "iceberg/manifest/rolling_manifest_writer.h"
3432
#include "iceberg/result.h"
3533
#include "iceberg/snapshot.h"
36-
#include "iceberg/table.h"
37-
#include "iceberg/transaction.h"
3834
#include "iceberg/type_fwd.h"
3935
#include "iceberg/update/pending_update.h"
4036

@@ -107,75 +103,23 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate {
107103

108104
/// \brief Write data manifests for the given data files
109105
///
110-
/// \tparam Iterator Iterator type that dereferences to std::shared_ptr<DataFile>
111-
/// \param begin Iterator to the beginning of the data files range
112-
/// \param end Iterator to the end of the data files range
106+
/// \param files Data files to write
113107
/// \param spec The partition spec to use
114108
/// \param data_sequence_number Optional data sequence number for the files
115109
/// \return A vector of manifest files
116-
// TODO(xxx): write manifests in parallel
117-
template <typename Iterator>
118110
Result<std::vector<ManifestFile>> WriteDataManifests(
119-
Iterator begin, Iterator end, const std::shared_ptr<PartitionSpec>& spec,
120-
std::optional<int64_t> data_sequence_number = std::nullopt) {
121-
if (begin == end) {
122-
return std::vector<ManifestFile>{};
123-
}
124-
125-
ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema());
126-
RollingManifestWriter rolling_writer(
127-
[this, spec, schema = std::move(current_schema),
128-
snapshot_id = SnapshotId()]() -> Result<std::unique_ptr<ManifestWriter>> {
129-
return ManifestWriter::MakeWriter(base().format_version, snapshot_id,
130-
ManifestPath(), transaction_->table()->io(),
131-
std::move(spec), std::move(schema),
132-
ManifestContent::kData,
133-
/*first_row_id=*/base().next_row_id);
134-
},
135-
target_manifest_size_bytes_);
136-
137-
for (auto it = begin; it != end; ++it) {
138-
ICEBERG_RETURN_UNEXPECTED(
139-
rolling_writer.WriteAddedEntry(*it, data_sequence_number));
140-
}
141-
ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close());
142-
return rolling_writer.ToManifestFiles();
143-
}
111+
std::span<const std::shared_ptr<DataFile>> files,
112+
const std::shared_ptr<PartitionSpec>& spec,
113+
std::optional<int64_t> data_sequence_number = std::nullopt);
144114

145115
/// \brief Write delete manifests for the given delete files
146116
///
147-
/// \tparam Iterator Iterator type that dereferences to std::shared_ptr<DataFile>
148-
/// \param begin Iterator to the beginning of the delete files range
149-
/// \param end Iterator to the end of the delete files range
117+
/// \param files Delete files to write
150118
/// \param spec The partition spec to use
151119
/// \return A vector of manifest files
152-
// TODO(xxx): write manifests in parallel
153-
template <typename Iterator>
154120
Result<std::vector<ManifestFile>> WriteDeleteManifests(
155-
Iterator begin, Iterator end, const std::shared_ptr<PartitionSpec>& spec) {
156-
if (begin == end) {
157-
return std::vector<ManifestFile>{};
158-
}
159-
160-
ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema());
161-
RollingManifestWriter rolling_writer(
162-
[this, spec, schema = std::move(current_schema),
163-
snapshot_id = SnapshotId()]() -> Result<std::unique_ptr<ManifestWriter>> {
164-
return ManifestWriter::MakeWriter(base().format_version, snapshot_id,
165-
ManifestPath(), transaction_->table()->io(),
166-
std::move(spec), std::move(schema),
167-
ManifestContent::kDeletes);
168-
},
169-
target_manifest_size_bytes_);
170-
171-
for (auto it = begin; it != end; ++it) {
172-
/// FIXME: Java impl wrap it with `PendingDeleteFile` and deals with
173-
/// (*it)->data_sequenece_number
174-
ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(*it));
175-
}
176-
ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close());
177-
return rolling_writer.ToManifestFiles();
178-
}
121+
std::span<const std::shared_ptr<DataFile>> files,
122+
const std::shared_ptr<PartitionSpec>& spec);
179123

180124
Status SetTargetBranch(const std::string& branch);
181125
const std::string& target_branch() const { return target_branch_; }

src/iceberg/util/data_file_set.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
#include <iterator>
2727
#include <memory>
28+
#include <span>
2829
#include <string_view>
2930
#include <unordered_map>
3031
#include <vector>
@@ -79,6 +80,11 @@ class ICEBERG_EXPORT DataFileSet {
7980
const_iterator end() const { return elements_.end(); }
8081
const_iterator cend() const { return elements_.cend(); }
8182

83+
/// \brief Get a non-owning view of the data files in insertion order.
84+
std::span<const value_type> as_span() const {
85+
return std::span<const value_type>(elements_.data(), elements_.size());
86+
}
87+
8288
private:
8389
std::pair<iterator, bool> InsertImpl(value_type file) {
8490
if (!file) {

src/iceberg/util/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ install_headers(
2222
'config.h',
2323
'content_file_util.h',
2424
'conversions.h',
25+
'data_file_set.h',
2526
'decimal.h',
2627
'endian.h',
2728
'error_collector.h',

0 commit comments

Comments
 (0)