reuse cap-std-syncs windows scheduler without copypaste

This commit is contained in:
Pat Hickey
2021-05-06 15:45:54 -07:00
parent 3d9b98f1df
commit ff8bdc390b
4 changed files with 212 additions and 410 deletions

View File

@@ -1,15 +1,40 @@
#[cfg(unix)]
mod unix;
pub mod unix;
#[cfg(unix)]
pub use unix::*;
pub use unix::poll_oneoff;
#[cfg(windows)]
mod windows;
pub mod windows;
#[cfg(windows)]
pub use windows::*;
pub use windows::poll_oneoff;
use wasi_common::sched::WasiSched;
use std::thread;
use std::time::Duration;
use wasi_common::{
sched::{Poll, WasiSched},
Error,
};
pub struct SyncSched {}
impl SyncSched {
pub fn new() -> Self {
Self {}
}
}
#[async_trait::async_trait(?Send)]
impl WasiSched for SyncSched {
async fn poll_oneoff<'a>(&self, poll: &mut Poll<'a>) -> Result<(), Error> {
poll_oneoff(poll).await
}
async fn sched_yield(&self) -> Result<(), Error> {
thread::yield_now();
Ok(())
}
async fn sleep(&self, duration: Duration) -> Result<(), Error> {
std::thread::sleep(duration);
Ok(())
}
}
pub fn sched_ctx() -> Box<dyn WasiSched> {
Box::new(SyncSched::new())
}

View File

@@ -5,112 +5,93 @@ use wasi_common::{
file::WasiFile,
sched::{
subscription::{RwEventFlags, Subscription},
Poll, WasiSched,
Poll,
},
Error, ErrorExt,
};
use poll::{PollFd, PollFlags};
pub struct SyncSched;
impl SyncSched {
pub fn new() -> Self {
SyncSched
pub async fn poll_oneoff<'a>(poll: &mut Poll<'a>) -> Result<(), Error> {
if poll.is_empty() {
return Ok(());
}
}
let mut pollfds = Vec::new();
for s in poll.rw_subscriptions() {
match s {
Subscription::Read(f) => {
let raw_fd = wasi_file_raw_fd(f.file).ok_or(
Error::invalid_argument().context("read subscription fd downcast failed"),
)?;
pollfds.push(unsafe { PollFd::new(raw_fd, PollFlags::POLLIN) });
}
#[async_trait::async_trait(?Send)]
impl WasiSched for SyncSched {
async fn poll_oneoff<'a>(&self, poll: &mut Poll<'a>) -> Result<(), Error> {
if poll.is_empty() {
return Ok(());
Subscription::Write(f) => {
let raw_fd = wasi_file_raw_fd(f.file).ok_or(
Error::invalid_argument().context("write subscription fd downcast failed"),
)?;
pollfds.push(unsafe { PollFd::new(raw_fd, PollFlags::POLLOUT) });
}
Subscription::MonotonicClock { .. } => unreachable!(),
}
let mut pollfds = Vec::new();
for s in poll.rw_subscriptions() {
match s {
Subscription::Read(f) => {
let raw_fd = wasi_file_raw_fd(f.file).ok_or(
Error::invalid_argument().context("read subscription fd downcast failed"),
)?;
pollfds.push(unsafe { PollFd::new(raw_fd, PollFlags::POLLIN) });
}
}
Subscription::Write(f) => {
let raw_fd = wasi_file_raw_fd(f.file).ok_or(
Error::invalid_argument().context("write subscription fd downcast failed"),
)?;
pollfds.push(unsafe { PollFd::new(raw_fd, PollFlags::POLLOUT) });
}
Subscription::MonotonicClock { .. } => unreachable!(),
}
}
let ready = loop {
let poll_timeout = if let Some(t) = poll.earliest_clock_deadline() {
let duration = t.duration_until().unwrap_or(Duration::from_secs(0));
(duration.as_millis() + 1) // XXX try always rounding up?
.try_into()
.map_err(|_| Error::overflow().context("poll timeout"))?
} else {
libc::c_int::max_value()
};
tracing::debug!(
poll_timeout = tracing::field::debug(poll_timeout),
poll_fds = tracing::field::debug(&pollfds),
"poll"
);
match poll::poll(&mut pollfds, poll_timeout) {
Ok(ready) => break ready,
Err(_) => {
let last_err = std::io::Error::last_os_error();
if last_err.raw_os_error().unwrap() == libc::EINTR {
continue;
} else {
return Err(last_err.into());
}
}
}
};
if ready > 0 {
for (rwsub, pollfd) in poll.rw_subscriptions().zip(pollfds.into_iter()) {
if let Some(revents) = pollfd.revents() {
let (nbytes, rwsub) = match rwsub {
Subscription::Read(sub) => {
let ready = sub.file.num_ready_bytes().await?;
(std::cmp::max(ready, 1), sub)
}
Subscription::Write(sub) => (0, sub),
_ => unreachable!(),
};
if revents.contains(PollFlags::POLLNVAL) {
rwsub.error(Error::badf());
} else if revents.contains(PollFlags::POLLERR) {
rwsub.error(Error::io());
} else if revents.contains(PollFlags::POLLHUP) {
rwsub.complete(nbytes, RwEventFlags::HANGUP);
} else {
rwsub.complete(nbytes, RwEventFlags::empty());
};
}
}
let ready = loop {
let poll_timeout = if let Some(t) = poll.earliest_clock_deadline() {
let duration = t.duration_until().unwrap_or(Duration::from_secs(0));
(duration.as_millis() + 1) // XXX try always rounding up?
.try_into()
.map_err(|_| Error::overflow().context("poll timeout"))?
} else {
poll.earliest_clock_deadline()
.expect("timed out")
.result()
.expect("timer deadline is past")
.unwrap()
libc::c_int::max_value()
};
tracing::debug!(
poll_timeout = tracing::field::debug(poll_timeout),
poll_fds = tracing::field::debug(&pollfds),
"poll"
);
match poll::poll(&mut pollfds, poll_timeout) {
Ok(ready) => break ready,
Err(_) => {
let last_err = std::io::Error::last_os_error();
if last_err.raw_os_error().unwrap() == libc::EINTR {
continue;
} else {
return Err(last_err.into());
}
}
}
Ok(())
}
async fn sched_yield(&self) -> Result<(), Error> {
std::thread::yield_now();
Ok(())
}
async fn sleep(&self, duration: Duration) -> Result<(), Error> {
std::thread::sleep(duration);
Ok(())
};
if ready > 0 {
for (rwsub, pollfd) in poll.rw_subscriptions().zip(pollfds.into_iter()) {
if let Some(revents) = pollfd.revents() {
let (nbytes, rwsub) = match rwsub {
Subscription::Read(sub) => {
let ready = sub.file.num_ready_bytes().await?;
(std::cmp::max(ready, 1), sub)
}
Subscription::Write(sub) => (0, sub),
_ => unreachable!(),
};
if revents.contains(PollFlags::POLLNVAL) {
rwsub.error(Error::badf());
} else if revents.contains(PollFlags::POLLERR) {
rwsub.error(Error::io());
} else if revents.contains(PollFlags::POLLHUP) {
rwsub.complete(nbytes, RwEventFlags::HANGUP);
} else {
rwsub.complete(nbytes, RwEventFlags::empty());
};
}
}
} else {
poll.earliest_clock_deadline()
.expect("timed out")
.result()
.expect("timer deadline is past")
.unwrap()
}
Ok(())
}
fn wasi_file_raw_fd(f: &dyn WasiFile) -> Option<RawFd> {

View File

@@ -1,3 +1,13 @@
// The windows scheduler is unmaintained and due for a rewrite.
//
// Rather than use a polling mechanism for file read/write readiness,
// it checks readiness just once, before sleeping for any timer subscriptions.
// Checking stdin readiness uses a worker thread which, once started, lives for the
// lifetime of the process.
//
// We suspect there are bugs in this scheduler, however, we have not
// taken the time to improve it. See bug #2880.
use anyhow::Context;
use std::ops::Deref;
use std::os::windows::io::{AsRawHandle, RawHandle};
@@ -9,130 +19,121 @@ use wasi_common::{
file::WasiFile,
sched::{
subscription::{RwEventFlags, Subscription},
Poll, WasiSched,
Poll,
},
Error, ErrorExt,
};
pub struct SyncSched {}
impl SyncSched {
pub fn new() -> Self {
Self {}
}
pub async fn poll_oneoff<'a>(poll: &mut Poll<'a>) -> Result<(), Error> {
poll_oneoff_(poll, wasi_file_raw_handle).await
}
#[async_trait::async_trait(?Send)]
impl WasiSched for SyncSched {
async fn poll_oneoff<'a>(&self, poll: &mut Poll<'a>) -> Result<(), Error> {
if poll.is_empty() {
return Ok(());
}
// For reuse by wasi-tokio, which has a different WasiFile -> RawHandle translator.
pub async fn poll_oneoff_<'a>(
poll: &mut Poll<'a>,
file_to_handle: impl Fn(&dyn WasiFile) -> Option<RawHandle>,
) -> Result<(), Error> {
if poll.is_empty() {
return Ok(());
}
let mut ready = false;
let waitmode = if let Some(t) = poll.earliest_clock_deadline() {
if let Some(duration) = t.duration_until() {
WaitMode::Timeout(duration)
} else {
WaitMode::Immediate
}
let mut ready = false;
let waitmode = if let Some(t) = poll.earliest_clock_deadline() {
if let Some(duration) = t.duration_until() {
WaitMode::Timeout(duration)
} else {
if ready {
WaitMode::Immediate
} else {
WaitMode::Infinite
}
};
let mut stdin_read_subs = Vec::new();
let mut immediate_reads = Vec::new();
let mut immediate_writes = Vec::new();
for s in poll.rw_subscriptions() {
match s {
Subscription::Read(r) => {
if r.file.as_any().is::<crate::stdio::Stdin>() {
stdin_read_subs.push(r);
} else if wasi_file_raw_handle(r.file.deref()).is_some() {
immediate_reads.push(r);
} else {
return Err(Error::invalid_argument()
.context("read subscription fd downcast failed"));
}
}
Subscription::Write(w) => {
if wasi_file_raw_handle(w.file.deref()).is_some() {
immediate_writes.push(w);
} else {
return Err(Error::invalid_argument()
.context("write subscription fd downcast failed"));
}
}
Subscription::MonotonicClock { .. } => unreachable!(),
}
WaitMode::Immediate
}
} else {
if ready {
WaitMode::Immediate
} else {
WaitMode::Infinite
}
};
if !stdin_read_subs.is_empty() {
let state = STDIN_POLL
.lock()
.map_err(|_| Error::trap("failed to take lock of STDIN_POLL"))?
.poll(waitmode)?;
for readsub in stdin_read_subs.into_iter() {
match state {
PollState::Ready => {
readsub.complete(1, RwEventFlags::empty());
ready = true;
}
PollState::NotReady | PollState::TimedOut => {}
PollState::Error(ref e) => {
// Unfortunately, we need to deliver the Error to each of the
// subscriptions, but there is no Clone on std::io::Error. So, we convert it to the
// kind, and then back to std::io::Error, and finally to anyhow::Error.
// When its time to turn this into an errno elsewhere, the error kind will
// be inspected.
let ekind = e.kind();
let ioerror = std::io::Error::from(ekind);
readsub.error(ioerror.into());
ready = true;
}
let mut stdin_read_subs = Vec::new();
let mut immediate_reads = Vec::new();
let mut immediate_writes = Vec::new();
for s in poll.rw_subscriptions() {
match s {
Subscription::Read(r) => {
if r.file.as_any().is::<crate::stdio::Stdin>() {
stdin_read_subs.push(r);
} else if file_to_handle(r.file.deref()).is_some() {
immediate_reads.push(r);
} else {
return Err(
Error::invalid_argument().context("read subscription fd downcast failed")
);
}
}
Subscription::Write(w) => {
if wasi_file_raw_handle(w.file.deref()).is_some() {
immediate_writes.push(w);
} else {
return Err(
Error::invalid_argument().context("write subscription fd downcast failed")
);
}
}
Subscription::MonotonicClock { .. } => unreachable!(),
}
for r in immediate_reads {
match r.file.num_ready_bytes().await {
Ok(ready_bytes) => {
r.complete(ready_bytes, RwEventFlags::empty());
}
if !stdin_read_subs.is_empty() {
let state = STDIN_POLL
.lock()
.map_err(|_| Error::trap("failed to take lock of STDIN_POLL"))?
.poll(waitmode)?;
for readsub in stdin_read_subs.into_iter() {
match state {
PollState::Ready => {
readsub.complete(1, RwEventFlags::empty());
ready = true;
}
Err(e) => {
r.error(e);
PollState::NotReady | PollState::TimedOut => {}
PollState::Error(ref e) => {
// Unfortunately, we need to deliver the Error to each of the
// subscriptions, but there is no Clone on std::io::Error. So, we convert it to the
// kind, and then back to std::io::Error, and finally to anyhow::Error.
// When its time to turn this into an errno elsewhere, the error kind will
// be inspected.
let ekind = e.kind();
let ioerror = std::io::Error::from(ekind);
readsub.error(ioerror.into());
ready = true;
}
}
}
for w in immediate_writes {
// Everything is always ready for writing, apparently?
w.complete(0, RwEventFlags::empty());
ready = true;
}
if !ready {
if let WaitMode::Timeout(duration) = waitmode {
thread::sleep(duration);
}
for r in immediate_reads {
match r.file.num_ready_bytes().await {
Ok(ready_bytes) => {
r.complete(ready_bytes, RwEventFlags::empty());
ready = true;
}
Err(e) => {
r.error(e);
ready = true;
}
}
}
for w in immediate_writes {
// Everything is always ready for writing, apparently?
w.complete(0, RwEventFlags::empty());
ready = true;
}
Ok(())
}
async fn sched_yield(&self) -> Result<(), Error> {
thread::yield_now();
Ok(())
}
async fn sleep(&self, duration: Duration) -> Result<(), Error> {
std::thread::sleep(duration);
Ok(())
if !ready {
if let WaitMode::Timeout(duration) = waitmode {
thread::sleep(duration);
}
}
Ok(())
}
fn wasi_file_raw_handle(f: &dyn WasiFile) -> Option<RawHandle> {
pub fn wasi_file_raw_handle(f: &dyn WasiFile) -> Option<RawHandle> {
let a = f.as_any();
if a.is::<crate::file::File>() {
Some(