From b79bdcee84eee9896383c28cfc54ad084e664caf Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Tue, 12 Jan 2021 15:55:10 -0800 Subject: [PATCH] port subscriptions in from old branch --- crates/wasi-c2/src/sched.rs | 58 +++++++++++++- crates/wasi-c2/src/sched/subscription.rs | 92 +++++++++++++++++++++++ crates/wasi-c2/src/snapshots/preview_1.rs | 2 +- 3 files changed, 148 insertions(+), 4 deletions(-) create mode 100644 crates/wasi-c2/src/sched/subscription.rs diff --git a/crates/wasi-c2/src/sched.rs b/crates/wasi-c2/src/sched.rs index 635a96cb6d..9c5c82806e 100644 --- a/crates/wasi-c2/src/sched.rs +++ b/crates/wasi-c2/src/sched.rs @@ -1,8 +1,15 @@ +use crate::file::WasiFile; use crate::Error; +use cap_std::time::{SystemClock, SystemTime}; +use std::cell::Ref; +pub mod subscription; + +use subscription::{ + RwSubscription, Subscription, SubscriptionResult, SubscriptionSet, TimerSubscription, +}; pub trait WasiSched { - // XXX poll oneoff needs args and results. - fn poll_oneoff(&self) -> Result<(), Error>; + fn poll_oneoff(&self, subs: SubscriptionSet) -> Result<(), Error>; fn sched_yield(&self) -> Result<(), Error>; } @@ -10,7 +17,7 @@ pub trait WasiSched { pub struct SyncSched {} impl WasiSched for SyncSched { - fn poll_oneoff(&self) -> Result<(), Error> { + fn poll_oneoff(&self, subs: SubscriptionSet) -> Result<(), Error> { todo!() } fn sched_yield(&self) -> Result<(), Error> { @@ -18,3 +25,48 @@ impl WasiSched for SyncSched { Ok(()) } } + +pub struct Userdata(u64); +impl From for Userdata { + fn from(u: u64) -> Userdata { + Userdata(u) + } +} +impl From for u64 { + fn from(u: Userdata) -> u64 { + u.0 + } +} + +pub struct Poll<'a> { + subs: Vec<(Subscription<'a>, Userdata)>, +} + +impl<'a> Poll<'a> { + pub fn new() -> Self { + Self { subs: Vec::new() } + } + pub fn subscribe_timer(&mut self, deadline: SystemTime, ud: Userdata) { + self.subs + .push((Subscription::Timer(TimerSubscription { deadline }), ud)); + } + pub fn subscribe_read(&mut self, file: Ref<'a, dyn WasiFile>, ud: Userdata) { + self.subs + .push((Subscription::Read(RwSubscription::new(file)), ud)); + } + pub fn subscribe_write(&mut self, file: Ref<'a, dyn WasiFile>, ud: Userdata) { + self.subs + .push((Subscription::Read(RwSubscription::new(file)), ud)); + } + pub fn results(self, clock: &SystemClock) -> Vec<(SubscriptionResult, Userdata)> { + self.subs + .into_iter() + .filter_map(|(s, ud)| SubscriptionResult::from_subscription(s, clock).map(|r| (r, ud))) + .collect() + } + pub(crate) fn subscriptions(&'a mut self) -> SubscriptionSet<'a> { + SubscriptionSet { + subs: self.subs.iter().map(|(s, _ud)| s).collect(), + } + } +} diff --git a/crates/wasi-c2/src/sched/subscription.rs b/crates/wasi-c2/src/sched/subscription.rs new file mode 100644 index 0000000000..98172fd02e --- /dev/null +++ b/crates/wasi-c2/src/sched/subscription.rs @@ -0,0 +1,92 @@ +use crate::file::WasiFile; +use crate::Error; +use bitflags::bitflags; +use cap_std::time::{SystemClock, SystemTime}; +use std::cell::{Cell, Ref}; + +bitflags! { + pub struct RwEventFlags: u32 { + const HANGUP = 0b1; + } +} + +pub struct RwSubscription<'a> { + file: Ref<'a, dyn WasiFile>, + status: Cell>>, +} + +impl<'a> RwSubscription<'a> { + pub fn new(file: Ref<'a, dyn WasiFile>) -> Self { + Self { + file, + status: Cell::new(None), + } + } + pub fn complete(&self, size: u64, flags: RwEventFlags) { + self.status.set(Some(Ok((size, flags)))) + } + pub fn error(&self, error: Error) { + self.status.set(Some(Err(error))) + } + pub fn result(self) -> Option> { + self.status.into_inner() + } +} + +pub struct TimerSubscription { + pub deadline: SystemTime, +} + +impl TimerSubscription { + pub fn result(&self, clock: &SystemClock) -> Option> { + if self.deadline.duration_since(clock.now()).is_ok() { + Some(Ok(())) + } else { + None + } + } +} + +pub enum Subscription<'a> { + Read(RwSubscription<'a>), + Write(RwSubscription<'a>), + Timer(TimerSubscription), +} + +pub struct SubscriptionSet<'a> { + pub subs: Vec<&'a Subscription<'a>>, +} + +impl<'a> SubscriptionSet<'a> { + pub fn earliest_deadline(&self) -> Option { + self.subs + .iter() + .filter_map(|s| match s { + Subscription::Timer(ts) => Some(ts.deadline), + _ => None, + }) + .fold(None, |early, ts| { + if let Some(early) = early { + Some(early.min(ts)) + } else { + Some(ts) + } + }) + } +} + +pub enum SubscriptionResult { + Read(Result<(u64, RwEventFlags), Error>), + Write(Result<(u64, RwEventFlags), Error>), + Timer(Result<(), Error>), +} + +impl SubscriptionResult { + pub fn from_subscription(s: Subscription, clock: &SystemClock) -> Option { + match s { + Subscription::Read(s) => s.result().map(SubscriptionResult::Read), + Subscription::Write(s) => s.result().map(SubscriptionResult::Write), + Subscription::Timer(s) => s.result(clock).map(SubscriptionResult::Timer), + } + } +} diff --git a/crates/wasi-c2/src/snapshots/preview_1.rs b/crates/wasi-c2/src/snapshots/preview_1.rs index 037521b162..b05a1ffd81 100644 --- a/crates/wasi-c2/src/snapshots/preview_1.rs +++ b/crates/wasi-c2/src/snapshots/preview_1.rs @@ -886,7 +886,7 @@ impl<'a> wasi_snapshot_preview1::WasiSnapshotPreview1 for WasiCtx { events: &GuestPtr, nsubscriptions: types::Size, ) -> Result { - self.sched.poll_oneoff()?; + self.sched.poll_oneoff(todo!())?; Ok(0) }