diff --git a/Cargo.lock b/Cargo.lock index 71ae1c7039..f829cc5bff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -439,29 +439,6 @@ dependencies = [ "wasmparser 0.47.0", ] -[[package]] -name = "crossbeam" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69323bff1fb41c635347b8ead484a5ca6c3f11914d784170b158d8449ab07f8e" -dependencies = [ - "cfg-if", - "crossbeam-channel", - "crossbeam-deque", - "crossbeam-epoch", - "crossbeam-queue", - "crossbeam-utils 0.7.0", -] - -[[package]] -name = "crossbeam-channel" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acec9a3b0b3559f15aee4f90746c4e5e293b701c0f7d3925d24e01645267b68c" -dependencies = [ - "crossbeam-utils 0.7.0", -] - [[package]] name = "crossbeam-deque" version = "0.7.2" @@ -2008,7 +1985,6 @@ dependencies = [ "anyhow", "cfg-if", "cpu-time", - "crossbeam", "filetime", "getrandom", "lazy_static", diff --git a/crates/wasi-common/Cargo.toml b/crates/wasi-common/Cargo.toml index 3905ae0860..6c17f64a52 100644 --- a/crates/wasi-common/Cargo.toml +++ b/crates/wasi-common/Cargo.toml @@ -21,7 +21,6 @@ log = "0.4" filetime = "0.2.7" lazy_static = "1.4.0" num = { version = "0.2.0", default-features = false } -crossbeam = "0.7.3" wig = { path = "wig", version = "0.9.2" } [target.'cfg(unix)'.dependencies] diff --git a/crates/wasi-common/src/sys/windows/hostcalls_impl/misc.rs b/crates/wasi-common/src/sys/windows/hostcalls_impl/misc.rs index cdb3e6a35a..5e960d5137 100644 --- a/crates/wasi-common/src/sys/windows/hostcalls_impl/misc.rs +++ b/crates/wasi-common/src/sys/windows/hostcalls_impl/misc.rs @@ -7,56 +7,79 @@ use crate::memory::*; use crate::sys::host_impl; use crate::{wasi, wasi32, Error, Result}; use cpu_time::{ProcessTime, ThreadTime}; -use crossbeam::channel::{self, Receiver, Sender}; use lazy_static::lazy_static; -use log::{error, trace, warn}; +use log::{debug, error, trace, warn}; use std::convert::TryInto; use std::io; use std::os::windows::io::AsRawHandle; +use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender, TryRecvError}; +use std::sync::Mutex; use std::thread; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; -type StdinPayload = io::Result; struct StdinPoll { request_tx: Sender<()>, - notify_rx: Receiver, + notify_rx: Receiver, } enum PollState { Ready, Closed, - TimedOut, + NotReady, // it's not ready, but we didn't wait + TimedOut, // it's not ready and a timeout has occurred Error(Error), } +enum WaitMode { + Timeout(Duration), + Infinite, + Immediate, +} + impl StdinPoll { - fn poll(&self, timeout: Option) -> PollState { - use crossbeam::channel::{RecvTimeoutError, TryRecvError}; - // Clean up possible unread result from previous poll + // This function should not be used directly + // Correctness of this function crucially depends on the fact that + // mpsc::Receiver is !Sync. + fn poll(&self, wait_mode: WaitMode) -> PollState { + // Clean up possible unread result from the previous poll match self.notify_rx.try_recv() { Ok(_) | Err(TryRecvError::Empty) => {} - Err(TryRecvError::Disconnected) => panic!("FIXME"), + Err(TryRecvError::Disconnected) => panic!("notify_rx channel closed"), } - self.request_tx.send(()).expect("FIXME"); - let pollret = match timeout { - Some(timeout) => self.notify_rx.recv_timeout(timeout), - None => Ok(self.notify_rx.recv().expect("FIXME")), + + // Notify the worker thread that we want to poll stdin + self.request_tx.send(()).expect("request_tx channel closed"); + + // Wait for the worker thread to send a readiness notification + let pollret = match wait_mode { + WaitMode::Timeout(timeout) => { + self.notify_rx + .recv_timeout(timeout) + .unwrap_or_else(|e| match e { + RecvTimeoutError::Disconnected => panic!("notify_rx channel closed"), + RecvTimeoutError::Timeout => PollState::TimedOut, + }) + } + WaitMode::Infinite => self.notify_rx.recv().expect("notify_rx channel closed"), + WaitMode::Immediate => self.notify_rx.try_recv().unwrap_or_else(|e| match e { + TryRecvError::Disconnected => panic!("notify_rx channel closed"), + TryRecvError::Empty => PollState::NotReady, + }), }; - match pollret { - Ok(Ok(true)) => PollState::Ready, - Ok(Ok(false)) => PollState::Closed, - Ok(Err(e)) => PollState::Error(e.into()), - Err(RecvTimeoutError::Timeout) => PollState::TimedOut, - Err(RecvTimeoutError::Disconnected) => panic!("FIXME"), - } + + pollret } - fn event_loop(request_rx: Receiver<()>, notify_tx: Sender) -> ! { + fn event_loop(request_rx: Receiver<()>, notify_tx: Sender) -> ! { use std::io::BufRead; loop { - request_rx.recv().expect("FIXME"); - let buf = std::io::stdin().lock().fill_buf().map(|s| !s.is_empty()); - notify_tx.send(buf).expect("FIXME"); + request_rx.recv().expect("request_rx channel closed"); + let resp = match std::io::stdin().lock().fill_buf().map(|s| !s.is_empty()) { + Ok(true) => PollState::Ready, + Ok(false) => PollState::Closed, + Err(e) => PollState::Error(e.into()), + }; + notify_tx.send(resp).expect("notify_tx channel closed"); } } } @@ -64,15 +87,14 @@ impl StdinPoll { lazy_static! { static ref START_MONOTONIC: Instant = Instant::now(); static ref PERF_COUNTER_RES: u64 = get_perf_counter_resolution_ns(); - static ref STDIN_POLL: StdinPoll = { - let channel_size = 1; - let (request_tx, request_rx) = channel::bounded(channel_size); - let (notify_tx, notify_rx) = channel::bounded(channel_size); + static ref STDIN_POLL: Mutex = { + let (request_tx, request_rx) = mpsc::channel(); + let (notify_tx, notify_rx) = mpsc::channel(); thread::spawn(move || StdinPoll::event_loop(request_rx, notify_tx)); - StdinPoll { + Mutex::new(StdinPoll { request_tx, notify_rx, - } + }) }; } @@ -281,13 +303,15 @@ pub(crate) fn poll_oneoff( } } + let immediate = !immediate_events.is_empty(); // Process all the events that do not require waiting. - if !immediate_events.is_empty() { + if immediate { trace!(" | have immediate events, will return immediately"); for mut event in immediate_events { handle_rw_event(event, events); } - } else if !stdin_events.is_empty() { + } + if !stdin_events.is_empty() { // During the firt request to poll stdin, we spin up a separate thread to // waiting for data to arrive on stdin. This thread will not terminate. // @@ -310,12 +334,21 @@ pub(crate) fn poll_oneoff( // // There appears to be no way of achieving (2) on Windows. // [1]: https://github.com/rust-lang/rust/pull/12422 - trace!(" | passively waiting on stdin"); - let dur = timeout.map(|t| t.1); - let state = STDIN_POLL.poll(dur); + let dur = if immediate { + trace!(" | tentatively checking stdin"); + WaitMode::Immediate + } else { + trace!(" | passively waiting on stdin"); + match timeout { + Some((event, dur)) => WaitMode::Timeout(dur), + None => WaitMode::Infinite, + } + }; + let state = STDIN_POLL.lock().unwrap().poll(dur); for event in stdin_events { match state { PollState::Ready => handle_rw_event(event, events), + PollState::NotReady => {} PollState::Closed => { /* error? FIXME */ } PollState::TimedOut => handle_timeout_event(timeout.unwrap().0, events), PollState::Error(ref e) => { @@ -326,14 +359,14 @@ pub(crate) fn poll_oneoff( } } - if !pipe_events.is_empty() { + if !immediate && !pipe_events.is_empty() { trace!(" | actively polling pipes"); match timeout { Some((event, dur)) => { // In the tests stdin is replaced with a dummy pipe, so for now // we just time out. Support for pipes will be decided later on. warn!("Polling pipes not supported on Windows, will just time out."); - return Ok(handle_timeout(event, dur, events)); + handle_timeout(event, dur, events); } None => { error!("Polling only pipes with no timeout not supported on Windows.");