Some improvements to the wasi-http client implementation of write. (#6161)
* Improve write implementation for streams * Add trailers implementation for responses. * Improve tests. * Update tests.
This commit is contained in:
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -3045,6 +3045,10 @@ dependencies = [
|
|||||||
"cap-std",
|
"cap-std",
|
||||||
"cargo_metadata",
|
"cargo_metadata",
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
|
"http",
|
||||||
|
"http-body",
|
||||||
|
"http-body-util",
|
||||||
|
"hyper",
|
||||||
"os_pipe",
|
"os_pipe",
|
||||||
"target-lexicon",
|
"target-lexicon",
|
||||||
"tempfile",
|
"tempfile",
|
||||||
|
|||||||
@@ -23,8 +23,12 @@ os_pipe = "0.9"
|
|||||||
anyhow = { workspace = true }
|
anyhow = { workspace = true }
|
||||||
wat = { workspace = true }
|
wat = { workspace = true }
|
||||||
cap-std = { workspace = true }
|
cap-std = { workspace = true }
|
||||||
tokio = { version = "1.8.0", features = ["rt-multi-thread"] }
|
tokio = { version = "1.8.0", features = ["net", "rt-multi-thread"] }
|
||||||
wasmtime-wasi-http = { workspace = true }
|
wasmtime-wasi-http = { workspace = true }
|
||||||
|
hyper = { version = "1.0.0-rc.3", features = ["full"] }
|
||||||
|
http = { version = "0.2.9" }
|
||||||
|
http-body = "1.0.0-rc.2"
|
||||||
|
http-body-util = "0.1.0-rc.2"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
test_programs = []
|
test_programs = []
|
||||||
|
|||||||
@@ -4,11 +4,59 @@ use wasmtime::{Config, Engine, Linker, Module, Store};
|
|||||||
use wasmtime_wasi::{sync::WasiCtxBuilder, WasiCtx};
|
use wasmtime_wasi::{sync::WasiCtxBuilder, WasiCtx};
|
||||||
use wasmtime_wasi_http::WasiHttp;
|
use wasmtime_wasi_http::WasiHttp;
|
||||||
|
|
||||||
|
use http_body_util::combinators::BoxBody;
|
||||||
|
use http_body_util::BodyExt;
|
||||||
|
use hyper::server::conn::http1;
|
||||||
|
use hyper::{body::Bytes, service::service_fn, Request, Response};
|
||||||
|
use std::{error::Error, net::SocketAddr};
|
||||||
|
use tokio::net::TcpListener;
|
||||||
|
|
||||||
|
async fn test(
|
||||||
|
req: Request<hyper::body::Incoming>,
|
||||||
|
) -> http::Result<Response<BoxBody<Bytes, hyper::Error>>> {
|
||||||
|
let method = req.method().to_string();
|
||||||
|
Response::builder()
|
||||||
|
.status(http::StatusCode::OK)
|
||||||
|
.header("x-wasmtime-test-method", method)
|
||||||
|
.body(req.into_body().boxed())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn async_run_serve() -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||||
|
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
|
||||||
|
|
||||||
|
let listener = TcpListener::bind(addr).await?;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let (stream, _) = listener.accept().await?;
|
||||||
|
|
||||||
|
tokio::task::spawn(async move {
|
||||||
|
if let Err(err) = http1::Builder::new()
|
||||||
|
.serve_connection(stream, service_fn(test))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
println!("Error serving connection: {:?}", err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn run_server() -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||||
|
let rt = tokio::runtime::Runtime::new()?;
|
||||||
|
let _ent = rt.enter();
|
||||||
|
|
||||||
|
rt.block_on(async_run_serve())?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub fn instantiate_inherit_stdio(
|
pub fn instantiate_inherit_stdio(
|
||||||
data: &[u8],
|
data: &[u8],
|
||||||
bin_name: &str,
|
bin_name: &str,
|
||||||
_workspace: Option<&Path>,
|
_workspace: Option<&Path>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
|
let _thread = std::thread::spawn(|| {
|
||||||
|
run_server().unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
let config = Config::new();
|
let config = Config::new();
|
||||||
let engine = Engine::new(&config)?;
|
let engine = Engine::new(&config)?;
|
||||||
let module = Module::new(&engine, &data).context("failed to create wasm module")?;
|
let module = Module::new(&engine, &data).context("failed to create wasm module")?;
|
||||||
|
|||||||
@@ -21,6 +21,14 @@ impl fmt::Debug for Response {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Response {
|
||||||
|
fn header(&self, name: &str) -> Option<&String> {
|
||||||
|
self.headers.iter().find_map(
|
||||||
|
|(k, v)| if k == name { Some(v) } else { None }
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn request(
|
fn request(
|
||||||
method: types::MethodParam<'_>,
|
method: types::MethodParam<'_>,
|
||||||
scheme: types::SchemeParam<'_>,
|
scheme: types::SchemeParam<'_>,
|
||||||
@@ -47,9 +55,13 @@ fn request(
|
|||||||
body_cursor += written as usize;
|
body_cursor += written as usize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let future_response = default_outgoing_http::handle(request, None);
|
||||||
|
|
||||||
|
// TODO: The current implementation requires this drop after the request is sent.
|
||||||
|
// The ownership semantics are unclear in wasi-http we should clarify exactly what is
|
||||||
|
// supposed to happen here.
|
||||||
streams::drop_output_stream(request_body);
|
streams::drop_output_stream(request_body);
|
||||||
|
|
||||||
let future_response = default_outgoing_http::handle(request, None);
|
|
||||||
// TODO: we could create a pollable from the future_response and poll on it here to test that
|
// TODO: we could create a pollable from the future_response and poll on it here to test that
|
||||||
// its available immediately
|
// its available immediately
|
||||||
|
|
||||||
@@ -71,7 +83,6 @@ fn request(
|
|||||||
|
|
||||||
let body_stream = types::incoming_response_consume(incoming_response)
|
let body_stream = types::incoming_response_consume(incoming_response)
|
||||||
.map_err(|()| anyhow!("incoming response has no body stream"))?;
|
.map_err(|()| anyhow!("incoming response has no body stream"))?;
|
||||||
types::drop_incoming_response(incoming_response);
|
|
||||||
|
|
||||||
let mut body = Vec::new();
|
let mut body = Vec::new();
|
||||||
let mut eof = false;
|
let mut eof = false;
|
||||||
@@ -81,6 +92,7 @@ fn request(
|
|||||||
body.append(&mut body_chunk);
|
body.append(&mut body_chunk);
|
||||||
}
|
}
|
||||||
streams::drop_input_stream(body_stream);
|
streams::drop_input_stream(body_stream);
|
||||||
|
types::drop_incoming_response(incoming_response);
|
||||||
|
|
||||||
Ok(Response {
|
Ok(Response {
|
||||||
status,
|
status,
|
||||||
@@ -89,44 +101,55 @@ fn request(
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() -> Result<()> {
|
fn main() -> Result<()> {
|
||||||
|
let missing = "MISSING".to_string();
|
||||||
let r1 = request(
|
let r1 = request(
|
||||||
types::MethodParam::Get,
|
types::MethodParam::Get,
|
||||||
types::SchemeParam::Http,
|
types::SchemeParam::Http,
|
||||||
"postman-echo.com",
|
"localhost:3000",
|
||||||
"/get",
|
"/get",
|
||||||
"?some=arg?goes=here",
|
"?some=arg?goes=here",
|
||||||
&[],
|
&[],
|
||||||
)
|
)
|
||||||
.context("postman-echo /get")?;
|
.context("localhost:3000 /get")?;
|
||||||
|
|
||||||
println!("postman-echo /get: {r1:?}");
|
println!("localhost:3000 /get: {r1:?}");
|
||||||
assert_eq!(r1.status, 200);
|
assert_eq!(r1.status, 200);
|
||||||
|
let method = r1.header("x-wasmtime-test-method").unwrap_or(&missing);
|
||||||
|
assert_eq!(method, "GET");
|
||||||
|
assert_eq!(r1.body, b"");
|
||||||
|
|
||||||
let r2 = request(
|
let r2 = request(
|
||||||
types::MethodParam::Post,
|
types::MethodParam::Post,
|
||||||
types::SchemeParam::Http,
|
types::SchemeParam::Http,
|
||||||
"postman-echo.com",
|
"localhost:3000",
|
||||||
"/post",
|
"/post",
|
||||||
"",
|
"",
|
||||||
b"{\"foo\": \"bar\"}",
|
b"{\"foo\": \"bar\"}",
|
||||||
)
|
)
|
||||||
.context("postman-echo /post")?;
|
.context("localhost:3000 /post")?;
|
||||||
|
|
||||||
println!("postman-echo /post: {r2:?}");
|
println!("localhost:3000 /post: {r2:?}");
|
||||||
assert_eq!(r2.status, 200);
|
assert_eq!(r2.status, 200);
|
||||||
|
let method = r2.header("x-wasmtime-test-method").unwrap_or(&missing);
|
||||||
|
assert_eq!(method, "POST");
|
||||||
|
assert_eq!(r2.body, b"{\"foo\": \"bar\"}");
|
||||||
|
|
||||||
let r3 = request(
|
let r3 = request(
|
||||||
types::MethodParam::Put,
|
types::MethodParam::Put,
|
||||||
types::SchemeParam::Http,
|
types::SchemeParam::Http,
|
||||||
"postman-echo.com",
|
"localhost:3000",
|
||||||
"/put",
|
"/put",
|
||||||
"",
|
"",
|
||||||
&[],
|
&[],
|
||||||
)
|
)
|
||||||
.context("postman-echo /put")?;
|
.context("localhost:3000 /put")?;
|
||||||
|
|
||||||
println!("postman-echo /put: {r3:?}");
|
println!("localhost:3000 /put: {r3:?}");
|
||||||
assert_eq!(r3.status, 200);
|
assert_eq!(r3.status, 200);
|
||||||
|
let method = r3.header("x-wasmtime-test-method").unwrap_or(&missing);
|
||||||
|
assert_eq!(method, "PUT");
|
||||||
|
assert_eq!(r3.body, b"");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ use bytes::{BufMut, Bytes, BytesMut};
|
|||||||
use http_body_util::{BodyExt, Full};
|
use http_body_util::{BodyExt, Full};
|
||||||
use hyper::Method;
|
use hyper::Method;
|
||||||
use hyper::Request;
|
use hyper::Request;
|
||||||
|
use std::collections::HashMap;
|
||||||
#[cfg(not(any(target_arch = "riscv64", target_arch = "s390x")))]
|
#[cfg(not(any(target_arch = "riscv64", target_arch = "s390x")))]
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@@ -201,6 +202,23 @@ impl WasiHttp {
|
|||||||
if let Some(chunk) = frame.data_ref() {
|
if let Some(chunk) = frame.data_ref() {
|
||||||
buf.put(chunk.clone());
|
buf.put(chunk.clone());
|
||||||
}
|
}
|
||||||
|
if let Some(trailers) = frame.trailers_ref() {
|
||||||
|
response.trailers = self.fields_id_base;
|
||||||
|
self.fields_id_base += 1;
|
||||||
|
let mut map: HashMap<String, Vec<String>> = HashMap::new();
|
||||||
|
for (name, value) in trailers.iter() {
|
||||||
|
let key = name.to_string();
|
||||||
|
match map.get_mut(&key) {
|
||||||
|
Some(vec) => vec.push(value.to_str()?.to_string()),
|
||||||
|
None => {
|
||||||
|
let mut vec = Vec::new();
|
||||||
|
vec.push(value.to_str()?.to_string());
|
||||||
|
map.insert(key, vec);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
self.fields.insert(response.trailers, map);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
response.body = self.streams_id_base;
|
response.body = self.streams_id_base;
|
||||||
self.streams_id_base = self.streams_id_base + 1;
|
self.streams_id_base = self.streams_id_base + 1;
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ use crate::poll::Pollable;
|
|||||||
use crate::streams::{InputStream, OutputStream, StreamError};
|
use crate::streams::{InputStream, OutputStream, StreamError};
|
||||||
use crate::WasiHttp;
|
use crate::WasiHttp;
|
||||||
use anyhow::{anyhow, bail};
|
use anyhow::{anyhow, bail};
|
||||||
|
use bytes::BufMut;
|
||||||
use std::vec::Vec;
|
use std::vec::Vec;
|
||||||
|
|
||||||
impl crate::streams::Host for WasiHttp {
|
impl crate::streams::Host for WasiHttp {
|
||||||
@@ -27,10 +28,23 @@ impl crate::streams::Host for WasiHttp {
|
|||||||
|
|
||||||
fn skip(
|
fn skip(
|
||||||
&mut self,
|
&mut self,
|
||||||
_this: InputStream,
|
stream: InputStream,
|
||||||
_len: u64,
|
len: u64,
|
||||||
) -> wasmtime::Result<Result<(u64, bool), StreamError>> {
|
) -> wasmtime::Result<Result<(u64, bool), StreamError>> {
|
||||||
bail!("unimplemented: skip");
|
let s = self
|
||||||
|
.streams
|
||||||
|
.get_mut(&stream)
|
||||||
|
.ok_or_else(|| anyhow!("stream not found: {stream}"))?;
|
||||||
|
if len == 0 {
|
||||||
|
Ok(Ok((0, s.len() > 0)))
|
||||||
|
} else if s.len() > len.try_into()? {
|
||||||
|
s.truncate(len.try_into()?);
|
||||||
|
Ok(Ok((len, false)))
|
||||||
|
} else {
|
||||||
|
let bytes = s.len();
|
||||||
|
s.truncate(s.len());
|
||||||
|
Ok(Ok((bytes.try_into()?, true)))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn subscribe_to_input_stream(&mut self, _this: InputStream) -> wasmtime::Result<Pollable> {
|
fn subscribe_to_input_stream(&mut self, _this: InputStream) -> wasmtime::Result<Pollable> {
|
||||||
@@ -38,11 +52,7 @@ impl crate::streams::Host for WasiHttp {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn drop_input_stream(&mut self, stream: InputStream) -> wasmtime::Result<()> {
|
fn drop_input_stream(&mut self, stream: InputStream) -> wasmtime::Result<()> {
|
||||||
let r = self
|
self.streams.remove(&stream);
|
||||||
.streams
|
|
||||||
.get_mut(&stream)
|
|
||||||
.ok_or_else(|| anyhow!("no such input-stream {stream}"))?;
|
|
||||||
r.truncate(0);
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -51,17 +61,32 @@ impl crate::streams::Host for WasiHttp {
|
|||||||
this: OutputStream,
|
this: OutputStream,
|
||||||
buf: Vec<u8>,
|
buf: Vec<u8>,
|
||||||
) -> wasmtime::Result<Result<u64, StreamError>> {
|
) -> wasmtime::Result<Result<u64, StreamError>> {
|
||||||
// TODO: Make this a real write not a replace.
|
match self.streams.get(&this) {
|
||||||
self.streams.insert(this, bytes::Bytes::from(buf.clone()));
|
Some(data) => {
|
||||||
|
let mut new = bytes::BytesMut::with_capacity(data.len() + buf.len());
|
||||||
|
new.put(data.clone());
|
||||||
|
new.put(bytes::Bytes::from(buf.clone()));
|
||||||
|
self.streams.insert(this, new.freeze());
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
self.streams.insert(this, bytes::Bytes::from(buf.clone()));
|
||||||
|
}
|
||||||
|
}
|
||||||
Ok(Ok(buf.len().try_into()?))
|
Ok(Ok(buf.len().try_into()?))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_zeroes(
|
fn write_zeroes(
|
||||||
&mut self,
|
&mut self,
|
||||||
_this: OutputStream,
|
this: OutputStream,
|
||||||
_len: u64,
|
len: u64,
|
||||||
) -> wasmtime::Result<Result<u64, StreamError>> {
|
) -> wasmtime::Result<Result<u64, StreamError>> {
|
||||||
bail!("unimplemented: write_zeroes");
|
let mut data = Vec::with_capacity(len.try_into()?);
|
||||||
|
let mut i = 0;
|
||||||
|
while i < len {
|
||||||
|
data.push(0);
|
||||||
|
i = i + 1;
|
||||||
|
}
|
||||||
|
self.write(this, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn splice(
|
fn splice(
|
||||||
@@ -85,9 +110,8 @@ impl crate::streams::Host for WasiHttp {
|
|||||||
bail!("unimplemented: subscribe_to_output_stream");
|
bail!("unimplemented: subscribe_to_output_stream");
|
||||||
}
|
}
|
||||||
|
|
||||||
fn drop_output_stream(&mut self, _this: OutputStream) -> wasmtime::Result<()> {
|
fn drop_output_stream(&mut self, stream: OutputStream) -> wasmtime::Result<()> {
|
||||||
//bail!("unimplemented: drop_output_stream");
|
self.streams.remove(&stream);
|
||||||
//FIXME: intentionally ignoring
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -34,6 +34,7 @@ pub struct ActiveResponse {
|
|||||||
pub status: u16,
|
pub status: u16,
|
||||||
pub body: u32,
|
pub body: u32,
|
||||||
pub response_headers: HashMap<String, Vec<String>>,
|
pub response_headers: HashMap<String, Vec<String>>,
|
||||||
|
pub trailers: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ActiveRequest {
|
impl ActiveRequest {
|
||||||
@@ -60,6 +61,7 @@ impl ActiveResponse {
|
|||||||
status: 0,
|
status: 0,
|
||||||
body: 0,
|
body: 0,
|
||||||
response_headers: HashMap::new(),
|
response_headers: HashMap::new(),
|
||||||
|
trailers: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -101,8 +101,16 @@ impl crate::types::Host for WasiHttp {
|
|||||||
self.fields.insert(id, m.clone());
|
self.fields.insert(id, m.clone());
|
||||||
Ok(id)
|
Ok(id)
|
||||||
}
|
}
|
||||||
fn finish_incoming_stream(&mut self, _s: IncomingStream) -> wasmtime::Result<Option<Trailers>> {
|
fn finish_incoming_stream(&mut self, s: IncomingStream) -> wasmtime::Result<Option<Trailers>> {
|
||||||
bail!("unimplemented: finish_incoming_stream")
|
for (_, value) in self.responses.iter() {
|
||||||
|
if value.body == s {
|
||||||
|
return match value.trailers {
|
||||||
|
0 => Ok(None),
|
||||||
|
_ => Ok(Some(value.trailers)),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bail!("unknown stream!")
|
||||||
}
|
}
|
||||||
fn finish_outgoing_stream(
|
fn finish_outgoing_stream(
|
||||||
&mut self,
|
&mut self,
|
||||||
@@ -181,8 +189,10 @@ impl crate::types::Host for WasiHttp {
|
|||||||
.requests
|
.requests
|
||||||
.get_mut(&request)
|
.get_mut(&request)
|
||||||
.ok_or_else(|| anyhow!("unknown request: {request}"))?;
|
.ok_or_else(|| anyhow!("unknown request: {request}"))?;
|
||||||
req.body = self.streams_id_base;
|
if req.body == 0 {
|
||||||
self.streams_id_base = self.streams_id_base + 1;
|
req.body = self.streams_id_base;
|
||||||
|
self.streams_id_base = self.streams_id_base + 1;
|
||||||
|
}
|
||||||
Ok(Ok(req.body))
|
Ok(Ok(req.body))
|
||||||
}
|
}
|
||||||
fn drop_response_outparam(&mut self, _response: ResponseOutparam) -> wasmtime::Result<()> {
|
fn drop_response_outparam(&mut self, _response: ResponseOutparam) -> wasmtime::Result<()> {
|
||||||
|
|||||||
Reference in New Issue
Block a user