feat: implement memory.atomic.notify,wait32,wait64 (#5255)

* feat: implement memory.atomic.notify,wait32,wait64

Added the parking_spot module (a simplified parking_lot_core), which provides
the registry of waiters needed for these operations.

Signed-off-by: Harald Hoyer <harald@profian.com>

* fix: change trap message for HeapMisaligned

The threads spec test wants "unaligned atomic"
instead of "misaligned memory access".

Signed-off-by: Harald Hoyer <harald@profian.com>

* tests: add test for atomic wait on non-shared memory

Signed-off-by: Harald Hoyer <harald@profian.com>

* tests: add tests/spec_testsuite/proposals/threads

These run without the pooling allocator and without reference types.
Also, "shared_memory" is added to the "spectest" interface.

Signed-off-by: Harald Hoyer <harald@profian.com>

* tests: add atomics_notify.wast

checking that notify with 0 waiters returns 0 on shared and non-shared
memory.

Signed-off-by: Harald Hoyer <harald@profian.com>

* tests: add tests for atomic wait on shared memory

- return 2 (timed-out) for a timeout of 0
- return 2 (timed-out) for a timeout of 1000 ns
- return 1 (not-equal) when the expected value does not match

Signed-off-by: Harald Hoyer <harald@profian.com>

* fixup! feat: implement memory.atomic.notify,wait32,wait64

Signed-off-by: Harald Hoyer <harald@profian.com>

* fixup! feat: implement memory.atomic.notify,wait32,wait64

Signed-off-by: Harald Hoyer <harald@profian.com>

Signed-off-by: Harald Hoyer <harald@profian.com>
Author: Harald Hoyer
Date: 2022-11-21 19:23:06 +01:00
Committed by: GitHub
Parent commit: fe2bfdbc1f
Commit: c74706aa59
21 changed files with 970 additions and 112 deletions


@@ -196,6 +196,17 @@ impl Instance {
}
}
/// Get a locally defined or imported memory.
pub(crate) fn get_runtime_memory(&mut self, index: MemoryIndex) -> &mut Memory {
if let Some(defined_index) = self.module().defined_memory_index(index) {
unsafe { &mut *self.get_defined_memory(defined_index) }
} else {
let import = self.imported_memory(index);
let ctx = unsafe { &mut *import.vmctx };
unsafe { &mut *ctx.instance_mut().get_defined_memory(import.index) }
}
}
/// Return the indexed `VMMemoryDefinition`.
fn memory(&self, index: DefinedMemoryIndex) -> VMMemoryDefinition {
unsafe { VMMemoryDefinition::load(self.memory_ptr(index)) }


@@ -40,6 +40,7 @@ mod instance;
mod memory;
mod mmap;
mod mmap_vec;
mod parking_spot;
mod table;
mod traphandlers;
mod vmcontext;


@@ -55,13 +55,13 @@
//! ```
use crate::externref::VMExternRef;
use crate::instance::Instance;
use crate::table::{Table, TableElementType};
use crate::vmcontext::{VMCallerCheckedAnyfunc, VMContext};
use crate::TrapReason;
use crate::{SharedMemory, TrapReason};
use anyhow::Result;
use std::mem;
use std::ptr::{self, NonNull};
use std::time::{Duration, Instant};
use wasmtime_environ::{
DataIndex, ElemIndex, FuncIndex, GlobalIndex, MemoryIndex, TableIndex, Trap,
};
@@ -434,81 +434,81 @@ unsafe fn externref_global_set(vmctx: *mut VMContext, index: u32, externref: *mu
unsafe fn memory_atomic_notify(
vmctx: *mut VMContext,
memory_index: u32,
addr: u64,
_count: u32,
addr_index: u64,
count: u32,
) -> Result<u32, TrapReason> {
let memory = MemoryIndex::from_u32(memory_index);
let instance = (*vmctx).instance();
validate_atomic_addr(instance, memory, addr, 4, 4)?;
Err(
anyhow::anyhow!("unimplemented: wasm atomics (fn memory_atomic_notify) unsupported",)
.into(),
)
let instance = (*vmctx).instance_mut();
instance
.get_memory(memory)
.validate_addr(addr_index, 4, 4)?;
let shared_mem = instance.get_runtime_memory(memory).as_shared_memory();
if count == 0 {
return Ok(0);
}
let unparked_threads = shared_mem.map_or(0, |shared_mem| {
// SAFETY: checked `addr_index` above
unsafe { shared_mem.unchecked_atomic_notify(addr_index, count) }
});
Ok(unparked_threads)
}
// Implementation of `memory.atomic.wait32` for locally defined memories.
unsafe fn memory_atomic_wait32(
vmctx: *mut VMContext,
memory_index: u32,
addr: u64,
_expected: u32,
_timeout: u64,
addr_index: u64,
expected: u32,
timeout: u64,
) -> Result<u32, TrapReason> {
// convert timeout to Instant, before any wait happens on locking
let timeout = (timeout as i64 >= 0).then(|| Instant::now() + Duration::from_nanos(timeout));
let memory = MemoryIndex::from_u32(memory_index);
let instance = (*vmctx).instance();
validate_atomic_addr(instance, memory, addr, 4, 4)?;
Err(
anyhow::anyhow!("unimplemented: wasm atomics (fn memory_atomic_wait32) unsupported",)
.into(),
)
let instance = (*vmctx).instance_mut();
let addr = instance
.get_memory(memory)
.validate_addr(addr_index, 4, 4)?;
let shared_mem: SharedMemory = instance
.get_runtime_memory(memory)
.as_shared_memory()
.ok_or(Trap::AtomicWaitNonSharedMemory)?;
// SAFETY: checked `addr_index` above
let res = unsafe { shared_mem.unchecked_atomic_wait32(addr_index, addr, expected, timeout) };
Ok(res)
}
// Implementation of `memory.atomic.wait64` for locally defined memories.
unsafe fn memory_atomic_wait64(
vmctx: *mut VMContext,
memory_index: u32,
addr: u64,
_expected: u64,
_timeout: u64,
addr_index: u64,
expected: u64,
timeout: u64,
) -> Result<u32, TrapReason> {
// convert timeout to Instant, before any wait happens on locking
let timeout = (timeout as i64 >= 0).then(|| Instant::now() + Duration::from_nanos(timeout));
let memory = MemoryIndex::from_u32(memory_index);
let instance = (*vmctx).instance();
validate_atomic_addr(instance, memory, addr, 8, 8)?;
Err(
anyhow::anyhow!("unimplemented: wasm atomics (fn memory_atomic_wait64) unsupported",)
.into(),
)
}
let instance = (*vmctx).instance_mut();
let addr = instance
.get_memory(memory)
.validate_addr(addr_index, 8, 8)?;
macro_rules! ensure {
($cond:expr, $trap:expr) => {
if !($cond) {
return Err($trap);
}
};
}
let shared_mem: SharedMemory = instance
.get_runtime_memory(memory)
.as_shared_memory()
.ok_or(Trap::AtomicWaitNonSharedMemory)?;
/// In the configurations where bounds checks were elided in JIT code (because
/// we are using static memories with virtual memory guard pages) this manual
/// check is here so we don't segfault from Rust. For other configurations,
/// these checks are required anyways.
unsafe fn validate_atomic_addr(
instance: &Instance,
memory: MemoryIndex,
addr: u64,
access_size: u64,
access_alignment: u64,
) -> Result<(), Trap> {
debug_assert!(access_alignment.is_power_of_two());
ensure!(addr % access_alignment == 0, Trap::HeapMisaligned);
let length = u64::try_from(instance.get_memory(memory).current_length()).unwrap();
ensure!(
addr.saturating_add(access_size) < length,
Trap::MemoryOutOfBounds
);
Ok(())
// SAFETY: checked `addr_index` above
let res = unsafe { shared_mem.unchecked_atomic_wait64(addr_index, addr, expected, timeout) };
Ok(res)
}
// Hook for when an instance runs out of fuel.
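
An aside on the timeout operand handled above (illustrative sketch, not part of this diff): wasm encodes it as a signed 64-bit nanosecond count where a negative value means "wait indefinitely", and the libcalls convert it to an absolute deadline up front, before any lock is taken. A self-contained restatement of that conversion:

    use std::time::{Duration, Instant};

    /// Mirror of the conversion done in memory_atomic_wait{32,64} above:
    /// a negative timeout (interpreted as i64) means "no deadline".
    fn wait_deadline(timeout: u64) -> Option<Instant> {
        (timeout as i64 >= 0).then(|| Instant::now() + Duration::from_nanos(timeout))
    }

    fn main() {
        assert!(wait_deadline(u64::MAX).is_none()); // -1 as i64 => wait forever
        assert!(wait_deadline(1_000).is_some());    // 1000 ns relative deadline
    }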


@@ -3,6 +3,7 @@
//! `RuntimeLinearMemory` is to WebAssembly linear memories what `Table` is to WebAssembly tables.
use crate::mmap::Mmap;
use crate::parking_spot::{ParkResult, ParkingSpot};
use crate::vmcontext::VMMemoryDefinition;
use crate::MemoryImage;
use crate::MemoryImageSlot;
@@ -10,9 +11,10 @@ use crate::Store;
use anyhow::Error;
use anyhow::{bail, format_err, Result};
use std::convert::TryFrom;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use wasmtime_environ::{MemoryPlan, MemoryStyle, WASM32_MAX_PAGES, WASM64_MAX_PAGES};
use std::time::Instant;
use wasmtime_environ::{MemoryPlan, MemoryStyle, Trap, WASM32_MAX_PAGES, WASM64_MAX_PAGES};
const WASM_PAGE_SIZE: usize = wasmtime_environ::WASM_PAGE_SIZE as usize;
const WASM_PAGE_SIZE_U64: u64 = wasmtime_environ::WASM_PAGE_SIZE as u64;
@@ -432,7 +434,8 @@ impl RuntimeLinearMemory for StaticMemory {
/// [thread proposal]:
/// https://github.com/WebAssembly/threads/blob/master/proposals/threads/Overview.md#webassemblymemoryprototypegrow
#[derive(Clone)]
pub struct SharedMemory(Arc<RwLock<SharedMemoryInner>>);
pub struct SharedMemory(Arc<SharedMemoryInner>);
impl SharedMemory {
/// Construct a new [`SharedMemory`].
pub fn new(plan: MemoryPlan) -> Result<Self> {
@@ -458,16 +461,14 @@ impl SharedMemory {
"cannot re-wrap a shared memory"
);
let def = LongTermVMMemoryDefinition(memory.vmmemory());
Ok(Self(Arc::new(RwLock::new(SharedMemoryInner {
memory: memory,
ty,
def,
}))))
let spot = ParkingSpot::default();
let inner = RwLock::new(SharedMemoryInnerRwLocked { memory, ty, def });
Ok(Self(Arc::new(SharedMemoryInner { spot, lock: inner })))
}
/// Return the memory type for this [`SharedMemory`].
pub fn ty(&self) -> wasmtime_environ::Memory {
self.0.read().unwrap().ty
self.0.lock.read().unwrap().ty
}
/// Convert this shared memory into a [`Memory`].
@@ -477,21 +478,120 @@ impl SharedMemory {
/// Return a mutable pointer to the shared memory's [VMMemoryDefinition].
pub fn vmmemory_ptr_mut(&mut self) -> *mut VMMemoryDefinition {
&self.0.read().unwrap().def.0 as *const _ as *mut _
&self.0.lock.read().unwrap().def.0 as *const _ as *mut _
}
/// Return a pointer to the shared memory's [VMMemoryDefinition].
pub fn vmmemory_ptr(&self) -> *const VMMemoryDefinition {
&self.0.read().unwrap().def.0 as *const _
&self.0.lock.read().unwrap().def.0 as *const _
}
/// Implementation of `memory.atomic.notify` for this shared memory.
pub fn atomic_notify(&self, addr_index: u64, count: u32) -> Result<u32, Trap> {
let definition = &self.0.lock.read().unwrap().def.0;
definition.validate_addr(addr_index, 4, 4)?;
// SAFETY: checked `addr_index` above
Ok(unsafe { self.unchecked_atomic_notify(addr_index, count) })
}
/// Unchecked implementation of `memory.atomic.notify` for this shared memory.
///
/// # Safety
/// The caller must ensure that `addr_index` is a valid address in the memory.
pub unsafe fn unchecked_atomic_notify(&self, addr_index: u64, count: u32) -> u32 {
if count == 0 {
return 0;
}
self.0.spot.unpark(addr_index, count)
}
/// Implementation of `memory.atomic.wait32` for this shared memory.
pub fn atomic_wait32(
&self,
addr_index: u64,
expected: u32,
timeout: Option<Instant>,
) -> Result<u32, Trap> {
let definition = &self.0.lock.read().unwrap().def.0;
let addr = definition.validate_addr(addr_index, 4, 4)?;
// SAFETY: checked `addr` above
Ok(unsafe { self.unchecked_atomic_wait32(addr_index, addr, expected, timeout) })
}
/// Unchecked implementation of `memory.atomic.wait32` for this shared memory.
///
/// # Safety
/// The caller must ensure that `addr` is a valid address in the memory.
pub unsafe fn unchecked_atomic_wait32(
&self,
addr_index: u64,
addr: *const u8,
expected: u32,
timeout: Option<Instant>,
) -> u32 {
// SAFETY: `addr_index` was validated by `validate_addr` above.
let atomic = unsafe { &*(addr as *const AtomicU32) };
// We want the sequential consistency of `SeqCst` to ensure that the `load` sees the value that the `notify` will/would see.
// All WASM atomic operations are also `SeqCst`.
let validate = || atomic.load(Ordering::SeqCst) == expected;
match self.0.spot.park(addr_index, validate, timeout) {
ParkResult::Unparked => 0,
ParkResult::Invalid => 1,
ParkResult::TimedOut => 2,
}
}
/// Implementation of `memory.atomic.wait64` for this shared memory.
pub fn atomic_wait64(
&self,
addr_index: u64,
expected: u64,
timeout: Option<Instant>,
) -> Result<u32, Trap> {
let definition = &self.0.lock.read().unwrap().def.0;
let addr = definition.validate_addr(addr_index, 8, 8)?;
// SAFETY: checked `addr` above
Ok(unsafe { self.unchecked_atomic_wait64(addr_index, addr, expected, timeout) })
}
/// Unchecked implementation of `memory.atomic.wait64` for this shared memory.
///
/// # Safety
/// The caller must ensure that `addr` is a valid address in the memory.
pub unsafe fn unchecked_atomic_wait64(
&self,
addr_index: u64,
addr: *const u8,
expected: u64,
timeout: Option<Instant>,
) -> u32 {
// SAFETY: `addr_index` was validated by `validate_addr` above.
let atomic = unsafe { &*(addr as *const AtomicU64) };
// We want the sequential consistency of `SeqCst` to ensure that the `load` sees the value that the `notify` will/would see.
// All WASM atomic operations are also `SeqCst`.
let validate = || atomic.load(Ordering::SeqCst) == expected;
match self.0.spot.park(addr_index, validate, timeout) {
ParkResult::Unparked => 0,
ParkResult::Invalid => 1,
ParkResult::TimedOut => 2,
}
}
}
struct SharedMemoryInner {
struct SharedMemoryInnerRwLocked {
memory: Box<dyn RuntimeLinearMemory>,
ty: wasmtime_environ::Memory,
def: LongTermVMMemoryDefinition,
}
struct SharedMemoryInner {
lock: RwLock<SharedMemoryInnerRwLocked>,
spot: ParkingSpot,
}
/// Shared memory needs some representation of a `VMMemoryDefinition` for
/// JIT-generated code to access. This structure owns the base pointer and
/// length to the actual memory and we share this definition across threads by:
@@ -507,11 +607,11 @@ unsafe impl Sync for LongTermVMMemoryDefinition {}
/// Proxy all calls through the [`RwLock`].
impl RuntimeLinearMemory for SharedMemory {
fn byte_size(&self) -> usize {
self.0.read().unwrap().memory.byte_size()
self.0.lock.read().unwrap().memory.byte_size()
}
fn maximum_byte_size(&self) -> Option<usize> {
self.0.read().unwrap().memory.maximum_byte_size()
self.0.lock.read().unwrap().memory.maximum_byte_size()
}
fn grow(
@@ -519,7 +619,7 @@ impl RuntimeLinearMemory for SharedMemory {
delta_pages: u64,
store: Option<&mut dyn Store>,
) -> Result<Option<(usize, usize)>, Error> {
let mut inner = self.0.write().unwrap();
let mut inner = self.0.lock.write().unwrap();
let result = inner.memory.grow(delta_pages, store)?;
if let Some((_old_size_in_bytes, new_size_in_bytes)) = result {
// Store the new size to the `VMMemoryDefinition` for JIT-generated
@@ -551,7 +651,7 @@ impl RuntimeLinearMemory for SharedMemory {
}
fn grow_to(&mut self, size: usize) -> Result<()> {
self.0.write().unwrap().memory.grow_to(size)
self.0.lock.write().unwrap().memory.grow_to(size)
}
fn vmmemory(&mut self) -> VMMemoryDefinition {
@@ -563,7 +663,7 @@ impl RuntimeLinearMemory for SharedMemory {
}
fn needs_init(&self) -> bool {
self.0.read().unwrap().memory.needs_init()
self.0.lock.read().unwrap().memory.needs_init()
}
fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
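
To make the new checked SharedMemory API concrete, here is a hedged host-side sketch (illustrative only, not part of this diff; it assumes SharedMemory and Trap are reachable as `wasmtime_runtime::SharedMemory` and `wasmtime_environ::Trap`). The wait result follows the threads proposal's encoding, produced from ParkResult above: 0 = woken by a notify, 1 = "not-equal" (the value at the address did not match `expected`), 2 = timed out.

    use std::time::{Duration, Instant};
    use wasmtime_environ::Trap;
    use wasmtime_runtime::SharedMemory;

    /// Wait on address 0 for up to 10 ms, expecting the current value to be 0,
    /// then wake any waiters still parked on that address.
    fn wait_then_notify(mem: &SharedMemory) -> Result<(), Trap> {
        let deadline = Some(Instant::now() + Duration::from_millis(10));
        match mem.atomic_wait32(0, 0, deadline)? {
            0 => println!("woken by memory.atomic.notify"),
            1 => println!("not-equal: the value at address 0 was not 0"),
            2 => println!("timed out"),
            _ => unreachable!(),
        }
        // Returns how many parked waiters were actually woken (0 if none).
        let woken = mem.atomic_notify(0, u32::MAX)?;
        println!("woke {woken} waiters");
        Ok(())
    }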


@@ -0,0 +1,445 @@
//! Implements thread wait and notify primitives with `std::sync` primitives.
//!
//! This is a simplified version of the `parking_lot_core` crate.
//!
//! There are two main operations that can be performed:
//!
//! - *Parking* refers to suspending the thread while simultaneously enqueuing it
//! on a queue keyed by some address.
//! - *Unparking* refers to dequeuing a thread from a queue keyed by some address
//! and resuming it.
#![deny(clippy::all)]
#![deny(clippy::pedantic)]
#![deny(missing_docs)]
#![deny(unsafe_code)]
use std::collections::BTreeMap;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Instant;
/// Result of a park operation.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum ParkResult {
/// Unparked by another thread.
Unparked,
/// The validation callback returned false.
Invalid,
/// The timeout expired.
TimedOut,
}
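
(Aside, not part of this file: these three variants map one-to-one onto the wasm-level `memory.atomic.wait{32,64}` results, as done later in memory.rs. A hypothetical helper, assumed to live next to this enum, would read:)

    /// 0 = "ok" (woken by a notify), 1 = "not-equal", 2 = "timed-out".
    fn to_wasm_wait_result(res: ParkResult) -> u32 {
        match res {
            ParkResult::Unparked => 0,
            ParkResult::Invalid => 1,
            ParkResult::TimedOut => 2,
        }
    }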
#[derive(Default, Debug)]
struct Spot {
/// The number of threads parked on this spot.
num_parked: u32,
/// The number of threads that have been unparked but not yet woken up.
/// This is used to avoid spurious wakeups.
to_unpark: u32,
/// The [`Condvar`] used to notify parked threads.
cvar: Arc<Condvar>,
}
/// The thread global `ParkingSpot`.
#[derive(Default, Debug)]
pub struct ParkingSpot {
inner: Mutex<BTreeMap<u64, Spot>>,
}
impl ParkingSpot {
/// Park the current thread until it is unparked or a timeout is reached.
///
/// The `key` is used to identify the parking spot. If another thread calls
/// `unpark_all` or `unpark` with the same key, the current thread will be unparked.
///
/// The `validate` callback is called before parking.
/// If it returns `false`, the thread is not parked and `ParkResult::Invalid` is returned.
///
/// The `timeout` argument specifies the maximum amount of time the thread will be parked.
pub fn park(
&self,
key: u64,
validate: impl FnOnce() -> bool,
timeout: impl Into<Option<Instant>>,
) -> ParkResult {
self.park_inner(key, validate, timeout.into())
}
fn park_inner(
&self,
key: u64,
validate: impl FnOnce() -> bool,
timeout: Option<Instant>,
) -> ParkResult {
let mut inner = self
.inner
.lock()
.expect("failed to lock inner parking table");
// check validation with lock held
if !validate() {
return ParkResult::Invalid;
}
// clone the condvar, so we can move the lock
let cvar = {
let spot = inner.entry(key).or_insert_with(Spot::default);
spot.num_parked = spot
.num_parked
.checked_add(1)
.expect("parking spot number overflow");
spot.cvar.clone()
};
loop {
let timed_out = if let Some(timeout) = timeout {
let now = Instant::now();
if now >= timeout {
true
} else {
let dur = timeout - now;
let (lock, result) = cvar
.wait_timeout(inner, dur)
.expect("failed to wait for condition");
inner = lock;
result.timed_out()
}
} else {
inner = cvar.wait(inner).expect("failed to wait for condition");
false
};
let spot = inner.get_mut(&key).expect("failed to get spot");
if !timed_out {
if spot.to_unpark == 0 {
continue;
}
spot.to_unpark -= 1;
}
spot.num_parked = spot
.num_parked
.checked_sub(1)
.expect("corrupted parking spot state");
if spot.num_parked == 0 {
assert_eq!(spot.to_unpark, 0);
inner
.remove(&key)
.expect("failed to remove spot from inner parking table");
}
if timed_out {
return ParkResult::TimedOut;
}
return ParkResult::Unparked;
}
}
/// Unpark at most `n` threads that are parked with the given key.
///
/// Returns the number of threads that were actually unparked.
pub fn unpark(&self, key: u64, n: u32) -> u32 {
let mut num_unpark = 0;
self.with_lot(key, |spot| {
num_unpark = n.min(spot.num_parked - spot.to_unpark);
spot.to_unpark += num_unpark;
if n >= num_unpark {
spot.cvar.notify_all();
} else {
for _ in 0..num_unpark {
spot.cvar.notify_one();
}
}
});
num_unpark
}
fn with_lot<F: FnMut(&mut Spot)>(&self, key: u64, mut f: F) {
let mut inner = self
.inner
.lock()
.expect("failed to lock inner parking table");
if let Some(spot) = inner.get_mut(&key) {
f(spot);
}
}
}
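
A compact illustration of the contract implemented above (not part of this diff): park evaluates `validate` while holding the spot's lock, and unpark takes the same lock before bumping `to_unpark`, so a notify that races with the check-then-sleep cannot be lost. The sketch below assumes it is compiled inside this module (the parking_spot module is crate-private) and mirrors the atomic_wait_notify test further down.

    use std::ptr::addr_of;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::thread;

    fn flag_example(spot: &ParkingSpot) {
        static FLAG: AtomicU32 = AtomicU32::new(0);
        let key = addr_of!(FLAG) as u64;
        thread::scope(|s| {
            s.spawn(|| {
                // Re-check the flag in `validate` under the spot's lock, so a
                // concurrent store + unpark cannot be missed.
                while FLAG.load(Ordering::SeqCst) != 1 {
                    spot.park(key, || FLAG.load(Ordering::SeqCst) != 1, None);
                }
            });
            FLAG.store(1, Ordering::SeqCst);
            // Wake every thread currently parked on this key.
            spot.unpark(key, u32::MAX);
        });
    }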
#[cfg(test)]
mod tests {
use super::ParkingSpot;
use once_cell::sync::Lazy;
use std::ptr::addr_of;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
static PARKING_SPOT: Lazy<ParkingSpot> = Lazy::new(ParkingSpot::default);
static ATOMIC: AtomicU64 = AtomicU64::new(0);
#[test]
fn atomic_wait_notify() {
let thread1 = thread::spawn(|| {
let atomic_key = addr_of!(ATOMIC) as u64;
ATOMIC.store(1, Ordering::SeqCst);
PARKING_SPOT.unpark(atomic_key, u32::MAX);
PARKING_SPOT.park(atomic_key, || ATOMIC.load(Ordering::SeqCst) == 1, None);
});
let thread2 = thread::spawn(|| {
let atomic_key = addr_of!(ATOMIC) as u64;
while ATOMIC.load(Ordering::SeqCst) != 1 {
PARKING_SPOT.park(atomic_key, || ATOMIC.load(Ordering::SeqCst) != 1, None);
}
ATOMIC.store(2, Ordering::SeqCst);
PARKING_SPOT.unpark(atomic_key, u32::MAX);
PARKING_SPOT.park(atomic_key, || ATOMIC.load(Ordering::SeqCst) == 2, None);
});
let thread3 = thread::spawn(|| {
let atomic_key = addr_of!(ATOMIC) as u64;
while ATOMIC.load(Ordering::SeqCst) != 2 {
PARKING_SPOT.park(atomic_key, || ATOMIC.load(Ordering::SeqCst) != 2, None);
}
ATOMIC.store(3, Ordering::SeqCst);
PARKING_SPOT.unpark(atomic_key, u32::MAX);
PARKING_SPOT.park(atomic_key, || ATOMIC.load(Ordering::SeqCst) == 3, None);
});
let atomic_key = addr_of!(ATOMIC) as u64;
while ATOMIC.load(Ordering::SeqCst) != 3 {
PARKING_SPOT.park(atomic_key, || ATOMIC.load(Ordering::SeqCst) != 3, None);
}
ATOMIC.store(4, Ordering::SeqCst);
PARKING_SPOT.unpark(atomic_key, u32::MAX);
thread1.join().unwrap();
thread2.join().unwrap();
thread3.join().unwrap();
}
mod parking_lot {
// This is a modified version of the parking_lot_core tests,
// which are licensed under the MIT and Apache 2.0 licenses.
use super::*;
use std::sync::atomic::{AtomicIsize, AtomicU32};
use std::sync::Arc;
use std::time::Duration;
macro_rules! test {
( $( $name:ident(
repeats: $repeats:expr,
latches: $latches:expr,
delay: $delay:expr,
threads: $threads:expr,
single_unparks: $single_unparks:expr);
)* ) => {
$(
#[test]
fn $name() {
if std::env::var("WASMTIME_TEST_NO_HOG_MEMORY").is_ok() {
return;
}
let delay = Duration::from_micros($delay);
for _ in 0..$repeats {
run_parking_test($latches, delay, $threads, $single_unparks);
}
})*
};
}
test! {
unpark_all_one_fast(
repeats: 10000, latches: 1, delay: 0, threads: 1, single_unparks: 0
);
unpark_all_hundred_fast(
repeats: 100, latches: 1, delay: 0, threads: 100, single_unparks: 0
);
unpark_one_one_fast(
repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 1
);
unpark_one_hundred_fast(
repeats: 20, latches: 1, delay: 0, threads: 100, single_unparks: 100
);
unpark_one_fifty_then_fifty_all_fast(
repeats: 50, latches: 1, delay: 0, threads: 100, single_unparks: 50
);
unpark_all_one(
repeats: 100, latches: 1, delay: 10000, threads: 1, single_unparks: 0
);
unpark_all_hundred(
repeats: 100, latches: 1, delay: 10000, threads: 100, single_unparks: 0
);
unpark_one_one(
repeats: 10, latches: 1, delay: 10000, threads: 1, single_unparks: 1
);
unpark_one_fifty(
repeats: 1, latches: 1, delay: 10000, threads: 50, single_unparks: 50
);
unpark_one_fifty_then_fifty_all(
repeats: 2, latches: 1, delay: 10000, threads: 100, single_unparks: 50
);
hundred_unpark_all_one_fast(
repeats: 100, latches: 100, delay: 0, threads: 1, single_unparks: 0
);
hundred_unpark_all_one(
repeats: 1, latches: 100, delay: 10000, threads: 1, single_unparks: 0
);
}
fn run_parking_test(
num_latches: usize,
delay: Duration,
num_threads: u32,
num_single_unparks: u32,
) {
let mut tests = Vec::with_capacity(num_latches);
for _ in 0..num_latches {
let test = Arc::new(SingleLatchTest::new(num_threads));
let mut threads = Vec::with_capacity(num_threads as _);
for _ in 0..num_threads {
let test = test.clone();
threads.push(thread::spawn(move || test.run()));
}
tests.push((test, threads));
}
for unpark_index in 0..num_single_unparks {
thread::sleep(delay);
for (test, _) in &tests {
test.unpark_one(unpark_index);
}
}
for (test, threads) in tests {
test.finish(num_single_unparks);
for thread in threads {
thread.join().expect("Test thread panic");
}
}
}
struct SingleLatchTest {
semaphore: AtomicIsize,
num_awake: AtomicU32,
/// Total number of threads participating in this test.
num_threads: u32,
}
impl SingleLatchTest {
pub fn new(num_threads: u32) -> Self {
Self {
// This implements a fair (FIFO) semaphore, and it starts out unavailable.
semaphore: AtomicIsize::new(0),
num_awake: AtomicU32::new(0),
num_threads,
}
}
pub fn run(&self) {
// Get one slot from the semaphore
self.down();
self.num_awake.fetch_add(1, Ordering::SeqCst);
}
pub fn unpark_one(&self, _single_unpark_index: u32) {
let num_awake_before_up = self.num_awake.load(Ordering::SeqCst);
self.up();
// Wait for a parked thread to wake up and update num_awake + last_awoken.
while self.num_awake.load(Ordering::SeqCst) != num_awake_before_up + 1 {
thread::yield_now();
}
}
pub fn finish(&self, num_single_unparks: u32) {
// The amount of threads not unparked via unpark_one
let mut num_threads_left =
self.num_threads.checked_sub(num_single_unparks).unwrap();
// Wake remaining threads up with unpark_all. Has to be in a loop, because there might
// still be threads that has not yet parked.
while num_threads_left > 0 {
let mut num_waiting_on_address = 0;
PARKING_SPOT.with_lot(self.semaphore_addr(), |thread_data| {
num_waiting_on_address = thread_data.num_parked;
});
assert!(num_waiting_on_address <= num_threads_left);
let num_awake_before_unpark = self.num_awake.load(Ordering::SeqCst);
let num_unparked = PARKING_SPOT.unpark(self.semaphore_addr(), u32::MAX);
assert!(num_unparked >= num_waiting_on_address);
assert!(num_unparked <= num_threads_left);
// Wait for all unparked threads to wake up and update num_awake + last_awoken.
while self.num_awake.load(Ordering::SeqCst)
!= num_awake_before_unpark + num_unparked
{
thread::yield_now();
}
num_threads_left = num_threads_left.checked_sub(num_unparked).unwrap();
}
// By now, all threads should have been woken up
assert_eq!(self.num_awake.load(Ordering::SeqCst), self.num_threads);
// Make sure no thread is parked on our semaphore address
let mut num_waiting_on_address = 0;
PARKING_SPOT.with_lot(self.semaphore_addr(), |thread_data| {
num_waiting_on_address = thread_data.num_parked;
});
assert_eq!(num_waiting_on_address, 0);
}
pub fn down(&self) {
let old_semaphore_value = self.semaphore.fetch_sub(1, Ordering::SeqCst);
if old_semaphore_value > 0 {
// We acquired the semaphore. Done.
return;
}
// We need to wait.
let validate = || true;
PARKING_SPOT.park(self.semaphore_addr(), validate, None);
}
pub fn up(&self) {
let old_semaphore_value = self.semaphore.fetch_add(1, Ordering::SeqCst);
// Check if anyone was waiting on the semaphore. If they were, then pass ownership to them.
if old_semaphore_value < 0 {
// We need to continue until we have actually unparked someone. It might be that
// the thread we want to pass ownership to has decremented the semaphore counter,
// but not yet parked.
loop {
match PARKING_SPOT.unpark(self.semaphore_addr(), 1) {
1 => break,
0 => (),
i => panic!("Should not wake up {i} threads"),
}
}
}
}
fn semaphore_addr(&self) -> u64 {
addr_of!(self.semaphore) as _
}
}
}
}


@@ -12,7 +12,7 @@ use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::u32;
pub use vm_host_func_context::VMHostFuncContext;
use wasmtime_environ::DefinedMemoryIndex;
use wasmtime_environ::{DefinedMemoryIndex, Trap};
pub const VMCONTEXT_MAGIC: u32 = u32::from_le_bytes(*b"core");
@@ -248,6 +248,30 @@ impl VMMemoryDefinition {
current_length: other.current_length().into(),
}
}
/// In the configurations where bounds checks were elided in JIT code (because
/// we are using static memories with virtual memory guard pages) this manual
/// check is here so we don't segfault from Rust. For other configurations,
/// these checks are required anyways.
pub fn validate_addr(
&self,
addr: u64,
access_size: u64,
access_alignment: u64,
) -> Result<*const u8, Trap> {
debug_assert!(access_alignment.is_power_of_two());
if !(addr % access_alignment == 0) {
return Err(Trap::HeapMisaligned);
}
let length = u64::try_from(self.current_length()).unwrap();
if !(addr.saturating_add(access_size) < length) {
return Err(Trap::MemoryOutOfBounds);
}
// SAFETY: checked above that the address is in bounds
Ok(unsafe { self.base.add(addr as usize) })
}
}
#[cfg(test)]
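
To close, a self-contained restatement (illustrative, not part of this diff) of the arithmetic that validate_addr performs, here for a 4-byte `memory.atomic.wait32` access against a single 64 KiB wasm page: the address must be a multiple of the access alignment, and `addr.saturating_add(access_size)` must stay strictly below the memory's current length, exactly as checked above.

    /// Stand-alone mirror of the checks in `VMMemoryDefinition::validate_addr`,
    /// returning strings instead of `Trap` variants for illustration.
    fn check(addr: u64, access_size: u64, access_alignment: u64, length: u64) -> Result<(), &'static str> {
        if addr % access_alignment != 0 {
            return Err("heap misaligned"); // Trap::HeapMisaligned
        }
        if !(addr.saturating_add(access_size) < length) {
            return Err("out of bounds"); // Trap::MemoryOutOfBounds
        }
        Ok(())
    }

    fn main() {
        let page = 65_536; // one wasm page, in bytes
        assert!(check(0, 4, 4, page).is_ok());
        assert_eq!(check(2, 4, 4, page), Err("heap misaligned")); // not 4-aligned
        assert_eq!(check(page, 4, 4, page), Err("out of bounds")); // past the end
    }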