Run a callback when the interruption epoch is reached (#4152)

* Run a callback when the interruption epoch is reached

Adds Store::epoch_deadline_callback. This accepts a callback which, when
invoked, can mutate the store's contents. The callback can either return
an error (in which case we trap) or return a delta which we'll use to
set the new epoch deadline.

* Add a basic test for epoch interruption callback

* Some small nits

 - Remove use of &mut in the pattern match
 - Return both yields and state from run_and_count_yields_or_trap in
   test code and assert on them separately.
 - Add a test for trapping on a state failure.
This commit is contained in:
Jonathan Coates
2022-05-16 13:28:23 +01:00
committed by GitHub
parent 8d7bccefcb
commit f19d8cc851
3 changed files with 155 additions and 50 deletions

View File

@@ -360,23 +360,27 @@ impl Config {
/// ///
/// The [`Store`](crate::Store) tracks the deadline, and controls /// The [`Store`](crate::Store) tracks the deadline, and controls
/// what happens when the deadline is reached during /// what happens when the deadline is reached during
/// execution. Two behaviors are possible: /// execution. Several behaviors are possible:
/// ///
/// - Trap if code is executing when the epoch deadline is /// - Trap if code is executing when the epoch deadline is
/// met. See /// met. See
/// [`Store::epoch_deadline_trap`](crate::Store::epoch_deadline_trap). /// [`Store::epoch_deadline_trap`](crate::Store::epoch_deadline_trap).
/// ///
/// - Call an arbitrary function. This function may chose to trap or
/// increment the epoch. See
/// [`Store::epoch_deadline_callback`](crate::Store::epoch_deadline_callback).
///
/// - Yield to the executor loop, then resume when the future is /// - Yield to the executor loop, then resume when the future is
/// next polled. See /// next polled. See
/// [`Store::epoch_deadline_async_yield_and_update`](crate::Store::epoch_deadline_async_yield_and_update). /// [`Store::epoch_deadline_async_yield_and_update`](crate::Store::epoch_deadline_async_yield_and_update).
/// ///
/// The first is the default; set the second for the timeslicing /// Trapping is the default. The yielding behaviour may be used for
/// behavior described above. /// the timeslicing behavior described above.
/// ///
/// This feature is available with or without async /// This feature is available with or without async support.
/// support. However, without async support, only the trapping /// However, without async support, the timeslicing behaviour is
/// behavior is available. In this mode, epoch-based interruption /// not available. This means epoch-based interruption can only
/// can serve as a simple external-interruption mechanism. /// serve as a simple external-interruption mechanism.
/// ///
/// An initial deadline can be set before executing code by /// An initial deadline can be set before executing code by
/// calling /// calling
@@ -404,6 +408,7 @@ impl Config {
/// - [`Engine::increment_epoch`](crate::Engine::increment_epoch) /// - [`Engine::increment_epoch`](crate::Engine::increment_epoch)
/// - [`Store::set_epoch_deadline`](crate::Store::set_epoch_deadline) /// - [`Store::set_epoch_deadline`](crate::Store::set_epoch_deadline)
/// - [`Store::epoch_deadline_trap`](crate::Store::epoch_deadline_trap) /// - [`Store::epoch_deadline_trap`](crate::Store::epoch_deadline_trap)
/// - [`Store::epoch_deadline_callback`](crate::Store::epoch_deadline_callback)
/// - [`Store::epoch_deadline_async_yield_and_update`](crate::Store::epoch_deadline_async_yield_and_update) /// - [`Store::epoch_deadline_async_yield_and_update`](crate::Store::epoch_deadline_async_yield_and_update)
pub fn epoch_interruption(&mut self, enable: bool) -> &mut Self { pub fn epoch_interruption(&mut self, enable: bool) -> &mut Self {
self.tunables.epoch_interruption = enable; self.tunables.epoch_interruption = enable;

View File

@@ -202,6 +202,7 @@ pub struct StoreInner<T> {
limiter: Option<ResourceLimiterInner<T>>, limiter: Option<ResourceLimiterInner<T>>,
call_hook: Option<CallHookInner<T>>, call_hook: Option<CallHookInner<T>>,
epoch_deadline_behavior: EpochDeadline<T>,
// for comments about `ManuallyDrop`, see `Store::into_data` // for comments about `ManuallyDrop`, see `Store::into_data`
data: ManuallyDrop<T>, data: ManuallyDrop<T>,
} }
@@ -296,7 +297,6 @@ pub struct StoreOpaque {
#[cfg(feature = "async")] #[cfg(feature = "async")]
async_state: AsyncState, async_state: AsyncState,
out_of_gas_behavior: OutOfGas, out_of_gas_behavior: OutOfGas,
epoch_deadline_behavior: EpochDeadline,
/// Indexed data within this `Store`, used to store information about /// Indexed data within this `Store`, used to store information about
/// globals, functions, memories, etc. /// globals, functions, memories, etc.
/// ///
@@ -426,10 +426,11 @@ enum OutOfGas {
/// What to do when the engine epoch reaches the deadline for a Store /// What to do when the engine epoch reaches the deadline for a Store
/// during execution of a function using that store. /// during execution of a function using that store.
#[derive(Copy, Clone)] enum EpochDeadline<T> {
enum EpochDeadline {
/// Return early with a trap. /// Return early with a trap.
Trap, Trap,
/// Call a custom deadline handler.
Callback(Box<dyn FnMut(&mut T) -> Result<u64> + Send + Sync>),
/// Extend the deadline by the specified number of ticks after /// Extend the deadline by the specified number of ticks after
/// yielding to the async executor loop. /// yielding to the async executor loop.
#[cfg(feature = "async")] #[cfg(feature = "async")]
@@ -491,7 +492,6 @@ impl<T> Store<T> {
current_poll_cx: UnsafeCell::new(ptr::null_mut()), current_poll_cx: UnsafeCell::new(ptr::null_mut()),
}, },
out_of_gas_behavior: OutOfGas::Trap, out_of_gas_behavior: OutOfGas::Trap,
epoch_deadline_behavior: EpochDeadline::Trap,
store_data: ManuallyDrop::new(StoreData::new()), store_data: ManuallyDrop::new(StoreData::new()),
default_callee, default_callee,
hostcall_val_storage: Vec::new(), hostcall_val_storage: Vec::new(),
@@ -500,6 +500,7 @@ impl<T> Store<T> {
}, },
limiter: None, limiter: None,
call_hook: None, call_hook: None,
epoch_deadline_behavior: EpochDeadline::Trap,
data: ManuallyDrop::new(data), data: ManuallyDrop::new(data),
}); });
@@ -842,7 +843,8 @@ impl<T> Store<T> {
/// ///
/// This behavior is the default if the store is not otherwise /// This behavior is the default if the store is not otherwise
/// configured via /// configured via
/// [`epoch_deadline_trap()`](Store::epoch_deadline_trap) or /// [`epoch_deadline_trap()`](Store::epoch_deadline_trap),
/// [`epoch_deadline_callback()`](Store::epoch_deadline_callback) or
/// [`epoch_deadline_async_yield_and_update()`](Store::epoch_deadline_async_yield_and_update). /// [`epoch_deadline_async_yield_and_update()`](Store::epoch_deadline_async_yield_and_update).
/// ///
/// This setting is intended to allow for coarse-grained /// This setting is intended to allow for coarse-grained
@@ -857,6 +859,33 @@ impl<T> Store<T> {
self.inner.epoch_deadline_trap(); self.inner.epoch_deadline_trap();
} }
/// Configures epoch-deadline expiration to invoke a custom callback
/// function.
///
/// When epoch-interruption-instrumented code is executed on this
/// store and the epoch deadline is reached before completion, the
/// provided callback function is invoked.
///
/// This function should return a positive `delta`, which is used to
/// update the new epoch, setting it to the current epoch plus
/// `delta` ticks. Alternatively, the callback may return an error,
/// which will terminate execution.
///
/// This setting is intended to allow for coarse-grained
/// interruption, but not a deterministic deadline of a fixed,
/// finite interval. For deterministic interruption, see the
/// "fuel" mechanism instead.
///
/// See documentation on
/// [`Config::epoch_interruption()`](crate::Config::epoch_interruption)
/// for an introduction to epoch-based interruption.
pub fn epoch_deadline_callback(
&mut self,
callback: impl FnMut(&mut T) -> Result<u64> + Send + Sync + 'static,
) {
self.inner.epoch_deadline_callback(Box::new(callback));
}
#[cfg_attr(nightlydoc, doc(cfg(feature = "async")))] #[cfg_attr(nightlydoc, doc(cfg(feature = "async")))]
/// Configures epoch-deadline expiration to yield to the async /// Configures epoch-deadline expiration to yield to the async
/// caller and the update the deadline. /// caller and the update the deadline.
@@ -1351,22 +1380,6 @@ impl StoreOpaque {
} }
} }
fn epoch_deadline_trap(&mut self) {
self.epoch_deadline_behavior = EpochDeadline::Trap;
}
fn epoch_deadline_async_yield_and_update(&mut self, delta: u64) {
assert!(
self.async_support(),
"cannot use `epoch_deadline_async_yield_and_update` without enabling async support in the config"
);
#[cfg(feature = "async")]
{
self.epoch_deadline_behavior = EpochDeadline::YieldAndExtendDeadline { delta };
}
drop(delta); // suppress warning in non-async build
}
#[inline] #[inline]
pub fn signal_handler(&self) -> Option<*const SignalHandler<'static>> { pub fn signal_handler(&self) -> Option<*const SignalHandler<'static>> {
let handler = self.signal_handler.as_ref()?; let handler = self.signal_handler.as_ref()?;
@@ -1855,8 +1868,8 @@ unsafe impl<T> wasmtime_runtime::Store for StoreInner<T> {
} }
fn new_epoch(&mut self) -> Result<u64, anyhow::Error> { fn new_epoch(&mut self) -> Result<u64, anyhow::Error> {
return match &self.epoch_deadline_behavior { return match &mut self.epoch_deadline_behavior {
&EpochDeadline::Trap => { EpochDeadline::Trap => {
let trap = Trap::new_wasm( let trap = Trap::new_wasm(
None, None,
wasmtime_environ::TrapCode::Interrupt, wasmtime_environ::TrapCode::Interrupt,
@@ -1864,8 +1877,16 @@ unsafe impl<T> wasmtime_runtime::Store for StoreInner<T> {
); );
Err(anyhow::Error::from(trap)) Err(anyhow::Error::from(trap))
} }
EpochDeadline::Callback(callback) => {
let delta = callback(&mut self.data)?;
// Set a new deadline and return the new epoch deadline so
// the Wasm code doesn't have to reload it.
self.set_epoch_deadline(delta);
Ok(self.get_epoch_deadline())
}
#[cfg(feature = "async")] #[cfg(feature = "async")]
&EpochDeadline::YieldAndExtendDeadline { delta } => { EpochDeadline::YieldAndExtendDeadline { delta } => {
let delta = *delta;
// Do the async yield. May return a trap if future was // Do the async yield. May return a trap if future was
// canceled while we're yielded. // canceled while we're yielded.
self.async_yield_impl()?; self.async_yield_impl()?;
@@ -1895,6 +1916,29 @@ impl<T> StoreInner<T> {
*epoch_deadline = self.engine().current_epoch() + delta; *epoch_deadline = self.engine().current_epoch() + delta;
} }
fn epoch_deadline_trap(&mut self) {
self.epoch_deadline_behavior = EpochDeadline::Trap;
}
fn epoch_deadline_callback(
&mut self,
callback: Box<dyn FnMut(&mut T) -> Result<u64> + Send + Sync>,
) {
self.epoch_deadline_behavior = EpochDeadline::Callback(callback);
}
fn epoch_deadline_async_yield_and_update(&mut self, delta: u64) {
assert!(
self.async_support(),
"cannot use `epoch_deadline_async_yield_and_update` without enabling async support in the config"
);
#[cfg(feature = "async")]
{
self.epoch_deadline_behavior = EpochDeadline::YieldAndExtendDeadline { delta };
}
drop(delta); // suppress warning in non-async build
}
fn get_epoch_deadline(&self) -> u64 { fn get_epoch_deadline(&self) -> u64 {
// Safety: this is safe because, as above, it is only invoked // Safety: this is safe because, as above, it is only invoked
// from within `new_epoch` which is called from guest Wasm // from within `new_epoch` which is called from guest Wasm

View File

@@ -1,4 +1,5 @@
use crate::async_functions::{CountPending, PollOnce}; use crate::async_functions::{CountPending, PollOnce};
use anyhow::{anyhow, Result};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use wasmtime::*; use wasmtime::*;
@@ -10,7 +11,7 @@ fn build_engine() -> Arc<Engine> {
Arc::new(Engine::new(&config).unwrap()) Arc::new(Engine::new(&config).unwrap())
} }
fn make_env(engine: &Engine) -> Linker<()> { fn make_env<T>(engine: &Engine) -> Linker<T> {
let mut linker = Linker::new(engine); let mut linker = Linker::new(engine);
let engine = engine.clone(); let engine = engine.clone();
@@ -29,29 +30,38 @@ fn make_env(engine: &Engine) -> Linker<()> {
linker linker
} }
enum InterruptMode {
Trap,
Callback(fn(&mut usize) -> Result<u64>),
Yield(u64),
}
/// Run a test with the given wasm, giving an initial deadline of /// Run a test with the given wasm, giving an initial deadline of
/// `initial` ticks in the future, and either configuring the wasm to /// `initial` ticks in the future, and either configuring the wasm to
/// yield and set a deadline `delta` ticks in the future if `delta` is /// yield and set a deadline `delta` ticks in the future if `delta` is
/// `Some(..)` or trapping if `delta` is `None`. /// `Some(..)` or trapping if `delta` is `None`.
/// ///
/// Returns `Some(yields)` if function completed normally, giving the /// Returns `Some((yields, store))` if function completed normally, giving
/// number of yields that occured, or `None` if a trap occurred. /// the number of yields that occurred, or `None` if a trap occurred.
async fn run_and_count_yields_or_trap<F: Fn(Arc<Engine>)>( async fn run_and_count_yields_or_trap<F: Fn(Arc<Engine>)>(
wasm: &str, wasm: &str,
initial: u64, initial: u64,
delta: Option<u64>, delta: InterruptMode,
setup_func: F, setup_func: F,
) -> Option<usize> { ) -> Option<(usize, usize)> {
let engine = build_engine(); let engine = build_engine();
let linker = make_env(&engine); let linker = make_env(&engine);
let module = Module::new(&engine, wasm).unwrap(); let module = Module::new(&engine, wasm).unwrap();
let mut store = Store::new(&engine, ()); let mut store = Store::new(&engine, 0);
store.set_epoch_deadline(initial); store.set_epoch_deadline(initial);
match delta { match delta {
Some(delta) => { InterruptMode::Yield(delta) => {
store.epoch_deadline_async_yield_and_update(delta); store.epoch_deadline_async_yield_and_update(delta);
} }
None => { InterruptMode::Callback(func) => {
store.epoch_deadline_callback(func);
}
InterruptMode::Trap => {
store.epoch_deadline_trap(); store.epoch_deadline_trap();
} }
} }
@@ -63,14 +73,15 @@ async fn run_and_count_yields_or_trap<F: Fn(Arc<Engine>)>(
let f = instance.get_func(&mut store, "run").unwrap(); let f = instance.get_func(&mut store, "run").unwrap();
let (result, yields) = let (result, yields) =
CountPending::new(Box::pin(f.call_async(&mut store, &[], &mut []))).await; CountPending::new(Box::pin(f.call_async(&mut store, &[], &mut []))).await;
return result.ok().map(|_| yields); let store = store.data();
return result.ok().map(|_| (yields, *store));
} }
#[tokio::test] #[tokio::test]
async fn epoch_yield_at_func_entry() { async fn epoch_yield_at_func_entry() {
// Should yield at start of call to func $subfunc. // Should yield at start of call to func $subfunc.
assert_eq!( assert_eq!(
Some(1), Some((1, 0)),
run_and_count_yields_or_trap( run_and_count_yields_or_trap(
" "
(module (module
@@ -81,7 +92,7 @@ async fn epoch_yield_at_func_entry() {
(func $subfunc)) (func $subfunc))
", ",
1, 1,
Some(1), InterruptMode::Yield(1),
|_| {}, |_| {},
) )
.await .await
@@ -92,7 +103,7 @@ async fn epoch_yield_at_func_entry() {
async fn epoch_yield_at_loop_header() { async fn epoch_yield_at_loop_header() {
// Should yield at top of loop, once per five iters. // Should yield at top of loop, once per five iters.
assert_eq!( assert_eq!(
Some(2), Some((2, 0)),
run_and_count_yields_or_trap( run_and_count_yields_or_trap(
" "
(module (module
@@ -105,7 +116,7 @@ async fn epoch_yield_at_loop_header() {
(br_if $l (local.tee $i (i32.sub (local.get $i) (i32.const 1))))))) (br_if $l (local.tee $i (i32.sub (local.get $i) (i32.const 1)))))))
", ",
0, 0,
Some(5), InterruptMode::Yield(5),
|_| {}, |_| {},
) )
.await .await
@@ -117,7 +128,7 @@ async fn epoch_yield_immediate() {
// We should see one yield immediately when the initial deadline // We should see one yield immediately when the initial deadline
// is zero. // is zero.
assert_eq!( assert_eq!(
Some(1), Some((1, 0)),
run_and_count_yields_or_trap( run_and_count_yields_or_trap(
" "
(module (module
@@ -125,7 +136,7 @@ async fn epoch_yield_immediate() {
(func (export \"run\"))) (func (export \"run\")))
", ",
0, 0,
Some(1), InterruptMode::Yield(1),
|_| {}, |_| {},
) )
.await .await
@@ -139,7 +150,7 @@ async fn epoch_yield_only_once() {
// not yield again (the double-check block will reload the correct // not yield again (the double-check block will reload the correct
// epoch). // epoch).
assert_eq!( assert_eq!(
Some(1), Some((1, 0)),
run_and_count_yields_or_trap( run_and_count_yields_or_trap(
" "
(module (module
@@ -155,7 +166,7 @@ async fn epoch_yield_only_once() {
(call $bump))) (call $bump)))
", ",
1, 1,
Some(1), InterruptMode::Yield(1),
|_| {}, |_| {},
) )
.await .await
@@ -175,7 +186,7 @@ async fn epoch_interrupt_infinite_loop() {
(br $l)))) (br $l))))
", ",
1, 1,
None, InterruptMode::Trap,
|engine| { |engine| {
std::thread::spawn(move || { std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(50)); std::thread::sleep(std::time::Duration::from_millis(50));
@@ -297,7 +308,7 @@ async fn epoch_interrupt_function_entries() {
(func $f9)) (func $f9))
", ",
1, 1,
None, InterruptMode::Trap,
|engine| { |engine| {
std::thread::spawn(move || { std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(50)); std::thread::sleep(std::time::Duration::from_millis(50));
@@ -309,6 +320,51 @@ async fn epoch_interrupt_function_entries() {
); );
} }
#[tokio::test]
async fn epoch_callback_continue() {
assert_eq!(
Some((0, 1)),
run_and_count_yields_or_trap(
"
(module
(import \"\" \"bump_epoch\" (func $bump))
(func (export \"run\")
call $bump ;; bump epoch
call $subfunc) ;; call func; will notice new epoch and yield
(func $subfunc))
",
1,
InterruptMode::Callback(|s| {
*s += 1;
Ok(1)
}),
|_| {},
)
.await
);
}
#[tokio::test]
async fn epoch_callback_trap() {
assert_eq!(
None,
run_and_count_yields_or_trap(
"
(module
(import \"\" \"bump_epoch\" (func $bump))
(func (export \"run\")
call $bump ;; bump epoch
call $subfunc) ;; call func; will notice new epoch and yield
(func $subfunc))
",
1,
InterruptMode::Callback(|_| Err(anyhow!("Failing in callback"))),
|_| {},
)
.await
);
}
#[tokio::test] #[tokio::test]
async fn drop_future_on_epoch_yield() { async fn drop_future_on_epoch_yield() {
let wasm = " let wasm = "