diff --git a/src/tools/Cargo.lock b/src/tools/Cargo.lock
index ca97206cc4..28d0f79f5b 100644
--- a/src/tools/Cargo.lock
+++ b/src/tools/Cargo.lock
@@ -6,6 +6,7 @@ dependencies = [
  "cretonne-reader 0.0.0",
  "docopt 0.6.83 (registry+https://github.com/rust-lang/crates.io-index)",
  "filecheck 0.0.0",
+ "num_cpus 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
@@ -73,6 +74,14 @@ dependencies = [
  "libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
+[[package]]
+name = "num_cpus"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [[package]]
 name = "regex"
 version = "0.1.77"
diff --git a/src/tools/Cargo.toml b/src/tools/Cargo.toml
index b4572f9f49..3a1a8979a7 100644
--- a/src/tools/Cargo.toml
+++ b/src/tools/Cargo.toml
@@ -15,3 +15,4 @@ cretonne-reader = { path = "../libreader" }
 filecheck = { path = "../libfilecheck" }
 docopt = "0.6.80"
 rustc-serialize = "0.3.19"
+num_cpus = "1.1.0"
diff --git a/src/tools/filetest/concurrent.rs b/src/tools/filetest/concurrent.rs
new file mode 100644
index 0000000000..3123124086
--- /dev/null
+++ b/src/tools/filetest/concurrent.rs
@@ -0,0 +1,153 @@
+//! Run tests concurrently.
+//!
+//! This module provides the `ConcurrentRunner` struct which uses a pool of threads to run tests
+//! concurrently.
+
+use std::panic::catch_unwind;
+use std::path::{Path, PathBuf};
+use std::sync::mpsc::{channel, Sender, Receiver};
+use std::sync::{Arc, Mutex};
+use std::thread;
+use std::time::Duration;
+use num_cpus;
+use filetest::{TestResult, runone};
+
+// Request sent to worker threads contains jobid and path.
+struct Request(usize, PathBuf);
+
+/// Reply from worker thread.
+pub enum Reply {
+    Starting { jobid: usize, thread_num: usize },
+    Done { jobid: usize, result: TestResult },
+    Tick,
+}
+
+/// Manage threads that run test jobs concurrently.
+pub struct ConcurrentRunner {
+    // Channel for sending requests to the worker threads.
+    // The workers are sharing the receiver with an `Arc<Mutex<Receiver>>`.
+    // This is `None` when shutting down.
+    request_tx: Option<Sender<Request>>,
+
+    // Channel for receiving replies from the workers.
+    // Workers have their own `Sender`.
+    reply_rx: Receiver<Reply>,
+
+    handles: Vec<thread::JoinHandle<()>>,
+}
+
+impl ConcurrentRunner {
+    /// Create a new `ConcurrentRunner` with threads spun up.
+    pub fn new() -> ConcurrentRunner {
+        let (request_tx, request_rx) = channel();
+        let request_mutex = Arc::new(Mutex::new(request_rx));
+        let (reply_tx, reply_rx) = channel();
+
+        heartbeat_thread(reply_tx.clone());
+
+        let handles = (0..num_cpus::get())
+            .map(|num| worker_thread(num, request_mutex.clone(), reply_tx.clone()))
+            .collect();
+
+        ConcurrentRunner {
+            request_tx: Some(request_tx),
+            reply_rx: reply_rx,
+            handles: handles,
+        }
+    }
+
+    /// Shut down worker threads orderly. They will finish any queued jobs first.
+    pub fn shutdown(&mut self) {
+        self.request_tx = None;
+    }
+
+    /// Join all the worker threads.
+    pub fn join(&mut self) {
+        assert!(self.request_tx.is_none(), "must shutdown before join");
+        for h in self.handles.drain(..) {
+            if let Err(e) = h.join() {
+                println!("worker panicked: {:?}", e);
+            }
+        }
+    }
+
+    /// Add a new job to the queues.
+    pub fn put(&mut self, jobid: usize, path: &Path) {
+        self.request_tx
+            .as_ref()
+            .expect("cannot push after shutdown")
+            .send(Request(jobid, path.to_owned()))
+            .expect("all the worker threads are gone");
+    }
+
+    /// Get a job reply without blocking.
+    pub fn try_get(&mut self) -> Option<Reply> {
+        self.reply_rx.try_recv().ok()
+    }
+
+    /// Get a job reply, blocking until one is available.
+    pub fn get(&mut self) -> Option<Reply> {
+        self.reply_rx.recv().ok()
+    }
+}
+
+/// Spawn a heartbeat thread which sends ticks down the reply channel every second.
+/// This lets us implement timeouts without the not yet stable `recv_timeout`.
+fn heartbeat_thread(replies: Sender<Reply>) -> thread::JoinHandle<()> {
+    thread::Builder::new()
+        .name("heartbeat".to_string())
+        .spawn(move || {
+            while replies.send(Reply::Tick).is_ok() {
+                thread::sleep(Duration::from_secs(1));
+            }
+        })
+        .unwrap()
+}
+
+/// Spawn a worker thread running tests.
+fn worker_thread(thread_num: usize,
+                 requests: Arc<Mutex<Receiver<Request>>>,
+                 replies: Sender<Reply>)
+                 -> thread::JoinHandle<()> {
+    thread::Builder::new()
+        .name(format!("worker #{}", thread_num))
+        .spawn(move || {
+            loop {
+                // Lock the mutex only long enough to extract a request.
+                let Request(jobid, path) = match requests.lock().unwrap().recv() {
+                    Err(..) => break, // TX end shut down. Exit thread.
+                    Ok(req) => req,
+                };
+
+                // Tell them we're starting this job.
+                // The receiver should always be present for this as long as we have jobs.
+                replies.send(Reply::Starting {
+                        jobid: jobid,
+                        thread_num: thread_num,
+                    })
+                    .unwrap();
+
+                let result = match catch_unwind(|| runone::run(path.as_path())) {
+                    Ok(r) => r,
+                    Err(e) => {
+                        // The test panicked, leaving us a `Box<Any>`.
+                        // Panics are usually strings.
+                        if let Some(msg) = e.downcast_ref::<String>() {
+                            Err(format!("panicked in worker #{}: {}", thread_num, msg))
+                        } else if let Some(msg) = e.downcast_ref::<&'static str>() {
+                            Err(format!("panicked in worker #{}: {}", thread_num, msg))
+                        } else {
+                            Err(format!("panicked in worker #{}", thread_num))
+                        }
+                    }
+                };
+
+                replies.send(Reply::Done {
+                        jobid: jobid,
+                        result: result,
+                    })
+                    .unwrap();
+            }
+        })
+        .unwrap()
+}
diff --git a/src/tools/filetest/mod.rs b/src/tools/filetest/mod.rs
index dec7ff6016..60b6180c05 100644
--- a/src/tools/filetest/mod.rs
+++ b/src/tools/filetest/mod.rs
@@ -14,6 +14,7 @@ use filetest::runner::TestRunner;
 pub mod subtest;
 mod runner;
 mod runone;
+mod concurrent;
 mod domtree;
 mod verifier;
 
@@ -40,6 +41,7 @@ pub fn run(files: Vec<String>) -> CommandResult {
         }
     }
 
+    runner.start_threads();
     runner.run()
 }
diff --git a/src/tools/filetest/runner.rs b/src/tools/filetest/runner.rs
index 44501489dd..c4cfca1ec3 100644
--- a/src/tools/filetest/runner.rs
+++ b/src/tools/filetest/runner.rs
@@ -7,8 +7,15 @@ use std::error::Error;
 use std::ffi::OsStr;
 use std::path::{Path, PathBuf};
 use filetest::{TestResult, runone};
+use filetest::concurrent::{ConcurrentRunner, Reply};
 use CommandResult;
 
+// Timeout in seconds when we're not making progress.
+const TIMEOUT_PANIC: usize = 10;
+
+// Timeout for reporting slow tests without panicking.
+const TIMEOUT_SLOW: usize = 3;
+
 struct QueueEntry {
     path: PathBuf,
     state: State,
@@ -17,6 +24,7 @@ struct QueueEntry {
 #[derive(PartialEq, Eq, Debug)]
 enum State {
     New,
+    Queued,
     Running,
     Done(TestResult),
 }
@@ -40,7 +48,13 @@ pub struct TestRunner {
     // Number of contiguous finished tests at the front of `tests`.
     finished_tests: usize,
 
+    // Number of errors seen so far.
     errors: usize,
+
+    // Number of ticks received since we saw any progress.
+    ticks_since_progress: usize,
+
+    threads: Option<ConcurrentRunner>,
 }
 
 impl TestRunner {
@@ -52,6 +66,8 @@ impl TestRunner {
             new_tests: 0,
             finished_tests: 0,
             errors: 0,
+            ticks_since_progress: 0,
+            threads: None,
         }
     }
 
@@ -73,32 +89,10 @@ impl TestRunner {
         });
     }
 
-    /// Take a new test for running as a job.
-    /// Leaves the queue entry marked as `Runnning`.
-    fn take_job(&mut self) -> Option<usize> {
-        let jobid = self.new_tests;
-        if jobid == self.tests.len() {
-            return None;
-        }
-        self.new_tests += 1;
-        assert_eq!(self.tests[jobid].state, State::New);
-        self.tests[jobid].state = State::Running;
-        Some(jobid)
-    }
-
-    /// Report the end of a job.
-    fn finish_job(&mut self, jobid: usize, result: TestResult) {
-        assert_eq!(self.tests[jobid].state, State::Running);
-        if let Err(ref e) = result {
-            self.job_error(jobid, e);
-        }
-        self.tests[jobid].state = State::Done(result);
-        if jobid == self.finished_tests {
-            while let Some(&QueueEntry { state: State::Done(_), .. }) = self.tests
-                .get(self.finished_tests) {
-                self.finished_tests += 1;
-            }
-        }
+    /// Begin running tests concurrently.
+    pub fn start_threads(&mut self) {
+        assert!(self.threads.is_none());
+        self.threads = Some(ConcurrentRunner::new());
     }
 
     /// Scan any directories pushed so far.
@@ -166,11 +160,87 @@ impl TestRunner {
         println!("FAIL {}: {}", self.tests[jobid].path.to_string_lossy(), err);
     }
 
-    /// Schedule and new jobs to run.
+    /// Schedule any new jobs to run.
     fn schedule_jobs(&mut self) {
-        while let Some(jobid) = self.take_job() {
-            let result = runone::run(self.tests[jobid].path());
-            self.finish_job(jobid, result);
+        for jobid in self.new_tests..self.tests.len() {
+            assert_eq!(self.tests[jobid].state, State::New);
+            if let Some(ref mut conc) = self.threads {
+                // Queue test for concurrent execution.
+                self.tests[jobid].state = State::Queued;
+                conc.put(jobid, self.tests[jobid].path());
+            } else {
+                // Run test synchronously.
+                self.tests[jobid].state = State::Running;
+                let result = runone::run(self.tests[jobid].path());
+                self.finish_job(jobid, result);
+            }
+            self.new_tests = jobid + 1;
+        }
+
+        // Check for any asynchronous replies without blocking.
+        while let Some(reply) = self.threads.as_mut().and_then(ConcurrentRunner::try_get) {
+            self.handle_reply(reply);
+        }
+    }
+
+    /// Report the end of a job.
+    fn finish_job(&mut self, jobid: usize, result: TestResult) {
+        assert_eq!(self.tests[jobid].state, State::Running);
+        if let Err(ref e) = result {
+            self.job_error(jobid, e);
+        }
+        self.tests[jobid].state = State::Done(result);
+        if jobid == self.finished_tests {
+            while let Some(&QueueEntry { state: State::Done(_), .. }) = self.tests
+                .get(self.finished_tests) {
+                self.finished_tests += 1;
+            }
+        }
+    }
+
+    /// Handle a reply from the async threads.
+    fn handle_reply(&mut self, reply: Reply) {
+        match reply {
+            Reply::Starting { jobid, .. } => {
+                assert_eq!(self.tests[jobid].state, State::Queued);
+                self.tests[jobid].state = State::Running;
+            }
+            Reply::Done { jobid, result } => {
+                self.ticks_since_progress = 0;
+                self.finish_job(jobid, result)
+            }
+            Reply::Tick => {
+                self.ticks_since_progress += 1;
+                if self.ticks_since_progress == TIMEOUT_SLOW {
+                    println!("STALLED for {} seconds with {}/{} tests finished",
+                             self.ticks_since_progress,
+                             self.finished_tests,
+                             self.tests.len());
+                    for jobid in self.finished_tests..self.tests.len() {
+                        if self.tests[jobid].state == State::Running {
+                            println!("slow: {}", self.tests[jobid].path.to_string_lossy());
+                        }
+                    }
+                }
+                if self.ticks_since_progress >= TIMEOUT_PANIC {
+                    panic!("worker threads stalled for {} seconds.",
+                           self.ticks_since_progress);
+                }
+            }
+        }
+    }
+
+    /// Drain the async jobs and shut down the threads.
+    fn drain_threads(&mut self) {
+        if let Some(mut conc) = self.threads.take() {
+            conc.shutdown();
+            while self.finished_tests < self.tests.len() {
+                match conc.get() {
+                    Some(reply) => self.handle_reply(reply),
+                    None => break,
+                }
+            }
+            conc.join();
         }
     }
 
@@ -178,6 +248,7 @@ impl TestRunner {
     pub fn run(&mut self) -> CommandResult {
         self.scan_dirs();
         self.schedule_jobs();
+        self.drain_threads();
         println!("{} tests", self.tests.len());
         match self.errors {
             0 => Ok(()),
diff --git a/src/tools/main.rs b/src/tools/main.rs
index af7a092394..436b07bb05 100644
--- a/src/tools/main.rs
+++ b/src/tools/main.rs
@@ -4,6 +4,7 @@ extern crate cton_reader;
 extern crate docopt;
 extern crate rustc_serialize;
 extern crate filecheck;
+extern crate num_cpus;
 
 use cretonne::VERSION;
 use docopt::Docopt;