
  • 0. 引言
  • 1. runBlocking()
    • 1.1. 开启协程
    • 1.2. 同步阻塞式执行协程
  • 2. receive()
    • 2.1. 若receive操作时队列包含Send元素则异步唤醒send协程
    • 2.2. 若receive操作时队列不包含Send元素则挂起receive协程
  • 3. send()
    • 3.1. 若send操作时队列包含receive元素则异步唤醒receive协程
    • 3.2. 若send操作时队列不包含receive元素则挂起send协程

0. 引言

在Kotlin官方文档介绍中,Channel是用于协程间的通信的,它的宗旨是:Do not communicate by sharing memory; instead, share memory by communicating. 下面借助官方文档给的一个Channel的使用例子来感受一下这一通信过程的实现:

/**
 * Demonstrates passing values between two coroutines through a [Channel].
 *
 * Inside `runBlocking`, a launched coroutine sends the squares of 1..5 into the channel
 * (printing "do send" after each send), while the enclosing coroutine receives and prints
 * five values and finally prints "Done!".
 */
private fun testChannel() {
    runBlocking {
        val channel = Channel<Int>()
        launch {
            // This could be CPU-heavy asynchronous logic; here we only square five integers and send them.
            for (x in 1..5) {
                channel.send(x * x)
                println("do send")
            }
        }
        // Print the five integers that are received.
        repeat(5) {
            val receive = channel.receive()
            println(receive)
        }
        println("Done!")
    }
}


1. runBlocking()


1.1. 开启协程

/**
 * Starts [block] as a coroutine and blocks the calling thread until [BlockingCoroutine.joinBlocking] returns.
 *
 * When [context] carries no [ContinuationInterceptor], the thread-local event loop is obtained
 * (created on demand) and added to the new context. Otherwise the interceptor itself is used as the
 * event loop if it is an [EventLoop] that should be processed from the context, falling back to an
 * already existing thread-local event loop (which may be `null`).
 *
 * @param context the context for the new coroutine; defaults to [EmptyCoroutineContext].
 * @param block the coroutine body.
 * @return the value produced by `joinBlocking()`.
 */
public fun <T> runBlocking(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    val currentThread = Thread.currentThread()
    val contextInterceptor = context[ContinuationInterceptor]
    val eventLoop: EventLoop?
    val newContext: CoroutineContext
    if (contextInterceptor == null) {
        // create or use private event loop if no dispatcher is specified
        eventLoop = ThreadLocalEventLoop.eventLoop
        newContext = GlobalScope.newCoroutineContext(context + eventLoop)
    } else {
        // See if context's interceptor is an event loop that we shall use (to support TestContext)
        // or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
        eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
            ?: ThreadLocalEventLoop.currentOrNull()
        newContext = GlobalScope.newCoroutineContext(context)
    }
    val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
    return coroutine.joinBlocking()
}

首先获取了当前线程实例currentThread,如果在main()函数中调用testChannel()函数,则currentThread即为主线程。在没有指明dispatcher情况下,contextInterceptor == null,这时eventLoop的初始化为:

/**
 * Holder of the per-thread [EventLoop] reference.
 */
internal object ThreadLocalEventLoop {
    private val ref = CommonThreadLocal<EventLoop?>()

    /** Returns the event loop stored in [ref], creating and storing a new one when none is set. */
    internal val eventLoop: EventLoop
        get() = ref.get() ?: createEventLoop().also { ref.set(it) }

    // Other members used elsewhere in this article (currentOrNull(), resetEventLoop()) are not part of this excerpt.
}
/** An [EventLoopImplBase] bound to the given [thread]. */
internal class BlockingEventLoop(
    override val thread: Thread
) : EventLoopImplBase()

/** Creates a [BlockingEventLoop] bound to the thread that calls this function. */
internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread())


1.2. 同步阻塞式执行协程


    /**
     * Runs the event loop on the current thread until this coroutine is completed, then returns its result.
     *
     * Each iteration processes one event via `eventLoop?.processNextEvent()` and, unless the coroutine
     * has completed, parks the thread for the returned number of nanoseconds (`Long.MAX_VALUE` when
     * there is no event loop).
     *
     * @return the completed state cast to [T].
     * @throws InterruptedException if the thread is interrupted; the coroutine is cancelled with that exception.
     * @throws Throwable the `cause` of the final state when it is a [CompletedExceptionally].
     */
    fun joinBlocking(): T {
        // empty implementation (article annotation)
        registerTimeLoopThread()
        try {
            eventLoop?.incrementUseCount()
            try {
                while (true) {
                    @Suppress("DEPRECATION")
                    if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
                    val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
                    // note: process next even may loose unpark flag, so check if completed before parking
                    if (isCompleted) break
                    parkNanos(this, parkNanos)
                }
            } finally { // paranoia
                eventLoop?.decrementUseCount()
            }
        } finally { // paranoia
            // empty implementation (article annotation)
            unregisterTimeLoopThread()
        }
        // now return result
        val state = this.state.unboxState()
        (state as? CompletedExceptionally)?.let { throw it.cause }
        return state as T
    }

主要逻辑是当isCompleted = false时会一直运行while循环,循环内通过eventLoop的processNextEvent()方法来执行添加到eventLoop上的task:

 // class EventLoopImplBase
/**
 * Processes at most one queued event and returns [nextTime].
 *
 * Unconfined events take priority. Otherwise every delayed task that is due is moved from
 * `_delayed` into the main queue, and then one task is dequeued and run.
 */
override fun processNextEvent(): Long {
    // unconfined events take priority
    if (processUnconfinedEvent()) return nextTime
    // queue all delayed tasks that are due to be executed
    val delayed = _delayed.value
    if (delayed != null && !delayed.isEmpty) {
        val now = nanoTime()
        while (true) {
            // make sure that moving from delayed to queue removes from delayed only after it is added to queue
            // to make sure that 'isEmpty' and `nextTime` that check both of them
            // do not transiently report that both delayed and queue are empty during move
            delayed.removeFirstIf {
                if (it.timeToExecute(now)) {
                    enqueueImpl(it)
                } else {
                    false
                }
            } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
        }
    }
    // then process one event from queue
    dequeue()?.run()
    return nextTime
}

/**
 * Nanoseconds until the next event: `0` when the queue is non-empty (or the superclass reports 0),
 * `Long.MAX_VALUE` when the queue is closed or there is no delayed task, otherwise the non-negative
 * time remaining until the first delayed task.
 */
protected override val nextTime: Long
    get() {
        if (super.nextTime == 0L) return 0L
        val queue = _queue.value
        when {
            queue === null -> {} // empty queue -- proceed
            queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
            queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
            else -> return 0 // non-empty queue
        }
        val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE
        return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)
    }

这段代码在Coroutine挂起与恢复分析这篇文章中分析过,可以看到如果delayed队列中的第一个任务达到可执行的时机,会从delayed队列移除,然后调用enqueueImpl(it)方法加入到_queue队列,再调用dequeue()方法从_queue队列取出任务执行。返回的nextTime实际是下一个任务可执行时间点距离现在的时间间隔,也即是joinBlocking()中parkNanos的值。下一步调用parkNanos(this, parkNanos)来阻塞当前线程,当有新任务被添加进delayed队列时会调用unpark()函数唤醒当前线程。回到joinBlocking()函数,最后会执行decrementUseCount()函数来减少incrementUseCount()中增加的useCount值,并完成一些收尾工作:

// class EventLoop
/**
 * Increases `useCount` by `delta(unconfined)` and marks the loop as `shared` for non-unconfined use.
 *
 * @param unconfined whether the use being registered is an unconfined one; defaults to `false`.
 */
fun incrementUseCount(unconfined: Boolean = false) {
    useCount += delta(unconfined)
    if (!unconfined) shared = true
}
// class EventLoop
/**
 * Decreases `useCount` by `delta(unconfined)`; when it is no longer positive and the loop is
 * `shared`, calls [shutdown].
 *
 * @param unconfined whether the use being released is an unconfined one; defaults to `false`.
 */
fun decrementUseCount(unconfined: Boolean = false) {
    useCount -= delta(unconfined)
    if (useCount > 0) return
    assert { useCount == 0L } // "Extra decrementUseCount"
    if (shared) {
        // shut it down and remove from ThreadLocalEventLoop
        shutdown()
    }
}
// class EventLoopImplBase
/**
 * Shuts this event loop down: resets the thread-local reference, marks the loop completed, closes
 * the queue, drains the remaining queued tasks and reschedules the remaining delayed tasks.
 */
override fun shutdown() {
    // Clean up thread-local reference here -- this event loop is shutting down
    ThreadLocalEventLoop.resetEventLoop()
    // We should signal that this event loop should not accept any more tasks
    // and process queued events (that could have been added after last processNextEvent)
    // Article annotation: setting this flag is said to make the loop in joinBlocking() exit.
    // NOTE(review): joinBlocking() reads the coroutine's isCompleted; confirm it is the same flag as this one.
    isCompleted = true
    closeQueue()
    // complete processing of all queued tasks
    while (processNextEvent() <= 0) { /* spin */ }
    // reschedule the rest of delayed tasks
    rescheduleAllDelayed()
}


2. receive()


/**
 * Receives an element from this channel.
 *
 * Tries a non-suspending poll first and returns its result when an element was obtained.
 * When the poll failed or returned [Closed], falls back to [receiveSuspend] with
 * `RECEIVE_THROWS_ON_CLOSE`.
 */
public final override suspend fun receive(): E {
    // fast path -- try poll non-blocking
    val result = pollInternal()
    /*
     * If result is Closed -- go to tail-call slow-path that will allow us to
     * properly recover stacktrace without paying a performance cost on fast path.
     * We prefer to recover stacktrace using suspending path to have a more precise stacktrace.
     */
    @Suppress("UNCHECKED_CAST")
    if (result !== POLL_FAILED && result !is Closed<*>) return result as E
    // slow-path does suspend
    return receiveSuspend(RECEIVE_THROWS_ON_CLOSE)
}

2.1. 若receive操作时队列包含Send元素则异步唤醒send协程


// class AbstractSendChannel
protected val queue = LockFreeLinkedListHead()

// class AbstractChannel
/**
 * Tries to remove element from buffer or from queued sender.
 * Return type is `E | POLL_FAILED | Closed`
 *
 * Loops until a queued [Send] is successfully resumed (returning its `pollResult`) or no
 * [Send] is available (returning `POLL_FAILED`).
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun pollInternal(): Any? {
    while (true) {
        val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
        val token = send.tryResumeSend(null)
        if (token != null) {
            assert { token === RESUME_TOKEN }
            send.completeResumeSend()
            return send.pollResult
        }
    }
}

/** Removes the first node of [queue] if it is a [Send], or peeks it if it is [Closed]. */
protected fun takeFirstSendOrPeekClosed(): Send? =
    queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { it is Closed<*> }


2.2. 若receive操作时队列不包含Send元素则挂起receive协程

/**
 * Slow path of `receive()`: suspends the caller and enqueues a [ReceiveElement] for it.
 *
 * If enqueueing succeeds, cancellation cleanup is registered and the coroutine stays suspended.
 * Otherwise a poll is attempted: a [Closed] result resumes the receiver through
 * `resumeReceiveClosed`, any other non-failed result resumes it with the value, and
 * `POLL_FAILED` retries the loop.
 *
 * @param receiveMode mode passed to the [ReceiveElement] (e.g. `RECEIVE_THROWS_ON_CLOSE`).
 */
private suspend fun <R> receiveSuspend(receiveMode: Int): R = suspendAtomicCancellableCoroutineReusable sc@ { cont ->
    val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, receiveMode)
    while (true) {
        if (enqueueReceive(receive)) {
            // In the article's example, the first receive() call takes this branch.
            removeReceiveOnCancel(cont, receive)
            return@sc
        }
        // hm... something is not right. try to poll
        val result = pollInternal()
        if (result is Closed<*>) {
            receive.resumeReceiveClosed(result)
            return@sc
        }
        if (result !== POLL_FAILED) {
            cont.resume(receive.resumeValue(result as E))
            return@sc
        }
    }
}

首先挂起当前调用receive()的地方,并将挂起的续体cont和receiveMode = RECEIVE_THROWS_ON_CLOSE传给ReceiveElement构造函数生成一个receive对象。接着调用enqueueReceive(receive)方法将receive对象添加到AbstractChannel类中queue队列中,若前驱对象不是send,则其返回值为true,因此会调用removeReceiveOnCancel()来处理取消时的逻辑然后就返回了,若前驱对象是send,则表明可以马上接收数据,因此调用pollInternal()方法获取send对象中的pollResult,并赋值给result。最后根据result的值来判断通过receive唤醒接收或者通过cont来唤醒接收。先看一下将receive添加进queue队列的过程:

// class AbstractSendChannel
protected val queue = LockFreeLinkedListHead()

/** Enqueues [receive] and invokes `onReceiveEnqueued()` when it was actually added. */
private fun enqueueReceive(receive: Receive<E>) = enqueueReceiveInternal(receive).also { result ->
    if (result) onReceiveEnqueued()
}

/**
 * Appends [receive] to [queue] only if the current last node is not a [Send]
 * (and, for channels whose buffer is not always empty, only if the buffer is empty).
 *
 * @return `true` if the node was added.
 */
protected open fun enqueueReceiveInternal(receive: Receive<E>): Boolean =
    if (isBufferAlwaysEmpty) {
        queue.addLastIfPrev(receive) { it !is Send }
    } else {
        queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })
    }

// class LockFreeLinkedListNode
/**
 * Adds [node] as the last node if [predicate] holds for the current previous node.
 *
 * @return `false` as soon as [predicate] rejects the previous node, `true` once the node is linked.
 */
public actual inline fun addLastIfPrev(node: Node, predicate: (Node) -> Boolean): Boolean {
    while (true) { // lock-free loop on prev.next
        val prev = prevNode // sentinel node is never removed, so prev is always defined
        if (!predicate(prev)) return false
        if (prev.addNext(node, this)) return true
    }
}


// class AbstractChannel
/**
 * Takes the first queued [Send] and tries to resume it.
 *
 * Loops until a [Send] is successfully resumed (returning its `pollResult`) or no [Send] is
 * available (returning `POLL_FAILED`).
 */
protected open fun pollInternal(): Any? {
    while (true) {
        val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
        val token = send.tryResumeSend(null)
        if (token != null) {
            assert { token === RESUME_TOKEN }
            send.completeResumeSend()
            return send.pollResult
        }
    }
}

/** Removes the first node of `queue` if it is a [Send], or peeks it if it is [Closed]. */
protected fun takeFirstSendOrPeekClosed(): Send? =
    queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { it is Closed<*> }

调用takeFirstSendOrPeekClosed()方法拿到queue队列中第一个Send元素,然后恢复Send中的挂起的续体对象。回到receiveSuspend()方法,如果result is Closed<*>为真,则表明是异常情况,调用ReceiveElement类的resumeReceiveClosed()函数恢复调用channel.receive()挂起的续体对象:

// class ReceiveElement
/**
 * Resumes the suspended receiver after the channel was found closed.
 *
 * Depending on `receiveMode`, resumes with `null` (`RECEIVE_NULL_ON_CLOSE` and no close cause),
 * with a result wrapper (`RECEIVE_RESULT`), or with `closed.receiveException` in every other case.
 */
override fun resumeReceiveClosed(closed: Closed<*>) {
    when {
        receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> cont.resume(null)
        receiveMode == RECEIVE_RESULT -> cont.resume(closed.toResult<Any>())
        else -> cont.resumeWithException(closed.receiveException)
    }
}

构造ReceiveElement对象时,receiveMode的值为RECEIVE_THROWS_ON_CLOSE,因此会执行cont.resumeWithException(closed.receiveException)方法恢复协程,恢复结果是closed.receiveException。若没有出现异常关闭,且result !== POLL_FAILED,则调用cont.resume()恢复协程。

3. send()

从上一节分析可知:调用receive()方法过程中构造了一个ReceiveElement对象,并被添加在AbstractChannel对象的queue队列中。接下来就会执行joinBlocking()函数中while循环的eventLoop?.processNextEvent(),因为执行launch()函数时会将一个DispatchedContinuation对象添加到EventLoopImplBase中的_queue队列尾部。因此这里会拿到该DispatchedContinuation对象,并执行其run()方法,恢复续体执行,因此会执行launch {}代码段。

/**
 * Sends [element] to this channel.
 *
 * Returns immediately when the non-suspending offer succeeds; otherwise delegates to
 * [sendSuspend].
 */
public final override suspend fun send(element: E) {
    // fast path -- try offer non-blocking
    if (offerInternal(element) === OFFER_SUCCESS) return
    // slow-path does suspend or throws exception
    return sendSuspend(element)
}


3.1. 若send操作时队列包含receive元素则异步唤醒receive协程

/**
 * Tries to hand [element] to a queued receiver.
 *
 * Loops until a receiver is successfully resumed (returning its `offerResult`) or no receiver is
 * available (returning `OFFER_FAILED`).
 */
protected open fun offerInternal(element: E): Any {
    while (true) {
        val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
        val token = receive.tryResumeReceive(element, null)
        if (token != null) {
            assert { token === RESUME_TOKEN }
            receive.completeResumeReceive(element)
            return receive.offerResult
        }
    }
}

/** Removes the first node of `queue` if it is a [ReceiveOrClosed], or peeks it if it is [Closed]. */
protected open fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
    queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>>({ it is Closed<*> })


/**
 * Tries to resume the receiver's continuation with [value].
 *
 * @param otherOp optional prepare operation; its `desc` is passed to `tryResume` and
 *   `finishPrepare()` is called on it after a successful resume.
 * @return `RESUME_TOKEN` on success, or `null` when the continuation could not be resumed.
 */
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
    val token = cont.tryResume(resumeValue(value), otherOp?.desc) ?: return null
    assert { token === RESUME_TOKEN } // the only other possible result
    // We can call finishPrepare only after successful tryResume, so that only good affected node is saved
    otherOp?.finishPrepare()
    return RESUME_TOKEN
}
/**
 * Attempts to move this continuation from a not-completed state to a completed one holding [value].
 *
 * @param idempotent optional marker; when non-null the value is stored wrapped in a
 *   [CompletedIdempotentResult] so that a repeated resume with the same marker also succeeds.
 * @return `RESUME_TOKEN` on success, `null` when the continuation can no longer be resumed.
 */
override fun tryResume(value: T, idempotent: Any?): Any? {
    _state.loop { state ->
        when (state) {
            is NotCompleted -> {
                val update: Any? =
                    if (idempotent == null) value else CompletedIdempotentResult(idempotent, value)
                if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
                detachChildIfNonResuable()
                return RESUME_TOKEN
            }
            is CompletedIdempotentResult -> {
                return if (state.idempotentResume === idempotent) {
                    assert { state.result === value } // "Non-idempotent resume"
                    RESUME_TOKEN
                } else {
                    null
                }
            }
            else -> return null // cannot resume -- not active anymore
        }
    }
}


/**
 * Extracts the result value from a completed [state], unwrapping
 * [CompletedIdempotentResult] and [CompletedWithCancellation]; any other state is returned as is.
 */
override fun <T> getSuccessfulResult(state: Any?): T =
    when (state) {
        is CompletedIdempotentResult -> state.result as T
        is CompletedWithCancellation -> state.result as T
        else -> state as T
    }


// class ReceiveElement
/** Completes the resume of the receiver by delegating to the continuation's `completeResume`. */
override fun completeResumeReceive(value: E) = cont.completeResume(RESUME_TOKEN)

// CancellableContinuationImpl.kt
/** Finishes a resume started by `tryResume` by dispatching with the current `resumeMode`. */
override fun completeResume(token: Any) {
    assert { token === RESUME_TOKEN }
    dispatchResume(resumeMode)
}

/** Dispatches the resumption with [mode] unless the no-argument `tryResume()` reports nothing is left to do. */
private fun dispatchResume(mode: Int) {
    if (tryResume()) return // completed before getResult invocation -- bail out
    // otherwise, getResult has already commenced, i.e. completed later or in other thread
    dispatch(mode)
}

最终会调用CancellableContinuationImpl类的父类DispatchedTask的dispatch()方法来分发,从 Coroutine挂起-从源码看挂起这篇文章中可知分发过程最后会将该恢复操作封装成一个Task添加到EventLoopImplBase类的_queue队列中。上一节分析的runBlocking()函数在调用processNextEvent()时会从事件循环的_queue队列中取出该Task执行。

3.2. 若send操作时队列不包含receive元素则挂起send协程


/**
 * Slow path of `send()`: suspends the caller until [element] is accepted or the channel is closed.
 *
 * While the channel is full, a [SendElement] is enqueued; on success the coroutine stays suspended
 * with cancellation cleanup registered. If enqueueing is not possible, or the channel is not full,
 * a direct offer is attempted and the loop retries when that offer fails.
 */
private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutineReusable sc@ { cont ->
    loop@ while (true) {
        if (isFullImpl) {
            val send = SendElement(element, cont)
            val enqueueResult = enqueueSend(send)
            when {
                enqueueResult == null -> { // enqueued successfully
                    cont.removeOnCancellation(send)
                    return@sc
                }
                enqueueResult is Closed<*> -> {
                    cont.helpCloseAndResumeWithSendException(enqueueResult)
                    return@sc
                }
                enqueueResult === ENQUEUE_FAILED -> {} // try to offer instead
                enqueueResult is Receive<*> -> {} // try to offer instead
                else -> error("enqueueSend returned $enqueueResult")
            }
        }
        // hm... receiver is waiting or buffer is not full. try to offer
        val offerResult = offerInternal(element)
        when {
            offerResult === OFFER_SUCCESS -> {
                cont.resume(Unit)
                return@sc
            }
            offerResult === OFFER_FAILED -> continue@loop
            offerResult is Closed<*> -> {
                cont.helpCloseAndResumeWithSendException(offerResult)
                return@sc
            }
            else -> error("offerInternal returned $offerResult")
        }
    }
}


/**
 * Appends [send] to the queue unless a receiver or a closed token is already the last node.
 *
 * Result is:
 * * null -- successfully enqueued
 * * ENQUEUE_FAILED -- buffer is not full (should not enqueue)
 * * ReceiveOrClosed<*> -- receiver is waiting or it is closed (should not enqueue)
 */
protected open fun enqueueSend(send: Send): Any? {
    if (isBufferAlwaysFull) {
        queue.addLastIfPrev(send) { prev ->
            if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
            true
        }
    } else {
        if (!queue.addLastIfPrevAndIf(send, { prev ->
                if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
                true
            }, { isBufferFull }))
            return ENQUEUE_FAILED
    }
    return null
}



