Initial reorg.

This is largely the same as #305, but updated for the current tree.
Dan Gohman
2019-11-07 17:11:06 -08:00
parent 2c69546a24
commit 22641de629
351 changed files with 52 additions and 52 deletions

crates/environ/src/cache/config.rs vendored Normal file

@@ -0,0 +1,625 @@
//! Module for configuring the cache system.
use super::worker;
use alloc::string::{String, ToString};
use alloc::vec::Vec;
use core::time::Duration;
use directories::ProjectDirs;
use lazy_static::lazy_static;
use log::{debug, error, trace, warn};
use serde::{
de::{self, Deserializer},
Deserialize,
};
use spin::Once;
use std::fmt::Debug;
use std::fs;
use std::mem;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
// wrapped, so we have a named section in the config file;
// it also leaves room for future compatibility
#[derive(Deserialize, Debug)]
#[serde(deny_unknown_fields)]
struct Config {
cache: CacheConfig,
}
#[derive(Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct CacheConfig {
#[serde(skip)]
errors: Vec<String>,
enabled: bool,
directory: Option<PathBuf>,
#[serde(
default,
rename = "worker-event-queue-size",
deserialize_with = "deserialize_si_prefix"
)]
worker_event_queue_size: Option<u64>,
#[serde(rename = "baseline-compression-level")]
baseline_compression_level: Option<i32>,
#[serde(rename = "optimized-compression-level")]
optimized_compression_level: Option<i32>,
#[serde(
default,
rename = "optimized-compression-usage-counter-threshold",
deserialize_with = "deserialize_si_prefix"
)]
optimized_compression_usage_counter_threshold: Option<u64>,
#[serde(
default,
rename = "cleanup-interval",
deserialize_with = "deserialize_duration"
)]
cleanup_interval: Option<Duration>,
#[serde(
default,
rename = "optimizing-compression-task-timeout",
deserialize_with = "deserialize_duration"
)]
optimizing_compression_task_timeout: Option<Duration>,
#[serde(
default,
rename = "allowed-clock-drift-for-files-from-future",
deserialize_with = "deserialize_duration"
)]
allowed_clock_drift_for_files_from_future: Option<Duration>,
#[serde(
default,
rename = "file-count-soft-limit",
deserialize_with = "deserialize_si_prefix"
)]
file_count_soft_limit: Option<u64>,
#[serde(
default,
rename = "files-total-size-soft-limit",
deserialize_with = "deserialize_disk_space"
)]
files_total_size_soft_limit: Option<u64>,
#[serde(
default,
rename = "file-count-limit-percent-if-deleting",
deserialize_with = "deserialize_percent"
)]
file_count_limit_percent_if_deleting: Option<u8>,
#[serde(
default,
rename = "files-total-size-limit-percent-if-deleting",
deserialize_with = "deserialize_percent"
)]
files_total_size_limit_percent_if_deleting: Option<u8>,
}
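// A complete config file exercising every setting above looks like this
// (illustrative values, mirroring the formats accepted by the
// deserializers defined later in this file):
//
// [cache]
// enabled = true
// directory = "/absolute/path/to/cache"
// worker-event-queue-size = '16'
// baseline-compression-level = 3
// optimized-compression-level = 20
// optimized-compression-usage-counter-threshold = '256'
// cleanup-interval = '1h'
// optimizing-compression-task-timeout = '30m'
// allowed-clock-drift-for-files-from-future = '1d'
// file-count-soft-limit = '65536'
// files-total-size-soft-limit = '512Mi'
// file-count-limit-percent-if-deleting = '70%'
// files-total-size-limit-percent-if-deleting = '70%'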
// Private static, so only internal functions can access it.
static CONFIG: Once<CacheConfig> = Once::new();
static INIT_CALLED: AtomicBool = AtomicBool::new(false);
/// Returns the cache configuration.
///
/// If the cache system has not been initialized yet, this disables it.
/// You must not call init() afterwards.
pub fn cache_config() -> &'static CacheConfig {
CONFIG.call_once(CacheConfig::new_cache_disabled)
}
/// Initializes the cache system. Should be called exactly once,
/// before the cache system is used; otherwise it can panic.
/// Returns a list of errors; if it is empty, initialization succeeded.
pub fn init<P: AsRef<Path> + Debug>(
enabled: bool,
config_file: Option<P>,
init_file_per_thread_logger: Option<&'static str>,
) -> &'static Vec<String> {
INIT_CALLED
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.expect("Cache system init must be called at most once");
assert!(
CONFIG.r#try().is_none(),
"Cache system init must be called before using the system."
);
let conf_file_str = format!("{:?}", config_file);
let conf = CONFIG.call_once(|| CacheConfig::from_file(enabled, config_file));
if conf.errors.is_empty() {
if conf.enabled() {
worker::init(init_file_per_thread_logger);
}
debug!("Cache init(\"{}\"): {:#?}", conf_file_str, conf)
} else {
error!(
"Cache init(\"{}\"): errors: {:#?}",
conf_file_str, conf.errors,
)
}
&conf.errors
}
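// A minimal usage sketch: an embedder initializes the cache system once at
// startup, then reads the shared configuration anywhere in the process.
// The function below is purely illustrative and is not used by this module.
#[allow(dead_code)]
fn example_startup(config_path: Option<&'static str>) {
// init() may be called at most once; it returns the list of validation errors
let errors = init(true, config_path, None);
if !errors.is_empty() {
error!("Cache disabled due to config errors: {:#?}", errors);
}
// after init(), cache_config() returns the same validated instance
let conf = cache_config();
debug!("Cache enabled: {}", conf.enabled());
}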
/// Creates a new configuration file at specified path, or default path if None is passed.
/// Fails if file already exists.
pub fn create_new_config<P: AsRef<Path> + Debug>(
config_file: Option<P>,
) -> Result<PathBuf, String> {
trace!("Creating new config file, path: {:?}", config_file);
let config_file = config_file.as_ref().map_or_else(
|| DEFAULT_CONFIG_PATH.as_ref().map(|p| p.as_ref()),
|p| Ok(p.as_ref()),
)?;
if config_file.exists() {
Err(format!(
"Specified config file already exists! Path: {}",
config_file.display()
))?;
}
let parent_dir = config_file
.parent()
.ok_or_else(|| format!("Invalid cache config path: {}", config_file.display()))?;
fs::create_dir_all(parent_dir).map_err(|err| {
format!(
"Failed to create config directory, config path: {}, error: {}",
config_file.display(),
err
)
})?;
let content = "\
# Comment out certain settings to use default values.
# For more settings, please refer to the documentation:
# https://github.com/CraneStation/wasmtime/blob/master/CACHE_CONFIGURATION.md
[cache]
enabled = true
";
fs::write(&config_file, &content).map_err(|err| {
format!(
"Failed to flush config to the disk, path: {}, msg: {}",
config_file.display(),
err
)
})?;
Ok(config_file.to_path_buf())
}
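// Illustrative call: `create_new_config::<&str>(None)` writes the template
// above to the default path (DEFAULT_CONFIG_PATH below) and returns the path
// it wrote to, failing if the file already exists.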
// permitted levels from: https://docs.rs/zstd/0.4.28+zstd.1.4.3/zstd/stream/write/struct.Encoder.html
const ZSTD_COMPRESSION_LEVELS: std::ops::RangeInclusive<i32> = 0..=21;
lazy_static! {
static ref PROJECT_DIRS: Option<ProjectDirs> =
ProjectDirs::from("", "CraneStation", "wasmtime");
static ref DEFAULT_CONFIG_PATH: Result<PathBuf, String> = PROJECT_DIRS
.as_ref()
.map(|proj_dirs| proj_dirs.config_dir().join("wasmtime-cache-config.toml"))
.ok_or_else(|| "Config file not specified and failed to get the default".to_string());
}
// Default settings, you're welcome to tune them!
// TODO: what do we want to warn users about?
// At the time of writing, modules can't depend on one another,
// so we have at most one module per wasmtime instance
// if changed, update CACHE_CONFIGURATION.md
const DEFAULT_WORKER_EVENT_QUEUE_SIZE: u64 = 0x10;
const WORKER_EVENT_QUEUE_SIZE_WARNING_THRESHOLD: u64 = 3;
// should be quick and provide good enough compression
// if changed, update CACHE_CONFIGURATION.md
const DEFAULT_BASELINE_COMPRESSION_LEVEL: i32 = zstd::DEFAULT_COMPRESSION_LEVEL;
// should provide significantly better compression than baseline
// if changed, update CACHE_CONFIGURATION.md
const DEFAULT_OPTIMIZED_COMPRESSION_LEVEL: i32 = 20;
// shouldn't be too low, to avoid recompressing too many files
// if changed, update CACHE_CONFIGURATION.md
const DEFAULT_OPTIMIZED_COMPRESSION_USAGE_COUNTER_THRESHOLD: u64 = 0x100;
// if changed, update CACHE_CONFIGURATION.md
const DEFAULT_CLEANUP_INTERVAL: Duration = Duration::from_secs(60 * 60);
// if changed, update CACHE_CONFIGURATION.md
const DEFAULT_OPTIMIZING_COMPRESSION_TASK_TIMEOUT: Duration = Duration::from_secs(30 * 60);
// the default assumes problems with timezone configuration on a network share, plus some clock drift
// note that with 24 time zones, the maximum difference between two of them is 23h
// if changed, update CACHE_CONFIGURATION.md
const DEFAULT_ALLOWED_CLOCK_DRIFT_FOR_FILES_FROM_FUTURE: Duration =
Duration::from_secs(60 * 60 * 24);
// if changed, update CACHE_CONFIGURATION.md
const DEFAULT_FILE_COUNT_SOFT_LIMIT: u64 = 0x10_000;
// if changed, update CACHE_CONFIGURATION.md
const DEFAULT_FILES_TOTAL_SIZE_SOFT_LIMIT: u64 = 1024 * 1024 * 512;
// if changed, update CACHE_CONFIGURATION.md
const DEFAULT_FILE_COUNT_LIMIT_PERCENT_IF_DELETING: u8 = 70;
// if changed, update CACHE_CONFIGURATION.md
const DEFAULT_FILES_TOTAL_SIZE_LIMIT_PERCENT_IF_DELETING: u8 = 70;
// Deserializers of our custom formats
// can be replaced with const generics later
macro_rules! generate_deserializer {
($name:ident($numname:ident: $numty:ty, $unitname:ident: &str) -> $retty:ty {$body:expr}) => {
fn $name<'de, D>(deserializer: D) -> Result<$retty, D::Error>
where
D: Deserializer<'de>,
{
let text = Option::<String>::deserialize(deserializer)?;
let text = match text {
None => return Ok(None),
Some(text) => text,
};
let text = text.trim();
let split_point = text.find(|c: char| !c.is_numeric());
let (num, unit) = split_point.map_or_else(|| (text, ""), |p| text.split_at(p));
let deserialized = (|| {
let $numname = num.parse::<$numty>().ok()?;
let $unitname = unit.trim();
$body
})();
if deserialized.is_some() {
Ok(deserialized)
} else {
Err(de::Error::custom(
"Invalid value, please refer to the documentation",
))
}
}
};
}
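// Each generated function accepts an optional string, trims it, splits it at
// the first non-numeric character into a (number, unit) pair, and hands both
// to the per-format body; e.g. "512 Mi" splits into num = 512, unit = "Mi".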
generate_deserializer!(deserialize_duration(num: u64, unit: &str) -> Option<Duration> {
match unit {
"s" => Some(Duration::from_secs(num)),
"m" => Some(Duration::from_secs(num * 60)),
"h" => Some(Duration::from_secs(num * 60 * 60)),
"d" => Some(Duration::from_secs(num * 60 * 60 * 24)),
_ => None,
}
});
generate_deserializer!(deserialize_si_prefix(num: u64, unit: &str) -> Option<u64> {
match unit {
"" => Some(num),
"K" => num.checked_mul(1_000),
"M" => num.checked_mul(1_000_000),
"G" => num.checked_mul(1_000_000_000),
"T" => num.checked_mul(1_000_000_000_000),
"P" => num.checked_mul(1_000_000_000_000_000),
_ => None,
}
});
generate_deserializer!(deserialize_disk_space(num: u64, unit: &str) -> Option<u64> {
match unit {
"" => Some(num),
"K" => num.checked_mul(1_000),
"Ki" => num.checked_mul(1u64 << 10),
"M" => num.checked_mul(1_000_000),
"Mi" => num.checked_mul(1u64 << 20),
"G" => num.checked_mul(1_000_000_000),
"Gi" => num.checked_mul(1u64 << 30),
"T" => num.checked_mul(1_000_000_000_000),
"Ti" => num.checked_mul(1u64 << 40),
"P" => num.checked_mul(1_000_000_000_000_000),
"Pi" => num.checked_mul(1u64 << 50),
_ => None,
}
});
generate_deserializer!(deserialize_percent(num: u8, unit: &str) -> Option<u8> {
match unit {
"%" => Some(num),
_ => None,
}
});
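// Examples of accepted values (quoted TOML strings, whitespace-tolerant):
//   deserialize_duration:   '30s', '15 m', '2h', '1d'
//   deserialize_si_prefix:  '16', '4K', '3M', '1P' (decimal multipliers)
//   deserialize_disk_space: '76', '7M' (decimal), '512Mi', '2 Gi' (binary)
//   deserialize_percent:    '70%', ' 23 %'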
static CACHE_IMPROPER_CONFIG_ERROR_MSG: &str =
"Cache system should be enabled and all settings must be validated or defaulted";
macro_rules! generate_setting_getter {
($setting:ident: $setting_type:ty) => {
/// Returns `$setting`.
///
/// Panics if the cache is disabled.
pub fn $setting(&self) -> $setting_type {
self
.$setting
.expect(CACHE_IMPROPER_CONFIG_ERROR_MSG)
}
};
}
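// For example, `generate_setting_getter!(cleanup_interval: Duration)` below
// expands to roughly:
//     pub fn cleanup_interval(&self) -> Duration {
//         self.cleanup_interval.expect(CACHE_IMPROPER_CONFIG_ERROR_MSG)
//     }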
impl CacheConfig {
generate_setting_getter!(worker_event_queue_size: u64);
generate_setting_getter!(baseline_compression_level: i32);
generate_setting_getter!(optimized_compression_level: i32);
generate_setting_getter!(optimized_compression_usage_counter_threshold: u64);
generate_setting_getter!(cleanup_interval: Duration);
generate_setting_getter!(optimizing_compression_task_timeout: Duration);
generate_setting_getter!(allowed_clock_drift_for_files_from_future: Duration);
generate_setting_getter!(file_count_soft_limit: u64);
generate_setting_getter!(files_total_size_soft_limit: u64);
generate_setting_getter!(file_count_limit_percent_if_deleting: u8);
generate_setting_getter!(files_total_size_limit_percent_if_deleting: u8);
/// Returns true if and only if the cache is enabled.
pub fn enabled(&self) -> bool {
self.enabled
}
/// Returns path to the cache directory.
///
/// Panics if the cache is disabled.
pub fn directory(&self) -> &PathBuf {
self.directory
.as_ref()
.expect(CACHE_IMPROPER_CONFIG_ERROR_MSG)
}
pub fn new_cache_disabled() -> Self {
Self {
errors: Vec::new(),
enabled: false,
directory: None,
worker_event_queue_size: None,
baseline_compression_level: None,
optimized_compression_level: None,
optimized_compression_usage_counter_threshold: None,
cleanup_interval: None,
optimizing_compression_task_timeout: None,
allowed_clock_drift_for_files_from_future: None,
file_count_soft_limit: None,
files_total_size_soft_limit: None,
file_count_limit_percent_if_deleting: None,
files_total_size_limit_percent_if_deleting: None,
}
}
fn new_cache_enabled_template() -> Self {
let mut conf = Self::new_cache_disabled();
conf.enabled = true;
conf
}
fn new_cache_with_errors(errors: Vec<String>) -> Self {
let mut conf = Self::new_cache_disabled();
conf.errors = errors;
conf
}
pub fn from_file<P: AsRef<Path>>(enabled: bool, config_file: Option<P>) -> Self {
if !enabled {
return Self::new_cache_disabled();
}
let mut config = match Self::load_and_parse_file(config_file) {
Ok(data) => data,
Err(err) => return Self::new_cache_with_errors(vec![err]),
};
// validate values and fill in defaults
config.validate_directory_or_default();
config.validate_worker_event_queue_size_or_default();
config.validate_baseline_compression_level_or_default();
config.validate_optimized_compression_level_or_default();
config.validate_optimized_compression_usage_counter_threshold_or_default();
config.validate_cleanup_interval_or_default();
config.validate_optimizing_compression_task_timeout_or_default();
config.validate_allowed_clock_drift_for_files_from_future_or_default();
config.validate_file_count_soft_limit_or_default();
config.validate_files_total_size_soft_limit_or_default();
config.validate_file_count_limit_percent_if_deleting_or_default();
config.validate_files_total_size_limit_percent_if_deleting_or_default();
config.disable_if_any_error();
config
}
fn load_and_parse_file<P: AsRef<Path>>(config_file: Option<P>) -> Result<Self, String> {
// get config file path
let (config_file, user_custom_file) = config_file.as_ref().map_or_else(
|| DEFAULT_CONFIG_PATH.as_ref().map(|p| (p.as_ref(), false)),
|p| Ok((p.as_ref(), true)),
)?;
// read config, or use default one
let entity_exists = config_file.exists();
match (entity_exists, user_custom_file) {
(false, false) => Ok(Self::new_cache_enabled_template()),
_ => match fs::read(&config_file) {
Ok(bytes) => match toml::from_slice::<Config>(&bytes[..]) {
Ok(config) => Ok(config.cache),
Err(err) => Err(format!(
"Failed to parse config file, path: {}, error: {}",
config_file.display(),
err
)),
},
Err(err) => Err(format!(
"Failed to read config file, path: {}, error: {}",
config_file.display(),
err
)),
},
}
}
fn validate_directory_or_default(&mut self) {
if self.directory.is_none() {
match &*PROJECT_DIRS {
Some(proj_dirs) => self.directory = Some(proj_dirs.cache_dir().to_path_buf()),
None => {
self.errors.push(
"Cache directory not specified and failed to get the default".to_string(),
);
return;
}
}
}
// On Windows, if we want long paths, we need the '\\?\' prefix, but it doesn't work
// with relative paths. One way to get an absolute path (the only one?) is to use
// fs::canonicalize, but it requires that the given path exists. The extra advantage
// of this method is that it prepends '\\?\' on Windows.
let cache_dir = self.directory.as_ref().unwrap();
if !cache_dir.is_absolute() {
self.errors.push(format!(
"Cache directory path has to be absolute, path: {}",
cache_dir.display(),
));
return;
}
match fs::create_dir_all(cache_dir) {
Ok(()) => (),
Err(err) => {
self.errors.push(format!(
"Failed to create the cache directory, path: {}, error: {}",
cache_dir.display(),
err
));
return;
}
};
match fs::canonicalize(cache_dir) {
Ok(p) => self.directory = Some(p),
Err(err) => {
self.errors.push(format!(
"Failed to canonicalize the cache directory, path: {}, error: {}",
cache_dir.display(),
err
));
}
}
}
fn validate_worker_event_queue_size_or_default(&mut self) {
if self.worker_event_queue_size.is_none() {
self.worker_event_queue_size = Some(DEFAULT_WORKER_EVENT_QUEUE_SIZE);
}
if self.worker_event_queue_size.unwrap() < WORKER_EVENT_QUEUE_SIZE_WARNING_THRESHOLD {
warn!("Detected small worker event queue size. Some messages might be lost.");
}
}
fn validate_baseline_compression_level_or_default(&mut self) {
if self.baseline_compression_level.is_none() {
self.baseline_compression_level = Some(DEFAULT_BASELINE_COMPRESSION_LEVEL);
}
if !ZSTD_COMPRESSION_LEVELS.contains(&self.baseline_compression_level.unwrap()) {
self.errors.push(format!(
"Invalid baseline compression level: {} not in {:#?}",
self.baseline_compression_level.unwrap(),
ZSTD_COMPRESSION_LEVELS
));
}
}
// assumption: baseline compression level has been verified
fn validate_optimized_compression_level_or_default(&mut self) {
if self.optimized_compression_level.is_none() {
self.optimized_compression_level = Some(DEFAULT_OPTIMIZED_COMPRESSION_LEVEL);
}
let opt_lvl = self.optimized_compression_level.unwrap();
let base_lvl = self.baseline_compression_level.unwrap();
if !ZSTD_COMPRESSION_LEVELS.contains(&opt_lvl) {
self.errors.push(format!(
"Invalid optimized compression level: {} not in {:#?}",
opt_lvl, ZSTD_COMPRESSION_LEVELS
));
}
if opt_lvl < base_lvl {
self.errors.push(format!(
"Invalid optimized compression level is lower than baseline: {} < {}",
opt_lvl, base_lvl
));
}
}
fn validate_optimized_compression_usage_counter_threshold_or_default(&mut self) {
if self.optimized_compression_usage_counter_threshold.is_none() {
self.optimized_compression_usage_counter_threshold =
Some(DEFAULT_OPTIMIZED_COMPRESSION_USAGE_COUNTER_THRESHOLD);
}
}
fn validate_cleanup_interval_or_default(&mut self) {
if self.cleanup_interval.is_none() {
self.cleanup_interval = Some(DEFAULT_CLEANUP_INTERVAL);
}
}
fn validate_optimizing_compression_task_timeout_or_default(&mut self) {
if self.optimizing_compression_task_timeout.is_none() {
self.optimizing_compression_task_timeout =
Some(DEFAULT_OPTIMIZING_COMPRESSION_TASK_TIMEOUT);
}
}
fn validate_allowed_clock_drift_for_files_from_future_or_default(&mut self) {
if self.allowed_clock_drift_for_files_from_future.is_none() {
self.allowed_clock_drift_for_files_from_future =
Some(DEFAULT_ALLOWED_CLOCK_DRIFT_FOR_FILES_FROM_FUTURE);
}
}
fn validate_file_count_soft_limit_or_default(&mut self) {
if self.file_count_soft_limit.is_none() {
self.file_count_soft_limit = Some(DEFAULT_FILE_COUNT_SOFT_LIMIT);
}
}
fn validate_files_total_size_soft_limit_or_default(&mut self) {
if self.files_total_size_soft_limit.is_none() {
self.files_total_size_soft_limit = Some(DEFAULT_FILES_TOTAL_SIZE_SOFT_LIMIT);
}
}
fn validate_file_count_limit_percent_if_deleting_or_default(&mut self) {
if self.file_count_limit_percent_if_deleting.is_none() {
self.file_count_limit_percent_if_deleting =
Some(DEFAULT_FILE_COUNT_LIMIT_PERCENT_IF_DELETING);
}
let percent = self.file_count_limit_percent_if_deleting.unwrap();
if percent > 100 {
self.errors.push(format!(
"Invalid files count limit percent if deleting: {} not in range 0-100%",
percent
));
}
}
fn validate_files_total_size_limit_percent_if_deleting_or_default(&mut self) {
if self.files_total_size_limit_percent_if_deleting.is_none() {
self.files_total_size_limit_percent_if_deleting =
Some(DEFAULT_FILES_TOTAL_SIZE_LIMIT_PERCENT_IF_DELETING);
}
let percent = self.files_total_size_limit_percent_if_deleting.unwrap();
if percent > 100 {
self.errors.push(format!(
"Invalid files total size limit percent if deleting: {} not in range 0-100%",
percent
));
}
}
fn disable_if_any_error(&mut self) {
if !self.errors.is_empty() {
let mut conf = Self::new_cache_disabled();
mem::swap(self, &mut conf);
mem::swap(&mut self.errors, &mut conf.errors);
}
}
}
#[cfg(test)]
#[macro_use]
pub mod tests;

crates/environ/src/cache/config/tests.rs vendored Normal file

@@ -0,0 +1,581 @@
use super::CacheConfig;
use core::time::Duration;
use std::fs;
use std::path::PathBuf;
use tempfile::{self, TempDir};
// note: config loading during validation creates the cache directory in order to canonicalize
// its path; that's why this function and macro always use a custom cache directory
// note: tempdir removes the directory when dropped, so we need to return it to the caller
// to keep the paths valid
pub fn test_prolog() -> (TempDir, PathBuf, PathBuf) {
let _ = pretty_env_logger::try_init();
let temp_dir = tempfile::tempdir().expect("Can't create temporary directory");
let cache_dir = temp_dir.path().join("cache-dir");
let config_path = temp_dir.path().join("cache-config.toml");
(temp_dir, cache_dir, config_path)
}
macro_rules! load_config {
($config_path:ident, $content_fmt:expr, $cache_dir:ident) => {{
let config_path = &$config_path;
let content = format!(
$content_fmt,
cache_dir = toml::to_string_pretty(&format!("{}", $cache_dir.display())).unwrap()
);
fs::write(config_path, content).expect("Failed to write test config file");
CacheConfig::from_file(true, Some(config_path))
}};
}
// written without the load_config! macro, to test the disabled case
#[test]
fn test_disabled() {
let dir = tempfile::tempdir().expect("Can't create temporary directory");
let config_path = dir.path().join("cache-config.toml");
let config_content = "[cache]\n\
enabled = true\n";
fs::write(&config_path, config_content).expect("Failed to write test config file");
let conf = CacheConfig::from_file(false, Some(&config_path));
assert!(!conf.enabled());
assert!(conf.errors.is_empty());
let config_content = "[cache]\n\
enabled = false\n";
fs::write(&config_path, config_content).expect("Failed to write test config file");
let conf = CacheConfig::from_file(true, Some(&config_path));
assert!(!conf.enabled());
assert!(conf.errors.is_empty());
}
#[test]
fn test_unrecognized_settings() {
let (_td, cd, cp) = test_prolog();
let conf = load_config!(
cp,
"unrecognized-setting = 42\n\
[cache]\n\
enabled = true\n\
directory = {cache_dir}",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
unrecognized-setting = 42",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
}
#[test]
fn test_all_settings() {
let (_td, cd, cp) = test_prolog();
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
worker-event-queue-size = '16'\n\
baseline-compression-level = 3\n\
optimized-compression-level = 20\n\
optimized-compression-usage-counter-threshold = '256'\n\
cleanup-interval = '1h'\n\
optimizing-compression-task-timeout = '30m'\n\
allowed-clock-drift-for-files-from-future = '1d'\n\
file-count-soft-limit = '65536'\n\
files-total-size-soft-limit = '512Mi'\n\
file-count-limit-percent-if-deleting = '70%'\n\
files-total-size-limit-percent-if-deleting = '70%'",
cd
);
check_conf(&conf, &cd);
let conf = load_config!(
cp,
// added some whitespace
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
worker-event-queue-size = ' 16\t'\n\
baseline-compression-level = 3\n\
optimized-compression-level =\t 20\n\
optimized-compression-usage-counter-threshold = '256'\n\
cleanup-interval = ' 1h'\n\
optimizing-compression-task-timeout = '30 m'\n\
allowed-clock-drift-for-files-from-future = '1\td'\n\
file-count-soft-limit = '\t \t65536\t'\n\
files-total-size-soft-limit = '512\t\t Mi '\n\
file-count-limit-percent-if-deleting = '70\t%'\n\
files-total-size-limit-percent-if-deleting = ' 70 %'",
cd
);
check_conf(&conf, &cd);
fn check_conf(conf: &CacheConfig, cd: &PathBuf) {
eprintln!("errors: {:#?}", conf.errors);
assert!(conf.enabled());
assert!(conf.errors.is_empty());
assert_eq!(
conf.directory(),
&fs::canonicalize(cd).expect("canonicalize failed")
);
assert_eq!(conf.worker_event_queue_size(), 0x10);
assert_eq!(conf.baseline_compression_level(), 3);
assert_eq!(conf.optimized_compression_level(), 20);
assert_eq!(conf.optimized_compression_usage_counter_threshold(), 0x100);
assert_eq!(conf.cleanup_interval(), Duration::from_secs(60 * 60));
assert_eq!(
conf.optimizing_compression_task_timeout(),
Duration::from_secs(30 * 60)
);
assert_eq!(
conf.allowed_clock_drift_for_files_from_future(),
Duration::from_secs(60 * 60 * 24)
);
assert_eq!(conf.file_count_soft_limit(), 0x10_000);
assert_eq!(conf.files_total_size_soft_limit(), 512 * (1u64 << 20));
assert_eq!(conf.file_count_limit_percent_if_deleting(), 70);
assert_eq!(conf.files_total_size_limit_percent_if_deleting(), 70);
}
}
#[test]
fn test_compression_level_settings() {
let (_td, cd, cp) = test_prolog();
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
baseline-compression-level = 1\n\
optimized-compression-level = 21",
cd
);
assert!(conf.enabled());
assert!(conf.errors.is_empty());
assert_eq!(conf.baseline_compression_level(), 1);
assert_eq!(conf.optimized_compression_level(), 21);
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
baseline-compression-level = -1\n\
optimized-compression-level = 21",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
baseline-compression-level = 15\n\
optimized-compression-level = 10",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
}
#[test]
fn test_si_prefix_settings() {
let (_td, cd, cp) = test_prolog();
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
worker-event-queue-size = '42'\n\
optimized-compression-usage-counter-threshold = '4K'\n\
file-count-soft-limit = '3M'",
cd
);
assert!(conf.enabled());
assert!(conf.errors.is_empty());
assert_eq!(conf.worker_event_queue_size(), 42);
assert_eq!(conf.optimized_compression_usage_counter_threshold(), 4_000);
assert_eq!(conf.file_count_soft_limit(), 3_000_000);
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
worker-event-queue-size = '2G'\n\
optimized-compression-usage-counter-threshold = '4444T'\n\
file-count-soft-limit = '1P'",
cd
);
assert!(conf.enabled());
assert!(conf.errors.is_empty());
assert_eq!(conf.worker_event_queue_size(), 2_000_000_000);
assert_eq!(
conf.optimized_compression_usage_counter_threshold(),
4_444_000_000_000_000
);
assert_eq!(conf.file_count_soft_limit(), 1_000_000_000_000_000);
// different errors
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
worker-event-queue-size = '2g'",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
file-count-soft-limit = 1",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
file-count-soft-limit = '-31337'",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
file-count-soft-limit = '3.14M'",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
}
#[test]
fn test_disk_space_settings() {
let (_td, cd, cp) = test_prolog();
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
files-total-size-soft-limit = '76'",
cd
);
assert!(conf.enabled());
assert!(conf.errors.is_empty());
assert_eq!(conf.files_total_size_soft_limit(), 76);
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
files-total-size-soft-limit = '42 Mi'",
cd
);
assert!(conf.enabled());
assert!(conf.errors.is_empty());
assert_eq!(conf.files_total_size_soft_limit(), 42 * (1u64 << 20));
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
files-total-size-soft-limit = '2 Gi'",
cd
);
assert!(conf.enabled());
assert!(conf.errors.is_empty());
assert_eq!(conf.files_total_size_soft_limit(), 2 * (1u64 << 30));
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
files-total-size-soft-limit = '31337 Ti'",
cd
);
assert!(conf.enabled());
assert!(conf.errors.is_empty());
assert_eq!(conf.files_total_size_soft_limit(), 31337 * (1u64 << 40));
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
files-total-size-soft-limit = '7 Pi'",
cd
);
assert!(conf.enabled());
assert!(conf.errors.is_empty());
assert_eq!(conf.files_total_size_soft_limit(), 7 * (1u64 << 50));
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
files-total-size-soft-limit = '7M'",
cd
);
assert!(conf.enabled());
assert!(conf.errors.is_empty());
assert_eq!(conf.files_total_size_soft_limit(), 7_000_000);
// different errors
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
files-total-size-soft-limit = '7 mi'",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
files-total-size-soft-limit = 1",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
files-total-size-soft-limit = '-31337'",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
files-total-size-soft-limit = '3.14Ki'",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
}
#[test]
fn test_duration_settings() {
let (_td, cd, cp) = test_prolog();
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
cleanup-interval = '100s'\n\
optimizing-compression-task-timeout = '3m'\n\
allowed-clock-drift-for-files-from-future = '4h'",
cd
);
assert!(conf.enabled());
assert!(conf.errors.is_empty());
assert_eq!(conf.cleanup_interval(), Duration::from_secs(100));
assert_eq!(
conf.optimizing_compression_task_timeout(),
Duration::from_secs(3 * 60)
);
assert_eq!(
conf.allowed_clock_drift_for_files_from_future(),
Duration::from_secs(4 * 60 * 60)
);
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
cleanup-interval = '2d'\n\
optimizing-compression-task-timeout = '333 m'",
cd
);
assert!(conf.enabled());
assert!(conf.errors.is_empty());
assert_eq!(
conf.cleanup_interval(),
Duration::from_secs(2 * 24 * 60 * 60)
);
assert_eq!(
conf.optimizing_compression_task_timeout(),
Duration::from_secs(333 * 60)
);
// different errors
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
optimizing-compression-task-timeout = '333'",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
optimizing-compression-task-timeout = 333",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
optimizing-compression-task-timeout = '10 M'",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
optimizing-compression-task-timeout = '10 min'",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
optimizing-compression-task-timeout = '-10s'",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
optimizing-compression-task-timeout = '1.5m'",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
}
#[test]
fn test_percent_settings() {
let (_td, cd, cp) = test_prolog();
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
file-count-limit-percent-if-deleting = '62%'\n\
files-total-size-limit-percent-if-deleting = '23 %'",
cd
);
assert!(conf.enabled());
assert!(conf.errors.is_empty());
assert_eq!(conf.file_count_limit_percent_if_deleting(), 62);
assert_eq!(conf.files_total_size_limit_percent_if_deleting(), 23);
// different errors
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
files-total-size-limit-percent-if-deleting = '23'",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
files-total-size-limit-percent-if-deleting = '22.5%'",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
files-total-size-limit-percent-if-deleting = '0.5'",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
files-total-size-limit-percent-if-deleting = '-1%'",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
let conf = load_config!(
cp,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
files-total-size-limit-percent-if-deleting = '101%'",
cd
);
assert!(!conf.enabled());
assert!(!conf.errors.is_empty());
}

crates/environ/src/cache/tests.rs vendored Normal file

@@ -0,0 +1,354 @@
use super::config::tests::test_prolog;
use super::*;
use crate::address_map::{FunctionAddressMap, InstructionAddressMap};
use crate::compilation::{CompiledFunction, Relocation, RelocationTarget, TrapInformation};
use crate::module::{MemoryPlan, MemoryStyle, Module};
use alloc::boxed::Box;
use alloc::vec::Vec;
use core::cmp::min;
use cranelift_codegen::{binemit, ir, isa, settings, ValueLocRange};
use cranelift_entity::EntityRef;
use cranelift_entity::{PrimaryMap, SecondaryMap};
use cranelift_wasm::{DefinedFuncIndex, FuncIndex, Global, GlobalInit, Memory, SignatureIndex};
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};
use std::fs;
use std::str::FromStr;
use target_lexicon::triple;
// Since the cache system is global, each test needs to be run in a separate process.
// That's why the init() tests are run as integration tests.
// However, caching is a private thing, an implementation detail, and needs to be tested
// from inside the module.
// We test init() in exactly one test; the rest of the tests don't rely on it.
#[test]
fn test_cache_init() {
let (_tempdir, cache_dir, config_path) = test_prolog();
let baseline_compression_level = 4;
let config_content = format!(
"[cache]\n\
enabled = true\n\
directory = {}\n\
baseline-compression-level = {}\n",
toml::to_string_pretty(&format!("{}", cache_dir.display())).unwrap(),
baseline_compression_level,
);
fs::write(&config_path, config_content).expect("Failed to write test config file");
let errors = init(true, Some(&config_path), None);
assert!(errors.is_empty());
// test if we can use config
let cache_config = cache_config();
assert!(cache_config.enabled());
// assumption: config init creates cache directory and returns canonicalized path
assert_eq!(
*cache_config.directory(),
fs::canonicalize(cache_dir).unwrap()
);
assert_eq!(
cache_config.baseline_compression_level(),
baseline_compression_level
);
// test if we can use worker
let worker = worker();
worker.on_cache_update_async(config_path);
}
#[test]
fn test_write_read_cache() {
let (_tempdir, cache_dir, config_path) = test_prolog();
let cache_config = load_config!(
config_path,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
baseline-compression-level = 3\n",
cache_dir
);
assert!(cache_config.enabled());
let worker = Worker::start_new(&cache_config, None);
// assumption: config load creates cache directory and returns canonicalized path
assert_eq!(
*cache_config.directory(),
fs::canonicalize(cache_dir).unwrap()
);
let mut rng = SmallRng::from_seed([
0x42, 0x04, 0xF3, 0x44, 0x11, 0x22, 0x33, 0x44, 0x67, 0x68, 0xFF, 0x00, 0x44, 0x23, 0x7F,
0x96,
]);
let mut code_container = Vec::new();
code_container.resize(0x4000, 0);
rng.fill(&mut code_container[..]);
let isa1 = new_isa("riscv64-unknown-unknown");
let isa2 = new_isa("i386");
let module1 = new_module(&mut rng);
let module2 = new_module(&mut rng);
let function_body_inputs1 = new_function_body_inputs(&mut rng, &code_container);
let function_body_inputs2 = new_function_body_inputs(&mut rng, &code_container);
let compiler1 = "test-1";
let compiler2 = "test-2";
let entry1 = ModuleCacheEntry::from_inner(ModuleCacheEntryInner::new(
&module1,
&function_body_inputs1,
&*isa1,
compiler1,
false,
&cache_config,
&worker,
));
assert!(entry1.0.is_some());
assert!(entry1.get_data().is_none());
let data1 = new_module_cache_data(&mut rng);
entry1.update_data(&data1);
assert_eq!(entry1.get_data().expect("Cache should be available"), data1);
let entry2 = ModuleCacheEntry::from_inner(ModuleCacheEntryInner::new(
&module2,
&function_body_inputs1,
&*isa1,
compiler1,
false,
&cache_config,
&worker,
));
let data2 = new_module_cache_data(&mut rng);
entry2.update_data(&data2);
assert_eq!(entry1.get_data().expect("Cache should be available"), data1);
assert_eq!(entry2.get_data().expect("Cache should be available"), data2);
let entry3 = ModuleCacheEntry::from_inner(ModuleCacheEntryInner::new(
&module1,
&function_body_inputs2,
&*isa1,
compiler1,
false,
&cache_config,
&worker,
));
let data3 = new_module_cache_data(&mut rng);
entry3.update_data(&data3);
assert_eq!(entry1.get_data().expect("Cache should be available"), data1);
assert_eq!(entry2.get_data().expect("Cache should be available"), data2);
assert_eq!(entry3.get_data().expect("Cache should be available"), data3);
let entry4 = ModuleCacheEntry::from_inner(ModuleCacheEntryInner::new(
&module1,
&function_body_inputs1,
&*isa2,
compiler1,
false,
&cache_config,
&worker,
));
let data4 = new_module_cache_data(&mut rng);
entry4.update_data(&data4);
assert_eq!(entry1.get_data().expect("Cache should be available"), data1);
assert_eq!(entry2.get_data().expect("Cache should be available"), data2);
assert_eq!(entry3.get_data().expect("Cache should be available"), data3);
assert_eq!(entry4.get_data().expect("Cache should be available"), data4);
let entry5 = ModuleCacheEntry::from_inner(ModuleCacheEntryInner::new(
&module1,
&function_body_inputs1,
&*isa1,
compiler2,
false,
&cache_config,
&worker,
));
let data5 = new_module_cache_data(&mut rng);
entry5.update_data(&data5);
assert_eq!(entry1.get_data().expect("Cache should be available"), data1);
assert_eq!(entry2.get_data().expect("Cache should be available"), data2);
assert_eq!(entry3.get_data().expect("Cache should be available"), data3);
assert_eq!(entry4.get_data().expect("Cache should be available"), data4);
assert_eq!(entry5.get_data().expect("Cache should be available"), data5);
let data6 = new_module_cache_data(&mut rng);
entry1.update_data(&data6);
assert_eq!(entry1.get_data().expect("Cache should be available"), data6);
assert_eq!(entry2.get_data().expect("Cache should be available"), data2);
assert_eq!(entry3.get_data().expect("Cache should be available"), data3);
assert_eq!(entry4.get_data().expect("Cache should be available"), data4);
assert_eq!(entry5.get_data().expect("Cache should be available"), data5);
assert!(data1 != data2 && data1 != data3 && data1 != data4 && data1 != data5 && data1 != data6);
}
fn new_isa(name: &str) -> Box<dyn isa::TargetIsa> {
let shared_builder = settings::builder();
let shared_flags = settings::Flags::new(shared_builder);
isa::lookup(triple!(name))
.expect("can't find specified isa")
.finish(shared_flags)
}
fn new_module(rng: &mut impl Rng) -> Module {
// There are way too many fields. Just fill in some of them.
let mut m = Module::new();
if rng.gen_bool(0.5) {
m.signatures.push(ir::Signature {
params: vec![],
returns: vec![],
call_conv: isa::CallConv::Fast,
});
}
for i in 0..rng.gen_range(1, 0x8) {
m.functions.push(SignatureIndex::new(i));
}
if rng.gen_bool(0.8) {
m.memory_plans.push(MemoryPlan {
memory: Memory {
minimum: rng.gen(),
maximum: rng.gen(),
shared: rng.gen(),
},
style: MemoryStyle::Dynamic,
offset_guard_size: rng.gen(),
});
}
if rng.gen_bool(0.4) {
m.globals.push(Global {
ty: ir::Type::int(16).unwrap(),
mutability: rng.gen(),
initializer: GlobalInit::I32Const(rng.gen()),
});
}
m
}
fn new_function_body_inputs<'data>(
rng: &mut impl Rng,
code_container: &'data Vec<u8>,
) -> PrimaryMap<DefinedFuncIndex, FunctionBodyData<'data>> {
let len = code_container.len();
let mut pos = rng.gen_range(0, code_container.len());
(2..rng.gen_range(4, 14))
.map(|j| {
let (old_pos, end) = (pos, min(pos + rng.gen_range(0x10, 0x200), len));
pos = end % len;
FunctionBodyData {
data: &code_container[old_pos..end],
module_offset: (rng.next_u64() + j) as usize,
}
})
.collect()
}
fn new_module_cache_data(rng: &mut impl Rng) -> ModuleCacheData {
let funcs = (0..rng.gen_range(0, 10))
.map(|i| {
let mut sm = SecondaryMap::new(); // SecondaryMap doesn't implement FromIterator
sm.resize(i as usize * 2);
sm.values_mut().enumerate().for_each(|(j, v)| {
if rng.gen_bool(0.33) {
*v = (j as u32) * 3 / 4
}
});
CompiledFunction {
body: (0..(i * 3 / 2)).collect(),
jt_offsets: sm,
unwind_info: (0..(i * 3 / 2)).collect(),
}
})
.collect();
let relocs = (0..rng.gen_range(1, 0x10))
.map(|i| {
vec![
Relocation {
reloc: binemit::Reloc::X86CallPCRel4,
reloc_target: RelocationTarget::UserFunc(FuncIndex::new(i as usize * 42)),
offset: i + rng.next_u32(),
addend: 0,
},
Relocation {
reloc: binemit::Reloc::Arm32Call,
reloc_target: RelocationTarget::LibCall(ir::LibCall::CeilF64),
offset: rng.gen_range(4, i + 55),
addend: (42 * i) as i64,
},
]
})
.collect();
let trans = (4..rng.gen_range(4, 0x10))
.map(|i| FunctionAddressMap {
instructions: vec![InstructionAddressMap {
srcloc: ir::SourceLoc::new(rng.gen()),
code_offset: rng.gen(),
code_len: i,
}],
start_srcloc: ir::SourceLoc::new(rng.gen()),
end_srcloc: ir::SourceLoc::new(rng.gen()),
body_offset: rng.gen(),
body_len: 0x31337,
})
.collect();
let value_ranges = (4..rng.gen_range(4, 0x10))
.map(|i| {
(i..i + rng.gen_range(4, 8))
.map(|k| {
(
ir::ValueLabel::new(k),
(0..rng.gen_range(0, 4))
.map(|_| ValueLocRange {
loc: ir::ValueLoc::Reg(rng.gen()),
start: rng.gen(),
end: rng.gen(),
})
.collect(),
)
})
.collect()
})
.collect();
let stack_slots = (0..rng.gen_range(0, 0x6))
.map(|_| {
let mut slots = ir::StackSlots::new();
slots.push(ir::StackSlotData {
kind: ir::StackSlotKind::SpillSlot,
size: rng.gen(),
offset: rng.gen(),
});
slots.frame_size = rng.gen();
slots
})
.collect();
let traps = (0..rng.gen_range(0, 0xd))
.map(|i| {
((i..i + rng.gen_range(0, 4))
.map(|_| TrapInformation {
code_offset: rng.gen(),
source_loc: ir::SourceLoc::new(rng.gen()),
trap_code: ir::TrapCode::StackOverflow,
})
.collect())
})
.collect();
ModuleCacheData::from_tuple((
Compilation::new(funcs),
relocs,
trans,
value_ranges,
stack_slots,
traps,
))
}

crates/environ/src/cache/worker.rs vendored Normal file

@@ -0,0 +1,912 @@
//! Background worker that watches over the cache.
//!
//! It cleans up old cache entries, updates statistics, and optimizes the cache.
//! We allow losing some messages (it doesn't hurt) and some races,
//! but we guarantee eventual consistency and fault tolerance.
//! Background tasks can be CPU intensive, but the worker thread has low priority.
use super::{cache_config, fs_write_atomic, CacheConfig};
use alloc::vec::Vec;
use core::cmp;
use core::time::Duration;
use log::{debug, info, trace, warn};
use serde::{Deserialize, Serialize};
use spin::Once;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::atomic::{self, AtomicBool};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
#[cfg(test)]
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
#[cfg(not(test))]
use std::time::SystemTime;
#[cfg(test)]
use tests::system_time_stub::SystemTimeStub as SystemTime;
pub(super) struct Worker {
sender: SyncSender<CacheEvent>,
#[cfg(test)]
stats: Arc<(Mutex<WorkerStats>, Condvar)>,
}
struct WorkerThread {
receiver: Receiver<CacheEvent>,
cache_config: CacheConfig,
#[cfg(test)]
stats: Arc<(Mutex<WorkerStats>, Condvar)>,
}
#[cfg(test)]
#[derive(Default)]
struct WorkerStats {
dropped: u32,
sent: u32,
handled: u32,
}
static WORKER: Once<Worker> = Once::new();
static INIT_CALLED: AtomicBool = AtomicBool::new(false);
pub(super) fn worker() -> &'static Worker {
WORKER
.r#try()
.expect("Cache worker must be initialized before usage")
}
pub(super) fn init(init_file_per_thread_logger: Option<&'static str>) {
INIT_CALLED
.compare_exchange(
false,
true,
atomic::Ordering::SeqCst,
atomic::Ordering::SeqCst,
)
.expect("Cache worker init must be called at most once");
let worker = Worker::start_new(cache_config(), init_file_per_thread_logger);
WORKER.call_once(|| worker);
}
#[derive(Debug, Clone)]
enum CacheEvent {
OnCacheGet(PathBuf),
OnCacheUpdate(PathBuf),
}
impl Worker {
pub(super) fn start_new(
cache_config: &CacheConfig,
init_file_per_thread_logger: Option<&'static str>,
) -> Self {
let queue_size = match cache_config.worker_event_queue_size() {
num if num <= usize::max_value() as u64 => num as usize,
_ => usize::max_value(),
};
let (tx, rx) = sync_channel(queue_size);
#[cfg(test)]
let stats = Arc::new((Mutex::new(WorkerStats::default()), Condvar::new()));
let worker_thread = WorkerThread {
receiver: rx,
cache_config: cache_config.clone(),
#[cfg(test)]
stats: stats.clone(),
};
// when self is dropped, the sender will be dropped, which will cause the channel
// to hang up and the worker thread to exit -- this happens in the tests
// a non-test binary has only a static worker, so Rust doesn't drop it
thread::spawn(move || worker_thread.run(init_file_per_thread_logger));
Self {
sender: tx,
#[cfg(test)]
stats,
}
}
pub(super) fn on_cache_get_async(&self, path: impl AsRef<Path>) {
let event = CacheEvent::OnCacheGet(path.as_ref().to_path_buf());
self.send_cache_event(event);
}
pub(super) fn on_cache_update_async(&self, path: impl AsRef<Path>) {
let event = CacheEvent::OnCacheUpdate(path.as_ref().to_path_buf());
self.send_cache_event(event);
}
#[inline]
fn send_cache_event(&self, event: CacheEvent) {
#[cfg(test)]
let mut stats = self
.stats
.0
.lock()
.expect("Failed to acquire worker stats lock");
match self.sender.try_send(event.clone()) {
Ok(()) => {
#[cfg(test)]
let _ = stats.sent += 1;
}
Err(err) => {
info!(
"Failed to send asynchronously message to worker thread, \
event: {:?}, error: {}",
event, err
);
#[cfg(test)]
let _ = stats.dropped += 1;
}
}
}
#[cfg(test)]
pub(super) fn events_dropped(&self) -> u32 {
let stats = self
.stats
.0
.lock()
.expect("Failed to acquire worker stats lock");
stats.dropped
}
#[cfg(test)]
pub(super) fn wait_for_all_events_handled(&self) {
let (stats, condvar) = &*self.stats;
let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
while stats.handled != stats.sent {
stats = condvar
.wait(stats)
.expect("Failed to reacquire worker stats lock");
}
}
}
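// Typical flow: the cache module calls `worker().on_cache_get_async(path)`
// after serving a cached module and `worker().on_cache_update_async(path)`
// after writing a new entry. Both merely enqueue a CacheEvent; they never
// block compilation, and they drop the event if the queue is full.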
#[derive(Serialize, Deserialize)]
struct ModuleCacheStatistics {
pub usages: u64,
#[serde(rename = "optimized-compression")]
pub compression_level: i32,
}
impl ModuleCacheStatistics {
fn default(cache_config: &CacheConfig) -> Self {
Self {
usages: 0,
compression_level: cache_config.baseline_compression_level(),
}
}
}
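// On disk, these statistics live next to each cache file as "<name>.stats"
// (see handle_on_cache_update below) and, assuming a TOML encoding, look
// roughly like:
//   usages = 42
//   optimized-compression = 20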
enum CacheEntry {
Recognized {
path: PathBuf,
mtime: SystemTime,
size: u64,
},
Unrecognized {
path: PathBuf,
is_dir: bool,
},
}
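// Recognized entries are module cache files (carrying their size and the
// mtime of their .stats file); Unrecognized entries are anything else found
// in the cache tree, and cleanup removes them unconditionally.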
impl WorkerThread {
fn run(self, init_file_per_thread_logger: Option<&'static str>) {
#[cfg(not(test))] // We want to test the worker without relying on init() being called
assert!(INIT_CALLED.load(atomic::Ordering::SeqCst));
if let Some(prefix) = init_file_per_thread_logger {
file_per_thread_logger::initialize(prefix);
}
debug!("Cache worker thread started.");
Self::lower_thread_priority();
#[cfg(test)]
let (stats, condvar) = &*self.stats;
for event in self.receiver.iter() {
match event {
CacheEvent::OnCacheGet(path) => self.handle_on_cache_get(path),
CacheEvent::OnCacheUpdate(path) => self.handle_on_cache_update(path),
}
#[cfg(test)]
{
let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
stats.handled += 1;
condvar.notify_all();
}
}
// The receiver can stop iterating iff the channel has hung up.
// The channel hangs up when the sender is dropped, which only happens in tests.
// In the non-test case we have a static worker, and Rust doesn't drop static variables.
#[cfg(not(test))]
unreachable!()
}
#[cfg(target_os = "windows")]
fn lower_thread_priority() {
use core::convert::TryInto;
use winapi::um::processthreadsapi::{GetCurrentThread, SetThreadPriority};
use winapi::um::winbase::THREAD_MODE_BACKGROUND_BEGIN;
// https://docs.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-setthreadpriority
// https://docs.microsoft.com/en-us/windows/win32/procthread/scheduling-priorities
if unsafe {
SetThreadPriority(
GetCurrentThread(),
THREAD_MODE_BACKGROUND_BEGIN.try_into().unwrap(),
)
} == 0
{
warn!(
"Failed to lower worker thread priority. It might affect application performance."
);
}
}
#[cfg(not(target_os = "windows"))]
fn lower_thread_priority() {
// http://man7.org/linux/man-pages/man7/sched.7.html
const NICE_DELTA_FOR_BACKGROUND_TASKS: i32 = 3;
errno::set_errno(errno::Errno(0));
let current_nice = unsafe { libc::nice(NICE_DELTA_FOR_BACKGROUND_TASKS) };
let errno_val = errno::errno().0;
if errno_val != 0 {
warn!("Failed to lower worker thread priority. It might affect application performance. errno: {}", errno_val);
} else {
debug!("New nice value of worker thread: {}", current_nice);
}
}
/// Increases the usage counter and recompresses the file
/// if the usage counter has reached the configurable threshold.
fn handle_on_cache_get(&self, path: PathBuf) {
trace!("handle_on_cache_get() for path: {}", path.display());
// construct .stats file path
let filename = path.file_name().unwrap().to_str().unwrap();
let stats_path = path.with_file_name(format!("{}.stats", filename));
// load .stats file (default if none or error)
let mut stats = read_stats_file(stats_path.as_ref())
.unwrap_or_else(|| ModuleCacheStatistics::default(&self.cache_config));
// step 1: update the usage counter & write to the disk
// it's racy, but it's fine (the counter will just be smaller,
// which sometimes will retrigger recompression)
stats.usages += 1;
if !write_stats_file(stats_path.as_ref(), &stats) {
return;
}
// step 2: recompress if there's a need
let opt_compr_lvl = self.cache_config.optimized_compression_level();
if stats.compression_level >= opt_compr_lvl
|| stats.usages
< self
.cache_config
.optimized_compression_usage_counter_threshold()
{
return;
}
let lock_path = if let Some(p) = acquire_task_fs_lock(
path.as_ref(),
self.cache_config.optimizing_compression_task_timeout(),
self.cache_config
.allowed_clock_drift_for_files_from_future(),
) {
p
} else {
return;
};
trace!("Trying to recompress file: {}", path.display());
// recompress, write to another file, then rename (an atomic file content exchange)
// and update the stats file
fs::read(&path)
.map_err(|err| {
warn!(
"Failed to read old cache file, path: {}, err: {}",
path.display(),
err
)
})
.ok()
.and_then(|compressed_cache_bytes| {
zstd::decode_all(&compressed_cache_bytes[..])
.map_err(|err| warn!("Failed to decompress cached code: {}", err))
.ok()
})
.and_then(|cache_bytes| {
zstd::encode_all(
&cache_bytes[..],
opt_compr_lvl,
)
.map_err(|err| warn!("Failed to compress cached code: {}", err))
.ok()
})
.and_then(|recompressed_cache_bytes| {
fs::write(&lock_path, &recompressed_cache_bytes)
.map_err(|err| {
warn!(
"Failed to write recompressed cache, path: {}, err: {}",
lock_path.display(),
err
)
})
.ok()
})
.and_then(|()| {
fs::rename(&lock_path, &path)
.map_err(|err| {
warn!(
"Failed to rename recompressed cache, path from: {}, path to: {}, err: {}",
lock_path.display(),
path.display(),
err
);
if let Err(err) = fs::remove_file(&lock_path) {
warn!(
"Failed to clean up (remove) recompressed cache, path {}, err: {}",
lock_path.display(),
err
);
}
})
.ok()
})
.map(|()| {
// update stats file (reload it! recompression can take some time)
if let Some(mut new_stats) = read_stats_file(stats_path.as_ref()) {
if new_stats.compression_level >= opt_compr_lvl {
// Rare race:
// two instances with different opt_compr_lvl: we don't know in which order they updated
// the cache file and the stats file (they are not updated together atomically)
// Possible solution is to use directories per cache entry, but it complicates the system
// and is not worth it.
debug!("DETECTED task did more than once (or race with new file): recompression of {}. \
Note: if optimized compression level setting has changed in the meantine, \
the stats file might contain inconsistent compression level due to race.", path.display());
}
else {
new_stats.compression_level = opt_compr_lvl;
let _ = write_stats_file(stats_path.as_ref(), &new_stats);
}
if new_stats.usages < stats.usages {
debug!("DETECTED lower usage count (new file or race with counter increasing): file {}", path.display());
}
}
else {
debug!("Can't read stats file again to update compression level (it might got cleaned up): file {}", stats_path.display());
}
});
trace!("Task finished: recompress file: {}", path.display());
}
fn handle_on_cache_update(&self, path: PathBuf) {
trace!("handle_on_cache_update() for path: {}", path.display());
// ---------------------- step 1: create .stats file
// construct .stats file path
let filename = path
.file_name()
.expect("Expected valid cache file name")
.to_str()
.expect("Expected valid cache file name");
let stats_path = path.with_file_name(format!("{}.stats", filename));
// create and write stats file
let mut stats = ModuleCacheStatistics::default(&self.cache_config);
stats.usages += 1;
write_stats_file(&stats_path, &stats);
// ---------------------- step 2: perform cleanup task if needed
// acquire lock for cleanup task
// A lock is proof of a recent cleanup task, so we don't want to delete it.
// Expired locks will be deleted by the cleanup task.
let cleanup_file = self.cache_config.directory().join(".cleanup"); // some non-existent marker file
if acquire_task_fs_lock(
&cleanup_file,
self.cache_config.cleanup_interval(),
self.cache_config
.allowed_clock_drift_for_files_from_future(),
)
.is_none()
{
return;
}
trace!("Trying to clean up cache");
let mut cache_index = self.list_cache_contents();
let future_tolerance = SystemTime::now()
.checked_add(
self.cache_config
.allowed_clock_drift_for_files_from_future(),
)
.expect("Brace your cache, the next Big Bang is coming (time overflow)");
cache_index.sort_unstable_by(|lhs, rhs| {
// sort by age
use CacheEntry::*;
match (lhs, rhs) {
(Recognized { mtime: lhs_mt, .. }, Recognized { mtime: rhs_mt, .. }) => {
match (*lhs_mt > future_tolerance, *rhs_mt > future_tolerance) {
// later == younger
(false, false) => rhs_mt.cmp(lhs_mt),
// files from the far future are treated as the oldest recognized files
// we want to delete them, so the cache keeps track of recent files
// however, we don't delete them unconditionally,
// because the .stats file can be overwritten with a meaningful mtime
(true, false) => cmp::Ordering::Greater,
(false, true) => cmp::Ordering::Less,
(true, true) => cmp::Ordering::Equal,
}
}
// unrecognized entries are treated as infinitely old
(Recognized { .. }, Unrecognized { .. }) => cmp::Ordering::Less,
(Unrecognized { .. }, Recognized { .. }) => cmp::Ordering::Greater,
(Unrecognized { .. }, Unrecognized { .. }) => cmp::Ordering::Equal,
}
});
// find "cut" boundary:
// - remove unrecognized files anyway,
// - remove some cache files if some quota has been exceeded
let mut total_size = 0u64;
let mut start_delete_idx = None;
let mut start_delete_idx_if_deleting_recognized_items: Option<usize> = None;
let total_size_limit = self.cache_config.files_total_size_soft_limit();
let file_count_limit = self.cache_config.file_count_soft_limit();
let tsl_if_deleting = total_size_limit
.checked_mul(
self.cache_config
.files_total_size_limit_percent_if_deleting() as u64,
)
.unwrap()
/ 100;
let fcl_if_deleting = file_count_limit
.checked_mul(self.cache_config.file_count_limit_percent_if_deleting() as u64)
.unwrap()
/ 100;
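// Worked example: with files-total-size-soft-limit = '1000' and
// files-total-size-limit-percent-if-deleting = '70%', tsl_if_deleting is
// 700. Once the running total exceeds 1000, deletion starts at the first
// index where the total passed 700, so cleanup frees headroom below the
// soft limit instead of deleting a single file at a time.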
for (idx, item) in cache_index.iter().enumerate() {
let size = if let CacheEntry::Recognized { size, .. } = item {
size
} else {
start_delete_idx = Some(idx);
break;
};
total_size += size;
if start_delete_idx_if_deleting_recognized_items.is_none() {
if total_size > tsl_if_deleting || (idx + 1) as u64 > fcl_if_deleting {
start_delete_idx_if_deleting_recognized_items = Some(idx);
}
}
if total_size > total_size_limit || (idx + 1) as u64 > file_count_limit {
start_delete_idx = start_delete_idx_if_deleting_recognized_items;
break;
}
}
if let Some(idx) = start_delete_idx {
for item in &cache_index[idx..] {
let (result, path, entity) = match item {
CacheEntry::Recognized { path, .. }
| CacheEntry::Unrecognized {
path,
is_dir: false,
} => (fs::remove_file(path), path, "file"),
CacheEntry::Unrecognized { path, is_dir: true } => {
(fs::remove_dir_all(path), path, "directory")
}
};
if let Err(err) = result {
warn!(
"Failed to remove {} during cleanup, path: {}, err: {}",
entity,
path.display(),
err
);
}
}
}
trace!("Task finished: clean up cache");
}
// Be fault tolerant: list as much as you can, and ignore the rest
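// Expected layout, as encoded in the walk below: the cache directory holds
// up to two levels of subdirectories, with module files (no extension),
// "<name>.stats" files and "<name>.wip-*" recompression locks at level 2,
// plus a ".cleanup.*" lock file at the top level.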
fn list_cache_contents(&self) -> Vec<CacheEntry> {
fn enter_dir(
vec: &mut Vec<CacheEntry>,
dir_path: &Path,
level: u8,
cache_config: &CacheConfig,
) {
macro_rules! unwrap_or {
($result:expr, $cont:stmt, $err_msg:expr) => {
unwrap_or!($result, $cont, $err_msg, dir_path)
};
($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => {
match $result {
Ok(val) => val,
Err(err) => {
warn!(
"{}, level: {}, path: {}, msg: {}",
$err_msg,
level,
$path.display(),
err
);
$cont
}
}
};
}
macro_rules! add_unrecognized {
(file: $path:expr) => {
add_unrecognized!(false, $path)
};
(dir: $path:expr) => {
add_unrecognized!(true, $path)
};
($is_dir:expr, $path:expr) => {
vec.push(CacheEntry::Unrecognized {
path: $path.to_path_buf(),
is_dir: $is_dir,
});
};
}
macro_rules! add_unrecognized_and {
([ $( $ty:ident: $path:expr ),* ], $cont:stmt) => {{
$( add_unrecognized!($ty: $path); )*
$cont
}};
}
// If we fail to list a directory, something bad is happening anyway
// (something else is touching our cache, or the disk is failing).
// Try to delete it, so we can stay within the soft limits of the cache size.
// This comment applies to the later uses of these macros in this function, too.
let it = unwrap_or!(
fs::read_dir(dir_path),
add_unrecognized_and!([dir: dir_path], return),
"Failed to list cache directory, deleting it"
);
let mut cache_files = HashMap::new();
for entry in it {
// read_dir() returns an iterator over results - in case some of them are errors
// we don't know their names, so we can't delete them. We don't want to delete
// the whole directory with good entries too, so we just ignore the erroneous entries.
let entry = unwrap_or!(
entry,
continue,
"Failed to read a cache dir entry (NOT deleting it, it still occupies space)"
);
let path = entry.path();
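// expected layout (cf. the tests below):
// <cache root>/<target triple>/<compiler version>/<module & stats files>,
// so levels 0 and 1 should contain only directories and level 2 only files;
// the cache root may additionally hold .cleanup.* lock files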
match (level, path.is_dir()) {
(0..=1, true) => enter_dir(vec, &path, level + 1, cache_config),
(0..=1, false) => {
if level == 0 && path.file_stem() == Some(OsStr::new(".cleanup")) {
if path.extension().is_some() {
// assume it's a cleanup lock
if !is_fs_lock_expired(
Some(&entry),
&path,
cache_config.cleanup_interval(),
cache_config.allowed_clock_drift_for_files_from_future(),
) {
continue; // skip active lock
}
}
}
add_unrecognized!(file: path);
}
(2, false) => {
let ext = path.extension();
if ext.is_none() || ext == Some(OsStr::new("stats")) {
// mod or stats file
cache_files.insert(path, entry);
} else {
let recognized = if let Some(ext_str) = ext.unwrap().to_str() {
// check if valid lock
ext_str.starts_with("wip-")
&& !is_fs_lock_expired(
Some(&entry),
&path,
cache_config.optimizing_compression_task_timeout(),
cache_config.allowed_clock_drift_for_files_from_future(),
)
} else {
// if it's None, i.e. not a valid UTF-8 string, then it's definitely not our lock
false
};
if !recognized {
add_unrecognized!(file: path);
}
}
}
(_, is_dir) => add_unrecognized!(is_dir, path),
}
}
// associate module with its stats & handle them
// assumption: just mods and stats
for (path, entry) in cache_files.iter() {
let path_buf: PathBuf;
let (mod_, stats_, is_mod) = match path.extension() {
Some(_) => {
path_buf = path.with_extension("");
(
cache_files.get(&path_buf).map(|v| (&path_buf, v)),
Some((path, entry)),
false,
)
}
None => {
path_buf = path.with_extension("stats");
(
Some((path, entry)),
cache_files.get(&path_buf).map(|v| (&path_buf, v)),
true,
)
}
};
// construct a cache entry
match (mod_, stats_, is_mod) {
(Some((mod_path, mod_entry)), Some((stats_path, stats_entry)), true) => {
let mod_metadata = unwrap_or!(
mod_entry.metadata(),
add_unrecognized_and!([file: stats_path, file: mod_path], continue),
"Failed to get metadata, deleting BOTH module cache and stats files",
mod_path
);
let stats_mtime = unwrap_or!(
stats_entry.metadata().and_then(|m| m.modified()),
add_unrecognized_and!(
[file: stats_path],
unwrap_or!(
mod_metadata.modified(),
add_unrecognized_and!([file: stats_path, file: mod_path], continue),
"Failed to get mtime, deleting BOTH module cache and stats files",
mod_path
)
),
"Failed to get metadata/mtime, deleting the file",
stats_path
);
// .into() called for the SystemTimeStub if cfg(test)
#[allow(clippy::identity_conversion)]
vec.push(CacheEntry::Recognized {
path: mod_path.to_path_buf(),
mtime: stats_mtime.into(),
size: mod_metadata.len(),
})
}
(Some(_), Some(_), false) => (), // was or will be handled by previous branch
(Some((mod_path, mod_entry)), None, _) => {
let (mod_metadata, mod_mtime) = unwrap_or!(
mod_entry
.metadata()
.and_then(|md| md.modified().map(|mt| (md, mt))),
add_unrecognized_and!([file: mod_path], continue),
"Failed to get metadata/mtime, deleting the file",
mod_path
);
// .into() called for the SystemTimeStub if cfg(test)
#[allow(clippy::identity_conversion)]
vec.push(CacheEntry::Recognized {
path: mod_path.to_path_buf(),
mtime: mod_mtime.into(),
size: mod_metadata.len(),
})
}
(None, Some((stats_path, _stats_entry)), _) => {
debug!("Found orphaned stats file: {}", stats_path.display());
add_unrecognized!(file: stats_path);
}
_ => unreachable!(),
}
}
}
let mut vec = Vec::new();
enter_dir(
&mut vec,
self.cache_config.directory(),
0,
&self.cache_config,
);
vec
}
}
fn read_stats_file(path: &Path) -> Option<ModuleCacheStatistics> {
fs::read(path)
.map_err(|err| {
trace!(
"Failed to read stats file, path: {}, err: {}",
path.display(),
err
)
})
.and_then(|bytes| {
toml::from_slice::<ModuleCacheStatistics>(&bytes[..]).map_err(|err| {
trace!(
"Failed to parse stats file, path: {}, err: {}",
path.display(),
err,
)
})
})
.ok()
}
fn write_stats_file(path: &Path, stats: &ModuleCacheStatistics) -> bool {
toml::to_string_pretty(&stats)
.map_err(|err| {
warn!(
"Failed to serialize stats file, path: {}, err: {}",
path.display(),
err
)
})
.and_then(|serialized| {
if fs_write_atomic(path, "stats", serialized.as_bytes()) {
Ok(())
} else {
Err(())
}
})
.is_ok()
}
/// Tries to acquire a lock for a specific task.
///
/// Returns Some(path) to the lock on success. The task path must have a
/// file stem and must not have an extension.
///
/// To release a lock, either rename or remove it manually, or wait until
/// it expires and the cleanup task removes it.
///
/// Note: this function is racy. The main idea is to be fault tolerant and
/// never block a task; the price is that occasionally some task may be
/// done more than once.
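/// A minimal usage sketch (illustrative only; `task_path`, `timeout` and
/// `drift` are assumed to be in scope, with `task_path` having a file stem
/// and no extension):
/// ```ignore
/// if let Some(lock_path) = acquire_task_fs_lock(&task_path, timeout, drift) {
///     // ... perform the task ...
///     let _ = fs::remove_file(&lock_path); // release the lock when done
/// }
/// ```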
fn acquire_task_fs_lock(
task_path: &Path,
timeout: Duration,
allowed_future_drift: Duration,
) -> Option<PathBuf> {
assert!(task_path.extension().is_none());
assert!(task_path.file_stem().is_some());
// list directory
let dir_path = task_path.parent()?;
let it = fs::read_dir(dir_path)
.map_err(|err| {
warn!(
"Failed to list cache directory, path: {}, err: {}",
dir_path.display(),
err
)
})
.ok()?;
// look for existing locks
for entry in it {
let entry = entry
.map_err(|err| {
warn!(
"Failed to list cache directory, path: {}, err: {}",
dir_path.display(),
err
)
})
.ok()?;
let path = entry.path();
if path.is_dir() || path.file_stem() != task_path.file_stem() {
continue;
}
// check extension and mtime
match path.extension() {
None => continue,
Some(ext) => {
if let Some(ext_str) = ext.to_str() {
// if it's None, i.e. not a valid UTF-8 string, then it's definitely not our lock
if ext_str.starts_with("wip-")
&& !is_fs_lock_expired(Some(&entry), &path, timeout, allowed_future_drift)
{
return None;
}
}
}
}
}
// create the lock
let lock_path = task_path.with_extension(format!("wip-{}", std::process::id()));
let _file = fs::OpenOptions::new()
.create_new(true)
.write(true)
.open(&lock_path)
.map_err(|err| {
warn!(
"Failed to create lock file (note: it shouldn't exists): path: {}, err: {}",
lock_path.display(),
err
)
})
.ok()?;
Some(lock_path)
}
// we take either both, or just the path; the dir entry is preferable since on some platforms
// we can get the metadata without extra syscalls;
// furthermore, it's better to reuse a path we already have instead of allocating a new one from the dir entry
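// in effect, a lock counts as live iff its mtime lies within
// (now - threshold, now + allowed_future_drift]; an unreadable mtime is
// treated as expired so that the task is never starved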
fn is_fs_lock_expired(
entry: Option<&fs::DirEntry>,
path: &PathBuf,
threshold: Duration,
allowed_future_drift: Duration,
) -> bool {
let mtime = match entry
.map_or_else(|| path.metadata(), |e| e.metadata())
.and_then(|metadata| metadata.modified())
{
Ok(mt) => mt,
Err(err) => {
warn!(
"Failed to get metadata/mtime, treating as an expired lock, path: {}, err: {}",
path.display(),
err
);
return true; // can't read mtime, treat as expired, so this task will not be starved
}
};
// DON'T use: mtime.elapsed() -- we must call SystemTime directly for the tests to be deterministic
match SystemTime::now().duration_since(mtime) {
Ok(elapsed) => elapsed >= threshold,
Err(err) => {
trace!(
"Found mtime in the future, treating as a not expired lock, path: {}, err: {}",
path.display(),
err
);
// the lock is expired if the time is too far in the future
// it is fine to have network share and not synchronized clocks,
// but it's not good when user changes time in their system clock
err.duration() > allowed_future_drift
}
}
}
#[cfg(test)]
mod tests;

758
crates/environ/src/cache/worker/tests.rs vendored Normal file

@@ -0,0 +1,758 @@
use super::*;
use crate::cache::config::tests::test_prolog;
use core::iter::repeat;
use std::process;
// load_config! comes from crate::cache(::config::tests);
// when doing anything with the tests, make sure they are DETERMINISTIC
// -- the result shouldn't rely on system time!
pub mod system_time_stub;
#[test]
fn test_on_get_create_stats_file() {
let (_tempdir, cache_dir, config_path) = test_prolog();
let cache_config = load_config!(
config_path,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}",
cache_dir
);
assert!(cache_config.enabled());
let worker = Worker::start_new(&cache_config, None);
let mod_file = cache_dir.join("some-mod");
worker.on_cache_get_async(mod_file);
worker.wait_for_all_events_handled();
assert_eq!(worker.events_dropped(), 0);
let stats_file = cache_dir.join("some-mod.stats");
let stats = read_stats_file(&stats_file).expect("Failed to read stats file");
assert_eq!(stats.usages, 1);
assert_eq!(
stats.compression_level,
cache_config.baseline_compression_level()
);
}
#[test]
fn test_on_get_update_usage_counter() {
let (_tempdir, cache_dir, config_path) = test_prolog();
let cache_config = load_config!(
config_path,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
worker-event-queue-size = '16'",
cache_dir
);
assert!(cache_config.enabled());
let worker = Worker::start_new(&cache_config, None);
let mod_file = cache_dir.join("some-mod");
let stats_file = cache_dir.join("some-mod.stats");
let default_stats = ModuleCacheStatistics::default(&cache_config);
assert!(write_stats_file(&stats_file, &default_stats));
let mut usages = 0;
for times_used in &[4, 7, 2] {
for _ in 0..*times_used {
worker.on_cache_get_async(mod_file.clone());
usages += 1;
}
worker.wait_for_all_events_handled();
assert_eq!(worker.events_dropped(), 0);
let stats = read_stats_file(&stats_file).expect("Failed to read stats file");
assert_eq!(stats.usages, usages);
}
}
#[test]
fn test_on_get_recompress_no_mod_file() {
let (_tempdir, cache_dir, config_path) = test_prolog();
let cache_config = load_config!(
config_path,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
worker-event-queue-size = '16'\n\
baseline-compression-level = 3\n\
optimized-compression-level = 7\n\
optimized-compression-usage-counter-threshold = '256'",
cache_dir
);
assert!(cache_config.enabled());
let worker = Worker::start_new(&cache_config, None);
let mod_file = cache_dir.join("some-mod");
let stats_file = cache_dir.join("some-mod.stats");
let mut start_stats = ModuleCacheStatistics::default(&cache_config);
start_stats.usages = 250;
assert!(write_stats_file(&stats_file, &start_stats));
let mut usages = start_stats.usages;
for times_used in &[4, 7, 2] {
for _ in 0..*times_used {
worker.on_cache_get_async(mod_file.clone());
usages += 1;
}
worker.wait_for_all_events_handled();
assert_eq!(worker.events_dropped(), 0);
let stats = read_stats_file(&stats_file).expect("Failed to read stats file");
assert_eq!(stats.usages, usages);
assert_eq!(
stats.compression_level,
cache_config.baseline_compression_level()
);
}
}
#[test]
fn test_on_get_recompress_with_mod_file() {
let (_tempdir, cache_dir, config_path) = test_prolog();
let cache_config = load_config!(
config_path,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
worker-event-queue-size = '16'\n\
baseline-compression-level = 3\n\
optimized-compression-level = 7\n\
optimized-compression-usage-counter-threshold = '256'",
cache_dir
);
assert!(cache_config.enabled());
let worker = Worker::start_new(&cache_config, None);
let mod_file = cache_dir.join("some-mod");
let mod_data = "some test data to be compressed";
let data = zstd::encode_all(
mod_data.as_bytes(),
cache_config.baseline_compression_level(),
)
.expect("Failed to compress sample mod file");
fs::write(&mod_file, &data).expect("Failed to write sample mod file");
let stats_file = cache_dir.join("some-mod.stats");
let mut start_stats = ModuleCacheStatistics::default(&cache_config);
start_stats.usages = 250;
assert!(write_stats_file(&stats_file, &start_stats));
// scenarios:
// 1. Shouldn't be recompressed
// 2. Should be recompressed
// 3. After lowering compression level, should be recompressed
let scenarios = [(4, false), (7, true), (2, false)];
let mut usages = start_stats.usages;
assert!(usages < cache_config.optimized_compression_usage_counter_threshold());
let mut tested_higher_opt_compr_lvl = false;
for (times_used, lower_compr_lvl) in &scenarios {
for _ in 0..*times_used {
worker.on_cache_get_async(mod_file.clone());
usages += 1;
}
worker.wait_for_all_events_handled();
assert_eq!(worker.events_dropped(), 0);
let mut stats = read_stats_file(&stats_file).expect("Failed to read stats file");
assert_eq!(stats.usages, usages);
assert_eq!(
stats.compression_level,
if usages < cache_config.optimized_compression_usage_counter_threshold() {
cache_config.baseline_compression_level()
} else {
cache_config.optimized_compression_level()
}
);
let compressed_data = fs::read(&mod_file).expect("Failed to read mod file");
let decoded_data =
zstd::decode_all(&compressed_data[..]).expect("Failed to decompress mod file");
assert_eq!(decoded_data, mod_data.as_bytes());
if *lower_compr_lvl {
assert!(usages >= cache_config.optimized_compression_usage_counter_threshold());
tested_higher_opt_compr_lvl = true;
stats.compression_level -= 1;
assert!(write_stats_file(&stats_file, &stats));
}
}
assert!(usages >= cache_config.optimized_compression_usage_counter_threshold());
assert!(tested_higher_opt_compr_lvl);
}
#[test]
fn test_on_get_recompress_lock() {
let (_tempdir, cache_dir, config_path) = test_prolog();
let cache_config = load_config!(
config_path,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
worker-event-queue-size = '16'\n\
baseline-compression-level = 3\n\
optimized-compression-level = 7\n\
optimized-compression-usage-counter-threshold = '256'\n\
optimizing-compression-task-timeout = '30m'\n\
allowed-clock-drift-for-files-from-future = '1d'",
cache_dir
);
assert!(cache_config.enabled());
let worker = Worker::start_new(&cache_config, None);
let mod_file = cache_dir.join("some-mod");
let mod_data = "some test data to be compressed";
let data = zstd::encode_all(
mod_data.as_bytes(),
cache_config.baseline_compression_level(),
)
.expect("Failed to compress sample mod file");
fs::write(&mod_file, &data).expect("Failed to write sample mod file");
let stats_file = cache_dir.join("some-mod.stats");
let mut start_stats = ModuleCacheStatistics::default(&cache_config);
start_stats.usages = 255;
let lock_file = cache_dir.join("some-mod.wip-lock");
let scenarios = [
// valid lock
(true, "past", Duration::from_secs(30 * 60 - 1)),
// valid future lock
(true, "future", Duration::from_secs(24 * 60 * 60)),
// expired lock
(false, "past", Duration::from_secs(30 * 60)),
// expired future lock
(false, "future", Duration::from_secs(24 * 60 * 60 + 1)),
];
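// boundary math: with a 30m task timeout, a lock whose mtime is >= 30 * 60 seconds
// in the past is expired; with 1d allowed clock drift, a lock whose mtime is more
// than 24h in the future is expired, too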
for (lock_valid, duration_sign, duration) in &scenarios {
assert!(write_stats_file(&stats_file, &start_stats)); // restore usage & compression level
create_file_with_mtime(&lock_file, "", duration_sign, &duration);
worker.on_cache_get_async(mod_file.clone());
worker.wait_for_all_events_handled();
assert_eq!(worker.events_dropped(), 0);
let stats = read_stats_file(&stats_file).expect("Failed to read stats file");
assert_eq!(stats.usages, start_stats.usages + 1);
assert_eq!(
stats.compression_level,
if *lock_valid {
cache_config.baseline_compression_level()
} else {
cache_config.optimized_compression_level()
}
);
let compressed_data = fs::read(&mod_file).expect("Failed to read mod file");
let decoded_data =
zstd::decode_all(&compressed_data[..]).expect("Failed to decompress mod file");
assert_eq!(decoded_data, mod_data.as_bytes());
}
}
#[test]
fn test_on_update_fresh_stats_file() {
let (_tempdir, cache_dir, config_path) = test_prolog();
let cache_config = load_config!(
config_path,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
worker-event-queue-size = '16'\n\
baseline-compression-level = 3\n\
optimized-compression-level = 7\n\
cleanup-interval = '1h'",
cache_dir
);
assert!(cache_config.enabled());
let worker = Worker::start_new(&cache_config, None);
let mod_file = cache_dir.join("some-mod");
let stats_file = cache_dir.join("some-mod.stats");
let cleanup_certificate = cache_dir.join(".cleanup.wip-done");
create_file_with_mtime(&cleanup_certificate, "", "future", &Duration::from_secs(0));
// the file below is created by the worker if it performs a cleanup
let worker_lock_file = cache_dir.join(format!(".cleanup.wip-{}", process::id()));
// scenarios:
// 1. Create new stats file
// 2. Overwrite existing file
for update_file in &[true, false] {
worker.on_cache_update_async(mod_file.clone());
worker.wait_for_all_events_handled();
assert_eq!(worker.events_dropped(), 0);
let mut stats = read_stats_file(&stats_file).expect("Failed to read stats file");
assert_eq!(stats.usages, 1);
assert_eq!(
stats.compression_level,
cache_config.baseline_compression_level()
);
if *update_file {
stats.usages += 42;
stats.compression_level += 1;
assert!(write_stats_file(&stats_file, &stats));
}
assert!(!worker_lock_file.exists());
}
}
#[test]
fn test_on_update_cleanup_limits_trash_locks() {
let (_tempdir, cache_dir, config_path) = test_prolog();
let cache_config = load_config!(
config_path,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
worker-event-queue-size = '16'\n\
cleanup-interval = '30m'\n\
optimizing-compression-task-timeout = '30m'\n\
allowed-clock-drift-for-files-from-future = '1d'\n\
file-count-soft-limit = '5'\n\
files-total-size-soft-limit = '30K'\n\
file-count-limit-percent-if-deleting = '70%'\n\
files-total-size-limit-percent-if-deleting = '70%'
",
cache_dir
);
assert!(cache_config.enabled());
let worker = Worker::start_new(&cache_config, None);
let content_1k = "a".repeat(1_000);
let content_10k = "a".repeat(10_000);
let mods_files_dir = cache_dir.join("target-triple").join("compiler-version");
let mod_with_stats = mods_files_dir.join("mod-with-stats");
let trash_dirs = [
mods_files_dir.join("trash"),
mods_files_dir.join("trash").join("trash"),
];
let trash_files = [
cache_dir.join("trash-file"),
cache_dir.join("trash-file.wip-lock"),
cache_dir.join("target-triple").join("trash.txt"),
cache_dir.join("target-triple").join("trash.txt.wip-lock"),
mods_files_dir.join("trash.ogg"),
mods_files_dir.join("trash").join("trash.doc"),
mods_files_dir.join("trash").join("trash.doc.wip-lock"),
mods_files_dir.join("trash").join("trash").join("trash.xls"),
mods_files_dir
.join("trash")
.join("trash")
.join("trash.xls.wip-lock"),
];
let mod_locks = [
// valid lock
(
mods_files_dir.join("mod0.wip-lock"),
true,
"past",
Duration::from_secs(30 * 60 - 1),
),
// valid future lock
(
mods_files_dir.join("mod1.wip-lock"),
true,
"future",
Duration::from_secs(24 * 60 * 60),
),
// expired lock
(
mods_files_dir.join("mod2.wip-lock"),
false,
"past",
Duration::from_secs(30 * 60),
),
// expired future lock
(
mods_files_dir.join("mod3.wip-lock"),
false,
"future",
Duration::from_secs(24 * 60 * 60 + 1),
),
];
// the file below is created by the worker if it performs a cleanup
let worker_lock_file = cache_dir.join(format!(".cleanup.wip-{}", process::id()));
let scenarios = [
// Close to limits, but not reached, only trash deleted
(2, 2, 4),
// File count limit exceeded
(1, 10, 3),
// Total size limit exceeded
(4, 0, 2),
// Both limits exceeded
(3, 5, 3),
];
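// derived limits: with soft limits of 5 files / 30K and both percents at 70%,
// cleanup (once triggered) keeps at most 5 * 70 / 100 = 3 of the youngest
// recognized files, within roughly 70% of the 30K size limit; trash and
// expired locks are removed regardless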
for (files_10k, files_1k, remaining_files) in &scenarios {
let mut secs_ago = 100;
for d in &trash_dirs {
fs::create_dir_all(d).expect("Failed to create directories");
}
for f in &trash_files {
create_file_with_mtime(f, "", "past", &Duration::from_secs(0));
}
for (f, _, sign, duration) in &mod_locks {
create_file_with_mtime(f, "", sign, &duration);
}
let mut mods_paths = vec![];
for content in repeat(&content_10k)
.take(*files_10k)
.chain(repeat(&content_1k).take(*files_1k))
{
mods_paths.push(mods_files_dir.join(format!("test-mod-{}", mods_paths.len())));
create_file_with_mtime(
mods_paths.last().unwrap(),
content,
"past",
&Duration::from_secs(secs_ago),
);
assert!(secs_ago > 0);
secs_ago -= 1;
}
// creating the .stats file updates its mtime, which would affect the test results,
// so we use a separate nonexistent module here (the orphaned .stats file will be removed anyway)
worker.on_cache_update_async(mod_with_stats.clone());
worker.wait_for_all_events_handled();
assert_eq!(worker.events_dropped(), 0);
for ent in trash_dirs.iter().chain(trash_files.iter()) {
assert!(!ent.exists());
}
for (f, valid, ..) in &mod_locks {
assert_eq!(f.exists(), *valid);
}
for (idx, path) in mods_paths.iter().enumerate() {
let should_exist = idx >= mods_paths.len() - *remaining_files;
assert_eq!(path.exists(), should_exist);
if should_exist {
// cleanup before next iteration
fs::remove_file(path).expect("Failed to remove a file");
}
}
fs::remove_file(&worker_lock_file).expect("Failed to remove lock file");
}
}
#[test]
fn test_on_update_cleanup_lru_policy() {
let (_tempdir, cache_dir, config_path) = test_prolog();
let cache_config = load_config!(
config_path,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
worker-event-queue-size = '16'\n\
file-count-soft-limit = '5'\n\
files-total-size-soft-limit = '30K'\n\
file-count-limit-percent-if-deleting = '80%'\n\
files-total-size-limit-percent-if-deleting = '70%'",
cache_dir
);
assert!(cache_config.enabled());
let worker = Worker::start_new(&cache_config, None);
let content_1k = "a".repeat(1_000);
let content_5k = "a".repeat(5_000);
let content_10k = "a".repeat(10_000);
let mods_files_dir = cache_dir.join("target-triple").join("compiler-version");
fs::create_dir_all(&mods_files_dir).expect("Failed to create directories");
let nonexistent_mod_file = cache_dir.join("nonexistent-mod");
let orphaned_stats_file = cache_dir.join("orphaned-mod.stats");
let worker_lock_file = cache_dir.join(format!(".cleanup.wip-{}", process::id()));
// content, how long ago created, how long ago stats created (if created), should be alive
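// note: an entry's age is the mtime of its .stats file when present, otherwise
// the mtime of the module file itself, so a fresh .stats file keeps an old
// module alive; with the config above, cleanup keeps at most 5 * 80 / 100 = 4
// of the youngest files, within roughly 70% of the 30K size limit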
let scenarios = [
&[
(&content_10k, 29, None, false),
(&content_10k, 28, None, false),
(&content_10k, 27, None, false),
(&content_1k, 26, None, true),
(&content_10k, 25, None, true),
(&content_1k, 24, None, true),
],
&[
(&content_10k, 29, None, false),
(&content_10k, 28, None, false),
(&content_10k, 27, None, true),
(&content_1k, 26, None, true),
(&content_5k, 25, None, true),
(&content_1k, 24, None, true),
],
&[
(&content_10k, 29, Some(19), true),
(&content_10k, 28, None, false),
(&content_10k, 27, None, false),
(&content_1k, 26, Some(18), true),
(&content_5k, 25, None, true),
(&content_1k, 24, None, true),
],
&[
(&content_10k, 29, Some(19), true),
(&content_10k, 28, Some(18), true),
(&content_10k, 27, None, false),
(&content_1k, 26, Some(17), true),
(&content_5k, 25, None, false),
(&content_1k, 24, None, false),
],
&[
(&content_10k, 29, Some(19), true),
(&content_10k, 28, None, false),
(&content_1k, 27, None, false),
(&content_5k, 26, Some(18), true),
(&content_1k, 25, None, false),
(&content_10k, 24, None, false),
],
];
for mods in &scenarios {
let filenames = (0..mods.len())
.map(|i| {
(
mods_files_dir.join(format!("mod-{}", i)),
mods_files_dir.join(format!("mod-{}.stats", i)),
)
})
.collect::<Vec<_>>();
for ((content, mod_secs_ago, create_stats, _), (mod_filename, stats_filename)) in
mods.iter().zip(filenames.iter())
{
create_file_with_mtime(
mod_filename,
content,
"past",
&Duration::from_secs(*mod_secs_ago),
);
if let Some(stats_secs_ago) = create_stats {
create_file_with_mtime(
stats_filename,
"cleanup doesn't care",
"past",
&Duration::from_secs(*stats_secs_ago),
);
}
}
create_file_with_mtime(
&orphaned_stats_file,
"cleanup doesn't care",
"past",
&Duration::from_secs(0),
);
worker.on_cache_update_async(nonexistent_mod_file.clone());
worker.wait_for_all_events_handled();
assert_eq!(worker.events_dropped(), 0);
assert!(!orphaned_stats_file.exists());
for ((_, _, create_stats, alive), (mod_filename, stats_filename)) in
mods.iter().zip(filenames.iter())
{
assert_eq!(mod_filename.exists(), *alive);
assert_eq!(stats_filename.exists(), *alive && create_stats.is_some());
// cleanup for next iteration
if *alive {
fs::remove_file(&mod_filename).expect("Failed to remove a file");
if create_stats.is_some() {
fs::remove_file(&stats_filename).expect("Failed to remove a file");
}
}
}
fs::remove_file(&worker_lock_file).expect("Failed to remove lock file");
}
}
// clock drift should be applied to mod cache & stats, too
// however, postpone deleting files to as late as possible
#[test]
fn test_on_update_cleanup_future_files() {
let (_tempdir, cache_dir, config_path) = test_prolog();
let cache_config = load_config!(
config_path,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
worker-event-queue-size = '16'\n\
allowed-clock-drift-for-files-from-future = '1d'\n\
file-count-soft-limit = '3'\n\
files-total-size-soft-limit = '1M'\n\
file-count-limit-percent-if-deleting = '70%'\n\
files-total-size-limit-percent-if-deleting = '70%'",
cache_dir
);
assert!(cache_config.enabled());
let worker = Worker::start_new(&cache_config, None);
let content_1k = "a".repeat(1_000);
let mods_files_dir = cache_dir.join("target-triple").join("compiler-version");
fs::create_dir_all(&mods_files_dir).expect("Failed to create directories");
let nonexistent_mod_file = cache_dir.join("nonexistent-mod");
// the file below is created by the worker if it performs a cleanup
let worker_lock_file = cache_dir.join(format!(".cleanup.wip-{}", process::id()));
let scenarios: [&[_]; 5] = [
// NOT cleaning up, everything's ok
&[
(Duration::from_secs(0), None, true),
(Duration::from_secs(24 * 60 * 60), None, true),
],
// NOT cleaning up, everything's ok
&[
(Duration::from_secs(0), None, true),
(Duration::from_secs(24 * 60 * 60 + 1), None, true),
],
// cleaning up, removing files from the oldest
&[
(Duration::from_secs(0), None, false),
(Duration::from_secs(24 * 60 * 60), None, true),
(Duration::from_secs(1), None, false),
(Duration::from_secs(2), None, true),
],
// cleaning up, removing files from the oldest; the file from the far future is deleted
&[
(Duration::from_secs(0), None, false),
(Duration::from_secs(1), None, true),
(Duration::from_secs(24 * 60 * 60 + 1), None, false),
(Duration::from_secs(2), None, true),
],
// cleaning up, removing files from oldest; file from far future should have .stats from +-now => it's a legitimate file
&[
(Duration::from_secs(0), None, false),
(Duration::from_secs(1), None, false),
(
Duration::from_secs(24 * 60 * 60 + 1),
Some(Duration::from_secs(3)),
true,
),
(Duration::from_secs(2), None, true),
],
];
for mods in &scenarios {
let filenames = (0..mods.len())
.map(|i| {
(
mods_files_dir.join(format!("mod-{}", i)),
mods_files_dir.join(format!("mod-{}.stats", i)),
)
})
.collect::<Vec<_>>();
for ((duration, opt_stats_duration, _), (mod_filename, stats_filename)) in
mods.iter().zip(filenames.iter())
{
create_file_with_mtime(mod_filename, &content_1k, "future", duration);
if let Some(stats_duration) = opt_stats_duration {
create_file_with_mtime(stats_filename, "", "future", stats_duration);
}
}
worker.on_cache_update_async(nonexistent_mod_file.clone());
worker.wait_for_all_events_handled();
assert_eq!(worker.events_dropped(), 0);
for ((_, opt_stats_duration, alive), (mod_filename, stats_filename)) in
mods.iter().zip(filenames.iter())
{
assert_eq!(mod_filename.exists(), *alive);
assert_eq!(
stats_filename.exists(),
*alive && opt_stats_duration.is_some()
);
if *alive {
fs::remove_file(mod_filename).expect("Failed to remove a file");
if opt_stats_duration.is_some() {
fs::remove_file(stats_filename).expect("Failed to remove a file");
}
}
}
fs::remove_file(&worker_lock_file).expect("Failed to remove lock file");
}
}
// this tests whether the worker triggered a cleanup when some cleanup lock/certificate was already out there
#[test]
fn test_on_update_cleanup_self_lock() {
let (_tempdir, cache_dir, config_path) = test_prolog();
let cache_config = load_config!(
config_path,
"[cache]\n\
enabled = true\n\
directory = {cache_dir}\n\
worker-event-queue-size = '16'\n\
cleanup-interval = '30m'\n\
allowed-clock-drift-for-files-from-future = '1d'",
cache_dir
);
assert!(cache_config.enabled());
let worker = Worker::start_new(&cache_config, None);
let mod_file = cache_dir.join("some-mod");
let trash_file = cache_dir.join("trash-file.txt");
let lock_file = cache_dir.join(".cleanup.wip-lock");
// the file below is created by the worker if it performs a cleanup
let worker_lock_file = cache_dir.join(format!(".cleanup.wip-{}", process::id()));
let scenarios = [
// valid lock
(true, "past", Duration::from_secs(30 * 60 - 1)),
// valid future lock
(true, "future", Duration::from_secs(24 * 60 * 60)),
// expired lock
(false, "past", Duration::from_secs(30 * 60)),
// expired future lock
(false, "future", Duration::from_secs(24 * 60 * 60 + 1)),
];
for (lock_valid, duration_sign, duration) in &scenarios {
create_file_with_mtime(
&trash_file,
"with trash content",
"future",
&Duration::from_secs(0),
);
create_file_with_mtime(&lock_file, "", duration_sign, &duration);
worker.on_cache_update_async(mod_file.clone());
worker.wait_for_all_events_handled();
assert_eq!(worker.events_dropped(), 0);
assert_eq!(trash_file.exists(), *lock_valid);
assert_eq!(lock_file.exists(), *lock_valid);
if *lock_valid {
assert!(!worker_lock_file.exists());
} else {
fs::remove_file(&worker_lock_file).expect("Failed to remove lock file");
}
}
}
fn create_file_with_mtime(filename: &Path, contents: &str, offset_sign: &str, offset: &Duration) {
fs::write(filename, contents).expect("Failed to create a file");
let mtime = match offset_sign {
"past" => system_time_stub::NOW
.checked_sub(*offset)
.expect("Failed to calculate new mtime"),
"future" => system_time_stub::NOW
.checked_add(*offset)
.expect("Failed to calculate new mtime"),
_ => unreachable!(),
};
filetime::set_file_mtime(filename, mtime.into()).expect("Failed to set mtime");
}


@@ -0,0 +1,29 @@
use lazy_static::lazy_static;
use std::time::{Duration, SystemTime, SystemTimeError};
lazy_static! {
pub static ref NOW: SystemTime = SystemTime::now(); // no need for RefCell and set_now() for now
}
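// SystemTimeStub mirrors the small subset of std::time::SystemTime used by the
// worker, but freezes "now" at its first access, so mtime comparisons in the
// tests stay deterministic no matter how long they take to run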
#[derive(PartialOrd, PartialEq, Ord, Eq)]
pub struct SystemTimeStub(SystemTime);
impl SystemTimeStub {
pub fn now() -> Self {
Self(*NOW)
}
pub fn checked_add(&self, duration: Duration) -> Option<Self> {
self.0.checked_add(duration).map(|t| t.into())
}
pub fn duration_since(&self, earlier: SystemTime) -> Result<Duration, SystemTimeError> {
self.0.duration_since(earlier)
}
}
impl From<SystemTime> for SystemTimeStub {
fn from(time: SystemTime) -> Self {
Self(time)
}
}