From 9880d09f1fbcbf4d82b81c9c915658bdcf7194b9 Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Thu, 15 Apr 2021 14:50:17 -0700 Subject: [PATCH] do some programming --- crates/wasi-common/tokio/src/dir.rs | 353 +++++++++++++++++++++++++++ crates/wasi-common/tokio/src/file.rs | 278 +++++++++++++++++++++ crates/wasi-common/tokio/src/lib.rs | 17 ++ 3 files changed, 648 insertions(+) create mode 100644 crates/wasi-common/tokio/src/dir.rs create mode 100644 crates/wasi-common/tokio/src/file.rs diff --git a/crates/wasi-common/tokio/src/dir.rs b/crates/wasi-common/tokio/src/dir.rs new file mode 100644 index 0000000000..9188d0cca7 --- /dev/null +++ b/crates/wasi-common/tokio/src/dir.rs @@ -0,0 +1,353 @@ +use crate::{ + asyncify, + file::{filetype_from, File}, +}; +use cap_fs_ext::{DirEntryExt, DirExt, MetadataExt, SystemTimeSpec}; +use std::any::Any; +use std::path::{Path, PathBuf}; +use wasi_common::{ + dir::{ReaddirCursor, ReaddirEntity, WasiDir}, + file::{FdFlags, FileType, Filestat, OFlags, WasiFile}, + Error, ErrorExt, +}; + +pub struct Dir(cap_std::fs::Dir); + +impl Dir { + pub fn from_cap_std(dir: cap_std::fs::Dir) -> Self { + Dir(dir) + } +} + +#[wiggle::async_trait] +impl WasiDir for Dir { + fn as_any(&self) -> &dyn Any { + self + } + async fn open_file( + &self, + symlink_follow: bool, + path: &str, + oflags: OFlags, + read: bool, + write: bool, + fdflags: FdFlags, + ) -> Result, Error> { + use cap_fs_ext::{FollowSymlinks, OpenOptionsFollowExt}; + use wasi_common::file::FdFlags; + + let mut opts = cap_std::fs::OpenOptions::new(); + + if oflags.contains(OFlags::CREATE | OFlags::EXCLUSIVE) { + opts.create_new(true); + opts.write(true); + } else if oflags.contains(OFlags::CREATE) { + opts.create(true); + opts.write(true); + } + if oflags.contains(OFlags::TRUNCATE) { + opts.truncate(true); + } + if read { + opts.read(true); + } + if write { + opts.write(true); + } else { + // If not opened write, open read. This way the OS lets us open the file. + // If FileCaps::READ is not set, read calls will be rejected at the + // get_cap check. + opts.read(true); + } + if fdflags.contains(FdFlags::APPEND) { + opts.append(true); + } + + if symlink_follow { + opts.follow(FollowSymlinks::Yes); + } else { + opts.follow(FollowSymlinks::No); + } + // the DSYNC, SYNC, and RSYNC flags are ignored! We do not + // have support for them in cap-std yet. + // ideally OpenOptions would just support this though: + // https://github.com/bytecodealliance/cap-std/issues/146 + if fdflags.intersects(FdFlags::DSYNC | FdFlags::SYNC | FdFlags::RSYNC) { + return Err(Error::not_supported().context("SYNC family of FdFlags")); + } + + let f = asyncify(move || self.0.open_with(Path::new(path), &opts)).await?; + let mut f = File::from_cap_std(f); + // NONBLOCK does not have an OpenOption either, but we can patch that on with set_fd_flags: + if fdflags.contains(FdFlags::NONBLOCK) { + f.set_fdflags(FdFlags::NONBLOCK).await?; + } + Ok(Box::new(f)) + } + + async fn open_dir(&self, symlink_follow: bool, path: &str) -> Result, Error> { + let path = unsafe { std::mem::transmute::<_, &'static str>(path) }; + let d = if symlink_follow { + asyncify(move || self.0.open_dir(Path::new(path))).await? + } else { + asyncify(move || self.0.open_dir_nofollow(Path::new(path))).await? + }; + Ok(Box::new(Dir::from_cap_std(d))) + } + + async fn create_dir(&self, path: &str) -> Result<(), Error> { + asyncify(|| self.0.create_dir(Path::new(path))).await?; + Ok(()) + } + async fn readdir( + &self, + cursor: ReaddirCursor, + ) -> Result>>, Error> { + // cap_std's read_dir does not include . and .., we should prepend these. + // Why does the Ok contain a tuple? We can't construct a cap_std::fs::DirEntry, and we don't + // have enough info to make a ReaddirEntity yet. + let dir_meta = asyncify(|| self.0.dir_metadata()).await?; + let rd = vec![ + { + let name = ".".to_owned(); + Ok((FileType::Directory, dir_meta.ino(), name)) + }, + { + let name = "..".to_owned(); + Ok((FileType::Directory, dir_meta.ino(), name)) + }, + ] + .into_iter() + .chain( + // Now process the `DirEntry`s: + self.0.entries()?.map(|entry| { + let entry = entry?; + // XXX full_metadata blocks, but we arent in an async iterator: + let meta = entry.full_metadata()?; + let inode = meta.ino(); + let filetype = filetype_from(&meta.file_type()); + let name = entry + .file_name() + .into_string() + .map_err(|_| Error::illegal_byte_sequence().context("filename"))?; + Ok((filetype, inode, name)) + }), + ) + // Enumeration of the iterator makes it possible to define the ReaddirCursor + .enumerate() + .map(|(ix, r)| match r { + Ok((filetype, inode, name)) => Ok(ReaddirEntity { + next: ReaddirCursor::from(ix as u64 + 1), + filetype, + inode, + name, + }), + Err(e) => Err(e), + }) + .skip(u64::from(cursor) as usize); + + Ok(Box::new(rd)) + } + + async fn symlink(&self, src_path: &str, dest_path: &str) -> Result<(), Error> { + asyncify(|| self.0.symlink(src_path, dest_path)).await?; + Ok(()) + } + async fn remove_dir(&self, path: &str) -> Result<(), Error> { + asyncify(|| self.0.remove_dir(Path::new(path))).await?; + Ok(()) + } + + async fn unlink_file(&self, path: &str) -> Result<(), Error> { + asyncify(|| self.0.remove_file_or_symlink(Path::new(path))).await?; + Ok(()) + } + async fn read_link(&self, path: &str) -> Result { + let link = asyncify(|| self.0.read_link(Path::new(path))).await?; + Ok(link) + } + async fn get_filestat(&self) -> Result { + let meta = asyncify(|| self.0.dir_metadata()).await?; + Ok(Filestat { + device_id: meta.dev(), + inode: meta.ino(), + filetype: filetype_from(&meta.file_type()), + nlink: meta.nlink(), + size: meta.len(), + atim: meta.accessed().map(|t| Some(t.into_std())).unwrap_or(None), + mtim: meta.modified().map(|t| Some(t.into_std())).unwrap_or(None), + ctim: meta.created().map(|t| Some(t.into_std())).unwrap_or(None), + }) + } + async fn get_path_filestat( + &self, + path: &str, + follow_symlinks: bool, + ) -> Result { + let meta = if follow_symlinks { + asyncify(|| self.0.metadata(Path::new(path))).await? + } else { + asyncify(|| self.0.symlink_metadata(Path::new(path))).await? + }; + Ok(Filestat { + device_id: meta.dev(), + inode: meta.ino(), + filetype: filetype_from(&meta.file_type()), + nlink: meta.nlink(), + size: meta.len(), + atim: meta.accessed().map(|t| Some(t.into_std())).unwrap_or(None), + mtim: meta.modified().map(|t| Some(t.into_std())).unwrap_or(None), + ctim: meta.created().map(|t| Some(t.into_std())).unwrap_or(None), + }) + } + async fn rename( + &self, + src_path: &str, + dest_dir: &dyn WasiDir, + dest_path: &str, + ) -> Result<(), Error> { + let dest_dir = dest_dir + .as_any() + .downcast_ref::() + .ok_or(Error::badf().context("failed downcast to cap-std Dir"))?; + asyncify(|| { + self.0 + .rename(Path::new(src_path), &dest_dir.0, Path::new(dest_path)) + }) + .await?; + Ok(()) + } + async fn hard_link( + &self, + src_path: &str, + target_dir: &dyn WasiDir, + target_path: &str, + ) -> Result<(), Error> { + let target_dir = target_dir + .as_any() + .downcast_ref::() + .ok_or(Error::badf().context("failed downcast to cap-std Dir"))?; + let src_path = Path::new(src_path); + let target_path = Path::new(target_path); + asyncify(|| self.0.hard_link(src_path, &target_dir.0, target_path)).await?; + Ok(()) + } + async fn set_times( + &self, + path: &str, + atime: Option, + mtime: Option, + follow_symlinks: bool, + ) -> Result<(), Error> { + asyncify(|| { + if follow_symlinks { + self.0.set_times( + Path::new(path), + convert_systimespec(atime), + convert_systimespec(mtime), + ) + } else { + self.0.set_symlink_times( + Path::new(path), + convert_systimespec(atime), + convert_systimespec(mtime), + ) + } + }) + .await?; + Ok(()) + } +} + +fn convert_systimespec(t: Option) -> Option { + match t { + Some(wasi_common::SystemTimeSpec::Absolute(t)) => Some(SystemTimeSpec::Absolute(t)), + Some(wasi_common::SystemTimeSpec::SymbolicNow) => Some(SystemTimeSpec::SymbolicNow), + None => None, + } +} + +#[cfg(test)] +mod test { + use super::Dir; + #[tokio::test] + async fn scratch_dir() { + let tempdir = tempfile::Builder::new() + .prefix("cap-std-sync") + .tempdir() + .expect("create temporary dir"); + let preopen_dir = unsafe { cap_std::fs::Dir::open_ambient_dir(tempdir.path()) } + .expect("open ambient temporary dir"); + let preopen_dir = Dir::from_cap_std(preopen_dir); + wasi_common::WasiDir::open_dir(&preopen_dir, false, ".") + .await + .expect("open the same directory via WasiDir abstraction"); + } + + // Readdir does not work on windows, so we won't test it there. + #[cfg(not(windows))] + #[tokio::test] + async fn readdir() { + use std::collections::HashMap; + use wasi_common::dir::{ReaddirCursor, ReaddirEntity, WasiDir}; + use wasi_common::file::{FdFlags, FileType, OFlags}; + + async fn readdir_into_map(dir: &dyn WasiDir) -> HashMap { + let mut out = HashMap::new(); + for readdir_result in dir + .readdir(ReaddirCursor::from(0)) + .await + .expect("readdir succeeds") + { + let entity = readdir_result.expect("readdir entry is valid"); + out.insert(entity.name.clone(), entity); + } + out + } + + let tempdir = tempfile::Builder::new() + .prefix("cap-std-sync") + .tempdir() + .expect("create temporary dir"); + let preopen_dir = unsafe { cap_std::fs::Dir::open_ambient_dir(tempdir.path()) } + .expect("open ambient temporary dir"); + let preopen_dir = Dir::from_cap_std(preopen_dir); + + let entities = readdir_into_map(&preopen_dir).await; + assert_eq!( + entities.len(), + 2, + "should just be . and .. in empty dir: {:?}", + entities + ); + assert!(entities.get(".").is_some()); + assert!(entities.get("..").is_some()); + + preopen_dir + .open_file( + false, + "file1", + OFlags::CREATE, + true, + false, + FdFlags::empty(), + ) + .await + .expect("create file1"); + + let entities = readdir_into_map(&preopen_dir).await; + assert_eq!(entities.len(), 3, "should be ., .., file1 {:?}", entities); + assert_eq!( + entities.get(".").expect(". entry").filetype, + FileType::Directory + ); + assert_eq!( + entities.get("..").expect(".. entry").filetype, + FileType::Directory + ); + assert_eq!( + entities.get("file1").expect("file1 entry").filetype, + FileType::RegularFile + ); + } +} diff --git a/crates/wasi-common/tokio/src/file.rs b/crates/wasi-common/tokio/src/file.rs new file mode 100644 index 0000000000..209e1eab15 --- /dev/null +++ b/crates/wasi-common/tokio/src/file.rs @@ -0,0 +1,278 @@ +use crate::asyncify; +use cap_fs_ext::MetadataExt; +use fs_set_times::{SetTimes, SystemTimeSpec}; +use std::any::Any; +use std::convert::TryInto; +use std::io; +use system_interface::fs::{FileIoExt, GetSetFdFlags}; +use system_interface::io::ReadReady; +use wasi_common::{ + file::{Advice, FdFlags, FileType, Filestat, WasiFile}, + Error, ErrorExt, +}; + +mod internal { + #[cfg(not(windows))] + use unsafe_io::os::posish::{AsRawFd, RawFd}; + #[cfg(windows)] + use unsafe_io::os::windows::{AsRawHandleOrSocket, RawHandleOrSocket}; + use unsafe_io::OwnsRaw; + use wasi_common::Error; + + // This internal type wraps tokio's File so that we can impl the + // `AsUnsafeFile` trait. We impl this on an internal type, rather than on + // super::File, because we don't want consumers of this library to be able + // to use our `AsUnsafeFile`. + pub(super) struct Internal(pub(super) tokio::fs::File); + + #[cfg(not(windows))] + impl AsRawFd for Internal { + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_fd() + } + } + + #[cfg(windows)] + impl AsRawHandleOrSocket for Internal { + fn as_raw_handle_or_socket(&self) -> RawHandleOrSocket { + self.0.as_raw_handle_or_socket() + } + } + + // Safety: `Internal` owns its handle. + unsafe impl OwnsRaw for Internal {} + + // Tokio provides implementations of these methods, which are not + // available via AsUnsafeFile. + impl Internal { + pub async fn set_len(&self, size: u64) -> Result<(), Error> { + Ok(self.0.set_len(size).await?) + } + pub async fn sync_data(&self) -> Result<(), Error> { + Ok(self.0.sync_data().await?) + } + pub async fn sync_all(&self) -> Result<(), Error> { + Ok(self.0.sync_all().await?) + } + } +} + +pub struct File(internal::Internal); + +impl File { + pub fn from_cap_std(file: cap_std::fs::File) -> Self { + File(internal::Internal(tokio::fs::File::from_std( + file.into_std(), + ))) + } + + async fn metadata(&self) -> Result { + use unsafe_io::AsUnsafeFile; + asyncify(|| Ok(cap_std::fs::Metadata::from_file(&self.0.as_file_view())?)).await + } +} + +#[wiggle::async_trait] +impl WasiFile for File { + fn as_any(&self) -> &dyn Any { + self + } + async fn datasync(&self) -> Result<(), Error> { + self.0.sync_data().await + } + async fn sync(&self) -> Result<(), Error> { + self.0.sync_all().await + } + async fn get_filetype(&self) -> Result { + let meta = self.metadata().await?; + Ok(filetype_from(&meta.file_type())) + } + async fn get_fdflags(&self) -> Result { + let fdflags = asyncify(|| self.0.get_fd_flags()).await?; + Ok(from_sysif_fdflags(fdflags)) + } + async fn set_fdflags(&mut self, fdflags: FdFlags) -> Result<(), Error> { + if fdflags.intersects( + wasi_common::file::FdFlags::DSYNC + | wasi_common::file::FdFlags::SYNC + | wasi_common::file::FdFlags::RSYNC, + ) { + return Err(Error::invalid_argument().context("cannot set DSYNC, SYNC, or RSYNC flag")); + } + asyncify(move || self.0.set_fd_flags(to_sysif_fdflags(fdflags))).await?; + Ok(()) + } + async fn get_filestat(&self) -> Result { + let meta = self.metadata().await?; + Ok(Filestat { + device_id: meta.dev(), + inode: meta.ino(), + filetype: filetype_from(&meta.file_type()), + nlink: meta.nlink(), + size: meta.len(), + atim: meta.accessed().map(|t| Some(t.into_std())).unwrap_or(None), + mtim: meta.modified().map(|t| Some(t.into_std())).unwrap_or(None), + ctim: meta.created().map(|t| Some(t.into_std())).unwrap_or(None), + }) + } + async fn set_filestat_size(&self, size: u64) -> Result<(), Error> { + // Tokio asyncified this already: + self.0.set_len(size).await?; + Ok(()) + } + async fn advise(&self, offset: u64, len: u64, advice: Advice) -> Result<(), Error> { + asyncify(move || self.0.advise(offset, len, convert_advice(advice))).await?; + Ok(()) + } + async fn allocate(&self, offset: u64, len: u64) -> Result<(), Error> { + asyncify(move || self.0.allocate(offset, len)).await?; + Ok(()) + } + async fn set_times( + &self, + atime: Option, + mtime: Option, + ) -> Result<(), Error> { + asyncify(|| { + self.0 + .set_times(convert_systimespec(atime), convert_systimespec(mtime)) + }) + .await?; + Ok(()) + } + async fn read_vectored<'a>(&self, bufs: &mut [io::IoSliceMut<'a>]) -> Result { + // XXX use tokio's AsyncReadExt trait instead! + let n = self.0.read_vectored(bufs)?; + Ok(n.try_into()?) + } + async fn read_vectored_at<'a>( + &self, + bufs: &mut [io::IoSliceMut<'a>], + offset: u64, + ) -> Result { + let n = asyncify(move || self.0.read_vectored_at(bufs, offset)).await?; + Ok(n.try_into()?) + } + async fn write_vectored<'a>(&self, bufs: &[io::IoSlice<'a>]) -> Result { + // XXX use tokio's AsyncWriteExt trait instead! + let n = self.0.write_vectored(bufs)?; + Ok(n.try_into()?) + } + async fn write_vectored_at<'a>( + &self, + bufs: &[io::IoSlice<'a>], + offset: u64, + ) -> Result { + let n = asyncify(move || self.0.write_vectored_at(bufs, offset)).await?; + Ok(n.try_into()?) + } + async fn seek(&self, pos: std::io::SeekFrom) -> Result { + asyncify(move || self.0.seek(pos)).await + } + async fn peek(&self, buf: &mut [u8]) -> Result { + let n = asyncify(move || self.0.peek(buf)).await?; + Ok(n.try_into()?) + } + async fn num_ready_bytes(&self) -> Result { + use unsafe_io::AsUnsafeFile; + asyncify(|| self.0.as_file_view().num_ready_bytes()).await + } +} +pub fn filetype_from(ft: &cap_std::fs::FileType) -> FileType { + use cap_fs_ext::FileTypeExt; + if ft.is_dir() { + FileType::Directory + } else if ft.is_symlink() { + FileType::SymbolicLink + } else if ft.is_socket() { + if ft.is_block_device() { + FileType::SocketDgram + } else { + FileType::SocketStream + } + } else if ft.is_block_device() { + FileType::BlockDevice + } else if ft.is_char_device() { + FileType::CharacterDevice + } else if ft.is_file() { + FileType::RegularFile + } else { + FileType::Unknown + } +} + +#[cfg(windows)] +use std::os::windows::io::{AsRawHandle, RawHandle}; +#[cfg(windows)] +impl AsRawHandle for File { + fn as_raw_handle(&self) -> RawHandle { + self.0.as_raw_handle() + } +} + +#[cfg(unix)] +use std::os::unix::io::{AsRawFd, RawFd}; +#[cfg(unix)] +impl AsRawFd for File { + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_fd() + } +} +pub fn convert_systimespec(t: Option) -> Option { + match t { + Some(wasi_common::SystemTimeSpec::Absolute(t)) => { + Some(SystemTimeSpec::Absolute(t.into_std())) + } + Some(wasi_common::SystemTimeSpec::SymbolicNow) => Some(SystemTimeSpec::SymbolicNow), + None => None, + } +} + +pub fn to_sysif_fdflags(f: wasi_common::file::FdFlags) -> system_interface::fs::FdFlags { + let mut out = system_interface::fs::FdFlags::empty(); + if f.contains(wasi_common::file::FdFlags::APPEND) { + out |= system_interface::fs::FdFlags::APPEND; + } + if f.contains(wasi_common::file::FdFlags::DSYNC) { + out |= system_interface::fs::FdFlags::DSYNC; + } + if f.contains(wasi_common::file::FdFlags::NONBLOCK) { + out |= system_interface::fs::FdFlags::NONBLOCK; + } + if f.contains(wasi_common::file::FdFlags::RSYNC) { + out |= system_interface::fs::FdFlags::RSYNC; + } + if f.contains(wasi_common::file::FdFlags::SYNC) { + out |= system_interface::fs::FdFlags::SYNC; + } + out +} +pub fn from_sysif_fdflags(f: system_interface::fs::FdFlags) -> wasi_common::file::FdFlags { + let mut out = wasi_common::file::FdFlags::empty(); + if f.contains(system_interface::fs::FdFlags::APPEND) { + out |= wasi_common::file::FdFlags::APPEND; + } + if f.contains(system_interface::fs::FdFlags::DSYNC) { + out |= wasi_common::file::FdFlags::DSYNC; + } + if f.contains(system_interface::fs::FdFlags::NONBLOCK) { + out |= wasi_common::file::FdFlags::NONBLOCK; + } + if f.contains(system_interface::fs::FdFlags::RSYNC) { + out |= wasi_common::file::FdFlags::RSYNC; + } + if f.contains(system_interface::fs::FdFlags::SYNC) { + out |= wasi_common::file::FdFlags::SYNC; + } + out +} +pub fn convert_advice(advice: Advice) -> system_interface::fs::Advice { + match advice { + Advice::Normal => system_interface::fs::Advice::Normal, + Advice::Sequential => system_interface::fs::Advice::Sequential, + Advice::Random => system_interface::fs::Advice::Random, + Advice::WillNeed => system_interface::fs::Advice::WillNeed, + Advice::DontNeed => system_interface::fs::Advice::DontNeed, + Advice::NoReuse => system_interface::fs::Advice::NoReuse, + } +} diff --git a/crates/wasi-common/tokio/src/lib.rs b/crates/wasi-common/tokio/src/lib.rs index d7d4687428..be03c21216 100644 --- a/crates/wasi-common/tokio/src/lib.rs +++ b/crates/wasi-common/tokio/src/lib.rs @@ -109,3 +109,20 @@ impl WasiCtxBuilder { self.0.build() } } + +pub(crate) async fn asyncify<'a, F, T>(f: F) -> Result +where + F: FnOnce() -> Result + Send + 'a, + T: Send + 'static, +{ + // spawn_blocking requires a 'static function, but since we await on the + // JoinHandle the lifetime of the spawn will be no longer than this function's body + let f: Box Result + Send + 'a> = Box::new(f); + let f = unsafe { + std::mem::transmute::<_, Box Result + Send + 'static>>(f) + }; + match tokio::task::spawn_blocking(|| f()).await { + Ok(res) => Ok(res?), + Err(_) => panic!("spawn_blocking died"), + } +}