diff --git a/src/bin/wasm2obj.rs b/src/bin/wasm2obj.rs index acefb5e18d..cbcb36e456 100644 --- a/src/bin/wasm2obj.rs +++ b/src/bin/wasm2obj.rs @@ -50,7 +50,7 @@ use std::str; use std::str::FromStr; use target_lexicon::Triple; use wasmtime_debug::{emit_debugsections, read_debuginfo}; -use wasmtime_environ::cache_config; +use wasmtime_environ::cache_init; use wasmtime_environ::{ Compiler, Cranelift, ModuleEnvironment, ModuleVmctxInfo, Tunables, VMOffsets, }; @@ -123,7 +123,7 @@ fn main() { Some(prefix) }; - let errors = cache_config::init( + let errors = cache_init( args.flag_cache || args.flag_cache_config_file.is_some(), args.flag_cache_config_file.as_ref(), args.flag_create_cache_config, diff --git a/src/bin/wasmtime.rs b/src/bin/wasmtime.rs index b770b48a57..02f8d39463 100644 --- a/src/bin/wasmtime.rs +++ b/src/bin/wasmtime.rs @@ -45,7 +45,7 @@ use std::process::exit; use wabt; use wasi_common::preopen_dir; use wasmtime_api::{Config, Engine, HostRef, Instance, Module, Store}; -use wasmtime_environ::cache_config; +use wasmtime_environ::cache_init; use wasmtime_interface_types::ModuleData; use wasmtime_jit::Features; use wasmtime_wasi::instantiate_wasi; @@ -222,7 +222,7 @@ fn rmain() -> Result<(), Error> { Some(prefix) }; - let errors = cache_config::init( + let errors = cache_init( args.flag_cache || args.flag_cache_config_file.is_some(), args.flag_cache_config_file.as_ref(), args.flag_create_cache_config, diff --git a/src/bin/wast.rs b/src/bin/wast.rs index 098d204eff..175be9c95b 100644 --- a/src/bin/wast.rs +++ b/src/bin/wast.rs @@ -33,7 +33,7 @@ use pretty_env_logger; use serde::Deserialize; use std::path::Path; use std::process; -use wasmtime_environ::cache_config; +use wasmtime_environ::cache_init; use wasmtime_jit::{Compiler, Features}; use wasmtime_wast::WastContext; @@ -89,7 +89,7 @@ fn main() { Some(prefix) }; - let errors = cache_config::init( + let errors = cache_init( args.flag_cache || args.flag_cache_config_file.is_some(), args.flag_cache_config_file.as_ref(), args.flag_create_cache_config, diff --git a/wasmtime-environ/src/cache.rs b/wasmtime-environ/src/cache.rs index 0a4359236d..27f8387194 100644 --- a/wasmtime-environ/src/cache.rs +++ b/wasmtime-environ/src/cache.rs @@ -15,10 +15,13 @@ use std::io::Write; use std::path::{Path, PathBuf}; use std::string::{String, ToString}; -pub mod config; -use config as cache_config; // so we have namespaced methods +mod config; mod worker; +pub use config::init; +use config::{cache_config, CacheConfig}; +use worker::worker; + lazy_static! { static ref SELF_MTIME: String = { std::env::current_exe() @@ -45,8 +48,9 @@ lazy_static! { }; } -pub struct ModuleCacheEntry { +pub struct ModuleCacheEntry<'config> { mod_cache_path: Option, + cache_config: &'config CacheConfig, } #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] @@ -68,7 +72,7 @@ type ModuleCacheDataTupleType = ( struct Sha256Hasher(Sha256); -impl ModuleCacheEntry { +impl<'config> ModuleCacheEntry<'config> { pub fn new<'data>( module: &Module, function_body_inputs: &PrimaryMap>, @@ -76,7 +80,25 @@ impl ModuleCacheEntry { compiler_name: &str, generate_debug_info: bool, ) -> Self { - let mod_cache_path = if cache_config::enabled() { + Self::new_with_config( + module, + function_body_inputs, + isa, + compiler_name, + generate_debug_info, + cache_config(), + ) + } + + fn new_with_config<'data>( + module: &Module, + function_body_inputs: &PrimaryMap>, + isa: &dyn isa::TargetIsa, + compiler_name: &str, + generate_debug_info: bool, + cache_config: &'config CacheConfig, + ) -> Self { + let mod_cache_path = if cache_config.enabled() { let hash = Sha256Hasher::digest(module, function_body_inputs); let compiler_dir = if cfg!(debug_assertions) { format!( @@ -98,7 +120,8 @@ impl ModuleCacheEntry { mod_dbg = if generate_debug_info { ".d" } else { "" }, ); Some( - cache_config::directory() + cache_config + .directory() .join(isa.triple().to_string()) .join(compiler_dir) .join(mod_filename), @@ -107,7 +130,10 @@ impl ModuleCacheEntry { None }; - Self { mod_cache_path } + Self { + mod_cache_path, + cache_config, + } } pub fn get_data(&self) -> Option { @@ -121,14 +147,14 @@ impl ModuleCacheEntry { .map_err(|err| warn!("Failed to deserialize cached code: {}", err)) .ok()?; - worker::on_cache_get_async(path); // call on success + worker().on_cache_get_async(path); // call on success Some(ret) } pub fn update_data(&self, data: &ModuleCacheData) { if self.update_data_impl(data).is_some() { let path = self.mod_cache_path.as_ref().unwrap(); - worker::on_cache_update_async(path); // call on success + worker().on_cache_update_async(path); // call on success } } @@ -140,7 +166,7 @@ impl ModuleCacheEntry { .ok()?; let compressed_data = zstd::encode_all( &serialized_data[..], - cache_config::baseline_compression_level(), + self.cache_config.baseline_compression_level(), ) .map_err(|err| warn!("Failed to compress cached code: {}", err)) .ok()?; diff --git a/wasmtime-environ/src/cache/config.rs b/wasmtime-environ/src/cache/config.rs index 14611149df..d234e78774 100644 --- a/wasmtime-environ/src/cache/config.rs +++ b/wasmtime-environ/src/cache/config.rs @@ -22,46 +22,53 @@ struct Config { cache: CacheConfig, } -// todo: markdown documention of these options +// todo: markdown documention of these options (name, format, default, explanation) // todo: don't flush default values (create config from simple template + url to docs) // todo: more user-friendly cache config creation -#[derive(Serialize, Deserialize, Debug)] -struct CacheConfig { +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct CacheConfig { #[serde(skip)] - pub errors: Vec, + errors: Vec, - pub enabled: bool, - pub directory: Option, + enabled: bool, + directory: Option, #[serde(rename = "worker-event-queue-size")] - pub worker_event_queue_size: Option, + worker_event_queue_size: Option, #[serde(rename = "baseline-compression-level")] - pub baseline_compression_level: Option, + baseline_compression_level: Option, #[serde(rename = "optimized-compression-level")] - pub optimized_compression_level: Option, + optimized_compression_level: Option, #[serde(rename = "optimized-compression-usage-counter-threshold")] - pub optimized_compression_usage_counter_threshold: Option, + optimized_compression_usage_counter_threshold: Option, #[serde( default, rename = "cleanup-interval-in-seconds", serialize_with = "serialize_duration", deserialize_with = "deserialize_duration" )] // todo unit? - pub cleanup_interval: Option, + cleanup_interval: Option, #[serde( default, rename = "optimizing-compression-task-timeout-in-seconds", serialize_with = "serialize_duration", deserialize_with = "deserialize_duration" )] // todo unit? - pub optimizing_compression_task_timeout: Option, + optimizing_compression_task_timeout: Option, + #[serde( + default, + rename = "allowed-clock-drift-for-locks-from-future", + serialize_with = "serialize_duration", + deserialize_with = "deserialize_duration" + )] // todo unit? + allowed_clock_drift_for_locks_from_future: Option, #[serde(rename = "files-count-soft-limit")] - pub files_count_soft_limit: Option, + files_count_soft_limit: Option, #[serde(rename = "files-total-size-soft-limit")] - pub files_total_size_soft_limit: Option, // todo unit? + files_total_size_soft_limit: Option, // todo unit? #[serde(rename = "files-count-limit-percent-if-deleting")] - pub files_count_limit_percent_if_deleting: Option, // todo format: integer + % + files_count_limit_percent_if_deleting: Option, // todo format: integer + % #[serde(rename = "files-total-size-limit-percent-if-deleting")] - pub files_total_size_limit_percent_if_deleting: Option, + files_total_size_limit_percent_if_deleting: Option, } // toml-rs fails to serialize Duration ("values must be emitted before tables") @@ -84,53 +91,14 @@ where static CONFIG: Once = Once::new(); static INIT_CALLED: AtomicBool = AtomicBool::new(false); -/// Returns true if and only if the cache is enabled. -pub fn enabled() -> bool { - // Not everyone knows about the cache system, i.e. the tests, - // so the default is cache disabled. - CONFIG - .call_once(|| CacheConfig::new_cache_disabled()) - .enabled -} - -/// Returns path to the cache directory. +/// Returns cache configuration. /// -/// Panics if the cache is disabled. -pub fn directory() -> &'static PathBuf { - &CONFIG - .r#try() - .expect("Cache system must be initialized") - .directory - .as_ref() - .expect("All cache system settings must be validated or defaulted") +/// If system has not been initialized, it disables it. +/// You mustn't call init() after it. +pub fn cache_config() -> &'static CacheConfig { + CONFIG.call_once(|| CacheConfig::new_cache_disabled()) } -macro_rules! generate_setting_getter { - ($setting:ident: $setting_type:ty) => { - /// Returns `$setting`. - /// - /// Panics if the cache is disabled. - pub fn $setting() -> $setting_type { - CONFIG - .r#try() - .expect("Cache system must be initialized") - .$setting - .expect("All cache system settings must be validated or defaulted") - } - }; -} - -generate_setting_getter!(worker_event_queue_size: usize); -generate_setting_getter!(baseline_compression_level: i32); -generate_setting_getter!(optimized_compression_level: i32); -generate_setting_getter!(optimized_compression_usage_counter_threshold: u64); -generate_setting_getter!(cleanup_interval: Duration); -generate_setting_getter!(optimizing_compression_task_timeout: Duration); -generate_setting_getter!(files_count_soft_limit: u64); -generate_setting_getter!(files_total_size_soft_limit: u64); -generate_setting_getter!(files_count_limit_percent_if_deleting: u8); -generate_setting_getter!(files_total_size_limit_percent_if_deleting: u8); - /// Initializes the cache system. Should be called exactly once, /// and before using the cache system. Otherwise it can panic. /// Returns list of errors. If empty, initialization succeeded. @@ -178,12 +146,53 @@ const DEFAULT_OPTIMIZED_COMPRESSION_LEVEL: i32 = 20; const DEFAULT_OPTIMIZED_COMPRESSION_USAGE_COUNTER_THRESHOLD: u64 = 0x100; const DEFAULT_CLEANUP_INTERVAL: Duration = Duration::from_secs(60 * 60); const DEFAULT_OPTIMIZING_COMPRESSION_TASK_TIMEOUT: Duration = Duration::from_secs(30 * 60); +const DEFAULT_ALLOWED_CLOCK_DRIFT_FOR_LOCKS_FROM_FUTURE: Duration = + Duration::from_secs(60 * 60 * 24); const DEFAULT_FILES_COUNT_SOFT_LIMIT: u64 = 0x10_000; const DEFAULT_FILES_TOTAL_SIZE_SOFT_LIMIT: u64 = 1024 * 1024 * 512; const DEFAULT_FILES_COUNT_LIMIT_PERCENT_IF_DELETING: u8 = 70; const DEFAULT_FILES_TOTAL_SIZE_LIMIT_PERCENT_IF_DELETING: u8 = 70; +macro_rules! generate_setting_getter { + ($setting:ident: $setting_type:ty) => { + /// Returns `$setting`. + /// + /// Panics if the cache is disabled. + pub fn $setting(&self) -> $setting_type { + self + .$setting + .expect("All cache system settings must be validated or defaulted") + } + }; +} + impl CacheConfig { + generate_setting_getter!(worker_event_queue_size: usize); + generate_setting_getter!(baseline_compression_level: i32); + generate_setting_getter!(optimized_compression_level: i32); + generate_setting_getter!(optimized_compression_usage_counter_threshold: u64); + generate_setting_getter!(cleanup_interval: Duration); + generate_setting_getter!(optimizing_compression_task_timeout: Duration); + generate_setting_getter!(allowed_clock_drift_for_locks_from_future: Duration); + generate_setting_getter!(files_count_soft_limit: u64); + generate_setting_getter!(files_total_size_soft_limit: u64); + generate_setting_getter!(files_count_limit_percent_if_deleting: u8); + generate_setting_getter!(files_total_size_limit_percent_if_deleting: u8); + + /// Returns true if and only if the cache is enabled. + pub fn enabled(&self) -> bool { + self.enabled + } + + /// Returns path to the cache directory. + /// + /// Panics if the cache is disabled. + pub fn directory(&self) -> &PathBuf { + self.directory + .as_ref() + .expect("All cache system settings must be validated or defaulted") + } + pub fn new_cache_disabled() -> Self { Self { errors: Vec::new(), @@ -195,6 +204,7 @@ impl CacheConfig { optimized_compression_usage_counter_threshold: None, cleanup_interval: None, optimizing_compression_task_timeout: None, + allowed_clock_drift_for_locks_from_future: None, files_count_soft_limit: None, files_total_size_soft_limit: None, files_count_limit_percent_if_deleting: None, @@ -237,6 +247,7 @@ impl CacheConfig { config.validate_optimized_compression_usage_counter_threshold_or_default(); config.validate_cleanup_interval_or_default(); config.validate_optimizing_compression_task_timeout_or_default(); + config.validate_allowed_clock_drift_for_locks_from_future_or_default(); config.validate_files_count_soft_limit_or_default(); config.validate_files_total_size_soft_limit_or_default(); config.validate_files_count_limit_percent_if_deleting_or_default(); @@ -410,6 +421,13 @@ impl CacheConfig { } } + fn validate_allowed_clock_drift_for_locks_from_future_or_default(&mut self) { + if self.allowed_clock_drift_for_locks_from_future.is_none() { + self.allowed_clock_drift_for_locks_from_future = + Some(DEFAULT_ALLOWED_CLOCK_DRIFT_FOR_LOCKS_FROM_FUTURE); + } + } + fn validate_files_count_soft_limit_or_default(&mut self) { if self.files_count_soft_limit.is_none() { self.files_count_soft_limit = Some(DEFAULT_FILES_COUNT_SOFT_LIMIT); diff --git a/wasmtime-environ/src/cache/tests.rs b/wasmtime-environ/src/cache/tests.rs index 2392188979..155d427635 100644 --- a/wasmtime-environ/src/cache/tests.rs +++ b/wasmtime-environ/src/cache/tests.rs @@ -19,7 +19,7 @@ use tempfile; // Since cache system is a global thing, each test needs to be run in seperate process. // So, init() tests are run as integration tests. // However, caching is a private thing, an implementation detail, and needs to be tested -// from the inside of the module. Thus we have one big test here. +// from the inside of the module. #[test] fn test_write_read_cache() { @@ -40,16 +40,17 @@ fn test_write_read_cache() { ); fs::write(&config_path, config_content).expect("Failed to write test config file"); - let errors = cache_config::init(true, Some(&config_path), false, None); + let errors = init(true, Some(&config_path), false, None); assert!(errors.is_empty()); - assert!(cache_config::enabled()); + let cache_config = cache_config(); + assert!(cache_config.enabled()); // assumption: config init creates cache directory and returns canonicalized path assert_eq!( - *cache_config::directory(), + *cache_config.directory(), fs::canonicalize(cache_dir).unwrap() ); assert_eq!( - cache_config::baseline_compression_level(), + cache_config.baseline_compression_level(), baseline_compression_level ); @@ -276,7 +277,7 @@ fn new_module_cache_data(rng: &mut impl Rng) -> ModuleCacheData { )) } -impl ModuleCacheEntry { +impl ModuleCacheEntry<'_> { pub fn mod_cache_path(&self) -> &Option { &self.mod_cache_path } diff --git a/wasmtime-environ/src/cache/worker.rs b/wasmtime-environ/src/cache/worker.rs index c87c4ad31f..559ebdd677 100644 --- a/wasmtime-environ/src/cache/worker.rs +++ b/wasmtime-environ/src/cache/worker.rs @@ -5,7 +5,9 @@ //! but we guarantee eventual consistency and fault tolerancy. //! Background tasks can be CPU intensive, but the worker thread has low priority. -use super::{cache_config, fs_write_atomic}; +use super::{cache_config, fs_write_atomic, CacheConfig}; +#[cfg(test)] +use core::borrow::Borrow; use log::{debug, info, trace, warn}; use serde::{Deserialize, Serialize}; use spin::Once; @@ -16,19 +18,43 @@ use std::fs; use std::path::{Path, PathBuf}; use std::sync::atomic::{self, AtomicBool}; use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; -use std::thread; +#[cfg(test)] +use std::sync::{atomic::AtomicU32, Arc}; +use std::thread::{self}; use std::time::Duration; use std::time::SystemTime; use std::vec::Vec; -enum CacheEvent { - OnCacheGet(PathBuf), - OnCacheUpdate(PathBuf), +pub(super) struct Worker { + sender: SyncSender, + #[cfg(test)] + stats: Arc, } -static SENDER: Once> = Once::new(); +struct WorkerThread { + receiver: Receiver, + cache_config: CacheConfig, + #[cfg(test)] + stats: Arc, +} + +#[cfg(test)] +#[derive(Default)] +struct WorkerStats { + dropped: AtomicU32, + sent: AtomicU32, + handled: AtomicU32, +} + +static WORKER: Once = Once::new(); static INIT_CALLED: AtomicBool = AtomicBool::new(false); +pub(super) fn worker() -> &'static Worker { + WORKER + .r#try() + .expect("Cache worker must be initialized before usage") +} + pub(super) fn init(init_file_per_thread_logger: Option<&'static str>) { INIT_CALLED .compare_exchange( @@ -39,96 +65,88 @@ pub(super) fn init(init_file_per_thread_logger: Option<&'static str>) { ) .expect("Cache worker init must be called at most once"); - let (tx, rx) = sync_channel(cache_config::worker_event_queue_size()); - let _ = SENDER.call_once(move || tx); - thread::spawn(move || worker_thread(rx, init_file_per_thread_logger)); + let worker = Worker::start_new(cache_config(), init_file_per_thread_logger); + WORKER.call_once(|| worker); } -pub(super) fn on_cache_get_async(path: impl AsRef) { - let event = CacheEvent::OnCacheGet(path.as_ref().to_path_buf()); - send_cache_event(event); +enum CacheEvent { + OnCacheGet(PathBuf), + OnCacheUpdate(PathBuf), } -pub(super) fn on_cache_update_async(path: impl AsRef) { - let event = CacheEvent::OnCacheUpdate(path.as_ref().to_path_buf()); - send_cache_event(event); -} +impl Worker { + pub(super) fn start_new( + cache_config: &CacheConfig, + init_file_per_thread_logger: Option<&'static str>, + ) -> Self { + let (tx, rx) = sync_channel(cache_config.worker_event_queue_size()); -#[inline] -fn send_cache_event(event: CacheEvent) { - match SENDER - .r#try() - .expect("Cache worker init must be called before using the worker") - .try_send(event) - { - Ok(()) => (), - Err(err) => info!( - "Failed to send asynchronously message to worker thread: {}", - err - ), - } -} + #[cfg(test)] + let stats = Arc::new(WorkerStats::default()); -fn worker_thread( - receiver: Receiver, - init_file_per_thread_logger: Option<&'static str>, -) { - assert!(INIT_CALLED.load(atomic::Ordering::SeqCst)); + let worker_thread = WorkerThread { + receiver: rx, + cache_config: cache_config.clone(), + #[cfg(test)] + stats: stats.clone(), + }; - if let Some(prefix) = init_file_per_thread_logger { - file_per_thread_logger::initialize(prefix); - } + // when self is dropped, sender will be dropped, what will cause the channel + // to hang, and the worker thread to exit -- it happens in the tests + // non-tests binary has only a static worker, so Rust doesn't drop it + thread::spawn(move || worker_thread.run(init_file_per_thread_logger)); - debug!("Cache worker thread started."); - - lower_thread_priority(); - - for event in receiver.iter() { - match event { - CacheEvent::OnCacheGet(path) => handle_on_cache_get(path), - CacheEvent::OnCacheUpdate(path) => handle_on_cache_update(path), + Worker { + sender: tx, + #[cfg(test)] + stats: stats, } } - // The receiver can stop iteration iff the channel has hung up. The channel will never - // hang up, because we have static SyncSender, and Rust doesn't drop static variables. - unreachable!() -} - -#[cfg(target_os = "windows")] -fn lower_thread_priority() { - use core::convert::TryInto; - use winapi::um::processthreadsapi::{GetCurrentThread, SetThreadPriority}; - use winapi::um::winbase::THREAD_MODE_BACKGROUND_BEGIN; - - // https://docs.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-setthreadpriority - // https://docs.microsoft.com/en-us/windows/win32/procthread/scheduling-priorities - - if unsafe { - SetThreadPriority( - GetCurrentThread(), - THREAD_MODE_BACKGROUND_BEGIN.try_into().unwrap(), - ) - } == 0 - { - warn!("Failed to lower worker thread priority. It might affect application performance."); + pub(super) fn on_cache_get_async(&self, path: impl AsRef) { + let event = CacheEvent::OnCacheGet(path.as_ref().to_path_buf()); + self.send_cache_event(event); } -} -#[cfg(not(target_os = "windows"))] -fn lower_thread_priority() { - // http://man7.org/linux/man-pages/man7/sched.7.html + pub(super) fn on_cache_update_async(&self, path: impl AsRef) { + let event = CacheEvent::OnCacheUpdate(path.as_ref().to_path_buf()); + self.send_cache_event(event); + } - const NICE_DELTA_FOR_BACKGROUND_TASKS: i32 = 3; + #[inline] + fn send_cache_event(&self, event: CacheEvent) { + #[cfg(test)] + let stats: &WorkerStats = self.stats.borrow(); + match self.sender.try_send(event) { + Ok(()) => { + #[cfg(test)] + stats.sent.fetch_add(1, atomic::Ordering::SeqCst); + } + Err(err) => { + info!( + "Failed to send asynchronously message to worker thread: {}", + err + ); - errno::set_errno(errno::Errno(0)); - let current_nice = unsafe { libc::nice(NICE_DELTA_FOR_BACKGROUND_TASKS) }; - let errno_val = errno::errno().0; + #[cfg(test)] + stats.dropped.fetch_add(1, atomic::Ordering::SeqCst); + } + } + } - if errno_val != 0 { - warn!("Failed to lower worker thread priority. It might affect application performance. errno: {}", errno_val); - } else { - debug!("New nice value of worker thread: {}", current_nice); + #[allow(dead_code)] // todo for worker tests + #[cfg(test)] + pub(super) fn events_dropped(&self) -> u32 { + let stats: &WorkerStats = self.stats.borrow(); + stats.dropped.load(atomic::Ordering::SeqCst) + } + + // todo wait_for_* instead? + #[allow(dead_code)] // todo for worker tests + #[cfg(test)] + pub(super) fn all_events_handled(&self) -> bool { + let stats: &WorkerStats = self.stats.borrow(); + stats.sent.load(atomic::Ordering::SeqCst) == stats.handled.load(atomic::Ordering::SeqCst) } } @@ -139,139 +157,15 @@ struct ModuleCacheStatistics { pub compression_level: i32, } -impl Default for ModuleCacheStatistics { - fn default() -> Self { +impl ModuleCacheStatistics { + fn default(cache_config: &CacheConfig) -> Self { Self { usages: 0, - compression_level: cache_config::baseline_compression_level(), + compression_level: cache_config.baseline_compression_level(), } } } -/// Increases the usage counter and recompresses the file -/// if the usage counter reached configurable treshold. -fn handle_on_cache_get(path: PathBuf) { - trace!("handle_on_cache_get() for path: {}", path.display()); - - // construct .stats file path - let filename = path.file_name().unwrap().to_str().unwrap(); - let stats_path = path.with_file_name(format!("{}.stats", filename)); - - // load .stats file (default if none or error) - let mut stats = - read_stats_file(stats_path.as_ref()).unwrap_or_else(|| ModuleCacheStatistics::default()); - - // step 1: update the usage counter & write to the disk - // it's racy, but it's fine (the counter will be just smaller, - // sometimes will retrigger recompression) - stats.usages += 1; - if !write_stats_file(stats_path.as_ref(), &stats) { - return; - } - - // step 2: recompress if there's a need - let opt_compr_lvl = cache_config::optimized_compression_level(); - if stats.compression_level >= opt_compr_lvl - || stats.usages < cache_config::optimized_compression_usage_counter_threshold() - { - return; - } - - let lock_path = if let Some(p) = acquire_task_fs_lock( - path.as_ref(), - cache_config::optimizing_compression_task_timeout(), - ) { - p - } else { - return; - }; - - trace!("Trying to recompress file: {}", path.display()); - - // recompress, write to other file, rename (it's atomic file content exchange) - // and update the stats file - fs::read(&path) - .map_err(|err| { - warn!( - "Failed to read old cache file, path: {}, err: {}", - path.display(), - err - ) - }) - .ok() - .and_then(|compressed_cache_bytes| { - zstd::decode_all(&compressed_cache_bytes[..]) - .map_err(|err| warn!("Failed to decompress cached code: {}", err)) - .ok() - }) - .and_then(|cache_bytes| { - zstd::encode_all( - &cache_bytes[..], - opt_compr_lvl, - ) - .map_err(|err| warn!("Failed to compress cached code: {}", err)) - .ok() - }) - .and_then(|recompressed_cache_bytes| { - fs::write(&lock_path, &recompressed_cache_bytes) - .map_err(|err| { - warn!( - "Failed to write recompressed cache, path: {}, err: {}", - lock_path.display(), - err - ) - }) - .ok() - }) - .and_then(|()| { - fs::rename(&lock_path, &path) - .map_err(|err| { - warn!( - "Failed to rename recompressed cache, path from: {}, path to: {}, err: {}", - lock_path.display(), - path.display(), - err - ); - if let Err(err) = fs::remove_file(&lock_path) { - warn!( - "Failed to clean up (remove) recompressed cache, path {}, err: {}", - lock_path.display(), - err - ); - } - }) - .ok() - }) - .map(|()| { - // update stats file (reload it! recompression can take some time) - if let Some(mut new_stats) = read_stats_file(stats_path.as_ref()) { - if new_stats.compression_level >= opt_compr_lvl { - // Rare race: - // two instances with different opt_compr_lvl: we don't know in which order they updated - // the cache file and the stats file (they are not updated together atomically) - // Possible solution is to use directories per cache entry, but it complicates the system - // and is not worth it. - debug!("DETECTED task did more than once (or race with new file): recompression of {}. \ - Note: if optimized compression level setting has changed in the meantine, \ - the stats file might contain inconsistent compression level due to race.", path.display()); - } - else { - new_stats.compression_level = opt_compr_lvl; - let _ = write_stats_file(stats_path.as_ref(), &new_stats); - } - - if new_stats.usages < stats.usages { - debug!("DETECTED lower usage count (new file or race with counter increasing): file {}", path.display()); - } - } - else { - debug!("Can't read stats file again to update compression level (it might got cleaned up): file {}", stats_path.display()); - } - }); - - trace!("Task finished: recompress file: {}", path.display()); -} - enum CacheEntry { Recognized { path: PathBuf, @@ -284,114 +178,521 @@ enum CacheEntry { }, } -fn handle_on_cache_update(path: PathBuf) { - trace!("handle_on_cache_update() for path: {}", path.display()); +impl WorkerThread { + fn run(self, init_file_per_thread_logger: Option<&'static str>) { + assert!(INIT_CALLED.load(atomic::Ordering::SeqCst)); - // ---------------------- step 1: create .stats file + if let Some(prefix) = init_file_per_thread_logger { + file_per_thread_logger::initialize(prefix); + } - // construct .stats file path - let filename = path - .file_name() - .expect("Expected valid cache file name") - .to_str() - .expect("Expected valid cache file name"); - let stats_path = path.with_file_name(format!("{}.stats", filename)); + debug!("Cache worker thread started."); - // create and write stats file - let mut stats = ModuleCacheStatistics::default(); - stats.usages += 1; - write_stats_file(&stats_path, &stats); + Self::lower_thread_priority(); - // ---------------------- step 2: perform cleanup task if needed + #[cfg(test)] + let stats: &WorkerStats = self.stats.borrow(); - // acquire lock for cleanup task - // Lock is a proof of recent cleanup task, so we don't want to delete them. - // Expired locks will be deleted by the cleanup task. - let cleanup_file = cache_config::directory().join(".cleanup"); // some non existing marker file - if acquire_task_fs_lock(&cleanup_file, cache_config::cleanup_interval()).is_none() { - return; + for event in self.receiver.iter() { + match event { + CacheEvent::OnCacheGet(path) => self.handle_on_cache_get(path), + CacheEvent::OnCacheUpdate(path) => self.handle_on_cache_update(path), + } + + #[cfg(test)] + stats.handled.fetch_add(1, atomic::Ordering::SeqCst); + } + + // The receiver can stop iteration iff the channel has hung up. + // The channel will hung when sender is dropped. It only happens in tests. + // In non-test case we have static worker and Rust doesn't drop static variables. + #[cfg(not(test))] + unreachable!() } - trace!("Trying to clean up cache"); + #[cfg(target_os = "windows")] + fn lower_thread_priority() { + use core::convert::TryInto; + use winapi::um::processthreadsapi::{GetCurrentThread, SetThreadPriority}; + use winapi::um::winbase::THREAD_MODE_BACKGROUND_BEGIN; - let mut cache_index = list_cache_contents(); - cache_index.sort_unstable_by(|lhs, rhs| { - // sort by age - use CacheEntry::*; - match (lhs, rhs) { - (Recognized { mtime: lhs_mt, .. }, Recognized { mtime: rhs_mt, .. }) => { - rhs_mt.cmp(lhs_mt) - } // later == younger - // unrecognized is kind of infinity - (Recognized { .. }, Unrecognized { .. }) => cmp::Ordering::Less, - (Unrecognized { .. }, Recognized { .. }) => cmp::Ordering::Greater, - (Unrecognized { .. }, Unrecognized { .. }) => cmp::Ordering::Equal, + // https://docs.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-setthreadpriority + // https://docs.microsoft.com/en-us/windows/win32/procthread/scheduling-priorities + + if unsafe { + SetThreadPriority( + GetCurrentThread(), + THREAD_MODE_BACKGROUND_BEGIN.try_into().unwrap(), + ) + } == 0 + { + warn!( + "Failed to lower worker thread priority. It might affect application performance." + ); } - }); + } - // find "cut" boundary: - // - remove unrecognized files anyway, - // - remove some cache files if some quota has been exceeded - let mut total_size = 0u64; - let mut start_delete_idx = None; - let mut start_delete_idx_if_deleting_recognized_items: Option = None; + #[cfg(not(target_os = "windows"))] + fn lower_thread_priority() { + // http://man7.org/linux/man-pages/man7/sched.7.html - let total_size_limit = cache_config::files_total_size_soft_limit(); - let files_count_limit = cache_config::files_count_soft_limit(); - let tsl_if_deleting = total_size_limit - .checked_mul(cache_config::files_total_size_limit_percent_if_deleting() as u64) - .unwrap() - / 100; - let fcl_if_deleting = files_count_limit - .checked_mul(cache_config::files_count_limit_percent_if_deleting() as u64) - .unwrap() - / 100; + const NICE_DELTA_FOR_BACKGROUND_TASKS: i32 = 3; - for (idx, item) in cache_index.iter().enumerate() { - let size = if let CacheEntry::Recognized { size, .. } = item { - size + errno::set_errno(errno::Errno(0)); + let current_nice = unsafe { libc::nice(NICE_DELTA_FOR_BACKGROUND_TASKS) }; + let errno_val = errno::errno().0; + + if errno_val != 0 { + warn!("Failed to lower worker thread priority. It might affect application performance. errno: {}", errno_val); } else { - start_delete_idx = Some(idx); - break; + debug!("New nice value of worker thread: {}", current_nice); + } + } + + /// Increases the usage counter and recompresses the file + /// if the usage counter reached configurable treshold. + fn handle_on_cache_get(&self, path: PathBuf) { + trace!("handle_on_cache_get() for path: {}", path.display()); + + // construct .stats file path + let filename = path.file_name().unwrap().to_str().unwrap(); + let stats_path = path.with_file_name(format!("{}.stats", filename)); + + // load .stats file (default if none or error) + let mut stats = read_stats_file(stats_path.as_ref()) + .unwrap_or_else(|| ModuleCacheStatistics::default(&self.cache_config)); + + // step 1: update the usage counter & write to the disk + // it's racy, but it's fine (the counter will be just smaller, + // sometimes will retrigger recompression) + stats.usages += 1; + if !write_stats_file(stats_path.as_ref(), &stats) { + return; + } + + // step 2: recompress if there's a need + let opt_compr_lvl = self.cache_config.optimized_compression_level(); + if stats.compression_level >= opt_compr_lvl + || stats.usages + < self + .cache_config + .optimized_compression_usage_counter_threshold() + { + return; + } + + let lock_path = if let Some(p) = acquire_task_fs_lock( + path.as_ref(), + self.cache_config.optimizing_compression_task_timeout(), + self.cache_config + .allowed_clock_drift_for_locks_from_future(), + ) { + p + } else { + return; }; - total_size += size; - if start_delete_idx_if_deleting_recognized_items.is_none() { - if total_size >= tsl_if_deleting || (idx + 1) as u64 >= fcl_if_deleting { - start_delete_idx_if_deleting_recognized_items = Some(idx); - } - } + trace!("Trying to recompress file: {}", path.display()); - if total_size >= total_size_limit || (idx + 1) as u64 >= files_count_limit { - start_delete_idx = start_delete_idx_if_deleting_recognized_items; - break; - } - } - - if let Some(idx) = start_delete_idx { - for item in &cache_index[idx..] { - let (result, path, entity) = match item { - CacheEntry::Recognized { path, .. } - | CacheEntry::Unrecognized { - path, - is_dir: false, - } => (fs::remove_file(path), path, "file"), - CacheEntry::Unrecognized { path, is_dir: true } => { - (fs::remove_dir_all(path), path, "directory") - } - }; - if let Err(err) = result { + // recompress, write to other file, rename (it's atomic file content exchange) + // and update the stats file + fs::read(&path) + .map_err(|err| { warn!( - "Failed to remove {} during cleanup, path: {}, err: {}", - entity, + "Failed to read old cache file, path: {}, err: {}", path.display(), err - ); - } - } + ) + }) + .ok() + .and_then(|compressed_cache_bytes| { + zstd::decode_all(&compressed_cache_bytes[..]) + .map_err(|err| warn!("Failed to decompress cached code: {}", err)) + .ok() + }) + .and_then(|cache_bytes| { + zstd::encode_all( + &cache_bytes[..], + opt_compr_lvl, + ) + .map_err(|err| warn!("Failed to compress cached code: {}", err)) + .ok() + }) + .and_then(|recompressed_cache_bytes| { + fs::write(&lock_path, &recompressed_cache_bytes) + .map_err(|err| { + warn!( + "Failed to write recompressed cache, path: {}, err: {}", + lock_path.display(), + err + ) + }) + .ok() + }) + .and_then(|()| { + fs::rename(&lock_path, &path) + .map_err(|err| { + warn!( + "Failed to rename recompressed cache, path from: {}, path to: {}, err: {}", + lock_path.display(), + path.display(), + err + ); + if let Err(err) = fs::remove_file(&lock_path) { + warn!( + "Failed to clean up (remove) recompressed cache, path {}, err: {}", + lock_path.display(), + err + ); + } + }) + .ok() + }) + .map(|()| { + // update stats file (reload it! recompression can take some time) + if let Some(mut new_stats) = read_stats_file(stats_path.as_ref()) { + if new_stats.compression_level >= opt_compr_lvl { + // Rare race: + // two instances with different opt_compr_lvl: we don't know in which order they updated + // the cache file and the stats file (they are not updated together atomically) + // Possible solution is to use directories per cache entry, but it complicates the system + // and is not worth it. + debug!("DETECTED task did more than once (or race with new file): recompression of {}. \ + Note: if optimized compression level setting has changed in the meantine, \ + the stats file might contain inconsistent compression level due to race.", path.display()); + } + else { + new_stats.compression_level = opt_compr_lvl; + let _ = write_stats_file(stats_path.as_ref(), &new_stats); + } + + if new_stats.usages < stats.usages { + debug!("DETECTED lower usage count (new file or race with counter increasing): file {}", path.display()); + } + } + else { + debug!("Can't read stats file again to update compression level (it might got cleaned up): file {}", stats_path.display()); + } + }); + + trace!("Task finished: recompress file: {}", path.display()); } - trace!("Task finished: clean up cache"); + fn handle_on_cache_update(&self, path: PathBuf) { + trace!("handle_on_cache_update() for path: {}", path.display()); + + // ---------------------- step 1: create .stats file + + // construct .stats file path + let filename = path + .file_name() + .expect("Expected valid cache file name") + .to_str() + .expect("Expected valid cache file name"); + let stats_path = path.with_file_name(format!("{}.stats", filename)); + + // create and write stats file + let mut stats = ModuleCacheStatistics::default(&self.cache_config); + stats.usages += 1; + write_stats_file(&stats_path, &stats); + + // ---------------------- step 2: perform cleanup task if needed + + // acquire lock for cleanup task + // Lock is a proof of recent cleanup task, so we don't want to delete them. + // Expired locks will be deleted by the cleanup task. + let cleanup_file = self.cache_config.directory().join(".cleanup"); // some non existing marker file + if acquire_task_fs_lock( + &cleanup_file, + self.cache_config.cleanup_interval(), + self.cache_config + .allowed_clock_drift_for_locks_from_future(), + ) + .is_none() + { + return; + } + + trace!("Trying to clean up cache"); + + let mut cache_index = self.list_cache_contents(); + cache_index.sort_unstable_by(|lhs, rhs| { + // sort by age + use CacheEntry::*; + match (lhs, rhs) { + (Recognized { mtime: lhs_mt, .. }, Recognized { mtime: rhs_mt, .. }) => { + rhs_mt.cmp(lhs_mt) + } // later == younger + // unrecognized is kind of infinity + (Recognized { .. }, Unrecognized { .. }) => cmp::Ordering::Less, + (Unrecognized { .. }, Recognized { .. }) => cmp::Ordering::Greater, + (Unrecognized { .. }, Unrecognized { .. }) => cmp::Ordering::Equal, + } + }); + + // find "cut" boundary: + // - remove unrecognized files anyway, + // - remove some cache files if some quota has been exceeded + let mut total_size = 0u64; + let mut start_delete_idx = None; + let mut start_delete_idx_if_deleting_recognized_items: Option = None; + + let total_size_limit = self.cache_config.files_total_size_soft_limit(); + let files_count_limit = self.cache_config.files_count_soft_limit(); + let tsl_if_deleting = total_size_limit + .checked_mul( + self.cache_config + .files_total_size_limit_percent_if_deleting() as u64, + ) + .unwrap() + / 100; + let fcl_if_deleting = files_count_limit + .checked_mul(self.cache_config.files_count_limit_percent_if_deleting() as u64) + .unwrap() + / 100; + + for (idx, item) in cache_index.iter().enumerate() { + let size = if let CacheEntry::Recognized { size, .. } = item { + size + } else { + start_delete_idx = Some(idx); + break; + }; + + total_size += size; + if start_delete_idx_if_deleting_recognized_items.is_none() { + if total_size >= tsl_if_deleting || (idx + 1) as u64 >= fcl_if_deleting { + start_delete_idx_if_deleting_recognized_items = Some(idx); + } + } + + if total_size >= total_size_limit || (idx + 1) as u64 >= files_count_limit { + start_delete_idx = start_delete_idx_if_deleting_recognized_items; + break; + } + } + + if let Some(idx) = start_delete_idx { + for item in &cache_index[idx..] { + let (result, path, entity) = match item { + CacheEntry::Recognized { path, .. } + | CacheEntry::Unrecognized { + path, + is_dir: false, + } => (fs::remove_file(path), path, "file"), + CacheEntry::Unrecognized { path, is_dir: true } => { + (fs::remove_dir_all(path), path, "directory") + } + }; + if let Err(err) = result { + warn!( + "Failed to remove {} during cleanup, path: {}, err: {}", + entity, + path.display(), + err + ); + } + } + } + + trace!("Task finished: clean up cache"); + } + + // Be fault tolerant: list as much as you can, and ignore the rest + fn list_cache_contents(&self) -> Vec { + fn enter_dir( + vec: &mut Vec, + dir_path: &Path, + level: u8, + cache_config: &CacheConfig, + ) { + macro_rules! unwrap_or { + ($result:expr, $cont:stmt, $err_msg:expr) => { + unwrap_or!($result, $cont, $err_msg, dir_path) + }; + ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => { + match $result { + Ok(val) => val, + Err(err) => { + warn!( + "{}, level: {}, path: {}, msg: {}", + $err_msg, + level, + $path.display(), + err + ); + $cont + } + } + }; + } + macro_rules! add_unrecognized { + (file: $path:expr) => { + add_unrecognized!(false, $path) + }; + (dir: $path:expr) => { + add_unrecognized!(true, $path) + }; + ($is_dir:expr, $path:expr) => { + vec.push(CacheEntry::Unrecognized { + path: $path.to_path_buf(), + is_dir: $is_dir, + }); + }; + } + macro_rules! add_unrecognized_and { + ([ $( $ty:ident: $path:expr ),* ], $cont:stmt) => {{ + $( add_unrecognized!($ty: $path); )* + $cont + }}; + } + + // If we fail to list a directory, something bad is happening anyway + // (something touches our cache or we have disk failure) + // Try to delete it, so we can stay within soft limits of the cache size. + // This comment applies later in this function, too. + let it = unwrap_or!( + fs::read_dir(dir_path), + add_unrecognized_and!([dir: dir_path], return), + "Failed to list cache directory, deleting it" + ); + + let mut cache_files = HashMap::new(); + for entry in it { + // read_dir() returns an iterator over results - in case some of them are errors + // we don't know their names, so we can't delete them. We don't want to delete + // the whole directory with good entries too, so we just ignore the erroneous entries. + let entry = unwrap_or!( + entry, + continue, + "Failed to read a cache dir entry (NOT deleting it, it still occupies space)" + ); + let path = entry.path(); + match (level, path.is_dir()) { + (0..=1, true) => enter_dir(vec, &path, level + 1, cache_config), + (0..=1, false) => { + if level == 0 && path.file_stem() == Some(OsStr::new(".cleanup")) { + if let Some(_) = path.extension() { + // assume it's cleanup lock + if !is_fs_lock_expired( + Some(&entry), + &path, + cache_config.cleanup_interval(), + cache_config.allowed_clock_drift_for_locks_from_future(), + ) { + continue; // skip active lock + } + } + } + add_unrecognized!(file: path); + } + (2, false) => { + // assumption: only mod cache (no ext), .stats & .wip-* files + let ext = path.extension(); + if ext.is_none() || ext == Some(OsStr::new("stats")) { + cache_files.insert(path, entry); + } else { + // assume it's .wip file (lock) + if is_fs_lock_expired( + Some(&entry), + &path, + cache_config.optimizing_compression_task_timeout(), + cache_config.allowed_clock_drift_for_locks_from_future(), + ) { + add_unrecognized!(file: path); + } // else: skip active lock + } + } + (_, is_dir) => add_unrecognized!(is_dir, path), + } + } + + // associate module with its stats & handle them + // assumption: just mods and stats + for (path, entry) in cache_files.iter() { + let path_buf: PathBuf; + let (mod_, stats_, is_mod) = match path.extension() { + Some(_) => { + path_buf = path.with_extension(""); + ( + cache_files.get(&path_buf).map(|v| (&path_buf, v)), + Some((path, entry)), + false, + ) + } + None => { + path_buf = path.with_extension("stats"); + ( + Some((path, entry)), + cache_files.get(&path_buf).map(|v| (&path_buf, v)), + true, + ) + } + }; + + // construct a cache entry + match (mod_, stats_, is_mod) { + (Some((mod_path, mod_entry)), Some((stats_path, stats_entry)), true) => { + let mod_metadata = unwrap_or!( + mod_entry.metadata(), + add_unrecognized_and!([file: stats_path, file: mod_path], continue), + "Failed to get metadata, deleting BOTH module cache and stats files", + mod_path + ); + let stats_mtime = unwrap_or!( + stats_entry.metadata().and_then(|m| m.modified()), + add_unrecognized_and!( + [file: stats_path], + unwrap_or!( + mod_metadata.modified(), + add_unrecognized_and!([file: stats_path, file: mod_path], continue), + "Failed to get mtime, deleting BOTH module cache and stats files", + mod_path + ) + ), + "Failed to get metadata/mtime, deleting the file", + stats_path + ); + vec.push(CacheEntry::Recognized { + path: mod_path.to_path_buf(), + mtime: stats_mtime, + size: mod_metadata.len(), + }) + } + (Some(_), Some(_), false) => (), // was or will be handled by previous branch + (Some((mod_path, mod_entry)), None, _) => { + let (mod_metadata, mod_mtime) = unwrap_or!( + mod_entry + .metadata() + .and_then(|md| md.modified().map(|mt| (md, mt))), + add_unrecognized_and!([file: mod_path], continue), + "Failed to get metadata/mtime, deleting the file", + mod_path + ); + vec.push(CacheEntry::Recognized { + path: mod_path.to_path_buf(), + mtime: mod_mtime, + size: mod_metadata.len(), + }) + } + (None, Some((stats_path, _stats_entry)), _) => { + debug!("Found orphaned stats file: {}", stats_path.display()); + add_unrecognized!(file: stats_path); + } + _ => unreachable!(), + } + } + } + + let mut vec = Vec::new(); + enter_dir( + &mut vec, + self.cache_config.directory(), + 0, + &self.cache_config, + ); + vec + } } fn read_stats_file(path: &Path) -> Option { @@ -434,190 +735,6 @@ fn write_stats_file(path: &Path, stats: &ModuleCacheStatistics) -> bool { .is_ok() } -// Be fault tolerant: list as much as you can, and ignore the rest -fn list_cache_contents() -> Vec { - fn enter_dir(vec: &mut Vec, dir_path: &Path, level: u8) { - macro_rules! unwrap_or { - ($result:expr, $cont:stmt, $err_msg:expr) => { - unwrap_or!($result, $cont, $err_msg, dir_path) - }; - ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => { - match $result { - Ok(val) => val, - Err(err) => { - warn!( - "{}, level: {}, path: {}, msg: {}", - $err_msg, - level, - $path.display(), - err - ); - $cont - } - } - }; - } - macro_rules! add_unrecognized { - (file: $path:expr) => { - add_unrecognized!(false, $path) - }; - (dir: $path:expr) => { - add_unrecognized!(true, $path) - }; - ($is_dir:expr, $path:expr) => { - vec.push(CacheEntry::Unrecognized { - path: $path.to_path_buf(), - is_dir: $is_dir, - }); - }; - } - macro_rules! add_unrecognized_and { - ([ $( $ty:ident: $path:expr ),* ], $cont:stmt) => {{ - $( add_unrecognized!($ty: $path); )* - $cont - }}; - } - - // If we fail to list a directory, something bad is happening anyway - // (something touches our cache or we have disk failure) - // Try to delete it, so we can stay within soft limits of the cache size. - // This comment applies later in this function, too. - let it = unwrap_or!( - fs::read_dir(dir_path), - add_unrecognized_and!([dir: dir_path], return), - "Failed to list cache directory, deleting it" - ); - - let mut cache_files = HashMap::new(); - for entry in it { - // read_dir() returns an iterator over results - in case some of them are errors - // we don't know their names, so we can't delete them. We don't want to delete - // the whole directory with good entries too, so we just ignore the erroneous entries. - let entry = unwrap_or!( - entry, - continue, - "Failed to read a cache dir entry (NOT deleting it, it still occupies space)" - ); - let path = entry.path(); - match (level, path.is_dir()) { - (0..=1, true) => enter_dir(vec, &path, level + 1), - (0..=1, false) => { - if level == 0 && path.file_stem() == Some(OsStr::new(".cleanup")) { - if let Some(_) = path.extension() { - // assume it's cleanup lock - if !is_fs_lock_expired( - Some(&entry), - &path, - cache_config::cleanup_interval(), - ) { - continue; // skip active lock - } - } - } - add_unrecognized!(file: path); - } - (2, false) => { - // assumption: only mod cache (no ext), .stats & .wip-* files - let ext = path.extension(); - if ext.is_none() || ext == Some(OsStr::new("stats")) { - cache_files.insert(path, entry); - } else { - // assume it's .wip file (lock) - if is_fs_lock_expired( - Some(&entry), - &path, - cache_config::optimizing_compression_task_timeout(), - ) { - add_unrecognized!(file: path); - } // else: skip active lock - } - } - (_, is_dir) => add_unrecognized!(is_dir, path), - } - } - - // associate module with its stats & handle them - // assumption: just mods and stats - for (path, entry) in cache_files.iter() { - let path_buf: PathBuf; - let (mod_, stats_, is_mod) = match path.extension() { - Some(_) => { - path_buf = path.with_extension(""); - ( - cache_files.get(&path_buf).map(|v| (&path_buf, v)), - Some((path, entry)), - false, - ) - } - None => { - path_buf = path.with_extension("stats"); - ( - Some((path, entry)), - cache_files.get(&path_buf).map(|v| (&path_buf, v)), - true, - ) - } - }; - - // construct a cache entry - match (mod_, stats_, is_mod) { - (Some((mod_path, mod_entry)), Some((stats_path, stats_entry)), true) => { - let mod_metadata = unwrap_or!( - mod_entry.metadata(), - add_unrecognized_and!([file: stats_path, file: mod_path], continue), - "Failed to get metadata, deleting BOTH module cache and stats files", - mod_path - ); - let stats_mtime = unwrap_or!( - stats_entry.metadata().and_then(|m| m.modified()), - add_unrecognized_and!( - [file: stats_path], - unwrap_or!( - mod_metadata.modified(), - add_unrecognized_and!([file: stats_path, file: mod_path], continue), - "Failed to get mtime, deleting BOTH module cache and stats files", - mod_path - ) - ), - "Failed to get metadata/mtime, deleting the file", - stats_path - ); - vec.push(CacheEntry::Recognized { - path: mod_path.to_path_buf(), - mtime: stats_mtime, - size: mod_metadata.len(), - }) - } - (Some(_), Some(_), false) => (), // was or will be handled by previous branch - (Some((mod_path, mod_entry)), None, _) => { - let (mod_metadata, mod_mtime) = unwrap_or!( - mod_entry - .metadata() - .and_then(|md| md.modified().map(|mt| (md, mt))), - add_unrecognized_and!([file: mod_path], continue), - "Failed to get metadata/mtime, deleting the file", - mod_path - ); - vec.push(CacheEntry::Recognized { - path: mod_path.to_path_buf(), - mtime: mod_mtime, - size: mod_metadata.len(), - }) - } - (None, Some((stats_path, _stats_entry)), _) => { - debug!("Found orphaned stats file: {}", stats_path.display()); - add_unrecognized!(file: stats_path); - } - _ => unreachable!(), - } - } - } - - let mut vec = Vec::new(); - enter_dir(&mut vec, cache_config::directory(), 0); - vec -} - /// Tries to acquire a lock for specific task. /// /// Returns Some(path) to the lock if succeeds. The task path must not @@ -629,7 +746,11 @@ fn list_cache_contents() -> Vec { /// Note: this function is racy. Main idea is: be fault tolerant and /// never block some task. The price is that we rarely do some task /// more than once. -fn acquire_task_fs_lock(task_path: &Path, timeout: Duration) -> Option { +fn acquire_task_fs_lock( + task_path: &Path, + timeout: Duration, + allowed_future_drift: Duration, +) -> Option { assert!(task_path.extension().is_none()); assert!(task_path.file_stem().is_some()); @@ -669,7 +790,7 @@ fn acquire_task_fs_lock(task_path: &Path, timeout: Duration) -> Option if let Some(ext_str) = ext.to_str() { // if it's None, i.e. not valid UTF-8 string, then that's not our lock for sure if ext_str.starts_with("wip-") - && !is_fs_lock_expired(Some(&entry), &path, timeout) + && !is_fs_lock_expired(Some(&entry), &path, timeout, allowed_future_drift) { return None; } @@ -699,7 +820,12 @@ fn acquire_task_fs_lock(task_path: &Path, timeout: Duration) -> Option // we have either both, or just path; dir entry is desirable since on some platforms we can get // metadata without extra syscalls // futhermore: it's better to get a path if we have it instead of allocating a new one from the dir entry -fn is_fs_lock_expired(entry: Option<&fs::DirEntry>, path: &PathBuf, threshold: Duration) -> bool { +fn is_fs_lock_expired( + entry: Option<&fs::DirEntry>, + path: &PathBuf, + threshold: Duration, + allowed_future_drift: Duration, +) -> bool { let mtime = match entry .map(|e| e.metadata()) .unwrap_or_else(|| path.metadata()) @@ -727,8 +853,7 @@ fn is_fs_lock_expired(entry: Option<&fs::DirEntry>, path: &PathBuf, threshold: D // the lock is expired if the time is too far in the future // it is fine to have network share and not synchronized clocks, // but it's not good when user changes time in their system clock - static DEFAULT_THRESHOLD: Duration = Duration::from_secs(60 * 60 * 24); // todo dependant refactor PR adds this as a setting - err.duration() > DEFAULT_THRESHOLD + err.duration() > allowed_future_drift } } } diff --git a/wasmtime-environ/src/lib.rs b/wasmtime-environ/src/lib.rs index a4235f864a..e5c7eb3d95 100644 --- a/wasmtime-environ/src/lib.rs +++ b/wasmtime-environ/src/lib.rs @@ -54,7 +54,7 @@ pub mod lightbeam; pub use crate::address_map::{ FunctionAddressMap, InstructionAddressMap, ModuleAddressMap, ModuleVmctxInfo, ValueLabelsRanges, }; -pub use crate::cache::config as cache_config; +pub use crate::cache::init as cache_init; pub use crate::compilation::{ Compilation, CompileError, Compiler, Relocation, RelocationTarget, Relocations, }; diff --git a/wasmtime-environ/tests/cache_default_config_in_memory.rs b/wasmtime-environ/tests/cache_default_config_in_memory.rs index dab9960e32..607385e5c8 100644 --- a/wasmtime-environ/tests/cache_default_config_in_memory.rs +++ b/wasmtime-environ/tests/cache_default_config_in_memory.rs @@ -1,8 +1,8 @@ -use wasmtime_environ::cache_config; +use wasmtime_environ::cache_init; #[test] fn test_cache_default_config_in_memory() { - let errors = cache_config::init::<&str>(true, None, false, None); + let errors = cache_init::<&str>(true, None, false, None); assert!( errors.is_empty(), "This test loads config from the default location, if there's one. Make sure it's correct!" diff --git a/wasmtime-environ/tests/cache_fail_calling_init_twice.rs b/wasmtime-environ/tests/cache_fail_calling_init_twice.rs index b575e3f9b9..7892fe100f 100644 --- a/wasmtime-environ/tests/cache_fail_calling_init_twice.rs +++ b/wasmtime-environ/tests/cache_fail_calling_init_twice.rs @@ -1,6 +1,6 @@ use std::fs; use tempfile; -use wasmtime_environ::cache_config; +use wasmtime_environ::cache_init; #[test] #[should_panic] @@ -20,7 +20,7 @@ fn test_cache_fail_calling_init_twice() { ); fs::write(&config_path, config_content).expect("Failed to write test config file"); - let errors = cache_config::init(true, Some(&config_path), false, None); + let errors = cache_init(true, Some(&config_path), false, None); assert!(errors.is_empty()); - let _errors = cache_config::init(true, Some(&config_path), false, None); + let _errors = cache_init(true, Some(&config_path), false, None); } diff --git a/wasmtime-environ/tests/cache_fail_invalid_config.rs b/wasmtime-environ/tests/cache_fail_invalid_config.rs index 42a4512b11..a8154ee375 100644 --- a/wasmtime-environ/tests/cache_fail_invalid_config.rs +++ b/wasmtime-environ/tests/cache_fail_invalid_config.rs @@ -1,6 +1,6 @@ use std::fs; use tempfile; -use wasmtime_environ::cache_config; +use wasmtime_environ::cache_init; #[test] fn test_cache_fail_invalid_config() { @@ -18,6 +18,6 @@ fn test_cache_fail_invalid_config() { ); fs::write(&config_path, config_content).expect("Failed to write test config file"); - let errors = cache_config::init(true, Some(&config_path), false, None); + let errors = cache_init(true, Some(&config_path), false, None); assert!(!errors.is_empty()); } diff --git a/wasmtime-environ/tests/cache_fail_invalid_path_to_config.rs b/wasmtime-environ/tests/cache_fail_invalid_path_to_config.rs index 819734adea..d4c759fcb8 100644 --- a/wasmtime-environ/tests/cache_fail_invalid_path_to_config.rs +++ b/wasmtime-environ/tests/cache_fail_invalid_path_to_config.rs @@ -1,10 +1,10 @@ use tempfile; -use wasmtime_environ::cache_config; +use wasmtime_environ::cache_init; #[test] fn test_cache_fail_invalid_path_to_config() { let dir = tempfile::tempdir().expect("Can't create temporary directory"); let config_path = dir.path().join("cache-config.toml"); // doesn't exist - let errors = cache_config::init(true, Some(&config_path), false, None); + let errors = cache_init(true, Some(&config_path), false, None); assert!(!errors.is_empty()); } diff --git a/wasmtime-environ/tests/cache_fail_usage_without_init.rs b/wasmtime-environ/tests/cache_fail_usage_without_init.rs deleted file mode 100644 index 7538bfd505..0000000000 --- a/wasmtime-environ/tests/cache_fail_usage_without_init.rs +++ /dev/null @@ -1,15 +0,0 @@ -// These tests doesn't call init(), so we can test a multiple certain things here - -use wasmtime_environ::cache_config; - -#[test] -#[should_panic] -fn test_cache_fail_usage_without_init_directory() { - let _ = cache_config::directory(); -} - -#[test] -#[should_panic] -fn test_cache_fail_usage_without_init_baseline_compression_level() { - let _ = cache_config::baseline_compression_level(); -} diff --git a/wasmtime-environ/tests/cache_write_default_config.rs b/wasmtime-environ/tests/cache_write_default_config.rs index b4d56e45b6..17fd562fff 100644 --- a/wasmtime-environ/tests/cache_write_default_config.rs +++ b/wasmtime-environ/tests/cache_write_default_config.rs @@ -1,12 +1,12 @@ use tempfile; -use wasmtime_environ::cache_config; +use wasmtime_environ::cache_init; #[test] fn test_cache_write_default_config() { let dir = tempfile::tempdir().expect("Can't create temporary directory"); let config_path = dir.path().join("cache-config.toml"); - let errors = cache_config::init(true, Some(&config_path), true, None); + let errors = cache_init(true, Some(&config_path), true, None); assert!(errors.is_empty()); assert!(config_path.exists()); }