九心 | 作者

承香墨影 | 校对

https://juejin.cn/post/6862548590092140558 | 原文

Hi,大家好,这里是承香墨影!

上周我们聊到 Kotlin 协程的使用,还不了解的可以先从上篇文章开始阅读《协程的使用》。本篇文章,我们继续深入了解 Kotlin 协程的原理。

Kotlin 协程在 JVM 中是 1:1 对应线程的,那么它实现线程切换的原理是什么?就是今天的主题。

前言

今天我们继续聊一聊协程的原理,通过上一篇的学习,相信大家对于如何使用协程已经非常熟悉了。

故事还得从上次协程的分享开始,大家对协程的实践经验有限,下面这段代码如何执行,就引起了讨论。

GlobalScope.launch {  val a = async {  1+2  }  val b = async {  1+3  }  val c = a + b  Log.e(TAG,"result:$c")
}

结论无非 2 种:

  1. a 和 b 会串行执行;

  2. a 和 b 会并行执行;

那么执行的结果到底是什么样的?我们将在下面的文章给出。

一、结构简要介绍

首先,我们得明确协程中有哪些东西。

如果你会使用协程,那你肯定知道协程中有 CoroutineScopeCoroutineContextCoroutineDispatcher,这些都是使用过程中我们可以接触到的 API。

我简单的整理了协程中主要的基础类:

协程的类结构可分为三部分:CoroutineScopeCoroutineContextContinuation

1. Continuation

如果你会使用协程,那你肯定知道,协程遇到耗时 suspend 操作可以挂起,等到任务结束的时候,协程会自动切回来。

它的奥秘就是 ContinuationContinuation 可以理解成续体,你可以理解其每次在协程挂起点,将剩余的代码包括起来,等到结束以后执行剩余的内容。

一个协程的代码块可能会被切割成若干个 Continuation,在每个需要挂起的地方都会分配一个 Continuation

先抛出一些结论,协程在做耗时操作的时候,如果执行了耗时 suspend 操作,会自动挂起,但是这个耗时操作终究会被切换到其他线程去做了,做完以后协程就需要切回来,但是切到哪儿呢?这便是 Continuation 需要解决的问题。Continuation 的流程是这样的:

无论是使用 launch 还是 async 启动的协程,都会有一个结束的时候用来回调的 continuation

2. CoroutineScope

关于 CoroutineScope 没有特别多要说的,它持有了 CoroutineContext,主要用于对协程的生命周期进行管理。

3. CoroutineContext

一开始看 CoroutineContext 觉得特别晕,不明白为啥要这么设计,看了 Bennyhuo 大佬的文章以后才稍微好转。

从上面协程类的结构中可以看出,光看这个 CoroutineContext 这个接口(源码内容我们下面讲),会发现它有点像 List 集合,而继承自 CoroutineContext 接口的 Element 接口则定义了其中的元素。随后,这个 Element 接口被划分成了两种类,JobContinuationInterceptor

  • Job:从字面上来讲,它代表一个任务,Thread 也是执行任务,所以我们可以理解它定义了协程的一些东西。比如协程的状态,协程和子协程的管理方式等等;

  • ContinuationInterceptor:也从字面上来看,它是 Continuation 的拦截器,通过拦截 Continuation,完成我们想要完成的工作,比如说线程的切换;

二、结构源码分析

上面我们从概念上介绍了协程的三大件,在这部分,我们从源码分析。

1. Continuation

suspend 修饰的方法,会在编译期间被编译器做特殊处理,这种处理被成为CPS(续体转换风格) 转化suspend 方法会被包裹成 Continuation

说了这么久的 Continuation,我们还没有见过接口代码,由于接口内容不多,我就把所有的内容贴出来了:

/**  * Interface representing a continuation after a suspension point that returns a value of type `T`.  */
@SinceKotlin("1.3")
public interface Continuation<in T> {  /**  * The context of the coroutine that corresponds to this continuation.  */  public val context: CoroutineContext  /**  * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the  * return value of the last suspension point.  */  public fun resumeWith(result: Result<T>)
}

我们重点关注Continuation#resumeWith()方法。

从注释来看,通过返回 suspend 挂起点的值,来恢复协程的执行,协程可以从参数 Result<T>) 获取成功的值,或者失败的结果,如果没有结果,那么 Result<T> 的泛型是 UnitResulut 这个类也特别简单,感兴趣的同学可以查看源码。

BaseContinuationImpl 实现了 Continuation 接口,我们看一下 Continuation#resumeWith 方法的实现:

internal abstract class BaseContinuationImpl(  // 完成后调用的 Continuation  public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {  public final override fun resumeWith(result: Result<Any?>) {  // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume  var current = this  var param = result  while (true) {  probeCoroutineResumed(current)  with(current) {  val completion = completion!! // fail fast when trying to resume continuation without completion  val outcome: Result<Any?> =  try {  // 1. 执行 suspend 中的代码块  val outcome = invokeSuspend(param)  // 2. 如果代码挂起就提前返回  if (outcome === COROUTINE_SUSPENDED) return  // 3. 返回结果  Result.success(outcome)  } catch (exception: Throwable) {  // 3. 返回失败结果  Result.failure(exception)  }  releaseIntercepted() // this state machine instance is terminating  if (completion is BaseContinuationImpl) {  // 4. 如果 completion 中还有子 completion,递归  current = completion  param = outcome  } else {  // 5. 结果通知  completion.resumeWith(outcome)  return  }  }  }  }
}

主要的过程,我在注释中已经标注出来了,我来解释一下 Continuation 的机制。

每个 suspend 方法生成的 BaseContinuationImpl,其构造方法有一个参数叫 completion,它也是一个 Continuation,调用时机是在 suspen 方法执行完毕的时候。

这个流程展示给我们的内容很直观了。

简单起见,我们直接看 3、4 和 5 这一个 launch 启动流程就好。通常一个 launch 生成一个外层 Continuation一个相应的结果 Continuation,我们后面称结果 continuationcompleteContinuation 调用顺序是:

  1. 调用外层 Continuation 中的 Continuation#resumeWith() 方法;

  2. 该方法会去执行 launch 包裹的代码块,并返回一个结果;

  3. 将上述代码块执行的结果交给 completion,由它完成协程结束的通知;

上述的过程,只存在于一个 launch,并且里面没有执行其他耗时的挂起操作,对于这些情况,我们将会在下面的文章讨论。

抛出问题一:可以看到,在注释 2,遇到耗时的 suspend,返回的结果是一个 COROUTINE_SUSPENDED,后面会直接返回,耗时操作结束的时候,我们的 completion 怎么恢复呢?

2. CoroutineContext 和 Element

在概要分析的时候,我们说 CoroutineContext 的结构像一个集合,是从它的接口得出结论的:

public interface CoroutineContext {  // get 方法,通过 key 获取  public operator fun <E : Element> get(key: Key<E>): E?  // 累加操作  public fun <R> fold(initial: R, operation: (R, Element) -> R): R  // 操作符 + , 实际的实现调用了 fold 方法  public operator fun plus(context: CoroutineContext): CoroutineContext  // 移除操作  public fun minusKey(key: Key<*>): CoroutineContext  // CoroutineContext 定义的 Key  public interface Key<E : Element>  // CoroutineContext 中元素的定义  public interface Element : CoroutineContext {  // key  public val key: Key<*>  //...  }
}

从中我们可以大致看出,CoroutineContext 中可以通过 Key 来获取元素 Element,并且 Element 接口也是继承自 CoroutineContext 接口。

除此以外,CoroutineContext 支持增加和移除操作,并且支持 + 操作符来完成增加。+ 操作符即 plus 方法是有具体实现的,感兴趣的可以自己看一下,主要涉及到了拦截器 ContinuationInterceptor 的添加。

1.1 Job

Job 的注释中阐述定义是这样的:

“A background job. Conceptually, a job is a cancellable thing with a life-cycle that culminates in its completion.

从中我们可以得出:

  1. 后台任务;

  2. 可取消;

  3. 生命周期在完成它的时候结束;

从后台任务的角度来看,Job 听着有点像 Thread,和 Thread 一样,Job 也有各种状态,文档中对 Job 各种状态的注释(感觉大佬们的注释写的真棒~):

Job 另一个值得关注的点是对子 Job 的管理,主要的规则如下:

  1. Job都会结束的时候,父 Job 才会结束;

  2. Job 取消的时候,子 Job 也会取消;

上述的一些内容都可以从 Job 的接口文档中得出。那么,Job哪里来的?

如果你看一下CoroutineScope#launch方法,你就会得出结论。该方法的返回类型就是 Job,我们每次调用该方法,都会创建一个 Job

1.2 ContinuationInterceptor

顾名思义,Continuation 拦截器,先看接口:

interface ContinuationInterceptor : CoroutineContext.Element {  // ContinuationInterceptor 在 CoroutineContext 中的 Key  companion object Key : CoroutineContext.Key<ContinuationInterceptor>  /**  * 拦截 continuation  */  fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>  //...
}

这个接口可以提炼的就这两个信息:

  1. 拦截器的 Key,也就是无论后面的一个 CoroutineContext 放了多少个拦截器,KeyContinuationInterceptor 的拦截器只能有一个;

  2. 我们都知道,Continuation 在调用其 Continuation#resumeWith() 方法,会执行其 suspend 修饰的函数代码块,如果我们提前拦截到,是不是可以做点其他事情,比如说切换线程,这也是 ContinuationInterceptor 的作用之一;

需要说明一下,我们通过 Dispatchers 来指定协程发生的线程,Dispatchers 实现了 ContinuationInterceptor接口。

3. CoroutineScope

CoroutineScope 的接口很简单:

public interface CoroutineScope {  public val coroutineContext: CoroutineContext
}

它要求后续的实现都要提供 CoroutineContext,不过我们都知道,CoroutineContext 是协程中很重要的东西,既包括 Job,也包括调度器。

在上面的代码中,我多次使用了 Android Jetpack 中的 Lifecycle 中协程的扩展库,好处让获取 CoroutineScope 更加简单,无需在组件 onDestroy 的时候手动 cancel,并且它的源码超级简单,前提是你会使用 Lifecycle

internal class LifecycleCoroutineScopeImpl(  override val lifecycle: Lifecycle,  override val coroutineContext: CoroutineContext
) : LifecycleCoroutineScope(), LifecycleEventObserver {  // ...  override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {  if (lifecycle.currentState <= Lifecycle.State.DESTROYED) {  lifecycle.removeObserver(this)  coroutineContext.cancel()  }  }
}

并且它也支持你在指定的生命周期调用协程,大家看一下接口就明白了。

三、过程源码分析

先上一段使用代码:

lifecycleScope.launch(Dispatchers.Main) {  val a = async { getResult(1, 2) }  val b = async { getResult(3, 5) }  val c = a.await() + b.await()  Log.e(TAG, "result:$c")
}   suspend fun getResult(a: Int, b: Int): Int {  return withContext(Dispatchers.IO) {  delay(1000)  return@withContext a + b  }
}

虽然代码很简单,但是源码还是比较复杂的,我们分步讲。

第一步 获取 CoroutineScope

我已经在上面说明了,我们使用的 Lifecycle 的协程扩展库,如果我们不使用扩展库,就得使用 MainScope,它们的 CoroutineContext 都是一样的:

public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)  // LifecycleCoroutineScope
val Lifecycle.coroutineScope: LifecycleCoroutineScope  get() {  while (true) {  // ...  val newScope = LifecycleCoroutineScopeImpl(  this,  SupervisorJob() + Dispatchers.Main.immediate  )  // ...  return newScope  }  }

显而易见,MainScopeLifecycleCoroutineScope 都使用了 SupervisorJob() + Dispatchers.Main, 作为它们的 CoroutineContext。说明一下,SupervisorJobDispatchers.Main 很重要,它们分别代表了 CoroutineContext 之前提及的 JobContinuationInterceptor,后面用到的时候再分析。

第二步 启动协程

直接进入 CoroutineScope#launch() 方法:

public fun CoroutineScope.launch(  context: CoroutineContext = EmptyCoroutineContext,  start: CoroutineStart = CoroutineStart.DEFAULT,  block: suspend CoroutineScope.() -> Unit
): Job {  val newContext = newCoroutineContext(context)  val coroutine = if (start.isLazy)  LazyStandaloneCoroutine(newContext, block) else  StandaloneCoroutine(newContext, active = true)  coroutine.start(start, coroutine, block)  return coroutine
}

上面的方法一共有三个参数,前两个不作过多介绍,第三个参数:

block: suspend CoroutineScope.() -> Unit)

这是一个方法,是一个 lambda 参数,同时也表明了它需要被 suspend 修饰。继续看 launch 方法,发现它主要做了两件事:

  1. 组合新的 CoroutineContext

  2. 再创建一个 Continuation

组合新的CoroutineContext

在第一行代码 val newContext = newCoroutineContext(context) 做了第一件事,这里的 newCoroutineContext(context) 是一个扩展方法:

public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {  val combined = coroutineContext + context  val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined  return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)  debug + Dispatchers.Default else debug
}

CoroutineScope 使用本身的 coroutineContext 集合,利用 + 操作符将我们在 launch 方法中提供的 coroutineContext 添加进来。

再创建一个Continuation

回到上一段代码,通常我们不会指定 start 参数,所以它会使用默认的 CoroutineStart.DEFAULT,最终 coroutine 会得到一个 StandaloneCoroutineStandaloneCoroutine 实现自 AbstractCoroutine,翻开上面的类图,你会发现,它实现了 ContinuationJobCoroutineScope 等一堆接口。需要说明一下,这个 StandaloneCoroutine 其实是我们当前 Suspend Continationcomplete。接着会调用

coroutine.start(start, coroutine, block)

这就表明协程开始启动了。

第三步 start

进入到 AbstractCoroutine#start 方法:

public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {  initParentJob()  start(block, receiver, this)
}

跳过层层嵌套,最后到达了:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =  runSafely(completion) {  // 外面再包一层 Coroutine  createCoroutineUnintercepted(receiver, completion)  // 如果需要,做拦截处理  .intercepted()  // 调用 resumeWith 方法        .resumeCancellableWith(Result.success(Unit))  }

虽然这仅仅是一个函数,但是后面主要的逻辑都揭露了:

  1. 创建一个没有拦截过的 Continuation

  2. 拦截 Continuation

  3. 执行 Continuation#resumeWith 方法;

第四步 又创建 Continuation

我这里用了 ,因为我们在 launch 中已经创建了一个 AbstractContinuaion,不过它是一个 complete,从各个函数的行参就可以看出来。

不过我们 suspend 修饰的外层 Continuation 还没有创建,它来了,是 SuspendLambda,它继承自 ContinuationImpl,如果你问我为什么源码中没找到具体实现,我觉得可能跟 suspend 修饰符有关,由编译器处理,但是调用栈确实是这样的。

看一下 SuspendLambda 类的实现:

internal abstract class SuspendLambda(  public override val arity: Int,  completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {  constructor(arity: Int) : this(arity, null)  //...
}

可以看到,它的构造方法的形参就包括一个 complete

第五步 拦截处理

回到:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =  runSafely(completion) {  // 外面再包一层 Coroutine  createCoroutineUnintercepted(receiver, completion)  // 如果需要,做拦截处理  .intercepted()  // 调用 resumeWith 方法        .resumeCancellableWith(Result.success(Unit))  }

里面的拦截方法 Continuation#intercepted() 方法是一个扩展方法:

@SinceKotlin("1.3")
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =  (this as? ContinuationImpl)?.intercepted() ?: this

createCoroutineUnintercepted(receiver, completion) 返回的是一个 SuspendLambda,所以它肯定是一个 ContinuationImpl,看一下它的拦截方法的实现:

internal abstract class ContinuationImpl(  completion: Continuation<Any?>?,  private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {  constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)  public override val context: CoroutineContext  get() = _context!!  public fun intercepted(): Continuation<Any?> =  intercepted  ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)  .also { intercepted = it }  // ...
}

ContinuationImpl#intercepted()方法中,直接利用 context 这个数据结构通过 context[ContinuationInterceptor] 获取拦截器。

CoroutineDispatcher拦截实现

我们都知道 ContinuationInterceptor 具有拦截作用,它的直接实现是 CoroutineDispatcher 这个抽象类,所有其他调度器都直接或者间接继承这个类,我们关注一下它的拦截方法:

public abstract class CoroutineDispatcher :  AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {  //...  public abstract fun dispatch(context: CoroutineContext, block: Runnable)  // 1.拦截的 Continuation 被包了一层 DispatchedContinuation  public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =  DispatchedContinuation(this, continuation)  //...
}  internal class DispatchedContinuation<in T>(  @JvmField val dispatcher: CoroutineDispatcher,  @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {  // ...  override fun resumeWith(result: Result<T>) {  // ...  if (dispatcher.isDispatchNeeded(context)) {  // 2. 后面一个参数需要提供 Runnable,父类已经实现  dispatcher.dispatch(context, this)  }   //...  }  // ...
}  // SchedulerTask 是一个 Runnable
internal abstract class DispatchedTask<in T>(  @JvmField public var resumeMode: Int
) : SchedulerTask() {  // ...  public final override fun run() {  // ...  try {  //...  withCoroutineContext(context, delegate.countOrElement) {  // 3. continuation 是 DispatchedContinuation 包裹的 continuation  continuation.resume(...)  }  }  //...  }
}

简单来说,就是对原有的 ContinuationresumeWith 操作加了一层拦截,就像这样:

加入 CoroutineDispatcher 以后,执行真正的 Continue#resumeWith() 之前,会执行 CoroutineDispatcher#dispatch() 方法,所以我们现在关注 CoroutineDispatcher#dispatch 具体实现即可。

讲一个CoroutineDispatcher具体实现

首先我们得明确这个 CoroutineDispatcher 来自哪里?它从 context 获取,context来自哪里?

注意 SuspendLambdaContinuationImpl 的构造方法,SuspendLambda 中的参数没有 CoroutineContext,所以只能来自 completion 中的 CoroutineContext,而completionCoroutineContext 来自 launch 方法中来自 CoroutineScope,默认是 SupervisorJob() + Dispatchers.Main,不过只有 Dispatchers.Main 继承了 CoroutineDispatcherDispatchers.Main 是一个 MainCoroutineDispatcher,Android 中对应的 MainCoroutineDispatcherHandlerContext

internal class HandlerContext private constructor(  private val handler: Handler,  private val name: String?,  private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {  public constructor(  handler: Handler,  name: String? = null  ) : this(handler, name, false)  //...  override fun dispatch(context: CoroutineContext, block: Runnable) {  // 利用主线程的 Handler 执行任务  handler.post(block)  }  override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {  // 利用主线程的 Handler 延迟执行任务,将完成的 continuation 放在任务中执行  val block = Runnable {  with(continuation) { resumeUndispatched(Unit) }  }  handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))  continuation.invokeOnCancellation { handler.removeCallbacks(block) }  }  //..
}

重点来了,调度任务最后竟然交给了主线程的 Handler,其实想想也对,主线程的任务最后一般都会交给主线程的 Handler。好奇的同学可能问了,如果不是主线程呢?不是主线程就利用的线程池:

public open class ExperimentalCoroutineDispatcher(  private val corePoolSize: Int,  private val maxPoolSize: Int,  private val idleWorkerKeepAliveNs: Long,  private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {  // 执行期  override val executor: Executor  get() = coroutineScheduler  private var coroutineScheduler = createScheduler()  override fun dispatch(context: CoroutineContext, block: Runnable): Unit =  try {  coroutineScheduler.dispatch(block)  } catch (e: RejectedExecutionException) {  DefaultExecutor.dispatch(context, block)  }
}

结果可以说是很清晰了,coroutineScheduler 是一个线程池,如果想了解具体的过程,同学们可以自行查看代码。

读到这里,你可能有一点明白 CoroutineContext 为什么要设计成一种数据结构:

  1. coroutineContext[ContinuationInterceptor] 就可以直接取到当前协程的拦截器,并且一个协程只能对应一个调度器;

  2. 调度器都放在其他 coroutineContext 的前面,所以在执行协程的时候,可以做拦截处理;

同理,我们也可以使用 coroutineContext[Job] 获取当前协程。

第六步 resumeWith

再次回到:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =  runSafely(completion) {  // 外面再包一层 Coroutine  createCoroutineUnintercepted(receiver, completion)  // 如果需要,做拦截处理  .intercepted()  // 调用 resumeWith 方法        .resumeCancellableWith(Result.success(Unit))  }

现在我们看 Continue#resumeCancellableWith() 方法,它是一个扩展方法,里面的调度逻辑是:

  1. DispatchContinuation#resumeCancellableWith

  2. CoroutineDispatcher#dispatch

  3. Continuation#resumeWith

这里的 Continuation 就是 SuspendLambda,它继承了 BaseContinuationImpl,我们看一下它的实现方法:

internal abstract class BaseContinuationImpl(  public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {  // This implementation is final. This fact is used to unroll resumeWith recursion.  public final override fun resumeWith(result: Result<Any?>) {  // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume  var current = this  var param = result  while (true) {  // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure  // can precisely track what part of suspended callstack was already resumed  probeCoroutineResumed(current)  with(current) {  val completion = completion!! // fail fast when trying to resume continuation without completion  val outcome: Result<Any?> =  try {  // 1. 执行 suspend 里面的代码块  val outcome = invokeSuspend(param)  // 2. 如果代码块里面执行了挂起方法,会提前返回  if (outcome === COROUTINE_SUSPENDED) return  Result.success(outcome)  } catch (exception: Throwable) {  Result.failure(exception)  }  releaseIntercepted() // this state machine instance is terminating  if (completion is BaseContinuationImpl) {  // 3. 如果完成的completion也是BaseContinuationImpl,就会进入循环  current = completion  param = outcome  } else {  // 4. 执行 completion resumeWith 方法   completion.resumeWith(outcome)  return  }  }  }  }
}

这边被我分为2个部分:

  • 执行 suspend 方法,并获取结果;

  • 调用 complete (放在下一步讲);

执行suspend方法

在第一处会先执行 suspend 修饰的方法内容,在方法里面可能又会调度 suspend 方法,比如说我们的实例方法:

lifecycleScope.launch(Dispatchers.Main) {  val a = async { getResult(1, 2) }  val b = async { getResult(3, 5) }  val c = a.await() + b.await()  Log.e(TAG, "result:$c")
}   suspend fun getResult(a: Int, b: Int): Int {  return withContext(Dispatchers.IO) {  delay(1000)  return@withContext a + b  }
}

因为我们在 getResult 执行了延时操作,所以我们 launch 方法肯定执行了耗时挂起方法,所以 BaseContinuationImpl#invokeSuspend 方法会返回一个 COROUTINE_SUSPENDED,结果你也看到了,该方法会提前结束。(说明一下,我没有找到BaseContinuationImpl#invokeSuspend 方法的具体实现,我猜可能跟编译器有关)我猜你肯定跟我一样好奇,遇到耗时挂起会提前返回,那么耗时挂起如何对 complete 进行恢复的?

我们看一下 delay(1000) 这个延时操作在主线程是如何处理的:

public suspend fun delay(timeMillis: Long) {  if (timeMillis <= 0) return // don't delay  return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->  cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)  }
}  internal class HandlerContext private constructor(  private val handler: Handler,  private val name: String?,  private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {  //...  override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {  val block = Runnable {  with(continuation) { resumeUndispatched(Unit) }  }  handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))  continuation.invokeOnCancellation { handler.removeCallbacks(block) }  }  //...
}

可以看到,将恢复任务包了一个 Runnable,交给 HandlerHandler#postDelayed() 方法了。

第七步 complete resumeWith

对于 complete 的处理一般会有两种。

complete是BaseContinuationImpl

第一种情况是我们称之为套娃,完成回调的 Continuation 它本身也有自己的完成回调 Continuation,接下来循环就对了。

调用complete的resumeWith

第二种情况,就是通过 complete 去完成回调,由于 completeAbstractContinuation,我们看一下它的 resumeWith

public abstract class AbstractCoroutine<in T>(  /**  * The context of the parent coroutine.  */  @JvmField  protected val parentContext: CoroutineContext,  active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {  // ...  public final override fun resumeWith(result: Result<T>) {  // 1. 获取当前协程的技术状态  val state = makeCompletingOnce(result.toState())  // 2. 如果当前还在等待完成,说明还有子协程没有结束  if (state === COMPLETING_WAITING_CHILDREN) return  // 3. 执行结束恢复的方法,默认为空  afterResume(state)  }  // 这是父类 JobSupport 中的 makeCompletingOnce 方法  // 为了方便查看,我复制过来  internal fun makeCompletingOnce(proposedUpdate: Any?): Any? {  loopOnState { state ->  // tryMakeCompleting 的内容主要根据是否有子Job做不同处理  val finalState = tryMakeCompleting(state, proposedUpdate)  when {  finalState === COMPLETING_ALREADY ->  throw IllegalStateException(  "Job $this is already complete or completing, " +  "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull  )  finalState === COMPLETING_RETRY -> return@loopOnState  else -> return finalState // COMPLETING_WAITING_CHILDREN or final state  }  }  }
}

这段代码的意思其实也很简单,就是协程即将完成,得先评估一下协程的技术状态,别协程还有东西在运行,就给结束了。对于一些有子协程的一些协程,会等待子协程结束的时候,才会结束当前协程。一个 launch 的过程大概就是这样了。大致的流程图是这样的:

下面我们再谈谈 async

四、关于async

asynclaunch 的代码相似度很高:

public fun <T> CoroutineScope.async(  context: CoroutineContext = EmptyCoroutineContext,  start: CoroutineStart = CoroutineStart.DEFAULT,  block: suspend CoroutineScope.() -> T
): Deferred<T> {  val newContext = newCoroutineContext(context)  val coroutine = if (start.isLazy)  LazyDeferredCoroutine(newContext, block) else  DeferredCoroutine<T>(newContext, active = true)  coroutine.start(start, coroutine, block)  return coroutine
}

最终也会进行三步走:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =  runSafely(completion) {  // 外面再包一层 Coroutine  createCoroutineUnintercepted(receiver, completion)  // 如果需要,做拦截处理  .intercepted()  // 调用 resumeWith 方法        .resumeCancellableWith(Result.success(Unit))  }

不同的是,async 返回的是一个 Deferred<T>,我们需要调用 Deferred#await() 去获取返回结果,它的实现在 JobSupport

private open class DeferredCoroutine<T>(  parentContext: CoroutineContext,  active: Boolean
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {  // ... awaitInternal方法来自父类 JobSupport  override suspend fun await(): T = awaitInternal() as T  // ...  // 这是 JobSupport 中的实现  internal suspend fun awaitInternal(): Any? {  // 循环获取结果  while (true) { // lock-free loop on state  val state = this.state  // 1. 如果处于完成状态  if (state !is Incomplete) {  if (state is CompletedExceptionally) { // Slow path to recover stacktrace  recoverAndThrow(state.cause)  }  return state.unboxState()  }  // 2. 除非需要重试,不然就 break  if (startInternal(state) >= 0) break   }  // 等待挂起的方法  return awaitSuspend() // slow-path  }
}

它的具体过程可以从我的注释看出,就不一一介绍了,感兴趣的同学可以查看源码。

1. 本文一开始的讨论

本文一开始的代码是错的,连编译器都过不了,尴尬~正确的代码应该是:

GlobalScope.launch {  val a = async {  1+2  }  val b = async {  1+3  }  val c = a.await() + bawait()  Log.e(TAG,"result:$c")
}

如果是正确的代码,这里可能分两种情况:

如果你放在 UI 线程,那肯定是串行的,这时候有人说,我在 a 里使用 delay(1000),在 b 里使用 delay(2000),得到 c 的时候就花了 2000 毫秒啊,这不是并行吗?

事情并不是这样的,delay 操作使用了 Handler#postDelay 方法,一个延迟了 1000 毫秒执行,一个延迟了 2000 毫秒执行,但是主线程只有一个,所以只能是串行。

如果是子线程,通常都是并行的,因为我们使用了线程池啊~

总结

写这边源码分析的时候,一些细节总是找不到,比如说 suspendLambda 的子类找不到,自己对 Kotlin 的学习有待深入。

所以本文有些地方还值得商榷,如果你有更好的理解,欢迎下方交流。

-- End --

本文对你有帮助吗?留言、转发、点好看是最大的支持,谢谢!

往期推荐

一个全新 Flutter UI 适配方案,低入侵 & 100% 还原设计稿!

Kotlin 协程第一弹:协程的使用,一文讲清楚!

Fragment懒加载还在用setUserVisiblity?看AndroidX带来那些新的Api.

Kotlin 协程,怎么开始的又是怎么结束的?原理讲解!相关推荐

  1. Kotlin 协程与架构组件一起使用及底层原理分析,音视频开发前景

    if (!isChangingConfigurations()) { getViewModelStore().clear(); } } } }); } 在Activity的生命周期走到onDestro ...

  2. 一个小故事讲明白进程、线程、Kotlin 协程到底啥关系?

    前言 协程系列文章: 一个小故事讲明白进程.线程.Kotlin 协程到底啥关系? 少年,你可知 Kotlin 协程最初的样子? 讲真,Kotlin 协程的挂起/恢复没那么神秘(故事篇) 讲真,Kotl ...

  3. Kotlin 协程调度切换线程是时候解开真相了

    前言 协程系列文章: 一个小故事讲明白进程.线程.Kotlin 协程到底啥关系? 少年,你可知 Kotlin 协程最初的样子? 讲真,Kotlin 协程的挂起/恢复没那么神秘(故事篇) 讲真,Kotl ...

  4. 在 Android 开发中使用 Kotlin 协程 (一) -- 初识 Kotlin 协程

    前言 最近在研究 Kotlin 协程,发现功能真的超级强大,很有用,而且很好学,如果你正在或计划使用 Kotlin 开发 Android,那么 Kotlin 协程你一定不能错过! 协程是什么? 我们平 ...

  5. Kotlin协程重新认知 CoroutineContext

    转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/121972791 本文出自[赵彦军的博客] 文章目录 前言 2. Coroutine ...

  6. pdf 深入理解kotlin协程_Kotlin协程实现原理:挂起与恢复

    今天我们来聊聊Kotlin的协程Coroutine. 如果你还没有接触过协程,推荐你先阅读这篇入门级文章What? 你还不知道Kotlin Coroutine? 如果你已经接触过协程,但对协程的原理存 ...

  7. 一文看透 Kotlin 协程本质

    前言 公司开启新项目了,想着准备亮一手 Kotlin 协程应用到项目中去,之前有对 Kotlin 协程的知识进行一定量的学习,以为自己理解协程了,结果--实在拿不出手! 为了更好的加深记忆和理解,更全 ...

  8. Kotlin协程实现原理

    前言 本篇解析Kotlin/JVM中的协程的实现原理. 初看suspend关键字 下面的例子模拟一个网络请求: class Temp {suspend fun fetchData(argument: ...

  9. 破解 Kotlin 协程(6) - 协程挂起篇

    关键词:Kotlin 协程 协程挂起 任务挂起 suspend 非阻塞 协程的挂起最初是一个很神秘的东西,因为我们总是用线程的概念去思考,所以我们只能想到阻塞.不阻塞的挂起到底是怎么回事呢?说出来你也 ...

最新文章

  1. CentOS7.5下yum安装MySQL8图文教程
  2. 32位浮点数在威纶触摸屏显示_MCGS触摸屏与与西门子 S7-1200 PLC以太网通讯
  3. java修改数据库表结构_数据库设计(一):设计传统系统表结构(Java开发)
  4. (三)PHP网页架站
  5. java list 接口_Java 集合 List接口
  6. h5页面预览pdf文件_H5移动端在线浏览pdf文件,推荐插件TouchPDF
  7. webpack2终极优化
  8. Linux 的 history 命令显示时间
  9. 激光雕刻机——广告制作新利器
  10. PS学习笔记一:跟着李涛学PS第一讲——光和色的关系
  11. idea文件颜色代表的含义
  12. h桥控制电机刹车_一种电机H桥制动电路的制作方法
  13. [zkaq靶场]命令执行--IBOS协同办公系统通杀漏洞
  14. 游戏开发unity编辑器扩展知识系列:进度条显示EditorUtility.ProgressBar
  15. 微信红包封面免费做了
  16. 数据库复习 - PART2 - 建模设计与范式
  17. 树结构(Java实现)
  18. 套接字中的数据转换(大端模式/小端模式)
  19. 罗斯蒙特CNG050S290NQEPMZZZ流量计
  20. usercontroller.java_userlogin 用户登录程序的编写,输入 名和密码,JAVA语言,可运行 Develop 254万源代码下载- www.pudn.com...

热门文章

  1. windows7如何安装mysql8_Windows7 安装配置mysql8.0
  2. 一种针对工控系统攻击的远程检测方案(工控系统安全)
  3. Java面试题全集(下)
  4. Business English-Unit 4 Memos -B
  5. [转]linux不需要磁盘碎片整理
  6. 递归一题总结(OJ P1117倒牛奶)
  7. 万字拆解!追溯ChatGPT各项能力的起源
  8. pyTorch入门(六)——实战Android Minist OpenCV手写数字识别(附源码地址)
  9. 删除商品时,如何不影响订单里该商品相关的信息显示
  10. 一个员工的离职成本有多高,超出想象