tokio::sync::watch as an observable parameter

A couple of pull requests landed recently in tokio (one, two) which make tokio’s watch channel more flexible in versions ≥ 1.13.0. It’s now very easy to use if you have some sort of “configuration provider” in your program with a varying number of subscribers who need to be kept up-to-date as that value changes.

If you’re not familiar with it, the idea of this channel is that some task uses the Sender to update a value over time. As it changes you want receivers to be kept up-to-date with the latest version of that value. It’s similar to a broadcast channel except receivers don’t get their own copies and they’re not guaranteed to see every intermediate value if it’s changing quickly. The Receiver API includes an async method changed() which will tell you when there’s a new value.

Until recently, using this for the “configuration watcher” use case was a little tricky because the only way to get an extra Receiver was to clone it from an existing one. This meant that the sending side had to keep an extra Receiver lying around just so it could be cloned for new subscribers. This was fixed by exposing a subscribe() method on the Sender so that you could create a new Receiver at will, even if the existing Receiver count had dropped to zero.

Then there was a second paper cut where the send() method for placing a new value in the channel would fail with an error if there were no receivers currently existing. For the configuration-watcher use case you don’t generally care if there aren’t any subscribers. You just want to update the value anyway. This error could be handled by recreating the entire channel, which works if you have mutable access but it’s a bit of a pain. This was fixed as part of a new send_replace() method, which both returns the previous value and doesn’t care if there are no subscribers.

Now that both of those issues have been fixed you can write nice code like this:

use tokio::sync::watch;
use futures::future::FutureExt;

#[tokio::main]
async fn main() {
    // Create a watch channel and immediately discard the receiver
    let (option_tx, _) = watch::channel(5u32);

    // User changes the value to 6
    option_tx.send_replace(6u32);

    // Now some task wants to track it
    let mut option_rx = option_tx.subscribe();
    // It sees the latest value
    assert_eq!(*option_rx.borrow(), 6u32);
    // It can also await changes, but there won't be anything yet
    if let Some(_) = option_rx.changed().now_or_never() {
        panic!("Shouldn't happen");
    }

    // You can have as many subscribers as you want
    let mut option_rx_2 = option_tx.subscribe();

    // Change the setting again
    option_tx.send_replace(10);

    // Subscribers will learn about it
    assert!(option_rx.changed().await.is_ok());
    assert!(option_rx_2.changed().await.is_ok());
    assert_eq!(*option_rx.borrow(), 10);
    assert_eq!(*option_rx_2.borrow(), 10);

    println!("Success");
}

Of course the channel is much more abstract than a “setting” or “configuration value”. I’m using that as a motivating example but this applies to any situation where a value generated in one component will change over time and other components need to respond to that immediately.