Skip to main content

tokio/runtime/scheduler/multi_thread/
idle.rs

1//! Coordinates idling workers
2
3use crate::loom::sync::atomic::AtomicUsize;
4use crate::runtime::scheduler::multi_thread::Shared;
5
6use std::fmt;
7use std::sync::atomic::Ordering::{self, SeqCst};
8
/// Shared bookkeeping used to decide when workers should be parked or woken.
pub(crate) struct Idle {
    /// Packed word tracking the number of searching workers (low bits), the
    /// number of unparked workers (bits above `UNPARK_SHIFT`) and the
    /// `ANY_LIFO` flag (most significant bit).
    ///
    /// Read as a fast path so that the scheduler lock is only acquired when
    /// a worker may actually have to be woken.
    state: AtomicUsize,

    /// Total number of workers.
    num_workers: usize,
}
19
/// Data synchronized by the scheduler mutex
pub(crate) struct Synced {
    /// Indices of the workers that are currently sleeping. An index is pushed
    /// when a worker transitions to parked and removed when that worker is
    /// selected to be unparked.
    sleepers: Vec<usize>,
}
25
/// Outcome of moving a worker into the parked state.
pub(super) struct TransitionToParked {
    /// `true` if the worker was searching and was the only searching worker
    /// at the moment it parked.
    pub(super) is_last_searcher: bool,
    /// Value of the `ANY_LIFO` flag observed just before the unparked count
    /// was decremented.
    pub(super) any_lifo: bool,
}
30
// Layout of the packed state word, from least to most significant bit:
//
//   [0, UNPARK_SHIFT)                  number of searching workers
//   [UNPARK_SHIFT, usize::BITS - 1)    number of unparked workers
//   usize::BITS - 1                    `ANY_LIFO` flag

/// Bit offset of the unparked-worker count inside the state word.
const UNPARK_SHIFT: usize = (usize::BITS as usize / 2) - 2;
/// Flag stored in the most significant bit of the state word.
const ANY_LIFO: usize = 1 << (usize::BITS - 1);
/// In-place mask of the unparked-worker count: every bit that belongs neither
/// to the searching count nor to the `ANY_LIFO` flag.
const UNPARK_MASK: usize = !(SEARCH_MASK | ANY_LIFO);
/// Mask of the searching-worker count (the low `UNPARK_SHIFT` bits).
const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1;
35
/// Decoded view of a value read from `Idle::state`; wraps the raw packed word
/// and exposes accessors for the individual fields.
#[derive(Copy, Clone)]
struct State(usize);
38
39impl Idle {
40    pub(crate) fn new(num_workers: usize) -> (Idle, Synced) {
41        assert!(
42            num_workers <= UNPARK_MASK,
43            "{num_workers} is too many workers (max is {UNPARK_MASK})"
44        );
45        let init = State::new(num_workers);
46
47        let idle = Idle {
48            state: AtomicUsize::new(init.into()),
49            num_workers,
50        };
51
52        let synced = Synced {
53            sleepers: Vec::with_capacity(num_workers),
54        };
55
56        (idle, synced)
57    }
58
59    /// If there are no workers actively searching, returns the index of a
60    /// worker currently sleeping.
61    pub(super) fn worker_to_notify(&self, shared: &Shared) -> Option<usize> {
62        // If at least one worker is spinning, work being notified will
63        // eventually be found. A searching thread will find **some** work and
64        // notify another worker, eventually leading to our work being found.
65        //
66        // For this to happen, this load must happen before the thread
67        // transitioning `num_searching` to zero. Acquire / Release does not
68        // provide sufficient guarantees, so this load is done with `SeqCst` and
69        // will pair with the `fetch_sub(1)` when transitioning out of
70        // searching.
71        if !self.notify_should_wakeup() {
72            return None;
73        }
74
75        // Acquire the lock
76        let mut lock = shared.synced.lock();
77
78        // Check again, now that the lock is acquired
79        if !self.notify_should_wakeup() {
80            return None;
81        }
82
83        // A worker should be woken up, atomically increment the number of
84        // searching workers as well as the number of unparked workers.
85        State::unpark_one(&self.state, 1);
86
87        // Get the worker to unpark
88        let ret = lock.idle.sleepers.pop();
89        debug_assert!(ret.is_some());
90
91        ret
92    }
93
94    /// Returns `true` if the worker needs to do a final check for submitted
95    /// work.
96    pub(super) fn transition_worker_to_parked(
97        &self,
98        shared: &Shared,
99        worker: usize,
100        is_searching: bool,
101    ) -> TransitionToParked {
102        // Acquire the lock
103        let mut lock = shared.synced.lock();
104
105        // Decrement the number of unparked threads
106        let ret = State::dec_num_unparked(&self.state, is_searching);
107
108        // Track the sleeping worker
109        lock.idle.sleepers.push(worker);
110
111        ret
112    }
113
114    pub(super) fn transition_worker_to_searching(&self) -> bool {
115        let state = State::load(&self.state, SeqCst);
116        if 2 * state.num_searching() >= self.num_workers {
117            return false;
118        }
119
120        // It is possible for this routine to allow more than 50% of the workers
121        // to search. That is OK. Limiting searchers is only an optimization to
122        // prevent too much contention.
123        State::inc_num_searching(&self.state, SeqCst);
124        true
125    }
126
127    /// A lightweight transition from searching -> running.
128    ///
129    /// Returns `true` if this is the final searching worker. The caller
130    /// **must** notify a new worker.
131    pub(super) fn transition_worker_from_searching(&self) -> bool {
132        State::dec_num_searching(&self.state)
133    }
134
135    /// Unpark a specific worker. This happens if tasks are submitted from
136    /// within the worker's park routine.
137    ///
138    /// Returns `true` if the worker was parked before calling the method.
139    pub(super) fn unpark_worker_by_id(&self, shared: &Shared, worker_id: usize) -> bool {
140        let mut lock = shared.synced.lock();
141        let sleepers = &mut lock.idle.sleepers;
142
143        for index in 0..sleepers.len() {
144            if sleepers[index] == worker_id {
145                sleepers.swap_remove(index);
146
147                // Update the state accordingly while the lock is held.
148                State::unpark_one(&self.state, 0);
149
150                return true;
151            }
152        }
153
154        false
155    }
156
157    pub(crate) fn put_lifo(&self) -> bool {
158        State(self.state.fetch_or(ANY_LIFO, SeqCst)).any_lifo()
159    }
160
161    pub(crate) fn clear_lifo(&self) {
162        self.state.fetch_and(!ANY_LIFO, SeqCst);
163    }
164
165    pub(crate) fn should_attempt_lifo_steal(&self) -> bool {
166        let state = State(self.state.fetch_add(0, SeqCst));
167        state.any_lifo()
168    }
169
170    /// Returns `true` if `worker_id` is contained in the sleep set.
171    pub(super) fn is_parked(&self, shared: &Shared, worker_id: usize) -> bool {
172        let lock = shared.synced.lock();
173        lock.idle.sleepers.contains(&worker_id)
174    }
175
176    fn notify_should_wakeup(&self) -> bool {
177        let state = State(self.state.fetch_add(0, SeqCst));
178        state.num_searching() == 0 && state.num_unparked() < self.num_workers
179    }
180}
181
182impl State {
183    fn new(num_workers: usize) -> State {
184        // All workers start in the unparked state
185        let ret = State(num_workers << UNPARK_SHIFT);
186        debug_assert_eq!(num_workers, ret.num_unparked());
187        debug_assert_eq!(0, ret.num_searching());
188        ret
189    }
190
191    fn load(cell: &AtomicUsize, ordering: Ordering) -> State {
192        State(cell.load(ordering))
193    }
194
195    fn unpark_one(cell: &AtomicUsize, num_searching: usize) {
196        cell.fetch_add(num_searching | (1 << UNPARK_SHIFT), SeqCst);
197    }
198
199    fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) {
200        cell.fetch_add(1, ordering);
201    }
202
203    /// Returns `true` if this is the final searching worker
204    fn dec_num_searching(cell: &AtomicUsize) -> bool {
205        let state = State(cell.fetch_sub(1, SeqCst));
206        state.num_searching() == 1
207    }
208
209    /// Track a sleeping worker
210    ///
211    /// Returns `true` if this is the final searching worker.
212    fn dec_num_unparked(cell: &AtomicUsize, is_searching: bool) -> TransitionToParked {
213        let mut dec = 1 << UNPARK_SHIFT;
214
215        if is_searching {
216            dec += 1;
217        }
218
219        let prev = State(cell.fetch_sub(dec, SeqCst));
220        let is_last_searcher = is_searching && prev.num_searching() == 1;
221        TransitionToParked {
222            is_last_searcher,
223            any_lifo: prev.any_lifo(),
224        }
225    }
226
227    /// Number of workers currently searching
228    fn num_searching(self) -> usize {
229        self.0 & SEARCH_MASK
230    }
231
232    /// Number of workers currently unparked
233    fn num_unparked(self) -> usize {
234        (self.0 & UNPARK_MASK) >> UNPARK_SHIFT
235    }
236
237    fn any_lifo(self) -> bool {
238        self.0 & ANY_LIFO == ANY_LIFO
239    }
240}
241
242impl From<usize> for State {
243    fn from(src: usize) -> State {
244        State(src)
245    }
246}
247
248impl From<State> for usize {
249    fn from(src: State) -> usize {
250        src.0
251    }
252}
253
254impl fmt::Debug for State {
255    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
256        fmt.debug_struct("worker::State")
257            .field("num_unparked", &self.num_unparked())
258            .field("num_searching", &self.num_searching())
259            .finish()
260    }
261}
262
/// Checks that the three fields of the packed word partition it exactly and
/// that a freshly built state decodes to the expected values.
#[test]
fn test_state() {
    // No two fields may share a bit...
    assert_eq!(0, UNPARK_MASK & SEARCH_MASK);
    assert_eq!(0, UNPARK_MASK & ANY_LIFO);
    assert_eq!(0, SEARCH_MASK & ANY_LIFO);
    // ...and together they must cover the whole word. `ANY_LIFO` has to be
    // included here: `UNPARK_MASK` is defined to exclude that bit, so the two
    // counter masks alone can never cover every bit.
    assert_eq!(0, !(UNPARK_MASK | SEARCH_MASK | ANY_LIFO));

    let state = State::new(10);
    assert_eq!(10, state.num_unparked());
    assert_eq!(0, state.num_searching());
    assert!(!state.any_lifo());
}
272
/// Prints the bit-layout constants for manual inspection; makes no
/// assertions. The masks are rendered in binary, zero-padded to 64 digits.
#[test]
fn masks() {
    println!("UNPARK_SHIFT = {UNPARK_SHIFT}");
    println!("UNPARK_MASK  = {UNPARK_MASK:064b}");
    println!("SEARCH_MASK  = {SEARCH_MASK:064b}");
    println!("ANY_LIFO     = {ANY_LIFO:064b}");
}