Rust语言的异步编程模型和协程支持

本文有 11113 字,大约需要 27 分钟可以读完, 创建于 2019-12-07

通用的注重性能的现代编程语言都在慢慢地加入对异步编程和协程模型的支持,作为一门强调性能和零成本抽象的语言,Rust自然也不甘落人之后。

可能是因为Rust语言社区出于对自身特殊的生存周期管理机制多带来的其它语言所有没有的额外的编译器实现复杂性,以及开放语言生态系统中的功能开发优先级(它迄今为止还在快速演进中)考虑,在去年年底发布的2018版本中,协程的支持并[没有完工] (//post/2019/01/19/rust-2018-edition-overview/) 而是仅仅留下了一个预留关键字,有点“强制占位”的味道。

换个角度看,也可以认为Rust社区对尽快支持基于async/await风格的协程还是非常重视的。 果不其然,经过快一年的打磨,这些复杂的特性随着最新稳定版本的发布而正式进入了一般程序员的视野(之前的支持放在Nightly发布频道里面给早期吃螃蟹的用户尝鲜)。

协程和它之前的历史背景

协程并不是一个怎么新鲜的编程概念,起码在上个世纪90年代或者更早的编程语言大发展的时候已经出现在某些语言中;只是因为当时最主流的C++和Java(还没有今天这么流行)的语言并不在语言层面提供对协程的支持而不被主流程序员所熟知。

多线程编程的挑战

那个时代主流的模型还是基于操作系统提供的线程的概念来实现并发编程;典型的如Apache服务器引入的一个连接一个线程的概念:每次有一个新的用户连接请求进来,服务器就会为其创建一个线程;这样可以做到多个线程之间互相不干扰。

这样的处理机制非常简单优雅,可惜在连接数目渐渐增多的时候,可扩展性很差,因为操作系统本身并不擅长处理大量的线程:任何一个线程的创建都需要设计到栈空间的分配、线程私有资源的创建和维护;同时大量的线程在操作系统层面被调度的时候,也很难保证调度策略的公平和有效。

曾经一度引发热烈讨论的C10K问题就是针对这一模型的不足:超过一定数量的连接之后,操作系统甚至将大量的CPU处理资源浪费在线程之间的切换和控制,而不能很好地处理用户层的实际应用需求。 标准的POSIX socket调用的多路处理函数select在传入的socket句柄数量超过一定数量之后也会耗费大量的CPU运算资源于内核态,而用户层程序则束手无策。

显然将用户层面的并发问题直接抛给操作系统并不是一个非常明智的选择;随着应用领域的需求对并发性的要求愈加严苛,这些问题需要找到更好的解决方案。

操作系统层面的探索

以Unix和Linux为主流的非Windows系统自身是从解决并发连接的角度来考虑的,它们通过在内核层面的IO多路复用角度触发,通过引入epoll/kqueue等技术来允许一个单一的用户线程的IO调用可以管理海量的网络连接而完美地解决了传统的select调用层面的问题。当然Windows自己也提供了类似的IOCP的概念实现类似的高性能IO多路复用方案。

这方面的探索解决了一个线程处理海量网络IO的系统资源过耗问题,但是这些改进对和网络无关的本地的耗费CPU的任务的并发挑战并没有多少帮助。 同时这些IO方面的处理暗含了用户必须书写异步代码的挑战:对于可能阻塞的IO调用,用户必须通过回调函数的方式将原来简单、直接的顺序代码改写为支离破碎的异步代码。 只需要看看boost.asio库的实例代码,就会明白这样的编程方式如何可耻地损伤了代码的可读性,显然代码的可维护性也很糟糕。

优化内核线程处理的尝试

这方面的尝试应该以Solaris的线程模型最为典型:对应的pthread接口低下的实现虽然也映射到了操作系统的内核线程,但是这些用户层面的线程和操作系统线程的对应关系不是1:1而是M:N的。

只是这个配置的灵活度并不是太高,其性能表现还是令人刮目相看的,用户可以轻松地创建上万个线程而内核还可以很高效的使用CPU资源来调度用户的处理。

当然因为一些非技术上的原因,Solaris的这些创举并没有在它自身意外的领域发扬光大,而是随着Sun公司的垮台和被收购,Solaris系统最终停止开发而烟消云散了。 主流的Linux系统和其它Unix变种依然在采用一一对应的线程模型。

用户态”线程”

另外一个有趣的探索方向是用户态线程的概念,或者成为Green Thread,它的基本思路是将逻辑线程的管理逻辑都放在操作系统的用户态来管理,而操作系统层面对这些机制一无所知。 运行在用户态的线程管理程序(往往是比较底层的基础库或者组件)自己负责这些线程状态的管理和调度。

这方面比较典型的例子是早期的Python所支持的线程模型,和来自爱立信的Erlang语言底层平台OTP;甚至于有一段实际Java语言的线程库也采用过Green Thread(尽管现在已经不再使用)。 Python的Green Thread实现一度由于它自身历史承袭的GIL锁的性能噩梦而终被弃之高阁了。

异步编程的心智负担

随着上面所说的各种尝试逐渐式微,主流的编程语言不得不回到多线程编程的路上继续探索,这一次随着进入新世纪依赖多核CPU的日渐流行,各种编程语言都不得不寻求其它办法来简化程序员的心智负担,因为编写没有问题的多线程、异步程序实在不是一个轻松的差事。

一方面不管是程序员还是底层的编译器乃至CPU硬件都更加适应一步接一步走下去的顺序执行的逻辑,已有的底层编译器可以对顺序执行程序做深入的分析和优化,处理器可以对指令流水线做高效的预测和预判执行;另一方面程序员却不得不面对顺序代码的效率不高的现实:不管是读取磁盘上的文件,还是从网络上下载内容,碰到这些情况的时候,要想高效地利用CPU,在不创建更多底层线程的情况下,程序员不得不将顺序执行的代码切割为多个部分,然后在阻塞的地方加入一个回调:当阻塞的外部条件满足的时候,由操作系统回调回来执行后面的部分。

协程

协程的引入其实就是为了缓解这一艰巨的挑战:它通过引入一个逻辑上抽象的概念来简化编程模型。某种程度上说,可以将一个协程看作是一段可以以非阻塞的方式高效执行的代码; 而多个协程之间可以通过特定关键字或者语句的方式进行组合,以便程序员可以直接写出看起来是同步执行而实际上底层却是被异步调度执行的代码。

Future

不同的编程语言和环境对于协程的具体实现可以是大相径庭的,只是大多编程语言多通过Future/Promise这样的机制来实现。 一个Future代表一个现在可能还没有结果(或者在等待IO完毕或者还没有被调度到CPU上去实际执行),但是将来会返回某个结果的抽象过程。将来会返回的结果抽象为Promise,即承诺一定会返回的某个东西。这两个概念如果存在,一定是成对存在的。

Executor

异步的Future最后必然要被底层的调度程序来驱动和执行,这个负责执行用户定义的抽象的Future的实体成为Executor,它底层的执行体可以是按需创建的线程池,也可以是共享固定大小的线程池甚至于单个线程等。 最好的Executor抽象应该是从JDK5开始流行开来的,目前已经成为一个跨编程语言的抽象。

async/await

对协程的组合和等待则通过async/await这样的关键字来标识。async用于标记某个可以返回Future的语句或者调用,await用来异步的等待某个Future的执行结果,即await的逻辑看起来同步的,但是其实依赖于底层协程的调度结果,即实际执行完毕之后才能得到结果。 如果有后续的逻辑处理依赖于await的结果,那么它们将被自动调度为顺序执行。

Rust的Future

Future本身是一个抽象的概念,对应的Rust语言里面的动态抽象机制是Trait,因此毫不意外地看到,Rust语言定义的Future就是一个Trait。

逻辑上来说,Rust的Future可以简单的被认为是定义为如下的形式

// Warning: this is not the offical definition
trait Future {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

该Trait的核心是定义了一个输出的类型,即实际执行结束之后的结果返回类型,以及一个相关的poll函数。 顾名思义这是一个底层的协程Executor提供的”轮询”执行结果的函数,返回的结果是操作是否完毕,它是一个枚举类型

enum Poll<T> {
    Ready<T>,
    Pending,
}

Rust自身的类型安全机制很好地封装了其它语言中不得不需要的Promise:这里我们不需要专门的Promise类型,仅仅根据枚举值来做模式匹配即可。

用户程序可以主动调用这个轮询函数,驱动底层的Executor来执行一次调用,然后检查实际结果; 只是绝大部分情况下,这并不是最佳的使用方式:因为这一的执行方式和协程底层封装的思路想违背了。大部分情况下,用户需要的是关注多个协程之间的互相组合,确定它们之间的逻辑依赖关系即可,Executor的驱动甚至可以是自动封装驱动的。

wake函数

如果没有这个函数,Executor在需要多次驱动用户提供的Future来持续运行的时候,将没有办法知道如何继续下去。 这个函数的作用就是一个唤醒回调,在Future被实际执行的时候,如果一次没有执行完毕,那么下次被调用的时候,可以从上次中断的地方继续执行下去。

Socket的例子

假设我们有一个从Socket读取数据的Future,它在有数据的时候,直接返回读取到的数据,没有数据的时候,则留待下次调度的时候继续执行;它可以被实现为

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl Future for SocketRead <'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            Poll::Ready(self.socket.read_buf())
        } else {
            self.socket.set_readable_callback(wake)
            Poll::Pending
        }
    }
}

实现Join的例子

这种实现方式允许我们自由地组合多个Future而不需要额外的中间内存分配,从而兼得抽象的好处而不损失一毫性能,比如我们需要组合两个Future而让它们可以并发执行的时候,可以用如下的实现:

pub struct Join<FutureA, FutureB> {
    a: Option<FutureA>,
    b: Option<FutureB>,
}

impl <FutureA, FutureB> Future for Join<FutureA, FutureB> 
    where 
        FutureA: Future<Output=()>,
        FutureB: Future<Output=()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if let Some(a) = &mut self.a {
            if let Poll::Ready(()) = a.poll(wake) {
                self.a.take()
            }
        }

        if let Some(b) = &mut self.b {
            if let Poll::Ready(()) = b.poll(wake) {
                self.b.take()
            }
        }

        if self.a.is_none() && self.b.is_none() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

这个轮询的过程非常简单直接,分别轮询传入的两个Future,并且在两个都完成的时候,返回结果。否则返回Pending以便后续的调度继续执行。

实现AndThen

另外一种场景的协程操作是实现穿行逻辑,即第一个future结束后才执行第二个。类似于上面的定义,我们先定义这种顺序组合的Future如下:

pub struct AndThenFut<FutureA, FutureB> {
    first: Option<FutureA>,
    second: FutureB,
}

它的实现则为

impl <FutureA, FutureB> Future for AndThenFut<FutureA, FutureB> 
    where 
        FutureA: Future<Output=()>,
        FutureB: Future<Output=()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if let Some(first) = &mut self.first {
            match first.poll(wake) {
                Poll:Ready(()) => self.first.take(),
                Poll::Pending => return Poll::Pending,
            }
        }
        self.second.poll(wake)
    }
}

这里我们简化了处理逻辑,仅当第一个FutureA完毕的时候,才会轮询第二个Future,如果第一个Future没有完毕,则继续等待下一次的查询调度。

真正的Future定义

上述的逻辑定义其实是一种简化模型,而真正的标准库中的Future定义其实如下

trait Future {
    type Output;
    fn poll(self: Pin<&mut self>, ctx: &mut Context<'_>) -> Poll::(Self::Output);
}

最显著的不同在于函数的两个参数类型

  • self类型被定义为Pin的分装,这个封装简单的来说,就是允许我们创建不可移动的Future类型;所谓的不可移动对象,是指对象里面可以出现某个字段的指针指向对象的其它字段的情况;这对于实现async/await是至关重要的。
  • 第二个参数用一个Context对象替换了简单的wake函数

我们之所以需要一个Context对象,是因为我们需要在其中存储哪一个Future对象被轮询执行了;这在复杂的多线程程序中是不可或缺的。

Context对象和Waker类型

Context的定义如下

pub struct Context<'a> {
    waker: &'a Waker,
    _marker: PhantomData<fn(&'a ()) -> &'a ()>,
}

它里面仅仅是封装了一个额外的Waker对象,以便用户定义的Future和底层的Executor进行通信,通知对方自身已经准备好了下一步对应的结果就可以返回给外部,其本身提供了clone操作,并且可以可以在多线程环境中移动和共享,因为它标记自己实现了SyncSend

pub struct Waker {
    waker: RawWaker,
}
impl Unpin for Waker {}
unsafe impl Send for Waker {}
unsafe impl Sync for Waker {}

其中嵌套的内部实现里面封装了一个满足具体的Executor要求的、可以包含任意上下文数据的指针data,它的注释很好地阐述了数据的用途:

#[derive(PartialEq, Debug)]
pub struct RawWaker {
    /// A data pointer, which can be used to store arbitrary data as required
    /// by the executor. This could be e.g. a type-erased pointer to an `Arc`
    /// that is associated with the task.
    /// The value of this field gets passed to all functions that are part of
    /// the vtable as the first parameter.
    data: *const (),
    /// Virtual function pointer table that customizes the behavior of this waker.
    vtable: &'static RawWakerVTable,
}

Waker自身提供了wake()函数,和上面简化的例子类似

#[inline]
pub fn wake(self) {
    let wake = self.waker.vtable.wake;
    let data = self.waker.data;
    crate::mem::forget(self);
    unsafe { (wake)(data) };
}

一个定时器实现的例子

假设我们需要实现一个经过给定时间段后过期的名为TimerFuture的future对象,我们需要一个上下文数据标识timer是否已经完成,并且出于简化起见,当Timer被创建的时候,我们启动一个线程来在创建的线程中将其激活,因此我们需要一个跨线程共享的上下文。

类型定义

Arc类型是用来封装上下文状态的最简单的封装类

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

struct SharedState {
    completed: bool,
    waker: Option<Waker>,
}
new函数

接下来是它的实现和new函数

impl TimerFuture {
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

因为需要创建一个线程来异步地标记Timer到期完毕,我们需要将该共享状态拷贝并移动到lambda对应的线程中去:

        let shared_state_clone = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = shared_state_clone.lock().unwrap();
            shared_state.completed = true;

当时间到期后,我们除了标记该共享状态的标志外,还需要在Future的使用方没有完成的情况下,主动唤醒底层的调度,此时就可以使用wake()函数了

            if let Some(waker) = shared_state.waker.take() {
                waker.wake();
            }
        });

        TimerFuture { shared_state }
    }
}
Future实现

该TimerFuture的trait实现其实就是对共享状态查询的封装;如果已经完毕,就返回Ready,否则就返回Pending。

这里有个非常细微的问题需要注意:因为waker对象可能随着Executor在线程之间的移动而被拷贝为其它的值,为了避免对象自身保存的waker对象指向了非法的上下文,我们需要在返回pending之前重新拷贝一次自身保存的waker!

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            shared_state.waker = Some(ctx.waker().clone());
            Poll::Pending
        }
    }
}

Rust的awaitasync

这次加入1.36稳定版的功能体验在语言层面的两个关键字asyncawait

async

async用于声明一个代码块为返回一个Future,通过在某个普通的函数声明前面加上async,Rust可以自动完成返回类型到Future类型的封装和转换,如下面的代码

async fn do_something() {
    //some heavy operation
}

的返回值会是一个Future.

Rust本身的Executor库也提供了阻塞执行的方法,允许我们在当前的调用线程里面阻塞执行直到封装的异步执行块允许完毕,即如下的代码

let fut = do_something();
block_on(fut);
// proceeds until wrapped something is executed

await

await语句可以作用在Future上,用于非阻塞方式的同步逻辑,即异步地等待作用的Future对象的完成,然后读取返回的结果,考虑如下的三个异步的Future执行块,前两个有先后依赖而第三个可以同时进行:

async fn learn_song() -> Song {
    //dom something
    Song
}
async fn sing_song(song: Song) {
    //sing the song
}

async fn dance() {
    //dance
}

可以用如下的逻辑来表述上面的并发执行行为

async learn_and_sing() {
    let song = learn_song().await;
    sing_song(song).await;
}

let f1 = learn_and_sing().await;
let f2 = dance();
futures::join(f1, f2);

标准库中的Future和futures crate

目前有两个Future库同时存在,一个是标准库中的std::future::Future,另外一个则是futures中定义的futures::future::Future。这一重复定义多少让人感到困惑不解:其实这主要是由于Future特性正在被开发中还不算足够完善的缘故。

早期的实现是通过future-rs扩展库的方式提供的,最近的版本才将它加到了标准库中;甚至于实现也是移过去的;可以认为std::future::Future实现了future-rs里面的一个最小集。后续的功能演进也可能仍然采用类似的策略

  • 标准库中的Future进来保持最小的接口
  • 更复杂的组合功能将会用类似future库的方式来提供

FutureExt

这是基于Future之上的一个扩展的Trait,可以实现很多方便的转换,包括

  • map将包含的输出值经过一个函数处理变换为另外一种输出
  • then实现两个Feature的链式操作,并且将前一个的输出传递为第二个闭包函数的输入,例如
    let f1 = async {1};
    let f2 = f1.then(|x| sync move { x + 3});
    assert_eq!(f2.await, 4);
    
  • left_future/right_future实现根据不同的条件返回不同部分的EitherFuture功能,如
    let x = 6;
    let f = if x < 10 {
      async {true}.left_future()
    } else {
      async {false}.right_future()
    }
    
  • into_stream将future转换为包含单个元素的stream,这个stream的输出是future本身,支持方便的stream操作,如
    let f = async {17};
    let collected:Vec<_> = f.into_stream().collect().await;
    
  • flatten用于实现一次解封操作,等价于 f.then(|x| x)
  • inspect实现一个future传递之前的额外查看和处理,最简单的例子是打印计算的结果,如
    let f = async {1};
    let nf = f.inspect(|&x| println!("will resolve as {}", x)); //nf = 1
    

Leave a Comment

Your email address will not be published. Required fields are marked *

Loading...