Cache worker tests
@@ -45,6 +45,7 @@ target-lexicon = { version = "0.8.1", default-features = false }
pretty_env_logger = "0.3.0"
rand = { version = "0.7.0", features = ["small_rng"] }
cranelift-codegen = { version = "0.44.0", features = ["enable-serde", "all-arch"] }
filetime = "0.2.7"

[features]
default = ["std"]

@@ -15,12 +15,13 @@ use std::io::Write;
use std::path::{Path, PathBuf};
use std::string::{String, ToString};

#[macro_use] // for tests
mod config;
mod worker;

use config::{cache_config, CacheConfig};
pub use config::{create_new_config, init};
use worker::worker;
use worker::{worker, Worker};

lazy_static! {
    static ref SELF_MTIME: String = {
@@ -48,9 +49,12 @@ lazy_static! {
    };
}

pub struct ModuleCacheEntry<'config> {
    mod_cache_path: Option<PathBuf>,
pub struct ModuleCacheEntry<'config, 'worker>(Option<ModuleCacheEntryInner<'config, 'worker>>);

struct ModuleCacheEntryInner<'config, 'worker> {
    mod_cache_path: PathBuf,
    cache_config: &'config CacheConfig,
    worker: &'worker Worker,
}

#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
@@ -72,7 +76,7 @@ type ModuleCacheDataTupleType = (

struct Sha256Hasher(Sha256);

impl<'config> ModuleCacheEntry<'config> {
impl<'config, 'worker> ModuleCacheEntry<'config, 'worker> {
    pub fn new<'data>(
        module: &Module,
        function_body_inputs: &PrimaryMap<DefinedFuncIndex, FunctionBodyData<'data>>,
@@ -80,87 +84,104 @@ impl<'config> ModuleCacheEntry<'config> {
        compiler_name: &str,
        generate_debug_info: bool,
    ) -> Self {
        Self::new_with_config(
            module,
            function_body_inputs,
            isa,
            compiler_name,
            generate_debug_info,
            cache_config(),
        )
        let cache_config = cache_config();
        if cache_config.enabled() {
            Self(Some(ModuleCacheEntryInner::new(
                module,
                function_body_inputs,
                isa,
                compiler_name,
                generate_debug_info,
                cache_config,
                worker(),
            )))
        } else {
            Self(None)
        }
    }

    fn new_with_config<'data>(
    #[cfg(test)]
    fn from_inner<'data>(inner: ModuleCacheEntryInner<'config, 'worker>) -> Self {
        Self(Some(inner))
    }

    pub fn get_data(&self) -> Option<ModuleCacheData> {
        if let Some(inner) = &self.0 {
            inner.get_data().map(|val| {
                inner.worker.on_cache_get_async(&inner.mod_cache_path); // call on success
                val
            })
        } else {
            None
        }
    }

    pub fn update_data(&self, data: &ModuleCacheData) {
        if let Some(inner) = &self.0 {
            inner.update_data(data).map(|val| {
                inner.worker.on_cache_update_async(&inner.mod_cache_path); // call on success
                val
            });
        }
    }
}

impl<'config, 'worker> ModuleCacheEntryInner<'config, 'worker> {
    fn new<'data>(
        module: &Module,
        function_body_inputs: &PrimaryMap<DefinedFuncIndex, FunctionBodyData<'data>>,
        isa: &dyn isa::TargetIsa,
        compiler_name: &str,
        generate_debug_info: bool,
        cache_config: &'config CacheConfig,
        worker: &'worker Worker,
    ) -> Self {
        let mod_cache_path = if cache_config.enabled() {
            let hash = Sha256Hasher::digest(module, function_body_inputs);
            let compiler_dir = if cfg!(debug_assertions) {
                format!(
                    "{comp_name}-{comp_ver}-{comp_mtime}",
                    comp_name = compiler_name,
                    comp_ver = env!("GIT_REV"),
                    comp_mtime = *SELF_MTIME,
                )
            } else {
                format!(
                    "{comp_name}-{comp_ver}",
                    comp_name = compiler_name,
                    comp_ver = env!("GIT_REV"),
                )
            };
            let mod_filename = format!(
                "mod-{mod_hash}{mod_dbg}",
                mod_hash = base64::encode_config(&hash, base64::URL_SAFE_NO_PAD), // standard encoding uses '/' which can't be used for filename
                mod_dbg = if generate_debug_info { ".d" } else { "" },
            );
            Some(
                cache_config
                    .directory()
                    .join(isa.triple().to_string())
                    .join(compiler_dir)
                    .join(mod_filename),
        let hash = Sha256Hasher::digest(module, function_body_inputs);
        let compiler_dir = if cfg!(debug_assertions) {
            format!(
                "{comp_name}-{comp_ver}-{comp_mtime}",
                comp_name = compiler_name,
                comp_ver = env!("GIT_REV"),
                comp_mtime = *SELF_MTIME,
            )
        } else {
            None
            format!(
                "{comp_name}-{comp_ver}",
                comp_name = compiler_name,
                comp_ver = env!("GIT_REV"),
            )
        };
        let mod_filename = format!(
            "mod-{mod_hash}{mod_dbg}",
            mod_hash = base64::encode_config(&hash, base64::URL_SAFE_NO_PAD), // standard encoding uses '/' which can't be used for filename
            mod_dbg = if generate_debug_info { ".d" } else { "" },
        );
        let mod_cache_path = cache_config
            .directory()
            .join(isa.triple().to_string())
            .join(compiler_dir)
            .join(mod_filename);

        Self {
            mod_cache_path,
            cache_config,
            worker,
        }
    }

    pub fn get_data(&self) -> Option<ModuleCacheData> {
        let path = self.mod_cache_path.as_ref()?;
        trace!("get_data() for path: {}", path.display());
        let compressed_cache_bytes = fs::read(path).ok()?;
    fn get_data(&self) -> Option<ModuleCacheData> {
        trace!("get_data() for path: {}", self.mod_cache_path.display());
        let compressed_cache_bytes = fs::read(&self.mod_cache_path).ok()?;
        let cache_bytes = zstd::decode_all(&compressed_cache_bytes[..])
            .map_err(|err| warn!("Failed to decompress cached code: {}", err))
            .ok()?;
        let ret = bincode::deserialize(&cache_bytes[..])
        bincode::deserialize(&cache_bytes[..])
            .map_err(|err| warn!("Failed to deserialize cached code: {}", err))
            .ok()?;

        worker().on_cache_get_async(path); // call on success
        Some(ret)
            .ok()
    }

    pub fn update_data(&self, data: &ModuleCacheData) {
        if self.update_data_impl(data).is_some() {
            let path = self.mod_cache_path.as_ref().unwrap();
            worker().on_cache_update_async(path); // call on success
        }
    }

    fn update_data_impl(&self, data: &ModuleCacheData) -> Option<()> {
        let path = self.mod_cache_path.as_ref()?;
        trace!("update_data() for path: {}", path.display());
    fn update_data(&self, data: &ModuleCacheData) -> Option<()> {
        trace!("update_data() for path: {}", self.mod_cache_path.display());
        let serialized_data = bincode::serialize(&data)
            .map_err(|err| warn!("Failed to serialize cached code: {}", err))
            .ok()?;
@@ -173,17 +194,17 @@ impl<'config> ModuleCacheEntry<'config> {

        // Optimize syscalls: first, try writing to disk. It should succeed in most cases.
        // Otherwise, try creating the cache directory and retry writing to the file.
        if fs_write_atomic(path, "mod", &compressed_data) {
        if fs_write_atomic(&self.mod_cache_path, "mod", &compressed_data) {
            return Some(());
        }

        debug!(
            "Attempting to create the cache directory, because \
             failed to write cached code to disk, path: {}",
            path.display(),
            self.mod_cache_path.display(),
        );

        let cache_dir = path.parent().unwrap();
        let cache_dir = self.mod_cache_path.parent().unwrap();
        fs::create_dir_all(cache_dir)
            .map_err(|err| {
                warn!(
@@ -194,7 +215,7 @@ impl<'config> ModuleCacheEntry<'config> {
            })
            .ok()?;

        if fs_write_atomic(path, "mod", &compressed_data) {
        if fs_write_atomic(&self.mod_cache_path, "mod", &compressed_data) {
            Some(())
        } else {
            None

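Note on the shape of the new API above: ModuleCacheEntry is now a thin Option wrapper, so every public method degrades to a no-op (or None) when caching is disabled, while the test-only from_inner constructor injects an explicit config and worker instead of the process-global ones. A minimal standalone sketch of the idiom, with illustrative names rather than the real wasmtime-environ types:

    use std::path::PathBuf;

    pub struct Handle(Option<Inner>);

    struct Inner {
        path: PathBuf,
    }

    impl Handle {
        pub fn get_data(&self) -> Option<Vec<u8>> {
            // A disabled handle (None) short-circuits to None via `?`.
            let inner = self.0.as_ref()?;
            std::fs::read(&inner.path).ok()
        }
    }

    fn main() {
        let disabled = Handle(None);
        assert!(disabled.get_data().is_none());
    }
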
wasmtime-environ/src/cache/config.rs
@@ -621,4 +621,5 @@ impl CacheConfig {
}

#[cfg(test)]
mod tests;
#[macro_use]
pub mod tests;

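The #[macro_use] pub mod tests re-export is what lets sibling modules' tests (cache/tests.rs and cache/worker/tests.rs) reuse the config test helpers and the load_config! macro. The macro's definition is not part of this diff; below is only a hypothetical sketch of its shape, inferred from the call sites later in this commit -- the CacheConfig::from_file constructor and the TOML path quoting are assumptions, not the real API:

    macro_rules! load_config {
        ($config_path:expr, $content_fmt:tt, $cache_dir:expr) => {{
            // `{:?}` quotes and escapes the path; for typical temp-dir paths
            // this doubles as a TOML basic string (a simplification).
            let config_content = format!(
                $content_fmt,
                cache_dir = format!("{:?}", $cache_dir.display().to_string()),
            );
            std::fs::write(&$config_path, config_content).expect("Failed to write test config");
            // Assumed constructor; the real loading entry point may differ.
            crate::cache::config::CacheConfig::from_file(Some(&$config_path))
        }};
    }
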
wasmtime-environ/src/cache/config/tests.rs
@@ -8,7 +8,8 @@ use tempfile::{self, TempDir};
// that's why this function and macro always use a custom cache directory
// note: TempDir removes the directory when it's dropped, so we need to return it to the caller,
// so the paths stay valid
fn test_prolog() -> (TempDir, PathBuf, PathBuf) {
pub fn test_prolog() -> (TempDir, PathBuf, PathBuf) {
    let _ = pretty_env_logger::try_init();
    let temp_dir = tempfile::tempdir().expect("Can't create temporary directory");
    let cache_dir = temp_dir.path().join("cache-dir");
    let config_path = temp_dir.path().join("cache-config.toml");

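Usage note for test_prolog: the returned TempDir must stay bound for the whole test, since dropping it deletes the directory behind cache_dir and config_path. A minimal call looks like:

    // Keep the TempDir alive: binding it even as `_tempdir` is enough;
    // a bare `let _ = ...` would drop it immediately and delete the paths.
    let (_tempdir, cache_dir, config_path) = test_prolog();
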
wasmtime-environ/src/cache/tests.rs
@@ -1,3 +1,4 @@
use super::config::tests::test_prolog;
use super::*;
use crate::address_map::{FunctionAddressMap, InstructionAddressMap};
use crate::compilation::{CodeAndJTOffsets, Relocation, RelocationTarget};
@@ -14,22 +15,17 @@ use std::fs;
use std::str::FromStr;
use std::vec::Vec;
use target_lexicon::triple;
use tempfile;

// Since the cache system is a global thing, each test needs to be run in a separate process.
// So, init() tests are run as integration tests.
// However, caching is a private thing, an implementation detail, and needs to be tested
// from the inside of the module.
// We test init() in exactly one test; the rest of the tests don't rely on it.

#[test]
fn test_write_read_cache() {
    pretty_env_logger::init();
    let dir = tempfile::tempdir().expect("Can't create temporary directory");

    let cache_dir = dir.path().join("cache-dir");
    let baseline_compression_level = 5;

    let config_path = dir.path().join("cache-config.toml");
fn test_cache_init() {
    let (_tempdir, cache_dir, config_path) = test_prolog();
    let baseline_compression_level = 4;
    let config_content = format!(
        "[cache]\n\
         enabled = true\n\
@@ -42,6 +38,8 @@ fn test_write_read_cache() {

    let errors = init(true, Some(&config_path), None);
    assert!(errors.is_empty());

    // test if we can use config
    let cache_config = cache_config();
    assert!(cache_config.enabled());
    // assumption: config init creates cache directory and returns canonicalized path
@@ -54,6 +52,31 @@ fn test_write_read_cache() {
        baseline_compression_level
    );

    // test if we can use worker
    let worker = worker();
    worker.on_cache_update_async(config_path);
}

#[test]
fn test_write_read_cache() {
    let (_tempdir, cache_dir, config_path) = test_prolog();
    let cache_config = load_config!(
        config_path,
        "[cache]\n\
         enabled = true\n\
         directory = {cache_dir}\n\
         baseline-compression-level = 3\n",
        cache_dir
    );
    assert!(cache_config.enabled());
    let worker = Worker::start_new(&cache_config, None);

    // assumption: config load creates cache directory and returns canonicalized path
    assert_eq!(
        *cache_config.directory(),
        fs::canonicalize(cache_dir).unwrap()
    );

    let mut rng = SmallRng::from_seed([
        0x42, 0x04, 0xF3, 0x44, 0x11, 0x22, 0x33, 0x44, 0x67, 0x68, 0xFF, 0x00, 0x44, 0x23, 0x7F,
        0x96,
@@ -72,27 +95,59 @@ fn test_write_read_cache() {
    let compiler1 = "test-1";
    let compiler2 = "test-2";

    let entry1 = ModuleCacheEntry::new(&module1, &function_body_inputs1, &*isa1, compiler1, false);
    assert!(entry1.mod_cache_path().is_some());
    let entry1 = ModuleCacheEntry::from_inner(ModuleCacheEntryInner::new(
        &module1,
        &function_body_inputs1,
        &*isa1,
        compiler1,
        false,
        &cache_config,
        &worker,
    ));
    assert!(entry1.0.is_some());
    assert!(entry1.get_data().is_none());
    let data1 = new_module_cache_data(&mut rng);
    entry1.update_data(&data1);
    assert_eq!(entry1.get_data().expect("Cache should be available"), data1);

    let entry2 = ModuleCacheEntry::new(&module2, &function_body_inputs1, &*isa1, compiler1, false);
    let entry2 = ModuleCacheEntry::from_inner(ModuleCacheEntryInner::new(
        &module2,
        &function_body_inputs1,
        &*isa1,
        compiler1,
        false,
        &cache_config,
        &worker,
    ));
    let data2 = new_module_cache_data(&mut rng);
    entry2.update_data(&data2);
    assert_eq!(entry1.get_data().expect("Cache should be available"), data1);
    assert_eq!(entry2.get_data().expect("Cache should be available"), data2);

    let entry3 = ModuleCacheEntry::new(&module1, &function_body_inputs2, &*isa1, compiler1, false);
    let entry3 = ModuleCacheEntry::from_inner(ModuleCacheEntryInner::new(
        &module1,
        &function_body_inputs2,
        &*isa1,
        compiler1,
        false,
        &cache_config,
        &worker,
    ));
    let data3 = new_module_cache_data(&mut rng);
    entry3.update_data(&data3);
    assert_eq!(entry1.get_data().expect("Cache should be available"), data1);
    assert_eq!(entry2.get_data().expect("Cache should be available"), data2);
    assert_eq!(entry3.get_data().expect("Cache should be available"), data3);

    let entry4 = ModuleCacheEntry::new(&module1, &function_body_inputs1, &*isa2, compiler1, false);
    let entry4 = ModuleCacheEntry::from_inner(ModuleCacheEntryInner::new(
        &module1,
        &function_body_inputs1,
        &*isa2,
        compiler1,
        false,
        &cache_config,
        &worker,
    ));
    let data4 = new_module_cache_data(&mut rng);
    entry4.update_data(&data4);
    assert_eq!(entry1.get_data().expect("Cache should be available"), data1);
@@ -100,7 +155,15 @@ fn test_write_read_cache() {
    assert_eq!(entry3.get_data().expect("Cache should be available"), data3);
    assert_eq!(entry4.get_data().expect("Cache should be available"), data4);

    let entry5 = ModuleCacheEntry::new(&module1, &function_body_inputs1, &*isa1, compiler2, false);
    let entry5 = ModuleCacheEntry::from_inner(ModuleCacheEntryInner::new(
        &module1,
        &function_body_inputs1,
        &*isa1,
        compiler2,
        false,
        &cache_config,
        &worker,
    ));
    let data5 = new_module_cache_data(&mut rng);
    entry5.update_data(&data5);
    assert_eq!(entry1.get_data().expect("Cache should be available"), data1);
@@ -186,7 +249,6 @@ fn new_function_body_inputs<'data>(
}

fn new_module_cache_data(rng: &mut impl Rng) -> ModuleCacheData {
    // WARNING: if method changed, update PartialEq impls below, too!
    let funcs = (0..rng.gen_range(0, 10))
        .map(|i| {
            let mut sm = SecondaryMap::new(); // doesn't implement from iterator
@@ -276,9 +338,3 @@ fn new_module_cache_data(rng: &mut impl Rng) -> ModuleCacheData {
        stack_slots,
    ))
}

impl ModuleCacheEntry<'_> {
    pub fn mod_cache_path(&self) -> &Option<PathBuf> {
        &self.mod_cache_path
    }
}

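A note on determinism: the fixed 16-byte seed above makes SmallRng reproducible across runs (for a given rand version; rand 0.7's SmallRng takes a [u8; 16] seed), so the generated module cache data is stable. A tiny standalone illustration:

    use rand::rngs::SmallRng;
    use rand::{Rng, SeedableRng};

    fn main() {
        let seed = [7u8; 16];
        let mut a = SmallRng::from_seed(seed);
        let mut b = SmallRng::from_seed(seed);
        // Identical seeds yield identical streams, run after run.
        assert_eq!(a.gen::<u64>(), b.gen::<u64>());
    }
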
wasmtime-environ/src/cache/worker.rs
@@ -6,8 +6,6 @@
//! Background tasks can be CPU intensive, but the worker thread has low priority.

use super::{cache_config, fs_write_atomic, CacheConfig};
#[cfg(test)]
use core::borrow::Borrow;
use log::{debug, info, trace, warn};
use serde::{Deserialize, Serialize};
use spin::Once;
@@ -19,31 +17,34 @@ use std::path::{Path, PathBuf};
use std::sync::atomic::{self, AtomicBool};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
#[cfg(test)]
use std::sync::{atomic::AtomicU32, Arc};
use std::thread::{self};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
#[cfg(not(test))]
use std::time::SystemTime;
use std::vec::Vec;
#[cfg(test)]
use tests::system_time_stub::SystemTimeStub as SystemTime;

pub(super) struct Worker {
    sender: SyncSender<CacheEvent>,
    #[cfg(test)]
    stats: Arc<WorkerStats>,
    stats: Arc<(Mutex<WorkerStats>, Condvar)>,
}

struct WorkerThread {
    receiver: Receiver<CacheEvent>,
    cache_config: CacheConfig,
    #[cfg(test)]
    stats: Arc<WorkerStats>,
    stats: Arc<(Mutex<WorkerStats>, Condvar)>,
}

#[cfg(test)]
#[derive(Default)]
struct WorkerStats {
    dropped: AtomicU32,
    sent: AtomicU32,
    handled: AtomicU32,
    dropped: u32,
    sent: u32,
    handled: u32,
}

static WORKER: Once<Worker> = Once::new();
@@ -87,7 +88,7 @@ impl Worker {
        let (tx, rx) = sync_channel(queue_size);

        #[cfg(test)]
        let stats = Arc::new(WorkerStats::default());
        let stats = Arc::new((Mutex::new(WorkerStats::default()), Condvar::new()));

        let worker_thread = WorkerThread {
            receiver: rx,
@@ -101,7 +102,7 @@ impl Worker {
        // non-tests binary has only a static worker, so Rust doesn't drop it
        thread::spawn(move || worker_thread.run(init_file_per_thread_logger));

        Worker {
        Self {
            sender: tx,
            #[cfg(test)]
            stats: stats,
@@ -121,11 +122,15 @@ impl Worker {
    #[inline]
    fn send_cache_event(&self, event: CacheEvent) {
        #[cfg(test)]
        let stats: &WorkerStats = self.stats.borrow();
        let mut stats = self
            .stats
            .0
            .lock()
            .expect("Failed to acquire worker stats lock");
        match self.sender.try_send(event.clone()) {
            Ok(()) => {
                #[cfg(test)]
                stats.sent.fetch_add(1, atomic::Ordering::SeqCst);
                let _ = stats.sent += 1;
            }
            Err(err) => {
                info!(
@@ -135,24 +140,30 @@ impl Worker {
                );

                #[cfg(test)]
                stats.dropped.fetch_add(1, atomic::Ordering::SeqCst);
                let _ = stats.dropped += 1;
            }
        }
    }

    #[allow(dead_code)] // todo for worker tests
    #[cfg(test)]
    pub(super) fn events_dropped(&self) -> u32 {
        let stats: &WorkerStats = self.stats.borrow();
        stats.dropped.load(atomic::Ordering::SeqCst)
        let stats = self
            .stats
            .0
            .lock()
            .expect("Failed to acquire worker stats lock");
        stats.dropped
    }

    // todo wait_for_* instead?
    #[allow(dead_code)] // todo for worker tests
    #[cfg(test)]
    pub(super) fn all_events_handled(&self) -> bool {
        let stats: &WorkerStats = self.stats.borrow();
        stats.sent.load(atomic::Ordering::SeqCst) == stats.handled.load(atomic::Ordering::SeqCst)
    pub(super) fn wait_for_all_events_handled(&self) {
        let (stats, condvar) = &*self.stats;
        let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
        while stats.handled != stats.sent {
            stats = condvar
                .wait(stats)
                .expect("Failed to reacquire worker stats lock");
        }
    }
}

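The atomic counters become a Mutex-protected struct paired with a Condvar, so a test can block in wait_for_all_events_handled() until handled catches up with sent instead of polling. A standalone sketch of that wait pattern (illustrative names, not the actual worker types):

    use std::sync::{Arc, Condvar, Mutex};
    use std::thread;

    #[derive(Default)]
    struct Stats {
        sent: u32,
        handled: u32,
    }

    fn wait_for_all_events_handled(shared: &(Mutex<Stats>, Condvar)) {
        let (lock, condvar) = shared;
        let mut stats = lock.lock().expect("stats lock");
        // Condvar::wait atomically releases the lock and reacquires it on wakeup.
        while stats.handled != stats.sent {
            stats = condvar.wait(stats).expect("stats lock");
        }
    }

    fn main() {
        let shared = Arc::new((Mutex::new(Stats::default()), Condvar::new()));
        shared.0.lock().unwrap().sent = 3;

        let worker_shared = Arc::clone(&shared);
        let handle = thread::spawn(move || {
            for _ in 0..3 {
                let (lock, condvar) = &*worker_shared;
                let mut stats = lock.lock().expect("stats lock");
                stats.handled += 1;
                condvar.notify_all(); // wake any waiter after each handled event
            }
        });

        wait_for_all_events_handled(&shared);
        handle.join().unwrap();
        assert_eq!(shared.0.lock().unwrap().handled, 3);
    }
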
@@ -186,6 +197,7 @@ enum CacheEntry {

impl WorkerThread {
    fn run(self, init_file_per_thread_logger: Option<&'static str>) {
        #[cfg(not(test))] // We want to test the worker without relying on init() being called
        assert!(INIT_CALLED.load(atomic::Ordering::SeqCst));

        if let Some(prefix) = init_file_per_thread_logger {
@@ -197,7 +209,7 @@ impl WorkerThread {
        Self::lower_thread_priority();

        #[cfg(test)]
        let stats: &WorkerStats = self.stats.borrow();
        let (stats, condvar) = &*self.stats;

        for event in self.receiver.iter() {
            match event {
@@ -206,7 +218,11 @@ impl WorkerThread {
            }

            #[cfg(test)]
            stats.handled.fetch_add(1, atomic::Ordering::SeqCst);
            {
                let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
                stats.handled += 1;
                condvar.notify_all();
            }
        }

        // The receiver can stop iteration iff the channel has hung up.
@@ -422,13 +438,29 @@ impl WorkerThread {
        trace!("Trying to clean up cache");

        let mut cache_index = self.list_cache_contents();
        let future_tolerance = SystemTime::now()
            .checked_add(
                self.cache_config
                    .allowed_clock_drift_for_files_from_future(),
            )
            .expect("Brace your cache, the next Big Bang is coming (time overflow)");
        cache_index.sort_unstable_by(|lhs, rhs| {
            // sort by age
            use CacheEntry::*;
            match (lhs, rhs) {
                (Recognized { mtime: lhs_mt, .. }, Recognized { mtime: rhs_mt, .. }) => {
                    rhs_mt.cmp(lhs_mt)
                } // later == younger
                    match (*lhs_mt > future_tolerance, *rhs_mt > future_tolerance) {
                        // later == younger
                        (false, false) => rhs_mt.cmp(lhs_mt),
                        // files from the far future are treated as the oldest recognized files
                        // we want to delete them, so the cache keeps track of recent files
                        // however, we don't delete them unconditionally,
                        // because the .stats file can be overwritten with a meaningful mtime
                        (true, false) => cmp::Ordering::Greater,
                        (false, true) => cmp::Ordering::Less,
                        (true, true) => cmp::Ordering::Equal,
                    }
                }
                // unrecognized is kind of infinity
                (Recognized { .. }, Unrecognized { .. }) => cmp::Ordering::Less,
                (Unrecognized { .. }, Recognized { .. }) => cmp::Ordering::Greater,
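A standalone sketch of the ordering above, to make the far-future handling concrete: entries sort youngest-first, except mtimes beyond the allowed drift, which are pushed to the old end of the list (a simplified signature comparing raw mtimes; the real code compares CacheEntry values):

    use std::cmp;
    use std::time::{Duration, SystemTime};

    // Compare two recognized entries' mtimes, youngest first; anything past
    // `future_tolerance` is treated as oldest (but still recognized).
    fn age_order(lhs: &SystemTime, rhs: &SystemTime, future_tolerance: SystemTime) -> cmp::Ordering {
        match (*lhs > future_tolerance, *rhs > future_tolerance) {
            (false, false) => rhs.cmp(lhs), // later == younger
            (true, false) => cmp::Ordering::Greater,
            (false, true) => cmp::Ordering::Less,
            (true, true) => cmp::Ordering::Equal,
        }
    }

    fn main() {
        let now = SystemTime::now();
        let tolerance = now + Duration::from_secs(24 * 60 * 60); // '1d' drift
        let far_future = now + Duration::from_secs(2 * 24 * 60 * 60);
        let mut mtimes = [far_future, now - Duration::from_secs(60), now];
        mtimes.sort_unstable_by(|l, r| age_order(l, r, tolerance));
        // Youngest valid mtime first; the far-future outlier lands at the old end.
        assert_eq!(mtimes, [now, now - Duration::from_secs(60), far_future]);
    }
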
@@ -467,12 +499,12 @@ impl WorkerThread {

            total_size += size;
            if start_delete_idx_if_deleting_recognized_items.is_none() {
                if total_size >= tsl_if_deleting || (idx + 1) as u64 >= fcl_if_deleting {
                if total_size > tsl_if_deleting || (idx + 1) as u64 > fcl_if_deleting {
                    start_delete_idx_if_deleting_recognized_items = Some(idx);
                }
            }

            if total_size >= total_size_limit || (idx + 1) as u64 >= file_count_limit {
            if total_size > total_size_limit || (idx + 1) as u64 > file_count_limit {
                start_delete_idx = start_delete_idx_if_deleting_recognized_items;
                break;
            }
@@ -593,20 +625,28 @@ impl WorkerThread {
                    add_unrecognized!(file: path);
                }
                (2, false) => {
                    // assumption: only mod cache (no ext), .stats & .wip-* files
                    let ext = path.extension();
                    if ext.is_none() || ext == Some(OsStr::new("stats")) {
                        // mod or stats file
                        cache_files.insert(path, entry);
                    } else {
                        // assume it's a .wip file (lock)
                        if is_fs_lock_expired(
                            Some(&entry),
                            &path,
                            cache_config.optimizing_compression_task_timeout(),
                            cache_config.allowed_clock_drift_for_files_from_future(),
                        ) {
                        let recognized = if let Some(ext_str) = ext.unwrap().to_str() {
                            // check if it's a valid lock
                            ext_str.starts_with("wip-")
                                && !is_fs_lock_expired(
                                    Some(&entry),
                                    &path,
                                    cache_config.optimizing_compression_task_timeout(),
                                    cache_config.allowed_clock_drift_for_files_from_future(),
                                )
                        } else {
                            // if it's None, i.e. not a valid UTF-8 string, then it's not our lock for sure
                            false
                        };

                        if !recognized {
                            add_unrecognized!(file: path);
                        } // else: skip active lock
                    }
                }
            }
            (_, is_dir) => add_unrecognized!(is_dir, path),
@@ -661,7 +701,7 @@ impl WorkerThread {
            );
            vec.push(CacheEntry::Recognized {
                path: mod_path.to_path_buf(),
                mtime: stats_mtime,
                mtime: stats_mtime.into(), // .into() called for the SystemTimeStub if cfg(test)
                size: mod_metadata.len(),
            })
        }
@@ -677,7 +717,7 @@ impl WorkerThread {
            );
            vec.push(CacheEntry::Recognized {
                path: mod_path.to_path_buf(),
                mtime: mod_mtime,
                mtime: mod_mtime.into(), // .into() called for the SystemTimeStub if cfg(test)
                size: mod_metadata.len(),
            })
        }
@@ -833,8 +873,7 @@ fn is_fs_lock_expired(
    allowed_future_drift: Duration,
) -> bool {
    let mtime = match entry
        .map(|e| e.metadata())
        .unwrap_or_else(|| path.metadata())
        .map_or_else(|| path.metadata(), |e| e.metadata())
        .and_then(|metadata| metadata.modified())
    {
        Ok(mt) => mt,
@@ -848,7 +887,8 @@ fn is_fs_lock_expired(
        }
    };

    match mtime.elapsed() {
    // DON'T use: mtime.elapsed() -- we must call SystemTime directly for the tests to be deterministic
    match SystemTime::now().duration_since(mtime) {
        Ok(elapsed) => elapsed >= threshold,
        Err(err) => {
            trace!(
@@ -864,4 +904,5 @@ fn is_fs_lock_expired(
        }
    }
}

// todo tests
#[cfg(test)]
mod tests;

wasmtime-environ/src/cache/worker/tests.rs (new file)
@@ -0,0 +1,758 @@
use super::*;
use crate::cache::config::tests::test_prolog;
use std::iter::repeat;
use std::process;
// load_config! comes from crate::cache(::config::tests);

// when doing anything with the tests, make sure they are DETERMINISTIC
// -- the result shouldn't rely on system time!
pub mod system_time_stub;

#[test]
fn test_on_get_create_stats_file() {
    let (_tempdir, cache_dir, config_path) = test_prolog();
    let cache_config = load_config!(
        config_path,
        "[cache]\n\
         enabled = true\n\
         directory = {cache_dir}",
        cache_dir
    );
    assert!(cache_config.enabled());
    let worker = Worker::start_new(&cache_config, None);

    let mod_file = cache_dir.join("some-mod");
    worker.on_cache_get_async(mod_file);
    worker.wait_for_all_events_handled();
    assert_eq!(worker.events_dropped(), 0);

    let stats_file = cache_dir.join("some-mod.stats");
    let stats = read_stats_file(&stats_file).expect("Failed to read stats file");
    assert_eq!(stats.usages, 1);
    assert_eq!(
        stats.compression_level,
        cache_config.baseline_compression_level()
    );
}

#[test]
fn test_on_get_update_usage_counter() {
    let (_tempdir, cache_dir, config_path) = test_prolog();
    let cache_config = load_config!(
        config_path,
        "[cache]\n\
         enabled = true\n\
         directory = {cache_dir}\n\
         worker-event-queue-size = '16'",
        cache_dir
    );
    assert!(cache_config.enabled());
    let worker = Worker::start_new(&cache_config, None);

    let mod_file = cache_dir.join("some-mod");
    let stats_file = cache_dir.join("some-mod.stats");
    let default_stats = ModuleCacheStatistics::default(&cache_config);
    assert!(write_stats_file(&stats_file, &default_stats));

    let mut usages = 0;
    for times_used in &[4, 7, 2] {
        for _ in 0..*times_used {
            worker.on_cache_get_async(mod_file.clone());
            usages += 1;
        }

        worker.wait_for_all_events_handled();
        assert_eq!(worker.events_dropped(), 0);

        let stats = read_stats_file(&stats_file).expect("Failed to read stats file");
        assert_eq!(stats.usages, usages);
    }
}

#[test]
fn test_on_get_recompress_no_mod_file() {
    let (_tempdir, cache_dir, config_path) = test_prolog();
    let cache_config = load_config!(
        config_path,
        "[cache]\n\
         enabled = true\n\
         directory = {cache_dir}\n\
         worker-event-queue-size = '16'\n\
         baseline-compression-level = 3\n\
         optimized-compression-level = 7\n\
         optimized-compression-usage-counter-threshold = '256'",
        cache_dir
    );
    assert!(cache_config.enabled());
    let worker = Worker::start_new(&cache_config, None);

    let mod_file = cache_dir.join("some-mod");
    let stats_file = cache_dir.join("some-mod.stats");
    let mut start_stats = ModuleCacheStatistics::default(&cache_config);
    start_stats.usages = 250;
    assert!(write_stats_file(&stats_file, &start_stats));

    let mut usages = start_stats.usages;
    for times_used in &[4, 7, 2] {
        for _ in 0..*times_used {
            worker.on_cache_get_async(mod_file.clone());
            usages += 1;
        }

        worker.wait_for_all_events_handled();
        assert_eq!(worker.events_dropped(), 0);

        let stats = read_stats_file(&stats_file).expect("Failed to read stats file");
        assert_eq!(stats.usages, usages);
        assert_eq!(
            stats.compression_level,
            cache_config.baseline_compression_level()
        );
    }
}

#[test]
fn test_on_get_recompress_with_mod_file() {
    let (_tempdir, cache_dir, config_path) = test_prolog();
    let cache_config = load_config!(
        config_path,
        "[cache]\n\
         enabled = true\n\
         directory = {cache_dir}\n\
         worker-event-queue-size = '16'\n\
         baseline-compression-level = 3\n\
         optimized-compression-level = 7\n\
         optimized-compression-usage-counter-threshold = '256'",
        cache_dir
    );
    assert!(cache_config.enabled());
    let worker = Worker::start_new(&cache_config, None);

    let mod_file = cache_dir.join("some-mod");
    let mod_data = "some test data to be compressed";
    let data = zstd::encode_all(
        mod_data.as_bytes(),
        cache_config.baseline_compression_level(),
    )
    .expect("Failed to compress sample mod file");
    fs::write(&mod_file, &data).expect("Failed to write sample mod file");

    let stats_file = cache_dir.join("some-mod.stats");
    let mut start_stats = ModuleCacheStatistics::default(&cache_config);
    start_stats.usages = 250;
    assert!(write_stats_file(&stats_file, &start_stats));

    // scenarios:
    // 1. Shouldn't be recompressed
    // 2. Should be recompressed
    // 3. After lowering compression level, should be recompressed
    let scenarios = [(4, false), (7, true), (2, false)];

    let mut usages = start_stats.usages;
    assert!(usages < cache_config.optimized_compression_usage_counter_threshold());
    let mut tested_higher_opt_compr_lvl = false;
    for (times_used, lower_compr_lvl) in &scenarios {
        for _ in 0..*times_used {
            worker.on_cache_get_async(mod_file.clone());
            usages += 1;
        }

        worker.wait_for_all_events_handled();
        assert_eq!(worker.events_dropped(), 0);

        let mut stats = read_stats_file(&stats_file).expect("Failed to read stats file");
        assert_eq!(stats.usages, usages);
        assert_eq!(
            stats.compression_level,
            if usages < cache_config.optimized_compression_usage_counter_threshold() {
                cache_config.baseline_compression_level()
            } else {
                cache_config.optimized_compression_level()
            }
        );
        let compressed_data = fs::read(&mod_file).expect("Failed to read mod file");
        let decoded_data =
            zstd::decode_all(&compressed_data[..]).expect("Failed to decompress mod file");
        assert_eq!(decoded_data, mod_data.as_bytes());

        if *lower_compr_lvl {
            assert!(usages >= cache_config.optimized_compression_usage_counter_threshold());
            tested_higher_opt_compr_lvl = true;
            stats.compression_level -= 1;
            assert!(write_stats_file(&stats_file, &stats));
        }
    }
    assert!(usages >= cache_config.optimized_compression_usage_counter_threshold());
    assert!(tested_higher_opt_compr_lvl);
}

#[test]
fn test_on_get_recompress_lock() {
    let (_tempdir, cache_dir, config_path) = test_prolog();
    let cache_config = load_config!(
        config_path,
        "[cache]\n\
         enabled = true\n\
         directory = {cache_dir}\n\
         worker-event-queue-size = '16'\n\
         baseline-compression-level = 3\n\
         optimized-compression-level = 7\n\
         optimized-compression-usage-counter-threshold = '256'\n\
         optimizing-compression-task-timeout = '30m'\n\
         allowed-clock-drift-for-files-from-future = '1d'",
        cache_dir
    );
    assert!(cache_config.enabled());
    let worker = Worker::start_new(&cache_config, None);

    let mod_file = cache_dir.join("some-mod");
    let mod_data = "some test data to be compressed";
    let data = zstd::encode_all(
        mod_data.as_bytes(),
        cache_config.baseline_compression_level(),
    )
    .expect("Failed to compress sample mod file");
    fs::write(&mod_file, &data).expect("Failed to write sample mod file");

    let stats_file = cache_dir.join("some-mod.stats");
    let mut start_stats = ModuleCacheStatistics::default(&cache_config);
    start_stats.usages = 255;

    let lock_file = cache_dir.join("some-mod.wip-lock");

    let scenarios = [
        // valid lock
        (true, "past", Duration::from_secs(30 * 60 - 1)),
        // valid future lock
        (true, "future", Duration::from_secs(24 * 60 * 60)),
        // expired lock
        (false, "past", Duration::from_secs(30 * 60)),
        // expired future lock
        (false, "future", Duration::from_secs(24 * 60 * 60 + 1)),
    ];

    for (lock_valid, duration_sign, duration) in &scenarios {
        assert!(write_stats_file(&stats_file, &start_stats)); // restore usage & compression level
        create_file_with_mtime(&lock_file, "", duration_sign, &duration);

        worker.on_cache_get_async(mod_file.clone());
        worker.wait_for_all_events_handled();
        assert_eq!(worker.events_dropped(), 0);

        let stats = read_stats_file(&stats_file).expect("Failed to read stats file");
        assert_eq!(stats.usages, start_stats.usages + 1);
        assert_eq!(
            stats.compression_level,
            if *lock_valid {
                cache_config.baseline_compression_level()
            } else {
                cache_config.optimized_compression_level()
            }
        );
        let compressed_data = fs::read(&mod_file).expect("Failed to read mod file");
        let decoded_data =
            zstd::decode_all(&compressed_data[..]).expect("Failed to decompress mod file");
        assert_eq!(decoded_data, mod_data.as_bytes());
    }
}

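The four lock scenarios encode the expiry boundaries: a .wip lock expires once its mtime is at least optimizing-compression-task-timeout ('30m') in the past, or more than allowed-clock-drift-for-files-from-future ('1d') in the future. A standalone sketch consistent with those outcomes (the real check lives in is_fs_lock_expired in worker.rs; this is a simplified model of it):

    use std::time::{Duration, SystemTime};

    fn lock_valid(mtime: SystemTime, now: SystemTime, timeout: Duration, drift: Duration) -> bool {
        match now.duration_since(mtime) {
            Ok(age) => age < timeout,        // past mtime: expires at exactly `timeout`
            Err(e) => e.duration() <= drift, // future mtime: allowed up to `drift`
        }
    }

    fn main() {
        let now = SystemTime::now();
        let timeout = Duration::from_secs(30 * 60); // '30m'
        let drift = Duration::from_secs(24 * 60 * 60); // '1d'
        assert!(lock_valid(now - Duration::from_secs(30 * 60 - 1), now, timeout, drift));
        assert!(!lock_valid(now - timeout, now, timeout, drift));
        assert!(lock_valid(now + drift, now, timeout, drift));
        assert!(!lock_valid(now + drift + Duration::from_secs(1), now, timeout, drift));
    }
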
#[test]
fn test_on_update_fresh_stats_file() {
    let (_tempdir, cache_dir, config_path) = test_prolog();
    let cache_config = load_config!(
        config_path,
        "[cache]\n\
         enabled = true\n\
         directory = {cache_dir}\n\
         worker-event-queue-size = '16'\n\
         baseline-compression-level = 3\n\
         optimized-compression-level = 7\n\
         cleanup-interval = '1h'",
        cache_dir
    );
    assert!(cache_config.enabled());
    let worker = Worker::start_new(&cache_config, None);

    let mod_file = cache_dir.join("some-mod");
    let stats_file = cache_dir.join("some-mod.stats");
    let cleanup_certificate = cache_dir.join(".cleanup.wip-done");
    create_file_with_mtime(&cleanup_certificate, "", "future", &Duration::from_secs(0));
    // the below is created by the worker if it cleans up
    let worker_lock_file = cache_dir.join(format!(".cleanup.wip-{}", process::id()));

    // scenarios:
    // 1. Create new stats file
    // 2. Overwrite existing file
    for update_file in &[true, false] {
        worker.on_cache_update_async(mod_file.clone());
        worker.wait_for_all_events_handled();
        assert_eq!(worker.events_dropped(), 0);

        let mut stats = read_stats_file(&stats_file).expect("Failed to read stats file");
        assert_eq!(stats.usages, 1);
        assert_eq!(
            stats.compression_level,
            cache_config.baseline_compression_level()
        );

        if *update_file {
            stats.usages += 42;
            stats.compression_level += 1;
            assert!(write_stats_file(&stats_file, &stats));
        }

        assert!(!worker_lock_file.exists());
    }
}

#[test]
fn test_on_update_cleanup_limits_trash_locks() {
    let (_tempdir, cache_dir, config_path) = test_prolog();
    let cache_config = load_config!(
        config_path,
        "[cache]\n\
         enabled = true\n\
         directory = {cache_dir}\n\
         worker-event-queue-size = '16'\n\
         cleanup-interval = '30m'\n\
         optimizing-compression-task-timeout = '30m'\n\
         allowed-clock-drift-for-files-from-future = '1d'\n\
         file-count-soft-limit = '5'\n\
         files-total-size-soft-limit = '30K'\n\
         file-count-limit-percent-if-deleting = '70%'\n\
         files-total-size-limit-percent-if-deleting = '70%'
         ",
        cache_dir
    );
    assert!(cache_config.enabled());
    let worker = Worker::start_new(&cache_config, None);
    let content_1k = "a".repeat(1_000);
    let content_10k = "a".repeat(10_000);

    let mods_files_dir = cache_dir.join("target-triple").join("compiler-version");
    let mod_with_stats = mods_files_dir.join("mod-with-stats");
    let trash_dirs = [
        mods_files_dir.join("trash"),
        mods_files_dir.join("trash").join("trash"),
    ];
    let trash_files = [
        cache_dir.join("trash-file"),
        cache_dir.join("trash-file.wip-lock"),
        cache_dir.join("target-triple").join("trash.txt"),
        cache_dir.join("target-triple").join("trash.txt.wip-lock"),
        mods_files_dir.join("trash.ogg"),
        mods_files_dir.join("trash").join("trash.doc"),
        mods_files_dir.join("trash").join("trash.doc.wip-lock"),
        mods_files_dir.join("trash").join("trash").join("trash.xls"),
        mods_files_dir
            .join("trash")
            .join("trash")
            .join("trash.xls.wip-lock"),
    ];
    let mod_locks = [
        // valid lock
        (
            mods_files_dir.join("mod0.wip-lock"),
            true,
            "past",
            Duration::from_secs(30 * 60 - 1),
        ),
        // valid future lock
        (
            mods_files_dir.join("mod1.wip-lock"),
            true,
            "future",
            Duration::from_secs(24 * 60 * 60),
        ),
        // expired lock
        (
            mods_files_dir.join("mod2.wip-lock"),
            false,
            "past",
            Duration::from_secs(30 * 60),
        ),
        // expired future lock
        (
            mods_files_dir.join("mod3.wip-lock"),
            false,
            "future",
            Duration::from_secs(24 * 60 * 60 + 1),
        ),
    ];
    // the below is created by the worker if it cleans up
    let worker_lock_file = cache_dir.join(format!(".cleanup.wip-{}", process::id()));

    let scenarios = [
        // Close to limits, but not reached; only trash deleted
        (2, 2, 4),
        // File count limit exceeded
        (1, 10, 3),
        // Total size limit exceeded
        (4, 0, 2),
        // Both limits exceeded
        (3, 5, 3),
    ];

    for (files_10k, files_1k, remaining_files) in &scenarios {
        let mut secs_ago = 100;

        for d in &trash_dirs {
            fs::create_dir_all(d).expect("Failed to create directories");
        }
        for f in &trash_files {
            create_file_with_mtime(f, "", "past", &Duration::from_secs(0));
        }
        for (f, _, sign, duration) in &mod_locks {
            create_file_with_mtime(f, "", sign, &duration);
        }

        let mut mods_paths = vec![];
        for content in repeat(&content_10k)
            .take(*files_10k)
            .chain(repeat(&content_1k).take(*files_1k))
        {
            mods_paths.push(mods_files_dir.join(format!("test-mod-{}", mods_paths.len())));
            create_file_with_mtime(
                mods_paths.last().unwrap(),
                content,
                "past",
                &Duration::from_secs(secs_ago),
            );
            assert!(secs_ago > 0);
            secs_ago -= 1;
        }

        // creating the .stats file updates the mtime, which affects test results,
        // so we use a separate nonexistent module here (orphaned .stats will be removed anyway)
        worker.on_cache_update_async(mod_with_stats.clone());
        worker.wait_for_all_events_handled();
        assert_eq!(worker.events_dropped(), 0);

        for ent in trash_dirs.iter().chain(trash_files.iter()) {
            assert!(!ent.exists());
        }
        for (f, valid, ..) in &mod_locks {
            assert_eq!(f.exists(), *valid);
        }
        for (idx, path) in mods_paths.iter().enumerate() {
            let should_exist = idx >= mods_paths.len() - *remaining_files;
            assert_eq!(path.exists(), should_exist);
            if should_exist {
                // cleanup before next iteration
                fs::remove_file(path).expect("Failed to remove a file");
            }
        }
        fs::remove_file(&worker_lock_file).expect("Failed to remove lock file");
    }
}

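The (files_10k, files_1k, remaining_files) triples can be checked against the cleanup arithmetic from worker.rs: once a soft limit is strictly exceeded, deletion starts at the first index that crossed the percent-reduced limit. A standalone sketch with this test's limits (the integer 70% computation is an assumption about how the percent knobs are applied):

    fn remaining(sizes_youngest_first: &[u64]) -> usize {
        let file_count_limit = 5u64; // file-count-soft-limit = '5'
        let total_size_limit = 30_000u64; // files-total-size-soft-limit = '30K'
        // Assumption: the 70% knobs are applied as integer arithmetic.
        let fcl_if_deleting = file_count_limit * 70 / 100;
        let tsl_if_deleting = total_size_limit * 70 / 100;

        let mut total_size = 0;
        let mut start_delete_idx = None;
        let mut start_delete_idx_if_deleting = None;
        for (idx, size) in sizes_youngest_first.iter().enumerate() {
            total_size += size;
            if start_delete_idx_if_deleting.is_none()
                && (total_size > tsl_if_deleting || (idx + 1) as u64 > fcl_if_deleting)
            {
                start_delete_idx_if_deleting = Some(idx);
            }
            if total_size > total_size_limit || (idx + 1) as u64 > file_count_limit {
                start_delete_idx = start_delete_idx_if_deleting;
                break;
            }
        }
        // Files before the delete index survive; None means nothing is deleted.
        start_delete_idx.unwrap_or(sizes_youngest_first.len())
    }

    fn main() {
        // The four (files_10k, files_1k, remaining) scenarios, youngest file first.
        assert_eq!(remaining(&[1_000, 1_000, 10_000, 10_000]), 4);
        let mut v = vec![1_000u64; 10];
        v.extend([10_000u64]);
        assert_eq!(remaining(&v), 3);
        assert_eq!(remaining(&[10_000u64; 4]), 2);
        let mut w = vec![1_000u64; 5];
        w.extend([10_000u64; 3]);
        assert_eq!(remaining(&w), 3);
    }
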
#[test]
fn test_on_update_cleanup_lru_policy() {
    let (_tempdir, cache_dir, config_path) = test_prolog();
    let cache_config = load_config!(
        config_path,
        "[cache]\n\
         enabled = true\n\
         directory = {cache_dir}\n\
         worker-event-queue-size = '16'\n\
         file-count-soft-limit = '5'\n\
         files-total-size-soft-limit = '30K'\n\
         file-count-limit-percent-if-deleting = '80%'\n\
         files-total-size-limit-percent-if-deleting = '70%'",
        cache_dir
    );
    assert!(cache_config.enabled());
    let worker = Worker::start_new(&cache_config, None);
    let content_1k = "a".repeat(1_000);
    let content_5k = "a".repeat(5_000);
    let content_10k = "a".repeat(10_000);

    let mods_files_dir = cache_dir.join("target-triple").join("compiler-version");
    fs::create_dir_all(&mods_files_dir).expect("Failed to create directories");
    let nonexistent_mod_file = cache_dir.join("nonexistent-mod");
    let orphaned_stats_file = cache_dir.join("orphaned-mod.stats");
    let worker_lock_file = cache_dir.join(format!(".cleanup.wip-{}", process::id()));

    // content, how long ago created, how long ago stats created (if created), should be alive
    let scenarios = [
        &[
            (&content_10k, 29, None, false),
            (&content_10k, 28, None, false),
            (&content_10k, 27, None, false),
            (&content_1k, 26, None, true),
            (&content_10k, 25, None, true),
            (&content_1k, 24, None, true),
        ],
        &[
            (&content_10k, 29, None, false),
            (&content_10k, 28, None, false),
            (&content_10k, 27, None, true),
            (&content_1k, 26, None, true),
            (&content_5k, 25, None, true),
            (&content_1k, 24, None, true),
        ],
        &[
            (&content_10k, 29, Some(19), true),
            (&content_10k, 28, None, false),
            (&content_10k, 27, None, false),
            (&content_1k, 26, Some(18), true),
            (&content_5k, 25, None, true),
            (&content_1k, 24, None, true),
        ],
        &[
            (&content_10k, 29, Some(19), true),
            (&content_10k, 28, Some(18), true),
            (&content_10k, 27, None, false),
            (&content_1k, 26, Some(17), true),
            (&content_5k, 25, None, false),
            (&content_1k, 24, None, false),
        ],
        &[
            (&content_10k, 29, Some(19), true),
            (&content_10k, 28, None, false),
            (&content_1k, 27, None, false),
            (&content_5k, 26, Some(18), true),
            (&content_1k, 25, None, false),
            (&content_10k, 24, None, false),
        ],
    ];

    for mods in &scenarios {
        let filenames = (0..mods.len())
            .map(|i| {
                (
                    mods_files_dir.join(format!("mod-{}", i)),
                    mods_files_dir.join(format!("mod-{}.stats", i)),
                )
            })
            .collect::<Vec<_>>();

        for ((content, mod_secs_ago, create_stats, _), (mod_filename, stats_filename)) in
            mods.iter().zip(filenames.iter())
        {
            create_file_with_mtime(
                mod_filename,
                content,
                "past",
                &Duration::from_secs(*mod_secs_ago),
            );
            if let Some(stats_secs_ago) = create_stats {
                create_file_with_mtime(
                    stats_filename,
                    "cleanup doesn't care",
                    "past",
                    &Duration::from_secs(*stats_secs_ago),
                );
            }
        }
        create_file_with_mtime(
            &orphaned_stats_file,
            "cleanup doesn't care",
            "past",
            &Duration::from_secs(0),
        );

        worker.on_cache_update_async(nonexistent_mod_file.clone());
        worker.wait_for_all_events_handled();
        assert_eq!(worker.events_dropped(), 0);

        assert!(!orphaned_stats_file.exists());
        for ((_, _, create_stats, alive), (mod_filename, stats_filename)) in
            mods.iter().zip(filenames.iter())
        {
            assert_eq!(mod_filename.exists(), *alive);
            assert_eq!(stats_filename.exists(), *alive && create_stats.is_some());

            // cleanup for next iteration
            if *alive {
                fs::remove_file(&mod_filename).expect("Failed to remove a file");
                if create_stats.is_some() {
                    fs::remove_file(&stats_filename).expect("Failed to remove a file");
                }
            }
        }

        fs::remove_file(&worker_lock_file).expect("Failed to remove lock file");
    }
}

// clock drift should be applied to mod cache & stats, too
// however, postpone deleting files to as late as possible
#[test]
fn test_on_update_cleanup_future_files() {
    let (_tempdir, cache_dir, config_path) = test_prolog();
    let cache_config = load_config!(
        config_path,
        "[cache]\n\
         enabled = true\n\
         directory = {cache_dir}\n\
         worker-event-queue-size = '16'\n\
         allowed-clock-drift-for-files-from-future = '1d'\n\
         file-count-soft-limit = '3'\n\
         files-total-size-soft-limit = '1M'\n\
         file-count-limit-percent-if-deleting = '70%'\n\
         files-total-size-limit-percent-if-deleting = '70%'",
        cache_dir
    );
    assert!(cache_config.enabled());
    let worker = Worker::start_new(&cache_config, None);
    let content_1k = "a".repeat(1_000);

    let mods_files_dir = cache_dir.join("target-triple").join("compiler-version");
    fs::create_dir_all(&mods_files_dir).expect("Failed to create directories");
    let nonexistent_mod_file = cache_dir.join("nonexistent-mod");
    // the below is created by the worker if it cleans up
    let worker_lock_file = cache_dir.join(format!(".cleanup.wip-{}", process::id()));

    let scenarios: [&[_]; 5] = [
        // NOT cleaning up, everything's OK
        &[
            (Duration::from_secs(0), None, true),
            (Duration::from_secs(24 * 60 * 60), None, true),
        ],
        // NOT cleaning up, everything's OK
        &[
            (Duration::from_secs(0), None, true),
            (Duration::from_secs(24 * 60 * 60 + 1), None, true),
        ],
        // cleaning up, removing files from oldest
        &[
            (Duration::from_secs(0), None, false),
            (Duration::from_secs(24 * 60 * 60), None, true),
            (Duration::from_secs(1), None, false),
            (Duration::from_secs(2), None, true),
        ],
        // cleaning up, removing files from oldest; deleting the file from the far future
        &[
            (Duration::from_secs(0), None, false),
            (Duration::from_secs(1), None, true),
            (Duration::from_secs(24 * 60 * 60 + 1), None, false),
            (Duration::from_secs(2), None, true),
        ],
        // cleaning up, removing files from oldest; the file from the far future has a .stats from +-now => it's a legitimate file
        &[
            (Duration::from_secs(0), None, false),
            (Duration::from_secs(1), None, false),
            (
                Duration::from_secs(24 * 60 * 60 + 1),
                Some(Duration::from_secs(3)),
                true,
            ),
            (Duration::from_secs(2), None, true),
        ],
    ];

    for mods in &scenarios {
        let filenames = (0..mods.len())
            .map(|i| {
                (
                    mods_files_dir.join(format!("mod-{}", i)),
                    mods_files_dir.join(format!("mod-{}.stats", i)),
                )
            })
            .collect::<Vec<_>>();

        for ((duration, opt_stats_duration, _), (mod_filename, stats_filename)) in
            mods.iter().zip(filenames.iter())
        {
            create_file_with_mtime(mod_filename, &content_1k, "future", duration);
            if let Some(stats_duration) = opt_stats_duration {
                create_file_with_mtime(stats_filename, "", "future", stats_duration);
            }
        }

        worker.on_cache_update_async(nonexistent_mod_file.clone());
        worker.wait_for_all_events_handled();
        assert_eq!(worker.events_dropped(), 0);

        for ((_, opt_stats_duration, alive), (mod_filename, stats_filename)) in
            mods.iter().zip(filenames.iter())
        {
            assert_eq!(mod_filename.exists(), *alive);
            assert_eq!(
                stats_filename.exists(),
                *alive && opt_stats_duration.is_some()
            );
            if *alive {
                fs::remove_file(mod_filename).expect("Failed to remove a file");
                if opt_stats_duration.is_some() {
                    fs::remove_file(stats_filename).expect("Failed to remove a file");
                }
            }
        }

        fs::remove_file(&worker_lock_file).expect("Failed to remove lock file");
    }
}

// this tests whether the worker triggered a cleanup when some cleanup lock/certificate was already out there
#[test]
fn test_on_update_cleanup_self_lock() {
    let (_tempdir, cache_dir, config_path) = test_prolog();
    let cache_config = load_config!(
        config_path,
        "[cache]\n\
         enabled = true\n\
         directory = {cache_dir}\n\
         worker-event-queue-size = '16'\n\
         cleanup-interval = '30m'\n\
         allowed-clock-drift-for-files-from-future = '1d'",
        cache_dir
    );
    assert!(cache_config.enabled());
    let worker = Worker::start_new(&cache_config, None);

    let mod_file = cache_dir.join("some-mod");
    let trash_file = cache_dir.join("trash-file.txt");

    let lock_file = cache_dir.join(".cleanup.wip-lock");
    // the below is created by the worker if it cleans up
    let worker_lock_file = cache_dir.join(format!(".cleanup.wip-{}", process::id()));

    let scenarios = [
        // valid lock
        (true, "past", Duration::from_secs(30 * 60 - 1)),
        // valid future lock
        (true, "future", Duration::from_secs(24 * 60 * 60)),
        // expired lock
        (false, "past", Duration::from_secs(30 * 60)),
        // expired future lock
        (false, "future", Duration::from_secs(24 * 60 * 60 + 1)),
    ];

    for (lock_valid, duration_sign, duration) in &scenarios {
        create_file_with_mtime(
            &trash_file,
            "with trash content",
            "future",
            &Duration::from_secs(0),
        );
        create_file_with_mtime(&lock_file, "", duration_sign, &duration);

        worker.on_cache_update_async(mod_file.clone());
        worker.wait_for_all_events_handled();
        assert_eq!(worker.events_dropped(), 0);

        assert_eq!(trash_file.exists(), *lock_valid);
        assert_eq!(lock_file.exists(), *lock_valid);
        if *lock_valid {
            assert!(!worker_lock_file.exists());
        } else {
            fs::remove_file(&worker_lock_file).expect("Failed to remove lock file");
        }
    }
}

fn create_file_with_mtime(filename: &Path, contents: &str, offset_sign: &str, offset: &Duration) {
    fs::write(filename, contents).expect("Failed to create a file");
    let mtime = match offset_sign {
        "past" => system_time_stub::NOW
            .checked_sub(*offset)
            .expect("Failed to calculate new mtime"),
        "future" => system_time_stub::NOW
            .checked_add(*offset)
            .expect("Failed to calculate new mtime"),
        _ => unreachable!(),
    };
    filetime::set_file_mtime(filename, mtime.into()).expect("Failed to set mtime");
}
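A side note on mtime.into() above: filetime::FileTime implements From<SystemTime>, which is what bridges the stubbed clock to set_file_mtime (the filetime = "0.2.7" dependency added in this commit). A minimal standalone check:

    use std::time::{Duration, SystemTime};

    fn main() {
        let path = std::env::temp_dir().join("mtime-demo");
        std::fs::write(&path, "x").expect("Failed to create a file");
        let past = SystemTime::now() - Duration::from_secs(3600);
        // FileTime implements From<SystemTime>, hence `.into()` in the helper above.
        filetime::set_file_mtime(&path, filetime::FileTime::from(past)).expect("Failed to set mtime");
        let modified = std::fs::metadata(&path).and_then(|m| m.modified()).expect("mtime");
        // Allow a second of slack for filesystems with coarse timestamps.
        assert!(modified <= past + Duration::from_secs(1));
        assert!(modified >= past - Duration::from_secs(1));
        std::fs::remove_file(&path).ok();
    }
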
wasmtime-environ/src/cache/worker/tests/system_time_stub.rs (new file)
@@ -0,0 +1,29 @@
use lazy_static::lazy_static;
use std::time::{Duration, SystemTime, SystemTimeError};

lazy_static! {
    pub static ref NOW: SystemTime = SystemTime::now(); // no need for RefCell and set_now() for now
}

#[derive(PartialOrd, PartialEq, Ord, Eq)]
pub struct SystemTimeStub(SystemTime);

impl SystemTimeStub {
    pub fn now() -> Self {
        Self(*NOW)
    }

    pub fn checked_add(&self, duration: Duration) -> Option<Self> {
        self.0.checked_add(duration).map(|t| t.into())
    }

    pub fn duration_since(&self, earlier: SystemTime) -> Result<Duration, SystemTimeError> {
        self.0.duration_since(earlier)
    }
}

impl From<SystemTime> for SystemTimeStub {
    fn from(time: SystemTime) -> Self {
        Self(time)
    }
}
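The stub works because worker.rs aliases it in under #[cfg(test)] (use tests::system_time_stub::SystemTimeStub as SystemTime) and calls the clock explicitly instead of mtime.elapsed(). A self-contained sketch of the same swap, with a fixed instant standing in for the lazy_static NOW:

    use std::time::{Duration, SystemTime};

    // Stub with the same call surface as SystemTime (minimal sketch).
    struct SystemTimeStub(SystemTime);

    impl SystemTimeStub {
        fn now() -> Self {
            // A fixed instant makes time-based logic deterministic under test.
            Self(SystemTime::UNIX_EPOCH + Duration::from_secs(1_000_000))
        }
        fn duration_since(&self, earlier: SystemTime) -> Result<Duration, std::time::SystemTimeError> {
            self.0.duration_since(earlier)
        }
    }

    fn main() {
        let mtime = SystemTime::UNIX_EPOCH + Duration::from_secs(999_000);
        // With the stub, the elapsed time is always exactly 1000s.
        let elapsed = SystemTimeStub::now().duration_since(mtime).unwrap();
        assert_eq!(elapsed, Duration::from_secs(1_000));
    }
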