diff --git a/Cargo.lock b/Cargo.lock index ae811e0..6e4be78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,15 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "aho-corasick" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +dependencies = [ + "memchr", +] + [[package]] name = "anstream" version = "0.6.18" @@ -66,6 +75,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "anyhow" +version = "1.0.98" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" + [[package]] name = "atomic-waker" version = "1.1.2" @@ -157,6 +172,15 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "colored" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fde0e0ec90c9dfb3b4b1a0891a7dcd0e2bffde2f7efed5fe7c9bb00e5bfb915e" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "crossbeam-queue" version = "0.3.12" @@ -178,6 +202,16 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +[[package]] +name = "env_filter" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0" +dependencies = [ + "log", + "regex", +] + [[package]] name = "errno" version = "0.3.10" @@ -193,6 +227,7 @@ name = "fastimer" version = "0.8.0" dependencies = [ "log", + "logforth", "pin-project", "tokio", ] @@ -243,6 +278,47 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "jiff" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ad87c89110f55e4cd4dc2893a9790820206729eaf221555f742d540b0724a0" +dependencies = [ + "jiff-static", + "jiff-tzdb-platform", + "log", + "portable-atomic", + "portable-atomic-util", + "serde", + "windows-sys 0.59.0", +] + +[[package]] +name = "jiff-static" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d076d5b64a7e2fe6f0743f02c43ca4a6725c0f904203bfe276a5b3e793103605" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "jiff-tzdb" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1283705eb0a21404d2bfd6eef2a7593d240bc42a0bdb39db0ad6fa2ec026524" + +[[package]] +name = "jiff-tzdb-platform" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "875a5a69ac2bab1a891711cf5eccbec1ce0341ea805560dcd90b7a2e925132e8" +dependencies = [ + "jiff-tzdb", +] + [[package]] name = "libc" version = "0.2.169" @@ -267,9 +343,22 @@ dependencies = [ [[package]] name = "log" -version = "0.4.25" +version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f" +checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" + +[[package]] +name = "logforth" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7d12336d4771854b6fdbf75bab8257e62d4221d1f9d7187fc254b29aa3bd23b" +dependencies = [ + "anyhow", + "colored", + "env_filter", + "jiff", + "log", +] [[package]] name = "memchr" @@ -361,20 +450,35 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "portable-atomic" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e" + +[[package]] +name = "portable-atomic-util" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" +dependencies = [ + "portable-atomic", +] + [[package]] name = "proc-macro2" -version = "1.0.92" +version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0" +checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.37" +version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af" +checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" dependencies = [ "proc-macro2", ] @@ -388,6 +492,35 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -413,6 +546,26 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "serde" +version = "1.0.219" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.219" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -446,9 +599,9 @@ checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" [[package]] name = "syn" -version = "2.0.90" +version = "2.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "919d3b74a5dd0ccd15aeb8f93e7006bd9e14c295087c9896a110f490752bcf31" +checksum = "b09a44accad81e1ba1cd74a32461ba89dee89095ba17b32f5d03683b1b1fc2a0" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index b77f8a3..8bbe615 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ members = ["fastimer", "fastimer-driver", "fastimer-tokio", "xtask"] resolver = "2" [workspace.package] -edition = "2021" +edition = "2024" homepage = "https://github.com/fast/fastimer" license = "Apache-2.0" readme = "README.md" diff --git a/fastimer-driver/src/heap.rs b/fastimer-driver/src/heap.rs index 1e6abb9..7f69113 100644 --- a/fastimer-driver/src/heap.rs +++ b/fastimer-driver/src/heap.rs @@ -14,9 +14,9 @@ use std::collections::BinaryHeap; use std::ops::ControlFlow; +use std::sync::Arc; use std::sync::atomic; use std::sync::atomic::AtomicBool; -use std::sync::Arc; use std::time::Duration; use std::time::Instant; diff --git a/fastimer-driver/src/lib.rs b/fastimer-driver/src/lib.rs index 051334b..0cf2846 100644 --- a/fastimer-driver/src/lib.rs +++ b/fastimer-driver/src/lib.rs @@ -20,9 +20,9 @@ use std::cmp; use std::future::Future; use std::pin::Pin; +use std::sync::Arc; use std::sync::atomic; use std::sync::atomic::AtomicBool; -use std::sync::Arc; use std::task::Context; use std::task::Poll; use std::time::Duration; @@ -30,8 +30,8 @@ use std::time::Instant; use atomic_waker::AtomicWaker; use crossbeam_queue::SegQueue; -use fastimer::make_instant_from_now; use fastimer::MakeDelay; +use fastimer::make_instant_from_now; use parking::Unparker; mod heap; diff --git a/fastimer-driver/tests/integration.rs b/fastimer-driver/tests/integration.rs index 1176b98..6a26fe3 100644 --- a/fastimer-driver/tests/integration.rs +++ b/fastimer-driver/tests/integration.rs @@ -29,10 +29,12 @@ fn assert_duration_eq(actual: Duration, expected: Duration) { fn test_binary_heap_driver() { let (mut driver, context, shutdown) = binary_heap_driver(); let (tx, rx) = std::sync::mpsc::channel(); - std::thread::spawn(move || loop { - if driver.turn().is_break() { - tx.send(()).unwrap(); - break; + std::thread::spawn(move || { + loop { + if driver.turn().is_break() { + tx.send(()).unwrap(); + break; + } } }); diff --git a/fastimer/Cargo.toml b/fastimer/Cargo.toml index ed21caa..afda62b 100644 --- a/fastimer/Cargo.toml +++ b/fastimer/Cargo.toml @@ -39,6 +39,8 @@ pin-project = { version = "1.1.9" } log = { version = "0.4.22", features = ["kv"], optional = true } [dev-dependencies] +log = { version = "0.4.22", features = ["kv"] } +logforth = { version = "0.24.0", features = ["colored"] } tokio = { workspace = true, features = ["full"] } [lints] diff --git a/fastimer/src/interval.rs b/fastimer/src/interval.rs index 0848524..7fca84e 100644 --- a/fastimer/src/interval.rs +++ b/fastimer/src/interval.rs @@ -17,17 +17,17 @@ // // [1] https://github.com/tokio-rs/tokio/blob/b8ac94ed/tokio/src/time/interval.rs -use std::future::poll_fn; use std::future::Future; +use std::future::poll_fn; use std::pin::Pin; -use std::task::ready; use std::task::Context; use std::task::Poll; +use std::task::ready; use std::time::Duration; use std::time::Instant; -use crate::far_future; use crate::MakeDelay; +use crate::far_future; /// Creates new [`Interval`] that yields with interval of `period`. The first /// tick completes immediately. The default [`MissedTickBehavior`] is @@ -49,8 +49,8 @@ use crate::MakeDelay; /// use std::time::Duration; /// use std::time::Instant; /// -/// use fastimer::interval; /// use fastimer::MakeDelay; +/// use fastimer::interval; /// /// struct TokioDelay; /// impl MakeDelay for TokioDelay { @@ -85,8 +85,8 @@ use crate::MakeDelay; /// use std::time::Duration; /// use std::time::Instant; /// -/// use fastimer::interval; /// use fastimer::MakeDelay; +/// use fastimer::interval; /// /// struct TokioDelay; /// impl MakeDelay for TokioDelay { @@ -139,8 +139,8 @@ pub fn interval(period: Duration, make_delay: D) -> Interval { /// use std::time::Duration; /// use std::time::Instant; /// -/// use fastimer::interval_at; /// use fastimer::MakeDelay; +/// use fastimer::interval_at; /// /// struct TokioDelay; /// impl MakeDelay for TokioDelay { diff --git a/fastimer/src/lib.rs b/fastimer/src/lib.rs index f799832..6b719a6 100644 --- a/fastimer/src/lib.rs +++ b/fastimer/src/lib.rs @@ -130,7 +130,7 @@ impl MakeDelayExt for T {} pub(crate) use self::macros::debug; pub(crate) use self::macros::info; -#[cfg(feature = "logging")] +#[cfg(any(test, feature = "logging"))] mod macros { macro_rules! debug { (target: $target:expr, $($arg:tt)+) => (log::debug!(target: $target, $($arg)+)); @@ -146,7 +146,7 @@ mod macros { pub(crate) use info; } -#[cfg(not(feature = "logging"))] +#[cfg(not(any(test, feature = "logging")))] #[macro_use] mod macros { macro_rules! info { diff --git a/fastimer/src/schedule/arbitrary.rs b/fastimer/src/schedule/arbitrary.rs index 8735f2b..4bb3446 100644 --- a/fastimer/src/schedule/arbitrary.rs +++ b/fastimer/src/schedule/arbitrary.rs @@ -13,21 +13,24 @@ // limitations under the License. use std::future::Future; +use std::ops::ControlFlow; +use std::pin::pin; use std::time::Duration; use std::time::Instant; -use crate::debug; -use crate::info; -use crate::schedule::delay_or_shutdown; -use crate::schedule::initial_delay_or_shutdown; -use crate::schedule::BaseAction; use crate::MakeDelay; use crate::Spawn; +use crate::debug; +use crate::info; +use crate::schedule::execute_or_shutdown; /// Repeatable action that can be scheduled with arbitrary delay. /// /// See [`ArbitraryDelayActionExt`] for scheduling methods. -pub trait ArbitraryDelayAction: BaseAction { +pub trait ArbitraryDelayAction: Send + 'static { + /// The name of the trait. + fn name(&self) -> &str; + /// Run the action. /// /// Return an Instant that indicates when to schedule the next run. @@ -38,13 +41,15 @@ pub trait ArbitraryDelayAction: BaseAction { pub trait ArbitraryDelayActionExt: ArbitraryDelayAction { /// Creates and executes a repeatable action that becomes enabled first after the given /// `initial_delay`, and subsequently based on the result of the action. - fn schedule_with_arbitrary_delay( + fn schedule_with_arbitrary_delay( mut self, + is_shutdown: Fut, spawn: &S, make_delay: D, initial_delay: Option, ) where Self: Sized, + Fut: Future + Send + 'static, S: Spawn, D: MakeDelay + Send + 'static, { @@ -55,20 +60,36 @@ pub trait ArbitraryDelayActionExt: ArbitraryDelayAction { initial_delay ); - let make_delay = - match initial_delay_or_shutdown(&mut self, make_delay, initial_delay).await { - Some(make_delay) => make_delay, - None => return, - }; + let mut is_shutdown = pin!(is_shutdown); + 'schedule: { + if let Some(initial_delay) = initial_delay { + if initial_delay > Duration::ZERO + && execute_or_shutdown(make_delay.delay(initial_delay), &mut is_shutdown) + .await + .is_break() + { + break 'schedule; + } + } + + loop { + debug!("executing scheduled task {}", self.name()); - loop { - debug!("executing scheduled task {}", self.name()); - let next = self.run().await; + let next = match execute_or_shutdown(self.run(), &mut is_shutdown).await { + ControlFlow::Continue(next) => next, + ControlFlow::Break(()) => break, + }; - if delay_or_shutdown(&mut self, make_delay.delay_util(next)).await { - return; + if execute_or_shutdown(make_delay.delay_util(next), &mut is_shutdown) + .await + .is_break() + { + break; + } } } + + info!("scheduled task {} is shutdown", self.name()); }); } } diff --git a/fastimer/src/schedule/mod.rs b/fastimer/src/schedule/mod.rs index 301eded..a99d23e 100644 --- a/fastimer/src/schedule/mod.rs +++ b/fastimer/src/schedule/mod.rs @@ -14,10 +14,11 @@ //! Repeatable and cancellable actions. +mod arbitrary; + use std::future::Future; -use std::time::Duration; +use std::ops::ControlFlow; -mod arbitrary; pub use arbitrary::*; mod notify; @@ -26,70 +27,19 @@ pub use notify::*; mod simple; pub use simple::*; -use crate::info; -use crate::MakeDelay; - mod select; - -use crate::schedule::select::select; use crate::schedule::select::Either; +use crate::schedule::select::select; -/// Base trait for shutdown-able scheduled actions. -pub trait BaseAction: Send + 'static { - /// The name of the trait. - fn name(&self) -> &str; - - /// Return a future that resolves when the action is shutdown. - /// - /// By default, this function returns a future that never resolves, i.e., the action will never - /// be shutdown. - fn is_shutdown(&self) -> impl Future + Send { - std::future::pending() - } - - /// A teardown hook that is called when the action is shutdown. - fn teardown(&mut self) {} -} - -/// Returns `None` if the action is shutdown; otherwise, returns `Some(make_delay)` -/// to give back the `make_delay` for further scheduling. -async fn initial_delay_or_shutdown( - action: &mut A, - make_delay: D, - initial_delay: Option, -) -> Option -where - A: BaseAction, - D: MakeDelay, -{ - let Some(initial_delay) = initial_delay else { - return Some(make_delay); - }; - - if initial_delay.is_zero() { - return Some(make_delay); - } - - if delay_or_shutdown(action, make_delay.delay(initial_delay)).await { - None - } else { - Some(make_delay) - } -} - -/// Returns `true` if the action is shutdown. -async fn delay_or_shutdown(action: &mut A, delay: D) -> bool +/// Returns [`ControlFlow::Break`] if the caller should shut down; otherwise, +/// returns [`ControlFlow::Continue`] with the result of the action run. +async fn execute_or_shutdown(f: F, is_shutdown: S) -> ControlFlow<(), O> where - A: BaseAction, - D: Future, + S: Future, + F: Future, { - let is_shutdown = action.is_shutdown(); - match select(is_shutdown, delay).await { - Either::Left(()) => { - info!("scheduled task {} is stopped", action.name()); - action.teardown(); - true - } - Either::Right(()) => false, + match select(is_shutdown, f).await { + Either::Left(()) => ControlFlow::Break(()), + Either::Right(o) => ControlFlow::Continue(o), } } diff --git a/fastimer/src/schedule/notify.rs b/fastimer/src/schedule/notify.rs index 5c4210b..643a373 100644 --- a/fastimer/src/schedule/notify.rs +++ b/fastimer/src/schedule/notify.rs @@ -13,47 +13,42 @@ // limitations under the License. use std::future::Future; +use std::pin::pin; use std::time::Duration; -use crate::debug; -use crate::info; -use crate::schedule::initial_delay_or_shutdown; -use crate::schedule::BaseAction; use crate::MakeDelay; use crate::Spawn; +use crate::debug; +use crate::info; +use crate::schedule::execute_or_shutdown; /// Repeatable action that can be scheduled by notifications. /// /// See [`NotifyActionExt`] for scheduling methods. -pub trait NotifyAction: BaseAction { +pub trait NotifyAction: Send + 'static { + /// The name of the trait. + fn name(&self) -> &str; + /// Run the action. fn run(&mut self) -> impl Future + Send; - /// Return a future that resolves when the action is notified. - /// - /// The future should return `true` if the action should be stopped, and `false` if the action - /// should be rescheduled. - /// - /// By default, this function calls [`is_shutdown`] to exit the action, and thus never - /// reschedule the action. Implementations can override this method to provide custom - /// notification logic, while still selects on [`is_shutdown`] to allow exiting the action. - /// - /// [`is_shutdown`]: BaseAction::is_shutdown - fn notified(&mut self) -> impl Future + Send { - async move { - self.is_shutdown().await; - true - } - } + /// Return a future that resolves when the action is notified to run again. + fn notified(&mut self) -> impl Future + Send; } /// An extension trait for [`NotifyAction`] that provides scheduling methods. pub trait NotifyActionExt: NotifyAction { /// Creates and executes a repeatable action that becomes enabled first after the given /// `initial_delay`, and subsequently when it is notified. - fn schedule_by_notify(mut self, spawn: &S, make_delay: D, initial_delay: Option) - where + fn schedule_by_notify( + mut self, + is_shutdown: Fut, + spawn: &S, + make_delay: D, + initial_delay: Option, + ) where Self: Sized, + Fut: Future + Send + 'static, S: Spawn, D: MakeDelay + Send + 'static, { @@ -64,21 +59,38 @@ pub trait NotifyActionExt: NotifyAction { initial_delay ); - match initial_delay_or_shutdown(&mut self, make_delay, initial_delay).await { - Some(..) => {} - None => return, - }; + let mut is_shutdown = pin!(is_shutdown); + 'schedule: { + if let Some(initial_delay) = initial_delay { + if initial_delay > Duration::ZERO + && execute_or_shutdown(make_delay.delay(initial_delay), &mut is_shutdown) + .await + .is_break() + { + break 'schedule; + } + } + + loop { + debug!("executing scheduled task {}", self.name()); - loop { - debug!("executing scheduled task {}", self.name()); - self.run().await; + if execute_or_shutdown(self.run(), &mut is_shutdown) + .await + .is_break() + { + break; + }; - if self.notified().await { - info!("scheduled task {} is stopped", self.name()); - self.teardown(); - return; + if execute_or_shutdown(self.notified(), &mut is_shutdown) + .await + .is_break() + { + break; + } } } + + info!("scheduled task {} is shutdown", self.name()); }); } } diff --git a/fastimer/src/schedule/simple.rs b/fastimer/src/schedule/simple.rs index 096208c..2f0df20 100644 --- a/fastimer/src/schedule/simple.rs +++ b/fastimer/src/schedule/simple.rs @@ -13,24 +13,26 @@ // limitations under the License. use std::future::Future; +use std::pin::pin; use std::time::Duration; use std::time::Instant; +use crate::MakeDelay; +use crate::Spawn; use crate::debug; use crate::far_future; use crate::info; use crate::make_instant_from; use crate::make_instant_from_now; -use crate::schedule::delay_or_shutdown; -use crate::schedule::initial_delay_or_shutdown; -use crate::schedule::BaseAction; -use crate::MakeDelay; -use crate::Spawn; +use crate::schedule::execute_or_shutdown; /// Repeatable action. /// /// See [`SimpleActionExt`] for scheduling methods. -pub trait SimpleAction: BaseAction { +pub trait SimpleAction: Send + 'static { + /// The name of the trait. + fn name(&self) -> &str; + /// Run the action. fn run(&mut self) -> impl Future + Send; } @@ -42,14 +44,16 @@ pub trait SimpleActionExt: SimpleAction { /// execution and the commencement of the next. /// /// This task will terminate if [`SimpleAction::run`] returns `true`. - fn schedule_with_fixed_delay( + fn schedule_with_fixed_delay( mut self, + is_shutdown: Fut, spawn: &S, make_delay: D, initial_delay: Option, delay: Duration, ) where Self: Sized, + Fut: Future + Send + 'static, S: Spawn, D: MakeDelay + Send + 'static, { @@ -61,20 +65,37 @@ pub trait SimpleActionExt: SimpleAction { initial_delay ); - let make_delay = - match initial_delay_or_shutdown(&mut self, make_delay, initial_delay).await { - Some(make_delay) => make_delay, - None => return, - }; + let mut is_shutdown = pin!(is_shutdown); + 'schedule: { + if let Some(initial_delay) = initial_delay { + if initial_delay > Duration::ZERO + && execute_or_shutdown(make_delay.delay(initial_delay), &mut is_shutdown) + .await + .is_break() + { + break 'schedule; + } + } - loop { - debug!("executing scheduled task {}", self.name()); - self.run().await; + loop { + debug!("executing scheduled task {}", self.name()); + if execute_or_shutdown(self.run(), &mut is_shutdown) + .await + .is_break() + { + break; + }; - if delay_or_shutdown(&mut self, make_delay.delay(delay)).await { - return; + if execute_or_shutdown(make_delay.delay(delay), &mut is_shutdown) + .await + .is_break() + { + break; + } } } + + info!("scheduled task {} is shutdown", self.name()); }); } @@ -87,14 +108,16 @@ pub trait SimpleActionExt: SimpleAction { /// /// If any execution of this task takes longer than its period, then subsequent /// executions may start late, but will not concurrently execute. - fn schedule_at_fixed_rate( + fn schedule_at_fixed_rate( mut self, + is_shutdown: Fut, spawn: &S, make_delay: D, initial_delay: Option, period: Duration, ) where Self: Sized, + Fut: Future + Send + 'static, S: Spawn, D: MakeDelay + Send + 'static, { @@ -137,25 +160,41 @@ pub trait SimpleActionExt: SimpleAction { initial_delay ); - let mut next = Instant::now(); - if let Some(initial_delay) = initial_delay { - if initial_delay > Duration::ZERO { - next = make_instant_from_now(initial_delay); - if delay_or_shutdown(&mut self, make_delay.delay_util(next)).await { - return; + let mut is_shutdown = pin!(is_shutdown); + 'schedule: { + let mut next = Instant::now(); + if let Some(initial_delay) = initial_delay { + if initial_delay > Duration::ZERO { + next = make_instant_from_now(initial_delay); + if execute_or_shutdown(make_delay.delay_util(next), &mut is_shutdown) + .await + .is_break() + { + break 'schedule; + } } } - } - loop { - debug!("executing scheduled task {}", self.name()); - self.run().await; + loop { + debug!("executing scheduled task {}", self.name()); + if execute_or_shutdown(self.run(), &mut is_shutdown) + .await + .is_break() + { + break; + }; - next = calculate_next_on_miss(next, period); - if delay_or_shutdown(&mut self, make_delay.delay_util(next)).await { - return; + next = calculate_next_on_miss(next, period); + if execute_or_shutdown(make_delay.delay_util(next), &mut is_shutdown) + .await + .is_break() + { + break; + } } } + + info!("scheduled task {} is shutdown", self.name()); }); } } diff --git a/fastimer/tests/common/mod.rs b/fastimer/tests/common/mod.rs new file mode 100644 index 0000000..91cec84 --- /dev/null +++ b/fastimer/tests/common/mod.rs @@ -0,0 +1,43 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; +use std::time::Instant; + +use fastimer::MakeDelay; +use fastimer::Spawn; + +#[derive(Clone, Copy, Debug, Default)] +pub struct MakeTokioDelay; + +impl MakeDelay for MakeTokioDelay { + type Delay = tokio::time::Sleep; + + fn delay_util(&self, at: Instant) -> Self::Delay { + tokio::time::sleep_until(tokio::time::Instant::from_std(at)) + } + + fn delay(&self, duration: Duration) -> Self::Delay { + tokio::time::sleep(duration) + } +} + +#[derive(Clone, Copy, Debug, Default)] +pub struct TokioSpawn; + +impl Spawn for TokioSpawn { + fn spawn + Send + 'static>(&self, future: F) { + tokio::spawn(future); + } +} diff --git a/fastimer/tests/interval.rs b/fastimer/tests/interval.rs index b95f7cb..15c2c8b 100644 --- a/fastimer/tests/interval.rs +++ b/fastimer/tests/interval.rs @@ -19,6 +19,8 @@ use fastimer::Interval; use fastimer::MakeDelay; use fastimer::MakeDelayExt; +mod common; + #[track_caller] fn assert_duration_eq(actual: Duration, expected: Duration) { if expected.abs_diff(actual) > Duration::from_millis(250) { @@ -33,24 +35,9 @@ async fn assert_tick_about(interval: &mut Interval, expected: D assert_duration_eq(elapsed, expected); } -#[derive(Clone, Copy, Debug, Default)] -pub struct MakeTokioDelay; - -impl MakeDelay for MakeTokioDelay { - type Delay = tokio::time::Sleep; - - fn delay_util(&self, at: Instant) -> Self::Delay { - tokio::time::sleep_until(tokio::time::Instant::from_std(at)) - } - - fn delay(&self, duration: Duration) -> Self::Delay { - tokio::time::sleep(duration) - } -} - #[tokio::test] async fn test_interval_ticks() { - let mut interval = MakeTokioDelay.interval(Duration::from_secs(1)); + let mut interval = common::MakeTokioDelay.interval(Duration::from_secs(1)); assert_tick_about(&mut interval, Duration::ZERO).await; for _ in 0..5 { @@ -62,7 +49,7 @@ async fn test_interval_ticks() { async fn test_interval_at_ticks() { let first_tick = Instant::now() + Duration::from_secs(2); - let mut interval = MakeTokioDelay.interval_at(first_tick, Duration::from_secs(1)); + let mut interval = common::MakeTokioDelay.interval_at(first_tick, Duration::from_secs(1)); assert_tick_about(&mut interval, Duration::from_secs(2)).await; for _ in 0..5 { diff --git a/fastimer/tests/schedule.rs b/fastimer/tests/schedule.rs new file mode 100644 index 0000000..0249b1a --- /dev/null +++ b/fastimer/tests/schedule.rs @@ -0,0 +1,79 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; +use std::time::Instant; + +use fastimer::MakeDelay; +use fastimer::schedule::SimpleAction; +use fastimer::schedule::SimpleActionExt; +use logforth::append; + +use crate::common::MakeTokioDelay; +use crate::common::TokioSpawn; + +mod common; + +struct MySimpleAction { + name: &'static str, + counter: u8, +} + +impl MySimpleAction { + fn new(name: &'static str) -> Self { + Self { name, counter: 0 } + } +} + +impl SimpleAction for MySimpleAction { + fn name(&self) -> &str { + self.name + } + + async fn run(&mut self) { + log::info!("[{}] starting turn {}", self.name, self.counter); + MakeTokioDelay.delay(Duration::from_secs(1)).await; + self.counter += 1; + } +} + +#[tokio::test] +async fn test_simple_action() { + let _ = logforth::builder() + .dispatch(|d| d.append(append::Stderr::default())) + .try_apply(); + + let initial_delay = Some(Duration::from_secs(1)); + let shutdown = Instant::now() + Duration::from_secs(10); + + MySimpleAction::new("schedule_with_fixed_delay").schedule_with_fixed_delay( + MakeTokioDelay.delay_util(shutdown), + &TokioSpawn, + MakeTokioDelay, + initial_delay, + Duration::from_secs(2), + ); + + MySimpleAction::new("schedule_at_fixed_rate").schedule_at_fixed_rate( + MakeTokioDelay.delay_util(shutdown), + &TokioSpawn, + MakeTokioDelay, + initial_delay, + Duration::from_secs(2), + ); + + MakeTokioDelay + .delay_util(shutdown + Duration::from_secs(1)) + .await; +}