文章目录

  • 导言
  • 有缓冲通道、线程池
    • 有缓冲通道是什么?
    • 例子
    • 另一个例子
    • 死锁
    • 容量 vs 长度
    • WaitGroup
    • 实现协程池
      • 1. 创建数据结构
      • 2. 创建相关函数
        • 1. `digits`函数
        • 2. `worker`函数
        • 3. `createWorkerPool` 函数
        • 5. `allocate`函数
        • 6. `result`函数
        • 7. `main`函数
      • 3. 最终程序
  • 原作者留言
  • 最后

导言

  • 原文链接: Part 23: Buffered Channels and Worker Pools
  • If translation is not allowed, please leave me in the comment area and I will delete it as soon as possible.

有缓冲通道、线程池

有缓冲通道是什么?

到目前为止,我们谈论的通道都是无缓冲通道。正如 Go语言 通道 所说,无缓冲通道的读写操作都会导致阻塞。

其实,我们可以创建一个有缓冲的通道。当该通道满时,写入操作才会阻塞。同样地,当该通道空时,读出操作才会阻塞。

通过为 make函数 传递一个额外的参数 — 指明缓冲容量的大小,我们可以创建一个有缓冲通道。

ch := make(chan type, capacity)

在上面的代码中,为了使通道拥有缓冲,capacity 应该大于 0。无缓冲通道的容量为 0,因此,在之前的 教程,创建无缓冲通道时,我们省略了 capacity参数。

来创建一个有缓冲通道吧~

例子

package mainimport (  "fmt"
)func main() {  ch := make(chan string, 2)ch <- "naveen"ch <- "paul"fmt.Println(<- ch)fmt.Println(<- ch)
}

在上面程序的第 9 行,我们创建了一个缓冲为 2 的通道。因为通道的容量为 2,所以向它写入 2 个字符串时,并不会产生阻塞。
在第 1011 行,我们写入了 2 个字符串。在 1213 行,它们被读取出来。

程序输出如下:

naveen
paul

另一个例子

接下来再来看个例子,在这个例子中,有 1 个协程向通道写入数据,而 main协程 从该通道读取数据。这个例子有助于我们理解:有缓冲通道的会在什么时候阻塞。

package mainimport (  "fmt""time"
)func write(ch chan int) {  for i := 0; i < 5; i++ {ch <- ifmt.Println("successfully wrote", i, "to ch")}close(ch)
}
func main() {  ch := make(chan int, 2)go write(ch)time.Sleep(2 * time.Second)for v := range ch {fmt.Println("read value", v,"from ch")time.Sleep(2 * time.Second)}
}

在上面程序的第 16 行,我们创建了一个容量为 2 的 有缓冲通道ch。在第 17 行,我们将它传给 write协程。之后 main协程 会休眠 2 秒,在此期间,write协程 会并发地运行。write协程将 04 写入 通道ch。因为通道 ch 的容量为 2,所以在 write协程 写入 01 后,它会进入阻塞状态,直到 ch 的某个数据被 main协程 读取。所以,程序会最先输出:

successfully wrote 0 to ch
successfully wrote 1 to ch

在打印了上面 2 句话后,write协程 进入阻塞状态,直到 main协程 从 通道ch 中读出数据。由于 main协程 在读取之前会休眠 2 秒,所以在此期间,程序没有任何输出。2 秒后,main协程苏醒,并使用 for range循环,从 通道ch 中读取并打印数据,随后又休眠 2 秒。for range循环 会一直执行,直到 通道ch 关闭且数据全被取出。

所以,在休眠 2 秒后,程序会输出:

read value 0 from ch
successfully wrote 2 to ch

在所有数据都写入了 通道ch 后,通道chwrite协程 关闭。

最终输出为:

successfully wrote 0 to ch
successfully wrote 1 to ch
read value 0 from ch
successfully wrote 2 to ch
read value 1 from ch
successfully wrote 3 to ch
read value 2 from ch
successfully wrote 4 to ch
read value 3 from ch
read value 4 from ch

死锁

package mainimport (  "fmt"
)func main() {  ch := make(chan string, 2)ch <- "naveen"ch <- "paul"ch <- "steve"fmt.Println(<-ch)fmt.Println(<-ch)
}

在上面的程序中,我们向容量为 2 的通道,写入 3 个字符串。在第 3 次写入时,main协程 会进入阻塞状态,因为通道已经没有剩余空间了。此时,为了使写入操作可以进行,其他协程必须从该通道中读取数据,然而并没有。因此,这个程序会出现死锁,它会在运行时奔溃,输出以下信息:

fatal error: all goroutines are asleep - deadlock!goroutine 1 [chan send]:
main.main()  /tmp/sandbox274756028/main.go:11 +0x100

容量 vs 长度

有缓冲通道的容量,就是 它能容纳的元素个数。在使用 make函数 创建有缓冲通道时,我们可以指定这个值。

而有缓冲通道的长度,是指 通道内当前的元素个数

通过代码说说这是啥意思:

package mainimport (  "fmt"
)func main() {  ch := make(chan string, 3)ch <- "naveen"ch <- "paul"fmt.Println("capacity is", cap(ch))fmt.Println("length is", len(ch))fmt.Println("read value", <-ch)fmt.Println("new length is", len(ch))
}

在上面的程序中,我们创建了一个容量为 3 的有缓冲通道,即它能容纳 3 个元素。之后,我们向其写入 2 个字符串。
此时,通道内有 2 个元素在排队,因此它的长度是 2。在第 13 行,我们读取出 1 个字符串。此时,通道内只剩下 1 个元素,因此它的长度是 1

程序输出如下:

capacity is 3
length is 2
read value naveen
new length is 1

WaitGroup

在下一节,我们会讨论协程池。为了理解协程池,我们首先需要知道 WaitGroup,因为实现协程池,需要用到 WaitGroup

WaitGroup 能被用于等待一些协程完成工作。

为了说明 WaitGroup 有什么用,这里我们举个例子:
假设现在,我们有 3 个协程正在并发运行 (它们由 main协程 创建),且在这 3 个协程完成工作前,main协程 不能终止。

使用 WaitGroup,我们能轻易实现。

不哔哔了,我们直接写代码吧~

package mainimport (  "fmt""sync""time"
)func process(i int, wg *sync.WaitGroup) {  fmt.Println("started Goroutine ", i)time.Sleep(2 * time.Second)fmt.Printf("Goroutine %d ended\n", i)wg.Done()
}func main() {  no := 3var wg sync.WaitGroupfor i := 0; i < no; i++ {wg.Add(1)go process(i, &wg)}wg.Wait()fmt.Println("All go routines finished executing")
}

WaitGroup 是一个结构体类型,内含 1 个计数器。

  • 调用 Add方法 时,WaitGroup 的计数器会进行增加相应的值。
  • 调用 Done方法 时,WaitGroup 的计数器会减 1
  • 调用 Wait方法 时,对应的协程会进入阻塞状态,WaitGroup 的计数器变为 0

在第 18 行,我们创建了 1 个类型为 WaitGroup 的变量。
在第 20 行,在 for循环 内,我们调用 3 次了 wg.Add(1),此时计数器变为了 3, 也产生了 3 个协程。
在第 23 行,main协程 调用了 wg.Wait方法,进入阻塞状态,直到计数器变为 0
在第 13 行,process协程 会通过调用 wg.Done,减少计数器的数值。一旦 3 个协程完成了工作 (这也意味着 wg.Done 被调用了 3 次),计数器就会变为 0,于是 main函数 会退出阻塞状态。
在第 21 行,我们将 wg 的地址传给了 process函数,这是必要的。如果我们采用值传递,此时每个协程拥有的只是 main函数WaitGroup 的拷贝。这意味着:当 3 个协程完成工作,main协程 并不会得到通知。

程序输出如下:

started Goroutine  2
started Goroutine  0
started Goroutine  1
Goroutine 0 ended
Goroutine 2 ended
Goroutine 1 ended
All go routines finished executing

你的输出可能与我的不同,因为协程执行顺序是随机的。

实现协程池

有缓冲通道的一个重要应用,就是实现协程池。

一般来说, 协程池就是容纳协程的池子。该池子内的协程会等待工作分配,而且一旦协程完成工作,它会进入池子,等待下一个工作。

我们将用有缓冲通道实现协程池。我们的协程池将会执行如下的任务:获得输入整数的数位和。

举个例子,如果输入是 234,那输出应该为 9 (2 + 3 + 4)。协程池的输入是一些伪随机数。

下面是我们协程池的核心功能:

  • 创建一些协程,它们会监听输入通道,等待工作分配。
  • 为输入通道添加工作。
  • 协程工作完成后,将结果写入输出通道。
  • 从输出通道读取并打印结果。

为了更容易理解,我将一步一步的完成这个程序。

1. 创建数据结构

我们先创建一些数据结构,用来代表工作和工作结果。

type Job struct {  id       intrandomno int
}
type Result struct {  job         Jobsumofdigits int
}

Job结构体 拥有 idrandomno 字段,我们的任务就是计算 randomno 的数位和。
Result结构体 拥有 jobsumofdigits 字段,sumofdigits 会存储 job 的工作结果 (数位和)。

接下来,我们来创建 接收作业、接收输出结果 的有缓冲通道。

var jobs = make(chan Job, 10)           // 接收工作的通道
var results = make(chan Result, 10)    // 接收输出结果的通道

工作协程将从 jobs通道 中接收作业。一旦完成,该作业的工作结果会被写入 results通道。

2. 创建相关函数

1. digits函数

下面的 digits 函数,它能计算数位和并返回。
在该函数内部,我们添加了 2 秒的休眠时间,这主要是为了模拟实际情况 — 计算数位和需要花费一定的时间。

func digits(number int) int {  sum := 0no := numberfor no != 0 {digit := no % 10sum += digitno /= 10}time.Sleep(2 * time.Second)return sum
}
2. worker函数

接下来,我们来写一个创建工作协程的函数。

func worker(wg *sync.WaitGroup) {  for job := range jobs {output := Result{job, digits(job.randomno)}results <- output}wg.Done()
}

上面的函数能执行下面的操作:

  1. jobs通道 中读取作业。
  2. 计算 job.randomno 的数位和,创建 Result结构体。
  3. 将结果写入 results通道。
  4. 在所有 jobs 完成后,它会调用 wg.Done
3. createWorkerPool 函数

通过 createWorkPooler函数,我们能创建一个协程池。

func createWorkerPool(noOfWorkers int) {  var wg sync.WaitGroupfor i := 0; i < noOfWorkers; i++ {wg.Add(1)go worker(&wg)}wg.Wait()close(results)
}

该函数会执行如下操作:

  1. 在创建协程前,它会调用 wg.Add(1) 增加 WaitGroup 的计数器。
  2. 它创建了一个 worker协程。在协程创建完毕后,它使用 wg.Wait(),等待所有协程完成工作。
  3. 在所有协程完成工作后,它关闭了 results通道。
5. allocate函数

allocate函数 能给协程们分配工作。

func allocate(noOfJobs int) {  for i := 0; i < noOfJobs; i++ {randomno := rand.Intn(999)job := Job{i, randomno}jobs <- job}close(jobs)
}

该函数会执行如下操作:

  1. 产生最大值为 998 的 随机数randomno
  2. 使用 i 和 随机数randomno 创建 job,将其写入 jobs 通道
  3. 在工作分配完毕后,关闭 jobs 通道。
6. result函数

result函数 能从 results通道 中读取数据并输出。

func result(done chan bool) {  for result := range results {fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)}done <- true
}

该函数执行以下操作:

  1. results通道 读取数据,并打印 idrandomnosumofdigits
  2. 所有结果打印完毕后,向 done通道 写入 true,表示工作完成。
7. main函数
func main() {  startTime := time.Now()noOfJobs := 100go allocate(noOfJobs)done := make(chan bool)go result(done)noOfWorkers := 10createWorkerPool(noOfWorkers)<-doneendTime := time.Now()diff := endTime.Sub(startTime)fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

备注:

  • 为了计算执行时间,我们添加了 startTimeendTime 变量。执行时间能作为基准测试的参考指标。
  • 为了防止 main协程 的过早退出,我们创建了一个 done通道。

3. 最终程序

以下就是整个程序了,你可以参考参考。

package mainimport (  "fmt""math/rand""sync""time"
)type Job struct {  id       intrandomno int
}
type Result struct {  job         Jobsumofdigits int
}var jobs = make(chan Job, 10)
var results = make(chan Result, 10)func digits(number int) int {  sum := 0no := numberfor no != 0 {digit := no % 10sum += digitno /= 10}time.Sleep(2 * time.Second)return sum
}
func worker(wg *sync.WaitGroup) {  for job := range jobs {output := Result{job, digits(job.randomno)}results <- output}wg.Done()
}
func createWorkerPool(noOfWorkers int) {  var wg sync.WaitGroupfor i := 0; i < noOfWorkers; i++ {wg.Add(1)go worker(&wg)}wg.Wait()close(results)
}
func allocate(noOfJobs int) {  for i := 0; i < noOfJobs; i++ {randomno := rand.Intn(999)job := Job{i, randomno}jobs <- job}close(jobs)
}
func result(done chan bool) {  for result := range results {fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)}done <- true
}
func main() {  startTime := time.Now()noOfJobs := 100go allocate(noOfJobs)done := make(chan bool)go result(done)noOfWorkers := 10createWorkerPool(noOfWorkers)<-doneendTime := time.Now()diff := endTime.Sub(startTime)fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

为了更准确地计算执行时间,请在你的本机上运行这个程序。

程序将会有如下输出:

Job id 1, input random no 636, sum of digits 15
Job id 0, input random no 878, sum of digits 23
Job id 9, input random no 150, sum of digits 6
...
total time taken  20.01081009 seconds

由于有 100 个作业,所以这里会有 101 行输出,最后一行打印会执行时间。

你的输出可能和我不一样,因为协程运行顺序是随机的。由于硬件的缘故,你的耗费时长也可能与我不同。

现在,让我们将 main函数 中的 noOfWorkers 增加到 20。此时我们将拥有双倍的工作协程。

运行时,输出结果如下:

...
total time taken  10.004364685 seconds

现在我们可以知道:在一定范围内,随着工作协程的增加,完成任务所耗费的时间将减少。

这里,我要给你留个小练习:请你修改 main函数 中的 noOfJobsnoOfWorkers,观察输出结果。

这就是全部内容了~

祝你不脱发~

原作者留言

优质内容来之不易,您可以通过该 链接 为我捐赠。

最后

感谢原作者的优质内容。

欢迎指出文中的任何错误。

Go语言 有缓冲通道、协程池相关推荐

  1. go语言ants协程池

    参考链接:大数据Ants go ants学习 比如多文件处理,常用 package patternhandleimport ("fmt""sync"" ...

  2. golang-ants协程池使用和实现逻辑

       golang中goroutine由运行时管理,使用go关键字就可以方便快捷的创建一个goroutine,受限于服务器硬件内存大小,如果不对goroutine数量进行限制,会出现Out of Me ...

  3. 连接池和协程池为何能提升并发能力?

    你有没有发现,"内存池"和"进程池"都带有"池"字?其实,这两种技术都属于"池化技术".它通常是由系统预先分配一批资源并 ...

  4. Golang并发模型:轻松入门协程池

    goroutine是非常轻量的,不会暂用太多资源,基本上有多少任务,我们可以开多少goroutine去处理.但有时候,我们还是想控制一下. 比如,我们有A.B两类工作,不想把太多资源花费在B类务上,而 ...

  5. Golang的协程池设计

    转载地址:https://studygolang.com/articles/15477 使用Go语言实现并发的协程调度池阉割版,本文主要介绍协程池的基本设计思路,目的为深入浅出快速了解协程池工作原理, ...

  6. 白话 Golang 协程池

    文章目录 1.何为并发 2.并发的好处 3.Go 如何并发 4.G-P-M 调度模型 5.Go 程的代价 6.协程池的作用 7.简易协程池的设计&实现 8.开源协程池的使用 9.小结 参考文献 ...

  7. 深入浅出 Golang 协程池设计

    使用Go语言实现并发的协程调度池阉割版,本文主要介绍协程池的基本设计思路,目的为深入浅出快速了解协程池工作原理,与真实的企业协程池还有很大差距,本文仅供学习参考. 一.何为并发,Go又是如何实现并发? ...

  8. 一期每日一GO群分享-flag、viper、协程池、异常处理

    1.11 flag库 今天介绍一个库flag,命令行程序常用,用来接受参数的. var (intflag intboolflag boolstringflag string )func init() ...

  9. Python进程池,线程池,协程池

    线程池 import threading import time def myThread():for i in range(10):time.sleep()print('d') sep=thread ...

最新文章

  1. 基于深度神经网络的风电场超短期功率预测系统【数据故事计划最佳学术奖】...
  2. 分布式系统工程实现:GFSamp;Bigtable设计的优势,互联网营销
  3. Appium安装(Mac版)
  4. 隐藏画质代码_【和平精英】变成恐怖精英?玩家在墙上发现了一幅隐藏的壁画!...
  5. 成功解决AttributeError: ‘str‘ object has no attribute ‘decode‘
  6. 私有属性和方法-通过父类方法间接访问
  7. Java程序员必经的实践之路:微服务与SOA架构
  8. HDU2001 计算两点间的距离【入门】
  9. aspose.word给表格插入行或列
  10. 电脑壁纸桌面放计算机,电脑壁纸
  11. thinkpadt410接口介绍_联想t410配置参数详解
  12. Python制作动态桌面壁纸程序-摆脱付费-Mili_Wallpaper
  13. Convolutional Neural Networks on Graphs with Fast Localized Spectral Filtering
  14. 「滑板+EdgeBoard 竟能搭出AI质检流水线?」“软件杯”全国一等奖团队参赛心得...
  15. java 公式计算_java 实现的公式计算
  16. forEach、for…in、 for…of 的区别
  17. Silverlight 2.5D RPG游戏技巧与特效处理(Game Effects):目录
  18. jquery id选择器获取id值含有特殊字符的方法
  19. G - 相遇周期 HDU - 1713
  20. 安卓逆向so篇(一):so文件调用

热门文章

  1. GitHub 之 上传文件(一)
  2. 高老师的架构设计_隽语集(CC_1051)
  3. 迅雷链总工程师来鑫:区块链3.0需解决4大难题
  4. odoo:开源 ERP/CRM 入门与实践
  5. Flask框架之模板继承与案例05
  6. 【数据结构与算法】五、哈希表和链表
  7. java后台生成二维码以及页面显示二维码方式
  8. linux设置合上电脑,[转载]笔记本上装CentOS 7 设置合上盖子不休眠
  9. MTK MT6771处理器,helio P60芯片参考资料
  10. 免费常用快递查询API接口及快递在线下单API分享