Use tokio::test instead of dummy_waker in tests (#3975)

Currently wasmtime's async tests use a mixture of `#[tokio::test]` and
`dummy_waker`. To be consistent this tries to move all tests possible to
`#[tokio::test]` and just a two need to keep using `dummy_waker` (no
renamed to `noop_waker`) due to what they're testing.
This commit is contained in:
Alex Crichton
2022-04-18 15:56:35 -05:00
committed by GitHub
parent 5aa9bdc7eb
commit 534e4263ce
2 changed files with 260 additions and 334 deletions

View File

@@ -8,35 +8,35 @@ fn async_store() -> Store<()> {
Store::new(&Engine::new(Config::new().async_support(true)).unwrap(), ()) Store::new(&Engine::new(Config::new().async_support(true)).unwrap(), ())
} }
fn run_smoke_test(store: &mut Store<()>, func: Func) { async fn run_smoke_test(store: &mut Store<()>, func: Func) {
run(func.call_async(&mut *store, &[], &mut [])).unwrap(); func.call_async(&mut *store, &[], &mut []).await.unwrap();
run(func.call_async(&mut *store, &[], &mut [])).unwrap(); func.call_async(&mut *store, &[], &mut []).await.unwrap();
} }
fn run_smoke_typed_test(store: &mut Store<()>, func: Func) { async fn run_smoke_typed_test(store: &mut Store<()>, func: Func) {
let func = func.typed::<(), (), _>(&store).unwrap(); let func = func.typed::<(), (), _>(&store).unwrap();
run(func.call_async(&mut *store, ())).unwrap(); func.call_async(&mut *store, ()).await.unwrap();
run(func.call_async(&mut *store, ())).unwrap(); func.call_async(&mut *store, ()).await.unwrap();
} }
#[test] #[tokio::test]
fn smoke() { async fn smoke() {
let mut store = async_store(); let mut store = async_store();
let func = Func::new_async( let func = Func::new_async(
&mut store, &mut store,
FuncType::new(None, None), FuncType::new(None, None),
move |_caller, _params, _results| Box::new(async { Ok(()) }), move |_caller, _params, _results| Box::new(async { Ok(()) }),
); );
run_smoke_test(&mut store, func); run_smoke_test(&mut store, func).await;
run_smoke_typed_test(&mut store, func); run_smoke_typed_test(&mut store, func).await;
let func = Func::wrap0_async(&mut store, move |_caller| Box::new(async { Ok(()) })); let func = Func::wrap0_async(&mut store, move |_caller| Box::new(async { Ok(()) }));
run_smoke_test(&mut store, func); run_smoke_test(&mut store, func).await;
run_smoke_typed_test(&mut store, func); run_smoke_typed_test(&mut store, func).await;
} }
#[test] #[tokio::test]
fn smoke_host_func() -> Result<()> { async fn smoke_host_func() -> Result<()> {
let mut store = async_store(); let mut store = async_store();
let mut linker = Linker::new(store.engine()); let mut linker = Linker::new(store.engine());
@@ -54,48 +54,48 @@ fn smoke_host_func() -> Result<()> {
.unwrap() .unwrap()
.into_func() .into_func()
.unwrap(); .unwrap();
run_smoke_test(&mut store, func); run_smoke_test(&mut store, func).await;
run_smoke_typed_test(&mut store, func); run_smoke_typed_test(&mut store, func).await;
let func = linker let func = linker
.get(&mut store, "", "second") .get(&mut store, "", "second")
.unwrap() .unwrap()
.into_func() .into_func()
.unwrap(); .unwrap();
run_smoke_test(&mut store, func); run_smoke_test(&mut store, func).await;
run_smoke_typed_test(&mut store, func); run_smoke_typed_test(&mut store, func).await;
Ok(()) Ok(())
} }
#[test] #[tokio::test]
fn smoke_with_suspension() { async fn smoke_with_suspension() {
let mut store = async_store(); let mut store = async_store();
let func = Func::new_async( let func = Func::new_async(
&mut store, &mut store,
FuncType::new(None, None), FuncType::new(None, None),
move |_caller, _params, _results| { move |_caller, _params, _results| {
Box::new(async { Box::new(async {
PendingOnce::default().await; tokio::task::yield_now().await;
Ok(()) Ok(())
}) })
}, },
); );
run_smoke_test(&mut store, func); run_smoke_test(&mut store, func).await;
run_smoke_typed_test(&mut store, func); run_smoke_typed_test(&mut store, func).await;
let func = Func::wrap0_async(&mut store, move |_caller| { let func = Func::wrap0_async(&mut store, move |_caller| {
Box::new(async { Box::new(async {
PendingOnce::default().await; tokio::task::yield_now().await;
Ok(()) Ok(())
}) })
}); });
run_smoke_test(&mut store, func); run_smoke_test(&mut store, func).await;
run_smoke_typed_test(&mut store, func); run_smoke_typed_test(&mut store, func).await;
} }
#[test] #[tokio::test]
fn smoke_host_func_with_suspension() -> Result<()> { async fn smoke_host_func_with_suspension() -> Result<()> {
let mut store = async_store(); let mut store = async_store();
let mut linker = Linker::new(store.engine()); let mut linker = Linker::new(store.engine());
@@ -105,7 +105,7 @@ fn smoke_host_func_with_suspension() -> Result<()> {
FuncType::new(None, None), FuncType::new(None, None),
move |_caller, _params, _results| { move |_caller, _params, _results| {
Box::new(async { Box::new(async {
PendingOnce::default().await; tokio::task::yield_now().await;
Ok(()) Ok(())
}) })
}, },
@@ -113,7 +113,7 @@ fn smoke_host_func_with_suspension() -> Result<()> {
linker.func_wrap0_async("", "second", move |_caller| { linker.func_wrap0_async("", "second", move |_caller| {
Box::new(async { Box::new(async {
PendingOnce::default().await; tokio::task::yield_now().await;
Ok(()) Ok(())
}) })
})?; })?;
@@ -123,29 +123,29 @@ fn smoke_host_func_with_suspension() -> Result<()> {
.unwrap() .unwrap()
.into_func() .into_func()
.unwrap(); .unwrap();
run_smoke_test(&mut store, func); run_smoke_test(&mut store, func).await;
run_smoke_typed_test(&mut store, func); run_smoke_typed_test(&mut store, func).await;
let func = linker let func = linker
.get(&mut store, "", "second") .get(&mut store, "", "second")
.unwrap() .unwrap()
.into_func() .into_func()
.unwrap(); .unwrap();
run_smoke_test(&mut store, func); run_smoke_test(&mut store, func).await;
run_smoke_typed_test(&mut store, func); run_smoke_typed_test(&mut store, func).await;
Ok(()) Ok(())
} }
#[test] #[tokio::test]
fn recursive_call() { async fn recursive_call() {
let mut store = async_store(); let mut store = async_store();
let async_wasm_func = Func::new_async( let async_wasm_func = Func::new_async(
&mut store, &mut store,
FuncType::new(None, None), FuncType::new(None, None),
|_caller, _params, _results| { |_caller, _params, _results| {
Box::new(async { Box::new(async {
PendingOnce::default().await; tokio::task::yield_now().await;
Ok(()) Ok(())
}) })
}, },
@@ -183,16 +183,15 @@ fn recursive_call() {
) )
.unwrap(); .unwrap();
run(async { let instance = Instance::new_async(&mut store, &module, &[func2.into()])
let instance = Instance::new_async(&mut store, &module, &[func2.into()]).await?; .await
let func = instance.get_func(&mut store, "").unwrap(); .unwrap();
func.call_async(&mut store, &[], &mut []).await let func = instance.get_func(&mut store, "").unwrap();
}) func.call_async(&mut store, &[], &mut []).await.unwrap();
.unwrap();
} }
#[test] #[tokio::test]
fn suspend_while_suspending() { async fn suspend_while_suspending() {
let mut store = async_store(); let mut store = async_store();
// Create a synchronous function which calls our asynchronous function and // Create a synchronous function which calls our asynchronous function and
@@ -211,7 +210,11 @@ fn suspend_while_suspending() {
&mut store, &mut store,
FuncType::new(None, None), FuncType::new(None, None),
move |mut caller, _params, _results| { move |mut caller, _params, _results| {
run(async_thunk.call_async(&mut caller, &[], &mut []))?; let mut future = Box::pin(async_thunk.call_async(&mut caller, &[], &mut []));
let poll = future
.as_mut()
.poll(&mut Context::from_waker(&noop_waker()));
assert!(poll.is_ready());
Ok(()) Ok(())
}, },
); );
@@ -223,7 +226,7 @@ fn suspend_while_suspending() {
FuncType::new(None, None), FuncType::new(None, None),
move |_caller, _params, _results| { move |_caller, _params, _results| {
Box::new(async move { Box::new(async move {
PendingOnce::default().await; tokio::task::yield_now().await;
Ok(()) Ok(())
}) })
}, },
@@ -243,21 +246,19 @@ fn suspend_while_suspending() {
", ",
) )
.unwrap(); .unwrap();
run(async { let instance = Instance::new_async(
let instance = Instance::new_async( &mut store,
&mut store, &module,
&module, &[sync_call_async_thunk.into(), async_import.into()],
&[sync_call_async_thunk.into(), async_import.into()], )
) .await
.await?;
let func = instance.get_func(&mut store, "").unwrap();
func.call_async(&mut store, &[], &mut []).await
})
.unwrap(); .unwrap();
let func = instance.get_func(&mut store, "").unwrap();
func.call_async(&mut store, &[], &mut []).await.unwrap();
} }
#[test] #[tokio::test]
fn cancel_during_run() { async fn cancel_during_run() {
let mut store = Store::new(&Engine::new(Config::new().async_support(true)).unwrap(), 0); let mut store = Store::new(&Engine::new(Config::new().async_support(true)).unwrap(), 0);
let async_thunk = Func::new_async( let async_thunk = Func::new_async(
@@ -269,7 +270,7 @@ fn cancel_during_run() {
let dtor = SetOnDrop(caller); let dtor = SetOnDrop(caller);
Box::new(async move { Box::new(async move {
drop(&dtor); drop(&dtor);
PendingOnce::default().await; tokio::task::yield_now().await;
Ok(()) Ok(())
}) })
}, },
@@ -279,14 +280,11 @@ fn cancel_during_run() {
// Create our future, but as per async conventions this still doesn't // Create our future, but as per async conventions this still doesn't
// actually do anything. No wasm or host function has been called yet. // actually do anything. No wasm or host function has been called yet.
let mut future = Pin::from(Box::new(async_thunk.call_async(&mut store, &[], &mut []))); let future = Box::pin(async_thunk.call_async(&mut store, &[], &mut []));
// Push the future forward one tick, which actually runs the host code in // Push the future forward one tick, which actually runs the host code in
// our async func. Our future is designed to be pending once, however. // our async func. Our future is designed to be pending once, however.
let poll = future let future = PollOnce::new(future).await;
.as_mut()
.poll(&mut Context::from_waker(&dummy_waker()));
assert!(poll.is_pending());
// Now that our future is running (on a separate, now-suspended fiber), drop // Now that our future is running (on a separate, now-suspended fiber), drop
// the future and that should deallocate all the Rust bits as well. // the future and that should deallocate all the Rust bits as well.
@@ -303,61 +301,8 @@ fn cancel_during_run() {
} }
} }
#[derive(Default)] #[tokio::test]
struct PendingOnce { async fn iloop_with_fuel() {
already_polled: bool,
}
impl Future for PendingOnce {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if self.already_polled {
Poll::Ready(())
} else {
self.already_polled = true;
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
fn run<F: Future>(future: F) -> F::Output {
let mut f = Pin::from(Box::new(future));
let waker = dummy_waker();
let mut cx = Context::from_waker(&waker);
loop {
match f.as_mut().poll(&mut cx) {
Poll::Ready(val) => break val,
Poll::Pending => {}
}
}
}
fn dummy_waker() -> Waker {
return unsafe { Waker::from_raw(clone(5 as *const _)) };
unsafe fn clone(ptr: *const ()) -> RawWaker {
assert_eq!(ptr as usize, 5);
const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
RawWaker::new(ptr, &VTABLE)
}
unsafe fn wake(ptr: *const ()) {
assert_eq!(ptr as usize, 5);
}
unsafe fn wake_by_ref(ptr: *const ()) {
assert_eq!(ptr as usize, 5);
}
unsafe fn drop(ptr: *const ()) {
assert_eq!(ptr as usize, 5);
}
}
#[test]
fn iloop_with_fuel() {
let engine = Engine::new(Config::new().async_support(true).consume_fuel(true)).unwrap(); let engine = Engine::new(Config::new().async_support(true).consume_fuel(true)).unwrap();
let mut store = Store::new(&engine, ()); let mut store = Store::new(&engine, ());
store.out_of_fuel_async_yield(1_000, 10); store.out_of_fuel_async_yield(1_000, 10);
@@ -372,26 +317,14 @@ fn iloop_with_fuel() {
) )
.unwrap(); .unwrap();
let instance = Instance::new_async(&mut store, &module, &[]); let instance = Instance::new_async(&mut store, &module, &[]);
let mut f = Pin::from(Box::new(instance));
let waker = dummy_waker();
let mut cx = Context::from_waker(&waker);
// This should yield a bunch of times... // This should yield a bunch of times but eventually finish
for _ in 0..100 { let (_, pending) = CountPending::new(Box::pin(instance)).await;
assert!(f.as_mut().poll(&mut cx).is_pending()); assert!(pending > 100);
}
// ... but it should eventually also finish.
loop {
match f.as_mut().poll(&mut cx) {
Poll::Ready(_) => break,
Poll::Pending => {}
}
}
} }
#[test] #[tokio::test]
fn fuel_eventually_finishes() { async fn fuel_eventually_finishes() {
let engine = Engine::new(Config::new().async_support(true).consume_fuel(true)).unwrap(); let engine = Engine::new(Config::new().async_support(true).consume_fuel(true)).unwrap();
let mut store = Store::new(&engine, ()); let mut store = Store::new(&engine, ());
store.out_of_fuel_async_yield(u64::max_value(), 10); store.out_of_fuel_async_yield(u64::max_value(), 10);
@@ -416,11 +349,11 @@ fn fuel_eventually_finishes() {
) )
.unwrap(); .unwrap();
let instance = Instance::new_async(&mut store, &module, &[]); let instance = Instance::new_async(&mut store, &module, &[]);
run(instance).unwrap(); instance.await.unwrap();
} }
#[test] #[tokio::test]
fn async_with_pooling_stacks() { async fn async_with_pooling_stacks() {
let mut config = Config::new(); let mut config = Config::new();
config.async_support(true); config.async_support(true);
config.allocation_strategy(InstanceAllocationStrategy::Pooling { config.allocation_strategy(InstanceAllocationStrategy::Pooling {
@@ -444,12 +377,12 @@ fn async_with_pooling_stacks() {
move |_caller, _params, _results| Box::new(async { Ok(()) }), move |_caller, _params, _results| Box::new(async { Ok(()) }),
); );
run_smoke_test(&mut store, func); run_smoke_test(&mut store, func).await;
run_smoke_typed_test(&mut store, func); run_smoke_typed_test(&mut store, func).await;
} }
#[test] #[tokio::test]
fn async_host_func_with_pooling_stacks() -> Result<()> { async fn async_host_func_with_pooling_stacks() -> Result<()> {
let mut config = Config::new(); let mut config = Config::new();
config.async_support(true); config.async_support(true);
config.allocation_strategy(InstanceAllocationStrategy::Pooling { config.allocation_strategy(InstanceAllocationStrategy::Pooling {
@@ -475,30 +408,21 @@ fn async_host_func_with_pooling_stacks() -> Result<()> {
)?; )?;
let func = linker.get(&mut store, "", "").unwrap().into_func().unwrap(); let func = linker.get(&mut store, "", "").unwrap().into_func().unwrap();
run_smoke_test(&mut store, func); run_smoke_test(&mut store, func).await;
run_smoke_typed_test(&mut store, func); run_smoke_typed_test(&mut store, func).await;
Ok(()) Ok(())
} }
fn execute_across_threads<F: Future + Send + 'static>(future: F) { async fn execute_across_threads<F: Future + Send + 'static>(future: F) {
let mut future = Pin::from(Box::new(future)); let future = PollOnce::new(Box::pin(future)).await;
let poll = future
.as_mut()
.poll(&mut Context::from_waker(&dummy_waker()));
assert!(poll.is_pending());
std::thread::spawn(move || { tokio::task::spawn_blocking(move || future)
let poll = future .await
.as_mut() .expect("shouldn't panic");
.poll(&mut Context::from_waker(&dummy_waker()));
assert!(!poll.is_pending());
})
.join()
.unwrap();
} }
#[test] #[tokio::test]
fn resume_separate_thread() { async fn resume_separate_thread() {
// This test will poll the following future on two threads. Simulating a // This test will poll the following future on two threads. Simulating a
// trap requires accessing TLS info, so that should be preserved correctly. // trap requires accessing TLS info, so that should be preserved correctly.
execute_across_threads(async { execute_across_threads(async {
@@ -515,17 +439,18 @@ fn resume_separate_thread() {
.unwrap(); .unwrap();
let func = Func::wrap0_async(&mut store, |_| { let func = Func::wrap0_async(&mut store, |_| {
Box::new(async { Box::new(async {
PendingOnce::default().await; tokio::task::yield_now().await;
Err::<(), _>(wasmtime::Trap::new("test")) Err::<(), _>(wasmtime::Trap::new("test"))
}) })
}); });
let result = Instance::new_async(&mut store, &module, &[func.into()]).await; let result = Instance::new_async(&mut store, &module, &[func.into()]).await;
assert!(result.is_err()); assert!(result.is_err());
}); })
.await;
} }
#[test] #[tokio::test]
fn resume_separate_thread2() { async fn resume_separate_thread2() {
// This test will poll the following future on two threads. Catching a // This test will poll the following future on two threads. Catching a
// signal requires looking up TLS information to determine whether it's a // signal requires looking up TLS information to determine whether it's a
// trap to handle or not, so that must be preserved correctly across threads. // trap to handle or not, so that must be preserved correctly across threads.
@@ -545,15 +470,18 @@ fn resume_separate_thread2() {
) )
.unwrap(); .unwrap();
let func = Func::wrap0_async(&mut store, |_| { let func = Func::wrap0_async(&mut store, |_| {
Box::new(async { PendingOnce::default().await }) Box::new(async {
tokio::task::yield_now().await;
})
}); });
let result = Instance::new_async(&mut store, &module, &[func.into()]).await; let result = Instance::new_async(&mut store, &module, &[func.into()]).await;
assert!(result.is_err()); assert!(result.is_err());
}); })
.await;
} }
#[test] #[tokio::test]
fn resume_separate_thread3() { async fn resume_separate_thread3() {
// This test doesn't actually do anything with cross-thread polls, but // This test doesn't actually do anything with cross-thread polls, but
// instead it deals with scheduling futures at "odd" times. // instead it deals with scheduling futures at "odd" times.
// //
@@ -581,15 +509,17 @@ fn resume_separate_thread3() {
) )
.unwrap(); .unwrap();
let func = Func::wrap0_async(&mut store, |_| { let func = Func::wrap0_async(&mut store, |_| {
Box::new(async { PendingOnce::default().await }) Box::new(async {
tokio::task::yield_now().await;
})
}); });
drop(Instance::new_async(&mut store, &module, &[func.into()]).await); drop(Instance::new_async(&mut store, &module, &[func.into()]).await);
unreachable!() unreachable!()
}; };
let mut future = Pin::from(Box::new(f)); let mut future = Box::pin(f);
let poll = future let poll = future
.as_mut() .as_mut()
.poll(&mut Context::from_waker(&dummy_waker())); .poll(&mut Context::from_waker(&noop_waker()));
assert!(poll.is_pending()); assert!(poll.is_pending());
// ... so at this point our call into wasm is suspended. The call into // ... so at this point our call into wasm is suspended. The call into
@@ -609,8 +539,8 @@ fn resume_separate_thread3() {
assert!(f.call(&mut store, &[], &mut []).is_err()); assert!(f.call(&mut store, &[], &mut []).is_err());
} }
#[test] #[tokio::test]
fn recursive_async() -> Result<()> { async fn recursive_async() -> Result<()> {
let mut store = async_store(); let mut store = async_store();
let m = Module::new( let m = Module::new(
store.engine(), store.engine(),
@@ -619,7 +549,7 @@ fn recursive_async() -> Result<()> {
(func (export \"normal\")) (func (export \"normal\"))
)", )",
)?; )?;
let i = run(Instance::new_async(&mut store, &m, &[]))?; let i = Instance::new_async(&mut store, &m, &[]).await?;
let overflow = i.get_typed_func::<(), (), _>(&mut store, "overflow")?; let overflow = i.get_typed_func::<(), (), _>(&mut store, "overflow")?;
let normal = i.get_typed_func::<(), (), _>(&mut store, "normal")?; let normal = i.get_typed_func::<(), (), _>(&mut store, "normal")?;
let f2 = Func::wrap0_async(&mut store, move |mut caller| { let f2 = Func::wrap0_async(&mut store, move |mut caller| {
@@ -634,88 +564,142 @@ fn recursive_async() -> Result<()> {
Ok(()) Ok(())
}) })
}); });
run(f2.call_async(&mut store, &[], &mut []))?; f2.call_async(&mut store, &[], &mut []).await?;
Ok(()) Ok(())
} }
#[test] #[tokio::test]
fn linker_module_command() -> Result<()> { async fn linker_module_command() -> Result<()> {
run(async { let mut store = async_store();
let mut store = async_store(); let mut linker = Linker::new(store.engine());
let mut linker = Linker::new(store.engine()); let module1 = Module::new(
let module1 = Module::new( store.engine(),
store.engine(), r#"
r#" (module
(module (global $g (mut i32) (i32.const 0))
(global $g (mut i32) (i32.const 0))
(func (export "_start")) (func (export "_start"))
(func (export "g") (result i32) (func (export "g") (result i32)
global.get $g global.get $g
i32.const 1 i32.const 1
global.set $g) global.set $g)
) )
"#, "#,
)?; )?;
let module2 = Module::new( let module2 = Module::new(
store.engine(), store.engine(),
r#" r#"
(module (module
(import "" "g" (func (result i32))) (import "" "g" (func (result i32)))
(func (export "get") (result i32) (func (export "get") (result i32)
call 0) call 0)
) )
"#, "#,
)?; )?;
linker.module_async(&mut store, "", &module1).await?; linker.module_async(&mut store, "", &module1).await?;
let instance = linker.instantiate_async(&mut store, &module2).await?; let instance = linker.instantiate_async(&mut store, &module2).await?;
let f = instance.get_typed_func::<(), i32, _>(&mut store, "get")?; let f = instance.get_typed_func::<(), i32, _>(&mut store, "get")?;
assert_eq!(f.call_async(&mut store, ()).await?, 0); assert_eq!(f.call_async(&mut store, ()).await?, 0);
assert_eq!(f.call_async(&mut store, ()).await?, 0); assert_eq!(f.call_async(&mut store, ()).await?, 0);
Ok(()) Ok(())
})
} }
#[test] #[tokio::test]
fn linker_module_reactor() -> Result<()> { async fn linker_module_reactor() -> Result<()> {
run(async { let mut store = async_store();
let mut store = async_store(); let mut linker = Linker::new(store.engine());
let mut linker = Linker::new(store.engine()); let module1 = Module::new(
let module1 = Module::new( store.engine(),
store.engine(), r#"
r#" (module
(module (global $g (mut i32) (i32.const 0))
(global $g (mut i32) (i32.const 0))
(func (export "g") (result i32) (func (export "g") (result i32)
global.get $g global.get $g
i32.const 1 i32.const 1
global.set $g) global.set $g)
) )
"#, "#,
)?; )?;
let module2 = Module::new( let module2 = Module::new(
store.engine(), store.engine(),
r#" r#"
(module (module
(import "" "g" (func (result i32))) (import "" "g" (func (result i32)))
(func (export "get") (result i32) (func (export "get") (result i32)
call 0) call 0)
) )
"#, "#,
)?; )?;
linker.module_async(&mut store, "", &module1).await?; linker.module_async(&mut store, "", &module1).await?;
let instance = linker.instantiate_async(&mut store, &module2).await?; let instance = linker.instantiate_async(&mut store, &module2).await?;
let f = instance.get_typed_func::<(), i32, _>(&mut store, "get")?; let f = instance.get_typed_func::<(), i32, _>(&mut store, "get")?;
assert_eq!(f.call_async(&mut store, ()).await?, 0); assert_eq!(f.call_async(&mut store, ()).await?, 0);
assert_eq!(f.call_async(&mut store, ()).await?, 1); assert_eq!(f.call_async(&mut store, ()).await?, 1);
Ok(()) Ok(())
}) }
pub struct CountPending<F> {
future: F,
yields: usize,
}
impl<F> CountPending<F> {
pub fn new(future: F) -> CountPending<F> {
CountPending { future, yields: 0 }
}
}
impl<F> Future for CountPending<F>
where
F: Future + Unpin,
{
type Output = (F::Output, usize);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.future).poll(cx) {
Poll::Pending => {
self.yields += 1;
Poll::Pending
}
Poll::Ready(e) => Poll::Ready((e, self.yields)),
}
}
}
pub struct PollOnce<F>(Option<F>);
impl<F> PollOnce<F> {
pub fn new(future: F) -> PollOnce<F> {
PollOnce(Some(future))
}
}
impl<F> Future for PollOnce<F>
where
F: Future + Unpin,
{
type Output = F;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F> {
let mut future = self.0.take().unwrap();
match Pin::new(&mut future).poll(cx) {
Poll::Pending => Poll::Ready(future),
Poll::Ready(_) => panic!("should not be ready"),
}
}
}
fn noop_waker() -> Waker {
const VTABLE: RawWakerVTable =
RawWakerVTable::new(|ptr| RawWaker::new(ptr, &VTABLE), |_| {}, |_| {}, |_| {});
const RAW: RawWaker = RawWaker::new(0 as *const (), &VTABLE);
unsafe { Waker::from_raw(RAW) }
} }

View File

@@ -1,31 +1,8 @@
use std::future::Future; use crate::async_functions::{CountPending, PollOnce};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use wasmtime::*; use wasmtime::*;
fn dummy_waker() -> Waker {
return unsafe { Waker::from_raw(clone(5 as *const _)) };
unsafe fn clone(ptr: *const ()) -> RawWaker {
assert_eq!(ptr as usize, 5);
const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
RawWaker::new(ptr, &VTABLE)
}
unsafe fn wake(ptr: *const ()) {
assert_eq!(ptr as usize, 5);
}
unsafe fn wake_by_ref(ptr: *const ()) {
assert_eq!(ptr as usize, 5);
}
unsafe fn drop(ptr: *const ()) {
assert_eq!(ptr as usize, 5);
}
}
fn build_engine() -> Arc<Engine> { fn build_engine() -> Arc<Engine> {
let mut config = Config::new(); let mut config = Config::new();
config.async_support(true); config.async_support(true);
@@ -59,7 +36,7 @@ fn make_env(engine: &Engine) -> Linker<()> {
/// ///
/// Returns `Some(yields)` if function completed normally, giving the /// Returns `Some(yields)` if function completed normally, giving the
/// number of yields that occured, or `None` if a trap occurred. /// number of yields that occured, or `None` if a trap occurred.
fn run_and_count_yields_or_trap<F: Fn(Arc<Engine>)>( async fn run_and_count_yields_or_trap<F: Fn(Arc<Engine>)>(
wasm: &str, wasm: &str,
initial: u64, initial: u64,
delta: Option<u64>, delta: Option<u64>,
@@ -82,39 +59,15 @@ fn run_and_count_yields_or_trap<F: Fn(Arc<Engine>)>(
let engine_clone = engine.clone(); let engine_clone = engine.clone();
setup_func(engine_clone); setup_func(engine_clone);
let mut future = Box::pin(async { let instance = linker.instantiate_async(&mut store, &module).await.unwrap();
let instance = linker.instantiate_async(&mut store, &module).await.unwrap(); let f = instance.get_func(&mut store, "run").unwrap();
let f = instance.get_func(&mut store, "run").unwrap(); let (result, yields) =
f.call_async(&mut store, &[], &mut []).await CountPending::new(Box::pin(f.call_async(&mut store, &[], &mut []))).await;
}); return result.ok().map(|_| yields);
let mut yields = 0;
loop {
match future
.as_mut()
.poll(&mut Context::from_waker(&dummy_waker()))
{
Poll::Pending => {
yields += 1;
}
Poll::Ready(Ok(..)) => {
break;
}
Poll::Ready(Err(e)) => match e.downcast::<wasmtime::Trap>() {
Ok(_) => {
return None;
}
e => {
e.unwrap();
}
},
}
}
Some(yields)
} }
#[test] #[tokio::test]
fn epoch_yield_at_func_entry() { async fn epoch_yield_at_func_entry() {
// Should yield at start of call to func $subfunc. // Should yield at start of call to func $subfunc.
assert_eq!( assert_eq!(
Some(1), Some(1),
@@ -131,11 +84,12 @@ fn epoch_yield_at_func_entry() {
Some(1), Some(1),
|_| {}, |_| {},
) )
.await
); );
} }
#[test] #[tokio::test]
fn epoch_yield_at_loop_header() { async fn epoch_yield_at_loop_header() {
// Should yield at top of loop, once per five iters. // Should yield at top of loop, once per five iters.
assert_eq!( assert_eq!(
Some(2), Some(2),
@@ -154,11 +108,12 @@ fn epoch_yield_at_loop_header() {
Some(5), Some(5),
|_| {}, |_| {},
) )
.await
); );
} }
#[test] #[tokio::test]
fn epoch_yield_immediate() { async fn epoch_yield_immediate() {
// We should see one yield immediately when the initial deadline // We should see one yield immediately when the initial deadline
// is zero. // is zero.
assert_eq!( assert_eq!(
@@ -173,11 +128,12 @@ fn epoch_yield_immediate() {
Some(1), Some(1),
|_| {}, |_| {},
) )
.await
); );
} }
#[test] #[tokio::test]
fn epoch_yield_only_once() { async fn epoch_yield_only_once() {
// We should yield from the subfunction, and then when we return // We should yield from the subfunction, and then when we return
// to the outer function and hit another loop header, we should // to the outer function and hit another loop header, we should
// not yield again (the double-check block will reload the correct // not yield again (the double-check block will reload the correct
@@ -202,11 +158,12 @@ fn epoch_yield_only_once() {
Some(1), Some(1),
|_| {}, |_| {},
) )
.await
); );
} }
#[test] #[tokio::test]
fn epoch_interrupt_infinite_loop() { async fn epoch_interrupt_infinite_loop() {
assert_eq!( assert_eq!(
None, None,
run_and_count_yields_or_trap( run_and_count_yields_or_trap(
@@ -226,11 +183,12 @@ fn epoch_interrupt_infinite_loop() {
}); });
}, },
) )
.await
); );
} }
#[test] #[tokio::test]
fn epoch_interrupt_function_entries() { async fn epoch_interrupt_function_entries() {
assert_eq!( assert_eq!(
None, None,
run_and_count_yields_or_trap( run_and_count_yields_or_trap(
@@ -347,11 +305,12 @@ fn epoch_interrupt_function_entries() {
}); });
}, },
) )
.await
); );
} }
#[test] #[tokio::test]
fn drop_future_on_epoch_yield() { async fn drop_future_on_epoch_yield() {
let wasm = " let wasm = "
(module (module
(import \"\" \"bump_epoch\" (func $bump)) (import \"\" \"bump_epoch\" (func $bump))
@@ -399,26 +358,9 @@ fn drop_future_on_epoch_yield() {
store.set_epoch_deadline(1); store.set_epoch_deadline(1);
store.epoch_deadline_async_yield_and_update(1); store.epoch_deadline_async_yield_and_update(1);
let mut future = Box::pin(async { let instance = linker.instantiate_async(&mut store, &module).await.unwrap();
let instance = linker.instantiate_async(&mut store, &module).await.unwrap(); let f = instance.get_func(&mut store, "run").unwrap();
let f = instance.get_func(&mut store, "run").unwrap(); PollOnce::new(Box::pin(f.call_async(&mut store, &[], &mut []))).await;
f.call_async(&mut store, &[], &mut []).await
});
match future
.as_mut()
.poll(&mut Context::from_waker(&dummy_waker()))
{
Poll::Pending => {
// OK: expected yield.
}
Poll::Ready(Ok(..)) => {
panic!("Shoulud not have returned");
}
Poll::Ready(e) => {
e.unwrap();
}
}
drop(future);
assert_eq!(true, alive_flag.load(Ordering::Acquire)); assert_eq!(true, alive_flag.load(Ordering::Acquire));
} }