Run tests concurrently.

Spin up one worker thread per CPU and run filetests on all of them. Use
a reorder buffer in the test runner to make sure results are still
reported in order.
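The reorder buffer amounts to tracking the contiguous prefix of finished
jobs and only reporting results up to that point. A standalone sketch of
the idea (a simplified illustration, not the code from this commit):

fn main() {
    let total = 4;
    // Completed results, indexed by job id; `None` means still running.
    let mut done: Vec<Option<String>> = vec![None; total];
    let mut next_to_report = 0;

    // Results arriving out of order, as they would from worker threads.
    for &(jobid, result) in &[(2, "ok"), (0, "ok"), (1, "FAIL"), (3, "ok")] {
        done[jobid] = Some(result.to_string());
        // Report the contiguous run of finished jobs at the front.
        while let Some(r) = done.get_mut(next_to_report).and_then(Option::take) {
            println!("test {}: {}", next_to_report, r);
            next_to_report += 1;
        }
    }
}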

Individual test files given as command-line arguments are still run
synchronously for easier debugging; only directories are handed to the
worker threads. The recursive directory traversal still happens on the
main thread.

Use a heartbeat thread to send ticks on the reply channel every second,
and use the ticks to detect tests that are stuck. When
Receiver::recv_timeout() is stabilized, we can probably get rid of the
heartbeat thread.
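For comparison, once `recv_timeout` stabilizes, the blocking receive could
plausibly look like this sketch, with no heartbeat thread at all
(hypothetical, not part of this commit):

use std::sync::mpsc::{channel, RecvTimeoutError};
use std::thread;
use std::time::Duration;

fn main() {
    let (reply_tx, reply_rx) = channel::<String>();
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        reply_tx.send("test result".to_string()).unwrap();
    });

    // Wait for a reply with a deadline instead of counting heartbeat ticks.
    match reply_rx.recv_timeout(Duration::from_secs(10)) {
        Ok(reply) => println!("got: {}", reply),
        Err(RecvTimeoutError::Timeout) => panic!("worker threads stalled"),
        Err(RecvTimeoutError::Disconnected) => println!("all workers shut down"),
    }
}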

Catch panics on the worker threads and report them as test failures.
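The panic-to-failure conversion boils down to `catch_unwind` plus
downcasting the panic payload, which is usually a string. A minimal
standalone sketch of that pattern (the real version is in the new
concurrent module below):

use std::panic::catch_unwind;

fn main() {
    let outcome: Result<(), String> = match catch_unwind(|| panic!("boom")) {
        Ok(v) => Ok(v),
        Err(e) => {
            // The payload is a `Box<Any>`; panics usually carry a string.
            if let Some(msg) = e.downcast_ref::<&'static str>() {
                Err(format!("panicked: {}", msg))
            } else if let Some(msg) = e.downcast_ref::<String>() {
                Err(format!("panicked: {}", msg))
            } else {
                Err("panicked".to_string())
            }
        }
    };
    // Note: the default panic hook still prints its message to stderr.
    assert!(outcome.is_err());
}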
Author: Jakob Stoklund Olesen
Date:   2016-09-17 09:51:06 -07:00
Parent: 356e05d225
Commit: 1c1ae524aa

6 changed files with 267 additions and 30 deletions

src/tools/Cargo.lock (generated)

@@ -6,6 +6,7 @@ dependencies = [
  "cretonne-reader 0.0.0",
  "docopt 0.6.83 (registry+https://github.com/rust-lang/crates.io-index)",
  "filecheck 0.0.0",
+ "num_cpus 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
 ]

@@ -73,6 +74,14 @@ dependencies = [
  "libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)",
 ]

+[[package]]
+name = "num_cpus"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [[package]]
 name = "regex"
 version = "0.1.77"

src/tools/Cargo.toml

@@ -15,3 +15,4 @@ cretonne-reader = { path = "../libreader" }
 filecheck = { path = "../libfilecheck" }
 docopt = "0.6.80"
 rustc-serialize = "0.3.19"
+num_cpus = "1.1.0"

src/tools/filetest/concurrent.rs (new file)

@@ -0,0 +1,153 @@
//! Run tests concurrently.
//!
//! This module provides the `ConcurrentRunner` struct which uses a pool of threads to run tests
//! concurrently.
use std::panic::catch_unwind;
use std::path::{Path, PathBuf};
use std::sync::mpsc::{channel, Sender, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use num_cpus;
use filetest::{TestResult, runone};
// Request sent to worker threads contains jobid and path.
struct Request(usize, PathBuf);
/// Reply from a worker thread.
pub enum Reply {
Starting { jobid: usize, thread_num: usize },
Done { jobid: usize, result: TestResult },
Tick,
}
/// Manage threads that run test jobs concurrently.
pub struct ConcurrentRunner {
// Channel for sending requests to the worker threads.
// The workers are sharing the receiver with an `Arc<Mutex<Receiver>>`.
// This is `None` when shutting down.
request_tx: Option<Sender<Request>>,
// Channel for receiving replies from the workers.
// Workers have their own `Sender`.
reply_rx: Receiver<Reply>,
handles: Vec<thread::JoinHandle<()>>,
}
impl ConcurrentRunner {
/// Create a new `ConcurrentRunner` with threads spun up.
pub fn new() -> ConcurrentRunner {
let (request_tx, request_rx) = channel();
let request_mutex = Arc::new(Mutex::new(request_rx));
let (reply_tx, reply_rx) = channel();
heartbeat_thread(reply_tx.clone());
let handles = (0..num_cpus::get())
.map(|num| worker_thread(num, request_mutex.clone(), reply_tx.clone()))
.collect();
ConcurrentRunner {
request_tx: Some(request_tx),
reply_rx: reply_rx,
handles: handles,
}
}
/// Shut down the worker threads in an orderly manner. They will finish any queued jobs first.
pub fn shutdown(&mut self) {
self.request_tx = None;
}
/// Join all the worker threads.
pub fn join(&mut self) {
assert!(self.request_tx.is_none(), "must shutdown before join");
for h in self.handles.drain(..) {
if let Err(e) = h.join() {
println!("worker panicked: {:?}", e);
}
}
}
/// Add a new job to the queues.
pub fn put(&mut self, jobid: usize, path: &Path) {
self.request_tx
.as_ref()
.expect("cannot push after shutdown")
.send(Request(jobid, path.to_owned()))
.expect("all the worker threads are gone");
}
/// Get a job reply without blocking.
pub fn try_get(&mut self) -> Option<Reply> {
self.reply_rx.try_recv().ok()
}
/// Get a job reply, blocking until one is available.
pub fn get(&mut self) -> Option<Reply> {
self.reply_rx.recv().ok()
}
}
/// Spawn a heartbeat thread which sends ticks down the reply channel every second.
/// This lets us implement timeouts without the not-yet-stable `recv_timeout`.
fn heartbeat_thread(replies: Sender<Reply>) -> thread::JoinHandle<()> {
thread::Builder::new()
.name("heartbeat".to_string())
.spawn(move || {
while replies.send(Reply::Tick).is_ok() {
thread::sleep(Duration::from_secs(1));
}
})
.unwrap()
}
/// Spawn a worker thread running tests.
fn worker_thread(thread_num: usize,
requests: Arc<Mutex<Receiver<Request>>>,
replies: Sender<Reply>)
-> thread::JoinHandle<()> {
thread::Builder::new()
.name(format!("worker #{}", thread_num))
.spawn(move || {
loop {
// Lock the mutex only long enough to extract a request.
let Request(jobid, path) = match requests.lock().unwrap().recv() {
Err(..) => break, // TX end shut down. Exit the thread.
Ok(req) => req,
};
// Tell them we're starting this job.
// The receiver should always be present for this as long as we have jobs.
replies.send(Reply::Starting {
jobid: jobid,
thread_num: thread_num,
})
.unwrap();
let result = match catch_unwind(|| runone::run(path.as_path())) {
Ok(r) => r,
Err(e) => {
// The test panicked, leaving us a `Box<Any>`.
// Panics are usually strings.
if let Some(msg) = e.downcast_ref::<String>() {
Err(format!("panicked in worker #{}: {}", thread_num, msg))
} else if let Some(msg) = e.downcast_ref::<&'static str>() {
Err(format!("panicked in worker #{}: {}", thread_num, msg))
} else {
Err(format!("panicked in worker #{}", thread_num))
}
}
};
replies.send(Reply::Done {
jobid: jobid,
result: result,
})
.unwrap();
}
})
.unwrap()
}
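A note on the shutdown protocol above: `ConcurrentRunner::shutdown` simply
drops the lone `Sender` by setting `request_tx` to `None`, which makes the
workers' `recv()` start failing once the queue is drained, so each thread
exits on its own before `join`. A stripped-down model of that
`Arc<Mutex<Receiver>>` idiom (hypothetical names, no test logic):

use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let (request_tx, request_rx) = channel::<usize>();
    let shared_rx = Arc::new(Mutex::new(request_rx));

    let handles: Vec<_> = (0..4)
        .map(|num| {
            let rx = shared_rx.clone();
            thread::spawn(move || loop {
                // Hold the mutex only long enough to pull one request.
                let jobid = match rx.lock().unwrap().recv() {
                    Ok(jobid) => jobid,
                    Err(_) => break, // Sender dropped: orderly shutdown.
                };
                println!("worker #{} got job {}", num, jobid);
            })
        })
        .collect();

    for jobid in 0..10 {
        request_tx.send(jobid).unwrap();
    }
    drop(request_tx); // Plays the role of `ConcurrentRunner::shutdown`.
    for h in handles {
        h.join().unwrap();
    }
}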

src/tools/filetest/mod.rs

@@ -14,6 +14,7 @@ use filetest::runner::TestRunner;
 pub mod subtest;
 mod runner;
 mod runone;
+mod concurrent;
 mod domtree;
 mod verifier;

@@ -40,6 +41,7 @@ pub fn run(files: Vec<String>) -> CommandResult {
         }
     }
+    runner.start_threads();
     runner.run()
 }

src/tools/filetest/runner.rs

@@ -7,8 +7,15 @@ use std::error::Error;
 use std::ffi::OsStr;
 use std::path::{Path, PathBuf};
 use filetest::{TestResult, runone};
+use filetest::concurrent::{ConcurrentRunner, Reply};
 use CommandResult;

+// Timeout in seconds when we're not making progress.
+const TIMEOUT_PANIC: usize = 10;
+
+// Timeout for reporting slow tests without panicking.
+const TIMEOUT_SLOW: usize = 3;
+
 struct QueueEntry {
     path: PathBuf,
     state: State,

@@ -17,6 +24,7 @@ struct QueueEntry {
 #[derive(PartialEq, Eq, Debug)]
 enum State {
     New,
+    Queued,
     Running,
     Done(TestResult),
 }

@@ -40,7 +48,13 @@ pub struct TestRunner {
     // Number of contiguous finished tests at the front of `tests`.
     finished_tests: usize,

+    // Number of errors seen so far.
     errors: usize,
+
+    // Number of ticks received since we saw any progress.
+    ticks_since_progress: usize,
+
+    threads: Option<ConcurrentRunner>,
 }

 impl TestRunner {

@@ -52,6 +66,8 @@ impl TestRunner {
             new_tests: 0,
             finished_tests: 0,
             errors: 0,
+            ticks_since_progress: 0,
+            threads: None,
         }
     }

@@ -73,32 +89,10 @@ impl TestRunner {
         });
     }

-    /// Take a new test for running as a job.
-    /// Leaves the queue entry marked as `Runnning`.
-    fn take_job(&mut self) -> Option<usize> {
-        let jobid = self.new_tests;
-        if jobid == self.tests.len() {
-            return None;
-        }
-        self.new_tests += 1;
-        assert_eq!(self.tests[jobid].state, State::New);
-        self.tests[jobid].state = State::Running;
-        Some(jobid)
-    }
-
-    /// Report the end of a job.
-    fn finish_job(&mut self, jobid: usize, result: TestResult) {
-        assert_eq!(self.tests[jobid].state, State::Running);
-        if let Err(ref e) = result {
-            self.job_error(jobid, e);
-        }
-        self.tests[jobid].state = State::Done(result);
-        if jobid == self.finished_tests {
-            while let Some(&QueueEntry { state: State::Done(_), .. }) = self.tests
-                .get(self.finished_tests) {
-                self.finished_tests += 1;
-            }
-        }
+    /// Begin running tests concurrently.
+    pub fn start_threads(&mut self) {
+        assert!(self.threads.is_none());
+        self.threads = Some(ConcurrentRunner::new());
     }

     /// Scan any directories pushed so far.

@@ -166,11 +160,87 @@ impl TestRunner {
         println!("FAIL {}: {}", self.tests[jobid].path.to_string_lossy(), err);
     }

-    /// Schedule and new jobs to run.
+    /// Schedule any new jobs to run.
     fn schedule_jobs(&mut self) {
-        while let Some(jobid) = self.take_job() {
-            let result = runone::run(self.tests[jobid].path());
-            self.finish_job(jobid, result);
+        for jobid in self.new_tests..self.tests.len() {
+            assert_eq!(self.tests[jobid].state, State::New);
+            if let Some(ref mut conc) = self.threads {
+                // Queue test for concurrent execution.
+                self.tests[jobid].state = State::Queued;
+                conc.put(jobid, self.tests[jobid].path());
+            } else {
+                // Run test synchronously.
+                self.tests[jobid].state = State::Running;
+                let result = runone::run(self.tests[jobid].path());
+                self.finish_job(jobid, result);
+            }
+            self.new_tests = jobid + 1;
         }
+
+        // Check for any asynchronous replies without blocking.
+        while let Some(reply) = self.threads.as_mut().and_then(ConcurrentRunner::try_get) {
+            self.handle_reply(reply);
+        }
     }
+
+    /// Report the end of a job.
+    fn finish_job(&mut self, jobid: usize, result: TestResult) {
+        assert_eq!(self.tests[jobid].state, State::Running);
+        if let Err(ref e) = result {
+            self.job_error(jobid, e);
+        }
+        self.tests[jobid].state = State::Done(result);
+        if jobid == self.finished_tests {
+            while let Some(&QueueEntry { state: State::Done(_), .. }) = self.tests
+                .get(self.finished_tests) {
+                self.finished_tests += 1;
+            }
+        }
+    }
+
+    /// Handle a reply from the async threads.
+    fn handle_reply(&mut self, reply: Reply) {
+        match reply {
+            Reply::Starting { jobid, .. } => {
+                assert_eq!(self.tests[jobid].state, State::Queued);
+                self.tests[jobid].state = State::Running;
+            }
+            Reply::Done { jobid, result } => {
+                self.ticks_since_progress = 0;
+                self.finish_job(jobid, result)
+            }
+            Reply::Tick => {
+                self.ticks_since_progress += 1;
+                if self.ticks_since_progress == TIMEOUT_SLOW {
+                    println!("STALLED for {} seconds with {}/{} tests finished",
+                             self.ticks_since_progress,
+                             self.finished_tests,
+                             self.tests.len());
+                    for jobid in self.finished_tests..self.tests.len() {
+                        if self.tests[jobid].state == State::Running {
+                            println!("slow: {}", self.tests[jobid].path.to_string_lossy());
+                        }
+                    }
+                }
+                if self.ticks_since_progress >= TIMEOUT_PANIC {
+                    panic!("worker threads stalled for {} seconds.",
+                           self.ticks_since_progress);
+                }
+            }
+        }
+    }
+
+    /// Drain the async jobs and shut down the threads.
+    fn drain_threads(&mut self) {
+        if let Some(mut conc) = self.threads.take() {
+            conc.shutdown();
+            while self.finished_tests < self.tests.len() {
+                match conc.get() {
+                    Some(reply) => self.handle_reply(reply),
+                    None => break,
+                }
+            }
+            conc.join();
+        }
+    }

@@ -178,6 +248,7 @@ impl TestRunner {
     pub fn run(&mut self) -> CommandResult {
         self.scan_dirs();
         self.schedule_jobs();
+        self.drain_threads();
         println!("{} tests", self.tests.len());
         match self.errors {
             0 => Ok(()),

src/tools/main.rs

@@ -4,6 +4,7 @@ extern crate cton_reader;
 extern crate docopt;
 extern crate rustc_serialize;
 extern crate filecheck;
+extern crate num_cpus;

 use cretonne::VERSION;
 use docopt::Docopt;