Move to mpsc, drop crossbeam. Simplify

This commit is contained in:
Marcin Mielniczuk
2020-01-16 18:34:08 +01:00
parent 3c132d6909
commit 716acf77d1
3 changed files with 70 additions and 62 deletions

24
Cargo.lock generated
View File

@@ -439,29 +439,6 @@ dependencies = [
"wasmparser 0.47.0", "wasmparser 0.47.0",
] ]
[[package]]
name = "crossbeam"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69323bff1fb41c635347b8ead484a5ca6c3f11914d784170b158d8449ab07f8e"
dependencies = [
"cfg-if",
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils 0.7.0",
]
[[package]]
name = "crossbeam-channel"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acec9a3b0b3559f15aee4f90746c4e5e293b701c0f7d3925d24e01645267b68c"
dependencies = [
"crossbeam-utils 0.7.0",
]
[[package]] [[package]]
name = "crossbeam-deque" name = "crossbeam-deque"
version = "0.7.2" version = "0.7.2"
@@ -2008,7 +1985,6 @@ dependencies = [
"anyhow", "anyhow",
"cfg-if", "cfg-if",
"cpu-time", "cpu-time",
"crossbeam",
"filetime", "filetime",
"getrandom", "getrandom",
"lazy_static", "lazy_static",

View File

@@ -21,7 +21,6 @@ log = "0.4"
filetime = "0.2.7" filetime = "0.2.7"
lazy_static = "1.4.0" lazy_static = "1.4.0"
num = { version = "0.2.0", default-features = false } num = { version = "0.2.0", default-features = false }
crossbeam = "0.7.3"
wig = { path = "wig", version = "0.9.2" } wig = { path = "wig", version = "0.9.2" }
[target.'cfg(unix)'.dependencies] [target.'cfg(unix)'.dependencies]

View File

@@ -7,56 +7,79 @@ use crate::memory::*;
use crate::sys::host_impl; use crate::sys::host_impl;
use crate::{wasi, wasi32, Error, Result}; use crate::{wasi, wasi32, Error, Result};
use cpu_time::{ProcessTime, ThreadTime}; use cpu_time::{ProcessTime, ThreadTime};
use crossbeam::channel::{self, Receiver, Sender};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use log::{error, trace, warn}; use log::{debug, error, trace, warn};
use std::convert::TryInto; use std::convert::TryInto;
use std::io; use std::io;
use std::os::windows::io::AsRawHandle; use std::os::windows::io::AsRawHandle;
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender, TryRecvError};
use std::sync::Mutex;
use std::thread; use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
type StdinPayload = io::Result<bool>;
struct StdinPoll { struct StdinPoll {
request_tx: Sender<()>, request_tx: Sender<()>,
notify_rx: Receiver<StdinPayload>, notify_rx: Receiver<PollState>,
} }
enum PollState { enum PollState {
Ready, Ready,
Closed, Closed,
TimedOut, NotReady, // it's not ready, but we didn't wait
TimedOut, // it's not ready and a timeout has occurred
Error(Error), Error(Error),
} }
impl StdinPoll { enum WaitMode {
fn poll(&self, timeout: Option<Duration>) -> PollState { Timeout(Duration),
use crossbeam::channel::{RecvTimeoutError, TryRecvError}; Infinite,
// Clean up possible unread result from previous poll Immediate,
match self.notify_rx.try_recv() {
Ok(_) | Err(TryRecvError::Empty) => {}
Err(TryRecvError::Disconnected) => panic!("FIXME"),
}
self.request_tx.send(()).expect("FIXME");
let pollret = match timeout {
Some(timeout) => self.notify_rx.recv_timeout(timeout),
None => Ok(self.notify_rx.recv().expect("FIXME")),
};
match pollret {
Ok(Ok(true)) => PollState::Ready,
Ok(Ok(false)) => PollState::Closed,
Ok(Err(e)) => PollState::Error(e.into()),
Err(RecvTimeoutError::Timeout) => PollState::TimedOut,
Err(RecvTimeoutError::Disconnected) => panic!("FIXME"),
}
} }
fn event_loop(request_rx: Receiver<()>, notify_tx: Sender<StdinPayload>) -> ! { impl StdinPoll {
// This function should not be used directly
// Correctness of this function crucially depends on the fact that
// mpsc::Receiver is !Sync.
fn poll(&self, wait_mode: WaitMode) -> PollState {
// Clean up possible unread result from the previous poll
match self.notify_rx.try_recv() {
Ok(_) | Err(TryRecvError::Empty) => {}
Err(TryRecvError::Disconnected) => panic!("notify_rx channel closed"),
}
// Notify the worker thread that we want to poll stdin
self.request_tx.send(()).expect("request_tx channel closed");
// Wait for the worker thread to send a readiness notification
let pollret = match wait_mode {
WaitMode::Timeout(timeout) => {
self.notify_rx
.recv_timeout(timeout)
.unwrap_or_else(|e| match e {
RecvTimeoutError::Disconnected => panic!("notify_rx channel closed"),
RecvTimeoutError::Timeout => PollState::TimedOut,
})
}
WaitMode::Infinite => self.notify_rx.recv().expect("notify_rx channel closed"),
WaitMode::Immediate => self.notify_rx.try_recv().unwrap_or_else(|e| match e {
TryRecvError::Disconnected => panic!("notify_rx channel closed"),
TryRecvError::Empty => PollState::NotReady,
}),
};
pollret
}
fn event_loop(request_rx: Receiver<()>, notify_tx: Sender<PollState>) -> ! {
use std::io::BufRead; use std::io::BufRead;
loop { loop {
request_rx.recv().expect("FIXME"); request_rx.recv().expect("request_rx channel closed");
let buf = std::io::stdin().lock().fill_buf().map(|s| !s.is_empty()); let resp = match std::io::stdin().lock().fill_buf().map(|s| !s.is_empty()) {
notify_tx.send(buf).expect("FIXME"); Ok(true) => PollState::Ready,
Ok(false) => PollState::Closed,
Err(e) => PollState::Error(e.into()),
};
notify_tx.send(resp).expect("notify_tx channel closed");
} }
} }
} }
@@ -64,15 +87,14 @@ impl StdinPoll {
lazy_static! { lazy_static! {
static ref START_MONOTONIC: Instant = Instant::now(); static ref START_MONOTONIC: Instant = Instant::now();
static ref PERF_COUNTER_RES: u64 = get_perf_counter_resolution_ns(); static ref PERF_COUNTER_RES: u64 = get_perf_counter_resolution_ns();
static ref STDIN_POLL: StdinPoll = { static ref STDIN_POLL: Mutex<StdinPoll> = {
let channel_size = 1; let (request_tx, request_rx) = mpsc::channel();
let (request_tx, request_rx) = channel::bounded(channel_size); let (notify_tx, notify_rx) = mpsc::channel();
let (notify_tx, notify_rx) = channel::bounded(channel_size);
thread::spawn(move || StdinPoll::event_loop(request_rx, notify_tx)); thread::spawn(move || StdinPoll::event_loop(request_rx, notify_tx));
StdinPoll { Mutex::new(StdinPoll {
request_tx, request_tx,
notify_rx, notify_rx,
} })
}; };
} }
@@ -281,13 +303,15 @@ pub(crate) fn poll_oneoff(
} }
} }
let immediate = !immediate_events.is_empty();
// Process all the events that do not require waiting. // Process all the events that do not require waiting.
if !immediate_events.is_empty() { if immediate {
trace!(" | have immediate events, will return immediately"); trace!(" | have immediate events, will return immediately");
for mut event in immediate_events { for mut event in immediate_events {
handle_rw_event(event, events); handle_rw_event(event, events);
} }
} else if !stdin_events.is_empty() { }
if !stdin_events.is_empty() {
// During the firt request to poll stdin, we spin up a separate thread to // During the firt request to poll stdin, we spin up a separate thread to
// waiting for data to arrive on stdin. This thread will not terminate. // waiting for data to arrive on stdin. This thread will not terminate.
// //
@@ -310,12 +334,21 @@ pub(crate) fn poll_oneoff(
// //
// There appears to be no way of achieving (2) on Windows. // There appears to be no way of achieving (2) on Windows.
// [1]: https://github.com/rust-lang/rust/pull/12422 // [1]: https://github.com/rust-lang/rust/pull/12422
let dur = if immediate {
trace!(" | tentatively checking stdin");
WaitMode::Immediate
} else {
trace!(" | passively waiting on stdin"); trace!(" | passively waiting on stdin");
let dur = timeout.map(|t| t.1); match timeout {
let state = STDIN_POLL.poll(dur); Some((event, dur)) => WaitMode::Timeout(dur),
None => WaitMode::Infinite,
}
};
let state = STDIN_POLL.lock().unwrap().poll(dur);
for event in stdin_events { for event in stdin_events {
match state { match state {
PollState::Ready => handle_rw_event(event, events), PollState::Ready => handle_rw_event(event, events),
PollState::NotReady => {}
PollState::Closed => { /* error? FIXME */ } PollState::Closed => { /* error? FIXME */ }
PollState::TimedOut => handle_timeout_event(timeout.unwrap().0, events), PollState::TimedOut => handle_timeout_event(timeout.unwrap().0, events),
PollState::Error(ref e) => { PollState::Error(ref e) => {
@@ -326,14 +359,14 @@ pub(crate) fn poll_oneoff(
} }
} }
if !pipe_events.is_empty() { if !immediate && !pipe_events.is_empty() {
trace!(" | actively polling pipes"); trace!(" | actively polling pipes");
match timeout { match timeout {
Some((event, dur)) => { Some((event, dur)) => {
// In the tests stdin is replaced with a dummy pipe, so for now // In the tests stdin is replaced with a dummy pipe, so for now
// we just time out. Support for pipes will be decided later on. // we just time out. Support for pipes will be decided later on.
warn!("Polling pipes not supported on Windows, will just time out."); warn!("Polling pipes not supported on Windows, will just time out.");
return Ok(handle_timeout(event, dur, events)); handle_timeout(event, dur, events);
} }
None => { None => {
error!("Polling only pipes with no timeout not supported on Windows."); error!("Polling only pipes with no timeout not supported on Windows.");