Skip to content
36 changes: 33 additions & 3 deletions parquet/src/arrow/arrow_writer/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ use crate::column::writer::encoder::{
ColumnValueEncoder, DataPageValues, DictionaryPage, create_bloom_filter,
};
use crate::data_type::{AsBytes, ByteArray, Int32Type};
use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder, PlainDataSizeCounter};
use crate::encodings::rle::RleEncoder;
use crate::errors::{ParquetError, Result};
use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
use crate::file::properties::{
DictionaryFallback, EnabledStatistics, WriterProperties, WriterVersion,
};
use crate::geospatial::accumulator::{GeoStatsAccumulator, try_new_geo_stats_accumulator};
use crate::geospatial::statistics::GeospatialStatistics;
use crate::schema::types::ColumnDescPtr;
Expand Down Expand Up @@ -421,6 +423,7 @@ impl DictEncoder {
pub struct ByteArrayEncoder {
fallback: FallbackEncoder,
dict_encoder: Option<DictEncoder>,
plain_data_size_counter: Option<PlainDataSizeCounter>,
statistics_enabled: EnabledStatistics,
min_value: Option<ByteArray>,
max_value: Option<ByteArray>,
Expand All @@ -446,6 +449,17 @@ impl ColumnValueEncoder for ByteArrayEncoder {
.dictionary_enabled(descr.path())
.then(DictEncoder::default);

let plain_data_size_counter = match props.dictionary_fallback(descr.path()) {
DictionaryFallback::OnPageSizeLimit => None,
DictionaryFallback::OnUnfavorableCompression => {
if dictionary.is_some() {
Some(PlainDataSizeCounter::new(descr))
} else {
None
}
}
};

let fallback = FallbackEncoder::new(descr, props)?;

let (bloom_filter, bloom_filter_target_fpp) = create_bloom_filter(props, descr)?;
Expand All @@ -460,6 +474,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
bloom_filter,
bloom_filter_target_fpp,
dict_encoder: dictionary,
plain_data_size_counter,
min_value: None,
max_value: None,
geo_stats_accumulator,
Expand Down Expand Up @@ -510,6 +525,11 @@ impl ColumnValueEncoder for ByteArrayEncoder {
Some(self.dict_encoder.as_ref()?.estimated_dict_page_size())
}

fn plain_encoded_data_size(&self) -> Option<usize> {
let counter = self.plain_data_size_counter.as_ref()?;
Some(counter.plain_encoded_data_size())
}

/// Returns an estimate of the data page size in bytes
///
/// This includes:
Expand All @@ -530,6 +550,8 @@ impl ColumnValueEncoder for ByteArrayEncoder {
));
}

self.plain_data_size_counter = None;

Ok(Some(encoder.flush_dict_page()))
}
_ => Ok(None),
Expand Down Expand Up @@ -582,7 +604,15 @@ where
}

match &mut encoder.dict_encoder {
Some(dict_encoder) => dict_encoder.encode(values, indices),
Some(dict_encoder) => {
dict_encoder.encode(values, indices);
if let Some(counter) = encoder.plain_data_size_counter.as_mut() {
for idx in indices {
let value = values.value(*idx);
counter.update_byte_array(value.as_ref());
}
}
}
None => encoder.fallback.encode(values, indices),
}
}
Expand Down
139 changes: 127 additions & 12 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1691,7 +1691,7 @@ mod tests {
use crate::data_type::AsBytes;
use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
use crate::file::properties::{
BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
BloomFilterPosition, DictionaryFallback, EnabledStatistics, ReaderProperties, WriterVersion,
};
use crate::file::serialized_reader::ReadOptionsBuilder;
use crate::file::{
Expand Down Expand Up @@ -2572,6 +2572,74 @@ mod tests {
);
}

#[test]
fn arrow_writer_dictionary_fallback_on_unfavorable_compression() {
    let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));

    let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);

    // Build 10 distinct 10-character strings. The dictionary encoding of this
    // data ends up larger than its plain encoding, which should trigger the
    // fallback to PLAIN encoding.
    for i in 0..10 {
        let text: String = i.to_string().repeat(10).chars().take(10).collect();
        builder.append_value(text);
    }

    let values = Arc::new(builder.finish());
    let batch = RecordBatch::try_new(schema, vec![values]).unwrap();

    let file = tempfile::tempfile().unwrap();

    // Enable the compression-based fallback heuristic, and force tiny pages /
    // single-row write batches so each row lands on its own page after the
    // fallback happens.
    let props = WriterProperties::builder()
        .set_dictionary_fallback(DictionaryFallback::OnUnfavorableCompression)
        .set_data_page_size_limit(1)
        .set_write_batch_size(1)
        .build();

    let mut writer = ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
        .expect("Unable to write file");
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader =
        SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();

    let columns = reader.metadata().row_group(0).columns();
    assert_eq!(columns.len(), 1);

    // One row is dictionary-encoded before the fallback kicks in, so a
    // dictionary page must still have been written.
    assert!(
        columns[0].dictionary_page_offset().is_some(),
        "Expected a dictionary page"
    );

    assert!(reader.metadata().offset_index().is_some());
    let offset_indexes = &reader.metadata().offset_index().unwrap()[0];
    let page_locations = offset_indexes[0].page_locations.clone();

    // After the first row the writer switches to PLAIN encoding; with a
    // 1-byte page size limit each remaining row gets its own page, giving one
    // dictionary-encoded page followed by nine PLAIN-encoded pages.
    assert_eq!(
        page_locations.len(),
        10,
        "Expected 10 pages but got {page_locations:#?}"
    );
}

#[test]
fn arrow_writer_float_nans() {
let f16_field = Field::new("a", DataType::Float16, false);
Expand Down Expand Up @@ -4789,6 +4857,15 @@ mod tests {
assert_eq!(chunk_page_stats, file_page_stats);
}

/// Reads the first page of the column chunk described by `meta` from `data`
/// and returns the length of its dictionary-page buffer in bytes.
///
/// Panics if the chunk has no pages or its first page is not a dictionary page.
fn get_dict_page_size(meta: &ColumnChunkMetaData, data: Bytes) -> usize {
    let mut page_reader = SerializedPageReader::new(Arc::new(data), meta, 0, None).unwrap();
    let first_page = page_reader.get_next_page().unwrap().unwrap();
    if let Page::DictionaryPage { buf, .. } = first_page {
        buf.len()
    } else {
        panic!("expected DictionaryPage")
    }
}

#[test]
fn test_different_dict_page_size_limit() {
let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
Expand All @@ -4813,18 +4890,56 @@ mod tests {
let col0_meta = metadata.row_group(0).column(0);
let col1_meta = metadata.row_group(0).column(1);

let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
let mut reader =
SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
let page = reader.get_next_page().unwrap().unwrap();
match page {
Page::DictionaryPage { buf, .. } => buf.len(),
_ => panic!("expected DictionaryPage"),
}
};
assert_eq!(get_dict_page_size(col0_meta, data.clone()), 1024 * 1024);
assert_eq!(get_dict_page_size(col1_meta, data.clone()), 1024 * 1024 * 4);
}

#[test]
fn test_dict_page_size_decided_by_compression_fallback() {
Copy link
Copy Markdown
Contributor

@etseidl etseidl Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a test, I saved the output from this and examined the sizing. Without the heuristic, the encoded size for col0 is 8658384 bytes (the default fallback mechanism kicked in after 7 pages). With the heuristic, col1 is 8391126 bytes, a savings of 3%.

I also modified the test to mod the index with 32767. In that instance, col1 was still 8391126 bytes, but col0 was only 2231581, nearly 4X smaller.

I know this is not entirely representative, but it does again point out the pitfalls of too simplistic an approach.

Edit: I did a test of spark with the latter file (32k cardinality). By default, it opts to fallback for all pages, so the file is even larger. If I modify the global parquet.page.row.count.limit to 132000, it then opts for dictionary encoding as it should.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have modified the test in 1b6dd37 to demonstrate a case when even an early fallback decision brings about 12% compression. But I generally agree with your assessment, so more work is needed.

Another quirk is seen in this test: a dictionary page is still flushed to encode the first data page, even though there is no benefit. Parquet-java takes care to hand over the accumulated values to the plain encoder to be re-encoded.

// Generate values that are well dispersed across a range approximating (0..256 * 1024)
let array = Arc::new(Int32Array::from_iter(
(0i32..1024 * 1024).map(|x| x.wrapping_mul(163019) % 262139),
));
let schema = Arc::new(Schema::new(vec![Field::new(
"col0",
arrow_schema::DataType::Int32,
false,
)]));
let batch = arrow_array::RecordBatch::try_new(schema.clone(), vec![array]).unwrap();

let props = WriterProperties::builder()
.set_dictionary_page_size_limit(1024 * 1024)
.build();
let mut writer = ArrowWriter::try_new(Vec::new(), schema.clone(), Some(props)).unwrap();
writer.write(&batch).unwrap();
let data = Bytes::from(writer.into_inner().unwrap());

// println!("file length, dictionary: {}", data.len());

let mut metadata = ParquetMetaDataReader::new();
metadata.try_parse(&data).unwrap();
let metadata = metadata.finish().unwrap();
let full_dict_meta = metadata.row_group(0).column(0);
assert_eq!(get_dict_page_size(full_dict_meta, data.clone()), 1_048_576);

let props = WriterProperties::builder()
.set_dictionary_page_size_limit(1024 * 1024)
.set_column_dictionary_fallback(
ColumnPath::from("col0"),
DictionaryFallback::OnUnfavorableCompression,
)
.build();
let mut writer = ArrowWriter::try_new(Vec::new(), schema.clone(), Some(props)).unwrap();
writer.write(&batch).unwrap();
let data = Bytes::from(writer.into_inner().unwrap());

// println!("file length, fallback: {}", data.len());

assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
let mut metadata = ParquetMetaDataReader::new();
metadata.try_parse(&data).unwrap();
let metadata = metadata.finish().unwrap();
let fallback_meta = metadata.row_group(0).column(0);
assert_eq!(get_dict_page_size(fallback_meta, data.clone()), 4096);
}

struct WriteBatchesShape {
Expand Down
37 changes: 34 additions & 3 deletions parquet/src/column/writer/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use crate::column::writer::{
};
use crate::data_type::DataType;
use crate::data_type::private::ParquetValueType;
use crate::encodings::encoding::{DictEncoder, Encoder, get_encoder};
use crate::encodings::encoding::{DictEncoder, Encoder, PlainDataSizeCounter, get_encoder};
use crate::errors::{ParquetError, Result};
use crate::file::properties::{EnabledStatistics, WriterProperties};
use crate::file::properties::{DictionaryFallback, EnabledStatistics, WriterProperties};
use crate::geospatial::accumulator::{GeoStatsAccumulator, try_new_geo_stats_accumulator};
use crate::geospatial::statistics::GeospatialStatistics;
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
Expand Down Expand Up @@ -109,6 +109,12 @@ pub trait ColumnValueEncoder {
/// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
fn estimated_data_page_size(&self) -> usize;

/// Returns the estimated size of plainly encoded data, in bytes,
/// that would be written without a dictionary.
/// If there is no dictionary, or the data size statistic is not available,
/// returns `None`.
fn plain_encoded_data_size(&self) -> Option<usize>;

/// Flush the dictionary page for this column chunk if any. Any subsequent calls to
/// [`Self::write`] will not be dictionary encoded
///
Expand All @@ -132,6 +138,7 @@ pub trait ColumnValueEncoder {
pub struct ColumnValueEncoderImpl<T: DataType> {
encoder: Box<dyn Encoder<T>>,
dict_encoder: Option<DictEncoder<T>>,
plain_data_size_counter: Option<PlainDataSizeCounter>,
descr: ColumnDescPtr,
num_values: usize,
statistics_enabled: EnabledStatistics,
Expand Down Expand Up @@ -176,7 +183,13 @@ impl<T: DataType> ColumnValueEncoderImpl<T> {
}

match &mut self.dict_encoder {
Some(encoder) => encoder.put(slice),
Some(encoder) => {
encoder.put(slice)?;
if let Some(counter) = self.plain_data_size_counter.as_mut() {
counter.update(slice);
}
Ok(())
}
_ => self.encoder.put(slice),
}
}
Expand All @@ -197,6 +210,16 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
let dict_supported = props.dictionary_enabled(descr.path())
&& has_dictionary_support(T::get_physical_type(), props);
let dict_encoder = dict_supported.then(|| DictEncoder::new(descr.clone()));
let plain_data_size_counter = match props.dictionary_fallback(descr.path()) {
DictionaryFallback::OnPageSizeLimit => None,
DictionaryFallback::OnUnfavorableCompression => {
if dict_encoder.is_some() {
Some(PlainDataSizeCounter::new(descr))
} else {
None
}
}
};

// Set either main encoder or fallback encoder.
let encoder = get_encoder(
Expand All @@ -215,6 +238,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
Ok(Self {
encoder,
dict_encoder,
plain_data_size_counter,
descr: descr.clone(),
num_values: 0,
statistics_enabled,
Expand Down Expand Up @@ -277,6 +301,11 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
Some(self.dict_encoder.as_ref()?.dict_encoded_size())
}

fn plain_encoded_data_size(&self) -> Option<usize> {
let counter = self.plain_data_size_counter.as_ref()?;
Some(counter.plain_encoded_data_size())
}

fn estimated_data_page_size(&self) -> usize {
match &self.dict_encoder {
Some(encoder) => encoder.estimated_data_encoded_size(),
Expand All @@ -293,6 +322,8 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
));
}

self.plain_data_size_counter = None;

let buf = encoder.write_dict()?;

Ok(Some(DictionaryPage {
Expand Down
33 changes: 24 additions & 9 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,18 +746,33 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {

/// Returns true if we need to fall back to non-dictionary encoding.
///
/// We can only fall back if dictionary encoder is set and we have exceeded dictionary
/// size.
#[inline]
/// The behavior is governed by the `dictionary_fallback` column property.
fn should_dict_fallback(&self) -> bool {
match self.encoder.estimated_dict_page_size() {
Some(size) => {
size >= self
.props
.column_dictionary_page_size_limit(self.descr.path())
let dict_size = match self.encoder.estimated_dict_page_size() {
Some(size) => size,
None => return false,
};

// First check: dictionary size exceeds limit
if dict_size
>= self
.props
.column_dictionary_page_size_limit(self.descr.path())
{
return true;
}

// Second check, if enabled: the compression heuristic.
// For similar logic in parquet-java,
// see DictionaryValuesWriter.isCompressionSatisfying
if let Some(raw_size) = self.encoder.plain_encoded_data_size() {
let encoded_size = self.encoder.estimated_data_page_size();
if encoded_size + dict_size >= raw_size {
return true;
}
None => false,
}

false
}

/// Returns true if there is enough data for a data page, false otherwise.
Expand Down
Loading
Loading