Cache worker
wasmtime-environ/src/cache/config.rs
@@ -1,9 +1,10 @@
//! Module for configuring the cache system.

use super::worker;
use directories::ProjectDirs;
use lazy_static::lazy_static;
use log::{debug, error, trace};
use serde::{Deserialize, Serialize};
use log::{debug, error, trace, warn};
use serde::{de::Deserializer, ser::Serializer, Deserialize, Serialize};
use spin::Once;
use std::fmt::Debug;
use std::fs;
@@ -11,24 +12,72 @@ use std::mem;
use std::path::{Path, PathBuf};
use std::string::{String, ToString};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::vec::Vec;

// wrapped, so we have named section in config,
// also, for possible future compatibility
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
struct Config {
    cache: CacheConfig,
}

#[derive(Serialize, Deserialize)]
// todo: markdown documention of these options
// todo: don't flush default values (create config from simple template + url to docs)
// todo: more user-friendly cache config creation
#[derive(Serialize, Deserialize, Debug)]
struct CacheConfig {
    #[serde(skip)]
    pub errors: Vec<String>,

    pub enabled: bool,
    pub directory: Option<PathBuf>,
    #[serde(rename = "worker-event-queue-size")]
    pub worker_event_queue_size: Option<usize>,
    #[serde(rename = "baseline-compression-level")]
    pub baseline_compression_level: Option<i32>,
    #[serde(rename = "optimized-compression-level")]
    pub optimized_compression_level: Option<i32>,
    #[serde(rename = "optimized-compression-usage-counter-threshold")]
    pub optimized_compression_usage_counter_threshold: Option<u64>,
    #[serde(
        default,
        rename = "cleanup-interval-in-seconds",
        serialize_with = "serialize_duration",
        deserialize_with = "deserialize_duration"
    )] // todo unit?
    pub cleanup_interval: Option<Duration>,
    #[serde(
        default,
        rename = "optimizing-compression-task-timeout-in-seconds",
        serialize_with = "serialize_duration",
        deserialize_with = "deserialize_duration"
    )] // todo unit?
    pub optimizing_compression_task_timeout: Option<Duration>,
    #[serde(rename = "files-count-soft-limit")]
    pub files_count_soft_limit: Option<u64>,
    #[serde(rename = "files-total-size-soft-limit")]
    pub files_total_size_soft_limit: Option<u64>, // todo unit?
    #[serde(rename = "files-count-limit-percent-if-deleting")]
    pub files_count_limit_percent_if_deleting: Option<u8>, // todo format: integer + %
    #[serde(rename = "files-total-size-limit-percent-if-deleting")]
    pub files_total_size_limit_percent_if_deleting: Option<u8>,
}
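Note (illustration, not part of this commit): because CacheConfig is wrapped in Config and every field carries a kebab-case serde rename, the TOML file this module reads keeps its settings under a [cache] table with keys such as worker-event-queue-size. A minimal parsing sketch, assuming the toml crate the module already relies on; the values and the test module name are made up:

#[cfg(test)]
mod config_shape_sketch {
    use super::*;

    #[test]
    fn parses_kebab_case_keys_under_the_cache_table() {
        // Made-up values, only to show the file shape; these are not the defaults.
        let text = r#"
            [cache]
            enabled = true
            worker-event-queue-size = 16
            baseline-compression-level = 3
        "#;
        let parsed: Config = toml::from_str(text).unwrap();
        assert!(parsed.cache.enabled);
        assert_eq!(parsed.cache.worker_event_queue_size, Some(16));
        assert_eq!(parsed.cache.baseline_compression_level, Some(3));
    }
}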

// toml-rs fails to serialize Duration ("values must be emitted before tables")
// so we're providing custom functions for it
fn serialize_duration<S>(duration: &Option<Duration>, serializer: S) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    duration.map(|d| d.as_secs()).serialize(serializer)
}

fn deserialize_duration<'de, D>(deserializer: D) -> Result<Option<Duration>, D::Error>
where
    D: Deserializer<'de>,
{
    Ok(Option::<u64>::deserialize(deserializer)?.map(Duration::from_secs))
}
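These two helpers exist because Duration would otherwise serialize as a nested table of secs and nanos, which toml-rs refuses to emit after plain values; the module stores whole seconds instead. A rough round-trip sketch (not part of the commit; Probe and the test module are invented, and it assumes the same toml crate used elsewhere in this module):

#[cfg(test)]
mod duration_encoding_sketch {
    use super::*;

    // Throwaway struct wired to the helpers above.
    #[derive(Serialize, Deserialize)]
    struct Probe {
        #[serde(
            rename = "cleanup-interval-in-seconds",
            serialize_with = "serialize_duration",
            deserialize_with = "deserialize_duration"
        )]
        interval: Option<Duration>,
    }

    #[test]
    fn duration_round_trips_as_whole_seconds() {
        let original = Probe {
            interval: Some(Duration::from_secs(3600)),
        };
        // Emits `cleanup-interval-in-seconds = 3600`, not a nested table.
        let text = toml::to_string(&original).unwrap();
        let parsed: Probe = toml::from_str(&text).unwrap();
        assert_eq!(parsed.interval, Some(Duration::from_secs(3600)));
    }
}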

// Private static, so only internal function can access it.
@@ -53,20 +102,35 @@ pub fn directory() -> &'static PathBuf {
        .expect("Cache system must be initialized")
        .directory
        .as_ref()
        .unwrap()
        .expect("All cache system settings must be validated or defaulted")
}

/// Returns cache compression level.
///
/// Panics if the cache is disabled.
pub fn baseline_compression_level() -> i32 {
    CONFIG
        .r#try()
        .expect("Cache system must be initialized")
        .baseline_compression_level
        .unwrap()
macro_rules! generate_setting_getter {
    ($setting:ident: $setting_type:ty) => {
        /// Returns `$setting`.
        ///
        /// Panics if the cache is disabled.
        pub fn $setting() -> $setting_type {
            CONFIG
                .r#try()
                .expect("Cache system must be initialized")
                .$setting
                .expect("All cache system settings must be validated or defaulted")
        }
    };
}

generate_setting_getter!(worker_event_queue_size: usize);
generate_setting_getter!(baseline_compression_level: i32);
generate_setting_getter!(optimized_compression_level: i32);
generate_setting_getter!(optimized_compression_usage_counter_threshold: u64);
generate_setting_getter!(cleanup_interval: Duration);
generate_setting_getter!(optimizing_compression_task_timeout: Duration);
generate_setting_getter!(files_count_soft_limit: u64);
generate_setting_getter!(files_total_size_soft_limit: u64);
generate_setting_getter!(files_count_limit_percent_if_deleting: u8);
generate_setting_getter!(files_total_size_limit_percent_if_deleting: u8);
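For reference, each generate_setting_getter! line above expands to a small accessor; hand-expanding the cleanup_interval invocation gives roughly:

/// Returns `cleanup_interval`.
///
/// Panics if the cache is disabled.
pub fn cleanup_interval() -> Duration {
    CONFIG
        .r#try()
        .expect("Cache system must be initialized")
        .cleanup_interval
        .expect("All cache system settings must be validated or defaulted")
}

Callers therefore read each setting as a plain value; the Option handling stays inside this module, backed by the validate-or-default pass further below.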

/// Initializes the cache system. Should be called exactly once,
/// and before using the cache system. Otherwise it can panic.
/// Returns list of errors. If empty, initialization succeeded.
@@ -74,6 +138,7 @@ pub fn init<P: AsRef<Path> + Debug>(
    enabled: bool,
    config_file: Option<P>,
    create_new_config: bool,
    init_file_per_thread_logger: Option<&'static str>,
) -> &'static Vec<String> {
    INIT_CALLED
        .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
@@ -85,10 +150,8 @@ pub fn init<P: AsRef<Path> + Debug>(
    let conf_file_str = format!("{:?}", config_file);
    let conf = CONFIG.call_once(|| CacheConfig::from_file(enabled, config_file, create_new_config));
    if conf.errors.is_empty() {
        debug!(
            "Cache init(\"{}\"): enabled={}, directory={:?}, baseline-compression-level={:?}",
            conf_file_str, conf.enabled, conf.directory, conf.baseline_compression_level,
        )
        worker::init(init_file_per_thread_logger);
        debug!("Cache init(\"{}\"): {:#?}", conf_file_str, conf)
    } else {
        error!(
            "Cache init(\"{}\"): errors: {:#?}",
@@ -104,6 +167,19 @@ lazy_static! {
    static ref PROJECT_DIRS: Option<ProjectDirs> =
        ProjectDirs::from("", "CraneStation", "wasmtime");
}
// TODO: values to be tuned
// TODO: what do we want to warn users about?
const DEFAULT_WORKER_EVENT_QUEUE_SIZE: usize = 0x10;
const WORKER_EVENT_QUEUE_SIZE_WARNING_TRESHOLD: usize = 3;
const DEFAULT_BASELINE_COMPRESSION_LEVEL: i32 = zstd::DEFAULT_COMPRESSION_LEVEL;
const DEFAULT_OPTIMIZED_COMPRESSION_LEVEL: i32 = 20;
const DEFAULT_OPTIMIZED_COMPRESSION_USAGE_COUNTER_THRESHOLD: u64 = 0x100;
const DEFAULT_CLEANUP_INTERVAL: Duration = Duration::from_secs(60 * 60);
const DEFAULT_OPTIMIZING_COMPRESSION_TASK_TIMEOUT: Duration = Duration::from_secs(30 * 60);
const DEFAULT_FILES_COUNT_SOFT_LIMIT: u64 = 0x10_000;
const DEFAULT_FILES_TOTAL_SIZE_SOFT_LIMIT: u64 = 1024 * 1024 * 512;
const DEFAULT_FILES_COUNT_LIMIT_PERCENT_IF_DELETING: u8 = 70;
const DEFAULT_FILES_TOTAL_SIZE_LIMIT_PERCENT_IF_DELETING: u8 = 70;

impl CacheConfig {
    pub fn new_cache_disabled() -> Self {
@@ -111,7 +187,16 @@ impl CacheConfig {
            errors: Vec::new(),
            enabled: false,
            directory: None,
            worker_event_queue_size: None,
            baseline_compression_level: None,
            optimized_compression_level: None,
            optimized_compression_usage_counter_threshold: None,
            cleanup_interval: None,
            optimizing_compression_task_timeout: None,
            files_count_soft_limit: None,
            files_total_size_soft_limit: None,
            files_count_limit_percent_if_deleting: None,
            files_total_size_limit_percent_if_deleting: None,
        }
    }

@@ -143,8 +228,17 @@ impl CacheConfig {
        };

        // validate values and fill in defaults
        config.validate_cache_directory_or_default();
        config.validate_directory_or_default();
        config.validate_worker_event_queue_size_or_default();
        config.validate_baseline_compression_level_or_default();
        config.validate_optimized_compression_level_or_default();
        config.validate_optimized_compression_usage_counter_threshold_or_default();
        config.validate_cleanup_interval_or_default();
        config.validate_optimizing_compression_task_timeout_or_default();
        config.validate_files_count_soft_limit_or_default();
        config.validate_files_total_size_soft_limit_or_default();
        config.validate_files_count_limit_percent_if_deleting_or_default();
        config.validate_files_total_size_limit_percent_if_deleting_or_default();

        path_if_flush_to_disk.map(|p| config.flush_to_disk(p));

@@ -195,7 +289,7 @@ impl CacheConfig {
        }
    }

    fn validate_cache_directory_or_default(&mut self) {
    fn validate_directory_or_default(&mut self) {
        if self.directory.is_none() {
            match &*PROJECT_DIRS {
                Some(proj_dirs) => self.directory = Some(proj_dirs.cache_dir().to_path_buf()),
@@ -246,9 +340,19 @@ impl CacheConfig {
        }
    }

    fn validate_worker_event_queue_size_or_default(&mut self) {
        if self.worker_event_queue_size.is_none() {
            self.worker_event_queue_size = Some(DEFAULT_WORKER_EVENT_QUEUE_SIZE);
        }

        if self.worker_event_queue_size.unwrap() < WORKER_EVENT_QUEUE_SIZE_WARNING_TRESHOLD {
            warn!("Detected small worker event queue size. Some messages might be lost.");
        }
    }

    fn validate_baseline_compression_level_or_default(&mut self) {
        if self.baseline_compression_level.is_none() {
            self.baseline_compression_level = Some(zstd::DEFAULT_COMPRESSION_LEVEL);
            self.baseline_compression_level = Some(DEFAULT_BASELINE_COMPRESSION_LEVEL);
        }

        if !ZSTD_COMPRESSION_LEVELS.contains(&self.baseline_compression_level.unwrap()) {
@@ -260,6 +364,92 @@ impl CacheConfig {
        }
    }

    // assumption: baseline compression level has been verified
    fn validate_optimized_compression_level_or_default(&mut self) {
        if self.optimized_compression_level.is_none() {
            self.optimized_compression_level = Some(DEFAULT_OPTIMIZED_COMPRESSION_LEVEL);
        }

        let opt_lvl = self.optimized_compression_level.unwrap();
        let base_lvl = self.baseline_compression_level.unwrap();

        if !ZSTD_COMPRESSION_LEVELS.contains(&opt_lvl) {
            self.errors.push(format!(
                "Invalid optimized compression level: {} not in {:#?}",
                opt_lvl, ZSTD_COMPRESSION_LEVELS
            ));
        }

        if opt_lvl < base_lvl {
            self.errors.push(format!(
                "Invalid optimized compression level is lower than baseline: {} < {}",
                opt_lvl, base_lvl
            ));
        }
    }

    fn validate_optimized_compression_usage_counter_threshold_or_default(&mut self) {
        if self.optimized_compression_usage_counter_threshold.is_none() {
            self.optimized_compression_usage_counter_threshold =
                Some(DEFAULT_OPTIMIZED_COMPRESSION_USAGE_COUNTER_THRESHOLD);
        }
    }

    fn validate_cleanup_interval_or_default(&mut self) {
        if self.cleanup_interval.is_none() {
            self.cleanup_interval = Some(DEFAULT_CLEANUP_INTERVAL);
        }
    }

    fn validate_optimizing_compression_task_timeout_or_default(&mut self) {
        if self.optimizing_compression_task_timeout.is_none() {
            self.optimizing_compression_task_timeout =
                Some(DEFAULT_OPTIMIZING_COMPRESSION_TASK_TIMEOUT);
        }
    }

    fn validate_files_count_soft_limit_or_default(&mut self) {
        if self.files_count_soft_limit.is_none() {
            self.files_count_soft_limit = Some(DEFAULT_FILES_COUNT_SOFT_LIMIT);
        }
    }

    fn validate_files_total_size_soft_limit_or_default(&mut self) {
        if self.files_total_size_soft_limit.is_none() {
            self.files_total_size_soft_limit = Some(DEFAULT_FILES_TOTAL_SIZE_SOFT_LIMIT);
        }
    }

    fn validate_files_count_limit_percent_if_deleting_or_default(&mut self) {
        if self.files_count_limit_percent_if_deleting.is_none() {
            self.files_count_limit_percent_if_deleting =
                Some(DEFAULT_FILES_COUNT_LIMIT_PERCENT_IF_DELETING);
        }

        let percent = self.files_count_limit_percent_if_deleting.unwrap();
        if percent > 100 {
            self.errors.push(format!(
                "Invalid files count limit percent if deleting: {} not in range 0-100%",
                percent
            ));
        }
    }

    fn validate_files_total_size_limit_percent_if_deleting_or_default(&mut self) {
        if self.files_total_size_limit_percent_if_deleting.is_none() {
            self.files_total_size_limit_percent_if_deleting =
                Some(DEFAULT_FILES_TOTAL_SIZE_LIMIT_PERCENT_IF_DELETING);
        }

        let percent = self.files_total_size_limit_percent_if_deleting.unwrap();
        if percent > 100 {
            self.errors.push(format!(
                "Invalid files total size limit percent if deleting: {} not in range 0-100%",
                percent
            ));
        }
    }

    fn flush_to_disk(&mut self, path: PathBuf) {
        if !self.errors.is_empty() {
            return;