From 0c7203cbe3563118244db748ce754ac6879d6d37 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Tue, 30 Dec 2025 15:20:02 -0500 Subject: [PATCH 1/8] add arg_count_err helper --- src/is_variant_null.rs | 6 +++--- src/json_to_variant.rs | 9 +++------ src/shared.rs | 11 +++++++++-- src/variant_get.rs | 4 ++-- src/variant_list_insert.rs | 4 ++-- src/variant_object_insert.rs | 4 ++-- src/variant_pretty.rs | 15 ++++++--------- src/variant_to_json.rs | 11 ++++------- 8 files changed, 31 insertions(+), 33 deletions(-) diff --git a/src/is_variant_null.rs b/src/is_variant_null.rs index ac6a6d0..59fbdfc 100644 --- a/src/is_variant_null.rs +++ b/src/is_variant_null.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use arrow::array::{ArrayRef, BooleanArray}; use arrow_schema::DataType; -use datafusion::common::{exec_datafusion_err, exec_err}; +use datafusion::common::exec_datafusion_err; use datafusion::error::Result; use datafusion::logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -11,7 +11,7 @@ use datafusion::scalar::ScalarValue; use parquet_variant::Variant; use parquet_variant_compute::VariantArray; -use crate::shared::{try_field_as_variant_array, try_parse_variant_scalar}; +use crate::shared::{args_count_err, try_field_as_variant_array, try_parse_variant_scalar}; #[derive(Debug, Hash, PartialEq, Eq)] pub struct IsVariantNullUdf { @@ -52,7 +52,7 @@ impl ScalarUDFImpl for IsVariantNullUdf { try_field_as_variant_array(variant_field.as_ref())?; let [variant_arg] = args.args.as_slice() else { - return exec_err!("expected 1 argument"); + return Err(args_count_err(1, args.args.len())); }; let out = match variant_arg { diff --git a/src/json_to_variant.rs b/src/json_to_variant.rs index 6a23c56..1d049df 100644 --- a/src/json_to_variant.rs +++ b/src/json_to_variant.rs @@ -15,7 +15,7 @@ use datafusion::{ use parquet_variant_compute::{VariantArrayBuilder, VariantType}; use parquet_variant_json::JsonToVariant as JsonToVariantExt; -use crate::shared::{try_field_as_string, try_parse_string_scalar}; +use crate::shared::{args_count_err, try_field_as_string, try_parse_string_scalar}; /// Returns a Variant from a JSON string #[derive(Debug, Hash, PartialEq, Eq)] @@ -74,14 +74,11 @@ impl ScalarUDFImpl for JsonToVariantUdf { let arg_field = args .arg_fields .first() - .ok_or_else(|| exec_datafusion_err!("empty argument, expected 1 argument"))?; + .ok_or_else(|| args_count_err(1, 0))?; try_field_as_string(arg_field.as_ref())?; - let arg = args - .args - .first() - .ok_or_else(|| exec_datafusion_err!("empty argument, expected 1 argument"))?; + let arg = args.args.first().ok_or_else(|| args_count_err(1, 0))?; let out = match arg { ColumnarValue::Scalar(scalar_value) => { diff --git a/src/shared.rs b/src/shared.rs index 8ebfb43..52ba16e 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -4,7 +4,7 @@ use arrow::array::{Array, cast::AsArray}; use arrow_schema::extension::ExtensionType; use arrow_schema::{DataType, Field}; use datafusion::common::exec_datafusion_err; -use datafusion::error::Result; +use datafusion::error::{DataFusionError, Result}; use datafusion::{common::exec_err, scalar::ScalarValue}; use parquet_variant_compute::{VariantArray, VariantType}; @@ -128,6 +128,13 @@ pub fn ensure(pred: bool, err_msg: &str) -> Result<()> { Ok(()) } +// cleaner error handling + +/// helper for argument count errors +pub fn args_count_err(expected: usize, actual: usize) -> DataFusionError { + DataFusionError::Execution(format!("expected {expected} arguments, got {actual}")) +} + // test related methods #[cfg(test)] @@ -144,7 +151,7 @@ pub fn build_variant_array_from_json(value: &serde_json::Value) -> VariantArray pub fn build_variant_array_from_json_array(jsons: &[Option]) -> VariantArray { let mut builder = VariantArrayBuilder::new(jsons.len()); - jsons.into_iter().for_each(|v| match v.as_ref() { + jsons.iter().for_each(|v| match v.as_ref() { Some(json) => builder.append_json(json.to_string().as_str()).unwrap(), None => builder.append_null(), }); diff --git a/src/variant_get.rs b/src/variant_get.rs index 43e06e9..2054326 100644 --- a/src/variant_get.rs +++ b/src/variant_get.rs @@ -17,7 +17,7 @@ use parquet_variant::VariantPath; use parquet_variant_compute::{GetOptions, VariantArray, VariantType, variant_get}; use crate::shared::{ - try_field_as_variant_array, try_parse_string_columnar, try_parse_string_scalar, + args_count_err, try_field_as_variant_array, try_parse_string_columnar, try_parse_string_scalar, }; fn type_hint_from_scalar(field_name: &str, scalar: &ScalarValue) -> Result { @@ -118,7 +118,7 @@ impl ScalarUDFImpl for VariantGetUdf { let (variant_arg, variant_path, type_arg) = match args.args.as_slice() { [variant_arg, variant_path] => (variant_arg, variant_path, None), [variant_arg, variant_path, type_arg] => (variant_arg, variant_path, Some(type_arg)), - _ => return exec_err!("expected 2 or 3 arguments"), + _ => return Err(args_count_err(1, args.args.len())), }; let variant_field = args diff --git a/src/variant_list_insert.rs b/src/variant_list_insert.rs index 8ceb8e1..8f7a707 100644 --- a/src/variant_list_insert.rs +++ b/src/variant_list_insert.rs @@ -13,7 +13,7 @@ use datafusion::{ use parquet_variant::{Variant, VariantBuilder}; use parquet_variant_compute::{VariantArray, VariantType}; -use crate::shared::{ensure, try_parse_variant_scalar}; +use crate::shared::{args_count_err, ensure, try_parse_variant_scalar}; #[derive(Debug, Hash, PartialEq, Eq)] pub struct VariantListInsert { @@ -71,7 +71,7 @@ impl ScalarUDFImpl for VariantListInsert { )?; let [variant_list_to_update, element_to_append] = argument_values.as_slice() else { - return exec_err!("expected 2 arguments"); + return Err(args_count_err(2, argument_values.len())); }; let all_arguments_variant_field = argument_fields diff --git a/src/variant_object_insert.rs b/src/variant_object_insert.rs index ad5580e..683e2a1 100644 --- a/src/variant_object_insert.rs +++ b/src/variant_object_insert.rs @@ -13,7 +13,7 @@ use datafusion::{ use parquet_variant::{Variant, VariantBuilder}; use parquet_variant_compute::{VariantArray, VariantType}; -use crate::shared::{ensure, try_parse_string_scalar, try_parse_variant_scalar}; +use crate::shared::{args_count_err, ensure, try_parse_string_scalar, try_parse_variant_scalar}; #[derive(Debug, Hash, PartialEq, Eq)] pub struct VariantObjectInsert { @@ -71,7 +71,7 @@ impl ScalarUDFImpl for VariantObjectInsert { )?; let [variant_object_to_update, key, value] = argument_values.as_slice() else { - return exec_err!("expected 3 arguments"); + return Err(args_count_err(3, argument_values.len())); }; { diff --git a/src/variant_pretty.rs b/src/variant_pretty.rs index 33e3783..1b38ab1 100644 --- a/src/variant_pretty.rs +++ b/src/variant_pretty.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use arrow::array::StringViewArray; use arrow_schema::DataType; use datafusion::{ - common::{exec_datafusion_err, exec_err}, + common::exec_err, error::Result, logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -12,7 +12,7 @@ use datafusion::{ }; use parquet_variant_compute::VariantArray; -use crate::shared::try_field_as_variant_array; +use crate::shared::{args_count_err, try_field_as_variant_array}; #[derive(Debug, Hash, PartialEq, Eq)] pub struct VariantPretty { @@ -48,14 +48,11 @@ impl ScalarUDFImpl for VariantPretty { let field = args .arg_fields .first() - .ok_or_else(|| exec_datafusion_err!("empty argument, expected 1 argument"))?; + .ok_or_else(|| args_count_err(1, 0))?; try_field_as_variant_array(field.as_ref())?; - let arg = args - .args - .first() - .ok_or_else(|| exec_datafusion_err!("empty argument, expected 1 argument"))?; + let arg = args.args.first().ok_or_else(|| args_count_err(1, 0))?; let out = match arg { ColumnarValue::Scalar(scalar) => { @@ -66,7 +63,7 @@ impl ScalarUDFImpl for VariantPretty { let variant_array = VariantArray::try_new(variant_array.as_ref())?; let v = variant_array.value(0); - ColumnarValue::Scalar(ScalarValue::Utf8View(Some(format!("{:?}", v)))) + ColumnarValue::Scalar(ScalarValue::Utf8View(Some(format!("{v:?}")))) } ColumnarValue::Array(arr) => match arr.data_type() { DataType::Struct(_) => { @@ -74,7 +71,7 @@ impl ScalarUDFImpl for VariantPretty { let out = variant_array .iter() - .map(|v| v.map(|v| format!("{:?}", v))) + .map(|v| v.map(|v| format!("{v:?}"))) .collect::>(); let out: StringViewArray = out.into(); diff --git a/src/variant_to_json.rs b/src/variant_to_json.rs index 271c0b0..a2acd05 100644 --- a/src/variant_to_json.rs +++ b/src/variant_to_json.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use arrow::array::StringViewArray; use arrow_schema::DataType; use datafusion::{ - common::{exec_datafusion_err, exec_err}, + common::exec_err, error::Result, logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -15,7 +15,7 @@ use datafusion::{ use parquet_variant_compute::VariantArray; use parquet_variant_json::VariantToJson; -use crate::shared::try_field_as_variant_array; +use crate::shared::{args_count_err, try_field_as_variant_array}; /// Returns a JSON string from a VariantArray /// @@ -59,14 +59,11 @@ impl ScalarUDFImpl for VariantToJsonUdf { let field = args .arg_fields .first() - .ok_or_else(|| exec_datafusion_err!("empty argument, expected 1 argument"))?; + .ok_or_else(|| args_count_err(1, 0))?; try_field_as_variant_array(field.as_ref())?; - let arg = args - .args - .first() - .ok_or_else(|| exec_datafusion_err!("empty argument, expected 1 argument"))?; + let arg = args.args.first().ok_or_else(|| args_count_err(1, 0))?; let out = match arg { ColumnarValue::Scalar(scalar) => { From 51c745df5bed19eee4f16dc22550b9a66db55fd8 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Tue, 30 Dec 2025 15:31:25 -0500 Subject: [PATCH 2/8] add arg_count_err --- src/is_variant_null.rs | 2 +- src/json_to_variant.rs | 4 ++-- src/shared.rs | 7 +++++-- src/variant_get.rs | 2 +- src/variant_list_insert.rs | 2 +- src/variant_object_insert.rs | 2 +- src/variant_pretty.rs | 4 ++-- src/variant_to_json.rs | 4 ++-- 8 files changed, 15 insertions(+), 12 deletions(-) diff --git a/src/is_variant_null.rs b/src/is_variant_null.rs index 59fbdfc..c666a5c 100644 --- a/src/is_variant_null.rs +++ b/src/is_variant_null.rs @@ -52,7 +52,7 @@ impl ScalarUDFImpl for IsVariantNullUdf { try_field_as_variant_array(variant_field.as_ref())?; let [variant_arg] = args.args.as_slice() else { - return Err(args_count_err(1, args.args.len())); + return Err(args_count_err("1", args.args.len())); }; let out = match variant_arg { diff --git a/src/json_to_variant.rs b/src/json_to_variant.rs index 1d049df..be54e1e 100644 --- a/src/json_to_variant.rs +++ b/src/json_to_variant.rs @@ -74,11 +74,11 @@ impl ScalarUDFImpl for JsonToVariantUdf { let arg_field = args .arg_fields .first() - .ok_or_else(|| args_count_err(1, 0))?; + .ok_or_else(|| args_count_err("1", 0))?; try_field_as_string(arg_field.as_ref())?; - let arg = args.args.first().ok_or_else(|| args_count_err(1, 0))?; + let arg = args.args.first().ok_or_else(|| args_count_err("1", 0))?; let out = match arg { ColumnarValue::Scalar(scalar_value) => { diff --git a/src/shared.rs b/src/shared.rs index 52ba16e..973e6cb 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -131,10 +131,13 @@ pub fn ensure(pred: bool, err_msg: &str) -> Result<()> { // cleaner error handling /// helper for argument count errors -pub fn args_count_err(expected: usize, actual: usize) -> DataFusionError { - DataFusionError::Execution(format!("expected {expected} arguments, got {actual}")) +pub fn args_count_err(expected: &'static str, actual: usize) -> DataFusionError { + DataFusionError::Execution(format!( + "expected {expected} argument(s), got {actual}" + )) } + // test related methods #[cfg(test)] diff --git a/src/variant_get.rs b/src/variant_get.rs index 2054326..269836f 100644 --- a/src/variant_get.rs +++ b/src/variant_get.rs @@ -118,7 +118,7 @@ impl ScalarUDFImpl for VariantGetUdf { let (variant_arg, variant_path, type_arg) = match args.args.as_slice() { [variant_arg, variant_path] => (variant_arg, variant_path, None), [variant_arg, variant_path, type_arg] => (variant_arg, variant_path, Some(type_arg)), - _ => return Err(args_count_err(1, args.args.len())), + _ => return Err(args_count_err("2 or 3", args.args.len())), }; let variant_field = args diff --git a/src/variant_list_insert.rs b/src/variant_list_insert.rs index 8f7a707..6146224 100644 --- a/src/variant_list_insert.rs +++ b/src/variant_list_insert.rs @@ -71,7 +71,7 @@ impl ScalarUDFImpl for VariantListInsert { )?; let [variant_list_to_update, element_to_append] = argument_values.as_slice() else { - return Err(args_count_err(2, argument_values.len())); + return Err(args_count_err("2", argument_values.len())); }; let all_arguments_variant_field = argument_fields diff --git a/src/variant_object_insert.rs b/src/variant_object_insert.rs index 683e2a1..c1c05a3 100644 --- a/src/variant_object_insert.rs +++ b/src/variant_object_insert.rs @@ -71,7 +71,7 @@ impl ScalarUDFImpl for VariantObjectInsert { )?; let [variant_object_to_update, key, value] = argument_values.as_slice() else { - return Err(args_count_err(3, argument_values.len())); + return Err(args_count_err("3", argument_values.len())); }; { diff --git a/src/variant_pretty.rs b/src/variant_pretty.rs index 1b38ab1..5c35e4b 100644 --- a/src/variant_pretty.rs +++ b/src/variant_pretty.rs @@ -48,11 +48,11 @@ impl ScalarUDFImpl for VariantPretty { let field = args .arg_fields .first() - .ok_or_else(|| args_count_err(1, 0))?; + .ok_or_else(|| args_count_err("1", 0))?; try_field_as_variant_array(field.as_ref())?; - let arg = args.args.first().ok_or_else(|| args_count_err(1, 0))?; + let arg = args.args.first().ok_or_else(|| args_count_err("1", 0))?; let out = match arg { ColumnarValue::Scalar(scalar) => { diff --git a/src/variant_to_json.rs b/src/variant_to_json.rs index a2acd05..93b90cb 100644 --- a/src/variant_to_json.rs +++ b/src/variant_to_json.rs @@ -59,11 +59,11 @@ impl ScalarUDFImpl for VariantToJsonUdf { let field = args .arg_fields .first() - .ok_or_else(|| args_count_err(1, 0))?; + .ok_or_else(|| args_count_err("1", 0))?; try_field_as_variant_array(field.as_ref())?; - let arg = args.args.first().ok_or_else(|| args_count_err(1, 0))?; + let arg = args.args.first().ok_or_else(|| args_count_err("1", 0))?; let out = match arg { ColumnarValue::Scalar(scalar) => { From 6cabebdd0452bb17f0dfebc12ffb1cb30d9aa9ad Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Tue, 30 Dec 2025 16:13:20 -0500 Subject: [PATCH 3/8] add type_err helper --- src/is_variant_null.rs | 2 +- src/json_to_variant.rs | 6 +++--- src/shared.rs | 7 +++++++ src/variant_get.rs | 4 ++-- src/variant_pretty.rs | 7 +++---- src/variant_to_json.rs | 7 +++---- 6 files changed, 19 insertions(+), 14 deletions(-) diff --git a/src/is_variant_null.rs b/src/is_variant_null.rs index c666a5c..529189c 100644 --- a/src/is_variant_null.rs +++ b/src/is_variant_null.rs @@ -47,7 +47,7 @@ impl ScalarUDFImpl for IsVariantNullUdf { let variant_field = args .arg_fields .first() - .ok_or_else(|| exec_datafusion_err!("expected 1 argument field type"))?; + .ok_or_else(|| exec_datafusion_err!("expected 1 argument field type, got {}", args.arg_fields.len()))?; try_field_as_variant_array(variant_field.as_ref())?; diff --git a/src/json_to_variant.rs b/src/json_to_variant.rs index be54e1e..5e3fdac 100644 --- a/src/json_to_variant.rs +++ b/src/json_to_variant.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use arrow::array::{Array, ArrayRef, LargeStringArray, StringArray, StringViewArray, StructArray}; use arrow_schema::{DataType, Field, Fields}; use datafusion::{ - common::{exec_datafusion_err, exec_err}, + common::exec_datafusion_err, error::Result, logical_expr::{ ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, @@ -15,7 +15,7 @@ use datafusion::{ use parquet_variant_compute::{VariantArrayBuilder, VariantType}; use parquet_variant_json::JsonToVariant as JsonToVariantExt; -use crate::shared::{args_count_err, try_field_as_string, try_parse_string_scalar}; +use crate::shared::{args_count_err, try_field_as_string, try_parse_string_scalar, type_err}; /// Returns a Variant from a JSON string #[derive(Debug, Hash, PartialEq, Eq)] @@ -98,7 +98,7 @@ impl ScalarUDFImpl for JsonToVariantUdf { DataType::Utf8 => ColumnarValue::Array(from_utf8_arr(arr)?), DataType::LargeUtf8 => ColumnarValue::Array(from_large_utf8_arr(arr)?), DataType::Utf8View => ColumnarValue::Array(from_utf8view_arr(arr)?), - _ => return exec_err!("Invalid data type {}", arr.data_type()), + _ => return type_err("Utf8, LargeUtf8, or Utf8View", arr.data_type()), }, }; diff --git a/src/shared.rs b/src/shared.rs index 973e6cb..4eb9b45 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -137,6 +137,13 @@ pub fn args_count_err(expected: &'static str, actual: usize) -> DataFusionError )) } +// helper for argument type errors +pub fn type_err(expected: &str, actual: &DataType) -> Result { + Err(DataFusionError::Execution(format!( + "expected {expected}, got {actual:?}" + ))) +} + // test related methods diff --git a/src/variant_get.rs b/src/variant_get.rs index 269836f..4ff457d 100644 --- a/src/variant_get.rs +++ b/src/variant_get.rs @@ -17,7 +17,7 @@ use parquet_variant::VariantPath; use parquet_variant_compute::{GetOptions, VariantArray, VariantType, variant_get}; use crate::shared::{ - args_count_err, try_field_as_variant_array, try_parse_string_columnar, try_parse_string_scalar, + args_count_err, try_field_as_variant_array, try_parse_string_columnar, try_parse_string_scalar, type_err, }; fn type_hint_from_scalar(field_name: &str, scalar: &ScalarValue) -> Result { @@ -196,7 +196,7 @@ impl ScalarUDFImpl for VariantGetUdf { } (ColumnarValue::Scalar(scalar_variant), ColumnarValue::Array(variant_paths)) => { let ScalarValue::Struct(variant_array) = scalar_variant else { - return exec_err!("expected struct array"); + return type_err("Struct", &scalar_variant.data_type()); }; let variant_array = Arc::clone(variant_array) as ArrayRef; diff --git a/src/variant_pretty.rs b/src/variant_pretty.rs index 5c35e4b..75ee900 100644 --- a/src/variant_pretty.rs +++ b/src/variant_pretty.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use arrow::array::StringViewArray; use arrow_schema::DataType; use datafusion::{ - common::exec_err, error::Result, logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -12,7 +11,7 @@ use datafusion::{ }; use parquet_variant_compute::VariantArray; -use crate::shared::{args_count_err, try_field_as_variant_array}; +use crate::shared::{args_count_err, try_field_as_variant_array, type_err}; #[derive(Debug, Hash, PartialEq, Eq)] pub struct VariantPretty { @@ -57,7 +56,7 @@ impl ScalarUDFImpl for VariantPretty { let out = match arg { ColumnarValue::Scalar(scalar) => { let ScalarValue::Struct(variant_array) = scalar else { - return exec_err!("Unsupported data type: {}", scalar.data_type()); + return type_err("Struct", &scalar.data_type()); }; let variant_array = VariantArray::try_new(variant_array.as_ref())?; @@ -78,7 +77,7 @@ impl ScalarUDFImpl for VariantPretty { ColumnarValue::Array(Arc::new(out)) } - unsupported => return exec_err!("Invalid data type: {unsupported}"), + unsupported => return type_err("Struct", unsupported), }, }; diff --git a/src/variant_to_json.rs b/src/variant_to_json.rs index 93b90cb..23c51da 100644 --- a/src/variant_to_json.rs +++ b/src/variant_to_json.rs @@ -5,7 +5,6 @@ use std::sync::Arc; use arrow::array::StringViewArray; use arrow_schema::DataType; use datafusion::{ - common::exec_err, error::Result, logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -15,7 +14,7 @@ use datafusion::{ use parquet_variant_compute::VariantArray; use parquet_variant_json::VariantToJson; -use crate::shared::{args_count_err, try_field_as_variant_array}; +use crate::shared::{args_count_err, try_field_as_variant_array, type_err}; /// Returns a JSON string from a VariantArray /// @@ -68,7 +67,7 @@ impl ScalarUDFImpl for VariantToJsonUdf { let out = match arg { ColumnarValue::Scalar(scalar) => { let ScalarValue::Struct(variant_array) = scalar else { - return exec_err!("Unsupported data type: {}", scalar.data_type()); + return type_err("Struct", &scalar.data_type()); }; let variant_array = VariantArray::try_new(variant_array.as_ref())?; @@ -88,7 +87,7 @@ impl ScalarUDFImpl for VariantToJsonUdf { ColumnarValue::Array(Arc::new(out)) } - unsupported => return exec_err!("Invalid data type: {unsupported}"), + unsupported => return type_err("Struct", unsupported), }, }; From f8019cc5fa7106aa6c9b78dce7a3d02e1e2ae71a Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Tue, 30 Dec 2025 16:24:58 -0500 Subject: [PATCH 4/8] replace exec_err with type_err --- src/variant_get.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/variant_get.rs b/src/variant_get.rs index 4ff457d..0838fb0 100644 --- a/src/variant_get.rs +++ b/src/variant_get.rs @@ -26,10 +26,7 @@ fn type_hint_from_scalar(field_name: &str, scalar: &ScalarValue) -> Result value.as_str(), other => { - return exec_err!( - "type hint must be a non-null UTF8 literal, got {}", - other.data_type() - ); + return type_err("Utf8, LargeUtf8, or Utf8View", &other.data_type()); } }; @@ -96,7 +93,7 @@ impl ScalarUDFImpl for VariantGetUdf { fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result> { if let Some(maybe_scalar) = args.scalar_arguments.get(2) { let scalar = maybe_scalar.ok_or_else(|| { - exec_datafusion_err!("type hint argument to variant_get must be a literal") + exec_datafusion_err!("Expected Some() scalar argument got None") })?; return type_hint_from_scalar(self.name(), scalar); } @@ -147,7 +144,7 @@ impl ScalarUDFImpl for VariantGetUdf { } (ColumnarValue::Scalar(scalar_variant), ColumnarValue::Scalar(variant_path)) => { let ScalarValue::Struct(variant_array) = scalar_variant else { - return exec_err!("expected struct array"); + return type_err("Struct", &scalar_variant.data_type()); }; let variant_array = Arc::clone(variant_array) as ArrayRef; @@ -167,7 +164,7 @@ impl ScalarUDFImpl for VariantGetUdf { (ColumnarValue::Array(variant_array), ColumnarValue::Array(variant_paths)) => { if variant_array.len() != variant_paths.len() { return exec_err!( - "expected variant_array and variant paths to be of same length" + "expected variant array and variant paths array to be of same length" ); } From 8a53ce6ba86eb4280a3e59fa39c3ee0169d4b671 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Tue, 30 Dec 2025 16:36:20 -0500 Subject: [PATCH 5/8] cargo fmt --- src/is_variant_null.rs | 10 ++++++---- src/shared.rs | 5 +---- src/variant_get.rs | 8 ++++---- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/is_variant_null.rs b/src/is_variant_null.rs index 529189c..708c08a 100644 --- a/src/is_variant_null.rs +++ b/src/is_variant_null.rs @@ -44,10 +44,12 @@ impl ScalarUDFImpl for IsVariantNullUdf { } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - let variant_field = args - .arg_fields - .first() - .ok_or_else(|| exec_datafusion_err!("expected 1 argument field type, got {}", args.arg_fields.len()))?; + let variant_field = args.arg_fields.first().ok_or_else(|| { + exec_datafusion_err!( + "expected 1 argument field type, got {}", + args.arg_fields.len() + ) + })?; try_field_as_variant_array(variant_field.as_ref())?; diff --git a/src/shared.rs b/src/shared.rs index 4eb9b45..80ad952 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -132,9 +132,7 @@ pub fn ensure(pred: bool, err_msg: &str) -> Result<()> { /// helper for argument count errors pub fn args_count_err(expected: &'static str, actual: usize) -> DataFusionError { - DataFusionError::Execution(format!( - "expected {expected} argument(s), got {actual}" - )) + DataFusionError::Execution(format!("expected {expected} argument(s), got {actual}")) } // helper for argument type errors @@ -144,7 +142,6 @@ pub fn type_err(expected: &str, actual: &DataType) -> Result Result { @@ -92,9 +93,8 @@ impl ScalarUDFImpl for VariantGetUdf { fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result> { if let Some(maybe_scalar) = args.scalar_arguments.get(2) { - let scalar = maybe_scalar.ok_or_else(|| { - exec_datafusion_err!("Expected Some() scalar argument got None") - })?; + let scalar = maybe_scalar + .ok_or_else(|| exec_datafusion_err!("Expected Some() scalar argument got None"))?; return type_hint_from_scalar(self.name(), scalar); } From d6780212fa0ed16be91acf8a959a6462f4c65ead Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Tue, 31 Mar 2026 15:23:42 -0400 Subject: [PATCH 6/8] Improve error messaging --- src/is_variant_null.rs | 18 ++++++------- src/json_to_variant.rs | 18 ++++++++++--- src/shared.rs | 51 ++++++++++++++++++++++++++++-------- src/variant_get.rs | 47 ++++++++++++++++++++++----------- src/variant_list_insert.rs | 19 +++++++++----- src/variant_object_insert.rs | 30 ++++++++++++++++----- src/variant_pretty.rs | 15 +++++++---- src/variant_to_json.rs | 15 +++++++---- 8 files changed, 151 insertions(+), 62 deletions(-) diff --git a/src/is_variant_null.rs b/src/is_variant_null.rs index 708c08a..8b24773 100644 --- a/src/is_variant_null.rs +++ b/src/is_variant_null.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use arrow::array::{ArrayRef, BooleanArray}; use arrow_schema::DataType; -use datafusion::common::exec_datafusion_err; use datafusion::error::Result; use datafusion::logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -11,7 +10,10 @@ use datafusion::scalar::ScalarValue; use parquet_variant::Variant; use parquet_variant_compute::VariantArray; -use crate::shared::{args_count_err, try_field_as_variant_array, try_parse_variant_scalar}; +use crate::shared::{ + arg_field_meta_missing_err, args_count_err, try_field_as_variant_array, + try_parse_variant_scalar, +}; #[derive(Debug, Hash, PartialEq, Eq)] pub struct IsVariantNullUdf { @@ -44,17 +46,15 @@ impl ScalarUDFImpl for IsVariantNullUdf { } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - let variant_field = args.arg_fields.first().ok_or_else(|| { - exec_datafusion_err!( - "expected 1 argument field type, got {}", - args.arg_fields.len() - ) - })?; + let variant_field = args + .arg_fields + .first() + .ok_or_else(|| arg_field_meta_missing_err(self.name(), 1))?; try_field_as_variant_array(variant_field.as_ref())?; let [variant_arg] = args.args.as_slice() else { - return Err(args_count_err("1", args.args.len())); + return Err(args_count_err(self.name(), "1", args.args.len())); }; let out = match variant_arg { diff --git a/src/json_to_variant.rs b/src/json_to_variant.rs index 0a81205..9d83a26 100644 --- a/src/json_to_variant.rs +++ b/src/json_to_variant.rs @@ -15,7 +15,7 @@ use datafusion::{ use parquet_variant_compute::{VariantArrayBuilder, VariantType}; use parquet_variant_json::JsonToVariant as JsonToVariantExt; -use crate::shared::{args_count_err, try_field_as_string, try_parse_string_scalar, type_err}; +use crate::shared::{arg_type_err, args_count_err, try_field_as_string, try_parse_string_scalar}; /// Returns a Variant from a JSON string #[derive(Debug, Hash, PartialEq, Eq)] @@ -74,11 +74,14 @@ impl ScalarUDFImpl for JsonToVariantUdf { let arg_field = args .arg_fields .first() - .ok_or_else(|| args_count_err("1", 0))?; + .ok_or_else(|| args_count_err(self.name(), "1", args.arg_fields.len()))?; try_field_as_string(arg_field.as_ref())?; - let arg = args.args.first().ok_or_else(|| args_count_err("1", 0))?; + let arg = args + .args + .first() + .ok_or_else(|| args_count_err(self.name(), "1", args.args.len()))?; let out = match arg { ColumnarValue::Scalar(scalar_value) => { @@ -98,7 +101,14 @@ impl ScalarUDFImpl for JsonToVariantUdf { DataType::Utf8 => ColumnarValue::Array(from_utf8_arr(arr)?), DataType::LargeUtf8 => ColumnarValue::Array(from_large_utf8_arr(arr)?), DataType::Utf8View => ColumnarValue::Array(from_utf8view_arr(arr)?), - _ => return type_err("Utf8, LargeUtf8, or Utf8View", arr.data_type()), + _ => { + return arg_type_err( + self.name(), + 1, + "Utf8, LargeUtf8, or Utf8View", + arr.data_type(), + ); + } }, }; diff --git a/src/shared.rs b/src/shared.rs index b4416a5..5e63927 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -2,13 +2,13 @@ use std::sync::Arc; #[cfg(test)] use arrow::array::StructArray; -use arrow::array::{cast::AsArray, Array, ArrayRef}; -use arrow_schema::extension::ExtensionType; +use arrow::array::{Array, ArrayRef, cast::AsArray}; #[cfg(test)] use arrow_schema::Fields; +use arrow_schema::extension::ExtensionType; use arrow_schema::{DataType, Field}; use datafusion::common::exec_datafusion_err; -use datafusion::error::Result; +use datafusion::error::{DataFusionError, Result}; use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs}; use datafusion::{common::exec_err, scalar::ScalarValue}; use parquet_variant::{Variant, VariantPath}; @@ -257,20 +257,49 @@ pub fn ensure(pred: bool, err_msg: &str) -> Result<()> { Ok(()) } -// cleaner error handling - -/// helper for argument count errors -pub fn args_count_err(expected: &'static str, actual: usize) -> DataFusionError { - DataFusionError::Execution(format!("expected {expected} argument(s), got {actual}")) +/// Helper for argument count errors. +pub fn args_count_err(udf: &str, expected: &'static str, actual: usize) -> DataFusionError { + DataFusionError::Execution(format!( + "{udf}: expected {expected} argument(s), got {actual}" + )) } -// helper for argument type errors -pub fn type_err(expected: &str, actual: &DataType) -> Result { +/// Helper for argument type errors. +pub fn arg_type_err( + udf: &str, + arg_index: u8, + expected: &str, + actual: &DataType, +) -> Result { Err(DataFusionError::Execution(format!( - "expected {expected}, got {actual:?}" + "{udf} arg #{arg_index}: expected {expected}, got {actual}" ))) } +/// Helper for unexpected NULL argument values. +pub fn arg_null_err(udf: &str, arg_index: u8, expected: &str) -> Result { + Err(arg_null_error(udf, arg_index, expected)) +} + +/// Helper for unexpected NULL argument values as a plain DataFusionError. +pub fn arg_null_error(udf: &str, arg_index: u8, expected: &str) -> DataFusionError { + DataFusionError::Execution(format!( + "{udf} arg #{arg_index}: expected {expected}, got NULL" + )) +} + +/// Helper for scalar/array shape mismatches. +pub fn arg_shape_err(udf: &str, arg_index: u8, expected: &str, actual: &str) -> DataFusionError { + DataFusionError::Execution(format!( + "{udf} arg #{arg_index}: expected {expected}, got {actual}" + )) +} + +/// Helper for missing argument field metadata. +pub fn arg_field_meta_missing_err(udf: &str, arg_index: u8) -> DataFusionError { + DataFusionError::Execution(format!("{udf} arg #{arg_index} field metadata is missing")) +} + // test related methods #[cfg(test)] diff --git a/src/variant_get.rs b/src/variant_get.rs index 63aed0f..b9576ff 100644 --- a/src/variant_get.rs +++ b/src/variant_get.rs @@ -8,7 +8,7 @@ use arrow::{ }; use arrow_schema::{ArrowError, DataType, Field, FieldRef, Fields}; use datafusion::{ - common::{arrow_datafusion_err, exec_datafusion_err, exec_err}, + common::{arrow_datafusion_err, exec_datafusion_err}, error::{DataFusionError, Result}, logical_expr::{ ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, @@ -22,17 +22,30 @@ use parquet_variant_json::VariantToJson; use crate::impl_variant_get::impl_variant_get_typed; use crate::shared::{ + arg_field_meta_missing_err, arg_null_err, arg_shape_err, arg_type_err, args_count_err, invoke_variant_get_typed, try_field_as_variant_array, try_parse_string_columnar, try_parse_string_scalar, }; -fn type_hint_from_scalar(field_name: &str, scalar: &ScalarValue) -> Result { +fn type_hint_from_scalar( + udf_name: &str, + field_name: &str, + scalar: &ScalarValue, +) -> Result { let type_name = match scalar { ScalarValue::Utf8(Some(value)) | ScalarValue::Utf8View(Some(value)) | ScalarValue::LargeUtf8(Some(value)) => value.as_str(), + ScalarValue::Utf8(None) | ScalarValue::Utf8View(None) | ScalarValue::LargeUtf8(None) => { + return arg_null_err(udf_name, 3, "a non-null UTF8 literal"); + } other => { - return type_err("Utf8, LargeUtf8, or Utf8View", &other.data_type()); + return arg_type_err( + udf_name, + 3, + "Utf8, LargeUtf8, or Utf8View", + &other.data_type(), + ); } }; @@ -45,12 +58,10 @@ fn type_hint_from_scalar(field_name: &str, scalar: &ScalarValue) -> Result Result { +fn type_hint_from_value(udf_name: &str, field_name: &str, arg: &ColumnarValue) -> Result { match arg { - ColumnarValue::Scalar(value) => type_hint_from_scalar(field_name, value), - ColumnarValue::Array(_) => { - exec_err!("type hint argument must be a scalar UTF8 literal") - } + ColumnarValue::Scalar(value) => type_hint_from_scalar(udf_name, field_name, value), + ColumnarValue::Array(_) => Err(arg_shape_err(udf_name, 3, "scalar value", "array value")), } } @@ -93,18 +104,19 @@ fn invoke_variant_get( let (variant_arg, variant_path, type_arg) = match args.args.as_slice() { [variant_arg, variant_path] => (variant_arg, variant_path, None), [variant_arg, variant_path, type_arg] => (variant_arg, variant_path, Some(type_arg)), - _ => return exec_err!("expected 2 or 3 arguments"), + _ => return Err(args_count_err(udf_name, "2 or 3", args.args.len())), }; let variant_field = args .arg_fields .first() - .ok_or_else(|| exec_datafusion_err!("expected argument field"))?; + .ok_or_else(|| arg_field_meta_missing_err(udf_name, 1))?; try_field_as_variant_array(variant_field.as_ref())?; + let type_field_name = args.return_field.name(); let type_field = type_arg - .map(|arg| type_hint_from_value(udf_name, arg)) + .map(|arg| type_hint_from_value(udf_name, type_field_name, arg)) .transpose()?; let out = match (variant_arg, variant_path) { @@ -122,7 +134,7 @@ fn invoke_variant_get( } (ColumnarValue::Scalar(scalar_variant), ColumnarValue::Scalar(variant_path)) => { let ScalarValue::Struct(variant_array) = scalar_variant else { - return exec_err!("expected struct array"); + return arg_type_err(udf_name, 1, "Struct", &scalar_variant.data_type()); }; let variant_array = Arc::clone(variant_array) as ArrayRef; @@ -141,7 +153,12 @@ fn invoke_variant_get( } (ColumnarValue::Array(variant_array), ColumnarValue::Array(variant_paths)) => { if variant_array.len() != variant_paths.len() { - return exec_err!("expected variant_array and variant paths to be of same length"); + return Err(arg_shape_err( + udf_name, + 2, + "array with same length as arg #1", + "array with different length", + )); } let variant_paths = try_parse_string_columnar(variant_paths)?; @@ -172,7 +189,7 @@ fn invoke_variant_get( } (ColumnarValue::Scalar(scalar_variant), ColumnarValue::Array(variant_paths)) => { let ScalarValue::Struct(variant_array) = scalar_variant else { - return exec_err!("expected struct array"); + return arg_type_err(udf_name, 1, "Struct", &scalar_variant.data_type()); }; let variant_array = Arc::clone(variant_array) as ArrayRef; @@ -203,7 +220,7 @@ fn return_field_for_variant_get(name: &str, args: ReturnFieldArgs) -> Result { let (m, v) = create_variant_list_with_new_elements( + self.name(), variant_list, [element_to_append].into_iter(), )?; @@ -195,9 +198,12 @@ impl ScalarUDFImpl for VariantListInsert { Ok(ColumnarValue::Array(Arc::new(out) as _)) } - (ColumnarValue::Scalar(_), ColumnarValue::Array(_)) => { - exec_err!("unsupported argument") - } + (ColumnarValue::Scalar(_), ColumnarValue::Array(_)) => Err(arg_shape_err( + self.name(), + 2, + "scalar value when arg #1 is scalar", + "array value", + )), } } } @@ -205,11 +211,12 @@ impl ScalarUDFImpl for VariantListInsert { // note: I wonder if we can abstract this away // it would be good to profile and see if this pocket of code is slow fn create_variant_list_with_new_elements<'m, 'v>( + udf_name: &str, variant_list: Variant, elements_to_insert: impl Iterator>, ) -> Result<(Vec, Vec)> { let Variant::List(variant_list) = variant_list else { - return exec_err!("expected variant list"); + return exec_err!("{udf_name} arg #1: expected variant list"); }; // note: I wonder if we can abstract this away diff --git a/src/variant_object_insert.rs b/src/variant_object_insert.rs index c1c05a3..857ba8e 100644 --- a/src/variant_object_insert.rs +++ b/src/variant_object_insert.rs @@ -13,7 +13,10 @@ use datafusion::{ use parquet_variant::{Variant, VariantBuilder}; use parquet_variant_compute::{VariantArray, VariantType}; -use crate::shared::{args_count_err, ensure, try_parse_string_scalar, try_parse_variant_scalar}; +use crate::shared::{ + arg_null_error, arg_shape_err, args_count_err, ensure, try_parse_string_scalar, + try_parse_variant_scalar, +}; #[derive(Debug, Hash, PartialEq, Eq)] pub struct VariantObjectInsert { @@ -71,7 +74,7 @@ impl ScalarUDFImpl for VariantObjectInsert { )?; let [variant_object_to_update, key, value] = argument_values.as_slice() else { - return Err(args_count_err("3", argument_values.len())); + return Err(args_count_err(self.name(), "3", argument_values.len())); }; { @@ -87,16 +90,26 @@ impl ScalarUDFImpl for VariantObjectInsert { let key = { let ColumnarValue::Scalar(key) = key else { - return exec_err!("expected scalar value for key"); + return Err(arg_shape_err( + self.name(), + 2, + "scalar string value", + "array value", + )); }; try_parse_string_scalar(key)? - .ok_or_else(|| DataFusionError::Execution("expected non null string".into()))? + .ok_or_else(|| arg_null_error(self.name(), 2, "a non-null string literal"))? }; let value_array = { let ColumnarValue::Scalar(value) = value else { - return exec_err!("expected scalar value for value"); + return Err(arg_shape_err( + self.name(), + 3, + "scalar variant value", + "array value", + )); }; try_parse_variant_scalar(value)? @@ -108,7 +121,7 @@ impl ScalarUDFImpl for VariantObjectInsert { let variant_object = try_parse_variant_scalar(scalar_variant_object_to_update)?; let variant_object = variant_object.value(0); let Variant::Object(variant_object) = variant_object else { - return exec_err!("expected variant object"); + return exec_err!("{} arg #1: expected variant object", self.name()); }; let mut v = VariantBuilder::new(); @@ -136,7 +149,10 @@ impl ScalarUDFImpl for VariantObjectInsert { v_opt .map(|variant_object| { let Variant::Object(variant_object) = variant_object else { - return exec_err!("expected variant object"); + return exec_err!( + "{} arg #1: expected variant object", + self.name() + ); }; let mut v = VariantBuilder::new(); diff --git a/src/variant_pretty.rs b/src/variant_pretty.rs index 75ee900..fded548 100644 --- a/src/variant_pretty.rs +++ b/src/variant_pretty.rs @@ -11,7 +11,9 @@ use datafusion::{ }; use parquet_variant_compute::VariantArray; -use crate::shared::{args_count_err, try_field_as_variant_array, type_err}; +use crate::shared::{ + arg_field_meta_missing_err, arg_type_err, args_count_err, try_field_as_variant_array, +}; #[derive(Debug, Hash, PartialEq, Eq)] pub struct VariantPretty { @@ -47,16 +49,19 @@ impl ScalarUDFImpl for VariantPretty { let field = args .arg_fields .first() - .ok_or_else(|| args_count_err("1", 0))?; + .ok_or_else(|| arg_field_meta_missing_err(self.name(), 1))?; try_field_as_variant_array(field.as_ref())?; - let arg = args.args.first().ok_or_else(|| args_count_err("1", 0))?; + let arg = args + .args + .first() + .ok_or_else(|| args_count_err(self.name(), "1", args.args.len()))?; let out = match arg { ColumnarValue::Scalar(scalar) => { let ScalarValue::Struct(variant_array) = scalar else { - return type_err("Struct", &scalar.data_type()); + return arg_type_err(self.name(), 1, "Struct", &scalar.data_type()); }; let variant_array = VariantArray::try_new(variant_array.as_ref())?; @@ -77,7 +82,7 @@ impl ScalarUDFImpl for VariantPretty { ColumnarValue::Array(Arc::new(out)) } - unsupported => return type_err("Struct", unsupported), + unsupported => return arg_type_err(self.name(), 1, "Struct", unsupported), }, }; diff --git a/src/variant_to_json.rs b/src/variant_to_json.rs index 23c51da..3c8a39f 100644 --- a/src/variant_to_json.rs +++ b/src/variant_to_json.rs @@ -14,7 +14,9 @@ use datafusion::{ use parquet_variant_compute::VariantArray; use parquet_variant_json::VariantToJson; -use crate::shared::{args_count_err, try_field_as_variant_array, type_err}; +use crate::shared::{ + arg_field_meta_missing_err, arg_type_err, args_count_err, try_field_as_variant_array, +}; /// Returns a JSON string from a VariantArray /// @@ -58,16 +60,19 @@ impl ScalarUDFImpl for VariantToJsonUdf { let field = args .arg_fields .first() - .ok_or_else(|| args_count_err("1", 0))?; + .ok_or_else(|| arg_field_meta_missing_err(self.name(), 1))?; try_field_as_variant_array(field.as_ref())?; - let arg = args.args.first().ok_or_else(|| args_count_err("1", 0))?; + let arg = args + .args + .first() + .ok_or_else(|| args_count_err(self.name(), "1", args.args.len()))?; let out = match arg { ColumnarValue::Scalar(scalar) => { let ScalarValue::Struct(variant_array) = scalar else { - return type_err("Struct", &scalar.data_type()); + return arg_type_err(self.name(), 1, "Struct", &scalar.data_type()); }; let variant_array = VariantArray::try_new(variant_array.as_ref())?; @@ -87,7 +92,7 @@ impl ScalarUDFImpl for VariantToJsonUdf { ColumnarValue::Array(Arc::new(out)) } - unsupported => return type_err("Struct", unsupported), + unsupported => return arg_type_err(self.name(), 1, "Struct", unsupported), }, }; From c705c2e5b2940b3bb265fe8a418d1d1272faa9b4 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Tue, 31 Mar 2026 15:42:12 -0400 Subject: [PATCH 7/8] Reuse shared argument error helpers in remaining UDFs --- src/cast_to_variant.rs | 19 ++++++++++++------- src/variant_list_delete.rs | 11 ++++++++--- src/variant_normalize.rs | 11 ++++++----- src/variant_object_delete.rs | 16 ++++++++++++---- src/variant_object_keys.rs | 21 ++++++++++++++------- 5 files changed, 52 insertions(+), 26 deletions(-) diff --git a/src/cast_to_variant.rs b/src/cast_to_variant.rs index 8513d7e..7d7e233 100644 --- a/src/cast_to_variant.rs +++ b/src/cast_to_variant.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use arrow::array::{Array, ArrayRef, AsArray, StructArray}; use arrow_schema::{DataType, Field}; use datafusion::{ - common::exec_err, error::Result, logical_expr::{ ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, @@ -14,7 +13,9 @@ use datafusion::{ use parquet_variant::Variant; use parquet_variant_compute::{VariantArray, VariantArrayBuilder, cast_to_variant}; -use crate::shared::{try_parse_binary_columnar, try_parse_binary_scalar}; +use crate::shared::{ + arg_shape_err, args_count_err, try_parse_binary_columnar, try_parse_binary_scalar, +}; #[derive(Debug, Hash, PartialEq, Eq)] pub struct CastToVariantUdf { @@ -57,15 +58,19 @@ impl CastToVariantUdf { } fn from_metadata_value( + udf_name: &str, metadata_argument: &ColumnarValue, variant_argument: &ColumnarValue, ) -> Result { let out = match (metadata_argument, variant_argument) { (ColumnarValue::Array(metadata_array), ColumnarValue::Array(value_array)) => { if metadata_array.len() != value_array.len() { - return exec_err!( - "expected metadata array to be of same length as variant array" - ); + return Err(arg_shape_err( + udf_name, + 2, + "array with same length as arg #1", + "array with different length", + )); } let metadata_array = try_parse_binary_columnar(metadata_array)?; @@ -180,11 +185,11 @@ impl ScalarUDFImpl for CastToVariantUdf { fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { match args.args.as_slice() { [metadata_value, variant_value] => { - Self::from_metadata_value(metadata_value, variant_value) + Self::from_metadata_value(self.name(), metadata_value, variant_value) } [ColumnarValue::Scalar(scalar_value)] => Self::from_scalar_value(scalar_value), [ColumnarValue::Array(array)] => Self::from_array(array), - _ => exec_err!("unrecognized argument"), + _ => Err(args_count_err(self.name(), "1 or 2", args.args.len())), } } } diff --git a/src/variant_list_delete.rs b/src/variant_list_delete.rs index 5d76b30..f29ac3e 100644 --- a/src/variant_list_delete.rs +++ b/src/variant_list_delete.rs @@ -13,7 +13,7 @@ use datafusion::{ use parquet_variant::{Variant, VariantBuilder}; use parquet_variant_compute::{VariantArray, VariantType}; -use crate::shared::{ensure, try_parse_variant_scalar}; +use crate::shared::{arg_shape_err, args_count_err, ensure, try_parse_variant_scalar}; #[derive(Debug, Hash, PartialEq, Eq)] pub struct VariantListDelete { @@ -109,7 +109,7 @@ impl ScalarUDFImpl for VariantListDelete { )?; let [variant_list_to_update, index_to_delete] = argument_values.as_slice() else { - return exec_err!("expected 2 arguments"); + return Err(args_count_err(self.name(), "2", argument_values.len())); }; ensure( @@ -119,7 +119,12 @@ impl ScalarUDFImpl for VariantListDelete { let index = { let ColumnarValue::Scalar(index) = index_to_delete else { - return exec_err!("expected scalar value for index"); + return Err(arg_shape_err( + self.name(), + 2, + "scalar integer value", + "array value", + )); }; try_parse_index_scalar(index)? diff --git a/src/variant_normalize.rs b/src/variant_normalize.rs index 4a92b20..2415000 100644 --- a/src/variant_normalize.rs +++ b/src/variant_normalize.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use arrow::array::{ArrayRef, StructArray}; use arrow_schema::{DataType, Field, Fields}; -use datafusion::common::{exec_datafusion_err, exec_err}; use datafusion::error::Result; use datafusion::logical_expr::{ ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, @@ -11,7 +10,9 @@ use datafusion::logical_expr::{ use datafusion::scalar::ScalarValue; use parquet_variant_compute::{VariantArray, VariantArrayBuilder, VariantType}; -use crate::shared::try_field_as_variant_array; +use crate::shared::{ + arg_field_meta_missing_err, arg_type_err, args_count_err, try_field_as_variant_array, +}; /// Normalizes a Variant value into a canonical binary form. /// @@ -92,18 +93,18 @@ impl ScalarUDFImpl for VariantNormalizeUdf { let variant_field = args .arg_fields .first() - .ok_or_else(|| exec_datafusion_err!("expected 1 argument"))?; + .ok_or_else(|| arg_field_meta_missing_err(self.name(), 1))?; try_field_as_variant_array(variant_field.as_ref())?; let [variant_arg] = args.args.as_slice() else { - return exec_err!("expected 1 argument"); + return Err(args_count_err(self.name(), "1", args.args.len())); }; let out = match variant_arg { ColumnarValue::Scalar(scalar_variant) => { let ScalarValue::Struct(struct_array) = scalar_variant else { - return exec_err!("expected variant struct"); + return arg_type_err(self.name(), 1, "Struct", &scalar_variant.data_type()); }; let variant_array = VariantArray::try_new(struct_array.as_ref())?; diff --git a/src/variant_object_delete.rs b/src/variant_object_delete.rs index 9d21bda..b0f8101 100644 --- a/src/variant_object_delete.rs +++ b/src/variant_object_delete.rs @@ -13,7 +13,10 @@ use datafusion::{ use parquet_variant::{Variant, VariantBuilder}; use parquet_variant_compute::{VariantArray, VariantType}; -use crate::shared::{ensure, try_parse_string_scalar, try_parse_variant_scalar}; +use crate::shared::{ + arg_null_error, arg_shape_err, args_count_err, ensure, try_parse_string_scalar, + try_parse_variant_scalar, +}; #[derive(Debug, Hash, PartialEq, Eq)] pub struct VariantObjectDelete { @@ -71,7 +74,7 @@ impl ScalarUDFImpl for VariantObjectDelete { )?; let [variant_object_to_update, key_to_delete] = argument_values.as_slice() else { - return exec_err!("expected 2 arguments"); + return Err(args_count_err(self.name(), "2", argument_values.len())); }; ensure( @@ -81,11 +84,16 @@ impl ScalarUDFImpl for VariantObjectDelete { let key = { let ColumnarValue::Scalar(key) = key_to_delete else { - return exec_err!("expected scalar value for key"); + return Err(arg_shape_err( + self.name(), + 2, + "scalar string value", + "array value", + )); }; try_parse_string_scalar(key)? - .ok_or_else(|| DataFusionError::Execution("expected non null string".into()))? + .ok_or_else(|| arg_null_error(self.name(), 2, "a non-null string literal"))? }; match variant_object_to_update { diff --git a/src/variant_object_keys.rs b/src/variant_object_keys.rs index a7bfaea..cb46133 100644 --- a/src/variant_object_keys.rs +++ b/src/variant_object_keys.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use arrow::array::{ArrayRef, ListBuilder, StringBuilder}; use arrow_schema::{DataType, Field}; -use datafusion::common::{exec_datafusion_err, exec_err}; use datafusion::error::Result; use datafusion::logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -12,7 +11,10 @@ use parquet_variant::Variant; use parquet_variant::VariantPath; use parquet_variant_compute::{GetOptions, VariantArray, variant_get as compute_variant_get}; -use crate::shared::{try_field_as_variant_array, try_parse_string_scalar}; +use crate::shared::{ + arg_field_meta_missing_err, arg_null_error, arg_shape_err, arg_type_err, args_count_err, + try_field_as_variant_array, try_parse_string_scalar, +}; #[derive(Debug, Hash, PartialEq, Eq)] pub struct VariantObjectKeys { @@ -70,26 +72,31 @@ impl ScalarUDFImpl for VariantObjectKeys { [variant_arg] => (variant_arg, None), [variant_arg, path_arg] => { let ColumnarValue::Scalar(path_scalar) = path_arg else { - return exec_err!("expected scalar value for path"); + return Err(arg_shape_err( + self.name(), + 2, + "scalar string value", + "array value", + )); }; let path = try_parse_string_scalar(path_scalar)? - .ok_or_else(|| exec_datafusion_err!("expected non-null string for path"))?; + .ok_or_else(|| arg_null_error(self.name(), 2, "a non-null string literal"))?; (variant_arg, Some(path)) } - _ => return exec_err!("expected 1 or 2 arguments"), + _ => return Err(args_count_err(self.name(), "1 or 2", args.args.len())), }; let variant_field = args .arg_fields .first() - .ok_or_else(|| exec_datafusion_err!("expected 1 argument field type"))?; + .ok_or_else(|| arg_field_meta_missing_err(self.name(), 1))?; try_field_as_variant_array(variant_field.as_ref())?; let out = match variant_arg { ColumnarValue::Scalar(scalar_variant) => { let ScalarValue::Struct(struct_arr) = scalar_variant else { - return exec_err!("expected variant scalar value"); + return arg_type_err(self.name(), 1, "Struct", &scalar_variant.data_type()); }; let arr: ArrayRef = Arc::clone(struct_arr) as ArrayRef; From 62331e279d2dbdfd69f4ae806944d372fe194ab5 Mon Sep 17 00:00:00 2001 From: sdf-jkl Date: Fri, 3 Apr 2026 11:55:11 -0400 Subject: [PATCH 8/8] Add arg_variant_kind_err helper --- src/shared.rs | 11 +++++++++++ src/variant_list_delete.rs | 6 ++++-- src/variant_list_insert.rs | 11 ++++------- src/variant_object_delete.rs | 9 ++++----- src/variant_object_insert.rs | 12 ++++-------- 5 files changed, 27 insertions(+), 22 deletions(-) diff --git a/src/shared.rs b/src/shared.rs index c882c85..ee52b0c 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -300,6 +300,17 @@ pub fn arg_shape_err(udf: &str, arg_index: u8, expected: &str, actual: &str) -> )) } +/// Helper for invalid Variant kind in an argument. +pub fn arg_variant_kind_err( + udf: &str, + arg_index: u8, + expected_variant_kind: &str, +) -> DataFusionError { + DataFusionError::Execution(format!( + "{udf} arg #{arg_index}: expected variant {expected_variant_kind}" + )) +} + /// Helper for missing argument field metadata. pub fn arg_field_meta_missing_err(udf: &str, arg_index: u8) -> DataFusionError { DataFusionError::Execution(format!("{udf} arg #{arg_index} field metadata is missing")) diff --git a/src/variant_list_delete.rs b/src/variant_list_delete.rs index f29ac3e..d9ec759 100644 --- a/src/variant_list_delete.rs +++ b/src/variant_list_delete.rs @@ -13,7 +13,9 @@ use datafusion::{ use parquet_variant::{Variant, VariantBuilder}; use parquet_variant_compute::{VariantArray, VariantType}; -use crate::shared::{arg_shape_err, args_count_err, ensure, try_parse_variant_scalar}; +use crate::shared::{ + arg_shape_err, arg_variant_kind_err, args_count_err, ensure, try_parse_variant_scalar, +}; #[derive(Debug, Hash, PartialEq, Eq)] pub struct VariantListDelete { @@ -41,7 +43,7 @@ fn try_parse_index_scalar(scalar: &ScalarValue) -> Result { fn delete_list_element(variant_list: Variant, index: usize) -> Result<(Vec, Vec)> { let Variant::List(variant_list) = variant_list else { - return exec_err!("expected variant list"); + return Err(arg_variant_kind_err("variant_list_delete", 1, "list")); }; if index >= variant_list.len() { diff --git a/src/variant_list_insert.rs b/src/variant_list_insert.rs index b96c8ed..7fb797c 100644 --- a/src/variant_list_insert.rs +++ b/src/variant_list_insert.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use arrow::array::StructArray; use arrow_schema::{DataType, Field, Fields}; use datafusion::{ - common::exec_err, error::{DataFusionError, Result}, logical_expr::{ ColumnarValue, ReturnFieldArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -13,7 +12,9 @@ use datafusion::{ use parquet_variant::{Variant, VariantBuilder}; use parquet_variant_compute::{VariantArray, VariantType}; -use crate::shared::{arg_shape_err, args_count_err, ensure, try_parse_variant_scalar}; +use crate::shared::{ + arg_shape_err, arg_variant_kind_err, args_count_err, ensure, try_parse_variant_scalar, +}; #[derive(Debug, Hash, PartialEq, Eq)] pub struct VariantListInsert { @@ -96,7 +97,6 @@ impl ScalarUDFImpl for VariantListInsert { let out: StructArray = { let (m, v) = create_variant_list_with_new_elements( - self.name(), variant_list, [variant_to_insert].into_iter(), )?; @@ -121,7 +121,6 @@ impl ScalarUDFImpl for VariantListInsert { .map(|v| { v.map(|v| { create_variant_list_with_new_elements( - self.name(), v, [variant_to_insert.clone()].into_iter(), ) @@ -160,7 +159,6 @@ impl ScalarUDFImpl for VariantListInsert { match (variant_list_to_update, element_to_append) { (Some(variant_list), Some(element_to_append)) => { let (m, v) = create_variant_list_with_new_elements( - self.name(), variant_list, [element_to_append].into_iter(), )?; @@ -211,12 +209,11 @@ impl ScalarUDFImpl for VariantListInsert { // note: I wonder if we can abstract this away // it would be good to profile and see if this pocket of code is slow fn create_variant_list_with_new_elements<'m, 'v>( - udf_name: &str, variant_list: Variant, elements_to_insert: impl Iterator>, ) -> Result<(Vec, Vec)> { let Variant::List(variant_list) = variant_list else { - return exec_err!("{udf_name} arg #1: expected variant list"); + return Err(arg_variant_kind_err("variant_list_insert", 1, "list")); }; // note: I wonder if we can abstract this away diff --git a/src/variant_object_delete.rs b/src/variant_object_delete.rs index b0f8101..9bd0fc7 100644 --- a/src/variant_object_delete.rs +++ b/src/variant_object_delete.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use arrow::array::StructArray; use arrow_schema::{DataType, Field, Fields}; use datafusion::{ - common::exec_err, error::{DataFusionError, Result}, logical_expr::{ ColumnarValue, ReturnFieldArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -14,8 +13,8 @@ use parquet_variant::{Variant, VariantBuilder}; use parquet_variant_compute::{VariantArray, VariantType}; use crate::shared::{ - arg_null_error, arg_shape_err, args_count_err, ensure, try_parse_string_scalar, - try_parse_variant_scalar, + arg_null_error, arg_shape_err, arg_variant_kind_err, args_count_err, ensure, + try_parse_string_scalar, try_parse_variant_scalar, }; #[derive(Debug, Hash, PartialEq, Eq)] @@ -101,7 +100,7 @@ impl ScalarUDFImpl for VariantObjectDelete { let variant_object = try_parse_variant_scalar(scalar_variant_object_to_update)?; let variant_object = variant_object.value(0); let Variant::Object(variant_object) = variant_object else { - return exec_err!("expected variant object"); + return Err(arg_variant_kind_err(self.name(), 1, "object")); }; let mut v = VariantBuilder::new(); @@ -131,7 +130,7 @@ impl ScalarUDFImpl for VariantObjectDelete { v_opt .map(|variant_object| { let Variant::Object(variant_object) = variant_object else { - return exec_err!("expected variant object"); + return Err(arg_variant_kind_err(self.name(), 1, "object")); }; let mut v = VariantBuilder::new(); diff --git a/src/variant_object_insert.rs b/src/variant_object_insert.rs index 857ba8e..46d78bf 100644 --- a/src/variant_object_insert.rs +++ b/src/variant_object_insert.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use arrow::array::StructArray; use arrow_schema::{DataType, Field, Fields}; use datafusion::{ - common::exec_err, error::{DataFusionError, Result}, logical_expr::{ ColumnarValue, ReturnFieldArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -14,8 +13,8 @@ use parquet_variant::{Variant, VariantBuilder}; use parquet_variant_compute::{VariantArray, VariantType}; use crate::shared::{ - arg_null_error, arg_shape_err, args_count_err, ensure, try_parse_string_scalar, - try_parse_variant_scalar, + arg_null_error, arg_shape_err, arg_variant_kind_err, args_count_err, ensure, + try_parse_string_scalar, try_parse_variant_scalar, }; #[derive(Debug, Hash, PartialEq, Eq)] @@ -121,7 +120,7 @@ impl ScalarUDFImpl for VariantObjectInsert { let variant_object = try_parse_variant_scalar(scalar_variant_object_to_update)?; let variant_object = variant_object.value(0); let Variant::Object(variant_object) = variant_object else { - return exec_err!("{} arg #1: expected variant object", self.name()); + return Err(arg_variant_kind_err(self.name(), 1, "object")); }; let mut v = VariantBuilder::new(); @@ -149,10 +148,7 @@ impl ScalarUDFImpl for VariantObjectInsert { v_opt .map(|variant_object| { let Variant::Object(variant_object) = variant_object else { - return exec_err!( - "{} arg #1: expected variant object", - self.name() - ); + return Err(arg_variant_kind_err(self.name(), 1, "object")); }; let mut v = VariantBuilder::new();