diff --git a/Cargo.lock b/Cargo.lock index 413091c03c..7f209bf16e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3045,6 +3045,10 @@ dependencies = [ "cap-std", "cargo_metadata", "cfg-if", + "http", + "http-body", + "http-body-util", + "hyper", "os_pipe", "target-lexicon", "tempfile", diff --git a/crates/test-programs/Cargo.toml b/crates/test-programs/Cargo.toml index 99f893f174..a45cb75879 100644 --- a/crates/test-programs/Cargo.toml +++ b/crates/test-programs/Cargo.toml @@ -23,8 +23,12 @@ os_pipe = "0.9" anyhow = { workspace = true } wat = { workspace = true } cap-std = { workspace = true } -tokio = { version = "1.8.0", features = ["rt-multi-thread"] } +tokio = { version = "1.8.0", features = ["net", "rt-multi-thread"] } wasmtime-wasi-http = { workspace = true } +hyper = { version = "1.0.0-rc.3", features = ["full"] } +http = { version = "0.2.9" } +http-body = "1.0.0-rc.2" +http-body-util = "0.1.0-rc.2" [features] test_programs = [] diff --git a/crates/test-programs/tests/http_tests/runtime/wasi_http_tests.rs b/crates/test-programs/tests/http_tests/runtime/wasi_http_tests.rs index e322b6d799..5ed731dbcd 100644 --- a/crates/test-programs/tests/http_tests/runtime/wasi_http_tests.rs +++ b/crates/test-programs/tests/http_tests/runtime/wasi_http_tests.rs @@ -4,11 +4,59 @@ use wasmtime::{Config, Engine, Linker, Module, Store}; use wasmtime_wasi::{sync::WasiCtxBuilder, WasiCtx}; use wasmtime_wasi_http::WasiHttp; +use http_body_util::combinators::BoxBody; +use http_body_util::BodyExt; +use hyper::server::conn::http1; +use hyper::{body::Bytes, service::service_fn, Request, Response}; +use std::{error::Error, net::SocketAddr}; +use tokio::net::TcpListener; + +async fn test( + req: Request, +) -> http::Result>> { + let method = req.method().to_string(); + Response::builder() + .status(http::StatusCode::OK) + .header("x-wasmtime-test-method", method) + .body(req.into_body().boxed()) +} + +async fn async_run_serve() -> Result<(), Box> { + let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); + + let listener = TcpListener::bind(addr).await?; + + loop { + let (stream, _) = listener.accept().await?; + + tokio::task::spawn(async move { + if let Err(err) = http1::Builder::new() + .serve_connection(stream, service_fn(test)) + .await + { + println!("Error serving connection: {:?}", err); + } + }); + } +} + +fn run_server() -> Result<(), Box> { + let rt = tokio::runtime::Runtime::new()?; + let _ent = rt.enter(); + + rt.block_on(async_run_serve())?; + Ok(()) +} + pub fn instantiate_inherit_stdio( data: &[u8], bin_name: &str, _workspace: Option<&Path>, ) -> anyhow::Result<()> { + let _thread = std::thread::spawn(|| { + run_server().unwrap(); + }); + let config = Config::new(); let engine = Engine::new(&config)?; let module = Module::new(&engine, &data).context("failed to create wasm module")?; diff --git a/crates/test-programs/wasi-http-tests/src/bin/outbound_request.rs b/crates/test-programs/wasi-http-tests/src/bin/outbound_request.rs index 0e96aed29b..fbb448dbb7 100644 --- a/crates/test-programs/wasi-http-tests/src/bin/outbound_request.rs +++ b/crates/test-programs/wasi-http-tests/src/bin/outbound_request.rs @@ -21,6 +21,14 @@ impl fmt::Debug for Response { } } +impl Response { + fn header(&self, name: &str) -> Option<&String> { + self.headers.iter().find_map( + |(k, v)| if k == name { Some(v) } else { None } + ) + } +} + fn request( method: types::MethodParam<'_>, scheme: types::SchemeParam<'_>, @@ -47,9 +55,13 @@ fn request( body_cursor += written as usize; } + let future_response = default_outgoing_http::handle(request, None); + + // TODO: The current implementation requires this drop after the request is sent. + // The ownership semantics are unclear in wasi-http we should clarify exactly what is + // supposed to happen here. streams::drop_output_stream(request_body); - let future_response = default_outgoing_http::handle(request, None); // TODO: we could create a pollable from the future_response and poll on it here to test that // its available immediately @@ -71,7 +83,6 @@ fn request( let body_stream = types::incoming_response_consume(incoming_response) .map_err(|()| anyhow!("incoming response has no body stream"))?; - types::drop_incoming_response(incoming_response); let mut body = Vec::new(); let mut eof = false; @@ -81,6 +92,7 @@ fn request( body.append(&mut body_chunk); } streams::drop_input_stream(body_stream); + types::drop_incoming_response(incoming_response); Ok(Response { status, @@ -89,44 +101,55 @@ fn request( }) } -fn main() -> Result<()> { +fn main() -> Result<()> { + let missing = "MISSING".to_string(); let r1 = request( types::MethodParam::Get, types::SchemeParam::Http, - "postman-echo.com", + "localhost:3000", "/get", "?some=arg?goes=here", &[], ) - .context("postman-echo /get")?; + .context("localhost:3000 /get")?; - println!("postman-echo /get: {r1:?}"); + println!("localhost:3000 /get: {r1:?}"); assert_eq!(r1.status, 200); + let method = r1.header("x-wasmtime-test-method").unwrap_or(&missing); + assert_eq!(method, "GET"); + assert_eq!(r1.body, b""); let r2 = request( types::MethodParam::Post, types::SchemeParam::Http, - "postman-echo.com", + "localhost:3000", "/post", "", b"{\"foo\": \"bar\"}", ) - .context("postman-echo /post")?; + .context("localhost:3000 /post")?; - println!("postman-echo /post: {r2:?}"); + println!("localhost:3000 /post: {r2:?}"); assert_eq!(r2.status, 200); + let method = r2.header("x-wasmtime-test-method").unwrap_or(&missing); + assert_eq!(method, "POST"); + assert_eq!(r2.body, b"{\"foo\": \"bar\"}"); let r3 = request( types::MethodParam::Put, types::SchemeParam::Http, - "postman-echo.com", + "localhost:3000", "/put", "", &[], ) - .context("postman-echo /put")?; + .context("localhost:3000 /put")?; - println!("postman-echo /put: {r3:?}"); + println!("localhost:3000 /put: {r3:?}"); assert_eq!(r3.status, 200); + let method = r3.header("x-wasmtime-test-method").unwrap_or(&missing); + assert_eq!(method, "PUT"); + assert_eq!(r3.body, b""); + Ok(()) } diff --git a/crates/wasi-http/src/http_impl.rs b/crates/wasi-http/src/http_impl.rs index 206ebabcc0..69c94d8489 100644 --- a/crates/wasi-http/src/http_impl.rs +++ b/crates/wasi-http/src/http_impl.rs @@ -8,6 +8,7 @@ use bytes::{BufMut, Bytes, BytesMut}; use http_body_util::{BodyExt, Full}; use hyper::Method; use hyper::Request; +use std::collections::HashMap; #[cfg(not(any(target_arch = "riscv64", target_arch = "s390x")))] use std::sync::Arc; use std::time::Duration; @@ -201,6 +202,23 @@ impl WasiHttp { if let Some(chunk) = frame.data_ref() { buf.put(chunk.clone()); } + if let Some(trailers) = frame.trailers_ref() { + response.trailers = self.fields_id_base; + self.fields_id_base += 1; + let mut map: HashMap> = HashMap::new(); + for (name, value) in trailers.iter() { + let key = name.to_string(); + match map.get_mut(&key) { + Some(vec) => vec.push(value.to_str()?.to_string()), + None => { + let mut vec = Vec::new(); + vec.push(value.to_str()?.to_string()); + map.insert(key, vec); + } + }; + } + self.fields.insert(response.trailers, map); + } } response.body = self.streams_id_base; self.streams_id_base = self.streams_id_base + 1; diff --git a/crates/wasi-http/src/streams_impl.rs b/crates/wasi-http/src/streams_impl.rs index afaf0153e6..6b88fae762 100644 --- a/crates/wasi-http/src/streams_impl.rs +++ b/crates/wasi-http/src/streams_impl.rs @@ -2,6 +2,7 @@ use crate::poll::Pollable; use crate::streams::{InputStream, OutputStream, StreamError}; use crate::WasiHttp; use anyhow::{anyhow, bail}; +use bytes::BufMut; use std::vec::Vec; impl crate::streams::Host for WasiHttp { @@ -27,10 +28,23 @@ impl crate::streams::Host for WasiHttp { fn skip( &mut self, - _this: InputStream, - _len: u64, + stream: InputStream, + len: u64, ) -> wasmtime::Result> { - bail!("unimplemented: skip"); + let s = self + .streams + .get_mut(&stream) + .ok_or_else(|| anyhow!("stream not found: {stream}"))?; + if len == 0 { + Ok(Ok((0, s.len() > 0))) + } else if s.len() > len.try_into()? { + s.truncate(len.try_into()?); + Ok(Ok((len, false))) + } else { + let bytes = s.len(); + s.truncate(s.len()); + Ok(Ok((bytes.try_into()?, true))) + } } fn subscribe_to_input_stream(&mut self, _this: InputStream) -> wasmtime::Result { @@ -38,11 +52,7 @@ impl crate::streams::Host for WasiHttp { } fn drop_input_stream(&mut self, stream: InputStream) -> wasmtime::Result<()> { - let r = self - .streams - .get_mut(&stream) - .ok_or_else(|| anyhow!("no such input-stream {stream}"))?; - r.truncate(0); + self.streams.remove(&stream); Ok(()) } @@ -51,17 +61,32 @@ impl crate::streams::Host for WasiHttp { this: OutputStream, buf: Vec, ) -> wasmtime::Result> { - // TODO: Make this a real write not a replace. - self.streams.insert(this, bytes::Bytes::from(buf.clone())); + match self.streams.get(&this) { + Some(data) => { + let mut new = bytes::BytesMut::with_capacity(data.len() + buf.len()); + new.put(data.clone()); + new.put(bytes::Bytes::from(buf.clone())); + self.streams.insert(this, new.freeze()); + } + None => { + self.streams.insert(this, bytes::Bytes::from(buf.clone())); + } + } Ok(Ok(buf.len().try_into()?)) } fn write_zeroes( &mut self, - _this: OutputStream, - _len: u64, + this: OutputStream, + len: u64, ) -> wasmtime::Result> { - bail!("unimplemented: write_zeroes"); + let mut data = Vec::with_capacity(len.try_into()?); + let mut i = 0; + while i < len { + data.push(0); + i = i + 1; + } + self.write(this, data) } fn splice( @@ -85,9 +110,8 @@ impl crate::streams::Host for WasiHttp { bail!("unimplemented: subscribe_to_output_stream"); } - fn drop_output_stream(&mut self, _this: OutputStream) -> wasmtime::Result<()> { - //bail!("unimplemented: drop_output_stream"); - //FIXME: intentionally ignoring + fn drop_output_stream(&mut self, stream: OutputStream) -> wasmtime::Result<()> { + self.streams.remove(&stream); Ok(()) } } diff --git a/crates/wasi-http/src/struct.rs b/crates/wasi-http/src/struct.rs index b1c02ad1a2..0cb1245048 100644 --- a/crates/wasi-http/src/struct.rs +++ b/crates/wasi-http/src/struct.rs @@ -34,6 +34,7 @@ pub struct ActiveResponse { pub status: u16, pub body: u32, pub response_headers: HashMap>, + pub trailers: u32, } impl ActiveRequest { @@ -60,6 +61,7 @@ impl ActiveResponse { status: 0, body: 0, response_headers: HashMap::new(), + trailers: 0, } } } diff --git a/crates/wasi-http/src/types_impl.rs b/crates/wasi-http/src/types_impl.rs index 61e5d3b1ae..721dc4cb9e 100644 --- a/crates/wasi-http/src/types_impl.rs +++ b/crates/wasi-http/src/types_impl.rs @@ -101,8 +101,16 @@ impl crate::types::Host for WasiHttp { self.fields.insert(id, m.clone()); Ok(id) } - fn finish_incoming_stream(&mut self, _s: IncomingStream) -> wasmtime::Result> { - bail!("unimplemented: finish_incoming_stream") + fn finish_incoming_stream(&mut self, s: IncomingStream) -> wasmtime::Result> { + for (_, value) in self.responses.iter() { + if value.body == s { + return match value.trailers { + 0 => Ok(None), + _ => Ok(Some(value.trailers)), + }; + } + } + bail!("unknown stream!") } fn finish_outgoing_stream( &mut self, @@ -181,8 +189,10 @@ impl crate::types::Host for WasiHttp { .requests .get_mut(&request) .ok_or_else(|| anyhow!("unknown request: {request}"))?; - req.body = self.streams_id_base; - self.streams_id_base = self.streams_id_base + 1; + if req.body == 0 { + req.body = self.streams_id_base; + self.streams_id_base = self.streams_id_base + 1; + } Ok(Ok(req.body)) } fn drop_response_outparam(&mut self, _response: ResponseOutparam) -> wasmtime::Result<()> {