Skip to content

Commit 5da6ef2

Browse files
committed
add finish mechanism
1 parent cf16104 commit 5da6ef2

File tree

1 file changed

+73
-8
lines changed

1 file changed

+73
-8
lines changed

src/file/uploadfile.rs

Lines changed: 73 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ use tokio_util::codec::{BytesCodec, FramedRead};
2020
use variantly::Variantly;
2121

2222
use crate::prelude::CallbackFun;
23+
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
24+
use std::sync::Arc;
2325

2426
lazy_static! {
2527
static ref BUFFER_SIZE: usize = std::env::var("DVCLI_BUFFER_SIZE")
@@ -212,19 +214,82 @@ impl UploadFile {
212214
) -> Result<Body, Box<dyn Error>> {
213215
let (tx, rx) = tokio::sync::mpsc::channel(100);
214216

215-
// Spawn background task to handle only the first connection
217+
// Spawn another unix listener, that when called will introduce a cancellation
218+
let finish_address = format!(
219+
"{}.cancel",
220+
listener
221+
.local_addr()
222+
.unwrap()
223+
.as_pathname()
224+
.unwrap()
225+
.to_string_lossy()
226+
);
227+
let finish_listener = UnixListener::bind(finish_address).unwrap();
228+
229+
// Shared state for cancellation and connection tracking
230+
let should_cancel = Arc::new(AtomicBool::new(false));
231+
let active_connections = Arc::new(AtomicUsize::new(0));
232+
233+
// Clone references for the background task
234+
let should_cancel_clone = Arc::clone(&should_cancel);
235+
let active_connections_clone = Arc::clone(&active_connections);
236+
237+
// Spawn background task to handle connections concurrently
216238
tokio::spawn(async move {
217239
let mut incoming = UnixListenerStream::new(listener);
240+
let mut finish_incoming = UnixListenerStream::new(finish_listener);
241+
242+
loop {
243+
tokio::select! {
244+
// Handle regular connections
245+
conn_result = incoming.next() => {
246+
match conn_result {
247+
Some(Ok(stream)) => {
248+
// Check if we should accept new connections
249+
if should_cancel_clone.load(Ordering::Relaxed) {
250+
continue; // Don't accept new connections after cancellation
251+
}
252+
253+
let tx_clone = tx.clone();
254+
let active_connections_clone2 = Arc::clone(&active_connections_clone);
255+
256+
// Increment active connection count
257+
active_connections_clone2.fetch_add(1, Ordering::Relaxed);
258+
259+
// Spawn a separate task for each connection
260+
tokio::spawn(async move {
261+
Self::connection_streamer(stream, tx_clone).await;
262+
// Decrement when connection ends
263+
active_connections_clone2.fetch_sub(1, Ordering::Relaxed);
264+
});
265+
}
266+
Some(Err(_)) => continue, // Skip failed connections
267+
None => break, // No more incoming connections
268+
}
269+
}
218270

219-
// Wait for the first connection only
220-
if let Some(conn_result) = incoming.next().await {
221-
if let Ok(stream) = conn_result {
222-
// Handle the first connection directly and wait for it to complete
223-
Self::connection_streamer(stream, tx.clone()).await;
271+
// Handle finish listener connections
272+
finish_result = finish_incoming.next() => {
273+
match finish_result {
274+
Some(Ok(_)) => {
275+
// Set cancellation flag
276+
should_cancel_clone.store(true, Ordering::Relaxed);
277+
278+
// Wait for all active connections to finish
279+
while active_connections_clone.load(Ordering::Relaxed) > 0 {
280+
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
281+
}
282+
283+
// Drop the tx to signal end of stream
284+
drop(tx);
285+
break;
286+
}
287+
Some(Err(_)) => continue,
288+
None => break,
289+
}
290+
}
224291
}
225292
}
226-
// After the first connection completes, drop tx to signal EOF
227-
drop(tx);
228293
});
229294

230295
// Reuse existing receiver stream logic

0 commit comments

Comments
 (0)