Add epoch-based interruption for cooperative async timeslicing.

This PR introduces a new way of performing cooperative timeslicing that
is intended to replace the "fuel" mechanism. The tradeoff is that this
mechanism interrupts with less precision: not at deterministic points
where fuel runs out, but rather when the Engine enters a new epoch. The
generated code instrumentation is substantially faster, however, because
it does not need to do as much work as when tracking fuel; it only loads
the global "epoch counter" and does a compare-and-branch at backedges
and function prologues.
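
Roughly, the check emitted at each function prologue and loop backedge is
equivalent to the following sketch (plain Rust standing in for the generated
CLIF; the names `epoch_counter`, `cached_deadline`, `reload_deadline`, and
`new_epoch` are illustrative only, not real APIs):

    use std::sync::atomic::{AtomicU64, Ordering};

    fn epoch_check_sketch(
        epoch_counter: &AtomicU64,         // per-Engine counter bumped by the embedder
        cached_deadline: &mut u64,         // deadline cached in a variable/register
        reload_deadline: impl Fn() -> u64, // reload from `VMInterrupts::epoch_deadline`
        new_epoch: impl Fn() -> u64,       // out-of-line libcall: traps or yields, returns the new deadline
    ) {
        let cur = epoch_counter.load(Ordering::Relaxed);
        if cur >= *cached_deadline {
            // Cold path: the deadline may have moved while we were running,
            // so re-check against a fresh value before taking the libcall.
            *cached_deadline = reload_deadline();
            if cur >= *cached_deadline {
                *cached_deadline = new_epoch();
            }
        }
    }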

This change has been measured as ~twice as fast as fuel-based
timeslicing for some workloads, especially control-flow-intensive
workloads such as the SpiderMonkey JS interpreter on Wasm/WASI.

The intended interface is that the embedder of the `Engine` performs an
`engine.increment_epoch()` call periodically, e.g. once per millisecond.
An async invocation of a Wasm guest on a `Store` can specify a number of
epoch-ticks that are allowed before an async yield back to the
executor's event loop. (The initial amount and automatic "refills" are
configured on the `Store`, just as for fuel.) The `increment_epoch()` call
does only signal-safe work (it increments an `AtomicU64`), so it can be
invoked from a periodic signal handler or from a thread that wakes up once
per period.
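
As a sketch of the intended embedder setup (assuming an async executor drives
the returned future, a module that exports a long-running `run` function, and
a placeholder path `guest.wasm`; `anyhow` error handling is just for brevity):

    use std::{sync::Arc, time::Duration};
    use wasmtime::{Config, Engine, Linker, Module, Store};

    async fn run_guest_with_timeslicing() -> anyhow::Result<()> {
        let mut config = Config::new();
        config.async_support(true);
        config.epoch_interruption(true);
        let engine = Arc::new(Engine::new(&config)?);

        // Tick the epoch once per millisecond from a background thread.
        let ticker = engine.clone();
        std::thread::spawn(move || loop {
            std::thread::sleep(Duration::from_millis(1));
            ticker.increment_epoch();
        });

        let mut store = Store::new(&engine, ());
        // Allow one tick before the first yield, and refill the deadline by
        // one tick each time the guest yields back to the executor.
        store.set_epoch_deadline(1);
        store.epoch_deadline_async_yield_and_update(1);

        let module = Module::from_file(&engine, "guest.wasm")?;
        let linker = Linker::new(&engine);
        let instance = linker.instantiate_async(&mut store, &module).await?;
        let run = instance.get_typed_func::<(), (), _>(&mut store, "run")?;
        // While this runs, the guest yields to the executor roughly once per
        // epoch tick instead of monopolizing it.
        run.call_async(&mut store, ()).await?;
        Ok(())
    }
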
Chris Fallin
2022-01-18 17:23:09 -08:00
parent ae476fde60
commit 8a55b5c563
19 changed files with 1034 additions and 26 deletions


@@ -137,6 +137,19 @@ pub struct FuncEnvironment<'module_environment> {
/// so if we load it up front we can continue to use it throughout.
vminterrupts_ptr: cranelift_frontend::Variable,
/// A cached epoch deadline value, when performing epoch-based
/// interruption. Loaded from `VMInterrupts` and reloaded after
/// any yield.
epoch_deadline_var: cranelift_frontend::Variable,
/// A cached pointer to the per-Engine epoch counter, when
/// performing epoch-based interruption. Initialized in the
/// function prologue. We prefer to use a variable here rather
/// than reload on each check because it's better to let the
/// regalloc keep it in a register if able; if not, it can always
/// spill, and this isn't any worse than reloading each time.
epoch_ptr_var: cranelift_frontend::Variable,
fuel_consumed: i64,
}
@@ -166,6 +179,8 @@ impl<'module_environment> FuncEnvironment<'module_environment> {
offsets: VMOffsets::new(isa.pointer_bytes(), &translation.module),
tunables,
fuel_var: Variable::new(0),
epoch_deadline_var: Variable::new(0),
epoch_ptr_var: Variable::new(0),
vminterrupts_ptr: Variable::new(0),
// Start with at least one fuel being consumed because even empty
@@ -558,6 +573,125 @@ impl<'module_environment> FuncEnvironment<'module_environment> {
builder.switch_to_block(continuation_block);
}
fn epoch_function_entry(&mut self, builder: &mut FunctionBuilder<'_>) {
builder.declare_var(self.epoch_deadline_var, ir::types::I64);
self.epoch_load_deadline_into_var(builder);
builder.declare_var(self.epoch_ptr_var, self.pointer_type());
let epoch_ptr = self.epoch_ptr(builder);
builder.def_var(self.epoch_ptr_var, epoch_ptr);
// We must check for an epoch change when entering a
// function. Why? Why aren't checks at loops sufficient to
// bound runtime to O(|static program size|)?
//
// The reason is that one can construct a "zip-bomb-like"
// program with exponential-in-program-size runtime, with no
// backedges (loops), by building a tree of function calls: f0
// calls f1 ten times, f1 calls f2 ten times, etc. E.g., nine
// levels of this yields a billion function calls with no
// backedges. So we can't do checks only at backedges.
//
// In this "call-tree" scenario, and in fact in any program
// that uses calls as a sort of control flow to try to evade
// backedge checks, a check at every function entry is
// sufficient. Then, combined with checks at every backedge
// (loop) the longest runtime between checks is bounded by the
// straightline length of any function body.
self.epoch_check(builder);
}
fn epoch_ptr(&mut self, builder: &mut FunctionBuilder<'_>) -> ir::Value {
let vmctx = self.vmctx(builder.func);
let pointer_type = self.pointer_type();
let base = builder.ins().global_value(pointer_type, vmctx);
let offset = i32::try_from(self.offsets.vmctx_epoch_ptr()).unwrap();
let epoch_ptr = builder
.ins()
.load(pointer_type, ir::MemFlags::trusted(), base, offset);
epoch_ptr
}
fn epoch_load_current(&mut self, builder: &mut FunctionBuilder<'_>) -> ir::Value {
let addr = builder.use_var(self.epoch_ptr_var);
builder.ins().load(
ir::types::I64,
ir::MemFlags::trusted(),
addr,
ir::immediates::Offset32::new(0),
)
}
fn epoch_load_deadline_into_var(&mut self, builder: &mut FunctionBuilder<'_>) {
let interrupts = builder.use_var(self.vminterrupts_ptr);
let deadline = builder.ins().load(
ir::types::I64,
ir::MemFlags::trusted(),
interrupts,
ir::immediates::Offset32::new(self.offsets.vminterupts_epoch_deadline() as i32),
);
builder.def_var(self.epoch_deadline_var, deadline);
}
fn epoch_check(&mut self, builder: &mut FunctionBuilder<'_>) {
let new_epoch_block = builder.create_block();
let new_epoch_doublecheck_block = builder.create_block();
let continuation_block = builder.create_block();
builder.set_cold_block(new_epoch_block);
builder.set_cold_block(new_epoch_doublecheck_block);
let epoch_deadline = builder.use_var(self.epoch_deadline_var);
// Load new epoch and check against cached deadline. The
// deadline may be out of date if it was updated (within
// another yield) during some function that we called; this is
// fine, as we'll reload it and check again before yielding in
// the cold path.
let cur_epoch_value = self.epoch_load_current(builder);
let cmp = builder.ins().ifcmp(cur_epoch_value, epoch_deadline);
builder
.ins()
.brif(IntCC::UnsignedGreaterThanOrEqual, cmp, new_epoch_block, &[]);
builder.ins().jump(continuation_block, &[]);
builder.seal_block(new_epoch_block);
// In the "new epoch block", we've noticed that the epoch has
// exceeded our cached deadline. However the real deadline may
// have been moved in the meantime. We keep the cached value
// in a register to speed the checks in the common case
// (between epoch ticks) but we want to do a precise check
// here, on the cold path, by reloading the latest value
// first.
builder.switch_to_block(new_epoch_block);
self.epoch_load_deadline_into_var(builder);
let fresh_epoch_deadline = builder.use_var(self.epoch_deadline_var);
let fresh_cmp = builder.ins().ifcmp(cur_epoch_value, fresh_epoch_deadline);
builder.ins().brif(
IntCC::UnsignedGreaterThanOrEqual,
fresh_cmp,
new_epoch_doublecheck_block,
&[],
);
builder.ins().jump(continuation_block, &[]);
builder.seal_block(new_epoch_doublecheck_block);
builder.switch_to_block(new_epoch_doublecheck_block);
let new_epoch_sig = self.builtin_function_signatures.new_epoch(builder.func);
let (vmctx, new_epoch) = self.translate_load_builtin_function_address(
&mut builder.cursor(),
BuiltinFunctionIndex::new_epoch(),
);
// new_epoch() returns the new deadline, so we don't have to
// reload it.
let call = builder
.ins()
.call_indirect(new_epoch_sig, new_epoch, &[vmctx]);
let new_deadline = *builder.func.dfg.inst_results(call).first().unwrap();
builder.def_var(self.epoch_deadline_var, new_deadline);
builder.ins().jump(continuation_block, &[]);
builder.seal_block(continuation_block);
builder.switch_to_block(continuation_block);
}
fn memory_index_type(&self, index: MemoryIndex) -> ir::Type {
if self.module.memory_plans[index].memory.memory64 {
I64
@@ -633,6 +767,8 @@ impl<'module_environment> cranelift_wasm::FuncEnvironment for FuncEnvironment<'m
fn after_locals(&mut self, num_locals: usize) {
self.vminterrupts_ptr = Variable::new(num_locals);
self.fuel_var = Variable::new(num_locals + 1);
self.epoch_deadline_var = Variable::new(num_locals + 2);
self.epoch_ptr_var = Variable::new(num_locals + 3);
}
fn make_table(&mut self, func: &mut ir::Function, index: TableIndex) -> WasmResult<ir::Table> {
@@ -1787,6 +1923,12 @@ impl<'module_environment> cranelift_wasm::FuncEnvironment for FuncEnvironment<'m
self.fuel_check(builder);
}
// If we are performing epoch-based interruption, check to see
// if the epoch counter has changed.
if self.tunables.epoch_interruption {
self.epoch_check(builder);
}
Ok(())
}
@@ -1821,13 +1963,20 @@ impl<'module_environment> cranelift_wasm::FuncEnvironment for FuncEnvironment<'m
) -> WasmResult<()> {
// If the `vminterrupts_ptr` variable will get used then we initialize
// it here.
if self.tunables.consume_fuel
|| self.tunables.interruptable
|| self.tunables.epoch_interruption
{
self.declare_vminterrupts_ptr(builder);
}
// Additionally we initialize `fuel_var` if it will get used.
if self.tunables.consume_fuel {
self.fuel_function_entry(builder);
}
// Initialize `epoch_var` with the current epoch.
if self.tunables.epoch_interruption {
self.epoch_function_entry(builder);
}
Ok(())
}


@@ -45,6 +45,8 @@ macro_rules! foreach_builtin_function {
memory_atomic_wait64(vmctx, i32, pointer, i64, i64) -> (i32);
/// Invoked when fuel has run out while executing a function.
out_of_gas(vmctx) -> ();
/// Invoked when we reach a new epoch.
new_epoch(vmctx) -> (i64);
}
};
}


@@ -36,6 +36,9 @@ pub struct Tunables {
/// will be consumed every time a wasm instruction is executed.
pub consume_fuel: bool,
/// Whether or not we use epoch-based interruption.
pub epoch_interruption: bool,
/// Whether or not to treat the static memory bound as the maximum for unbounded heaps.
pub static_memory_bound_is_maximum: bool,
@@ -88,6 +91,7 @@ impl Default for Tunables {
parse_wasm_debuginfo: true,
interruptable: false,
consume_fuel: false,
epoch_interruption: false,
static_memory_bound_is_maximum: false,
guard_before_linear_memory: true,
generate_address_map: true,


@@ -73,6 +73,7 @@ pub struct VMOffsets<P> {
// precalculated offsets of various member fields
interrupts: u32,
epoch_ptr: u32,
externref_activations_table: u32,
store: u32,
signature_ids: u32,
@@ -174,6 +175,7 @@ impl<P: PtrSize> From<VMOffsetsFields<P>> for VMOffsets<P> {
num_defined_memories: fields.num_defined_memories,
num_defined_globals: fields.num_defined_globals,
interrupts: 0,
epoch_ptr: 0,
externref_activations_table: 0,
store: 0,
signature_ids: 0,
@@ -190,10 +192,14 @@ impl<P: PtrSize> From<VMOffsetsFields<P>> for VMOffsets<P> {
};
ret.interrupts = 0;
ret.epoch_ptr = ret
.interrupts
.checked_add(u32::from(ret.ptr.size()))
.unwrap();
ret.externref_activations_table = ret
.epoch_ptr
.checked_add(u32::from(ret.ptr.size()))
.unwrap();
ret.store = ret
.externref_activations_table
.checked_add(u32::from(ret.ptr.size()))
@@ -469,6 +475,12 @@ impl<P: PtrSize> VMOffsets<P> {
pub fn vminterrupts_fuel_consumed(&self) -> u8 {
self.pointer_size()
}
/// Return the offset of the `epoch_deadline` field of `VMInterrupts`
#[inline]
pub fn vminterupts_epoch_deadline(&self) -> u8 {
self.pointer_size() + 8 // `stack_limit` is a pointer; `fuel_consumed` is an `i64`
}
}
/// Offsets for `VMCallerCheckedAnyfunc`.
@@ -508,6 +520,13 @@ impl<P: PtrSize> VMOffsets<P> {
self.interrupts
}
/// Return the offset to the `*const AtomicU64` epoch-counter
/// pointer.
#[inline]
pub fn vmctx_epoch_ptr(&self) -> u32 {
self.epoch_ptr
}
/// The offset of the `*mut VMExternRefActivationsTable` member.
#[inline]
pub fn vmctx_externref_activations_table(&self) -> u32 {


@@ -73,8 +73,8 @@ fn main() -> anyhow::Result<()> {
format!("examples/{}.{}", example, extension)
};
if !std::path::Path::new(&file).exists() {
// C and C++ files are optional so we can skip them.
continue;
}


@@ -21,6 +21,7 @@ use std::convert::TryFrom;
use std::hash::Hash;
use std::ops::Range;
use std::ptr::NonNull;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::{mem, ptr, slice};
use wasmtime_environ::{
@@ -203,6 +204,11 @@ impl Instance {
unsafe { self.vmctx_plus_offset(self.offsets.vmctx_interrupts()) }
}
/// Return a pointer to the global epoch counter used by this instance.
pub fn epoch_ptr(&self) -> *mut *const AtomicU64 {
unsafe { self.vmctx_plus_offset(self.offsets.vmctx_epoch_ptr()) }
}
/// Return a pointer to the `VMExternRefActivationsTable`.
pub fn externref_activations_table(&self) -> *mut *mut VMExternRefActivationsTable {
unsafe { self.vmctx_plus_offset(self.offsets.vmctx_externref_activations_table()) }


@@ -463,6 +463,7 @@ fn initialize_instance(
unsafe fn initialize_vmcontext(instance: &mut Instance, req: InstanceAllocationRequest) {
if let Some(store) = req.store.as_raw() {
*instance.interrupts() = (*store).vminterrupts();
*instance.epoch_ptr() = (*store).epoch_ptr();
*instance.externref_activations_table() = (*store).externref_activations_table().0;
instance.set_store(store);
}


@@ -438,6 +438,7 @@ mod test {
Imports, InstanceAllocationRequest, InstanceLimits, ModuleLimits,
PoolingAllocationStrategy, Store, StorePtr, VMSharedSignatureIndex,
};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use wasmtime_environ::{Memory, MemoryPlan, MemoryStyle, Module, PrimaryMap, Tunables};
@@ -546,6 +547,12 @@ mod test {
fn out_of_gas(&mut self) -> Result<(), anyhow::Error> {
Ok(())
}
fn epoch_ptr(&self) -> *const AtomicU64 {
std::ptr::null()
}
fn new_epoch(&mut self) -> Result<u64, anyhow::Error> {
Ok(0)
}
}
struct MockModuleInfo;
impl crate::ModuleInfoLookup for MockModuleInfo {


@@ -20,6 +20,8 @@
)
)]
use std::sync::atomic::AtomicU64;
use anyhow::Error;
mod export;
@@ -84,6 +86,11 @@ pub unsafe trait Store {
/// in the `VMContext`.
fn vminterrupts(&self) -> *mut VMInterrupts;
/// Returns a pointer to the global epoch counter.
///
/// Used to configure the `VMContext` on initialization.
fn epoch_ptr(&self) -> *const AtomicU64;
/// Returns the externref management structures necessary for this store.
///
/// The first element returned is the table in which externrefs are stored
@@ -119,4 +126,8 @@ pub unsafe trait Store {
/// is returned that's raised as a trap. Otherwise wasm execution will
/// continue as normal.
fn out_of_gas(&mut self) -> Result<(), Error>;
/// Callback invoked whenever an instance observes a new epoch
/// number. Cannot fail; cooperative epoch-based yielding is
/// completely semantically transparent. Returns the new deadline.
fn new_epoch(&mut self) -> Result<u64, Error>;
}


@@ -557,3 +557,11 @@ pub unsafe extern "C" fn wasmtime_out_of_gas(vmctx: *mut VMContext) {
Err(err) => crate::traphandlers::raise_user_trap(err),
}
}
/// Hook for when an instance observes that the epoch has changed.
pub unsafe extern "C" fn wasmtime_new_epoch(vmctx: *mut VMContext) -> u64 {
match (*(*vmctx).instance().store()).new_epoch() {
Ok(new_deadline) => new_deadline,
Err(err) => crate::traphandlers::raise_user_trap(err),
}
}


@@ -631,6 +631,7 @@ impl VMBuiltinFunctionsArray {
ptrs[BuiltinFunctionIndex::memory_atomic_wait64().index() as usize] =
wasmtime_memory_atomic_wait64 as usize;
ptrs[BuiltinFunctionIndex::out_of_gas().index() as usize] = wasmtime_out_of_gas as usize;
ptrs[BuiltinFunctionIndex::new_epoch().index() as usize] = wasmtime_new_epoch as usize;
if cfg!(debug_assertions) {
for i in 0..ptrs.len() {
@@ -694,12 +695,18 @@ pub struct VMInterrupts {
/// turning positive a wasm trap will be generated. This field is only
/// modified if wasm is configured to consume fuel.
pub fuel_consumed: UnsafeCell<i64>,
/// Deadline epoch for interruption: if epoch-based interruption
/// is enabled and the global (per engine) epoch counter is
/// observed to reach or exceed this value, the guest code will
/// yield if running asynchronously.
pub epoch_deadline: UnsafeCell<u64>,
}
// The `VMInterrupts` type is a pod-type with no destructor, and we
// only access `stack_limit` from other threads, so add in these trait
// impls which are otherwise not available due to the `fuel_consumed`
// and `epoch_deadline` variables in `VMInterrupts`.
//
// Note that users of `fuel_consumed` understand that the unsafety encompasses
// ensuring that it's only mutated/accessed from one thread dynamically.
@@ -719,6 +726,7 @@ impl Default for VMInterrupts {
VMInterrupts {
stack_limit: AtomicUsize::new(usize::max_value()),
fuel_consumed: UnsafeCell::new(0),
epoch_deadline: UnsafeCell::new(0),
}
}
}


@@ -316,6 +316,91 @@ impl Config {
self
}
/// Enables epoch-based interruption.
///
/// When executing code in async mode, we sometimes want to
/// implement a form of cooperative timeslicing: long-running Wasm
/// guest code should periodically yield to the executor
/// loop. This yielding could be implemented by using "fuel" (see
/// [`consume_fuel`](Config::consume_fuel)). However, fuel
/// instrumentation is somewhat expensive: it modifies the
/// compiled form of the Wasm code so that it maintains a precise
/// instruction count, frequently checking this count against the
/// remaining fuel. If one does not need this precise count or
/// deterministic interruptions, and only needs a periodic
/// interrupt of some form, then it would be better to have a more
/// lightweight mechanism.
///
/// Epoch-based interruption is that mechanism. There is a global
/// "epoch", which is a counter that divides time into arbitrary
/// periods (or epochs). This counter lives on the
/// [`Engine`](crate::Engine) and can be incremented by calling
/// [`Engine::increment_epoch`](crate::Engine::increment_epoch).
/// Epoch-based instrumentation works by setting a "deadline
/// epoch". The compiled code knows the deadline, and at certain
/// points, checks the current epoch against that deadline. It
/// will yield if the deadline has been reached.
///
/// The idea is that checking an infrequently-changing counter is
/// cheaper than counting and frequently storing a precise metric
/// (instructions executed) locally. The interruptions are not
/// deterministic, but if the embedder increments the epoch in a
/// periodic way (say, every regular timer tick by a thread or
/// signal handler), then we can ensure that all async code will
/// yield to the executor within a bounded time.
///
/// The [`Store`](crate::Store) tracks the deadline, and controls
/// what happens when the deadline is reached during
/// execution. Two behaviors are possible:
///
/// - Trap if code is executing when the epoch deadline is
/// met. See
/// [`Store::epoch_deadline_trap`](crate::Store::epoch_deadline_trap).
///
/// - Yield to the executor loop, then resume when the future is
/// next polled. See
/// [`Store::epoch_deadline_async_yield_and_update`](crate::Store::epoch_deadline_async_yield_and_update).
///
/// The first is the default; set the second for the timeslicing
/// behavior described above.
///
/// This feature is available with or without async
/// support. However, without async support, only the trapping
/// behavior is available. In this mode, epoch-based interruption
/// can serve as a simple external-interruption mechanism.
///
/// An initial deadline can be set before executing code by
/// calling
/// [`Store::set_epoch_deadline`](crate::Store::set_epoch_deadline).
///
/// ## When to use fuel vs. epochs
///
/// In general, epoch-based interruption results in faster
/// execution. This difference is sometimes significant: in some
/// measurements, up to 2-3x. This is because epoch-based
/// interruption does less work: it only watches for a global
/// rarely-changing counter to increment, rather than keeping a
/// local frequently-changing counter and comparing it to a
/// deadline.
///
/// Fuel, in contrast, should be used when *deterministic*
/// yielding or trapping is needed. For example, if it is required
/// that the same function call with the same starting state will
/// always either complete or trap with an out-of-fuel error,
/// deterministically, then fuel with a fixed bound should be
/// used.
///
/// # See Also
///
/// - [`Engine::increment_epoch`](crate::Engine::increment_epoch)
/// - [`Store::set_epoch_deadline`](crate::Store::set_epoch_deadline)
/// - [`Store::epoch_deadline_trap`](crate::Store::epoch_deadline_trap)
/// - [`Store::epoch_deadline_async_yield_and_update`](crate::Store::epoch_deadline_async_yield_and_update)
pub fn epoch_interruption(&mut self, enable: bool) -> &mut Self {
self.tunables.epoch_interruption = enable;
self
}
/// Configures the maximum amount of stack space available for
/// executing WebAssembly code.
///


@@ -3,6 +3,7 @@ use crate::{Config, Trap};
use anyhow::Result;
#[cfg(feature = "parallel-compilation")]
use rayon::prelude::*;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
#[cfg(feature = "cache")]
use wasmtime_cache::CacheConfig;
@@ -41,6 +42,7 @@ struct EngineInner {
compiler: Box<dyn wasmtime_environ::Compiler>,
allocator: Box<dyn InstanceAllocator>,
signatures: SignatureRegistry,
epoch: AtomicU64,
}
impl Engine {
@@ -65,6 +67,7 @@ impl Engine {
config,
allocator,
signatures: registry,
epoch: AtomicU64::new(0),
}),
})
}
@@ -119,6 +122,37 @@ impl Engine {
&self.inner.signatures
}
pub(crate) fn epoch_counter(&self) -> &AtomicU64 {
&self.inner.epoch
}
pub(crate) fn current_epoch(&self) -> u64 {
self.epoch_counter().load(Ordering::Relaxed)
}
/// Increments the epoch.
///
/// When using epoch-based interruption, currently-executing Wasm
/// code within this engine will trap or yield "soon" when the
/// epoch deadline is reached or exceeded. (The configuration, and
/// the deadline, are set on the `Store`.) The intent of the
/// design is for this method to be called by the embedder at some
/// regular cadence, for example by a thread that wakes up at some
/// interval, or by a signal handler.
///
/// See [`Config::epoch_interruption`](crate::Config::epoch_interruption)
/// for an introduction to epoch-based interruption and pointers
/// to the other relevant methods.
///
/// ## Signal Safety
///
/// This method is signal-safe: it does not make any syscalls, and
/// performs only an atomic increment to the epoch value in
/// memory.
pub fn increment_epoch(&self) {
self.inner.epoch.fetch_add(1, Ordering::Relaxed);
}
/// Ahead-of-time (AOT) compiles a WebAssembly module.
///
/// The `bytes` provided must be in one of two formats:


@@ -596,6 +596,7 @@ impl<'a> SerializedModule<'a> {
parse_wasm_debuginfo,
interruptable,
consume_fuel,
epoch_interruption,
static_memory_bound_is_maximum,
guard_before_linear_memory,
@@ -636,6 +637,11 @@ impl<'a> SerializedModule<'a> {
)?;
Self::check_bool(interruptable, other.interruptable, "interruption support")?;
Self::check_bool(consume_fuel, other.consume_fuel, "fuel support")?;
Self::check_bool(
epoch_interruption,
other.epoch_interruption,
"epoch interruption",
)?;
Self::check_bool(
static_memory_bound_is_maximum,
other.static_memory_bound_is_maximum,


@@ -88,6 +88,7 @@ use std::mem::{self, ManuallyDrop};
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::ptr;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::task::{Context, Poll};
use wasmtime_runtime::{
@@ -272,6 +273,7 @@ pub struct StoreOpaque {
#[cfg(feature = "async")]
async_state: AsyncState,
out_of_gas_behavior: OutOfGas,
epoch_deadline_behavior: EpochDeadline,
store_data: StoreData,
default_callee: InstanceHandle,
@@ -379,6 +381,18 @@ enum OutOfGas {
},
}
/// What to do when the engine epoch reaches the deadline for a Store
/// during execution of a function using that store.
#[derive(Copy, Clone)]
enum EpochDeadline {
/// Return early with a trap.
Trap,
/// Extend the deadline by the specified number of ticks after
/// yielding to the async executor loop.
#[cfg(feature = "async")]
YieldAndExtendDeadline { delta: u64 },
}
impl<T> Store<T> {
/// Creates a new [`Store`] to be associated with the given [`Engine`] and
/// `data` provided.
@@ -435,6 +449,7 @@ impl<T> Store<T> {
current_poll_cx: UnsafeCell::new(ptr::null_mut()),
},
out_of_gas_behavior: OutOfGas::Trap,
epoch_deadline_behavior: EpochDeadline::Trap,
store_data: StoreData::new(),
default_callee,
hostcall_val_storage: Vec::new(),
@@ -809,6 +824,86 @@ impl<T> Store<T> {
self.inner
.out_of_fuel_async_yield(injection_count, fuel_to_inject)
}
/// Sets the epoch deadline to a certain number of ticks in the future.
///
/// When the Wasm guest code is compiled with epoch-interruption
/// instrumentation
/// ([`Config::epoch_interruption()`](crate::Config::epoch_interruption)),
/// and when the `Engine`'s epoch is incremented
/// ([`Engine::increment_epoch()`](crate::Engine::increment_epoch))
/// past a deadline, execution can be configured to either trap or
/// yield and then continue.
///
/// This deadline is always set relative to the current epoch:
/// `ticks_beyond_current` ticks in the future. The deadline can
/// be set explicitly via this method, or refilled automatically
/// on a yield if configured via
/// [`epoch_deadline_async_yield_and_update()`](Store::epoch_deadline_async_yield_and_update). After
/// this method is invoked, the deadline is reached when
/// [`Engine::increment_epoch()`] has been invoked at least
/// `ticks_beyond_current` times.
///
/// See documentation on
/// [`Config::epoch_interruption()`](crate::Config::epoch_interruption)
/// for an introduction to epoch-based interruption.
pub fn set_epoch_deadline(&mut self, ticks_beyond_current: u64) {
self.inner.set_epoch_deadline(ticks_beyond_current);
}
/// Configures epoch-deadline expiration to trap.
///
/// When epoch-interruption-instrumented code is executed on this
/// store and the epoch deadline is reached before completion,
/// with the store configured in this way, execution will
/// terminate with a trap as soon as an epoch check in the
/// instrumented code is reached.
///
/// This behavior is the default if the store is not otherwise
/// configured via
/// [`epoch_deadline_trap()`](Store::epoch_deadline_trap) or
/// [`epoch_deadline_async_yield_and_update()`](Store::epoch_deadline_async_yield_and_update).
///
/// This setting is intended to allow for coarse-grained
/// interruption, but not a deterministic deadline of a fixed,
/// finite interval. For deterministic interruption, see the
/// "fuel" mechanism instead.
///
/// See documentation on
/// [`Config::epoch_interruption()`](crate::Config::epoch_interruption)
/// for an introduction to epoch-based interruption.
pub fn epoch_deadline_trap(&mut self) {
self.inner.epoch_deadline_trap();
}
#[cfg_attr(nightlydoc, doc(cfg(feature = "async")))]
/// Configures epoch-deadline expiration to yield to the async
/// caller and then update the deadline.
///
/// When epoch-interruption-instrumented code is executed on this
/// store and the epoch deadline is reached before completion,
/// with the store configured in this way, execution will yield
/// (the future will return `Pending` but re-awake itself for
/// later execution) and, upon resuming, the store will be
/// configured with an epoch deadline equal to the current epoch
/// plus `delta` ticks.
///
/// This setting is intended to allow for cooperative timeslicing
/// of multiple CPU-bound Wasm guests in different stores, all
/// executing under the control of an async executor. To drive
/// this, stores should be configured to "yield and update"
/// automatically with this function, and some external driver (a
/// thread that wakes up periodically, or a timer
/// signal/interrupt) should call
/// [`Engine::increment_epoch()`](crate::Engine::increment_epoch).
///
/// See documentation on
/// [`Config::epoch_interruption()`](crate::Config::epoch_interruption)
/// for an introduction to epoch-based interruption.
#[cfg(feature = "async")]
pub fn epoch_deadline_async_yield_and_update(&mut self, delta: u64) {
self.inner.epoch_deadline_async_yield_and_update(delta);
}
}
impl<'a, T> StoreContext<'a, T> {
@@ -913,6 +1008,31 @@ impl<'a, T> StoreContextMut<'a, T> {
self.0
.out_of_fuel_async_yield(injection_count, fuel_to_inject)
}
/// Sets the epoch deadline to a certain number of ticks in the future.
///
/// For more information see [`Store::set_epoch_deadline`].
pub fn set_epoch_deadline(&mut self, ticks_beyond_current: u64) {
self.0.set_epoch_deadline(ticks_beyond_current);
}
/// Configures epoch-deadline expiration to trap.
///
/// For more information see [`Store::epoch_deadline_trap`].
pub fn epoch_deadline_trap(&mut self) {
self.0.epoch_deadline_trap();
}
#[cfg_attr(nightlydoc, doc(cfg(feature = "async")))]
/// Configures epoch-deadline expiration to yield to the async
/// caller and then update the deadline.
///
/// For more information see
/// [`Store::epoch_deadline_async_yield_and_update`].
#[cfg(feature = "async")]
pub fn epoch_deadline_async_yield_and_update(&mut self, delta: u64) {
self.0.epoch_deadline_async_yield_and_update(delta);
}
}
impl<T> StoreInner<T> {
@@ -1093,13 +1213,12 @@ impl StoreOpaque {
};
}
/// Yields execution to the caller on out-of-gas or epoch interruption.
///
/// This only works on async futures and stores, and assumes that we're
/// executing on a fiber. This will yield execution back to the caller once.
#[cfg(feature = "async")]
fn async_yield_impl(&mut self) -> Result<(), Trap> {
// Small future that yields once and then returns ()
#[derive(Default)]
struct Yield {
@@ -1124,19 +1243,15 @@ impl StoreOpaque {
}
let mut future = Yield::default();
// When control returns, we have a `Result<(), Trap>` passed
// in from the host fiber. If this finished successfully then
// we were resumed normally via a `poll`, so keep going. If
// the future was dropped while we were yielded, then we need
// to clean up this fiber. Do so by raising a trap which will
// abort all wasm and get caught on the other side to clean
// things up.
unsafe { self.async_cx().block_on(Pin::new_unchecked(&mut future)) }
}
fn add_fuel(&mut self, fuel: u64) -> Result<()> {
@@ -1187,6 +1302,22 @@ impl StoreOpaque {
}
}
fn epoch_deadline_trap(&mut self) {
self.epoch_deadline_behavior = EpochDeadline::Trap;
}
fn epoch_deadline_async_yield_and_update(&mut self, delta: u64) {
assert!(
self.async_support(),
"cannot use `epoch_deadline_async_yield_and_update` without enabling async support in the config"
);
#[cfg(feature = "async")]
{
self.epoch_deadline_behavior = EpochDeadline::YieldAndExtendDeadline { delta };
}
drop(delta); // suppress warning in non-async build
}
#[inline]
pub fn signal_handler(&self) -> Option<*const SignalHandler<'static>> {
let handler = self.signal_handler.as_ref()?;
@@ -1535,6 +1666,10 @@ unsafe impl<T> wasmtime_runtime::Store for StoreInner<T> {
<StoreOpaque>::vminterrupts(self)
}
fn epoch_ptr(&self) -> *const AtomicU64 {
self.engine.epoch_counter() as *const _
}
fn externref_activations_table(
&mut self,
) -> (
@@ -1651,7 +1786,10 @@ unsafe impl<T> wasmtime_runtime::Store for StoreInner<T> {
}
*injection_count -= 1;
let fuel = *fuel_to_inject;
self.async_yield_impl()?;
if fuel > 0 {
self.add_fuel(fuel).unwrap();
}
Ok(())
}
#[cfg(not(feature = "async"))]
@@ -1669,6 +1807,59 @@ unsafe impl<T> wasmtime_runtime::Store for StoreInner<T> {
impl std::error::Error for OutOfGasError {}
}
fn new_epoch(&mut self) -> Result<u64, anyhow::Error> {
return match &self.epoch_deadline_behavior {
&EpochDeadline::Trap => Err(anyhow::Error::new(EpochDeadlineError)),
#[cfg(feature = "async")]
&EpochDeadline::YieldAndExtendDeadline { delta } => {
// Do the async yield. May return a trap if future was
// canceled while we're yielded.
self.async_yield_impl()?;
// Set a new deadline.
self.set_epoch_deadline(delta);
// Return the new epoch deadline so the Wasm code
// doesn't have to reload it.
Ok(self.get_epoch_deadline())
}
};
#[derive(Debug)]
struct EpochDeadlineError;
impl fmt::Display for EpochDeadlineError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("epoch deadline reached during execution")
}
}
impl std::error::Error for EpochDeadlineError {}
}
}
impl<T> StoreInner<T> {
pub(crate) fn set_epoch_deadline(&mut self, delta: u64) {
// Set a new deadline based on the "epoch deadline delta".
//
// Safety: this is safe because the epoch deadline in the
// `VMInterrupts` is accessed only here and by Wasm guest code
// running in this store, and we have a `&mut self` here.
//
// Also, note that when this update is performed while Wasm is
// on the stack, the Wasm will reload the new value once we
// return into it.
let epoch_deadline = unsafe { (*self.vminterrupts()).epoch_deadline.get_mut() };
*epoch_deadline = self.engine().current_epoch() + delta;
}
fn get_epoch_deadline(&self) -> u64 {
// Safety: this is safe because, as above, it is only invoked
// from within `new_epoch` which is called from guest Wasm
// code, which will have an exclusive borrow on the Store.
let epoch_deadline = unsafe { (*self.vminterrupts()).epoch_deadline.get_mut() };
*epoch_deadline
}
}
impl<T: Default> Default for Store<T> {

examples/epochs.rs (new file, 48 lines)

@@ -0,0 +1,48 @@
//! Example of interrupting a WebAssembly function's runtime via epoch
//! changes ("epoch interruption") in a synchronous context. To see
//! an example of setup for asynchronous usage, see
//! `tests/all/epoch_interruption.rs`
use anyhow::Error;
use std::sync::Arc;
use wasmtime::{Config, Engine, Instance, Module, Store};
fn main() -> Result<(), Error> {
// Set up an engine configured with epoch interruption enabled.
let mut config = Config::new();
config.epoch_interruption(true);
let engine = Arc::new(Engine::new(&config)?);
let mut store = Store::new(&engine, ());
// Configure the store to trap on reaching the epoch deadline.
// This is the default, but we do it explicitly here to
// demonstrate.
store.epoch_deadline_trap();
// Configure the store to have an initial epoch deadline one tick
// in the future.
store.set_epoch_deadline(1);
// Reuse the fibonacci function from the Fuel example. This is a
// long-running function that we will want to interrupt.
let module = Module::from_file(store.engine(), "examples/fuel.wat")?;
let instance = Instance::new(&mut store, &module, &[])?;
// Start a thread that will bump the epoch after 1 second.
let engine_clone = engine.clone();
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_secs(1));
engine_clone.increment_epoch();
});
// Invoke `fibonacci` with a large argument such that a normal
// invocation would take many seconds to complete.
let fibonacci = instance.get_typed_func::<i32, i32, _>(&mut store, "fibonacci")?;
match fibonacci.call(&mut store, 100) {
Ok(_) => panic!("Somehow we computed recursive fib(100) in less than a second!"),
Err(_) => {
println!("Trapped out of fib(100) after epoch increment");
}
};
Ok(())
}


@@ -236,6 +236,12 @@ struct CommonOptions {
#[structopt(long)]
consume_fuel: bool,
/// Executing wasm code will yield when a global epoch counter
/// changes, allowing for async operation without blocking the
/// executor.
#[structopt(long)]
epoch_interruption: bool,
/// Disables the on-by-default address map from native code to wasm code.
#[structopt(long)]
disable_address_map: bool,
@@ -315,6 +321,7 @@ impl CommonOptions {
}
config.consume_fuel(self.consume_fuel);
config.epoch_interruption(self.epoch_interruption);
config.generate_address_map(!self.disable_address_map);
config.paged_memory_initialization(self.paged_memory_initialization);


@@ -0,0 +1,421 @@
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use wasmtime::*;
fn dummy_waker() -> Waker {
return unsafe { Waker::from_raw(clone(5 as *const _)) };
unsafe fn clone(ptr: *const ()) -> RawWaker {
assert_eq!(ptr as usize, 5);
const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
RawWaker::new(ptr, &VTABLE)
}
unsafe fn wake(ptr: *const ()) {
assert_eq!(ptr as usize, 5);
}
unsafe fn wake_by_ref(ptr: *const ()) {
assert_eq!(ptr as usize, 5);
}
unsafe fn drop(ptr: *const ()) {
assert_eq!(ptr as usize, 5);
}
}
fn build_engine() -> Arc<Engine> {
let mut config = Config::new();
config.async_support(true);
config.epoch_interruption(true);
Arc::new(Engine::new(&config).unwrap())
}
fn make_env(engine: &Engine) -> Linker<()> {
let mut linker = Linker::new(engine);
let engine = engine.clone();
linker
.func_new(
"",
"bump_epoch",
FuncType::new(None, None),
move |_caller, _params, _results| {
engine.increment_epoch();
Ok(())
},
)
.unwrap();
linker
}
/// Run a test with the given wasm, giving an initial deadline of
/// `initial` ticks in the future, and either configuring the wasm to
/// yield and set a deadline `delta` ticks in the future if `delta` is
/// `Some(..)` or trapping if `delta` is `None`.
///
/// Returns `Some(yields)` if function completed normally, giving the
/// number of yields that occurred, or `None` if a trap occurred.
fn run_and_count_yields_or_trap<F: Fn(Arc<Engine>)>(
wasm: &str,
initial: u64,
delta: Option<u64>,
setup_func: F,
) -> Option<usize> {
let engine = build_engine();
let linker = make_env(&engine);
let module = Module::new(&engine, wasm).unwrap();
let mut store = Store::new(&engine, ());
let instance = linker.instantiate(&mut store, &module).unwrap();
let f = instance.get_func(&mut store, "run").unwrap();
store.set_epoch_deadline(initial);
match delta {
Some(delta) => {
store.epoch_deadline_async_yield_and_update(delta);
}
None => {
store.epoch_deadline_trap();
}
}
let engine_clone = engine.clone();
setup_func(engine_clone);
let mut future = Box::pin(f.call_async(&mut store, &[], &mut []));
let mut yields = 0;
loop {
match future
.as_mut()
.poll(&mut Context::from_waker(&dummy_waker()))
{
Poll::Pending => {
yields += 1;
}
Poll::Ready(Ok(..)) => {
break;
}
Poll::Ready(Err(e)) => match e.downcast::<wasmtime::Trap>() {
Ok(_) => {
return None;
}
e => {
e.unwrap();
}
},
}
}
Some(yields)
}
#[test]
fn epoch_yield_at_func_entry() {
// Should yield at start of call to func $subfunc.
assert_eq!(
Some(1),
run_and_count_yields_or_trap(
"
(module
(import \"\" \"bump_epoch\" (func $bump))
(func (export \"run\")
call $bump ;; bump epoch
call $subfunc) ;; call func; will notice new epoch and yield
(func $subfunc))
",
1,
Some(1),
|_| {},
)
);
}
#[test]
fn epoch_yield_at_loop_header() {
// Should yield at top of loop, once per five iters.
assert_eq!(
Some(2),
run_and_count_yields_or_trap(
"
(module
(import \"\" \"bump_epoch\" (func $bump))
(func (export \"run\")
(local $i i32)
(local.set $i (i32.const 10))
(loop $l
call $bump
(br_if $l (local.tee $i (i32.sub (local.get $i) (i32.const 1)))))))
",
0,
Some(5),
|_| {},
)
);
}
#[test]
fn epoch_yield_immediate() {
// We should see one yield immediately when the initial deadline
// is zero.
assert_eq!(
Some(1),
run_and_count_yields_or_trap(
"
(module
(import \"\" \"bump_epoch\" (func $bump))
(func (export \"run\")))
",
0,
Some(1),
|_| {},
)
);
}
#[test]
fn epoch_yield_only_once() {
// We should yield from the subfunction, and then when we return
// to the outer function and hit another loop header, we should
// not yield again (the double-check block will reload the correct
// epoch).
assert_eq!(
Some(1),
run_and_count_yields_or_trap(
"
(module
(import \"\" \"bump_epoch\" (func $bump))
(func (export \"run\")
(local $i i32)
(call $subfunc)
(local.set $i (i32.const 0))
(loop $l
(br_if $l (i32.eq (i32.const 10)
(local.tee $i (i32.add (i32.const 1) (local.get $i)))))))
(func $subfunc
(call $bump)))
",
1,
Some(1),
|_| {},
)
);
}
#[test]
fn epoch_interrupt_infinite_loop() {
assert_eq!(
None,
run_and_count_yields_or_trap(
"
(module
(import \"\" \"bump_epoch\" (func $bump))
(func (export \"run\")
(loop $l
(br $l))))
",
1,
None,
|engine| {
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(50));
engine.increment_epoch();
});
},
)
);
}
#[test]
fn epoch_interrupt_function_entries() {
assert_eq!(
None,
run_and_count_yields_or_trap(
"
(module
(import \"\" \"bump_epoch\" (func $bump))
(func (export \"run\")
call $f1
call $f1
call $f1
call $f1
call $f1
call $f1
call $f1
call $f1
call $f1
call $f1)
(func $f1
call $f2
call $f2
call $f2
call $f2
call $f2
call $f2
call $f2
call $f2
call $f2
call $f2)
(func $f2
call $f3
call $f3
call $f3
call $f3
call $f3
call $f3
call $f3
call $f3
call $f3
call $f3)
(func $f3
call $f4
call $f4
call $f4
call $f4
call $f4
call $f4
call $f4
call $f4
call $f4
call $f4)
(func $f4
call $f5
call $f5
call $f5
call $f5
call $f5
call $f5
call $f5
call $f5
call $f5
call $f5)
(func $f5
call $f6
call $f6
call $f6
call $f6
call $f6
call $f6
call $f6
call $f6
call $f6
call $f6)
(func $f6
call $f7
call $f7
call $f7
call $f7
call $f7
call $f7
call $f7
call $f7
call $f7
call $f7)
(func $f7
call $f8
call $f8
call $f8
call $f8
call $f8
call $f8
call $f8
call $f8
call $f8
call $f8)
(func $f8
call $f9
call $f9
call $f9
call $f9
call $f9
call $f9
call $f9
call $f9
call $f9
call $f9)
(func $f9))
",
1,
None,
|engine| {
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(50));
engine.increment_epoch();
});
},
)
);
}
#[test]
fn drop_future_on_epoch_yield() {
let wasm = "
(module
(import \"\" \"bump_epoch\" (func $bump))
(import \"\" \"im_alive\" (func $im_alive))
(import \"\" \"oops\" (func $oops))
(func (export \"run\")
(call $im_alive)
(call $bump)
(call $subfunc) ;; subfunc entry to do epoch check
(call $oops))
(func $subfunc))
";
let engine = build_engine();
let mut linker = make_env(&engine);
// Create a few helpers for the Wasm to call.
let alive_flag = Arc::new(AtomicBool::new(false));
let alive_flag_clone = alive_flag.clone();
linker
.func_new(
"",
"oops",
FuncType::new(None, None),
move |_caller, _params, _results| {
panic!("Should not have reached this point!");
},
)
.unwrap();
linker
.func_new(
"",
"im_alive",
FuncType::new(None, None),
move |_caller, _params, _results| {
alive_flag_clone.store(true, Ordering::Release);
Ok(())
},
)
.unwrap();
let module = Module::new(&engine, wasm).unwrap();
let mut store = Store::new(&engine, ());
let instance = linker.instantiate(&mut store, &module).unwrap();
let f = instance.get_func(&mut store, "run").unwrap();
store.set_epoch_deadline(1);
store.epoch_deadline_async_yield_and_update(1);
let mut future = Box::pin(f.call_async(&mut store, &[], &mut []));
match future
.as_mut()
.poll(&mut Context::from_waker(&dummy_waker()))
{
Poll::Pending => {
// OK: expected yield.
}
Poll::Ready(Ok(..)) => {
panic!("Should not have returned");
}
Poll::Ready(e) => {
e.unwrap();
}
}
drop(future);
assert_eq!(true, alive_flag.load(Ordering::Acquire));
}


@@ -3,6 +3,7 @@ mod call_hook;
mod cli_tests;
mod custom_signal_handler;
mod debug;
mod epoch_interruption;
mod externals;
mod fuel;
mod func;