Pooling allocator: add a reuse-affinity policy.
This policy attempts to reuse the same instance slot for subsequent instantiations of the same module. This is particularly useful when using a pooling backend such as memfd that benefits from this reuse: for example, in the memfd case, instantiating the same module into the same slot allows us to avoid several calls to mmap() because the same mappings can be reused.

The policy tracks a freelist per "compiled module ID", and when allocating a slot for an instance, tries these two options in order:

1. A slot from the freelist for this module (i.e., one last used for another instantiation of this particular module), or
2. A slot that was last used by some other module or never used before.

The "victim" slot for choice 2 is randomly chosen. The data structures are carefully designed so that all updates are O(1), and there is no retry loop in any of the random selection.

This policy is now the default when the memfd backend is selected via the `memfd-allocator` feature flag.
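To make the "all updates are O(1)" claim concrete, here is a minimal, self-contained sketch of the bookkeeping such a policy needs. This is an illustration of the technique, not the actual `index_allocator` code: the names (`AffinitySlots`, `ModuleId`, `take_from_free`, `take_from_module`) are invented for this example, and the `rand` crate is assumed as a dependency. The trick is to pair every dense `Vec` of slots with a slot-to-position map, so that removing an arbitrary element is a `swap_remove` plus one map fix-up:

```rust
// Hypothetical sketch only; not the identifiers used by the real
// `index_allocator` module in this commit.
use rand::Rng;
use std::collections::HashMap;

type ModuleId = u64;

struct AffinitySlots {
    /// Dense vector of every free slot, regardless of affinity.
    free: Vec<usize>,
    /// slot -> its position in `free`, so any specific slot can be
    /// removed with a single swap_remove.
    free_pos: HashMap<usize, usize>,
    /// module -> dense vector of free slots last used by that module.
    per_module: HashMap<ModuleId, Vec<usize>>,
    /// slot -> its position in its module's vector above.
    module_pos: HashMap<usize, usize>,
    /// slot -> the module it last hosted, if any.
    affinity: HashMap<usize, ModuleId>,
}

impl AffinitySlots {
    fn new(max_slots: usize) -> Self {
        AffinitySlots {
            free: (0..max_slots).collect(),
            free_pos: (0..max_slots).map(|i| (i, i)).collect(),
            per_module: HashMap::new(),
            module_pos: HashMap::new(),
            affinity: HashMap::new(),
        }
    }

    /// Remove `slot` from the global free list: O(1).
    fn take_from_free(&mut self, slot: usize) {
        let pos = self.free_pos.remove(&slot).unwrap();
        let removed = self.free.swap_remove(pos);
        debug_assert_eq!(removed, slot);
        if pos < self.free.len() {
            // The former last element now lives at `pos`; fix its index.
            self.free_pos.insert(self.free[pos], pos);
        }
    }

    /// Remove `slot` from its module's affinity list, if it is on one: O(1).
    fn take_from_module(&mut self, slot: usize) {
        if let Some(pos) = self.module_pos.remove(&slot) {
            let list = self.per_module.get_mut(&self.affinity[&slot]).unwrap();
            let removed = list.swap_remove(pos);
            debug_assert_eq!(removed, slot);
            if pos < list.len() {
                self.module_pos.insert(list[pos], pos);
            }
        }
    }

    /// Choice 1: a slot with affinity to `module`; choice 2: a random
    /// victim. Either way the slot leaves both lists in O(1).
    fn alloc(&mut self, module: Option<ModuleId>) -> usize {
        assert!(!self.free.is_empty());
        let slot = match module.and_then(|m| self.per_module.get(&m)) {
            Some(list) if !list.is_empty() => *list.last().unwrap(),
            _ => self.free[rand::thread_rng().gen_range(0..self.free.len())],
        };
        self.take_from_module(slot);
        self.take_from_free(slot);
        slot
    }

    /// Return `slot` to the pool, recording its affinity to `module`.
    fn free(&mut self, slot: usize, module: Option<ModuleId>) {
        self.free_pos.insert(slot, self.free.len());
        self.free.push(slot);
        match module {
            Some(m) => {
                self.affinity.insert(slot, m);
                let list = self.per_module.entry(m).or_default();
                self.module_pos.insert(slot, list.len());
                list.push(slot);
            }
            None => {
                self.affinity.remove(&slot);
            }
        }
    }
}

fn main() {
    let mut slots = AffinitySlots::new(4);
    let a = slots.alloc(Some(7));
    slots.free(a, Some(7));
    // A later instantiation of module 7 gets its old slot back.
    assert_eq!(slots.alloc(Some(7)), a);
}
```

Because `free` stays dense, choosing a random victim is a single `gen_range` over its length; there is never a need to probe for a free slot and retry.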
@@ -15,7 +15,6 @@ use crate::MemFdSlot
 use crate::{instance::Instance, Memory, Mmap, ModuleMemFds, Table};
 use anyhow::{anyhow, bail, Context, Result};
 use libc::c_void;
-use rand::Rng;
 use std::convert::TryFrom;
 use std::mem;
 use std::sync::Arc;
@@ -25,6 +24,9 @@ use wasmtime_environ::{
     WASM_PAGE_SIZE,
 };
 
+mod index_allocator;
+use index_allocator::PoolingAllocationState;
+
 cfg_if::cfg_if! {
     if #[cfg(windows)] {
         mod windows;
@@ -250,20 +252,19 @@ pub enum PoolingAllocationStrategy {
     NextAvailable,
     /// Allocate from a random available instance.
     Random,
-}
-
-impl PoolingAllocationStrategy {
-    fn next(&self, free_count: usize) -> usize {
-        debug_assert!(free_count > 0);
-
-        match self {
-            Self::NextAvailable => free_count - 1,
-            Self::Random => rand::thread_rng().gen_range(0..free_count),
-        }
-    }
+    /// Try to allocate an instance slot that was previously used for
+    /// the same module, potentially enabling faster instantiation by
+    /// reusing e.g. memory mappings.
+    ReuseAffinity,
 }
 
 impl Default for PoolingAllocationStrategy {
+    #[cfg(feature = "memfd-allocator")]
+    fn default() -> Self {
+        Self::ReuseAffinity
+    }
+
+    #[cfg(not(feature = "memfd-allocator"))]
     fn default() -> Self {
         Self::NextAvailable
     }
@@ -283,13 +284,14 @@ struct InstancePool {
     mapping: Mmap,
     instance_size: usize,
     max_instances: usize,
-    free_list: Mutex<Vec<usize>>,
+    index_allocator: Mutex<PoolingAllocationState>,
     memories: MemoryPool,
     tables: TablePool,
 }
 
 impl InstancePool {
     fn new(
+        strategy: PoolingAllocationStrategy,
         module_limits: &ModuleLimits,
         instance_limits: &InstanceLimits,
         tunables: &Tunables,
@@ -330,7 +332,7 @@ impl InstancePool {
             mapping,
             instance_size,
             max_instances,
-            free_list: Mutex::new((0..max_instances).collect()),
+            index_allocator: Mutex::new(PoolingAllocationState::new(strategy, max_instances)),
             memories: MemoryPool::new(module_limits, instance_limits, tunables)?,
             tables: TablePool::new(module_limits, instance_limits)?,
         };
@@ -351,6 +353,7 @@ impl InstancePool {
         let host_state = std::mem::replace(&mut req.host_state, Box::new(()));
         let instance_data = Instance::create_raw(
             &req.module,
+            req.unique_id,
             &*req.wasm_data,
             PrimaryMap::default(),
             PrimaryMap::default(),
@@ -362,6 +365,7 @@ impl InstancePool {
             // chosen slot before we do anything else with it. (This is
             // paired with a `drop_in_place` in deallocate below.)
+            let instance = self.instance(index);
 
             std::ptr::write(instance as _, instance_data);
 
             // set_instance_memories and _tables will need the store before we can completely
@@ -393,16 +397,14 @@ impl InstancePool {
 
     fn allocate(
         &self,
-        strategy: PoolingAllocationStrategy,
         req: InstanceAllocationRequest,
     ) -> Result<InstanceHandle, InstantiationError> {
         let index = {
-            let mut free_list = self.free_list.lock().unwrap();
-            if free_list.is_empty() {
+            let mut alloc = self.index_allocator.lock().unwrap();
+            if alloc.is_empty() {
                 return Err(InstantiationError::Limit(self.max_instances as u32));
             }
-            let free_index = strategy.next(free_list.len());
-            free_list.swap_remove(free_index)
+            alloc.alloc(req.unique_id)
         };
 
         unsafe {
@@ -428,6 +430,7 @@ impl InstancePool {
         debug_assert!(index < self.max_instances);
 
         let instance = unsafe { &mut *handle.instance };
+        let unique_id = instance.unique_id;
 
         // Decommit any linear memories that were used
         for ((def_mem_idx, memory), base) in
@@ -497,7 +500,7 @@ impl InstancePool {
         // touched again until we write a fresh Instance in-place with
         // std::ptr::write in allocate() above.
 
-        self.free_list.lock().unwrap().push(index);
+        self.index_allocator.lock().unwrap().free(index, unique_id);
     }
 
     fn set_instance_memories(
@@ -860,7 +863,7 @@ struct StackPool {
     stack_size: usize,
     max_instances: usize,
    page_size: usize,
-    free_list: Mutex<Vec<usize>>,
+    index_allocator: Mutex<PoolingAllocationState>,
 }
 
 #[cfg(all(feature = "async", unix))]
@@ -903,25 +906,29 @@ impl StackPool {
             stack_size,
             max_instances,
             page_size,
-            free_list: Mutex::new((0..max_instances).collect()),
+            // We always use a `NextAvailable` strategy for stack
+            // allocation. We don't want or need an affinity policy
+            // here: stacks do not benefit from being allocated to the
+            // same compiled module with the same image (they always
+            // start zeroed just the same for everyone).
+            index_allocator: Mutex::new(PoolingAllocationState::new(
+                PoolingAllocationStrategy::NextAvailable,
+                max_instances,
+            )),
         })
     }
 
-    fn allocate(
-        &self,
-        strategy: PoolingAllocationStrategy,
-    ) -> Result<wasmtime_fiber::FiberStack, FiberStackError> {
+    fn allocate(&self) -> Result<wasmtime_fiber::FiberStack, FiberStackError> {
         if self.stack_size == 0 {
             return Err(FiberStackError::NotSupported);
         }
 
         let index = {
-            let mut free_list = self.free_list.lock().unwrap();
-            if free_list.is_empty() {
+            let mut alloc = self.index_allocator.lock().unwrap();
+            if alloc.is_empty() {
                 return Err(FiberStackError::Limit(self.max_instances as u32));
             }
-            let free_index = strategy.next(free_list.len());
-            free_list.swap_remove(free_index)
+            alloc.alloc(None)
         };
 
         debug_assert!(index < self.max_instances);
@@ -967,7 +974,7 @@ impl StackPool {
 
         decommit_stack_pages(bottom_of_stack as _, stack_size).unwrap();
 
-        self.free_list.lock().unwrap().push(index);
+        self.index_allocator.lock().unwrap().free(index, None);
     }
 }
 
@@ -978,7 +985,6 @@ impl StackPool {
 /// Note: the resource pools are manually dropped so that the fault handler terminates correctly.
 #[derive(Debug)]
 pub struct PoolingInstanceAllocator {
-    strategy: PoolingAllocationStrategy,
     module_limits: ModuleLimits,
     // This is manually drop so that the pools unmap their memory before the page fault handler drops.
     instances: mem::ManuallyDrop<InstancePool>,
@@ -1003,7 +1009,7 @@ impl PoolingInstanceAllocator {
             bail!("the instance count limit cannot be zero");
         }
 
-        let instances = InstancePool::new(&module_limits, &instance_limits, tunables)?;
+        let instances = InstancePool::new(strategy, &module_limits, &instance_limits, tunables)?;
 
         #[cfg(all(feature = "uffd", target_os = "linux"))]
         let _fault_handler = imp::PageFaultHandler::new(&instances)?;
@@ -1011,7 +1017,6 @@ impl PoolingInstanceAllocator {
         drop(stack_size); // suppress unused warnings w/o async feature
 
         Ok(Self {
-            strategy,
             module_limits,
             instances: mem::ManuallyDrop::new(instances),
             #[cfg(all(feature = "async", unix))]
@@ -1050,7 +1055,7 @@ unsafe impl InstanceAllocator for PoolingInstanceAllocator {
         &self,
         req: InstanceAllocationRequest,
     ) -> Result<InstanceHandle, InstantiationError> {
-        self.instances.allocate(self.strategy, req)
+        self.instances.allocate(req)
     }
 
     unsafe fn initialize(
@@ -1097,7 +1102,7 @@ unsafe impl InstanceAllocator for PoolingInstanceAllocator {
 
     #[cfg(all(feature = "async", unix))]
     fn allocate_fiber_stack(&self) -> Result<wasmtime_fiber::FiberStack, FiberStackError> {
-        self.stacks.allocate(self.strategy)
+        self.stacks.allocate()
     }
 
     #[cfg(all(feature = "async", unix))]
@@ -1417,21 +1422,6 @@ mod test {
         );
     }
 
-    #[test]
-    fn test_next_available_allocation_strategy() {
-        let strat = PoolingAllocationStrategy::NextAvailable;
-        assert_eq!(strat.next(10), 9);
-        assert_eq!(strat.next(5), 4);
-        assert_eq!(strat.next(1), 0);
-    }
-
-    #[test]
-    fn test_random_allocation_strategy() {
-        let strat = PoolingAllocationStrategy::Random;
-        assert!(strat.next(100) < 100);
-        assert_eq!(strat.next(1), 0);
-    }
-
     #[cfg(target_pointer_width = "64")]
     #[test]
     fn test_instance_pool() -> Result<()> {
@@ -1451,6 +1441,7 @@ mod test {
         let instance_limits = InstanceLimits { count: 3 };
 
         let instances = InstancePool::new(
+            PoolingAllocationStrategy::NextAvailable,
            &module_limits,
             &instance_limits,
             &Tunables {
@@ -1464,7 +1455,10 @@ mod test {
         assert_eq!(instances.instance_size, region::page::size());
         assert_eq!(instances.max_instances, 3);
 
-        assert_eq!(&*instances.free_list.lock().unwrap(), &[0, 1, 2]);
+        assert_eq!(
+            instances.index_allocator.lock().unwrap().testing_freelist(),
+            &[0, 1, 2]
+        );
 
         let mut handles = Vec::new();
         let module = Arc::new(Module::default());
@@ -1473,50 +1467,49 @@ mod test {
         for _ in (0..3).rev() {
             handles.push(
                 instances
-                    .allocate(
-                        PoolingAllocationStrategy::NextAvailable,
-                        InstanceAllocationRequest {
-                            module: module.clone(),
-                            image_base: 0,
-                            functions,
-                            imports: Imports {
-                                functions: &[],
-                                tables: &[],
-                                memories: &[],
-                                globals: &[],
-                            },
-                            shared_signatures: VMSharedSignatureIndex::default().into(),
-                            host_state: Box::new(()),
-                            store: StorePtr::empty(),
-                            wasm_data: &[],
-                            memfds: None,
-                        },
-                    )
+                    .allocate(InstanceAllocationRequest {
+                        module: module.clone(),
+                        unique_id: None,
+                        image_base: 0,
+                        functions,
+                        imports: Imports {
+                            functions: &[],
+                            tables: &[],
+                            memories: &[],
+                            globals: &[],
+                        },
+                        shared_signatures: VMSharedSignatureIndex::default().into(),
+                        host_state: Box::new(()),
+                        store: StorePtr::empty(),
+                        wasm_data: &[],
+                        memfds: None,
+                    })
                     .expect("allocation should succeed"),
             );
         }
 
-        assert_eq!(&*instances.free_list.lock().unwrap(), &[]);
+        assert_eq!(
+            instances.index_allocator.lock().unwrap().testing_freelist(),
+            &[]
+        );
 
-        match instances.allocate(
-            PoolingAllocationStrategy::NextAvailable,
-            InstanceAllocationRequest {
-                module: module.clone(),
-                functions,
-                image_base: 0,
-                imports: Imports {
-                    functions: &[],
-                    tables: &[],
-                    memories: &[],
-                    globals: &[],
-                },
-                shared_signatures: VMSharedSignatureIndex::default().into(),
-                host_state: Box::new(()),
-                store: StorePtr::empty(),
-                wasm_data: &[],
-                memfds: None,
-            },
-        ) {
+        match instances.allocate(InstanceAllocationRequest {
+            module: module.clone(),
+            unique_id: None,
+            functions,
+            image_base: 0,
+            imports: Imports {
+                functions: &[],
+                tables: &[],
+                memories: &[],
+                globals: &[],
+            },
+            shared_signatures: VMSharedSignatureIndex::default().into(),
+            host_state: Box::new(()),
+            store: StorePtr::empty(),
+            wasm_data: &[],
+            memfds: None,
+        }) {
             Err(InstantiationError::Limit(3)) => {}
             _ => panic!("unexpected error"),
         };
@@ -1525,7 +1518,10 @@ mod test {
             instances.deallocate(&handle);
         }
 
-        assert_eq!(&*instances.free_list.lock().unwrap(), &[2, 1, 0]);
+        assert_eq!(
+            instances.index_allocator.lock().unwrap().testing_freelist(),
+            &[2, 1, 0]
+        );
 
         Ok(())
     }
@@ -1635,7 +1631,7 @@ mod test {
         assert_eq!(pool.page_size, native_page_size);
 
         assert_eq!(
-            &*pool.free_list.lock().unwrap(),
+            pool.index_allocator.lock().unwrap().testing_freelist(),
             &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
         );
 
@@ -1643,9 +1639,7 @@ mod test {
 
         let mut stacks = Vec::new();
         for i in (0..10).rev() {
-            let stack = pool
-                .allocate(PoolingAllocationStrategy::NextAvailable)
-                .expect("allocation should succeed");
+            let stack = pool.allocate().expect("allocation should succeed");
             assert_eq!(
                 ((stack.top().unwrap() as usize - base) / pool.stack_size) - 1,
                 i
@@ -1653,12 +1647,9 @@ mod test {
             stacks.push(stack);
         }
 
-        assert_eq!(&*pool.free_list.lock().unwrap(), &[]);
+        assert_eq!(pool.index_allocator.lock().unwrap().testing_freelist(), &[]);
 
-        match pool
-            .allocate(PoolingAllocationStrategy::NextAvailable)
-            .unwrap_err()
-        {
+        match pool.allocate().unwrap_err() {
             FiberStackError::Limit(10) => {}
             _ => panic!("unexpected error"),
         };
@@ -1668,7 +1659,7 @@ mod test {
         }
 
         assert_eq!(
-            &*pool.free_list.lock().unwrap(),
+            pool.index_allocator.lock().unwrap().testing_freelist(),
             &[9, 8, 7, 6, 5, 4, 3, 2, 1, 0],
         );
 