Joining threads immediately as they finish in Rust
As of Rust 1.87.0, if you want to manage multiple threads using exclusively the standard library, joining them immediately as they finish (in case the order in which they finish is not deterministic) is not a straightforward task.
This post isn’t intended as criticism of Rust’s standard library. The absence of this feature may be an intentional design decision or simply a matter of priorities. Regardless, since thread joining is a common need, I’ll document my solution for future reference.
When you spawn threads, you get back a JoinHandle (this is true for regular and scoped threads, and they are equivalent for the purposes of this post, see Appendix A). Then, you’ll need to call JoinHandle.join to join each of those threads individually. The problem is that JoinHandle.join blocks until the thread is finished. If you spawn multiple threads, which happens more often than not, there’s no way to wait on all of them simultaneously and process each one as soon as it finishes.
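To make the problem concrete, here is a minimal sketch of what happens when handles are simply joined in spawn order. The helper name join_in_spawn_order and the millisecond durations are assumptions made for this illustration, not part of the code used later in this post.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Spawns one thread per entry in `sleeps_ms`, then joins them in spawn order.
/// Returns, for each thread, its sleep duration and how many milliseconds had
/// elapsed (since just before spawning) when its `join` call returned.
fn join_in_spawn_order(sleeps_ms: &[u64]) -> Vec<(u64, u128)> {
    let start = Instant::now();

    // Spawn every thread first, so they all run concurrently.
    let handles: Vec<_> = sleeps_ms
        .iter()
        .map(|&ms| {
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(ms));
                ms
            })
        })
        .collect();

    // Join in spawn order: each `join` blocks until that specific thread ends.
    handles
        .into_iter()
        .map(|handle| {
            let ms = handle.join().unwrap();
            (ms, start.elapsed().as_millis())
        })
        .collect()
}

fn main() {
    for (ms, joined_at) in join_in_spawn_order(&[300, 200, 100]) {
        println!("thread that slept {ms} ms was joined after about {joined_at} ms");
    }
}
```

Because the first join waits for the slowest thread, the two faster threads are only processed after it, even though they finished earlier.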
How does it work in Python?
Let’s see how it can be done in another language, and then try to achieve something similar in Rust.
In Python, there is the ThreadPoolExecutor. As you submit work to its threads using ThreadPoolExecutor.submit, you get Future objects back, which you can store in an iterable. You can pass that iterable into the concurrent.futures.as_completed function, and it will yield those futures as they finish. You can process each future individually right then and there, and then immediately start waiting for the next one to finish.
import concurrent.futures
import time
from concurrent.futures import ThreadPoolExecutor


# I'll call this function from my threads.
def target_fn(sleep_duration: int) -> int:
    print(f"Sleeping for {sleep_duration} seconds.")
    time.sleep(sleep_duration)
    return sleep_duration


start_time = time.monotonic()
# Using `ThreadPoolExecutor` as a context manager guarantees that all threads
# will be joined before the context manager's scope ends. This is very similar
# to Rust's scoped threads.
with ThreadPoolExecutor() as executor:
    # Collect the futures into a set.
    futures = {
        executor.submit(target_fn, 5),
        executor.submit(target_fn, 3),
        executor.submit(target_fn, 1),
    }
    # Get the futures as they finish using `as_completed`.
    for future in concurrent.futures.as_completed(futures):
        time_elapsed = time.monotonic() - start_time
        return_value = future.result()
        print(
            f"{time_elapsed:.2f}:"
            f" Thread finished:"
            f" slept for {return_value} seconds."
        )
Running the code above with Python 3.13.2 yields this (with some variation regarding the timestamps):
Sleeping for 5 seconds.
Sleeping for 3 seconds.
Sleeping for 1 seconds.
1.00: Thread finished: slept for 1 seconds.
3.01: Thread finished: slept for 3 seconds.
5.01: Thread finished: slept for 5 seconds.
You can tell from the timestamps in the last few lines that each thread was processed as soon as it finished.
Doing the same thing in Rust
There’s nothing like concurrent.futures.as_completed in Rust’s standard library. JoinHandle.join blocks until its thread finishes, you need to call it once for each thread, and you don’t know in advance which thread will finish first.
The method JoinHandle.is_finished can be used to check whether a thread has already finished, and it always returns immediately with a boolean. Therefore, it is possible to poll it in a loop, sleeping briefly between checks, to keep watch for when threads finish. But with this method alone, there is no way to avoid the polling loop.
Now, all that said, I’m sure this is a solved problem. There must be a crate somewhere that solves this exact problem in an elegant and performant way. But I decided to tackle this problem myself as an opportunity to deepen my understanding of Rust’s threading model.
First, I’ll implement a Rust program that manages several threads using the looping approach:
use std::thread;
use std::time::{Duration, Instant};

// I'll call this function from my threads.
fn target_fn(sleep_duration: u64) -> u64 {
    println!("Sleeping for {sleep_duration} seconds.");
    thread::sleep(Duration::from_secs(sleep_duration));
    sleep_duration
}

fn main() {
    // Spawn the threads and put all the handles in a vector.
    let mut handles = vec![
        thread::spawn(|| target_fn(5)),
        thread::spawn(|| target_fn(3)),
        thread::spawn(|| target_fn(1)),
    ];

    let start_time = Instant::now();
    while !handles.is_empty() {
        let mut i = 0;

        while i < handles.len() {
            // Call `JoinHandle.is_finished` on each handle to find the finished threads.
            if handles[i].is_finished() {
                let handle = handles.remove(i);
                // Call `JoinHandle.join`, get the result, print it.
                let sleep_duration = handle.join().unwrap();
                let time_elapsed = start_time.elapsed().as_secs_f64();
                println!("{time_elapsed:.2}: Thread finished: slept for {sleep_duration} seconds.");
            } else {
                i += 1;
            }
        }

        // Sleep briefly between polls so the main thread doesn't spin at full speed.
        thread::sleep(Duration::from_millis(10));
    }
}
Here’s the output from that program:
Sleeping for 5 seconds.
Sleeping for 1 seconds.
Sleeping for 3 seconds.
1.00: Thread finished: slept for 1 seconds.
3.00: Thread finished: slept for 3 seconds.
5.01: Thread finished: slept for 5 seconds.
That’s not terrible, but what if you don’t want to loop? I encountered this problem when writing fdintercept.
“Self-shipping” threads in Rust
The solution that I found in Rust is this: let each spawned thread communicate to the parent thread when it has finished. This can be done via regular mpsc channels, and thus the parent thread just needs to wait for something to arrive over the channel instead of looping.
First, I wrote a function that spawns a new thread inside an existing thread scope, but this is a special thread: it will send its own handle to the parent thread. In my code, I call it a “self-shipping thread”. The function also receives the target function that we want to run in the new thread.
Let’s see how this function is implemented below, and then we’ll go through it step by step.
7   pub fn spawn_self_shipping_thread_in_scope<'scope, F, R>(
8       scope: &'scope thread::Scope<'scope, '_>,
9       tx: mpsc::Sender<ScopedJoinHandle<'scope, R>>,
10      func: F,
11  ) where
12      F: FnOnce() -> R + Send + 'scope,
13      R: Send + 'scope,
14  {
15      // Create the channel that will be used to transfer the new thread's handle from the parent
16      // thread to the new thread.
17      let (handle_tx, handle_rx) = mpsc::channel();
18
19      // Spawn the new thread in the scope.
20      let handle = scope.spawn(move || {
21          // Execute the target function.
22          let result = func();
23
24          // Receive the handle that was sent by the parent thread to the new thread via the
25          // channel.
26          let handle = handle_rx.recv().unwrap();
27          // And immediately send it to the caller of `spawn_self_shipping_thread_in_scope`. It is
28          // the responsibility of the caller to make sure that the `rx` side of this channel is
29          // alive until after this thread is finished.
30          tx.send(handle).unwrap();
31
32          // Return the same result as the target function.
33          result
34      });
35
36      // Send the new thread's handle into the new thread itself.
37      handle_tx.send(handle).unwrap();
38  }
The function receives three things: the thread scope in which the new thread will be created, the sending end of the channel that will be used to send the thread handle to the caller of spawn_self_shipping_thread_in_scope, and the target function we want to run in the new thread.
The function uses two channels:
- From parent thread to new thread: sends the handle to the new thread (created in line 17, used in lines 26 and 37).
- From new thread to caller: returns the handle when the thread is finished (received from the caller in line 9, used in line 30).
The parent thread creates the first channel, spawns the new thread, and sends the handle to it. Meanwhile, the new thread executes the target function, receives its own handle via the first channel, and sends it back to the caller through the second channel before returning the function’s result.
This is how spawn_self_shipping_thread_in_scope can be used:
// I'll call this function from my threads.
fn target_fn(sleep_duration: u64) -> u64 {
    println!("Sleeping for {sleep_duration} seconds.");
    thread::sleep(Duration::from_secs(sleep_duration));
    sleep_duration
}

fn main() {
    // Create the thread scope.
    thread::scope(move |scope| {
        // Create the channel that the new threads will use to send their handles to the main
        // thread.
        let (handle_tx, handle_rx) = mpsc::channel();

        // Spawn the new threads.
        spawn_self_shipping_thread_in_scope(scope, handle_tx.clone(), || target_fn(1));
        spawn_self_shipping_thread_in_scope(scope, handle_tx.clone(), || target_fn(3));
        spawn_self_shipping_thread_in_scope(scope, handle_tx.clone(), || target_fn(5));

        // Drop this `handle_tx` so that when all the threads are finished and all the `handle_tx`
        // clones are dropped, `handle_rx.recv()` will return `Err`.
        drop(handle_tx);

        let start_time = Instant::now();
        // Receive the handle from the next thread that finishes.
        while let Ok(handle) = handle_rx.recv() {
            // Join the thread and get the result.
            match handle.join() {
                Ok(sleep_duration) => {
                    let time_elapsed = start_time.elapsed().as_secs_f64();
                    println!(
                        "{time_elapsed:.2}: Thread finished: slept for {sleep_duration} seconds."
                    );
                }
                Err(e) => eprintln!("Error joining thread: {e:?}"),
            }
        }
    });
}
To use it, you create the scope and the channel, spawn the threads by calling spawn_self_shipping_thread_in_scope with the scope, a clone of the handle_tx side of the channel, and the target function, and then just wait on the handle_rx side for the handles. By the time a handle arrives, its thread has already run the target function to completion and is about to exit, so the join call that follows returns almost immediately.
This is the result:
Sleeping for 1 seconds.
Sleeping for 3 seconds.
Sleeping for 5 seconds.
1.01: Thread finished: slept for 1 seconds.
3.00: Thread finished: slept for 3 seconds.
5.00: Thread finished: slept for 5 seconds.
But what if the thread panics?
The code above has a serious flaw: if the target function panics, the thread never reaches the code that sends its handle to the parent thread. Although the thread will still be joined when the scope ends, it will bypass our custom processing logic.
Let’s fix that.
First, let’s write a test that proves that we have a problem and that we can later use to prove that we have fixed the issue.
87   #[test]
88   fn handles_panic() {
89       // Create the thread scope.
90       thread::scope(|scope| {
91           // And the channel.
92           let (tx, rx) = mpsc::channel();
93
94           // Spawn the self-shipping thread. Make it panic.
95           spawn_self_shipping_thread_in_scope(scope, tx, || {
96               panic!("Thread is panicking on purpose for testing");
97           });
98
99           // Ensure something was sent over the channel.
100          let handle = rx.recv().unwrap();
101          // Join the self-shipping thread.
102          let join_result = handle.join();
103          // And see that it errored with our custom panic message.
104          let err = join_result.unwrap_err();
105          let panic_msg = err.downcast_ref::<&str>().unwrap();
106          assert!(panic_msg.contains("Thread is panicking on purpose for testing"));
107      });
108  }
Running the test, this is what we get:
thread 'tests::spawn_self_shipping_thread_in_scope::handles_panic' panicked at src/main.rs:98:40:
called `Result::unwrap()` on an `Err` value: RecvError
The error comes from line 100: since nothing was ever sent into the channel before it was closed, calling recv on it causes it to return an Err.
So, how do we make the test pass?
The idea here is this: no matter what happens, we always want the thread handle to be sent to the caller of spawn_self_shipping_thread_in_scope, even if the thread panics.
When a Rust thread panics, it goes through a process called “unwinding” (this is the default behavior; a program built with the panic = "abort" setting terminates instead). During unwinding, every value on the thread’s stack is dropped as usual. Drop implementations are still respected after panics! So we need to implement something that, when dropped, will send the handle through the channel.
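As a quick check of that claim, here is a minimal sketch, assuming the default unwinding panic strategy. The SetOnDrop type and the drop_runs_on_panic function are hypothetical names made up for this illustration: the guard only flips a shared flag when it is dropped.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical guard type for this sketch: sets a shared flag when dropped.
struct SetOnDrop(Arc<AtomicBool>);

impl Drop for SetOnDrop {
    fn drop(&mut self) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Spawns a thread that panics while holding a guard.
/// Returns (did the thread panic?, did the guard's `drop` run?).
fn drop_runs_on_panic() -> (bool, bool) {
    let flag = Arc::new(AtomicBool::new(false));
    let flag_in_thread = Arc::clone(&flag);

    let result: thread::Result<()> = thread::spawn(move || {
        // Bound to a named variable, so it lives until the closure unwinds.
        let _guard = SetOnDrop(flag_in_thread);
        panic!("panicking on purpose");
    })
    .join();

    (result.is_err(), flag.load(Ordering::SeqCst))
}

fn main() {
    let (panicked, dropped) = drop_runs_on_panic();
    println!("panicked: {panicked}, guard dropped: {dropped}");
}
```

The same mechanism can carry a thread handle instead of a flag.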
What would such a thing look like? Let’s take a look.
// Imports used by this struct and by the function that spawns the thread.
use std::sync::mpsc;
use std::thread::{self, ScopedJoinHandle};

// A struct with a `Drop` implementation to ensure the thread handle is sent to the caller of
// `spawn_self_shipping_thread_in_scope` even if the self-shipping thread panics.
struct SendOnDrop<'scope, R> {
    handle: Option<ScopedJoinHandle<'scope, R>>,
    tx: mpsc::Sender<ScopedJoinHandle<'scope, R>>,
}

impl<R> Drop for SendOnDrop<'_, R> {
    fn drop(&mut self) {
        if let Some(handle) = self.handle.take() {
            self.tx.send(handle).unwrap();
        }
    }
}
The struct SendOnDrop owns the thread handle and tx (the sending end of the channel created by the caller of spawn_self_shipping_thread_in_scope). It implements the Drop trait, which requires an implementation of the Drop.drop method; this is the method that runs when a SendOnDrop value is dropped.
The Drop.drop method only receives &mut self, so it cannot move the handle out of the struct directly. That’s why the handle is stored as an Option<ScopedJoinHandle<'scope, R>> on the struct: calling take on it gives us ownership of the handle and leaves a None in its place, and we can then send the handle through the channel.
And here’s the version of spawn_self_shipping_thread_in_scope that makes use of the SendOnDrop struct:
pub fn spawn_self_shipping_thread_in_scope<'scope, F, R>(
    scope: &'scope thread::Scope<'scope, '_>,
    tx: mpsc::Sender<ScopedJoinHandle<'scope, R>>,
    func: F,
) where
    F: FnOnce() -> R + Send + 'scope,
    R: Send + 'scope,
{
    // Create the channel that will be used to transfer the new thread's handle from the parent
    // thread to the new thread.
    let (handle_tx, handle_rx) = mpsc::channel();

    // Spawn the new thread in the scope.
    let handle = scope.spawn(move || {
        // Receive the handle that was sent by the parent thread to the new thread via the
        // channel.
        let handle = handle_rx.recv().unwrap();

        // This will send the new thread's handle to the caller of
        // `spawn_self_shipping_thread_in_scope` when the closure's stack frame is destroyed, even
        // if that happens due to a panic. It must be bound to a named variable: an unbound value
        // (or one bound to `_`) would be dropped at the end of this statement, before `func` runs.
        let _guard = SendOnDrop {
            handle: Some(handle),
            // It is the responsibility of the caller to make sure that the `rx` side of this
            // channel is alive until after this thread is finished.
            tx,
        };

        // Execute the target function and return its result.
        func()
    });

    // Send the new thread's handle into the new thread itself.
    handle_tx.send(handle).unwrap();
}
The body of the self-shipping thread changed significantly: now, the first thing it does is get the handle from the parent thread. Then, it creates a SendOnDrop struct and binds it to a local variable, _guard. The binding matters: a value that is not bound to a variable (or that is bound to the bare _ pattern) is a temporary that is dropped at the end of its own statement, which would ship the handle before the target function even starts. Bound to _guard, the struct sits on the stack until the closure returns or unwinds, so it is properly dropped when the thread finishes or in the event of a panic.
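Whether a guard value is bound to a named variable decides when it is dropped, which is easy to get wrong. The sketch below records the order of events in a log. The LogOnDrop type and the drop_order function are hypothetical names used only for this illustration.

```rust
use std::cell::RefCell;

// Hypothetical guard for this sketch: appends a message to a shared log when dropped.
struct LogOnDrop<'a>(&'a RefCell<Vec<&'static str>>, &'static str);

impl Drop for LogOnDrop<'_> {
    fn drop(&mut self) {
        self.0.borrow_mut().push(self.1);
    }
}

/// Returns the order in which the guards were dropped relative to the "work".
fn drop_order() -> Vec<&'static str> {
    let log = RefCell::new(Vec::new());
    {
        // Not bound to a variable: a temporary, dropped at the end of this statement.
        LogOnDrop(&log, "unbound dropped");
        // `let _ = ...` does not bind the value either, so it is dropped right away too.
        let _ = LogOnDrop(&log, "underscore dropped");
        // Bound to a named variable: lives until the end of the enclosing block.
        let _guard = LogOnDrop(&log, "named dropped");

        log.borrow_mut().push("work done");
    }
    log.into_inner()
}

fn main() {
    // Prints: ["unbound dropped", "underscore dropped", "work done", "named dropped"]
    println!("{:?}", drop_order());
}
```

Only the guard bound to a name outlives the work, which is the behavior a send-on-drop guard needs.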
Then, we run the target function and return its result.
And now, if we run the test from before, it passes! We now have self-shipping threads that we can act on immediately as they finish, and they will still notify the caller of spawn_self_shipping_thread_in_scope when they finish, even if that happens because of a panic.
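To see the completion ordering end to end, here is a self-contained sketch that spawns the slowest task first and collects results in the order the handles arrive. It makes a few assumptions of its own: the completion_order helper is hypothetical, durations are in milliseconds to keep it quick, the guard is bound to a variable named _guard so it is dropped only when the closure ends, and a failed send in drop is ignored rather than unwrapped.

```rust
use std::sync::mpsc;
use std::thread::{self, ScopedJoinHandle};
use std::time::Duration;

struct SendOnDrop<'scope, R> {
    handle: Option<ScopedJoinHandle<'scope, R>>,
    tx: mpsc::Sender<ScopedJoinHandle<'scope, R>>,
}

impl<R> Drop for SendOnDrop<'_, R> {
    fn drop(&mut self) {
        if let Some(handle) = self.handle.take() {
            // Assumption of this sketch: ignore a closed channel so that `drop`
            // never panics while the thread may already be unwinding.
            let _ = self.tx.send(handle);
        }
    }
}

fn spawn_self_shipping_thread_in_scope<'scope, F, R>(
    scope: &'scope thread::Scope<'scope, '_>,
    tx: mpsc::Sender<ScopedJoinHandle<'scope, R>>,
    func: F,
) where
    F: FnOnce() -> R + Send + 'scope,
    R: Send + 'scope,
{
    let (handle_tx, handle_rx) = mpsc::channel();
    let handle = scope.spawn(move || {
        let handle = handle_rx.recv().unwrap();
        // Named binding: dropped (and the handle shipped) when the closure ends.
        let _guard = SendOnDrop { handle: Some(handle), tx };
        func()
    });
    handle_tx.send(handle).unwrap();
}

/// Hypothetical helper: runs one sleeping task per entry and returns the sleep
/// durations in the order in which the threads shipped their handles.
fn completion_order(sleeps_ms: &[u64]) -> Vec<u64> {
    thread::scope(|scope| {
        let (tx, rx) = mpsc::channel();
        for &ms in sleeps_ms {
            spawn_self_shipping_thread_in_scope(scope, tx.clone(), move || {
                thread::sleep(Duration::from_millis(ms));
                ms
            });
        }
        // Drop the original sender so the iterator ends once every thread is done.
        drop(tx);
        let order: Vec<u64> = rx.iter().filter_map(|handle| handle.join().ok()).collect();
        order
    })
}

fn main() {
    // The slowest task is spawned first, yet it should be reported last.
    println!("{:?}", completion_order(&[250, 150, 50]));
}
```

The result depends on the scheduler honoring the 100 ms gaps between tasks, so it should be read as an illustration rather than a strict guarantee.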
Appendix A: Does it work with regular threads too?
This works with regular threads too, not only scoped threads. See the code below.
use std::{
    sync::mpsc,
    thread::{self, JoinHandle},
    time::{Duration, Instant},
};

// A struct with a `Drop` implementation to ensure the thread handle is sent to the caller of
// `spawn_self_shipping_thread` even if the self-shipping thread panics.
struct SendOnDrop<R> {
    handle: Option<JoinHandle<R>>,
    tx: mpsc::Sender<JoinHandle<R>>,
}

impl<R> Drop for SendOnDrop<R> {
    fn drop(&mut self) {
        if let Some(handle) = self.handle.take() {
            self.tx.send(handle).unwrap();
        }
    }
}

pub fn spawn_self_shipping_thread<F, R>(tx: mpsc::Sender<JoinHandle<R>>, func: F)
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    // Create the channel that will be used to transfer the new thread's handle from the parent
    // thread to the new thread.
    let (handle_tx, handle_rx) = mpsc::channel();

    // Spawn the new thread.
    let handle = thread::spawn(move || {
        // Receive the handle that was sent by the parent thread to the new thread via the
        // channel.
        let handle = handle_rx.recv().unwrap();

        // This will send the new thread's handle to the caller of `spawn_self_shipping_thread`
        // when the closure's stack frame is destroyed, even if that happens due to a panic. It
        // must be bound to a named variable: an unbound value (or one bound to `_`) would be
        // dropped at the end of this statement, before `func` runs.
        let _guard = SendOnDrop {
            handle: Some(handle),
            // It is the responsibility of the caller to make sure that the `rx` side of this
            // channel is alive until after this thread is finished.
            tx,
        };

        // Execute the target function and return its result.
        func()
    });

    // Send the new thread's handle into the new thread itself.
    handle_tx.send(handle).unwrap();
}

// I'll call this function from my threads.
fn target_fn(sleep_duration: u64) -> u64 {
    println!("Sleeping for {sleep_duration} seconds.");
    thread::sleep(Duration::from_secs(sleep_duration));
    sleep_duration
}

fn main() {
    // Create the channel that the new threads will use to send their handles to the main thread.
    let (handle_tx, handle_rx) = mpsc::channel();

    // Spawn the new threads.
    spawn_self_shipping_thread(handle_tx.clone(), || target_fn(1));
    spawn_self_shipping_thread(handle_tx.clone(), || target_fn(3));
    spawn_self_shipping_thread(handle_tx.clone(), || target_fn(5));

    // Drop this `handle_tx` so that when all the threads are finished and all the `handle_tx`
    // clones are dropped, `handle_rx.recv()` will return `Err`.
    drop(handle_tx);

    let start_time = Instant::now();
    // Receive the handle from the next thread that finishes.
    while let Ok(handle) = handle_rx.recv() {
        // Join the thread and get the result.
        match handle.join() {
            Ok(sleep_duration) => {
                let time_elapsed = start_time.elapsed().as_secs_f64();
                println!("{time_elapsed:.2}: Thread finished: slept for {sleep_duration} seconds.");
            }
            Err(e) => eprintln!("Error joining thread: {e:?}"),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    mod spawn_self_shipping_thread {
        use super::*;

        #[test]
        fn handles_panic() {
            // Create the channel.
            let (tx, rx) = mpsc::channel();

            // Spawn the self-shipping thread. Make it panic.
            spawn_self_shipping_thread(tx, || {
                panic!("Thread is panicking on purpose for testing");
            });

            // Ensure something was sent over the channel.
            let handle = rx.recv().unwrap();
            // Join the self-shipping thread.
            let join_result = handle.join();
            // And see that it errored with our custom panic message.
            let err = join_result.unwrap_err();
            let panic_msg = err.downcast_ref::<&str>().unwrap();
            assert!(panic_msg.contains("Thread is panicking on purpose for testing"));
        }
    }
}