引言

ants是一个高性能的 goroutine 池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制 goroutine 数量,复用资源,达到更高效执行任务的效果。

本文将从头开始分析ants库是如何实现的,并在结尾给出性能测试结果.

ants库代码简洁,总代码量并不大,相信大家跟随此文可以较为轻松地了解如何设计一个简洁可用的goroutine池.


ants的功能(from 官方)

  • 自动调度海量的 goroutines,复用 goroutines
  • 定期清理过期的 goroutines,进一步节省资源
  • 提供了大量有用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool
  • 优雅处理 panic,防止程序崩溃
  • 资源复用,极大节省内存使用量;在大规模批量并发任务场景下比原生 goroutine 并发具有更高的性能
  • 非阻塞机制

默认goroutine池

Ants会初始化一个默认的goroutine池,并且为其配备了各种函数.

var (//当处理器为单核,使用阻塞workerChan,多核则使用非阻塞workerChan//对于多核非阻塞workerChan可以有效提高效率,减少性能抖动workerChanCap = func() int {if runtime.GOMAXPROCS(0) == 1 {return 0}return 1}()defaultLogger = Logger(log.New(os.Stderr, "", log.LstdFlags))defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)
)// 向工作池提交任务
func Submit(task func()) error {return defaultAntsPool.Submit(task)
}// 返回当前goroutine的并发数
func Running() int {return defaultAntsPool.Running()
}// 返回默认池的容量
func Cap() int {return defaultAntsPool.Cap()
}
...

默认goroutine池由NewPool函数生成,接下来我们看看它是如何实现的.


Poll & NewPoll()

type Pool struct {capacity int32    //协程池容量running int32     //当前并发的协程数量lock sync.Locker  //用于保护工作队列的锁workers workerArray  //用于存放可用workerstate int32       //用于提醒pool去关闭自己cond *sync.Cond   //用于等待获取可用worker的条件量workerCache sync.Pool  //用于在retrieveWorker函数中加速获取可用workerblockingNum int   //当前阻塞在pool.Submit过程的协程数量,其收到pool.lock保护options *Options  //协程池的配置信息
}func NewPool(size int, options ...Option) (*Pool, error) {opts := loadOptions(options...)    //载入配置if size <= 0 {size = -1     //size为负数意味着协程池容量是无限的}if expiry := opts.ExpiryDuration; expiry < 0 {return nil, ErrInvalidPoolExpiry} else if expiry == 0 {opts.ExpiryDuration = DefaultCleanIntervalTime}if opts.Logger == nil {opts.Logger = defaultLogger}p := &Pool{capacity: int32(size),lock:     internal.NewSpinLock(),options:  opts,}p.workerCache.New = func() interface{} {     //初始化一个worker的缓存,用于加快worker的获取return &goWorker{pool: p,task: make(chan func(), workerChanCap),}}if p.options.PreAlloc {    //进行worker预分配if size == -1 {        //size为负数说明协程池容量是无限大,因此不可能进行预分配操作return nil, ErrInvalidPreAllocSize  }p.workers = newWorkerArray(loopQueueType, size)} else {p.workers = newWorkerArray(stackType, 0)}p.cond = sync.NewCond(p.lock)// 开启一个按时间间隔自动清理协程池的协程go p.purgePeriodically()return p, nil
}

按照上述流程,我们就得到了一个标准协程池.接下来我们看看如何使用协程池吧.


Poll配套方法

  • submit:向协程池发布任务

    //注意:如果当前协程池没有可用worker,就会阻塞Pool.Submit()操作
    //想要避免这一点就需要在实例化协程池的时候,将NonBlocking参数设为true
    //这样的话可以在没有可用worker时直接返回nil
    func (p *Pool) Submit(task func()) error {if p.IsClosed() {return ErrPoolClosed}var w *goWorkerif w = p.retrieveWorker(); w == nil {return ErrPoolOverload}w.task <- taskreturn nil
    }
  • Cap & Running & IsClosed & Free & incRunning & decRunning:返回或更改协程池的元数据

    func (p *Pool) Running() int {return int(atomic.LoadInt32(&p.running))
    }func (p *Pool) Cap() int {return int(atomic.LoadInt32(&p.capacity))
    }func (p *Pool) IsClosed() bool {return atomic.LoadInt32(&p.state) == CLOSED
    }func (p *Pool) Free() int { //返回可用空间大小c := p.Cap()if c < 0 {return -1}return c - p.Running()
    }func (p *Pool) incRunning() {atomic.AddInt32(&p.running, 1)
    }func (p *Pool) decRunning() {atomic.AddInt32(&p.running, -1)
    }
  • retrieveWorker:获取一个可用的worker

    func (p *Pool) retrieveWorker() (w *goWorker) {spawnWorker := func() {w = p.workerCache.Get().(*goWorker)w.run()}p.lock.Lock()w = p.workers.detach()if w != nil { // 首先尝试从协程池的worker队列中取一个可用的workerp.lock.Unlock()} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {//如果worker队列是空的,但是还没有用完协程池的容量,就使用从workerCache获取一个workerp.lock.Unlock()spawnWorker()} else { //否则我们需要等待一个worker被放回到协程池中if p.options.Nonblocking {  //如果是非阻塞,则直接返回p.lock.Unlock()return}retry:  //如果当前协程池阻塞在submit上的数量大于允许的数量 则直接返回if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {p.lock.Unlock()return}p.blockingNum++p.cond.Wait() //阻塞等待一个可用的workerp.blockingNum-- //被清道夫(scavenger)唤醒var nw intif nw = p.Running(); nw == 0 { p.lock.Unlock()if !p.IsClosed() {spawnWorker()}return}if w = p.workers.detach(); w == nil { //再次尝试从worker队列中获取一个workerif nw < capacity {p.lock.Unlock()spawnWorker()return}goto retry    //到这里说明还是没有获取到可用的worker}p.lock.Unlock()}return
    }

    获取可用worker的流程如下:

    • 首先尝试从worker队列中获取一个worker

    • 如果worker队列为空则查看协程池容量是否足够,若足够则从workerCache中生成一个worker

    • 如果协程池容量不足且协程池是非阻塞模式,则直接返回nil

    • 如果协程池是阻塞模式,则先判断当前是否达到了允许的最大阻塞数量,若达到了则直接返回nil,

      如果还未达到最大阻塞数量,则调用ool.cond.Wait()陷入阻塞状态

    • 当被唤醒时,先查看当前运行中的worker数量,若为0则需要确认当前协程池状态,如果协程池关闭了就直接返回nil,

      如果协程池未关闭,则从workerCache中生成一个worker

    • 如果当前运行中的worker数量不为0,则重复上面的逻辑,先尝试获取worker队列中的worker,再根据协程池容量决定是否从workerCache中生成一个worker使用

    • 如果还是没有获取到worker,则goto retry,进入下一轮阻塞-唤醒周期

  • Release & Reboot:释放/重启 协程池

    // 关闭协程池以及worker队列
    func (p *Pool) Release() {atomic.StoreInt32(&p.state, CLOSED)p.lock.Lock()p.workers.reset()p.lock.Unlock()p.cond.Broadcast()    //有一些调用者阻塞在retrieveWorker中,为了避免它们被永久阻塞,我们需要唤醒他们
    }// 重启一个已经关闭的协程池
    func (p *Pool) Reboot() {if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {go p.purgePeriodically()}
    }
  • Tune:动态调节协程池容量

    // 调整协程池容量,注意:这对无限容量以及预分配空间的协程池是无效的
    func (p *Pool) Tune(size int) {capacity := p.Cap()if capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc {return}atomic.StoreInt32(&p.capacity, int32(size))if size > capacity {if size-capacity == 1 {  //出于效率的考虑,如果只是扩容一个单位,那么不需要进行全局广播,只唤醒一个阻塞的协程即可p.cond.Signal()return}p.cond.Broadcast()}
    }
  • purgePeriodically:作为一个清扫线程(scavenger)定期清理已过期的worker

    func (p *Pool) purgePeriodically() {heartbeat := time.NewTicker(p.options.ExpiryDuration)defer heartbeat.Stop()for range heartbeat.C {if p.IsClosed() {break}p.lock.Lock()expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration) //从worker列表中获取过期workerp.lock.Unlock()//通知过期worker停止工作//通知必须在p.lock锁的外部,因为worker.task可能是阻塞的channel,//因此如果在锁的内部而且很多worker位于非本地CPU上的时候就会花费大量的时间,//这样会干扰正常的协程池运行工作for i := range expiredWorkers {expiredWorkers[i].task <- nilexpiredWorkers[i] = nil}//有一种情形:所有worker都被清扫了(即没有worker在运行),当一些调用者阻塞在pool.cond.Wait()中时//清扫线程需要唤醒他们if p.Running() == 0 {p.cond.Broadcast()}}
    }
  • revertWorker:将worker放回到worker列表中回收利用

    func (p *Pool) revertWorker(worker *goWorker) bool {if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {p.cond.Broadcast()return false}worker.recycleTime = time.Now() //更新被回收的时间p.lock.Lock()//为了避免内存泄露,需要在临界区内做双重检验(double check)if p.IsClosed() {p.lock.Unlock()return false}err := p.workers.insert(worker)if err != nil {p.lock.Unlock()return false}//通知陷入retrieveWorker调用阻塞的线程在worker列表中已经有可用的worker了p.cond.Signal()p.lock.Unlock()return true
    }
---## worker```go
type goWorker struct {pool *Pooltask chan func()recycleTime time.Time  //当被放回到worker队列时会更新
}func (w *goWorker) run() { //运行workerw.pool.incRunning()go func() {defer func() {    //清理工作w.pool.decRunning()w.pool.workerCache.Put(w)if p := recover(); p != nil {    //panic处理if ph := w.pool.options.PanicHandler; ph != nil {ph(p)} else {w.pool.options.Logger.Printf("worker exits from a panic: %v\n", p)var buf [4096]byten := runtime.Stack(buf[:], false)    //打印运行栈信息w.pool.options.Logger.Printf("worker exits from panic: %s\n", string(buf[:n]))}}// 唤醒等待获取worker的协程w.pool.cond.Signal()}()for f := range w.task {    //循环获取任务并执行if f == nil {return}f()if ok := w.pool.revertWorker(w); !ok {return}}}()
}

worker_array

Ants库通过定义一个workerArray接口,实现了两种worker队列:loopQueue和Stack

type workerArray interface {len() intisEmpty() boolinsert(worker *goWorker) errordetach() *goWorkerretrieveExpiry(duration time.Duration) []*goWorkerreset()
}type arrayType intconst (stackType arrayType = 1 << iotaloopQueueType
)func newWorkerArray(aType arrayType, size int) workerArray {switch aType {case stackType:return newWorkerStack(size)case loopQueueType:return newWorkerLoopQueue(size)default:return newWorkerStack(size)}
}

由于loopQueue和Stack队列原理基本一致,因此本文只剖析较为复杂的loopQueue

type loopQueue struct {    //循环队列.使用头尾指针和size定位items  []*goWorkerexpiry []*goWorkerhead   inttail   intsize   intisFull bool
}func newWorkerLoopQueue(size int) *loopQueue {return &loopQueue{items: make([]*goWorker, size),size:  size,}
}func (wq *loopQueue) len() int {    //返回worker队列中worker个数if wq.size == 0 {return 0}if wq.head == wq.tail {if wq.isFull {return wq.size}return 0}if wq.tail > wq.head {    return wq.tail - wq.head}return wq.size - wq.head + wq.tail
}func (wq *loopQueue) isEmpty() bool {return wq.head == wq.tail && !wq.isFull
}func (wq *loopQueue) insert(worker *goWorker) error { //插入队列尾部...wq.items[wq.tail] = workerwq.tail++if wq.tail == wq.size {    //尾部越界了,将其指向队列头部wq.tail = 0}if wq.tail == wq.head {    //记得处理队列满的情况wq.isFull = true}return nil
}func (wq *loopQueue) detach() *goWorker {    //从队列头取一个workerif wq.isEmpty() {return nil}w := wq.items[wq.head]wq.items[wq.head] = nilwq.head++if wq.head == wq.size {    //同上 越界时将其指向队列头部wq.head = 0}wq.isFull = false    //更新队列状态return w
}func (wq *loopQueue) retrieveExpiry(duration time.Duration) []*goWorker { //获取过期的worker列表expiryTime := time.Now().Add(-duration)index := wq.binarySearch(expiryTime)    //折半查找最近的过期workerif index == -1 {return nil}wq.expiry = wq.expiry[:0]if wq.head <= index {    //说明此时head < tailwq.expiry = append(wq.expiry, wq.items[wq.head:index+1]...)for i := wq.head; i < index+1; i++ {    wq.items[i] = nil}} else {    //此时head > tail 需要将两部分expiry都添加进过期列表wq.expiry = append(wq.expiry, wq.items[0:index+1]...)wq.expiry = append(wq.expiry, wq.items[wq.head:]...)for i := 0; i < index+1; i++ {wq.items[i] = nil}for i := wq.head; i < wq.size; i++ {wq.items[i] = nil}}head := (index + 1) % wq.size    //找到新head的位置wq.head = headif len(wq.expiry) > 0 {wq.isFull = false}return wq.expiry
}func (wq *loopQueue) binarySearch(expiryTime time.Time) int {var mid, nlen, basel, tmid intnlen = len(wq.items)// 如果worker队列已经为空/队列头还未到失效时间,则直接返回-1if wq.isEmpty() || expiryTime.Before(wq.items[wq.head].recycleTime) {return -1}//在worker_stack的算法基础上将head和tail映射到left和rightr := (wq.tail - 1 - wq.head + nlen) % nlenbasel = wq.headl := 0for l <= r {mid = l + ((r - l) >> 1)// 根据映射的mid计算真正的midtmid = (mid + basel + nlen) % nlenif expiryTime.Before(wq.items[tmid].recycleTime) {r = mid - 1} else {l = mid + 1}}// 根据映射索引返回真正的索引return (r + basel + nlen) % nlen
}func (wq *loopQueue) reset() {    //重置队列if wq.isEmpty() {return}Releasing:if w := wq.detach(); w != nil {w.task <- nilgoto Releasing}wq.items = wq.items[:0]wq.size = 0wq.head = 0wq.tail = 0
}

更纯粹的pool---PoolWithFunc

有时候我们只需要协程池做一件事,这时我们可以使用PoolWithFunc,将工作函数与协程池绑定以获得更好的性能.

PoolWithFunc结构体

type PoolWithFunc struct {capacity int32running int32lock sync.Lockerworkers []*goWorkerWithFuncstate int32cond *sync.CondpoolFunc func(interface{})    //存储协程池绑定的函数workerCache sync.PoolblockingNum intoptions *Options
}

与普通协程池不同的是,PoolWithFunc使用切片来管理空闲的worker.并且由于和工作函数绑定,每个worker都执行相同的任务

func (p *PoolWithFunc) Invoke(args interface{}) error {if p.IsClosed() {return ErrPoolClosed}var w *goWorkerWithFuncif w = p.retrieveWorker(); w == nil {return ErrPoolOverload}w.args <- argsreturn nil
}

PoolWithFunc发布任务只需要调用Invoke,传入参数包即可.

goWorkerWithFunc结构体

type goWorkerWithFunc struct {pool *PoolWithFuncargs chan interface{}recycleTime time.Time
}

与普通worker相比,goWorkerWithFunc只有args Channel用于接受参数包,然后执行工作函数,其run方法与普通worker基本一致.

func (w *goWorkerWithFunc) run() {w.pool.incRunning()go func() {defer func() {...w.pool.cond.Signal()}()for args := range w.args {if args == nil {return}w.pool.poolFunc(args)if ok := w.pool.revertWorker(w); !ok {return}}}()
}

性能测试

我运行了一下官方的基准测试,发现使用ants库以后,内存耗费明显减少了,如果使用PoolWithFunc则可以进一步减少内存消耗.可以见得ants库在大量并发的情况下能大大减少内存占用.

goos: linux
goarch: amd64
pkg: github.com/panjf2000/ants/v2
cpu: Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHz
BenchmarkGoroutines-2                 10         124269653 ns/op         9002868 B/op     102309 allocs/op
BenchmarkAntsPool-2                    9         114329539 ns/op         2376181 B/op     110609 allocs/op
BenchmarkAntsPoolWithFunc-2            9         111820051 ns/op          795664 B/op      10627 allocs/op
PASS
ok      github.com/panjf2000/ants/v2    83.012sgoos: linux
goarch: amd64
pkg: github.com/panjf2000/ants/v2
cpu: Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHz
BenchmarkGoroutines-2                  9         116295918 ns/op         8101271 B/op     101055 allocs/op
BenchmarkAntsPool-2                    9         125107370 ns/op         2351351 B/op     110351 allocs/op
BenchmarkAntsPoolWithFunc-2            9         116057004 ns/op          783399 B/op      10499 allocs/op
PASS
ok      github.com/panjf2000/ants/v2    85.388s

总结

至此,Ants库的源码分析到此为止,希望大家能通过这篇文章了解ants库的设计思路以及实现方法.

references:

GMP 并发调度器深度解析之手撸一个高性能 goroutine pool - Strike Freedom

panjf2000/ants:

高性能goroutine池---ants(2.5.0) 源码解析相关推荐

  1. solrlucene3.6.0源码解析(三)

    solr索引操作(包括新增 更新 删除 提交 合并等)相关UML图如下 从上面的类图我们可以发现,其中体现了工厂方法模式及责任链模式的运用 UpdateRequestProcessor相当于责任链模式 ...

  2. Heritrix 3.1.0 源码解析(八)

    本文接着分析存储CrawlURI curi的队列容器,最重要的是BdbWorkQueue类及BdbMultipleWorkQueues类 BdbWorkQueue类继承自抽象类WorkQueue,抽象 ...

  3. Heritrix 3.1.0 源码解析(六)

    本文分析BdbFrontier对象的相关状态和方法 BdbFrontier类继承自WorkQueueFrontier类   WorkQueueFrontier类继承自AbstractFrontier类 ...

  4. Heritrix 3.1.0 源码解析(十一)

    上文分析了Heritrix3.1.0系统是怎么添加CrawlURI curi对象的,那么在系统初始化的时候,是怎么载入CrawlURI curi种子的呢? 我们回顾前面的文章,在我们执行采集任务的la ...

  5. Heritrix 3.1.0 源码解析(三十四)

    本文主要分析FetchFTP处理器,该处理器用于ftp文件的下载,该处理器的实现是通过封装commons-net-2.0.jar组件来实现ftp文件下载 在FetchFTP处理器里面定义了内部类Soc ...

  6. Heritrix 3.1.0 源码解析(十四)

    我在分析BdbFrontier对象的void schedule(CrawlURI caURI).CrawlURI next() .void finished(CrawlURI cURI)方法是,其实还 ...

  7. 锚框、交并比和非极大值抑制(tf2.0源码解析)

    锚框.交并比和非极大值抑制(tf2.0源码解析) 文章目录 锚框.交并比和非极大值抑制(tf2.0源码解析) 一.锚框生成 1.锚框的宽高 2.锚框的个数 3.注意点(★★★) 4.tf2.0代码 二 ...

  8. 基于8.0源码解析:startService 启动过程

    基于8.0源码解析:startService 启动过程 首先看一张startService的图,心里有个大概的预估,跟Activity启动流程比,Service的启动稍微简单点,并且我把Service ...

  9. Android Glide 3.7.0 源码解析(八) , RecyclableBufferedInputStream 的 mark/reset 实现

    个人博客传送门 一.mark / reset 的作用 Android Glide 3.7.0 源码解析(七) , 细说图形变换和解码有提到过RecyclableBufferedInputStream ...

最新文章

  1. 基于matlab的硅晶体模型,基于Matlab的图像处理技术识别硅太阳电池的缺陷
  2. 电信设备产品简介材料收集
  3. router vue 动态改变url_Vue教程(路由router-基本使用)
  4. Linux CentOS 7下 Apache Tomcat 7 安装与配置
  5. Ubuntu16.04 php7.0+mysql5.7+apache2环境搭配
  6. JavaSE第九天20160815
  7. 设计模式:原型模式(C++)【克隆羊多莉】
  8. arduino控制直流电机_Arduino的高电流直流电机控制板
  9. [2019杭电多校第五场][hdu6625]three arrays(01字典树)
  10. ECSHOP商城SEO优化商城更新内容后将页面链接自动推送给百度
  11. SUBMAIL 短网址 API 授权与验证机制
  12. 透镜成像、眼球成像、小孔成像原理
  13. android开发之高仿中国建设银行App
  14. 数据中心机房设备发热量精确计算方法
  15. python代码变量作业_1作业python数据类型 条件循环 列表
  16. 当年腾讯为什么从QQ转移扶植到微信,如今微信已撑起腾讯半壁江山
  17. pg数据库插件timescale时序库使用记录
  18. 【口才】谈判说服技巧及策略
  19. 自定义Camera系列之:SurfaceView + Camera2
  20. 如何助力银行精准营销

热门文章

  1. linux求数组的交集,shell/bash 交集、并集、差集
  2. Win7或是过渡品Win8前瞻消息全预览
  3. Android通知栏不显示 - RemoteView
  4. 100个python算法超详细讲解:黑白子交换
  5. PV操作每日一题-黑白棋子问题
  6. 低功耗产品休眠唤醒电池用电功耗计算
  7. RecyclerView条目多样式显示
  8. 云享M密码:云享电子烟M1,有点意思
  9. 安徽专业知识计算机考试试卷,安徽计算机一级考试试题及答案
  10. POJ1011-Sticks-经典搜索题详解优化