Skip to content

Commit 3c9d810

Browse files
committed
c
1 parent f8cd752 commit 3c9d810

File tree

4 files changed

+217
-169
lines changed

4 files changed

+217
-169
lines changed

rust/otap-dataflow/crates/pdata/src/otap.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ impl From<Traces> for OtapArrowRecords {
184184
/// storing and retrieving Arrow record batches in a type-safe manner. It is
185185
/// implemented by various structs that represent each signal type and provides
186186
/// methods to efficiently set and get record batches.
187-
pub trait OtapBatchStore: Default + Clone {
187+
pub trait OtapBatchStore: Default + Clone + Into<OtapArrowRecords> {
188188
/// Internally, implementers should use a bitmask for the types they support.
189189
/// The offsets in the bitmask should correspond to the ArrowPayloadType enum values.
190190
const TYPE_MASK: u64;

rust/otap-dataflow/crates/pdata/src/otap/testing.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,18 @@ use crate::schema::schema::{DataType as OtapDataType, SimpleType};
2626
/// Known ID column paths that need plain encoding metadata.
2727
pub const ID_COLUMN_PATHS: &[&str] = &["id", "resource.id", "scope.id", "parent_id"];
2828

29-
/// Build a `[Option<RecordBatch>; Logs::COUNT]` from payload‑type / column specs.
29+
/// Build a [`Logs`] store from payload‑type / column specs.
3030
///
3131
/// Missing required columns are automatically filled in using the schema spec.
32+
/// Use `.into()` to convert to [`OtapArrowRecords`], or `.into_batches()` to
33+
/// get the raw `[Option<RecordBatch>; Logs::COUNT]` array.
3234
///
3335
/// ```ignore
34-
/// logs!(
36+
/// let logs: Logs = logs!(
3537
/// (Logs, ("id", UInt16, vec![0u16, 1, 2])),
3638
/// (LogAttrs, ("parent_id", UInt16, vec![0u16, 1, 2])),
37-
/// )
39+
/// );
40+
/// let records: OtapArrowRecords = logs.into();
3841
/// ```
3942
#[macro_export]
4043
macro_rules! logs {
@@ -54,9 +57,11 @@ macro_rules! logs {
5457
}
5558
pub use logs;
5659

57-
/// Build a `[Option<RecordBatch>; Traces::COUNT]` from payload‑type / column specs.
60+
/// Build a [`Traces`] store from payload‑type / column specs.
5861
///
5962
/// Missing required columns are automatically filled in using the schema spec.
63+
/// Use `.into()` to convert to [`OtapArrowRecords`], or `.into_batches()` to
64+
/// get the raw `[Option<RecordBatch>; Traces::COUNT]` array.
6065
#[macro_export]
6166
macro_rules! traces {
6267
($(($payload:ident, $($record_batch_args:tt)*)),* $(,)?) => {
@@ -75,9 +80,11 @@ macro_rules! traces {
7580
}
7681
pub use traces;
7782

78-
/// Build a `[Option<RecordBatch>; Metrics::COUNT]` from payload‑type / column specs.
83+
/// Build a [`Metrics`] store from payload‑type / column specs.
7984
///
8085
/// Missing required columns are automatically filled in using the schema spec.
86+
/// Use `.into()` to convert to [`OtapArrowRecords`], or `.into_batches()` to
87+
/// get the raw `[Option<RecordBatch>; Metrics::COUNT]` array.
8188
#[macro_export]
8289
macro_rules! metrics {
8390
($(($payload:ident, $($record_batch_args:tt)*)),* $(,)?) => {
@@ -180,15 +187,15 @@ fn normalize_test_batch(
180187
batch
181188
}
182189

183-
/// Create a `[Option<RecordBatch>; N]` from a list of (payload_type, batch) pairs.
190+
/// Create an `OtapBatchStore` (`Logs`, `Metrics`, or `Traces`) from payload-type / column specs.
184191
///
185192
/// Each batch is normalized (dotted columns -> structs), completed (missing columns
186193
/// filled from spec), and marked with encoding metadata. For metrics, missing data
187194
/// point tables are auto-generated based on metric_type.
188195
#[must_use]
189196
pub fn make_test_batch<S: OtapBatchStore, const N: usize>(
190197
inputs: Vec<(ArrowPayloadType, RecordBatch)>,
191-
) -> [Option<RecordBatch>; N] {
198+
) -> S {
192199
let allowed = S::allowed_payload_types();
193200
let mut result: [Option<RecordBatch>; N] = std::array::from_fn(|_| None);
194201
let all_payload_types: Vec<ArrowPayloadType> = inputs.iter().map(|(pt, _)| *pt).collect();
@@ -217,7 +224,13 @@ pub fn make_test_batch<S: OtapBatchStore, const N: usize>(
217224
// Auto-generate missing data point tables for metrics
218225
generate_missing_dp_tables(&mut result);
219226

220-
result
227+
// Move the completed batches into a new store
228+
let mut store = S::new();
229+
let batches_mut = store.batches_mut();
230+
for (idx, batch) in result.into_iter().enumerate() {
231+
batches_mut[idx] = batch;
232+
}
233+
store
221234
}
222235

223236
/// Fill in missing columns for a test batch using the OTAP schema spec.

0 commit comments

Comments
 (0)