Skip to content

Commit f035300

Browse files
AmmarAbouZormarcmo
authored andcommitted
Chipmunk CLI: Use watch channels for reconnecting state.
* Watch channels are more suited for follow the state of reconnecting since only the current state is the most important one besides that the reconnecting process must not be blocked by delivering its state * Change the types of the messages to make them more expressive. * Indexer: Rename variable in TCP source tests.
1 parent 9647786 commit f035300

File tree

4 files changed

+46
-35
lines changed

4 files changed

+46
-35
lines changed

application/apps/indexer/sources/src/socket/mod.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::time::Duration;
2-
use tokio::sync::mpsc::UnboundedSender;
2+
use tokio::sync::watch::Sender;
33

44
pub mod tcp;
55
pub mod udp;
@@ -19,14 +19,14 @@ pub struct ReconnectInfo {
1919
/// The time interval between each try to connect to server.
2020
internval: Duration,
2121
/// Channel to send information of the state of reconnecting progress.
22-
state_sender: Option<UnboundedSender<ReconnectStateMsg>>,
22+
state_sender: Option<Sender<ReconnectStateMsg>>,
2323
}
2424

2525
impl ReconnectInfo {
2626
pub fn new(
2727
max_attempts: usize,
2828
internval: Duration,
29-
state_sender: Option<UnboundedSender<ReconnectStateMsg>>,
29+
state_sender: Option<Sender<ReconnectStateMsg>>,
3030
) -> Self {
3131
Self {
3232
max_attempts,
@@ -56,7 +56,12 @@ enum ReconnectResult {
5656
#[derive(Debug, Clone)]
5757
/// Represent the information of the state of the reconnection progress.
5858
pub enum ReconnectStateMsg {
59-
Reconnecting,
6059
Connected,
61-
StateMsg(String),
60+
Reconnecting {
61+
attempts: usize,
62+
},
63+
Failed {
64+
attempts: usize,
65+
err_msg: Option<String>,
66+
},
6267
}

application/apps/indexer/sources/src/socket/tcp.rs

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -40,20 +40,17 @@ impl ReconnectToServer for TcpSource {
4040
};
4141

4242
if let Some(sender) = &reconnect_info.state_sender {
43-
if let Err(err) = sender.send(ReconnectStateMsg::Reconnecting) {
44-
log::error!("Failed to send reconnnecting state with err: {err}");
45-
}
43+
sender.send_replace(ReconnectStateMsg::Reconnecting { attempts: 0 });
44+
// Give receivers a chance to get the initial reconnecting state before sending
45+
// the first attempt update.
46+
yield_now().await;
4647
}
4748

4849
let mut attempts = 0;
4950
loop {
5051
attempts += 1;
5152
if let Some(sender) = &reconnect_info.state_sender {
52-
if let Err(err) = sender.send(ReconnectStateMsg::StateMsg(format!(
53-
"Reconnecting to TCP server. Attempt: {attempts}"
54-
))) {
55-
log::error!("Failed to send state msg with err: {err}");
56-
}
53+
sender.send_replace(ReconnectStateMsg::Reconnecting { attempts });
5754
}
5855
log::info!("Reconnecting to TCP server. Attempt: {attempts}");
5956
tokio::time::sleep(reconnect_info.internval).await;
@@ -73,11 +70,10 @@ impl ReconnectToServer for TcpSource {
7370
log::debug!("Got following error while trying to reconnect: {err}");
7471
if attempts >= reconnect_info.max_attempts {
7572
if let Some(sender) = &reconnect_info.state_sender {
76-
if let Err(err) = sender.send(ReconnectStateMsg::StateMsg(format!(
77-
"Reconnecting to TCP server failed after {attempts} attemps."
78-
))) {
79-
log::error!("Failed to send state msg with err: {err}");
80-
}
73+
sender.send_replace(ReconnectStateMsg::Failed {
74+
attempts,
75+
err_msg: Some(err.to_string()),
76+
});
8177
// Make sure the message has been sent before returning.
8278
yield_now().await;
8379
}
@@ -193,16 +189,16 @@ mod tests {
193189
sleep(Duration::from_millis(100)).await;
194190
}
195191
});
196-
let mut udp_source = TcpSource::new(SERVER, None).await?;
192+
let mut tcp_source = TcpSource::new(SERVER, None).await?;
197193
let receive_handle = tokio::spawn(async move {
198194
for msg in MESSAGES {
199-
udp_source.load(None).await.expect("reload failed");
195+
tcp_source.load(None).await.expect("reload failed");
200196
println!(
201197
"receive: {:02X?}",
202-
std::str::from_utf8(udp_source.current_slice())
198+
std::str::from_utf8(tcp_source.current_slice())
203199
);
204-
assert_eq!(udp_source.current_slice(), msg.as_bytes());
205-
udp_source.consume(msg.len());
200+
assert_eq!(tcp_source.current_slice(), msg.as_bytes());
201+
tcp_source.consume(msg.len());
206202
}
207203
});
208204

cli/chipmunk-cli/src/session/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use tokio_util::sync::CancellationToken;
1212
use parsers::LogMessage;
1313
use sources::{
1414
binary::raw::BinaryByteSource,
15-
socket::{tcp::TcpSource, udp::UdpSource, ReconnectInfo},
15+
socket::{tcp::TcpSource, udp::UdpSource, ReconnectInfo, ReconnectStateMsg},
1616
};
1717

1818
use crate::cli_args::InputSource;
@@ -49,7 +49,8 @@ where
4949
max_reconnect_count,
5050
reconnect_interval,
5151
} => {
52-
let (state_tx, state_rx) = tokio::sync::mpsc::unbounded_channel();
52+
let (state_tx, state_rx) = tokio::sync::watch::channel(ReconnectStateMsg::Connected);
53+
5354
let reconnect = max_reconnect_count.and_then(|max| {
5455
// provide reconnect infos when max count exists and bigger than zero.
5556
(max > 0).then(|| {
@@ -79,7 +80,7 @@ where
7980
}
8081
InputSource::Udp { address } => {
8182
// UDP connections inherently support auto-connecting by design.
82-
let (_state_tx, state_rx) = tokio::sync::mpsc::unbounded_channel();
83+
let (_state_tx, state_rx) = tokio::sync::watch::channel(ReconnectStateMsg::Connected);
8384

8485
let source = UdpSource::new(address, Vec::new())
8586
.await

cli/chipmunk-cli/src/session/socket.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
33
use anyhow::Context;
44
use futures::StreamExt;
5-
use std::{io::Write as _, path::PathBuf, time::Duration};
6-
use tokio::sync::mpsc::UnboundedReceiver;
5+
use std::{io::Write as _, ops::Deref, path::PathBuf, time::Duration};
6+
use tokio::sync::watch;
77
use tokio_util::sync::CancellationToken;
88

99
use parsers::{LogMessage, Parser};
@@ -28,7 +28,7 @@ pub async fn run_session<T, P, D, W>(
2828
bytesource: D,
2929
output_path: PathBuf,
3030
mut msg_formatter: W,
31-
mut state_rc: UnboundedReceiver<ReconnectStateMsg>,
31+
mut state_rc: watch::Receiver<ReconnectStateMsg>,
3232
update_interval: Duration,
3333
cancel_token: CancellationToken,
3434
) -> anyhow::Result<()>
@@ -68,18 +68,27 @@ where
6868

6969
return Ok(());
7070
}
71-
Some(msg) = state_rc.recv() => {
72-
match msg {
73-
ReconnectStateMsg::Reconnecting => {
71+
Ok(_) = state_rc.changed() => {
72+
let msg = state_rc.borrow_and_update();
73+
match msg.deref() {
74+
ReconnectStateMsg::Reconnecting { attempts } => {
7475
reconnecting = true;
75-
println!("Connection to server lost. Trying to reconnect...");
76+
if *attempts == 0 {
77+
println!("Connection to server lost. Trying to reconnect...");
78+
}else {
79+
println!("Reconnecting to TCP server. Attempt: {attempts}");
80+
}
7681
},
7782
ReconnectStateMsg::Connected => {
7883
reconnecting = false;
7984
println!("Connected to server");
8085
},
81-
ReconnectStateMsg::StateMsg(msg) => {
82-
println!("Reconnecting status: {msg}");
86+
ReconnectStateMsg::Failed{ attempts, err_msg } => {
87+
let mut msg = format!("Reconnecting to TCP server failed after {attempts} attempts.");
88+
if let Some(err_msg) = err_msg {
89+
msg = format!("{msg} Error: {err_msg}");
90+
}
91+
println!("{msg}");
8392
},
8493
}
8594
},

0 commit comments

Comments
 (0)