diff --git a/crates/test-programs/build.rs b/crates/test-programs/build.rs index 0f223b1c71..c7c9460623 100644 --- a/crates/test-programs/build.rs +++ b/crates/test-programs/build.rs @@ -176,7 +176,6 @@ mod wasi_tests { "dangling_symlink" => true, "symlink_loop" => true, "truncation_rights" => true, - "poll_oneoff" => true, "path_link" => true, "dangling_fd" => true, _ => false, diff --git a/crates/test-programs/wasi-tests/src/bin/poll_oneoff.rs b/crates/test-programs/wasi-tests/src/bin/poll_oneoff.rs index e87e707268..d049c1b9fe 100644 --- a/crates/test-programs/wasi-tests/src/bin/poll_oneoff.rs +++ b/crates/test-programs/wasi-tests/src/bin/poll_oneoff.rs @@ -45,10 +45,6 @@ unsafe fn test_timeout() { }]; let out = poll_oneoff_impl(&r#in, 1); let event = &out[0]; - assert_eq!( - event.userdata, CLOCK_ID, - "the event.userdata should contain clock_id specified by the user" - ); assert_eq!( event.error, wasi::ERRNO_SUCCESS, @@ -59,6 +55,10 @@ unsafe fn test_timeout() { wasi::EVENTTYPE_CLOCK, "the event.type should equal clock" ); + assert_eq!( + event.userdata, CLOCK_ID, + "the event.userdata should contain clock_id specified by the user" + ); } unsafe fn test_stdin_read() { @@ -77,6 +77,7 @@ unsafe fn test_stdin_read() { r#type: wasi::EVENTTYPE_CLOCK, u: wasi::SubscriptionU { clock }, }, + // Make sure that timeout is returned only once even if there are multiple read events wasi::Subscription { userdata: 1, r#type: wasi::EVENTTYPE_FD_READ, @@ -85,10 +86,6 @@ unsafe fn test_stdin_read() { ]; let out = poll_oneoff_impl(&r#in, 1); let event = &out[0]; - assert_eq!( - event.userdata, CLOCK_ID, - "the event.userdata should contain clock_id specified by the user" - ); assert_eq!( event.error, wasi::ERRNO_SUCCESS, @@ -99,6 +96,10 @@ unsafe fn test_stdin_read() { wasi::EVENTTYPE_CLOCK, "the event.type should equal clock" ); + assert_eq!( + event.userdata, CLOCK_ID, + "the event.userdata should contain clock_id specified by the user" + ); } unsafe fn test_stdout_stderr_write() { diff --git a/crates/wasi-common/src/error.rs b/crates/wasi-common/src/error.rs index 303b3ed547..4ce56bbc57 100644 --- a/crates/wasi-common/src/error.rs +++ b/crates/wasi-common/src/error.rs @@ -245,3 +245,18 @@ impl Error { pub(crate) trait FromRawOsError { fn from_raw_os_error(code: i32) -> Self; } + +pub(crate) type Result = std::result::Result; + +pub(crate) trait AsWasiError { + fn as_wasi_error(&self) -> WasiError; +} + +impl AsWasiError for Result { + fn as_wasi_error(&self) -> WasiError { + self.as_ref() + .err() + .unwrap_or(&Error::ESUCCESS) + .as_wasi_error() + } +} diff --git a/crates/wasi-common/src/lib.rs b/crates/wasi-common/src/lib.rs index c5e2343509..9e197fa74d 100644 --- a/crates/wasi-common/src/lib.rs +++ b/crates/wasi-common/src/lib.rs @@ -42,5 +42,5 @@ pub mod hostcalls { pub use ctx::{WasiCtx, WasiCtxBuilder}; pub use sys::preopen_dir; -pub type Error = error::Error; -pub(crate) type Result = std::result::Result; +pub use error::Error; +pub(crate) use error::Result; diff --git a/crates/wasi-common/src/sys/windows/hostcalls_impl/misc.rs b/crates/wasi-common/src/sys/windows/hostcalls_impl/misc.rs index d781409093..1e2db73404 100644 --- a/crates/wasi-common/src/sys/windows/hostcalls_impl/misc.rs +++ b/crates/wasi-common/src/sys/windows/hostcalls_impl/misc.rs @@ -1,18 +1,109 @@ #![allow(non_camel_case_types)] #![allow(unused_unsafe)] #![allow(unused)] +use crate::fdentry::Descriptor; use crate::hostcalls_impl::{ClockEventData, FdEventData}; use crate::memory::*; use crate::sys::host_impl; -use crate::{wasi, wasi32, Error, Result}; +use crate::{error::WasiError, wasi, wasi32, Error, Result}; use cpu_time::{ProcessTime, ThreadTime}; use lazy_static::lazy_static; +use log::{debug, error, trace, warn}; use std::convert::TryInto; +use std::io; +use std::os::windows::io::AsRawHandle; +use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender, TryRecvError}; +use std::sync::Mutex; +use std::thread; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +struct StdinPoll { + request_tx: Sender<()>, + notify_rx: Receiver, +} + +enum PollState { + Ready, + NotReady, // it's not ready, but we didn't wait + TimedOut, // it's not ready and a timeout has occurred + Error(WasiError), // not using the top-lever Error because it's not Clone +} + +enum WaitMode { + Timeout(Duration), + Infinite, + Immediate, +} + +impl StdinPoll { + // This function should not be used directly + // Correctness of this function crucially depends on the fact that + // mpsc::Receiver is !Sync. + fn poll(&self, wait_mode: WaitMode) -> PollState { + // Clean up possible unread result from the previous poll + match self.notify_rx.try_recv() { + Ok(_) | Err(TryRecvError::Empty) => {} + Err(TryRecvError::Disconnected) => panic!("notify_rx channel closed"), + } + + // Notify the worker thread that we want to poll stdin + self.request_tx.send(()).expect("request_tx channel closed"); + + // Wait for the worker thread to send a readiness notification + let pollret = match wait_mode { + WaitMode::Timeout(timeout) => { + self.notify_rx + .recv_timeout(timeout) + .unwrap_or_else(|e| match e { + RecvTimeoutError::Disconnected => panic!("notify_rx channel closed"), + RecvTimeoutError::Timeout => PollState::TimedOut, + }) + } + WaitMode::Infinite => self.notify_rx.recv().expect("notify_rx channel closed"), + WaitMode::Immediate => self.notify_rx.try_recv().unwrap_or_else(|e| match e { + TryRecvError::Disconnected => panic!("notify_rx channel closed"), + TryRecvError::Empty => PollState::NotReady, + }), + }; + + pollret + } + + fn event_loop(request_rx: Receiver<()>, notify_tx: Sender) -> ! { + use std::io::BufRead; + loop { + // Wait for the request to poll stdin + request_rx.recv().expect("request_rx channel closed"); + + // Wait for data to appear in stdin. + // If `fill_buf` returns any slice, then it means that either + // (a) there some data in stdin, if it's non-empty + // (b) EOF was received, if it's empty + // Linux returns `POLLIN` in both cases, and we imitate this behavior. + let resp = match std::io::stdin().lock().fill_buf() { + Ok(_) => PollState::Ready, + Err(e) => PollState::Error(Error::from(e).as_wasi_error()), + }; + + // Notify the requestor about data in stdin. They may have already timed out, + // then the next requestor will have to clean the channel. + notify_tx.send(resp).expect("notify_tx channel closed"); + } + } +} + lazy_static! { static ref START_MONOTONIC: Instant = Instant::now(); static ref PERF_COUNTER_RES: u64 = get_perf_counter_resolution_ns(); + static ref STDIN_POLL: Mutex = { + let (request_tx, request_rx) = mpsc::channel(); + let (notify_tx, notify_rx) = mpsc::channel(); + thread::spawn(move || StdinPoll::event_loop(request_rx, notify_tx)); + Mutex::new(StdinPoll { + request_tx, + notify_rx, + }) + }; } // Timer resolution on Windows is really hard. We may consider exposing the resolution of the respective @@ -76,12 +167,208 @@ pub(crate) fn clock_time_get(clock_id: wasi::__wasi_clockid_t) -> Result) -> wasi::__wasi_event_t { + use crate::error::AsWasiError; + let error = nbytes.as_wasi_error(); + let nbytes = nbytes.unwrap_or_default(); + wasi::__wasi_event_t { + userdata: event.userdata, + r#type: event.r#type, + error: error.as_raw_errno(), + u: wasi::__wasi_event_u_t { + fd_readwrite: wasi::__wasi_event_fd_readwrite_t { nbytes, flags: 0 }, + }, + } +} + +fn make_timeout_event(timeout: &ClockEventData) -> wasi::__wasi_event_t { + wasi::__wasi_event_t { + userdata: timeout.userdata, + r#type: wasi::__WASI_EVENTTYPE_CLOCK, + error: wasi::__WASI_ERRNO_SUCCESS, + u: wasi::__wasi_event_u_t { + fd_readwrite: wasi::__wasi_event_fd_readwrite_t { + nbytes: 0, + flags: 0, + }, + }, + } +} + +fn handle_timeout( + timeout_event: ClockEventData, + timeout: Duration, + events: &mut Vec, +) { + thread::sleep(timeout); + handle_timeout_event(timeout_event, events); +} + +fn handle_timeout_event(timeout_event: ClockEventData, events: &mut Vec) { + let new_event = make_timeout_event(&timeout_event); + events.push(new_event); +} + +fn handle_rw_event(event: FdEventData, out_events: &mut Vec) { + let size = match event.descriptor { + Descriptor::OsHandle(os_handle) => { + if event.r#type == wasi::__WASI_EVENTTYPE_FD_READ { + os_handle.metadata().map(|m| m.len()).map_err(Into::into) + } else { + // The spec is unclear what nbytes should actually be for __WASI_EVENTTYPE_FD_WRITE and + // the implementation on Unix just returns 0 here, so it's probably fine + // to do the same on Windows for now. + // cf. https://github.com/WebAssembly/WASI/issues/148 + Ok(0) + } + } + // We return the only universally correct lower bound, see the comment later in the function. + Descriptor::Stdin => Ok(1), + // On Unix, ioctl(FIONREAD) will return 0 for stdout/stderr. Emulate the same behavior on Windows. + Descriptor::Stdout | Descriptor::Stderr => Ok(0), + }; + + let new_event = make_rw_event(&event, size); + out_events.push(new_event); +} + +fn handle_error_event( + event: FdEventData, + error: Error, + out_events: &mut Vec, +) { + let new_event = make_rw_event(&event, Err(error)); + out_events.push(new_event); +} + pub(crate) fn poll_oneoff( timeout: Option, fd_events: Vec, events: &mut Vec, -) -> Result> { - unimplemented!("poll_oneoff") +) -> Result<()> { + use std::fs::Metadata; + use std::thread; + + let timeout = timeout + .map(|event| { + event + .delay + .try_into() + .map(Duration::from_nanos) + .map(|dur| (event, dur)) + }) + .transpose()?; + + // With no events to listen, poll_oneoff just becomes a sleep. + if fd_events.is_empty() { + match timeout { + Some((event, dur)) => return Ok(handle_timeout(event, dur, events)), + // The implementation has to return Ok(()) in this case, + // cf. the comment in src/hostcalls_impl/misc.rs + None => return Ok(()), + } + } + + let mut stdin_events = vec![]; + let mut immediate_events = vec![]; + let mut pipe_events = vec![]; + + for event in fd_events { + match event.descriptor { + Descriptor::Stdin if event.r#type == wasi::__WASI_EVENTTYPE_FD_READ => { + stdin_events.push(event) + } + // stdout/stderr are always considered ready to write because there seems to + // be no way of checking if a write to stdout would block. + // + // If stdin is polled for anything else then reading, then it is also + // considered immediately ready, following the behavior on Linux. + Descriptor::Stdin | Descriptor::Stderr | Descriptor::Stdout => { + immediate_events.push(event) + } + Descriptor::OsHandle(os_handle) => { + let ftype = unsafe { winx::file::get_file_type(os_handle.as_raw_handle()) }?; + if ftype.is_unknown() || ftype.is_char() { + debug!("poll_oneoff: unsupported file type: {:?}", ftype); + handle_error_event(event, Error::ENOTSUP, events); + } else if ftype.is_disk() { + immediate_events.push(event); + } else if ftype.is_pipe() { + pipe_events.push(event); + } else { + unreachable!(); + } + } + } + } + + let immediate = !immediate_events.is_empty(); + // Process all the events that do not require waiting. + if immediate { + trace!(" | have immediate events, will return immediately"); + for mut event in immediate_events { + handle_rw_event(event, events); + } + } + if !stdin_events.is_empty() { + // During the firt request to poll stdin, we spin up a separate thread to + // waiting for data to arrive on stdin. This thread will not terminate. + // + // We'd like to do the following: + // (1) wait in a non-blocking way for data to be available in stdin, with timeout + // (2) find out, how many bytes are there available to be read. + // + // One issue is that we are currently relying on the Rust libstd for interaction + // with stdin. More precisely, `io::stdin` is used via the `BufRead` trait, + // in the `fd_read` function, which always does buffering on the libstd side. [1] + // This means that even if there's still some unread data in stdin, + // the lower-level Windows system calls may return false negatives, + // claiming that stdin is empty. + // + // Theoretically, one could use `WaitForSingleObject` on the stdin handle + // to achieve (1). Unfortunately, this function doesn't seem to honor the + // requested timeout and to misbehaves after the stdin is closed. + // + // There appears to be no way of achieving (2) on Windows. + // [1]: https://github.com/rust-lang/rust/pull/12422 + let waitmode = if immediate { + trace!(" | tentatively checking stdin"); + WaitMode::Immediate + } else { + trace!(" | passively waiting on stdin"); + match timeout { + Some((event, dur)) => WaitMode::Timeout(dur), + None => WaitMode::Infinite, + } + }; + let state = STDIN_POLL.lock().unwrap().poll(waitmode); + for event in stdin_events { + match state { + PollState::Ready => handle_rw_event(event, events), + PollState::NotReady => {} // not immediately available, so just ignore + PollState::TimedOut => handle_timeout_event(timeout.unwrap().0, events), + PollState::Error(e) => handle_error_event(event, Error::Wasi(e), events), + } + } + } + + if !immediate && !pipe_events.is_empty() { + trace!(" | actively polling pipes"); + match timeout { + Some((event, dur)) => { + // In the tests stdin is replaced with a dummy pipe, so for now + // we just time out. Support for pipes will be decided later on. + warn!("Polling pipes not supported on Windows, will just time out."); + handle_timeout(event, dur, events); + } + None => { + error!("Polling only pipes with no timeout not supported on Windows."); + return Err(Error::ENOTSUP); + } + } + } + + Ok(()) } fn get_monotonic_time() -> Duration {