tokio/runtime/scheduler/multi_thread/
idle.rs1use crate::loom::sync::atomic::AtomicUsize;
4use crate::runtime::scheduler::multi_thread::Shared;
5
6use std::fmt;
7use std::sync::atomic::Ordering::{self, SeqCst};
8
9pub(crate) struct Idle {
10 state: AtomicUsize,
15
16 num_workers: usize,
18}
19
20pub(crate) struct Synced {
22 sleepers: Vec<usize>,
24}
25
26pub(super) struct TransitionToParked {
27 pub(super) is_last_searcher: bool,
28 pub(super) any_lifo: bool,
29}
30
31const UNPARK_SHIFT: usize = (usize::BITS as usize / 2) - 2;
32const ANY_LIFO: usize = 1 << (usize::BITS - 1);
33const UNPARK_MASK: usize = !(SEARCH_MASK | ANY_LIFO);
34const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1;
35
36#[derive(Copy, Clone)]
37struct State(usize);
38
39impl Idle {
40 pub(crate) fn new(num_workers: usize) -> (Idle, Synced) {
41 assert!(
42 num_workers <= UNPARK_MASK,
43 "{num_workers} is too many workers (max is {UNPARK_MASK})"
44 );
45 let init = State::new(num_workers);
46
47 let idle = Idle {
48 state: AtomicUsize::new(init.into()),
49 num_workers,
50 };
51
52 let synced = Synced {
53 sleepers: Vec::with_capacity(num_workers),
54 };
55
56 (idle, synced)
57 }
58
59 pub(super) fn worker_to_notify(&self, shared: &Shared) -> Option<usize> {
62 if !self.notify_should_wakeup() {
72 return None;
73 }
74
75 let mut lock = shared.synced.lock();
77
78 if !self.notify_should_wakeup() {
80 return None;
81 }
82
83 State::unpark_one(&self.state, 1);
86
87 let ret = lock.idle.sleepers.pop();
89 debug_assert!(ret.is_some());
90
91 ret
92 }
93
94 pub(super) fn transition_worker_to_parked(
97 &self,
98 shared: &Shared,
99 worker: usize,
100 is_searching: bool,
101 ) -> TransitionToParked {
102 let mut lock = shared.synced.lock();
104
105 let ret = State::dec_num_unparked(&self.state, is_searching);
107
108 lock.idle.sleepers.push(worker);
110
111 ret
112 }
113
114 pub(super) fn transition_worker_to_searching(&self) -> bool {
115 let state = State::load(&self.state, SeqCst);
116 if 2 * state.num_searching() >= self.num_workers {
117 return false;
118 }
119
120 State::inc_num_searching(&self.state, SeqCst);
124 true
125 }
126
127 pub(super) fn transition_worker_from_searching(&self) -> bool {
132 State::dec_num_searching(&self.state)
133 }
134
135 pub(super) fn unpark_worker_by_id(&self, shared: &Shared, worker_id: usize) -> bool {
140 let mut lock = shared.synced.lock();
141 let sleepers = &mut lock.idle.sleepers;
142
143 for index in 0..sleepers.len() {
144 if sleepers[index] == worker_id {
145 sleepers.swap_remove(index);
146
147 State::unpark_one(&self.state, 0);
149
150 return true;
151 }
152 }
153
154 false
155 }
156
157 pub(crate) fn put_lifo(&self) -> bool {
158 State(self.state.fetch_or(ANY_LIFO, SeqCst)).any_lifo()
159 }
160
161 pub(crate) fn clear_lifo(&self) {
162 self.state.fetch_and(!ANY_LIFO, SeqCst);
163 }
164
165 pub(crate) fn should_attempt_lifo_steal(&self) -> bool {
166 let state = State(self.state.fetch_add(0, SeqCst));
167 state.any_lifo()
168 }
169
170 pub(super) fn is_parked(&self, shared: &Shared, worker_id: usize) -> bool {
172 let lock = shared.synced.lock();
173 lock.idle.sleepers.contains(&worker_id)
174 }
175
176 fn notify_should_wakeup(&self) -> bool {
177 let state = State(self.state.fetch_add(0, SeqCst));
178 state.num_searching() == 0 && state.num_unparked() < self.num_workers
179 }
180}
181
182impl State {
183 fn new(num_workers: usize) -> State {
184 let ret = State(num_workers << UNPARK_SHIFT);
186 debug_assert_eq!(num_workers, ret.num_unparked());
187 debug_assert_eq!(0, ret.num_searching());
188 ret
189 }
190
191 fn load(cell: &AtomicUsize, ordering: Ordering) -> State {
192 State(cell.load(ordering))
193 }
194
195 fn unpark_one(cell: &AtomicUsize, num_searching: usize) {
196 cell.fetch_add(num_searching | (1 << UNPARK_SHIFT), SeqCst);
197 }
198
199 fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) {
200 cell.fetch_add(1, ordering);
201 }
202
203 fn dec_num_searching(cell: &AtomicUsize) -> bool {
205 let state = State(cell.fetch_sub(1, SeqCst));
206 state.num_searching() == 1
207 }
208
209 fn dec_num_unparked(cell: &AtomicUsize, is_searching: bool) -> TransitionToParked {
213 let mut dec = 1 << UNPARK_SHIFT;
214
215 if is_searching {
216 dec += 1;
217 }
218
219 let prev = State(cell.fetch_sub(dec, SeqCst));
220 let is_last_searcher = is_searching && prev.num_searching() == 1;
221 TransitionToParked {
222 is_last_searcher,
223 any_lifo: prev.any_lifo(),
224 }
225 }
226
227 fn num_searching(self) -> usize {
229 self.0 & SEARCH_MASK
230 }
231
232 fn num_unparked(self) -> usize {
234 (self.0 & UNPARK_MASK) >> UNPARK_SHIFT
235 }
236
237 fn any_lifo(self) -> bool {
238 self.0 & ANY_LIFO == ANY_LIFO
239 }
240}
241
242impl From<usize> for State {
243 fn from(src: usize) -> State {
244 State(src)
245 }
246}
247
248impl From<State> for usize {
249 fn from(src: State) -> usize {
250 src.0
251 }
252}
253
254impl fmt::Debug for State {
255 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
256 fmt.debug_struct("worker::State")
257 .field("num_unparked", &self.num_unparked())
258 .field("num_searching", &self.num_searching())
259 .finish()
260 }
261}
262
263#[test]
264fn test_state() {
265 assert_eq!(0, UNPARK_MASK & SEARCH_MASK);
266 assert_eq!(0, !(UNPARK_MASK | SEARCH_MASK));
267
268 let state = State::new(10);
269 assert_eq!(10, state.num_unparked());
270 assert_eq!(0, state.num_searching());
271}
272
273#[test]
274fn masks() {
275 println!("UNPARK_SHIFT = {UNPARK_SHIFT}");
276 println!("UNPARK_MASK = {UNPARK_MASK:064b}");
277 println!("SEARCH_MASK = {SEARCH_MASK:064b}");
278 println!("ANY_LIFO = {ANY_LIFO:064b}");
279}