When Ferrous Metals Corrode, pt. XIX

Intro

For this post I'm summarizing chapter 20, "Asynchronous Programming".

In the previous chapter we talked about concurrency via threads. Threads are, of course, expensive. Some systems do their own scheduling of lightweight threads/processes (e.g. Erlang) to get around this. Rust doesn't have a built-in VM/runtime to run green threads on like Erlang has. Instead, Rust uses the concept of asynchronous tasks – lightweight, non-blocking, cooperatively scheduled units of execution – and requires a runtime only if those are in use.

As an example, the book presents code samples from an async chat server (a popular showcase for lightweight concurrency!).

Here's the server that listens for incoming chat requests:

// one of the available (external) runtimes
use async_std::{net, task};

// calls that may block are postfixed with .await
let listener = net::TcpListener::bind(address).await?;

let mut new_connections = listener.incoming();
while let Some(socket_result) = new_connections.next().await {
    let socket = socket_result?;
    let groups = chat_group_table.clone();
    task::spawn(async {
        log_error(serve(socket, groups).await);
    });
}

Details follow below.

From Synchronous to Asynchronous

Futures

std::future::Future is a trait with a method poll() and an associated type called Output:

trait Future {
    type Output;
    // For now, read `Pin<&mut Self>` as `&mut Self`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}

Futures represent ongoing operations. Polling a future never blocks; it returns the current status of the operation – either Ready (with the output) or Pending. A pending future also arranges for a waker (passed in via the Context) to be invoked once it's worthwhile to poll again.

A function that runs asynchronously, returning a Future that the caller polls until completion, could have a signature like this:

fn read_to_string<'a>(&'a mut self, buf: &'a mut String)
    -> impl Future<Output = Result<usize>> + 'a;

I.e. it'll return a Future that promises to produce a Result<usize> as its output sometime in the future.

It also states that the Future can only live as long as the self and buf ref params do (which makes sense, since the function will presumably read data into buf).

The async-std crate aims to provide async variants for the blocking I/O in the std lib.
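
For instance, reading a file looks just like the std version, except the call yields a future (a minimal sketch, not from the book; "Cargo.toml" is just an arbitrary file):

use async_std::fs;

// same call shape as std::fs::read_to_string, but returning
// a future to .await ("Cargo.toml" is arbitrary)
async fn show_manifest() -> std::io::Result<()> {
    let text = fs::read_to_string("Cargo.toml").await?;
    println!("{}", text);
    Ok(())
}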

Given the above, the transition of a Future from Pending to Ready is meant to be a one-time thing: once a poll returns Ready, the Future should not be polled again.

Async Functions and Await Expressions

An async function that GETs some URL:

use async_std::io::prelude::*;
use async_std::net;

async fn cheapo_request(host: &str, port: u16, path: &str)
                            -> std::io::Result<String>
{
    // async version of a tcp sock, a-waiting the connection
    let mut socket = net::TcpStream::connect((host, port)).await?;

    let request = format!("GET {} HTTP/1.1\r\nHost: {}\r\n\r\n", path, host);
    // write to socket and a-wait for response
    socket.write_all(request.as_bytes()).await?;
    // close our end
    socket.shutdown(net::Shutdown::Write)?;

    let mut response = String::new();
    // read to response buffer
    socket.read_to_string(&mut response).await?;

    Ok(response)
}
  • async fn defines an async function; it returns a Future of the specified return type

  • I/O is done via the async-std crate's versions of the std functions

  • on all the futures we get, we add .await – special syntax built into the language to wait until the Future returns Ready

Note that traits may not have async fns. The async-trait crate provides a macro-based workaround for this restriction.
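
A quick sketch of how that looks (the Fetch trait here is hypothetical, only to show the attribute's shape):

use async_trait::async_trait;

// hypothetical trait, just to demonstrate the attribute
#[async_trait]
trait Fetch {
    async fn fetch(&self, path: &str) -> std::io::Result<String>;
}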

Calling Async Functions from Synchronous Code: block_on

The above .await magic only works within async fns. From a regular non-async fn we need to use task::block_on(), for instance to call the async fn from above:

fn main() -> std::io::Result<()> {
    use async_std::task;

    let response = task::block_on(cheapo_request("example.com", 80, "/"))?;
    println!("{}", response);
    Ok(())
}

block_on() is, as the name suggests, a blocking function; it should not be called within an async fn, where it would block the whole thread.

Spawning Async Tasks

The async-std crate has a task::spawn_local function that takes a future and adds it to a pool of futures to be polled when the current thread next calls block_on(). It returns a JoinHandle future which can be used to retrieve the task's final value. (Note: async-std's "unstable" feature needs to be enabled for this.)

One attempt at using this to make a bunch of GETs:

pub async fn many_requests(requests: Vec<(String, u16, String)>)
                           -> Vec<std::io::Result<String>>
{
    use async_std::task;

    let mut handles = vec![];
    for (host, port, path) in requests {
        handles.push(task::spawn_local(cheapo_request(&host, port, &path)));
    }

    let mut results = vec![];
    for handle in handles {
        results.push(handle.await);
    }

    results
}

However, rustc will complain about the lifetimes of host and path. We're passing those by ref to cheapo_request(), but the compiler can't tell that the referenced values will stay around long enough – many_requests() might return early, leaving the ref to host pointing at a dropped value. The same issue arises with std::thread::spawn().

One way around this is to pass in host/path by value.

Example adapted from the book:

use async_std::net;
use async_std::task;
use async_std::io::prelude::*;


async fn cheapo_owning_request(host: String, path: String)
                               -> std::io::Result<String> {
    let mut socket = net::TcpStream::connect(&host).await?;

    let request = format!("GET {} HTTP/1.1\r\nHost: {}\r\n\r\n", path, host);
    socket.write_all(request.as_bytes()).await?;
    socket.shutdown(net::Shutdown::Write)?;

    let mut response = String::new();
    socket.read_to_string(&mut response).await?;

    Ok(response)
}

pub async fn many_requests(requests: Vec<(String, String)>)
                           -> Vec<std::io::Result<String>>
{

    let mut handles = vec![];
    for (host, path) in requests {
        handles.push(task::spawn_local(cheapo_owning_request(host, path)));
    }

    let mut results = vec![];
    for handle in handles {
        results.push(handle.await);
    }

    results
}

fn main() {
    let requests = vec![
        ("example.com:80".to_string(), "/".to_string()),
        ("www.red-bean.com:80".to_string(), "/".to_string()),
        ("en.wikipedia.org:80".to_string(), "/".to_string()),
    ];

    let results = async_std::task::block_on(many_requests(requests));
    for result in results {
        match result {
            Ok(response) => println!("{}", response),
            Err(err) => eprintln!("error: {}", err),
        }
    }
}

This needs async-std's unstable feature, i.e. this in Cargo.toml:

[dependencies.async-std]
version = "1.12.0"
features = ["unstable"]

Async Blocks

These are like normal blocks, but they return a Future of the value the block would otherwise produce. Within async blocks one can use the .await expression:

let serve_one = async {
    use async_std::net;

    // Listen for connections, and accept one.
    let listener = net::TcpListener::bind("localhost:8087").await?;
    let (mut socket, _addr) = listener.accept().await?;

    // Talk to client on `socket`.
    ...
};

Async blocks are somewhat like closures when accessing variables outside their scope – they capture them like a closure would. A block can also be declared async move to move the captured variables into the block.

This would have been another option to get around the issues with lifetimes we encountered earlier. Instead of introducing cheapo_owning_request() we could have used an async move block like this:

...
for (host, port, path) in requests {
    handles.push(task::spawn_local(async move {
        cheapo_request(&host, port, &path).await
    }));
}
...

Here the Rust borrow checker can clearly see that host and path are moved into the block, even though cheapo_request() itself only takes refs; basically the same thing we achieved with cheapo_owning_request().

One limitation of async blocks is that they can't explicitly specify a return type for their Future, so inferring types might fail.
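
A common workaround is to spell out the error type on the block's final Ok with a turbofish (a sketch; the file name is hypothetical):

let future = async {
    let text = async_std::fs::read_to_string("notes.txt").await?;
    println!("{}", text);
    // the turbofish pins the output type to Result<(), io::Error>
    Ok::<(), std::io::Error>(())
};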

Building Async Functions from Async Blocks

fn cheapo_request(host: &str, port: u16, path: &str)
    -> impl Future<Output = io::Result<String>> + 'static
{
    let host = host.to_string();
    let path = path.to_string();

    async move {
        ... use &*host, port, and path ...
    }
}

Spawning Async Tasks on a Thread Pool

The async-std crate also has a task::spawn() function, similar to spawn_local(), but it runs futures on a pool of worker threads. It returns JoinHandles as well, but starts running the spawned tasks immediately, without waiting for a block_on() call.

Example:

use async_std::task;

let mut handles = vec![];
for (host, port, path) in requests {
    handles.push(task::spawn(async move {
        cheapo_request(&host, port, &path).await
    }));
}

Keep in mind that a task may be served by different threads over its lifetime, i.e. it is not guaranteed to stay on a single thread across .await points. For per-task state the crate offers task-local storage (the task-level analogue of thread-local storage); see the task_local! macro and the sketch below.
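
A sketch, under the assumption that async-std's task_local! and LocalKey mirror std's thread_local! API (check the crate docs; this may also sit behind a feature flag):

use async_std::task;
use std::cell::Cell;

// assumed: async-std's task_local! mirrors std's thread_local!
async_std::task_local! {
    static REQUESTS: Cell<u32> = Cell::new(0);
}

task::block_on(async {
    // each task sees its own copy of REQUESTS
    REQUESTS.with(|n| n.set(n.get() + 1));
});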

But Does Your Future Implement Send?

As spawn moves futures between threads they must implement the Send trait. This implies all the values in it must be Send. This is similar to what we need when using threads directly.

This can be a problem, e.g.:

use async_std::task;
use std::rc::Rc;

async fn reluctant() -> String {
    let string = Rc::new("ref-counted string".to_string());

    some_asynchronous_thing().await;

    format!("Your splendid string: {}", string)
}

task::spawn(reluctant());

The Rc string isn't Send, and because the function awaits in the middle, its future must carry string across that suspension point. If the task were then resumed on another thread, the non-thread-safe Rc would move with it – so the whole future fails to be Send.

Restricting the scope would help:

async fn reluctant() -> String {
    let return_value = {
        let string = Rc::new("ref-counted string".to_string());
        format!("Your splendid string: {}", string)
        // The `Rc<String>` goes out of scope here...
    };

    // ... and thus is not around when we suspend here.
    some_asynchronous_thing().await;

    return_value
}

At the point of the .await, the string Rc has already been dropped.

Alternatively, we could use an Arc instead of an Rc.

The Send requirement also extends to error types carried inside the futures. Consider these:

// Not recommended!
type GenericError = Box<dyn std::error::Error>;
type GenericResult<T> = Result<T, GenericError>;

// Better! Make GenericError require Send
type GenericError = Box<dyn std::error::Error + Send + Sync + 'static>;
...

Long Running Computations: yield_now and spawn_blocking

Async concurrency works cooperatively, i.e. the scheduler won't interrupt a long-running task. This can result in an unfair distribution of resources. For a long-running task to give up its thread occasionally, it can use the async_std::task::yield_now() function:

while computation_not_done() {
    ... do one medium-sized step of computation ...
    async_std::task::yield_now().await;
}

This in effect will move the task to the back of the execution queue.

If using yield_now() isn't possible, another option is async_std::task::spawn_blocking(), which takes a closure and runs it on a thread pool reserved for blocking tasks.
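
A sketch (sum_of_primes is a hypothetical CPU-heavy function; depending on the async-std version, spawn_blocking may also require the unstable feature):

use async_std::task;

// hypothetical CPU-heavy function standing in for real work
fn sum_of_primes(n: u64) -> u64 {
    (2..=n).filter(|&k| (2..k).all(|d| k % d != 0)).sum()
}

// run it on the blocking pool and await the result
async fn sum_async(n: u64) -> u64 {
    task::spawn_blocking(move || sum_of_primes(n)).await
}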

Comparing Asynchronous Designs

Unlike other environments, Rust doesn't have a built-in global event loop; you have to choose an executor for your futures (e.g. async-std or tokio). I wonder how that works with library code and mixing executors.

I am, tbh, wary of async in general, here or in other languages – it's a bit of a pain to have various flavors of functions around.

A Real Asynchronous HTTP Client

Example async http client using the surf crate:

pub async fn many_requests(urls: &[String])
                           -> Vec<Result<String, surf::Exception>>
{
    let client = surf::Client::new();

    let mut handles = vec![];
    for url in urls {
        let request = client.get(&url).recv_string();
        handles.push(async_std::task::spawn(request));
    }

    let mut results = vec![];
    for handle in handles {
        results.push(handle.await);
    }

    results
}

fn main() {
    let requests = &["http://example.com".to_string(),
                     "https://www.red-bean.com".to_string(),
                     "https://en.wikipedia.org/wiki/Main_Page".to_string()];

    let results = async_std::task::block_on(many_requests(requests));
    for result in results {
        match result {
            Ok(response) => println!("*** {}\n", response),
            Err(err) => eprintln!("error: {}\n", err),
        }
    }
}

An Asynchronous Client and Server

This section introduces an example chat client/server system based on tokio. Despite being a simple example, it's designed to handle backpressure well: slow or lossy clients mustn't impact message exchange between other clients, and if a client is unable to keep up, the server should drop messages for them (but inform the client about that fact).

Besides tokio, this will also pull in the async-std crate – this should be fine as long as the documented rules for mixing the two crates are followed.

For client/server communication, JSON serialization via serde will be used.

Both the client and the server can be started via cargo run, e.g.:

$ cargo run --release --bin server -- localhost:8088
$ cargo run --release --bin client -- localhost:8088

Error and Result Types

The example defines a boxed error type:

pub type ChatError = Box<dyn Error + Send + Sync + 'static>;
pub type ChatResult<T> = Result<T, ChatError>;

In a real application, one should consider the anyhow crate instead.

The Protocol

The communication protocol is defined via two enums, FromClient and FromServer; for both we derive Serialize and Deserialize (provided by serde).

The enums use Arc<String> to help the server avoid making copies when distributing messages.
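
The post doesn't reproduce the enums; reconstructed from how the variants are used later on, they look roughly like this (note that serializing Arc requires serde's rc feature):

use serde::{Deserialize, Serialize};
use std::sync::Arc;

#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub enum FromClient {
    Join { group_name: Arc<String> },
    Post { group_name: Arc<String>, message: Arc<String> },
}

#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub enum FromServer {
    Message { group_name: Arc<String>, message: Arc<String> },
    Error(String),
}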

Taking User Input: Asynchronous Streams

Clients read user input from stdin through an async_std::io::BufReader. This has a .lines() method which provides a stream of Result<String> values. A stream is the async version of an iterator, i.e. it produces a sequence of values on demand, in an async-friendly way.
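
A minimal sketch of that idea outside the chat client – echoing stdin lines back, one .await per line:

use async_std::io;
use async_std::prelude::*;  // for `lines()` and `next()`

// echo stdin back; `lines()` turns the buffered reader into
// a stream of io::Result<String> values
async fn echo_lines() -> std::io::Result<()> {
    let mut lines = io::BufReader::new(io::stdin()).lines();
    while let Some(line) = lines.next().await {
        println!("{}", line?);
    }
    Ok(())
}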

The Stream trait:

trait Stream {
    type Item;

    // For now, read `Pin<&mut Self>` as `&mut Self`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Self::Item>>;
}

As with an iterator, it has an Item type and returns an Option which is None once the stream has ended. Unlike an iterator, however, one must call poll_next() until it returns Ready. If it returns Pending instead, the stream will invoke the waker from the Context once a new item is ready.

The poll_next() method is awkward to use directly; instead use next(), which returns a Future of the next item:

while let Some(item) = stream.next().await {
    ... use item ...
}

Sending Packets

This defines a send_as_json() function that sends a packet to an outbound stream. Packets must implement Serialize, and streams the async Write plus Unpin (more on that later).
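
The post doesn't show the body; a sketch matching that description (one JSON value per line, mirroring receive_as_json() below, and using the ChatResult alias from above):

use async_std::prelude::*;
use serde::Serialize;
use std::marker::Unpin;

pub async fn send_as_json<S, P>(outbound: &mut S, packet: &P)
                                -> ChatResult<()>
    where S: async_std::io::Write + Unpin,
          P: Serialize,
{
    // one JSON value per line keeps the receiving side simple
    let mut json = serde_json::to_string(&packet)?;
    json.push('\n');
    outbound.write_all(json.as_bytes()).await?;
    Ok(())
}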

Receiving Packets: More Asynchronous Streams

The inverse of the above function: receive_as_json(), operating on types that implement BufRead, Unpin and DeserializeOwned.

The DeserializeOwned trait is a stricter variant of Deserialize: the deserialized value owns all of its data (copied out of the input buffer) instead of holding refs into it. The function returns something that implements Stream with ChatResult items. Interestingly, the receive_as_json() function is not async itself; it's a plain function that returns an async Stream value.

use serde::de::DeserializeOwned;

pub fn receive_as_json<S, P>(inbound: S) -> impl Stream<Item = ChatResult<P>>
    where S: async_std::io::BufRead + Unpin,
          P: DeserializeOwned,
{
    inbound.lines()
        .map(|line_result| -> ChatResult<P> {
            let line = line_result?;
            let parsed = serde_json::from_str::<P>(&line)?;
            Ok(parsed)
        })
}

This async function uses receive_as_json() from above:

use async_chat::FromServer;

async fn handle_replies(from_server: net::TcpStream) -> ChatResult<()> {
    let buffered = io::BufReader::new(from_server);
    let mut reply_stream = utils::receive_as_json(buffered);

    while let Some(reply) = reply_stream.next().await {
        match reply? {
            FromServer::Message { group_name, message } => {
                println!("message posted to {}: {}", group_name, message);
            }
            FromServer::Error(message) => {
                println!("error from server: {}", message);
            }
        }
    }

    Ok(())
}

It loops over the Stream from receive_as_json(), unwraps each reply and matches it against the patterns to decide whether it's a Message or an Error, then prints the respective variant. Note that the BufReader used above is the async_std version.

The Client’s Main Function

The main fn wraps the async fns it needs to call in an async block and passes this to task::block_on() which blocks the current thread until the async tasks are done.

use async_std::task;

fn main() -> ChatResult<()> {
    let address = std::env::args().nth(1)
        .expect("Usage: client ADDRESS:PORT");

    task::block_on(async {
        let socket = net::TcpStream::connect(address).await?;
        socket.set_nodelay(true)?;

        let to_server = send_commands(socket.clone());
        let from_server = handle_replies(socket);

        from_server.race(to_server).await?;

        Ok(())
    })
}

The from_server.race(to_server) bit polls both futures and returns Ready as soon as either one of them is. The race method is defined in the FutureExt extension trait, which the async-std prelude brings into scope.

The Server’s Main Function

The main.rs module:

use async_std::prelude::*;
use async_chat::utils::ChatResult;
use std::sync::Arc;

mod connection;
mod group;
mod group_table;

use connection::serve;

fn main() -> ChatResult<()> {
    let address = std::env::args().nth(1).expect("Usage: server ADDRESS");

    let chat_group_table = Arc::new(group_table::GroupTable::new());

    async_std::task::block_on(async {
        use async_std::{net, task};

        let listener = net::TcpListener::bind(address).await?;

        let mut new_connections = listener.incoming();
        while let Some(socket_result) = new_connections.next().await {
            let socket = socket_result?;
            let groups = chat_group_table.clone();
            task::spawn(async {
                log_error(serve(socket, groups).await);
            });
        }

        Ok(())
    })
}

fn log_error(result: ChatResult<()>) {
    if let Err(error) = result {
        eprintln!("Error: {}", error);
    }
}

The most interesting bits are in the async block passed to task::block_on(). It waits for connections on a TCP listener; each incoming connection is handed to a freshly spawned async task that calls serve().

The serve function also gets its own clone of the chat group table, which is shared as a refcounted (Arc) heap value.

Handling Chat Connections: Async Mutexes

The meat of the server is the async serve() function, which takes a TCP socket and the shared group table. It clones the incoming socket for answers and creates a buffered reader from the ingress socket, then reads incoming JSON data, handling each piece in a while loop. The data is matched against the FromClient enum, i.e. either a join or a post operation.

The egress socket is wrapped in the Outbound type, more on that below. The Outbound value itself is shared via an Arc.

use async_chat::{FromClient, FromServer};
use async_chat::utils::{self, ChatResult};
use async_std::prelude::*;
use async_std::io::BufReader;
use async_std::net::TcpStream;
use async_std::sync::Arc;

use crate::group_table::GroupTable;

pub async fn serve(socket: TcpStream, groups: Arc<GroupTable>)
                   -> ChatResult<()>
{
    let outbound = Arc::new(Outbound::new(socket.clone()));

    let buffered = BufReader::new(socket);
    let mut from_client = utils::receive_as_json(buffered);
    while let Some(request_result) = from_client.next().await {
        let request = request_result?;

        let result = match request {
            FromClient::Join { group_name } => {
                let group = groups.get_or_create(group_name);
                group.join(outbound.clone());
                Ok(())
            }

            FromClient::Post { group_name, message } => {
                match groups.get(&group_name) {
                    Some(group) => {
                        group.post(message);
                        Ok(())
                    }
                    None => {
                        Err(format!("Group '{}' does not exist", group_name))
                    }
                }
            }
        };

        if let Err(message) = result {
            let report = FromServer::Error(message);
            outbound.send(report).await?;
        }
    }

    Ok(())
}

The Outbound type is defined thusly:

use async_std::sync::Mutex;

pub struct Outbound(Mutex<TcpStream>);

impl Outbound {
    pub fn new(to_client: TcpStream) -> Outbound {
        Outbound(Mutex::new(to_client))
    }

    pub async fn send(&self, packet: FromServer) -> ChatResult<()> {
        let mut guard = self.0.lock().await;
        utils::send_as_json(&mut *guard, &packet).await?;
        guard.flush().await?;
        Ok(())
    }
}

When creating an Outbound value, the passed-in TCP socket is wrapped in a Mutex (async_std's async version of Mutex). Sending on the socket first locks the mutex. The &mut *guard bit is necessary to persuade Rust that this is the &mut TcpStream that send_as_json() needs.

We need the async Mutex version because:

  1. a task may be suspended (at an .await) while holding the mutex

  2. locking itself is an await point: a task waiting for the lock yields the thread instead of blocking it

  3. futures holding the async mutex's guard across an .await can still be Send, i.e. spawnable onto other threads – see the sketch below
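
To illustrate the last point, a sketch of what goes wrong with the std mutex (write_packet is a hypothetical async helper):

use std::sync::Mutex;

// hypothetical async helper, standing in for real socket I/O
async fn write_packet(_data: &str) {}

// the std guard stays alive across the .await, so this future
// is not Send and can't be handed to async_std::task::spawn
async fn broken_send(lock: &Mutex<String>) {
    let guard = lock.lock().unwrap();
    write_packet(&guard).await;
}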

The Group Table: Synchronous Mutexes

Async mutexes are relatively expensive compared to standard mutexes, though. In non-async code the std lib version should therefore be preferred.

Example: the GroupTable type below:

use crate::group::Group;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

pub struct GroupTable(Mutex<HashMap<Arc<String>, Arc<Group>>>);

impl GroupTable {
    pub fn new() -> GroupTable {
        GroupTable(Mutex::new(HashMap::new()))
    }

    pub fn get(&self, name: &String) -> Option<Arc<Group>> {
        self.0.lock()
            .unwrap()
            .get(name)
            .cloned()
    }

    pub fn get_or_create(&self, name: Arc<String>) -> Arc<Group> {
        self.0.lock()
            .unwrap()
            .entry(name.clone())
            .or_insert_with(|| Arc::new(Group::new(name)))
            .clone()
    }
}

The GroupTable type is a mutex-protected hash table; again using Arc refcounting for lightweight sharing.

There are also collection types specialized for concurrent access with better scaling, like those found in the dashmap crate.

Chat Groups: tokio’s Broadcast Channels

These are multi-producer, multi-consumer channels (any number of async tasks or threads can send or receive). They also implement backpressure: normally every value sent is delivered to every consumer, and values not yet consumed are queued. However, if the number of queued values exceeds the channel's capacity, the oldest values are dropped (and lagging consumers are told how many they missed).
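
A minimal sketch of the channel's behavior, independent of the chat server (capacity 16 is arbitrary):

use tokio::sync::broadcast;

let (sender, mut receiver1) = broadcast::channel::<String>(16);
let mut receiver2 = sender.subscribe();

sender.send("hello".to_string()).unwrap();

// every receiver gets its own copy of each value
assert_eq!(receiver1.try_recv().unwrap(), "hello");
assert_eq!(receiver2.try_recv().unwrap(), "hello");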

For the chat server example, a group is defined as having a name and a broadcast channel. Example code:

use async_std::task;
use crate::connection::Outbound;
use std::sync::Arc;
use tokio::sync::broadcast;

pub struct Group {
    // both name and broadcast channel work on refcounted strings
    name: Arc<String>,
    sender: broadcast::Sender<Arc<String>>
}

impl Group {
    pub fn new(name: Arc<String>) -> Group {
        // max. queue cap: 1000
        let (sender, _receiver) = broadcast::channel(1000);
        Group { name, sender }
    }

    pub fn join(&self, outbound: Arc<Outbound>) {
        // joiners must pass in a back channel
        // they subscribe to the broadcast chan
        let receiver = self.sender.subscribe();

        // then an async task is spawned that handles passing
        // values in and out
        task::spawn(handle_subscriber(self.name.clone(),
                                      receiver,
                                      outbound));
    }

    pub fn post(&self, message: Arc<String>) {
        // This only returns an error when there are no subscribers. A
        // connection's outgoing side can exit, dropping its subscription,
        // slightly before its incoming side, which may end up trying to send a
        // message to an empty group.
        let _ignored = self.sender.send(message);
    }
}

Here's the async fn that handles subscriptions:

use async_chat::FromServer;
use tokio::sync::broadcast::error::RecvError;

async fn handle_subscriber(group_name: Arc<String>,
                           mut receiver: broadcast::Receiver<Arc<String>>,
                           outbound: Arc<Outbound>)
{
    loop {
        // constructing a packet from a received message
        let packet = match receiver.recv().await {
            // message: return a from server message type
            Ok(message) => FromServer::Message {
                group_name: group_name.clone(),
                message: message.clone(),
            },

            // overrun: generate error
            Err(RecvError::Lagged(n)) => FromServer::Error(
                format!("Dropped {} messages from {}.", n, group_name)
            ),

            // break if the channel is closed (no senders left)
            Err(RecvError::Closed) => break,
        };

        // send constructed packet back out (bail on err)
        // btw. if we break receiver is dropped, which also
        // unsubscribes us from group
        if outbound.send(packet).await.is_err() {
            break;
        }
    }
}

Again we're looping over messages from the receiver. In the Ok case we construct a FromServer packet which gets sent outbound-wards. We get a specific RecvError::Lagged if we missed messages. If the channel has been closed, or if sending out errors, we break out of the loop. Breaking drops the receiver, which unsubscribes us from the group, so memberships get cleaned up.

Above we can see that every client gets an async task for every group membership. This is possible because async tasks are lightweight; were we to use full threads, this wouldn't really be feasible. It reminds me a bit of Erlang, where completely new design patterns became possible because Erlang's green threads ("processes" in Erlang) are so light on resources. I remember spawning 10M Erlang processes once as a quick experiment, each sending a message to the next process – and my laptop barely started spinning its fans.

Primitive Futures and Executors: When Is a Future Worth Polling Again?

What do async primitives look like under the hood? The key question: when should a future be (re-)polled?

Invoking Wakers: spawn_blocking

Executors like block_on() pass a waker to the futures they poll (via the Context). If a future isn't ready, it should arrange for the waker to be invoked later, something like this:

use std::task::Waker;

struct MyPrimitiveFuture {
    ...
    waker: Option<Waker>,
}

impl Future for MyPrimitiveFuture {
    type Output = ...;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<...> {
        ...

        if ... future is ready ... {
            return Poll::Ready(final_value);
        }

        // Save the waker for later.
        self.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

...

// If the future knows it's ready, and if we
// have a waker, invoke it, and clear `self.waker`.
if let Some(waker) = self.waker.take() {
    waker.wake();
}

Ideally, executor and future take turns polling and waking: the executor polls the future and goes to sleep; later the future invokes the waker, whereupon the executor wakes up and polls the future again.

Futures of async fns and blocks pass their context down a chain to the futures they await. Wakers are both Clone and Send, so can be passed to other threads.

Implementing block_on

Exercise: implement our own version of block_on()

Commented source code below:

use waker_fn::waker_fn;      // Cargo.toml: waker-fn = "1.1"
use futures_lite::pin;       // Cargo.toml: futures-lite = "1.11"
use crossbeam::sync::Parker; // Cargo.toml: crossbeam = "0.8"
use std::future::Future;
use std::task::{Context, Poll};

fn block_on<F: Future>(future: F) -> F::Output {

    // parker / unparker are utilities that block and unblock a thread
    // we're using this to block when the future isn't ready
    // and correspondingly the waker will unpark once it is
    let parker = Parker::new();
    let unparker = parker.unparker().clone();

    // the waker takes ownership of the unparker
    let waker = waker_fn(move || unparker.unpark());
    let mut context = Context::from_waker(&waker);

    // pin! macro creates a Pin<&mut F> out of F
    // more on pinning below
    pin!(future);

    // loop: poll the future, returning its value once ready,
    // parking the thread if not
    loop {
        match future.as_mut().poll(&mut context) {
            Poll::Ready(value) => return value,
            Poll::Pending => parker.park(),
        }
    }
}
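
A quick smoke test of our block_on, using an already-ready future from the stdlib:

fn main() {
    // std::future::ready yields a future that resolves immediately
    assert_eq!(block_on(std::future::ready(42)), 42);
}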

Pinning

The Pin type wraps pointers to futures, ensuring the futures can't be moved – which, as explained below, is what makes them safe to poll.

The Two Life Stages of a Future

Why are Pins needed? The basic motivation is that futures must hold all the state needed to resume execution when they are polled again. If a local variable inside the future is borrowed – for instance, because a subfuture holds a ref to it – then part of the future borrows another part of itself. And you can't move a value while refs to it are live.

Now, futures have two distinct life stages:

  1. Freshly created: no part of the future can have been borrowed yet, so it's safe to move

  2. Polled at least once: once the body has begun execution, it may have borrowed out data stored in the future itself. We must therefore assume the future is no longer safe to move.

This is where Pin comes into play. Pin is a wrapper for pointers that ensures the pointed-to value can't be moved.

Rust requires that the poll() method be called with a Pin<&mut Self>; therefore the Self can't be moved again.

Pinned Pointers

Per the above, Pins wrap pointers and enforce the no-move rule on their targets; such wrapped pointers are called "pinned pointers".

Ways to create pinned pointers:

  • the pin! macro from the futures-lite crate which shadows a T with a Pin<&mut T>

  • Box::pin constructor from the stdlib which takes a T and returns a Pin<Box<T>>

  • Pin::from(boxed) takes a boxed T and returns a pinned box

For all of these, once a T is pinned there's no direct way to get at it and move it. The pin itself can be moved around, but the pinned value can only be reached through the pin, and therefore pinned futures can be polled safely.

Pin<pointer to T> types have an as_mut() method that dereferences the pointer and returns the Pin<&mut T> we need for polling. as_mut() also lets us poll a future without giving up ownership of the pin; that's why it was used in the block_on above. Calling future.poll() directly would consume the pin – Pin<&mut F> is not Copy – and we need to keep it around to poll in a loop.

We can't get plain mut refs to pinned values – those would allow shenanigans like moving the value out via std::mem::replace.

The Unpin Trait

The Unpin trait is a marker trait saying that a type doesn't need the restrictions imposed by pinning. Almost all Rust types implement Unpin; for those, the pinned value can be freely retrieved again.

For example, String implements Unpin:

let mut string = "Pinned?".to_string();
let mut pinned: Pin<&mut String> = Pin::new(&mut string);

pinned.push_str(" Not");
Pin::into_inner(pinned).push_str(" so much.");

let new_home = string;
assert_eq!(new_home, "Pinned? Not so much.");

Note that Pin<&mut F> and Pin<Box<F>> do implement Unpin – even though F is not safe to move once polled, a pointer to F can be moved just fine.

When Is Asynchronous Code Helpful?

There are some advantages to writing code in an async style, and some non-advantages – perceived benefits that aren't real:

  • "Async is good for I/O". Async by itself does nothing to speed up I/O

  • "Async is easier to write than multithreaded code". Multithreaded code in Python or similar languages can be miserable to debug; as you have no concept of ownership it's easy to access and mutate variables from several threads simultaneously, leading to data races. In Rust access is much clearer: you have ownership (and mut ref) with exclusive access, or shared refs which are read-only. This makes using threads in Rust much safer to use, so no real benefit for async here.

Good use cases for async in Rust:

  • Memory usage: futures are a few hundred bytes, compared to thread stacks that start at around 20 KiB (counting kernel memory and physically allocated pages, not reserved virtual address space). For some concurrency patterns, e.g. creating (at least) one future per connection, this can make a difference

  • Faster creation: a Linux thread takes around 15 µs to create, while spawning an async task takes around 300 ns

  • Faster context switching: on Linux, switching between async tasks is much faster than switching between threads. However, when a switch happens because a task is waiting for I/O readiness, both models go through the kernel, which erodes the advantage; and moving work between processor cores is expensive in either model due to higher communication costs

Coda

This was quite a long chapter, and it took me some time to wrap my head around it. While I've used async in Python quite heavily to avoid the pitfalls of threading, I feel I might stick with plain threading in Rust more often – fewer pitfalls, no additional runtime, and no extra color of functions needed.