协程的挂起、恢复和调度的原理 (二)
目录
- 一. 协程的挂起、恢复和调度的设计思想
- 二. 深入解析协程
- 1. 协程的创建与启动
- 2. 协程的线程调度
- 3. 协程的挂起和恢复
- 4. 不同 resumeWith 的解析
- 5. 协程整体结构
一. 协程的挂起、恢复和调度的设计思想
被 suspend 修饰符修饰的函数在编译期间会被编译器做特殊处理:CPS(续体传递风格)变换,它会改变挂起函数的函数签名。
suspend fun <T> CompletableFuture<T>.await(): T
会转变成
fun <T> CompletableFuture<T>.await(continuation: Continuation<T>): Any?
编译器对挂起函数的第一个改变就是对函数签名的改变,这种改变被称为 CPS(续体传递风格)变换。
我们可以看到,函数变换之后多了一个参数Continuation,声明如下:
interface Continuation<in T> {val context: CoroutineContextfun resumeWith(result: Result<T>)
}
Continuation 包装了协程在挂起之后应该继续执行的代码;在编译的过程中,一个完整的协程可能会有多个挂起点 (suspension point) , 挂起点把协程分割切块成一个又一个续体。在 await 函数的挂起结束以后,它会调用 continuation 参数的 resumeWith 函数,来恢复执行 await 函数后面的代码。
值得一提的是,除了会返回一个本身的返回值,还会返回一个标记,COROUTINE_SUSPENDED,返回它的挂起函数表示这个挂起函数会发生事实上的挂起操作。什么叫事实上的挂起操作呢?比如:
launch {val deferred = async {// 发起了一个网络请求......}// 做了一些操作......deferred.await()// 后续的一些操作......
}
在 deferred.await() 这行执行的时候,如果网络请求已经取得了结果,那 await 函数会直接取得结果,而不会事实上的挂起协程。
明白了这么多概念之后,我们看看一个具体的例子:
val a = a()
val y = foo(a).await() // 挂起点 #1
b()
val z = bar(a, y).await() // 挂起点 #2
c(z)
这里有两个挂起点,编译后可以看到生成的伪字节码:
class <anonymous_for_state_machine> extends SuspendLambda<...> {// 状态机当前状态int label = 0// 协程的局部变量A a = nullY y = nullvoid resumeWith(Object result) {if (label == 0) goto L0if (label == 1) goto L1if (label == 2) goto L2else throw IllegalStateException()L0:a = a()label = 1// 'this' 作为续体传递result = foo(a).await(this)// 如果 await 挂起了执行则返回if (result == COROUTINE_SUSPENDED) returnL1:// 外部代码调用resumeWith y = (Y) resultb()label = 2result = bar(a, y).await(this)if (result == COROUTINE_SUSPENDED) return L2:Z z = (Z) resultc(z)// label = -1 代表已经没有其他的步骤了label = -1return}
}
在这段伪代码中,我们很容易理解它的实现逻辑:L0 代表挂起点1之前的续体,首先goto L0开始,直到调用挂起点1的 result = foo(a).await(this) 方法,this就是续体,如果 await 没挂起,直接使用结果跳入L1中;如果挂起了则直接返回,await 方法执行完后,调用 await 方法体中的 Continuation 对象,调用它的 resumeWith ,goto L1,依次类推。
其中 label 记录了状态,这也被称为状态机的实现方式。
到这里,大家可能不清楚,为什么协程刚开始就进入resumeWith方法呢?别着急,后面会提到为什么。
上面只是简单介绍以下协程的实现原理,介绍了以下相关的概念:CPS、续体、挂起点、状态机等,具体如何如何实现,必须深入源码去了解。
二. 深入解析协程
1. 协程的创建与启动
先从一个简单的创建方法CoroutineScope.launch
开始:
public fun CoroutineScope.launch(context: CoroutineContext = EmptyCoroutineContext,start: CoroutineStart = CoroutineStart.DEFAULT,block: suspend CoroutineScope.() -> Unit
): Job {...coroutine.start(start, coroutine, block)return coroutine
}
coroutine.start(start, coroutine, block) 这里会根据start属性决定初始化何种协程对象:
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>) =when (this) {CoroutineStart.DEFAULT -> block.startCoroutineCancellable(completion)CoroutineStart.ATOMIC -> block.startCoroutine(completion)CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(completion)CoroutineStart.LAZY -> Unit // will start lazily}
我们直接从默认的CoroutineStart.DEFAULT
入手,其最终会调用到createCoroutineUnintercepted
:
// his function creates a new, fresh instance of suspendable computation every time it is invoked.
// To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance.
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(completion: Continuation<T>
): Continuation<Unit> { ... }
这里贴了一下注释,意思是创建一个可挂起的协程,启动时调用返回对象Continuation的resume(Unit)方法,这个方法是它的内联扩展方法:
public inline fun <T> Continuation<T>.resume(value: T): Unit =resumeWith(Result.success(value))
这里调用的其实就是Continuation接口的resumeWith方法。
所以协程创建出来时就会去调用是Continuation接口的resumeWith方法。这就解释了上文的流程图为什么从resumeWith开始。
2. 协程的线程调度
我们从 launch 创建协程调用的 startCoroutineCancellable
开始;
internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
- createCoroutineUnintercepted(completion) 会创建一个新的协程,返回值类型为 Continuation
- intercepted() 是给 Continuation 加上 ContinuationInterceptor 拦截器,也是线程调度的关键
- resumeCancellable(Unit) 最终将调用 resume(Unit) 启动协程
我们来看一下intercepted()的具体实现:
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =(this as? ContinuationImpl)?.intercepted() ?: this
// ContinuationImpl 是 SuspendLambda 的父类
internal abstract class ContinuationImpl(...) : BaseContinuationImpl(completion) {@Transientprivate var intercepted: Continuation<Any?>? = nullpublic fun intercepted(): Continuation<Any?> =intercepted?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this).also { intercepted = it }
}
context[ContinuationInterceptor]?.interceptContinuation(this)
就是利用上下文对象 context 得到 CoroutineDispatcher,会使用协程的CoroutineDispatcher的interceptContinuation 方法:
public abstract class CoroutineDispatcher :AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =DispatchedContinuation(this, continuation)
}
interceptContinuation 方法中使用 DispatchedContinuation类 包装原来的 Continuation,拦截所有的协程运行操作:
internal class DispatchedContinuation<in T>(@JvmField val dispatcher: CoroutineDispatcher,@JvmField val continuation: Continuation<T>
) : Continuation<T> by continuation, DispatchedTask<T> {inline fun resumeCancellable(value: T) {// 判断是否需要线程调度if (dispatcher.isDispatchNeeded(context)) {...// 将协程的运算分发到另一个线程dispatcher.dispatch(context, this)} else {...// 如果不需要调度,直接在当前线程执行协程运算resumeUndispatched(value)}}override fun resumeWith(result: Result<T>) {// 判断是否需要线程调度if (dispatcher.isDispatchNeeded(context)) {...// 将协程的运算分发到另一个线程dispatcher.dispatch(context, this)} else {...// 如果不需要调度,直接在当前线程执行协程运算continuation.resumeWith(result)}}
}internal interface DispatchedTask<in T> : Runnable {public override fun run() {// 任务的执行最终来到这里,这里封装了 continuation.resume 逻辑}
}
总结: 协程的调度是通过 CoroutineDispatcher 的 interceptContinuation 方法来包装原来的 Continuation 为 DispatchedContinuation,来拦截每个续体的运行操作,DispatchedContinuation 拦截了协程的启动和恢复,分别是 resumeCancellable(Unit) 和重写的 resumeWith(Result),然后通过 CoroutineDispatcher 的 dispatch 分发协程的运算任务,最终调用到DispatchedTask 这个 Runnable。
3. 协程的挂起和恢复
我们先来看一下挂起,看一个例子:
fun main(args: Array<String>) = runBlocking<Unit> { launch(Dispatchers.Unconfined) { println("${Thread.currentThread().name} : launch start")async(Dispatchers.Default) { println("${Thread.currentThread().name} : async start")delay(100) println("${Thread.currentThread().name} : async end")}.await() println("${Thread.currentThread().name} : launch end")}
}
async在delay函数中被挂起,我们来看一下launch函数内反编译得到的代码:
public final Object invokeSuspend(@NotNull Object result) {Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();switch (this.label) {case 0:...System.out.println(stringBuilder.append(currentThread.getName()).append(" : launch start").toString());// 新建并启动 async 协程 Deferred async$default = BuildersKt.async$default(coroutineScope, (CoroutineContext) Dispatchers.getDefault(), null, (Function2) new 1(null), 2, null);this.label = 1;// 调用 await() 挂起函数if (async$default.await(this) == coroutine_suspended) {return coroutine_suspended;}break;case 1:// 恢复协程后再执行一次 resumeWith(),然后无异常的话跳出if (result instanceof Failure) {throw ((Failure) result).exception;}break;default:throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");}...System.out.println(stringBuilder2.append(currentThread2.getName()).append(" : launch end").toString());return Unit.INSTANCE;
}
上面代码最关键的地方在于 async$default.await(this) == coroutine_suspended
, 如果async线程未执行完成,那么await()返回为IntrinsicsKt.getCOROUTINE_SUSPENDED(),就会 return,然后async所在的线程就会继续执行。当恢复该协程后再执行一次 resumeWith(),调用invokeSuspend(),
总结:协程挂起实际上就是协程挂起点之前的逻辑执行完,然后判断是否是事实上的挂起,如果挂起了则返回,等待挂起函数执行完成,完成后调用resumeWith恢复协程,继续执行该协程下面的代码。
我们再来看一下协程怎么恢复:
我们来看一下await()的代码,关键点在于,实现了一个CompletableDeferredImple对象,调用了 JobSupport.awaitSuspend() 方法
private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->val cont = AwaitContinuation(uCont.intercepted(), this)cont.initCancellability()invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler)cont.getResult()
}
在这里,将 launch(this) 协程封装为 ResumeAwaitOnCompletion 作为 handler 节点。
在方法 invokeOnCompletion 中:
// handler 就是 ResumeAwaitOnCompletion 的实例,将 handler 作为节点
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
// 将 node 节点添加到 state.list 中
if (!addLastAtomic(state, list, node)) return@loopOnState // retry
这里将 handler 节点添加到 aynsc 协程的 state.list 中,然后在 async 协程完成时会通知 handler 节点调用 launch 协程的 resume(result) 方法将结果传给 launch 协程。
事实上,handler节点完成到launch恢复的过程也是比较复杂的,这里可以通过断点调试查看调用的过程:
从 async 协程的 SuspendLambda 的子类 BaseContinuationImpl 的completion.resumeWith(outcome) -> AbstractCoroutine.resumeWith(result) …-> JobSupport.tryFinalizeSimpleState() -> JobSupport.completeStateFinalization() -> state.list?.notifyCompletion(cause) -> node.invoke,最后 handler 节点里面通过调用resume(result)恢复协程。
总结:所以await()挂起函数恢复协程的原理是,将 launch 协程封装为 ResumeAwaitOnCompletion 作为 handler 节点添加到 aynsc 协程的 state.list,然后在 async 协程完成时会通知 handler 节点,最终会调用 launch 协程的 resume(result) 方法将结果传给 launch 协程,并恢复 launch 协程继续执行 await 挂起点之后的逻辑。
4. resumeWith解析
值得一提的是,续体completion有两种不一样的实现方式,分别是BaseContinuationImpl和AbstractCoroutine,它们的resumeWith执行着不一样的逻辑,先来看BaseContinuationImpl:
internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {public final override fun resumeWith(result: Result<Any?>) {...var param = resultwhile (true) {with(current) {val completion = completion!!val outcome: Result<Any?> =try {// 调用 invokeSuspend 方法执行,执行协程的真正运算逻辑val outcome = invokeSuspend(param)// 协程挂起时 invokeSuspend 才会返回 COROUTINE_SUSPENDED,所以协程挂起时,先return,再次调用 resumeWith 时,协程挂起点之后的逻辑才能继续执行if (outcome === COROUTINE_SUSPENDED) returnResult.success(outcome)} catch (exception: Throwable) {Result.failure(exception)}releaseIntercepted() // 这里可以看出 Continuation 其实分为两类,一种是 BaseContinuationImpl,封装了协程的真正运算逻辑if (completion is BaseContinuationImpl) {// unrolling recursion via loopcurrent = completionparam = outcome} else {// 这里实际调用的是其父类 AbstractCoroutine 的 resumeWith 方法completion.resumeWith(outcome)return}}}}
看一下AbstractCoroutine 的resumeWith实现:
public final override fun resumeWith(result: Result<T>) {makeCompletingOnce(result.toState(), defaultResumeMode)}/** * Returns:* * `true` if state was updated to completed/cancelled;* * `false` if made completing or it is cancelling and is waiting for children.*/internal fun makeCompletingOnce(proposedUpdate: Any?, mode: Int): Boolean = loopOnState { state ->when (tryMakeCompleting(state, proposedUpdate, mode)) {COMPLETING_ALREADY_COMPLETING -> throw IllegalStateException("Job $this is already complete or completing, " +"but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)COMPLETING_COMPLETED -> return trueCOMPLETING_WAITING_CHILDREN -> return falseCOMPLETING_RETRY -> return@loopOnStateelse -> error("unexpected result")}}
可以看到 BaseContinuationImpl 的 resumeWith 封装了协程的运算逻辑,而 AbstractCoroutine 的 resumeWith 主要用来管理协程的状态。
5. 协程整体结构
从上面的协程执行流程,我们可以梳理一下协程的整体结构;
其中最上层的DispatcherContinuation负责协程的调度逻辑,第二层的BaseContinuaImpl的 invokeSuspend 封装了协程真正的运算逻辑,AbstractCoroutine封装了协程的状态(Job,deferred)。
协程的挂起、恢复和调度的原理 (二)相关推荐
- Kotlin协程:挂起与恢复原理逆向刨析
前言:只有在那崎岖的小路上不畏艰险奋勇攀登的人,才有希望达到光辉的顶点. --马克思 前言 经过前面两篇协程的学习,我相信大家对协程的使用已经非常熟悉了.本着知其然更要知其之所以然的心态,很想知道它里 ...
- 分析Kotlin协程只挂起不恢复会怎样(是否存在协程泄漏),以及挂起的协程存在哪里?
前言 刚开始正式学协程原理的时候(以前只是学api怎么用),大概是20年6月,也就是bennyhuo大佬出书<深入理解Kotlin协程>的时候,我买了本然后细细研究,我的内心就一直有一个问 ...
- Kotlin的协程:挂起函数
挂起函数 挂起函数是指使用 suspend 关键字修饰的函数. suspend fun getUserInfo(): String {withContext(Dispatchers.IO) {dela ...
- lua服务执行过程中协程的挂起和重新唤醒
lua服务在执行回调函数的过程中,调用某些函数会挂起协程,比如skynet.call, skynet.ret, skynet.response等等,这些函数把协程挂起后,如何唤醒呢? 本文将对所有调用 ...
- linux ucontext 类型,协程:posix::ucontext用户级线程实现原理分析 | WalkerTalking
在听完leader的课程后,对其中协程的实现方式有了基本的了解,无论的POSIX的ucontex,boost::fcontext,还是libco,都是通过保存和恢复寄存器状态,来进行各个协程上下文的保 ...
- Kotlin 协程与架构组件一起使用及底层原理分析,音视频开发前景
if (!isChangingConfigurations()) { getViewModelStore().clear(); } } } }); } 在Activity的生命周期走到onDestro ...
- Kotlin 协程调度切换线程是时候解开真相了
前言 协程系列文章: 一个小故事讲明白进程.线程.Kotlin 协程到底啥关系? 少年,你可知 Kotlin 协程最初的样子? 讲真,Kotlin 协程的挂起/恢复没那么神秘(故事篇) 讲真,Kotl ...
- 一个小故事讲明白进程、线程、Kotlin 协程到底啥关系?
前言 协程系列文章: 一个小故事讲明白进程.线程.Kotlin 协程到底啥关系? 少年,你可知 Kotlin 协程最初的样子? 讲真,Kotlin 协程的挂起/恢复没那么神秘(故事篇) 讲真,Kotl ...
- pdf 深入理解kotlin协程_Kotlin协程实现原理:挂起与恢复
今天我们来聊聊Kotlin的协程Coroutine. 如果你还没有接触过协程,推荐你先阅读这篇入门级文章What? 你还不知道Kotlin Coroutine? 如果你已经接触过协程,但对协程的原理存 ...
最新文章
- 5篇CVPR 各路大佬显身手 点云分割、姿态估计、物体检测、生成重建
- IFE-16 addEventHandler跨浏览器实现事件绑定
- [bzoj 2653][国家集训队]middle
- 计算机单词修改是否正确,计算机组装必懂的53个单词及装机步骤51条.doc
- 华水c语言课程设计,【图片】发几个C语言课程设计源代码(恭喜自己当上技术小吧主)【东华理工大学吧】_百度贴吧...
- 液体火箭发动机技术国家级重点实验室2021年度对外开放项目指南
- Mr.J-- jQuery学习笔记(四)--内容选择器
- JS 面向对象实例 prototype
- 已有项目要不要迁移到Addressable系统?
- python如何运行一个python程序_python如何运行?第一个python小程序示范
- vue打开二级或者三级页面传输对象,再刷新浏览器数据丢失问题解决(vue使用router传递数据)
- ASCSDK-------通用包接入文档(COCOS篇)
- matlab中firrcos,DMR数字集群关键技术的应用研究
- 实习面试感悟-阿里云
- ssh: Could not resolve hostname f: Name or service not known的解决
- 基于Caffe ResNet-50网络实现图片分类(仅推理)的实验复现
- 个人网站个人主页的建立
- ERC20接口下USDT代币的深入解析
- 微信小程序如何修改小程序名称
- windows配置spark开发环境