Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions src/iceberg/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -645,9 +645,8 @@ Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json) {
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
ICEBERG_ASSIGN_OR_RAISE(auto sequence_number,
GetJsonValueOptional<int64_t>(json, kSequenceNumber));
ICEBERG_ASSIGN_OR_RAISE(
auto timestamp_ms,
GetJsonValue<int64_t>(json, kTimestampMs).and_then(TimePointMsFromUnixMs));
ICEBERG_ASSIGN_OR_RAISE(auto unix_ms, GetJsonValue<int64_t>(json, kTimestampMs));
auto timestamp_ms = TimePointMsFromUnixMs(unix_ms);
ICEBERG_ASSIGN_OR_RAISE(auto manifest_list,
GetJsonValue<std::string>(json, kManifestList));

Expand Down Expand Up @@ -781,9 +780,8 @@ nlohmann::json ToJson(const SnapshotLogEntry& snapshot_log_entry) {

Result<SnapshotLogEntry> SnapshotLogEntryFromJson(const nlohmann::json& json) {
SnapshotLogEntry snapshot_log_entry;
ICEBERG_ASSIGN_OR_RAISE(
snapshot_log_entry.timestamp_ms,
GetJsonValue<int64_t>(json, kTimestampMs).and_then(TimePointMsFromUnixMs));
ICEBERG_ASSIGN_OR_RAISE(auto unix_ms, GetJsonValue<int64_t>(json, kTimestampMs));
snapshot_log_entry.timestamp_ms = TimePointMsFromUnixMs(unix_ms);
ICEBERG_ASSIGN_OR_RAISE(snapshot_log_entry.snapshot_id,
GetJsonValue<int64_t>(json, kSnapshotId));
return snapshot_log_entry;
Expand All @@ -798,9 +796,8 @@ nlohmann::json ToJson(const MetadataLogEntry& metadata_log_entry) {

Result<MetadataLogEntry> MetadataLogEntryFromJson(const nlohmann::json& json) {
MetadataLogEntry metadata_log_entry;
ICEBERG_ASSIGN_OR_RAISE(
metadata_log_entry.timestamp_ms,
GetJsonValue<int64_t>(json, kTimestampMs).and_then(TimePointMsFromUnixMs));
ICEBERG_ASSIGN_OR_RAISE(auto unix_ms, GetJsonValue<int64_t>(json, kTimestampMs));
metadata_log_entry.timestamp_ms = TimePointMsFromUnixMs(unix_ms);
ICEBERG_ASSIGN_OR_RAISE(metadata_log_entry.metadata_file,
GetJsonValue<std::string>(json, kMetadataFile));
return metadata_log_entry;
Expand Down
3 changes: 1 addition & 2 deletions src/iceberg/table_scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,7 @@ TableScanBuilder& TableScanBuilder::UseRef(const std::string& ref) {
}

TableScanBuilder& TableScanBuilder::AsOfTime(int64_t timestamp_millis) {
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto time_point_ms,
TimePointMsFromUnixMs(timestamp_millis));
auto time_point_ms = TimePointMsFromUnixMs(timestamp_millis);
ICEBERG_BUILDER_ASSIGN_OR_RETURN(
auto snapshot_id, SnapshotUtil::SnapshotIdAsOfTime(*metadata_, time_point_ms));
return UseSnapshot(snapshot_id);
Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/test/json_internal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ TEST(JsonInternalTest, Snapshot) {
Snapshot snapshot{.snapshot_id = 1234567890,
.parent_snapshot_id = 9876543210,
.sequence_number = 99,
.timestamp_ms = TimePointMsFromUnixMs(1234567890123).value(),
.timestamp_ms = TimePointMsFromUnixMs(1234567890123),
.manifest_list = "/path/to/manifest_list",
.summary = summary,
.schema_id = 42};
Expand Down Expand Up @@ -403,7 +403,7 @@ TEST(JsonInternalTest, TableUpdateAddSnapshot) {
Snapshot{.snapshot_id = 123456789,
.parent_snapshot_id = 987654321,
.sequence_number = 5,
.timestamp_ms = TimePointMsFromUnixMs(1234567890000).value(),
.timestamp_ms = TimePointMsFromUnixMs(1234567890000),
.manifest_list = "/path/to/manifest-list.avro",
.summary = {{SnapshotSummaryFields::kOperation, DataOperation::kAppend}},
.schema_id = 1});
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/test/metadata_io_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class MetadataIOTest : public TempFileTestBase {
.snapshots = {std::make_shared<Snapshot>(Snapshot{
.snapshot_id = 3051729675574597004,
.sequence_number = 0,
.timestamp_ms = TimePointMsFromUnixMs(1515100955770).value(),
.timestamp_ms = TimePointMsFromUnixMs(1515100955770),
.manifest_list = "s3://a/b/1.avro",
.summary = {{"operation", "append"}},
})},
Expand Down
22 changes: 11 additions & 11 deletions src/iceberg/test/metadata_serde_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ TEST(MetadataSerdeTest, DeserializeV1Valid) {
.table_uuid = "d20125c8-7284-442c-9aea-15fee620737c",
.location = "s3://bucket/test/location",
.last_sequence_number = 0,
.last_updated_ms = TimePointMsFromUnixMs(1602638573874).value(),
.last_updated_ms = TimePointMsFromUnixMs(1602638573874),
.last_column_id = 3,
.schemas = {expected_schema},
.current_schema_id = Schema::kInitialSchemaId,
Expand Down Expand Up @@ -170,7 +170,7 @@ TEST(MetadataSerdeTest, DeserializeV2Valid) {
auto expected_snapshot_1 = std::make_shared<Snapshot>(Snapshot{
.snapshot_id = 3051729675574597004,
.sequence_number = 0,
.timestamp_ms = TimePointMsFromUnixMs(1515100955770).value(),
.timestamp_ms = TimePointMsFromUnixMs(1515100955770),
.manifest_list = "s3://a/b/1.avro",
.summary = {{"operation", "append"}},
});
Expand All @@ -179,7 +179,7 @@ TEST(MetadataSerdeTest, DeserializeV2Valid) {
.snapshot_id = 3055729675574597004,
.parent_snapshot_id = 3051729675574597004,
.sequence_number = 1,
.timestamp_ms = TimePointMsFromUnixMs(1555100955770).value(),
.timestamp_ms = TimePointMsFromUnixMs(1555100955770),
.manifest_list = "s3://a/b/2.avro",
.summary = {{"operation", "append"}},
.schema_id = 1,
Expand All @@ -190,7 +190,7 @@ TEST(MetadataSerdeTest, DeserializeV2Valid) {
.table_uuid = "9c12d441-03fe-4693-9a96-a0705ddf69c1",
.location = "s3://bucket/test/location",
.last_sequence_number = 34,
.last_updated_ms = TimePointMsFromUnixMs(1602638573590).value(),
.last_updated_ms = TimePointMsFromUnixMs(1602638573590),
.last_column_id = 3,
.schemas = {expected_schema_1, expected_schema_2},
.current_schema_id = 1,
Expand All @@ -200,10 +200,10 @@ TEST(MetadataSerdeTest, DeserializeV2Valid) {
.current_snapshot_id = 3055729675574597004,
.snapshots = {expected_snapshot_1, expected_snapshot_2},
.snapshot_log = {SnapshotLogEntry{
.timestamp_ms = TimePointMsFromUnixMs(1515100955770).value(),
.timestamp_ms = TimePointMsFromUnixMs(1515100955770),
.snapshot_id = 3051729675574597004},
SnapshotLogEntry{
.timestamp_ms = TimePointMsFromUnixMs(1555100955770).value(),
.timestamp_ms = TimePointMsFromUnixMs(1555100955770),
.snapshot_id = 3055729675574597004}},
.sort_orders = {expected_sort_order},
.default_sort_order_id = 3,
Expand Down Expand Up @@ -260,7 +260,7 @@ TEST(MetadataSerdeTest, DeserializeV2ValidMinimal) {
.table_uuid = "9c12d441-03fe-4693-9a96-a0705ddf69c1",
.location = "s3://bucket/test/location",
.last_sequence_number = 34,
.last_updated_ms = TimePointMsFromUnixMs(1602638573590).value(),
.last_updated_ms = TimePointMsFromUnixMs(1602638573590),
.last_column_id = 3,
.schemas = {expected_schema},
.current_schema_id = 0,
Expand Down Expand Up @@ -298,7 +298,7 @@ TEST(MetadataSerdeTest, DeserializeStatisticsFiles) {
auto expected_snapshot = std::make_shared<Snapshot>(Snapshot{
.snapshot_id = 3055729675574597004,
.sequence_number = 1,
.timestamp_ms = TimePointMsFromUnixMs(1555100955770).value(),
.timestamp_ms = TimePointMsFromUnixMs(1555100955770),
.manifest_list = "s3://a/b/2.avro",
.summary = {{"operation", "append"}},
.schema_id = 0,
Expand Down Expand Up @@ -326,7 +326,7 @@ TEST(MetadataSerdeTest, DeserializeStatisticsFiles) {
.table_uuid = "9c12d441-03fe-4693-9a96-a0705ddf69c1",
.location = "s3://bucket/test/location",
.last_sequence_number = 34,
.last_updated_ms = TimePointMsFromUnixMs(1602638573590).value(),
.last_updated_ms = TimePointMsFromUnixMs(1602638573590),
.last_column_id = 3,
.schemas = {expected_schema},
.current_schema_id = 0,
Expand Down Expand Up @@ -361,7 +361,7 @@ TEST(MetadataSerdeTest, DeserializePartitionStatisticsFiles) {
.table_uuid = "9c12d441-03fe-4693-9a96-a0705ddf69c1",
.location = "s3://bucket/test/location",
.last_sequence_number = 34,
.last_updated_ms = TimePointMsFromUnixMs(1602638573590).value(),
.last_updated_ms = TimePointMsFromUnixMs(1602638573590),
.last_column_id = 3,
.schemas = {std::make_shared<Schema>(
std::vector<SchemaField>{SchemaField(/*field_id=*/1, "x", int64(),
Expand All @@ -376,7 +376,7 @@ TEST(MetadataSerdeTest, DeserializePartitionStatisticsFiles) {
.snapshots = {std::make_shared<Snapshot>(Snapshot{
.snapshot_id = 3055729675574597004,
.sequence_number = 1,
.timestamp_ms = TimePointMsFromUnixMs(1555100955770).value(),
.timestamp_ms = TimePointMsFromUnixMs(1555100955770),
.manifest_list = "s3://a/b/2.avro",
.summary = {{"operation", "append"}},
.schema_id = 0,
Expand Down
8 changes: 4 additions & 4 deletions src/iceberg/test/snapshot_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ TEST_F(SnapshotTest, ConstructionAndFieldAccess) {
Snapshot snapshot{.snapshot_id = 12345,
.parent_snapshot_id = 54321,
.sequence_number = 1,
.timestamp_ms = TimePointMsFromUnixMs(1615569200000).value(),
.timestamp_ms = TimePointMsFromUnixMs(1615569200000),
.manifest_list = "s3://example/manifest_list.avro",
.summary = summary1,
.schema_id = 10};
Expand All @@ -107,13 +107,13 @@ TEST_F(SnapshotTest, ConstructionAndFieldAccess) {

TEST_F(SnapshotTest, EqualityComparison) {
// Test the == and != operators
Snapshot snapshot1(12345, {}, 1, TimePointMsFromUnixMs(1615569200000).value(),
Snapshot snapshot1(12345, {}, 1, TimePointMsFromUnixMs(1615569200000),
"s3://example/manifest_list.avro", summary1, {});

Snapshot snapshot2(12345, {}, 1, TimePointMsFromUnixMs(1615569200000).value(),
Snapshot snapshot2(12345, {}, 1, TimePointMsFromUnixMs(1615569200000),
"s3://example/manifest_list.avro", summary2, {});

Snapshot snapshot3(67890, {}, 1, TimePointMsFromUnixMs(1615569200000).value(),
Snapshot snapshot3(67890, {}, 1, TimePointMsFromUnixMs(1615569200000),
"s3://example/manifest_list.avro", summary3, {});

EXPECT_EQ(snapshot1, snapshot2);
Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/util/timepoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

namespace iceberg {

Result<TimePointMs> TimePointMsFromUnixMs(int64_t unix_ms) {
TimePointMs TimePointMsFromUnixMs(int64_t unix_ms) {
return TimePointMs{std::chrono::milliseconds(unix_ms)};
}

Expand All @@ -35,7 +35,7 @@ int64_t UnixMsFromTimePointMs(TimePointMs time_point_ms) {
.count();
}

Result<TimePointNs> TimePointNsFromUnixNs(int64_t unix_ns) {
TimePointNs TimePointNsFromUnixNs(int64_t unix_ns) {
return TimePointNs{std::chrono::nanoseconds(unix_ns)};
}

Expand Down
5 changes: 2 additions & 3 deletions src/iceberg/util/timepoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <chrono>

#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"

namespace iceberg {

Expand All @@ -35,13 +34,13 @@ using TimePointNs =
std::chrono::time_point<std::chrono::system_clock, std::chrono::nanoseconds>;

/// \brief Returns a TimePointMs from a Unix timestamp in milliseconds
ICEBERG_EXPORT Result<TimePointMs> TimePointMsFromUnixMs(int64_t unix_ms);
ICEBERG_EXPORT TimePointMs TimePointMsFromUnixMs(int64_t unix_ms);

/// \brief Returns a Unix timestamp in milliseconds from a TimePointMs
ICEBERG_EXPORT int64_t UnixMsFromTimePointMs(TimePointMs time_point_ms);

/// \brief Returns a TimePointNs from a Unix timestamp in nanoseconds
ICEBERG_EXPORT Result<TimePointNs> TimePointNsFromUnixNs(int64_t unix_ns);
ICEBERG_EXPORT TimePointNs TimePointNsFromUnixNs(int64_t unix_ns);

/// \brief Returns a Unix timestamp in nanoseconds from a TimePointNs
ICEBERG_EXPORT int64_t UnixNsFromTimePointNs(TimePointNs time_point_ns);
Expand Down
Loading