Begin implementation of wasi-http (#5929)

* Integrate experimental HTTP into wasmtime.

* Reset Cargo.lock

* Switch to bail!, plumb options partially.

* Implement timeouts.

* Remove generated files & wasm, add Makefile

* Remove generated code textfile

* Update crates/wasi-http/Cargo.toml

Co-authored-by: Eduardo de Moura Rodrigues <16357187+eduardomourar@users.noreply.github.com>

* Update crates/wasi-http/Cargo.toml

Co-authored-by: Eduardo de Moura Rodrigues <16357187+eduardomourar@users.noreply.github.com>

* Extract streams from request/response.

* Fix read for len < buffer length.

* Formatting.

* types impl: swap todos for traps

* streams_impl: idioms, and swap todos for traps

* component impl: idioms, swap all unwraps for traps, swap all todos for traps

* http impl: idiom

* Remove an unnecessary mut.

* Remove an unsupported function.

* Switch to the tokio runtime for the HTTP request.

* Add a rust example.

* Update to latest wit definition

* Remove example code.

* wip: start writing a http test...

* finish writing the outbound request example

havent executed it yet

* better debug output

* wasi-http: some stubs required for rust rewrite of the example

* add wasi_http tests to test-programs

* CI: run the http tests

* Fix some warnings.

* bump new deps to latest releases (#3)

* Add tests for wasi-http to test-programs (#2)

* wip: start writing a http test...

* finish writing the outbound request example

havent executed it yet

* better debug output

* wasi-http: some stubs required for rust rewrite of the example

* add wasi_http tests to test-programs

* CI: run the http tests

* bump new deps to latest releases

h2 0.3.16
http 0.2.9
mio 0.8.6
openssl 0.10.48
openssl-sys 0.9.83
tokio 1.26.0

---------

Co-authored-by: Brendan Burns <bburns@microsoft.com>

* Update crates/test-programs/tests/http_tests/runtime/wasi_http_tests.rs

* Update crates/test-programs/tests/http_tests/runtime/wasi_http_tests.rs

* Update crates/test-programs/tests/http_tests/runtime/wasi_http_tests.rs

* wasi-http: fix cargo.toml file and publish script to work together (#4)

unfortunately, the publish script doesn't use a proper toml parser (in
order to not have any dependencies), so the whitespace has to be the
trivial expected case.

then, add wasi-http to the list of crates to publish.

* Update crates/test-programs/build.rs

* Switch to rustls

* Cleanups.

* Merge switch to rustls.

* Formatting

* Remove libssl install

* Fix tests.

* Rename wasi-http -> wasmtime-wasi-http

* prtest:full

Conditionalize TLS on riscv64gc.

* prtest:full

Fix formatting, also disable tls on s390x

* prtest:full

Add a path parameter to wit-bindgen, remove symlink.

* prtest:full

Fix tests for places where SSL isn't supported.

* Update crates/wasi-http/Cargo.toml

---------

Co-authored-by: Eduardo de Moura Rodrigues <16357187+eduardomourar@users.noreply.github.com>
Co-authored-by: Pat Hickey <phickey@fastly.com>
Co-authored-by: Pat Hickey <pat@moreproductive.org>
This commit is contained in:
Brendan Burns
2023-04-05 13:33:03 -07:00
committed by GitHub
parent 7eb8914090
commit 2d34dbef4b
26 changed files with 2284 additions and 123 deletions

View File

@@ -0,0 +1,26 @@
[package]
name = "wasmtime-wasi-http"
version = "0.0.1"
authors.workspace = true
edition.workspace = true
repository = "https://github.com/bytecodealliance/wasmtime"
license = "Apache-2.0 WITH LLVM-exception"
description = "Experimental HTTP library for WebAssembly in Wasmtime"
readme = "readme.md"
[dependencies]
anyhow = { workspace = true }
bytes = "1.1.0"
hyper = { version = "1.0.0-rc.3", features = ["full"] }
tokio = { version = "1", default-features = false, features = ["net", "rt-multi-thread", "time"] }
http = { version = "0.2.9" }
http-body = "1.0.0-rc.2"
http-body-util = "0.1.0-rc.2"
thiserror = { workspace = true }
wasmtime = { workspace = true, features = ['component-model'] }
# The `ring` crate, used to implement TLS, does not build on riscv64 or s390x
[target.'cfg(not(any(target_arch = "riscv64", target_arch = "s390x")))'.dependencies]
tokio-rustls = { version = "0.24.0" }
rustls = { version = "0.21.0" }
webpki-roots = { version = "0.23.0" }

View File

@@ -0,0 +1,431 @@
use crate::default_outgoing_http::Host;
pub use crate::r#struct::WasiHttp;
use crate::streams::Host as StreamsHost;
use crate::types::{Host as TypesHost, RequestOptions, Scheme};
use anyhow::anyhow;
use std::str;
use std::vec::Vec;
use wasmtime::{AsContext, AsContextMut, Caller, Extern, Memory};
const MEMORY: &str = "memory";
#[derive(Debug, thiserror::Error)]
enum HttpError {
#[error("Memory not found")]
MemoryNotFound,
#[error("Memory access error")]
MemoryAccessError(#[from] wasmtime::MemoryAccessError),
#[error("Buffer too small")]
BufferTooSmall,
#[error("UTF-8 error")]
Utf8Error(#[from] std::str::Utf8Error),
}
fn memory_get<T>(caller: &mut Caller<'_, T>) -> Result<Memory, HttpError> {
if let Some(Extern::Memory(mem)) = caller.get_export(MEMORY) {
Ok(mem)
} else {
Err(HttpError::MemoryNotFound)
}
}
/// Get a slice of length `len` from `memory`, starting at `offset`.
/// This will return an `HttpError::BufferTooSmall` if the size of the
/// requested slice is larger than the memory size.
fn slice_from_memory(
memory: &Memory,
mut ctx: impl AsContextMut,
offset: u32,
len: u32,
) -> Result<Vec<u8>, HttpError> {
let required_memory_size = offset.checked_add(len).ok_or(HttpError::BufferTooSmall)? as usize;
if required_memory_size > memory.data_size(&mut ctx) {
return Err(HttpError::BufferTooSmall);
}
let mut buf = vec![0u8; len as usize];
memory.read(&mut ctx, offset as usize, buf.as_mut_slice())?;
Ok(buf)
}
fn u32_from_memory(memory: &Memory, ctx: impl AsContextMut, ptr: u32) -> Result<u32, HttpError> {
let slice = slice_from_memory(memory, ctx, ptr, 4)?;
let mut dst = [0u8; 4];
dst.clone_from_slice(&slice[0..4]);
Ok(u32::from_le_bytes(dst))
}
/// Read a string of byte length `len` from `memory`, starting at `offset`.
fn string_from_memory(
memory: &Memory,
ctx: impl AsContextMut,
offset: u32,
len: u32,
) -> Result<String, HttpError> {
let slice = slice_from_memory(memory, ctx, offset, len)?;
Ok(std::str::from_utf8(&slice)?.to_string())
}
fn allocate_guest_pointer<T>(caller: &mut Caller<'_, T>, size: u32) -> anyhow::Result<u32> {
let realloc = caller
.get_export("cabi_realloc")
.ok_or_else(|| anyhow!("missing required export cabi_realloc"))?;
let func = realloc
.into_func()
.ok_or_else(|| anyhow!("cabi_realloc must be a func"))?;
let typed = func.typed::<(u32, u32, u32, u32), u32>(caller.as_context())?;
Ok(typed.call(caller.as_context_mut(), (0, 0, 4, size))?)
}
fn u32_array_to_u8(arr: &[u32]) -> Vec<u8> {
let mut result = std::vec::Vec::new();
for val in arr.iter() {
let bytes = val.to_le_bytes();
for b in bytes.iter() {
result.push(*b);
}
}
result
}
pub fn add_component_to_linker<T>(
linker: &mut wasmtime::Linker<T>,
get_cx: impl Fn(&mut T) -> &mut WasiHttp + Send + Sync + Copy + 'static,
) -> anyhow::Result<()> {
linker.func_wrap(
"default-outgoing-HTTP",
"handle",
move |mut caller: Caller<'_, T>,
request: u32,
has_options: i32,
has_timeout: i32,
timeout_ms: u32,
has_first_byte_timeout: i32,
first_byte_timeout_ms: u32,
has_between_bytes_timeout: i32,
between_bytes_timeout_ms: u32|
-> anyhow::Result<u32> {
let options = if has_options == 1 {
Some(RequestOptions {
connect_timeout_ms: if has_timeout == 1 {
Some(timeout_ms)
} else {
None
},
first_byte_timeout_ms: if has_first_byte_timeout == 1 {
Some(first_byte_timeout_ms)
} else {
None
},
between_bytes_timeout_ms: if has_between_bytes_timeout == 1 {
Some(between_bytes_timeout_ms)
} else {
None
},
})
} else {
None
};
Ok(get_cx(caller.data_mut()).handle(request, options)?)
},
)?;
linker.func_wrap(
"types",
"new-outgoing-request",
move |mut caller: Caller<'_, T>,
method: i32,
_b: i32,
_c: i32,
path_ptr: u32,
path_len: u32,
query_ptr: u32,
query_len: u32,
scheme_is_some: i32,
scheme: i32,
_h: i32,
_i: i32,
authority_ptr: u32,
authority_len: u32,
headers: u32|
-> anyhow::Result<u32> {
let memory = memory_get(&mut caller)?;
let path = string_from_memory(&memory, caller.as_context_mut(), path_ptr, path_len)?;
let query = string_from_memory(&memory, caller.as_context_mut(), query_ptr, query_len)?;
let authority = string_from_memory(
&memory,
caller.as_context_mut(),
authority_ptr,
authority_len,
)?;
let mut s = Scheme::Https;
if scheme_is_some == 1 {
s = match scheme {
0 => Scheme::Http,
1 => Scheme::Https,
_ => anyhow::bail!("unsupported scheme {scheme}"),
};
}
let m = match method {
0 => crate::types::Method::Get,
1 => crate::types::Method::Head,
2 => crate::types::Method::Post,
3 => crate::types::Method::Put,
4 => crate::types::Method::Delete,
5 => crate::types::Method::Connect,
6 => crate::types::Method::Options,
7 => crate::types::Method::Trace,
8 => crate::types::Method::Patch,
_ => anyhow::bail!("unsupported method {method}"),
};
let ctx = get_cx(caller.data_mut());
Ok(ctx.new_outgoing_request(m, path, query, Some(s), authority, headers)?)
},
)?;
linker.func_wrap(
"types",
"incoming-response-status",
move |mut caller: Caller<'_, T>, id: u32| -> anyhow::Result<u32> {
let ctx = get_cx(caller.data_mut());
Ok(ctx.incoming_response_status(id)?.into())
},
)?;
linker.func_wrap(
"types",
"drop-future-incoming-response",
move |_caller: Caller<'_, T>, _future: u32| -> anyhow::Result<()> {
// FIXME: Intentionally left blank
Ok(())
},
)?;
linker.func_wrap(
"types",
"future-incoming-response-get",
move |mut caller: Caller<'_, T>, future: u32, ptr: i32| -> anyhow::Result<()> {
let memory = memory_get(&mut caller)?;
// First == is_some
// Second == is_err
// Third == {ok: is_err = false, tag: is_err = true}
// Fourth == string ptr
// Fifth == string len
let result: [u32; 5] = [1, 0, future, 0, 0];
let raw = u32_array_to_u8(&result);
memory.write(caller.as_context_mut(), ptr as _, &raw)?;
Ok(())
},
)?;
linker.func_wrap(
"types",
"incoming-response-consume",
move |mut caller: Caller<'_, T>, response: u32, ptr: i32| -> anyhow::Result<()> {
let ctx = get_cx(caller.data_mut());
let stream = ctx.incoming_response_consume(response)?.unwrap_or(0);
let memory = memory_get(&mut caller).unwrap();
// First == is_some
// Second == stream_id
let result: [u32; 2] = [0, stream];
let raw = u32_array_to_u8(&result);
memory.write(caller.as_context_mut(), ptr as _, &raw)?;
Ok(())
},
)?;
linker.func_wrap(
"poll",
"drop-pollable",
move |_caller: Caller<'_, T>, _a: i32| -> anyhow::Result<()> {
anyhow::bail!("unimplemented")
},
)?;
linker.func_wrap(
"types",
"drop-fields",
move |mut caller: Caller<'_, T>, ptr: u32| -> anyhow::Result<()> {
let ctx = get_cx(caller.data_mut());
ctx.drop_fields(ptr)?;
Ok(())
},
)?;
linker.func_wrap(
"streams",
"drop-input-stream",
move |mut caller: Caller<'_, T>, id: u32| -> anyhow::Result<()> {
let ctx = get_cx(caller.data_mut());
ctx.drop_input_stream(id)?;
Ok(())
},
)?;
linker.func_wrap(
"streams",
"drop-output-stream",
move |mut caller: Caller<'_, T>, id: u32| -> anyhow::Result<()> {
let ctx = get_cx(caller.data_mut());
ctx.drop_output_stream(id)?;
Ok(())
},
)?;
linker.func_wrap(
"types",
"outgoing-request-write",
move |mut caller: Caller<'_, T>, request: u32, ptr: u32| -> anyhow::Result<()> {
let ctx = get_cx(caller.data_mut());
let stream = ctx
.outgoing_request_write(request)?
.map_err(|_| anyhow!("no outgoing stream present"))?;
let memory = memory_get(&mut caller)?;
// First == is_some
// Second == stream_id
let result: [u32; 2] = [0, stream];
let raw = u32_array_to_u8(&result);
memory.write(caller.as_context_mut(), ptr as _, &raw)?;
Ok(())
},
)?;
linker.func_wrap(
"types",
"drop-outgoing-request",
move |mut caller: Caller<'_, T>, id: u32| -> anyhow::Result<()> {
let ctx = get_cx(caller.data_mut());
ctx.drop_outgoing_request(id)?;
Ok(())
},
)?;
linker.func_wrap(
"types",
"drop-incoming-response",
move |mut caller: Caller<'_, T>, id: u32| -> anyhow::Result<()> {
let ctx = get_cx(caller.data_mut());
ctx.drop_incoming_response(id)?;
Ok(())
},
)?;
linker.func_wrap(
"types",
"new-fields",
move |mut caller: Caller<'_, T>, base_ptr: u32, len: u32| -> anyhow::Result<u32> {
let memory = memory_get(&mut caller)?;
let mut vec = Vec::new();
let mut i = 0;
// TODO: read this more efficiently as a single block.
while i < len {
let ptr = base_ptr + i * 16;
let name_ptr = u32_from_memory(&memory, caller.as_context_mut(), ptr)?;
let name_len = u32_from_memory(&memory, caller.as_context_mut(), ptr + 4)?;
let value_ptr = u32_from_memory(&memory, caller.as_context_mut(), ptr + 8)?;
let value_len = u32_from_memory(&memory, caller.as_context_mut(), ptr + 12)?;
let name =
string_from_memory(&memory, caller.as_context_mut(), name_ptr, name_len)?;
let value =
string_from_memory(&memory, caller.as_context_mut(), value_ptr, value_len)?;
vec.push((name, value));
i = i + 1;
}
let ctx = get_cx(caller.data_mut());
Ok(ctx.new_fields(vec)?)
},
)?;
linker.func_wrap(
"streams",
"read",
move |mut caller: Caller<'_, T>, stream: u32, len: u64, ptr: u32| -> anyhow::Result<()> {
let ctx = get_cx(caller.data_mut());
let bytes_tuple = ctx.read(stream, len)??;
let bytes = bytes_tuple.0;
let done = match bytes_tuple.1 {
true => 1,
false => 0,
};
let body_len: u32 = bytes.len().try_into()?;
let out_ptr = allocate_guest_pointer(&mut caller, body_len)?;
let result: [u32; 4] = [0, out_ptr, body_len, done];
let raw = u32_array_to_u8(&result);
let memory = memory_get(&mut caller)?;
memory.write(caller.as_context_mut(), out_ptr as _, &bytes)?;
memory.write(caller.as_context_mut(), ptr as _, &raw)?;
Ok(())
},
)?;
linker.func_wrap(
"streams",
"write",
move |mut caller: Caller<'_, T>,
stream: u32,
body_ptr: u32,
body_len: u32,
ptr: u32|
-> anyhow::Result<()> {
let memory = memory_get(&mut caller)?;
let body = string_from_memory(&memory, caller.as_context_mut(), body_ptr, body_len)?;
let result: [u32; 3] = [0, 0, body_len];
let raw = u32_array_to_u8(&result);
let memory = memory_get(&mut caller)?;
memory.write(caller.as_context_mut(), ptr as _, &raw)?;
let ctx = get_cx(caller.data_mut());
ctx.write(stream, body.as_bytes().to_vec())??;
Ok(())
},
)?;
linker.func_wrap(
"types",
"fields-entries",
move |mut caller: Caller<'_, T>, fields: u32, out_ptr: u32| -> anyhow::Result<()> {
let ctx = get_cx(caller.data_mut());
let entries = ctx.fields_entries(fields)?;
let header_len = entries.len();
let tuple_ptr = allocate_guest_pointer(&mut caller, (16 * header_len).try_into()?)?;
let mut ptr = tuple_ptr;
for item in entries.iter() {
let name = &item.0;
let value = &item.1;
let name_len: u32 = name.len().try_into()?;
let value_len: u32 = value.len().try_into()?;
let name_ptr = allocate_guest_pointer(&mut caller, name_len)?;
let value_ptr = allocate_guest_pointer(&mut caller, value_len)?;
let memory = memory_get(&mut caller)?;
memory.write(caller.as_context_mut(), name_ptr as _, &name.as_bytes())?;
memory.write(caller.as_context_mut(), value_ptr as _, &value.as_bytes())?;
let pair: [u32; 4] = [name_ptr, name_len, value_ptr, value_len];
let raw_pair = u32_array_to_u8(&pair);
memory.write(caller.as_context_mut(), ptr as _, &raw_pair)?;
ptr = ptr + 16;
}
let memory = memory_get(&mut caller)?;
let result: [u32; 2] = [tuple_ptr, header_len.try_into()?];
let raw = u32_array_to_u8(&result);
memory.write(caller.as_context_mut(), out_ptr as _, &raw)?;
Ok(())
},
)?;
linker.func_wrap(
"types",
"incoming-response-headers",
move |mut caller: Caller<'_, T>, handle: u32| -> anyhow::Result<u32> {
let ctx = get_cx(caller.data_mut());
Ok(ctx.incoming_response_headers(handle)?)
},
)?;
Ok(())
}

View File

@@ -0,0 +1,211 @@
use crate::r#struct::ActiveResponse;
pub use crate::r#struct::WasiHttp;
use crate::types::{RequestOptions, Scheme};
#[cfg(not(any(target_arch = "riscv64", target_arch = "s390x")))]
use anyhow::anyhow;
use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
use http_body_util::{BodyExt, Full};
use hyper::Method;
use hyper::Request;
#[cfg(not(any(target_arch = "riscv64", target_arch = "s390x")))]
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use tokio::time::timeout;
#[cfg(not(any(target_arch = "riscv64", target_arch = "s390x")))]
use tokio_rustls::rustls::{self, OwnedTrustAnchor};
impl crate::default_outgoing_http::Host for WasiHttp {
fn handle(
&mut self,
request_id: crate::default_outgoing_http::OutgoingRequest,
options: Option<crate::default_outgoing_http::RequestOptions>,
) -> wasmtime::Result<crate::default_outgoing_http::FutureIncomingResponse> {
// TODO: Initialize this once?
let rt = Runtime::new().unwrap();
let _enter = rt.enter();
let f = self.handle_async(request_id, options);
match rt.block_on(f) {
Ok(r) => {
println!("{} OK", r);
Ok(r)
}
Err(e) => {
println!("{} ERR", e);
Err(e)
}
}
}
}
fn port_for_scheme(scheme: &Option<Scheme>) -> &str {
match scheme {
Some(s) => match s {
Scheme::Http => ":80",
Scheme::Https => ":443",
// This should never happen.
_ => panic!("unsupported scheme!"),
},
None => ":443",
}
}
impl WasiHttp {
async fn handle_async(
&mut self,
request_id: crate::default_outgoing_http::OutgoingRequest,
options: Option<crate::default_outgoing_http::RequestOptions>,
) -> wasmtime::Result<crate::default_outgoing_http::FutureIncomingResponse> {
let opts = options.unwrap_or(
// TODO: Configurable defaults here?
RequestOptions {
connect_timeout_ms: Some(600 * 1000),
first_byte_timeout_ms: Some(600 * 1000),
between_bytes_timeout_ms: Some(600 * 1000),
},
);
let connect_timeout =
Duration::from_millis(opts.connect_timeout_ms.unwrap_or(600 * 1000).into());
let first_bytes_timeout =
Duration::from_millis(opts.first_byte_timeout_ms.unwrap_or(600 * 1000).into());
let between_bytes_timeout =
Duration::from_millis(opts.between_bytes_timeout_ms.unwrap_or(600 * 1000).into());
let request = match self.requests.get(&request_id) {
Some(r) => r,
None => bail!("not found!"),
};
let method = match request.method {
crate::types::Method::Get => Method::GET,
crate::types::Method::Head => Method::HEAD,
crate::types::Method::Post => Method::POST,
crate::types::Method::Put => Method::PUT,
crate::types::Method::Delete => Method::DELETE,
crate::types::Method::Connect => Method::CONNECT,
crate::types::Method::Options => Method::OPTIONS,
crate::types::Method::Trace => Method::TRACE,
crate::types::Method::Patch => Method::PATCH,
_ => bail!("unknown method!"),
};
let scheme = match request.scheme.as_ref().unwrap_or(&Scheme::Https) {
Scheme::Http => "http://",
Scheme::Https => "https://",
// TODO: this is wrong, fix this.
_ => panic!("Unsupported scheme!"),
};
// Largely adapted from https://hyper.rs/guides/1/client/basic/
let authority = match request.authority.find(":") {
Some(_) => request.authority.clone(),
None => request.authority.clone() + port_for_scheme(&request.scheme),
};
let mut sender = if scheme == "https://" {
#[cfg(not(any(target_arch = "riscv64", target_arch = "s390x")))]
{
let stream = TcpStream::connect(authority.clone()).await?;
//TODO: uncomment this code and make the tls implementation a feature decision.
//let connector = tokio_native_tls::native_tls::TlsConnector::builder().build()?;
//let connector = tokio_native_tls::TlsConnector::from(connector);
//let host = authority.split(":").next().unwrap_or(&authority);
//let stream = connector.connect(&host, stream).await?;
// derived from https://github.com/tokio-rs/tls/blob/master/tokio-rustls/examples/client/src/main.rs
let mut root_cert_store = rustls::RootCertStore::empty();
root_cert_store.add_server_trust_anchors(
webpki_roots::TLS_SERVER_ROOTS.0.iter().map(|ta| {
OwnedTrustAnchor::from_subject_spki_name_constraints(
ta.subject,
ta.spki,
ta.name_constraints,
)
}),
);
let config = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(root_cert_store)
.with_no_client_auth();
let connector = tokio_rustls::TlsConnector::from(Arc::new(config));
let mut parts = authority.split(":");
let host = parts.next().unwrap_or(&authority);
let domain =
rustls::ServerName::try_from(host).map_err(|_| anyhow!("invalid dnsname"))?;
let stream = connector.connect(domain, stream).await?;
let t = timeout(
connect_timeout,
hyper::client::conn::http1::handshake(stream),
)
.await?;
let (s, conn) = t?;
tokio::task::spawn(async move {
if let Err(err) = conn.await {
println!("Connection failed: {:?}", err);
}
});
s
}
#[cfg(any(target_arch = "riscv64", target_arch = "s390x"))]
bail!("unsupported architecture for SSL")
} else {
let tcp = TcpStream::connect(authority).await?;
let t = timeout(connect_timeout, hyper::client::conn::http1::handshake(tcp)).await?;
let (s, conn) = t?;
tokio::task::spawn(async move {
if let Err(err) = conn.await {
println!("Connection failed: {:?}", err);
}
});
s
};
let url = scheme.to_owned() + &request.authority + &request.path + &request.query;
let mut call = Request::builder()
.method(method)
.uri(url)
.header(hyper::header::HOST, request.authority.as_str());
for (key, val) in request.headers.iter() {
for item in val {
call = call.header(key, item.clone());
}
}
let response_id = self.response_id_base;
self.response_id_base = self.response_id_base + 1;
let mut response = ActiveResponse::new(response_id);
let body = Full::<Bytes>::new(
self.streams
.get(&request.body)
.unwrap_or(&Bytes::new())
.clone(),
);
let t = timeout(first_bytes_timeout, sender.send_request(call.body(body)?)).await?;
let mut res = t?;
response.status = res.status().try_into()?;
for (key, value) in res.headers().iter() {
let mut vec = std::vec::Vec::new();
vec.push(value.to_str()?.to_string());
response
.response_headers
.insert(key.as_str().to_string(), vec);
}
let mut buf = BytesMut::new();
while let Some(next) = timeout(between_bytes_timeout, res.frame()).await? {
let frame = next?;
if let Some(chunk) = frame.data_ref() {
buf.put(chunk.clone());
}
}
response.body = self.streams_id_base;
self.streams_id_base = self.streams_id_base + 1;
self.streams.insert(response.body, buf.freeze());
self.responses.insert(response_id, response);
Ok(response_id)
}
}

View File

@@ -0,0 +1,27 @@
use crate::component_impl::add_component_to_linker;
pub use crate::r#struct::WasiHttp;
wasmtime::component::bindgen!({ path: "wasi-http/wit", world: "proxy"});
pub mod component_impl;
pub mod http_impl;
pub mod streams_impl;
pub mod r#struct;
pub mod types_impl;
pub fn add_to_component_linker<T>(
linker: &mut wasmtime::component::Linker<T>,
get_cx: impl Fn(&mut T) -> &mut WasiHttp + Send + Sync + Copy + 'static,
) -> anyhow::Result<()> {
default_outgoing_http::add_to_linker(linker, get_cx)?;
types::add_to_linker(linker, get_cx)?;
streams::add_to_linker(linker, get_cx)?;
Ok(())
}
pub fn add_to_linker<T>(
linker: &mut wasmtime::Linker<T>,
get_cx: impl Fn(&mut T) -> &mut WasiHttp + Send + Sync + Copy + 'static,
) -> anyhow::Result<()> {
add_component_to_linker(linker, get_cx)
}

View File

@@ -0,0 +1,93 @@
use crate::poll::Pollable;
use crate::streams::{InputStream, OutputStream, StreamError};
use crate::WasiHttp;
use anyhow::{anyhow, bail};
use std::vec::Vec;
impl crate::streams::Host for WasiHttp {
fn read(
&mut self,
stream: InputStream,
len: u64,
) -> wasmtime::Result<Result<(Vec<u8>, bool), StreamError>> {
let s = self
.streams
.get_mut(&stream)
.ok_or_else(|| anyhow!("stream not found: {stream}"))?;
if len == 0 {
Ok(Ok((bytes::Bytes::new().to_vec(), s.len() > 0)))
} else if s.len() > len.try_into()? {
let result = s.split_to(len.try_into()?);
Ok(Ok((result.to_vec(), false)))
} else {
s.truncate(s.len());
Ok(Ok((s.clone().to_vec(), true)))
}
}
fn skip(
&mut self,
_this: InputStream,
_len: u64,
) -> wasmtime::Result<Result<(u64, bool), StreamError>> {
bail!("unimplemented: skip");
}
fn subscribe_to_input_stream(&mut self, _this: InputStream) -> wasmtime::Result<Pollable> {
bail!("unimplemented: subscribe_to_input_stream");
}
fn drop_input_stream(&mut self, stream: InputStream) -> wasmtime::Result<()> {
let r = self
.streams
.get_mut(&stream)
.ok_or_else(|| anyhow!("no such input-stream {stream}"))?;
r.truncate(0);
Ok(())
}
fn write(
&mut self,
this: OutputStream,
buf: Vec<u8>,
) -> wasmtime::Result<Result<u64, StreamError>> {
// TODO: Make this a real write not a replace.
self.streams.insert(this, bytes::Bytes::from(buf.clone()));
Ok(Ok(buf.len().try_into()?))
}
fn write_zeroes(
&mut self,
_this: OutputStream,
_len: u64,
) -> wasmtime::Result<Result<u64, StreamError>> {
bail!("unimplemented: write_zeroes");
}
fn splice(
&mut self,
_this: OutputStream,
_src: InputStream,
_len: u64,
) -> wasmtime::Result<Result<(u64, bool), StreamError>> {
bail!("unimplemented: splice");
}
fn forward(
&mut self,
_this: OutputStream,
_src: InputStream,
) -> wasmtime::Result<Result<u64, StreamError>> {
bail!("unimplemented: forward");
}
fn subscribe_to_output_stream(&mut self, _this: OutputStream) -> wasmtime::Result<Pollable> {
bail!("unimplemented: subscribe_to_output_stream");
}
fn drop_output_stream(&mut self, _this: OutputStream) -> wasmtime::Result<()> {
//bail!("unimplemented: drop_output_stream");
//FIXME: intentionally ignoring
Ok(())
}
}

View File

@@ -0,0 +1,80 @@
use crate::types::{Method, Scheme};
use bytes::Bytes;
use std::collections::HashMap;
#[derive(Clone)]
pub struct WasiHttp {
pub request_id_base: u32,
pub response_id_base: u32,
pub fields_id_base: u32,
pub streams_id_base: u32,
pub requests: HashMap<u32, ActiveRequest>,
pub responses: HashMap<u32, ActiveResponse>,
pub fields: HashMap<u32, HashMap<String, Vec<String>>>,
pub streams: HashMap<u32, Bytes>,
}
#[derive(Clone)]
pub struct ActiveRequest {
pub id: u32,
pub active_request: bool,
pub method: Method,
pub scheme: Option<Scheme>,
pub path: String,
pub query: String,
pub authority: String,
pub headers: HashMap<String, Vec<String>>,
pub body: u32,
}
#[derive(Clone)]
pub struct ActiveResponse {
pub id: u32,
pub active_response: bool,
pub status: u16,
pub body: u32,
pub response_headers: HashMap<String, Vec<String>>,
}
impl ActiveRequest {
pub fn new(id: u32) -> Self {
Self {
id: id,
active_request: false,
method: Method::Get,
scheme: Some(Scheme::Http),
path: "".to_string(),
query: "".to_string(),
authority: "".to_string(),
headers: HashMap::new(),
body: 0,
}
}
}
impl ActiveResponse {
pub fn new(id: u32) -> Self {
Self {
id: id,
active_response: false,
status: 0,
body: 0,
response_headers: HashMap::new(),
}
}
}
impl WasiHttp {
pub fn new() -> Self {
Self {
request_id_base: 1,
response_id_base: 1,
fields_id_base: 1,
streams_id_base: 1,
requests: HashMap::new(),
responses: HashMap::new(),
fields: HashMap::new(),
streams: HashMap::new(),
}
}
}

View File

@@ -0,0 +1,271 @@
use crate::poll::Pollable;
use crate::r#struct::ActiveRequest;
use crate::types::{
Error, Fields, FutureIncomingResponse, Headers, IncomingRequest, IncomingResponse,
IncomingStream, Method, OutgoingRequest, OutgoingResponse, OutgoingStream, ResponseOutparam,
Scheme, StatusCode, Trailers,
};
use crate::WasiHttp;
use anyhow::{anyhow, bail};
use std::collections::HashMap;
impl crate::types::Host for WasiHttp {
fn drop_fields(&mut self, fields: Fields) -> wasmtime::Result<()> {
self.fields.remove(&fields);
Ok(())
}
fn new_fields(&mut self, entries: Vec<(String, String)>) -> wasmtime::Result<Fields> {
let mut map = HashMap::new();
for item in entries.iter() {
let mut vec = std::vec::Vec::new();
vec.push(item.1.clone());
map.insert(item.0.clone(), vec);
}
let id = self.fields_id_base;
self.fields_id_base = id + 1;
self.fields.insert(id, map);
Ok(id)
}
fn fields_get(&mut self, fields: Fields, name: String) -> wasmtime::Result<Vec<String>> {
let res = self
.fields
.get(&fields)
.ok_or_else(|| anyhow!("fields not found: {fields}"))?
.get(&name)
.ok_or_else(|| anyhow!("key not found: {name}"))?
.clone();
Ok(res)
}
fn fields_set(
&mut self,
fields: Fields,
name: String,
value: Vec<String>,
) -> wasmtime::Result<()> {
match self.fields.get_mut(&fields) {
Some(m) => {
m.insert(name, value.clone());
Ok(())
}
None => bail!("fields not found"),
}
}
fn fields_delete(&mut self, fields: Fields, name: String) -> wasmtime::Result<()> {
match self.fields.get_mut(&fields) {
Some(m) => m.remove(&name),
None => None,
};
Ok(())
}
fn fields_append(
&mut self,
fields: Fields,
name: String,
value: String,
) -> wasmtime::Result<()> {
let m = self
.fields
.get_mut(&fields)
.ok_or_else(|| anyhow!("unknown fields: {fields}"))?;
match m.get_mut(&name) {
Some(v) => v.push(value),
None => {
let mut vec = std::vec::Vec::new();
vec.push(value);
m.insert(name, vec);
}
};
Ok(())
}
fn fields_entries(&mut self, fields: Fields) -> wasmtime::Result<Vec<(String, String)>> {
let field_map = match self.fields.get(&fields) {
Some(m) => m,
None => bail!("fields not found."),
};
let mut result = Vec::new();
for (name, value) in field_map {
result.push((name.clone(), value[0].clone()));
}
Ok(result)
}
fn fields_clone(&mut self, fields: Fields) -> wasmtime::Result<Fields> {
let id = self.fields_id_base;
self.fields_id_base = self.fields_id_base + 1;
let m = self
.fields
.get(&fields)
.ok_or_else(|| anyhow!("fields not found: {fields}"))?;
self.fields.insert(id, m.clone());
Ok(id)
}
fn finish_incoming_stream(&mut self, _s: IncomingStream) -> wasmtime::Result<Option<Trailers>> {
bail!("unimplemented: finish_incoming_stream")
}
fn finish_outgoing_stream(
&mut self,
_s: OutgoingStream,
_trailers: Option<Trailers>,
) -> wasmtime::Result<()> {
bail!("unimplemented: finish_outgoing_stream")
}
fn drop_incoming_request(&mut self, _request: IncomingRequest) -> wasmtime::Result<()> {
bail!("unimplemented: drop_incoming_request")
}
fn drop_outgoing_request(&mut self, request: OutgoingRequest) -> wasmtime::Result<()> {
self.requests.remove(&request);
Ok(())
}
fn incoming_request_method(&mut self, _request: IncomingRequest) -> wasmtime::Result<Method> {
bail!("unimplemented: incoming_request_method")
}
fn incoming_request_path(&mut self, _request: IncomingRequest) -> wasmtime::Result<String> {
bail!("unimplemented: incoming_request_path")
}
fn incoming_request_scheme(
&mut self,
_request: IncomingRequest,
) -> wasmtime::Result<Option<Scheme>> {
bail!("unimplemented: incoming_request_scheme")
}
fn incoming_request_authority(
&mut self,
_request: IncomingRequest,
) -> wasmtime::Result<String> {
bail!("unimplemented: incoming_request_authority")
}
fn incoming_request_headers(&mut self, _request: IncomingRequest) -> wasmtime::Result<Headers> {
bail!("unimplemented: incoming_request_headers")
}
fn incoming_request_consume(
&mut self,
_request: IncomingRequest,
) -> wasmtime::Result<Result<IncomingStream, ()>> {
bail!("unimplemented: incoming_request_consume")
}
fn incoming_request_query(&mut self, _request: IncomingRequest) -> wasmtime::Result<String> {
bail!("unimplemented: incoming_request_query")
}
fn new_outgoing_request(
&mut self,
method: Method,
path: String,
query: String,
scheme: Option<Scheme>,
authority: String,
headers: Headers,
) -> wasmtime::Result<OutgoingRequest> {
let id = self.request_id_base;
self.request_id_base = self.request_id_base + 1;
let mut req = ActiveRequest::new(id);
req.path = path;
req.query = query;
req.authority = authority;
req.method = method;
req.headers = match self.fields.get(&headers) {
Some(h) => h.clone(),
None => bail!("headers not found."),
};
req.scheme = scheme;
self.requests.insert(id, req);
Ok(id)
}
fn outgoing_request_write(
&mut self,
request: OutgoingRequest,
) -> wasmtime::Result<Result<OutgoingStream, ()>> {
let mut req = self
.requests
.get_mut(&request)
.ok_or_else(|| anyhow!("unknown request: {request}"))?;
req.body = self.streams_id_base;
self.streams_id_base = self.streams_id_base + 1;
Ok(Ok(req.body))
}
fn drop_response_outparam(&mut self, _response: ResponseOutparam) -> wasmtime::Result<()> {
bail!("unimplemented: drop_response_outparam")
}
fn set_response_outparam(
&mut self,
_outparam: ResponseOutparam,
_response: Result<OutgoingResponse, Error>,
) -> wasmtime::Result<Result<(), ()>> {
bail!("unimplemented: set_response_outparam")
}
fn drop_incoming_response(&mut self, response: IncomingResponse) -> wasmtime::Result<()> {
self.responses.remove(&response);
Ok(())
}
fn drop_outgoing_response(&mut self, _response: OutgoingResponse) -> wasmtime::Result<()> {
bail!("unimplemented: drop_outgoing_response")
}
fn incoming_response_status(
&mut self,
response: IncomingResponse,
) -> wasmtime::Result<StatusCode> {
let r = self
.responses
.get(&response)
.ok_or_else(|| anyhow!("response not found: {response}"))?;
Ok(r.status)
}
fn incoming_response_headers(
&mut self,
response: IncomingResponse,
) -> wasmtime::Result<Headers> {
let r = self
.responses
.get(&response)
.ok_or_else(|| anyhow!("response not found: {response}"))?;
let id = self.fields_id_base;
self.fields_id_base = self.fields_id_base + 1;
self.fields.insert(id, r.response_headers.clone());
Ok(id)
}
fn incoming_response_consume(
&mut self,
response: IncomingResponse,
) -> wasmtime::Result<Result<IncomingStream, ()>> {
let r = self
.responses
.get(&response)
.ok_or_else(|| anyhow!("response not found: {response}"))?;
Ok(Ok(r.body))
}
fn new_outgoing_response(
&mut self,
_status_code: StatusCode,
_headers: Headers,
) -> wasmtime::Result<OutgoingResponse> {
bail!("unimplemented: new_outgoing_response")
}
fn outgoing_response_write(
&mut self,
_response: OutgoingResponse,
) -> wasmtime::Result<Result<OutgoingStream, ()>> {
bail!("unimplemented: outgoing_response_write")
}
fn drop_future_incoming_response(
&mut self,
_f: FutureIncomingResponse,
) -> wasmtime::Result<()> {
bail!("unimplemented: drop_future_incoming_response")
}
fn future_incoming_response_get(
&mut self,
_f: FutureIncomingResponse,
) -> wasmtime::Result<Option<Result<IncomingResponse, Error>>> {
bail!("unimplemented: future_incoming_response_get")
}
fn listen_to_future_incoming_response(
&mut self,
_f: FutureIncomingResponse,
) -> wasmtime::Result<Pollable> {
bail!("unimplemented: listen_to_future_incoming_response")
}
}