https://segmentfault.com/a/1190000018193161

Why Pool

go自从出生就身带“高并发”的标签,其并发编程就是由groutine实现的,因其消耗资源低,性能高效,开发成本低的特性而被广泛应用到各种场景,例如服务端开发中使用的HTTP服务,在golang net/http包中,每一个被监听到的tcp链接都是由一个groutine去完成处理其上下文的,由此使得其拥有极其优秀的并发量吞吐量

for {// 监听tcprw, e := l.Accept()if e != nil {.......}tempDelay = 0c := srv.newConn(rw)c.setState(c.rwc, StateNew) // before Serve can return// 启动协程处理上下文go c.serve(ctx)
}

虽然创建一个groutine占用的内存极小(大约2KB左右,线程通常2M左右),但是在实际生产环境无限制的开启协程显然是不科学的,比如上图的逻辑,如果来几千万个请求就会开启几千万个groutine,当没有更多内存可用时,go的调度器就会阻塞groutine最终导致内存溢出乃至严重的崩溃,所以本文将通过实现一个简单的协程池,以及剖析几个开源的协程池源码来探讨一下对groutine的并发控制以及多路复用的设计和实现。

一个简单的协程池

过年前做过一波小需求,是将主播管理系统中信息不完整的主播找出来然后再到其相对应的直播平台爬取完整信息并补全,当时考虑到每一个主播的数据都要访问一次直播平台所以就用应对每一个主播开启一个groutine去抓取数据,虽然这个业务量还远远远远达不到能造成groutine性能瓶颈的地步,但是心里总是不舒服,于是放假回来后将其优化成从协程池中控制groutine数量再开启爬虫进行数据抓取。思路其实非常简单,用一个channel当做任务队列,初始化groutine池时确定好并发量,然后以设置好的并发量开启groutine同时读取channel中的任务并执行, 模型如下图

实现

type SimplePool struct {wg   sync.WaitGroupwork chan func() //任务队列
}func NewSimplePoll(workers int) *SimplePool {p := &SimplePool{wg:   sync.WaitGroup{},work: make(chan func()),}p.wg.Add(workers)//根据指定的并发量去读取管道并执行for i := 0; i < workers; i++ {go func() {defer func() {// 捕获异常 防止waitGroup阻塞if err := recover(); err != nil {fmt.Println(err)p.wg.Done()}}()// 从workChannel中取出任务执行for fn := range p.work {fn()}p.wg.Done()}()}return p
}
// 添加任务
func (p *SimplePool) Add(fn func()) {p.work <- fn
}// 执行
func (p *SimplePool) Run() {close(p.work)p.wg.Wait()
}

测试

测试设定为在并发数量为20的协程池中并发抓取一百个人的信息, 因为代码包含较多业务逻辑所以sleep 1秒模拟爬虫过程,理论上执行时间为5秒

func TestSimplePool(t *testing.T) {p := NewSimplePoll(20)for i := 0; i < 100; i++ {p.Add(parseTask(i))}p.Run()
}func parseTask(i int) func() {return func() {// 模拟抓取数据的过程time.Sleep(time.Second * 1)fmt.Println("finish parse ", i)}
}

这样一来最简单的一个groutine池就完成了

go-playground/pool

上面的groutine池虽然简单,但是对于每一个并发任务的状态,pool的状态缺少控制,所以又去看了一下go-playground/pool的源码实现,先从每一个需要执行的任务入手,该库中对并发单元做了如下的结构体,可以看到除工作单元的值,错误,执行函数等,还用了三个分别表示,取消,取消中,写 的三个并发安全的原子操作值来标识其运行状态。

// 需要加入pool 中执行的任务
type WorkFunc func(wu WorkUnit) (interface{}, error)// 工作单元
type workUnit struct {value      interface{}    // 任务结果 err        error          // 任务的报错done       chan struct{}  // 通知任务完成fn         WorkFunc    cancelled  atomic.Value   // 任务是否被取消cancelling atomic.Value   // 是否正在取消任务writing    atomic.Value   // 任务是否正在执行
}

接下来看Pool的结构

type limitedPool struct {workers uint            // 并发量 work    chan *workUnit  // 任务channelcancel  chan struct{}   // 用于通知结束的channelclosed  bool            // 是否关闭m       sync.RWMutex    // 读写锁,主要用来保证 closed值的并发安全
}

初始化groutine池, 以及启动设定好数量的groutine

// 初始化pool,设定并发量
func NewLimited(workers uint) Pool {if workers == 0 {panic("invalid workers '0'")}p := &limitedPool{workers: workers,}p.initialize()return p
}func (p *limitedPool) initialize() {p.work = make(chan *workUnit, p.workers*2)p.cancel = make(chan struct{})p.closed = falsefor i := 0; i < int(p.workers); i++ {// 初始化并发单元p.newWorker(p.work, p.cancel)}
}// passing work and cancel channels to newWorker() to avoid any potential race condition
// betweeen p.work read & write
func (p *limitedPool) newWorker(work chan *workUnit, cancel chan struct{}) {go func(p *limitedPool) {var wu *workUnitdefer func(p *limitedPool) {// 捕获异常,结束掉异常的工作单元,并将其再次作为新的任务启动if err := recover(); err != nil {trace := make([]byte, 1<<16)n := runtime.Stack(trace, true)s := fmt.Sprintf(errRecovery, err, string(trace[:int(math.Min(float64(n), float64(7000)))]))iwu := wuiwu.err = &ErrRecovery{s: s}close(iwu.done)// need to fire up new worker to replace this one as this one is exitingp.newWorker(p.work, p.cancel)}}(p)var value interface{}var err errorfor {select {// workChannel中读取任务case wu = <-work:// 防止channel 被关闭后读取到零值if wu == nil {continue}// 先判断任务是否被取消if wu.cancelled.Load() == nil {// 执行任务value, err = wu.fn(wu)wu.writing.Store(struct{}{})// 任务执行完在写入结果时需要再次检查工作单元是否被取消,防止产生竞争条件if wu.cancelled.Load() == nil && wu.cancelling.Load() == nil {wu.value, wu.err = value, errclose(wu.done)}}// pool是否被停止case <-cancel:return}}}(p)
}

往POOL中添加任务,并检查pool是否关闭

func (p *limitedPool) Queue(fn WorkFunc) WorkUnit {w := &workUnit{done: make(chan struct{}),fn:   fn,}go func() {p.m.RLock()if p.closed {w.err = &ErrPoolClosed{s: errClosed}if w.cancelled.Load() == nil {close(w.done)}p.m.RUnlock()return}// 将工作单元写入workChannel, pool启动后将由上面newWorker函数中读取执行p.work <- wp.m.RUnlock()}()return w
}

在go-playground/pool包中, limitedPool的批量并发执行还需要借助batch.go来完成

// batch contains all information for a batch run of WorkUnits
type batch struct {pool    Pool          // 上面的limitedPool实现了Pool interfacem       sync.Mutex    // 互斥锁,用来判断closedunits   []WorkUnit    // 工作单元的slice, 这个主要用在不设并发限制的场景,这里忽略results chan WorkUnit // 结果集,执行完后的workUnit会更新其value,error,可以从结果集channel中读取done    chan struct{} // 通知batch是否完成closed  boolwg      *sync.WaitGroup
}
//  go-playground/pool 中有设置并发量和不设并发量的批量任务,都实现Pool interface,初始化batch批量任务时会将之前创建好的Pool传入newBatch
func newBatch(p Pool) Batch {return &batch{pool:    p,units:   make([]WorkUnit, 0, 4), // capacity it to 4 so it doesn't grow and allocate too many times.results: make(chan WorkUnit),done:    make(chan struct{}),wg:      new(sync.WaitGroup),}
}// 往批量任务中添加workFunc任务
func (b *batch) Queue(fn WorkFunc) {b.m.Lock()if b.closed {b.m.Unlock()return}//往上述的limitPool中添加workFuncwu := b.pool.Queue(fn)b.units = append(b.units, wu) // keeping a reference for cancellation purposesb.wg.Add(1)b.m.Unlock()// 执行完后将workUnit写入结果集channelgo func(b *batch, wu WorkUnit) {wu.Wait()b.results <- wub.wg.Done()}(b, wu)
}// 通知批量任务不再接受新的workFunc, 如果添加完workFunc不执行改方法的话将导致取结果集时done channel一直阻塞
func (b *batch) QueueComplete() {b.m.Lock()b.closed = trueclose(b.done)b.m.Unlock()
}// 获取批量任务结果集
func (b *batch) Results() <-chan WorkUnit {go func(b *batch) {<-b.doneb.m.Lock()b.wg.Wait()b.m.Unlock()close(b.results)}(b)return b.results
}

测试

func SendMail(int int) pool.WorkFunc {fn := func(wu pool.WorkUnit) (interface{}, error) {// sleep 1s 模拟发邮件过程time.Sleep(time.Second * 1)// 模拟异常任务需要取消if int == 17 {wu.Cancel()}if wu.IsCancelled() {return false, nil}fmt.Println("send to", int)return true, nil}return fn
}func TestBatchWork(t *testing.T) {// 初始化groutine数量为20的poolp := pool.NewLimited(20)defer p.Close()batch := p.Batch()// 设置一个批量任务的过期超时时间t := time.After(10 * time.Second)go func() {for i := 0; i < 100; i++ {batch.Queue(SendMail(i))}batch.QueueComplete()}()// 因为 batch.Results 中要close results channel 所以不能将其放在LOOP中执行r := batch.Results()
LOOP:for {select {case <-t:// 登台超时通知fmt.Println("recived timeout")break LOOPcase email, ok := <-r:// 读取结果集if ok {if err := email.Error(); err != nil {fmt.Println("err", err.Error())}fmt.Println(email.Value())} else {fmt.Println("finish")break LOOP}}}
}



接近理论值5s, 通知模拟被取消的work也正常取消

go-playground/pool在比起之前简单的协程池的基础上, 对pool, worker的状态有了很好的管理。但是,但是问题来了,在第一个实现的简单groutine池和go-playground/pool中,都是先启动预定好的groutine来完成任务执行,在并发量远小于任务量的情况下确实能够做到groutine的复用,如果任务量不多则会导致任务分配到每个groutine不均匀,甚至可能出现启动的groutine根本不会执行任务从而导致浪费,而且对于协程池也没有动态的扩容和缩小。所以我又去看了一下ants的设计和实现。

ants

ants是一个受fasthttp启发的高性能协程池, fasthttp号称是比go原生的net/http快10倍,其快速高性能的原因之一就是采用了各种池化技术(这个日后再开新坑去读源码), ants相比之前两种协程池,其模型更像是之前接触到的数据库连接池,需要从空余的worker中取出一个来执行任务, 当无可用空余worker的时候再去创建,而当pool的容量达到上线之后,剩余的任务阻塞等待当前进行中的worker执行完毕将worker放回pool, 直至pool中有空闲worker。 ants在内存的管理上做得很好,除了定期清除过期worker(一定时间内没有分配到任务的worker),ants还实现了一种适用于大批量相同任务的pool, 这种pool与一个需要大批量重复执行的函数锁绑定,避免了调用方不停的创建,更加节省内存。

先看一下ants的pool 结构体 (pool.go)

type Pool struct {// 协程池的容量 (groutine数量的上限)capacity int32// 正在执行中的groutinerunning int32// 过期清理间隔时间expiryDuration time.Duration// 当前可用空闲的groutineworkers []*Worker// 表示pool是否关闭release int32// lock for synchronous operation.lock sync.Mutex// 用于控制pool等待获取可用的groutinecond *sync.Cond// 确保pool只被关闭一次once sync.Once// worker临时对象池,在复用worker时减少新对象的创建并加速worker从pool中的获取速度workerCache sync.Pool// pool引发panic时的执行函数PanicHandler func(interface{})
}

接下来看pool的工作单元 worker (worker.go)

type Worker struct {// worker 所属的poo;pool *Pool// 任务队列task chan func()// 回收时间,即该worker的最后一次结束运行的时间recycleTime time.Time
}

执行worker的代码 (worker.go)

func (w *Worker) run() {// pool中正在执行的worker数+1w.pool.incRunning()go func() {defer func() {if p := recover(); p != nil {//若worker因各种问题引发panic, //pool中正在执行的worker数 -1,         //如果设置了Pool中的PanicHandler,此时会被调用w.pool.decRunning()if w.pool.PanicHandler != nil {w.pool.PanicHandler(p)} else {log.Printf("worker exits from a panic: %v", p)}}}()// worker 执行任务队列for f := range w.task {//任务队列中的函数全部被执行完后,//pool中正在执行的worker数 -1, //将worker 放回对象池if f == nil {w.pool.decRunning()w.pool.workerCache.Put(w)return}f()//worker 执行完任务后放回Pool //使得其余正在阻塞的任务可以获取workerw.pool.revertWorker(w)}}()
}

了解了工作单元worker如何执行任务以及与pool交互后,回到pool中查看其实现, pool的核心就是取出可用worker提供给任务执行 (pool.go)

// 向pool提交任务
func (p *Pool) Submit(task func()) error {if 1 == atomic.LoadInt32(&p.release) {return ErrPoolClosed}// 获取pool中的可用worker并向其任务队列中写入任务p.retrieveWorker().task <- taskreturn nil
}// **核心代码** 获取可用worker
func (p *Pool) retrieveWorker() *Worker {var w *Workerp.lock.Lock()idleWorkers := p.workersn := len(idleWorkers) - 1// 当前pool中有可用worker, 取出(队尾)worker并执行if n >= 0 {w = idleWorkers[n]idleWorkers[n] = nilp.workers = idleWorkers[:n]p.lock.Unlock()} else if p.Running() < p.Cap() {p.lock.Unlock()// 当前pool中无空闲worker,且pool数量未达到上线// pool会先从临时对象池中寻找是否有已完成任务的worker,// 若临时对象池中不存在,则重新创建一个worker并将其启动if cacheWorker := p.workerCache.Get(); cacheWorker != nil {w = cacheWorker.(*Worker)} else {w = &Worker{pool: p,task: make(chan func(), workerChanCap),}}w.run()} else {// pool中没有空余worker且达到并发上限// 任务会阻塞等待当前运行的worker完成任务释放会poolfor {p.cond.Wait() // 等待通知, 暂时阻塞l := len(p.workers) - 1if l < 0 {continue}// 当有可用worker释放回pool之后, 取出w = p.workers[l]p.workers[l] = nilp.workers = p.workers[:l]break}p.lock.Unlock()}return w
}// 释放worker回pool
func (p *Pool) revertWorker(worker *Worker) {worker.recycleTime = time.Now()p.lock.Lock()p.workers = append(p.workers, worker)// 通知pool中已经获取锁的groutine, 有一个worker已完成任务p.cond.Signal()p.lock.Unlock()
}

在批量并发任务的执行过程中, 如果有超过5纳秒(ants中默认worker过期时间为5ns)的worker未被分配新的任务,则将其作为过期worker清理掉,从而保证pool中可用的worker都能发挥出最大的作用以及将任务分配得更均匀
(pool.go)

// 该函数会在pool初始化后在协程中启动
func (p *Pool) periodicallyPurge() {// 创建一个5ns定时的心跳heartbeat := time.NewTicker(p.expiryDuration)defer heartbeat.Stop()for range heartbeat.C {currentTime := time.Now()p.lock.Lock()idleWorkers := p.workersif len(idleWorkers) == 0 && p.Running() == 0 && atomic.LoadInt32(&p.release) == 1 {p.lock.Unlock()return}n := -1for i, w := range idleWorkers {// 因为pool 的worker队列是先进后出的,所以正序遍历可用worker时前面的往往里当前时间越久if currentTime.Sub(w.recycleTime) <= p.expiryDuration {break}    // 如果worker最后一次运行时间距现在超过5纳秒,视为过期,worker收到nil, 执行上述worker.go中 if n == nil 的操作n = iw.task <- nilidleWorkers[i] = nil}if n > -1 {// 全部过期if n >= len(idleWorkers)-1 {p.workers = idleWorkers[:0]} else {// 部分过期p.workers = idleWorkers[n+1:]}}p.lock.Unlock()}
}

测试

func TestAnts(t *testing.T) {wg := sync.WaitGroup{}pool, _ := ants.NewPool(20)defer pool.Release()for i := 0; i < 100; i++ {wg.Add(1)pool.Submit(sendMail(i, &wg))}wg.Wait()
}func sendMail(i int, wg *sync.WaitGroup) func() {return func() {time.Sleep(time.Second * 1)fmt.Println("send mail to ", i)wg.Done()}
}


这里虽只简单的测试批量并发任务的场景, 如果大家有兴趣可以去看看ants的压力测试, ants的吞吐量能够比原生groutine高出N倍,内存节省10到20倍, 可谓是协程池中的神器。

借用ants作者的原话来说:
然而又有多少场景是单台机器需要扛100w甚至1000w同步任务的?基本没有啊!结果就是造出了屠龙刀,可是世界上没有龙啊!也是无情…

Over

一口气从简单到复杂总结了三个协程池的实现,受益匪浅, 感谢各开源库的作者, 虽然世界上没有龙,但是屠龙技是必须练的,因为它就像存款,不一定要全部都用了,但是一定不能没有!

golang协程池设计相关推荐

  1. 深入浅出 Golang 协程池设计

    使用Go语言实现并发的协程调度池阉割版,本文主要介绍协程池的基本设计思路,目的为深入浅出快速了解协程池工作原理,与真实的企业协程池还有很大差距,本文仅供学习参考. 一.何为并发,Go又是如何实现并发? ...

  2. Golang的协程池设计

    转载地址:https://studygolang.com/articles/15477 使用Go语言实现并发的协程调度池阉割版,本文主要介绍协程池的基本设计思路,目的为深入浅出快速了解协程池工作原理, ...

  3. 白话 Golang 协程池

    文章目录 1.何为并发 2.并发的好处 3.Go 如何并发 4.G-P-M 调度模型 5.Go 程的代价 6.协程池的作用 7.简易协程池的设计&实现 8.开源协程池的使用 9.小结 参考文献 ...

  4. Go协程池设计思路(Task-Job-Worker)

    1. 铺垫:Go 的接收器Receiver 在go语言中,没有类的概念,但是可以给类型(结构体,自定义类型)定义方法.所谓方法就是定义了接受者的函数.接受者定义在func关键字和函数名之间.可以理解成 ...

  5. Golang - 协程池 ants.NewPoolWithFunc使用介绍

    前言 ants是一个高性能的协程池,实现了对大规模goroutine的调度管理.goroutine复用,允许使用者在开发并发程序的时候限制协程数量,复用资源,达到更高效执行任务的效果. 提示:以下是本 ...

  6. Golang并发模型:轻松入门协程池

    goroutine是非常轻量的,不会暂用太多资源,基本上有多少任务,我们可以开多少goroutine去处理.但有时候,我们还是想控制一下. 比如,我们有A.B两类工作,不想把太多资源花费在B类务上,而 ...

  7. golang 数组 最后一个_面试必问:Golang高阶Golang协程实现原理

    1 01 引言 实现并发编程有进程,线程,IO多路复用的方式.(并发和并行我们这里不区分,如果CPU是多核的,可能在多个核同时进行,我们叫并行,如果是单核,需要排队切换,我们叫并发). 1.1 进程和 ...

  8. 一期每日一GO群分享-flag、viper、协程池、异常处理

    1.11 flag库 今天介绍一个库flag,命令行程序常用,用来接受参数的. var (intflag intboolflag boolstringflag string )func init() ...

  9. GoLang协程与通道---下

    GoLang协程与通道---下 新旧模型对比:任务和worker 惰性生成器的实现 实现 Futures 模式 复用 典型的客户端/服务器(C/S)模式 卸载(Teardown):通过信号通道关闭服务 ...

最新文章

  1. 好文分享:我是如何在求职中把自己“推销”出去的
  2. BZOJ3233【AHOI2013】找硬币
  3. 联想超融合平台oracle,联想AIO超融合云一体机解决方案.pdf
  4. 斯皮尔 皮尔森 肯德尔_一起来学应用统计学(全部)(二)持续更新
  5. JEECG(J2EE Code Generation) 基于代码生成器J2EE智能开发框架 杂记:发布新版本 JEECG_v2.0
  6. mysql项目数据库文档_项目mysql数据库
  7. Spring-Logback-动态修改日志级别
  8. 排序算法_总结与复习
  9. 如何提取左声道音频_TRS? TRRS? 正式录制前,您确保麦克风的音频线插对了吗?...
  10. 3.第一个HelloMaven/快速入门
  11. 关于CityEngine导入shp数据
  12. WINDOWS下内存泄漏检测工具VLD(Visual Leak Detector)的使用
  13. 计算机算法分析与设计心得体会,算法设计与分析课程的心得体会
  14. WEB前端经典笔试/面试题
  15. 关于MATLAB命令窗口(command window)清理的相关设置
  16. 远离僵尸网络的14种方法
  17. 保命小诀窍:IDEA远程Debug技巧,你了解吗?
  18. word制作员工手册教学
  19. 【Redis】练习题
  20. js将汉字转为相应的拼音

热门文章

  1. 麦轮平台的速度分解与合成
  2. 2019重庆对口高职计算机类分数排名,重庆2019高职分类考试分数线公布
  3. opencv 骨架提取_抗爆墙方盛提取车间抗爆墙记录@温州贴吧
  4. JSP的基础语法和指令(源码刨析,建议收藏)
  5. 在Angular中添加第三方库jQuery、bootstrap
  6. 中如何设置电气栅格_游戏中的设置界面如何设计?
  7. 3d立体相册特效html网页代码_网页设计程序设计的必备知识点
  8. js - 预加载+监听图片资源加载制作进度条
  9. MVC Remote属性验证
  10. 二.hadoop环境搭建