tokio::sync::watch as an observable parameter
A couple of pull requests landed recently in tokio (one, two) which make tokio’s watch channel more flexible in versions ≥ 1.13.0. It’s now very easy to use if you have some sort of “configuration provider” in your program with a varying number of subscribers who need to be kept up-to-date as that value changes.
If you’re not familiar with it, the idea of this channel is that some task uses the Sender to update a value over time. As it changes, you want receivers to be kept up-to-date with the latest version of that value. It’s similar to a broadcast channel except receivers don’t get their own copies and they’re not guaranteed to see every intermediate value if it’s changing quickly. The Receiver API includes an async method changed() which will tell you when there’s a new value.
Until recently, using this for the “configuration watcher” use case was a little tricky because the only way to get an extra Receiver was to clone it from an existing one. This meant that the sending side had to keep an extra Receiver lying around just so it could be cloned for new subscribers. This was fixed by exposing a subscribe() method on the Sender so that you could create a new Receiver at will, even if the existing Receiver count had dropped to zero.
Then there was a second paper cut: the send() method for placing a new value in the channel would fail with an error if no receivers currently existed. For the configuration-watcher use case you don’t generally care if there aren’t any subscribers. You just want to update the value anyway. This error could be handled by recreating the entire channel, which works if you have mutable access, but it’s a bit of a pain. This was fixed by a new send_replace() method, which both returns the previous value and doesn’t care if there are no subscribers.
Now that both of those issues have been fixed you can write nice code like this:
use futures::future::FutureExt;
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // Create a watch channel and immediately discard the receiver
    let (option_tx, _) = watch::channel(5u32);

    // User changes the value to 6
    option_tx.send_replace(6u32);

    // Now some task wants to track it
    let mut option_rx = option_tx.subscribe();

    // It sees the latest value
    assert_eq!(*option_rx.borrow(), 6u32);

    // It can also await changes, but there won't be anything yet
    if option_rx.changed().now_or_never().is_some() {
        panic!("Shouldn't happen");
    }

    // You can have as many subscribers as you want
    let mut option_rx_2 = option_tx.subscribe();

    // Change the setting again
    option_tx.send_replace(10);

    // Subscribers will learn about it
    assert!(option_rx.changed().await.is_ok());
    assert!(option_rx_2.changed().await.is_ok());
    assert_eq!(*option_rx.borrow(), 10);
    assert_eq!(*option_rx_2.borrow(), 10);

    println!("Success");
}
Of course the channel is much more abstract than a “setting” or “configuration value”. I’m using that as a motivating example but this applies to any situation where a value generated in one component will change over time and other components need to respond to that immediately.