wasmtime/crates/wasi-common/cap-std-sync/src/net.rs
Andrew Brown edfa10d607 wasi-threads: an initial implementation (#5484)
This commit includes a set of changes that add initial support for `wasi-threads` to Wasmtime:

* feat: remove mutability from the WasiCtx Table

This patch adds interior mutability to the WasiCtx Table and the Table elements.

Major pain points:
* `File` only needs `RwLock<cap_std::fs::File>` to implement
  `File::set_fdflags()` on Windows, because of [1]
* Because `File` needs a `RwLock` and `RwLock*Guard` cannot
  be held across an `.await`, the `async` in
  `async fn num_ready_bytes(&self)` had to be removed
* Because `File` needs a `RwLock` and `RwLock*Guard` cannot
  be dereferenced in `pollable`, the signature of
  `fn pollable(&self) -> Option<rustix::fd::BorrowedFd>`
  changed to `fn pollable(&self) -> Option<Arc<dyn AsFd + '_>>`

[1] da238e324e/src/fs/fd_flags.rs (L210-L217)
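
As a rough illustration of the pain points above, the following std-only sketch shows the general pattern: a handle wrapped in a `RwLock` so that its flags can be changed through `&self`, with each guard confined to a synchronous method so it never lives across an `.await`. `FakeFile`, `SharedFile`, and their methods are hypothetical names chosen for this example; they are not the `wasi-common` types.

```rust
use std::sync::RwLock;

/// Hypothetical stand-in for a file handle whose flags can only be
/// changed by replacing the handle itself.
#[derive(Debug)]
pub struct FakeFile {
    pub nonblocking: bool,
}

/// Wrapper providing interior mutability: callers only need `&self`.
pub struct SharedFile {
    inner: RwLock<FakeFile>,
}

impl SharedFile {
    pub fn new(file: FakeFile) -> Self {
        SharedFile {
            inner: RwLock::new(file),
        }
    }

    /// Takes `&self`, not `&mut self`. The write guard is dropped when
    /// this synchronous function returns.
    pub fn set_nonblocking(&self, nonblocking: bool) {
        let mut guard = self.inner.write().unwrap();
        *guard = FakeFile { nonblocking };
    }

    /// Deliberately not `async`: no guard is ever held across an `.await`.
    pub fn is_nonblocking(&self) -> bool {
        self.inner.read().unwrap().nonblocking
    }
}
```

Because both methods take `&self`, a `SharedFile` can sit in a shared table and still have its flags updated.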

* wasi-threads: add an initial implementation

This change is a first step toward implementing `wasi-threads` in
Wasmtime. We may find that it has some missing pieces, but the core
functionality is there: when `wasi::thread_spawn` is called by a running
WebAssembly module, a function named `wasi_thread_start` is found in the
module's exports and called in a new instance. The shared memory of the
original instance is reused in the new instance.
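
The spawning model described above can be approximated with plain `std` threads. This is only an analogy and does not use Wasmtime's API: `thread_spawn`, `thread_start`, and their parameters are hypothetical, and an `Arc<AtomicU32>` stands in for the shared memory that the new instance reuses.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for the shared memory of the original instance.
pub type SharedMemory = Arc<AtomicU32>;

/// Plays the role of the exported start function: it runs in the new
/// thread and sees the same memory as the spawner.
fn thread_start(memory: &SharedMemory, thread_id: u32, start_arg: u32) {
    memory.fetch_add(thread_id + start_arg, Ordering::SeqCst);
}

/// Hypothetical analogue of spawning: clone the handle to the shared
/// memory and call the start function on a new thread.
pub fn thread_spawn(memory: &SharedMemory, thread_id: u32, start_arg: u32) -> JoinHandle<()> {
    let memory = Arc::clone(memory);
    thread::spawn(move || thread_start(&memory, thread_id, start_arg))
}
```

Each spawned thread gets its own handle to the memory, but every handle refers to the same underlying value, so writes from one thread are visible to the others.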

This new WASI proposal is in its early stages and details are still
being hashed out in the [spec] and [wasi-libc] repositories. Due to its
experimental state, the `wasi-threads` functionality is hidden behind
both a compile-time and runtime flag: one must build with `--features
wasi-threads` but also run the Wasmtime CLI with `--wasm-features
threads` and `--wasi-modules experimental-wasi-threads`. One can
experiment with `wasi-threads` by running:

```console
$ cargo run --features wasi-threads -- \
    --wasm-features threads --wasi-modules experimental-wasi-threads \
    <a threads-enabled module>
```

Threads-enabled Wasm modules are not yet easy to build. Hopefully this
is resolved soon, but in the meantime see the use of
`THREAD_MODEL=posix` in the [wasi-libc] repository for some clues on
what is necessary. Wiggle complicates things by requiring the Wasm
memory to be exported with a certain name and `wasi-threads` also
expects that memory to be imported; this build-time obstacle can be
overcome with the `--import-memory --export-memory` flags only available
in the latest Clang tree. Due to all of this, the included tests are
written directly in WAT--run these with:

```console
$ cargo test --features wasi-threads -p wasmtime-cli -- cli_tests
```

[spec]: https://github.com/WebAssembly/wasi-threads
[wasi-libc]: https://github.com/WebAssembly/wasi-libc

This change does not protect the WASI implementations themselves from
concurrent access. This is already complete in previous commits or left
for future commits in certain cases (e.g., wasi-nn).

* wasi-threads: factor out process exit logic

As is being discussed [elsewhere], either calling `proc_exit` or
trapping in any thread should halt execution of all threads. The
Wasmtime CLI already has logic for adapting a WebAssembly error code to
a code expected in each OS. This change factors out this logic to a new
function, `maybe_exit_on_error`, for use within the `wasi-threads`
implementation.

This will work reasonably well for CLI users of Wasmtime +
`wasi-threads`, but embedders will want something better in the future:
when a `wasi-threads` thread fails, they may not want their application
to exit. Handling this is tricky, because it will require cancelling the
threads spawned by the `wasi-threads` implementation, something that is
not trivial to do in Rust. With this change, we defer that work until
later in order to provide a working implementation of `wasi-threads` for
experimentation.

[elsewhere]: https://github.com/WebAssembly/wasi-threads/pull/17

* review: work around `fd_fdstat_set_flags`

In order to make progress with wasi-threads, this change temporarily
works around limitations induced by `wasi-common`'s
`fd_fdstat_set_flags` to allow `&mut self` use in the implementation.
Eventual resolution is tracked in
https://github.com/bytecodealliance/wasmtime/issues/5643. This change
makes several related helper functions (e.g., `set_fdflags`) take `&mut
self` as well.

* test: use `wait`/`notify` to improve `threads.wat` test

Previously, the test simply executed in a loop for some hardcoded number
of iterations. This change uses `wait`, `notify`, and atomic
operations to keep track of when the spawned threads are done and to join
on the main thread appropriately.
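
The test itself is written in WAT, but the same wait-until-done idea can be sketched in Rust. This is an analogy only: it uses a `Mutex` and `Condvar` from `std` in place of the Wasm atomic `wait`/`notify` instructions, and `run_and_wait` is a hypothetical helper name.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Spawns `n` workers and blocks until every one has reported completion.
/// Returns the number of workers that reported in.
pub fn run_and_wait(n: u32) -> u32 {
    // Shared counter of finished workers plus a condition variable to
    // wake the waiting thread.
    let state = Arc::new((Mutex::new(0u32), Condvar::new()));

    for _ in 0..n {
        let state = Arc::clone(&state);
        thread::spawn(move || {
            let (done, cvar) = &*state;
            *done.lock().unwrap() += 1;
            cvar.notify_one();
        });
    }

    // Sleep until the counter reaches `n`, instead of spinning for a
    // hardcoded number of iterations.
    let (done, cvar) = &*state;
    let guard = cvar
        .wait_while(done.lock().unwrap(), |finished| *finished < n)
        .unwrap();
    *guard
}
```

The waiting thread re-checks the counter each time it is woken, so spurious wakeups and notifications that arrive before the wait begins are both handled.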

* various fixes and tweaks due to the PR review

---------

Signed-off-by: Harald Hoyer <harald@profian.com>
Co-authored-by: Harald Hoyer <harald@profian.com>
Co-authored-by: Alex Crichton <alex@alexcrichton.com>
2023-02-07 13:43:02 -08:00

#[cfg(windows)]
use io_extras::os::windows::{AsRawHandleOrSocket, RawHandleOrSocket};
use io_lifetimes::AsSocketlike;
#[cfg(unix)]
use io_lifetimes::{AsFd, BorrowedFd};
#[cfg(windows)]
use io_lifetimes::{AsSocket, BorrowedSocket};
use std::any::Any;
use std::convert::TryInto;
use std::io;
#[cfg(unix)]
use system_interface::fs::GetSetFdFlags;
use system_interface::io::IoExt;
use system_interface::io::IsReadWrite;
use system_interface::io::ReadReady;
use wasi_common::{
    file::{FdFlags, FileType, RiFlags, RoFlags, SdFlags, SiFlags, WasiFile},
    Error, ErrorExt,
};

pub enum Socket {
    TcpListener(cap_std::net::TcpListener),
    TcpStream(cap_std::net::TcpStream),
    #[cfg(unix)]
    UnixStream(cap_std::os::unix::net::UnixStream),
    #[cfg(unix)]
    UnixListener(cap_std::os::unix::net::UnixListener),
}
impl From<cap_std::net::TcpListener> for Socket {
    fn from(listener: cap_std::net::TcpListener) -> Self {
        Self::TcpListener(listener)
    }
}

impl From<cap_std::net::TcpStream> for Socket {
    fn from(stream: cap_std::net::TcpStream) -> Self {
        Self::TcpStream(stream)
    }
}

#[cfg(unix)]
impl From<cap_std::os::unix::net::UnixListener> for Socket {
    fn from(listener: cap_std::os::unix::net::UnixListener) -> Self {
        Self::UnixListener(listener)
    }
}

#[cfg(unix)]
impl From<cap_std::os::unix::net::UnixStream> for Socket {
    fn from(stream: cap_std::os::unix::net::UnixStream) -> Self {
        Self::UnixStream(stream)
    }
}
#[cfg(unix)]
impl From<Socket> for Box<dyn WasiFile> {
    fn from(listener: Socket) -> Self {
        match listener {
            Socket::TcpListener(l) => Box::new(crate::net::TcpListener::from_cap_std(l)),
            Socket::UnixListener(l) => Box::new(crate::net::UnixListener::from_cap_std(l)),
            Socket::TcpStream(l) => Box::new(crate::net::TcpStream::from_cap_std(l)),
            Socket::UnixStream(l) => Box::new(crate::net::UnixStream::from_cap_std(l)),
        }
    }
}

#[cfg(windows)]
impl From<Socket> for Box<dyn WasiFile> {
    fn from(listener: Socket) -> Self {
        match listener {
            Socket::TcpListener(l) => Box::new(crate::net::TcpListener::from_cap_std(l)),
            Socket::TcpStream(l) => Box::new(crate::net::TcpStream::from_cap_std(l)),
        }
    }
}
macro_rules! wasi_listen_write_impl {
    ($ty:ty, $stream:ty) => {
        #[async_trait::async_trait]
        impl WasiFile for $ty {
            fn as_any(&self) -> &dyn Any {
                self
            }
            #[cfg(unix)]
            fn pollable(&self) -> Option<rustix::fd::BorrowedFd> {
                Some(self.0.as_fd())
            }
            #[cfg(windows)]
            fn pollable(&self) -> Option<io_extras::os::windows::RawHandleOrSocket> {
                Some(self.0.as_raw_handle_or_socket())
            }
            async fn sock_accept(&self, fdflags: FdFlags) -> Result<Box<dyn WasiFile>, Error> {
                let (stream, _) = self.0.accept()?;
                let mut stream = <$stream>::from_cap_std(stream);
                stream.set_fdflags(fdflags).await?;
                Ok(Box::new(stream))
            }
            async fn get_filetype(&self) -> Result<FileType, Error> {
                Ok(FileType::SocketStream)
            }
            #[cfg(unix)]
            async fn get_fdflags(&self) -> Result<FdFlags, Error> {
                let fdflags = get_fd_flags(&self.0)?;
                Ok(fdflags)
            }
            async fn set_fdflags(&mut self, fdflags: FdFlags) -> Result<(), Error> {
                if fdflags == wasi_common::file::FdFlags::NONBLOCK {
                    self.0.set_nonblocking(true)?;
                } else if fdflags.is_empty() {
                    self.0.set_nonblocking(false)?;
                } else {
                    return Err(
                        Error::invalid_argument().context("cannot set anything else than NONBLOCK")
                    );
                }
                Ok(())
            }
            fn num_ready_bytes(&self) -> Result<u64, Error> {
                Ok(1)
            }
        }

        #[cfg(windows)]
        impl AsSocket for $ty {
            #[inline]
            fn as_socket(&self) -> BorrowedSocket<'_> {
                self.0.as_socket()
            }
        }

        #[cfg(windows)]
        impl AsRawHandleOrSocket for $ty {
            #[inline]
            fn as_raw_handle_or_socket(&self) -> RawHandleOrSocket {
                self.0.as_raw_handle_or_socket()
            }
        }

        #[cfg(unix)]
        impl AsFd for $ty {
            fn as_fd(&self) -> BorrowedFd<'_> {
                self.0.as_fd()
            }
        }
    };
}
pub struct TcpListener(cap_std::net::TcpListener);

impl TcpListener {
    pub fn from_cap_std(cap_std: cap_std::net::TcpListener) -> Self {
        TcpListener(cap_std)
    }
}
wasi_listen_write_impl!(TcpListener, TcpStream);

#[cfg(unix)]
pub struct UnixListener(cap_std::os::unix::net::UnixListener);

#[cfg(unix)]
impl UnixListener {
    pub fn from_cap_std(cap_std: cap_std::os::unix::net::UnixListener) -> Self {
        UnixListener(cap_std)
    }
}

#[cfg(unix)]
wasi_listen_write_impl!(UnixListener, UnixStream);
macro_rules! wasi_stream_write_impl {
    ($ty:ty, $std_ty:ty) => {
        #[async_trait::async_trait]
        impl WasiFile for $ty {
            fn as_any(&self) -> &dyn Any {
                self
            }
            #[cfg(unix)]
            fn pollable(&self) -> Option<rustix::fd::BorrowedFd> {
                Some(self.0.as_fd())
            }
            #[cfg(windows)]
            fn pollable(&self) -> Option<io_extras::os::windows::RawHandleOrSocket> {
                Some(self.0.as_raw_handle_or_socket())
            }
            async fn get_filetype(&self) -> Result<FileType, Error> {
                Ok(FileType::SocketStream)
            }
            #[cfg(unix)]
            async fn get_fdflags(&self) -> Result<FdFlags, Error> {
                let fdflags = get_fd_flags(&self.0)?;
                Ok(fdflags)
            }
            async fn set_fdflags(&mut self, fdflags: FdFlags) -> Result<(), Error> {
                if fdflags == wasi_common::file::FdFlags::NONBLOCK {
                    self.0.set_nonblocking(true)?;
                } else if fdflags.is_empty() {
                    self.0.set_nonblocking(false)?;
                } else {
                    return Err(
                        Error::invalid_argument().context("cannot set anything else than NONBLOCK")
                    );
                }
                Ok(())
            }
            async fn read_vectored<'a>(
                &self,
                bufs: &mut [io::IoSliceMut<'a>],
            ) -> Result<u64, Error> {
                use std::io::Read;
                let n = Read::read_vectored(&mut &*self.as_socketlike_view::<$std_ty>(), bufs)?;
                Ok(n.try_into()?)
            }
            async fn write_vectored<'a>(&self, bufs: &[io::IoSlice<'a>]) -> Result<u64, Error> {
                use std::io::Write;
                let n = Write::write_vectored(&mut &*self.as_socketlike_view::<$std_ty>(), bufs)?;
                Ok(n.try_into()?)
            }
            async fn peek(&self, buf: &mut [u8]) -> Result<u64, Error> {
                let n = self.0.peek(buf)?;
                Ok(n.try_into()?)
            }
            fn num_ready_bytes(&self) -> Result<u64, Error> {
                let val = self.as_socketlike_view::<$std_ty>().num_ready_bytes()?;
                Ok(val)
            }
            async fn readable(&self) -> Result<(), Error> {
                let (readable, _writeable) = is_read_write(&self.0)?;
                if readable {
                    Ok(())
                } else {
                    Err(Error::io())
                }
            }
            async fn writable(&self) -> Result<(), Error> {
                let (_readable, writeable) = is_read_write(&self.0)?;
                if writeable {
                    Ok(())
                } else {
                    Err(Error::io())
                }
            }
            async fn sock_recv<'a>(
                &self,
                ri_data: &mut [std::io::IoSliceMut<'a>],
                ri_flags: RiFlags,
            ) -> Result<(u64, RoFlags), Error> {
                if (ri_flags & !(RiFlags::RECV_PEEK | RiFlags::RECV_WAITALL)) != RiFlags::empty() {
                    return Err(Error::not_supported());
                }
                if ri_flags.contains(RiFlags::RECV_PEEK) {
                    if let Some(first) = ri_data.iter_mut().next() {
                        let n = self.0.peek(first)?;
                        return Ok((n as u64, RoFlags::empty()));
                    } else {
                        return Ok((0, RoFlags::empty()));
                    }
                }
                if ri_flags.contains(RiFlags::RECV_WAITALL) {
                    let n: usize = ri_data.iter().map(|buf| buf.len()).sum();
                    self.0.read_exact_vectored(ri_data)?;
                    return Ok((n as u64, RoFlags::empty()));
                }
                let n = self.0.read_vectored(ri_data)?;
                Ok((n as u64, RoFlags::empty()))
            }
            async fn sock_send<'a>(
                &self,
                si_data: &[std::io::IoSlice<'a>],
                si_flags: SiFlags,
            ) -> Result<u64, Error> {
                if si_flags != SiFlags::empty() {
                    return Err(Error::not_supported());
                }
                let n = self.0.write_vectored(si_data)?;
                Ok(n as u64)
            }
            async fn sock_shutdown(&self, how: SdFlags) -> Result<(), Error> {
                let how = if how == SdFlags::RD | SdFlags::WR {
                    cap_std::net::Shutdown::Both
                } else if how == SdFlags::RD {
                    cap_std::net::Shutdown::Read
                } else if how == SdFlags::WR {
                    cap_std::net::Shutdown::Write
                } else {
                    return Err(Error::invalid_argument());
                };
                self.0.shutdown(how)?;
                Ok(())
            }
        }

        #[cfg(unix)]
        impl AsFd for $ty {
            fn as_fd(&self) -> BorrowedFd<'_> {
                self.0.as_fd()
            }
        }

        #[cfg(windows)]
        impl AsSocket for $ty {
            /// Borrows the socket.
            fn as_socket(&self) -> BorrowedSocket<'_> {
                self.0.as_socket()
            }
        }

        #[cfg(windows)]
        impl AsRawHandleOrSocket for $ty {
            #[inline]
            fn as_raw_handle_or_socket(&self) -> RawHandleOrSocket {
                self.0.as_raw_handle_or_socket()
            }
        }
    };
}
pub struct TcpStream(cap_std::net::TcpStream);

impl TcpStream {
    pub fn from_cap_std(socket: cap_std::net::TcpStream) -> Self {
        TcpStream(socket)
    }
}
wasi_stream_write_impl!(TcpStream, std::net::TcpStream);

#[cfg(unix)]
pub struct UnixStream(cap_std::os::unix::net::UnixStream);

#[cfg(unix)]
impl UnixStream {
    pub fn from_cap_std(socket: cap_std::os::unix::net::UnixStream) -> Self {
        UnixStream(socket)
    }
}

#[cfg(unix)]
wasi_stream_write_impl!(UnixStream, std::os::unix::net::UnixStream);
pub fn filetype_from(ft: &cap_std::fs::FileType) -> FileType {
    use cap_fs_ext::FileTypeExt;
    if ft.is_block_device() {
        FileType::SocketDgram
    } else {
        FileType::SocketStream
    }
}
/// Return the file-descriptor flags for a given file-like object.
///
/// This returns the flags needed to implement [`WasiFile::get_fdflags`].
pub fn get_fd_flags<Socketlike: AsSocketlike>(
    f: Socketlike,
) -> io::Result<wasi_common::file::FdFlags> {
    // On Unix-family platforms, we can use the same system call that we'd use
    // for files on sockets here.
    #[cfg(not(windows))]
    {
        let mut out = wasi_common::file::FdFlags::empty();
        if f.get_fd_flags()?
            .contains(system_interface::fs::FdFlags::NONBLOCK)
        {
            out |= wasi_common::file::FdFlags::NONBLOCK;
        }
        Ok(out)
    }

    // On Windows, sockets are different, and there is no direct way to
    // query for the non-blocking flag. We can get a sufficient approximation
    // by testing whether a zero-length `recv` appears to block.
    #[cfg(windows)]
    match rustix::net::recv(f, &mut [], rustix::net::RecvFlags::empty()) {
        Ok(_) => Ok(wasi_common::file::FdFlags::empty()),
        Err(rustix::io::Errno::WOULDBLOCK) => Ok(wasi_common::file::FdFlags::NONBLOCK),
        Err(e) => Err(e.into()),
    }
}
/// Return whether a given socket-like object is readable and writable.
///
/// This returns a `(readable, writable)` pair, as used to implement
/// [`WasiFile::readable`] and [`WasiFile::writable`].
pub fn is_read_write<Socketlike: AsSocketlike>(f: Socketlike) -> io::Result<(bool, bool)> {
    // On Unix-family platforms, we have an `IsReadWrite` impl.
    #[cfg(not(windows))]
    {
        f.is_read_write()
    }

    // On Windows, we only have a `TcpStream` impl, so make a view first.
    #[cfg(windows)]
    {
        f.as_socketlike_view::<std::net::TcpStream>()
            .is_read_write()
    }
}