Future 特征

Future 特征是 Rust 异步编程的核心,毕竟异步函数是异步编程的核心,而 Future 恰恰是异步函数的返回值和被执行的关键。

首先,来给出 Future 的定义:它是一个能产出值的异步计算(虽然该值可能为空

trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}
 
enum Poll<T> {
    Ready(T),
    Pending,
}

若在当前 poll 中, Future 可以被完成,则会返回 Poll::Ready(result)

反之则返回 Poll::Pending, 并且安排一个 wake 函数:当未来 Future 准备好进一步执行时, 该函数会被调用,然后管理该 Future 的执行器(例如上一章节中的block_on函数)会再次调用 poll 方法,此时 Future 就可以继续执行了

即如果存在 wake ,那么这个 Future 就可以继续被执行。 如果没有 wake 方法,那执行器无法知道某个 Future 是否可以继续被执行,除非执行器定期的轮询每一个 Future,确认它是否能被执行,但这种作法效率较低。而有了 wakeFuture 就可以主动通知执行器,然后执行器就可以精确的执行该 Future。 这种“事件通知 -> 执行”的方式要远比定期对所有 Future 进行一次全遍历来的高效。

例子

考虑一个需要从 socket 读取数据的场景:如果有数据,可以直接读取数据并返回 Poll::Ready(data), 但如果没有数据,Future 会被阻塞且不会再继续执行,此时它会注册一个 wake 函数,当 socket 数据准备好时,该函数将被调用以通知执行器:我们的 Future 已经准备好了,可以继续执行。

pub struct SocketRead<'a> {
    socket: &'a Socket,
}
 
impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;
 
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // socket有数据,写入buffer中并返回
            Poll::Ready(self.socket.read_buf())
        } else {
            // socket中还没数据
            //
            // 注册一个`wake`函数,当数据可用时,该函数会被调用,
            // 然后当前Future的执行器会再次调用`poll`方法,此时就可以读取到数据
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

使用 Waker 来唤醒任务

对于 Future 来说,第一次被 poll 时无法完成任务是很正常的。但它需要确保在未来一旦准备好时,可以通知执行器再次对其进行 poll 进而继续往下执行,该通知就是通过 Waker 类型完成的。

Waker 提供了一个 wake() 方法可以用于告诉执行器:相关的任务可以被唤醒了,此时执行器就可以对相应的 Future 再次进行 poll 操作。

构建一个定时器

新建线程在睡眠结束后会需要将状态同步给定时器 Future ,由于是多线程环境,我们需要使用 Arc<Mutex<T>> 来作为一个共享状态,用于在新线程和 Future 定时器间共享。

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}
 
/// 在Future和等待的线程间共享状态
struct SharedState {
    /// 定时(睡眠)是否结束
    completed: bool,
 
    /// 当睡眠结束后,线程可以用`waker`通知`TimerFuture`来唤醒任务
    waker: Option<Waker>,
}

Future 的具体实现:

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 通过检查共享状态,来确定定时器是否已经完成
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // 设置`waker`,这样新线程在睡眠(计时)结束后可以唤醒当前的任务,接着再次对`Future`进行`poll`操作,
            //
            // 下面的`clone`每次被`poll`时都会发生一次,实际上,应该是只`clone`一次更加合理。
            // 选择每次都`clone`的原因是: `TimerFuture`可以在执行器的不同任务间移动,如果只克隆一次,
            // 那么获取到的`waker`可能已经被篡改并指向了其它任务,最终导致执行器运行了错误的任务
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
 

再来创建一个 API 用于构建定时器和启动计时线程:

impl TimerFuture {
    /// 创建一个新的`TimerFuture`,在指定的时间结束后,该`Future`可以完成
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));
 
        // 创建新线程
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            // 睡眠指定时间实现计时功能
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // 通知执行器定时器已经完成,可以继续`poll`对应的`Future`了
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });
 
        TimerFuture { shared_state }
    }
}

执行器 Executor

Rust 的 Future 是惰性的:只有屁股上拍一拍,它才会努力动一动。其中一个推动它的方式就是在 async 函数中使用 .await 来调用另一个 async 函数,但是这个只能解决 async 内部的问题,那么这些最外层的 async 函数,谁来推动它们运行呢?答案就是我们之前多次提到的执行器 executor 。

参考

底层探秘: Future 执行与任务调度 - Rust语言圣经(Rust Course)