diff --git a/crates/wasi-common/cap-std-sync/src/sched.rs b/crates/wasi-common/cap-std-sync/src/sched.rs index 43af68a952..cbda527109 100644 --- a/crates/wasi-common/cap-std-sync/src/sched.rs +++ b/crates/wasi-common/cap-std-sync/src/sched.rs @@ -1,15 +1,40 @@ #[cfg(unix)] -mod unix; +pub mod unix; #[cfg(unix)] -pub use unix::*; +pub use unix::poll_oneoff; #[cfg(windows)] -mod windows; +pub mod windows; #[cfg(windows)] -pub use windows::*; +pub use windows::poll_oneoff; -use wasi_common::sched::WasiSched; +use std::thread; +use std::time::Duration; +use wasi_common::{ + sched::{Poll, WasiSched}, + Error, +}; +pub struct SyncSched {} +impl SyncSched { + pub fn new() -> Self { + Self {} + } +} +#[async_trait::async_trait(?Send)] +impl WasiSched for SyncSched { + async fn poll_oneoff<'a>(&self, poll: &mut Poll<'a>) -> Result<(), Error> { + poll_oneoff(poll).await + } + async fn sched_yield(&self) -> Result<(), Error> { + thread::yield_now(); + Ok(()) + } + async fn sleep(&self, duration: Duration) -> Result<(), Error> { + std::thread::sleep(duration); + Ok(()) + } +} pub fn sched_ctx() -> Box { Box::new(SyncSched::new()) } diff --git a/crates/wasi-common/cap-std-sync/src/sched/unix.rs b/crates/wasi-common/cap-std-sync/src/sched/unix.rs index 1c1227bd6a..7b232a4ed9 100644 --- a/crates/wasi-common/cap-std-sync/src/sched/unix.rs +++ b/crates/wasi-common/cap-std-sync/src/sched/unix.rs @@ -5,112 +5,93 @@ use wasi_common::{ file::WasiFile, sched::{ subscription::{RwEventFlags, Subscription}, - Poll, WasiSched, + Poll, }, Error, ErrorExt, }; use poll::{PollFd, PollFlags}; -pub struct SyncSched; - -impl SyncSched { - pub fn new() -> Self { - SyncSched +pub async fn poll_oneoff<'a>(poll: &mut Poll<'a>) -> Result<(), Error> { + if poll.is_empty() { + return Ok(()); } -} + let mut pollfds = Vec::new(); + for s in poll.rw_subscriptions() { + match s { + Subscription::Read(f) => { + let raw_fd = wasi_file_raw_fd(f.file).ok_or( + Error::invalid_argument().context("read subscription fd downcast failed"), + )?; + pollfds.push(unsafe { PollFd::new(raw_fd, PollFlags::POLLIN) }); + } -#[async_trait::async_trait(?Send)] -impl WasiSched for SyncSched { - async fn poll_oneoff<'a>(&self, poll: &mut Poll<'a>) -> Result<(), Error> { - if poll.is_empty() { - return Ok(()); + Subscription::Write(f) => { + let raw_fd = wasi_file_raw_fd(f.file).ok_or( + Error::invalid_argument().context("write subscription fd downcast failed"), + )?; + pollfds.push(unsafe { PollFd::new(raw_fd, PollFlags::POLLOUT) }); + } + Subscription::MonotonicClock { .. } => unreachable!(), } - let mut pollfds = Vec::new(); - for s in poll.rw_subscriptions() { - match s { - Subscription::Read(f) => { - let raw_fd = wasi_file_raw_fd(f.file).ok_or( - Error::invalid_argument().context("read subscription fd downcast failed"), - )?; - pollfds.push(unsafe { PollFd::new(raw_fd, PollFlags::POLLIN) }); - } + } - Subscription::Write(f) => { - let raw_fd = wasi_file_raw_fd(f.file).ok_or( - Error::invalid_argument().context("write subscription fd downcast failed"), - )?; - pollfds.push(unsafe { PollFd::new(raw_fd, PollFlags::POLLOUT) }); - } - Subscription::MonotonicClock { .. } => unreachable!(), - } - } - - let ready = loop { - let poll_timeout = if let Some(t) = poll.earliest_clock_deadline() { - let duration = t.duration_until().unwrap_or(Duration::from_secs(0)); - (duration.as_millis() + 1) // XXX try always rounding up? - .try_into() - .map_err(|_| Error::overflow().context("poll timeout"))? - } else { - libc::c_int::max_value() - }; - tracing::debug!( - poll_timeout = tracing::field::debug(poll_timeout), - poll_fds = tracing::field::debug(&pollfds), - "poll" - ); - match poll::poll(&mut pollfds, poll_timeout) { - Ok(ready) => break ready, - Err(_) => { - let last_err = std::io::Error::last_os_error(); - if last_err.raw_os_error().unwrap() == libc::EINTR { - continue; - } else { - return Err(last_err.into()); - } - } - } - }; - if ready > 0 { - for (rwsub, pollfd) in poll.rw_subscriptions().zip(pollfds.into_iter()) { - if let Some(revents) = pollfd.revents() { - let (nbytes, rwsub) = match rwsub { - Subscription::Read(sub) => { - let ready = sub.file.num_ready_bytes().await?; - (std::cmp::max(ready, 1), sub) - } - Subscription::Write(sub) => (0, sub), - _ => unreachable!(), - }; - if revents.contains(PollFlags::POLLNVAL) { - rwsub.error(Error::badf()); - } else if revents.contains(PollFlags::POLLERR) { - rwsub.error(Error::io()); - } else if revents.contains(PollFlags::POLLHUP) { - rwsub.complete(nbytes, RwEventFlags::HANGUP); - } else { - rwsub.complete(nbytes, RwEventFlags::empty()); - }; - } - } + let ready = loop { + let poll_timeout = if let Some(t) = poll.earliest_clock_deadline() { + let duration = t.duration_until().unwrap_or(Duration::from_secs(0)); + (duration.as_millis() + 1) // XXX try always rounding up? + .try_into() + .map_err(|_| Error::overflow().context("poll timeout"))? } else { - poll.earliest_clock_deadline() - .expect("timed out") - .result() - .expect("timer deadline is past") - .unwrap() + libc::c_int::max_value() + }; + tracing::debug!( + poll_timeout = tracing::field::debug(poll_timeout), + poll_fds = tracing::field::debug(&pollfds), + "poll" + ); + match poll::poll(&mut pollfds, poll_timeout) { + Ok(ready) => break ready, + Err(_) => { + let last_err = std::io::Error::last_os_error(); + if last_err.raw_os_error().unwrap() == libc::EINTR { + continue; + } else { + return Err(last_err.into()); + } + } } - Ok(()) - } - async fn sched_yield(&self) -> Result<(), Error> { - std::thread::yield_now(); - Ok(()) - } - async fn sleep(&self, duration: Duration) -> Result<(), Error> { - std::thread::sleep(duration); - Ok(()) + }; + if ready > 0 { + for (rwsub, pollfd) in poll.rw_subscriptions().zip(pollfds.into_iter()) { + if let Some(revents) = pollfd.revents() { + let (nbytes, rwsub) = match rwsub { + Subscription::Read(sub) => { + let ready = sub.file.num_ready_bytes().await?; + (std::cmp::max(ready, 1), sub) + } + Subscription::Write(sub) => (0, sub), + _ => unreachable!(), + }; + if revents.contains(PollFlags::POLLNVAL) { + rwsub.error(Error::badf()); + } else if revents.contains(PollFlags::POLLERR) { + rwsub.error(Error::io()); + } else if revents.contains(PollFlags::POLLHUP) { + rwsub.complete(nbytes, RwEventFlags::HANGUP); + } else { + rwsub.complete(nbytes, RwEventFlags::empty()); + }; + } + } + } else { + poll.earliest_clock_deadline() + .expect("timed out") + .result() + .expect("timer deadline is past") + .unwrap() } + Ok(()) } fn wasi_file_raw_fd(f: &dyn WasiFile) -> Option { diff --git a/crates/wasi-common/cap-std-sync/src/sched/windows.rs b/crates/wasi-common/cap-std-sync/src/sched/windows.rs index 3d5fdd37ad..135ec7b4a0 100644 --- a/crates/wasi-common/cap-std-sync/src/sched/windows.rs +++ b/crates/wasi-common/cap-std-sync/src/sched/windows.rs @@ -1,3 +1,13 @@ +// The windows scheduler is unmaintained and due for a rewrite. +// +// Rather than use a polling mechanism for file read/write readiness, +// it checks readiness just once, before sleeping for any timer subscriptions. +// Checking stdin readiness uses a worker thread which, once started, lives for the +// lifetime of the process. +// +// We suspect there are bugs in this scheduler, however, we have not +// taken the time to improve it. See bug #2880. + use anyhow::Context; use std::ops::Deref; use std::os::windows::io::{AsRawHandle, RawHandle}; @@ -9,130 +19,121 @@ use wasi_common::{ file::WasiFile, sched::{ subscription::{RwEventFlags, Subscription}, - Poll, WasiSched, + Poll, }, Error, ErrorExt, }; -pub struct SyncSched {} -impl SyncSched { - pub fn new() -> Self { - Self {} - } +pub async fn poll_oneoff<'a>(poll: &mut Poll<'a>) -> Result<(), Error> { + poll_oneoff_(poll, wasi_file_raw_handle).await } -#[async_trait::async_trait(?Send)] -impl WasiSched for SyncSched { - async fn poll_oneoff<'a>(&self, poll: &mut Poll<'a>) -> Result<(), Error> { - if poll.is_empty() { - return Ok(()); - } +// For reuse by wasi-tokio, which has a different WasiFile -> RawHandle translator. +pub async fn poll_oneoff_<'a>( + poll: &mut Poll<'a>, + file_to_handle: impl Fn(&dyn WasiFile) -> Option, +) -> Result<(), Error> { + if poll.is_empty() { + return Ok(()); + } - let mut ready = false; - let waitmode = if let Some(t) = poll.earliest_clock_deadline() { - if let Some(duration) = t.duration_until() { - WaitMode::Timeout(duration) - } else { - WaitMode::Immediate - } + let mut ready = false; + let waitmode = if let Some(t) = poll.earliest_clock_deadline() { + if let Some(duration) = t.duration_until() { + WaitMode::Timeout(duration) } else { - if ready { - WaitMode::Immediate - } else { - WaitMode::Infinite - } - }; - - let mut stdin_read_subs = Vec::new(); - let mut immediate_reads = Vec::new(); - let mut immediate_writes = Vec::new(); - for s in poll.rw_subscriptions() { - match s { - Subscription::Read(r) => { - if r.file.as_any().is::() { - stdin_read_subs.push(r); - } else if wasi_file_raw_handle(r.file.deref()).is_some() { - immediate_reads.push(r); - } else { - return Err(Error::invalid_argument() - .context("read subscription fd downcast failed")); - } - } - Subscription::Write(w) => { - if wasi_file_raw_handle(w.file.deref()).is_some() { - immediate_writes.push(w); - } else { - return Err(Error::invalid_argument() - .context("write subscription fd downcast failed")); - } - } - Subscription::MonotonicClock { .. } => unreachable!(), - } + WaitMode::Immediate } + } else { + if ready { + WaitMode::Immediate + } else { + WaitMode::Infinite + } + }; - if !stdin_read_subs.is_empty() { - let state = STDIN_POLL - .lock() - .map_err(|_| Error::trap("failed to take lock of STDIN_POLL"))? - .poll(waitmode)?; - for readsub in stdin_read_subs.into_iter() { - match state { - PollState::Ready => { - readsub.complete(1, RwEventFlags::empty()); - ready = true; - } - PollState::NotReady | PollState::TimedOut => {} - PollState::Error(ref e) => { - // Unfortunately, we need to deliver the Error to each of the - // subscriptions, but there is no Clone on std::io::Error. So, we convert it to the - // kind, and then back to std::io::Error, and finally to anyhow::Error. - // When its time to turn this into an errno elsewhere, the error kind will - // be inspected. - let ekind = e.kind(); - let ioerror = std::io::Error::from(ekind); - readsub.error(ioerror.into()); - ready = true; - } + let mut stdin_read_subs = Vec::new(); + let mut immediate_reads = Vec::new(); + let mut immediate_writes = Vec::new(); + for s in poll.rw_subscriptions() { + match s { + Subscription::Read(r) => { + if r.file.as_any().is::() { + stdin_read_subs.push(r); + } else if file_to_handle(r.file.deref()).is_some() { + immediate_reads.push(r); + } else { + return Err( + Error::invalid_argument().context("read subscription fd downcast failed") + ); } } + Subscription::Write(w) => { + if wasi_file_raw_handle(w.file.deref()).is_some() { + immediate_writes.push(w); + } else { + return Err( + Error::invalid_argument().context("write subscription fd downcast failed") + ); + } + } + Subscription::MonotonicClock { .. } => unreachable!(), } - for r in immediate_reads { - match r.file.num_ready_bytes().await { - Ok(ready_bytes) => { - r.complete(ready_bytes, RwEventFlags::empty()); + } + + if !stdin_read_subs.is_empty() { + let state = STDIN_POLL + .lock() + .map_err(|_| Error::trap("failed to take lock of STDIN_POLL"))? + .poll(waitmode)?; + for readsub in stdin_read_subs.into_iter() { + match state { + PollState::Ready => { + readsub.complete(1, RwEventFlags::empty()); ready = true; } - Err(e) => { - r.error(e); + PollState::NotReady | PollState::TimedOut => {} + PollState::Error(ref e) => { + // Unfortunately, we need to deliver the Error to each of the + // subscriptions, but there is no Clone on std::io::Error. So, we convert it to the + // kind, and then back to std::io::Error, and finally to anyhow::Error. + // When its time to turn this into an errno elsewhere, the error kind will + // be inspected. + let ekind = e.kind(); + let ioerror = std::io::Error::from(ekind); + readsub.error(ioerror.into()); ready = true; } } } - for w in immediate_writes { - // Everything is always ready for writing, apparently? - w.complete(0, RwEventFlags::empty()); - ready = true; - } - - if !ready { - if let WaitMode::Timeout(duration) = waitmode { - thread::sleep(duration); + } + for r in immediate_reads { + match r.file.num_ready_bytes().await { + Ok(ready_bytes) => { + r.complete(ready_bytes, RwEventFlags::empty()); + ready = true; + } + Err(e) => { + r.error(e); + ready = true; } } + } + for w in immediate_writes { + // Everything is always ready for writing, apparently? + w.complete(0, RwEventFlags::empty()); + ready = true; + } - Ok(()) - } - async fn sched_yield(&self) -> Result<(), Error> { - thread::yield_now(); - Ok(()) - } - async fn sleep(&self, duration: Duration) -> Result<(), Error> { - std::thread::sleep(duration); - Ok(()) + if !ready { + if let WaitMode::Timeout(duration) = waitmode { + thread::sleep(duration); + } } + + Ok(()) } - -fn wasi_file_raw_handle(f: &dyn WasiFile) -> Option { +pub fn wasi_file_raw_handle(f: &dyn WasiFile) -> Option { let a = f.as_any(); if a.is::() { Some( diff --git a/crates/wasi-common/tokio/src/sched/windows.rs b/crates/wasi-common/tokio/src/sched/windows.rs index f484056e8d..d962675f45 100644 --- a/crates/wasi-common/tokio/src/sched/windows.rs +++ b/crates/wasi-common/tokio/src/sched/windows.rs @@ -1,124 +1,14 @@ use crate::block_on_dummy_executor; -use anyhow::Context; -use std::ops::Deref; use std::os::windows::io::{AsRawHandle, RawHandle}; -use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender, TryRecvError}; -use std::sync::Mutex; -use std::thread; -use std::time::Duration; -use wasi_common::{ - file::WasiFile, - sched::{ - subscription::{RwEventFlags, Subscription}, - Poll, - }, - Error, ErrorExt, -}; +use wasi_cap_std_sync::sched::windows::poll_oneoff_; +use wasi_common::{file::WasiFile, sched::Poll, Error}; pub async fn poll_oneoff<'a>(poll: &mut Poll<'a>) -> Result<(), Error> { - block_on_dummy_executor(move || poll_oneoff_(poll)) -} - -async fn poll_oneoff_<'a>(poll: &mut Poll<'a>) -> Result<(), Error> { - if poll.is_empty() { - return Ok(()); - } - - let mut ready = false; - let waitmode = if let Some(t) = poll.earliest_clock_deadline() { - if let Some(duration) = t.duration_until() { - WaitMode::Timeout(duration) - } else { - WaitMode::Immediate - } - } else { - if ready { - WaitMode::Immediate - } else { - WaitMode::Infinite - } - }; - - let mut stdin_read_subs = Vec::new(); - let mut immediate_reads = Vec::new(); - let mut immediate_writes = Vec::new(); - for s in poll.rw_subscriptions() { - match s { - Subscription::Read(r) => { - if r.file.as_any().is::() { - stdin_read_subs.push(r); - } else if wasi_file_raw_handle(r.file.deref()).is_some() { - immediate_reads.push(r); - } else { - return Err( - Error::invalid_argument().context("read subscription fd downcast failed") - ); - } - } - Subscription::Write(w) => { - if wasi_file_raw_handle(w.file.deref()).is_some() { - immediate_writes.push(w); - } else { - return Err( - Error::invalid_argument().context("write subscription fd downcast failed") - ); - } - } - Subscription::MonotonicClock { .. } => unreachable!(), - } - } - - if !stdin_read_subs.is_empty() { - let state = STDIN_POLL - .lock() - .map_err(|_| Error::trap("failed to take lock of STDIN_POLL"))? - .poll(waitmode)?; - for readsub in stdin_read_subs.into_iter() { - match state { - PollState::Ready => { - readsub.complete(1, RwEventFlags::empty()); - ready = true; - } - PollState::NotReady | PollState::TimedOut => {} - PollState::Error(ref e) => { - // Unfortunately, we need to deliver the Error to each of the - // subscriptions, but there is no Clone on std::io::Error. So, we convert it to the - // kind, and then back to std::io::Error, and finally to anyhow::Error. - // When its time to turn this into an errno elsewhere, the error kind will - // be inspected. - let ekind = e.kind(); - let ioerror = std::io::Error::from(ekind); - readsub.error(ioerror.into()); - ready = true; - } - } - } - } - for r in immediate_reads { - match r.file.num_ready_bytes().await { - Ok(ready_bytes) => { - r.complete(ready_bytes, RwEventFlags::empty()); - ready = true; - } - Err(e) => { - r.error(e); - ready = true; - } - } - } - for w in immediate_writes { - // Everything is always ready for writing, apparently? - w.complete(0, RwEventFlags::empty()); - ready = true; - } - - if !ready { - if let WaitMode::Timeout(duration) = waitmode { - thread::sleep(duration); - } - } - - Ok(()) + // Tokio doesn't provide us the AsyncFd primitive on Windows, so instead + // we use the blocking poll_oneoff implementation from the wasi-cap-std-crate. + // We provide a function specific to this crate's WasiFile types for downcasting + // to a RawHandle. + block_on_dummy_executor(move || poll_oneoff_(poll, wasi_file_raw_handle)) } fn wasi_file_raw_handle(f: &dyn WasiFile) -> Option { @@ -151,98 +41,3 @@ fn wasi_file_raw_handle(f: &dyn WasiFile) -> Option { None } } - -enum PollState { - Ready, - NotReady, // Not ready, but did not wait - TimedOut, // Not ready, waited until timeout - Error(std::io::Error), -} - -#[derive(Copy, Clone)] -enum WaitMode { - Timeout(Duration), - Infinite, - Immediate, -} - -struct StdinPoll { - request_tx: Sender<()>, - notify_rx: Receiver, -} - -lazy_static::lazy_static! { - static ref STDIN_POLL: Mutex = StdinPoll::new(); -} - -impl StdinPoll { - pub fn new() -> Mutex { - let (request_tx, request_rx) = mpsc::channel(); - let (notify_tx, notify_rx) = mpsc::channel(); - thread::spawn(move || Self::event_loop(request_rx, notify_tx)); - Mutex::new(StdinPoll { - request_tx, - notify_rx, - }) - } - - // This function should not be used directly. - // Correctness of this function crucially depends on the fact that - // mpsc::Receiver is !Sync. - fn poll(&self, wait_mode: WaitMode) -> Result { - match self.notify_rx.try_recv() { - // Clean up possibly unread result from previous poll. - Ok(_) | Err(TryRecvError::Empty) => {} - Err(TryRecvError::Disconnected) => { - return Err(Error::trap("StdinPoll notify_rx channel closed")) - } - } - - // Notify the worker thread to poll stdin - self.request_tx - .send(()) - .context("request_tx channel closed")?; - - // Wait for the worker thread to send a readiness notification - match wait_mode { - WaitMode::Timeout(timeout) => match self.notify_rx.recv_timeout(timeout) { - Ok(r) => Ok(r), - Err(RecvTimeoutError::Timeout) => Ok(PollState::TimedOut), - Err(RecvTimeoutError::Disconnected) => { - Err(Error::trap("StdinPoll notify_rx channel closed")) - } - }, - WaitMode::Infinite => self - .notify_rx - .recv() - .context("StdinPoll notify_rx channel closed"), - WaitMode::Immediate => match self.notify_rx.try_recv() { - Ok(r) => Ok(r), - Err(TryRecvError::Empty) => Ok(PollState::NotReady), - Err(TryRecvError::Disconnected) => { - Err(Error::trap("StdinPoll notify_rx channel closed")) - } - }, - } - } - - fn event_loop(request_rx: Receiver<()>, notify_tx: Sender) -> ! { - use std::io::BufRead; - loop { - // Wait on a request: - request_rx.recv().expect("request_rx channel"); - // Wait for data to appear in stdin. If fill_buf returns any slice, it means - // that either: - // (a) there is some data in stdin, if non-empty, - // (b) EOF was recieved, if its empty - // Linux returns `POLLIN` in both cases, so we imitate this behavior. - let resp = match std::io::stdin().lock().fill_buf() { - Ok(_) => PollState::Ready, - Err(e) => PollState::Error(e), - }; - // Notify about data in stdin. If the read on this channel has timed out, the - // next poller will have to clean the channel. - notify_tx.send(resp).expect("notify_tx channel"); - } - } -}