diff --git a/Cargo.lock b/Cargo.lock index 05e1192e64..b287e067bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2541,12 +2541,14 @@ dependencies = [ name = "wasi-c2-cap-std-sync" version = "0.22.0" dependencies = [ + "anyhow", "bitflags", "cap-fs-ext", "cap-rand", "cap-std", "cap-time-ext", "fs-set-times", + "lazy_static", "libc", "system-interface", "tracing", diff --git a/crates/wasi-c2/cap-std-sync/Cargo.toml b/crates/wasi-c2/cap-std-sync/Cargo.toml index ad9880e685..8c51136400 100644 --- a/crates/wasi-c2/cap-std-sync/Cargo.toml +++ b/crates/wasi-c2/cap-std-sync/Cargo.toml @@ -14,6 +14,7 @@ publish = false [dependencies] wasi-c2 = { path = "../" } +anyhow = "1.0" cap-std = "0.11" cap-fs-ext = "0.11" cap-time-ext = "0.11" @@ -29,3 +30,4 @@ libc = "0.2" [target.'cfg(windows)'.dependencies] winapi = "0.3" +lazy_static = "1.4" diff --git a/crates/wasi-c2/cap-std-sync/src/lib.rs b/crates/wasi-c2/cap-std-sync/src/lib.rs index 0d4063f08a..495d9ed35e 100644 --- a/crates/wasi-c2/cap-std-sync/src/lib.rs +++ b/crates/wasi-c2/cap-std-sync/src/lib.rs @@ -17,7 +17,7 @@ impl WasiCtxBuilder { WasiCtxBuilder(WasiCtx::builder( random(), clocks::clocks(), - Box::new(sched::SyncSched), + Box::new(sched::SyncSched::new()), Rc::new(RefCell::new(Table::new())), )) } diff --git a/crates/wasi-c2/cap-std-sync/src/sched/unix.rs b/crates/wasi-c2/cap-std-sync/src/sched/unix.rs index 24ce8b251f..ad169f0408 100644 --- a/crates/wasi-c2/cap-std-sync/src/sched/unix.rs +++ b/crates/wasi-c2/cap-std-sync/src/sched/unix.rs @@ -15,6 +15,12 @@ use poll::{PollFd, PollFlags}; pub struct SyncSched; +impl SyncSched { + pub fn new() -> Self { + SyncSched + } +} + impl WasiSched for SyncSched { fn poll_oneoff<'a>(&self, poll: &'a Poll<'a>) -> Result<(), Error> { if poll.is_empty() { @@ -43,10 +49,7 @@ impl WasiSched for SyncSched { let ready = loop { let poll_timeout = if let Some(t) = timeout { - let duration = t - .deadline - .checked_duration_since(t.clock.now(t.precision)) - .unwrap_or(Duration::from_secs(0)); + let duration = t.duration_until().unwrap_or(Duration::from_secs(0)); (duration.as_millis() + 1) // XXX try always rounding up? .try_into() .map_err(|_| Error::overflow().context("poll timeout"))? diff --git a/crates/wasi-c2/cap-std-sync/src/sched/windows.rs b/crates/wasi-c2/cap-std-sync/src/sched/windows.rs index 7bb6d9a824..8b7bdd3317 100644 --- a/crates/wasi-c2/cap-std-sync/src/sched/windows.rs +++ b/crates/wasi-c2/cap-std-sync/src/sched/windows.rs @@ -1,21 +1,135 @@ - +use anyhow::{anyhow, Context}; +use std::ops::Deref; use std::os::windows::io::{AsRawHandle, RawHandle}; +use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender, TryRecvError}; +use std::sync::Mutex; +use std::thread; +use std::time::Duration; use wasi_c2::{ file::WasiFile, - sched::{Poll, WasiSched}, - Error, + sched::{ + subscription::{RwEventFlags, Subscription}, + Poll, WasiSched, + }, + Error, ErrorExt, }; -pub struct SyncSched; +pub struct SyncSched { + stdin_poll: Mutex, +} + +impl SyncSched { + pub fn new() -> Self { + Self { + stdin_poll: StdinPoll::new(), + } + } +} impl WasiSched for SyncSched { fn poll_oneoff<'a>(&self, poll: &'a Poll<'a>) -> Result<(), Error> { if poll.is_empty() { return Ok(()); } - todo!() + + let mut ready = false; + let timeout = poll.earliest_clock_deadline(); + + let mut stdin_read_subs = Vec::new(); + let mut immediate_subs = Vec::new(); + for s in poll.rw_subscriptions() { + match s { + Subscription::Read(r) if r.file.as_any().is::() => { + stdin_read_subs.push(r); + } + Subscription::Read(rw) | Subscription::Write(rw) => { + if wasi_file_raw_handle(rw.file.deref()).is_some() { + immediate_subs.push(s); + } else { + return Err(Error::invalid_argument() + .context("read/write subscription fd downcast failed")); + } + } + Subscription::MonotonicClock { .. } => unreachable!(), + } + } + + if !stdin_read_subs.is_empty() { + let waitmode = if let Some(t) = timeout { + if let Some(duration) = t.duration_until() { + WaitMode::Timeout(duration) + } else { + WaitMode::Immediate + } + } else { + if ready { + WaitMode::Immediate + } else { + WaitMode::Infinite + } + }; + let state = STDIN_POLL + .lock() + .map_err(|_| anyhow!("failed to take lock of STDIN_POLL"))? + .poll(waitmode)?; + for readsub in stdin_read_subs.into_iter() { + match state { + PollState::Ready => { + readsub.complete(1, RwEventFlags::empty()); + ready = true; + } + PollState::NotReady | PollState::TimedOut => {} + PollState::Error(ref e) => { + // Unfortunately, we need to deliver the Error to each of the + // subscriptions, but there is no Clone on std::io::Error. So, we convert it to the + // kind, and then back to std::io::Error, and finally to anyhow::Error. + // When its time to turn this into an errno elsewhere, the error kind will + // be inspected. + let ekind = e.kind(); + let ioerror = std::io::Error::from(ekind); + readsub.error(ioerror.into()); + ready = true; + } + } + } + } + for sub in immediate_subs { + match sub { + Subscription::Read(r) => { + // XXX This doesnt strictly preserve the behavior in the earlier + // implementation, which would always do complete(0) for reads from + // stdout/err. + match r.file.num_ready_bytes() { + Ok(ready_bytes) => { + r.complete(ready_bytes, RwEventFlags::empty()); + ready = true; + } + Err(e) => { + r.error(e); + ready = true; + } + } + } + Subscription::Write(w) => { + // Everything is always ready for writing, apparently? + w.complete(0, RwEventFlags::empty()); + ready = true; + } + Subscription::MonotonicClock { .. } => unreachable!(), + } + } + + if !ready { + if let Some(t) = timeout { + if let Some(duration) = t.duration_until() { + thread::sleep(duration); + } + } + } + + Ok(()) } fn sched_yield(&self) -> Result<(), Error> { - std::thread::yield_now(); + thread::yield_now(); Ok(()) } } @@ -50,3 +164,97 @@ fn wasi_file_raw_handle(f: &dyn WasiFile) -> Option { None } } + +enum PollState { + Ready, + NotReady, // Not ready, but did not wait + TimedOut, // Not ready, waited until timeout + Error(std::io::Error), +} + +enum WaitMode { + Timeout(Duration), + Infinite, + Immediate, +} + +struct StdinPoll { + request_tx: Sender<()>, + notify_rx: Receiver, +} + +lazy_static::lazy_static! { + static ref STDIN_POLL: Mutex = StdinPoll::new(); +} + +impl StdinPoll { + pub fn new() -> Mutex { + let (request_tx, request_rx) = mpsc::channel(); + let (notify_tx, notify_rx) = mpsc::channel(); + thread::spawn(move || Self::event_loop(request_rx, notify_tx)); + Mutex::new(StdinPoll { + request_tx, + notify_rx, + }) + } + + // This function should not be used directly. + // Correctness of this function crucially depends on the fact that + // mpsc::Receiver is !Sync. + fn poll(&self, wait_mode: WaitMode) -> Result { + match self.notify_rx.try_recv() { + // Clean up possibly unread result from previous poll. + Ok(_) | Err(TryRecvError::Empty) => {} + Err(TryRecvError::Disconnected) => { + return Err(anyhow!("StdinPoll notify_rx channel closed")) + } + } + + // Notify the worker thread to poll stdin + self.request_tx + .send(()) + .context("request_tx channel closed")?; + + // Wait for the worker thread to send a readiness notification + match wait_mode { + WaitMode::Timeout(timeout) => match self.notify_rx.recv_timeout(timeout) { + Ok(r) => Ok(r), + Err(RecvTimeoutError::Timeout) => Ok(PollState::TimedOut), + Err(RecvTimeoutError::Disconnected) => { + Err(anyhow!("StdinPoll notify_rx channel closed")) + } + }, + WaitMode::Infinite => self + .notify_rx + .recv() + .context("StdinPoll notify_rx channel closed"), + WaitMode::Immediate => match self.notify_rx.try_recv() { + Ok(r) => Ok(r), + Err(TryRecvError::Empty) => Ok(PollState::NotReady), + Err(TryRecvError::Disconnected) => { + Err(anyhow!("StdinPoll notify_rx channel closed")) + } + }, + } + } + + fn event_loop(request_rx: Receiver<()>, notify_tx: Sender) -> ! { + use std::io::BufRead; + loop { + // Wait on a request: + request_rx.recv().expect("request_rx channel"); + // Wait for data to appear in stdin. If fill_buf returns any slice, it means + // that either: + // (a) there is some data in stdin, if non-empty, + // (b) EOF was recieved, if its empty + // Linux returns `POLLIN` in both cases, so we imitate this behavior. + let resp = match std::io::stdin().lock().fill_buf() { + Ok(_) => PollState::Ready, + Err(e) => PollState::Error(e), + }; + // Notify about data in stdin. If the read on this channel has timed out, the + // next poller will have to clean the channel. + notify_tx.send(resp).expect("notify_tx channel"); + } + } +} diff --git a/crates/wasi-c2/src/sched/subscription.rs b/crates/wasi-c2/src/sched/subscription.rs index 7994dfcfc3..799cfc665f 100644 --- a/crates/wasi-c2/src/sched/subscription.rs +++ b/crates/wasi-c2/src/sched/subscription.rs @@ -44,6 +44,9 @@ impl<'a> MonotonicClockSubscription<'a> { pub fn now(&self) -> Instant { self.clock.now(self.precision) } + pub fn duration_until(&self) -> Option { + self.deadline.checked_duration_since(self.now()) + } pub fn result(&self) -> Option> { if self.now().checked_duration_since(self.deadline).is_some() { Some(Ok(()))