rewrite wasi-tokio as just an task::block_in_place wrapper on cap-std-sync

This commit is contained in:
Pat Hickey
2021-05-04 11:29:02 -07:00
parent 686d8c22f9
commit f76fe8b764
13 changed files with 319 additions and 870 deletions

View File

@@ -78,7 +78,10 @@ pub async fn poll_oneoff<'a>(poll: &mut Poll<'a>) -> Result<(), Error> {
}
}
if let Some(Some(remaining_duration)) = duration {
tokio::time::timeout(remaining_duration, futures).await??;
match tokio::time::timeout(remaining_duration, futures).await {
Ok(r) => r?,
Err(_deadline_elapsed) => {}
}
} else {
futures.await?;
}

View File

@@ -9,31 +9,54 @@ use wasi_common::{
file::WasiFile,
sched::{
subscription::{RwEventFlags, Subscription},
Poll,
Poll, WasiSched,
},
Error, ErrorExt,
};
pub async fn poll_oneoff<'a>(poll: &'_ Poll<'a>) -> Result<(), Error> {
pub async fn poll_oneoff<'a>(poll: &mut Poll<'a>) -> Result<(), Error> {
if poll.is_empty() {
return Ok(());
}
let mut ready = false;
let timeout = poll.earliest_clock_deadline();
let waitmode = if let Some(t) = poll.earliest_clock_deadline() {
if let Some(duration) = t.duration_until() {
WaitMode::Timeout(duration)
} else {
WaitMode::Immediate
}
} else {
if ready {
WaitMode::Immediate
} else {
WaitMode::Infinite
}
};
let mut stdin_read_subs = Vec::new();
let mut immediate_subs = Vec::new();
let mut immediate_reads = Vec::new();
let mut immediate_writes = Vec::new();
for s in poll.rw_subscriptions() {
match s {
Subscription::Read(r) if r.file.as_any().is::<crate::stdio::Stdin>() => {
stdin_read_subs.push(r);
}
Subscription::Read(rw) | Subscription::Write(rw) => {
if wasi_file_raw_handle(rw.file.deref()).is_some() {
immediate_subs.push(s);
Subscription::Read(r) => {
if r.file.as_any().is::<crate::stdio::Stdin>() {
stdin_read_subs.push(r);
} else if wasi_file_raw_handle(r.file.deref()).is_some() {
immediate_reads.push(r);
} else {
return Err(Error::invalid_argument()
.context("read/write subscription fd downcast failed"));
return Err(
Error::invalid_argument().context("read subscription fd downcast failed")
);
}
}
Subscription::Write(w) => {
if wasi_file_raw_handle(w.file.deref()).is_some() {
immediate_writes.push(w);
} else {
return Err(
Error::invalid_argument().context("write subscription fd downcast failed")
);
}
}
Subscription::MonotonicClock { .. } => unreachable!(),
@@ -41,19 +64,6 @@ pub async fn poll_oneoff<'a>(poll: &'_ Poll<'a>) -> Result<(), Error> {
}
if !stdin_read_subs.is_empty() {
let waitmode = if let Some(t) = timeout {
if let Some(duration) = t.duration_until() {
WaitMode::Timeout(duration)
} else {
WaitMode::Immediate
}
} else {
if ready {
WaitMode::Immediate
} else {
WaitMode::Infinite
}
};
let state = STDIN_POLL
.lock()
.map_err(|_| Error::trap("failed to take lock of STDIN_POLL"))?
@@ -79,37 +89,27 @@ pub async fn poll_oneoff<'a>(poll: &'_ Poll<'a>) -> Result<(), Error> {
}
}
}
for sub in immediate_subs {
match sub {
Subscription::Read(r) => {
// XXX This doesnt strictly preserve the behavior in the earlier
// implementation, which would always do complete(0) for reads from
// stdout/err.
match r.file.num_ready_bytes().await {
Ok(ready_bytes) => {
r.complete(ready_bytes, RwEventFlags::empty());
ready = true;
}
Err(e) => {
r.error(e);
ready = true;
}
}
}
Subscription::Write(w) => {
// Everything is always ready for writing, apparently?
w.complete(0, RwEventFlags::empty());
for r in immediate_reads {
match r.file.num_ready_bytes().await {
Ok(ready_bytes) => {
r.complete(ready_bytes, RwEventFlags::empty());
ready = true;
}
Err(e) => {
r.error(e);
ready = true;
}
Subscription::MonotonicClock { .. } => unreachable!(),
}
}
for w in immediate_writes {
// Everything is always ready for writing, apparently?
w.complete(0, RwEventFlags::empty());
ready = true;
}
if !ready {
if let Some(t) = timeout {
if let Some(duration) = t.duration_until() {
thread::sleep(duration);
}
if let WaitMode::Timeout(duration) = waitmode {
thread::sleep(duration);
}
}
@@ -154,6 +154,7 @@ enum PollState {
Error(std::io::Error),
}
#[derive(Copy, Clone)]
enum WaitMode {
Timeout(Duration),
Infinite,