Skip to content

Commit 53acf95

Browse files
authored
Merge pull request #2416 from AmmarAbouZor/producer-collector
Refactor Producer: Collect records and write them in bulk & Fix tailing for binary files
2 parents f83f86a + a432b84 commit 53acf95

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

44 files changed

+2263
-1530
lines changed

application/apps/indexer/addons/dlt-tools/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ indexer_base = { path = "../../indexer_base" }
99
log.workspace = true
1010
parsers = { path = "../../parsers" }
1111
sources = { path = "../../sources" }
12+
processor = { path = "../../processor"}
1213
tokio = { workspace = true , features = ["full"] }
1314
tokio-util = { workspace = true, features = ["codec", "net"] }
1415

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
use parsers::{Attachment, LogMessage};
2+
use processor::producer::LogRecordsCollector;
3+
4+
/// Logs Collector with interest of attachments only.
5+
#[derive(Default)]
6+
pub struct AttachmentsCollector {
7+
pub attachments: Vec<Attachment>,
8+
}
9+
10+
impl<T: LogMessage> LogRecordsCollector<T> for AttachmentsCollector {
11+
fn append(&mut self, log_record: parsers::ParseYield<T>) {
12+
match log_record {
13+
parsers::ParseYield::Message(_) => {}
14+
parsers::ParseYield::Attachment(attachment) => self.attachments.push(attachment),
15+
parsers::ParseYield::MessageAndAttachment((_msg, attachment)) => {
16+
self.attachments.push(attachment)
17+
}
18+
}
19+
}
20+
}

application/apps/indexer/addons/dlt-tools/src/lib.rs

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,20 @@ extern crate indexer_base;
1616
extern crate log;
1717

1818
use dlt_core::filtering::DltFilterConfig;
19-
use parsers::{Attachment, MessageStreamItem, ParseYield, dlt::DltParser};
20-
use sources::{binary::raw::BinaryByteSource, producer::MessageProducer};
19+
use parsers::{Attachment, dlt::DltParser};
20+
use processor::producer::{MessageProducer, ProduceSummary};
21+
use sources::binary::raw::BinaryByteSource;
2122
use std::{
2223
fs::File,
2324
io::{BufReader, BufWriter, Write},
2425
path::{Path, PathBuf},
2526
};
2627
use tokio_util::sync::CancellationToken;
2728

29+
use crate::attachments_collector::AttachmentsCollector;
30+
31+
mod attachments_collector;
32+
2833
pub async fn scan_dlt_ft(
2934
input: PathBuf,
3035
filter: Option<DltFilterConfig>,
@@ -47,7 +52,8 @@ pub async fn scan_dlt_ft(
4752

4853
let mut canceled = false;
4954

50-
let mut attachments = vec![];
55+
let mut collector = AttachmentsCollector::default();
56+
5157
loop {
5258
tokio::select! {
5359
// Check on events in current order ensuring cancel will be checked at first
@@ -58,20 +64,18 @@ pub async fn scan_dlt_ft(
5864
canceled = true;
5965
break;
6066
}
61-
items = producer.read_next_segment() => {
62-
match items {
63-
Some(items) => {
64-
for (_, item) in items {
65-
if let MessageStreamItem::Item(ParseYield::MessageAndAttachment((_msg, attachment))) = item {
66-
attachments.push(attachment.to_owned());
67-
} else if let MessageStreamItem::Item(ParseYield::Attachment(attachment)) = item {
68-
attachments.push(attachment.to_owned());
69-
}
70-
}
71-
}
72-
_ => {
67+
prod_res = producer.produce_next(&mut collector) => {
68+
match prod_res {
69+
Ok(ProduceSummary::Processed {..}) => {
70+
// Attachments are appended.
71+
},
72+
Ok(ProduceSummary::Done {..} | ProduceSummary::NoBytesAvailable {..}) => {
73+
// Stop as tailing isn't needed here.
7374
break;
7475
}
76+
Err(err) => {
77+
return Err(format!("Error while processing DLT file. {err}"));
78+
},
7579
}
7680
}
7781
}
@@ -81,7 +85,7 @@ pub async fn scan_dlt_ft(
8185
return Ok(Vec::new());
8286
}
8387

84-
Ok(attachments)
88+
Ok(collector.attachments)
8589
}
8690
Err(error) => Err(format!("failed to open file: {error}")),
8791
}

application/apps/indexer/parsers/src/lib.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,15 +97,6 @@ pub trait LogMessage: Display + Serialize {
9797
fn to_writer<W: Write>(&self, writer: &mut W) -> Result<usize, std::io::Error>;
9898
}
9999

100-
#[derive(Debug)]
101-
pub enum MessageStreamItem<T: LogMessage> {
102-
Item(ParseYield<T>),
103-
Skipped,
104-
Incomplete,
105-
Empty,
106-
Done,
107-
}
108-
109100
/// A trait for parsers that extract one item at a time from a byte slice.
110101
///
111102
/// Any type implementing this trait will automatically implement the [`Parser`] trait

application/apps/indexer/plugins_host/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ dir_checksum = { path = "../../../../cli/development-cli/dir_checksum" }
2626

2727
[dev-dependencies]
2828
criterion = { workspace = true, features = ["async_tokio"] }
29+
processor = { path = "../processor"}
2930

3031
[[bench]]
3132
name = "plugin_parser_init"

application/apps/indexer/plugins_host/benches/plugin_utls.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use stypes::{PluginConfigItem, PluginConfigValue};
99
// Rust LSP may mark this as an error, but this is an issue with the LSP itself.
1010
// The code will compile without problems.
1111

12-
#[path = "./../../sources/benches/bench_utls.rs"]
12+
#[path = "./../../processor/benches/bench_utls.rs"]
1313
mod bench_utls;
1414

1515
/// Represents the needed configuration to run benchmarks on a plugin.

application/apps/indexer/processor/Cargo.toml

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,50 @@ uuid = { workspace = true , features = ["serde", "v4"] }
2727
stypes = { path = "../stypes", features=["rustcore"] }
2828

2929
[dev-dependencies]
30-
criterion.workspace = true
30+
criterion = { workspace = true, features = ["async_tokio"] }
3131
pretty_assertions = "1.4"
3232
rand.workspace = true
3333
tempfile.workspace = true
34+
tokio.workspace = true
35+
plugins_host = {path = "../plugins_host/"}
36+
toml.workspace = true
3437

3538
[[bench]]
3639
name = "map_benchmarks"
3740
harness = false
41+
42+
[[bench]]
43+
name = "dlt_producer"
44+
harness = false
45+
46+
[[bench]]
47+
name = "someip_producer"
48+
harness = false
49+
50+
[[bench]]
51+
name = "someip_legacy_producer"
52+
harness = false
53+
54+
[[bench]]
55+
name = "text_producer"
56+
harness = false
57+
58+
[[bench]]
59+
name = "plugin_praser_producer"
60+
harness = false
61+
62+
[[bench]]
63+
name = "mocks_once_producer"
64+
harness = false
65+
66+
[[bench]]
67+
name = "mocks_once_parallel"
68+
harness = false
69+
70+
[[bench]]
71+
name = "mocks_multi_producer"
72+
harness = false
73+
74+
[[bench]]
75+
name = "mocks_multi_parallel"
76+
harness = false

application/apps/indexer/sources/benches/bench_utls.rs renamed to application/apps/indexer/processor/benches/bench_utls.rs

Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,17 @@
33
#![allow(unused)]
44

55
use std::{
6+
alloc::Layout,
67
fs::File,
78
io::{Cursor, Read},
89
path::PathBuf,
910
time::Duration,
1011
};
1112

1213
use criterion::Criterion;
13-
use parsers::{LogMessage, MessageStreamItem};
14-
use sources::{binary::raw::BinaryByteSource, producer::MessageProducer};
14+
use parsers::{LogMessage, ParseYield};
15+
use processor::producer::{GeneralLogCollector, MessageProducer};
16+
use sources::binary::raw::BinaryByteSource;
1517

1618
pub const INPUT_SOURCE_ENV_VAR: &str = "CHIPMUNK_BENCH_SOURCE";
1719
pub const CONFIG_ENV_VAR: &str = "CHIPMUNK_BENCH_CONFIG";
@@ -63,12 +65,12 @@ pub fn get_config() -> Option<String> {
6365
/// The purpose of this struct is to convince the compiler that we are using all input
6466
/// possibilities of producer to avoid unwanted optimizations.
6567
pub struct ProducerCounter {
68+
pub items: usize,
6669
pub msg: usize,
6770
pub txt: usize,
6871
pub att: usize,
69-
pub skipped: usize,
70-
pub incomplete: usize,
71-
pub empty: usize,
72+
pub loaded_bytes: usize,
73+
pub skipped_bytes: usize,
7274
}
7375

7476
/// Run producer until the end converting messages into strings too, while counting all the
@@ -80,28 +82,48 @@ where
8082
T: LogMessage,
8183
{
8284
let mut counter = ProducerCounter::default();
85+
let mut collector = GeneralLogCollector::default();
86+
87+
loop {
88+
collector.get_records().clear();
89+
match producer.produce_next(&mut collector).await.unwrap() {
90+
processor::producer::ProduceSummary::Processed {
91+
bytes_consumed,
92+
messages_count,
93+
skipped_bytes,
94+
} => {
95+
counter.items += messages_count;
96+
counter.loaded_bytes += bytes_consumed;
97+
counter.skipped_bytes += skipped_bytes;
98+
}
99+
processor::producer::ProduceSummary::NoBytesAvailable { skipped_bytes } => {
100+
counter.skipped_bytes += skipped_bytes;
101+
break;
102+
}
103+
processor::producer::ProduceSummary::Done {
104+
loaded_bytes,
105+
skipped_bytes,
106+
produced_messages,
107+
} => {
108+
counter.loaded_bytes = loaded_bytes;
109+
counter.skipped_bytes = skipped_bytes;
110+
counter.items = produced_messages;
111+
break;
112+
}
113+
}
83114

84-
while let Some(items) = producer.read_next_segment().await {
85-
for (_, i) in items {
86-
match i {
87-
MessageStreamItem::Item(item) => match item {
88-
parsers::ParseYield::Message(msg) => {
89-
counter.msg += 1;
90-
counter.txt += msg.to_string().len();
91-
}
92-
parsers::ParseYield::Attachment(att) => counter.att += att.size,
93-
parsers::ParseYield::MessageAndAttachment((msg, att)) => {
94-
counter.msg += 1;
95-
counter.txt += msg.to_string().len();
96-
counter.att += att.size;
97-
}
98-
},
99-
MessageStreamItem::Skipped => {
100-
counter.skipped += 1;
115+
for item in collector.get_records() {
116+
match item {
117+
ParseYield::Message(msg) => {
118+
counter.msg += 1;
119+
counter.txt += msg.to_string().len();
120+
}
121+
ParseYield::Attachment(att) => counter.att += att.size,
122+
ParseYield::MessageAndAttachment((msg, att)) => {
123+
counter.msg += 1;
124+
counter.txt += msg.to_string().len();
125+
counter.att += att.size;
101126
}
102-
MessageStreamItem::Incomplete => counter.incomplete += 1,
103-
MessageStreamItem::Empty => counter.empty += 1,
104-
MessageStreamItem::Done => break,
105127
}
106128
}
107129
}

application/apps/indexer/sources/benches/dlt_producer.rs renamed to application/apps/indexer/processor/benches/dlt_producer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use bench_utls::{
55
bench_standrad_config, create_binary_bytesource, get_config, read_binary, run_producer,
66
};
77
use parsers::dlt::{self, DltParser};
8-
use sources::producer::MessageProducer;
8+
use processor::producer::MessageProducer;
99

1010
mod bench_utls;
1111

application/apps/indexer/sources/benches/mocks/mock_parser.rs renamed to application/apps/indexer/processor/benches/mocks/mock_parser.rs

File renamed without changes.

0 commit comments

Comments
 (0)