diff --git a/crates/runtime/src/parking_spot.rs b/crates/runtime/src/parking_spot.rs
index dee1019b22..33f9bce0ff 100644
--- a/crates/runtime/src/parking_spot.rs
+++ b/crates/runtime/src/parking_spot.rs
@@ -104,17 +104,53 @@ impl ParkingSpot {
             let spot = inner.get_mut(&key).expect("failed to get spot");
 
             if timed_out {
-                if let Some(timeout) = timeout {
-                    if Instant::now() < timeout {
-                        // Did not sleep long enough, try again.
-                        continue;
-                    }
-                }
-            } else {
-                if spot.to_unpark == 0 {
+                // If waiting on the cvar timed out then due to how system cvars
+                // are implemented we may need to continue to sleep longer. If
+                // the deadline has not been reached then turn the crank again
+                // and go back to sleep.
+                if Instant::now() < timeout.unwrap() {
                     continue;
                 }
+                // Opportunistically consume `to_unpark` signals even on
+                // timeout. From the perspective of `unpark` this "agent" raced
+                // between its own timeout and receiving the unpark signal, but
+                // from unpark's perspective it's definitely going to wake up N
+                // agents as returned from the `unpark` return value.
+                //
+                // Note that this may actually prevent other threads from
+                // getting unparked. For example:
+                //
+                // * Thread A parks with a timeout
+                // * Thread B parks with no timeout
+                // * Thread C decides to unpark 1 thread
+                // * Thread A's cvar wakes up due to a timeout, blocks on the
+                //   lock
+                // * Thread C finishes unpark and signals the cvar once
+                // * Thread B wakes up
+                // * Thread A and B contend for the lock and A wins
+                // * A consumes the "to_unpark" value
+                // * B goes back to sleep since `to_unpark == 0`, thinking that
+                //   a spurious wakeup happened.
+                //
+                // It's believed that this is ok, however, since from C's
+                // perspective one agent was still woken up and is allowed to
+                // continue, notably A in this case. C doesn't know that A raced
+                // with B and "stole" its wakeup signal.
+                if spot.to_unpark > 0 {
+                    spot.to_unpark -= 1;
+                }
+            } else {
+                if spot.to_unpark == 0 {
+                    // If no timeout happened but nothing has unparked this spot (as
+                    // signaled through `to_unpark`) then this is indicative of a
+                    // spurious wakeup. In this situation turn the crank again and
+                    // go back to sleep as this interface doesn't allow for spurious
+                    // wakeups.
+                    continue;
+                }
+                // No timeout happened, and some other thread registered to
+                // unpark this thread, so consume one unpark notification.
                 spot.to_unpark -= 1;
             }
 
@@ -176,55 +212,53 @@ impl ParkingSpot {
 #[cfg(test)]
 mod tests {
     use super::ParkingSpot;
-    use once_cell::sync::Lazy;
     use std::ptr::addr_of;
     use std::sync::atomic::{AtomicU64, Ordering};
     use std::thread;
-
-    static PARKING_SPOT: Lazy<ParkingSpot> = Lazy::new(ParkingSpot::default);
-
-    static ATOMIC: AtomicU64 = AtomicU64::new(0);
+    use std::time::{Duration, Instant};
 
     #[test]
     fn atomic_wait_notify() {
-        let thread1 = thread::spawn(|| {
-            let atomic_key = addr_of!(ATOMIC) as u64;
-            ATOMIC.store(1, Ordering::SeqCst);
-            PARKING_SPOT.unpark(atomic_key, u32::MAX);
-            PARKING_SPOT.park(atomic_key, || ATOMIC.load(Ordering::SeqCst) == 1, None);
-        });
+        let parking_spot = &ParkingSpot::default();
+        let atomic = &AtomicU64::new(0);
 
-        let thread2 = thread::spawn(|| {
-            let atomic_key = addr_of!(ATOMIC) as u64;
-            while ATOMIC.load(Ordering::SeqCst) != 1 {
-                PARKING_SPOT.park(atomic_key, || ATOMIC.load(Ordering::SeqCst) != 1, None);
+        thread::scope(|s| {
+            let atomic_key = addr_of!(atomic) as u64;
+            let thread1 = s.spawn(move || {
+                atomic.store(1, Ordering::SeqCst);
+                parking_spot.unpark(atomic_key, u32::MAX);
+                parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) == 1, None);
+            });
+
+            let thread2 = s.spawn(move || {
+                while atomic.load(Ordering::SeqCst) != 1 {
+                    parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) != 1, None);
+                }
+                atomic.store(2, Ordering::SeqCst);
+                parking_spot.unpark(atomic_key, u32::MAX);
+                parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) == 2, None);
+            });
+
+            let thread3 = s.spawn(move || {
+                while atomic.load(Ordering::SeqCst) != 2 {
+                    parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) != 2, None);
+                }
+                atomic.store(3, Ordering::SeqCst);
+                parking_spot.unpark(atomic_key, u32::MAX);
+
+                parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) == 3, None);
+            });
+
+            while atomic.load(Ordering::SeqCst) != 3 {
+                parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) != 3, None);
             }
-            ATOMIC.store(2, Ordering::SeqCst);
-            PARKING_SPOT.unpark(atomic_key, u32::MAX);
-            PARKING_SPOT.park(atomic_key, || ATOMIC.load(Ordering::SeqCst) == 2, None);
+            atomic.store(4, Ordering::SeqCst);
+            parking_spot.unpark(atomic_key, u32::MAX);
+
+            thread1.join().unwrap();
+            thread2.join().unwrap();
+            thread3.join().unwrap();
         });
-
-        let thread3 = thread::spawn(|| {
-            let atomic_key = addr_of!(ATOMIC) as u64;
-            while ATOMIC.load(Ordering::SeqCst) != 2 {
-                PARKING_SPOT.park(atomic_key, || ATOMIC.load(Ordering::SeqCst) != 2, None);
-            }
-            ATOMIC.store(3, Ordering::SeqCst);
-            PARKING_SPOT.unpark(atomic_key, u32::MAX);
-
-            PARKING_SPOT.park(atomic_key, || ATOMIC.load(Ordering::SeqCst) == 3, None);
-        });
-
-        let atomic_key = addr_of!(ATOMIC) as u64;
-        while ATOMIC.load(Ordering::SeqCst) != 3 {
-            PARKING_SPOT.park(atomic_key, || ATOMIC.load(Ordering::SeqCst) != 3, None);
-        }
-        ATOMIC.store(4, Ordering::SeqCst);
-        PARKING_SPOT.unpark(atomic_key, u32::MAX);
-
-        thread1.join().unwrap();
-        thread2.join().unwrap();
-        thread3.join().unwrap();
     }
 
     mod parking_lot {
@@ -302,47 +336,53 @@ mod tests {
             num_threads: u32,
             num_single_unparks: u32,
         ) {
-            let mut tests = Vec::with_capacity(num_latches);
+            let spot = ParkingSpot::default();
 
-            for _ in 0..num_latches {
-                let test = Arc::new(SingleLatchTest::new(num_threads));
-                let mut threads = Vec::with_capacity(num_threads as _);
-                for _ in 0..num_threads {
-                    let test = test.clone();
-                    threads.push(thread::spawn(move || test.run()));
-                }
-                tests.push((test, threads));
-            }
+            thread::scope(|s| {
+                let mut tests = Vec::with_capacity(num_latches);
 
-            for unpark_index in 0..num_single_unparks {
-                thread::sleep(delay);
-                for (test, _) in &tests {
-                    test.unpark_one(unpark_index);
+                for _ in 0..num_latches {
+                    let test = Arc::new(SingleLatchTest::new(num_threads, &spot));
+                    let mut threads = Vec::with_capacity(num_threads as _);
+                    for _ in 0..num_threads {
+                        let test = test.clone();
+                        threads.push(s.spawn(move || test.run()));
+                    }
+                    tests.push((test, threads));
                 }
-            }
 
-            for (test, threads) in tests {
-                test.finish(num_single_unparks);
-                for thread in threads {
-                    thread.join().expect("Test thread panic");
+                for unpark_index in 0..num_single_unparks {
+                    thread::sleep(delay);
+                    for (test, _) in &tests {
+                        test.unpark_one(unpark_index);
+                    }
                 }
-            }
+
+                for (test, threads) in tests {
+                    test.finish(num_single_unparks);
+                    for thread in threads {
+                        thread.join().expect("Test thread panic");
+                    }
+                }
+            });
         }
 
-        struct SingleLatchTest {
+        struct SingleLatchTest<'a> {
             semaphore: AtomicIsize,
             num_awake: AtomicU32,
             /// Total number of threads participating in this test.
             num_threads: u32,
+            spot: &'a ParkingSpot,
         }
 
-        impl SingleLatchTest {
-            pub fn new(num_threads: u32) -> Self {
+        impl<'a> SingleLatchTest<'a> {
+            pub fn new(num_threads: u32, spot: &'a ParkingSpot) -> Self {
                 Self {
                     // This implements a fair (FIFO) semaphore, and it starts out unavailable.
                     semaphore: AtomicIsize::new(0),
                     num_awake: AtomicU32::new(0),
                     num_threads,
+                    spot,
                 }
             }
 
@@ -373,14 +413,14 @@ mod tests {
             // still be threads that has not yet parked.
             while num_threads_left > 0 {
                 let mut num_waiting_on_address = 0;
-                PARKING_SPOT.with_lot(self.semaphore_addr(), |thread_data| {
+                self.spot.with_lot(self.semaphore_addr(), |thread_data| {
                     num_waiting_on_address = thread_data.num_parked;
                 });
                 assert!(num_waiting_on_address <= num_threads_left);
 
                 let num_awake_before_unpark = self.num_awake.load(Ordering::SeqCst);
-                let num_unparked = PARKING_SPOT.unpark(self.semaphore_addr(), u32::MAX);
+                let num_unparked = self.spot.unpark(self.semaphore_addr(), u32::MAX);
                 assert!(num_unparked >= num_waiting_on_address);
                 assert!(num_unparked <= num_threads_left);
 
@@ -398,7 +438,7 @@ mod tests {
 
             // Make sure no thread is parked on our semaphore address
             let mut num_waiting_on_address = 0;
-            PARKING_SPOT.with_lot(self.semaphore_addr(), |thread_data| {
+            self.spot.with_lot(self.semaphore_addr(), |thread_data| {
                 num_waiting_on_address = thread_data.num_parked;
             });
             assert_eq!(num_waiting_on_address, 0);
@@ -414,7 +454,7 @@ mod tests {
 
            // We need to wait.
             let validate = || true;
-            PARKING_SPOT.park(self.semaphore_addr(), validate, None);
+            self.spot.park(self.semaphore_addr(), validate, None);
         }
 
         pub fn up(&self) {
@@ -426,7 +466,7 @@ mod tests {
             // the thread we want to pass ownership to has decremented the semaphore counter,
             // but not yet parked.
             loop {
-                match PARKING_SPOT.unpark(self.semaphore_addr(), 1) {
+                match self.spot.unpark(self.semaphore_addr(), 1) {
                     1 => break,
                     0 => (),
                     i => panic!("Should not wake up {i} threads"),
@@ -440,4 +480,42 @@ mod tests {
             }
         }
     }
+
+    #[test]
+    fn wait_with_timeout() {
+        let parking_spot = &ParkingSpot::default();
+        let atomic = &AtomicU64::new(0);
+
+        thread::scope(|s| {
+            let atomic_key = addr_of!(atomic) as u64;
+
+            const N: u64 = 5;
+            const M: u64 = 1000;
+
+            let thread = s.spawn(move || {
+                while atomic.load(Ordering::SeqCst) != N * M {
+                    let timeout = Instant::now() + Duration::from_millis(1);
+                    parking_spot.park(
+                        atomic_key,
+                        || atomic.load(Ordering::SeqCst) != N * M,
+                        Some(timeout),
+                    );
+                }
+            });
+
+            let mut threads = vec![thread];
+            for _ in 0..N {
+                threads.push(s.spawn(move || {
+                    for _ in 0..M {
+                        atomic.fetch_add(1, Ordering::SeqCst);
+                        parking_spot.unpark(atomic_key, 1);
+                    }
+                }));
+            }
+
+            for thread in threads {
+                thread.join().unwrap();
+            }
+        });
+    }
 }
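
The timeout path added to `park` above boils down to a standard condvar loop: recompute the remaining time after every wakeup, go back to sleep if the wait returned before the real deadline, and treat `to_unpark` as a count of promised wakeups that may be consumed even when the deadline has already passed. Below is a minimal, self-contained sketch of that protocol over `std::sync::{Mutex, Condvar}`; the `SpotModel` type, its `park`/`unpark_one` methods, and the `bool` return value are invented for illustration and are not the `ParkingSpot` API changed by this patch.

```rust
// A sketch of the wait/timeout protocol described in the patch comments,
// not the actual ParkingSpot implementation. All names here are hypothetical.
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::{Duration, Instant};

/// One parking spot: `to_unpark` counts wakeups that `unpark_one` has
/// promised but that no parked thread has consumed yet.
struct SpotModel {
    to_unpark: Mutex<u32>,
    cvar: Condvar,
}

impl SpotModel {
    fn new() -> Self {
        SpotModel {
            to_unpark: Mutex::new(0),
            cvar: Condvar::new(),
        }
    }

    /// Block until unparked or until `deadline` (if any) passes. Returns
    /// `true` if an unpark signal was consumed, `false` on timeout.
    fn park(&self, deadline: Option<Instant>) -> bool {
        let mut to_unpark = self.to_unpark.lock().unwrap();
        loop {
            // Consume a promised wakeup if one is available; a wakeup with
            // `to_unpark == 0` is treated as spurious and we keep waiting.
            if *to_unpark > 0 {
                *to_unpark -= 1;
                return true;
            }
            match deadline {
                Some(deadline) => {
                    // Recompute the remaining time on every iteration.
                    let remaining = deadline.saturating_duration_since(Instant::now());
                    let (guard, result) =
                        self.cvar.wait_timeout(to_unpark, remaining).unwrap();
                    to_unpark = guard;
                    // Only give up once the real deadline has passed; an early
                    // return from the wait just turns the crank again.
                    if result.timed_out() && Instant::now() >= deadline {
                        // Opportunistically consume a signal that raced in
                        // with the timeout, mirroring the patch's comment.
                        if *to_unpark > 0 {
                            *to_unpark -= 1;
                            return true;
                        }
                        return false;
                    }
                }
                None => {
                    to_unpark = self.cvar.wait(to_unpark).unwrap();
                }
            }
        }
    }

    /// Register one wakeup and signal the condvar.
    fn unpark_one(&self) {
        let mut to_unpark = self.to_unpark.lock().unwrap();
        *to_unpark += 1;
        self.cvar.notify_one();
    }
}

fn main() {
    let spot = Arc::new(SpotModel::new());
    let waiter = {
        let spot = spot.clone();
        thread::spawn(move || spot.park(Some(Instant::now() + Duration::from_secs(1))))
    };
    thread::sleep(Duration::from_millis(10));
    spot.unpark_one();
    // The waiter consumed the signal rather than timing out.
    assert!(waiter.join().unwrap());
}
```

The observation in the patch's comment holds in this model as well: `unpark_one` increments the counter before signalling, so each increment lets exactly one `park` call return successfully, no matter which waiter wins the race for the lock after a timeout.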