Merge pull request #552 from marmistrz/poll
Minimal viable implementation of poll_oneoff for Windows
This commit is contained in:
@@ -176,7 +176,6 @@ mod wasi_tests {
|
|||||||
"dangling_symlink" => true,
|
"dangling_symlink" => true,
|
||||||
"symlink_loop" => true,
|
"symlink_loop" => true,
|
||||||
"truncation_rights" => true,
|
"truncation_rights" => true,
|
||||||
"poll_oneoff" => true,
|
|
||||||
"path_link" => true,
|
"path_link" => true,
|
||||||
"dangling_fd" => true,
|
"dangling_fd" => true,
|
||||||
_ => false,
|
_ => false,
|
||||||
|
|||||||
@@ -45,10 +45,6 @@ unsafe fn test_timeout() {
|
|||||||
}];
|
}];
|
||||||
let out = poll_oneoff_impl(&r#in, 1);
|
let out = poll_oneoff_impl(&r#in, 1);
|
||||||
let event = &out[0];
|
let event = &out[0];
|
||||||
assert_eq!(
|
|
||||||
event.userdata, CLOCK_ID,
|
|
||||||
"the event.userdata should contain clock_id specified by the user"
|
|
||||||
);
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
event.error,
|
event.error,
|
||||||
wasi::ERRNO_SUCCESS,
|
wasi::ERRNO_SUCCESS,
|
||||||
@@ -59,6 +55,10 @@ unsafe fn test_timeout() {
|
|||||||
wasi::EVENTTYPE_CLOCK,
|
wasi::EVENTTYPE_CLOCK,
|
||||||
"the event.type should equal clock"
|
"the event.type should equal clock"
|
||||||
);
|
);
|
||||||
|
assert_eq!(
|
||||||
|
event.userdata, CLOCK_ID,
|
||||||
|
"the event.userdata should contain clock_id specified by the user"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe fn test_stdin_read() {
|
unsafe fn test_stdin_read() {
|
||||||
@@ -77,6 +77,7 @@ unsafe fn test_stdin_read() {
|
|||||||
r#type: wasi::EVENTTYPE_CLOCK,
|
r#type: wasi::EVENTTYPE_CLOCK,
|
||||||
u: wasi::SubscriptionU { clock },
|
u: wasi::SubscriptionU { clock },
|
||||||
},
|
},
|
||||||
|
// Make sure that timeout is returned only once even if there are multiple read events
|
||||||
wasi::Subscription {
|
wasi::Subscription {
|
||||||
userdata: 1,
|
userdata: 1,
|
||||||
r#type: wasi::EVENTTYPE_FD_READ,
|
r#type: wasi::EVENTTYPE_FD_READ,
|
||||||
@@ -85,10 +86,6 @@ unsafe fn test_stdin_read() {
|
|||||||
];
|
];
|
||||||
let out = poll_oneoff_impl(&r#in, 1);
|
let out = poll_oneoff_impl(&r#in, 1);
|
||||||
let event = &out[0];
|
let event = &out[0];
|
||||||
assert_eq!(
|
|
||||||
event.userdata, CLOCK_ID,
|
|
||||||
"the event.userdata should contain clock_id specified by the user"
|
|
||||||
);
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
event.error,
|
event.error,
|
||||||
wasi::ERRNO_SUCCESS,
|
wasi::ERRNO_SUCCESS,
|
||||||
@@ -99,6 +96,10 @@ unsafe fn test_stdin_read() {
|
|||||||
wasi::EVENTTYPE_CLOCK,
|
wasi::EVENTTYPE_CLOCK,
|
||||||
"the event.type should equal clock"
|
"the event.type should equal clock"
|
||||||
);
|
);
|
||||||
|
assert_eq!(
|
||||||
|
event.userdata, CLOCK_ID,
|
||||||
|
"the event.userdata should contain clock_id specified by the user"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe fn test_stdout_stderr_write() {
|
unsafe fn test_stdout_stderr_write() {
|
||||||
|
|||||||
@@ -245,3 +245,18 @@ impl Error {
|
|||||||
pub(crate) trait FromRawOsError {
|
pub(crate) trait FromRawOsError {
|
||||||
fn from_raw_os_error(code: i32) -> Self;
|
fn from_raw_os_error(code: i32) -> Self;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) type Result<T> = std::result::Result<T, Error>;
|
||||||
|
|
||||||
|
pub(crate) trait AsWasiError {
|
||||||
|
fn as_wasi_error(&self) -> WasiError;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> AsWasiError for Result<T> {
|
||||||
|
fn as_wasi_error(&self) -> WasiError {
|
||||||
|
self.as_ref()
|
||||||
|
.err()
|
||||||
|
.unwrap_or(&Error::ESUCCESS)
|
||||||
|
.as_wasi_error()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -42,5 +42,5 @@ pub mod hostcalls {
|
|||||||
pub use ctx::{WasiCtx, WasiCtxBuilder};
|
pub use ctx::{WasiCtx, WasiCtxBuilder};
|
||||||
pub use sys::preopen_dir;
|
pub use sys::preopen_dir;
|
||||||
|
|
||||||
pub type Error = error::Error;
|
pub use error::Error;
|
||||||
pub(crate) type Result<T> = std::result::Result<T, Error>;
|
pub(crate) use error::Result;
|
||||||
|
|||||||
@@ -1,18 +1,109 @@
|
|||||||
#![allow(non_camel_case_types)]
|
#![allow(non_camel_case_types)]
|
||||||
#![allow(unused_unsafe)]
|
#![allow(unused_unsafe)]
|
||||||
#![allow(unused)]
|
#![allow(unused)]
|
||||||
|
use crate::fdentry::Descriptor;
|
||||||
use crate::hostcalls_impl::{ClockEventData, FdEventData};
|
use crate::hostcalls_impl::{ClockEventData, FdEventData};
|
||||||
use crate::memory::*;
|
use crate::memory::*;
|
||||||
use crate::sys::host_impl;
|
use crate::sys::host_impl;
|
||||||
use crate::{wasi, wasi32, Error, Result};
|
use crate::{error::WasiError, wasi, wasi32, Error, Result};
|
||||||
use cpu_time::{ProcessTime, ThreadTime};
|
use cpu_time::{ProcessTime, ThreadTime};
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
|
use log::{debug, error, trace, warn};
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
|
use std::io;
|
||||||
|
use std::os::windows::io::AsRawHandle;
|
||||||
|
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender, TryRecvError};
|
||||||
|
use std::sync::Mutex;
|
||||||
|
use std::thread;
|
||||||
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
|
struct StdinPoll {
|
||||||
|
request_tx: Sender<()>,
|
||||||
|
notify_rx: Receiver<PollState>,
|
||||||
|
}
|
||||||
|
|
||||||
|
enum PollState {
|
||||||
|
Ready,
|
||||||
|
NotReady, // it's not ready, but we didn't wait
|
||||||
|
TimedOut, // it's not ready and a timeout has occurred
|
||||||
|
Error(WasiError), // not using the top-lever Error because it's not Clone
|
||||||
|
}
|
||||||
|
|
||||||
|
enum WaitMode {
|
||||||
|
Timeout(Duration),
|
||||||
|
Infinite,
|
||||||
|
Immediate,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StdinPoll {
|
||||||
|
// This function should not be used directly
|
||||||
|
// Correctness of this function crucially depends on the fact that
|
||||||
|
// mpsc::Receiver is !Sync.
|
||||||
|
fn poll(&self, wait_mode: WaitMode) -> PollState {
|
||||||
|
// Clean up possible unread result from the previous poll
|
||||||
|
match self.notify_rx.try_recv() {
|
||||||
|
Ok(_) | Err(TryRecvError::Empty) => {}
|
||||||
|
Err(TryRecvError::Disconnected) => panic!("notify_rx channel closed"),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Notify the worker thread that we want to poll stdin
|
||||||
|
self.request_tx.send(()).expect("request_tx channel closed");
|
||||||
|
|
||||||
|
// Wait for the worker thread to send a readiness notification
|
||||||
|
let pollret = match wait_mode {
|
||||||
|
WaitMode::Timeout(timeout) => {
|
||||||
|
self.notify_rx
|
||||||
|
.recv_timeout(timeout)
|
||||||
|
.unwrap_or_else(|e| match e {
|
||||||
|
RecvTimeoutError::Disconnected => panic!("notify_rx channel closed"),
|
||||||
|
RecvTimeoutError::Timeout => PollState::TimedOut,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
WaitMode::Infinite => self.notify_rx.recv().expect("notify_rx channel closed"),
|
||||||
|
WaitMode::Immediate => self.notify_rx.try_recv().unwrap_or_else(|e| match e {
|
||||||
|
TryRecvError::Disconnected => panic!("notify_rx channel closed"),
|
||||||
|
TryRecvError::Empty => PollState::NotReady,
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
|
||||||
|
pollret
|
||||||
|
}
|
||||||
|
|
||||||
|
fn event_loop(request_rx: Receiver<()>, notify_tx: Sender<PollState>) -> ! {
|
||||||
|
use std::io::BufRead;
|
||||||
|
loop {
|
||||||
|
// Wait for the request to poll stdin
|
||||||
|
request_rx.recv().expect("request_rx channel closed");
|
||||||
|
|
||||||
|
// Wait for data to appear in stdin.
|
||||||
|
// If `fill_buf` returns any slice, then it means that either
|
||||||
|
// (a) there some data in stdin, if it's non-empty
|
||||||
|
// (b) EOF was received, if it's empty
|
||||||
|
// Linux returns `POLLIN` in both cases, and we imitate this behavior.
|
||||||
|
let resp = match std::io::stdin().lock().fill_buf() {
|
||||||
|
Ok(_) => PollState::Ready,
|
||||||
|
Err(e) => PollState::Error(Error::from(e).as_wasi_error()),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Notify the requestor about data in stdin. They may have already timed out,
|
||||||
|
// then the next requestor will have to clean the channel.
|
||||||
|
notify_tx.send(resp).expect("notify_tx channel closed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref START_MONOTONIC: Instant = Instant::now();
|
static ref START_MONOTONIC: Instant = Instant::now();
|
||||||
static ref PERF_COUNTER_RES: u64 = get_perf_counter_resolution_ns();
|
static ref PERF_COUNTER_RES: u64 = get_perf_counter_resolution_ns();
|
||||||
|
static ref STDIN_POLL: Mutex<StdinPoll> = {
|
||||||
|
let (request_tx, request_rx) = mpsc::channel();
|
||||||
|
let (notify_tx, notify_rx) = mpsc::channel();
|
||||||
|
thread::spawn(move || StdinPoll::event_loop(request_rx, notify_tx));
|
||||||
|
Mutex::new(StdinPoll {
|
||||||
|
request_tx,
|
||||||
|
notify_rx,
|
||||||
|
})
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
// Timer resolution on Windows is really hard. We may consider exposing the resolution of the respective
|
// Timer resolution on Windows is really hard. We may consider exposing the resolution of the respective
|
||||||
@@ -76,12 +167,208 @@ pub(crate) fn clock_time_get(clock_id: wasi::__wasi_clockid_t) -> Result<wasi::_
|
|||||||
duration.as_nanos().try_into().map_err(Into::into)
|
duration.as_nanos().try_into().map_err(Into::into)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn make_rw_event(event: &FdEventData, nbytes: Result<u64>) -> wasi::__wasi_event_t {
|
||||||
|
use crate::error::AsWasiError;
|
||||||
|
let error = nbytes.as_wasi_error();
|
||||||
|
let nbytes = nbytes.unwrap_or_default();
|
||||||
|
wasi::__wasi_event_t {
|
||||||
|
userdata: event.userdata,
|
||||||
|
r#type: event.r#type,
|
||||||
|
error: error.as_raw_errno(),
|
||||||
|
u: wasi::__wasi_event_u_t {
|
||||||
|
fd_readwrite: wasi::__wasi_event_fd_readwrite_t { nbytes, flags: 0 },
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn make_timeout_event(timeout: &ClockEventData) -> wasi::__wasi_event_t {
|
||||||
|
wasi::__wasi_event_t {
|
||||||
|
userdata: timeout.userdata,
|
||||||
|
r#type: wasi::__WASI_EVENTTYPE_CLOCK,
|
||||||
|
error: wasi::__WASI_ERRNO_SUCCESS,
|
||||||
|
u: wasi::__wasi_event_u_t {
|
||||||
|
fd_readwrite: wasi::__wasi_event_fd_readwrite_t {
|
||||||
|
nbytes: 0,
|
||||||
|
flags: 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_timeout(
|
||||||
|
timeout_event: ClockEventData,
|
||||||
|
timeout: Duration,
|
||||||
|
events: &mut Vec<wasi::__wasi_event_t>,
|
||||||
|
) {
|
||||||
|
thread::sleep(timeout);
|
||||||
|
handle_timeout_event(timeout_event, events);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_timeout_event(timeout_event: ClockEventData, events: &mut Vec<wasi::__wasi_event_t>) {
|
||||||
|
let new_event = make_timeout_event(&timeout_event);
|
||||||
|
events.push(new_event);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_rw_event(event: FdEventData, out_events: &mut Vec<wasi::__wasi_event_t>) {
|
||||||
|
let size = match event.descriptor {
|
||||||
|
Descriptor::OsHandle(os_handle) => {
|
||||||
|
if event.r#type == wasi::__WASI_EVENTTYPE_FD_READ {
|
||||||
|
os_handle.metadata().map(|m| m.len()).map_err(Into::into)
|
||||||
|
} else {
|
||||||
|
// The spec is unclear what nbytes should actually be for __WASI_EVENTTYPE_FD_WRITE and
|
||||||
|
// the implementation on Unix just returns 0 here, so it's probably fine
|
||||||
|
// to do the same on Windows for now.
|
||||||
|
// cf. https://github.com/WebAssembly/WASI/issues/148
|
||||||
|
Ok(0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// We return the only universally correct lower bound, see the comment later in the function.
|
||||||
|
Descriptor::Stdin => Ok(1),
|
||||||
|
// On Unix, ioctl(FIONREAD) will return 0 for stdout/stderr. Emulate the same behavior on Windows.
|
||||||
|
Descriptor::Stdout | Descriptor::Stderr => Ok(0),
|
||||||
|
};
|
||||||
|
|
||||||
|
let new_event = make_rw_event(&event, size);
|
||||||
|
out_events.push(new_event);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_error_event(
|
||||||
|
event: FdEventData,
|
||||||
|
error: Error,
|
||||||
|
out_events: &mut Vec<wasi::__wasi_event_t>,
|
||||||
|
) {
|
||||||
|
let new_event = make_rw_event(&event, Err(error));
|
||||||
|
out_events.push(new_event);
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn poll_oneoff(
|
pub(crate) fn poll_oneoff(
|
||||||
timeout: Option<ClockEventData>,
|
timeout: Option<ClockEventData>,
|
||||||
fd_events: Vec<FdEventData>,
|
fd_events: Vec<FdEventData>,
|
||||||
events: &mut Vec<wasi::__wasi_event_t>,
|
events: &mut Vec<wasi::__wasi_event_t>,
|
||||||
) -> Result<Vec<wasi::__wasi_event_t>> {
|
) -> Result<()> {
|
||||||
unimplemented!("poll_oneoff")
|
use std::fs::Metadata;
|
||||||
|
use std::thread;
|
||||||
|
|
||||||
|
let timeout = timeout
|
||||||
|
.map(|event| {
|
||||||
|
event
|
||||||
|
.delay
|
||||||
|
.try_into()
|
||||||
|
.map(Duration::from_nanos)
|
||||||
|
.map(|dur| (event, dur))
|
||||||
|
})
|
||||||
|
.transpose()?;
|
||||||
|
|
||||||
|
// With no events to listen, poll_oneoff just becomes a sleep.
|
||||||
|
if fd_events.is_empty() {
|
||||||
|
match timeout {
|
||||||
|
Some((event, dur)) => return Ok(handle_timeout(event, dur, events)),
|
||||||
|
// The implementation has to return Ok(()) in this case,
|
||||||
|
// cf. the comment in src/hostcalls_impl/misc.rs
|
||||||
|
None => return Ok(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut stdin_events = vec![];
|
||||||
|
let mut immediate_events = vec![];
|
||||||
|
let mut pipe_events = vec![];
|
||||||
|
|
||||||
|
for event in fd_events {
|
||||||
|
match event.descriptor {
|
||||||
|
Descriptor::Stdin if event.r#type == wasi::__WASI_EVENTTYPE_FD_READ => {
|
||||||
|
stdin_events.push(event)
|
||||||
|
}
|
||||||
|
// stdout/stderr are always considered ready to write because there seems to
|
||||||
|
// be no way of checking if a write to stdout would block.
|
||||||
|
//
|
||||||
|
// If stdin is polled for anything else then reading, then it is also
|
||||||
|
// considered immediately ready, following the behavior on Linux.
|
||||||
|
Descriptor::Stdin | Descriptor::Stderr | Descriptor::Stdout => {
|
||||||
|
immediate_events.push(event)
|
||||||
|
}
|
||||||
|
Descriptor::OsHandle(os_handle) => {
|
||||||
|
let ftype = unsafe { winx::file::get_file_type(os_handle.as_raw_handle()) }?;
|
||||||
|
if ftype.is_unknown() || ftype.is_char() {
|
||||||
|
debug!("poll_oneoff: unsupported file type: {:?}", ftype);
|
||||||
|
handle_error_event(event, Error::ENOTSUP, events);
|
||||||
|
} else if ftype.is_disk() {
|
||||||
|
immediate_events.push(event);
|
||||||
|
} else if ftype.is_pipe() {
|
||||||
|
pipe_events.push(event);
|
||||||
|
} else {
|
||||||
|
unreachable!();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let immediate = !immediate_events.is_empty();
|
||||||
|
// Process all the events that do not require waiting.
|
||||||
|
if immediate {
|
||||||
|
trace!(" | have immediate events, will return immediately");
|
||||||
|
for mut event in immediate_events {
|
||||||
|
handle_rw_event(event, events);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !stdin_events.is_empty() {
|
||||||
|
// During the firt request to poll stdin, we spin up a separate thread to
|
||||||
|
// waiting for data to arrive on stdin. This thread will not terminate.
|
||||||
|
//
|
||||||
|
// We'd like to do the following:
|
||||||
|
// (1) wait in a non-blocking way for data to be available in stdin, with timeout
|
||||||
|
// (2) find out, how many bytes are there available to be read.
|
||||||
|
//
|
||||||
|
// One issue is that we are currently relying on the Rust libstd for interaction
|
||||||
|
// with stdin. More precisely, `io::stdin` is used via the `BufRead` trait,
|
||||||
|
// in the `fd_read` function, which always does buffering on the libstd side. [1]
|
||||||
|
// This means that even if there's still some unread data in stdin,
|
||||||
|
// the lower-level Windows system calls may return false negatives,
|
||||||
|
// claiming that stdin is empty.
|
||||||
|
//
|
||||||
|
// Theoretically, one could use `WaitForSingleObject` on the stdin handle
|
||||||
|
// to achieve (1). Unfortunately, this function doesn't seem to honor the
|
||||||
|
// requested timeout and to misbehaves after the stdin is closed.
|
||||||
|
//
|
||||||
|
// There appears to be no way of achieving (2) on Windows.
|
||||||
|
// [1]: https://github.com/rust-lang/rust/pull/12422
|
||||||
|
let waitmode = if immediate {
|
||||||
|
trace!(" | tentatively checking stdin");
|
||||||
|
WaitMode::Immediate
|
||||||
|
} else {
|
||||||
|
trace!(" | passively waiting on stdin");
|
||||||
|
match timeout {
|
||||||
|
Some((event, dur)) => WaitMode::Timeout(dur),
|
||||||
|
None => WaitMode::Infinite,
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let state = STDIN_POLL.lock().unwrap().poll(waitmode);
|
||||||
|
for event in stdin_events {
|
||||||
|
match state {
|
||||||
|
PollState::Ready => handle_rw_event(event, events),
|
||||||
|
PollState::NotReady => {} // not immediately available, so just ignore
|
||||||
|
PollState::TimedOut => handle_timeout_event(timeout.unwrap().0, events),
|
||||||
|
PollState::Error(e) => handle_error_event(event, Error::Wasi(e), events),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !immediate && !pipe_events.is_empty() {
|
||||||
|
trace!(" | actively polling pipes");
|
||||||
|
match timeout {
|
||||||
|
Some((event, dur)) => {
|
||||||
|
// In the tests stdin is replaced with a dummy pipe, so for now
|
||||||
|
// we just time out. Support for pipes will be decided later on.
|
||||||
|
warn!("Polling pipes not supported on Windows, will just time out.");
|
||||||
|
handle_timeout(event, dur, events);
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
error!("Polling only pipes with no timeout not supported on Windows.");
|
||||||
|
return Err(Error::ENOTSUP);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_monotonic_time() -> Duration {
|
fn get_monotonic_time() -> Duration {
|
||||||
|
|||||||
Reference in New Issue
Block a user