From 5ab8346a05e47a2bf730d12e26c73d378e06d94e Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Mon, 3 May 2021 12:23:14 -0700 Subject: [PATCH] fix windows sync scheduler --- .../cap-std-sync/src/sched/windows.rs | 94 +++++++++---------- 1 file changed, 46 insertions(+), 48 deletions(-) diff --git a/crates/wasi-common/cap-std-sync/src/sched/windows.rs b/crates/wasi-common/cap-std-sync/src/sched/windows.rs index 8d11df4663..a98e558a7b 100644 --- a/crates/wasi-common/cap-std-sync/src/sched/windows.rs +++ b/crates/wasi-common/cap-std-sync/src/sched/windows.rs @@ -23,27 +23,47 @@ impl SyncSched { #[wiggle::async_trait] impl WasiSched for SyncSched { - async fn poll_oneoff<'a>(&self, poll: &'_ Poll<'a>) -> Result<(), Error> { + async fn poll_oneoff<'a>(&self, poll: &mut Poll<'a>) -> Result<(), Error> { if poll.is_empty() { return Ok(()); } let mut ready = false; - let timeout = poll.earliest_clock_deadline(); + let waitmode = if let Some(t) = poll.earliest_clock_deadline() { + if let Some(duration) = t.duration_until() { + WaitMode::Timeout(duration) + } else { + WaitMode::Immediate + } + } else { + if ready { + WaitMode::Immediate + } else { + WaitMode::Infinite + } + }; let mut stdin_read_subs = Vec::new(); - let mut immediate_subs = Vec::new(); + let mut immediate_reads = Vec::new(); + let mut immediate_writes = Vec::new(); for s in poll.rw_subscriptions() { match s { - Subscription::Read(r) if r.file.as_any().is::() => { - stdin_read_subs.push(r); - } - Subscription::Read(rw) | Subscription::Write(rw) => { - if wasi_file_raw_handle(rw.file.deref()).is_some() { - immediate_subs.push(s); + Subscription::Read(r) => { + if r.file.as_any().is::() { + stdin_read_subs.push(r); + } else if wasi_file_raw_handle(r.file.deref()).is_some() { + immediate_reads.push(r); } else { return Err(Error::invalid_argument() - .context("read/write subscription fd downcast failed")); + .context("read subscription fd downcast failed")); + } + } + Subscription::Write(w) => { + if wasi_file_raw_handle(w.file.deref()).is_some() { + immediate_writes.push(w); + } else { + return Err(Error::invalid_argument() + .context("write subscription fd downcast failed")); } } Subscription::MonotonicClock { .. } => unreachable!(), @@ -51,19 +71,6 @@ impl WasiSched for SyncSched { } if !stdin_read_subs.is_empty() { - let waitmode = if let Some(t) = timeout { - if let Some(duration) = t.duration_until() { - WaitMode::Timeout(duration) - } else { - WaitMode::Immediate - } - } else { - if ready { - WaitMode::Immediate - } else { - WaitMode::Infinite - } - }; let state = STDIN_POLL .lock() .map_err(|_| Error::trap("failed to take lock of STDIN_POLL"))? @@ -89,37 +96,27 @@ impl WasiSched for SyncSched { } } } - for sub in immediate_subs { - match sub { - Subscription::Read(r) => { - // XXX This doesnt strictly preserve the behavior in the earlier - // implementation, which would always do complete(0) for reads from - // stdout/err. - match r.file.num_ready_bytes().await { - Ok(ready_bytes) => { - r.complete(ready_bytes, RwEventFlags::empty()); - ready = true; - } - Err(e) => { - r.error(e); - ready = true; - } - } - } - Subscription::Write(w) => { - // Everything is always ready for writing, apparently? - w.complete(0, RwEventFlags::empty()); + for r in immediate_reads { + match r.file.num_ready_bytes().await { + Ok(ready_bytes) => { + r.complete(ready_bytes, RwEventFlags::empty()); + ready = true; + } + Err(e) => { + r.error(e); ready = true; } - Subscription::MonotonicClock { .. } => unreachable!(), } } + for w in immediate_writes { + // Everything is always ready for writing, apparently? + w.complete(0, RwEventFlags::empty()); + ready = true; + } if !ready { - if let Some(t) = timeout { - if let Some(duration) = t.duration_until() { - thread::sleep(duration); - } + if let WaitMode::Timeout(duration) = waitmode { + thread::sleep(duration); } } @@ -173,6 +170,7 @@ enum PollState { Error(std::io::Error), } +#[derive(Copy, Clone)] enum WaitMode { Timeout(Duration), Infinite,