This commit includes a set of changes that add initial support for `wasi-threads` to Wasmtime:
* feat: remove mutability from the WasiCtx Table
This patch adds interior mutability to the WasiCtx Table and the Table elements.
Major pain points:
* `File` only needs `RwLock<cap_std::fs::File>` to implement
`File::set_fdflags()` on Windows, because of [1]
* Because `File` needs a `RwLock` and `RwLock*Guard` cannot
be held across an `.await`, the `async` in
`async fn num_ready_bytes(&self)` had to be removed
* Because `File` needs a `RwLock` and `RwLock*Guard` cannot
be dereferenced in `pollable`, the signature of
`fn pollable(&self) -> Option<rustix::fd::BorrowedFd>`
changed to `fn pollable(&self) -> Option<Arc<dyn AsFd + '_>>`
[1] da238e324e/src/fs/fd_flags.rs (L210-L217)
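
The interior-mutability pattern described above can be sketched with the standard library alone. This is a minimal illustration, not the patch itself: `Handle` and this `File` wrapper are hypothetical stand-ins (not `cap_std::fs::File` or the real `wasi-common` types), and `set_flag_from_worker` exists only to show the wrapper being shared between threads.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical stand-in for an OS file handle (not `cap_std::fs::File`).
#[derive(Debug, Default)]
struct Handle {
    append: bool,
    ready_bytes: u64,
}

/// Hypothetical wrapper: the lock provides mutation through `&self`.
struct File(RwLock<Handle>);

impl File {
    fn new(ready_bytes: u64) -> Self {
        File(RwLock::new(Handle { append: false, ready_bytes }))
    }

    /// Takes `&self`, not `&mut self`; the write guard is dropped on return.
    fn set_fdflags(&self, append: bool) {
        let mut guard = self.0.write().unwrap();
        guard.append = append;
    }

    /// Deliberately synchronous: the read guard is acquired and released
    /// inside this call, so it never has to live across an `.await`.
    fn num_ready_bytes(&self) -> u64 {
        self.0.read().unwrap().ready_bytes
    }

    fn is_append(&self) -> bool {
        self.0.read().unwrap().append
    }
}

/// Shares one `File` with a worker thread that flips a flag on it.
fn set_flag_from_worker(file: Arc<File>) {
    let worker = Arc::clone(&file);
    thread::spawn(move || worker.set_fdflags(true))
        .join()
        .unwrap();
}
```

Because every method takes `&self`, the same `Arc<File>` can be handed to several threads without any of them needing exclusive ownership of the table entry.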
* wasi-threads: add an initial implementation
This change is a first step toward implementing `wasi-threads` in
Wasmtime. We may find that it has some missing pieces, but the core
functionality is there: when `wasi::thread_spawn` is called by a running
WebAssembly module, a function named `wasi_thread_start` is found in the
module's exports and called in a new instance. The shared memory of the
original instance is reused in the new instance.
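
As a rough model of the spawn flow just described, the sketch below uses plain Rust threads. Everything in it is an assumption made for illustration: `SharedMemory` and `Instance` are hypothetical stand-ins for Wasmtime's shared memory and module instance, the thread ID counter is invented, and returning a `JoinHandle` is a convenience for the example rather than part of the `wasi-threads` interface.

```rust
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for a Wasm shared linear memory.
type SharedMemory = Arc<Mutex<Vec<u8>>>;

/// Hypothetical stand-in for an instantiated module.
struct Instance {
    memory: SharedMemory,
}

impl Instance {
    /// Plays the role of the exported `wasi_thread_start` entry point.
    /// Here it just records the thread ID at offset `start_arg`.
    fn wasi_thread_start(&self, thread_id: i32, start_arg: i32) {
        let mut mem = self.memory.lock().unwrap();
        mem[start_arg as usize] = thread_id as u8;
    }
}

/// Invented ID source for the sketch; starts at 1.
static NEXT_THREAD_ID: AtomicI32 = AtomicI32::new(1);

/// Models `thread_spawn`: a new instance is created on a new thread,
/// reusing the caller's memory, and its entry point is invoked.
fn thread_spawn(memory: &SharedMemory, start_arg: i32) -> (i32, JoinHandle<()>) {
    let thread_id = NEXT_THREAD_ID.fetch_add(1, Ordering::SeqCst);
    let memory = Arc::clone(memory);
    let handle = thread::spawn(move || {
        let instance = Instance { memory };
        instance.wasi_thread_start(thread_id, start_arg);
    });
    (thread_id, handle)
}
```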
This new WASI proposal is in its early stages and details are still
being hashed out in the [spec] and [wasi-libc] repositories. Due to its
experimental state, the `wasi-threads` functionality is hidden behind
both a compile-time and runtime flag: one must build with `--features
wasi-threads` but also run the Wasmtime CLI with `--wasm-features
threads` and `--wasi-modules experimental-wasi-threads`. One can
experiment with `wasi-threads` by running:
```console
$ cargo run --features wasi-threads -- \
--wasm-features threads --wasi-modules experimental-wasi-threads \
<a threads-enabled module>
```
Threads-enabled Wasm modules are not yet easy to build. Hopefully this
is resolved soon, but in the meantime see the use of
`THREAD_MODEL=posix` in the [wasi-libc] repository for some clues on
what is necessary. Wiggle complicates things by requiring the Wasm
memory to be exported with a certain name and `wasi-threads` also
expects that memory to be imported; this build-time obstacle can be
overcome with the `--import-memory --export-memory` flags only available
in the latest Clang tree. Due to all of this, the included tests are
written directly in WAT--run these with:
```console
$ cargo test --features wasi-threads -p wasmtime-cli -- cli_tests
```
[spec]: https://github.com/WebAssembly/wasi-threads
[wasi-libc]: https://github.com/WebAssembly/wasi-libc
This change does not protect the WASI implementations themselves from
concurrent access. This is already complete in previous commits or left
for future commits in certain cases (e.g., wasi-nn).
* wasi-threads: factor out process exit logic
As is being discussed [elsewhere], either calling `proc_exit` or
trapping in any thread should halt execution of all threads. The
Wasmtime CLI already has logic for adapting a WebAssembly error code to
a code expected in each OS. This change factors out this logic to a new
function, `maybe_exit_on_error`, for use within the `wasi-threads`
implementation.
This will work reasonably well for CLI users of Wasmtime +
`wasi-threads`, but embedders will want something better in the future:
when a `wasi-threads` thread fails, they may not want their application
to exit. Handling this is tricky, because it will require cancelling the
threads spawned by the `wasi-threads` implementation, something that is
not trivial to do in Rust. With this change, we defer that work until
later in order to provide a working implementation of `wasi-threads` for
experimentation.
[elsewhere]: https://github.com/WebAssembly/wasi-threads/pull/17
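
The "exit the whole process if any thread fails" policy can be illustrated as follows. This is not the code of `maybe_exit_on_error`, whose signature is not shown here; `Outcome`, `exit_code_for`, `exit_if_fatal`, and `TRAP_EXIT_CODE` are all hypothetical names, and the trap exit code is a placeholder rather than the per-OS value the CLI actually uses.

```rust
/// Hypothetical summary of how one thread's execution ended.
#[derive(Debug)]
enum Outcome {
    Finished,
    ProcExit(i32),
    Trap(String),
}

/// Placeholder value; the real per-OS trap code is not given in the text.
const TRAP_EXIT_CODE: i32 = 1;

/// Returns the code the process should exit with, or `None` to keep running.
fn exit_code_for(outcome: &Outcome) -> Option<i32> {
    match outcome {
        Outcome::Finished => None,
        Outcome::ProcExit(code) => Some(*code),
        Outcome::Trap(_) => Some(TRAP_EXIT_CODE),
    }
}

/// Terminates the whole process, and with it every other thread,
/// when the given outcome is fatal.
fn exit_if_fatal(outcome: &Outcome) {
    if let Outcome::Trap(message) = outcome {
        eprintln!("trap: {message}");
    }
    if let Some(code) = exit_code_for(outcome) {
        std::process::exit(code);
    }
}
```

Calling `std::process::exit` sidesteps thread cancellation entirely, which is why this approach suits a CLI but not an embedder that wants to keep running.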
* review: work around `fd_fdstat_set_flags`
In order to make progress with wasi-threads, this change temporarily
works around limitations induced by `wasi-common`'s
`fd_fdstat_set_flags` to allow `&mut self` use in the implementation.
Eventual resolution is tracked in
https://github.com/bytecodealliance/wasmtime/issues/5643. This change
makes several related helper functions (e.g., `set_fdflags`) take `&mut
self` as well.
* test: use `wait`/`notify` to improve `threads.wat` test
Previously, the test simply executed in a loop for some hardcoded number
of iterations. This change uses `wait`, `notify`, and atomic
operations to keep track of when the spawned threads are done and join
on the main thread appropriately.
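
The join strategy can be approximated in Rust for readers unfamiliar with the Wasm atomics. This is an analogy only, not the contents of `threads.wat`: a `Mutex` and `Condvar` stand in for the Wasm `wait` and `notify` instructions, and `spawn_and_join` is a hypothetical name.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Spawns `n` workers and blocks until every one has reported completion.
/// Returns the number of workers that finished.
fn spawn_and_join(n: u32) -> u32 {
    // The counter plays the role of the word in shared memory; the
    // condition variable plays the role of `wait`/`notify` on that word.
    let state = Arc::new((Mutex::new(0u32), Condvar::new()));

    for _ in 0..n {
        let state = Arc::clone(&state);
        thread::spawn(move || {
            let (done, cvar) = &*state;
            *done.lock().unwrap() += 1;
            // Wake the main thread so it can re-check the counter.
            cvar.notify_one();
        });
    }

    let (done, cvar) = &*state;
    // Sleep until the counter reaches `n` instead of spinning for a
    // hardcoded number of iterations.
    let guard = cvar
        .wait_while(done.lock().unwrap(), |count| *count < n)
        .unwrap();
    *guard
}
```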
* various fixes and tweaks due to the PR review
---------
Signed-off-by: Harald Hoyer <harald@profian.com>
Co-authored-by: Harald Hoyer <harald@profian.com>
Co-authored-by: Alex Crichton <alex@alexcrichton.com>
223 lines
7.6 KiB
Rust
// The windows scheduler is unmaintained and due for a rewrite.
//
// Rather than use a polling mechanism for file read/write readiness,
// it checks readiness just once, before sleeping for any timer subscriptions.
// Checking stdin readiness uses a worker thread which, once started, lives for the
// lifetime of the process.
//
// We suspect there are bugs in this scheduler, however, we have not
// taken the time to improve it. See bug #2880.

use once_cell::sync::Lazy;
use std::ops::Deref;
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender, TryRecvError};
use std::sync::Mutex;
use std::thread;
use std::time::Duration;
use wasi_common::sched::subscription::{RwEventFlags, Subscription};
use wasi_common::{file::WasiFile, sched::Poll, Error, ErrorExt};

pub async fn poll_oneoff<'a>(poll: &mut Poll<'a>) -> Result<(), Error> {
    poll_oneoff_(poll, wasi_file_is_stdin).await
}

pub async fn poll_oneoff_<'a>(
    poll: &mut Poll<'a>,
    file_is_stdin: impl Fn(&dyn WasiFile) -> bool,
) -> Result<(), Error> {
    if poll.is_empty() {
        return Ok(());
    }

    let mut ready = false;
    let waitmode = if let Some(t) = poll.earliest_clock_deadline() {
        if let Some(duration) = t.duration_until() {
            WaitMode::Timeout(duration)
        } else {
            WaitMode::Immediate
        }
    } else {
        if ready {
            WaitMode::Immediate
        } else {
            WaitMode::Infinite
        }
    };

    let mut stdin_read_subs = Vec::new();
    let mut immediate_reads = Vec::new();
    let mut immediate_writes = Vec::new();
    for s in poll.rw_subscriptions() {
        match s {
            Subscription::Read(r) => {
                if file_is_stdin(r.file.deref()) {
                    stdin_read_subs.push(r);
                } else if r.file.pollable().is_some() {
                    immediate_reads.push(r);
                } else {
                    return Err(Error::invalid_argument().context("file is not pollable"));
                }
            }
            Subscription::Write(w) => {
                if w.file.pollable().is_some() {
                    immediate_writes.push(w);
                } else {
                    return Err(Error::invalid_argument().context("file is not pollable"));
                }
            }
            Subscription::MonotonicClock { .. } => unreachable!(),
        }
    }

    if !stdin_read_subs.is_empty() {
        let state = STDIN_POLL
            .lock()
            .map_err(|_| Error::trap(anyhow::Error::msg("failed to take lock of STDIN_POLL")))?
            .poll(waitmode)?;
        for readsub in stdin_read_subs.into_iter() {
            match state {
                PollState::Ready => {
                    readsub.complete(1, RwEventFlags::empty());
                    ready = true;
                }
                PollState::NotReady | PollState::TimedOut => {}
                PollState::Error(ref e) => {
                    // Unfortunately, we need to deliver the Error to each of the
                    // subscriptions, but there is no Clone on std::io::Error. So, we convert it to the
                    // kind, and then back to std::io::Error, and finally to anyhow::Error.
                    // When it's time to turn this into an errno elsewhere, the error kind will
                    // be inspected.
                    let ekind = e.kind();
                    let ioerror = std::io::Error::from(ekind);
                    readsub.error(ioerror.into());
                    ready = true;
                }
            }
        }
    }
    for r in immediate_reads {
        match r.file.num_ready_bytes() {
            Ok(ready_bytes) => {
                r.complete(ready_bytes, RwEventFlags::empty());
                ready = true;
            }
            Err(e) => {
                r.error(e);
                ready = true;
            }
        }
    }
    for w in immediate_writes {
        // Everything is always ready for writing, apparently?
        w.complete(0, RwEventFlags::empty());
        ready = true;
    }

    if !ready {
        if let WaitMode::Timeout(duration) = waitmode {
            thread::sleep(duration);
        }
    }

    Ok(())
}

pub fn wasi_file_is_stdin(f: &dyn WasiFile) -> bool {
    f.as_any().is::<crate::stdio::Stdin>()
}

enum PollState {
    Ready,
    NotReady, // Not ready, but did not wait
    TimedOut, // Not ready, waited until timeout
    Error(std::io::Error),
}

#[derive(Copy, Clone)]
enum WaitMode {
    Timeout(Duration),
    Infinite,
    Immediate,
}

struct StdinPoll {
    request_tx: Sender<()>,
    notify_rx: Receiver<PollState>,
}

static STDIN_POLL: Lazy<Mutex<StdinPoll>> = Lazy::new(StdinPoll::new);

impl StdinPoll {
    pub fn new() -> Mutex<Self> {
        let (request_tx, request_rx) = mpsc::channel();
        let (notify_tx, notify_rx) = mpsc::channel();
        thread::spawn(move || Self::event_loop(request_rx, notify_tx));
        Mutex::new(StdinPoll {
            request_tx,
            notify_rx,
        })
    }

    // This function should not be used directly.
    // Correctness of this function crucially depends on the fact that
    // mpsc::Receiver is !Sync.
    fn poll(&self, wait_mode: WaitMode) -> Result<PollState, Error> {
        match self.notify_rx.try_recv() {
            // Clean up possibly unread result from previous poll.
            Ok(_) | Err(TryRecvError::Empty) => {}
            Err(TryRecvError::Disconnected) => {
                return Err(Error::trap(anyhow::Error::msg(
                    "StdinPoll notify_rx channel closed",
                )))
            }
        }

        // Notify the worker thread to poll stdin
        self.request_tx
            .send(())
            .map_err(|_| Error::trap(anyhow::Error::msg("request_tx channel closed")))?;

        // Wait for the worker thread to send a readiness notification
        match wait_mode {
            WaitMode::Timeout(timeout) => match self.notify_rx.recv_timeout(timeout) {
                Ok(r) => Ok(r),
                Err(RecvTimeoutError::Timeout) => Ok(PollState::TimedOut),
                Err(RecvTimeoutError::Disconnected) => Err(Error::trap(anyhow::Error::msg(
                    "StdinPoll notify_rx channel closed",
                ))),
            },
            WaitMode::Infinite => self
                .notify_rx
                .recv()
                .map_err(|_| Error::trap(anyhow::Error::msg("StdinPoll notify_rx channel closed"))),
            WaitMode::Immediate => match self.notify_rx.try_recv() {
                Ok(r) => Ok(r),
                Err(TryRecvError::Empty) => Ok(PollState::NotReady),
                Err(TryRecvError::Disconnected) => Err(Error::trap(anyhow::Error::msg(
                    "StdinPoll notify_rx channel closed",
                ))),
            },
        }
    }

    fn event_loop(request_rx: Receiver<()>, notify_tx: Sender<PollState>) -> ! {
        use std::io::BufRead;
        loop {
            // Wait on a request:
            request_rx.recv().expect("request_rx channel");
            // Wait for data to appear in stdin. If fill_buf returns any slice, it means
            // that either:
            // (a) there is some data in stdin, if non-empty,
            // (b) EOF was received, if it's empty
            // Linux returns `POLLIN` in both cases, so we imitate this behavior.
            let resp = match std::io::stdin().lock().fill_buf() {
                Ok(_) => PollState::Ready,
                Err(e) => PollState::Error(e),
            };
            // Notify about data in stdin. If the read on this channel has timed out, the
            // next poller will have to clean the channel.
            notify_tx.send(resp).expect("notify_tx channel");
        }
    }
}
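
The request/notify handshake that `StdinPoll` uses can be tried in isolation with only the standard library. The sketch below is a simplified illustration rather than a drop-in replacement: `Poller`, `Readiness`, and the `probe` closure are hypothetical names, `probe` stands in for the blocking `fill_buf` call on stdin, and unlike `event_loop` the worker here exits quietly when either channel closes.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum Readiness {
    Ready,
    TimedOut,
}

struct Poller {
    request_tx: Sender<()>,
    notify_rx: Receiver<()>,
}

impl Poller {
    /// `probe` is a hypothetical blocking readiness check.
    fn new(probe: impl Fn() + Send + 'static) -> Self {
        let (request_tx, request_rx) = mpsc::channel::<()>();
        let (notify_tx, notify_rx) = mpsc::channel::<()>();
        thread::spawn(move || {
            // Serve one blocking probe per request until a channel closes.
            while request_rx.recv().is_ok() {
                probe();
                if notify_tx.send(()).is_err() {
                    break;
                }
            }
        });
        Poller { request_tx, notify_rx }
    }

    fn poll(&self, timeout: Duration) -> Readiness {
        // Discard a stale notification left behind by an earlier poll
        // that gave up before the worker answered.
        let _ = self.notify_rx.try_recv();
        self.request_tx.send(()).expect("worker thread exited");
        match self.notify_rx.recv_timeout(timeout) {
            Ok(()) => Readiness::Ready,
            Err(RecvTimeoutError::Timeout) => Readiness::TimedOut,
            Err(RecvTimeoutError::Disconnected) => panic!("worker thread exited"),
        }
    }
}
```

The caller never blocks longer than its timeout, while the worker may stay blocked in `probe` indefinitely; that asymmetry is why the stale-notification cleanup at the start of `poll` is needed.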