Skip to content

Commit 380da74

Browse files
committed
comments
1 parent 52a06a4 commit 380da74

File tree

1 file changed

+11
-16
lines changed

1 file changed

+11
-16
lines changed

datafusion/physical-plan/src/spill/spill_pool.rs

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -91,35 +91,31 @@ impl SpillPoolShared {
9191

9292
/// Tracks the number of live [`SpillPoolWriter`] clones.
9393
///
94-
/// Cloning increments the count; dropping decrements it.
95-
/// [`WriterCount::is_last`] returns `true` when called from the final clone,
96-
/// which the writer uses to decide whether to finalize the spill pool.
94+
/// Cloning increments the count. [`WriterCount::decrement`] atomically
95+
/// decrements the count and reports whether the caller was the last clone.
9796
struct WriterCount(Arc<AtomicUsize>);
9897

9998
impl WriterCount {
10099
fn new() -> Self {
101100
Self(Arc::new(AtomicUsize::new(1)))
102101
}
103102

104-
/// Returns `true` if this is the only remaining clone.
105-
fn is_last(&self) -> bool {
106-
self.0.load(Ordering::Acquire) == 1
103+
/// Decrements the count and returns `true` if this was the last clone.
104+
///
105+
/// This is a single atomic operation, so concurrent drops cannot both
106+
/// observe themselves as "last".
107+
fn decrement(&self) -> bool {
108+
self.0.fetch_sub(1, Ordering::SeqCst) == 1
107109
}
108110
}
109111

110112
impl Clone for WriterCount {
111113
fn clone(&self) -> Self {
112-
self.0.fetch_add(1, Ordering::Relaxed);
114+
self.0.fetch_add(1, Ordering::SeqCst);
113115
Self(Arc::clone(&self.0))
114116
}
115117
}
116118

117-
impl Drop for WriterCount {
118-
fn drop(&mut self) {
119-
self.0.fetch_sub(1, Ordering::Release);
120-
}
121-
}
122-
123119
/// Writer for a spill pool. Provides coordinated write access with FIFO semantics.
124120
///
125121
/// Created by [`channel`]. See that function for architecture diagrams and usage examples.
@@ -266,10 +262,9 @@ impl SpillPoolWriter {
266262

267263
impl Drop for SpillPoolWriter {
268264
fn drop(&mut self) {
269-
if !self.writer_count.is_last() {
265+
if !self.writer_count.decrement() {
270266
// Other writer clones are still active; do not finalize or
271-
// signal EOF to readers. `self.writer_count` is decremented
272-
// automatically when this `Drop` returns.
267+
// signal EOF to readers.
273268
return;
274269
}
275270

0 commit comments

Comments
 (0)