Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion arrow-array/src/array/null_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ mod tests {

// Simulate a NULL value in the parent array, for instance, if array being queried by
// invalid index
mutable.extend_nulls(1);
mutable.try_extend_nulls(1).unwrap();
let data = mutable.freeze();

let struct_array = Arc::new(StructArray::from(data.clone()));
Expand Down
2 changes: 1 addition & 1 deletion arrow-array/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1729,7 +1729,7 @@ mod tests_from_ffi {
let data = array.to_data();

let mut mutable = MutableArrayData::new(vec![&data], false, len);
mutable.extend(0, 0, len);
mutable.try_extend(0, 0, len).unwrap();
make_array(mutable.freeze())
}

Expand Down
20 changes: 15 additions & 5 deletions arrow-cast/src/cast/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,14 @@ where
if cast_options.safe || array.is_null(idx) {
if last_pos != start_pos {
// Extend with valid slices
mutable.extend(0, last_pos, start_pos);
mutable
.try_extend(0, last_pos, start_pos)
.map_err(|e| ArrowError::CastError(e.to_string()))?;
}
// Pad this slice with nulls
mutable.extend_nulls(size as _);
mutable
.try_extend_nulls(size as _)
.map_err(|e| ArrowError::CastError(e.to_string()))?;
null_builder.set_bit(idx, false);
// Set last_pos to the end of this slice's values
last_pos = end_pos
Expand All @@ -211,7 +215,9 @@ where
if mutable.len() != cap {
// Remaining slices were all correct length
let remaining = cap - mutable.len();
mutable.extend(0, last_pos, last_pos + remaining)
mutable
.try_extend(0, last_pos, last_pos + remaining)
.map_err(|e| ArrowError::CastError(e.to_string()))?;
}
make_array(mutable.freeze())
}
Expand Down Expand Up @@ -252,15 +258,19 @@ pub(crate) fn cast_list_view_to_fixed_size_list<O: OffsetSizeTrait>(
if len != size as usize {
// Nulls in FixedSizeListArray take up space and so we must pad the values
if cast_options.safe || array.is_null(idx) {
mutable.extend_nulls(size as _);
mutable
.try_extend_nulls(size as _)
.map_err(|e| ArrowError::CastError(e.to_string()))?;
null_builder.set_bit(idx, false);
} else {
return Err(ArrowError::CastError(format!(
"Cannot cast to FixedSizeList({size}): value at index {idx} has length {len}",
)));
}
} else {
mutable.extend(0, offset, offset + len);
mutable
.try_extend(0, offset, offset + len)
.map_err(|e| ArrowError::CastError(e.to_string()))?;
}
}

Expand Down
7 changes: 6 additions & 1 deletion arrow-data/src/transform/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,16 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend<'_> {
array.offset() + start,
len,
);
Ok(())
},
)
}

pub(super) fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) {
pub(super) fn extend_nulls(
mutable: &mut _MutableArrayData,
len: usize,
) -> Result<(), arrow_schema::ArrowError> {
let buffer = &mut mutable.buffer1;
resize_for_bits(buffer, mutable.len + len);
Ok(())
}
7 changes: 6 additions & 1 deletion arrow-data/src/transform/fixed_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,21 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend<'_> {
move |mutable: &mut _MutableArrayData, _, start: usize, len: usize| {
let buffer = &mut mutable.buffer1;
buffer.extend_from_slice(&values[start * size..(start + len) * size]);
Ok(())
},
)
}

pub(super) fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) {
pub(super) fn extend_nulls(
mutable: &mut _MutableArrayData,
len: usize,
) -> Result<(), arrow_schema::ArrowError> {
let size = match mutable.data_type {
DataType::FixedSizeBinary(i) => i as usize,
_ => unreachable!(),
};

let values_buffer = &mut mutable.buffer1;
values_buffer.extend_zeros(len * size);
Ok(())
}
20 changes: 10 additions & 10 deletions arrow-data/src/transform/fixed_size_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::ArrayData;
use arrow_schema::DataType;
use arrow_schema::{ArrowError, DataType};

use super::{_MutableArrayData, Extend};

Expand All @@ -28,22 +28,22 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend<'_> {

Box::new(
move |mutable: &mut _MutableArrayData, index: usize, start: usize, len: usize| {
mutable
.child_data
.iter_mut()
.for_each(|child| child.extend(index, start * size, (start + len) * size))
for child in mutable.child_data.iter_mut() {
child.try_extend(index, start * size, (start + len) * size)?;
}
Ok(())
},
)
}

pub(super) fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) {
pub(super) fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) -> Result<(), ArrowError> {
let size = match mutable.data_type {
DataType::FixedSizeList(_, i) => i as usize,
_ => unreachable!(),
};

mutable
.child_data
.iter_mut()
.for_each(|child| child.extend_nulls(len * size))
for child in mutable.child_data.iter_mut() {
child.try_extend_nulls(len * size)?;
}
Ok(())
}
15 changes: 10 additions & 5 deletions arrow-data/src/transform/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

use super::{
_MutableArrayData, Extend,
utils::{extend_offsets, get_last_offset},
utils::{get_last_offset, try_extend_offsets},
};
use crate::ArrayData;
use arrow_buffer::ArrowNativeType;
use arrow_schema::ArrowError;
use num_integer::Integer;
use num_traits::CheckedAdd;

Expand All @@ -36,9 +37,9 @@ pub(super) fn build_extend<T: ArrowNativeType + Integer + CheckedAdd>(
let last_offset: T = unsafe { get_last_offset(offset_buffer) };

// offsets
extend_offsets::<T>(offset_buffer, last_offset, &offsets[start..start + len + 1]);
try_extend_offsets::<T>(offset_buffer, last_offset, &offsets[start..start + len + 1])?;

mutable.child_data[0].extend(
mutable.child_data[0].try_extend(
index,
offsets[start].as_usize(),
offsets[start + len].as_usize(),
Expand All @@ -47,11 +48,15 @@ pub(super) fn build_extend<T: ArrowNativeType + Integer + CheckedAdd>(
)
}

pub(super) fn extend_nulls<T: ArrowNativeType>(mutable: &mut _MutableArrayData, len: usize) {
pub(super) fn extend_nulls<T: ArrowNativeType>(
mutable: &mut _MutableArrayData,
len: usize,
) -> Result<(), ArrowError> {
let offset_buffer = &mut mutable.buffer1;

// this is safe due to how offset is built. See details on `get_last_offset`
let last_offset: T = unsafe { get_last_offset(offset_buffer) };

(0..len).for_each(|_| offset_buffer.push(last_offset))
(0..len).for_each(|_| offset_buffer.push(last_offset));
Ok(())
}
19 changes: 16 additions & 3 deletions arrow-data/src/transform/list_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use crate::ArrayData;
use crate::transform::_MutableArrayData;
use arrow_buffer::ArrowNativeType;
use arrow_schema::ArrowError;
use num_integer::Integer;
use num_traits::CheckedAdd;

Expand All @@ -33,23 +34,35 @@ pub(super) fn build_extend<T: ArrowNativeType + Integer + CheckedAdd>(
for i in start..start + len {
mutable.buffer1.push(new_offset);
mutable.buffer2.push(sizes[i]);
new_offset = new_offset.checked_add(&sizes[i]).expect("offset overflow");
new_offset = new_offset.checked_add(&sizes[i]).ok_or_else(|| {
ArrowError::InvalidArgumentError(
"offset overflow: data exceeds the capacity of the offset type. \
Try splitting into smaller batches or using a larger type \
(e.g. LargeListView instead of ListView)"
.to_string(),
)
})?;

let size = sizes[i].as_usize();
if size > 0 {
let child_start = offsets[i].as_usize();
mutable.child_data[0].extend(index, child_start, child_start + size);
mutable.child_data[0].try_extend(index, child_start, child_start + size)?;
}
}
Ok(())
},
)
}

pub(super) fn extend_nulls<T: ArrowNativeType>(mutable: &mut _MutableArrayData, len: usize) {
pub(super) fn extend_nulls<T: ArrowNativeType>(
mutable: &mut _MutableArrayData,
len: usize,
) -> Result<(), ArrowError> {
let offset_buffer = &mut mutable.buffer1;
let sizes_buffer = &mut mutable.buffer2;

// We push 0 as a placeholder for NULL values in both the offsets and sizes
(0..len).for_each(|_| offset_buffer.push(T::default()));
(0..len).for_each(|_| sizes_buffer.push(T::default()));
Ok(())
}
86 changes: 75 additions & 11 deletions arrow-data/src/transform/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ mod variable_size;
type ExtendNullBits<'a> = Box<dyn Fn(&mut _MutableArrayData, usize, usize) + 'a>;
// function that extends `[start..start+len]` to the mutable array.
// this is dynamic because different data_types influence how buffers and children are extended.
type Extend<'a> = Box<dyn Fn(&mut _MutableArrayData, usize, usize, usize) + 'a>;
type Extend<'a> =
Box<dyn Fn(&mut _MutableArrayData, usize, usize, usize) -> Result<(), ArrowError> + 'a>;

type ExtendNulls = Box<dyn Fn(&mut _MutableArrayData, usize)>;
type ExtendNulls = Box<dyn Fn(&mut _MutableArrayData, usize) -> Result<(), ArrowError>>;

/// A mutable [ArrayData] that knows how to freeze itself into an [ArrayData].
/// This is just a data container.
Expand Down Expand Up @@ -230,7 +231,8 @@ fn build_extend_view(array: &ArrayData, buffer_offset: u32) -> Extend<'_> {
let mut view = ByteView::from(*v);
view.buffer_index += buffer_offset;
view.into()
}))
}));
Ok(())
},
)
}
Expand Down Expand Up @@ -628,7 +630,10 @@ impl<'a> MutableArrayData<'a> {
let mut mutable = MutableArrayData::new(dictionaries, false, capacity);

for (i, len) in lengths.iter().enumerate() {
mutable.extend(i, 0, *len)
mutable.try_extend(i, 0, *len).expect(
"extend failed while building dictionary; \
this is a bug in MutableArrayData",
)
}

(Some(mutable.freeze()), true)
Expand Down Expand Up @@ -716,36 +721,95 @@ impl<'a> MutableArrayData<'a> {
}
}

/// Extends the in progress array with a region of the input arrays
/// Extends the in progress array with a region of the input arrays, returning an error on
/// overflow.
///
/// # Arguments
/// * `index` - the index of array that you what to copy values from
/// * `index` - the index of array that you want to copy values from
/// * `start` - the start index of the chunk (inclusive)
/// * `end` - the end index of the chunk (exclusive)
///
/// # Errors
/// Returns an error if offset arithmetic overflows the underlying integer type.
///
/// # Panic
/// This function panics if there is an invalid index,
/// i.e. `index` >= the number of source arrays
/// or `end` > the length of the `index`th array
pub fn extend(&mut self, index: usize, start: usize, end: usize) {
pub fn try_extend(&mut self, index: usize, start: usize, end: usize) -> Result<(), ArrowError> {
let len = end - start;
(self.extend_null_bits[index])(&mut self.data, start, len);
(self.extend_values[index])(&mut self.data, index, start, len);
// Snapshot buffer lengths before attempting the extend so we can roll
// back to a consistent state if it fails.
let buf1_len = self.data.buffer1.len();
let buf2_len = self.data.buffer2.len();
if let Err(e) = (self.extend_values[index])(&mut self.data, index, start, len) {
// Restore buffers to their pre-call lengths so the array remains
// in a valid state for the caller to inspect or retry.
self.data.buffer1.truncate(buf1_len);
self.data.buffer2.truncate(buf2_len);
return Err(e);
}
self.data.len += len;
Ok(())
}

/// Extends the in progress array with null elements, ignoring the input arrays.
/// Extends the in progress array with a region of the input arrays.
///
/// # Deprecated
/// Use [`try_extend`](Self::try_extend) instead, which returns an [`ArrowError`] on overflow
/// rather than panicking.
///
/// # Panic
/// This function panics if there is an invalid index,
/// i.e. `index` >= the number of source arrays,
/// `end` > the length of the `index`th array,
/// or the offset type overflows (e.g. more than 2 GiB in a `StringArray`).
#[deprecated(
since = "59.0.0",
note = "Use `try_extend` which returns an error on overflow instead of panicking"
)]
pub fn extend(&mut self, index: usize, start: usize, end: usize) {
self.try_extend(index, start, end)
.expect("extend failed due to offset overflow")
}

/// Extends the in progress array with null elements, ignoring the input arrays, returning an
/// error on overflow.
///
/// Prefer this over [`extend_nulls`](Self::extend_nulls) to handle cases where the run-end
/// counter overflows (relevant for `RunEndEncoded` arrays).
///
/// # Panics
///
/// Panics if [`MutableArrayData`] not created with `use_nulls` or nullable source arrays
pub fn extend_nulls(&mut self, len: usize) {
pub fn try_extend_nulls(&mut self, len: usize) -> Result<(), ArrowError> {
self.data.len += len;
let bit_len = bit_util::ceil(self.data.len, 8);
let nulls = self.data.null_buffer();
nulls.resize(bit_len, 0);
self.data.null_count += len;
(self.extend_nulls)(&mut self.data, len);
(self.extend_nulls)(&mut self.data, len)?;
Ok(())
}

/// Extends the in progress array with null elements, ignoring the input arrays.
///
/// # Deprecated
/// Use [`try_extend_nulls`](Self::try_extend_nulls) instead, which returns an [`ArrowError`]
/// on overflow rather than panicking.
///
/// # Panics
///
/// Panics if [`MutableArrayData`] not created with `use_nulls` or nullable source arrays,
/// or if the run-end counter overflows for `RunEndEncoded` arrays.
#[deprecated(
since = "59.0.0",
note = "Use `try_extend_nulls` which returns an error on overflow instead of panicking"
)]
pub fn extend_nulls(&mut self, len: usize) {
self.try_extend_nulls(len)
.expect("extend_nulls failed due to overflow")
}

/// Returns the current length
Expand Down
9 changes: 7 additions & 2 deletions arrow-data/src/transform/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ use super::{_MutableArrayData, Extend};
use crate::ArrayData;

pub(super) fn build_extend(_: &ArrayData) -> Extend<'_> {
Box::new(move |_, _, _, _| {})
Box::new(move |_, _, _, _| Ok(()))
}

pub(super) fn extend_nulls(_: &mut _MutableArrayData, _: usize) {}
pub(super) fn extend_nulls(
_: &mut _MutableArrayData,
_: usize,
) -> Result<(), arrow_schema::ArrowError> {
Ok(())
}
Loading
Loading