diff --git a/Cargo.lock b/Cargo.lock index 3f5d294b2f..3e6649aa34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -285,6 +285,12 @@ version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae44d1a3d5a19df61dd0c8beb138458ac2a53a7ac09eba97d55592540004306b" +[[package]] +name = "bytes" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040" + [[package]] name = "cap-fs-ext" version = "0.13.7" @@ -2819,6 +2825,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83f0c8e7c0addab50b663055baf787d0af7f413a46e6e7fb9559a4e4db7137a5" dependencies = [ "autocfg 1.0.1", + "bytes", + "memchr", "num_cpus", "pin-project-lite", "tokio-macros", diff --git a/crates/wasi-common/tokio/Cargo.toml b/crates/wasi-common/tokio/Cargo.toml index 13c5b23281..f480de0635 100644 --- a/crates/wasi-common/tokio/Cargo.toml +++ b/crates/wasi-common/tokio/Cargo.toml @@ -15,7 +15,7 @@ include = ["src/**/*", "LICENSE" ] wasi-common = { path = "../", version = "0.26.0" } wasi-cap-std-sync = { path = "../cap-std-sync", version = "0.26.0" } wiggle = { path = "../../wiggle", version = "0.26.0" } -tokio = { version = "1.5.0", features = [ "rt", "fs", "time" ] } +tokio = { version = "1.5.0", features = [ "rt", "fs", "time" , "io-util"] } cap-std = "0.13.7" cap-fs-ext = "0.13.7" cap-time-ext = "0.13.7" diff --git a/crates/wasi-common/tokio/src/file.rs b/crates/wasi-common/tokio/src/file.rs index 209e1eab15..fa63fe8b09 100644 --- a/crates/wasi-common/tokio/src/file.rs +++ b/crates/wasi-common/tokio/src/file.rs @@ -12,56 +12,52 @@ use wasi_common::{ }; mod internal { + use std::sync::{Mutex, MutexGuard}; #[cfg(not(windows))] use unsafe_io::os::posish::{AsRawFd, RawFd}; #[cfg(windows)] use unsafe_io::os::windows::{AsRawHandleOrSocket, RawHandleOrSocket}; use unsafe_io::OwnsRaw; - use wasi_common::Error; // This internal type wraps tokio's File so that we can impl the // `AsUnsafeFile` trait. We impl this on an internal type, rather than on // super::File, because we don't want consumers of this library to be able // to use our `AsUnsafeFile`. - pub(super) struct Internal(pub(super) tokio::fs::File); + // Mutex is required because this type requires internal mutation for the + // tokio AsyncWriteExt methods to work, and must be Send. + pub(super) struct Internal(Mutex); + impl Internal { + pub fn new(f: tokio::fs::File) -> Self { + Internal(Mutex::new(f)) + } + pub fn inner(&self) -> MutexGuard { + self.0.lock().unwrap() + } + } #[cfg(not(windows))] impl AsRawFd for Internal { fn as_raw_fd(&self) -> RawFd { - self.0.as_raw_fd() + self.inner().as_raw_fd() } } #[cfg(windows)] impl AsRawHandleOrSocket for Internal { fn as_raw_handle_or_socket(&self) -> RawHandleOrSocket { - self.0.as_raw_handle_or_socket() + self.inner().as_raw_handle_or_socket() } } // Safety: `Internal` owns its handle. unsafe impl OwnsRaw for Internal {} - - // Tokio provides implementations of these methods, which are not - // available via AsUnsafeFile. - impl Internal { - pub async fn set_len(&self, size: u64) -> Result<(), Error> { - Ok(self.0.set_len(size).await?) - } - pub async fn sync_data(&self) -> Result<(), Error> { - Ok(self.0.sync_data().await?) - } - pub async fn sync_all(&self) -> Result<(), Error> { - Ok(self.0.sync_all().await?) - } - } } pub struct File(internal::Internal); impl File { pub fn from_cap_std(file: cap_std::fs::File) -> Self { - File(internal::Internal(tokio::fs::File::from_std( + File(internal::Internal::new(tokio::fs::File::from_std( file.into_std(), ))) } @@ -78,10 +74,12 @@ impl WasiFile for File { self } async fn datasync(&self) -> Result<(), Error> { - self.0.sync_data().await + self.0.inner().sync_data().await?; + Ok(()) } async fn sync(&self) -> Result<(), Error> { - self.0.sync_all().await + self.0.inner().sync_all().await?; + Ok(()) } async fn get_filetype(&self) -> Result { let meta = self.metadata().await?; @@ -116,8 +114,7 @@ impl WasiFile for File { }) } async fn set_filestat_size(&self, size: u64) -> Result<(), Error> { - // Tokio asyncified this already: - self.0.set_len(size).await?; + self.0.inner().set_len(size).await?; Ok(()) } async fn advise(&self, offset: u64, len: u64, advice: Advice) -> Result<(), Error> { @@ -141,9 +138,17 @@ impl WasiFile for File { Ok(()) } async fn read_vectored<'a>(&self, bufs: &mut [io::IoSliceMut<'a>]) -> Result { - // XXX use tokio's AsyncReadExt trait instead! - let n = self.0.read_vectored(bufs)?; - Ok(n.try_into()?) + use std::ops::DerefMut; + use tokio::io::AsyncReadExt; + let mut nbytes: usize = 0; + for b in bufs.iter_mut() { + let n = self.0.inner().read(b.deref_mut()).await?; + nbytes += n; + if n < b.len() { + break; + } + } + Ok(nbytes.try_into()?) } async fn read_vectored_at<'a>( &self, @@ -154,8 +159,9 @@ impl WasiFile for File { Ok(n.try_into()?) } async fn write_vectored<'a>(&self, bufs: &[io::IoSlice<'a>]) -> Result { - // XXX use tokio's AsyncWriteExt trait instead! - let n = self.0.write_vectored(bufs)?; + use tokio::io::AsyncWriteExt; + let mut n: usize = 0; + n += self.0.inner().write_vectored(bufs).await?; Ok(n.try_into()?) } async fn write_vectored_at<'a>( @@ -201,23 +207,6 @@ pub fn filetype_from(ft: &cap_std::fs::FileType) -> FileType { } } -#[cfg(windows)] -use std::os::windows::io::{AsRawHandle, RawHandle}; -#[cfg(windows)] -impl AsRawHandle for File { - fn as_raw_handle(&self) -> RawHandle { - self.0.as_raw_handle() - } -} - -#[cfg(unix)] -use std::os::unix::io::{AsRawFd, RawFd}; -#[cfg(unix)] -impl AsRawFd for File { - fn as_raw_fd(&self) -> RawFd { - self.0.as_raw_fd() - } -} pub fn convert_systimespec(t: Option) -> Option { match t { Some(wasi_common::SystemTimeSpec::Absolute(t)) => { diff --git a/crates/wasi-common/tokio/src/lib.rs b/crates/wasi-common/tokio/src/lib.rs index be03c21216..7ae8849259 100644 --- a/crates/wasi-common/tokio/src/lib.rs +++ b/crates/wasi-common/tokio/src/lib.rs @@ -85,6 +85,7 @@ impl WasiCtxBuilder { pub fn stderr(self, f: Box) -> Self { WasiCtxBuilder(self.0.stderr(f)) } + // XXX our crate needs its own stdios pub fn inherit_stdin(self) -> Self { self.stdin(Box::new(wasi_cap_std_sync::stdio::stdin())) } @@ -102,7 +103,7 @@ impl WasiCtxBuilder { dir: Dir, guest_path: impl AsRef, ) -> Result { - let dir = Box::new(wasi_cap_std_sync::dir::Dir::from_cap_std(dir)); + let dir = Box::new(crate::dir::Dir::from_cap_std(dir)); Ok(WasiCtxBuilder(self.0.preopened_dir(dir, guest_path)?)) } pub fn build(self) -> Result {