-
Notifications
You must be signed in to change notification settings - Fork 78
Remove OS Thread from Atomics Wait Async #989
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
9ae3e4f
7176102
9bc9dd7
a718026
c0ecf58
ce4d835
3f07edc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,13 +2,7 @@ | |
| // License, v. 2.0. If a copy of the MPL was not distributed with this | ||
| // file, You can obtain one at https://mozilla.org/MPL/2.0/. | ||
|
|
||
| use std::{ | ||
| hint::assert_unchecked, | ||
| ops::ControlFlow, | ||
| sync::Arc, | ||
| thread::{self, JoinHandle}, | ||
| time::Duration, | ||
| }; | ||
| use std::{hint::assert_unchecked, ops::ControlFlow, sync::Arc, time::Duration}; | ||
|
|
||
| use ecmascript_atomics::Ordering; | ||
|
|
||
|
|
@@ -1621,52 +1615,106 @@ fn create_wait_result_object<'gc>( | |
| .expect("Should perform GC here") | ||
| } | ||
|
|
||
| #[derive(Debug)] | ||
| struct WaitAsyncJobInner { | ||
| data_block: SharedDataBlock, | ||
| byte_index_in_buffer: usize, | ||
| waiter_record: Arc<WaiterRecord>, | ||
| promise_to_resolve: Global<Promise<'static>>, | ||
| join_handle: JoinHandle<WaitResult>, | ||
| _has_timeout: bool, | ||
| } | ||
|
|
||
| #[derive(Debug)] | ||
| #[repr(transparent)] | ||
| pub(crate) struct WaitAsyncJob(Box<WaitAsyncJobInner>); | ||
|
|
||
| impl WaitAsyncJob { | ||
| pub(crate) fn is_finished(&self) -> bool { | ||
| self.0.join_handle.is_finished() | ||
| self.0.waiter_record.is_notified() | ||
| } | ||
|
|
||
| pub(crate) fn _will_halt(&self) -> bool { | ||
| self.0._has_timeout | ||
| } | ||
|
|
||
| // NOTE: The reason for using `GcScope` here even though we could've gotten | ||
| // away with `NoGcScope` is that this is essentially a trait impl method, | ||
| // but currently without the trait. The job trait will be added eventually | ||
| // and we can get rid of this lint exception. | ||
| #[allow(unknown_lints, can_use_no_gc_scope)] | ||
| pub(crate) fn run<'gc>(self, agent: &mut Agent, gc: GcScope) -> JsResult<'gc, ()> { | ||
| pub(crate) fn run<'gc>(self, agent: &mut Agent, gc: GcScope<'gc, '_>) -> JsResult<'gc, ()> { | ||
| let gc = gc.into_nogc(); | ||
|
|
||
| // SAFETY: buffer is a cloned SharedDataBlock; non-dangling. | ||
| let waiters = unsafe { self.0.data_block.get_or_init_waiters() }; | ||
| // a. Perform EnterCriticalSection(WL). | ||
| let mut guard = waiters.lock().unwrap(); | ||
| let waiter_record = self.0.waiter_record; | ||
| guard.remove_from_list(self.0.byte_index_in_buffer, waiter_record.clone()); | ||
|
|
||
| let result = match waiter_record.get_result() { | ||
| Some(WaitResult::TimedOut) => WaitResult::TimedOut, | ||
| Some(WaitResult::Ok) => WaitResult::Ok, | ||
| None => { | ||
| waiter_record.set_result(WaitResult::Ok); | ||
| WaitResult::Ok | ||
| } | ||
| }; | ||
|
|
||
| let promise = self.0.promise_to_resolve.take(agent).bind(gc); | ||
| let Ok(result) = self.0.join_handle.join() else { | ||
| // Foreign thread died; we can never resolve. | ||
| let promise_capability = PromiseCapability::from_promise(promise, true); | ||
| match result { | ||
| WaitResult::Ok => { | ||
| unwrap_try(promise_capability.try_resolve( | ||
| agent, | ||
| BUILTIN_STRING_MEMORY.ok.into(), | ||
| gc, | ||
| )); | ||
| } | ||
| WaitResult::TimedOut => { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thought: The string could be created from some |
||
| unwrap_try(promise_capability.try_resolve( | ||
| agent, | ||
| BUILTIN_STRING_MEMORY.timed_out.into(), | ||
| gc, | ||
| )); | ||
| } | ||
| } | ||
| // c. Perform LeaveCriticalSection(WL). | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nitpick (blocking): Step b is missing. I assume that'd be removing the waiter from the list, but I'd want to see it. |
||
| drop(guard); | ||
|
|
||
| // d. Return unused. | ||
| Ok(()) | ||
| } | ||
| } | ||
|
|
||
| struct WaitAsyncTimeoutJobInner { | ||
| data_block: SharedDataBlock, | ||
| byte_index_in_buffer: usize, | ||
| waiter_record: Arc<WaiterRecord>, | ||
| } | ||
|
|
||
| pub(crate) struct WaitAsyncTimeoutJob(Box<WaitAsyncTimeoutJobInner>); | ||
|
|
||
| impl WaitAsyncTimeoutJob { | ||
| pub(crate) fn run<'gc>(self, _agent: &mut Agent, _gc: GcScope<'gc, '_>) -> JsResult<'gc, ()> { | ||
| if self.0.waiter_record.get_result().is_some() { | ||
| return Ok(()); | ||
| }; | ||
| } | ||
|
|
||
| // SAFETY: buffer is a cloned SharedDataBlock; non-dangling. | ||
| let waiters = unsafe { self.0.data_block.get_or_init_waiters() }; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nitpick: This seems like it could be a helper method. |
||
| // a. Perform EnterCriticalSection(WL). | ||
| let mut guard = waiters.lock().unwrap(); | ||
|
|
||
| // b. If WL.[[Waiters]] contains waiterRecord, then | ||
| // i. Let timeOfJobExecution be the time value (UTC) identifying the current time. | ||
| // ii. Assert: ℝ(timeOfJobExecution) ≥ waiterRecord.[[TimeoutTime]] (ignoring potential non-monotonicity of time values). | ||
| // iii. Set waiterRecord.[[Result]] to "timed-out". | ||
| self.0.waiter_record.set_result(WaitResult::TimedOut); | ||
|
|
||
| // iv. Perform RemoveWaiter(WL, waiterRecord). | ||
| let waiter_record = self.0.waiter_record.clone(); | ||
| guard.remove_from_list(self.0.byte_index_in_buffer, self.0.waiter_record); | ||
|
|
||
| // v. Perform NotifyWaiter(WL, waiterRecord). | ||
| waiter_record.notify_waiters(); | ||
|
|
||
| // c. Perform LeaveCriticalSection(WL). | ||
| let promise_capability = PromiseCapability::from_promise(promise, true); | ||
| let result = match result { | ||
| WaitResult::Ok => BUILTIN_STRING_MEMORY.ok.into(), | ||
| WaitResult::TimedOut => BUILTIN_STRING_MEMORY.timed_out.into(), | ||
| }; | ||
| unwrap_try(promise_capability.try_resolve(agent, result, gc)); | ||
| drop(guard); | ||
|
|
||
| // d. Return unused. | ||
| Ok(()) | ||
| } | ||
|
|
@@ -1689,40 +1737,41 @@ fn enqueue_atomics_wait_async_job<const IS_I64: bool>( | |
| // 1. Let timeoutJob be a new Job Abstract Closure with no parameters that | ||
| // captures WL and waiterRecord and performs the following steps when | ||
| // called: | ||
| let handle = thread::spawn(move || { | ||
| // SAFETY: buffer is a cloned SharedDataBlock; non-dangling. | ||
| let waiters = unsafe { data_block.get_or_init_waiters() }; | ||
| let mut guard = waiters.lock().unwrap(); | ||
|
|
||
| if t == u64::MAX { | ||
| waiter_record.wait(guard); | ||
| } else { | ||
| let dur = Duration::from_millis(t); | ||
| let (new_guard, timeout) = waiter_record.wait_timeout(guard, dur); | ||
| guard = new_guard; | ||
| if timeout.timed_out() { | ||
| guard.remove_from_list(byte_index_in_buffer, waiter_record); | ||
| // 2. Let now be the time value (UTC) identifying the current time. | ||
| // 3. Let currentRealm be the current Realm Record. | ||
| // 4. Perform HostEnqueueTimeoutJob(timeoutJob, currentRealm, 𝔽(waiterRecord.[[TimeoutTime]]) - now). | ||
|
|
||
| // 31. Perform LeaveCriticalSection(WL). | ||
| drop(guard); | ||
| let timeout_job_data = if t != u64::MAX { | ||
| Some(WaitAsyncTimeoutJobInner { | ||
| data_block: data_block.clone(), | ||
| byte_index_in_buffer, | ||
| waiter_record: waiter_record.clone(), | ||
| }) | ||
| } else { | ||
| None | ||
| }; | ||
|
|
||
| // 32. If mode is sync, return waiterRecord.[[Result]]. | ||
| return WaitResult::TimedOut; | ||
| } | ||
| } | ||
| WaitResult::Ok | ||
| }); | ||
| let wait_async_job = Job { | ||
| realm: Some(Global::new(agent, agent.current_realm(gc).unbind())), | ||
| inner: InnerJob::WaitAsync(WaitAsyncJob(Box::new(WaitAsyncJobInner { | ||
| data_block, | ||
| byte_index_in_buffer, | ||
| waiter_record, | ||
| promise_to_resolve: promise, | ||
| join_handle: handle, | ||
| _has_timeout: t != u64::MAX, | ||
| }))), | ||
| }; | ||
| // 2. Let now be the time value (UTC) identifying the current time. | ||
| // 3. Let currentRealm be the current Realm Record. | ||
| // 4. Perform HostEnqueueTimeoutJob(timeoutJob, currentRealm, 𝔽(waiterRecord.[[TimeoutTime]]) - now). | ||
| agent.host_hooks.enqueue_generic_job(wait_async_job); | ||
|
|
||
| if let Some(inner) = timeout_job_data { | ||
| let wait_async_timeout_job = Job { | ||
| realm: Some(Global::new(agent, agent.current_realm(gc).unbind())), | ||
| inner: InnerJob::WaitAsyncTimeout(WaitAsyncTimeoutJob(Box::new(inner))), | ||
| }; | ||
| agent | ||
| .host_hooks | ||
| .enqueue_timeout_job(wait_async_timeout_job, t); | ||
| } | ||
|
|
||
| // 5. Return unused. | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick (but blocking): I'd like to see here a reference to where these steps are coming from. Possibly also a link to the relevant spec method.