Kotlin协程实现原理
前言
本篇解析Kotlin/JVM
中的协程的实现原理。
初看suspend关键字
下面的例子模拟一个网络请求:
class Temp {suspend fun fetchData(argument: String): Boolean {val result = netRequest(argument)return result == 0}// 模拟网络请求suspend fun netRequest(argument: String): Int {delay(1000)return argument.length}
}
这两个方法都使用了suspend
关键字修饰,我们将这个文件的字节码反编译为等同效果的Java
代码:
public final class Temp {@Nullablepublic final Object fetchData(@NotNull String argument, @NotNull Continuation var2) {Object $continuation;label25: {if (var2 instanceof <undefinedtype>) {$continuation = (<undefinedtype>)var2;if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;break label25;}}$continuation = new ContinuationImpl(var2) {// $FF: synthetic fieldObject result;int label;@Nullablepublic final Object invokeSuspend(@NotNull Object $result) {this.result = $result;this.label |= Integer.MIN_VALUE;return Temp.this.fetchData((String)null, this);}};}Object $result = ((<undefinedtype>)$continuation).result;Object var6 = IntrinsicsKt.getCOROUTINE_SUSPENDED();Object var10000;switch(((<undefinedtype>)$continuation).label) {case 0:ResultKt.throwOnFailure($result);((<undefinedtype>)$continuation).label = 1;var10000 = this.netRequest(argument, (Continuation)$continuation);if (var10000 == var6) {return var6;}break;case 1:ResultKt.throwOnFailure($result);var10000 = $result;break;default:throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");}int result = ((Number)var10000).intValue();return Boxing.boxBoolean(result == 0);}@Nullablepublic final Object netRequest(@NotNull String argument, @NotNull Continuation var2) {Object $continuation;label20: {if (var2 instanceof <undefinedtype>) {$continuation = (<undefinedtype>)var2;if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;break label20;}}$continuation = new ContinuationImpl(var2) {// $FF: synthetic fieldObject result;int label;Object L$0;@Nullablepublic final Object invokeSuspend(@NotNull Object $result) {this.result = $result;this.label |= Integer.MIN_VALUE;return Temp.this.netRequest((String)null, this);}};}Object $result = ((<undefinedtype>)$continuation).result;Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();switch(((<undefinedtype>)$continuation).label) {case 0:ResultKt.throwOnFailure($result);((<undefinedtype>)$continuation).L$0 = argument;((<undefinedtype>)$continuation).label = 1;if (DelayKt.delay(1000L, (Continuation)$continuation) == var5) {return var5;}break;case 1:argument = (String)((<undefinedtype>)$continuation).L$0;ResultKt.throwOnFailure($result);break;default:throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");}return Boxing.boxInt(argument.length());}
}
几行协程相关的代码,竟然对应了这么多的Java
代码,可见kotlin
编译器为我们做了很多事情。
上面代码的可读性不高,例如有<undefinedtype>
这种未定义的类型,我使用jd-gui
对Temp.class
文件再进行了一次反编译,获取到了更多信息,我将上面的反编译的一大串代码和jd-gui
反编译获取的信息进行整合,并且对一些类和变量进行适当的重命名,得出信息更完整且可读性更高的「Temp.class
反编译后对应的Java
代码」,首先是fetchData
相关的:
public final Object fetchData(@NotNull String argument,@NotNull Continuation completion) {Object $continuation;label25:{if (completion instanceof FetchDataStateMachine) {$continuation = (FetchDataStateMachine) completion;if (($continuation.label & Integer.MIN_VALUE) != 0) {$continuation.label -= Integer.MIN_VALUE;break label25;}}$continuation = new FetchDataStateMachine(completion);}Object $result = $continuation.result;Object resultTemp;switch ($continuation.label) {case 0:ResultKt.throwOnFailure($result);$continuation.label = 1;resultTemp = this.netRequest(argument, (Continuation) $continuation);if (resultTemp == COROUTINE_SUSPENDED) {return COROUTINE_SUSPENDED;}break;case 1:ResultKt.throwOnFailure($result);resultTemp = $result;break;default:throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");}int result = ((Number) resultTemp).intValue();return Boxing.boxBoolean(result == 0);
}static final class FetchDataStateMachine extends ContinuationImpl {Object result;int label;FetchDataStateMachine(Continuation $completion) {super($completion);}@Nullablepublic final Object invokeSuspend(@NotNull Object $result) {this.result = $result;this.label |= Integer.MIN_VALUE;return Temp.this.fetchData(null, (Continuation<? super Boolean>) this);}
}
netRequest
相关的代码,与fetchData
相关的代码,在结构和形式上类似:
public final Object netRequest(@NotNull String argument,@NotNull Continuation completion) {Object $continuation;label20:{if (completion instanceof NetRequestStateMachine) {$continuation = (NetRequestStateMachine) completion;if (($continuation.label & Integer.MIN_VALUE) != 0) {$continuation.label -= Integer.MIN_VALUE;break label20;}}$continuation = new NetRequestStateMachine(completion);}Object $result = $continuation.result;switch ($continuation.label) {case 0:ResultKt.throwOnFailure($result);$continuation.functionParameter = argument;$continuation.label = 1;if (DelayKt.delay(1000L, (Continuation) $continuation) == COROUTINE_SUSPENDED) {return COROUTINE_SUSPENDED;}break;case 1:argument = (String) ($continuation.functionParameter);ResultKt.throwOnFailure($result);break;default:throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");}return Boxing.boxInt(argument.length());
}static final class NetRequestStateMachine extends ContinuationImpl {Object result;int label;Object functionParameter;NetRequestStateMachine(Continuation $completion) {super($completion);}@Nullablepublic final Object invokeSuspend(@NotNull Object $result) {this.result = $result;this.label |= Integer.MIN_VALUE;return Temp.this.netRequest(null, (Continuation<? super Integer>) this);}
}
可以发现,反编译后的Java
代码中,fetchData
和netRequest
方法都多了一个Continuation completion
参数,这是Kotlin Compiler
帮我们做的,对于suspend
修饰的函数,编译的时候Kotlin Compiler
会帮我们在该函数中传入一个Continuation
参数,使用Continuation
参数代替了suspend
修饰符,这个参数有什么含义呢?
初识续体
续体是理解协程工作原理的一个关键。
先看传统的网络请求:
data class User(val id: Long, val name: String)interface Callback {fun success(user: User)fun failure(t: Throwable)
}class Model {fun getUserInfo(callback: Callback) {Thread.sleep(1000) // 模拟网络请求callback.success(User(1, "giagor"))}
}class Business {val model = Model()fun getUserInfo() {model.getUserInfo(object : Callback {override fun success(user: User) {showMsg(user.toString())}override fun failure(t: Throwable) {showMsg(t.message ?: "")}})}fun showMsg(msg: String) {// ...}
}
在使用Model
进行网络请求的时候,使用Callback
接收网络请求的结果,我们这时候可以将Callback
看作一个续体,即网络请求的续体,用于接收网络请求的结果。
在协程中使用Continuation
接口表示一个续体,它代表一个挂起点之后的延续,即 挂起点之后的剩余应执行的代码:
public interface Continuation<in T> {// 与该续体对应的协程的上下文public val context: CoroutineContext// 恢复对应协程的执行,并且传递一个表示成功或失败的result作为最后一个挂起点的返回值public fun resumeWith(result: Result<T>)
}
在Kotlin 1.3
,也有可以方便地调用resumeWith
的扩展函数:
public inline fun <T> Continuation<T>.resume(value: T): Unit =resumeWith(Result.success(value))public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =resumeWith(Result.failure(exception))
正如前面所说,对于suspend
修饰的函数,Kotlin Compiler
会帮我们在该函数中传入一个Continuation
参数,使用Continuation
参数代替了suspend
修饰符,通过Continuation
参数,Kotlin Compiler
可以将我们的协程代码转化为等价的回调代码,也就是说,Kt
编译器帮我们写好了那些回调的代码,至于怎么帮我们写的后面会分析,这种通过传递Continuation
来控制异步调用流程被称作CPS
变换(Continuation-Passing-Style Transformation
)。
状态机
fetchData
函数编译时会生成下面的一个静态内部类(续体):
static final class FetchDataStateMachine extends ContinuationImpl {Object result;int label;FetchDataStateMachine(Continuation $completion) {super($completion);}@Nullablepublic final Object invokeSuspend(@NotNull Object $result) {this.result = $result;this.label |= Integer.MIN_VALUE;return Temp.this.fetchData(null, (Continuation<? super Boolean>) this);}
}
FetchDataStateMachine
的继承关系如下:
FetchDataStateMachine -> ContinuationImpl -> BaseContinuationImpl -> Continuation
FetchDataStateMachine
接收一个名称为$completion
的Continuation
参数,$completion
被保存在父类BaseContinuationImpl
中:
internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {...}
通过$completion
可以将fetchData
函数的执行结果传递回给调用fetchData
的函数,有了$completion
,才有能力去实现回调。
状态机FetchDataStateMachine
声明了result
和label
两个变量
result
:表示上一个Continuation
的结果,比如有函数A
和B
,函数内部分别声明了ContinuationA
和ContinuationB
,A
调用B
并且将ContinuationA
传入B
中保存。在后续回调的过程中,ContinuationA
可以从result
变量中拿到ContinuationB::invokeSuspend
的执行结果。label
:Kotlin Compiler
可以识别函数内部哪个地方会挂起,每一个挂起点(suspension point
)被表示为状态机的一个状态(state
),这些状态通过switch case
语句表示出来。label
表示当前应该执行状态机的哪一个状态,具体来说就是要进入哪一个case
,通过label
变量就记录下了状态机当前的状态。
再看下fetchData
的前半部分代码:
public final Object fetchData(@NotNull String argument,@NotNull Continuation completion) {Object $continuation;label25:{if (completion instanceof FetchDataStateMachine) {$continuation = (FetchDataStateMachine) completion;if (($continuation.label & Integer.MIN_VALUE) != 0) {$continuation.label -= Integer.MIN_VALUE;break label25;}}$continuation = new FetchDataStateMachine(completion);}...
}
它会判断传入的completion
是否为FetchDataStateMachine
类型,若是则对它的label
变量做些操作,若不是则直接创建一个FetchDataStateMachine
并且传入completion
(completion
会被保存下来)。
再看下fetchData
的后半部分代码:
public final Object fetchData(@NotNull String argument,@NotNull Continuation completion) {Object $continuation;...Object $result = $continuation.result;Object resultTemp;switch ($continuation.label) {case 0:ResultKt.throwOnFailure($result);$continuation.label = 1;resultTemp = this.netRequest(argument, (Continuation) $continuation);if (resultTemp == COROUTINE_SUSPENDED) {return COROUTINE_SUSPENDED;}break;case 1:ResultKt.throwOnFailure($result);resultTemp = $result;break;default:throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");}int result = ((Number) resultTemp).intValue();return Boxing.boxBoolean(result == 0);
}
fetchData
方法原先的代码语句会被划分为switch
下的多个case
语句,在这里就是
FetchDataStateMachine
中的label
变量就是控制当前要执行哪个case
分支。
可见,函数与续体构成了一个有限状态机(FSM,即 Finite-State Machine),来控制协程代码的执行。
何为「非阻塞式挂起」?
在netRequest
方法中,调用了delay(1000)
挂起了当前的协程,简单看下delay
方法反编译后的代码:
public static final Object delay(long timeMillis, @NotNull Continuation $completion) {if (timeMillis <= 0L) {return Unit.INSTANCE;} else {// 实现类CancellableContinuationImpl cancellableContinuationImpl = new CancellableContinuationImpl(IntrinsicsKt.intercepted($completion), 1);cancellableContinuationImpl.initCancellability();// 向上转型CancellableContinuation cont = (CancellableContinuation)cancellableContinuationImpl;if (timeMillis < Long.MAX_VALUE) {// 延时操作getDelay(cont.getContext()).scheduleResumeAfterDelay(timeMillis, cont);}// 获取执行结果Object result = cancellableContinuationImpl.getResult();if (result == COROUTINE_SUSPENDED) {DebugProbesKt.probeCoroutineSuspended($completion);}// 返回结果return result;}
}
在该方法里会执行延时操作,如果需要挂起,就会返回COROUTINE_SUSPENDED
值给调用者。
结合fetchData
、netRequest
和delay
反编译的代码,我们可以得出下面的这个调用图:
图中红色的线表示函数返回COROUTINE_SUSPENDED
,需要挂起。当delay
方法需要挂起的时候,它返回COROUTINE_SUSPENDED
,接着netRequest
方法返回COROUTINE_SUSPENDED
,接着fetchData
方法返回COROUTINE_SUSPENDED
,重复这个过程直到调用栈的最上层。
通过这种「结束方法调用」的方式,让协程暂时不在这个线程上面执行,让线程可以去处理其它的任务(包括执行其它的协程),这也就是为什么协程的挂起不会阻塞当前的线程,这也是「非阻塞式挂起」的由来。
如何恢复?
既然协程挂起了,那就有相应的协程的恢复。先说结论:协程恢复的实质是对续体进行回调。
暂时还没有研究delay
函数的具体实现,但是delay
函数会在某个子线程执行等待操作,等延时时间到达之后,就会调用传给delay
函数的$completion
的resumeWith
方法,也就是调用NetRequestStateMachine
的resumeWith
方法。NetRequestStateMachine
的继承关系、父类如下:
NetRequestStateMachine -> ContinuationImpl -> BaseContinuationImpl -> Continuation
BaseContinuationImpl
目前是我们分析的一个重点,它主要做了下面的几件事情:
- 保存
completion
:它保存了fetchData
方法的FetchDataStateMachine
实例,使得可以一级一级地向上回调续体。 - 重写
resumeWith
方法:BaseContinuationImpl
重写了Continuation
接口的resumeWith
方法,该方法用于恢复协程,也是协程恢复的核心逻辑。
我们查看BaseContinuationImpl
类的定义:
internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {// This implementation is final. This fact is used to unroll resumeWith recursion.public final override fun resumeWith(result: Result<Any?>) {// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resumevar current = thisvar param = resultwhile (true) {// 在每个恢复的continuation进行调试探测,使得调试库可以精确跟踪挂起的调用栈中哪些部分// 已经恢复了。probeCoroutineResumed(current)with(current) {val completion = completion!! // fail fast when trying to resume continuation without completionval outcome: Result<Any?> =try {val outcome = invokeSuspend(param)if (outcome === COROUTINE_SUSPENDED) returnResult.success(outcome)} catch (exception: Throwable) {Result.failure(exception)}releaseIntercepted() // this state machine instance is terminatingif (completion is BaseContinuationImpl) {// unrolling recursion via loopcurrent = completionparam = outcome} else {// top-level completion reached -- invoke and returncompletion.resumeWith(outcome)return}}}}protected abstract fun invokeSuspend(result: Result<Any?>): Any?protected open fun releaseIntercepted() {// does nothing here, overridden in ContinuationImpl} ...
}
重点是resumeWith
方法的实现,它在一个while(true)
循环下面执行回调的逻辑。我们结合前面给出的fetchData
和netRequest
反编译后的代码,看看delay
函数的延时时间到达时调用NetRequestStateMachine
的resumeWith
方法,后续的执行流程是怎样的:
- 执行
NetRequestStateMachine
父类BaseContinuationImpl
的resumeWith
方法。 - 执行当前续体也就是
NetRequestStateMachine
的invokeSuspend
方法(NetRequestStateMachine
有实现该方法,忘记了的话可以回头看看之前的反编译代码)。 - 在
NetRequestStateMachine
的invokeSuspend
方法调用了netRequest
方法,并且将续体自身作为参数传入。 - 在
netRequest
方法中,由于completion
的类型就是NetRequestStateMachine
,因此可以直接使用该续体,不用像之前第一次进入netRequest
方法那样需要创建一个新的续体。此时续体的label
值为1
,于是进入netRequest
的case 1
语句分支。
实际上这个过程有对续体的
label
进行一些运算转化的操作,但是最终label
的值都是1
,做的运算转化操作不影响我们的分析,因此并不是重点
- 从续体中取出一开始传入
netRequest
方法的参数,也就是argument
,返回argument.length
。为了方便后面阐述,这里将该返回值argument.length
记为netRequest-Return
。 - 接着
netRequest
方法结束,NetRequestStateMachine::invokeSuspend
方法也执行结束,netRequest-Return
也作为invokeSuspend
方法的返回值,该返回值会传递到BaseContinuationImpl
的resumeWith
方法中,在resumeWith
方法中,将netRequest-Return
包装为Result
保存到outcome
变量中。 - 判断
NetRequestStateMachine
持有的completion
是否为BaseContinuationImpl
类型,我们知道它持有的实例其实就是FetchDataStateMachine
,因此肯定是BaseContinuationImpl
,于是进行了变量的更新
// 把current更新为FetchDataStateMachine实例current = completion// 把param更新为outcome(包装了netRequest-Return的Result)param = outcome
通过这种方式,其实就可以实现回调,我们继续往后看。
- 继续进行下一轮
while
循环,在with
块中会执行FetchDataStateMachine::invokeSuspend
,在invokeSuspend
里,将传入的参数param
保存到result
变量里(其实这和传统的回调类似,传统的回调中也是要将下层的执行结果回调给上层),接着调用了fetchData
方法。 - 在
fetchData
方法中,由于传入的completion
已是FetchDataStateMachine
类型,因此无需再去创建新的续体。由于此时续体label
的值为1
,所以会进入case 1
语句,并且将netRequest
方法的执行结果保存在resultTemp
变量中,最终fetchData
方法结束并返回结果result == 0
,为了方便阐述,将fetchData
方法的执行结果记为fetchData-Return
。 FetchDataStateMachine::invokeSuspend
方法也会结束并返回fetchData-Return
,然后在BaseContinuationImpl
的resumeWith
方法中将fetchData-Return
包装为Result
。然后会判断FetchDataStateMachine
持有的completion
是否为BaseContinuationImpl
类型。- 代码的后续走向,我们目前是不清楚的,我们得知道在协程中调用
fetchData
方法的时候会做些什么,才能清楚后续的代码走向。
从上面的流程分析中,我们对协程的恢复有了一个基本的认识,下面给出流程图进行总结:
再看看上面续体的调用过程,其实就是层层往上地调用续体的invokeSuspend
方法,从过程来看有点像递归调用,但是BaseContinuationImpl::resumeWith
的实现却和递归不太一样,它的实现是在while(true)
循环中,对续体调用一次invokeSuspend
方法,然后记录它的返回结果,将这个返回结果作为下一个续体invokeSuspend
的方法参数。
简单来讲,就是在调用一个续体的invokeSuspend
方法,待这个方法执行结束后,再调用下一个续体的invokeSuspend
方法。这样做的一个原因是避免调用栈过深,在BaseContinuationImpl::resumeWith
也有相关的注释说明:
This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
启动协程
我们在一个协程中去调用fetchData
方法:
class Temp2 {fun execute() {GlobalScope.launch(Dispatchers.Main) {Temp().fetchData("argument")}}
}
通过launch
方法可以启动一个协程,其源码如下:
public fun CoroutineScope.launch(context: CoroutineContext = EmptyCoroutineContext,start: CoroutineStart = CoroutineStart.DEFAULT,block: suspend CoroutineScope.() -> Unit
): Job {val newContext = newCoroutineContext(context)val coroutine = if (start.isLazy)LazyStandaloneCoroutine(newContext, block) elseStandaloneCoroutine(newContext, active = true)coroutine.start(start, coroutine, block)return coroutine
}
协程中的代码会被包装为一个block
,默认情况下会创建一个StandaloneCoroutine
,然后调用它的start
方法并返回StandaloneCoroutine
。
StandaloneCoroutine
间接的实现了Job
接口和Continuation<T>
接口,如下:
private open class StandaloneCoroutine(parentContext: CoroutineContext,active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {override fun handleJobException(exception: Throwable): Boolean {handleCoroutineException(context, exception)return true}
}public abstract class AbstractCoroutine<in T>(/*** The context of the parent coroutine.*/@JvmFieldprotected val parentContext: CoroutineContext,active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {...}
可以看出StandaloneCoroutine
身兼多职,实现了Job, Continuation<T>, CoroutineScope
接口。后面代码跟踪可以得出一个结论,最顶层的续体实现是协程自身,也就是协程恢复的时候续体会一层层地往上回调,最顶层的续体就是协程coroutine
自身,即StandaloneCoroutine
(这里以StandaloneCoroutine
为例)。
另外还要注意一点,launch
方法中传入的block
块类型:
block: suspend CoroutineScope.() -> Unit
它等价于下面的这种函数类型:
// CoroutineScope:扩展函数转化而来
// Continuation:suspend关键字转化而来,Continuation参数由编译器传入
block : (CoroutineScope,Continuation) -> Unit// 或者通过Function2的形式表示
block : Function2<CoroutineScope,Continuation,Unit>
接着跟踪下启动协程的调用过程。在launch
方法中,调用了AbstractCoroutine::start
方法:
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {initParentJob()// 语法糖,实际是调用CoroutineStart.invoke方法start(block, receiver, this)}
CoroutineStart::invoke
方法:
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =when (this) {DEFAULT -> block.startCoroutineCancellable(receiver, completion)ATOMIC -> block.startCoroutine(receiver, completion)UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)LAZY -> Unit // will start lazily}
从launch
方法可以知道CoroutineStart
的默认值是CoroutineStart.DEFAULT
,因此会调用到block
的startCoroutineCancellable
方法:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>,onCancellation: ((cause: Throwable) -> Unit)? = null
) =runSafely(completion) {createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)}
我在AS
跟踪createCoroutineUnintercepted
的代码调用时,发现会跳转到IntrinsicsKt.class
文件,这个文件里面找不到方法的源代码,最后找到了IntrinsicsJvm.kt
文件,找到createCoroutineUnintercepted
方法源码,如下:
# R:CoroutineScope
# T:Unit
@SinceKotlin("1.3")
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(receiver: R,completion: Continuation<T>
): Continuation<Unit> {// probeCoroutineCreated方法直接返回completionval probeCompletion = probeCoroutineCreated(completion)return if (this is BaseContinuationImpl)create(receiver, probeCompletion)else {createCoroutineFromSuspendFunction(probeCompletion) {(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)}}
}
这里会判断this
的类型是否为BaseContinuationImpl
,this
就是我们之前在launch
中传入的lambda
块,那么这个lambda
代码块是什么类型的呢?想要知道这个答案,我们得对这一节刚开始给出的代码进行反编译
kotlin
代码:
class Temp2 {fun execute() {GlobalScope.launch(Dispatchers.Main) {Temp().fetchData("argument")}}
}
对反编译后的java
代码进行适当的重命名和调整,得出:
public final class Temp2 {...static final class LaunchLambda extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {int label;LaunchLambda(Continuation $completion) {super(2, $completion);}@Nullablepublic final Object invokeSuspend(@NotNull Object $result) {switch (this.label) {case 0:ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);this.label = 1;if ((new Temp()).fetchData("argument", (Continuation<? super Boolean>) this) == COROUTINE_SUSPENDED)return COROUTINE_SUSPENDED;(new Temp()).fetchData("argument", (Continuation<? super Boolean>) this);return Unit.INSTANCE;case 1:ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);return Unit.INSTANCE;}throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");}@NotNullpublic final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<? super LaunchLambda> $completion) {return (Continuation<Unit>) new LaunchLambda($completion);}@Nullablepublic final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation<?> p2) {return ((LaunchLambda) create(p1, p2)).invokeSuspend(Unit.INSTANCE);}}
}
可以看出在Temp2
里面会自动生成一个静态内部类LaunchLambda
,它对应着launch
方法中传入的lambda
块。LaunchLambda
的继承关系(由上到下,子类到父类的顺序):
LaunchLambda
-> SuspendLambda // 用suspend修饰的lambda块都会继承至这个类
-> ContinuationImpl
-> BaseContinuationImpl // 重写了resumeWith函数
-> Continuation
OK,回到createCoroutineUnintercepted
方法中,现在可以回答刚刚提出的问题了,lambda
传入的lambda
块是不是BaseContinuationImpl
类型呢?根据上面的继承关系得出,当然是!那么它就会调用LaunchLambda
的create
方法,注意第二个参数传入的是completion
(代码中写的是probeCompletion
),它最终会被保存在父类BaseContinuationImpl
的completion
变量中,这个completion
参数就是launch
方法中创建的StandaloneCoroutine
,即协程本身,它作为协程恢复时的最顶层续体。
通过调用create
方法获取到一个LaunchLambda
实例,createCoroutineUnintercepted
方法执行结束并返回LaunchLambda
实例,接着代码执行又回到startCoroutineCancellable
中,回顾下该方法:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>,onCancellation: ((cause: Throwable) -> Unit)? = null
) =runSafely(completion) {createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)}
这里有两部分调用,先是调用intercepted
方法,然后再调用resumeCancellableWith
方法。intercepted
方法与续体拦截机制有关,后面会介绍,这里先忽略,这里直接认为调用了LaunchLambda
实例的resumeCancellableWith
方法即可,该方法如下:
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>,onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)else -> resumeWith(result)
}
那么会走到resumeWith
方法,前面提到过该方法在父类BaseContinuationImpl
实现,在该方法里面会调用invokeSuspend
方法,invokeSuspend
方法在LaunchLambda
中实现了,如下:
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {switch (this.label) {case 0:ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);this.label = 1;if ((new Temp()).fetchData("argument", (Continuation<? super Boolean>) this) == COROUTINE_SUSPENDED)return COROUTINE_SUSPENDED;(new Temp()).fetchData("argument", (Continuation<? super Boolean>) this);return Unit.INSTANCE;case 1:ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);return Unit.INSTANCE;}throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
一开始label
的值为0
,所以会进入case 0
语句分支,在该语句分支里面,会设置label
的值为1
,然后创建一个Temp
对象并且调用它的fetchData
方法,并把LaunchLambda
自身作为参数传入,也就是LaunchLambda
实例会被保存在fetchData
方法创建的续体的completion
变量里,方便协程恢复的时候进行回调。
现在续体的持有图:
到了这里,从启动一个协程到协程最终是如何挂起的,我们已经可以串联起来了。在「如何恢复?」一节中,协程恢复的最后几个步骤我们还没有分析,这里把它分析完,然后整个协程恢复的流程也可以串起来了。
协程恢复的后续流程:
- 当
FetchDataStateMachine::invokeSuspend
执行完后,会在BaseContinuationImpl
的resumeWith
方法中判断FetchDataStateMachine
所持有的completion
(即LaunchLambda
)是否为BaseContinuationImpl
类型,由LaunchLambda
的继承关系,容易得出答案为「是」,所以会进入下一轮while
循环,调用LaunchLambda
的invokeSuspend
方法。 - 由于
label = 1
所以会进入case 1
语句,里面直接return Unit
。接着判断LaunchLambda
持有的completion
(即StandaloneCoroutine
)是否为BaseContinuationImpl
类型,根据StandaloneCoroutine
的继承关系容易得出答案为「不是」,所以会调用StandaloneCoroutine
的resumeWith
方法。 StandaloneCoroutine
的resumeWith
方法在父类AbstractCoroutine
中实现:
public final override fun resumeWith(result: Result<T>) {val state = makeCompletingOnce(result.toState())// 如果在等子协程完成,则返回if (state === COMPLETING_WAITING_CHILDREN) return// 应该是做一些后续处理afterResume(state)}
此时最顶层的续体(协程自身)也恢复了。
BaseContinuationImpl::resumeWith
方法执行结束,整个协程的恢复也完成了。
在之前流程图的基础上进行补充完善:
一、协程至上而下调用的流程图(协程挂起)
其中蓝色的文本和线条表示新增的,红色的文本和线条表示挂起的过程。
二、协程至下而上恢复的流程图(协程恢复)
其中蓝色的文本和线条表示新增的,橙色的文本和线条表示方法调用的结束。
协程上下文
协程上下文CoroutineContext
定义了协程的行为,它记录了当前协程所持有的信息,是协程运行中一个重要的数据对象。CoroutineContext
是一个接口:
public interface CoroutineContext {...}
在续体中就有CoroutineContext
的相关信息:
public interface Continuation<in T> {// 与该续体对应的协程的上下文public val context: CoroutineContext// 恢复对应协程的执行,并且传递一个表示成功或失败的result作为最后一个挂起点的返回值public fun resumeWith(result: Result<T>)
}
下面几种元素都是「协程上下文」的元素:
Job
:控制协程的生命周期。CoroutineDispatcher
:将工作分派到适当的线程。CoroutineName
:协程的名称,可用于调试。CoroutineExceptionHandler
:处理未捕获的异常。
CoroutineContext
可以看做是CoroutineContext.Element
的一个集合,集合中的每个元素都可以使用CoroutineContext.Key
进行定位,且每个元素的Key
都是不同的。
CoroutineContext.Element
的定义:
public interface Element : CoroutineContext {...}
可以看到Element
本身也实现了CoroutineContext
接口,这很奇怪,看上去好像是Int
实现了List<Int>
接口一样,为什么元素本身也是集合了呢?其实这主要是为了方便API的设计,这样的话,一个元素比如Job
也可以直接作为一个CoroutineContext
,而不需要创建一个只包含一个元素的List
,多个元素之间也可以通过「+」进行拼接,如:
scope.launch(CoroutineName("coroutine") + Dispatchers.Main) {...}
这里的「+」其实是操作符重载,对应CoroutineContext
声明的plus
方法:
public operator fun plus(context: CoroutineContext): CoroutineContext = ...
「协程上下文」存储元素的方式比较巧妙,它内部并不是创建一个集合,集合的每个位置都存放一个元素。它借助了一个CombinedContext
结构来实现数据的存取,CombinedContext
的定义及get
方法:
internal class CombinedContext(private val left: CoroutineContext,private val element: Element
) : CoroutineContext, Serializable {override fun <E : Element> get(key: Key<E>): E? {var cur = thiswhile (true) {cur.element[key]?.let { return it }val next = cur.leftif (next is CombinedContext) {cur = next} else {return next[key]}}}...
}
从构造函数中可以看出它包含两部分内容:left
和element
。也就是说一个CombinedContext
内部可能包含多个元素。
- left:可能是普通的上下文元素(
CoroutineContext.Element
),也可能又是一个CombinedContext
(又包含多个上下文元素)。 - element:一个协程上下文元素。
在CombinedContext
的get
方法中,有一个while(true)
循环,执行过程如下:
- 它会先判断当前
element
元素与传入的key
是否相符,是的话直接返回该元素,否则获取到left
部分。 - 若
left
是CombinedContext
部分,则对left
变量重复步骤1。 - 若
left
不是CombinedContext
部分,则直接调用它的get
方法获取元素(获取不到则返回null
)。
另外,也可以看出element
先于left
被访问,所以越靠右边的上下文元素,其优先级越高。
Key
用于标识协程上下文元素,看看它的定义:
public interface CoroutineContext {...public interface Key<E : Element>public interface Element : CoroutineContext {// 用于标识元素的Keypublic val key: Key<*>...}
}
CoroutineContext.Element
有个抽象类实现,可以让我们更方便地实现上下文元素:
public abstract class AbstractCoroutineContextElement(public override val key: Key<*>) : Element
以CoroutineName
为例,分析如何实现一个协程上下文元素:
public data class CoroutineName(val name: String/* CoroutineName.Key可以简写为CoroutineName */
) : AbstractCoroutineContextElement(CoroutineName) {public companion object Key : CoroutineContext.Key<CoroutineName>...
}
首先声明一点,传入父类AbstractCoroutineContextElement
的参数是CoroutineName.Key
,只是它可以简写为CoroutineName
。其实这也很好理解,在Kotlin
中,我们调用伴生对象方法的时候,是可以省去伴生对象的类名的,这里也是同样的道理。
CoroutineName
内部声明了一个继承至CoroutineContext.Key
的伴生对象Key
,并将其作为构造参数传入父类AbstractCoroutineContextElement
中,以此作为该协程上下文元素的Key
。
上面是实现协程上下文元素的一种普遍做法,即在协程上下文元素里面定义一个伴生对象,以伴生对象为Key
,标识该上下文元素。
最后再看一下CoroutineContext
的完整定义:
public interface CoroutineContext {// 根据key获取元素public operator fun <E : Element> get(key: Key<E>): E?// 翻译为"折叠",它与上下文元素的累加有关public fun <R> fold(initial: R, operation: (R, Element) -> R): R// 协程上下文元素的累加public operator fun plus(context: CoroutineContext): CoroutineContext = ...// 当前CoroutineContext中,去掉key标识的元素后,剩下的上下文元素(以CoroutineContext形式返回)public fun minusKey(key: Key<*>): CoroutineContextpublic interface Key<E : Element>public interface Element : CoroutineContext {// 标识上下文元素的Keypublic val key: Key<*>// key相同则返回元素自身,否则返回nullpublic override operator fun <E : Element> get(key: Key<E>): E? =@Suppress("UNCHECKED_CAST")if (this.key == key) this as E else null// 执行传入的operation函数public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =operation(initial, this)public override fun minusKey(key: Key<*>): CoroutineContext =if (this.key == key) EmptyCoroutineContext else this}
}
CoroutineContext
的plus
方法:
public operator fun plus(context: CoroutineContext): CoroutineContext =if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creationcontext.fold(this) { acc, element ->val removed = acc.minusKey(element.key)if (removed === EmptyCoroutineContext) element else {// make sure interceptor is always last in the context (and thus is fast to get when present)val interceptor = removed[ContinuationInterceptor]if (interceptor == null) CombinedContext(removed, element) else {val left = removed.minusKey(ContinuationInterceptor)if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) elseCombinedContext(CombinedContext(left, element), interceptor)}}}
为了方便后面阐述,记调用形式为A + B,假设A是含有多个元素的协程上下文,B是单个上下文元素。该方法的大致执行流程如下:
- 若元素B是空的,则返回原来的上下文A。
- 在fold的lambda块中,可以认为acc为A,element为B。
- 若A中减去element.key元素后(记为C),C为空上下文,则返回B(相当于元素B替换了上下文A)。
- 查看C中是否有ContinuationInterceptor元素,没有则将C和B拼接后返回。
- C中剔除ContinuationInterceptor,记为D,若D是空的,则将B和ContinuationInterceptor拼接然后返回。
- D不是空的,则将D和B和ContinuationInterceptor拼接然后返回。
简单来说,这里就是要将「传入的协程上下文元素」与「原来的协程上下文元素」进行拼接,若传入的元素与原来集合中的元素的key
有冲突,则用传入的元素替换掉原来集合中key
冲突的元素。在上下文元素拼接的时候,若有ContinuationInterceptor
元素则要确保它在「协程上下文元素集合」的最右边,这样它的优先级最高,从协程上下文获取该元素的时候可以更快地获取到(至于为什么元素在右边,元素的优先级就高、获取快,在前面介绍CombinedContext
中已经说明过了)。
plus
方法的执行流程很难用文字叙述清楚,如果想要知道它的实现流程,可以代入几个例子试试。但是它具体的执行流程并不是要分析的重点,有个大概的印象即可。
续体拦截机制
这里算是协程实现原理解析的最后一环了。我们在使用协程的时候,会使用到一些调度器如Dispatchers.Main
和Dispatchers.IO
等调度器来调度线程,在前面的分析中并没有提到协程是如何进行线程调度的。
线程的调度与续体拦截器ContinuationInterceptor
有关,它也是一种「协程上下文元素」:
public interface ContinuationInterceptor : CoroutineContext.Element {// 续体拦截器对应的Keycompanion object Key : CoroutineContext.Key<ContinuationInterceptor>// 返回一个续体,该续体对原始的续体进行包装(原始的续体作为方法参数传入)。// 如果该方法不想拦截传入的续体,也可以直接返回原来的续体。// 当原始续体完成时,如果该续体之前被拦截了,协程框架会调用releaseInterceptedContinuation// 方法,传入的参数就是「续体的包装类」。public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>// 该函数只有在interceptContinuation成功拦截的情况下,才会被调用。// 若原始续体成功被拦截,当原始续体完成且不再被使用时,该方法会被调用,传入的参数是「续体的包装类」。public fun releaseInterceptedContinuation(continuation: Continuation<*>) {/* do nothing by default */}...
}
续体拦截器可以用于拦截一个续体,最常见的续体拦截器就是协程调度器CoroutineDispatcher
,可以通过单例类Dispatchers
获取到相应的协程调度器。查看CoroutineDispatcher
的实现:
public abstract class CoroutineDispatcher :AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {@ExperimentalStdlibApipublic companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(ContinuationInterceptor,{ it as? CoroutineDispatcher }) public open fun isDispatchNeeded(context: CoroutineContext): Boolean = truepublic abstract fun dispatch(context: CoroutineContext, block: Runnable)public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =DispatchedContinuation(this, continuation) @InternalCoroutinesApipublic override fun releaseInterceptedContinuation(continuation: Continuation<*>) {(continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()}...
}
- 拦截器:
CoroutineDispatcher
继承至ContinuationInterceptor
,所以它也是一种续体拦截器。 - 上下文元素的标识:
CoroutineDispatcher
继承至AbstractCoroutineContextElement
,并传入ContinuationInterceptor.Key
构造参数,以此来标识自身。 - isDispatchNeeded:若需要使用
dispatch
方法进行调度则返回true
,否则返回false
。该方法默认返回true
。协程调度器可以重写该方法,提供一个性能优化以避免不必要的dispatch
,例如主线程调度器Dispatchers.Main
会判断当前协程是否已经在UI
线程中,如果是的话该方法就会返回false
,没有必要再去执行dispatch
方法进行不必要的线程调度。 - dispatch:在给定的上下文和线程中,去执行
block
块。
假设使用的协程调度器是主线程调度器Dispatchers.Main
:
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
查看MainDispatcherLoader.dispatcher
:
@JvmFieldval dispatcher: MainCoroutineDispatcher = loadMainDispatcher()private fun loadMainDispatcher(): MainCoroutineDispatcher {return try {val factories = if (FAST_SERVICE_LOADER_ENABLED) {FastServiceLoader.loadMainDispatcherFactory()} else {// We are explicitly using the// `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`// form of the ServiceLoader call to enable R8 optimization when compiled on Android.ServiceLoader.load(MainDispatcherFactory::class.java,MainDispatcherFactory::class.java.classLoader).iterator().asSequence().toList()}@Suppress("ConstantConditionIf")factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories)?: createMissingDispatcher()} catch (e: Throwable) {// Service loader can throw an exception as wellcreateMissingDispatcher(e)}}
调用了tryCreateDispatcher
:
public fun MainDispatcherFactory.tryCreateDispatcher(factories: List<MainDispatcherFactory>): MainCoroutineDispatcher =try {createDispatcher(factories)} catch (cause: Throwable) {createMissingDispatcher(cause, hintOnError())}
继续跟踪,发现createDispatcher
是MainDispatcherFactory
接口的一个方法,其中的一个实现在AndroidDispatcherFactory
中:
internal class AndroidDispatcherFactory : MainDispatcherFactory {override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =HandlerContext(Looper.getMainLooper().asHandler(async = true))...
}
HandlerContext
其实就是调度器Dispatchers.Main
的最终实现:
# handler:主线程的Handler
internal class HandlerContext private constructor(private val handler: Handler,private val name: String?,private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {public constructor(handler: Handler,name: String? = null) : this(handler, name, false)...override fun isDispatchNeeded(context: CoroutineContext): Boolean {return !invokeImmediately || Looper.myLooper() != handler.looper}override fun dispatch(context: CoroutineContext, block: Runnable) {handler.post(block)} ...
}
isDispatchNeeded:通过
looper
判断协程当前是否在主线程上,是的话返回false
,表示不需要再进行线程调度,否则返回true
表示需要进行线程调度。dispatch:使用主线程的
handler
对传入的block
块进行post
操作。
对「续体拦截器」「协程调度器」有了一定的了解之后,我们再回过头看一下协程调度器是如何发挥作用的。我们前面分析过Cancellable
文件的startCoroutineCancellable
方法:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>,onCancellation: ((cause: Throwable) -> Unit)? = null
) =runSafely(completion) {createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)}
在createCoroutineUnintercepted
方法中返回了LaunchLambda
实例,在之前的分析中,我们忽略了intercepted
方法,直接分析为LaunchLambda
会调用resumeCancellableWith
方法,若没有为协程设定续体拦截器,那么确实是LaunchLambda
会直接调用到resumeCancellableWith
方法。我们看看,如果为协程设定了续体拦截器,会发生什么?
查看LaunchLambda
调用的intercepted
方法,它在IntrinsicsJVM
文件中:
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =(this as? ContinuationImpl)?.intercepted() ?: this
LaunchLambda
是ContinuationImpl
类型,因此会调用到父类ContinuationImpl::intercepted
:
internal abstract class ContinuationImpl(completion: Continuation<Any?>?,private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)...@Transientprivate var intercepted: Continuation<Any?>? = null public fun intercepted(): Continuation<Any?> =intercepted?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this).also { intercepted = it }...
}
刚开始intercepted
为null
,所以会判断协程上下文中是否有ContinuationInterceptor
元素,若没有则会返回this
(即LaunchLambda
自身,并将intercepted
变量设置为LaunchLambda
),有的话则会调用interceptContinuation
方法,假设使用的续体拦截器是Dispatchers.Main
,那么就是调用到CoroutineDispatcher
的interceptContinuation
方法,该方法会返回一个DispatchedContinuation
(并将DispatchedContinuation
设置到intercepted
变量中)。
查看CoroutineDispatcher::interceptContinuation
:
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =DispatchedContinuation(this, continuation)
DispatchedContinuation
类:
internal class DispatchedContinuation<in T>(@JvmField val dispatcher: CoroutineDispatcher,@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {...}
在这里的例子中,dispatcher
就是Dispatchers.Main
,continuation
就是LaunchLambda
。
再回到Cancellable
文件的startCoroutineCancellable
方法:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>,onCancellation: ((cause: Throwable) -> Unit)? = null
) =runSafely(completion) {createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)}
在有续体拦截器(Dispatchers.Main
)的情况下,intercepted
方法会返回DispatchedContinuation
,接着调用它的resumeCancellableWith
方法:
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>,onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)else -> resumeWith(result)
}
调用到另外一个resumeCancellableWith
方法,这个方法就是在DispatchedContinuation
中实现的了:
inline fun resumeCancellableWith(result: Result<T>,noinline onCancellation: ((cause: Throwable) -> Unit)?) {val state = result.toState(onCancellation)if (dispatcher.isDispatchNeeded(context)) { // 需要线程调度_state = stateresumeMode = MODE_CANCELLABLE// 线程调度,将自身以Runnable块形式传入dispatcher.dispatch(context, this)} else { // 不需要线程调度executeUnconfined(state, MODE_CANCELLABLE) {if (!resumeCancelled(state)) {// 最终会调用continuation.resumeWith,即LaunchLambda.resumeWithresumeUndispatchedWith(result)}}}}
可以看到,它调用了dispatcher.isDispatchNeeded
来判断是否需要进行线程调度,以Dispatchers.Main
为例,就是判断当前协程是否在主线程中运行,是的话则不需要调度,否则需要将协程调度到主线程中运行。
- 不需线程调度:最终会调用到
LaunchLambda.resumeWith
,它后续的执行流程之前已经分析过了。 - 需要线程调度:(以主线程的协程调度器为例)最终会将传入的
Runnable
在主线程中执行。
Runnable
的run
方法在哪实现的呢?在DispatchedContinuation
的父类DispatchedTask
中有run
方法的实现:
public final override fun run() {...try {// 获取到的delegate其实就是DispatchedContinuationval delegate = delegate as DispatchedContinuation<T>// 获取到的continuation其实就是LaunchLambdaval continuation = delegate.continuationval context = continuation.contextval state = takeState() // NOTE: Must take state in any case, even if cancelledwithCoroutineContext(context, delegate.countOrElement) {val exception = getExceptionalResult(state)/** Check whether continuation was originally resumed with an exception.* If so, it dominates cancellation, otherwise the original exception* will be silently lost.*/val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else nullif (job != null && !job.isActive) {val cause = job.getCancellationException()cancelCompletedResult(state, cause)continuation.resumeWithStackTrace(cause)} else {if (exception != null) {continuation.resumeWithException(exception)} else {// 正常情况下,会执行到这里,调用LaunchLambda的resume方法continuation.resume(getSuccessfulResult(state))}}}} catch (e: Throwable) {...} finally {...}}
在run
方法中,最终会调用到LaunchLambda
的resume
方法(内部又会调用到resumeWith
方法)。所以这里做的线程调度,其实就是通过主线程的handler
,将代码post
到主线程中去运行,从而完成线程的调度工作。
另外,还有几个未研究的地方与自己的猜想:
一、releaseIntercepted方法:在BaseContinuationImpl::resumeWith
中,每执行完一个续体的invokeSuspend
方法,就会调用该续体的releaseIntercepted
方法
protected override fun releaseIntercepted() {val intercepted = intercepted// intercepted不为null且不为自身(即之前成功拦截续体),就进入If块if (intercepted != null && intercepted !== this) {// 调用续体拦截器的releaseInterceptedContinuation方法,并传入续体包装类context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)}// 将intercepted变量设置为CompletedContinuationthis.intercepted = CompletedContinuation // just in case}
续体拦截器的releaseInterceptedContinuation
方法应该是做一些资源清理的工作。
二、像withContext
这样的函数:
scope.launch(Dispatchers.Main) {withContext(Dispatchers.IO) {}
}
public suspend fun <T> withContext(context: CoroutineContext,block: suspend CoroutineScope.() -> T
): T {...}
在block
块执行完后,会将线程自动切回「启动协程时的协程调度器所指定」的线程,那么它是如何切回来的呢?个人猜测,在协程至上而下调用的时候,协程上下文会一层一层地向下传递,withContext
的block
块执行的时候,协程上下文会被保存在某个地方,等到block
块执行结束的时候,会从之前保存的协程上下文中取出协程调度器,将剩余的代码(协程恢复)调度到相应的线程中去执行,从而实现了 block
块执行完后,线程会自动切回「启动协程时的协程调度器所指定」的线程。
参考
协程咖啡厅 - 构造魔法 - 探索 Kotlin 协程实现原理 - M.D。
Suspend functions - Kotlin Vocabulary - YouTube。
Kotlin协程实现原理相关推荐
- Kotlin协程 - launch原理 笔记
一.协程是如何创建的? launch.async 可以创建.启动新的协程,那么协程到底是如何创建的? runBlocking {println(Thread.currentThread().name) ...
- pdf 深入理解kotlin协程_Kotlin协程实现原理:挂起与恢复
今天我们来聊聊Kotlin的协程Coroutine. 如果你还没有接触过协程,推荐你先阅读这篇入门级文章What? 你还不知道Kotlin Coroutine? 如果你已经接触过协程,但对协程的原理存 ...
- Kotlin 协程,怎么开始的又是怎么结束的?原理讲解!
九心 | 作者 承香墨影 | 校对 https://juejin.cn/post/6862548590092140558 | 原文 Hi,大家好,这里是承香墨影! 上周我们聊到 Kotlin 协程的使 ...
- plsql develop怎么停止job_Kotlin协程实现原理:CoroutineScopeamp;Job
今天我们来聊聊Kotlin的协程Coroutine. 如果你还没有接触过协程,推荐你先阅读这篇入门级文章What? 你还不知道Kotlin Coroutine? 如果你已经接触过协程,但对协程的原理存 ...
- 一文看透 Kotlin 协程本质
前言 公司开启新项目了,想着准备亮一手 Kotlin 协程应用到项目中去,之前有对 Kotlin 协程的知识进行一定量的学习,以为自己理解协程了,结果--实在拿不出手! 为了更好的加深记忆和理解,更全 ...
- 枯燥的Kotlin协程三部曲(上)——概念启蒙篇
0x0.引言 Kotlin 1.3 版本开始引入协程 Coroutine,简练的官方文档和网上一堆浅尝辄止的文章让我心里有些没底,不想止步于仅仅知道: ① Android中,Kotlin协程用于解决: ...
- 深入理解Kotlin协程suspend工作原理(初学者也能看得懂)
1. 概述 挂起函数是Kotlin协程最重要的一个特性,所有其他概念都建立在它的基础上.所以我们需要深入了解它的工作原理. 挂起协程意味着在中间停止它.这类似于玩游戏,当我们想暂停游戏时,可以先存档, ...
- 探索 Kotlin 协程原理
接下来跟大家分享一下我在了解 Kotlin 协程实现的过程中理解的一些概念,如果你发现哪些地方我说错了的话,欢迎提出你的理解. 1. Kotlin 协程原理概述 Kotlin 协程的大致的执行流程如上 ...
- Kotlin协程:挂起与恢复原理逆向刨析
前言:只有在那崎岖的小路上不畏艰险奋勇攀登的人,才有希望达到光辉的顶点. --马克思 前言 经过前面两篇协程的学习,我相信大家对协程的使用已经非常熟悉了.本着知其然更要知其之所以然的心态,很想知道它里 ...
最新文章
- 原型、原型对象、构造函数、原型链理解
- 中柏平板触摸驱动_一文总览2019年最新最全的工业平板电脑定制化服务
- 王者荣耀服务器维护啥时候结束,王者荣耀维护几点结束今天?11月10日维护公告...
- LeetCode 1191. K 次串联后最大子数组之和(前缀和+分类讨论)
- 如何从一张图片里取出其中一部分_如何鉴别坑人的锌合金龙头
- oracle数据库扩容方案_ORACLE数据库扩容
- rx2700_第二代锐龙 7 2700X 台式处理器 | AMD
- 修复android下webView控件的总结
- 控制x86汇编指令eip的方法
- 2021华为杯数学建模获奖经验分享
- Chrome浏览器离线安装Axure插件
- AHCI驱动问题导致Windows 10卡死的解决办法
- YOLOv5+TensorRT+Win11(Python版)
- 校招和社招有什么区别?不同时期重点不同!
- 问题:现有12个外形相同的小球,只有其中一个小球质量不同(不能确定较重还是较轻),请用天平找出是哪个小球不同,而且还要找出究竟是轻是重?条件:只能称三次
- 7.25 10figting!
- EasyClick 易点云测 IOS版自动化测试工具
- 风口中的智慧城市:智慧的面子,数字的里子
- 无人机动力测试台-自动化测试系统拉力、扭矩、电压、电流、转速和效率
- 前端每日三问#200501怎样在文本框中禁用中文输入法?