认识channel

channel是一个并发安全的队列,可以连接协程,实现不同协程的通信。

Library中定义了几种类型的Channel。 它们在内部能够存储多种元素,只是在send调用是否能够挂起方面有所不一样。 对于全部通道类型,receive调用的行为方式相同:若是通道不为空,则接收元素,不然将挂起。

Unlimited channel

无限制通道(Unlimited channel)是最接近队列的模拟:生产者能够将元素发送到此通道,而且它将无限增加。 send方法将永远不会被挂起。 若是没有更多的内存,则会抛出OutOfMemoryException。 和队列不一样的是当使用者尝试从空通道接收消息并被挂起直到有一些新元素发送到该通道时继续使用。

Buffered channel

缓冲通道(Buffered channel)的大小受指定数字的限制。 生产者能够将元素发送到此通道,直到达到最大限制。 全部元素都在内部存储。 通道已满时,下一个send呼叫将被挂起,直到出现更多可用空间。

Rendezvous channel

"约定"通道(Rendezvous channel)是没有缓冲区的通道。 这与建立大小为零的缓冲通道(Buffered channel)相同。 其中一个功能(send或receive)始终被挂起,直到调用另外一个功能为止。 若是调用了send函数,但消费者没有准备好处理该元素则receive会挂起,而且send也会被挂起。 一样,若是调用了receive函数且通道为空,换句话说,没有准备好发送该元素的的send被挂起-receive也会被挂起。

Conflated channel

发送到合并通道( Conflated channel)的新元素将覆盖先前发送的元素,所以接收方将始终仅能获取最新元素。 send调用将永远不会被挂起。
建立通道时,指定其类型或缓冲区大小(若是须要缓冲的通道):

val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)

默认状况下,会建立一个"约定"通道(Rendezvous channel)。

在如下示例中,将建立一个"约定"通道,两个生产者协程和一个消费者协程:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.*fun main() = runBlocking<Unit> {val channel = Channel<String>()launch {channel.send("A1")channel.send("A2")log("A done")}launch {channel.send("B1")log("B done")}launch {repeat(3) {val x = channel.receive()log(x)}}
}fun log(message: Any?) {println("[${Thread.currentThread().name}] $message")
}以上将会打印以下结果:[main @coroutine#4] A1
[main @coroutine#4] B1
[main @coroutine#2] A done
[main @coroutine#3] B done
[main @coroutine#4] A2

channel实际上是一个队列,队列中一定存在缓冲区,这个缓冲区满了并且一直没有人调用receive取走函数,send就需要挂起,故意让接收端的节奏放慢,发现send总是被挂起,直到receive之后才会继续往下执行。

    fun run1() {val channel = Channel<Int>(Channel.UNLIMITED)//Channel协程间通信,并发安全的队列runBlocking {//生产者val p = launch {for (i in 1..5) {channel.send(i)log("send = $i")}}//消费者val c = launch {//正常接收数据while (true) {//故意让接收端的节奏放慢,发现send总是被挂起,直到receive之后才会继续往下执行delay(2000)val el = channel.receive()log("re = $el")}//通过迭代器iterator接收数据//val iterator = channel.iterator()//while (iterator.hasNext()) {//    delay(2000)//    log("iterator = ${iterator.next()}")//}}joinAll(p,c)}}打印:
com.z.zjetpack V/zx: send = 1
com.z.zjetpack V/zx: send = 2
com.z.zjetpack V/zx: send = 3
com.z.zjetpack V/zx: send = 4
com.z.zjetpack V/zx: send = 5
com.z.zjetpack V/zx: re = 1
com.z.zjetpack V/zx: re = 2
com.z.zjetpack V/zx: re = 3
com.z.zjetpack V/zx: re = 4
com.z.zjetpack V/zx: re = 5

produce 与actor

  • 构造生产者与消费者的便捷方法
  • 我们可以通过produce方法启动一个生产者协程,并返回一个reveive channel,其他协程就可以用这个channel来接收数据了。反过来我们可以用actor启动一个消费者协程。
    fun run2(){runBlocking {//快捷创建生产者协程,返回一个接收Channelval receiveChannel = produce<Int> {repeat(5){delay(1000)send(it)}}val job2 = launch {for (i in receiveChannel) {log("receiveChannel = $i")}}job2.join()}runBlocking {//构造消费者的便捷方法val sendChannel = actor<Int> {while (true) {val re = receive()log("re = $re")}}val p =  launch {for (i in 1..3) {sendChannel.send(i)}}p.join()}}打印:
com.z.zjetpack V/zx: receiveChannel = 0
com.z.zjetpack V/zx: receiveChannel = 1
com.z.zjetpack V/zx: receiveChannel = 2
com.z.zjetpack V/zx: receiveChannel = 3
com.z.zjetpack V/zx: receiveChannel = 4
com.z.zjetpack V/zx: re = 1
com.z.zjetpack V/zx: re = 2
com.z.zjetpack V/zx: re = 3

channel的关闭

  • produce和actor返回的channel都会随着对应的协程执行完毕而关闭,也正式这样,channel才会被称为热数据流.
  • 对于一个channel,如果我们调用了它的close方法,它会立即停止接收新元素,它的isClosedForSend会立即返回true,由于channel缓冲区的存在,可能还有一些元素没有被处理完,所以要等所有元素都被读取之后isClosedForReceive才会返回true
  • channel的生命周期最好由主导方来维护,建议由主导的一方实现关闭。
    fun run3(){runBlocking {val channel = Channel<Int>(3)//生产者launch {List(3){channel.send(it)log("send = $it")}channel.close()log("isClosedForSend = ${channel.isClosedForSend}")log("isClosedForReceive = ${channel.isClosedForReceive}")}//消费者launch {for (c in channel) {log("re = $c")delay(1000)}log("消费isClosedForSend = ${channel.isClosedForSend}")log("消费isClosedForReceive = ${channel.isClosedForReceive}")}}}打印:
com.z.zjetpack V/zx: send = 0
com.z.zjetpack V/zx: send = 1
com.z.zjetpack V/zx: send = 2
com.z.zjetpack V/zx: isClosedForSend = true
com.z.zjetpack V/zx: isClosedForReceive = false
com.z.zjetpack V/zx: re = 0
com.z.zjetpack V/zx: re = 1
com.z.zjetpack V/zx: re = 2
com.z.zjetpack V/zx: 消费isClosedForSend = true
com.z.zjetpack V/zx: 消费isClosedForReceive = true

BroadcastChannel

发送端和接收端在channel中存在一对多的场景,虽然有多个接收端,但是同一个元素只会被一个接收端读取到,广播则不同,多个接收端不存在互斥行为。

    fun run4() {runBlocking {//直接创建// val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)//broadcast方法创建val channel = Channel<Int>()val broadcastChannel = channel.broadcast(Channel.BUFFERED)//创建3个协程来接收List(3) {launch {val receiveChannel = broadcastChannel.openSubscription()for (r in receiveChannel) {log("协程 $it, re = $r")}}}launch {List(3) {broadcastChannel.send(it)}broadcastChannel.close()}}}打印:
com.z.zjetpack V/zx: 协程 0, re = 0
com.z.zjetpack V/zx: 协程 0, re = 1
com.z.zjetpack V/zx: 协程 0, re = 2
com.z.zjetpack V/zx: 协程 1, re = 0
com.z.zjetpack V/zx: 协程 1, re = 1
com.z.zjetpack V/zx: 协程 1, re = 2
com.z.zjetpack V/zx: 协程 2, re = 0
com.z.zjetpack V/zx: 协程 2, re = 1
com.z.zjetpack V/zx: 协程 2, re = 2

多路复用

复用多个await

两个api分别从网络和本地获取数据,期望哪个先返回就先用哪个做显示

    fun CoroutineScope.getFromLocal() = async {delay(1000)"返回本地数据"}fun CoroutineScope.getFromNet() = async {"返回网络数据"}fun run5() {runBlocking {launch {val local = getFromLocal()val net = getFromNet()val res = select<String> {local.onAwait { it }net.onAwait { it }}log("值 = $res")}.join()}}打印:
com.z.zjetpack V/zx: 值 = 返回网络数据

复用多个channel

跟await类似,会接收到最快的那个channel消息

    fun run6() {runBlocking {val channels = listOf(Channel<Int>(), Channel<Int>())launch {delay(100)channels[0].send(1)}launch {delay(500)channels[1].send(5)}val result = select<Int> {channels.forEach { re ->re.onReceive{it}}}log("result = $result")}}
打印:
com.z.zjetpack V/zx: result = 1

SelectClause

哪些事件可以被select?SelectClause类型
包括:
SelectClause0:对应事件没有返回值,例如 join 没有返回值,对应的 onJoin 就是这个类型,使用时 onJoin 的参数是一个无参函数:

   public val onJoin: SelectClause0runBlocking {val job1 = launch {delay(100)log("job1")}val job2 = launch {delay(10)log("job2")}select<Unit> {job1.onJoin {log("job1.onJoin")}job2.onJoin {log("job2.onJoin")}}}打印:
com.z.zjetpack V/zx: job2
com.z.zjetpack V/zx: job2.onJoin
com.z.zjetpack V/zx: job1

SelectClause1:对应事件有返回值,前面的 onAwait 和 onReceive 都是此类情况。

    public val onAwait: SelectClause1<T>public val onReceive: SelectClause1<E>

SelectClause2:对应事件有返回值,此外还需要额外的一个参数,例如 Channel.onSend 有两个参数,第一个就是一个 Channel 数据类型的值,表示即将发送的值,第二个是发送成功时的回调。
如果我们想要确认挂起函数是否支持select,查看是否存在对应的SelectClauseN类型可回调即可

 //返回SelectClause2public val onSend: SelectClause2<E, SendChannel<E>>runBlocking {val channels = listOf(Channel<Int>(), Channel<Int>())launch {select<Unit> {launch {delay(100)channels[0].onSend(1) { sendChannel ->log("send on $sendChannel")}}launch {delay(500)channels[1].onSend(5) { sendChannel ->log("send on $sendChannel")}}}}launch {for (c in channels) {log("数据  = ${c.receive()}")}}}
打印:
com.z.zjetpack V/zx: send on RendezvousChannel@63db1bf{EmptyQueue}
com.z.zjetpack V/zx: 数据  = 1

Flow实现多路复用

coroutineScope {val login = "..."listOf(::getUserFromApi, ::getUserFromLocal) ... ①.map { function ->function.call(login) ... ②}.map { deferred ->flow { emit(deferred.await()) } ... ③}.merge() ... ④.onEach { user ->println("Result: $user")}.launchIn(this)
}

这其中,① 处用创建了两个函数引用组成的 List;② 处调用它们得到 deferred;③ 处比较关键,对于每一个 deferred 我们创建一个单独的 Flow,并在 Flow 内部发送 deferred.await() 返回的结果,即返回的 User 对象;现在我们有了两个 Flow 实例,我们需要将它们整合成一个 Flow 进行处理,调用 merge 函数即可。

协程的并发安全

除了线程中常用的解决并发安全问题的手段外,协程提供了一些并发安全的工具

  • channel:并发安全的消息通道
  • Mutex:轻量级锁,lock和unlock和线程锁类似,轻量级是说它在获取不到锁时不会阻塞线程而是挂起等待锁的释放。
  • Semaphore:轻量级信号量,信号量可以有多个,协程在获取到信号量后即可执行并发操作,当Semaphore的参数为1时,效果等同于Mutex
    fun run7() {runBlocking {var count = 0List(10000) {//GlobalScope是线程不安全的GlobalScope.launch {count++}}.joinAll()log("默认count = $count")}//使用volatile解决并发问题runBlocking {var count = AtomicInteger(0)List(10000) {//GlobalScope是线程不安全的GlobalScope.launch {count.incrementAndGet()}}.joinAll()log("volatile count = ${count.get()}")}//使用Mutex解决并发问题runBlocking {var count = 0var mutex = Mutex()List(10000) {//GlobalScope是线程不安全的GlobalScope.launch {mutex.withLock {count++}}}.joinAll()log("Mutex count = $count")}//使用Semaphore解决并发问题runBlocking {var count = 0var semaphore = Semaphore(1)List(10000) {//GlobalScope是线程不安全的GlobalScope.launch {semaphore.withPermit {count++}}}.joinAll()log("Semaphore count = $count")}}

打印:

com.z.zjetpack V/zx: 默认count = 9991
com.z.zjetpack V/zx: volatile count = 10000
com.z.zjetpack V/zx: Mutex count = 10000
com.z.zjetpack V/zx: Semaphore count = 10000

除了使用这些工具解决并发问题,也可以避免访问外部可变状态,编写函数时,要求它不得访问外部状态,只能基于入参做运算,通过返回值提供运算结果。

           runBlocking {var count = 0//count在协程外面不存在并发问题val result = count + List(10000){GlobalScope.async { 1 }}.map {it.await()}.sum()log("count count = $result")}

Kotlin 之 协程(四)协程并发相关推荐

  1. Kotlin实战指南十四:协程启动模式

    转载请标明出处:https://blog.csdn.net/zhaoyanjun6/article/details/96008400 本文出自[赵彦军的博客] 文章目录 协程启动 DEFAULT LA ...

  2. 王学岗Kotlin协程(四)————Flow异步流

    参考文章 异步返回值的多个方案 1,什么时候用flow呢?----kotlin要表示多个值 如何表示多个值?挂起函数可以异步返回单个值,但是如果要异步返回多个计算好的值, 就只能用flow了.其它方案 ...

  3. Kotlin实战指南十三:协程

    转载请标明出处:https://blog.csdn.net/zhaoyanjun6/article/details/95626034 本文出自[赵彦军的博客] 文章目录 前言-协程介绍 主流语言对协程 ...

  4. pdf 深入理解kotlin协程_协程初探

    Hello,各位朋友,小笨鸟我回来了! 近期学习了Kotlin协程相关的知识,感觉这块技术在项目中的可应用性很大,对项目的开发效率和维护成本有较大的提升.于是就考虑深入研究下相关概念和使用方式,并引入 ...

  5. Kotlin协程:协程的基础与使用

    一.协程概述 1.概念 协程是Coroutine的中文简称,co表示协同.协作,routine表示程序.协程可以理解为多个互相协作的程序.协程是轻量级的线程,它的轻量体现在启动和切换,协程的启动不需要 ...

  6. Kotlin学习笔记26 协程part6 协程与线程的关系 Dispatchers.Unconfined 协程调试 协程上下文切换 Job详解 父子协程的关系

    参考链接 示例来自bilibili Kotlin语言深入解析 张龙老师的视频 1 协程与线程的关系 import kotlinx.coroutines.* import java.util.concu ...

  7. 如何理解高并发中的协程?协程的实现和历史

    <Libco是一个C/C++协程库,在微信服务中广泛使用> <协程到底是什么?> <如何理解高并发中的协程?协程的实现和历史> 目录 普通的函数 从普通函数到协程 ...

  8. kotlin协程_Kotlin协程

    kotlin协程 In this tutorial, we'll be looking into Kotlin Coroutines. Coroutines is a vital concept si ...

  9. python协程实现一万并发_python进阶:服务端实现并发的八种方式

    [本文导读]文中有许多不妥之处,敬请批评指正!python编写的服务端,有八种实现并发的方式,如阻塞(对等)套接字实现并发.非阻塞套接字实现并发.epoll实现并发.多进程实现并发.多线程实现并发.进 ...

  10. 【Kotlin 协程】协程简介 ( 协程概念 | 协程作用 | 创建 Android 工程并进行协程相关配置开发 | 异步任务与协程对比 )

    文章目录 一.协程概念 二.协程作用 三.创建 Android 工程并进行协程相关配置 1.创建 Android 工程 2.配置协程环境 3.布局文件 4.异步任务代码示例 5.协程代码示例 6.完整 ...

最新文章

  1. JAVA泛型知识(一)
  2. Linux静默安装oracle
  3. 转:Oracle物理文件
  4. Java多线程学习十三:synchronized 和 Lock 区别以及孰优孰劣,如何选择?
  5. android 4 高级编程 第一章摘
  6. 团队-科学计算器-代码设计规范
  7. 电脑W7系统怎样安装鸿蒙系统,家用电脑升级win7系统的操作方法
  8. 全国三级城市联动 js版
  9. 短视频拍摄脚本怎么写
  10. 应用之星破除行业门槛 零成本开发手机应用
  11. 腾讯互娱推出 PGOS 提供 Serverless 游戏上云
  12. STM32共阳数码管编程分享
  13. gcc环境配置时遇到的问题
  14. 排队论和对策论(博弈论)
  15. 以下关于python二维数据的描述中错误的是_关于二维数据CSV存储问题,以下选项中描述错误的是‪‪‪‪‪‪‫‪‪‪‪‪‫‪‪‪‪‪‪‪‪‪‪‪‪‪‪‪‪‪‪‪‪‪‪‪‪‪‪‫:...
  16. java jtable 复选框_java swing如何在JTable一个单元格添加多个复选框
  17. BLE协议栈 – SM
  18. 基于stm32单片机的模拟IIC时序(附源码)
  19. 利用“顺丰速运”下发GuLoader恶意软件的风险分析
  20. 矮子当中的巨人,让你在面试者中脱颖而出

热门文章

  1. aac蓝牙编解码协议_「干货」蓝牙耳机编码格式(SBC、AAC、aptX)都有啥区别?...
  2. 荣耀手机和小米打出了真火,针锋相对比拼千元机
  3. 使用springboot写一个记事小账本
  4. 广东技术师范大学计算机科学与技术期末考试,广东技术师范大学计算机科学与技术专业(留学生)本科人才培养方案...
  5. win10打开蓝牙_学会了这些win10快捷键,可以极大的提高你的工作效率
  6. Allegro添加Logo方法
  7. 解决把一篇word文档复制到另一篇word文档时, 更改标题格式
  8. 中蜂药花蜜记载于《神农本草经》
  9. 江苏法院基本解决执行难 设立全国首家环境资源法庭
  10. SteamVR 2.x 手柄使用3D物体(14)