Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ temp.data
*.bb
/temp
*~
CLAUDE.md
GEMINI.md
setup-ai.sh
.github/copilot-instructions.md
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license = "MIT OR Apache-2.0"
name = "omnect-device-service"
readme = "README.md"
repository = "https://github.com/omnect/omnect-device-service.git"
version = "0.41.18"
version = "0.41.19"

[dependencies]
actix-server = { version = "2.6", default-features = false }
Expand Down
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -619,9 +619,7 @@ The module reports the status of any attached modems. For this purpose the modul

### Network status

The network status is refreshed in an interval which can be configured by `REFRESH_NETWORK_STATUS_INTERVAL_SECS` environment variable. The default is 60s.

When reloading network configuration via the [reload network daemon](#reload-network-daemon) endpoint, the service waits for networkd to apply the new configuration before reporting status. The delay can be configured via `RELOAD_NETWORK_DELAY_MS` environment variable (default: 500ms).
The network status is updated event-driven by subscribing to D-Bus signals from `org.freedesktop.network1`. A 2-second debounce window coalesces bursts of signals (e.g. during DHCP negotiation) into a single status report.

**NOTE**: Currently reporting status of modems is no supported!

Expand Down
15 changes: 15 additions & 0 deletions src/systemd/networkd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ pub async fn networkd_interfaces() -> Result<serde_json::Value> {
crate::common::from_json_file("testfiles/positive/systemd-networkd-link-description.json")
}

#[cfg(not(feature = "mock"))]
pub async fn networkd_signal_stream() -> Result<zbus::MessageStream> {
let conn = zbus::Connection::system()
.await
.context("networkd_signal_stream: zbus::Connection::system() failed")?;
let rule = zbus::MatchRule::builder()
.msg_type(zbus::message::Type::Signal)
.sender("org.freedesktop.network1")
.context("networkd_signal_stream: invalid sender name")?
.build();
zbus::MessageStream::for_match_rule(rule, &conn, Some(64))
.await
.context("networkd_signal_stream: for_match_rule() failed")
}

pub fn networkd_wait_online_timeout() -> Result<Option<Duration>> {
/*
we expect systemd-networkd-wait-online.service file to be present.
Expand Down
93 changes: 80 additions & 13 deletions src/twin/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use log::{debug, error, info, warn};
use serde::Serialize;
use serde_json::json;
use std::{env, time::Duration};
use tokio::{sync::mpsc::Sender, time::interval};
use tokio::sync::mpsc::Sender;

lazy_static! {
static ref REFRESH_NETWORK_STATUS_INTERVAL_SECS: u64 = {
Expand Down Expand Up @@ -51,6 +51,8 @@ pub struct Interface {
pub struct Network {
tx_reported_properties: Option<Sender<serde_json::Value>>,
interfaces: Vec<Interface>,
#[allow(dead_code)]
signal_task: Option<tokio::task::JoinHandle<()>>,
}

impl Feature for Network {
Expand All @@ -76,7 +78,83 @@ impl Feature for Network {
Ok(())
}

#[cfg(not(feature = "mock"))]
fn command_request_stream(&mut self) -> CommandRequestStreamResult {
use futures::StreamExt as _;
use tokio_stream::wrappers::ReceiverStream;

if !self.is_enabled() {
return Ok(None);
}

let (tx, rx) = tokio::sync::mpsc::channel::<CommandRequest>(4);

let handle = tokio::spawn(async move {
use std::any::TypeId;

let mut stream = match networkd::networkd_signal_stream().await {
Ok(s) => s,
Err(e) => {
error!("signal task: failed to start: {e:#}");
return;
}
};

// Coalesce bursts of signals (e.g. during DHCP negotiation) into a
// single report by waiting for DEBOUNCE silence after the last signal.
const DEBOUNCE: Duration = Duration::from_secs(2);
// select! futures are evaluated unconditionally even when their guard
// is false, so a placeholder far-future instant stands in for "no
// deadline pending" without a separate branch.
const FAR_FUTURE: Duration = Duration::from_secs(3600);
let mut debounce_deadline: Option<tokio::time::Instant> = None;

loop {
let sleep_until =
debounce_deadline.unwrap_or_else(|| tokio::time::Instant::now() + FAR_FUTURE);

tokio::select! {
biased;

msg = stream.next() => match msg {
Some(Ok(_)) => {
debug!("signal received, (re)arming debounce");
debounce_deadline = Some(tokio::time::Instant::now() + DEBOUNCE);
}
Some(Err(e)) => error!("signal stream error: {e:#}"),
None => {
warn!("signal stream ended unexpectedly");
return;
}
},

_ = tokio::time::sleep_until(sleep_until), if debounce_deadline.is_some() => {
debounce_deadline = None;
debug!("debounce elapsed, triggering report");
let req = CommandRequest {
command: Command::Interval(IntervalCommand {
feature_id: TypeId::of::<Network>(),
instant: tokio::time::Instant::now(),
}),
reply: None,
};
if tx.send(req).await.is_err() {
debug!("signal task: receiver dropped, stopping");
return;
}
}
}
}
});

self.signal_task = Some(handle);
Ok(Some(ReceiverStream::new(rx).boxed()))
}

#[cfg(feature = "mock")]
fn command_request_stream(&mut self) -> CommandRequestStreamResult {
use tokio::time::interval;

if !self.is_enabled() || 0 == *REFRESH_NETWORK_STATUS_INTERVAL_SECS {
Ok(None)
} else {
Expand All @@ -88,29 +166,18 @@ impl Feature for Network {

async fn command(&mut self, cmd: &Command) -> CommandResult {
match cmd {
Command::Interval(_) => {}
Command::Interval(_) => self.report(false).await?,
Command::ReloadNetwork => {
unit::unit_action(
NETWORK_SERVICE,
unit::UnitAction::Reload,
systemd_zbus::Mode::Fail,
)
.await?;

// Wait for networkd to apply configuration after reload.
// The reload job completion only means networkd received the signal,
// not that it finished applying the new configuration internally.
let delay_ms = env::var("RELOAD_NETWORK_DELAY_MS")
.unwrap_or("500".to_string())
.parse::<u64>()
.unwrap_or(500);
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
}
_ => bail!("unexpected command"),
}

self.report(false).await?;

Ok(None)
}
}
Expand Down
Loading