Skip to content

Commit d303f58

Browse files
authored
chore: Add end-to-end benchmark for array_agg, code cleanup (#20496)
## Which issue does this PR close? - Prep work for #20465 ## Rationale for this change - Add three queries to measure the end-to-end performance of `array_agg()`, as prep work for optimizing its performance. ## What changes are included in this PR? This PR also cleans up the `data_utils` benchmark code: - Seed the RNG once and use it for all data generation. The previous coding seeded an RNG but only used it for some data, and also used the same seed for every batch, which led to repeated data (... I assume this was not the intent?) - The previous code made `u64_wide` a nullable field, but passed `9.0` for the `value_density` when generating data, which meant that no NULL values would ever be generated. Switch to making `u64_wide` non-nullable. - Fix up comments, remove a clippy suppress, various other cleanups. ## Are these changes tested? Yes. ## Are there any user-facing changes? No.
1 parent df8f818 commit d303f58

File tree

2 files changed

+80
-54
lines changed

2 files changed

+80
-54
lines changed

datafusion/core/benches/aggregate_query_sql.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,39 @@ fn criterion_benchmark(c: &mut Criterion) {
251251
)
252252
})
253253
});
254+
255+
c.bench_function("array_agg_query_group_by_few_groups", |b| {
256+
b.iter(|| {
257+
query(
258+
ctx.clone(),
259+
&rt,
260+
"SELECT u64_narrow, array_agg(f64) \
261+
FROM t GROUP BY u64_narrow",
262+
)
263+
})
264+
});
265+
266+
c.bench_function("array_agg_query_group_by_mid_groups", |b| {
267+
b.iter(|| {
268+
query(
269+
ctx.clone(),
270+
&rt,
271+
"SELECT u64_mid, array_agg(f64) \
272+
FROM t GROUP BY u64_mid",
273+
)
274+
})
275+
});
276+
277+
c.bench_function("array_agg_query_group_by_many_groups", |b| {
278+
b.iter(|| {
279+
query(
280+
ctx.clone(),
281+
&rt,
282+
"SELECT u64_wide, array_agg(f64) \
283+
FROM t GROUP BY u64_wide",
284+
)
285+
})
286+
});
254287
}
255288

256289
criterion_group!(benches, criterion_benchmark);

datafusion/core/benches/data_utils/mod.rs

Lines changed: 47 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub fn create_table_provider(
4545
) -> Result<Arc<MemTable>> {
4646
let schema = Arc::new(create_schema());
4747
let partitions =
48-
create_record_batches(schema.clone(), array_len, partitions_len, batch_size);
48+
create_record_batches(&schema, array_len, partitions_len, batch_size);
4949
// declare a table in memory. In spark API, this corresponds to createDataFrame(...).
5050
MemTable::try_new(schema, partitions).map(Arc::new)
5151
}
@@ -56,21 +56,19 @@ pub fn create_schema() -> Schema {
5656
Field::new("utf8", DataType::Utf8, false),
5757
Field::new("f32", DataType::Float32, false),
5858
Field::new("f64", DataType::Float64, true),
59-
// This field will contain integers randomly selected from a large
60-
// range of values, i.e. [0, u64::MAX], such that there are none (or
61-
// very few) repeated values.
62-
Field::new("u64_wide", DataType::UInt64, true),
63-
// This field will contain integers randomly selected from a narrow
64-
// range of values such that there are a few distinct values, but they
65-
// are repeated often.
59+
// Integers randomly selected from a wide range of values, i.e. [0,
60+
// u64::MAX], such that there are ~no repeated values.
61+
Field::new("u64_wide", DataType::UInt64, false),
62+
// Integers randomly selected from a mid-range of values [0, 1000),
63+
// providing ~1000 distinct groups.
64+
Field::new("u64_mid", DataType::UInt64, false),
65+
// Integers randomly selected from a narrow range of values such that
66+
// there are a few distinct values, but they are repeated often.
6667
Field::new("u64_narrow", DataType::UInt64, false),
6768
])
6869
}
6970

70-
fn create_data(size: usize, null_density: f64) -> Vec<Option<f64>> {
71-
// use random numbers to avoid spurious compiler optimizations wrt to branching
72-
let mut rng = StdRng::seed_from_u64(42);
73-
71+
fn create_data(rng: &mut StdRng, size: usize, null_density: f64) -> Vec<Option<f64>> {
7472
(0..size)
7573
.map(|_| {
7674
if rng.random::<f64>() > null_density {
@@ -82,56 +80,43 @@ fn create_data(size: usize, null_density: f64) -> Vec<Option<f64>> {
8280
.collect()
8381
}
8482

85-
fn create_integer_data(
86-
rng: &mut StdRng,
87-
size: usize,
88-
value_density: f64,
89-
) -> Vec<Option<u64>> {
90-
(0..size)
91-
.map(|_| {
92-
if rng.random::<f64>() > value_density {
93-
None
94-
} else {
95-
Some(rng.random::<u64>())
96-
}
97-
})
98-
.collect()
99-
}
100-
10183
fn create_record_batch(
10284
schema: SchemaRef,
10385
rng: &mut StdRng,
10486
batch_size: usize,
105-
i: usize,
87+
batch_index: usize,
10688
) -> RecordBatch {
107-
// the 4 here is the number of different keys.
108-
// a higher number increase sparseness
109-
let vs = [0, 1, 2, 3];
110-
let keys: Vec<String> = (0..batch_size)
111-
.map(
112-
// use random numbers to avoid spurious compiler optimizations wrt to branching
113-
|_| format!("hi{:?}", vs.choose(rng)),
114-
)
115-
.collect();
116-
let keys: Vec<&str> = keys.iter().map(|e| &**e).collect();
89+
// Randomly choose from 4 distinct key values; a higher number increases sparseness.
90+
let key_suffixes = [0, 1, 2, 3];
91+
let keys = StringArray::from_iter_values(
92+
(0..batch_size).map(|_| format!("hi{}", key_suffixes.choose(rng).unwrap())),
93+
);
11794

118-
let values = create_data(batch_size, 0.5);
95+
let values = create_data(rng, batch_size, 0.5);
11996

12097
// Integer values between [0, u64::MAX].
121-
let integer_values_wide = create_integer_data(rng, batch_size, 9.0);
98+
let integer_values_wide = (0..batch_size)
99+
.map(|_| rng.random::<u64>())
100+
.collect::<Vec<_>>();
122101

123-
// Integer values between [0, 9].
102+
// Integer values between [0, 1000).
103+
let integer_values_mid = (0..batch_size)
104+
.map(|_| rng.random_range(0..1000))
105+
.collect::<Vec<_>>();
106+
107+
// Integer values between [0, 10).
124108
let integer_values_narrow = (0..batch_size)
125-
.map(|_| rng.random_range(0_u64..10))
109+
.map(|_| rng.random_range(0..10))
126110
.collect::<Vec<_>>();
127111

128112
RecordBatch::try_new(
129113
schema,
130114
vec![
131-
Arc::new(StringArray::from(keys)),
132-
Arc::new(Float32Array::from(vec![i as f32; batch_size])),
115+
Arc::new(keys),
116+
Arc::new(Float32Array::from(vec![batch_index as f32; batch_size])),
133117
Arc::new(Float64Array::from(values)),
134118
Arc::new(UInt64Array::from(integer_values_wide)),
119+
Arc::new(UInt64Array::from(integer_values_mid)),
135120
Arc::new(UInt64Array::from(integer_values_narrow)),
136121
],
137122
)
@@ -140,21 +125,29 @@ fn create_record_batch(
140125

141126
/// Create record batches of `partitions_len` partitions and `batch_size` for each batch,
142127
/// with a total number of `array_len` records
143-
#[expect(clippy::needless_pass_by_value)]
144128
pub fn create_record_batches(
145-
schema: SchemaRef,
129+
schema: &SchemaRef,
146130
array_len: usize,
147131
partitions_len: usize,
148132
batch_size: usize,
149133
) -> Vec<Vec<RecordBatch>> {
150134
let mut rng = StdRng::seed_from_u64(42);
151-
(0..partitions_len)
152-
.map(|_| {
153-
(0..array_len / batch_size / partitions_len)
154-
.map(|i| create_record_batch(schema.clone(), &mut rng, batch_size, i))
155-
.collect::<Vec<_>>()
156-
})
157-
.collect::<Vec<_>>()
135+
let mut partitions = Vec::with_capacity(partitions_len);
136+
let batches_per_partition = array_len / batch_size / partitions_len;
137+
138+
for _ in 0..partitions_len {
139+
let mut batches = Vec::with_capacity(batches_per_partition);
140+
for batch_index in 0..batches_per_partition {
141+
batches.push(create_record_batch(
142+
schema.clone(),
143+
&mut rng,
144+
batch_size,
145+
batch_index,
146+
));
147+
}
148+
partitions.push(batches);
149+
}
150+
partitions
158151
}
159152

160153
/// An enum that wraps either a regular StringBuilder or a GenericByteViewBuilder

0 commit comments

Comments
 (0)