fix windows sync scheduler

This commit is contained in:
Pat Hickey
2021-05-03 12:23:14 -07:00
parent 3df9cddf10
commit 5ab8346a05

View File

@@ -23,27 +23,47 @@ impl SyncSched {
#[wiggle::async_trait] #[wiggle::async_trait]
impl WasiSched for SyncSched { impl WasiSched for SyncSched {
async fn poll_oneoff<'a>(&self, poll: &'_ Poll<'a>) -> Result<(), Error> { async fn poll_oneoff<'a>(&self, poll: &mut Poll<'a>) -> Result<(), Error> {
if poll.is_empty() { if poll.is_empty() {
return Ok(()); return Ok(());
} }
let mut ready = false; let mut ready = false;
let timeout = poll.earliest_clock_deadline(); let waitmode = if let Some(t) = poll.earliest_clock_deadline() {
if let Some(duration) = t.duration_until() {
WaitMode::Timeout(duration)
} else {
WaitMode::Immediate
}
} else {
if ready {
WaitMode::Immediate
} else {
WaitMode::Infinite
}
};
let mut stdin_read_subs = Vec::new(); let mut stdin_read_subs = Vec::new();
let mut immediate_subs = Vec::new(); let mut immediate_reads = Vec::new();
let mut immediate_writes = Vec::new();
for s in poll.rw_subscriptions() { for s in poll.rw_subscriptions() {
match s { match s {
Subscription::Read(r) if r.file.as_any().is::<crate::stdio::Stdin>() => { Subscription::Read(r) => {
stdin_read_subs.push(r); if r.file.as_any().is::<crate::stdio::Stdin>() {
} stdin_read_subs.push(r);
Subscription::Read(rw) | Subscription::Write(rw) => { } else if wasi_file_raw_handle(r.file.deref()).is_some() {
if wasi_file_raw_handle(rw.file.deref()).is_some() { immediate_reads.push(r);
immediate_subs.push(s);
} else { } else {
return Err(Error::invalid_argument() return Err(Error::invalid_argument()
.context("read/write subscription fd downcast failed")); .context("read subscription fd downcast failed"));
}
}
Subscription::Write(w) => {
if wasi_file_raw_handle(w.file.deref()).is_some() {
immediate_writes.push(w);
} else {
return Err(Error::invalid_argument()
.context("write subscription fd downcast failed"));
} }
} }
Subscription::MonotonicClock { .. } => unreachable!(), Subscription::MonotonicClock { .. } => unreachable!(),
@@ -51,19 +71,6 @@ impl WasiSched for SyncSched {
} }
if !stdin_read_subs.is_empty() { if !stdin_read_subs.is_empty() {
let waitmode = if let Some(t) = timeout {
if let Some(duration) = t.duration_until() {
WaitMode::Timeout(duration)
} else {
WaitMode::Immediate
}
} else {
if ready {
WaitMode::Immediate
} else {
WaitMode::Infinite
}
};
let state = STDIN_POLL let state = STDIN_POLL
.lock() .lock()
.map_err(|_| Error::trap("failed to take lock of STDIN_POLL"))? .map_err(|_| Error::trap("failed to take lock of STDIN_POLL"))?
@@ -89,37 +96,27 @@ impl WasiSched for SyncSched {
} }
} }
} }
for sub in immediate_subs { for r in immediate_reads {
match sub { match r.file.num_ready_bytes().await {
Subscription::Read(r) => { Ok(ready_bytes) => {
// XXX This doesnt strictly preserve the behavior in the earlier r.complete(ready_bytes, RwEventFlags::empty());
// implementation, which would always do complete(0) for reads from ready = true;
// stdout/err. }
match r.file.num_ready_bytes().await { Err(e) => {
Ok(ready_bytes) => { r.error(e);
r.complete(ready_bytes, RwEventFlags::empty());
ready = true;
}
Err(e) => {
r.error(e);
ready = true;
}
}
}
Subscription::Write(w) => {
// Everything is always ready for writing, apparently?
w.complete(0, RwEventFlags::empty());
ready = true; ready = true;
} }
Subscription::MonotonicClock { .. } => unreachable!(),
} }
} }
for w in immediate_writes {
// Everything is always ready for writing, apparently?
w.complete(0, RwEventFlags::empty());
ready = true;
}
if !ready { if !ready {
if let Some(t) = timeout { if let WaitMode::Timeout(duration) = waitmode {
if let Some(duration) = t.duration_until() { thread::sleep(duration);
thread::sleep(duration);
}
} }
} }
@@ -173,6 +170,7 @@ enum PollState {
Error(std::io::Error), Error(std::io::Error),
} }
#[derive(Copy, Clone)]
enum WaitMode { enum WaitMode {
Timeout(Duration), Timeout(Duration),
Infinite, Infinite,