Skip to content

Commit 7ce043c

Browse files
Avoid use-after-free with stdexec::run_loop (#1746)
* Avoid use-after-free with stdexec::run_loop. We need to synchronize returning from `__run_loop_base::run` with potentially concurrent calls to `__run_loop_base::finish`. This is done by introducing a counter of in-flight tasks, so that `run` returns only after all of them have completed. See also #1742 for additional information. * Fix the use-after-free: - Properly synchronize `finish` with `run` using task counts - Add a sync_wait Relacy test for verification - Adapt the atomic wrappers to account for the missing std::atomic_ref in Relacy * Apply suggestions from code review --------- Co-authored-by: Eric Niebler <[email protected]>
1 parent 970dbac commit 7ce043c

File tree

4 files changed

+78
-7
lines changed

4 files changed

+78
-7
lines changed

include/stdexec/__detail/__atomic.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ namespace stdexec::__std {
5858
using std::atomic_thread_fence;
5959
using std::atomic_signal_fence;
6060

61-
# if __cpp_lib_atomic_ref >= 2018'06L
61+
# if __cpp_lib_atomic_ref >= 2018'06L && !defined(STDEXEC_RELACY)
6262
using std::atomic_ref;
6363
# else
6464
inline constexpr int __atomic_flag_map[] = {

include/stdexec/__detail/__run_loop.hpp

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
#include "__schedulers.hpp"
2727

2828
#include "__atomic.hpp"
29+
#include "__config.hpp"
30+
#include <cstddef>
2931

3032
namespace stdexec {
3133
/////////////////////////////////////////////////////////////////////////////
@@ -34,24 +36,40 @@ namespace stdexec {
3436
public:
3537
__run_loop_base() = default;
3638

39+
// Destructor: checks (via STDEXEC_ASSERT) that the task counter is back to
// zero, i.e. that no task counted by start() or finish() is still outstanding
// when the loop object goes away.
~__run_loop_base() noexcept {
40+
STDEXEC_ASSERT(__task_count_.load(__std::memory_order_acquire) == 0);
41+
}
42+
3743
// Drives the loop on the calling thread: executes queued tasks until finish()
// has set __finishing_, then drains whatever is left and only returns once the
// task counter has dropped to zero.
STDEXEC_ATTRIBUTE(host, device) void run() noexcept {
3844
// Execute work items until the __finishing_ flag is set:
3945
while (!__finishing_.load(__std::memory_order_acquire)) {
4046
__queue_.wait_for_item();
4147
__execute_all();
4248
}
4349
// Drain the queue, taking care to execute any tasks that get added while
44-
// executing the remaining tasks:
45-
while (__execute_all())
50+
// executing the remaining tasks. Also keep looping while the task counter
// is non-zero, so that tasks (or finish() calls) still in flight complete
// before this function returns:
51+
while (__execute_all() || __task_count_.load(__std::memory_order_acquire) > 0)
4652
;
4753
}
4854

4955
// Requests that run() stop. Only the first call (the one that flips
// __finishing_ from false to true) pushes the no-op wake-up task; later calls
// just undo their own counter increment.
STDEXEC_ATTRIBUTE(host, device) void finish() noexcept {
56+
// Increment the task count to avoid lifetime issues: this prevents
57+
// a use-after-free if finish is called from a different thread than run.
58+
// The counter is incremented by two to prevent the run loop from
59+
// exiting before the noop task has been scheduled.
60+
__task_count_.fetch_add(2, __std::memory_order_release);
5061
if (!__finishing_.exchange(true, __std::memory_order_acq_rel)) {
5162
// Push an empty work item to the queue to wake up the consuming thread
52-
// and let it finish:
63+
// and let it finish.
64+
// The count will be decremented once the task executes.
5365
__queue_.push(&__noop_task);
66+
// The task has been pushed, so subtract one again; the other decrement
67+
// happens when the noop task is executed.
68+
__task_count_.fetch_sub(1, __std::memory_order_release);
69+
return;
5470
}
71+
// __finishing_ was already set, so no task was pushed by this call.
// Decrement the count by two to undo the increment above.
72+
__task_count_.fetch_sub(2, __std::memory_order_release);
5573
}
5674

5775
struct __task : __immovable {
@@ -73,6 +91,7 @@ namespace stdexec {
7391

7492
template <class _Rcvr>
7593
struct __opstate_t : __task {
94+
__std::atomic<std::size_t>* __task_count_;
7695
__atomic_intrusive_queue<&__task::__next_>* __queue_;
7796
_Rcvr __rcvr_;
7897

@@ -89,14 +108,17 @@ namespace stdexec {
89108

90109
// Builds an operation state from the three things it needs:
//   __task_count - pointer to the atomic counter that start() increments
//   __queue      - pointer to the intrusive queue that start() pushes onto
//   __rcvr       - the receiver, taken by value and stored in __rcvr_
// The base __task is initialised with &__execute_impl (defined outside this view).
STDEXEC_ATTRIBUTE(host, device)
91110
constexpr explicit __opstate_t(
111+
__std::atomic<std::size_t>* __task_count,
92112
__atomic_intrusive_queue<&__task::__next_>* __queue,
93113
_Rcvr __rcvr)
94114
: __task{&__execute_impl}
115+
, __task_count_(__task_count)
95116
, __queue_{__queue}
96117
, __rcvr_{static_cast<_Rcvr&&>(__rcvr)} {
97118
}
98119

99120
// Enqueues this operation on the run loop's queue.
STDEXEC_ATTRIBUTE(host, device) constexpr void start() noexcept {
121+
// Count the task before it becomes visible in the queue, so the counter is
// already non-zero by the time the task can be picked up and executed.
__task_count_->fetch_add(1, __std::memory_order_release);
100122
__queue_->push(this);
101123
}
102124
};
@@ -112,20 +134,25 @@ namespace stdexec {
112134
return false; // No tasks to execute.
113135
}
114136

137+
std::size_t __task_count = 0;
138+
115139
do {
116140
// Take care to increment the iterator before executing the task,
117141
// because __execute() may invalidate the current node.
118142
auto __prev = __it++;
119143
(*__prev)->__execute();
144+
++__task_count;
120145
} while (__it != __queue.end());
121146

122147
__queue.clear();
148+
__task_count_.fetch_sub(__task_count, __std::memory_order_release);
123149
return true;
124150
}
125151

126152
// Task body that intentionally does nothing; it is the execute function of
// __noop_task, which finish() pushes to wake up the consuming thread.
STDEXEC_ATTRIBUTE(host, device) static void __noop_(__task*) noexcept {
127153
}
128154

155+
// Number of tasks currently accounted for: incremented by __opstate_t::start()
// and finish(), decremented by __execute_all() and finish(). run() does not
// return while this is non-zero.
__std::atomic<std::size_t> __task_count_{0};
129156
// Set to true by the first call to finish(); polled by run().
__std::atomic<bool> __finishing_{false};
130157
// Queue of pending tasks; start() and finish() push onto it.
__atomic_intrusive_queue<&__task::__next_> __queue_{};
131158
// Empty task that finish() pushes to wake up the thread inside run().
__task __noop_task{&__noop_};
@@ -186,7 +213,7 @@ namespace stdexec {
186213
// Connects this sender to `__rcvr`, returning an operation state that holds
// pointers to the loop's task counter and queue together with the receiver.
template <class _Rcvr>
187214
STDEXEC_ATTRIBUTE(nodiscard, host, device)
188215
constexpr auto connect(_Rcvr __rcvr) const noexcept -> __opstate_t<_Rcvr> {
189-
return __opstate_t<_Rcvr>{&__loop_->__queue_, static_cast<_Rcvr&&>(__rcvr)};
216+
return __opstate_t<_Rcvr>{&__loop_->__task_count_, &__loop_->__queue_, static_cast<_Rcvr&&>(__rcvr)};
190217
}
191218

192219
STDEXEC_ATTRIBUTE(nodiscard, host, device)

test/rrd/Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
# User-customizable variables:
22
CXX ?= c++
33
CXX_STD ?= c++20
4-
CXXFLAGS ?= -I relacy -I relacy/relacy/fakestd -O1 -std=$(CXX_STD) -I ../../include -I ../../test -g
4+
CXXFLAGS ?= -DSTDEXEC_RELACY -I relacy -I relacy/relacy/fakestd -O1 -std=$(CXX_STD) -I ../../include -I ../../test -g
55
DEPFLAGS ?= -MD -MF $(@).d -MP -MT $(@)
66
build_dir = build
77

88
.SECONDARY:
99

10-
test_programs = split async_scope
10+
test_programs = split async_scope sync_wait
1111

1212
test_exe_files = $(foreach name,$(test_programs),$(build_dir)/$(name))
1313

test/rrd/sync_wait.cpp

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright (c) 2025 NVIDIA Corporation
3+
* Copyright (c) 2025 Chris Cotter
4+
*
5+
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* https://llvm.org/LICENSE.txt
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include "../../relacy/relacy_std.hpp"
19+
20+
#include <stdexec/execution.hpp>
21+
#include <exec/static_thread_pool.hpp>
22+
23+
namespace ex = stdexec;
24+
25+
// Relacy test suite with one simulated thread: it schedules a small
// computation on a single-threaded static_thread_pool and blocks in
// sync_wait until the result is available.
struct sync_wait_bg_thread : rl::test_suite<sync_wait_bg_thread, 1> {
  // NOTE(review): presumably accounts for the pool's worker thread, which is
  // created while the test body runs - confirm against the Relacy docs.
  static size_t const dynamic_thread_count = 1;

  void thread(unsigned) {
    exec::static_thread_pool pool{1};

    // Build the pipeline step by step: schedule onto the pool, then produce 42.
    auto scheduler = pool.get_scheduler();
    auto work = ex::then(ex::schedule(scheduler), [] { return 42; });

    // Block the simulated thread until the background work completes.
    auto outcome = ex::sync_wait(work);
    auto [val] = outcome.value();
    RL_ASSERT(val == 42);
  }
};
36+
37+
// Runs the sync_wait_bg_thread suite under Relacy's random scheduler.
// Returns 0 when every simulated iteration passes and 1 otherwise, so that a
// detected failure is visible to whatever invokes the test binary.
auto main() -> int {
  rl::test_params p;
  p.iteration_count = 50000;
  p.execution_depth_limit = 10000;
  p.search_type = rl::random_scheduler_type;

  // rl::simulate reports success as a bool; do not discard it.
  bool const passed = rl::simulate<sync_wait_bg_thread>(p);
  return passed ? 0 : 1;
}

0 commit comments

Comments
 (0)