Skip to content

Commit f9c7b35

Browse files
Fix typed UDFs (variant_get_str, etc.) to handle shredded variants
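Use the `variant_get` kernel for path navigation in the scalar-path branches of `invoke_variant_get_typed` instead of `VariantArray::iter()` + `Variant::get_path()`. The `iter()` API panics on shredded variant arrays with nested `typed_value` structs. The `variant_get` kernel correctly navigates shredded structures and returns simple sub-variant arrays that `iter()` can handle. The per-row-path branches still use `VariantArray::iter()` + `Variant::get_path()`, because each row has its own path and must be navigated individually.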
1 parent 5714b74 commit f9c7b35

File tree

1 file changed

+48
-38
lines changed

1 file changed

+48
-38
lines changed

src/shared.rs

Lines changed: 48 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use datafusion::error::Result;
1212
use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs};
1313
use datafusion::{common::exec_err, scalar::ScalarValue};
1414
use parquet_variant::Variant;
15-
use parquet_variant_compute::{VariantArray, VariantType};
15+
use parquet_variant_compute::{GetOptions, VariantArray, VariantType, variant_get};
1616

1717
use crate::variant_get::PathMode;
1818

@@ -126,49 +126,43 @@ pub fn try_parse_string_columnar(array: &Arc<dyn Array>) -> Result<Vec<Option<&s
126126
Err(exec_datafusion_err!("expected string array"))
127127
}
128128

129-
pub fn variant_get_single_value<T>(
130-
variant_array: &VariantArray,
131-
index: usize,
132-
path: &str,
133-
path_mode: PathMode,
134-
extract: for<'m, 'v> fn(Variant<'m, 'v>) -> Result<Option<T>>,
135-
) -> Result<Option<T>> {
136-
let Some(variant) = variant_array.iter().nth(index).flatten() else {
137-
return Ok(None);
138-
};
139-
140-
let variant_path = path_mode.try_build_path(path)?;
141-
let Some(value) = variant.get_path(&variant_path) else {
142-
return Ok(None);
143-
};
144-
145-
extract(value)
146-
}
147-
148-
pub fn variant_get_array_values<T>(
149-
variant_array: &VariantArray,
129+
/// Navigate to a path within a variant array and extract values.
130+
///
131+
/// Uses `variant_get` for path navigation (which correctly handles shredded variants),
132+
/// then iterates the resulting simple sub-variant array to extract typed values.
133+
pub fn variant_get_and_extract<T>(
134+
input: &ArrayRef,
150135
path: &str,
151136
path_mode: PathMode,
152137
extract: for<'m, 'v> fn(Variant<'m, 'v>) -> Result<Option<T>>,
153138
) -> Result<Vec<Option<T>>> {
154139
let variant_path = path_mode.try_build_path(path)?;
140+
let options = GetOptions::new_with_path(variant_path);
141+
let sub_variant_arr = variant_get(input, options)?;
155142

156-
variant_array
143+
let sub_variant = VariantArray::try_new(sub_variant_arr.as_ref())?;
144+
sub_variant
157145
.iter()
158146
.map(|maybe_variant| {
159147
let Some(variant) = maybe_variant else {
160148
return Ok(None);
161149
};
162-
163-
let Some(value) = variant.get_path(&variant_path) else {
164-
return Ok(None);
165-
};
166-
167-
extract(value)
150+
extract(variant)
168151
})
169152
.collect()
170153
}
171154

155+
/// Navigate to a path within a single-row variant array and extract the first row's value (`None` if that row is null or the array is empty).
156+
pub fn variant_get_and_extract_single<T>(
157+
input: &ArrayRef,
158+
path: &str,
159+
path_mode: PathMode,
160+
extract: for<'m, 'v> fn(Variant<'m, 'v>) -> Result<Option<T>>,
161+
) -> Result<Option<T>> {
162+
let values = variant_get_and_extract(input, path, path_mode, extract)?;
163+
Ok(values.into_iter().next().flatten())
164+
}
165+
172166
pub fn invoke_variant_get_typed<T>(
173167
args: ScalarFunctionArgs,
174168
scalar_from_option: fn(Option<T>) -> ScalarValue,
@@ -194,8 +188,7 @@ pub fn invoke_variant_get_typed<T>(
194188
.map(|s| s.as_str())
195189
.unwrap_or_default();
196190

197-
let variant_array = VariantArray::try_new(variant_array.as_ref())?;
198-
let values = variant_get_array_values(&variant_array, path, path_mode, extract)?;
191+
let values = variant_get_and_extract(variant_array, path, path_mode, extract)?;
199192
ColumnarValue::Array(array_from_values(values))
200193
}
201194
(ColumnarValue::Scalar(scalar_variant), ColumnarValue::Scalar(path_scalar)) => {
@@ -207,8 +200,8 @@ pub fn invoke_variant_get_typed<T>(
207200
.map(|s| s.as_str())
208201
.unwrap_or_default();
209202

210-
let variant_array = VariantArray::try_new(variant_array.as_ref())?;
211-
let value = variant_get_single_value(&variant_array, 0, path, path_mode, extract)?;
203+
let arr = Arc::clone(variant_array) as ArrayRef;
204+
let value = variant_get_and_extract_single(&arr, path, path_mode, extract)?;
212205

213206
ColumnarValue::Scalar(scalar_from_option(value))
214207
}
@@ -217,13 +210,23 @@ pub fn invoke_variant_get_typed<T>(
217210
return exec_err!("expected variant array and paths to be of same length");
218211
}
219212

213+
// Per-row paths: must navigate each row individually
220214
let paths = try_parse_string_columnar(paths)?;
221215
let variant_array = VariantArray::try_new(variant_array.as_ref())?;
222216

223-
let values: Vec<Option<T>> = (0..variant_array.len())
224-
.map(|i| {
225-
let path = paths[i].unwrap_or_default();
226-
variant_get_single_value(&variant_array, i, path, path_mode, extract)
217+
let values: Vec<Option<T>> = variant_array
218+
.iter()
219+
.zip(paths.iter())
220+
.map(|(maybe_variant, path)| {
221+
let Some(variant) = maybe_variant else {
222+
return Ok(None);
223+
};
224+
let path = path.unwrap_or_default();
225+
let variant_path = path_mode.try_build_path(path)?;
226+
let Some(value) = variant.get_path(&variant_path) else {
227+
return Ok(None);
228+
};
229+
extract(value)
227230
})
228231
.collect::<Result<_>>()?;
229232

@@ -241,7 +244,14 @@ pub fn invoke_variant_get_typed<T>(
241244
.iter()
242245
.map(|path| {
243246
let path = path.unwrap_or_default();
244-
variant_get_single_value(&variant_array, 0, path, path_mode, extract)
247+
let Some(variant) = variant_array.iter().next().flatten() else {
248+
return Ok(None);
249+
};
250+
let variant_path = path_mode.try_build_path(path)?;
251+
let Some(value) = variant.get_path(&variant_path) else {
252+
return Ok(None);
253+
};
254+
extract(value)
245255
})
246256
.collect::<Result<_>>()?;
247257

0 commit comments

Comments
 (0)