Skip to content

Commit 4902a9e

Browse files
committed
CLI: Collector changes for producer & Increase version
* Apply the recent changes in producer with records collector in chipmunk CLI tool. * Change the final report since we have access to bytes information with the new changes. * Increase app version & Update changelog
1 parent c0cfdfd commit 4902a9e

File tree

7 files changed

+88
-74
lines changed

7 files changed

+88
-74
lines changed

application/apps/indexer/processor/src/producer/mod.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,24 @@ impl<T: LogMessage, P: Parser<T>, D: ByteSource> MessageProducer<T, P, D> {
306306
}
307307
}
308308

309+
/// Total loaded bytes form byte source in this session.
310+
#[inline]
311+
pub fn total_loaded_bytes(&self) -> usize {
312+
self.total_loaded
313+
}
314+
315+
/// Total skipped bytes by source and parser in this session.
316+
#[inline]
317+
pub fn total_skipped_bytes(&self) -> usize {
318+
self.total_skipped
319+
}
320+
321+
/// Total amount of parsed items produced in this session.
322+
#[inline]
323+
pub fn total_produced_items(&self) -> usize {
324+
self.total_messages
325+
}
326+
309327
/// Append incoming (SDE) Source-Data-Exchange to the underline byte source data.
310328
pub async fn sde_income(
311329
&mut self,

cli/chipmunk-cli/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
# 0.2.2
2+
3+
## Changes:
4+
5+
* Include bytes informations in session summary.
6+
* Internal performance improvements.
7+
18
# 0.2.1
29

310
## Changes:

cli/chipmunk-cli/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cli/chipmunk-cli/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "chipmunk-cli"
3-
version = "0.2.1"
3+
version = "0.2.2"
44
authors = ["Ammar Abou Zor <ammar.abou.zor@accenture.com>"]
55
edition = "2024"
66
description = "CLI Tool for parsing bytes form different source supporting multiple data formats"

cli/chipmunk-cli/src/session/file.rs

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use anyhow::Context;
44
use std::{io::Write as _, path::PathBuf};
55
use tokio_util::sync::CancellationToken;
66

7-
use parsers::{LogMessage, Parser};
8-
use processor::producer::MessageProducer;
7+
use parsers::{LogMessage, ParseYield, Parser};
8+
use processor::producer::{GeneralLogCollector, MessageProducer, ProduceSummary};
99
use sources::ByteSource;
1010

1111
use crate::session::create_append_file_writer;
@@ -37,33 +37,38 @@ where
3737
W: MessageFormatter,
3838
{
3939
let mut producer = MessageProducer::new(parser, bytesource);
40+
let mut collector = GeneralLogCollector::default();
4041

4142
let mut file_writer = create_append_file_writer(&output_path)?;
4243

44+
let write_sum = |p: &mut MessageProducer<_, _, _>| {
45+
super::write_summary(
46+
p.total_produced_items(),
47+
p.total_loaded_bytes(),
48+
p.total_skipped_bytes(),
49+
);
50+
};
51+
4352
let mut msg_count = 0;
44-
let mut skipped_count = 0;
45-
let mut empty_count = 0;
46-
let mut incomplete_count = 0;
4753

4854
loop {
55+
collector.get_records().clear();
4956
tokio::select! {
5057
_ = cancel_token.cancelled() => {
5158
file_writer.flush().context("Error writing data to file.")?;
52-
super::write_summary(msg_count, skipped_count, empty_count, incomplete_count);
59+
write_sum(&mut producer);
5360

5461
return Ok(());
5562
},
56-
Some(items) = producer.read_next_segment() => {
57-
for (_, item) in items {
58-
match item {
59-
parsers::MessageStreamItem::Item(parse_yield) => {
60-
let msg = match parse_yield {
61-
parsers::ParseYield::Message(msg) => msg,
62-
parsers::ParseYield::Attachment(_attachment) => {
63-
// attachment are postponed for now.
64-
continue;
65-
}
66-
parsers::ParseYield::MessageAndAttachment((msg, _attachment)) => msg,
63+
res = producer.produce_next(&mut collector) => {
64+
let summary = res?;
65+
match summary {
66+
ProduceSummary::Processed {..} => {
67+
for record in collector.get_records() {
68+
let msg = match record {
69+
ParseYield::Message(msg) => msg,
70+
ParseYield::Attachment(..) => continue,
71+
ParseYield::MessageAndAttachment((msg, _att)) => msg,
6772
};
6873
msg_formatter.write_msg(&mut file_writer, msg)?;
6974

@@ -72,16 +77,12 @@ where
7277
println!("Processing... {msg_count} messages have been written to file.");
7378
}
7479
}
75-
parsers::MessageStreamItem::Skipped => skipped_count += 1,
76-
parsers::MessageStreamItem::Incomplete => incomplete_count += 1,
77-
parsers::MessageStreamItem::Empty => empty_count += 1,
78-
parsers::MessageStreamItem::Done => {
79-
println!("Parsing Done");
80-
super::write_summary(msg_count, skipped_count, empty_count, incomplete_count);
81-
82-
return Ok(());
83-
}
84-
}
80+
},
81+
// We don't support file tailing in the CLI tool.
82+
ProduceSummary::NoBytesAvailable {..} | ProduceSummary::Done {..} => {
83+
write_sum(&mut producer);
84+
return Ok(());
85+
},
8586
}
8687
}
8788

cli/chipmunk-cli/src/session/mod.rs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -123,27 +123,15 @@ where
123123
}
124124

125125
/// Writes summary of the process session.
126-
fn write_summary(
127-
msg_count: usize,
128-
skipped_count: usize,
129-
empty_count: usize,
130-
incomplete_count: usize,
131-
) {
126+
fn write_summary(msg_count: usize, loaded_bytes: usize, skipped_bytes: usize) {
132127
const UNDERLINE_ANSI: &str = "\x1b[4m";
133128
const RESET_ANSI: &str = "\x1b[0m";
134129

135130
println!("{UNDERLINE_ANSI}Process Summary{RESET_ANSI}:");
136131

137132
println!("* {msg_count} messages has been written to file.");
138-
if skipped_count > 0 {
139-
println!("* {skipped_count} messages skipped");
140-
}
141-
if empty_count > 0 {
142-
println!("* {empty_count} messages were empty");
143-
}
144-
if incomplete_count > 0 {
145-
println!("* {incomplete_count} messages were incomplete");
146-
}
133+
println!("* {loaded_bytes} bytes has been loaded from source.");
134+
println!("* {skipped_bytes} bytes has been skipped.");
147135
}
148136

149137
/// Creates or append a file with the provided [`file_path`] returning its buffer writer.

cli/chipmunk-cli/src/session/socket.rs

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use std::{io::Write as _, ops::Deref, path::PathBuf, time::Duration};
55
use tokio::sync::watch;
66
use tokio_util::sync::CancellationToken;
77

8-
use parsers::{LogMessage, Parser};
9-
use processor::producer::MessageProducer;
8+
use parsers::{LogMessage, ParseYield, Parser};
9+
use processor::producer::{GeneralLogCollector, MessageProducer, ProduceSummary};
1010
use sources::{ByteSource, socket::tcp::reconnect::ReconnectStateMsg};
1111

1212
use crate::session::create_append_file_writer;
@@ -44,25 +44,31 @@ where
4444

4545
let mut file_writer = create_append_file_writer(&output_path)?;
4646

47+
let mut collector = GeneralLogCollector::default();
48+
4749
// Flush the file writer every 500 milliseconds for users tailing the output
4850
// file when messages are receive in relative slow frequency.
4951
let mut flush_interval = tokio::time::interval(Duration::from_millis(500));
5052

5153
// Counters to keep track on the status of the session.
52-
let mut msg_count = 0;
5354
let mut reconnecting = false;
54-
let mut skipped_count = 0;
55-
let mut empty_count = 0;
56-
let mut incomplete_count = 0;
5755

5856
// Keep track how many message has been received since the last flush.
5957
let mut msg_since_last_flush = 0;
6058

59+
let write_sum = |p: &mut MessageProducer<_, _, _>| {
60+
super::write_summary(
61+
p.total_produced_items(),
62+
p.total_loaded_bytes(),
63+
p.total_skipped_bytes(),
64+
);
65+
};
66+
6167
loop {
6268
tokio::select! {
6369
_ = cancel_token.cancelled() => {
6470
file_writer.flush().context("Error writing data to file.")?;
65-
super::write_summary(msg_count, skipped_count, empty_count, incomplete_count);
71+
write_sum(&mut producer);
6672

6773
return Ok(());
6874
}
@@ -98,36 +104,30 @@ where
98104
}
99105
_ = update_interval.tick() => {
100106
if !reconnecting {
107+
let msg_count = producer.total_produced_items();
101108
println!("Processing... {msg_count} messages have been written to file.");
102109
}
103110
}
104-
Some(items) = producer.read_next_segment() => {
105-
for (_, item) in items {
106-
match item {
107-
parsers::MessageStreamItem::Item(parse_yield) => {
108-
let msg = match parse_yield {
109-
parsers::ParseYield::Message(msg) => msg,
110-
parsers::ParseYield::Attachment(_attachment) => {
111-
// attachment are postponed for now.
112-
continue;
113-
}
114-
parsers::ParseYield::MessageAndAttachment((msg, _attachment)) => msg,
111+
res = producer.produce_next(&mut collector) => {
112+
let summary = res?;
113+
114+
match summary {
115+
ProduceSummary::Processed {..} => {
116+
for record in collector.get_records() {
117+
let msg = match record {
118+
ParseYield::Message(msg) => msg,
119+
ParseYield::Attachment(..) => continue,
120+
ParseYield::MessageAndAttachment((msg, _att)) => msg,
115121
};
116122
msg_formatter.write_msg(&mut file_writer, msg)?;
117123
msg_since_last_flush += 1;
118-
119-
msg_count += 1;
120124
}
121-
parsers::MessageStreamItem::Skipped => skipped_count += 1,
122-
parsers::MessageStreamItem::Incomplete => incomplete_count += 1,
123-
parsers::MessageStreamItem::Empty => empty_count += 1,
124-
parsers::MessageStreamItem::Done => {
125-
println!("Parsing Done");
126-
super::write_summary(msg_count, skipped_count, empty_count, incomplete_count);
127-
128-
return Ok(());
129-
}
130-
}
125+
},
126+
// No tailing support for streams.
127+
ProduceSummary::NoBytesAvailable {..} | ProduceSummary::Done {..} => {
128+
write_sum(&mut producer);
129+
return Ok(());
130+
},
131131
}
132132
}
133133
};

0 commit comments

Comments
 (0)