wasi-tokio kinda sorta working

This commit is contained in:
Pat Hickey
2021-05-01 15:39:09 -07:00
parent 7f34ccb909
commit 62c4f0d1f7
9 changed files with 474 additions and 76 deletions

View File

@@ -1,48 +1,87 @@
use cap_std::time::Duration;
use std::convert::TryInto;
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::task::Context;
use std::task::{Context, Poll as FPoll};
use wasi_common::{
file::WasiFile,
sched::{
subscription::{RwEventFlags, Subscription},
Poll,
},
Error, ErrorExt,
Context as _, Error,
};
pub async fn poll_oneoff<'a>(poll: &'_ Poll<'a>) -> Result<(), Error> {
struct FirstReady<'a, T>(Vec<Pin<Box<dyn Future<Output = T> + 'a>>>);
impl<'a, T> FirstReady<'a, T> {
fn new() -> Self {
FirstReady(Vec::new())
}
fn push(&mut self, f: impl Future<Output = T> + 'a) {
self.0.push(Box::pin(f));
}
}
impl<'a, T> Future for FirstReady<'a, T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> FPoll<T> {
let mut result = FPoll::Pending;
for f in self.as_mut().0.iter_mut() {
match f.as_mut().poll(cx) {
FPoll::Ready(r) => match result {
// First ready gets to set the result. But, continue the loop so all futures
// get the opportunity to become ready.
FPoll::Pending => {
result = FPoll::Ready(r);
}
_ => {}
},
_ => continue,
}
}
return result;
}
}
pub async fn poll_oneoff<'a>(poll: &mut Poll<'a>) -> Result<(), Error> {
if poll.is_empty() {
return Ok(());
}
let mut futures: Vec<Pin<Box<dyn Future<Output = Result<(), Error>>>>> = Vec::new();
let timeout = poll.earliest_clock_deadline();
let duration = poll
.earliest_clock_deadline()
.map(|sub| sub.duration_until());
let mut futures = FirstReady::new();
for s in poll.rw_subscriptions() {
match s {
Subscription::Read(f) => {
futures.push(Box::pin(async move {
f.file()?.readable().await?;
f.complete(f.file()?.num_ready_bytes().await?, RwEventFlags::empty());
Ok(())
}));
futures.push(async move {
f.file.readable().await.context("readable future")?;
f.complete(
f.file
.num_ready_bytes()
.await
.context("read num_ready_bytes")?,
RwEventFlags::empty(),
);
Ok::<(), Error>(())
});
}
Subscription::Write(f) => {
futures.push(Box::pin(async move {
f.file()?.writable().await?;
futures.push(async move {
f.file.writable().await.context("writable future")?;
f.complete(0, RwEventFlags::empty());
Ok(())
}));
});
}
Subscription::MonotonicClock { .. } => unreachable!(),
}
}
// Incorrect, but lets get the type errors fixed before we write the right multiplexer here:
for f in futures {
f.await?;
if let Some(Some(remaining_duration)) = duration {
tokio::time::timeout(remaining_duration, futures).await??;
} else {
futures.await?;
}
Ok(())
}