Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions parquet/src/arrow/array_reader/fixed_len_byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use arrow_array::{
ArrayRef, Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array,
FixedSizeBinaryArray, Float16Array, IntervalDayTimeArray, IntervalYearMonthArray,
FixedSizeBinaryArray, Float16Array, IntervalDayTimeArray, IntervalMonthDayNanoArray,
IntervalYearMonthArray,
};
use arrow_buffer::{Buffer, IntervalDayTime, i256};
use arrow_buffer::{Buffer, IntervalDayTime, IntervalMonthDayNano, i256};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{DataType as ArrowType, IntervalUnit};
use bytes::Bytes;
Expand Down Expand Up @@ -96,6 +97,14 @@ pub fn make_fixed_len_byte_array_reader(
));
}
}
ArrowType::Interval(IntervalUnit::MonthDayNano) => {
if byte_length != 12 && byte_length != 16 {
return Err(general_err!(
"MonthDayNano interval must be 12 or 16 bytes, got {}",
byte_length
));
}
}
ArrowType::Interval(_) => {
if byte_length != 12 {
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
Expand Down Expand Up @@ -222,7 +231,31 @@ impl ArrayReader for FixedLenByteArrayReader {
Arc::new(IntervalDayTimeArray::from_unary(&binary, f)) as ArrayRef
}
IntervalUnit::MonthDayNano => {
return Err(nyi_err!("MonthDayNano intervals not supported"));
if binary.value_length() == 16 {
// Raw 16-byte: months(4) + days(4) + nanoseconds(8)
let f = |b: &[u8]| {
IntervalMonthDayNano::new(
i32::from_le_bytes(b[0..4].try_into().unwrap()),
i32::from_le_bytes(b[4..8].try_into().unwrap()),
i64::from_le_bytes(b[8..16].try_into().unwrap()),
)
};
Arc::new(IntervalMonthDayNanoArray::from_unary(&binary, f))
as ArrayRef
} else {
// Coerced 12-byte: months(4) + days(4) + millis(4)
let f = |b: &[u8]| {
let millis =
i32::from_le_bytes(b[8..12].try_into().unwrap());
IntervalMonthDayNano::new(
i32::from_le_bytes(b[0..4].try_into().unwrap()),
i32::from_le_bytes(b[4..8].try_into().unwrap()),
millis as i64 * 1_000_000,
)
};
Arc::new(IntervalMonthDayNanoArray::from_unary(&binary, f))
as ArrayRef
}
}
}
}
Expand Down
298 changes: 295 additions & 3 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1605,11 +1605,13 @@ pub(crate) mod tests {
use arrow_array::cast::AsArray;
use arrow_array::types::{
Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
Time64MicrosecondType,
DecimalType, Float16Type, Float32Type, Float64Type, IntervalMonthDayNanoType,
Time32MillisecondType, Time64MicrosecondType, TimestampMillisecondType,
};
use arrow_array::*;
use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
use arrow_buffer::{
ArrowNativeType, Buffer, IntervalDayTime, IntervalMonthDayNano, NullBuffer, i256,
};
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::{DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit};
use arrow_select::concat::concat_batches;
Expand Down Expand Up @@ -2176,6 +2178,296 @@ pub(crate) mod tests {

Ok(())
}
#[test]
fn test_timestamp_second_roundtrip() -> Result<()> {
    use arrow_array::TimestampSecondArray;

    // Two second-resolution timestamp columns: one naive, one with a UTC timezone.
    let fields = vec![
        Field::new(
            "ts-second-no-tz",
            ArrowDataType::Timestamp(TimeUnit::Second, None),
            false,
        ),
        Field::new(
            "ts-second-utc",
            ArrowDataType::Timestamp(TimeUnit::Second, Some("UTC".into())),
            false,
        ),
    ];
    let schema = Arc::new(Schema::new(fields));

    let seconds = vec![0, 1, -1, 1_000_000, -1_000_000];
    let original = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(TimestampSecondArray::from(seconds.clone())),
            Arc::new(TimestampSecondArray::from(seconds).with_timezone("UTC")),
        ],
    )?;

    // Write the same batch twice: once with default properties, once with
    // coerce_types enabled.
    let mut plain_bytes = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut plain_bytes, schema.clone(), None)?;
    writer.write(&original)?;
    writer.close()?;

    let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
    let mut coerced_bytes = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut coerced_bytes, schema, Some(coerce_props))?;
    writer.write(&original)?;
    writer.close()?;

    let plain_batch = ParquetRecordBatchReader::try_new(Bytes::from(plain_bytes), 1024)?
        .next()
        .unwrap()?;
    let coerced_batch = ParquetRecordBatchReader::try_new(Bytes::from(coerced_bytes), 1024)?
        .next()
        .unwrap()?;

    // Default writer: lossless round-trip.
    assert_eq!(plain_batch, original);

    // Coerce writer: values are stored as milliseconds and read back as
    // Timestamp(Millisecond), i.e. the original values scaled by 1000.
    let expected_millis: &[i64] = &[0, 1000, -1000, 1_000_000_000, -1_000_000_000];
    for column_index in 0..2 {
        let millis = coerced_batch
            .column(column_index)
            .as_primitive::<TimestampMillisecondType>();
        assert_eq!(millis.values().as_ref(), expected_millis);
    }

    Ok(())
}

/// Round-trips `Interval(MonthDayNano)` data through Parquet.
///
/// With default writer properties the values are stored as raw 16-byte
/// fixed-length binary and survive losslessly. With `coerce_types(true)`
/// they are written as the Parquet spec's 12-byte INTERVAL
/// (months/days/milliseconds), so sub-millisecond nanoseconds are truncated
/// on read-back.
#[test]
fn test_interval_month_day_nano_roundtrip() -> Result<()> {
    // NOTE: `IntervalMonthDayNano` is already imported at module level from
    // `arrow_buffer`, so no function-local import is needed (matches the
    // sibling coerce-edge-case test).
    use arrow_array::IntervalMonthDayNanoArray;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "interval-mdn",
        ArrowDataType::Interval(arrow_schema::IntervalUnit::MonthDayNano),
        false,
    )]));

    let mut default_buf = Vec::with_capacity(1024);
    let mut coerce_buf = Vec::with_capacity(1024);

    let coerce_props = WriterProperties::builder().set_coerce_types(true).build();

    let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
    let mut coerce_writer =
        ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;

    let original = RecordBatch::try_new(
        schema,
        vec![Arc::new(IntervalMonthDayNanoArray::from(vec![
            IntervalMonthDayNano::new(1, 2, 3_000_000), // exactly 3ms
            IntervalMonthDayNano::new(-1, -2, -3_000_000), // exactly -3ms
            IntervalMonthDayNano::new(12, 30, 5_500_000_000), // 5500ms = 5.5s
            IntervalMonthDayNano::new(0, 0, 999_999), // sub-millisecond
        ]))],
    )?;

    default_writer.write(&original)?;
    coerce_writer.write(&original)?;

    default_writer.close()?;
    coerce_writer.close()?;

    let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
    let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;

    let default_ret = default_reader.next().unwrap()?;
    let coerce_ret = coerce_reader.next().unwrap()?;

    // Default writer (16-byte raw): lossless round-trip
    assert_eq!(default_ret, original);

    // Coerce writer (12-byte INTERVAL): nanoseconds truncated to milliseconds
    let coerce_col = coerce_ret
        .column(0)
        .as_primitive::<IntervalMonthDayNanoType>();
    assert_eq!(
        coerce_col.value(0),
        IntervalMonthDayNano::new(1, 2, 3_000_000)
    ); // exact
    assert_eq!(
        coerce_col.value(1),
        IntervalMonthDayNano::new(-1, -2, -3_000_000)
    ); // exact
    assert_eq!(
        coerce_col.value(2),
        IntervalMonthDayNano::new(12, 30, 5_500_000_000)
    ); // exact
    assert_eq!(coerce_col.value(3), IntervalMonthDayNano::new(0, 0, 0)); // sub-ms truncated to 0

    Ok(())
}

#[test]
fn test_timestamp_second_coerce_edge_cases() -> Result<()> {
    use arrow_array::TimestampSecondArray;

    // Boundary values: the largest/smallest seconds that survive a *1000
    // scaling without overflow, plus zero, a null, and +/-1.
    let max_safe = i64::MAX / 1000;
    let min_safe = i64::MIN / 1000;
    let input = vec![
        Some(0),
        Some(max_safe),
        Some(min_safe),
        None,
        Some(1),
        Some(-1),
    ];

    let schema = Arc::new(Schema::new(vec![Field::new(
        "ts",
        ArrowDataType::Timestamp(TimeUnit::Second, None),
        true,
    )]));

    let original = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(TimestampSecondArray::from(input))],
    )?;

    let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
    let mut bytes = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut bytes, schema, Some(coerce_props))?;
    writer.write(&original)?;
    writer.close()?;

    let read_back = ParquetRecordBatchReader::try_new(Bytes::from(bytes), 1024)?
        .next()
        .unwrap()?;
    let col = read_back
        .column(0)
        .as_primitive::<TimestampMillisecondType>();

    // Non-null values are scaled to milliseconds; the null slot is preserved.
    let expected = [
        Some(0),
        Some(max_safe * 1000),
        Some(min_safe * 1000),
        None,
        Some(1000),
        Some(-1000),
    ];
    for (i, want) in expected.iter().enumerate() {
        match want {
            Some(v) => assert_eq!(col.value(i), *v),
            None => assert!(col.is_null(i)),
        }
    }

    Ok(())
}

#[test]
fn test_interval_month_day_nano_coerce_edge_cases() -> Result<()> {
    use arrow_array::IntervalMonthDayNanoArray;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "interval-mdn",
        ArrowDataType::Interval(arrow_schema::IntervalUnit::MonthDayNano),
        true,
    )]));

    let values = vec![
        Some(IntervalMonthDayNano::new(1, 2, -3_000_000)), // negative ms
        Some(IntervalMonthDayNano::new(0, 0, -999_999)),   // negative sub-ms
        Some(IntervalMonthDayNano::new(0, 0, 2_000_000_000_000)), // large ns
        None,                                              // null
    ];
    let original = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(IntervalMonthDayNanoArray::from(values))],
    )?;

    // coerce_types=false: raw 16-byte storage, lossless round-trip
    // including nulls.
    let mut raw_bytes = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut raw_bytes, schema.clone(), None)?;
    writer.write(&original)?;
    writer.close()?;

    let round_tripped = ParquetRecordBatchReader::try_new(Bytes::from(raw_bytes), 1024)?
        .next()
        .unwrap()?;
    assert_eq!(round_tripped, original);

    // coerce_types=true: 12-byte INTERVAL, nanoseconds truncated toward
    // zero to whole milliseconds.
    let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
    let mut coerced_bytes = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut coerced_bytes, schema, Some(coerce_props))?;
    writer.write(&original)?;
    writer.close()?;

    let coerced = ParquetRecordBatchReader::try_new(Bytes::from(coerced_bytes), 1024)?
        .next()
        .unwrap()?;
    let col = coerced
        .column(0)
        .as_primitive::<IntervalMonthDayNanoType>();

    // -3_000_000 ns is an exact millisecond count, so it survives unchanged.
    assert_eq!(col.value(0), IntervalMonthDayNano::new(1, 2, -3_000_000));
    // -999_999 ns truncates toward zero to 0 ms -> 0 ns.
    assert_eq!(col.value(1), IntervalMonthDayNano::new(0, 0, 0));
    // 2_000_000_000_000 ns is an exact millisecond count, so it survives.
    assert_eq!(
        col.value(2),
        IntervalMonthDayNano::new(0, 0, 2_000_000_000_000)
    );
    // The null slot is preserved.
    assert!(col.is_null(3));

    Ok(())
}

#[test]
fn test_interval_year_month_day_time_unaffected_by_coerce() -> Result<()> {
    use arrow_array::{IntervalDayTimeArray, IntervalYearMonthArray};

    // YearMonth and DayTime intervals should round-trip losslessly even
    // with coerce_types=true.
    let fields = vec![
        Field::new(
            "ym",
            ArrowDataType::Interval(arrow_schema::IntervalUnit::YearMonth),
            false,
        ),
        Field::new(
            "dt",
            ArrowDataType::Interval(arrow_schema::IntervalUnit::DayTime),
            false,
        ),
    ];
    let schema = Arc::new(Schema::new(fields));

    let day_times = vec![
        IntervalDayTime::new(0, 0),
        IntervalDayTime::new(30, 1000),
        IntervalDayTime::new(-1, -500),
        IntervalDayTime::new(365, 86_400_000),
    ];
    let original = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(IntervalYearMonthArray::from(vec![0, 12, -6, 100])),
            Arc::new(IntervalDayTimeArray::from(day_times)),
        ],
    )?;

    let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
    let mut bytes = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut bytes, schema, Some(coerce_props))?;
    writer.write(&original)?;
    writer.close()?;

    let read_back = ParquetRecordBatchReader::try_new(Bytes::from(bytes), 1024)?
        .next()
        .unwrap()?;
    assert_eq!(read_back, original);

    Ok(())
}

// Zero-sized marker type; its `RandGen<FixedLenByteArrayType>` impl (below)
// presumably supplies random fixed-length byte-array values for the
// randomized tests — see the impl for the generation details.
struct RandFixedLenGen {}

impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
Expand Down
Loading