Skip to content

Commit 078acb1

Browse files
committed
feat: add FileWriterFactory with separated writer files
Add FileWriterFactory to create data and delete writers, with comprehensive improvements per review feedback.

File Organization:
- Separate DataWriter to data_writer.h/.cc
- Separate PositionDeleteWriter to position_delete_writer.h/.cc
- Separate EqualityDeleteWriter to equality_delete_writer.h/.cc
- Separate FileWriterFactory to file_writer_factory.h/.cc
- Keep only FileWriter base interface in writer.h/.cc

Key Features:
- Input validation for all factory methods (path, schema, spec)
- Thread safety documentation (NOT thread-safe)
- State management in stub implementations (is_closed tracking)
- Support for Parquet and Avro formats
- Pass-by-value + std::move for sink parameters

Implementation:
- FileWriterFactory directly creates writers (true factory pattern)
- Writers use friend pattern - only the factory can construct them
- Internal MakeXxxInternal functions handle cross-file construction
- Stub implementations validate inputs before returning NotImplemented

Tests:
- 54 comprehensive tests covering:
  * Input validation (10 tests)
  * State management (9 tests)
  * File format support (6 tests)
  * Edge cases (6 tests)
- All tests passing
- All pre-commit checks passing

Related to #441
1 parent cf0fd37 commit 078acb1

11 files changed

+1461
-4
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ set(ICEBERG_INCLUDES "$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}/src>"
2020
set(ICEBERG_SOURCES
2121
arrow_c_data_guard_internal.cc
2222
catalog/memory/in_memory_catalog.cc
23+
data/data_writer.cc
24+
data/equality_delete_writer.cc
25+
data/file_writer_factory.cc
26+
data/position_delete_writer.cc
2327
data/writer.cc
2428
delete_file_index.cc
2529
expression/aggregate.cc

src/iceberg/data/data_writer.cc

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/data/data_writer.h"
21+
22+
namespace iceberg {
23+
24+
//=============================================================================
25+
// DataWriter - stub implementation (to be completed in separate PR per #441)
26+
//=============================================================================
27+
28+
// Private state of DataWriter, kept out of the public header.
//
// Stores the options the writer was created with and a flag that records
// whether Close() has already been called.
class DataWriter::Impl {
 public:
  // Takes the options by value and moves them into place.
  explicit Impl(DataWriterOptions opts)
      : options_(std::move(opts)) {}

  // Options supplied when the writer was created.
  DataWriterOptions options_;

  // Flipped to true by the first call to DataWriter::Close().
  bool is_closed_{false};
};
34+
35+
// Takes ownership of the implementation object.
DataWriter::DataWriter(std::unique_ptr<Impl> impl)
    : impl_(std::move(impl)) {}

// Defined in this file, where Impl is a complete type, so that the
// std::unique_ptr<Impl> member can be destroyed.
DataWriter::~DataWriter() = default;
37+
38+
// Stub for writing a batch.
//
// The null check runs before the closed check, so a null `data` pointer is
// reported as InvalidArgument even on a writer that is already closed. Any
// call that passes both checks currently ends in NotImplemented.
Status DataWriter::Write(ArrowArray* data) {
  if (data == nullptr) {
    return InvalidArgument("Cannot write null data");
  }

  const bool already_closed = impl_->is_closed_;
  if (already_closed) {
    return Invalid("Writer is already closed");
  }

  return NotImplemented("DataWriter not yet implemented - see #441");
}
47+
48+
// Stub: reports NotImplemented unconditionally; the writer state is not
// inspected.
Result<int64_t> DataWriter::Length() const {
  return NotImplemented("DataWriter not yet implemented - see #441");
}
51+
52+
// Marks the writer as closed.
//
// The first call flips the closed flag and then reports NotImplemented.
// Every later call returns a default-constructed Status without doing
// anything else (Close is idempotent).
//
// NOTE(review): the flag is set even though the first call reports an error,
// so a retry after that error takes the early-return path. Confirm this is the
// intended contract once the real implementation lands.
Status DataWriter::Close() {
  if (!impl_->is_closed_) {
    impl_->is_closed_ = true;
    return NotImplemented("DataWriter not yet implemented - see #441");
  }
  return {};
}
59+
60+
// Stub for retrieving the write result.
//
// Calling this on a writer that has not been closed yields Invalid; once the
// writer is closed the call currently ends in NotImplemented.
Result<FileWriter::WriteResult> DataWriter::Metadata() {
  const bool closed = impl_->is_closed_;
  if (closed) {
    return NotImplemented("DataWriter not yet implemented - see #441");
  }
  return Invalid("Writer must be closed before getting metadata");
}
66+
67+
// Internal factory function for FileWriterFactory
68+
std::unique_ptr<DataWriter> MakeDataWriterInternal(const DataWriterOptions& options) {
69+
auto impl = std::make_unique<DataWriter::Impl>(options);
70+
return std::unique_ptr<DataWriter>(new DataWriter(std::move(impl)));
71+
}
72+
73+
} // namespace iceberg

src/iceberg/data/data_writer.h

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/data/data_writer.h
23+
/// Data writer for Iceberg tables.
24+
25+
#include <cstdint>
26+
#include <memory>
27+
#include <optional>
28+
#include <string>
29+
30+
#include "iceberg/arrow_c_data.h"
31+
#include "iceberg/data/writer.h"
32+
#include "iceberg/file_format.h"
33+
#include "iceberg/iceberg_export.h"
34+
#include "iceberg/result.h"
35+
#include "iceberg/row/partition_values.h"
36+
#include "iceberg/type_fwd.h"
37+
38+
namespace iceberg {
39+
40+
/// \brief Options for creating a DataWriter.
41+
///
42+
/// \note The following features from Java DataWriter are not yet supported:
43+
/// - Encryption key metadata (uses FileIO instead of EncryptedOutputFile)
44+
/// - Metrics collection and reporting
45+
/// - Split offsets tracking
46+
struct ICEBERG_EXPORT DataWriterOptions {
47+
std::string path;
48+
std::shared_ptr<Schema> schema;
49+
std::shared_ptr<PartitionSpec> spec;
50+
PartitionValues partition;
51+
FileFormatType format = FileFormatType::kParquet;
52+
std::shared_ptr<FileIO> io;
53+
std::optional<int32_t> sort_order_id;
54+
std::shared_ptr<class WriterProperties> properties;
55+
};
56+
57+
/// \brief Writer for Iceberg data files.
58+
///
59+
/// \warning Thread Safety: Writer instances are NOT thread-safe. Each writer should only
60+
/// be used by a single thread. Do not call Write(), Close(), or Metadata() concurrently.
61+
class ICEBERG_EXPORT DataWriter : public FileWriter {
62+
public:
63+
~DataWriter() override;
64+
65+
Status Write(ArrowArray* data) override;
66+
Result<int64_t> Length() const override;
67+
Status Close() override;
68+
Result<WriteResult> Metadata() override;
69+
70+
private:
71+
friend class FileWriterFactory;
72+
friend std::unique_ptr<DataWriter> MakeDataWriterInternal(const DataWriterOptions&);
73+
class Impl;
74+
std::unique_ptr<Impl> impl_;
75+
explicit DataWriter(std::unique_ptr<Impl> impl);
76+
};
77+
78+
} // namespace iceberg

src/iceberg/data/equality_delete_writer.cc

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/data/equality_delete_writer.h"
21+
22+
namespace iceberg {
23+
24+
//=============================================================================
25+
// EqualityDeleteWriter - stub implementation (to be completed in separate PR per #441)
26+
//=============================================================================
27+
28+
// Private state of EqualityDeleteWriter, kept out of the public header.
//
// Stores the options the writer was created with (including the equality
// field ids) and a flag that records whether Close() has already been called.
class EqualityDeleteWriter::Impl {
 public:
  // Takes the options by value and moves them into place.
  explicit Impl(EqualityDeleteWriterOptions opts)
      : options_(std::move(opts)) {}

  // Options supplied when the writer was created.
  EqualityDeleteWriterOptions options_;

  // Flipped to true by the first call to EqualityDeleteWriter::Close().
  bool is_closed_{false};
};
34+
35+
// Takes ownership of the implementation object.
EqualityDeleteWriter::EqualityDeleteWriter(std::unique_ptr<Impl> impl)
    : impl_(std::move(impl)) {}

// Defined in this file, where Impl is a complete type, so that the
// std::unique_ptr<Impl> member can be destroyed.
EqualityDeleteWriter::~EqualityDeleteWriter() = default;
38+
39+
// Stub for writing a batch of equality deletes.
//
// The null check runs before the closed check, so a null `data` pointer is
// reported as InvalidArgument even on a writer that is already closed. Any
// call that passes both checks currently ends in NotImplemented.
Status EqualityDeleteWriter::Write(ArrowArray* data) {
  if (data == nullptr) {
    return InvalidArgument("Cannot write null data");
  }

  const bool already_closed = impl_->is_closed_;
  if (already_closed) {
    return Invalid("Writer is already closed");
  }

  return NotImplemented("EqualityDeleteWriter not yet implemented - see #441");
}
48+
49+
// Stub: reports NotImplemented unconditionally; the writer state is not
// inspected.
Result<int64_t> EqualityDeleteWriter::Length() const {
  return NotImplemented("EqualityDeleteWriter not yet implemented - see #441");
}
52+
53+
// Marks the writer as closed.
//
// The first call flips the closed flag and then reports NotImplemented.
// Every later call returns a default-constructed Status without doing
// anything else (Close is idempotent).
//
// NOTE(review): the flag is set even though the first call reports an error,
// so a retry after that error takes the early-return path. Confirm this is the
// intended contract once the real implementation lands.
Status EqualityDeleteWriter::Close() {
  if (!impl_->is_closed_) {
    impl_->is_closed_ = true;
    return NotImplemented("EqualityDeleteWriter not yet implemented - see #441");
  }
  return {};
}
60+
61+
// Stub for retrieving the write result.
//
// Calling this on a writer that has not been closed yields Invalid; once the
// writer is closed the call currently ends in NotImplemented.
Result<FileWriter::WriteResult> EqualityDeleteWriter::Metadata() {
  const bool closed = impl_->is_closed_;
  if (closed) {
    return NotImplemented("EqualityDeleteWriter not yet implemented - see #441");
  }
  return Invalid("Writer must be closed before getting metadata");
}
67+
68+
const std::vector<int32_t>& EqualityDeleteWriter::equality_field_ids() const {
69+
return impl_->options_.equality_field_ids;
70+
}
71+
72+
// Internal factory function for FileWriterFactory
73+
std::unique_ptr<EqualityDeleteWriter> MakeEqualityDeleteWriterInternal(
74+
const EqualityDeleteWriterOptions& options) {
75+
auto impl = std::make_unique<EqualityDeleteWriter::Impl>(options);
76+
return std::unique_ptr<EqualityDeleteWriter>(new EqualityDeleteWriter(std::move(impl)));
77+
}
78+
79+
} // namespace iceberg

src/iceberg/data/equality_delete_writer.h

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/data/equality_delete_writer.h
23+
/// Equality delete writer for Iceberg tables.
24+
25+
#include <cstdint>
26+
#include <memory>
27+
#include <optional>
28+
#include <string>
29+
#include <vector>
30+
31+
#include "iceberg/arrow_c_data.h"
32+
#include "iceberg/data/writer.h"
33+
#include "iceberg/file_format.h"
34+
#include "iceberg/iceberg_export.h"
35+
#include "iceberg/result.h"
36+
#include "iceberg/row/partition_values.h"
37+
#include "iceberg/type_fwd.h"
38+
39+
namespace iceberg {
40+
41+
/// \brief Options for creating an EqualityDeleteWriter.
42+
///
43+
/// \note The following features from Java EqualityDeleteWriter are not yet supported:
44+
/// - Encryption key metadata
45+
/// - Metrics collection and reporting
46+
/// - Split offsets tracking
47+
struct ICEBERG_EXPORT EqualityDeleteWriterOptions {
48+
std::string path;
49+
std::shared_ptr<Schema> schema;
50+
std::shared_ptr<PartitionSpec> spec;
51+
PartitionValues partition;
52+
FileFormatType format = FileFormatType::kParquet;
53+
std::shared_ptr<FileIO> io;
54+
std::vector<int32_t> equality_field_ids;
55+
std::optional<int32_t> sort_order_id;
56+
std::shared_ptr<class WriterProperties> properties;
57+
};
58+
59+
/// \brief Writer for Iceberg equality delete files.
60+
///
61+
/// \warning Thread Safety: Writer instances are NOT thread-safe. Each writer should only
62+
/// be used by a single thread. Do not call Write(), Close(), or Metadata() concurrently.
63+
class ICEBERG_EXPORT EqualityDeleteWriter : public FileWriter {
64+
public:
65+
~EqualityDeleteWriter() override;
66+
67+
Status Write(ArrowArray* data) override;
68+
Result<int64_t> Length() const override;
69+
Status Close() override;
70+
Result<WriteResult> Metadata() override;
71+
72+
const std::vector<int32_t>& equality_field_ids() const;
73+
74+
private:
75+
friend class FileWriterFactory;
76+
friend std::unique_ptr<EqualityDeleteWriter> MakeEqualityDeleteWriterInternal(
77+
const EqualityDeleteWriterOptions&);
78+
class Impl;
79+
std::unique_ptr<Impl> impl_;
80+
explicit EqualityDeleteWriter(std::unique_ptr<Impl> impl);
81+
};
82+
83+
} // namespace iceberg

0 commit comments

Comments
 (0)