Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 93 additions & 3 deletions parquet-variant-compute/src/variant_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ use arrow::{
datatypes::Field,
error::Result,
};
use arrow_schema::extension::ExtensionType;
use arrow_schema::{ArrowError, DataType, FieldRef};
use parquet_variant::{VariantPath, VariantPathElement};

use crate::VariantArray;
use crate::variant_array::BorrowedShreddingState;
use crate::variant_to_arrow::make_variant_to_arrow_row_builder;
use crate::{VariantArray, VariantType, unshred_variant};

use arrow::array::AsArray;
use std::sync::Arc;
Expand Down Expand Up @@ -109,6 +110,11 @@ pub(crate) fn follow_shredded_path_element<'a>(
}
}

fn is_variant_extension(field: &Field) -> bool {
field.extension_type_name() == Some(VariantType::NAME)
&& field.try_extension_type::<VariantType>().is_ok()
}

/// Follows the given path as far as possible through shredded variant fields. If the path ends on a
/// shredded field, return it directly. Otherwise, use a row shredder to follow the rest of the path
/// and extract the requested value on a per-row basis.
Expand All @@ -131,7 +137,22 @@ fn shredded_get_path(
// Helper that shreds a VariantArray to a specific type.
let shred_basic_variant =
|target: VariantArray, path: VariantPath<'_>, as_field: Option<&Field>| {
let as_type = as_field.map(|f| f.data_type());
let requested_variant = as_field.is_some_and(is_variant_extension);
let target = if requested_variant {
unshred_variant(&target)?
} else {
target
};

if requested_variant && path.is_empty() {
return Ok(ArrayRef::from(target));
}

let as_type = if requested_variant {
None
} else {
as_field.map(|f| f.data_type())
};
let mut builder = make_variant_to_arrow_row_builder(
target.metadata_field(),
path,
Expand Down Expand Up @@ -179,6 +200,16 @@ fn shredded_get_path(
}
ShreddedPathStep::Missing => {
let num_rows = input.len();
if as_field.is_some_and(is_variant_extension) {
let all_nulls = Some(arrow::buffer::NullBuffer::from(vec![false; num_rows]));
let arr = VariantArray::from_parts(
input.metadata_field().clone(),
None,
None,
all_nulls,
);
return Ok(ArrayRef::from(arr));
}
let arr = match as_field.map(|f| f.data_type()) {
Some(data_type) => array::new_null_array(data_type, num_rows),
None => Arc::new(array::NullArray::new(num_rows)) as _,
Expand Down Expand Up @@ -222,7 +253,9 @@ fn shredded_get_path(
//
// For shredded/partially-shredded targets (`typed_value` present), recurse into each field
// separately to take advantage of deeper shredding in child fields.
if let DataType::Struct(fields) = as_field.data_type() {
if !is_variant_extension(as_field)
&& let DataType::Struct(fields) = as_field.data_type()
{
if target.typed_value_field().is_none() {
return shred_basic_variant(target, VariantPath::default(), Some(as_field));
}
Expand Down Expand Up @@ -2038,6 +2071,63 @@ mod test {
println!("Nested path 'a.x' result: {:?}", result);
}

#[test]
fn test_variant_get_as_variant_from_unshredded_input() {
let (unshredded, _) = create_variant_get_as_variant_test_data();
assert_variant_field_extraction_returns_unshredded_variant(&unshredded);
}

#[test]
fn test_variant_get_as_variant_from_shredded_input() {
let (_, shredded) = create_variant_get_as_variant_test_data();
assert_variant_field_extraction_returns_unshredded_variant(&shredded);
}

fn create_variant_get_as_variant_test_data() -> (ArrayRef, ArrayRef) {
let input_json: ArrayRef = Arc::new(StringArray::from(vec![
Some(r#"{"field_name": {"k": 100000}}"#),
Some(r#"{"field_name": {"k": "s"}}"#),
]));

let unshredded = ArrayRef::from(json_to_variant(&input_json).unwrap());
let unshredded_variant = VariantArray::try_new(&unshredded).unwrap();

let as_type = DataType::Struct(Fields::from(vec![Field::new(
"field_name",
DataType::Struct(Fields::from(vec![Field::new("k", DataType::Int32, true)])),
true,
)]));
let shredded = ArrayRef::from(shred_variant(&unshredded_variant, &as_type).unwrap());

(unshredded, shredded)
}

fn assert_variant_field_extraction_returns_unshredded_variant(input: &ArrayRef) {
let variant_output = VariantArray::try_new(input).unwrap().field("result");
let options = GetOptions::new_with_path(VariantPath::try_from("field_name").unwrap())
.with_as_type(Some(FieldRef::from(variant_output)));

let result = variant_get(input, options).unwrap();
let result_variant = VariantArray::try_new(&result).unwrap();

assert!(result_variant.typed_value_field().is_none());
assert!(result_variant.value_field().is_some());

let expected_json: ArrayRef = Arc::new(StringArray::from(vec![
Some(r#"{"k":100000}"#),
Some(r#"{"k":"s"}"#),
]));
let expected = json_to_variant(&expected_json).unwrap();

assert_eq!(result_variant.len(), expected.len());
for i in 0..result_variant.len() {
assert_eq!(result_variant.is_null(i), expected.is_null(i));
if !result_variant.is_null(i) {
assert_eq!(result_variant.value(i), expected.value(i));
}
}
}

/// Create test data for depth 0 (direct field access)
/// [{"x": 42}, {"x": "foo"}, {"y": 10}]
fn create_depth_0_test_data() -> ArrayRef {
Expand Down
Loading