More WIP
This commit is contained in:
@@ -22,6 +22,7 @@ filetime = "0.2.7"
|
|||||||
lazy_static = "1.4.0"
|
lazy_static = "1.4.0"
|
||||||
num = { version = "0.2.0", default-features = false }
|
num = { version = "0.2.0", default-features = false }
|
||||||
wig = { path = "wig" }
|
wig = { path = "wig" }
|
||||||
|
crossbeam = "0.7.3"
|
||||||
|
|
||||||
[target.'cfg(unix)'.dependencies]
|
[target.'cfg(unix)'.dependencies]
|
||||||
yanix = { path = "yanix" }
|
yanix = { path = "yanix" }
|
||||||
|
|||||||
@@ -1,19 +1,79 @@
|
|||||||
#![allow(non_camel_case_types)]
|
#![allow(non_camel_case_types)]
|
||||||
#![allow(unused_unsafe)]
|
#![allow(unused_unsafe)]
|
||||||
#![allow(unused)]
|
#![allow(unused)]
|
||||||
|
use crate::fdentry::Descriptor;
|
||||||
use crate::hostcalls_impl::{ClockEventData, FdEventData};
|
use crate::hostcalls_impl::{ClockEventData, FdEventData};
|
||||||
use crate::memory::*;
|
use crate::memory::*;
|
||||||
use crate::sys::host_impl;
|
use crate::sys::host_impl;
|
||||||
use crate::{wasi, wasi32, Error, Result};
|
use crate::{wasi, wasi32, Error, Result};
|
||||||
use cpu_time::{ProcessTime, ThreadTime};
|
use cpu_time::{ProcessTime, ThreadTime};
|
||||||
|
use crossbeam::channel::{self, Receiver, Sender};
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use log::trace;
|
use log::{error, trace};
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
|
use std::io;
|
||||||
|
use std::os::windows::io::AsRawHandle;
|
||||||
|
use std::thread;
|
||||||
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
|
type StdinPayload = io::Result<bool>;
|
||||||
|
struct StdinPoll {
|
||||||
|
request_tx: Sender<()>,
|
||||||
|
notify_rx: Receiver<StdinPayload>,
|
||||||
|
}
|
||||||
|
|
||||||
|
enum PollState {
|
||||||
|
Ready,
|
||||||
|
Closed,
|
||||||
|
TimedOut,
|
||||||
|
Error(Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StdinPoll {
|
||||||
|
fn poll(&self, timeout: Option<Duration>) -> PollState {
|
||||||
|
use crossbeam::channel::{RecvTimeoutError, TryRecvError};
|
||||||
|
// Clean up possible unread result from previous poll
|
||||||
|
match self.notify_rx.try_recv() {
|
||||||
|
Ok(_) | Err(TryRecvError::Empty) => {}
|
||||||
|
Err(TryRecvError::Disconnected) => panic!("FIXME"),
|
||||||
|
}
|
||||||
|
self.request_tx.send(()).expect("FIXME");
|
||||||
|
let pollret = match timeout {
|
||||||
|
Some(timeout) => self.notify_rx.recv_timeout(timeout),
|
||||||
|
None => Ok(self.notify_rx.recv().expect("FIXME")),
|
||||||
|
};
|
||||||
|
match pollret {
|
||||||
|
Ok(Ok(true)) => PollState::Ready,
|
||||||
|
Ok(Ok(false)) => PollState::Closed,
|
||||||
|
Ok(Err(e)) => PollState::Error(e.into()),
|
||||||
|
Err(RecvTimeoutError::Timeout) => PollState::TimedOut,
|
||||||
|
Err(RecvTimeoutError::Disconnected) => panic!("FIXME"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn event_loop(request_rx: Receiver<()>, notify_tx: Sender<StdinPayload>) -> ! {
|
||||||
|
use std::io::BufRead;
|
||||||
|
loop {
|
||||||
|
request_rx.recv().expect("FIXME");
|
||||||
|
let buf = std::io::stdin().lock().fill_buf().map(|s| !s.is_empty());
|
||||||
|
notify_tx.send(buf).expect("FIXME");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref START_MONOTONIC: Instant = Instant::now();
|
static ref START_MONOTONIC: Instant = Instant::now();
|
||||||
static ref PERF_COUNTER_RES: u64 = get_perf_counter_resolution_ns();
|
static ref PERF_COUNTER_RES: u64 = get_perf_counter_resolution_ns();
|
||||||
|
static ref STDIN_POLL: StdinPoll = {
|
||||||
|
let channel_size = 1;
|
||||||
|
let (request_tx, request_rx) = channel::bounded(channel_size);
|
||||||
|
let (notify_tx, notify_rx) = channel::bounded(channel_size);
|
||||||
|
thread::spawn(move || StdinPoll::event_loop(request_rx, notify_tx));
|
||||||
|
StdinPoll {
|
||||||
|
request_tx,
|
||||||
|
notify_rx,
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
// Timer resolution on Windows is really hard. We may consider exposing the resolution of the respective
|
// Timer resolution on Windows is really hard. We may consider exposing the resolution of the respective
|
||||||
@@ -110,66 +170,21 @@ fn make_timeout_event(timeout: &ClockEventData) -> wasi::__wasi_event_t {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn poll_oneoff(
|
fn handle_timeout(
|
||||||
timeout: Option<ClockEventData>,
|
timeout_event: ClockEventData,
|
||||||
fd_events: Vec<FdEventData>,
|
timeout: Duration,
|
||||||
events: &mut Vec<wasi::__wasi_event_t>,
|
events: &mut Vec<wasi::__wasi_event_t>,
|
||||||
) -> Result<()> {
|
) {
|
||||||
use crate::fdentry::Descriptor;
|
thread::sleep(timeout);
|
||||||
use std::fs::Metadata;
|
handle_timeout_event(timeout_event, events);
|
||||||
use std::thread;
|
}
|
||||||
|
|
||||||
let timeout_duration = timeout
|
fn handle_timeout_event(timeout_event: ClockEventData, events: &mut Vec<wasi::__wasi_event_t>) {
|
||||||
.map(|t| t.delay.try_into().map(Duration::from_nanos))
|
|
||||||
.transpose()?;
|
|
||||||
|
|
||||||
// With no events to listen, poll_oneoff just becomes a sleep.
|
|
||||||
if fd_events.is_empty() {
|
|
||||||
match timeout_duration {
|
|
||||||
Some(t) => {
|
|
||||||
thread::sleep(t);
|
|
||||||
let timeout_event = timeout.expect("timeout should be Some");
|
|
||||||
let new_event = make_timeout_event(&timeout_event);
|
let new_event = make_timeout_event(&timeout_event);
|
||||||
events.push(new_event);
|
events.push(new_event);
|
||||||
}
|
}
|
||||||
// `poll` invoked with nfds = 0, timeout = -1 appears to be an infinite sleep
|
|
||||||
// Even though the thread is not guanteed to remain parked forever, `poll(2)`
|
|
||||||
// mentions that spurious readiness notifications may occur, so it's probably fine
|
|
||||||
None => thread::park(),
|
|
||||||
}
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Currently WASI file support is only (a) regular files (b) directories (c) symlinks on Windows,
|
fn handle_rw_event(event: FdEventData, out_events: &mut Vec<wasi::__wasi_event_t>) {
|
||||||
// which are always ready to write on Unix.
|
|
||||||
//
|
|
||||||
// We need to consider stdin/stdout/stderr separately.
|
|
||||||
// We treat stdout/stderr as always ready to write. I'm not sure if it's correct
|
|
||||||
// on Windows but I have not find any way of checking if a write to stdout would block.
|
|
||||||
// Therefore, we only poll the stdin.
|
|
||||||
let mut stdin_events = vec![];
|
|
||||||
let mut immediate_events = vec![];
|
|
||||||
let mut stdin_ready = None;
|
|
||||||
|
|
||||||
for event in fd_events {
|
|
||||||
match event.descriptor {
|
|
||||||
Descriptor::Stdin if event.r#type == wasi::__WASI_EVENTTYPE_FD_READ => {
|
|
||||||
// Cache the non-emptiness for better performance.
|
|
||||||
let immediate = stdin_ready.get_or_insert_with(stdin_nonempty);
|
|
||||||
if *immediate {
|
|
||||||
immediate_events.push(event)
|
|
||||||
} else {
|
|
||||||
stdin_events.push(event)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => immediate_events.push(event),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Process all the events that do not require waiting.
|
|
||||||
if !immediate_events.is_empty() {
|
|
||||||
trace!(" | have immediate events, will return immediately");
|
|
||||||
for mut event in immediate_events {
|
|
||||||
let size = match event.descriptor {
|
let size = match event.descriptor {
|
||||||
Descriptor::OsHandle(os_handle) => {
|
Descriptor::OsHandle(os_handle) => {
|
||||||
if event.r#type == wasi::__WASI_EVENTTYPE_FD_READ {
|
if event.r#type == wasi::__WASI_EVENTTYPE_FD_READ {
|
||||||
@@ -189,11 +204,131 @@ pub(crate) fn poll_oneoff(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let new_event = make_rw_event(&event, size);
|
let new_event = make_rw_event(&event, size);
|
||||||
events.push(new_event)
|
out_events.push(new_event);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_error_event(
|
||||||
|
event: FdEventData,
|
||||||
|
error: Error,
|
||||||
|
out_events: &mut Vec<wasi::__wasi_event_t>,
|
||||||
|
) {
|
||||||
|
let new_event = make_rw_event(&event, Err(error));
|
||||||
|
out_events.push(new_event);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn poll_oneoff(
|
||||||
|
timeout: Option<ClockEventData>,
|
||||||
|
fd_events: Vec<FdEventData>,
|
||||||
|
events: &mut Vec<wasi::__wasi_event_t>,
|
||||||
|
) -> Result<()> {
|
||||||
|
use std::fs::Metadata;
|
||||||
|
use std::thread;
|
||||||
|
|
||||||
|
let timeout = timeout
|
||||||
|
.map(|event| {
|
||||||
|
event
|
||||||
|
.delay
|
||||||
|
.try_into()
|
||||||
|
.map(Duration::from_nanos)
|
||||||
|
.map(|dur| (event, dur))
|
||||||
|
})
|
||||||
|
.transpose()?;
|
||||||
|
|
||||||
|
// With no events to listen, poll_oneoff just becomes a sleep.
|
||||||
|
if fd_events.is_empty() {
|
||||||
|
match timeout {
|
||||||
|
Some((event, dur)) => return Ok(handle_timeout(event, dur, events)),
|
||||||
|
// `poll` invoked with nfds = 0, timeout = -1 appears to be an infinite sleep on Unix
|
||||||
|
// usually meant to be interrupted by a signal. Unfortunately, WASI doesn't currently
|
||||||
|
// support signals and there is no way to interrupt this infinite sleep, so we
|
||||||
|
// return `ENOTSUP`
|
||||||
|
None => return Err(Error::ENOTSUP),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Currently WASI file support is only (a) regular files (b) directories (c) symlinks on Windows,
|
||||||
|
// which are always ready to write on Unix.
|
||||||
|
//
|
||||||
|
// We need to consider stdin/stdout/stderr separately.
|
||||||
|
// We treat stdout/stderr as always ready to write. I'm not sure if it's correct
|
||||||
|
// on Windows but I have not find any way of checking if a write to stdout would block.
|
||||||
|
// Therefore, we only poll the stdin.
|
||||||
|
let mut stdin_events = vec![];
|
||||||
|
let mut immediate_events = vec![];
|
||||||
|
let mut pipe_events = vec![];
|
||||||
|
let mut stdin_ready = None;
|
||||||
|
|
||||||
|
for event in fd_events {
|
||||||
|
match event.descriptor {
|
||||||
|
Descriptor::Stdin if event.r#type == wasi::__WASI_EVENTTYPE_FD_READ => {
|
||||||
|
// Cache the non-emptiness for better performance.
|
||||||
|
let immediate = stdin_ready.get_or_insert_with(stdin_nonempty);
|
||||||
|
if *immediate {
|
||||||
|
immediate_events.push(event)
|
||||||
|
} else {
|
||||||
|
stdin_events.push(event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Descriptor::Stdin | Descriptor::Stderr | Descriptor::Stdout => {
|
||||||
|
immediate_events.push(event)
|
||||||
|
}
|
||||||
|
Descriptor::OsHandle(os_handle) => {
|
||||||
|
let ftype = unsafe { winx::file::get_file_type(os_handle.as_raw_handle()) }?;
|
||||||
|
if ftype.is_unknown() || ftype.is_char() {
|
||||||
|
error!("poll_oneoff: unsupported file type: {:?}", ftype);
|
||||||
|
handle_error_event(event, Error::ENOTSUP, events);
|
||||||
|
} else if ftype.is_disk() {
|
||||||
|
immediate_events.push(event);
|
||||||
|
} else if ftype.is_pipe() {
|
||||||
|
pipe_events.push(event);
|
||||||
|
} else {
|
||||||
|
unreachable!();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process all the events that do not require waiting.
|
||||||
|
if !immediate_events.is_empty() {
|
||||||
|
trace!(" | have immediate events, will return immediately");
|
||||||
|
for mut event in immediate_events {
|
||||||
|
handle_rw_event(event, events);
|
||||||
}
|
}
|
||||||
} else if !stdin_events.is_empty() {
|
} else if !stdin_events.is_empty() {
|
||||||
trace!(" | actively polling stdin");
|
// REVIEW: is there a better place to document this? Perhaps in
|
||||||
// There are some stdin poll requests and there's no data available immediately
|
// `struct PollStdin`?
|
||||||
|
//
|
||||||
|
// If there's a request to poll stdin, we spin up a separate thread to
|
||||||
|
// waiting for data to arrive on stdin. This thread will not terminate.
|
||||||
|
//
|
||||||
|
// TODO more explain why this way
|
||||||
|
trace!(" | passively waiting on stdin");
|
||||||
|
let dur = timeout.map(|t| t.1);
|
||||||
|
let state = STDIN_POLL.poll(dur);
|
||||||
|
for event in stdin_events {
|
||||||
|
match state {
|
||||||
|
PollState::Ready => handle_rw_event(event, events),
|
||||||
|
PollState::Closed => { /* error? FIXME */ }
|
||||||
|
PollState::TimedOut => { /* FIXME */ }
|
||||||
|
PollState::Error(ref e) => {
|
||||||
|
handle_error_event(event, Error::ENOTSUP /*FIXME*/, events);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if !pipe_events.is_empty() {
|
||||||
|
trace!(" | actively polling stdin or pipes");
|
||||||
|
match timeout {
|
||||||
|
Some((event, dur)) => {
|
||||||
|
error!("Polling pipes not supported on Windows, will just time out.");
|
||||||
|
return Ok(handle_timeout(event, dur, events));
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
error!("Polling only pipes with no timeout not supported on Windows.");
|
||||||
|
return Err(Error::ENOTSUP);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// TODO remove these old comments!!!
|
||||||
|
// There are some stdin or pipe poll requests and there's no data available immediately
|
||||||
|
|
||||||
// We are busy-polling the stdin with delay, unfortunately.
|
// We are busy-polling the stdin with delay, unfortunately.
|
||||||
//
|
//
|
||||||
@@ -216,55 +351,8 @@ pub(crate) fn poll_oneoff(
|
|||||||
//
|
//
|
||||||
// However, polling stdin is a relatively infrequent use case, so this hopefully won't be
|
// However, polling stdin is a relatively infrequent use case, so this hopefully won't be
|
||||||
// a major issue.
|
// a major issue.
|
||||||
let timeout_duration = timeout
|
|
||||||
.map(|t| t.delay.try_into().map(Duration::from_nanos))
|
|
||||||
.transpose()?;
|
|
||||||
|
|
||||||
// avoid issuing more syscalls if we're requested to return immediately
|
// avoid issuing more syscalls if we're requested to return immediately
|
||||||
if timeout_duration == Some(Duration::from_nanos(0)) {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
let poll_interval = Duration::from_millis(10);
|
|
||||||
let poll_start = Instant::now();
|
|
||||||
|
|
||||||
let timeout_occurred: Option<ClockEventData> = loop {
|
|
||||||
// Even though we assume that stdin is not ready, it's better to check it
|
|
||||||
// sooner than later, as we're going to wait anyway if it's the case.
|
|
||||||
if stdin_nonempty() {
|
|
||||||
break None;
|
|
||||||
}
|
|
||||||
if let Some(timeout_duration) = timeout_duration {
|
|
||||||
if poll_start.elapsed() >= timeout_duration {
|
|
||||||
break timeout;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
thread::sleep(poll_interval);
|
|
||||||
};
|
|
||||||
|
|
||||||
match timeout_occurred {
|
|
||||||
Some(timeout_info) => {
|
|
||||||
let new_event = make_timeout_event(&timeout_info);
|
|
||||||
events.push(new_event);
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
// stdin became ready for reading
|
|
||||||
for event in stdin_events {
|
|
||||||
assert_eq!(
|
|
||||||
event.r#type,
|
|
||||||
wasi::__WASI_EVENTTYPE_FD_READ,
|
|
||||||
"stdin was expected to be polled for reading"
|
|
||||||
);
|
|
||||||
|
|
||||||
// Another limitation is that `std::io::BufRead` doesn't allow us
|
|
||||||
// to find out the number bytes available in the buffer,
|
|
||||||
// so we return the only universally correct lower bound,
|
|
||||||
// which is 1 byte.
|
|
||||||
let new_event = make_rw_event(&event, Ok(1));
|
|
||||||
events.push(new_event);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
Reference in New Issue
Block a user