目录

  • 一. 协程的挂起、恢复和调度的设计思想
  • 二. 深入解析协程
    • 1. 协程的创建与启动
    • 2. 协程的线程调度
    • 3. 协程的挂起和恢复
    • 4. 不同 resumeWith 的解析
    • 5. 协程整体结构

一. 协程的挂起、恢复和调度的设计思想

被 suspend 修饰符修饰的函数在编译期间会被编译器做特殊处理:CPS(续体传递风格)变换,它会改变挂起函数的函数签名。

suspend fun <T> CompletableFuture<T>.await(): T

会转变成

fun <T> CompletableFuture<T>.await(continuation: Continuation<T>): Any?

编译器对挂起函数的第一个改变就是对函数签名的改变,这种改变被称为 CPS(续体传递风格)变换。

我们可以看到,函数变换之后多了一个参数Continuation,声明如下:

interface Continuation<in T> {val context: CoroutineContextfun resumeWith(result: Result<T>)
}

Continuation 包装了协程在挂起之后应该继续执行的代码;在编译的过程中,一个完整的协程可能会有多个挂起点 (suspension point) , 挂起点把协程分割切块成一个又一个续体。在 await 函数的挂起结束以后,它会调用 continuation 参数的 resumeWith 函数,来恢复执行 await 函数后面的代码。

值得一提的是,除了会返回一个本身的返回值,还会返回一个标记,COROUTINE_SUSPENDED,返回它的挂起函数表示这个挂起函数会发生事实上的挂起操作。什么叫事实上的挂起操作呢?比如:

launch {val deferred = async {// 发起了一个网络请求......}// 做了一些操作......deferred.await()// 后续的一些操作......
}

在 deferred.await() 这行执行的时候,如果网络请求已经取得了结果,那 await 函数会直接取得结果,而不会事实上的挂起协程。

明白了这么多概念之后,我们看看一个具体的例子:

val a = a()
val y = foo(a).await() // 挂起点 #1
b()
val z = bar(a, y).await() // 挂起点 #2
c(z)

这里有两个挂起点,编译后可以看到生成的伪字节码:

class <anonymous_for_state_machine> extends SuspendLambda<...> {// 状态机当前状态int label = 0// 协程的局部变量A a = nullY y = nullvoid resumeWith(Object result) {if (label == 0) goto L0if (label == 1) goto L1if (label == 2) goto L2else throw IllegalStateException()L0:a = a()label = 1// 'this' 作为续体传递result = foo(a).await(this)// 如果 await 挂起了执行则返回if (result == COROUTINE_SUSPENDED) returnL1:// 外部代码调用resumeWith y = (Y) resultb()label = 2result = bar(a, y).await(this)if (result == COROUTINE_SUSPENDED) return L2:Z z = (Z) resultc(z)// label = -1 代表已经没有其他的步骤了label = -1return}
}

在这段伪代码中,我们很容易理解它的实现逻辑:L0 代表挂起点1之前的续体,首先goto L0开始,直到调用挂起点1的 result = foo(a).await(this) 方法,this就是续体,如果 await 没挂起,直接使用结果跳入L1中;如果挂起了则直接返回,await 方法执行完后,调用 await 方法体中的 Continuation 对象,调用它的 resumeWith ,goto L1,依次类推。

其中 label 记录了状态,这也被称为状态机的实现方式。

到这里,大家可能不清楚,为什么协程刚开始就进入resumeWith方法呢?别着急,后面会提到为什么。

上面只是简单介绍以下协程的实现原理,介绍了以下相关的概念:CPS、续体、挂起点、状态机等,具体如何如何实现,必须深入源码去了解。

二. 深入解析协程

1. 协程的创建与启动

先从一个简单的创建方法CoroutineScope.launch开始:

public fun CoroutineScope.launch(context: CoroutineContext = EmptyCoroutineContext,start: CoroutineStart = CoroutineStart.DEFAULT,block: suspend CoroutineScope.() -> Unit
): Job {...coroutine.start(start, coroutine, block)return coroutine
}

coroutine.start(start, coroutine, block) 这里会根据start属性决定初始化何种协程对象:

    public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>) =when (this) {CoroutineStart.DEFAULT -> block.startCoroutineCancellable(completion)CoroutineStart.ATOMIC -> block.startCoroutine(completion)CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(completion)CoroutineStart.LAZY -> Unit // will start lazily}

我们直接从默认的CoroutineStart.DEFAULT入手,其最终会调用到createCoroutineUnintercepted:

// his function creates a new, fresh instance of suspendable computation every time it is invoked.
// To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance.
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(completion: Continuation<T>
): Continuation<Unit> { ... }

这里贴了一下注释,意思是创建一个可挂起的协程,启动时调用返回对象Continuation的resume(Unit)方法,这个方法是它的内联扩展方法:

public inline fun <T> Continuation<T>.resume(value: T): Unit =resumeWith(Result.success(value))

这里调用的其实就是Continuation接口的resumeWith方法。

所以协程创建出来时就会去调用是Continuation接口的resumeWith方法。这就解释了上文的流程图为什么从resumeWith开始。

2. 协程的线程调度

我们从 launch 创建协程调用的 startCoroutineCancellable 开始;

internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
  • createCoroutineUnintercepted(completion) 会创建一个新的协程,返回值类型为 Continuation
  • intercepted() 是给 Continuation 加上 ContinuationInterceptor 拦截器,也是线程调度的关键
  • resumeCancellable(Unit) 最终将调用 resume(Unit) 启动协程

我们来看一下intercepted()的具体实现:

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =(this as? ContinuationImpl)?.intercepted() ?: this
// ContinuationImpl 是 SuspendLambda 的父类
internal abstract class ContinuationImpl(...) : BaseContinuationImpl(completion) {@Transientprivate var intercepted: Continuation<Any?>? = nullpublic fun intercepted(): Continuation<Any?> =intercepted?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this).also { intercepted = it }
}

context[ContinuationInterceptor]?.interceptContinuation(this) 就是利用上下文对象 context 得到 CoroutineDispatcher,会使用协程的CoroutineDispatcher的interceptContinuation 方法:

public abstract class CoroutineDispatcher :AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =DispatchedContinuation(this, continuation)
}

interceptContinuation 方法中使用 DispatchedContinuation类 包装原来的 Continuation,拦截所有的协程运行操作:

internal class DispatchedContinuation<in T>(@JvmField val dispatcher: CoroutineDispatcher,@JvmField val continuation: Continuation<T>
) : Continuation<T> by continuation, DispatchedTask<T> {inline fun resumeCancellable(value: T) {// 判断是否需要线程调度if (dispatcher.isDispatchNeeded(context)) {...// 将协程的运算分发到另一个线程dispatcher.dispatch(context, this)} else {...// 如果不需要调度,直接在当前线程执行协程运算resumeUndispatched(value)}}override fun resumeWith(result: Result<T>) {// 判断是否需要线程调度if (dispatcher.isDispatchNeeded(context)) {...// 将协程的运算分发到另一个线程dispatcher.dispatch(context, this)} else {...// 如果不需要调度,直接在当前线程执行协程运算continuation.resumeWith(result)}}
}internal interface DispatchedTask<in T> : Runnable {public override fun run() {// 任务的执行最终来到这里,这里封装了 continuation.resume 逻辑}
}

总结: 协程的调度是通过 CoroutineDispatcher 的 interceptContinuation 方法来包装原来的 Continuation 为 DispatchedContinuation,来拦截每个续体的运行操作,DispatchedContinuation 拦截了协程的启动和恢复,分别是 resumeCancellable(Unit) 和重写的 resumeWith(Result),然后通过 CoroutineDispatcher 的 dispatch 分发协程的运算任务,最终调用到DispatchedTask 这个 Runnable。

3. 协程的挂起和恢复

我们先来看一下挂起,看一个例子:

fun main(args: Array<String>) = runBlocking<Unit> { launch(Dispatchers.Unconfined) { println("${Thread.currentThread().name} : launch start")async(Dispatchers.Default) { println("${Thread.currentThread().name} : async start")delay(100)  println("${Thread.currentThread().name} : async end")}.await()  println("${Thread.currentThread().name} : launch end")}
}

async在delay函数中被挂起,我们来看一下launch函数内反编译得到的代码:

public final Object invokeSuspend(@NotNull Object result) {Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();switch (this.label) {case 0:...System.out.println(stringBuilder.append(currentThread.getName()).append(" : launch start").toString());// 新建并启动 async 协程 Deferred async$default = BuildersKt.async$default(coroutineScope, (CoroutineContext) Dispatchers.getDefault(), null, (Function2) new 1(null), 2, null);this.label = 1;// 调用 await() 挂起函数if (async$default.await(this) == coroutine_suspended) {return coroutine_suspended;}break;case 1:// 恢复协程后再执行一次 resumeWith(),然后无异常的话跳出if (result instanceof Failure) {throw ((Failure) result).exception;}break;default:throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");}...System.out.println(stringBuilder2.append(currentThread2.getName()).append(" : launch end").toString());return Unit.INSTANCE;
}

上面代码最关键的地方在于 async$default.await(this) == coroutine_suspended , 如果async线程未执行完成,那么await()返回为IntrinsicsKt.getCOROUTINE_SUSPENDED(),就会 return,然后async所在的线程就会继续执行。当恢复该协程后再执行一次 resumeWith(),调用invokeSuspend(),

总结:协程挂起实际上就是协程挂起点之前的逻辑执行完,然后判断是否是事实上的挂起,如果挂起了则返回,等待挂起函数执行完成,完成后调用resumeWith恢复协程,继续执行该协程下面的代码。

我们再来看一下协程怎么恢复:

我们来看一下await()的代码,关键点在于,实现了一个CompletableDeferredImple对象,调用了 JobSupport.awaitSuspend() 方法

private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->val cont = AwaitContinuation(uCont.intercepted(), this)cont.initCancellability()invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler)cont.getResult()
}

在这里,将 launch(this) 协程封装为 ResumeAwaitOnCompletion 作为 handler 节点。

在方法 invokeOnCompletion 中:

// handler 就是 ResumeAwaitOnCompletion 的实例,将 handler 作为节点
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
// 将 node 节点添加到 state.list 中
if (!addLastAtomic(state, list, node)) return@loopOnState // retry

这里将 handler 节点添加到 aynsc 协程的 state.list 中,然后在 async 协程完成时会通知 handler 节点调用 launch 协程的 resume(result) 方法将结果传给 launch 协程。

事实上,handler节点完成到launch恢复的过程也是比较复杂的,这里可以通过断点调试查看调用的过程:

从 async 协程的 SuspendLambda 的子类 BaseContinuationImpl 的completion.resumeWith(outcome) -> AbstractCoroutine.resumeWith(result) …-> JobSupport.tryFinalizeSimpleState() -> JobSupport.completeStateFinalization() -> state.list?.notifyCompletion(cause) -> node.invoke,最后 handler 节点里面通过调用resume(result)恢复协程。

总结:所以await()挂起函数恢复协程的原理是,将 launch 协程封装为 ResumeAwaitOnCompletion 作为 handler 节点添加到 aynsc 协程的 state.list,然后在 async 协程完成时会通知 handler 节点,最终会调用 launch 协程的 resume(result) 方法将结果传给 launch 协程,并恢复 launch 协程继续执行 await 挂起点之后的逻辑。

4. resumeWith解析

值得一提的是,续体completion有两种不一样的实现方式,分别是BaseContinuationImpl和AbstractCoroutine,它们的resumeWith执行着不一样的逻辑,先来看BaseContinuationImpl:

internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {public final override fun resumeWith(result: Result<Any?>) {...var param = resultwhile (true) {with(current) {val completion = completion!!val outcome: Result<Any?> =try {// 调用 invokeSuspend 方法执行,执行协程的真正运算逻辑val outcome = invokeSuspend(param)// 协程挂起时 invokeSuspend 才会返回 COROUTINE_SUSPENDED,所以协程挂起时,先return,再次调用 resumeWith 时,协程挂起点之后的逻辑才能继续执行if (outcome === COROUTINE_SUSPENDED) returnResult.success(outcome)} catch (exception: Throwable) {Result.failure(exception)}releaseIntercepted() // 这里可以看出 Continuation 其实分为两类,一种是 BaseContinuationImpl,封装了协程的真正运算逻辑if (completion is BaseContinuationImpl) {// unrolling recursion via loopcurrent = completionparam = outcome} else {//  这里实际调用的是其父类 AbstractCoroutine 的 resumeWith 方法completion.resumeWith(outcome)return}}}}

看一下AbstractCoroutine 的resumeWith实现:

  public final override fun resumeWith(result: Result<T>) {makeCompletingOnce(result.toState(), defaultResumeMode)}/** *  Returns:* * `true` if state was updated to completed/cancelled;* * `false` if made completing or it is cancelling and is waiting for children.*/internal fun makeCompletingOnce(proposedUpdate: Any?, mode: Int): Boolean = loopOnState { state ->when (tryMakeCompleting(state, proposedUpdate, mode)) {COMPLETING_ALREADY_COMPLETING -> throw IllegalStateException("Job $this is already complete or completing, " +"but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)COMPLETING_COMPLETED -> return trueCOMPLETING_WAITING_CHILDREN -> return falseCOMPLETING_RETRY -> return@loopOnStateelse -> error("unexpected result")}}

可以看到 BaseContinuationImpl 的 resumeWith 封装了协程的运算逻辑,而 AbstractCoroutine 的 resumeWith 主要用来管理协程的状态。

5. 协程整体结构

从上面的协程执行流程,我们可以梳理一下协程的整体结构;


其中最上层的DispatcherContinuation负责协程的调度逻辑,第二层的BaseContinuaImpl的 invokeSuspend 封装了协程真正的运算逻辑,AbstractCoroutine封装了协程的状态(Job,deferred)。

协程的挂起、恢复和调度的原理 (二)相关推荐

  1. Kotlin协程:挂起与恢复原理逆向刨析

    前言:只有在那崎岖的小路上不畏艰险奋勇攀登的人,才有希望达到光辉的顶点. --马克思 前言 经过前面两篇协程的学习,我相信大家对协程的使用已经非常熟悉了.本着知其然更要知其之所以然的心态,很想知道它里 ...

  2. 分析Kotlin协程只挂起不恢复会怎样(是否存在协程泄漏),以及挂起的协程存在哪里?

    前言 刚开始正式学协程原理的时候(以前只是学api怎么用),大概是20年6月,也就是bennyhuo大佬出书<深入理解Kotlin协程>的时候,我买了本然后细细研究,我的内心就一直有一个问 ...

  3. Kotlin的协程:挂起函数

    挂起函数 挂起函数是指使用 suspend 关键字修饰的函数. suspend fun getUserInfo(): String {withContext(Dispatchers.IO) {dela ...

  4. lua服务执行过程中协程的挂起和重新唤醒

    lua服务在执行回调函数的过程中,调用某些函数会挂起协程,比如skynet.call, skynet.ret, skynet.response等等,这些函数把协程挂起后,如何唤醒呢? 本文将对所有调用 ...

  5. linux ucontext 类型,协程:posix::ucontext用户级线程实现原理分析 | WalkerTalking

    在听完leader的课程后,对其中协程的实现方式有了基本的了解,无论的POSIX的ucontex,boost::fcontext,还是libco,都是通过保存和恢复寄存器状态,来进行各个协程上下文的保 ...

  6. Kotlin 协程与架构组件一起使用及底层原理分析,音视频开发前景

    if (!isChangingConfigurations()) { getViewModelStore().clear(); } } } }); } 在Activity的生命周期走到onDestro ...

  7. Kotlin 协程调度切换线程是时候解开真相了

    前言 协程系列文章: 一个小故事讲明白进程.线程.Kotlin 协程到底啥关系? 少年,你可知 Kotlin 协程最初的样子? 讲真,Kotlin 协程的挂起/恢复没那么神秘(故事篇) 讲真,Kotl ...

  8. 一个小故事讲明白进程、线程、Kotlin 协程到底啥关系?

    前言 协程系列文章: 一个小故事讲明白进程.线程.Kotlin 协程到底啥关系? 少年,你可知 Kotlin 协程最初的样子? 讲真,Kotlin 协程的挂起/恢复没那么神秘(故事篇) 讲真,Kotl ...

  9. pdf 深入理解kotlin协程_Kotlin协程实现原理:挂起与恢复

    今天我们来聊聊Kotlin的协程Coroutine. 如果你还没有接触过协程,推荐你先阅读这篇入门级文章What? 你还不知道Kotlin Coroutine? 如果你已经接触过协程,但对协程的原理存 ...

最新文章

  1. 5篇CVPR 各路大佬显身手 点云分割、姿态估计、物体检测、生成重建
  2. IFE-16 addEventHandler跨浏览器实现事件绑定
  3. [bzoj 2653][国家集训队]middle
  4. 计算机单词修改是否正确,计算机组装必懂的53个单词及装机步骤51条.doc
  5. 华水c语言课程设计,【图片】发几个C语言课程设计源代码(恭喜自己当上技术小吧主)【东华理工大学吧】_百度贴吧...
  6. 液体火箭发动机技术国家级重点实验室2021年度对外开放项目指南
  7. Mr.J-- jQuery学习笔记(四)--内容选择器
  8. JS 面向对象实例 prototype
  9. 已有项目要不要迁移到Addressable系统?
  10. python如何运行一个python程序_python如何运行?第一个python小程序示范
  11. vue打开二级或者三级页面传输对象,再刷新浏览器数据丢失问题解决(vue使用router传递数据)
  12. ASCSDK-------通用包接入文档(COCOS篇)
  13. matlab中firrcos,DMR数字集群关键技术的应用研究
  14. 实习面试感悟-阿里云
  15. ssh: Could not resolve hostname f: Name or service not known的解决
  16. 基于Caffe ResNet-50网络实现图片分类(仅推理)的实验复现
  17. 个人网站个人主页的建立
  18. ERC20接口下USDT代币的深入解析
  19. 微信小程序如何修改小程序名称
  20. windows配置spark开发环境

热门文章

  1. PostgreSQL学习笔记9之事务隔离
  2. setsockopt函数全面解析
  3. WebRTC 的 AudioSource/AudioTrack
  4. OkHttp3 HTTP请求执行流程分析
  5. 数据结构与算法 | 树与二叉树
  6. 注意System.currentTimeMillis()潜在的性能问题
  7. 深入理解Netty高性能网络框架
  8. 为什么我建议你现在学Vue3?
  9. 多线程小抄集(新编四)
  10. C++中的多态(一)