Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 61 additions & 16 deletions include/async/schedulers/trigger_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <async/schedulers/requeue_policy.hpp>

#include <stdx/atomic.hpp>
#include <stdx/concepts.hpp>
#include <stdx/ct_string.hpp>
#include <stdx/intrusive_list.hpp>
#include <stdx/type_traits.hpp>
Expand Down Expand Up @@ -33,6 +34,42 @@ template <typename... Args> struct trigger_task {
}
};

namespace run_policy {
struct base {
template <typename M, typename... Args>
static auto run(auto &&q, auto &count, Args &&...args) {
auto &task = q.front();
conc::call_in_critical_section<M>([&]() {
q.pop_front();
task.pending = false;
});
task.run(std::forward<Args>(args)...);
--count;
}
};

struct one : base {
template <typename, typename M, typename... Args>
static auto run(auto &&tasks, auto &count, Args &&...args) {
decltype(auto) q =
requeue_policy::immediate::template get_queue<0, M>(tasks);
if (not std::empty(q)) {
base::run<M>(q, count, std::forward<Args>(args)...);
}
}
};

struct all : base {
template <typename RQP, typename M>
static auto run(auto &&tasks, auto &count, auto &&...args) {
decltype(auto) q = RQP::template get_queue<0, M>(tasks);
while (not std::empty(q)) {
base::run<M>(q, count, args...);
}
}
};
} // namespace run_policy

template <typename Name, typename... Args> struct trigger_manager {
using task_t = trigger_task<Args...>;

Expand Down Expand Up @@ -64,18 +101,12 @@ template <typename Name, typename... Args> struct trigger_manager {
});
}

template <typename RQP = requeue_policy::deferred>
auto run(Args const &...args) -> void {
decltype(auto) q = RQP::template get_queue<0, mutex>(tasks);
while (not std::empty(q)) {
auto &task = q.front();
conc::call_in_critical_section<mutex>([&]() {
q.pop_front();
task.pending = false;
});
task.run(args...);
--task_count;
}
template <typename RQP = requeue_policy::deferred,
typename RunPolicy = run_policy::all, typename... As>
auto run(As &&...args) -> void {
static_assert((... and std::same_as<Args, std::remove_cvref_t<As>>));
RunPolicy::template run<RQP, mutex>(tasks, task_count,
std::forward<As>(args)...);
}

[[nodiscard]] auto empty() const -> bool { return task_count == 0; }
Expand All @@ -85,15 +116,29 @@ template <typename Name, typename... Args>
inline auto triggers = trigger_manager<Name, Args...>{};

template <typename Name, typename RQP = requeue_policy::deferred,
typename... Args>
typename RunPolicy = run_policy::all, typename... Args>
auto run_triggers(Args &&...args) -> void {
triggers<Name, std::remove_cvref_t<Args>...>.template run<RQP>(
triggers<Name, std::remove_cvref_t<Args>...>.template run<RQP, RunPolicy>(
std::forward<Args>(args)...);
}

template <stdx::ct_string Name, typename RQP = requeue_policy::deferred,
typename... Args>
typename RunPolicy = run_policy::all, typename... Args>
auto run_triggers(Args &&...args) -> void {
run_triggers<stdx::cts_t<Name>, RQP>(std::forward<Args>(args)...);
run_triggers<stdx::cts_t<Name>, RQP, RunPolicy>(
std::forward<Args>(args)...);
}

template <typename Name, typename RQP = requeue_policy::deferred,
typename... Args>
auto run_one_trigger(Args &&...args) -> void {
run_triggers<Name, RQP, run_policy::one>(std::forward<Args>(args)...);
}

template <stdx::ct_string Name, typename RQP = requeue_policy::deferred,
typename... Args>
auto run_one_trigger(Args &&...args) -> void {
run_triggers<stdx::cts_t<Name>, RQP, run_policy::one>(
std::forward<Args>(args)...);
}
} // namespace async
71 changes: 71 additions & 0 deletions test/schedulers/trigger_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,74 @@ TEST_CASE("trigger_scheduler produces set_stopped debug signal",
CHECK(debug_events ==
std::vector{"op sched start"s, "op sched set_stopped"s});
}

TEMPLATE_TEST_CASE("trigger_scheduler runs all triggers by default",
"[trigger_scheduler]", decltype([] {})) {
constexpr auto name = type_string<TestType>;
auto s = async::trigger_scheduler<name>{};

int var1{};
async::sender auto sndr1 =
async::start_on(s, async::just_result_of([&] { var1 = 42; }));
CHECK(async::start_detached(sndr1));

int var2{};
async::sender auto sndr2 =
async::start_on(s, async::just_result_of([&] { var2 = 42; }));
CHECK(async::start_detached(sndr2));

async::run_triggers<name>();
CHECK(var1 == 42);
CHECK(var2 == 42);
CHECK(async::triggers<stdx::cts_t<name>>.empty());
}

TEMPLATE_TEST_CASE("trigger_scheduler can be policized for multi-stimulus",
"[trigger_scheduler]", decltype([] {})) {
constexpr auto name = type_string<TestType>;
auto s = async::trigger_scheduler<name>{};

int var1{};
async::sender auto sndr1 =
async::start_on(s, async::just_result_of([&] { var1 = 42; }));
CHECK(async::start_detached(sndr1));

int var2{};
async::sender auto sndr2 =
async::start_on(s, async::just_result_of([&] { var2 = 17; }));
CHECK(async::start_detached(sndr2));

async::run_one_trigger<name>();
CHECK(var1 == 42);
CHECK(not async::triggers<stdx::cts_t<name>>.empty());

async::run_one_trigger<name>();
CHECK(var2 == 17);
CHECK(async::triggers<stdx::cts_t<name>>.empty());
}

TEMPLATE_TEST_CASE("triggered task can start a new task on the same trigger",
"[trigger_scheduler]", decltype([] {})) {
constexpr auto name = type_string<TestType>;
auto s = async::trigger_scheduler<name>{};

auto start = [&](auto f) {
async::sender auto sndr = async::start_on(s, async::just_result_of(f));
CHECK(async::start_detached(sndr));
};

int var{};
start([&] {
if (var++ == 0) {
start([&] { var = 42; });
}
});

async::run_one_trigger<name>();
CHECK(var == 1);
CHECK(not async::triggers<stdx::cts_t<name>>.empty());

async::run_one_trigger<name>();
CHECK(var == 42);
CHECK(async::triggers<stdx::cts_t<name>>.empty());
}