Skip to content

Commit a432b84

Browse files
committed
Move producer benches to processor + Produce Error in Export
* Add producer error as a variant for export errors. * Move all benchmarks related to producer to processor crate since producer is moved there as well. * Corresponding changes in development CLI tool configuration file for benchmarks. * Apply adjustments in the producer loop with records collectors and state delivering in benchmarks.
1 parent 4902a9e commit a432b84

File tree

19 files changed

+115
-92
lines changed

19 files changed

+115
-92
lines changed

application/apps/indexer/plugins_host/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ dir_checksum = { path = "../../../../cli/development-cli/dir_checksum" }
2626

2727
[dev-dependencies]
2828
criterion = { workspace = true, features = ["async_tokio"] }
29+
processor = { path = "../processor"}
2930

3031
[[bench]]
3132
name = "plugin_parser_init"

application/apps/indexer/plugins_host/benches/plugin_utls.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use stypes::{PluginConfigItem, PluginConfigValue};
99
// Rust LSP may mark this as an error, but this is an issue with the LSP itself.
1010
// The code will compile without problems.
1111

12-
#[path = "./../../sources/benches/bench_utls.rs"]
12+
#[path = "./../../processor/benches/bench_utls.rs"]
1313
mod bench_utls;
1414

1515
/// Represents the needed configuration to run benchmarks on a plugin.

application/apps/indexer/processor/Cargo.toml

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,50 @@ uuid = { workspace = true , features = ["serde", "v4"] }
2727
stypes = { path = "../stypes", features=["rustcore"] }
2828

2929
[dev-dependencies]
30-
criterion.workspace = true
30+
criterion = { workspace = true, features = ["async_tokio"] }
3131
pretty_assertions = "1.4"
3232
rand.workspace = true
3333
tempfile.workspace = true
3434
tokio.workspace = true
35+
plugins_host = {path = "../plugins_host/"}
36+
toml.workspace = true
3537

3638
[[bench]]
3739
name = "map_benchmarks"
3840
harness = false
41+
42+
[[bench]]
43+
name = "dlt_producer"
44+
harness = false
45+
46+
[[bench]]
47+
name = "someip_producer"
48+
harness = false
49+
50+
[[bench]]
51+
name = "someip_legacy_producer"
52+
harness = false
53+
54+
[[bench]]
55+
name = "text_producer"
56+
harness = false
57+
58+
[[bench]]
59+
name = "plugin_praser_producer"
60+
harness = false
61+
62+
[[bench]]
63+
name = "mocks_once_producer"
64+
harness = false
65+
66+
[[bench]]
67+
name = "mocks_once_parallel"
68+
harness = false
69+
70+
[[bench]]
71+
name = "mocks_multi_producer"
72+
harness = false
73+
74+
[[bench]]
75+
name = "mocks_multi_parallel"
76+
harness = false

application/apps/indexer/sources/benches/bench_utls.rs renamed to application/apps/indexer/processor/benches/bench_utls.rs

Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,17 @@
33
#![allow(unused)]
44

55
use std::{
6+
alloc::Layout,
67
fs::File,
78
io::{Cursor, Read},
89
path::PathBuf,
910
time::Duration,
1011
};
1112

1213
use criterion::Criterion;
13-
use parsers::{LogMessage, MessageStreamItem};
14-
use sources::{binary::raw::BinaryByteSource, producer::MessageProducer};
14+
use parsers::{LogMessage, ParseYield};
15+
use processor::producer::{GeneralLogCollector, MessageProducer};
16+
use sources::binary::raw::BinaryByteSource;
1517

1618
pub const INPUT_SOURCE_ENV_VAR: &str = "CHIPMUNK_BENCH_SOURCE";
1719
pub const CONFIG_ENV_VAR: &str = "CHIPMUNK_BENCH_CONFIG";
@@ -63,12 +65,12 @@ pub fn get_config() -> Option<String> {
6365
/// The purpose of this struct is to convince the compiler that we are using all input
6466
/// possibilities of producer to avoid unwanted optimizations.
6567
pub struct ProducerCounter {
68+
pub items: usize,
6669
pub msg: usize,
6770
pub txt: usize,
6871
pub att: usize,
69-
pub skipped: usize,
70-
pub incomplete: usize,
71-
pub empty: usize,
72+
pub loaded_bytes: usize,
73+
pub skipped_bytes: usize,
7274
}
7375

7476
/// Run producer until the end converting messages into strings too, while counting all the
@@ -80,28 +82,48 @@ where
8082
T: LogMessage,
8183
{
8284
let mut counter = ProducerCounter::default();
85+
let mut collector = GeneralLogCollector::default();
86+
87+
loop {
88+
collector.get_records().clear();
89+
match producer.produce_next(&mut collector).await.unwrap() {
90+
processor::producer::ProduceSummary::Processed {
91+
bytes_consumed,
92+
messages_count,
93+
skipped_bytes,
94+
} => {
95+
counter.items += messages_count;
96+
counter.loaded_bytes += bytes_consumed;
97+
counter.skipped_bytes += skipped_bytes;
98+
}
99+
processor::producer::ProduceSummary::NoBytesAvailable { skipped_bytes } => {
100+
counter.skipped_bytes += skipped_bytes;
101+
break;
102+
}
103+
processor::producer::ProduceSummary::Done {
104+
loaded_bytes,
105+
skipped_bytes,
106+
produced_messages,
107+
} => {
108+
counter.loaded_bytes = loaded_bytes;
109+
counter.skipped_bytes = skipped_bytes;
110+
counter.items = produced_messages;
111+
break;
112+
}
113+
}
83114

84-
while let Some(items) = producer.read_next_segment().await {
85-
for (_, i) in items {
86-
match i {
87-
MessageStreamItem::Item(item) => match item {
88-
parsers::ParseYield::Message(msg) => {
89-
counter.msg += 1;
90-
counter.txt += msg.to_string().len();
91-
}
92-
parsers::ParseYield::Attachment(att) => counter.att += att.size,
93-
parsers::ParseYield::MessageAndAttachment((msg, att)) => {
94-
counter.msg += 1;
95-
counter.txt += msg.to_string().len();
96-
counter.att += att.size;
97-
}
98-
},
99-
MessageStreamItem::Skipped => {
100-
counter.skipped += 1;
115+
for item in collector.get_records() {
116+
match item {
117+
ParseYield::Message(msg) => {
118+
counter.msg += 1;
119+
counter.txt += msg.to_string().len();
120+
}
121+
ParseYield::Attachment(att) => counter.att += att.size,
122+
ParseYield::MessageAndAttachment((msg, att)) => {
123+
counter.msg += 1;
124+
counter.txt += msg.to_string().len();
125+
counter.att += att.size;
101126
}
102-
MessageStreamItem::Incomplete => counter.incomplete += 1,
103-
MessageStreamItem::Empty => counter.empty += 1,
104-
MessageStreamItem::Done => break,
105127
}
106128
}
107129
}

application/apps/indexer/sources/benches/dlt_producer.rs renamed to application/apps/indexer/processor/benches/dlt_producer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use bench_utls::{
55
bench_standrad_config, create_binary_bytesource, get_config, read_binary, run_producer,
66
};
77
use parsers::dlt::{self, DltParser};
8-
use sources::producer::MessageProducer;
8+
use processor::producer::MessageProducer;
99

1010
mod bench_utls;
1111

application/apps/indexer/sources/benches/mocks/mock_parser.rs renamed to application/apps/indexer/processor/benches/mocks/mock_parser.rs

File renamed without changes.

application/apps/indexer/sources/benches/mocks/mock_source.rs renamed to application/apps/indexer/processor/benches/mocks/mock_source.rs

File renamed without changes.

application/apps/indexer/sources/benches/mocks/mod.rs renamed to application/apps/indexer/processor/benches/mocks/mod.rs

File renamed without changes.

application/apps/indexer/sources/benches/mocks_multi_parallel.rs renamed to application/apps/indexer/processor/benches/mocks_multi_parallel.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
1010

1111
use bench_utls::{bench_standrad_config, run_producer};
1212
use mocks::{mock_parser::MockParser, mock_source::MockByteSource};
13-
use sources::producer::MessageProducer;
13+
use processor::producer::MessageProducer;
1414

1515
mod bench_utls;
1616
mod mocks;

application/apps/indexer/sources/benches/mocks_multi_producer.rs renamed to application/apps/indexer/processor/benches/mocks_multi_producer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
use bench_utls::{bench_standrad_config, run_producer};
77
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
88
use mocks::{mock_parser::MockParser, mock_source::MockByteSource};
9-
use sources::producer::MessageProducer;
9+
use processor::producer::MessageProducer;
1010
mod bench_utls;
1111
mod mocks;
1212

0 commit comments

Comments
 (0)