* Implement support for `async` functions in Wasmtime

This is an implementation of [RFC 2] in Wasmtime which is to support
`async`-defined host functions. At a high level support is added by
executing WebAssembly code that might invoke an asynchronous host
function on a separate native stack. When the host function's future is
not ready we switch back to the main native stack to continue execution.

There's a whole bunch of details in this commit, and it's a bit much to
go over them all here in this commit message. The most important changes
here are:

* A new `wasmtime-fiber` crate has been written to manage the low-level
  details of stack-switching. Unixes use `mmap` to allocate a stack and
  Windows uses the native fibers implementation. We'll surely want to
  refactor this to move stack allocation elsewhere in the future. Fibers
  are intended to be relatively general with a lot of type parameters to
  fling values back and forth across suspension points. The whole crate
  is a giant wad of `unsafe` unfortunately and involves handwritten
  assembly with custom DWARF CFI directives to boot. Definitely deserves
  a close eye in review!

* The `Store` type has two new methods -- `block_on` and `on_fiber` --
  which bridge between the async and non-async worlds. Lots of unsafe
  fiddly bits here as we're trying to communicate context pointers
  between disparate portions of the code. Extra eyes and care in review
  are greatly appreciated.

* The APIs for binding `async` functions are unfortunately pretty ugly
  in `Func`. This is mostly due to language limitations and compiler
  bugs (I believe) in Rust. Instead of `Func::wrap` we have a
  `Func::wrapN_async` family of methods, and we've also got a whole
  bunch of `Func::getN_async` methods now too. It may be worth
  rethinking the API of `Func` to try to make the documentation page
  actually grok'able.

This isn't super heavily tested, but the various tests should exercise
nearly all of the infrastructure in one form or another. This is just
the start though!

[RFC 2]: https://github.com/bytecodealliance/rfcs/pull/2

* Add wasmtime-fiber to publish script

* Save vector/float registers on ARM too.

* Fix a typo

* Update lock file

* Implement periodically yielding with fuel consumption

This commit implements APIs on `Store` to periodically yield execution
of futures through the consumption of fuel. When fuel runs out a
future's execution is yielded back to the caller, and then upon
resumption fuel is re-injected. The goal of this is to allow cooperative
multi-tasking with futures.

* Fix compile without async

* Save/restore the frame pointer in fiber switching

Turns out this is another callee-saved register!

* Simplify x86_64 fiber asm

Take a leaf out of aarch64's playbook and don't use extra memory to
load/store these arguments. Instead leverage how `wasmtime_fiber_switch`
already loads a bunch of data into registers, which we can then
immediately start using on a fiber's start without any extra memory
accesses.

* Add x86 support to wasmtime-fiber

* Add ARM32 support to fiber crate

* Make fiber build file probing more flexible

* Use CreateFiberEx on Windows

* Remove a stray no-longer-used trait declaration

* Don't reach into `Caller` internals

* Tweak async fuel to eventually run out.

With fuel it's probably best to not provide any way to inject infinite
fuel.

* Fix some typos

* Cleanup asm a bit

* Use a shared header file to deduplicate some directives
* Guarantee hidden visibility for functions
* Enable gc-sections on macOS x86_64
* Add `.type` annotations for ARM

* Update lock file

* Fix compile error

* Review comments
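The fuel-based yielding described in the commit message can be modeled with the standard library alone. What follows is a minimal sketch under stated assumptions, not Wasmtime's implementation: `FuelMetered`, `Outcome`, `NoopWake` and `run_counting_yields` are hypothetical names invented for illustration, and "work" is just a counter standing in for executing WebAssembly. It shows a future that yields when fuel runs out, gets fuel re-injected on resumption, and gives up once a bounded number of injections is used so that fuel can never be infinite.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a fuel-metered computation (not a Wasmtime type).
struct FuelMetered {
    work_left: u64,       // units of work still to do
    fuel: u64,            // fuel currently available
    refill: u64,          // fuel re-injected on each resumption
    injections_left: u32, // remaining refills; bounded so fuel is never infinite
    refill_pending: bool, // set when we yielded and expect a refill on resume
}

#[derive(Debug, PartialEq)]
enum Outcome {
    Finished,
    OutOfFuel,
}

impl FuelMetered {
    fn new(work: u64, fuel_per_injection: u64, injections: u32) -> Self {
        FuelMetered {
            work_left: work,
            fuel: fuel_per_injection,
            refill: fuel_per_injection,
            injections_left: injections,
            refill_pending: false,
        }
    }
}

impl Future for FuelMetered {
    type Output = Outcome;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Outcome> {
        // All fields are `Unpin`, so a plain mutable reference is available.
        let this = self.get_mut();
        // Upon resumption after a yield, fuel is re-injected.
        if this.refill_pending {
            this.refill_pending = false;
            this.fuel = this.refill;
        }
        // Burn one unit of fuel per unit of work until either runs out.
        let step = this.fuel.min(this.work_left);
        this.fuel -= step;
        this.work_left -= step;
        if this.work_left == 0 {
            return Poll::Ready(Outcome::Finished);
        }
        // Fuel ran out with work remaining: yield if a refill is still allowed.
        if this.injections_left == 0 {
            return Poll::Ready(Outcome::OutOfFuel);
        }
        this.injections_left -= 1;
        this.refill_pending = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Waker that does nothing; the loop below simply polls again.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polls a future to completion and counts how often it yielded.
fn run_counting_yields<F: Future>(future: F) -> (F::Output, u32) {
    let mut f = Box::pin(future);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut yields = 0;
    loop {
        match f.as_mut().poll(&mut cx) {
            Poll::Ready(val) => return (val, yields),
            Poll::Pending => yields += 1,
        }
    }
}

fn main() {
    // 25 units of work, 10 fuel per injection, at most 5 injections.
    let (outcome, yields) = run_counting_yields(FuelMetered::new(25, 10, 5));
    println!("{:?} after {} yields", outcome, yields);
    // 100 units of work cannot finish on 10 initial fuel plus 3 injections.
    let (outcome, yields) = run_counting_yields(FuelMetered::new(100, 10, 3));
    println!("{:?} after {} yields", outcome, yields);
}
```

Each `Poll::Pending` hands control back to whoever is polling, which is what makes this style of yielding cooperative: a scheduler could poll other futures before resuming this one. Bounding the number of injections mirrors the "eventually run out" tweak mentioned in the commit message.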
367 lines
10 KiB
Rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use wasmtime::*;

fn async_store() -> Store {
    let engine = Engine::default();
    Store::new_async(&engine)
}

#[test]
fn smoke() {
    let store = async_store();
    let func = Func::new_async(
        &store,
        FuncType::new(None, None),
        (),
        move |_caller, _state, _params, _results| Box::new(async { Ok(()) }),
    );
    run(func.call_async(&[])).unwrap();
    run(func.call_async(&[])).unwrap();
    let future1 = func.call_async(&[]);
    let future2 = func.call_async(&[]);
    run(future2).unwrap();
    run(future1).unwrap();
}

#[test]
fn smoke_with_suspension() {
    let store = async_store();
    let func = Func::new_async(
        &store,
        FuncType::new(None, None),
        (),
        move |_caller, _state, _params, _results| {
            Box::new(async {
                PendingOnce::default().await;
                Ok(())
            })
        },
    );
    run(func.call_async(&[])).unwrap();
    run(func.call_async(&[])).unwrap();
    let future1 = func.call_async(&[]);
    let future2 = func.call_async(&[]);
    run(future2).unwrap();
    run(future1).unwrap();
}

#[test]
fn smoke_get_with_suspension() {
    let store = async_store();
    let func = Func::new_async(
        &store,
        FuncType::new(None, None),
        (),
        move |_caller, _state, _params, _results| {
            Box::new(async {
                PendingOnce::default().await;
                Ok(())
            })
        },
    );
    let func = func.get0_async::<()>().unwrap();
    run(func()).unwrap();
    run(func()).unwrap();
    let future1 = func();
    let future2 = func();
    run(future2).unwrap();
    run(future1).unwrap();
}

#[test]
fn recursive_call() {
    let store = async_store();
    let async_wasm_func = Rc::new(Func::new_async(
        &store,
        FuncType::new(None, None),
        (),
        |_caller, _state, _params, _results| {
            Box::new(async {
                PendingOnce::default().await;
                Ok(())
            })
        },
    ));
    let weak = Rc::downgrade(&async_wasm_func);

    // Create an imported function which recursively invokes another wasm
    // function asynchronously, although this one is just our own host function
    // which suffices for this test.
    let func2 = Func::new_async(
        &store,
        FuncType::new(None, None),
        (),
        move |_caller, _state, _params, _results| {
            let async_wasm_func = weak.upgrade().unwrap();
            Box::new(async move {
                async_wasm_func.call_async(&[]).await?;
                Ok(())
            })
        },
    );

    // Create an instance which calls an async import twice.
    let module = Module::new(
        store.engine(),
        "
            (module
                (import \"\" \"\" (func))
                (func (export \"\")
                    ;; call imported function which recursively does an async
                    ;; call
                    call 0
                    ;; do it again, and our various pointers all better align
                    call 0))
        ",
    )
    .unwrap();

    run(async {
        let instance = Instance::new_async(&store, &module, &[func2.into()]).await?;
        let func = instance.get_func("").unwrap();
        func.call_async(&[]).await
    })
    .unwrap();
}

#[test]
fn suspend_while_suspending() {
    let store = async_store();

    // Create a synchronous function which calls our asynchronous function and
    // runs it locally. This shouldn't generally happen but we know everything
    // is synchronous in this test so it's fine for us to do this.
    //
    // The purpose of this test is intended to stress various cases in how
    // we manage pointers in ways that are not necessarily common but are still
    // possible in safe code.
    let async_thunk = Rc::new(Func::new_async(
        &store,
        FuncType::new(None, None),
        (),
        |_caller, _state, _params, _results| Box::new(async { Ok(()) }),
    ));
    let weak = Rc::downgrade(&async_thunk);
    let sync_call_async_thunk = Func::new(
        &store,
        FuncType::new(None, None),
        move |_caller, _params, _results| {
            let async_thunk = weak.upgrade().unwrap();
            run(async_thunk.call_async(&[]))?;
            Ok(())
        },
    );

    // A small async function that simply awaits once to pump the loops and
    // then finishes.
    let async_import = Func::new_async(
        &store,
        FuncType::new(None, None),
        (),
        move |_caller, _state, _params, _results| {
            Box::new(async move {
                PendingOnce::default().await;
                Ok(())
            })
        },
    );

    let module = Module::new(
        store.engine(),
        "
            (module
                (import \"\" \"\" (func $sync_call_async_thunk))
                (import \"\" \"\" (func $async_import))
                (func (export \"\")
                    ;; Set some store-local state and pointers
                    call $sync_call_async_thunk
                    ;; .. and hopefully it's all still configured correctly
                    call $async_import))
        ",
    )
    .unwrap();
    run(async {
        let instance = Instance::new_async(
            &store,
            &module,
            &[sync_call_async_thunk.into(), async_import.into()],
        )
        .await?;
        let func = instance.get_func("").unwrap();
        func.call_async(&[]).await
    })
    .unwrap();
}

#[test]
fn cancel_during_run() {
    let store = async_store();
    let state = Rc::new(Cell::new(0));
    let state2 = state.clone();

    let async_thunk = Func::new_async(
        &store,
        FuncType::new(None, None),
        (),
        move |_caller, _state, _params, _results| {
            assert_eq!(state2.get(), 0);
            state2.set(1);
            let dtor = SetOnDrop(state2.clone());
            Box::new(async move {
                // Reference `dtor` so it is moved into this future and is
                // only dropped when the future itself is dropped.
                drop(&dtor);
                PendingOnce::default().await;
                Ok(())
            })
        },
    );
    // Shouldn't have called anything yet...
    assert_eq!(state.get(), 0);

    // Create our future, but as per async conventions this still doesn't
    // actually do anything. No wasm or host function has been called yet.
    let mut future = Pin::from(Box::new(async_thunk.call_async(&[])));
    assert_eq!(state.get(), 0);

    // Push the future forward one tick, which actually runs the host code in
    // our async func. Our future is designed to be pending once, however.
    let poll = future
        .as_mut()
        .poll(&mut Context::from_waker(&dummy_waker()));
    assert!(poll.is_pending());
    assert_eq!(state.get(), 1);

    // Now that our future is running (on a separate, now-suspended fiber), drop
    // the future and that should deallocate all the Rust bits as well.
    drop(future);
    assert_eq!(state.get(), 2);

    struct SetOnDrop(Rc<Cell<u32>>);

    impl Drop for SetOnDrop {
        fn drop(&mut self) {
            assert_eq!(self.0.get(), 1);
            self.0.set(2);
        }
    }
}

#[derive(Default)]
struct PendingOnce {
    already_polled: bool,
}

impl Future for PendingOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.already_polled {
            Poll::Ready(())
        } else {
            self.already_polled = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

fn run<F: Future>(future: F) -> F::Output {
    let mut f = Pin::from(Box::new(future));
    let waker = dummy_waker();
    let mut cx = Context::from_waker(&waker);
    loop {
        match f.as_mut().poll(&mut cx) {
            Poll::Ready(val) => break val,
            Poll::Pending => {}
        }
    }
}

fn dummy_waker() -> Waker {
    return unsafe { Waker::from_raw(clone(5 as *const _)) };

    unsafe fn clone(ptr: *const ()) -> RawWaker {
        assert_eq!(ptr as usize, 5);
        const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
        RawWaker::new(ptr, &VTABLE)
    }

    unsafe fn wake(ptr: *const ()) {
        assert_eq!(ptr as usize, 5);
    }

    unsafe fn wake_by_ref(ptr: *const ()) {
        assert_eq!(ptr as usize, 5);
    }

    unsafe fn drop(ptr: *const ()) {
        assert_eq!(ptr as usize, 5);
    }
}

#[test]
fn iloop_with_fuel() {
    let engine = Engine::new(Config::new().consume_fuel(true));
    let store = Store::new_async(&engine);
    store.out_of_fuel_async_yield(1_000, 10);
    let module = Module::new(
        &engine,
        "
            (module
                (func (loop br 0))
                (start 0)
            )
        ",
    )
    .unwrap();
    let instance = Instance::new_async(&store, &module, &[]);
    let mut f = Pin::from(Box::new(instance));
    let waker = dummy_waker();
    let mut cx = Context::from_waker(&waker);

    // This should yield a bunch of times...
    for _ in 0..100 {
        assert!(f.as_mut().poll(&mut cx).is_pending());
    }

    // ... but it should eventually also finish.
    loop {
        match f.as_mut().poll(&mut cx) {
            Poll::Ready(_) => break,
            Poll::Pending => {}
        }
    }
}

#[test]
fn fuel_eventually_finishes() {
    let engine = Engine::new(Config::new().consume_fuel(true));
    let store = Store::new_async(&engine);
    store.out_of_fuel_async_yield(u32::max_value(), 10);
    let module = Module::new(
        &engine,
        "
            (module
                (func
                    (local i32)
                    i32.const 100
                    local.set 0
                    (loop
                        local.get 0
                        i32.const -1
                        i32.add
                        local.tee 0
                        br_if 0)
                )
                (start 0)
            )
        ",
    )
    .unwrap();
    let instance = Instance::new_async(&store, &module, &[]);
    run(instance).unwrap();
}