restructure Poll to hold a Table and fd instead of a RefMut<dyn WasiFile>
unfortunately, the borrow checker defeated me: changing the RwSubscription file form a Ref to a RefMut turned into borrow checker errors in the impl of the poll_oneoff trait method. This implementation makes an end run by having Poll hold onto the table and fd, and borrow the file at the site of use, rather than try to own the RefMut. I have no idea why this convinces the borrow checker that anything is different, but it does and I need to get this PR done and I don't think comprimising on this internal abstraction is worth fighting against
This commit is contained in:
@@ -32,14 +32,14 @@ impl WasiSched for SyncSched {
|
|||||||
for s in poll.rw_subscriptions() {
|
for s in poll.rw_subscriptions() {
|
||||||
match s {
|
match s {
|
||||||
Subscription::Read(f) => {
|
Subscription::Read(f) => {
|
||||||
let raw_fd = wasi_file_raw_fd(f.file.deref()).ok_or(
|
let raw_fd = wasi_file_raw_fd(f.file()?.deref()).ok_or(
|
||||||
Error::invalid_argument().context("read subscription fd downcast failed"),
|
Error::invalid_argument().context("read subscription fd downcast failed"),
|
||||||
)?;
|
)?;
|
||||||
pollfds.push(unsafe { PollFd::new(raw_fd, PollFlags::POLLIN) });
|
pollfds.push(unsafe { PollFd::new(raw_fd, PollFlags::POLLIN) });
|
||||||
}
|
}
|
||||||
|
|
||||||
Subscription::Write(f) => {
|
Subscription::Write(f) => {
|
||||||
let raw_fd = wasi_file_raw_fd(f.file.deref()).ok_or(
|
let raw_fd = wasi_file_raw_fd(f.file()?.deref()).ok_or(
|
||||||
Error::invalid_argument().context("write subscription fd downcast failed"),
|
Error::invalid_argument().context("write subscription fd downcast failed"),
|
||||||
)?;
|
)?;
|
||||||
pollfds.push(unsafe { PollFd::new(raw_fd, PollFlags::POLLOUT) });
|
pollfds.push(unsafe { PollFd::new(raw_fd, PollFlags::POLLOUT) });
|
||||||
@@ -79,7 +79,11 @@ impl WasiSched for SyncSched {
|
|||||||
if let Some(revents) = pollfd.revents() {
|
if let Some(revents) = pollfd.revents() {
|
||||||
let (nbytes, rwsub) = match rwsub {
|
let (nbytes, rwsub) = match rwsub {
|
||||||
Subscription::Read(sub) => {
|
Subscription::Read(sub) => {
|
||||||
let ready = sub.file.num_ready_bytes().await?;
|
let ready = sub
|
||||||
|
.file()
|
||||||
|
.expect("validated file already")
|
||||||
|
.num_ready_bytes()
|
||||||
|
.await?;
|
||||||
(std::cmp::max(ready, 1), sub)
|
(std::cmp::max(ready, 1), sub)
|
||||||
}
|
}
|
||||||
Subscription::Write(sub) => (0, sub),
|
Subscription::Write(sub) => (0, sub),
|
||||||
|
|||||||
@@ -1,8 +1,8 @@
|
|||||||
use crate::clocks::WasiMonotonicClock;
|
use crate::clocks::WasiMonotonicClock;
|
||||||
use crate::file::WasiFile;
|
use crate::table::Table;
|
||||||
use crate::Error;
|
use crate::{Error, ErrorExt};
|
||||||
use cap_std::time::Instant;
|
use cap_std::time::Instant;
|
||||||
use std::cell::RefMut;
|
use std::collections::HashSet;
|
||||||
pub mod subscription;
|
pub mod subscription;
|
||||||
pub use cap_std::time::Duration;
|
pub use cap_std::time::Duration;
|
||||||
|
|
||||||
@@ -29,12 +29,18 @@ impl From<Userdata> for u64 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub struct Poll<'a> {
|
pub struct Poll<'a> {
|
||||||
|
table: &'a Table,
|
||||||
|
fds: HashSet<u32>,
|
||||||
subs: Vec<(Subscription<'a>, Userdata)>,
|
subs: Vec<(Subscription<'a>, Userdata)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Poll<'a> {
|
impl<'a> Poll<'a> {
|
||||||
pub fn new() -> Self {
|
pub fn new(table: &'a Table) -> Self {
|
||||||
Self { subs: Vec::new() }
|
Self {
|
||||||
|
table,
|
||||||
|
fds: HashSet::new(),
|
||||||
|
subs: Vec::new(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pub fn subscribe_monotonic_clock(
|
pub fn subscribe_monotonic_clock(
|
||||||
&mut self,
|
&mut self,
|
||||||
@@ -52,13 +58,31 @@ impl<'a> Poll<'a> {
|
|||||||
ud,
|
ud,
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
pub fn subscribe_read(&mut self, file: RefMut<'a, dyn WasiFile>, ud: Userdata) {
|
pub fn subscribe_read(&mut self, fd: u32, ud: Userdata) -> Result<(), Error> {
|
||||||
self.subs
|
if self.fds.contains(&fd) {
|
||||||
.push((Subscription::Read(RwSubscription::new(file)), ud));
|
return Err(
|
||||||
|
Error::invalid_argument().context("Fd can be subscribed to at most once per poll")
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
self.fds.insert(fd);
|
||||||
}
|
}
|
||||||
pub fn subscribe_write(&mut self, file: RefMut<'a, dyn WasiFile>, ud: Userdata) {
|
|
||||||
self.subs
|
self.subs
|
||||||
.push((Subscription::Write(RwSubscription::new(file)), ud));
|
.push((Subscription::Read(RwSubscription::new(self.table, fd)?), ud));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
pub fn subscribe_write(&mut self, fd: u32, ud: Userdata) -> Result<(), Error> {
|
||||||
|
if self.fds.contains(&fd) {
|
||||||
|
return Err(
|
||||||
|
Error::invalid_argument().context("Fd can be subscribed to at most once per poll")
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
self.fds.insert(fd);
|
||||||
|
}
|
||||||
|
self.subs.push((
|
||||||
|
Subscription::Write(RwSubscription::new(self.table, fd)?),
|
||||||
|
ud,
|
||||||
|
));
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
pub fn results(self) -> Vec<(SubscriptionResult, Userdata)> {
|
pub fn results(self) -> Vec<(SubscriptionResult, Userdata)> {
|
||||||
self.subs
|
self.subs
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
use crate::clocks::WasiMonotonicClock;
|
use crate::clocks::WasiMonotonicClock;
|
||||||
use crate::file::WasiFile;
|
use crate::file::{FileCaps, FileEntryMutExt, TableFileExt, WasiFile};
|
||||||
|
use crate::table::Table;
|
||||||
use crate::Error;
|
use crate::Error;
|
||||||
use bitflags::bitflags;
|
use bitflags::bitflags;
|
||||||
use cap_std::time::{Duration, Instant};
|
use cap_std::time::{Duration, Instant};
|
||||||
@@ -12,16 +13,29 @@ bitflags! {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub struct RwSubscription<'a> {
|
pub struct RwSubscription<'a> {
|
||||||
pub file: RefMut<'a, dyn WasiFile>,
|
table: &'a Table,
|
||||||
|
fd: u32,
|
||||||
status: Cell<Option<Result<(u64, RwEventFlags), Error>>>,
|
status: Cell<Option<Result<(u64, RwEventFlags), Error>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> RwSubscription<'a> {
|
impl<'a> RwSubscription<'a> {
|
||||||
pub fn new(file: RefMut<'a, dyn WasiFile>) -> Self {
|
/// Create an RwSubscription. This constructor checks to make sure the file we need exists, and
|
||||||
Self {
|
/// has the correct rights. But, we can't hold onto the WasiFile RefMut inside this structure
|
||||||
file,
|
/// (Pat can't convince borrow checker, either not clever enough or a rustc bug), so we need to
|
||||||
|
/// re-borrow at use time.
|
||||||
|
pub fn new(table: &'a Table, fd: u32) -> Result<Self, Error> {
|
||||||
|
let _ = table.get_file_mut(fd)?.get_cap(FileCaps::POLL_READWRITE)?;
|
||||||
|
Ok(Self {
|
||||||
|
table,
|
||||||
|
fd,
|
||||||
status: Cell::new(None),
|
status: Cell::new(None),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
/// This accessor could fail if there is an outstanding borrow of the file.
|
||||||
|
pub fn file(&self) -> Result<RefMut<'a, dyn WasiFile>, Error> {
|
||||||
|
self.table
|
||||||
|
.get_file_mut(self.fd)?
|
||||||
|
.get_cap(FileCaps::POLL_READWRITE)
|
||||||
}
|
}
|
||||||
pub fn complete(&self, size: u64, flags: RwEventFlags) {
|
pub fn complete(&self, size: u64, flags: RwEventFlags) {
|
||||||
self.status.set(Some(Ok((size, flags))))
|
self.status.set(Some(Ok((size, flags))))
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
use crate::file::{FileCaps, FileEntryExt, FileEntryMutExt, TableFileExt};
|
use crate::file::{FileCaps, FileEntryExt, TableFileExt};
|
||||||
use crate::sched::{
|
use crate::sched::{
|
||||||
subscription::{RwEventFlags, SubscriptionResult},
|
subscription::{RwEventFlags, SubscriptionResult},
|
||||||
Poll,
|
Poll,
|
||||||
@@ -7,7 +7,6 @@ use crate::snapshots::preview_1::types as snapshot1_types;
|
|||||||
use crate::snapshots::preview_1::wasi_snapshot_preview1::WasiSnapshotPreview1 as Snapshot1;
|
use crate::snapshots::preview_1::wasi_snapshot_preview1::WasiSnapshotPreview1 as Snapshot1;
|
||||||
use crate::{Error, ErrorExt, WasiCtx};
|
use crate::{Error, ErrorExt, WasiCtx};
|
||||||
use cap_std::time::Duration;
|
use cap_std::time::Duration;
|
||||||
use std::collections::HashSet;
|
|
||||||
use std::convert::{TryFrom, TryInto};
|
use std::convert::{TryFrom, TryInto};
|
||||||
use std::io::{IoSlice, IoSliceMut};
|
use std::io::{IoSlice, IoSliceMut};
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
@@ -779,8 +778,7 @@ impl wasi_unstable::WasiUnstable for WasiCtx {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let table = self.table();
|
let table = self.table();
|
||||||
let mut subscribed_fds = HashSet::new();
|
let mut poll = Poll::new(&table);
|
||||||
let mut poll = Poll::new();
|
|
||||||
|
|
||||||
let subs = subs.as_array(nsubscriptions);
|
let subs = subs.as_array(nsubscriptions);
|
||||||
for sub_elem in subs.iter() {
|
for sub_elem in subs.iter() {
|
||||||
@@ -818,29 +816,11 @@ impl wasi_unstable::WasiUnstable for WasiCtx {
|
|||||||
},
|
},
|
||||||
types::SubscriptionU::FdRead(readsub) => {
|
types::SubscriptionU::FdRead(readsub) => {
|
||||||
let fd = readsub.file_descriptor;
|
let fd = readsub.file_descriptor;
|
||||||
if subscribed_fds.contains(&fd) {
|
poll.subscribe_read(u32::from(fd), sub.userdata.into())?;
|
||||||
Err(Error::invalid_argument()
|
|
||||||
.context("Fd can be subscribed to at most once per poll_oneoff"))?;
|
|
||||||
} else {
|
|
||||||
subscribed_fds.insert(fd);
|
|
||||||
}
|
|
||||||
let file = table
|
|
||||||
.get_file_mut(u32::from(fd))?
|
|
||||||
.get_cap(FileCaps::POLL_READWRITE)?;
|
|
||||||
poll.subscribe_read(file, sub.userdata.into());
|
|
||||||
}
|
}
|
||||||
types::SubscriptionU::FdWrite(writesub) => {
|
types::SubscriptionU::FdWrite(writesub) => {
|
||||||
let fd = writesub.file_descriptor;
|
let fd = writesub.file_descriptor;
|
||||||
if subscribed_fds.contains(&fd) {
|
poll.subscribe_write(u32::from(fd), sub.userdata.into())?;
|
||||||
Err(Error::invalid_argument()
|
|
||||||
.context("Fd can be subscribed to at most once per poll_oneoff"))?;
|
|
||||||
} else {
|
|
||||||
subscribed_fds.insert(fd);
|
|
||||||
}
|
|
||||||
let file = table
|
|
||||||
.get_file_mut(u32::from(fd))?
|
|
||||||
.get_cap(FileCaps::POLL_READWRITE)?;
|
|
||||||
poll.subscribe_write(file, sub.userdata.into());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,7 +13,6 @@ use crate::{
|
|||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use cap_std::time::{Duration, SystemClock};
|
use cap_std::time::{Duration, SystemClock};
|
||||||
use std::cell::{Ref, RefMut};
|
use std::cell::{Ref, RefMut};
|
||||||
use std::collections::HashSet;
|
|
||||||
use std::convert::{TryFrom, TryInto};
|
use std::convert::{TryFrom, TryInto};
|
||||||
use std::io::{IoSlice, IoSliceMut};
|
use std::io::{IoSlice, IoSliceMut};
|
||||||
use std::ops::{Deref, DerefMut};
|
use std::ops::{Deref, DerefMut};
|
||||||
@@ -971,8 +970,7 @@ impl wasi_snapshot_preview1::WasiSnapshotPreview1 for WasiCtx {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let table = self.table();
|
let table = self.table();
|
||||||
let mut subscribed_fds = HashSet::new();
|
let mut poll = Poll::new(&table);
|
||||||
let mut poll = Poll::new();
|
|
||||||
|
|
||||||
let subs = subs.as_array(nsubscriptions);
|
let subs = subs.as_array(nsubscriptions);
|
||||||
for sub_elem in subs.iter() {
|
for sub_elem in subs.iter() {
|
||||||
@@ -1010,29 +1008,11 @@ impl wasi_snapshot_preview1::WasiSnapshotPreview1 for WasiCtx {
|
|||||||
},
|
},
|
||||||
types::SubscriptionU::FdRead(readsub) => {
|
types::SubscriptionU::FdRead(readsub) => {
|
||||||
let fd = readsub.file_descriptor;
|
let fd = readsub.file_descriptor;
|
||||||
if subscribed_fds.contains(&fd) {
|
poll.subscribe_read(u32::from(fd), sub.userdata.into())?;
|
||||||
return Err(Error::invalid_argument()
|
|
||||||
.context("Fd can be subscribed to at most once per poll_oneoff"));
|
|
||||||
} else {
|
|
||||||
subscribed_fds.insert(fd);
|
|
||||||
}
|
|
||||||
let file = table
|
|
||||||
.get_file_mut(u32::from(fd))?
|
|
||||||
.get_cap(FileCaps::POLL_READWRITE)?;
|
|
||||||
poll.subscribe_read(file, sub.userdata.into());
|
|
||||||
}
|
}
|
||||||
types::SubscriptionU::FdWrite(writesub) => {
|
types::SubscriptionU::FdWrite(writesub) => {
|
||||||
let fd = writesub.file_descriptor;
|
let fd = writesub.file_descriptor;
|
||||||
if subscribed_fds.contains(&fd) {
|
poll.subscribe_write(u32::from(fd), sub.userdata.into())?;
|
||||||
return Err(Error::invalid_argument()
|
|
||||||
.context("Fd can be subscribed to at most once per poll_oneoff"));
|
|
||||||
} else {
|
|
||||||
subscribed_fds.insert(fd);
|
|
||||||
}
|
|
||||||
let file = table
|
|
||||||
.get_file_mut(u32::from(fd))?
|
|
||||||
.get_cap(FileCaps::POLL_READWRITE)?;
|
|
||||||
poll.subscribe_write(file, sub.userdata.into());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
use cap_std::time::Duration;
|
use cap_std::time::Duration;
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
use std::future::{Future, Poll as FPoll};
|
use std::future::Future;
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::Context;
|
use std::task::Context;
|
||||||
@@ -23,15 +23,15 @@ pub async fn poll_oneoff<'a>(poll: &'_ Poll<'a>) -> Result<(), Error> {
|
|||||||
match s {
|
match s {
|
||||||
Subscription::Read(f) => {
|
Subscription::Read(f) => {
|
||||||
futures.push(Box::pin(async move {
|
futures.push(Box::pin(async move {
|
||||||
f.file.readable().await?;
|
f.file()?.readable().await?;
|
||||||
f.complete(f.file.num_ready_bytes().await?, RwEventFlags::empty());
|
f.complete(f.file()?.num_ready_bytes().await?, RwEventFlags::empty());
|
||||||
Ok(())
|
Ok(())
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
Subscription::Write(f) => {
|
Subscription::Write(f) => {
|
||||||
futures.push(Box::pin(async move {
|
futures.push(Box::pin(async move {
|
||||||
f.file.writable().await?;
|
f.file()?.writable().await?;
|
||||||
f.complete(0, RwEventFlags::empty());
|
f.complete(0, RwEventFlags::empty());
|
||||||
Ok(())
|
Ok(())
|
||||||
}));
|
}));
|
||||||
|
|||||||
Reference in New Issue
Block a user