* Add shared memories This change adds the ability to use shared memories in Wasmtime when the [threads proposal] is enabled. Shared memories are annotated as `shared` in the WebAssembly syntax, e.g., `(memory 1 1 shared)`, and are protected from concurrent access during `memory.size` and `memory.grow`. [threads proposal]: https://github.com/WebAssembly/threads/blob/master/proposals/threads/Overview.md In order to implement this in Wasmtime, there are two main cases to cover: - a program may simply create a shared memory and possibly export it; this means that Wasmtime itself must be able to create shared memories - a user may create a shared memory externally and pass it in as an import during instantiation; this is the case when the program contains code like `(import "env" "memory" (memory 1 1 shared))`--this case is handled by a new Wasmtime API type--`SharedMemory` Because of the first case, this change allows any of the current memory-creation mechanisms to work as-is. Wasmtime can still create either static or dynamic memories in either on-demand or pooling modes, and any of these memories can be considered shared. When shared, the `Memory` runtime container will lock appropriately during `memory.size` and `memory.grow` operations; since all memories use this container, it is an ideal place for implementing the locking once and once only. The second case is covered by the new `SharedMemory` structure. It uses the same `Mmap` allocation under the hood as non-shared memories, but allows the user to perform the allocation externally to Wasmtime and share the memory across threads (via an `Arc`). The pointer address to the actual memory is carefully wired through and owned by the `SharedMemory` structure itself. This means that there are differing views of where to access the pointer (i.e., `VMMemoryDefinition`): for owned memories (the default), the `VMMemoryDefinition` is stored directly by the `VMContext`; in the `SharedMemory` case, however, this `VMContext` must point to this separate structure. To ensure that the `VMContext` can always point to the correct `VMMemoryDefinition`, this change alters the `VMContext` structure. Since a `SharedMemory` owns its own `VMMemoryDefinition`, the `defined_memories` table in the `VMContext` becomes a sequence of pointers--in the shared memory case, they point to the `VMMemoryDefinition` owned by the `SharedMemory` and in the owned memory case (i.e., not shared) they point to `VMMemoryDefinition`s stored in a new table, `owned_memories`. This change adds an additional indirection (through the `*mut VMMemoryDefinition` pointer) that could add overhead. Using an imported memory as a proxy, we measured a 1-3% overhead of this approach on the `pulldown-cmark` benchmark. To avoid this, Cranelift-generated code will special-case the owned memory access (i.e., load a pointer directly to the `owned_memories` entry) for `memory.size` so that only shared memories (and imported memories, as before) incur the indirection cost. * review: remove thread feature check * review: swap wasmtime-types dependency for existing wasmtime-environ use * review: remove unused VMMemoryUnion * review: reword cross-engine error message * review: improve tests * review: refactor to separate prevent Memory <-> SharedMemory conversion * review: into_shared_memory -> as_shared_memory * review: remove commented out code * review: limit shared min/max to 32 bits * review: skip imported memories * review: imported memories are not owned * review: remove TODO * review: document unsafe send + sync * review: add limiter assertion * review: remove TODO * review: improve tests * review: fix doc test * fix: fixes based on discussion with Alex This changes several key parts: - adds memory indexes to imports and exports - makes `VMMemoryDefinition::current_length` an atomic usize * review: add `Extern::SharedMemory` * review: remove TODO * review: atomically load from VMMemoryDescription in JIT-generated code * review: add test probing the last available memory slot across threads * fix: move assertion to new location due to rebase * fix: doc link * fix: add TODOs to c-api * fix: broken doc link * fix: modify pooling allocator messages in tests * review: make owned_memory_index panic instead of returning an option * review: clarify calculation of num_owned_memories * review: move 'use' to top of file * review: change '*const [u8]' to '*mut [u8]' * review: remove TODO * review: avoid hard-coding memory index * review: remove 'preallocation' parameter from 'Memory::_new' * fix: component model memory length * review: check that shared memory plans are static * review: ignore growth limits for shared memory * review: improve atomic store comment * review: add FIXME for memory growth failure * review: add comment about absence of bounds-checked 'memory.size' * review: make 'current_length()' doc comment more precise * review: more comments related to memory.size non-determinism * review: make 'vmmemory' unreachable for shared memory * review: move code around * review: thread plan through to 'wrap()' * review: disallow shared memory allocation with the pooling allocator
270 lines
9.5 KiB
Rust
270 lines
9.5 KiB
Rust
use anyhow::Result;
|
|
use std::{
|
|
collections::{hash_map::RandomState, HashSet},
|
|
sync::{
|
|
atomic::{AtomicBool, Ordering},
|
|
Arc, RwLock,
|
|
},
|
|
};
|
|
use wasmtime::*;
|
|
|
|
#[test]
|
|
fn test_instantiate_shared_memory() -> Result<()> {
|
|
let wat = r#"(module (memory 1 1 shared))"#;
|
|
let mut config = Config::new();
|
|
config.wasm_threads(true);
|
|
let engine = Engine::new(&config)?;
|
|
let module = Module::new(&engine, wat)?;
|
|
let mut store = Store::new(&engine, ());
|
|
let _instance = Instance::new(&mut store, &module, &[])?;
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
fn test_import_shared_memory() -> Result<()> {
|
|
let wat = r#"(module (import "env" "memory" (memory 1 5 shared)))"#;
|
|
let mut config = Config::new();
|
|
config.wasm_threads(true);
|
|
let engine = Engine::new(&config)?;
|
|
let module = Module::new(&engine, wat)?;
|
|
let mut store = Store::new(&engine, ());
|
|
let shared_memory = SharedMemory::new(&engine, MemoryType::shared(1, 5))?;
|
|
let _instance = Instance::new(&mut store, &module, &[shared_memory.into()])?;
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
fn test_export_shared_memory() -> Result<()> {
|
|
let wat = r#"(module (memory (export "memory") 1 5 shared))"#;
|
|
let mut config = Config::new();
|
|
config.wasm_threads(true);
|
|
let engine = Engine::new(&config)?;
|
|
let module = Module::new(&engine, wat)?;
|
|
let mut store = Store::new(&engine, ());
|
|
let instance = Instance::new(&mut store, &module, &[])?;
|
|
let shared_memory = instance.get_shared_memory(&mut store, "memory").unwrap();
|
|
|
|
assert_eq!(shared_memory.size(), 1);
|
|
assert!(shared_memory.ty().is_shared());
|
|
assert_eq!(shared_memory.ty().maximum(), Some(5));
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
fn test_sharing_of_shared_memory() -> Result<()> {
|
|
let wat = r#"(module
|
|
(import "env" "memory" (memory 1 5 shared))
|
|
(func (export "first_word") (result i32) (i32.load (i32.const 0)))
|
|
)"#;
|
|
let mut config = Config::new();
|
|
config.wasm_threads(true);
|
|
let engine = Engine::new(&config)?;
|
|
let module = Module::new(&engine, wat)?;
|
|
let mut store = Store::new(&engine, ());
|
|
let shared_memory = SharedMemory::new(&engine, MemoryType::shared(1, 5))?;
|
|
let instance1 = Instance::new(&mut store, &module, &[shared_memory.clone().into()])?;
|
|
let instance2 = Instance::new(&mut store, &module, &[shared_memory.clone().into()])?;
|
|
|
|
// Modify the memory in one place.
|
|
unsafe {
|
|
(*(shared_memory.data()))[0] = 42;
|
|
}
|
|
|
|
// Verify that the memory is the same in all shared locations.
|
|
let shared_memory_first_word =
|
|
i32::from_le_bytes(unsafe { (*shared_memory.data())[0..4].try_into()? });
|
|
let instance1_first_word = instance1
|
|
.get_typed_func::<(), i32, _>(&mut store, "first_word")?
|
|
.call(&mut store, ())?;
|
|
let instance2_first_word = instance2
|
|
.get_typed_func::<(), i32, _>(&mut store, "first_word")?
|
|
.call(&mut store, ())?;
|
|
assert_eq!(shared_memory_first_word, 42);
|
|
assert_eq!(instance1_first_word, 42);
|
|
assert_eq!(instance2_first_word, 42);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
fn test_probe_shared_memory_size() -> Result<()> {
|
|
let wat = r#"(module
|
|
(memory (export "memory") 1 2 shared)
|
|
(func (export "size") (result i32) (memory.size))
|
|
)"#;
|
|
let mut config = Config::new();
|
|
config.wasm_threads(true);
|
|
let engine = Engine::new(&config)?;
|
|
let module = Module::new(&engine, wat)?;
|
|
let mut store = Store::new(&engine, ());
|
|
let instance = Instance::new(&mut store, &module, &[])?;
|
|
let size_fn = instance.get_typed_func::<(), i32, _>(&mut store, "size")?;
|
|
let mut shared_memory = instance.get_shared_memory(&mut store, "memory").unwrap();
|
|
|
|
assert_eq!(size_fn.call(&mut store, ())?, 1);
|
|
assert_eq!(shared_memory.size(), 1);
|
|
|
|
shared_memory.grow(1)?;
|
|
|
|
assert_eq!(shared_memory.size(), 2);
|
|
assert_eq!(size_fn.call(&mut store, ())?, 2);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
fn test_multi_memory() -> Result<()> {
|
|
let wat = r#"(module
|
|
(import "env" "imported" (memory $imported 5 10 shared))
|
|
(memory (export "owned") 10 20)
|
|
(memory (export "shared") 1 2 shared)
|
|
(export "imported" (memory $imported))
|
|
)"#;
|
|
let mut config = Config::new();
|
|
config.wasm_threads(true);
|
|
config.wasm_multi_memory(true);
|
|
let engine = Engine::new(&config)?;
|
|
let module = Module::new(&engine, wat)?;
|
|
let mut store = Store::new(&engine, ());
|
|
let incoming_shared_memory = SharedMemory::new(&engine, MemoryType::shared(5, 10))?;
|
|
let instance = Instance::new(&mut store, &module, &[incoming_shared_memory.into()])?;
|
|
let owned_memory = instance.get_memory(&mut store, "owned").unwrap();
|
|
let shared_memory = instance.get_shared_memory(&mut store, "shared").unwrap();
|
|
let imported_memory = instance.get_shared_memory(&mut store, "imported").unwrap();
|
|
|
|
assert_eq!(owned_memory.size(&store), 10);
|
|
assert_eq!(owned_memory.ty(&store).minimum(), 10);
|
|
assert_eq!(owned_memory.ty(&store).maximum(), Some(20));
|
|
assert_eq!(owned_memory.ty(&store).is_shared(), false);
|
|
assert_eq!(shared_memory.size(), 1);
|
|
assert_eq!(shared_memory.ty().minimum(), 1);
|
|
assert_eq!(shared_memory.ty().maximum(), Some(2));
|
|
assert_eq!(shared_memory.ty().is_shared(), true);
|
|
assert_eq!(imported_memory.size(), 5);
|
|
assert_eq!(imported_memory.ty().minimum(), 5);
|
|
assert_eq!(imported_memory.ty().maximum(), Some(10));
|
|
assert_eq!(imported_memory.ty().is_shared(), true);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
fn test_grow_memory_in_multiple_threads() -> Result<()> {
|
|
const NUM_THREADS: usize = 4;
|
|
const NUM_GROW_OPS: usize = 1000;
|
|
|
|
let wat = r#"(module
|
|
(import "env" "memory" (memory 1 4000 shared))
|
|
(func (export "grow") (param $delta i32) (result i32) (memory.grow (local.get $delta)))
|
|
)"#;
|
|
|
|
let mut config = Config::new();
|
|
config.wasm_threads(true);
|
|
let engine = Engine::new(&config)?;
|
|
let module = Module::new(&engine, wat)?;
|
|
let shared_memory = SharedMemory::new(&engine, MemoryType::shared(1, NUM_GROW_OPS as u32))?;
|
|
let mut threads = vec![];
|
|
let observed_sizes = Arc::new(RwLock::new(vec![]));
|
|
|
|
// Spawn several threads using a single shared memory and grow the memory
|
|
// concurrently on all threads.
|
|
for _ in 0..NUM_THREADS {
|
|
let engine = engine.clone();
|
|
let module = module.clone();
|
|
let observed_sizes = observed_sizes.clone();
|
|
let shared_memory = shared_memory.clone();
|
|
let thread = std::thread::spawn(move || {
|
|
let mut store = Store::new(&engine, ());
|
|
let instance = Instance::new(&mut store, &module, &[shared_memory.into()]).unwrap();
|
|
let grow_fn = instance
|
|
.get_typed_func::<i32, i32, _>(&mut store, "grow")
|
|
.unwrap();
|
|
let mut thread_local_observed_sizes: Vec<_> = (0..NUM_GROW_OPS / NUM_THREADS)
|
|
.map(|_| grow_fn.call(&mut store, 1).unwrap() as u32)
|
|
.collect();
|
|
println!(
|
|
"Returned memory sizes for {:?}: {:?}",
|
|
std::thread::current().id(),
|
|
thread_local_observed_sizes
|
|
);
|
|
assert!(is_sorted(thread_local_observed_sizes.as_slice()));
|
|
observed_sizes
|
|
.write()
|
|
.unwrap()
|
|
.append(&mut thread_local_observed_sizes);
|
|
});
|
|
threads.push(thread);
|
|
}
|
|
|
|
// Wait for all threads to finish.
|
|
for t in threads {
|
|
t.join().unwrap()
|
|
}
|
|
|
|
// Ensure the returned "old memory sizes" are all unique--i.e., we have not
|
|
// observed the same growth twice.
|
|
let unique_observed_sizes: HashSet<u32, RandomState> =
|
|
HashSet::from_iter(observed_sizes.read().unwrap().iter().cloned());
|
|
assert_eq!(
|
|
observed_sizes.read().unwrap().len(),
|
|
unique_observed_sizes.len()
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn is_sorted(data: &[u32]) -> bool {
|
|
data.windows(2).all(|d| d[0] <= d[1])
|
|
}
|
|
|
|
#[test]
|
|
fn test_memory_size_accessibility() -> Result<()> {
|
|
const NUM_GROW_OPS: usize = 1000;
|
|
let wat = r#"(module
|
|
(import "env" "memory" (memory $memory 1 1000 shared))
|
|
(func (export "probe_last_available") (result i32)
|
|
(local $last_address i32)
|
|
(local.set $last_address (i32.sub (i32.mul (memory.size) (i32.const 0x10000)) (i32.const 4)))
|
|
(i32.load $memory (local.get $last_address))
|
|
)
|
|
)"#;
|
|
|
|
let mut config = Config::new();
|
|
config.wasm_threads(true);
|
|
let engine = Engine::new(&config)?;
|
|
let module = Module::new(&engine, wat)?;
|
|
let shared_memory = SharedMemory::new(&engine, MemoryType::shared(1, NUM_GROW_OPS as u32))?;
|
|
let done = Arc::new(AtomicBool::new(false));
|
|
|
|
let mut grow_memory = shared_memory.clone();
|
|
let grow_thread = std::thread::spawn(move || {
|
|
for i in 0..NUM_GROW_OPS {
|
|
if grow_memory.grow(1).is_err() {
|
|
println!("stopping at grow operation #{}", i);
|
|
break;
|
|
}
|
|
}
|
|
});
|
|
|
|
let probe_memory = shared_memory.clone();
|
|
let probe_done = done.clone();
|
|
let probe_thread = std::thread::spawn(move || {
|
|
let mut store = Store::new(&engine, ());
|
|
let instance = Instance::new(&mut store, &module, &[probe_memory.into()]).unwrap();
|
|
let probe_fn = instance
|
|
.get_typed_func::<(), i32, _>(&mut store, "probe_last_available")
|
|
.unwrap();
|
|
while !probe_done.load(Ordering::SeqCst) {
|
|
let value = probe_fn.call(&mut store, ()).unwrap() as u32;
|
|
assert_eq!(value, 0);
|
|
}
|
|
});
|
|
|
|
grow_thread.join().unwrap();
|
|
done.store(true, Ordering::SeqCst);
|
|
probe_thread.join().unwrap();
|
|
|
|
Ok(())
|
|
}
|