diff --git a/crates/runtime/src/instance.rs b/crates/runtime/src/instance.rs
index e8d9ed929f..9e4f88ecb4 100644
--- a/crates/runtime/src/instance.rs
+++ b/crates/runtime/src/instance.rs
@@ -966,8 +966,8 @@ impl Instance {
                 let def_ptr = self.memories[defined_memory_index]
                     .as_shared_memory()
                     .unwrap()
-                    .vmmemory_ptr_mut();
-                ptr::write(ptr, def_ptr);
+                    .vmmemory_ptr();
+                ptr::write(ptr, def_ptr.cast_mut());
             } else {
                 ptr::write(owned_ptr, self.memories[defined_memory_index].vmmemory());
                 ptr::write(ptr, owned_ptr);
diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs
index 325e347401..2c1db36ad1 100644
--- a/crates/runtime/src/lib.rs
+++ b/crates/runtime/src/lib.rs
@@ -231,3 +231,17 @@ pub fn page_size() -> usize {
         unsafe { libc::sysconf(libc::_SC_PAGESIZE) as usize }
     }
 }
+
+/// Result of [`Memory::atomic_wait32`] and [`Memory::atomic_wait64`]
+#[derive(Copy, Clone, PartialEq, Eq, Debug)]
+pub enum WaitResult {
+    /// Indicates that a `wait` completed by being awoken by a different thread.
+    /// This means the thread went to sleep and didn't time out.
+    Ok = 0,
+    /// Indicates that `wait` did not complete and instead returned due to the
+    /// value in memory not matching the expected value.
+    Mismatch = 1,
+    /// Indicates that `wait` completed with a timeout, meaning that the
+    /// original value matched as expected but nothing ever called `notify`.
+    TimedOut = 2,
+}
diff --git a/crates/runtime/src/libcalls.rs b/crates/runtime/src/libcalls.rs
index 384e7f190e..e6bebc25df 100644
--- a/crates/runtime/src/libcalls.rs
+++ b/crates/runtime/src/libcalls.rs
@@ -57,7 +57,7 @@
 use crate::externref::VMExternRef;
 use crate::table::{Table, TableElementType};
 use crate::vmcontext::{VMCallerCheckedAnyfunc, VMContext};
-use crate::{SharedMemory, TrapReason};
+use crate::TrapReason;
 use anyhow::Result;
 use std::mem;
 use std::ptr::{self, NonNull};
@@ -436,25 +436,12 @@ unsafe fn memory_atomic_notify(
     memory_index: u32,
     addr_index: u64,
     count: u32,
-) -> Result<u32, TrapReason> {
+) -> Result<u32, Trap> {
     let memory = MemoryIndex::from_u32(memory_index);
     let instance = (*vmctx).instance_mut();
     instance
-        .get_memory(memory)
-        .validate_addr(addr_index, 4, 4)?;
-
-    let shared_mem = instance.get_runtime_memory(memory).as_shared_memory();
-
-    if count == 0 {
-        return Ok(0);
-    }
-
-    let unparked_threads = shared_mem.map_or(0, |shared_mem| {
-        // SAFETY: checked `addr_index` above
-        unsafe { shared_mem.unchecked_atomic_notify(addr_index, count) }
-    });
-
-    Ok(unparked_threads)
+        .get_runtime_memory(memory)
+        .atomic_notify(addr_index, count)
 }
 
 // Implementation of `memory.atomic.wait32` for locally defined memories.
@@ -464,24 +451,14 @@ unsafe fn memory_atomic_wait32(
     addr_index: u64,
     expected: u32,
     timeout: u64,
-) -> Result<u32, TrapReason> {
+) -> Result<u32, Trap> {
     // convert timeout to Instant, before any wait happens on locking
     let timeout = (timeout as i64 >= 0).then(|| Instant::now() + Duration::from_nanos(timeout));
-
     let memory = MemoryIndex::from_u32(memory_index);
     let instance = (*vmctx).instance_mut();
-    let addr = instance
-        .get_memory(memory)
-        .validate_addr(addr_index, 4, 4)?;
-
-    let shared_mem: SharedMemory = instance
+    Ok(instance
         .get_runtime_memory(memory)
-        .as_shared_memory()
-        .ok_or(Trap::AtomicWaitNonSharedMemory)?;
-
-    // SAFETY: checked `addr_index` above
-    let res = unsafe { shared_mem.unchecked_atomic_wait32(addr_index, addr, expected, timeout) };
-    Ok(res)
+        .atomic_wait32(addr_index, expected, timeout)? as u32)
 }
 
 // Implementation of `memory.atomic.wait64` for locally defined memories.
@@ -491,24 +468,14 @@ unsafe fn memory_atomic_wait64(
     addr_index: u64,
     expected: u64,
     timeout: u64,
-) -> Result<u32, TrapReason> {
+) -> Result<u32, Trap> {
     // convert timeout to Instant, before any wait happens on locking
     let timeout = (timeout as i64 >= 0).then(|| Instant::now() + Duration::from_nanos(timeout));
-
     let memory = MemoryIndex::from_u32(memory_index);
     let instance = (*vmctx).instance_mut();
-    let addr = instance
-        .get_memory(memory)
-        .validate_addr(addr_index, 8, 8)?;
-
-    let shared_mem: SharedMemory = instance
+    Ok(instance
         .get_runtime_memory(memory)
-        .as_shared_memory()
-        .ok_or(Trap::AtomicWaitNonSharedMemory)?;
-
-    // SAFETY: checked `addr_index` above
-    let res = unsafe { shared_mem.unchecked_atomic_wait64(addr_index, addr, expected, timeout) };
-    Ok(res)
+        .atomic_wait64(addr_index, expected, timeout)? as u32)
 }
 
 // Hook for when an instance runs out of fuel.
diff --git a/crates/runtime/src/memory.rs b/crates/runtime/src/memory.rs
index 5ba8bc52eb..3cbcd125da 100644
--- a/crates/runtime/src/memory.rs
+++ b/crates/runtime/src/memory.rs
@@ -3,11 +3,9 @@
 //! `RuntimeLinearMemory` is to WebAssembly linear memories what `Table` is to WebAssembly tables.
 
 use crate::mmap::Mmap;
-use crate::parking_spot::{ParkResult, ParkingSpot};
+use crate::parking_spot::ParkingSpot;
 use crate::vmcontext::VMMemoryDefinition;
-use crate::MemoryImage;
-use crate::MemoryImageSlot;
-use crate::Store;
+use crate::{MemoryImage, MemoryImageSlot, Store, WaitResult};
 use anyhow::Error;
 use anyhow::{bail, format_err, Result};
 use std::convert::TryFrom;
@@ -152,8 +150,7 @@ pub trait RuntimeLinearMemory: Send + Sync {
     /// `RuntimeMemoryCreator::new_memory()`.
     fn needs_init(&self) -> bool;
 
-    /// For the pooling allocator, we must be able to downcast this trait to its
-    /// underlying structure.
+    /// Used for optional dynamic downcasting.
     fn as_any_mut(&mut self) -> &mut dyn std::any::Any;
 }
 
@@ -436,6 +433,13 @@ impl RuntimeLinearMemory for StaticMemory {
 #[derive(Clone)]
 pub struct SharedMemory(Arc<SharedMemoryInner>);
 
+struct SharedMemoryInner {
+    memory: RwLock<Box<dyn RuntimeLinearMemory>>,
+    spot: ParkingSpot,
+    ty: wasmtime_environ::Memory,
+    def: LongTermVMMemoryDefinition,
+}
+
 impl SharedMemory {
     /// Construct a new [`SharedMemory`].
     pub fn new(plan: MemoryPlan) -> Result<Self> {
@@ -460,15 +464,17 @@
             memory.as_any_mut().type_id() != std::any::TypeId::of::<SharedMemory>(),
             "cannot re-wrap a shared memory"
         );
-        let def = LongTermVMMemoryDefinition(memory.vmmemory());
-        let spot = ParkingSpot::default();
-        let inner = RwLock::new(SharedMemoryInnerRwLocked { memory, ty, def });
-        Ok(Self(Arc::new(SharedMemoryInner { spot, lock: inner })))
+        Ok(Self(Arc::new(SharedMemoryInner {
+            ty,
+            spot: ParkingSpot::default(),
+            def: LongTermVMMemoryDefinition(memory.vmmemory()),
+            memory: RwLock::new(memory),
+        })))
     }
 
     /// Return the memory type for this [`SharedMemory`].
     pub fn ty(&self) -> wasmtime_environ::Memory {
-        self.0.lock.read().unwrap().ty
+        self.0.ty
     }
 
     /// Convert this shared memory into a [`Memory`].
@@ -476,151 +482,19 @@ impl SharedMemory {
-    /// Return a mutable pointer to the shared memory's [VMMemoryDefinition].
-    pub fn vmmemory_ptr_mut(&mut self) -> *mut VMMemoryDefinition {
-        &self.0.lock.read().unwrap().def.0 as *const _ as *mut _
-    }
-
     /// Return a pointer to the shared memory's [VMMemoryDefinition].
     pub fn vmmemory_ptr(&self) -> *const VMMemoryDefinition {
-        &self.0.lock.read().unwrap().def.0 as *const _
+        &self.0.def.0
     }
 
-    /// Implementation of `memory.atomic.notify` for this shared memory.
-    pub fn atomic_notify(&self, addr_index: u64, count: u32) -> Result<u32, Trap> {
-        let definition = &self.0.lock.read().unwrap().def.0;
-        definition.validate_addr(addr_index, 4, 4)?;
-        // SAFETY: checked `addr_index` above
-        Ok(unsafe { self.unchecked_atomic_notify(addr_index, count) })
-    }
-
-    /// Unchecked implementation of `memory.atomic.notify` for this shared memory.
-    ///
-    /// # Safety
-    /// The caller must ensure that `addr_index` is a valid address in the memory.
-    pub unsafe fn unchecked_atomic_notify(&self, addr_index: u64, count: u32) -> u32 {
-        if count == 0 {
-            return 0;
-        }
-        self.0.spot.unpark(addr_index, count)
-    }
-
-    /// Implementation of `memory.atomic.wait32` for this shared memory.
-    pub fn atomic_wait32(
+    /// Same as `RuntimeLinearMemory::grow`, except with `&self`.
+    pub fn grow(
         &self,
-        addr_index: u64,
-        expected: u32,
-        timeout: Option<Instant>,
-    ) -> Result<u32, Trap> {
-        let definition = &self.0.lock.read().unwrap().def.0;
-        let addr = definition.validate_addr(addr_index, 4, 4)?;
-        // SAFETY: checked `addr` above
-        Ok(unsafe { self.unchecked_atomic_wait32(addr_index, addr, expected, timeout) })
-    }
-
-    /// Unchecked implementation of `memory.atomic.wait32` for this shared memory.
-    ///
-    /// # Safety
-    /// The caller must ensure that `addr` is a valid address in the memory.
-    pub unsafe fn unchecked_atomic_wait32(
-        &self,
-        addr_index: u64,
-        addr: *const u8,
-        expected: u32,
-        timeout: Option<Instant>,
-    ) -> u32 {
-        // SAFETY: `addr_index` was validated by `validate_addr` above.
-        let atomic = unsafe { &*(addr as *const AtomicU32) };
-
-        // We want the sequential consistency of `SeqCst` to ensure that the `load` sees the value that the `notify` will/would see.
-        // All WASM atomic operations are also `SeqCst`.
-        let validate = || atomic.load(Ordering::SeqCst) == expected;
-
-        match self.0.spot.park(addr_index, validate, timeout) {
-            ParkResult::Unparked => 0,
-            ParkResult::Invalid => 1,
-            ParkResult::TimedOut => 2,
-        }
-    }
-
-    /// Implementation of `memory.atomic.wait64` for this shared memory.
-    pub fn atomic_wait64(
-        &self,
-        addr_index: u64,
-        expected: u64,
-        timeout: Option<Instant>,
-    ) -> Result<u32, Trap> {
-        let definition = &self.0.lock.read().unwrap().def.0;
-        let addr = definition.validate_addr(addr_index, 8, 8)?;
-        // SAFETY: checked `addr` above
-        Ok(unsafe { self.unchecked_atomic_wait64(addr_index, addr, expected, timeout) })
-    }
-    /// Unchecked implementation of `memory.atomic.wait64` for this shared memory.
-    ///
-    /// # Safety
-    /// The caller must ensure that `addr` is a valid address in the memory.
-    pub unsafe fn unchecked_atomic_wait64(
-        &self,
-        addr_index: u64,
-        addr: *const u8,
-        expected: u64,
-        timeout: Option<Instant>,
-    ) -> u32 {
-        // SAFETY: `addr_index` was validated by `validate_addr` above.
-        let atomic = unsafe { &*(addr as *const AtomicU64) };
-
-        // We want the sequential consistency of `SeqCst` to ensure that the `load` sees the value that the `notify` will/would see.
-        // All WASM atomic operations are also `SeqCst`.
-        let validate = || atomic.load(Ordering::SeqCst) == expected;
-
-        match self.0.spot.park(addr_index, validate, timeout) {
-            ParkResult::Unparked => 0,
-            ParkResult::Invalid => 1,
-            ParkResult::TimedOut => 2,
-        }
-    }
-}
-
-struct SharedMemoryInnerRwLocked {
-    memory: Box<dyn RuntimeLinearMemory>,
-    ty: wasmtime_environ::Memory,
-    def: LongTermVMMemoryDefinition,
-}
-
-struct SharedMemoryInner {
-    lock: RwLock<SharedMemoryInnerRwLocked>,
-    spot: ParkingSpot,
-}
-
-/// Shared memory needs some representation of a `VMMemoryDefinition` for
-/// JIT-generated code to access. This structure owns the base pointer and
-/// length to the actual memory and we share this definition across threads by:
-/// - never changing the base pointer; according to the specification, shared
-///   memory must be created with a known maximum size so it can be allocated
-///   once and never moved
-/// - carefully changing the length, using atomic accesses in both the runtime
-///   and JIT-generated code.
-struct LongTermVMMemoryDefinition(VMMemoryDefinition);
-unsafe impl Send for LongTermVMMemoryDefinition {}
-unsafe impl Sync for LongTermVMMemoryDefinition {}
-
-/// Proxy all calls through the [`RwLock`].
-impl RuntimeLinearMemory for SharedMemory {
-    fn byte_size(&self) -> usize {
-        self.0.lock.read().unwrap().memory.byte_size()
-    }
-
-    fn maximum_byte_size(&self) -> Option<usize> {
-        self.0.lock.read().unwrap().memory.maximum_byte_size()
-    }
-
-    fn grow(
-        &mut self,
         delta_pages: u64,
         store: Option<&mut dyn Store>,
     ) -> Result<Option<(usize, usize)>, Error> {
-        let mut inner = self.0.lock.write().unwrap();
-        let result = inner.memory.grow(delta_pages, store)?;
+        let mut memory = self.0.memory.write().unwrap();
+        let result = memory.grow(delta_pages, store)?;
         if let Some((_old_size_in_bytes, new_size_in_bytes)) = result {
             // Store the new size to the `VMMemoryDefinition` for JIT-generated
             // code (and runtime functions) to access. No other code can be
@@ -641,7 +515,7 @@ impl RuntimeLinearMemory for SharedMemory {
             // https://github.com/WebAssembly/threads/issues/26#issuecomment-433930711).
             // In other words, some non-determinism is acceptable when using
             // `memory.size` on work being done by `memory.grow`.
-            inner
+            self.0
                 .def
                 .0
                 .current_length
@@ -650,8 +524,85 @@ impl RuntimeLinearMemory for SharedMemory {
         Ok(result)
     }
 
+    /// Implementation of `memory.atomic.notify` for this shared memory.
+    pub fn atomic_notify(&self, addr_index: u64, count: u32) -> Result<u32, Trap> {
+        validate_atomic_addr(&self.0.def.0, addr_index, 4, 4)?;
+        Ok(self.0.spot.unpark(addr_index, count))
+    }
+
+    /// Implementation of `memory.atomic.wait32` for this shared memory.
+    pub fn atomic_wait32(
+        &self,
+        addr_index: u64,
+        expected: u32,
+        timeout: Option<Instant>,
+    ) -> Result<WaitResult, Trap> {
+        let addr = validate_atomic_addr(&self.0.def.0, addr_index, 4, 4)?;
+        // SAFETY: `addr_index` was validated by `validate_atomic_addr` above.
+        assert!(std::mem::size_of::<AtomicU32>() == 4);
+        assert!(std::mem::align_of::<AtomicU32>() <= 4);
+        let atomic = unsafe { &*(addr as *const AtomicU32) };
+
+        // We want the sequential consistency of `SeqCst` to ensure that the `load` sees the value that the `notify` will/would see.
+        // All WASM atomic operations are also `SeqCst`.
+        let validate = || atomic.load(Ordering::SeqCst) == expected;
+
+        Ok(self.0.spot.park(addr_index, validate, timeout))
+    }
+
+    /// Implementation of `memory.atomic.wait64` for this shared memory.
+    pub fn atomic_wait64(
+        &self,
+        addr_index: u64,
+        expected: u64,
+        timeout: Option<Instant>,
+    ) -> Result<WaitResult, Trap> {
+        let addr = validate_atomic_addr(&self.0.def.0, addr_index, 8, 8)?;
+        // SAFETY: `addr_index` was validated by `validate_atomic_addr` above.
+        assert!(std::mem::size_of::<AtomicU64>() == 8);
+        assert!(std::mem::align_of::<AtomicU64>() <= 8);
+        let atomic = unsafe { &*(addr as *const AtomicU64) };
+
+        // We want the sequential consistency of `SeqCst` to ensure that the `load` sees the value that the `notify` will/would see.
+        // All WASM atomic operations are also `SeqCst`.
+        let validate = || atomic.load(Ordering::SeqCst) == expected;
+
+        Ok(self.0.spot.park(addr_index, validate, timeout))
+    }
+}
+
+/// Shared memory needs some representation of a `VMMemoryDefinition` for
+/// JIT-generated code to access. This structure owns the base pointer and
+/// length to the actual memory and we share this definition across threads by:
+/// - never changing the base pointer; according to the specification, shared
+///   memory must be created with a known maximum size so it can be allocated
+///   once and never moved
+/// - carefully changing the length, using atomic accesses in both the runtime
+///   and JIT-generated code.
+struct LongTermVMMemoryDefinition(VMMemoryDefinition);
+unsafe impl Send for LongTermVMMemoryDefinition {}
+unsafe impl Sync for LongTermVMMemoryDefinition {}
+
+/// Proxy all calls through the [`RwLock`].
+impl RuntimeLinearMemory for SharedMemory {
+    fn byte_size(&self) -> usize {
+        self.0.memory.read().unwrap().byte_size()
+    }
+
+    fn maximum_byte_size(&self) -> Option<usize> {
+        self.0.memory.read().unwrap().maximum_byte_size()
+    }
+
+    fn grow(
+        &mut self,
+        delta_pages: u64,
+        store: Option<&mut dyn Store>,
+    ) -> Result<Option<(usize, usize)>, Error> {
+        SharedMemory::grow(self, delta_pages, store)
+    }
+
     fn grow_to(&mut self, size: usize) -> Result<()> {
-        self.0.lock.write().unwrap().memory.grow_to(size)
+        self.0.memory.write().unwrap().grow_to(size)
     }
 
     fn vmmemory(&mut self) -> VMMemoryDefinition {
@@ -663,7 +614,7 @@ impl RuntimeLinearMemory for SharedMemory {
     }
 
     fn needs_init(&self) -> bool {
-        self.0.lock.read().unwrap().memory.needs_init()
+        self.0.memory.read().unwrap().needs_init()
     }
 
     fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
@@ -872,12 +823,78 @@ impl Memory {
 
     /// If the [Memory] is a [SharedMemory], unwrap it and return a clone to
     /// that shared memory.
-    pub fn as_shared_memory(&mut self) -> Option<SharedMemory> {
+    pub fn as_shared_memory(&mut self) -> Option<&mut SharedMemory> {
         let as_any = self.0.as_any_mut();
         if let Some(m) = as_any.downcast_mut::<SharedMemory>() {
-            Some(m.clone())
+            Some(m)
         } else {
             None
         }
     }
+
+    /// Implementation of `memory.atomic.notify` for all memories.
+    pub fn atomic_notify(&mut self, addr: u64, count: u32) -> Result<u32, Trap> {
+        match self.0.as_any_mut().downcast_mut::<SharedMemory>() {
+            Some(m) => m.atomic_notify(addr, count),
+            None => {
+                validate_atomic_addr(&self.vmmemory(), addr, 4, 4)?;
+                Ok(0)
+            }
+        }
+    }
+
+    /// Implementation of `memory.atomic.wait32` for all memories.
+    pub fn atomic_wait32(
+        &mut self,
+        addr: u64,
+        expected: u32,
+        deadline: Option<Instant>,
+    ) -> Result<WaitResult, Trap> {
+        match self.0.as_any_mut().downcast_mut::<SharedMemory>() {
+            Some(m) => m.atomic_wait32(addr, expected, deadline),
+            None => {
+                validate_atomic_addr(&self.vmmemory(), addr, 4, 4)?;
+                Err(Trap::AtomicWaitNonSharedMemory)
+            }
+        }
+    }
+
+    /// Implementation of `memory.atomic.wait64` for all memories.
+    pub fn atomic_wait64(
+        &mut self,
+        addr: u64,
+        expected: u64,
+        deadline: Option<Instant>,
+    ) -> Result<WaitResult, Trap> {
+        match self.0.as_any_mut().downcast_mut::<SharedMemory>() {
+            Some(m) => m.atomic_wait64(addr, expected, deadline),
+            None => {
+                validate_atomic_addr(&self.vmmemory(), addr, 8, 8)?;
+                Err(Trap::AtomicWaitNonSharedMemory)
+            }
+        }
+    }
+}
+
+/// In the configurations where bounds checks were elided in JIT code (because
+/// we are using static memories with virtual memory guard pages) this manual
+/// check is here so we don't segfault from Rust. For other configurations,
+/// these checks are required anyways.
+fn validate_atomic_addr(
+    def: &VMMemoryDefinition,
+    addr: u64,
+    access_size: u64,
+    access_alignment: u64,
+) -> Result<*mut u8, Trap> {
+    debug_assert!(access_alignment.is_power_of_two());
+    if !(addr % access_alignment == 0) {
+        return Err(Trap::HeapMisaligned);
+    }
+
+    let length = u64::try_from(def.current_length()).unwrap();
+    if !(addr.saturating_add(access_size) < length) {
+        return Err(Trap::MemoryOutOfBounds);
+    }
+
+    Ok(def.base.wrapping_add(addr as usize))
 }
diff --git a/crates/runtime/src/parking_spot.rs b/crates/runtime/src/parking_spot.rs
index 3b87bd636b..dee1019b22 100644
--- a/crates/runtime/src/parking_spot.rs
+++ b/crates/runtime/src/parking_spot.rs
@@ -14,23 +14,11 @@
 #![deny(missing_docs)]
 #![deny(unsafe_code)]
 
+use crate::WaitResult;
 use std::collections::BTreeMap;
 use std::sync::{Arc, Condvar, Mutex};
 use std::time::Instant;
 
-/// Result of a park operation.
-#[derive(Copy, Clone, Eq, PartialEq, Debug)]
-pub enum ParkResult {
-    /// Unparked by another thread.
-    Unparked,
-
-    /// The validation callback returned false.
-    Invalid,
-
-    /// The timeout expired.
-    TimedOut,
-}
-
 #[derive(Default, Debug)]
 struct Spot {
     /// The number of threads parked on this spot.
@@ -57,7 +45,7 @@ impl ParkingSpot {
     /// `unpark_all` or `unpark` with the same key, the current thread will be unparked.
     ///
     /// The `validate` callback is called before parking.
-    /// If it returns `false`, the thread is not parked and `ParkResult::Invalid` is returned.
+    /// If it returns `false`, the thread is not parked and `WaitResult::Mismatch` is returned.
     ///
     /// The `timeout` argument specifies the maximum amount of time the thread will be parked.
     pub fn park(
@@ -65,7 +53,7 @@ impl ParkingSpot {
         key: u64,
         validate: impl FnOnce() -> bool,
         timeout: impl Into<Option<Instant>>,
-    ) -> ParkResult {
+    ) -> WaitResult {
         self.park_inner(key, validate, timeout.into())
     }
 
@@ -74,7 +62,7 @@ impl ParkingSpot {
         key: u64,
         validate: impl FnOnce() -> bool,
         timeout: Option<Instant>,
-    ) -> ParkResult {
+    ) -> WaitResult {
         let mut inner = self
             .inner
             .lock()
@@ -82,7 +70,7 @@ impl ParkingSpot {
 
         // check validation with lock held
         if !validate() {
-            return ParkResult::Invalid;
+            return WaitResult::Mismatch;
         }
 
         // clone the condvar, so we can move the lock
@@ -143,10 +131,10 @@ impl ParkingSpot {
             }
 
             if timed_out {
-                return ParkResult::TimedOut;
+                return WaitResult::TimedOut;
             }
 
-            return ParkResult::Unparked;
+            return WaitResult::Ok;
         }
     }
 
@@ -154,6 +142,9 @@ impl ParkingSpot {
     ///
     /// Returns the number of threads that were actually unparked.
     pub fn unpark(&self, key: u64, n: u32) -> u32 {
+        if n == 0 {
+            return 0;
+        }
         let mut num_unpark = 0;
 
         self.with_lot(key, |spot| {
diff --git a/crates/runtime/src/vmcontext.rs b/crates/runtime/src/vmcontext.rs
index f954d70e0d..ae15342ea4 100644
--- a/crates/runtime/src/vmcontext.rs
+++ b/crates/runtime/src/vmcontext.rs
@@ -12,7 +12,7 @@
 use std::ptr::NonNull;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::u32;
 pub use vm_host_func_context::VMHostFuncContext;
-use wasmtime_environ::{DefinedMemoryIndex, Trap};
+use wasmtime_environ::DefinedMemoryIndex;
 
 pub const VMCONTEXT_MAGIC: u32 = u32::from_le_bytes(*b"core");
 
@@ -248,30 +248,6 @@ impl VMMemoryDefinition {
             current_length: other.current_length().into(),
         }
     }
-
-    /// In the configurations where bounds checks were elided in JIT code (because
-    /// we are using static memories with virtual memory guard pages) this manual
-    /// check is here so we don't segfault from Rust. For other configurations,
-    /// these checks are required anyways.
-    pub fn validate_addr(
-        &self,
-        addr: u64,
-        access_size: u64,
-        access_alignment: u64,
-    ) -> Result<*const u8, Trap> {
-        debug_assert!(access_alignment.is_power_of_two());
-        if !(addr % access_alignment == 0) {
-            return Err(Trap::HeapMisaligned);
-        }
-
-        let length = u64::try_from(self.current_length()).unwrap();
-        if !(addr.saturating_add(access_size) < length) {
-            return Err(Trap::MemoryOutOfBounds);
-        }
-
-        // SAFETY: checked above that the address is in bounds
-        Ok(unsafe { self.base.add(addr as usize) })
-    }
 }
 
 #[cfg(test)]
diff --git a/crates/wasmtime/src/memory.rs b/crates/wasmtime/src/memory.rs
index 4d5cc788b2..61d5ef3a2d 100644
--- a/crates/wasmtime/src/memory.rs
+++ b/crates/wasmtime/src/memory.rs
@@ -1,13 +1,17 @@
 use crate::store::{StoreData, StoreOpaque, Stored};
 use crate::trampoline::generate_memory_export;
+use crate::Trap;
 use crate::{AsContext, AsContextMut, Engine, MemoryType, StoreContext, StoreContextMut};
 use anyhow::{bail, Result};
 use std::cell::UnsafeCell;
 use std::convert::TryFrom;
 use std::slice;
+use std::time::Instant;
 use wasmtime_environ::MemoryPlan;
 use wasmtime_runtime::{RuntimeLinearMemory, VMMemoryImport};
 
+pub use wasmtime_runtime::WaitResult;
+
 /// Error for out of bounds [`Memory`] access.
 #[derive(Debug)]
 #[non_exhaustive]
@@ -780,7 +784,7 @@ impl SharedMemory {
     /// the maximum limits of this memory. A
     /// [`ResourceLimiter`](crate::ResourceLimiter) is another example of
     /// preventing a memory to grow.
-    pub fn grow(&mut self, delta: u64) -> Result<u64> {
+    pub fn grow(&self, delta: u64) -> Result<u64> {
         match self.0.grow(delta, None)? {
             Some((old_size, _new_size)) => {
                 // For shared memory, the `VMMemoryDefinition` is updated inside
@@ -791,6 +795,87 @@ impl SharedMemory {
         }
     }
 
+    /// Equivalent of the WebAssembly `memory.atomic.notify` instruction for
+    /// this shared memory.
+    ///
+    /// This method allows embedders to notify threads blocked on the specified
+    /// `addr`, an index into wasm linear memory. Threads could include
+    /// wasm threads blocked on a `memory.atomic.wait*` instruction or embedder
+    /// threads blocked on [`SharedMemory::atomic_wait32`], for example.
+    ///
+    /// The `count` argument is the number of threads to wake up.
+    ///
+    /// This function returns the number of threads awoken.
+    ///
+    /// # Errors
+    ///
+    /// This function will return an error if `addr` is not within bounds or
+    /// not aligned to a 4-byte boundary.
+    pub fn atomic_notify(&self, addr: u64, count: u32) -> Result<u32, Trap> {
+        self.0.atomic_notify(addr, count)
+    }
+
+    /// Equivalent of the WebAssembly `memory.atomic.wait32` instruction for
+    /// this shared memory.
+    ///
+    /// This method allows embedders to block the current thread until notified
+    /// via the `memory.atomic.notify` instruction or the
+    /// [`SharedMemory::atomic_notify`] method, enabling synchronization with
+    /// the wasm guest as desired.
+    ///
+    /// The `expected` argument is the expected 32-bit value to be stored at
+    /// the byte address `addr` specified. The `addr` specified is an index
+    /// into this linear memory.
+    ///
+    /// The optional `timeout` argument is the point in time after which the
+    /// calling thread is guaranteed to be woken up. Blocking will not occur
+    /// past this point.
+    ///
+    /// This function returns one of three possible values:
+    ///
+    /// * `WaitResult::Ok` - this function loaded the value at `addr`, found
+    ///   it was equal to `expected`, and then blocked (all as one atomic
+    ///   operation). The thread was then awoken with a `memory.atomic.notify`
+    ///   instruction or the [`SharedMemory::atomic_notify`] method.
+    /// * `WaitResult::Mismatch` - the value at `addr` was loaded but was not
+    ///   equal to `expected` so the thread did not block and immediately
+    ///   returned.
+    /// * `WaitResult::TimedOut` - all the steps of `Ok` happened, except this
+    ///   thread was woken up due to a timeout.
+    ///
+    /// This function will not return due to spurious wakeups.
+    ///
+    /// # Errors
+    ///
+    /// This function will return an error if `addr` is not within bounds or
+    /// not aligned to a 4-byte boundary.
+    pub fn atomic_wait32(
+        &self,
+        addr: u64,
+        expected: u32,
+        timeout: Option<Instant>,
+    ) -> Result<WaitResult, Trap> {
+        self.0.atomic_wait32(addr, expected, timeout)
+    }
+
+    /// Equivalent of the WebAssembly `memory.atomic.wait64` instruction for
+    /// this shared memory.
+    ///
+    /// For more information see [`SharedMemory::atomic_wait32`].
+    ///
+    /// # Errors
+    ///
+    /// Returns the same error as [`SharedMemory::atomic_wait32`] except that
+    /// the specified address must be 8-byte aligned instead of 4-byte aligned.
+    pub fn atomic_wait64(
+        &self,
+        addr: u64,
+        expected: u64,
+        timeout: Option<Instant>,
+    ) -> Result<WaitResult, Trap> {
+        self.0.atomic_wait64(addr, expected, timeout)
+    }
+
     /// Return a reference to the [`Engine`] used to configure the shared
     /// memory.
     pub(crate) fn engine(&self) -> &Engine {
@@ -824,7 +909,8 @@ impl SharedMemory {
             .unwrap();
         let shared_memory = memory
             .as_shared_memory()
-            .expect("unable to convert from a shared memory");
+            .expect("unable to convert from a shared memory")
+            .clone();
         Self(shared_memory, store.engine().clone())
     }
 }
diff --git a/tests/all/memory.rs b/tests/all/memory.rs
index 281a2e7ab8..2f7663db93 100644
--- a/tests/all/memory.rs
+++ b/tests/all/memory.rs
@@ -1,5 +1,7 @@
 use anyhow::Result;
 use rayon::prelude::*;
+use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
+use std::time::{Duration, Instant};
 use wasmtime::*;
 
 fn module(engine: &Engine) -> Result<Module> {
@@ -462,3 +464,105 @@ fn memory64_maximum_minimum() -> Result<()> {
 
     Ok(())
 }
+
+#[test]
+fn shared_memory_basics() -> Result<()> {
+    let engine = Engine::default();
+    assert!(SharedMemory::new(&engine, MemoryType::new(1, None)).is_err());
+    assert!(SharedMemory::new(&engine, MemoryType::new(1, Some(1))).is_err());
+    assert!(SharedMemory::new(&engine, MemoryType::new64(1, None)).is_err());
+    assert!(SharedMemory::new(&engine, MemoryType::new64(1, Some(1))).is_err());
+    assert!(SharedMemory::new(&engine, MemoryType::shared(1, 0)).is_err());
+
+    let memory = SharedMemory::new(&engine, MemoryType::shared(1, 1))?;
+    assert!(memory.ty().is_shared());
+    assert_eq!(memory.ty().minimum(), 1);
+    assert_eq!(memory.ty().maximum(), Some(1));
+    assert_eq!(memory.size(), 1);
+    assert_eq!(memory.data_size(), 65536);
+    assert_eq!(memory.data().len(), 65536);
+    assert!(memory.grow(1).is_err());
+
+    // misaligned
+    assert_eq!(memory.atomic_notify(1, 100), Err(Trap::HeapMisaligned));
+    assert_eq!(
+        memory.atomic_wait32(1, 100, None),
+        Err(Trap::HeapMisaligned)
+    );
+    assert_eq!(
+        memory.atomic_wait64(1, 100, None),
+        Err(Trap::HeapMisaligned)
+    );
+
+    // oob
+    assert_eq!(
+        memory.atomic_notify(1 << 20, 100),
+        Err(Trap::MemoryOutOfBounds)
+    );
+    assert_eq!(
+        memory.atomic_wait32(1 << 20, 100, None),
+        Err(Trap::MemoryOutOfBounds)
+    );
+    assert_eq!(
+        memory.atomic_wait64(1 << 20, 100, None),
+        Err(Trap::MemoryOutOfBounds)
+    );
+
+    // ok
+    assert_eq!(memory.atomic_notify(8, 100), Ok(0));
+    assert_eq!(memory.atomic_wait32(8, 1, None), Ok(WaitResult::Mismatch));
+    assert_eq!(memory.atomic_wait64(8, 1, None), Ok(WaitResult::Mismatch));
+
+    // timeout
+    let near_future = Instant::now() + Duration::new(0, 100);
+    assert_eq!(
+        memory.atomic_wait32(8, 0, Some(near_future)),
+        Ok(WaitResult::TimedOut)
+    );
+    assert_eq!(
+        memory.atomic_wait64(8, 0, Some(near_future)),
+        Ok(WaitResult::TimedOut)
+    );
+
+    Ok(())
+}
+
+#[test]
+fn shared_memory_wait_notify() -> Result<()> {
+    const THREADS: usize = 8;
+    const COUNT: usize = 100_000;
+
+    let engine = Engine::default();
+    let memory = SharedMemory::new(&engine, MemoryType::shared(1, 1))?;
+    let data = unsafe { &*(memory.data().as_ptr() as *const AtomicU32) };
+    let locked = unsafe { &*(memory.data().as_ptr().add(4) as *const AtomicU32) };
+
+    // Note that `SeqCst` is used here to not think much about the orderings
+    // here, and it also somewhat more closely mirrors what's happening in wasm.
+    let lock = || {
+        while locked.swap(1, SeqCst) == 1 {
+            memory.atomic_wait32(0, 1, None).unwrap();
+        }
+    };
+    let unlock = || {
+        locked.store(0, SeqCst);
+        memory.atomic_notify(0, 1).unwrap();
+    };
+
+    std::thread::scope(|s| {
+        for _ in 0..THREADS {
+            s.spawn(|| {
+                for _ in 0..COUNT {
+                    lock();
+                    let next = data.load(SeqCst) + 1;
+                    data.store(next, SeqCst);
+                    unlock();
+                }
+            });
+        }
+    });
+
+    assert_eq!(data.load(SeqCst), (THREADS * COUNT) as u32);
+
+    Ok(())
+}
diff --git a/tests/all/threads.rs b/tests/all/threads.rs
index fe497beec0..30ca86bdd3 100644
--- a/tests/all/threads.rs
+++ b/tests/all/threads.rs
@@ -107,7 +107,7 @@ fn test_probe_shared_memory_size() -> Result<()> {
     let mut store = Store::new(&engine, ());
     let instance = Instance::new(&mut store, &module, &[])?;
     let size_fn = instance.get_typed_func::<(), i32>(&mut store, "size")?;
-    let mut shared_memory = instance.get_shared_memory(&mut store, "memory").unwrap();
+    let shared_memory = instance.get_shared_memory(&mut store, "memory").unwrap();
 
     assert_eq!(size_fn.call(&mut store, ())?, 1);
     assert_eq!(shared_memory.size(), 1);
@@ -244,7 +244,7 @@ fn test_memory_size_accessibility() -> Result<()> {
     let shared_memory = SharedMemory::new(&engine, MemoryType::shared(1, NUM_GROW_OPS as u32))?;
     let done = Arc::new(AtomicBool::new(false));
 
-    let mut grow_memory = shared_memory.clone();
+    let grow_memory = shared_memory.clone();
     let grow_thread = std::thread::spawn(move || {
         for i in 0..NUM_GROW_OPS {
             if grow_memory.grow(1).is_err() {
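Usage sketch (not part of the patch): the embedder-facing wait/notify API added to crates/wasmtime/src/memory.rs above can be driven roughly as follows, modeled on the `shared_memory_wait_notify` test in this diff. The flag layout and thread structure here are illustrative only.

use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
use wasmtime::{Engine, MemoryType, SharedMemory, WaitResult};

fn main() {
    let engine = Engine::default();
    // Shared memories must declare a maximum size; a single 64 KiB page here.
    let memory = SharedMemory::new(&engine, MemoryType::shared(1, 1)).unwrap();

    // View the first 32-bit word of linear memory as an atomic flag, the same
    // way the shared_memory_wait_notify test does.
    let flag = unsafe { &*(memory.data().as_ptr() as *const AtomicU32) };

    std::thread::scope(|s| {
        let waiter = s.spawn(|| {
            // Park until another thread flips the flag and notifies address 0;
            // a mismatched load returns WaitResult::Mismatch immediately.
            while flag.load(SeqCst) == 0 {
                memory.atomic_wait32(0, 0, None).unwrap();
            }
        });

        flag.store(1, SeqCst);
        // Wake at most one thread parked on address 0.
        memory.atomic_notify(0, 1).unwrap();
        waiter.join().unwrap();
    });

    // With the flag now set, an expected value of 0 no longer matches.
    assert_eq!(
        memory.atomic_wait32(0, 0, None).unwrap(),
        WaitResult::Mismatch
    );
}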