一、背景

本文基于client-go@v0.16.4

之前业务中有这么一段伪代码:

func batchCreateVmi(ds []UserData) {wg := &sync.WaitGroup{}// vmiNum会动态改变,开发测试一般只有十几// 生产能到200(有逻辑限制了上限200)const vmiNum = 200for i := 0; i < vmiNum; i++ {wg.Add(1)createOneVmi(wg, ds[i])}wg.Wait()
}func createOneVmi(wg *sync.WaitGroup, data UserData) {defer wg.Done()// 15秒超时的contextctx, _ := context.WithTimeout(context.Backgroud, 15*time.Second)// 从数据库中查询vmi模板数据vmiTemplate, _ := getVmiTemplateFromDB(ctx, data.TemplateID)// 用户数据和模板数据结合target := combindTemplateAndData(vmiTemplate, data)// client-go调api server接口创建vmi// 和其它协程用的相同的clientcreateVmi(target)// 更新数据库中的状态// 注意这里用的还是最开始的contextif err := updateDBStatus(ctx, data); err != nil {log.Errorf("updateDBStatus error: %s", err.Error())return}
}

开发测试功能验证并没有发现什么问题,然而到了生产之后,出现大量的updateDBStatus error: context deadline错误,很明显是context超时了,正常来说整个函数走完都是在1秒之内的,到底是哪个环节导致的15秒context的超时呢?

在这段代码中,有数据库操作和调apiServer接口创建vmi的操作。首先怀疑是否是数据库性能问题——查看数据库性能监控数据,未发现异常;接着怀疑apiServer压力——同样没有发现异常信息;再查看服务到数据库和服务到apiServer之间的网络——仍未发现异常。

最后,怀疑客户端有问题。通过查阅一些资料,发现可能是client-go默认的QPS和Burst参数导致的:默认QPS为5,Burst为10

// k8s.io/client-go/rest/config.go// Config holds the common attributes that can be passed to a Kubernetes client on
// initialization.
type Config struct {/*...*/// QPS indicates the maximum QPS to the master from this client.// If it's zero, the created RESTClient will use DefaultQPS: 5QPS float32// Maximum burst for throttle.// If it's zero, the created RESTClient will use DefaultBurst: 10.Burst int/*...*/
}

QPS和Burst是client-go令牌桶限流的两个参数,其中QPS=5表示每秒产生5个令牌放到令牌桶中,Burst=10表示令牌桶的容量是10,client-go只有拿到了令牌才能对apiServer发请求,这意味着默认最多一秒钟发送15个请求(令牌桶中10个令牌+这一秒新产生的5个令牌),但是第二秒开始只有新产生的5个令牌,因此第二秒只能发送5个请求。开发测试流量小,最多也就是十几并发的样子,生产高峰时段能到上限200,如果每秒发送5个,200个就需要200/5=40秒,远远超过context的15秒超时时间。

在开发环境上量压测,复现问题;增大QPS和Burst,不再出现超时现象:

import ("k8s.io/client-go/kubernetes""k8s.io/client-go/tools/clientcmd""myproject/myconfig" // 项目中的配置项
)func initClientSe() error {/*...*/restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfig)if err != nil {return err}// 如果配置文件中配置了QPS和Burst,以配置文件中的为准// 很多开源项目把QPS和Burst都配置为100if myconfig.QPS > 0 {restConfig.QPS = myconfig.QPS}if myconfig.Burst > 0 {restConfig.Burst = myconfig.Burst}// 修改QPS和Burst后初始化clientSetclientSet, err := kubernetes.NewForConfig(restConfig)/*..*/
}

二、client-go令牌桶限流源码分析

接着前文client-go初始化部分,在kubernetes.NewForConfig中有对应的限流初始化逻辑:

// k8s.io/client-go/kubernetes/lientset.go// NewForConfig creates a new Clientset for the given config.
// If config's RateLimiter is not set and QPS and Burst are acceptable,
// NewForConfig will generate a rate-limiter in configShallowCopy.
func NewForConfig(c *rest.Config) (*Clientset, error) {configShallowCopy := *cif configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {if configShallowCopy.Burst <= 0 {return nil, fmt.Errorf("Burst is required to be greater than 0 when RateLimiter is not set and QPS is set to greater than 0")}configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)}/*...*/
}

可以看到有调用flowcontrol(流控)包下的NewTokenBucketRateLimiter方法初始化令牌桶限速器,返回一个RateLimiter,RateLimiter定义如下:

type RateLimiter interface {// TryAccept returns true if a token is taken immediately. Otherwise,// it returns false.TryAccept() bool// Accept returns once a token becomes available.Accept()// Stop stops the rate limiter, subsequent calls to CanAccept will return falseStop()// QPS returns QPS of this rate limiterQPS() float32// Wait returns nil if a token is taken before the Context is done.Wait(ctx context.Context) error
}

client-go请求apiServer的request函数有如下逻辑,每次请求apiServer都会先执行tryThrottle:

// k8s.io/client-go/rest/request.go// Do formats and executes the request. Returns a Result object for easy response
// processing.
//
// Error type:
//  * If the request can't be constructed, or an error happened earlier while building its
//    arguments: *RequestConstructionError
//  * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
//  * http.Client.Do errors are returned directly.
func (r *Request) Do() Result {if err := r.tryThrottle(); err != nil {return Result{err: err}}var result Resulterr := r.request(func(req *http.Request, resp *http.Response) {result = r.transformResponse(resp, req)})if err != nil {return Result{err: err}}return result
}

tryThrottle就是与令牌桶限速关系密切的函数,其实现如下:

// k8s.io/client-go/rest/request.go
func (r *Request) tryThrottle() error {if r.throttle == nil {return nil}now := time.Now()var err errorif r.ctx != nil {err = r.throttle.Wait(r.ctx)} else {r.throttle.Accept()}if latency := time.Since(now); latency > longThrottleLatency {klog.V(4).Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())}return err
}

tryThrottle函数主要涉及r.throttle.Waitr.throttle.Accept两个函数,而r.throttle正是前面NewForConfig中初始化的RateLimiter。因此我们来看看RateLimiter的Wait和Accept实现:

// k8s.io/client-go/util/flowcontrol/throttle.gofunc (t *tokenBucketRateLimiter) Wait(ctx context.Context) error {return t.limiter.Wait(ctx)
}func (t *tokenBucketRateLimiter) Accept() {now := t.clock.Now()t.clock.Sleep(t.limiter.ReserveN(now, 1).DelayFrom(now))
}

可以看到Wait和Accept基本没什么逻辑,主要还是调用limter的Wait方法和ReserveN方法。limiter初始化如下:

// k8s.io/client-go/util/flowcontrol/throttle.goimport ("golang.org/x/time/rate"
)// NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach.
// The rate limiter allows bursts of up to 'burst' to exceed the QPS, while still maintaining a
// smoothed qps rate of 'qps'.
// The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'.
// The maximum number of tokens in the bucket is capped at 'burst'.
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {limiter := rate.NewLimiter(rate.Limit(qps), burst)return newTokenBucketRateLimiter(limiter, realClock{}, qps)
}func newTokenBucketRateLimiter(limiter *rate.Limiter, c Clock, qps float32) RateLimiter {return &tokenBucketRateLimiter{limiter: limiter,clock:   c,qps:     qps,}
}

limiter是golang.org/x/time/rate包中初始化的对象,我们来看看这个包下Wait和ReserveN两个方法的实现。

golang.org/x/time/rate包对应github上github.com/golang/time/rate

// Wait is shorthand for WaitN(ctx, 1).
func (lim *Limiter) Wait(ctx context.Context) (err error) {return lim.WaitN(ctx, 1)
}// WaitN blocks until lim permits n events to happen.
// It returns an error if n exceeds the Limiter's burst size, the Context is
// canceled, or the expected wait time exceeds the Context's Deadline.
// The burst limit is ignored if the rate limit is Inf.
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {// The test code calls lim.wait with a fake timer generator.// This is the real timer generator.newTimer := func(d time.Duration) (<-chan time.Time, func() bool, func()) {timer := time.NewTimer(d)return timer.C, timer.Stop, func() {}}return lim.wait(ctx, n, time.Now(), newTimer)
}// wait is the internal implementation of WaitN.
func (lim *Limiter) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {lim.mu.Lock()burst := lim.burstlimit := lim.limitlim.mu.Unlock()if n > burst && limit != Inf {return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)}// Check if ctx is already cancelledselect {case <-ctx.Done():return ctx.Err()default:}// Determine wait limitwaitLimit := InfDurationif deadline, ok := ctx.Deadline(); ok {waitLimit = deadline.Sub(t)}// Reserver := lim.reserveN(t, n, waitLimit)if !r.ok {return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)}// Wait if necessarydelay := r.DelayFrom(t)if delay == 0 {return nil}ch, stop, advance := newTimer(delay)defer stop()advance() // only has an effect when testingselect {case <-ch:// We can proceed.return nilcase <-ctx.Done():// Context was canceled before we could proceed.  Cancel the// reservation, which may permit other events to proceed sooner.r.Cancel()return ctx.Err()}
}// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {lim.mu.Lock()defer lim.mu.Unlock()if lim.limit == Inf {return Reservation{ok:        true,lim:       lim,tokens:    n,timeToAct: t,}} else if lim.limit == 0 {var ok boolif lim.burst >= n {ok = truelim.burst -= n}return Reservation{ok:        ok,lim:       lim,tokens:    lim.burst,timeToAct: t,}}t, tokens := lim.advance(t)// Calculate the remaining number of tokens resulting from the request.tokens -= float64(n)// Calculate the wait durationvar waitDuration time.Durationif tokens < 0 {waitDuration = lim.limit.durationFromTokens(-tokens)}// Decide resultok := n <= lim.burst && waitDuration <= maxFutureReserve// Prepare reservationr := Reservation{ok:    ok,lim:   lim,limit: lim.limit,}if ok {r.tokens = nr.timeToAct = t.Add(waitDuration)// Update statelim.last = tlim.tokens = tokenslim.lastEvent = r.timeToAct}return r
}// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
// advance requires that lim.mu is held.
func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) {last := lim.lastif t.Before(last) {last = t}// Calculate the new number of tokens, due to time that passed.elapsed := t.Sub(last)delta := lim.limit.tokensFromDuration(elapsed)tokens := lim.tokens + deltaif burst := float64(lim.burst); tokens > burst {tokens = burst}return t, tokens
}// tokensFromDuration is a unit conversion function from a time duration to the number of tokens
// which could be accumulated during that duration at a rate of limit tokens per second.
func (limit Limit) tokensFromDuration(d time.Duration) float64 {if limit <= 0 {return 0}return d.Seconds() * float64(limit)
}

三、总结

golang.org/x/time/rate下的rate.go文件只有400多行,上个章节有省略部分代码,如果对省略部分有兴趣,可以自行研读官网上的代码。

从前面的代码来看,client-go的令牌桶限流实现有以下几个注意点:

  1. Burst表示令牌桶的大小,QPS表示1s可以产生多少令牌。在golang.org/x/time/rate包中,如果QPS设置为InfDuration(1<<63-1,表示无限大),则client每次都能立刻拿到令牌,不会受限于Burst;
  2. 如果client-go Burst=10,QPS=5,则最多一秒内可以有15个请求可以拿到令牌(令牌桶10个+这1秒内新产生的5个),但不表示每秒都能有15个请求能拿到令牌;
  3. golang.org/x/time/rate产生令牌并没有单独起个协程定时往令牌桶里放令牌,而是当有拿令牌的请求过来时,计算当前时间与上次生成令牌的时间差,再结合QPS参数,往令牌桶里生成对应数量的令牌(如果时间间隔太长,计算出来的令牌数量大于Burst,则生成Burst个令牌),再从令牌桶中拿对应数量的令牌(client-go中1个请求1个令牌)。

因此,有如下示意图:

微信公众号卡巴斯同步发布,欢迎大家关注。

client-go QPS、Burst和令牌桶相关推荐

  1. 限流10万QPS、跨域、过滤器、令牌桶算法-网关Gateway内容都在这儿

    点击上方 好好学java ,选择 星标 公众号 重磅资讯.干货,第一时间送达 今日推荐:硬刚一周,3W字总结,一年的经验告诉你如何准备校招! 个人原创100W+访问量博客:点击前往,查看更多 作者:雄 ...

  2. 限流算法之漏桶算法、令牌桶算法

    限流 每个API接口都是有访问上限的,当访问频率或者并发量超过其承受范围时候,我们就必须考虑限流来保证接口的可用性或者降级可用性.即接口也需要安装上保险丝,以防止非预期的请求对系统压力过大而引起的系统 ...

  3. c++ lua 可以做什么_Redis令牌桶算法(全网最全,后续可以接入lua做原子性操作)...

    一 .场景描述 在开发接口服务器的过程中,为了防止客户端对于接口的滥用,保护服务器的资源, 通常来说我们会对于服务器上的各种接口进行调用次数的限制.比如对于某个 用户,他在一个时间段(interval ...

  4. 十三水算法php_基于PHP+Redis令牌桶限流

    一 .场景描述 在开发接口服务器的过程中,为了防止客户端对于接口的滥用,保护服务器的资源, 通常来说我们会对于服务器上的各种接口进行调用次数的限制.比如对于某个 用户,他在一个时间段(interval ...

  5. php限制接口访问次数_令牌桶限流思路分享(PHP+Redis实现机制)

    一 .场景描述 在开发接口服务器的过程中,为了防止客户端对于接口的滥用,保护服务器的资源, 通常来说我们会对于服务器上的各种接口进行调用次数的限制.比如对于某个 用户,他在一个时间段(interval ...

  6. 高并发系统限流-漏桶算法和令牌桶算法

    参考: https://www.cnblogs.com/xuwc/p/9123078.html http://www.cnblogs.com/LBSer/p/4083131.html https:// ...

  7. sentinel 时间窗口_Sentinel使用令牌桶实现预热【原理源码】

    前言 Sentinel的QPS流控效果有快速失败.预热模式.排队等待.预热+排队等待模式,本文主要分析预热模式中是如何使用令牌桶算法限流的. 一.流控效果源码结构 在FlowRule更新缓存时,根据配 ...

  8. 令牌桶算法和漏桶算法有什么区别_高并发之限流,到底限的什么鬼 (精品长文)...

    你可能知道高并发系统需要限流这个东西,但具体是限制的什么,该如何去做,还是模凌两可.我们接下来系统性的给它归个小类,希望对你有所帮助. google guava中提供了一个限流实现: RateLimi ...

  9. CIR,CBS,EBS,PIR,PBS傻傻分不清楚?看这里!—-揭秘令牌桶

    概述 春暖花开的时候,大家都开着汽车外出旅游欣赏美丽的风景,却被堵在高速公路上,你是否为此感到痛苦?但如果有一种机制可以评估高速公路上的车流量.控制车流情况,确保进入高速公路的汽车都能在路上安全畅行, ...

最新文章

  1. ensp大型网络环境设计与实现_mongodb内核源码设计实现、性能优化、最佳运维系列-网络传输层模块源码实现三...
  2. Calendar使用方法
  3. 自然语言处理在医学领域的应用
  4. linux vino vnc,CentOS 远程桌面(vnc,vino)
  5. 可怕!那些你看不到的进程
  6. 观察者模式在SAP ui5修改theme实现中的应用
  7. linux脚本调用job,linux shell - 脚本中调用fg调取后台任务报错
  8. setInterval和setTImeout中的this指向问题
  9. AAAI、IJCAI和ACL录用三名清华本科生成果,华人NLP最杰出HowNet成功融入DL模型
  10. 棋牌游戏框架解析(一)
  11. Swiper、vue-awesome-swiper中文文档
  12. 废旧安卓手机利用(一)安装linux系统(Centos、Debian)
  13. 浏览器无法访问此网站,连接已被重置,无法加载
  14. 记一次mysql5.7的新特性derived_merge的坑
  15. ICMP协议及报文格式
  16. 坐在电脑前是高一点好还是低一点好
  17. 迅镭激光20000瓦高速切割机顺利交付柳工,助力工程机械行业高速发展!
  18. jstl标签库jar包下载
  19. ASP NET - ArrayList 对象 方法描述
  20. 下载 github上面脚本_带大家一起来白嫖一波Github的免费计算资源呀~

热门文章

  1. 看看一位清华计算机专业的学生怎么看LINUX与WINDOWS的! (转载)
  2. 《Java后端性能调优实战方案手册》,看完至少阿里P7
  3. 机器人社社长事迹_【青春榜样】佳木斯大学十佳科技创新创新标兵事迹——黄鹏...
  4. 通过read读入一个网址,将网址解析赋值给一个数组
  5. 【图片新闻】DARPA的新设想可能将海洋生物群体变成一个巨大的潜艇探测网络
  6. C语言实现单片机整点蜂鸣器报时
  7. 两车追及或相遇问题(hdu1275)数学题
  8. x570安装服务器系统,技嘉 X570 AORUS MASTER主板u盘装系统win7教程
  9. 在ubuntu下看电视
  10. springboot手机推荐网站 毕业设计-附源码052329