diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index 2297926add5f..d51cdf5df36d 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -28,9 +28,10 @@ use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; use arrow_array::{ ArrayRef, Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array, - FixedSizeBinaryArray, Float16Array, IntervalDayTimeArray, IntervalYearMonthArray, + FixedSizeBinaryArray, Float16Array, IntervalDayTimeArray, IntervalMonthDayNanoArray, + IntervalYearMonthArray, }; -use arrow_buffer::{Buffer, IntervalDayTime, i256}; +use arrow_buffer::{Buffer, IntervalDayTime, IntervalMonthDayNano, i256}; use arrow_data::ArrayDataBuilder; use arrow_schema::{DataType as ArrowType, IntervalUnit}; use bytes::Bytes; @@ -96,6 +97,14 @@ pub fn make_fixed_len_byte_array_reader( )); } } + ArrowType::Interval(IntervalUnit::MonthDayNano) => { + if byte_length != 12 && byte_length != 16 { + return Err(general_err!( + "MonthDayNano interval must be 12 or 16 bytes, got {}", + byte_length + )); + } + } ArrowType::Interval(_) => { if byte_length != 12 { // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval @@ -222,7 +231,31 @@ impl ArrayReader for FixedLenByteArrayReader { Arc::new(IntervalDayTimeArray::from_unary(&binary, f)) as ArrayRef } IntervalUnit::MonthDayNano => { - return Err(nyi_err!("MonthDayNano intervals not supported")); + if binary.value_length() == 16 { + // Raw 16-byte: months(4) + days(4) + nanoseconds(8) + let f = |b: &[u8]| { + IntervalMonthDayNano::new( + i32::from_le_bytes(b[0..4].try_into().unwrap()), + i32::from_le_bytes(b[4..8].try_into().unwrap()), + i64::from_le_bytes(b[8..16].try_into().unwrap()), + ) + }; + Arc::new(IntervalMonthDayNanoArray::from_unary(&binary, f)) + as ArrayRef + } else { + // Coerced 12-byte: months(4) + 
days(4) + millis(4) + let f = |b: &[u8]| { + let millis = + i32::from_le_bytes(b[8..12].try_into().unwrap()); + IntervalMonthDayNano::new( + i32::from_le_bytes(b[0..4].try_into().unwrap()), + i32::from_le_bytes(b[4..8].try_into().unwrap()), + millis as i64 * 1_000_000, + ) + }; + Arc::new(IntervalMonthDayNanoArray::from_unary(&binary, f)) + as ArrayRef + } } } } diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 1b02c4ae25d3..eaf8342c56d2 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -1605,11 +1605,13 @@ pub(crate) mod tests { use arrow_array::cast::AsArray; use arrow_array::types::{ Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type, - DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType, - Time64MicrosecondType, + DecimalType, Float16Type, Float32Type, Float64Type, IntervalMonthDayNanoType, + Time32MillisecondType, Time64MicrosecondType, TimestampMillisecondType, }; use arrow_array::*; - use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256}; + use arrow_buffer::{ + ArrowNativeType, Buffer, IntervalDayTime, IntervalMonthDayNano, NullBuffer, i256, + }; use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::{DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit}; use arrow_select::concat::concat_batches; @@ -2176,6 +2178,296 @@ pub(crate) mod tests { Ok(()) } + #[test] + fn test_timestamp_second_roundtrip() -> Result<()> { + use arrow_array::TimestampSecondArray; + + let schema = Arc::new(Schema::new(vec![ + Field::new( + "ts-second-no-tz", + ArrowDataType::Timestamp(TimeUnit::Second, None), + false, + ), + Field::new( + "ts-second-utc", + ArrowDataType::Timestamp(TimeUnit::Second, Some("UTC".into())), + false, + ), + ])); + + let mut default_buf = Vec::with_capacity(1024); + let mut coerce_buf = Vec::with_capacity(1024); + + let coerce_props = 
WriterProperties::builder().set_coerce_types(true).build(); + + let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?; + let mut coerce_writer = + ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?; + + let original = RecordBatch::try_new( + schema, + vec![ + Arc::new(TimestampSecondArray::from(vec![ + 0, 1, -1, 1_000_000, -1_000_000, + ])), + Arc::new( + TimestampSecondArray::from(vec![0, 1, -1, 1_000_000, -1_000_000]) + .with_timezone("UTC"), + ), + ], + )?; + + default_writer.write(&original)?; + coerce_writer.write(&original)?; + + default_writer.close()?; + coerce_writer.close()?; + + let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?; + let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?; + + let default_ret = default_reader.next().unwrap()?; + let coerce_ret = coerce_reader.next().unwrap()?; + + // Default writer: lossless round-trip + assert_eq!(default_ret, original); + + // Coerce writer: values are stored as milliseconds, read back as Timestamp(Millisecond) + // Values should be original * 1000 + let coerce_no_tz = coerce_ret + .column(0) + .as_primitive::<TimestampMillisecondType>(); + let coerce_utc = coerce_ret + .column(1) + .as_primitive::<TimestampMillisecondType>(); + assert_eq!( + coerce_no_tz.values().as_ref(), + &[0, 1000, -1000, 1_000_000_000, -1_000_000_000] + ); + assert_eq!( + coerce_utc.values().as_ref(), + &[0, 1000, -1000, 1_000_000_000, -1_000_000_000] + ); + + Ok(()) + } + + #[test] + fn test_interval_month_day_nano_roundtrip() -> Result<()> { + use arrow_array::IntervalMonthDayNanoArray; + use arrow_buffer::IntervalMonthDayNano; + + let schema = Arc::new(Schema::new(vec![Field::new( + "interval-mdn", + ArrowDataType::Interval(arrow_schema::IntervalUnit::MonthDayNano), + false, + )])); + + let mut default_buf = Vec::with_capacity(1024); + let mut coerce_buf = Vec::with_capacity(1024); + + let coerce_props = 
WriterProperties::builder().set_coerce_types(true).build(); + + let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?; + let mut coerce_writer = + ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?; + + let original = RecordBatch::try_new( + schema, + vec![Arc::new(IntervalMonthDayNanoArray::from(vec![ + IntervalMonthDayNano::new(1, 2, 3_000_000), // exactly 3ms + IntervalMonthDayNano::new(-1, -2, -3_000_000), // exactly -3ms + IntervalMonthDayNano::new(12, 30, 5_500_000_000), // 5500ms = 5.5s + IntervalMonthDayNano::new(0, 0, 999_999), // sub-millisecond + ]))], + )?; + + default_writer.write(&original)?; + coerce_writer.write(&original)?; + + default_writer.close()?; + coerce_writer.close()?; + + let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?; + let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?; + + let default_ret = default_reader.next().unwrap()?; + let coerce_ret = coerce_reader.next().unwrap()?; + + // Default writer (16-byte raw): lossless round-trip + assert_eq!(default_ret, original); + + // Coerce writer (12-byte INTERVAL): nanoseconds truncated to milliseconds + let coerce_col = coerce_ret + .column(0) + .as_primitive::<IntervalMonthDayNanoType>(); + assert_eq!( + coerce_col.value(0), + IntervalMonthDayNano::new(1, 2, 3_000_000) + ); // exact + assert_eq!( + coerce_col.value(1), + IntervalMonthDayNano::new(-1, -2, -3_000_000) + ); // exact + assert_eq!( + coerce_col.value(2), + IntervalMonthDayNano::new(12, 30, 5_500_000_000) + ); // exact + assert_eq!(coerce_col.value(3), IntervalMonthDayNano::new(0, 0, 0)); // sub-ms truncated to 0 + + Ok(()) + } + + #[test] + fn test_timestamp_second_coerce_edge_cases() -> Result<()> { + use arrow_array::TimestampSecondArray; + + let schema = Arc::new(Schema::new(vec![Field::new( + "ts", + ArrowDataType::Timestamp(TimeUnit::Second, None), + true, + )])); + + let coerce_props = 
WriterProperties::builder().set_coerce_types(true).build(); + let mut buf = Vec::with_capacity(1024); + let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), Some(coerce_props))?; + + // Edge cases: large values near overflow boundary, zero, nulls + let max_safe = i64::MAX / 1000; // largest value that won't overflow * 1000 + let min_safe = i64::MIN / 1000; + let original = RecordBatch::try_new( + schema, + vec![Arc::new(TimestampSecondArray::from(vec![ + Some(0), + Some(max_safe), + Some(min_safe), + None, + Some(1), + Some(-1), + ]))], + )?; + + writer.write(&original)?; + writer.close()?; + + let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?; + let ret = reader.next().unwrap()?; + + let col = ret.column(0).as_primitive::<TimestampMillisecondType>(); + assert_eq!(col.value(0), 0); + assert_eq!(col.value(1), max_safe * 1000); + assert_eq!(col.value(2), min_safe * 1000); + assert!(col.is_null(3)); // null preserved + assert_eq!(col.value(4), 1000); + assert_eq!(col.value(5), -1000); + + Ok(()) + } + + #[test] + fn test_interval_month_day_nano_coerce_edge_cases() -> Result<()> { + use arrow_array::IntervalMonthDayNanoArray; + + let schema = Arc::new(Schema::new(vec![Field::new( + "interval-mdn", + ArrowDataType::Interval(arrow_schema::IntervalUnit::MonthDayNano), + true, + )])); + + // Test coerce_types=false: lossless with nulls + let mut default_buf = Vec::with_capacity(1024); + let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?; + + let original = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(IntervalMonthDayNanoArray::from(vec![ + Some(IntervalMonthDayNano::new(1, 2, -3_000_000)), // negative ms + Some(IntervalMonthDayNano::new(0, 0, -999_999)), // negative sub-ms + Some(IntervalMonthDayNano::new(0, 0, 2_000_000_000_000)), // large ns + None, // null + ]))], + )?; + + default_writer.write(&original)?; + default_writer.close()?; + + let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 
1024)?; + let ret = reader.next().unwrap()?; + // Raw 16-byte: lossless round-trip including nulls + assert_eq!(ret, original); + + // Test coerce_types=true: truncation edge cases + let coerce_props = WriterProperties::builder().set_coerce_types(true).build(); + let mut coerce_buf = Vec::with_capacity(1024); + let mut coerce_writer = ArrowWriter::try_new(&mut coerce_buf, schema, Some(coerce_props))?; + + coerce_writer.write(&original)?; + coerce_writer.close()?; + + let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?; + let ret = reader.next().unwrap()?; + + let col = ret.column(0).as_primitive::<IntervalMonthDayNanoType>(); + // -3_000_000 ns / 1_000_000 = -3 ms → -3 * 1_000_000 = -3_000_000 ns (exact) + assert_eq!(col.value(0), IntervalMonthDayNano::new(1, 2, -3_000_000)); + // -999_999 ns / 1_000_000 = 0 ms (truncation toward zero) → 0 ns + assert_eq!(col.value(1), IntervalMonthDayNano::new(0, 0, 0)); + // 2_000_000_000_000 ns / 1_000_000 = 2_000_000 ms → 2_000_000 * 1_000_000 = 2_000_000_000_000 ns + assert_eq!( + col.value(2), + IntervalMonthDayNano::new(0, 0, 2_000_000_000_000) + ); + assert!(col.is_null(3)); // null preserved + + Ok(()) + } + + #[test] + fn test_interval_year_month_day_time_unaffected_by_coerce() -> Result<()> { + use arrow_array::{IntervalDayTimeArray, IntervalYearMonthArray}; + + let schema = Arc::new(Schema::new(vec![ + Field::new( + "ym", + ArrowDataType::Interval(arrow_schema::IntervalUnit::YearMonth), + false, + ), + Field::new( + "dt", + ArrowDataType::Interval(arrow_schema::IntervalUnit::DayTime), + false, + ), + ])); + + let coerce_props = WriterProperties::builder().set_coerce_types(true).build(); + let mut buf = Vec::with_capacity(1024); + let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), Some(coerce_props))?; + + let original = RecordBatch::try_new( + schema, + vec![ + Arc::new(IntervalYearMonthArray::from(vec![0, 12, -6, 100])), + Arc::new(IntervalDayTimeArray::from(vec![ + IntervalDayTime::new(0, 0), + 
IntervalDayTime::new(30, 1000), + IntervalDayTime::new(-1, -500), + IntervalDayTime::new(365, 86_400_000), + ])), + ], + )?; + + writer.write(&original)?; + writer.close()?; + + let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?; + let ret = reader.next().unwrap()?; + + // YearMonth and DayTime should round-trip losslessly even with coerce_types=true + assert_eq!(ret, original); + + Ok(()) + } + struct RandFixedLenGen {} impl RandGen<FixedLenByteArrayType> for RandFixedLenGen diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 8422263b1f63..71cd576a3aeb 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -28,7 +28,7 @@ use std::vec::IntoIter; use arrow_array::cast::AsArray; use arrow_array::types::*; -use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchWriter}; +use arrow_array::{ArrayRef, Int32Array, Int64Array, RecordBatch, RecordBatchWriter}; use arrow_schema::{ ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef, TimeUnit, }; @@ -37,6 +37,7 @@ use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_pr use crate::arrow::ArrowSchemaConverter; use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder; +use crate::basic::{LogicalType, TimeUnit as ParquetTimeUnit}; use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter}; use crate::column::page_encryption::PageEncryptor; use crate::column::writer::encoder::ColumnValueEncoder; @@ -120,17 +121,17 @@ mod levels; /// ## Type Support /// /// The writer supports writing all Arrow [`DataType`]s that have a direct mapping to -/// Parquet types including [`StructArray`] and [`ListArray`]. +/// Parquet types including [`StructArray`] and [`ListArray`]. /// -/// The following are not supported: -/// -/// * [`IntervalMonthDayNanoArray`]: Parquet does not [support nanosecond intervals]. 
+/// [`IntervalMonthDayNanoArray`] is written as a 16-byte fixed-length byte array +/// by default (lossless). When [`coerce_types`] is enabled, it is written as a +/// 12-byte Parquet INTERVAL (lossy, truncating nanoseconds to milliseconds). /// /// [`DataType`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html /// [`StructArray`]: https://docs.rs/arrow/latest/arrow/array/struct.StructArray.html /// [`ListArray`]: https://docs.rs/arrow/latest/arrow/array/type.ListArray.html /// [`IntervalMonthDayNanoArray`]: https://docs.rs/arrow/latest/arrow/array/type.IntervalMonthDayNanoArray.html -/// [support nanosecond intervals]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval +/// [`coerce_types`]: crate::file::properties::WriterProperties::coerce_types /// /// ## Type Compatibility /// The writer can write Arrow [`RecordBatch`]s that are logically equivalent. This means that for @@ -1400,8 +1401,22 @@ fn write_leaf( } ArrowDataType::Timestamp(unit, _) => match unit { TimeUnit::Second => { - let array = column.as_primitive::<TimestampSecondType>(); - write_primitive(typed, array.values(), levels) + // If coerced to milliseconds, multiply values by 1000 + if matches!( + typed.get_descriptor().logical_type_ref(), + Some(LogicalType::Timestamp { + unit: ParquetTimeUnit::MILLIS, + .. 
+ }) + ) { + let array: Int64Array = column + .as_primitive::<TimestampSecondType>() + .unary(|x| x.saturating_mul(1000)); + write_primitive(typed, array.values(), levels) + } else { + let array = column.as_primitive::<TimestampSecondType>(); + write_primitive(typed, array.values(), levels) + } } TimeUnit::Millisecond => { let array = column.as_primitive::<TimestampMillisecondType>(); @@ -1482,10 +1497,13 @@ let array = column.as_primitive::<IntervalDayTimeType>(); get_interval_dt_array_slice(array, indices) } - _ => { - return Err(ParquetError::NYI(format!( - "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented" - ))); + IntervalUnit::MonthDayNano => { + let array = column.as_primitive::<IntervalMonthDayNanoType>(); + if typed.get_descriptor().type_length() == 12 { + get_interval_mdn_coerced_array_slice(array, indices) + } else { + get_interval_mdn_raw_array_slice(array, indices) + } } }, ArrowDataType::FixedSizeBinary(_) => { @@ -1580,6 +1598,44 @@ fn get_interval_dt_array_slice( values } +/// Returns 12-byte values representing months, days and milliseconds (4-bytes each). +/// MonthDayNano nanoseconds are truncated to milliseconds (lossy). +fn get_interval_mdn_coerced_array_slice( + array: &arrow_array::IntervalMonthDayNanoArray, + indices: &[usize], +) -> Vec<FixedLenByteArray> { + let mut values = Vec::with_capacity(indices.len()); + for i in indices { + let value = array.value(*i); + let mut out = [0u8; 12]; + out[0..4].copy_from_slice(&value.months.to_le_bytes()); + out[4..8].copy_from_slice(&value.days.to_le_bytes()); + // Clamp to i32 range: Parquet INTERVAL stores milliseconds as 4-byte signed int + let millis = (value.nanoseconds / 1_000_000).clamp(i32::MIN as i64, i32::MAX as i64) as i32; + out[8..12].copy_from_slice(&millis.to_le_bytes()); + values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec()))); + } + values +} + +/// Returns 16-byte values: months(4) + days(4) + nanoseconds(8). +/// Preserves the full IntervalMonthDayNano representation (lossless). 
+fn get_interval_mdn_raw_array_slice( + array: &arrow_array::IntervalMonthDayNanoArray, + indices: &[usize], +) -> Vec<FixedLenByteArray> { + let mut values = Vec::with_capacity(indices.len()); + for i in indices { + let value = array.value(*i); + let mut out = [0u8; 16]; + out[0..4].copy_from_slice(&value.months.to_le_bytes()); + out[4..8].copy_from_slice(&value.days.to_le_bytes()); + out[8..16].copy_from_slice(&value.nanoseconds.to_le_bytes()); + values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec()))); + } + values +} + fn get_decimal_32_array_slice( array: &arrow_array::Decimal32Array, indices: &[usize], @@ -3085,9 +3141,6 @@ mod tests { } #[test] - #[should_panic( - expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented" - )] fn interval_month_day_nano_single_column() { required_and_optional::<IntervalMonthDayNanoArray, _>(vec![ IntervalMonthDayNano::new(0, 1, 5), diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index b2b93687ba89..2a892d0d3194 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -23,7 +23,7 @@ use std::collections::HashMap; use std::sync::Arc; use arrow_ipc::writer; -use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, TimeUnit}; +use arrow_schema::{DataType, Field, FieldRef, Fields, IntervalUnit, Schema, TimeUnit}; use crate::basic::{ ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType, @@ -625,12 +625,24 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> { .with_repetition(repetition) .with_id(id) .build(), - DataType::Timestamp(TimeUnit::Second, _) => { - // Cannot represent seconds in LogicalType - Type::primitive_type_builder(name, PhysicalType::INT64) - .with_repetition(repetition) - .with_id(id) - .build() + DataType::Timestamp(TimeUnit::Second, tz) => { + if coerce_types { + // Coerce seconds to the closest supported Parquet time unit (milliseconds) + Type::primitive_type_builder(name, 
PhysicalType::INT64) + .with_logical_type(Some(LogicalType::Timestamp { + is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()), + unit: ParquetTimeUnit::MILLIS, + })) + .with_repetition(repetition) + .with_id(id) + .build() + } else { + // Cannot represent seconds in LogicalType, store as raw INT64 + Type::primitive_type_builder(name, PhysicalType::INT64) + .with_repetition(repetition) + .with_id(id) + .build() + } } DataType::Timestamp(time_unit, tz) => { Type::primitive_type_builder(name, PhysicalType::INT64) @@ -701,6 +713,24 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result { .with_repetition(repetition) .with_id(id) .build(), + DataType::Interval(IntervalUnit::MonthDayNano) => { + if coerce_types { + // Coerce to 12-byte Parquet INTERVAL (truncates nanoseconds to milliseconds) + Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY) + .with_converted_type(ConvertedType::INTERVAL) + .with_repetition(repetition) + .with_id(id) + .with_length(12) + .build() + } else { + // Store as raw 16 bytes: month(4) + days(4) + nanoseconds(8) + Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY) + .with_repetition(repetition) + .with_id(id) + .with_length(16) + .build() + } + } DataType::Interval(_) => { Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY) .with_converted_type(ConvertedType::INTERVAL) @@ -2389,4 +2419,109 @@ mod tests { .contains("is not a virtual column") ); } + + #[test] + fn test_timestamp_second_coerce_types_false() { + let arrow_schema = Schema::new(vec![Field::new( + "ts_seconds", + DataType::Timestamp(TimeUnit::Second, None), + false, + )]); + + let parquet_schema = ArrowSchemaConverter::new() + .with_coerce_types(false) + .convert(&arrow_schema) + .unwrap(); + + let col = parquet_schema.column(0); + assert_eq!(col.physical_type(), PhysicalType::INT64); + assert_eq!(col.logical_type_ref(), None); + } + + #[test] + fn 
test_timestamp_second_no_tz_coerce_types_true() { + let arrow_schema = Schema::new(vec![Field::new( + "ts_seconds", + DataType::Timestamp(TimeUnit::Second, None), + false, + )]); + + let parquet_schema = ArrowSchemaConverter::new() + .with_coerce_types(true) + .convert(&arrow_schema) + .unwrap(); + + let col = parquet_schema.column(0); + assert_eq!(col.physical_type(), PhysicalType::INT64); + assert_eq!( + col.logical_type_ref(), + Some(&LogicalType::Timestamp { + is_adjusted_to_u_t_c: false, + unit: ParquetTimeUnit::MILLIS, + }) + ); + } + + #[test] + fn test_timestamp_second_utc_coerce_types_true() { + let arrow_schema = Schema::new(vec![Field::new( + "ts_seconds_utc", + DataType::Timestamp(TimeUnit::Second, Some("UTC".into())), + false, + )]); + + let parquet_schema = ArrowSchemaConverter::new() + .with_coerce_types(true) + .convert(&arrow_schema) + .unwrap(); + + let col = parquet_schema.column(0); + assert_eq!(col.physical_type(), PhysicalType::INT64); + assert_eq!( + col.logical_type_ref(), + Some(&LogicalType::Timestamp { + is_adjusted_to_u_t_c: true, + unit: ParquetTimeUnit::MILLIS, + }) + ); + } + + #[test] + fn test_interval_month_day_nano_coerce_types_false() { + let arrow_schema = Schema::new(vec![Field::new( + "interval", + DataType::Interval(IntervalUnit::MonthDayNano), + false, + )]); + + let parquet_schema = ArrowSchemaConverter::new() + .with_coerce_types(false) + .convert(&arrow_schema) + .unwrap(); + + let col = parquet_schema.column(0); + assert_eq!(col.physical_type(), PhysicalType::FIXED_LEN_BYTE_ARRAY); + assert_eq!(col.type_length(), 16); + assert_ne!(col.converted_type(), ConvertedType::INTERVAL); + assert_eq!(col.logical_type_ref(), None); + } + + #[test] + fn test_interval_month_day_nano_coerce_types_true() { + let arrow_schema = Schema::new(vec![Field::new( + "interval", + DataType::Interval(IntervalUnit::MonthDayNano), + false, + )]); + + let parquet_schema = ArrowSchemaConverter::new() + .with_coerce_types(true) + 
.convert(&arrow_schema) + .unwrap(); + + let col = parquet_schema.column(0); + assert_eq!(col.physical_type(), PhysicalType::FIXED_LEN_BYTE_ARRAY); + assert_eq!(col.type_length(), 12); + assert_eq!(col.converted_type(), ConvertedType::INTERVAL); + } } diff --git a/parquet/src/arrow/schema/primitive.rs b/parquet/src/arrow/schema/primitive.rs index 8959081bcb41..1d03e7f08fac 100644 --- a/parquet/src/arrow/schema/primitive.rs +++ b/parquet/src/arrow/schema/primitive.rs @@ -85,6 +85,9 @@ fn apply_hint(parquet: DataType, hint: DataType) -> DataType { // Determine interval time unit (#1666) (DataType::Interval(_), DataType::Interval(_)) => hint, + // Raw MonthDayNano stored as FixedSizeBinary(16) without INTERVAL converted type + (DataType::FixedSizeBinary(16), DataType::Interval(IntervalUnit::MonthDayNano)) => hint, + // Promote to Decimal256 or narrow to Decimal32 or Decimal64 (DataType::Decimal128(_, _), DataType::Decimal32(_, _)) => hint, (DataType::Decimal128(_, _), DataType::Decimal64(_, _)) => hint,