Kotlin 协程,怎么开始的又是怎么结束的?原理讲解!
九心 | 作者
承香墨影 | 校对
https://juejin.cn/post/6862548590092140558 | 原文
Hi,大家好,这里是承香墨影!
上周我们聊到 Kotlin 协程的使用,还不了解的可以先从上篇文章开始阅读《协程的使用》。本篇文章,我们继续深入了解 Kotlin 协程的原理。
Kotlin 协程在 JVM 中是 1:1 对应线程的,那么它实现线程切换的原理是什么?就是今天的主题。
前言
今天我们继续聊一聊协程的原理,通过上一篇的学习,相信大家对于如何使用协程已经非常熟悉了。
故事还得从上次协程的分享开始,大家对协程的实践经验有限,下面这段代码如何执行,就引起了讨论。
GlobalScope.launch { val a = async { 1+2 } val b = async { 1+3 } val c = a + b Log.e(TAG,"result:$c")
}
结论无非 2 种:
a 和 b 会串行执行;
a 和 b 会并行执行;
那么执行的结果到底是什么样的?我们将在下面的文章给出。
一、结构简要介绍
首先,我们得明确协程中有哪些东西。
如果你会使用协程,那你肯定知道协程中有 CoroutineScope
、CoroutineContext
和 CoroutineDispatcher
,这些都是使用过程中我们可以接触到的 API。
我简单的整理了协程中主要的基础类:
协程的类结构可分为三部分:CoroutineScope
、CoroutineContext
和 Continuation
。
1. Continuation
如果你会使用协程,那你肯定知道,协程遇到耗时 suspend
操作可以挂起,等到任务结束的时候,协程会自动切回来。
它的奥秘就是 Continuation
,Continuation
可以理解成续体,你可以理解其每次在协程挂起点,将剩余的代码包括起来,等到结束以后执行剩余的内容。
一个协程的代码块可能会被切割成若干个 Continuation
,在每个需要挂起的地方都会分配一个 Continuation
。
先抛出一些结论,协程在做耗时操作的时候,如果执行了耗时 suspend
操作,会自动挂起,但是这个耗时操作终究会被切换到其他线程去做了,做完以后协程就需要切回来,但是切到哪儿呢?这便是 Continuation
需要解决的问题。Continuation
的流程是这样的:
无论是使用 launch
还是 async
启动的协程,都会有一个结束的时候用来回调的 continuation
。
2. CoroutineScope
关于 CoroutineScope
没有特别多要说的,它持有了 CoroutineContext
,主要用于对协程的生命周期进行管理。
3. CoroutineContext
一开始看 CoroutineContext
觉得特别晕,不明白为啥要这么设计,看了 Bennyhuo 大佬的文章以后才稍微好转。
从上面协程类的结构中可以看出,光看这个 CoroutineContext
这个接口(源码内容我们下面讲),会发现它有点像 List
集合,而继承自 CoroutineContext
接口的 Element
接口则定义了其中的元素。随后,这个 Element
接口被划分成了两种类,Job
和 ContinuationInterceptor
:
Job
:从字面上来讲,它代表一个任务,Thread
也是执行任务,所以我们可以理解它定义了协程的一些东西。比如协程的状态,协程和子协程的管理方式等等;ContinuationInterceptor
:也从字面上来看,它是Continuation
的拦截器,通过拦截Continuation
,完成我们想要完成的工作,比如说线程的切换;
二、结构源码分析
上面我们从概念上介绍了协程的三大件,在这部分,我们从源码分析。
1. Continuation
suspend
修饰的方法,会在编译期间被编译器做特殊处理,这种处理被成为CPS(续体转换风格) 转化,suspend
方法会被包裹成 Continuation
。
说了这么久的 Continuation
,我们还没有见过接口代码,由于接口内容不多,我就把所有的内容贴出来了:
/** * Interface representing a continuation after a suspension point that returns a value of type `T`. */
@SinceKotlin("1.3")
public interface Continuation<in T> { /** * The context of the coroutine that corresponds to this continuation. */ public val context: CoroutineContext /** * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the * return value of the last suspension point. */ public fun resumeWith(result: Result<T>)
}
我们重点关注Continuation#resumeWith()
方法。
从注释来看,通过返回 suspend
挂起点的值,来恢复协程的执行,协程可以从参数 Result<T>)
获取成功的值,或者失败的结果,如果没有结果,那么 Result<T>
的泛型是 Unit
。Resulut
这个类也特别简单,感兴趣的同学可以查看源码。
BaseContinuationImpl
实现了 Continuation
接口,我们看一下 Continuation#resumeWith
方法的实现:
internal abstract class BaseContinuationImpl( // 完成后调用的 Continuation public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable { public final override fun resumeWith(result: Result<Any?>) { // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume var current = this var param = result while (true) { probeCoroutineResumed(current) with(current) { val completion = completion!! // fail fast when trying to resume continuation without completion val outcome: Result<Any?> = try { // 1. 执行 suspend 中的代码块 val outcome = invokeSuspend(param) // 2. 如果代码挂起就提前返回 if (outcome === COROUTINE_SUSPENDED) return // 3. 返回结果 Result.success(outcome) } catch (exception: Throwable) { // 3. 返回失败结果 Result.failure(exception) } releaseIntercepted() // this state machine instance is terminating if (completion is BaseContinuationImpl) { // 4. 如果 completion 中还有子 completion,递归 current = completion param = outcome } else { // 5. 结果通知 completion.resumeWith(outcome) return } } } }
}
主要的过程,我在注释中已经标注出来了,我来解释一下 Continuation
的机制。
每个 suspend
方法生成的 BaseContinuationImpl
,其构造方法有一个参数叫 completion
,它也是一个 Continuation
,调用时机是在 suspen
方法执行完毕的时候。
这个流程展示给我们的内容很直观了。
简单起见,我们直接看 3、4 和 5 这一个 launch
启动流程就好。通常一个 launch
生成一个外层 Continuation
一个相应的结果 Continuation
,我们后面称结果 continuation
为 complete
,Continuation
调用顺序是:
调用外层
Continuation
中的Continuation#resumeWith()
方法;该方法会去执行
launch
包裹的代码块,并返回一个结果;将上述代码块执行的结果交给
completion
,由它完成协程结束的通知;
上述的过程,只存在于一个 launch
,并且里面没有执行其他耗时的挂起操作,对于这些情况,我们将会在下面的文章讨论。
抛出问题一:可以看到,在注释 2,遇到耗时的 suspend
,返回的结果是一个 COROUTINE_SUSPENDED
,后面会直接返回,耗时操作结束的时候,我们的 completion
怎么恢复呢?
2. CoroutineContext 和 Element
在概要分析的时候,我们说 CoroutineContext
的结构像一个集合,是从它的接口得出结论的:
public interface CoroutineContext { // get 方法,通过 key 获取 public operator fun <E : Element> get(key: Key<E>): E? // 累加操作 public fun <R> fold(initial: R, operation: (R, Element) -> R): R // 操作符 + , 实际的实现调用了 fold 方法 public operator fun plus(context: CoroutineContext): CoroutineContext // 移除操作 public fun minusKey(key: Key<*>): CoroutineContext // CoroutineContext 定义的 Key public interface Key<E : Element> // CoroutineContext 中元素的定义 public interface Element : CoroutineContext { // key public val key: Key<*> //... }
}
从中我们可以大致看出,CoroutineContext
中可以通过 Key
来获取元素 Element
,并且 Element
接口也是继承自 CoroutineContext
接口。
除此以外,CoroutineContext
支持增加和移除操作,并且支持 +
操作符来完成增加。+
操作符即 plus
方法是有具体实现的,感兴趣的可以自己看一下,主要涉及到了拦截器 ContinuationInterceptor
的添加。
1.1 Job
Job
的注释中阐述定义是这样的:
“A background job. Conceptually, a job is a cancellable thing with a life-cycle that culminates in its completion.
从中我们可以得出:
后台任务;
可取消;
生命周期在完成它的时候结束;
从后台任务的角度来看,Job
听着有点像 Thread
,和 Thread
一样,Job
也有各种状态,文档中对 Job
各种状态的注释(感觉大佬们的注释写的真棒~):
Job
另一个值得关注的点是对子 Job
的管理,主要的规则如下:
子
Job
都会结束的时候,父Job
才会结束;父
Job
取消的时候,子Job
也会取消;
上述的一些内容都可以从 Job
的接口文档中得出。那么,Job
哪里来的?
如果你看一下CoroutineScope#launch
方法,你就会得出结论。该方法的返回类型就是 Job
,我们每次调用该方法,都会创建一个 Job
。
1.2 ContinuationInterceptor
顾名思义,Continuation
拦截器,先看接口:
interface ContinuationInterceptor : CoroutineContext.Element { // ContinuationInterceptor 在 CoroutineContext 中的 Key companion object Key : CoroutineContext.Key<ContinuationInterceptor> /** * 拦截 continuation */ fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> //...
}
这个接口可以提炼的就这两个信息:
拦截器的
Key
,也就是无论后面的一个CoroutineContext
放了多少个拦截器,Key
为ContinuationInterceptor
的拦截器只能有一个;我们都知道,
Continuation
在调用其Continuation#resumeWith()
方法,会执行其suspend
修饰的函数代码块,如果我们提前拦截到,是不是可以做点其他事情,比如说切换线程,这也是ContinuationInterceptor
的作用之一;
需要说明一下,我们通过 Dispatchers
来指定协程发生的线程,Dispatchers
实现了 ContinuationInterceptor
接口。
3. CoroutineScope
CoroutineScope
的接口很简单:
public interface CoroutineScope { public val coroutineContext: CoroutineContext
}
它要求后续的实现都要提供 CoroutineContext
,不过我们都知道,CoroutineContext
是协程中很重要的东西,既包括 Job
,也包括调度器。
在上面的代码中,我多次使用了 Android Jetpack 中的 Lifecycle 中协程的扩展库,好处让获取 CoroutineScope
更加简单,无需在组件 onDestroy
的时候手动 cancel
,并且它的源码超级简单,前提是你会使用 Lifecycle
:
internal class LifecycleCoroutineScopeImpl( override val lifecycle: Lifecycle, override val coroutineContext: CoroutineContext
) : LifecycleCoroutineScope(), LifecycleEventObserver { // ... override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) { if (lifecycle.currentState <= Lifecycle.State.DESTROYED) { lifecycle.removeObserver(this) coroutineContext.cancel() } }
}
并且它也支持你在指定的生命周期调用协程,大家看一下接口就明白了。
三、过程源码分析
先上一段使用代码:
lifecycleScope.launch(Dispatchers.Main) { val a = async { getResult(1, 2) } val b = async { getResult(3, 5) } val c = a.await() + b.await() Log.e(TAG, "result:$c")
} suspend fun getResult(a: Int, b: Int): Int { return withContext(Dispatchers.IO) { delay(1000) return@withContext a + b }
}
虽然代码很简单,但是源码还是比较复杂的,我们分步讲。
第一步 获取 CoroutineScope
我已经在上面说明了,我们使用的 Lifecycle
的协程扩展库,如果我们不使用扩展库,就得使用 MainScope
,它们的 CoroutineContext
都是一样的:
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main) // LifecycleCoroutineScope
val Lifecycle.coroutineScope: LifecycleCoroutineScope get() { while (true) { // ... val newScope = LifecycleCoroutineScopeImpl( this, SupervisorJob() + Dispatchers.Main.immediate ) // ... return newScope } }
显而易见,MainScope
和 LifecycleCoroutineScope
都使用了 SupervisorJob() + Dispatchers.Main
, 作为它们的 CoroutineContext
。说明一下,SupervisorJob
和 Dispatchers.Main
很重要,它们分别代表了 CoroutineContext
之前提及的 Job
和 ContinuationInterceptor
,后面用到的时候再分析。
第二步 启动协程
直接进入 CoroutineScope#launch()
方法:
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit
): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine
}
上面的方法一共有三个参数,前两个不作过多介绍,第三个参数:
block: suspend CoroutineScope.() -> Unit)
这是一个方法,是一个 lambda
参数,同时也表明了它需要被 suspend
修饰。继续看 launch
方法,发现它主要做了两件事:
组合新的
CoroutineContext
;再创建一个
Continuation
;
组合新的CoroutineContext
在第一行代码 val newContext = newCoroutineContext(context)
做了第一件事,这里的 newCoroutineContext(context)
是一个扩展方法:
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext { val combined = coroutineContext + context val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) debug + Dispatchers.Default else debug
}
CoroutineScope
使用本身的 coroutineContext
集合,利用 +
操作符将我们在 launch
方法中提供的 coroutineContext
添加进来。
再创建一个Continuation
回到上一段代码,通常我们不会指定 start
参数,所以它会使用默认的 CoroutineStart.DEFAULT
,最终 coroutine
会得到一个 StandaloneCoroutine
。StandaloneCoroutine
实现自 AbstractCoroutine
,翻开上面的类图,你会发现,它实现了 Continuation
、Job
和 CoroutineScope
等一堆接口。需要说明一下,这个 StandaloneCoroutine
其实是我们当前 Suspend Contination
的 complete
。接着会调用
coroutine.start(start, coroutine, block)
这就表明协程开始启动了。
第三步 start
进入到 AbstractCoroutine#start
方法:
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) { initParentJob() start(block, receiver, this)
}
跳过层层嵌套,最后到达了:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) = runSafely(completion) { // 外面再包一层 Coroutine createCoroutineUnintercepted(receiver, completion) // 如果需要,做拦截处理 .intercepted() // 调用 resumeWith 方法 .resumeCancellableWith(Result.success(Unit)) }
虽然这仅仅是一个函数,但是后面主要的逻辑都揭露了:
创建一个没有拦截过的
Continuation
;拦截
Continuation
;执行
Continuation#resumeWith
方法;
第四步 又创建 Continuation
我这里用了 又,因为我们在 launch
中已经创建了一个 AbstractContinuaion
,不过它是一个 complete
,从各个函数的行参就可以看出来。
不过我们 suspend
修饰的外层 Continuation
还没有创建,它来了,是 SuspendLambda
,它继承自 ContinuationImpl
,如果你问我为什么源码中没找到具体实现,我觉得可能跟 suspend
修饰符有关,由编译器处理,但是调用栈确实是这样的。
看一下 SuspendLambda
类的实现:
internal abstract class SuspendLambda( public override val arity: Int, completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction { constructor(arity: Int) : this(arity, null) //...
}
可以看到,它的构造方法的形参就包括一个 complete
。
第五步 拦截处理
回到:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) = runSafely(completion) { // 外面再包一层 Coroutine createCoroutineUnintercepted(receiver, completion) // 如果需要,做拦截处理 .intercepted() // 调用 resumeWith 方法 .resumeCancellableWith(Result.success(Unit)) }
里面的拦截方法 Continuation#intercepted()
方法是一个扩展方法:
@SinceKotlin("1.3")
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> = (this as? ContinuationImpl)?.intercepted() ?: this
createCoroutineUnintercepted(receiver, completion)
返回的是一个 SuspendLambda
,所以它肯定是一个 ContinuationImpl
,看一下它的拦截方法的实现:
internal abstract class ContinuationImpl( completion: Continuation<Any?>?, private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) { constructor(completion: Continuation<Any?>?) : this(completion, completion?.context) public override val context: CoroutineContext get() = _context!! public fun intercepted(): Continuation<Any?> = intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this) .also { intercepted = it } // ...
}
在 ContinuationImpl#intercepted()
方法中,直接利用 context
这个数据结构通过 context[ContinuationInterceptor]
获取拦截器。
CoroutineDispatcher拦截实现
我们都知道 ContinuationInterceptor
具有拦截作用,它的直接实现是 CoroutineDispatcher
这个抽象类,所有其他调度器都直接或者间接继承这个类,我们关注一下它的拦截方法:
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { //... public abstract fun dispatch(context: CoroutineContext, block: Runnable) // 1.拦截的 Continuation 被包了一层 DispatchedContinuation public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = DispatchedContinuation(this, continuation) //...
} internal class DispatchedContinuation<in T>( @JvmField val dispatcher: CoroutineDispatcher, @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation { // ... override fun resumeWith(result: Result<T>) { // ... if (dispatcher.isDispatchNeeded(context)) { // 2. 后面一个参数需要提供 Runnable,父类已经实现 dispatcher.dispatch(context, this) } //... } // ...
} // SchedulerTask 是一个 Runnable
internal abstract class DispatchedTask<in T>( @JvmField public var resumeMode: Int
) : SchedulerTask() { // ... public final override fun run() { // ... try { //... withCoroutineContext(context, delegate.countOrElement) { // 3. continuation 是 DispatchedContinuation 包裹的 continuation continuation.resume(...) } } //... }
}
简单来说,就是对原有的 Continuation
的 resumeWith
操作加了一层拦截,就像这样:
加入 CoroutineDispatcher
以后,执行真正的 Continue#resumeWith()
之前,会执行 CoroutineDispatcher#dispatch()
方法,所以我们现在关注 CoroutineDispatcher#dispatch
具体实现即可。
讲一个CoroutineDispatcher具体实现
首先我们得明确这个 CoroutineDispatcher
来自哪里?它从 context
获取,context
来自哪里?
注意 SuspendLambda
和 ContinuationImpl
的构造方法,SuspendLambda
中的参数没有 CoroutineContext
,所以只能来自 completion
中的 CoroutineContext
,而completion
的 CoroutineContext
来自 launch
方法中来自 CoroutineScope
,默认是 SupervisorJob() + Dispatchers.Main
,不过只有 Dispatchers.Main
继承了 CoroutineDispatcher
。Dispatchers.Main
是一个 MainCoroutineDispatcher
,Android 中对应的 MainCoroutineDispatcher
是 HandlerContext
:
internal class HandlerContext private constructor( private val handler: Handler, private val name: String?, private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay { public constructor( handler: Handler, name: String? = null ) : this(handler, name, false) //... override fun dispatch(context: CoroutineContext, block: Runnable) { // 利用主线程的 Handler 执行任务 handler.post(block) } override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { // 利用主线程的 Handler 延迟执行任务,将完成的 continuation 放在任务中执行 val block = Runnable { with(continuation) { resumeUndispatched(Unit) } } handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY)) continuation.invokeOnCancellation { handler.removeCallbacks(block) } } //..
}
重点来了,调度任务最后竟然交给了主线程的 Handler
,其实想想也对,主线程的任务最后一般都会交给主线程的 Handler
。好奇的同学可能问了,如果不是主线程呢?不是主线程就利用的线程池:
public open class ExperimentalCoroutineDispatcher( private val corePoolSize: Int, private val maxPoolSize: Int, private val idleWorkerKeepAliveNs: Long, private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() { // 执行期 override val executor: Executor get() = coroutineScheduler private var coroutineScheduler = createScheduler() override fun dispatch(context: CoroutineContext, block: Runnable): Unit = try { coroutineScheduler.dispatch(block) } catch (e: RejectedExecutionException) { DefaultExecutor.dispatch(context, block) }
}
结果可以说是很清晰了,coroutineScheduler
是一个线程池,如果想了解具体的过程,同学们可以自行查看代码。
读到这里,你可能有一点明白 CoroutineContext
为什么要设计成一种数据结构:
coroutineContext[ContinuationInterceptor]
就可以直接取到当前协程的拦截器,并且一个协程只能对应一个调度器;调度器都放在其他
coroutineContext
的前面,所以在执行协程的时候,可以做拦截处理;
同理,我们也可以使用 coroutineContext[Job]
获取当前协程。
第六步 resumeWith
再次回到:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) = runSafely(completion) { // 外面再包一层 Coroutine createCoroutineUnintercepted(receiver, completion) // 如果需要,做拦截处理 .intercepted() // 调用 resumeWith 方法 .resumeCancellableWith(Result.success(Unit)) }
现在我们看 Continue#resumeCancellableWith()
方法,它是一个扩展方法,里面的调度逻辑是:
DispatchContinuation#resumeCancellableWith
;CoroutineDispatcher#dispatch
;Continuation#resumeWith
;
这里的 Continuation
就是 SuspendLambda
,它继承了 BaseContinuationImpl
,我们看一下它的实现方法:
internal abstract class BaseContinuationImpl( public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable { // This implementation is final. This fact is used to unroll resumeWith recursion. public final override fun resumeWith(result: Result<Any?>) { // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume var current = this var param = result while (true) { // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure // can precisely track what part of suspended callstack was already resumed probeCoroutineResumed(current) with(current) { val completion = completion!! // fail fast when trying to resume continuation without completion val outcome: Result<Any?> = try { // 1. 执行 suspend 里面的代码块 val outcome = invokeSuspend(param) // 2. 如果代码块里面执行了挂起方法,会提前返回 if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } releaseIntercepted() // this state machine instance is terminating if (completion is BaseContinuationImpl) { // 3. 如果完成的completion也是BaseContinuationImpl,就会进入循环 current = completion param = outcome } else { // 4. 执行 completion resumeWith 方法 completion.resumeWith(outcome) return } } } }
}
这边被我分为2个部分:
执行
suspend
方法,并获取结果;调用
complete
(放在下一步讲);
执行suspend方法
在第一处会先执行 suspend
修饰的方法内容,在方法里面可能又会调度 suspend
方法,比如说我们的实例方法:
lifecycleScope.launch(Dispatchers.Main) { val a = async { getResult(1, 2) } val b = async { getResult(3, 5) } val c = a.await() + b.await() Log.e(TAG, "result:$c")
} suspend fun getResult(a: Int, b: Int): Int { return withContext(Dispatchers.IO) { delay(1000) return@withContext a + b }
}
因为我们在 getResult
执行了延时操作,所以我们 launch
方法肯定执行了耗时挂起方法,所以 BaseContinuationImpl#invokeSuspend
方法会返回一个 COROUTINE_SUSPENDED
,结果你也看到了,该方法会提前结束。(说明一下,我没有找到BaseContinuationImpl#invokeSuspend
方法的具体实现,我猜可能跟编译器有关)我猜你肯定跟我一样好奇,遇到耗时挂起会提前返回,那么耗时挂起如何对 complete
进行恢复的?
我们看一下 delay(1000)
这个延时操作在主线程是如何处理的:
public suspend fun delay(timeMillis: Long) { if (timeMillis <= 0) return // don't delay return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> -> cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont) }
} internal class HandlerContext private constructor( private val handler: Handler, private val name: String?, private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay { //... override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { val block = Runnable { with(continuation) { resumeUndispatched(Unit) } } handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY)) continuation.invokeOnCancellation { handler.removeCallbacks(block) } } //...
}
可以看到,将恢复任务包了一个 Runnable
,交给 Handler
的 Handler#postDelayed()
方法了。
第七步 complete resumeWith
对于 complete
的处理一般会有两种。
complete是BaseContinuationImpl
第一种情况是我们称之为套娃,完成回调的 Continuation
它本身也有自己的完成回调 Continuation
,接下来循环就对了。
调用complete的resumeWith
第二种情况,就是通过 complete
去完成回调,由于 complete
是 AbstractContinuation
,我们看一下它的 resumeWith
:
public abstract class AbstractCoroutine<in T>( /** * The context of the parent coroutine. */ @JvmField protected val parentContext: CoroutineContext, active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope { // ... public final override fun resumeWith(result: Result<T>) { // 1. 获取当前协程的技术状态 val state = makeCompletingOnce(result.toState()) // 2. 如果当前还在等待完成,说明还有子协程没有结束 if (state === COMPLETING_WAITING_CHILDREN) return // 3. 执行结束恢复的方法,默认为空 afterResume(state) } // 这是父类 JobSupport 中的 makeCompletingOnce 方法 // 为了方便查看,我复制过来 internal fun makeCompletingOnce(proposedUpdate: Any?): Any? { loopOnState { state -> // tryMakeCompleting 的内容主要根据是否有子Job做不同处理 val finalState = tryMakeCompleting(state, proposedUpdate) when { finalState === COMPLETING_ALREADY -> throw IllegalStateException( "Job $this is already complete or completing, " + "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull ) finalState === COMPLETING_RETRY -> return@loopOnState else -> return finalState // COMPLETING_WAITING_CHILDREN or final state } } }
}
这段代码的意思其实也很简单,就是协程即将完成,得先评估一下协程的技术状态,别协程还有东西在运行,就给结束了。对于一些有子协程的一些协程,会等待子协程结束的时候,才会结束当前协程。一个 launch
的过程大概就是这样了。大致的流程图是这样的:
下面我们再谈谈 async
。
四、关于async
async
和 launch
的代码相似度很高:
public fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T
): Deferred<T> { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyDeferredCoroutine(newContext, block) else DeferredCoroutine<T>(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine
}
最终也会进行三步走:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) = runSafely(completion) { // 外面再包一层 Coroutine createCoroutineUnintercepted(receiver, completion) // 如果需要,做拦截处理 .intercepted() // 调用 resumeWith 方法 .resumeCancellableWith(Result.success(Unit)) }
不同的是,async
返回的是一个 Deferred<T>
,我们需要调用 Deferred#await()
去获取返回结果,它的实现在 JobSupport
:
private open class DeferredCoroutine<T>( parentContext: CoroutineContext, active: Boolean
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> { // ... awaitInternal方法来自父类 JobSupport override suspend fun await(): T = awaitInternal() as T // ... // 这是 JobSupport 中的实现 internal suspend fun awaitInternal(): Any? { // 循环获取结果 while (true) { // lock-free loop on state val state = this.state // 1. 如果处于完成状态 if (state !is Incomplete) { if (state is CompletedExceptionally) { // Slow path to recover stacktrace recoverAndThrow(state.cause) } return state.unboxState() } // 2. 除非需要重试,不然就 break if (startInternal(state) >= 0) break } // 等待挂起的方法 return awaitSuspend() // slow-path }
}
它的具体过程可以从我的注释看出,就不一一介绍了,感兴趣的同学可以查看源码。
1. 本文一开始的讨论
本文一开始的代码是错的,连编译器都过不了,尴尬~正确的代码应该是:
GlobalScope.launch { val a = async { 1+2 } val b = async { 1+3 } val c = a.await() + bawait() Log.e(TAG,"result:$c")
}
如果是正确的代码,这里可能分两种情况:
如果你放在 UI 线程,那肯定是串行的,这时候有人说,我在 a
里使用 delay(1000)
,在 b
里使用 delay(2000)
,得到 c
的时候就花了 2000
毫秒啊,这不是并行吗?
事情并不是这样的,delay
操作使用了 Handler#postDelay
方法,一个延迟了 1000
毫秒执行,一个延迟了 2000
毫秒执行,但是主线程只有一个,所以只能是串行。
如果是子线程,通常都是并行的,因为我们使用了线程池啊~
总结
写这边源码分析的时候,一些细节总是找不到,比如说 suspendLambda
的子类找不到,自己对 Kotlin 的学习有待深入。
所以本文有些地方还值得商榷,如果你有更好的理解,欢迎下方交流。
-- End --
本文对你有帮助吗?留言、转发、点好看是最大的支持,谢谢!
往期推荐
一个全新 Flutter UI 适配方案,低入侵 & 100% 还原设计稿!
Kotlin 协程第一弹:协程的使用,一文讲清楚!
Fragment懒加载还在用setUserVisiblity?看AndroidX带来那些新的Api.
Kotlin 协程,怎么开始的又是怎么结束的?原理讲解!相关推荐
- Kotlin 协程与架构组件一起使用及底层原理分析,音视频开发前景
if (!isChangingConfigurations()) { getViewModelStore().clear(); } } } }); } 在Activity的生命周期走到onDestro ...
- 一个小故事讲明白进程、线程、Kotlin 协程到底啥关系?
前言 协程系列文章: 一个小故事讲明白进程.线程.Kotlin 协程到底啥关系? 少年,你可知 Kotlin 协程最初的样子? 讲真,Kotlin 协程的挂起/恢复没那么神秘(故事篇) 讲真,Kotl ...
- Kotlin 协程调度切换线程是时候解开真相了
前言 协程系列文章: 一个小故事讲明白进程.线程.Kotlin 协程到底啥关系? 少年,你可知 Kotlin 协程最初的样子? 讲真,Kotlin 协程的挂起/恢复没那么神秘(故事篇) 讲真,Kotl ...
- 在 Android 开发中使用 Kotlin 协程 (一) -- 初识 Kotlin 协程
前言 最近在研究 Kotlin 协程,发现功能真的超级强大,很有用,而且很好学,如果你正在或计划使用 Kotlin 开发 Android,那么 Kotlin 协程你一定不能错过! 协程是什么? 我们平 ...
- Kotlin协程重新认知 CoroutineContext
转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/121972791 本文出自[赵彦军的博客] 文章目录 前言 2. Coroutine ...
- pdf 深入理解kotlin协程_Kotlin协程实现原理:挂起与恢复
今天我们来聊聊Kotlin的协程Coroutine. 如果你还没有接触过协程,推荐你先阅读这篇入门级文章What? 你还不知道Kotlin Coroutine? 如果你已经接触过协程,但对协程的原理存 ...
- 一文看透 Kotlin 协程本质
前言 公司开启新项目了,想着准备亮一手 Kotlin 协程应用到项目中去,之前有对 Kotlin 协程的知识进行一定量的学习,以为自己理解协程了,结果--实在拿不出手! 为了更好的加深记忆和理解,更全 ...
- Kotlin协程实现原理
前言 本篇解析Kotlin/JVM中的协程的实现原理. 初看suspend关键字 下面的例子模拟一个网络请求: class Temp {suspend fun fetchData(argument: ...
- 破解 Kotlin 协程(6) - 协程挂起篇
关键词:Kotlin 协程 协程挂起 任务挂起 suspend 非阻塞 协程的挂起最初是一个很神秘的东西,因为我们总是用线程的概念去思考,所以我们只能想到阻塞.不阻塞的挂起到底是怎么回事呢?说出来你也 ...
最新文章
- CentOS7.5下yum安装MySQL8图文教程
- 32位浮点数在威纶触摸屏显示_MCGS触摸屏与与西门子 S7-1200 PLC以太网通讯
- java修改数据库表结构_数据库设计(一):设计传统系统表结构(Java开发)
- (三)PHP网页架站
- java list 接口_Java 集合 List接口
- h5页面预览pdf文件_H5移动端在线浏览pdf文件,推荐插件TouchPDF
- webpack2终极优化
- Linux 的 history 命令显示时间
- 激光雕刻机——广告制作新利器
- PS学习笔记一:跟着李涛学PS第一讲——光和色的关系
- idea文件颜色代表的含义
- h桥控制电机刹车_一种电机H桥制动电路的制作方法
- [zkaq靶场]命令执行--IBOS协同办公系统通杀漏洞
- 游戏开发unity编辑器扩展知识系列:进度条显示EditorUtility.ProgressBar
- 微信红包封面免费做了
- 数据库复习 - PART2 - 建模设计与范式
- 树结构(Java实现)
- 套接字中的数据转换(大端模式/小端模式)
- 罗斯蒙特CNG050S290NQEPMZZZ流量计
- usercontroller.java_userlogin 用户登录程序的编写,输入 名和密码,JAVA语言,可运行 Develop 254万源代码下载- www.pudn.com...