async-std是rust异步生态中的基础运行时库之一,核心理念是合理的性能 + 用户友好的api体验。经过几个月密集的开发,前些天已经发布1.0稳定版本。因此是时候来一次深入的底层源码分析。async-std的核心是一个带工作窃取的多线程Executor,而其本身的实现又依赖于async-task这个关键库,因此本文主要对async-task的源码进行分析。

当Future提交给Executor执行时,Executor需要在堆上为这个Future分配空间,同时需要给它分配一些状态信息,比如Future是否可以执行(poll),是否在等待被唤醒,是否已经执行完成等等。我们一般把提交给Executor执行的Future和其连带的状态称为 task。async-task这个库就是对task进行抽象封装,以便于Executor的实现,其有几个创新的特性:

  1. 整个task只需要一次内存分配;
  2. 完全隐藏了RawWaker,以避免实现Executor时处理unsafe代码的麻烦;
  3. 提供了 JoinHandle,这样spawn函数对Future没有 Output=()的限制,极大方便用户使用;

使用方式

async-task只对外暴露了一个函数接口以及对应了两个返回值类型:

pub fn spawn<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
whereF: Future<Output = R> + Send + 'static,R: Send + 'static,S: Fn(Task<T>) + Send + Sync + 'static,T: Send + Sync + 'static,

其中,参数future表示要执行的Future,schedule是一个闭包,当task变为可执行状态时会调用这个函数以调度该task重新执行,tag是附带在该task上的额外上下文信息,比如task的名字,id等。 返回值Task就是构造好的task对象,JoinHandle实现了Future,用于接收最终执行的结果。

值得注意的是spawn这个函数并不会做类似在后台进行计算的操作,而仅仅是分配内存,创建一个task出来,因此其实叫create_task反而更为恰当且好理解。

Task提供了如下几个方法:

    // 对该task进行调度pub fn schedule(self);// poll一次内部的Future,如果Future完成了,则会通知JoinHandle取结果。否则task进// 入等待,直到被被下一次唤醒进行重新调度执行。pub fn run(self);// 取消task的执行pub fn cancel(&self);// 返回创建时传入的tag信息pub fn tag(&self) -> &T;

JoinHandle实现了Future trait,同时也提供了如下几个方法:

    // 取消task的执行pub fn cancel(&self);// 返回创建时传入的tag信息pub fn tag(&self) -> &T;

同时,Task和JoinHandle都实现了Send+Sync,所以他们可以出现在不同的线程,并通过tag方法可以同时持有 &T,因此spawn函数对T有Sync的约束。

借助于async_task的抽象,下面的几十行代码就实现了一个共享全局任务队列的多线程Executor:

use std::future::Future;
use std::thread;use crossbeam::channel::{unbounded, Sender};
use futures::executor;
use once_cell::sync::Lazy;static QUEUE: Lazy<Sender<async_task::Task<()>>> = Lazy::new(|| {let (sender, receiver) = unbounded::<async_task::Task<()>>();for _ in 0..4 {let recv = receiver.clone();thread::spawn(|| {for task in recv {task.run(); }});}sender
});fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, ()>
whereF: Future<Output = R> + Send + 'static,R: Send + 'static,
{let schedule = |task| QUEUE.send(task).unwrap();let (task, handle) = async_task::spawn(future, schedule, ());task.schedule();handle
}fn main() {let handles: Vec<_> = (0..10).map(|i| {spawn(async move {println!("Hello from task {}", i);})}).collect();// Wait for the tasks to finish.for handle in handles {executor::block_on(handle);}
}

Task的结构图

通常rust里的并发数据结构会包含底层的实现,一般叫Inner或者RawXXX,包含大量裸指针等unsafe操作,然后再其基础上进行类型安全包装,提供上层语义。比如channel,上层暴露出 SenderReceiver,其行为不一样,但内部表示是完全一样的。async-task也类似,JoinHandle, Task以及调用Future::poll时传递的Waker类型内部都共享同一个RawTask结构。由于JoinHandle本身是一个Future,整个并发结构还有第四个角色-在JoinHandle上调用poll的task传递的Waker,为避免引起混淆就称它为Awaiter吧。整个的结构图大致如下:

整个task在堆上一次分配,内存布局按Header,Tag, Schedule,Future/Output排列。由于Future和Output不同时存在,因此他们共用同一块内存。

  • JoinHandle:只有一个,不访问Future,可以访问Output,一旦销毁就不再生成;
  • Task:主要访问Future,销毁后可以继续生成,不过同一时间最多只有一个,这样可以避免潜在的多个Task对Future进行并发访问的bug;
  • Waker:可以存在多份,主要访问schedule数据,由于spawn函数的参数要求schedule必须是Send+Sync,因此多个waker并发调用是安全的。
  • Header:本身包含三个部分,state是一个原子变量,包含引用计数,task的执行状态,awaiter锁等信息;awaiter保存的是JoinHandle所在的task执行时传递的Waker,用于当Output生成后通知JoinHandle来取;vtable是一个指向静态变量的虚表指针。

task中的状态

所有的并发操作都是通过Header中的state这个原子变量来进行同步协调的。主要有以下几种flag:

  1. constSCHEDULED:usize=1<<0; task已经调度准备下一次执行,这个flag可以和RUNGING同时存在。
  2. constRUNNING:usize=1<<1; 这个task正在执行中,这个flag可以和SCHEDULED同时存在。
  3. constCOMPLETED:usize=1<<2; 这个task的future已经执行完成。
  4. constCLOSED:usize=1<<3; 表示这个task要么被cancel掉了,要么output被JoinHandle取走了,是一个终结状态。
  5. constHANDLE:usize=1<<4; 表示JoinHandle存在。
  6. constAWAITER:usize=1<<5; 表示JoinHandle正在等待Output,用于快速判断Header里的awaiter不为None,避免获取锁的操作。
  7. constLOCKED:usize=1<<6; 读写Header里的awaiter时,需要设置这个字段,标识是否处于locked状态。
  8. constREFERENCE:usize=1<<7; 从第7bit开始到最高位当作引用计数用,代表Task和Waker的总数,主要JoinHandle在HANDLE的flag里跟踪。

JoinHandle的实现分析

JoinHandle::cancel

为避免并发问题,JoinHandle不接触Future数据,而由于取消task的执行需要析构Future数据,因此cancel操作通过重新schedule一次,把操作传递给Task执行。

impl<R, T> JoinHandle<R, T> {pub fn cancel(&self) {let ptr = self.raw_task.as_ptr();let header = ptr as *const Header;unsafe {let mut state = (*header).state.load(Ordering::Acquire);loop {// 如果task已经结束或者closed,什么也不做。if state & (COMPLETED | CLOSED) != 0 {break;}let new = if state & (SCHEDULED | RUNNING) == 0 {// 如果不处于scheduled或running状态,那么下面就需要调用schedule// 函数通知Task,因此要加上SCHEDULED 和增加引用计数(state | SCHEDULED | CLOSED) + REFERENCE} else {// 否则要么task已经schedue过了,过段时间会重新执行,要么当前正在// 运行,因此只需要设置closed状态,task执行完后会收到close状态并// 进行处理。state | CLOSED};match (*header).state.compare_exchange_weak(state,new,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) => {// 重新schedule以便executor将Future销毁if state & (SCHEDULED | RUNNING) == 0 {((*header).vtable.schedule)(ptr);}// 如果有awaiter的话,通知相应的的task。if state & AWAITER != 0 {(*header).notify();}break;}Err(s) => state = s,// 失败重试}}}}
}

JoinHandle::drop

由于整个task的所有权是由JoinHandle,Task和Waker共享的,因此都需要手动实现drop。Output只会由JoinHandle访问,因此如果有的话也要一同销毁。

impl<R, T> Drop for JoinHandle<R, T> {fn drop(&mut self) {let ptr = self.raw_task.as_ptr();let header = ptr as *const Header;let mut output = None;unsafe {// 由于很多时候JoinHandle不用,会在刚创建的时候直接drop掉,因此针对这种情// 况作一个特殊化处理。这样一个原子操作就完成了。if let Err(mut state) = (*header).state.compare_exchange_weak(SCHEDULED | HANDLE | REFERENCE,SCHEDULED | REFERENCE,Ordering::AcqRel,Ordering::Acquire,) {loop {// 如果task完成了,但是还没有close掉,说明output还没有被取走,需// 要在这里取出来进行析构。if state & COMPLETED != 0 && state & CLOSED == 0 {// 标记为closed,这样就可以安全地读取output的数据。match (*header).state.compare_exchange_weak(state,state | CLOSED,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) => {                               output =Some((((*header).vtable.get_output)(ptr) as *mut R).read());// 更新状态重新循环state |= CLOSED;}Err(s) => state = s,}} else {// 进到这里说明task要么没完成,要么已经closed了。let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {// Task和Waker都已经没了,并且没closed,根据进else的条// 件可知task没完成,Future还在,重新schedule一次,让// executor把Future析构掉。SCHEDULED | CLOSED | REFERENCE} else {// 移除HANDLE flagstate & !HANDLE};match (*header).state.compare_exchange_weak(state,new,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) => {// 如果这是最后一个引用if state & !(REFERENCE - 1) == 0 { if state & CLOSED == 0 {//并且没closed,根据进else的条件可知task没// 完成,重新schedule一次,析构Future((*header).vtable.schedule)(ptr);} else {// task已经完成了,output也已经在上面读出// 来了,同时也是最后一个引用,需要把task自// 身析构掉。((*header).vtable.destroy)(ptr);}}// 还有其他引用在,资源的释放由他们负责。break;}Err(s) => state = s,}}}}}// 析构读取出来的outputdrop(output);}
}

JoinHandle::poll

检查Output是否已经可以拿,没有的话注册cx.waker()等通知。

impl<R, T> Future for JoinHandle<R, T> {type Output = Option<R>;fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {let ptr = self.raw_task.as_ptr();let header = ptr as *const Header;unsafe {let mut state = (*header).state.load(Ordering::Acquire);loop {// task已经closed了,没output可拿。if state & CLOSED != 0 {// 大部分可情况下,header里的awaiter就是cx.waker,也有例外,因// 此一并进行通知。(*header).notify_unless(cx.waker());return Poll::Ready(None);}// 如果task还没完成if state & COMPLETED == 0 {// 那么注册当前的cx.waker到Header::awaiter里,这样完成了可以收// 到通知。abort_on_panic(|| {(*header).swap_awaiter(Some(cx.waker().clone()));});// 要是在上面注册前正好task完成了,那么就收不到通知了,因此注册后// 需要重新读取下状态看看。state = (*header).state.load(Ordering::Acquire);// task已经closed了,没output可拿,返回None。if state & CLOSED != 0 {// 这里我分析下来是不需要再通知了,提了个pr等作者回应。(*header).notify_unless(cx.waker());return Poll::Ready(None);}// task还没完成,上面已经注册了waker,可以直接返回Pending。if state & COMPLETED == 0 {return Poll::Pending;}}// 到这里说明task已经完成了。把它设置为closed状态,就可以拿output了。match (*header).state.compare_exchange(state,state | CLOSED,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) => {// 设置closed成功,通知其他的awaiter。由于上面是原子的swap操// 作,且一旦设置为closed,awaiter就不会再变更了,因此可以// 用AWAITER这个flag进行快速判断。if state & AWAITER != 0 {(*header).notify_unless(cx.waker());}// 读取出Output并返回。let output = ((*header).vtable.get_output)(ptr) as *mut R;return Poll::Ready(Some(output.read()));}Err(s) => state = s,}}}}
}

Task的实现分析

Task::schedule

这个函数先通过Task内部保存的指针指向Header,并从Header的vtable字段中拿到schedule函数指针,这个函数最终调用的是用户调用spawn时传入的schedule闭包。因此本身很直接。

Task::run

这个函数先通过Task内部保存的指针指向Header,并从Header的vtable字段中拿到run函数指针,其指向RawTask::run,实现如下:

首先根据指针参数强转为RawTask,并根据Header的vtable拿到RawWakerVTable,构造好Waker和Context,为调用Future::poll做准备。

unsafe fn run(ptr: *const ()) {let raw = Self::from_ptr(ptr);let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr,&(*raw.header).vtable.raw_waker,)));let cx = &mut Context::from_waker(&waker);//...
}

然后获取当前的state,循环直到更新state的RUNING成功为止。

    let mut state = (*raw.header).state.load(Ordering::Acquire);loop {// 如果task已经closed,那么Future可以直接析构掉,并返回。if state & CLOSED != 0 {if state & AWAITER != 0 {(*raw.header).notify();}Self::drop_future(ptr);// 扣掉当前task的引用计数,因为run函数的参数是self。Self::decrement(ptr);return;}// 移除SCHEDULED状态,并标记RUNINGmatch (*raw.header).state.compare_exchange_weak(state,(state & !SCHEDULED) | RUNNING,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) => {// 更新state到新的状态,后面的代码还要复用state。state = (state & !SCHEDULED) | RUNNING;break;}Err(s) => state = s,}}

标记为RUNING状态后,就可以开始正式调用Future::poll了,不过在调用前设置Guard,以便poll函数panic时,可以调用Guard的drop函数保证状态一致。

    let guard = Guard(raw);let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);mem::forget(guard); // 没panic,移除掉guard.drop的调用。match poll {Poll::Ready(out) => {/// ...              }Poll::Pending => {// ... }}

如果Future完成了,那么先把Future析构掉,腾出内存把output写进去。并循环尝试将RUNING状态去掉。

match poll {Poll::Ready(out) => {Self::drop_future(ptr);raw.output.write(out);let mut output = None;loop {// JoinHandle已经没了,那么output没人取,我们需要析构掉output,并设置为// closed状态。let new = if state & HANDLE == 0 {(state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED} else {(state & !RUNNING & !SCHEDULED) | COMPLETED};match (*raw.header).state.compare_exchange_weak(state,new,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) => {// 如果handle没了,或者跑的时候closed了,那么需要把output再读取// 出来析构掉。if state & HANDLE == 0 || state & CLOSED != 0 {output = Some(raw.output.read());}// 通知JoinHandle来取数据。if state & AWAITER != 0 {(*raw.header).notify();}Self::decrement(ptr);break;}Err(s) => state = s,}}drop(output);}Poll::Pending => {// ...} 

如果没完成的话,循环尝试移除RUNING,同时在poll的时候其他线程不能调用shedule函数,而是设置SCHEDULED,所以需要检查这个flag,如果设置了,则需要代劳。

match poll {Poll::Ready(out) => {/// handle ready case ...               }Poll::Pending => {loop {// poll的时候closed了,这里为啥要移除SCHEDULED状态,暂时不清楚,需要问问// 作者。let new = if state & CLOSED != 0 {state & !RUNNING & !SCHEDULED} else {state & !RUNNING};match (*raw.header).state.compare_exchange_weak(state,new,Ordering::AcqRel,Ordering::Acquire,) {Ok(state) => {if state & CLOSED != 0 {// 设置closed状态的那个线程是不能碰Future的,否则和当前线程// 产生内存并发访问冲突。因此代劳析构操作。Self::drop_future(ptr);Self::decrement(ptr);} else if state & SCHEDULED != 0 {// poll的时候其他线程想schedule这个task,但是不能调用,因此// 当前线程代劳。 chedule函数接收self,类似move语义,因此这里// 不需要decrement。Self::schedule(ptr);} else {Self::decrement(ptr);}break;}Err(s) => state = s,}}}
}

在poll时如果发生panic,则Guard负责收拾残局。

fn drop(&mut self) {let raw = self.0;let ptr = raw.header as *const ();unsafe {let mut state = (*raw.header).state.load(Ordering::Acquire);loop {// poll的时候被其他线程closed了,if state & CLOSED != 0 {// 看代码state一旦处于CLOSED后,schedule不会再运行。这里为啥要移除// SCHEDULED状态,暂时不清楚,需要问问作者。(*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);// 析构FutureRawTask::<F, R, S, T>::drop_future(ptr);RawTask::<F, R, S, T>::decrement(ptr);break;}match (*raw.header).state.compare_exchange_weak(state,(state & !RUNNING & !SCHEDULED) | CLOSED,Ordering::AcqRel,Ordering::Acquire,) {Ok(state) => {// 析构FutureRawTask::<F, R, S, T>::drop_future(ptr);// 通知awaitertask已经close了.if state & AWAITER != 0 {(*raw.header).notify();}RawTask::<F, R, S, T>::decrement(ptr);break;}Err(s) => state = s,}}}
}

Waker相关函数的实现

wake函数

wake函数主要功能是设置SCHEDULE状态,并尝试调用schedule函数,有两个重要的细节需要注意:

  1. task正在执行时不能调用schedule函数;
  2. 当task已经被schedule过了时,也需要额外做一次原子操作,施加Release语义。
unsafe fn wake(ptr: *const ()) {let raw = Self::from_ptr(ptr);let mut state = (*raw.header).state.load(Ordering::Acquire);loop {            if state & (COMPLETED | CLOSED) != 0 {// 如果task完成或者close了,直接drop掉自己,wake的参数是self语义Self::decrement(ptr);break;}if state & SCHEDULED != 0 {// 这段代码极为关键,如果task已经schedule过了,则重新把读出来的state// 设置回去,虽然看起来好像是无用的,其实是为了施加Release同步语义,// 把当前线程的内存视图同步到其他线程去。即便是rust标准库,之前也因为// 没处理好类似这个情况出过bug。match (*raw.header).state.compare_exchange_weak(state,state,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) => {Self::decrement(ptr);break;}Err(s) => state = s,}} else {// task没schedule过,则设置状态。match (*raw.header).state.compare_exchange_weak(state,state | SCHEDULED,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) => {// 如果task当前没有运行,那么可以调用schedule函数。if state & (SCHEDULED | RUNNING) == 0 {// Schedule the task.let task = Task {raw_task: NonNull::new_unchecked(ptr as *mut ()),_marker: PhantomData,};(*raw.schedule)(task);} else {// task正在运行,不需要调用schedule,等运行结束后对应的// 线程会代劳。Self::decrement(ptr);}break;}Err(s) => state = s,}}}
}

wake_by_ref

这个函数的功能和wake类似,唯一的区别就是wake的参数是self,有move语义,wakebyref是&self。实现差异不大,就不做具体分析了。

clone_waker

waker的clone实现也比较简单,直接将Header里的state的引用计数加一即可。

unsafe fn clone_waker(ptr: *const ()) -> RawWaker {let raw = Self::from_ptr(ptr);let raw_waker = &(*raw.header).vtable.raw_waker;let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);if state > isize::max_value() as usize {std::process::abort();}RawWaker::new(ptr, raw_waker)
}

总结

整个task的设计非常精细,api也非常直观,难怪一发布就直接上1.0版本。

线程中task取消_Rust Async: async-task源码分析相关推荐

  1. Java Review - 线程池资源一直不被释放案例源码分析

    文章目录 概述 问题复现 源码分析 小结 概述 在日常开发中为了便于线程的有效复用,经常会用到线程池,然而使用完线程池后如果不调用shutdown关闭线程池,则会导致线程池资源一直不被释放. 下面通过 ...

  2. Java中ConcurrentHashMap底层实现原理(JDK1.8)源码分析2

    https://blog.csdn.net/programmer_at/article/details/79715177 https://blog.csdn.net/qq_41737716/categ ...

  3. react中context到底是如何传递的-源码分析

    react中使用context 基本要求就是 父组件中声明Parent.prototype.getChildContext 父组件中声明Parent.childContextType 子组件声明 Ch ...

  4. hashmap实现原理_Java中HashMap底层实现原理(JDK1.8)源码分析

    在JDK1.6,JDK1.7中,HashMap采用位桶+链表实现,即使用链表处理冲突,同一hash值的链表都存储在一个链表里.但是当位于一个桶中的元素较多,即hash值相等的元素较多时,通过key值依 ...

  5. Java面试绕不开的问题: Java中HashMap底层实现原理(JDK1.8)源码分析

    这几天学习了HashMap的底层实现,但是发现好几个版本的,代码不一,而且看了Android包的HashMap和JDK中的HashMap的也不是一样,原来他们没有指定JDK版本,很多文章都是旧版本JD ...

  6. Yii中Flash数据的处理机制(源码分析)

    Flash数据是一种特别的session数据,它一旦在某个请求中设置后, 只会在下次请求中有效,然后该数据就会自动被删除. 常用于实现只需显示给终端用户一次的信息, 如用户提交一个表单后显示确认信息. ...

  7. java 线程池 源码_java线程池源码分析

    我们在关闭线程池的时候会使用shutdown()和shutdownNow(),那么问题来了: 这两个方法又什么区别呢? 他们背后的原理是什么呢? 线程池中线程超过了coresize后会怎么操作呢? 为 ...

  8. Spark源码分析之七:Task运行(一)

    在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在T ...

  9. celery源码分析-Task的初始化与发送任务

    celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery的任务发送 在Django项目中使用了装饰器来包装待执行任务, from cel ...

最新文章

  1. [ZJOI2010]网络扩容
  2. [Erlang33]使用recon从网页查看Erlang运行状态
  3. 邀请函丨云和恩墨邀您一起迈向混合数据库时代!
  4. PHP 使用header函数设置HTTP头的示例方法 表头(转)
  5. kettle转换和作业插件开发及调试
  6. UVA11219 How old are you?【日期】
  7. webpack之loader篇
  8. javascript实现的时钟
  9. ios app 提交评审注意事项
  10. 关于MeScroll的下拉刷新,上拉加载的js框架的学习和使用
  11. win10引导安卓x86_手把手教你在电脑上安装安卓x86版+win10的双系统(只能在能用微软电脑系统的平板上安装)...
  12. (CentOS7)IP地址的配置与主机名和hosts映射
  13. 超强,废弃手机用来做服务器,不用root,外网可访问!
  14. keil5图标变成白色_桌面图标出现白块,显示图标异常的解决方法
  15. balser相机连接设置设置步骤
  16. Java并发的四种风味:Thread、Executor、ForkJoin和Actor
  17. excel下拉菜单创建
  18. 几款国产FPGA系列器件参数汇总
  19. flutter Container设置渐变色
  20. 百度地图绘制行驶轨迹、折线上添加箭头、修改地图底色

热门文章

  1. Atitit.js this错误指向window的解决方案
  2. 18 4Sum(寻找四个数之和为指定数的集合Medium)
  3. JDK内置的进制转换
  4. 一个简单的parser
  5. PHP环境安全性能检查
  6. hdu 1165 坑爹找规律题
  7. 【开始研究Community Server,转贴一点东西】Community Server资料收集
  8. git-从入门到熟悉
  9. hbase单机模式配置
  10. Leetcode--76. 最小覆盖子串