Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 123 additions & 24 deletions parquet/src/arrow/array_reader/row_number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,21 @@ use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
use arrow_array::{ArrayRef, Int64Array};
use arrow_schema::DataType;
use std::any::Any;
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::ops::Range;
use std::sync::Arc;

/// Tracks row numbers within a Parquet file and emits them as an `Int64Array`.
///
/// Rather than materializing every row number eagerly, the reader stores
/// compact `Range<i64>` spans and only expands them when a batch is consumed.
pub(crate) struct RowNumberReader {
    /// Pre-computed row ranges that have not been read yet.
    ///
    /// Only the range of row numbers for each selected row group is kept; a
    /// range is not materialized into a full array until it is needed.
    remaining_row_ranges: VecDeque<Range<i64>>,
    /// Row ranges read but not yet emitted by `consume_batch`.
    ///
    /// These are either full or partial (split) ranges taken from
    /// `remaining_row_ranges`.
    buffered_row_ranges: Vec<Range<i64>>,
}

impl RowNumberReader {
Expand All @@ -49,7 +58,7 @@ impl RowNumberReader {
// Pass 2: Build ranges in the order specified by the row_groups iterator
// This is O(N) where N is the number of selected row groups
// This preserves the user's requested order instead of sorting by ordinal
let ranges: Vec<_> = row_groups
let ranges: VecDeque<_> = row_groups
.map(|rg| {
let ordinal = rg.ordinal().ok_or_else(|| {
ParquetError::General(
Expand All @@ -70,23 +79,52 @@ impl RowNumberReader {
.collect::<Result<_>>()?;

Ok(Self {
buffered_row_numbers: Vec::new(),
remaining_row_numbers: ranges.into_iter().flatten(),
buffered_row_ranges: Vec::new(),
remaining_row_ranges: ranges,
})
}

/// Take up to `count` rows from the front range, splitting it if needed.
///
/// Returns `None` once every range has been consumed.
fn take_range(&mut self, count: usize) -> Option<Range<i64>> {
    let mut range = self.remaining_row_ranges.pop_front()?;
    let available = range.end - range.start;
    if available <= count as i64 {
        // The whole range fits in the request; hand it over as-is.
        Some(range)
    } else {
        // More rows available than requested: carve off the first `count`
        // rows and push the untouched tail back onto the queue.
        let split_at = range.start + count as i64;
        let taken = range.start..split_at;
        range.start = split_at;
        self.remaining_row_ranges.push_front(range);
        Some(taken)
    }
}
}

impl ArrayReader for RowNumberReader {
fn read_records(&mut self, batch_size: usize) -> Result<usize> {
    // Pull ranges (splitting the final one if necessary) until we have
    // buffered `batch_size` rows or exhausted every remaining range.
    let mut remaining = batch_size;
    while remaining > 0 {
        let Some(range) = self.take_range(remaining) else {
            break;
        };
        remaining -= (range.end - range.start) as usize;
        self.buffered_row_ranges.push(range);
    }
    // Number of rows actually buffered (may be < batch_size at end of file).
    Ok(batch_size - remaining)
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
    // Same loop as `read_records`, except the taken ranges are dropped
    // instead of buffered — skipping never materializes row numbers.
    let mut remaining = num_records;
    while remaining > 0 {
        let Some(range) = self.take_range(remaining) else {
            break;
        };
        remaining -= (range.end - range.start) as usize;
    }
    // Number of rows actually skipped (may be < num_records at end of file).
    Ok(num_records - remaining)
}

fn as_any(&self) -> &dyn Any {
Expand All @@ -98,9 +136,18 @@ impl ArrayReader for RowNumberReader {
}

fn consume_batch(&mut self) -> Result<ArrayRef> {
    // Sum the buffered range lengths first so the output vector is
    // allocated exactly once.
    let total_rows: i64 = self
        .buffered_row_ranges
        .iter()
        .map(|range| range.end - range.start)
        .sum();
    let mut result = Vec::with_capacity(total_rows as usize);

    // Materialize each buffered range into concrete row numbers, leaving
    // the buffer empty for the next batch.
    for range in self.buffered_row_ranges.drain(..) {
        result.extend(range);
    }

    Ok(Arc::new(Int64Array::from(result)))
}

fn get_def_levels(&self) -> Option<&[i16]> {
Expand Down Expand Up @@ -168,6 +215,21 @@ mod tests {
ParquetMetaData::new(file_metadata, row_group_metas)
}

/// Drains the reader's buffered rows and returns them as plain `i64` values.
fn consume_row_numbers(reader: &mut RowNumberReader) -> Vec<i64> {
    let batch = reader.consume_batch().unwrap();
    let row_numbers = batch.as_any().downcast_ref::<Int64Array>().unwrap();
    row_numbers.values().iter().copied().collect()
}

/// Builds a `RowNumberReader` selecting every row group in file order.
fn reader_for_all(metadata: &ParquetMetaData) -> RowNumberReader {
    let selected: Vec<_> = metadata.row_groups().iter().collect();
    RowNumberReader::try_new(metadata, selected.into_iter()).unwrap()
}

#[test]
fn test_row_number_reader_reverse_order() {
// Create metadata with 3 row groups, each with 2 rows
Expand All @@ -191,16 +253,53 @@ mod tests {
let num_read = reader.read_records(6).unwrap();
assert_eq!(num_read, 4); // Should read 4 rows total (2 from each selected group)

let array = reader.consume_batch().unwrap();
let row_numbers = array.as_any().downcast_ref::<Int64Array>().unwrap();

// Expected: row group 2 first (rows 4-5), then row group 0 (rows 0-1)
let expected = vec![4, 5, 0, 1];
let actual: Vec<i64> = row_numbers.iter().map(|v| v.unwrap()).collect();
assert_eq!(consume_row_numbers(&mut reader), vec![4, 5, 0, 1]);
}

assert_eq!(
actual, expected,
"Row numbers should match the order of selected row groups, not file order"
);
#[test]
fn test_range_splitting_across_batches() {
    // A single row group containing rows 0..10.
    let metadata = create_test_parquet_metadata(vec![(0, 10)]);
    let mut reader = reader_for_all(&metadata);

    // Three full batches of 3 rows each: [0,1,2], [3,4,5], [6,7,8].
    for start in [0i64, 3, 6] {
        assert_eq!(reader.read_records(3).unwrap(), 3);
        assert_eq!(
            consume_row_numbers(&mut reader),
            vec![start, start + 1, start + 2]
        );
    }

    // A final short batch: 3 requested but only row 9 remains.
    assert_eq!(reader.read_records(3).unwrap(), 1);
    assert_eq!(consume_row_numbers(&mut reader), vec![9]);
}

#[test]
fn test_interleaved_skip_and_read() {
    // Two row groups: rows 0..5 and rows 5..10.
    let metadata = create_test_parquet_metadata(vec![(0, 5), (1, 5)]);
    let mut reader = reader_for_all(&metadata);

    // Alternate skips and reads; the second skip crosses the row-group
    // boundary (drops row 4 from group 0, then rows 5 and 6 from group 1).
    assert_eq!(reader.skip_records(2).unwrap(), 2); // drops rows 0, 1
    assert_eq!(reader.read_records(2).unwrap(), 2); // buffers rows 2, 3
    assert_eq!(reader.skip_records(3).unwrap(), 3); // drops rows 4, 5, 6
    assert_eq!(reader.read_records(3).unwrap(), 3); // buffers rows 7, 8, 9

    // Only the read rows are emitted; skipped rows never appear.
    assert_eq!(consume_row_numbers(&mut reader), vec![2, 3, 7, 8, 9]);
}

#[test]
fn test_skip_then_read() {
    // A single row group containing rows 0..10.
    let metadata = create_test_parquet_metadata(vec![(0, 10)]);
    let mut reader = reader_for_all(&metadata);

    // Skip the first 3 rows, then read the next 4: rows 3..=6.
    let skipped = reader.skip_records(3).unwrap();
    assert_eq!(skipped, 3);
    let read = reader.read_records(4).unwrap();
    assert_eq!(read, 4);
    assert_eq!(consume_row_numbers(&mut reader), vec![3, 4, 5, 6]);
}
}
Loading