Refactor shared memory internals, expose embedder methods (#5311)

This commit refactors the internals of `wasmtime_runtime::SharedMemory`
a bit to expose the functions that the `wasmtime::SharedMemory` layer
needs to invoke. Notably, some items that previously lived behind the
`RwLock`, such as the memory type and the `VMMemoryDefinition`, have
been moved out of it. Additionally, the `atomic_*` methods have been
reorganized so that the `wasmtime`-layer abstraction has a single
method to call into, which everything else uses as well.
Alex Crichton
2022-11-22 10:51:55 -06:00
committed by GitHub
parent 0a2a0444b3
commit 6ce2ac19b8
9 changed files with 401 additions and 246 deletions
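For context, here is a minimal sketch of how an embedder might use the newly exposed `wasmtime::SharedMemory` wait/notify methods, based on the API and tests in the diff below. The example itself is not part of the commit and the exact `use` paths are assumptions.

```rust
use anyhow::Result;
use std::time::{Duration, Instant};
use wasmtime::{Engine, MemoryType, SharedMemory, WaitResult};

fn main() -> Result<()> {
    let engine = Engine::default();
    // Shared memories must be declared with a maximum size.
    let memory = SharedMemory::new(&engine, MemoryType::shared(1, 1))?;

    // Nothing is waiting at address 8, so no threads are awoken.
    assert_eq!(memory.atomic_notify(8, 1), Ok(0));

    // The value at address 8 is 0, not 1, so the wait returns immediately.
    assert_eq!(memory.atomic_wait32(8, 1, None), Ok(WaitResult::Mismatch));

    // The value matches `expected`, so the wait blocks until the deadline passes.
    let deadline = Instant::now() + Duration::from_millis(10);
    assert_eq!(
        memory.atomic_wait32(8, 0, Some(deadline)),
        Ok(WaitResult::TimedOut)
    );
    Ok(())
}
```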

View File

@@ -966,8 +966,8 @@ impl Instance {
let def_ptr = self.memories[defined_memory_index]
.as_shared_memory()
.unwrap()
.vmmemory_ptr_mut();
ptr::write(ptr, def_ptr);
.vmmemory_ptr();
ptr::write(ptr, def_ptr.cast_mut());
} else {
ptr::write(owned_ptr, self.memories[defined_memory_index].vmmemory());
ptr::write(ptr, owned_ptr);

View File

@@ -231,3 +231,17 @@ pub fn page_size() -> usize {
unsafe { libc::sysconf(libc::_SC_PAGESIZE) as usize }
}
}
/// Result of [`Memory::atomic_wait32`] and [`Memory::atomic_wait64`]
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum WaitResult {
/// Indicates that a `wait` completed by being awoken by a different thread.
/// This means the thread went to sleep and didn't time out.
Ok = 0,
/// Indicates that `wait` did not complete and instead returned due to the
/// value in memory not matching the expected value.
Mismatch = 1,
/// Indicates that `wait` completed with a timeout, meaning that the
/// original value matched as expected but nothing ever called `notify`.
TimedOut = 2,
}

View File

@@ -57,7 +57,7 @@
use crate::externref::VMExternRef;
use crate::table::{Table, TableElementType};
use crate::vmcontext::{VMCallerCheckedAnyfunc, VMContext};
use crate::{SharedMemory, TrapReason};
use crate::TrapReason;
use anyhow::Result;
use std::mem;
use std::ptr::{self, NonNull};
@@ -436,25 +436,12 @@ unsafe fn memory_atomic_notify(
memory_index: u32,
addr_index: u64,
count: u32,
) -> Result<u32, TrapReason> {
) -> Result<u32, Trap> {
let memory = MemoryIndex::from_u32(memory_index);
let instance = (*vmctx).instance_mut();
instance
.get_memory(memory)
.validate_addr(addr_index, 4, 4)?;
let shared_mem = instance.get_runtime_memory(memory).as_shared_memory();
if count == 0 {
return Ok(0);
}
let unparked_threads = shared_mem.map_or(0, |shared_mem| {
// SAFETY: checked `addr_index` above
unsafe { shared_mem.unchecked_atomic_notify(addr_index, count) }
});
Ok(unparked_threads)
.get_runtime_memory(memory)
.atomic_notify(addr_index, count)
}
// Implementation of `memory.atomic.wait32` for locally defined memories.
@@ -464,24 +451,14 @@ unsafe fn memory_atomic_wait32(
addr_index: u64,
expected: u32,
timeout: u64,
) -> Result<u32, TrapReason> {
) -> Result<u32, Trap> {
// convert timeout to Instant, before any wait happens on locking
let timeout = (timeout as i64 >= 0).then(|| Instant::now() + Duration::from_nanos(timeout));
let memory = MemoryIndex::from_u32(memory_index);
let instance = (*vmctx).instance_mut();
let addr = instance
.get_memory(memory)
.validate_addr(addr_index, 4, 4)?;
let shared_mem: SharedMemory = instance
Ok(instance
.get_runtime_memory(memory)
.as_shared_memory()
.ok_or(Trap::AtomicWaitNonSharedMemory)?;
// SAFETY: checked `addr_index` above
let res = unsafe { shared_mem.unchecked_atomic_wait32(addr_index, addr, expected, timeout) };
Ok(res)
.atomic_wait32(addr_index, expected, timeout)? as u32)
}
// Implementation of `memory.atomic.wait64` for locally defined memories.
@@ -491,24 +468,14 @@ unsafe fn memory_atomic_wait64(
addr_index: u64,
expected: u64,
timeout: u64,
) -> Result<u32, TrapReason> {
) -> Result<u32, Trap> {
// convert timeout to Instant, before any wait happens on locking
let timeout = (timeout as i64 >= 0).then(|| Instant::now() + Duration::from_nanos(timeout));
let memory = MemoryIndex::from_u32(memory_index);
let instance = (*vmctx).instance_mut();
let addr = instance
.get_memory(memory)
.validate_addr(addr_index, 8, 8)?;
let shared_mem: SharedMemory = instance
Ok(instance
.get_runtime_memory(memory)
.as_shared_memory()
.ok_or(Trap::AtomicWaitNonSharedMemory)?;
// SAFETY: checked `addr_index` above
let res = unsafe { shared_mem.unchecked_atomic_wait64(addr_index, addr, expected, timeout) };
Ok(res)
.atomic_wait64(addr_index, expected, timeout)? as u32)
}
// Hook for when an instance runs out of fuel.

View File

@@ -3,11 +3,9 @@
//! `RuntimeLinearMemory` is to WebAssembly linear memories what `Table` is to WebAssembly tables.
use crate::mmap::Mmap;
use crate::parking_spot::{ParkResult, ParkingSpot};
use crate::parking_spot::ParkingSpot;
use crate::vmcontext::VMMemoryDefinition;
use crate::MemoryImage;
use crate::MemoryImageSlot;
use crate::Store;
use crate::{MemoryImage, MemoryImageSlot, Store, WaitResult};
use anyhow::Error;
use anyhow::{bail, format_err, Result};
use std::convert::TryFrom;
@@ -152,8 +150,7 @@ pub trait RuntimeLinearMemory: Send + Sync {
/// `RuntimeMemoryCreator::new_memory()`.
fn needs_init(&self) -> bool;
/// For the pooling allocator, we must be able to downcast this trait to its
/// underlying structure.
/// Used for optional dynamic downcasting.
fn as_any_mut(&mut self) -> &mut dyn std::any::Any;
}
@@ -436,6 +433,13 @@ impl RuntimeLinearMemory for StaticMemory {
#[derive(Clone)]
pub struct SharedMemory(Arc<SharedMemoryInner>);
struct SharedMemoryInner {
memory: RwLock<Box<dyn RuntimeLinearMemory>>,
spot: ParkingSpot,
ty: wasmtime_environ::Memory,
def: LongTermVMMemoryDefinition,
}
impl SharedMemory {
/// Construct a new [`SharedMemory`].
pub fn new(plan: MemoryPlan) -> Result<Self> {
@@ -460,15 +464,17 @@ impl SharedMemory {
memory.as_any_mut().type_id() != std::any::TypeId::of::<SharedMemory>(),
"cannot re-wrap a shared memory"
);
let def = LongTermVMMemoryDefinition(memory.vmmemory());
let spot = ParkingSpot::default();
let inner = RwLock::new(SharedMemoryInnerRwLocked { memory, ty, def });
Ok(Self(Arc::new(SharedMemoryInner { spot, lock: inner })))
Ok(Self(Arc::new(SharedMemoryInner {
ty,
spot: ParkingSpot::default(),
def: LongTermVMMemoryDefinition(memory.vmmemory()),
memory: RwLock::new(memory),
})))
}
/// Return the memory type for this [`SharedMemory`].
pub fn ty(&self) -> wasmtime_environ::Memory {
self.0.lock.read().unwrap().ty
self.0.ty
}
/// Convert this shared memory into a [`Memory`].
@@ -476,151 +482,19 @@ impl SharedMemory {
Memory(Box::new(self))
}
/// Return a mutable pointer to the shared memory's [VMMemoryDefinition].
pub fn vmmemory_ptr_mut(&mut self) -> *mut VMMemoryDefinition {
&self.0.lock.read().unwrap().def.0 as *const _ as *mut _
}
/// Return a pointer to the shared memory's [VMMemoryDefinition].
pub fn vmmemory_ptr(&self) -> *const VMMemoryDefinition {
&self.0.lock.read().unwrap().def.0 as *const _
&self.0.def.0
}
/// Implementation of `memory.atomic.notify` for this shared memory.
pub fn atomic_notify(&self, addr_index: u64, count: u32) -> Result<u32, Trap> {
let definition = &self.0.lock.read().unwrap().def.0;
definition.validate_addr(addr_index, 4, 4)?;
// SAFETY: checked `addr_index` above
Ok(unsafe { self.unchecked_atomic_notify(addr_index, count) })
}
/// Unchecked implementation of `memory.atomic.notify` for this shared memory.
///
/// # Safety
/// The caller must ensure that `addr_index` is a valid address in the memory.
pub unsafe fn unchecked_atomic_notify(&self, addr_index: u64, count: u32) -> u32 {
if count == 0 {
return 0;
}
self.0.spot.unpark(addr_index, count)
}
/// Implementation of `memory.atomic.wait32` for this shared memory.
pub fn atomic_wait32(
/// Same as `RuntimeLinearMemory::grow`, except with `&self`.
pub fn grow(
&self,
addr_index: u64,
expected: u32,
timeout: Option<Instant>,
) -> Result<u32, Trap> {
let definition = &self.0.lock.read().unwrap().def.0;
let addr = definition.validate_addr(addr_index, 4, 4)?;
// SAFETY: checked `addr` above
Ok(unsafe { self.unchecked_atomic_wait32(addr_index, addr, expected, timeout) })
}
/// Unchecked implementation of `memory.atomic.wait32` for this shared memory.
///
/// # Safety
/// The caller must ensure that `addr` is a valid address in the memory.
pub unsafe fn unchecked_atomic_wait32(
&self,
addr_index: u64,
addr: *const u8,
expected: u32,
timeout: Option<Instant>,
) -> u32 {
// SAFETY: `addr_index` was validated by `validate_addr` above.
let atomic = unsafe { &*(addr as *const AtomicU32) };
// We want the sequential consistency of `SeqCst` to ensure that the `load` sees the value that the `notify` will/would see.
// All WASM atomic operations are also `SeqCst`.
let validate = || atomic.load(Ordering::SeqCst) == expected;
match self.0.spot.park(addr_index, validate, timeout) {
ParkResult::Unparked => 0,
ParkResult::Invalid => 1,
ParkResult::TimedOut => 2,
}
}
/// Implementation of `memory.atomic.wait64` for this shared memory.
pub fn atomic_wait64(
&self,
addr_index: u64,
expected: u64,
timeout: Option<Instant>,
) -> Result<u32, Trap> {
let definition = &self.0.lock.read().unwrap().def.0;
let addr = definition.validate_addr(addr_index, 8, 8)?;
// SAFETY: checked `addr` above
Ok(unsafe { self.unchecked_atomic_wait64(addr_index, addr, expected, timeout) })
}
/// Unchecked implementation of `memory.atomic.wait64` for this shared memory.
///
/// # Safety
/// The caller must ensure that `addr` is a valid address in the memory.
pub unsafe fn unchecked_atomic_wait64(
&self,
addr_index: u64,
addr: *const u8,
expected: u64,
timeout: Option<Instant>,
) -> u32 {
// SAFETY: `addr_index` was validated by `validate_addr` above.
let atomic = unsafe { &*(addr as *const AtomicU64) };
// We want the sequential consistency of `SeqCst` to ensure that the `load` sees the value that the `notify` will/would see.
// All WASM atomic operations are also `SeqCst`.
let validate = || atomic.load(Ordering::SeqCst) == expected;
match self.0.spot.park(addr_index, validate, timeout) {
ParkResult::Unparked => 0,
ParkResult::Invalid => 1,
ParkResult::TimedOut => 2,
}
}
}
struct SharedMemoryInnerRwLocked {
memory: Box<dyn RuntimeLinearMemory>,
ty: wasmtime_environ::Memory,
def: LongTermVMMemoryDefinition,
}
struct SharedMemoryInner {
lock: RwLock<SharedMemoryInnerRwLocked>,
spot: ParkingSpot,
}
/// Shared memory needs some representation of a `VMMemoryDefinition` for
/// JIT-generated code to access. This structure owns the base pointer and
/// length to the actual memory and we share this definition across threads by:
/// - never changing the base pointer; according to the specification, shared
/// memory must be created with a known maximum size so it can be allocated
/// once and never moved
/// - carefully changing the length, using atomic accesses in both the runtime
/// and JIT-generated code.
struct LongTermVMMemoryDefinition(VMMemoryDefinition);
unsafe impl Send for LongTermVMMemoryDefinition {}
unsafe impl Sync for LongTermVMMemoryDefinition {}
/// Proxy all calls through the [`RwLock`].
impl RuntimeLinearMemory for SharedMemory {
fn byte_size(&self) -> usize {
self.0.lock.read().unwrap().memory.byte_size()
}
fn maximum_byte_size(&self) -> Option<usize> {
self.0.lock.read().unwrap().memory.maximum_byte_size()
}
fn grow(
&mut self,
delta_pages: u64,
store: Option<&mut dyn Store>,
) -> Result<Option<(usize, usize)>, Error> {
let mut inner = self.0.lock.write().unwrap();
let result = inner.memory.grow(delta_pages, store)?;
let mut memory = self.0.memory.write().unwrap();
let result = memory.grow(delta_pages, store)?;
if let Some((_old_size_in_bytes, new_size_in_bytes)) = result {
// Store the new size to the `VMMemoryDefinition` for JIT-generated
// code (and runtime functions) to access. No other code can be
@@ -641,7 +515,7 @@ impl RuntimeLinearMemory for SharedMemory {
// https://github.com/WebAssembly/threads/issues/26#issuecomment-433930711).
// In other words, some non-determinism is acceptable when using
// `memory.size` on work being done by `memory.grow`.
inner
self.0
.def
.0
.current_length
@@ -650,8 +524,85 @@ impl RuntimeLinearMemory for SharedMemory {
Ok(result)
}
/// Implementation of `memory.atomic.notify` for this shared memory.
pub fn atomic_notify(&self, addr_index: u64, count: u32) -> Result<u32, Trap> {
validate_atomic_addr(&self.0.def.0, addr_index, 4, 4)?;
Ok(self.0.spot.unpark(addr_index, count))
}
/// Implementation of `memory.atomic.wait32` for this shared memory.
pub fn atomic_wait32(
&self,
addr_index: u64,
expected: u32,
timeout: Option<Instant>,
) -> Result<WaitResult, Trap> {
let addr = validate_atomic_addr(&self.0.def.0, addr_index, 4, 4)?;
// SAFETY: `addr_index` was validated by `validate_atomic_addr` above.
assert!(std::mem::size_of::<AtomicU32>() == 4);
assert!(std::mem::align_of::<AtomicU32>() <= 4);
let atomic = unsafe { &*(addr as *const AtomicU32) };
// We want the sequential consistency of `SeqCst` to ensure that the `load` sees the value that the `notify` will/would see.
// All WASM atomic operations are also `SeqCst`.
let validate = || atomic.load(Ordering::SeqCst) == expected;
Ok(self.0.spot.park(addr_index, validate, timeout))
}
/// Implementation of `memory.atomic.wait64` for this shared memory.
pub fn atomic_wait64(
&self,
addr_index: u64,
expected: u64,
timeout: Option<Instant>,
) -> Result<WaitResult, Trap> {
let addr = validate_atomic_addr(&self.0.def.0, addr_index, 8, 8)?;
// SAFETY: `addr_index` was validated by `validate_atomic_addr` above.
assert!(std::mem::size_of::<AtomicU64>() == 8);
assert!(std::mem::align_of::<AtomicU64>() <= 8);
let atomic = unsafe { &*(addr as *const AtomicU64) };
// We want the sequential consistency of `SeqCst` to ensure that the `load` sees the value that the `notify` will/would see.
// All WASM atomic operations are also `SeqCst`.
let validate = || atomic.load(Ordering::SeqCst) == expected;
Ok(self.0.spot.park(addr_index, validate, timeout))
}
}
/// Shared memory needs some representation of a `VMMemoryDefinition` for
/// JIT-generated code to access. This structure owns the base pointer and
/// length to the actual memory and we share this definition across threads by:
/// - never changing the base pointer; according to the specification, shared
/// memory must be created with a known maximum size so it can be allocated
/// once and never moved
/// - carefully changing the length, using atomic accesses in both the runtime
/// and JIT-generated code.
struct LongTermVMMemoryDefinition(VMMemoryDefinition);
unsafe impl Send for LongTermVMMemoryDefinition {}
unsafe impl Sync for LongTermVMMemoryDefinition {}
/// Proxy all calls through the [`RwLock`].
impl RuntimeLinearMemory for SharedMemory {
fn byte_size(&self) -> usize {
self.0.memory.read().unwrap().byte_size()
}
fn maximum_byte_size(&self) -> Option<usize> {
self.0.memory.read().unwrap().maximum_byte_size()
}
fn grow(
&mut self,
delta_pages: u64,
store: Option<&mut dyn Store>,
) -> Result<Option<(usize, usize)>, Error> {
SharedMemory::grow(self, delta_pages, store)
}
fn grow_to(&mut self, size: usize) -> Result<()> {
self.0.lock.write().unwrap().memory.grow_to(size)
self.0.memory.write().unwrap().grow_to(size)
}
fn vmmemory(&mut self) -> VMMemoryDefinition {
@@ -663,7 +614,7 @@ impl RuntimeLinearMemory for SharedMemory {
}
fn needs_init(&self) -> bool {
self.0.lock.read().unwrap().memory.needs_init()
self.0.memory.read().unwrap().needs_init()
}
fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
@@ -872,12 +823,78 @@ impl Memory {
/// If the [Memory] is a [SharedMemory], unwrap it and return a clone to
/// that shared memory.
pub fn as_shared_memory(&mut self) -> Option<SharedMemory> {
pub fn as_shared_memory(&mut self) -> Option<&mut SharedMemory> {
let as_any = self.0.as_any_mut();
if let Some(m) = as_any.downcast_mut::<SharedMemory>() {
Some(m.clone())
Some(m)
} else {
None
}
}
/// Implementation of `memory.atomic.notify` for all memories.
pub fn atomic_notify(&mut self, addr: u64, count: u32) -> Result<u32, Trap> {
match self.0.as_any_mut().downcast_mut::<SharedMemory>() {
Some(m) => m.atomic_notify(addr, count),
None => {
validate_atomic_addr(&self.vmmemory(), addr, 4, 4)?;
Ok(0)
}
}
}
/// Implementation of `memory.atomic.wait32` for all memories.
pub fn atomic_wait32(
&mut self,
addr: u64,
expected: u32,
deadline: Option<Instant>,
) -> Result<WaitResult, Trap> {
match self.0.as_any_mut().downcast_mut::<SharedMemory>() {
Some(m) => m.atomic_wait32(addr, expected, deadline),
None => {
validate_atomic_addr(&self.vmmemory(), addr, 4, 4)?;
Err(Trap::AtomicWaitNonSharedMemory)
}
}
}
/// Implementation of `memory.atomic.wait64` for all memories.
pub fn atomic_wait64(
&mut self,
addr: u64,
expected: u64,
deadline: Option<Instant>,
) -> Result<WaitResult, Trap> {
match self.0.as_any_mut().downcast_mut::<SharedMemory>() {
Some(m) => m.atomic_wait64(addr, expected, deadline),
None => {
validate_atomic_addr(&self.vmmemory(), addr, 8, 8)?;
Err(Trap::AtomicWaitNonSharedMemory)
}
}
}
}
/// In the configurations where bounds checks were elided in JIT code (because
/// we are using static memories with virtual memory guard pages) this manual
/// check is here so we don't segfault from Rust. For other configurations,
/// these checks are required anyways.
fn validate_atomic_addr(
def: &VMMemoryDefinition,
addr: u64,
access_size: u64,
access_alignment: u64,
) -> Result<*mut u8, Trap> {
debug_assert!(access_alignment.is_power_of_two());
if !(addr % access_alignment == 0) {
return Err(Trap::HeapMisaligned);
}
let length = u64::try_from(def.current_length()).unwrap();
if !(addr.saturating_add(access_size) < length) {
return Err(Trap::MemoryOutOfBounds);
}
Ok(def.base.wrapping_add(addr as usize))
}

View File

@@ -14,23 +14,11 @@
#![deny(missing_docs)]
#![deny(unsafe_code)]
use crate::WaitResult;
use std::collections::BTreeMap;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Instant;
/// Result of a park operation.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum ParkResult {
/// Unparked by another thread.
Unparked,
/// The validation callback returned false.
Invalid,
/// The timeout expired.
TimedOut,
}
#[derive(Default, Debug)]
struct Spot {
/// The number of threads parked on this spot.
@@ -57,7 +45,7 @@ impl ParkingSpot {
/// `unpark_all` or `unpark` with the same key, the current thread will be unparked.
///
/// The `validate` callback is called before parking.
/// If it returns `false`, the thread is not parked and `ParkResult::Invalid` is returned.
/// If it returns `false`, the thread is not parked and `WaitResult::Mismatch` is returned.
///
/// The `timeout` argument specifies the maximum amount of time the thread will be parked.
pub fn park(
@@ -65,7 +53,7 @@ impl ParkingSpot {
key: u64,
validate: impl FnOnce() -> bool,
timeout: impl Into<Option<Instant>>,
) -> ParkResult {
) -> WaitResult {
self.park_inner(key, validate, timeout.into())
}
@@ -74,7 +62,7 @@ impl ParkingSpot {
key: u64,
validate: impl FnOnce() -> bool,
timeout: Option<Instant>,
) -> ParkResult {
) -> WaitResult {
let mut inner = self
.inner
.lock()
@@ -82,7 +70,7 @@ impl ParkingSpot {
// check validation with lock held
if !validate() {
return ParkResult::Invalid;
return WaitResult::Mismatch;
}
// clone the condvar, so we can move the lock
@@ -143,10 +131,10 @@ impl ParkingSpot {
}
if timed_out {
return ParkResult::TimedOut;
return WaitResult::TimedOut;
}
return ParkResult::Unparked;
return WaitResult::Ok;
}
}
@@ -154,6 +142,9 @@ impl ParkingSpot {
///
/// Returns the number of threads that were actually unparked.
pub fn unpark(&self, key: u64, n: u32) -> u32 {
if n == 0 {
return 0;
}
let mut num_unpark = 0;
self.with_lot(key, |spot| {

View File

@@ -12,7 +12,7 @@ use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::u32;
pub use vm_host_func_context::VMHostFuncContext;
use wasmtime_environ::{DefinedMemoryIndex, Trap};
use wasmtime_environ::DefinedMemoryIndex;
pub const VMCONTEXT_MAGIC: u32 = u32::from_le_bytes(*b"core");
@@ -248,30 +248,6 @@ impl VMMemoryDefinition {
current_length: other.current_length().into(),
}
}
/// In the configurations where bounds checks were elided in JIT code (because
/// we are using static memories with virtual memory guard pages) this manual
/// check is here so we don't segfault from Rust. For other configurations,
/// these checks are required anyways.
pub fn validate_addr(
&self,
addr: u64,
access_size: u64,
access_alignment: u64,
) -> Result<*const u8, Trap> {
debug_assert!(access_alignment.is_power_of_two());
if !(addr % access_alignment == 0) {
return Err(Trap::HeapMisaligned);
}
let length = u64::try_from(self.current_length()).unwrap();
if !(addr.saturating_add(access_size) < length) {
return Err(Trap::MemoryOutOfBounds);
}
// SAFETY: checked above that the address is in bounds
Ok(unsafe { self.base.add(addr as usize) })
}
}
#[cfg(test)]

View File

@@ -1,13 +1,17 @@
use crate::store::{StoreData, StoreOpaque, Stored};
use crate::trampoline::generate_memory_export;
use crate::Trap;
use crate::{AsContext, AsContextMut, Engine, MemoryType, StoreContext, StoreContextMut};
use anyhow::{bail, Result};
use std::cell::UnsafeCell;
use std::convert::TryFrom;
use std::slice;
use std::time::Instant;
use wasmtime_environ::MemoryPlan;
use wasmtime_runtime::{RuntimeLinearMemory, VMMemoryImport};
pub use wasmtime_runtime::WaitResult;
/// Error for out of bounds [`Memory`] access.
#[derive(Debug)]
#[non_exhaustive]
@@ -780,7 +784,7 @@ impl SharedMemory {
/// the maximum limits of this memory. A
/// [`ResourceLimiter`](crate::ResourceLimiter) is another example of
/// preventing a memory to grow.
pub fn grow(&mut self, delta: u64) -> Result<u64> {
pub fn grow(&self, delta: u64) -> Result<u64> {
match self.0.grow(delta, None)? {
Some((old_size, _new_size)) => {
// For shared memory, the `VMMemoryDefinition` is updated inside
@@ -791,6 +795,87 @@ impl SharedMemory {
}
}
/// Equivalent of the WebAssembly `memory.atomic.notify` instruction for
/// this shared memory.
///
/// This method allows embedders to notify threads blocked on the specified
/// `addr`, an index into wasm linear memory. Threads could include
/// wasm threads blocked on a `memory.atomic.wait*` instruction or embedder
/// threads blocked on [`SharedMemory::atomic_wait32`], for example.
///
/// The `count` argument is the number of threads to wake up.
///
/// This function returns the number of threads awoken.
///
/// # Errors
///
/// This function will return an error if `addr` is not within bounds or
/// not aligned to a 4-byte boundary.
pub fn atomic_notify(&self, addr: u64, count: u32) -> Result<u32, Trap> {
self.0.atomic_notify(addr, count)
}
/// Equivalent of the WebAssembly `memory.atomic.wait32` instruction for
/// this shared memory.
///
/// This method allows embedders to block the current thread until notified
/// via the `memory.atomic.notify` instruction or the
/// [`SharedMemory::atomic_notify`] method, enabling synchronization with
/// the wasm guest as desired.
///
/// The `expected` argument is the expected 32-bit value to be stored at
/// the byte address `addr` specified. The `addr` specified is an index
/// into this linear memory.
///
/// The optional `timeout` argument is the point in time after which the
/// calling thread is guaranteed to be woken up. Blocking will not occur
/// past this point.
///
/// This function returns one of three possible values:
///
/// * `WaitResult::Ok` - this function loaded the value at `addr`, found
/// it was equal to `expected`, and then blocked (all as one atomic
/// operation). The thread was then awoken with a `memory.atomic.notify`
/// instruction or the [`SharedMemory::atomic_notify`] method.
/// * `WaitResult::Mismatch` - the value at `addr` was loaded but was not
/// equal to `expected` so the thread did not block and immediately
/// returned.
/// * `WaitResult::TimedOut` - all the steps of `Ok` happened, except this
/// thread was woken up due to a timeout.
///
/// This function will not return due to spurious wakeups.
///
/// # Errors
///
/// This function will return an error if `addr` is not within bounds or
/// not aligned to a 4-byte boundary.
pub fn atomic_wait32(
&self,
addr: u64,
expected: u32,
timeout: Option<Instant>,
) -> Result<WaitResult, Trap> {
self.0.atomic_wait32(addr, expected, timeout)
}
/// Equivalent of the WebAssembly `memory.atomic.wait64` instruction for
/// this shared memory.
///
/// For more information see [`SharedMemory::atomic_wait32`].
///
/// # Errors
///
/// Returns the same error as [`SharedMemory::atomic_wait32`] except that
/// the specified address must be 8-byte aligned instead of 4-byte aligned.
pub fn atomic_wait64(
&self,
addr: u64,
expected: u64,
timeout: Option<Instant>,
) -> Result<WaitResult, Trap> {
self.0.atomic_wait64(addr, expected, timeout)
}
/// Return a reference to the [`Engine`] used to configure the shared
/// memory.
pub(crate) fn engine(&self) -> &Engine {
@@ -824,7 +909,8 @@ impl SharedMemory {
.unwrap();
let shared_memory = memory
.as_shared_memory()
.expect("unable to convert from a shared memory");
.expect("unable to convert from a shared memory")
.clone();
Self(shared_memory, store.engine().clone())
}
}

View File

@@ -1,5 +1,7 @@
use anyhow::Result;
use rayon::prelude::*;
use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
use std::time::{Duration, Instant};
use wasmtime::*;
fn module(engine: &Engine) -> Result<Module> {
@@ -462,3 +464,105 @@ fn memory64_maximum_minimum() -> Result<()> {
Ok(())
}
#[test]
fn shared_memory_basics() -> Result<()> {
let engine = Engine::default();
assert!(SharedMemory::new(&engine, MemoryType::new(1, None)).is_err());
assert!(SharedMemory::new(&engine, MemoryType::new(1, Some(1))).is_err());
assert!(SharedMemory::new(&engine, MemoryType::new64(1, None)).is_err());
assert!(SharedMemory::new(&engine, MemoryType::new64(1, Some(1))).is_err());
assert!(SharedMemory::new(&engine, MemoryType::shared(1, 0)).is_err());
let memory = SharedMemory::new(&engine, MemoryType::shared(1, 1))?;
assert!(memory.ty().is_shared());
assert_eq!(memory.ty().minimum(), 1);
assert_eq!(memory.ty().maximum(), Some(1));
assert_eq!(memory.size(), 1);
assert_eq!(memory.data_size(), 65536);
assert_eq!(memory.data().len(), 65536);
assert!(memory.grow(1).is_err());
// misaligned
assert_eq!(memory.atomic_notify(1, 100), Err(Trap::HeapMisaligned));
assert_eq!(
memory.atomic_wait32(1, 100, None),
Err(Trap::HeapMisaligned)
);
assert_eq!(
memory.atomic_wait64(1, 100, None),
Err(Trap::HeapMisaligned)
);
// oob
assert_eq!(
memory.atomic_notify(1 << 20, 100),
Err(Trap::MemoryOutOfBounds)
);
assert_eq!(
memory.atomic_wait32(1 << 20, 100, None),
Err(Trap::MemoryOutOfBounds)
);
assert_eq!(
memory.atomic_wait64(1 << 20, 100, None),
Err(Trap::MemoryOutOfBounds)
);
// ok
assert_eq!(memory.atomic_notify(8, 100), Ok(0));
assert_eq!(memory.atomic_wait32(8, 1, None), Ok(WaitResult::Mismatch));
assert_eq!(memory.atomic_wait64(8, 1, None), Ok(WaitResult::Mismatch));
// timeout
let near_future = Instant::now() + Duration::new(0, 100);
assert_eq!(
memory.atomic_wait32(8, 0, Some(near_future)),
Ok(WaitResult::TimedOut)
);
assert_eq!(
memory.atomic_wait64(8, 0, Some(near_future)),
Ok(WaitResult::TimedOut)
);
Ok(())
}
#[test]
fn shared_memory_wait_notify() -> Result<()> {
const THREADS: usize = 8;
const COUNT: usize = 100_000;
let engine = Engine::default();
let memory = SharedMemory::new(&engine, MemoryType::shared(1, 1))?;
let data = unsafe { &*(memory.data().as_ptr() as *const AtomicU32) };
let locked = unsafe { &*(memory.data().as_ptr().add(4) as *const AtomicU32) };
// Note that `SeqCst` is used here to avoid having to think too hard about the
// orderings; it also somewhat more closely mirrors what's happening in wasm.
let lock = || {
while locked.swap(1, SeqCst) == 1 {
memory.atomic_wait32(0, 1, None).unwrap();
}
};
let unlock = || {
locked.store(0, SeqCst);
memory.atomic_notify(0, 1).unwrap();
};
std::thread::scope(|s| {
for _ in 0..THREADS {
s.spawn(|| {
for _ in 0..COUNT {
lock();
let next = data.load(SeqCst) + 1;
data.store(next, SeqCst);
unlock();
}
});
}
});
assert_eq!(data.load(SeqCst), (THREADS * COUNT) as u32);
Ok(())
}

View File

@@ -107,7 +107,7 @@ fn test_probe_shared_memory_size() -> Result<()> {
let mut store = Store::new(&engine, ());
let instance = Instance::new(&mut store, &module, &[])?;
let size_fn = instance.get_typed_func::<(), i32>(&mut store, "size")?;
let mut shared_memory = instance.get_shared_memory(&mut store, "memory").unwrap();
let shared_memory = instance.get_shared_memory(&mut store, "memory").unwrap();
assert_eq!(size_fn.call(&mut store, ())?, 1);
assert_eq!(shared_memory.size(), 1);
@@ -244,7 +244,7 @@ fn test_memory_size_accessibility() -> Result<()> {
let shared_memory = SharedMemory::new(&engine, MemoryType::shared(1, NUM_GROW_OPS as u32))?;
let done = Arc::new(AtomicBool::new(false));
let mut grow_memory = shared_memory.clone();
let grow_memory = shared_memory.clone();
let grow_thread = std::thread::spawn(move || {
for i in 0..NUM_GROW_OPS {
if grow_memory.grow(1).is_err() {