Skip to content

Commit afa0d59

Browse files
committed
tests to check runtime state destruction
1 parent fd2c4b9 commit afa0d59

File tree

5 files changed

+149
-52
lines changed

5 files changed

+149
-52
lines changed

zenoh/src/api/session.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -731,6 +731,12 @@ impl Session {
731731
inner: ManuallyDrop::new(Session(self.0.clone())),
732732
}
733733
}
734+
735+
#[cfg(feature = "test")]
736+
#[allow(dead_code)]
737+
pub(crate) fn inner_weak(&self) -> std::sync::Weak<SessionInner> {
738+
Arc::downgrade(&self.0)
739+
}
734740
}
735741

736742
impl Clone for Session {

zenoh/src/net/protocol/network.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use crate::net::{
4242
common::AutoConnect,
4343
protocol::linkstate::{LinkEdgeWeight, LinkState, LinkStateList, LocalLinkState},
4444
routing::dispatcher::tables::NodeId,
45-
runtime::Runtime,
45+
runtime::{Runtime, WeakRuntime},
4646
};
4747

4848
#[derive(Clone, Default)]
@@ -133,7 +133,7 @@ pub(crate) struct Network {
133133
pub(crate) trees: Vec<Tree>,
134134
pub(crate) distances: Vec<f64>,
135135
pub(crate) graph: petgraph::stable_graph::StableUnGraph<Node, f64>,
136-
pub(crate) runtime: Runtime,
136+
pub(crate) runtime: WeakRuntime,
137137
pub(crate) link_weights: HashMap<ZenohIdProto, LinkEdgeWeight>,
138138
}
139139

@@ -178,7 +178,7 @@ impl Network {
178178
}],
179179
distances: vec![0.0],
180180
graph,
181-
runtime,
181+
runtime: Runtime::downgrade(&runtime),
182182
link_weights,
183183
}
184184
}
@@ -337,7 +337,7 @@ impl Network {
337337
whatami: self.graph[idx].whatami,
338338
locators: if details.locators {
339339
if idx == self.idx {
340-
Some(self.runtime.get_locators())
340+
Some(self.runtime.upgrade().unwrap().get_locators())
341341
} else {
342342
self.graph[idx].locators.clone()
343343
}
@@ -617,8 +617,8 @@ impl Network {
617617
}
618618

619619
fn connect_discovered_peer(&self, zid: ZenohIdProto, locators: Vec<Locator>) {
620-
let runtime = self.runtime.clone();
621-
self.runtime.spawn(async move {
620+
let runtime = self.runtime.upgrade().unwrap();
621+
self.runtime.upgrade().unwrap().spawn(async move {
622622
if runtime
623623
.manager()
624624
.get_transport_unicast(&zid)

zenoh/src/net/runtime/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -798,6 +798,7 @@ impl Runtime {
798798
}
799799

800800
#[cfg(feature = "test")]
801+
#[allow(dead_code)]
801802
pub(crate) fn state_weak(&self) -> Weak<RuntimeState> {
802803
Arc::downgrade(&self.state)
803804
}

zenoh/src/net/tests/runtime.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@
1313
//
1414
#[cfg(feature = "test")]
1515
mod tests {
16-
use crate::net::runtime::RuntimeBuilder;
17-
use crate::net::runtime::Runtime;
18-
use crate::api::config::Config;
16+
use crate::{
17+
api::config::Config,
18+
net::runtime::{Runtime, RuntimeBuilder},
19+
};
1920

2021
// Helper to create a runtime that does not bind to any network endpoints.
2122
async fn create_runtime() -> Runtime {
@@ -45,7 +46,7 @@ mod tests {
4546
let runtime = create_runtime().await;
4647
let weak = runtime.state_weak();
4748
runtime.close().await.unwrap(); // Cancels all internal tasks that hold references.
48-
// The Runtime object still holds the strong reference, so the weak is still valid.
49+
// The Runtime object still holds the strong reference, so the weak is still valid.
4950
assert!(weak.upgrade().is_some());
5051
}
5152

@@ -59,4 +60,4 @@ mod tests {
5960
drop(runtime); // Releases the last strong reference held by the Runtime object.
6061
assert!(weak.upgrade().is_none());
6162
}
62-
}
63+
}

zenoh/src/tests/session.rs

Lines changed: 130 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,16 @@
1313
//
1414
#[cfg(all(feature = "unstable", feature = "internal", feature = "test"))]
1515
mod runtime_state_weak_tests {
16-
use crate::api::session::open;
17-
use crate::net::runtime::Runtime;
18-
use crate::{api::config::Config, Session};
19-
use std::time::Duration;
20-
use tokio::time::sleep;
16+
use test_case::test_matrix;
2117
use zenoh_config::{ModeDependentValue, WhatAmI, WhatAmIMatcher};
2218
use zenoh_link::EndPoint;
2319

20+
use crate::{
21+
api::{config::Config, session::open},
22+
net::runtime::Runtime,
23+
Session,
24+
};
25+
2426
// Helper to get the internal static runtime of a session (only present for non-plugin sessions)
2527
fn get_static_runtime(session: &Session) -> Option<&Runtime> {
2628
session.static_runtime()
@@ -30,67 +32,94 @@ mod runtime_state_weak_tests {
3032
mode: WhatAmI,
3133
listen_endpoints: Vec<EndPoint>,
3234
connect_endpoints: Vec<EndPoint>,
35+
gossip: bool,
36+
peer_mode: &str,
3337
) -> Session {
3438
let mut config = Config::default();
39+
let _ = config.set_mode(Some(mode)).unwrap();
40+
config.scouting.multicast.set_enabled(Some(false)).unwrap();
41+
3542
config.connect.endpoints.set(connect_endpoints).unwrap();
3643
config.listen.endpoints.set(listen_endpoints).unwrap();
37-
config.scouting.multicast.set_enabled(Some(false)).unwrap();
3844

3945
config
4046
.routing
4147
.peer
42-
.set_mode(Some("linkstate".to_string()))
43-
.unwrap();
44-
config
45-
.scouting
46-
.gossip
47-
.set_autoconnect(Some(ModeDependentValue::Unique(WhatAmIMatcher::empty())))
48+
.set_mode(Some(peer_mode.to_string()))
4849
.unwrap();
4950

50-
let _ = config.set_mode(Some(mode)).unwrap();
51+
config.scouting.gossip.set_enabled(Some(gossip)).unwrap();
52+
if gossip {
53+
config
54+
.scouting
55+
.gossip
56+
.set_autoconnect(Some(ModeDependentValue::Unique(WhatAmIMatcher::empty())))
57+
.unwrap();
58+
}
59+
5160
open(config).await.unwrap()
5261
}
5362

5463
// Helper to create a session with a specific mode and no network endpoints.
55-
async fn create_silent_session(mode: WhatAmI) -> Session {
56-
create_session(mode, vec![], vec![]).await
64+
async fn create_silent_session(mode: WhatAmI, gossip: bool, peer_mode: &str) -> Session {
65+
create_session(mode, vec![], vec![], gossip, peer_mode).await
5766
}
5867

59-
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
60-
async fn test_session_runtime_state_weak_alive_before_close() {
61-
let session = create_silent_session(WhatAmI::Client).await;
68+
#[test_matrix(
69+
[WhatAmI::Peer],
70+
[true, false],
71+
["peer_to_peer", "linkstate"]
72+
)]
73+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
74+
async fn alive_before_close(mode: WhatAmI, gossip: bool, peer_mode: &str) {
75+
let session = create_silent_session(mode, gossip, peer_mode).await;
6276
let runtime = get_static_runtime(&session).expect("runtime should be present");
6377
let weak = runtime.state_weak();
6478
assert!(weak.upgrade().is_some());
6579
}
6680

67-
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
68-
async fn test_session_runtime_state_weak_still_alive_after_close() {
69-
let session = create_silent_session(WhatAmI::Client).await;
81+
#[test_matrix(
82+
[WhatAmI::Peer],
83+
[true, false],
84+
["peer_to_peer", "linkstate"]
85+
)]
86+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
87+
async fn still_alive_after_close(mode: WhatAmI, gossip: bool, peer_mode: &str) {
88+
let session = create_silent_session(mode, gossip, peer_mode).await;
7089
let runtime = get_static_runtime(&session).unwrap();
7190
let weak = runtime.state_weak();
7291
session.close().await.unwrap();
7392
// The runtime is still held by the session (since session is not dropped yet)
7493
assert!(weak.upgrade().is_some());
7594
}
7695

77-
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
78-
async fn test_session_runtime_state_weak_dead_after_drop() {
79-
let session = create_silent_session(WhatAmI::Client).await;
96+
#[test_matrix(
97+
[WhatAmI::Peer],
98+
[true, false],
99+
["peer_to_peer", "linkstate"]
100+
)]
101+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
102+
async fn dead_after_drop(mode: WhatAmI, gossip: bool, peer_mode: &str) {
103+
let session = create_silent_session(mode, gossip, peer_mode).await;
80104
let runtime = get_static_runtime(&session).unwrap();
81105
let weak = runtime.state_weak();
82106
session.close().await.unwrap(); // ensure tasks are cancelled
83107
drop(session); // drop the session, releasing the last strong reference to the runtime
84108
assert!(weak.upgrade().is_none());
85109
}
86110

87-
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
88-
async fn test_multiple_sessions_runtime_state_weak_independent() {
89-
let session1 = create_silent_session(WhatAmI::Client).await;
111+
#[test_matrix(
112+
[WhatAmI::Peer],
113+
[true, false],
114+
["peer_to_peer", "linkstate"]
115+
)]
116+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
117+
async fn multiple_sessions_independent(mode: WhatAmI, gossip: bool, peer_mode: &str) {
118+
let session1 = create_silent_session(mode, gossip, peer_mode).await;
90119
let runtime1 = get_static_runtime(&session1).unwrap();
91120
let weak1 = runtime1.state_weak();
92121

93-
let session2 = create_silent_session(WhatAmI::Client).await;
122+
let session2 = create_silent_session(mode, gossip, peer_mode).await;
94123
let runtime2 = get_static_runtime(&session2).unwrap();
95124
let weak2 = runtime2.state_weak();
96125

@@ -100,44 +129,100 @@ mod runtime_state_weak_tests {
100129

101130
// Close session1 and drop it.
102131
session1.close().await.unwrap();
132+
133+
// Both weak references should STILL be valid.
134+
assert!(weak1.upgrade().is_some());
135+
assert!(weak2.upgrade().is_some());
136+
103137
drop(session1);
138+
104139
assert!(weak1.upgrade().is_none());
105140
assert!(weak2.upgrade().is_some());
106141

107142
// Close session2 and drop it.
108143
session2.close().await.unwrap();
144+
145+
assert!(weak1.upgrade().is_none());
146+
assert!(weak2.upgrade().is_some());
147+
109148
drop(session2);
149+
150+
assert!(weak1.upgrade().is_none());
110151
assert!(weak2.upgrade().is_none());
111152
}
112153

113154
// Helper to create a client and a peer that are connected.
114-
async fn create_clique(num: usize) -> Vec<Session> {
115-
let port = 7448; // fixed port for simplicity; adjust if needed
155+
async fn create_clique(
156+
num: usize,
157+
mode: WhatAmI,
158+
gossip: bool,
159+
peer_mode: &str,
160+
) -> Vec<Session> {
161+
let port_offset = calc_offset(mode, gossip, peer_mode);
162+
let port = 12450 + port_offset;
116163
let peer_endpoint = format!("tcp/127.0.0.1:{}", port);
117-
let main =
118-
create_session(WhatAmI::Peer, vec![peer_endpoint.parse().unwrap()], vec![]).await;
119-
sleep(Duration::from_millis(100)).await;
164+
let main = create_session(
165+
mode,
166+
vec![peer_endpoint.parse().unwrap()],
167+
vec![],
168+
gossip,
169+
peer_mode,
170+
)
171+
.await;
120172

121173
let mut result = vec![main];
122174

123175
for _ in 0..num {
124-
let client =
125-
create_session(WhatAmI::Peer, vec![], vec![peer_endpoint.parse().unwrap()]).await;
176+
let client = create_session(
177+
mode,
178+
vec![],
179+
vec![peer_endpoint.parse().unwrap()],
180+
gossip,
181+
peer_mode,
182+
)
183+
.await;
126184
result.push(client);
127185
}
128186

129-
sleep(Duration::from_millis(1000)).await;
130-
131187
result
132188
}
133189

134-
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
135-
async fn test_interconnected_sessions_runtime_state_weak() {
136-
let num_clients = 3;
137-
let sessions = create_clique(num_clients).await;
190+
fn calc_offset(mode: WhatAmI, gossip: bool, peer_mode: &str) -> u16 {
191+
let mode = match mode {
192+
WhatAmI::Router => 0,
193+
WhatAmI::Peer => 1,
194+
WhatAmI::Client => 2,
195+
};
196+
197+
let gossip = match gossip {
198+
true => 0,
199+
false => 1,
200+
};
201+
202+
let peer_mode = if peer_mode == "linkstate" {
203+
0
204+
} else if peer_mode == "peer_to_peer" {
205+
1
206+
} else {
207+
panic!("Unsupported");
208+
};
209+
210+
mode + gossip * 3 + peer_mode * 3 * 2
211+
}
212+
213+
#[test_matrix(
214+
[WhatAmI::Peer],
215+
[true, false],
216+
["peer_to_peer", "linkstate"]
217+
)]
218+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
219+
async fn dead_after_drop_many_times(mode: WhatAmI, gossip: bool, peer_mode: &str) {
220+
let num_clients = 10;
221+
let sessions = create_clique(num_clients, mode, gossip, peer_mode).await;
138222

139223
// All weak references should be alive initially.
140224
for session in &sessions {
225+
assert!(session.inner_weak().upgrade().is_some());
141226
let runtime = get_static_runtime(session).expect("runtime should be present");
142227
assert!(runtime.state_weak().upgrade().is_some());
143228
}
@@ -147,11 +232,15 @@ mod runtime_state_weak_tests {
147232
let runtime_state = get_static_runtime(&session)
148233
.expect("runtime should be present")
149234
.state_weak();
235+
let session_inner = session.inner_weak();
236+
237+
assert!(session_inner.upgrade().is_some());
150238
assert!(runtime_state.upgrade().is_some());
151239

152240
session.close().await.unwrap();
153241
drop(session);
154242

243+
assert!(session_inner.upgrade().is_none());
155244
assert!(runtime_state.upgrade().is_none());
156245
}
157246
}

0 commit comments

Comments
 (0)