diff --git a/src/bin/wasm2obj.rs b/src/bin/wasm2obj.rs index 9afe4404ee..07e7159ecb 100644 --- a/src/bin/wasm2obj.rs +++ b/src/bin/wasm2obj.rs @@ -112,16 +112,20 @@ fn main() { }) .unwrap_or_else(|e| e.exit()); - if args.flag_debug { + let log_config = if args.flag_debug { pretty_env_logger::init(); + None } else { - wasmtime::init_file_per_thread_logger("wasm2obj.dbg."); - } + let prefix = "wasm2obj.dbg."; + wasmtime::init_file_per_thread_logger(prefix); + Some(prefix) + }; let errors = cache_config::init( args.flag_cache || args.flag_cache_config_file.is_some(), args.flag_cache_config_file.as_ref(), args.flag_create_cache_config, + log_config, ); if !errors.is_empty() { diff --git a/src/bin/wasmtime.rs b/src/bin/wasmtime.rs index 6c9c435b46..b770b48a57 100644 --- a/src/bin/wasmtime.rs +++ b/src/bin/wasmtime.rs @@ -213,16 +213,20 @@ fn rmain() -> Result<(), Error> { }) .unwrap_or_else(|e| e.exit()); - if args.flag_debug { + let log_config = if args.flag_debug { pretty_env_logger::init(); + None } else { - wasmtime::init_file_per_thread_logger("wasmtime.dbg."); - } + let prefix = "wasmtime.dbg."; + wasmtime::init_file_per_thread_logger(prefix); + Some(prefix) + }; let errors = cache_config::init( args.flag_cache || args.flag_cache_config_file.is_some(), args.flag_cache_config_file.as_ref(), args.flag_create_cache_config, + log_config, ); if !errors.is_empty() { diff --git a/src/bin/wast.rs b/src/bin/wast.rs index e9cc6fc706..098d204eff 100644 --- a/src/bin/wast.rs +++ b/src/bin/wast.rs @@ -80,16 +80,20 @@ fn main() { }) .unwrap_or_else(|e| e.exit()); - if args.flag_debug { + let log_config = if args.flag_debug { pretty_env_logger::init(); + None } else { - wasmtime::init_file_per_thread_logger("cranelift.dbg."); - } + let prefix = "cranelift.dbg."; + wasmtime::init_file_per_thread_logger(prefix); + Some(prefix) + }; let errors = cache_config::init( args.flag_cache || args.flag_cache_config_file.is_some(), args.flag_cache_config_file.as_ref(), args.flag_create_cache_config, + log_config, ); if !errors.is_empty() { diff --git a/wasmtime-environ/Cargo.toml b/wasmtime-environ/Cargo.toml index c1a4b44752..b43d915a3f 100644 --- a/wasmtime-environ/Cargo.toml +++ b/wasmtime-environ/Cargo.toml @@ -30,6 +30,14 @@ spin = "0.5.0" log = { version = "0.4.8", default-features = false } zstd = "0.4" toml = "0.5" +file-per-thread-logger = "0.1.1" + +[target.'cfg(target_os = "windows")'.dependencies] +winapi = "0.3.7" + +[target.'cfg(not(target_os = "windows"))'.dependencies] +libc = "0.2.60" +errno = "0.2.4" [dev-dependencies] tempfile = "3" diff --git a/wasmtime-environ/src/cache.rs b/wasmtime-environ/src/cache.rs index 42b2645427..0a4359236d 100644 --- a/wasmtime-environ/src/cache.rs +++ b/wasmtime-environ/src/cache.rs @@ -11,12 +11,13 @@ use log::{debug, trace, warn}; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use std::fs; -use std::io; -use std::path::PathBuf; +use std::io::Write; +use std::path::{Path, PathBuf}; use std::string::{String, ToString}; pub mod config; use config as cache_config; // so we have namespaced methods +mod worker; lazy_static! 
{ static ref SELF_MTIME: String = { @@ -116,13 +117,19 @@ impl ModuleCacheEntry { let cache_bytes = zstd::decode_all(&compressed_cache_bytes[..]) .map_err(|err| warn!("Failed to decompress cached code: {}", err)) .ok()?; - bincode::deserialize(&cache_bytes[..]) + let ret = bincode::deserialize(&cache_bytes[..]) .map_err(|err| warn!("Failed to deserialize cached code: {}", err)) - .ok() + .ok()?; + + worker::on_cache_get_async(path); // call on success + Some(ret) } pub fn update_data(&self, data: &ModuleCacheData) { - let _ = self.update_data_impl(data); + if self.update_data_impl(data).is_some() { + let path = self.mod_cache_path.as_ref().unwrap(); + worker::on_cache_update_async(path); // call on success + } } fn update_data_impl(&self, data: &ModuleCacheData) -> Option<()> { @@ -140,12 +147,14 @@ impl ModuleCacheEntry { // Optimize syscalls: first, try writing to disk. It should succeed in most cases. // Otherwise, try creating the cache directory and retry writing to the file. - let err = fs::write(path, &compressed_data).err()?; // return on success + if fs_write_atomic(path, "mod", &compressed_data) { + return Some(()); + } + debug!( "Attempting to create the cache directory, because \ - failed to write cached code to disk, path: {}, message: {}", + failed to write cached code to disk, path: {}", path.display(), - err, ); let cache_dir = path.parent().unwrap(); @@ -159,23 +168,11 @@ impl ModuleCacheEntry { }) .ok()?; - let err = fs::write(path, &compressed_data).err()?; - warn!( - "Failed to write cached code to disk, path: {}, message: {}", - path.display(), - err - ); - fs::remove_file(path) - .map_err(|err| { - if err.kind() != io::ErrorKind::NotFound { - warn!( - "Failed to cleanup invalid cache, path: {}, message: {}", - path.display(), - err - ); - } - }) - .ok() + if fs_write_atomic(path, "mod", &compressed_data) { + Some(()) + } else { + None + } } } @@ -222,5 +219,28 @@ impl Hasher for Sha256Hasher { } } +// Assumption: path inside cache directory. +// Then, we don't have to use sound OS-specific exclusive file access. +// Note: there's no need to remove temporary file here - cleanup task will do it later. +fn fs_write_atomic(path: &Path, reason: &str, contents: &[u8]) -> bool { + let lock_path = path.with_extension(format!("wip-atomic-write-{}", reason)); + fs::OpenOptions::new() + .create_new(true) // atomic file creation (assumption: no one will open it without this flag) + .write(true) + .open(&lock_path) + .and_then(|mut file| file.write_all(contents)) + // file should go out of scope and be closed at this point + .and_then(|()| fs::rename(&lock_path, &path)) // atomic file rename + .map_err(|err| { + warn!( + "Failed to write file with rename, lock path: {}, target path: {}, err: {}", + lock_path.display(), + path.display(), + err + ) + }) + .is_ok() +} + #[cfg(test)] mod tests; diff --git a/wasmtime-environ/src/cache/config.rs b/wasmtime-environ/src/cache/config.rs index aaf84dea59..51e2c15d50 100644 --- a/wasmtime-environ/src/cache/config.rs +++ b/wasmtime-environ/src/cache/config.rs @@ -1,9 +1,10 @@ //! Module for configuring the cache system. 
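For reference, a minimal standalone sketch of the create_new-plus-rename pattern that `fs_write_atomic` above relies on (not part of the patch; the file name, temporary extension, and payload below are invented for illustration):

use std::fs;
use std::io::Write;
use std::path::Path;

// Write-then-rename: the temporary sibling file doubles as a cheap writer "lock",
// because OpenOptions::create_new(true) fails if another writer already created it.
fn write_atomically(target: &Path, contents: &[u8]) -> std::io::Result<()> {
    let tmp = target.with_extension("wip-demo"); // hypothetical extension for this sketch
    {
        let mut file = fs::OpenOptions::new().create_new(true).write(true).open(&tmp)?;
        file.write_all(contents)?;
    } // the handle is closed here, before the rename
    // Renaming within one directory is assumed atomic (as the cache code assumes),
    // so readers observe either the old file or the complete new one, never a torn write.
    fs::rename(&tmp, target)
}

fn main() -> std::io::Result<()> {
    write_atomically(Path::new("demo.bin"), b"cached bytes")
}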
+use super::worker; use directories::ProjectDirs; use lazy_static::lazy_static; -use log::{debug, error, trace}; -use serde::{Deserialize, Serialize}; +use log::{debug, error, trace, warn}; +use serde::{de::Deserializer, ser::Serializer, Deserialize, Serialize}; use spin::Once; use std::fmt::Debug; use std::fs; @@ -11,24 +12,72 @@ use std::mem; use std::path::{Path, PathBuf}; use std::string::{String, ToString}; use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; use std::vec::Vec; // wrapped, so we have named section in config, // also, for possible future compatibility -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] struct Config { cache: CacheConfig, } -#[derive(Serialize, Deserialize)] +// todo: markdown documentation of these options +// todo: don't flush default values (create config from simple template + url to docs) +// todo: more user-friendly cache config creation +#[derive(Serialize, Deserialize, Debug)] struct CacheConfig { #[serde(skip)] pub errors: Vec<String>, pub enabled: bool, pub directory: Option<PathBuf>, + #[serde(rename = "worker-event-queue-size")] + pub worker_event_queue_size: Option<usize>, #[serde(rename = "baseline-compression-level")] pub baseline_compression_level: Option<i32>, + #[serde(rename = "optimized-compression-level")] + pub optimized_compression_level: Option<i32>, + #[serde(rename = "optimized-compression-usage-counter-threshold")] + pub optimized_compression_usage_counter_threshold: Option<u64>, + #[serde( + default, + rename = "cleanup-interval-in-seconds", + serialize_with = "serialize_duration", + deserialize_with = "deserialize_duration" + )] // todo unit? + pub cleanup_interval: Option<Duration>, + #[serde( + default, + rename = "optimizing-compression-task-timeout-in-seconds", + serialize_with = "serialize_duration", + deserialize_with = "deserialize_duration" + )] // todo unit? + pub optimizing_compression_task_timeout: Option<Duration>, + #[serde(rename = "files-count-soft-limit")] + pub files_count_soft_limit: Option<u64>, + #[serde(rename = "files-total-size-soft-limit")] + pub files_total_size_soft_limit: Option<u64>, // todo unit? + #[serde(rename = "files-count-limit-percent-if-deleting")] + pub files_count_limit_percent_if_deleting: Option<u8>, // todo format: integer + % + #[serde(rename = "files-total-size-limit-percent-if-deleting")] + pub files_total_size_limit_percent_if_deleting: Option<u8>, +} + +// toml-rs fails to serialize Duration ("values must be emitted before tables") +// so we're providing custom functions for it +fn serialize_duration<S>(duration: &Option<Duration>, serializer: S) -> Result<S::Ok, S::Error> +where + S: Serializer, +{ + duration.map(|d| d.as_secs()).serialize(serializer) +} + +fn deserialize_duration<'de, D>(deserializer: D) -> Result<Option<Duration>, D::Error> +where + D: Deserializer<'de>, +{ + Ok(Option::<u64>::deserialize(deserializer)?.map(Duration::from_secs)) } // Private static, so only internal function can access it. @@ -53,20 +102,35 @@ pub fn directory() -> &'static PathBuf { .expect("Cache system must be initialized") .directory .as_ref() - .unwrap() + .expect("All cache system settings must be validated or defaulted") } -/// Returns cache compression level. -/// -/// Panics if the cache is disabled. -pub fn baseline_compression_level() -> i32 { - CONFIG - .r#try() - .expect("Cache system must be initialized") - .baseline_compression_level - .unwrap() +macro_rules! generate_setting_getter { + ($setting:ident: $setting_type:ty) => { + /// Returns `$setting`. + /// + /// Panics if the cache is disabled.
+ pub fn $setting() -> $setting_type { + CONFIG + .r#try() + .expect("Cache system must be initialized") + .$setting + .expect("All cache system settings must be validated or defaulted") + } + }; } +generate_setting_getter!(worker_event_queue_size: usize); +generate_setting_getter!(baseline_compression_level: i32); +generate_setting_getter!(optimized_compression_level: i32); +generate_setting_getter!(optimized_compression_usage_counter_threshold: u64); +generate_setting_getter!(cleanup_interval: Duration); +generate_setting_getter!(optimizing_compression_task_timeout: Duration); +generate_setting_getter!(files_count_soft_limit: u64); +generate_setting_getter!(files_total_size_soft_limit: u64); +generate_setting_getter!(files_count_limit_percent_if_deleting: u8); +generate_setting_getter!(files_total_size_limit_percent_if_deleting: u8); + /// Initializes the cache system. Should be called exactly once, /// and before using the cache system. Otherwise it can panic. /// Returns list of errors. If empty, initialization succeeded. @@ -74,6 +138,7 @@ pub fn init<P: AsRef<Path> + Debug>( enabled: bool, config_file: Option<P>
, create_new_config: bool, + init_file_per_thread_logger: Option<&'static str>, ) -> &'static Vec { INIT_CALLED .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) @@ -85,10 +150,8 @@ pub fn init + Debug>( let conf_file_str = format!("{:?}", config_file); let conf = CONFIG.call_once(|| CacheConfig::from_file(enabled, config_file, create_new_config)); if conf.errors.is_empty() { - debug!( - "Cache init(\"{}\"): enabled={}, directory={:?}, baseline-compression-level={:?}", - conf_file_str, conf.enabled, conf.directory, conf.baseline_compression_level, - ) + worker::init(init_file_per_thread_logger); + debug!("Cache init(\"{}\"): {:#?}", conf_file_str, conf) } else { error!( "Cache init(\"{}\"): errors: {:#?}", @@ -104,6 +167,19 @@ lazy_static! { static ref PROJECT_DIRS: Option = ProjectDirs::from("", "CraneStation", "wasmtime"); } +// TODO: values to be tuned +// TODO: what do we want to warn users about? +const DEFAULT_WORKER_EVENT_QUEUE_SIZE: usize = 0x10; +const WORKER_EVENT_QUEUE_SIZE_WARNING_TRESHOLD: usize = 3; +const DEFAULT_BASELINE_COMPRESSION_LEVEL: i32 = zstd::DEFAULT_COMPRESSION_LEVEL; +const DEFAULT_OPTIMIZED_COMPRESSION_LEVEL: i32 = 20; +const DEFAULT_OPTIMIZED_COMPRESSION_USAGE_COUNTER_THRESHOLD: u64 = 0x100; +const DEFAULT_CLEANUP_INTERVAL: Duration = Duration::from_secs(60 * 60); +const DEFAULT_OPTIMIZING_COMPRESSION_TASK_TIMEOUT: Duration = Duration::from_secs(30 * 60); +const DEFAULT_FILES_COUNT_SOFT_LIMIT: u64 = 0x10_000; +const DEFAULT_FILES_TOTAL_SIZE_SOFT_LIMIT: u64 = 1024 * 1024 * 512; +const DEFAULT_FILES_COUNT_LIMIT_PERCENT_IF_DELETING: u8 = 70; +const DEFAULT_FILES_TOTAL_SIZE_LIMIT_PERCENT_IF_DELETING: u8 = 70; impl CacheConfig { pub fn new_cache_disabled() -> Self { @@ -111,7 +187,16 @@ impl CacheConfig { errors: Vec::new(), enabled: false, directory: None, + worker_event_queue_size: None, baseline_compression_level: None, + optimized_compression_level: None, + optimized_compression_usage_counter_threshold: None, + cleanup_interval: None, + optimizing_compression_task_timeout: None, + files_count_soft_limit: None, + files_total_size_soft_limit: None, + files_count_limit_percent_if_deleting: None, + files_total_size_limit_percent_if_deleting: None, } } @@ -143,8 +228,17 @@ impl CacheConfig { }; // validate values and fill in defaults - config.validate_cache_directory_or_default(); + config.validate_directory_or_default(); + config.validate_worker_event_queue_size_or_default(); config.validate_baseline_compression_level_or_default(); + config.validate_optimized_compression_level_or_default(); + config.validate_optimized_compression_usage_counter_threshold_or_default(); + config.validate_cleanup_interval_or_default(); + config.validate_optimizing_compression_task_timeout_or_default(); + config.validate_files_count_soft_limit_or_default(); + config.validate_files_total_size_soft_limit_or_default(); + config.validate_files_count_limit_percent_if_deleting_or_default(); + config.validate_files_total_size_limit_percent_if_deleting_or_default(); path_if_flush_to_disk.map(|p| config.flush_to_disk(p)); @@ -195,7 +289,7 @@ impl CacheConfig { } } - fn validate_cache_directory_or_default(&mut self) { + fn validate_directory_or_default(&mut self) { if self.directory.is_none() { match &*PROJECT_DIRS { Some(proj_dirs) => self.directory = Some(proj_dirs.cache_dir().to_path_buf()), @@ -246,9 +340,19 @@ impl CacheConfig { } } + fn validate_worker_event_queue_size_or_default(&mut self) { + if self.worker_event_queue_size.is_none() { + self.worker_event_queue_size = 
Some(DEFAULT_WORKER_EVENT_QUEUE_SIZE); + } + + if self.worker_event_queue_size.unwrap() < WORKER_EVENT_QUEUE_SIZE_WARNING_TRESHOLD { + warn!("Detected small worker event queue size. Some messages might be lost."); + } + } + fn validate_baseline_compression_level_or_default(&mut self) { if self.baseline_compression_level.is_none() { - self.baseline_compression_level = Some(zstd::DEFAULT_COMPRESSION_LEVEL); + self.baseline_compression_level = Some(DEFAULT_BASELINE_COMPRESSION_LEVEL); } if !ZSTD_COMPRESSION_LEVELS.contains(&self.baseline_compression_level.unwrap()) { @@ -260,6 +364,92 @@ impl CacheConfig { } } + // assumption: baseline compression level has been verified + fn validate_optimized_compression_level_or_default(&mut self) { + if self.optimized_compression_level.is_none() { + self.optimized_compression_level = Some(DEFAULT_OPTIMIZED_COMPRESSION_LEVEL); + } + + let opt_lvl = self.optimized_compression_level.unwrap(); + let base_lvl = self.baseline_compression_level.unwrap(); + + if !ZSTD_COMPRESSION_LEVELS.contains(&opt_lvl) { + self.errors.push(format!( + "Invalid optimized compression level: {} not in {:#?}", + opt_lvl, ZSTD_COMPRESSION_LEVELS + )); + } + + if opt_lvl < base_lvl { + self.errors.push(format!( + "Invalid optimized compression level is lower than baseline: {} < {}", + opt_lvl, base_lvl + )); + } + } + + fn validate_optimized_compression_usage_counter_threshold_or_default(&mut self) { + if self.optimized_compression_usage_counter_threshold.is_none() { + self.optimized_compression_usage_counter_threshold = + Some(DEFAULT_OPTIMIZED_COMPRESSION_USAGE_COUNTER_THRESHOLD); + } + } + + fn validate_cleanup_interval_or_default(&mut self) { + if self.cleanup_interval.is_none() { + self.cleanup_interval = Some(DEFAULT_CLEANUP_INTERVAL); + } + } + + fn validate_optimizing_compression_task_timeout_or_default(&mut self) { + if self.optimizing_compression_task_timeout.is_none() { + self.optimizing_compression_task_timeout = + Some(DEFAULT_OPTIMIZING_COMPRESSION_TASK_TIMEOUT); + } + } + + fn validate_files_count_soft_limit_or_default(&mut self) { + if self.files_count_soft_limit.is_none() { + self.files_count_soft_limit = Some(DEFAULT_FILES_COUNT_SOFT_LIMIT); + } + } + + fn validate_files_total_size_soft_limit_or_default(&mut self) { + if self.files_total_size_soft_limit.is_none() { + self.files_total_size_soft_limit = Some(DEFAULT_FILES_TOTAL_SIZE_SOFT_LIMIT); + } + } + + fn validate_files_count_limit_percent_if_deleting_or_default(&mut self) { + if self.files_count_limit_percent_if_deleting.is_none() { + self.files_count_limit_percent_if_deleting = + Some(DEFAULT_FILES_COUNT_LIMIT_PERCENT_IF_DELETING); + } + + let percent = self.files_count_limit_percent_if_deleting.unwrap(); + if percent > 100 { + self.errors.push(format!( + "Invalid files count limit percent if deleting: {} not in range 0-100%", + percent + )); + } + } + + fn validate_files_total_size_limit_percent_if_deleting_or_default(&mut self) { + if self.files_total_size_limit_percent_if_deleting.is_none() { + self.files_total_size_limit_percent_if_deleting = + Some(DEFAULT_FILES_TOTAL_SIZE_LIMIT_PERCENT_IF_DELETING); + } + + let percent = self.files_total_size_limit_percent_if_deleting.unwrap(); + if percent > 100 { + self.errors.push(format!( + "Invalid files total size limit percent if deleting: {} not in range 0-100%", + percent + )); + } + } + fn flush_to_disk(&mut self, path: PathBuf) { if !self.errors.is_empty() { return; diff --git a/wasmtime-environ/src/cache/tests.rs b/wasmtime-environ/src/cache/tests.rs index 
a6e7b5c609..2392188979 100644 --- a/wasmtime-environ/src/cache/tests.rs +++ b/wasmtime-environ/src/cache/tests.rs @@ -40,7 +40,7 @@ fn test_write_read_cache() { ); fs::write(&config_path, config_content).expect("Failed to write test config file"); - let errors = cache_config::init(true, Some(&config_path), false); + let errors = cache_config::init(true, Some(&config_path), false, None); assert!(errors.is_empty()); assert!(cache_config::enabled()); // assumption: config init creates cache directory and returns canonicalized path diff --git a/wasmtime-environ/src/cache/worker.rs b/wasmtime-environ/src/cache/worker.rs new file mode 100644 index 0000000000..c87c4ad31f --- /dev/null +++ b/wasmtime-environ/src/cache/worker.rs @@ -0,0 +1,736 @@ +//! Background worker that watches over the cache. +//! +//! It cleans up old cache, updates statistics and optimizes the cache. +//! We allow losing some messages (it doesn't hurt) and some races, +//! but we guarantee eventual consistency and fault tolerance. +//! Background tasks can be CPU intensive, but the worker thread has low priority. + +use super::{cache_config, fs_write_atomic}; +use log::{debug, info, trace, warn}; +use serde::{Deserialize, Serialize}; +use spin::Once; +use std::cmp; +use std::collections::HashMap; +use std::ffi::OsStr; +use std::fs; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{self, AtomicBool}; +use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; +use std::thread; +use std::time::Duration; +use std::time::SystemTime; +use std::vec::Vec; + +enum CacheEvent { + OnCacheGet(PathBuf), + OnCacheUpdate(PathBuf), +} + +static SENDER: Once<SyncSender<CacheEvent>> = Once::new(); +static INIT_CALLED: AtomicBool = AtomicBool::new(false); + +pub(super) fn init(init_file_per_thread_logger: Option<&'static str>) { + INIT_CALLED + .compare_exchange( + false, + true, + atomic::Ordering::SeqCst, + atomic::Ordering::SeqCst, + ) + .expect("Cache worker init must be called at most once"); + + let (tx, rx) = sync_channel(cache_config::worker_event_queue_size()); + let _ = SENDER.call_once(move || tx); + thread::spawn(move || worker_thread(rx, init_file_per_thread_logger)); +} + +pub(super) fn on_cache_get_async(path: impl AsRef<Path>) { + let event = CacheEvent::OnCacheGet(path.as_ref().to_path_buf()); + send_cache_event(event); +} + +pub(super) fn on_cache_update_async(path: impl AsRef<Path>) { + let event = CacheEvent::OnCacheUpdate(path.as_ref().to_path_buf()); + send_cache_event(event); +} + +#[inline] +fn send_cache_event(event: CacheEvent) { + match SENDER + .r#try() + .expect("Cache worker init must be called before using the worker") + .try_send(event) + { + Ok(()) => (), + Err(err) => info!( + "Failed to asynchronously send message to the worker thread: {}", + err + ), + } +} + +fn worker_thread( + receiver: Receiver<CacheEvent>, + init_file_per_thread_logger: Option<&'static str>, +) { + assert!(INIT_CALLED.load(atomic::Ordering::SeqCst)); + + if let Some(prefix) = init_file_per_thread_logger { + file_per_thread_logger::initialize(prefix); + } + + debug!("Cache worker thread started."); + + lower_thread_priority(); + + for event in receiver.iter() { + match event { + CacheEvent::OnCacheGet(path) => handle_on_cache_get(path), + CacheEvent::OnCacheUpdate(path) => handle_on_cache_update(path), + } + } + + // The receiver can stop iteration iff the channel has hung up. The channel will never + // hang up, because we have a static SyncSender, and Rust doesn't drop static variables.
+ unreachable!() +} + +#[cfg(target_os = "windows")] +fn lower_thread_priority() { + use core::convert::TryInto; + use winapi::um::processthreadsapi::{GetCurrentThread, SetThreadPriority}; + use winapi::um::winbase::THREAD_MODE_BACKGROUND_BEGIN; + + // https://docs.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-setthreadpriority + // https://docs.microsoft.com/en-us/windows/win32/procthread/scheduling-priorities + + if unsafe { + SetThreadPriority( + GetCurrentThread(), + THREAD_MODE_BACKGROUND_BEGIN.try_into().unwrap(), + ) + } == 0 + { + warn!("Failed to lower worker thread priority. It might affect application performance."); + } +} + +#[cfg(not(target_os = "windows"))] +fn lower_thread_priority() { + // http://man7.org/linux/man-pages/man7/sched.7.html + + const NICE_DELTA_FOR_BACKGROUND_TASKS: i32 = 3; + + errno::set_errno(errno::Errno(0)); + let current_nice = unsafe { libc::nice(NICE_DELTA_FOR_BACKGROUND_TASKS) }; + let errno_val = errno::errno().0; + + if errno_val != 0 { + warn!("Failed to lower worker thread priority. It might affect application performance. errno: {}", errno_val); + } else { + debug!("New nice value of worker thread: {}", current_nice); + } +} + +#[derive(Serialize, Deserialize)] +struct ModuleCacheStatistics { + pub usages: u64, + #[serde(rename = "optimized-compression")] + pub compression_level: i32, +} + +impl Default for ModuleCacheStatistics { + fn default() -> Self { + Self { + usages: 0, + compression_level: cache_config::baseline_compression_level(), + } + } +} + +/// Increases the usage counter and recompresses the file +/// if the usage counter reached configurable treshold. +fn handle_on_cache_get(path: PathBuf) { + trace!("handle_on_cache_get() for path: {}", path.display()); + + // construct .stats file path + let filename = path.file_name().unwrap().to_str().unwrap(); + let stats_path = path.with_file_name(format!("{}.stats", filename)); + + // load .stats file (default if none or error) + let mut stats = + read_stats_file(stats_path.as_ref()).unwrap_or_else(|| ModuleCacheStatistics::default()); + + // step 1: update the usage counter & write to the disk + // it's racy, but it's fine (the counter will be just smaller, + // sometimes will retrigger recompression) + stats.usages += 1; + if !write_stats_file(stats_path.as_ref(), &stats) { + return; + } + + // step 2: recompress if there's a need + let opt_compr_lvl = cache_config::optimized_compression_level(); + if stats.compression_level >= opt_compr_lvl + || stats.usages < cache_config::optimized_compression_usage_counter_threshold() + { + return; + } + + let lock_path = if let Some(p) = acquire_task_fs_lock( + path.as_ref(), + cache_config::optimizing_compression_task_timeout(), + ) { + p + } else { + return; + }; + + trace!("Trying to recompress file: {}", path.display()); + + // recompress, write to other file, rename (it's atomic file content exchange) + // and update the stats file + fs::read(&path) + .map_err(|err| { + warn!( + "Failed to read old cache file, path: {}, err: {}", + path.display(), + err + ) + }) + .ok() + .and_then(|compressed_cache_bytes| { + zstd::decode_all(&compressed_cache_bytes[..]) + .map_err(|err| warn!("Failed to decompress cached code: {}", err)) + .ok() + }) + .and_then(|cache_bytes| { + zstd::encode_all( + &cache_bytes[..], + opt_compr_lvl, + ) + .map_err(|err| warn!("Failed to compress cached code: {}", err)) + .ok() + }) + .and_then(|recompressed_cache_bytes| { + fs::write(&lock_path, &recompressed_cache_bytes) + .map_err(|err| { + 
warn!( + "Failed to write recompressed cache, path: {}, err: {}", + lock_path.display(), + err + ) + }) + .ok() + }) + .and_then(|()| { + fs::rename(&lock_path, &path) + .map_err(|err| { + warn!( + "Failed to rename recompressed cache, path from: {}, path to: {}, err: {}", + lock_path.display(), + path.display(), + err + ); + if let Err(err) = fs::remove_file(&lock_path) { + warn!( + "Failed to clean up (remove) recompressed cache, path {}, err: {}", + lock_path.display(), + err + ); + } + }) + .ok() + }) + .map(|()| { + // update stats file (reload it! recompression can take some time) + if let Some(mut new_stats) = read_stats_file(stats_path.as_ref()) { + if new_stats.compression_level >= opt_compr_lvl { + // Rare race: + // two instances with different opt_compr_lvl: we don't know in which order they updated + // the cache file and the stats file (they are not updated together atomically) + // Possible solution is to use directories per cache entry, but it complicates the system + // and is not worth it. + debug!("DETECTED task did more than once (or race with new file): recompression of {}. \ + Note: if optimized compression level setting has changed in the meantine, \ + the stats file might contain inconsistent compression level due to race.", path.display()); + } + else { + new_stats.compression_level = opt_compr_lvl; + let _ = write_stats_file(stats_path.as_ref(), &new_stats); + } + + if new_stats.usages < stats.usages { + debug!("DETECTED lower usage count (new file or race with counter increasing): file {}", path.display()); + } + } + else { + debug!("Can't read stats file again to update compression level (it might got cleaned up): file {}", stats_path.display()); + } + }); + + trace!("Task finished: recompress file: {}", path.display()); +} + +enum CacheEntry { + Recognized { + path: PathBuf, + mtime: SystemTime, + size: u64, + }, + Unrecognized { + path: PathBuf, + is_dir: bool, + }, +} + +fn handle_on_cache_update(path: PathBuf) { + trace!("handle_on_cache_update() for path: {}", path.display()); + + // ---------------------- step 1: create .stats file + + // construct .stats file path + let filename = path + .file_name() + .expect("Expected valid cache file name") + .to_str() + .expect("Expected valid cache file name"); + let stats_path = path.with_file_name(format!("{}.stats", filename)); + + // create and write stats file + let mut stats = ModuleCacheStatistics::default(); + stats.usages += 1; + write_stats_file(&stats_path, &stats); + + // ---------------------- step 2: perform cleanup task if needed + + // acquire lock for cleanup task + // Lock is a proof of recent cleanup task, so we don't want to delete them. + // Expired locks will be deleted by the cleanup task. + let cleanup_file = cache_config::directory().join(".cleanup"); // some non existing marker file + if acquire_task_fs_lock(&cleanup_file, cache_config::cleanup_interval()).is_none() { + return; + } + + trace!("Trying to clean up cache"); + + let mut cache_index = list_cache_contents(); + cache_index.sort_unstable_by(|lhs, rhs| { + // sort by age + use CacheEntry::*; + match (lhs, rhs) { + (Recognized { mtime: lhs_mt, .. }, Recognized { mtime: rhs_mt, .. }) => { + rhs_mt.cmp(lhs_mt) + } // later == younger + // unrecognized is kind of infinity + (Recognized { .. }, Unrecognized { .. }) => cmp::Ordering::Less, + (Unrecognized { .. }, Recognized { .. }) => cmp::Ordering::Greater, + (Unrecognized { .. }, Unrecognized { .. 
}) => cmp::Ordering::Equal, + } + }); + + // find "cut" boundary: + // - remove unrecognized files anyway, + // - remove some cache files if some quota has been exceeded + let mut total_size = 0u64; + let mut start_delete_idx = None; + let mut start_delete_idx_if_deleting_recognized_items: Option = None; + + let total_size_limit = cache_config::files_total_size_soft_limit(); + let files_count_limit = cache_config::files_count_soft_limit(); + let tsl_if_deleting = total_size_limit + .checked_mul(cache_config::files_total_size_limit_percent_if_deleting() as u64) + .unwrap() + / 100; + let fcl_if_deleting = files_count_limit + .checked_mul(cache_config::files_count_limit_percent_if_deleting() as u64) + .unwrap() + / 100; + + for (idx, item) in cache_index.iter().enumerate() { + let size = if let CacheEntry::Recognized { size, .. } = item { + size + } else { + start_delete_idx = Some(idx); + break; + }; + + total_size += size; + if start_delete_idx_if_deleting_recognized_items.is_none() { + if total_size >= tsl_if_deleting || (idx + 1) as u64 >= fcl_if_deleting { + start_delete_idx_if_deleting_recognized_items = Some(idx); + } + } + + if total_size >= total_size_limit || (idx + 1) as u64 >= files_count_limit { + start_delete_idx = start_delete_idx_if_deleting_recognized_items; + break; + } + } + + if let Some(idx) = start_delete_idx { + for item in &cache_index[idx..] { + let (result, path, entity) = match item { + CacheEntry::Recognized { path, .. } + | CacheEntry::Unrecognized { + path, + is_dir: false, + } => (fs::remove_file(path), path, "file"), + CacheEntry::Unrecognized { path, is_dir: true } => { + (fs::remove_dir_all(path), path, "directory") + } + }; + if let Err(err) = result { + warn!( + "Failed to remove {} during cleanup, path: {}, err: {}", + entity, + path.display(), + err + ); + } + } + } + + trace!("Task finished: clean up cache"); +} + +fn read_stats_file(path: &Path) -> Option { + fs::read(path) + .map_err(|err| { + trace!( + "Failed to read stats file, path: {}, err: {}", + path.display(), + err + ) + }) + .and_then(|bytes| { + toml::from_slice::(&bytes[..]).map_err(|err| { + trace!( + "Failed to parse stats file, path: {}, err: {}", + path.display(), + err, + ) + }) + }) + .ok() +} + +fn write_stats_file(path: &Path, stats: &ModuleCacheStatistics) -> bool { + toml::to_string_pretty(&stats) + .map_err(|err| { + warn!( + "Failed to serialize stats file, path: {}, err: {}", + path.display(), + err + ) + }) + .and_then(|serialized| { + if fs_write_atomic(path, "stats", serialized.as_bytes()) { + Ok(()) + } else { + Err(()) + } + }) + .is_ok() +} + +// Be fault tolerant: list as much as you can, and ignore the rest +fn list_cache_contents() -> Vec { + fn enter_dir(vec: &mut Vec, dir_path: &Path, level: u8) { + macro_rules! unwrap_or { + ($result:expr, $cont:stmt, $err_msg:expr) => { + unwrap_or!($result, $cont, $err_msg, dir_path) + }; + ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => { + match $result { + Ok(val) => val, + Err(err) => { + warn!( + "{}, level: {}, path: {}, msg: {}", + $err_msg, + level, + $path.display(), + err + ); + $cont + } + } + }; + } + macro_rules! add_unrecognized { + (file: $path:expr) => { + add_unrecognized!(false, $path) + }; + (dir: $path:expr) => { + add_unrecognized!(true, $path) + }; + ($is_dir:expr, $path:expr) => { + vec.push(CacheEntry::Unrecognized { + path: $path.to_path_buf(), + is_dir: $is_dir, + }); + }; + } + macro_rules! 
add_unrecognized_and { + ([ $( $ty:ident: $path:expr ),* ], $cont:stmt) => {{ + $( add_unrecognized!($ty: $path); )* + $cont + }}; + } + + // If we fail to list a directory, something bad is happening anyway + // (something touches our cache or we have disk failure) + // Try to delete it, so we can stay within soft limits of the cache size. + // This comment applies later in this function, too. + let it = unwrap_or!( + fs::read_dir(dir_path), + add_unrecognized_and!([dir: dir_path], return), + "Failed to list cache directory, deleting it" + ); + + let mut cache_files = HashMap::new(); + for entry in it { + // read_dir() returns an iterator over results - in case some of them are errors + // we don't know their names, so we can't delete them. We don't want to delete + // the whole directory with good entries too, so we just ignore the erroneous entries. + let entry = unwrap_or!( + entry, + continue, + "Failed to read a cache dir entry (NOT deleting it, it still occupies space)" + ); + let path = entry.path(); + match (level, path.is_dir()) { + (0..=1, true) => enter_dir(vec, &path, level + 1), + (0..=1, false) => { + if level == 0 && path.file_stem() == Some(OsStr::new(".cleanup")) { + if let Some(_) = path.extension() { + // assume it's cleanup lock + if !is_fs_lock_expired( + Some(&entry), + &path, + cache_config::cleanup_interval(), + ) { + continue; // skip active lock + } + } + } + add_unrecognized!(file: path); + } + (2, false) => { + // assumption: only mod cache (no ext), .stats & .wip-* files + let ext = path.extension(); + if ext.is_none() || ext == Some(OsStr::new("stats")) { + cache_files.insert(path, entry); + } else { + // assume it's .wip file (lock) + if is_fs_lock_expired( + Some(&entry), + &path, + cache_config::optimizing_compression_task_timeout(), + ) { + add_unrecognized!(file: path); + } // else: skip active lock + } + } + (_, is_dir) => add_unrecognized!(is_dir, path), + } + } + + // associate module with its stats & handle them + // assumption: just mods and stats + for (path, entry) in cache_files.iter() { + let path_buf: PathBuf; + let (mod_, stats_, is_mod) = match path.extension() { + Some(_) => { + path_buf = path.with_extension(""); + ( + cache_files.get(&path_buf).map(|v| (&path_buf, v)), + Some((path, entry)), + false, + ) + } + None => { + path_buf = path.with_extension("stats"); + ( + Some((path, entry)), + cache_files.get(&path_buf).map(|v| (&path_buf, v)), + true, + ) + } + }; + + // construct a cache entry + match (mod_, stats_, is_mod) { + (Some((mod_path, mod_entry)), Some((stats_path, stats_entry)), true) => { + let mod_metadata = unwrap_or!( + mod_entry.metadata(), + add_unrecognized_and!([file: stats_path, file: mod_path], continue), + "Failed to get metadata, deleting BOTH module cache and stats files", + mod_path + ); + let stats_mtime = unwrap_or!( + stats_entry.metadata().and_then(|m| m.modified()), + add_unrecognized_and!( + [file: stats_path], + unwrap_or!( + mod_metadata.modified(), + add_unrecognized_and!([file: stats_path, file: mod_path], continue), + "Failed to get mtime, deleting BOTH module cache and stats files", + mod_path + ) + ), + "Failed to get metadata/mtime, deleting the file", + stats_path + ); + vec.push(CacheEntry::Recognized { + path: mod_path.to_path_buf(), + mtime: stats_mtime, + size: mod_metadata.len(), + }) + } + (Some(_), Some(_), false) => (), // was or will be handled by previous branch + (Some((mod_path, mod_entry)), None, _) => { + let (mod_metadata, mod_mtime) = unwrap_or!( + mod_entry + .metadata() + 
.and_then(|md| md.modified().map(|mt| (md, mt))), + add_unrecognized_and!([file: mod_path], continue), + "Failed to get metadata/mtime, deleting the file", + mod_path + ); + vec.push(CacheEntry::Recognized { + path: mod_path.to_path_buf(), + mtime: mod_mtime, + size: mod_metadata.len(), + }) + } + (None, Some((stats_path, _stats_entry)), _) => { + debug!("Found orphaned stats file: {}", stats_path.display()); + add_unrecognized!(file: stats_path); + } + _ => unreachable!(), + } + } + } + + let mut vec = Vec::new(); + enter_dir(&mut vec, cache_config::directory(), 0); + vec +} + +/// Tries to acquire a lock for specific task. +/// +/// Returns Some(path) to the lock if succeeds. The task path must not +/// contain any extension and have file stem. +/// +/// To release a lock you need either manually rename or remove it, +/// or wait until it expires and cleanup task removes it. +/// +/// Note: this function is racy. Main idea is: be fault tolerant and +/// never block some task. The price is that we rarely do some task +/// more than once. +fn acquire_task_fs_lock(task_path: &Path, timeout: Duration) -> Option { + assert!(task_path.extension().is_none()); + assert!(task_path.file_stem().is_some()); + + // list directory + let dir_path = task_path.parent()?; + let it = fs::read_dir(dir_path) + .map_err(|err| { + warn!( + "Failed to list cache directory, path: {}, err: {}", + dir_path.display(), + err + ) + }) + .ok()?; + + // look for existing locks + for entry in it { + let entry = entry + .map_err(|err| { + warn!( + "Failed to list cache directory, path: {}, err: {}", + dir_path.display(), + err + ) + }) + .ok()?; + + let path = entry.path(); + if path.is_dir() || path.file_stem() != task_path.file_stem() { + continue; + } + + // check extension and mtime + match path.extension() { + None => continue, + Some(ext) => { + if let Some(ext_str) = ext.to_str() { + // if it's None, i.e. 
not valid UTF-8 string, then that's not our lock for sure + if ext_str.starts_with("wip-") + && !is_fs_lock_expired(Some(&entry), &path, timeout) + { + return None; + } + } + } + } + } + + // create the lock + let lock_path = task_path.with_extension(format!("wip-{}", std::process::id())); + let _file = fs::OpenOptions::new() + .create_new(true) + .write(true) + .open(&lock_path) + .map_err(|err| { + warn!( + "Failed to create lock file (note: it shouldn't exists): path: {}, err: {}", + lock_path.display(), + err + ) + }) + .ok()?; + + Some(lock_path) +} + +// we have either both, or just path; dir entry is desirable since on some platforms we can get +// metadata without extra syscalls +// futhermore: it's better to get a path if we have it instead of allocating a new one from the dir entry +fn is_fs_lock_expired(entry: Option<&fs::DirEntry>, path: &PathBuf, threshold: Duration) -> bool { + let mtime = match entry + .map(|e| e.metadata()) + .unwrap_or_else(|| path.metadata()) + .and_then(|metadata| metadata.modified()) + { + Ok(mt) => mt, + Err(err) => { + warn!( + "Failed to get metadata/mtime, treating as an expired lock, path: {}, err: {}", + path.display(), + err + ); + return true; // can't read mtime, treat as expired, so this task will not be starved + } + }; + + match mtime.elapsed() { + Ok(elapsed) => elapsed >= threshold, + Err(err) => { + trace!( + "Found mtime in the future, treating as a not expired lock, path: {}, err: {}", + path.display(), + err + ); + // the lock is expired if the time is too far in the future + // it is fine to have network share and not synchronized clocks, + // but it's not good when user changes time in their system clock + static DEFAULT_THRESHOLD: Duration = Duration::from_secs(60 * 60 * 24); // todo dependant refactor PR adds this as a setting + err.duration() > DEFAULT_THRESHOLD + } + } +} + +// todo tests diff --git a/wasmtime-environ/tests/cache_default_config_in_memory.rs b/wasmtime-environ/tests/cache_default_config_in_memory.rs index 30fe195087..dab9960e32 100644 --- a/wasmtime-environ/tests/cache_default_config_in_memory.rs +++ b/wasmtime-environ/tests/cache_default_config_in_memory.rs @@ -2,6 +2,9 @@ use wasmtime_environ::cache_config; #[test] fn test_cache_default_config_in_memory() { - let errors = cache_config::init::<&str>(true, None, false); - assert!(errors.is_empty()); + let errors = cache_config::init::<&str>(true, None, false, None); + assert!( + errors.is_empty(), + "This test loads config from the default location, if there's one. Make sure it's correct!" 
+ ); } diff --git a/wasmtime-environ/tests/cache_fail_calling_init_twice.rs b/wasmtime-environ/tests/cache_fail_calling_init_twice.rs index 557faf2eef..b575e3f9b9 100644 --- a/wasmtime-environ/tests/cache_fail_calling_init_twice.rs +++ b/wasmtime-environ/tests/cache_fail_calling_init_twice.rs @@ -20,7 +20,7 @@ fn test_cache_fail_calling_init_twice() { ); fs::write(&config_path, config_content).expect("Failed to write test config file"); - let errors = cache_config::init(true, Some(&config_path), false); + let errors = cache_config::init(true, Some(&config_path), false, None); assert!(errors.is_empty()); - let _errors = cache_config::init(true, Some(&config_path), false); + let _errors = cache_config::init(true, Some(&config_path), false, None); } diff --git a/wasmtime-environ/tests/cache_fail_invalid_config.rs b/wasmtime-environ/tests/cache_fail_invalid_config.rs index 05a69ba568..42a4512b11 100644 --- a/wasmtime-environ/tests/cache_fail_invalid_config.rs +++ b/wasmtime-environ/tests/cache_fail_invalid_config.rs @@ -18,6 +18,6 @@ fn test_cache_fail_invalid_config() { ); fs::write(&config_path, config_content).expect("Failed to write test config file"); - let errors = cache_config::init(true, Some(&config_path), false); + let errors = cache_config::init(true, Some(&config_path), false, None); assert!(!errors.is_empty()); } diff --git a/wasmtime-environ/tests/cache_fail_invalid_path_to_config.rs b/wasmtime-environ/tests/cache_fail_invalid_path_to_config.rs index 8a469369ae..819734adea 100644 --- a/wasmtime-environ/tests/cache_fail_invalid_path_to_config.rs +++ b/wasmtime-environ/tests/cache_fail_invalid_path_to_config.rs @@ -5,6 +5,6 @@ use wasmtime_environ::cache_config; fn test_cache_fail_invalid_path_to_config() { let dir = tempfile::tempdir().expect("Can't create temporary directory"); let config_path = dir.path().join("cache-config.toml"); // doesn't exist - let errors = cache_config::init(true, Some(&config_path), false); + let errors = cache_config::init(true, Some(&config_path), false, None); assert!(!errors.is_empty()); } diff --git a/wasmtime-environ/tests/cache_write_default_config.rs b/wasmtime-environ/tests/cache_write_default_config.rs index 4a3cebef17..b4d56e45b6 100644 --- a/wasmtime-environ/tests/cache_write_default_config.rs +++ b/wasmtime-environ/tests/cache_write_default_config.rs @@ -6,7 +6,7 @@ fn test_cache_write_default_config() { let dir = tempfile::tempdir().expect("Can't create temporary directory"); let config_path = dir.path().join("cache-config.toml"); - let errors = cache_config::init(true, Some(&config_path), true); + let errors = cache_config::init(true, Some(&config_path), true, None); assert!(errors.is_empty()); assert!(config_path.exists()); } diff --git a/wasmtime-runtime/Cargo.toml b/wasmtime-runtime/Cargo.toml index df9b489855..ae5898747e 100644 --- a/wasmtime-runtime/Cargo.toml +++ b/wasmtime-runtime/Cargo.toml @@ -17,7 +17,7 @@ cranelift-wasm = { version = "0.41.0", features = ["enable-serde"] } wasmtime-environ = { path = "../wasmtime-environ", default-features = false } region = "2.0.0" lazy_static = "1.2.0" -libc = { version = "0.2.48", default-features = false } +libc = { version = "0.2.60", default-features = false } errno = "0.2.4" memoffset = "0.5.1" failure = { version = "0.1.3", default-features = false } @@ -25,7 +25,7 @@ failure_derive = { version = "0.1.3", default-features = false } indexmap = "1.0.2" [target.'cfg(target_os = "windows")'.dependencies] -winapi = { version = "0.3.6", features = ["winbase", "memoryapi"] } +winapi = { version 
= "0.3.7", features = ["winbase", "memoryapi"] } [build-dependencies] cc = "1.0"