Skip to content

Commit 4d6efc6

Browse files
committed
RSCBC-249: move close-watcher setup into successful connection path
Move the on_close channel and close-watcher task in begin_client_build from being created once before the retry loop to being created inside the success path of each retry iteration. Previously, if a connection attempt failed during bootstrap (e.g. auth or bucket select failure), StdKvClient::new would call kv_cli.close(), which fired the on_close_tx signal. This prematurely woke the close-watcher task, which saw no client in state and exited permanently. When the retry loop eventually succeeded and that client later disconnected, no close-watcher existed to detect it and trigger reconnection. Now each retry iteration creates its own channel and client ID, and the close-watcher is only spawned after a successful connection. Failed attempts' channels are simply dropped with no side effects.
1 parent 7725524 commit 4d6efc6

File tree

1 file changed

+54
-49
lines changed

1 file changed

+54
-49
lines changed

sdk/couchbase-core/src/kvclient_babysitter.rs

Lines changed: 54 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -226,17 +226,13 @@ impl<K: KvClient + 'static> StdKvClientBabysitter<K> {
226226

227227
fn begin_client_build(client_opts: Arc<ClientThreadOptions<K>>) {
228228
let state = client_opts.slow_state.clone();
229-
let client_id = Uuid::new_v4().to_string();
230229

231230
let desired_config = {
232231
let guard = state.lock().unwrap();
233232

234233
guard.desired_config.clone()
235234
};
236235

237-
let on_close_opts = client_opts.clone();
238-
let (on_close_tx, mut on_close_rx) = mpsc::channel(1);
239-
240236
let opts = KvClientOptions {
241237
address: desired_config.target.clone(),
242238
authenticator: desired_config.auth.clone(),
@@ -251,55 +247,12 @@ impl<K: KvClient + 'static> StdKvClientBabysitter<K> {
251247
.unsolicited_packet_tx
252248
.clone(),
253249
orphan_handler: client_opts.static_kv_client_options.orphan_handler.clone(),
254-
on_close_tx: Some(on_close_tx),
250+
on_close_tx: None,
255251
disable_decompression: client_opts.static_kv_client_options.disable_decompression,
256-
id: client_id.clone(),
257252
tracing: client_opts.tracing.clone(),
253+
id: String::new(),
258254
};
259255

260-
tokio::spawn(async move {
261-
select! {
262-
_ = on_close_opts.shutdown_token.cancelled() => {
263-
debug!("Client babysitter {} shutdown during on_close wait", &on_close_opts.id);
264-
return;
265-
}
266-
_ = on_close_rx.recv() => {
267-
debug!("Client babysitter {} detected client {} closed", &on_close_opts.id, &client_id);
268-
}
269-
};
270-
271-
{
272-
let mut guard = on_close_opts.slow_state.lock().unwrap();
273-
guard.is_building = false;
274-
if let Some(cli) = &guard.client {
275-
if cli.id() != client_id {
276-
return;
277-
}
278-
} else {
279-
return;
280-
}
281-
282-
guard.client = None;
283-
on_close_opts
284-
.fast_client
285-
.store(Arc::new(StdKvClientBabysitterClientState { client: None }));
286-
}
287-
288-
if let Err(e) = on_close_opts
289-
.state_change_handler
290-
.send((on_close_opts.id.clone(), None))
291-
{
292-
debug!(
293-
"Client babysitter {} failed to notify of closed client {}: {}",
294-
&on_close_opts.id, &client_id, e
295-
);
296-
}
297-
298-
if !on_close_opts.on_demand_connect {
299-
Self::maybe_begin_client(on_close_opts.clone());
300-
}
301-
});
302-
303256
tokio::spawn(async move {
304257
loop {
305258
let connect_err = {
@@ -322,6 +275,9 @@ impl<K: KvClient + 'static> StdKvClientBabysitter<K> {
322275
return;
323276
};
324277

278+
let client_id = Uuid::new_v4().to_string();
279+
let (on_close_tx, mut on_close_rx) = mpsc::channel(1);
280+
325281
let opts = {
326282
let mut guard = state.lock().unwrap();
327283
guard.current_state = ConnectionState::Connecting;
@@ -330,6 +286,8 @@ impl<K: KvClient + 'static> StdKvClientBabysitter<K> {
330286
opts.authenticator = guard.desired_config.auth.clone();
331287
opts.address = guard.desired_config.target.clone();
332288
opts.selected_bucket = guard.desired_config.selected_bucket.clone();
289+
opts.on_close_tx = Some(on_close_tx);
290+
opts.id = client_id.clone();
333291

334292
opts
335293
};
@@ -383,6 +341,53 @@ impl<K: KvClient + 'static> StdKvClientBabysitter<K> {
383341
);
384342
}
385343

344+
// Spawn the close-watcher only after a successful connection.
345+
// This prevents failed bootstrap attempts from prematurely
346+
// consuming the close signal and killing the watcher.
347+
let on_close_opts = client_opts.clone();
348+
tokio::spawn(async move {
349+
select! {
350+
_ = on_close_opts.shutdown_token.cancelled() => {
351+
debug!("Client babysitter {} shutdown during on_close wait", &on_close_opts.id);
352+
return;
353+
}
354+
_ = on_close_rx.recv() => {
355+
debug!("Client babysitter {} detected client {} closed", &on_close_opts.id, &client_id);
356+
}
357+
};
358+
359+
{
360+
let mut guard = on_close_opts.slow_state.lock().unwrap();
361+
guard.is_building = false;
362+
if let Some(cli) = &guard.client {
363+
if cli.id() != client_id {
364+
return;
365+
}
366+
} else {
367+
return;
368+
}
369+
370+
guard.client = None;
371+
on_close_opts.fast_client.store(Arc::new(
372+
StdKvClientBabysitterClientState { client: None },
373+
));
374+
}
375+
376+
if let Err(e) = on_close_opts
377+
.state_change_handler
378+
.send((on_close_opts.id.clone(), None))
379+
{
380+
debug!(
381+
"Client babysitter {} failed to notify of closed client {}: {}",
382+
&on_close_opts.id, &client_id, e
383+
);
384+
}
385+
386+
if !on_close_opts.on_demand_connect {
387+
Self::maybe_begin_client(on_close_opts.clone());
388+
}
389+
});
390+
386391
return;
387392
}
388393
Err(e) => {

0 commit comments

Comments
 (0)