今天聊一下gRPC的服务发现和负载均衡原理相关的话题,不同于NginxLvs或者F5这些服务端的负载均衡策略,gRPC采用的是客户端实现的负载均衡。什么意思呢,对于使用服务端负载均衡的系统,客户端会首先访问负载均衡的域名/IP,再由负载均衡按照策略分发请求到后端具体某个服务节点上。而对于客户端的负载均衡则是,客户端从可用的后端服务节点列表中根据自己的负载均衡策略选择一个节点直连后端服务器。

Etcd软件包的naming组件里提供了一个命名解析器(naming resolver)结合gRPC本身自带的RoundRobin 轮询调度负载均衡器,让使用者能方便地搭建起一套服务注册/发现和负载均衡体系。如果轮询调度满足不了调度需求或者不想使用Etcd作为服务的注册中心和命名解析器的话,可以通过写代码实现gRPC定义的ResolverBalancer接口来满足系统的自定义需求。

本文引用的源码对应的版本为:gRPC v1.2.x、  Etcd  v3.3

如果你对gRPC和Etcd还不了解,可以先看看我很早之前写的gRPC入门和Etcd入门 系列的文章。

gRPC服务注册发现

先来简单的说明一下用Etcd实现服务注册和发现的原理。服务注册和发现这个流程可以用下面这个示意图简单描述出来:

上图的服务A包含了两个节点,服务在节点上启动后,会以包含服务名加节点IP的唯一标识作为Key(比如/service/a/114.128.45.117),服务节点IP和端口信息作为值存储到Etcd上。这些Key都是带租约的Key,需要我们的服务自己去定期续租,一旦服务节点本身宕掉,比如node2上的服务宕掉,无法完成续租后,那么它对应的Key:/service/a/114.128.45.117 就会过期,客户端也就无法再从Etcd上获取到这个服务节点的信息了。

与此同时客户端也会利用EtcdWatch功能监听以/servive/a为前缀的所有Key的变化,如果有新增或者删除节点Key的事件发生Etcd都会通过WatchChan发送给客户端,WatchChan在编程语言上的实现就是GoChannel

服务注册

关于Etcd的服务注册,官方提供的软件包里并没有提供统一的注册函数供调用。那么我们在新增服务节点后怎么把节点的信息存储到Etcd上并通知给命名解析器呢?在Etcd源码包的naming/grpc.go里可以发现提供了一个Update方法,这个Update既能执行添加也能执行删除操作:

func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts ...etcd.OpOption) (err error) {switch nm.Op {case naming.Add:var v []byteif v, err = json.Marshal(nm); err != nil {return status.Error(codes.InvalidArgument, err.Error())}_, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)case naming.Delete:_, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)default:return status.Error(codes.InvalidArgument, "naming: bad naming op")}return err
}

服务在启动完成后可以通过Update方法把自己的服务地址和端口Put到自定义的target为前缀的key里,针对上面图示里的例子,变量target就应该是我们定义的服务名/service/a。一般在具体实践里都是自己根据系统的需求封装Update方法完成服务注册,以及服务节点Key在Etcd上的定期续租,这块每个公司的实践都不一样,我就不放具体的代码了,一般续租都是通过Etcd租约里的KeepAlive方法实现的(Lease.KeepAlive)。

服务发现

在注册完新节点、或者是原来的节点停掉后,客户端是怎么知道的呢?这块就需要命名解析器Resolver来帮助实现了,Resolver的作用可以理解为从一个字符串映射到一组IP端口等信息。

gRPC对Resolver的接口定义如下:

type Resolver interface {// Resolve creates a Watcher for target.Resolve(target string) (Watcher, error)
}

命名解析器的Resolve方法会返回一个Watcher,这个Watcher可以监听命名解析器发来的target(类似上面例子里说的与服务名相对应的Key)对应的后端服务器地址信息变化,通知Balancer对自己维护的地址进行动态地增删。

Watcher接口的定义如下:

//源码地址 https://github.com/grpc/grpc-go/blob/v1.2.x/naming/naming.go
type Watcher interface {Next() ([]*Update, error)// Close closes the Watcher.Close()
}

Etcd为这两个接口都提供了实现:

// 源码地址:https://github.com/etcd-io/etcd/blob/release-3.3/clientv3/naming/grpc.go// GRPCResolver 实现了grpc的naming.Resolver接口
type GRPCResolver struct {// Client is an initialized etcd client.Client *etcd.Client
}func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) {ctx, cancel := context.WithCancel(context.Background())w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel}return w, nil
}// 实现了grpc的naming.Watcher接口
type gRPCWatcher struct {c      *etcd.Clienttarget stringctx    context.Contextcancel context.CancelFuncwch    etcd.WatchChanerr    error
}func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {if gw.wch == nil {// first Next() returns all addressesreturn gw.firstNext()}// process new events on target/*wr, ok := <-gw.wchif !ok {...updates := make([]*naming.Update, 0, len(wr.Events))for _, e := range wr.Events {var jupdate naming.Updatevar err errorswitch e.Type {case etcd.EventTypePut:err = json.Unmarshal(e.Kv.Value, &jupdate)jupdate.Op = naming.Addcase etcd.EventTypeDelete:err = json.Unmarshal(e.PrevKv.Value, &jupdate)jupdate.Op = naming.Deletedefault:continue}if err == nil {updates = append(updates, &jupdate)}}return updates, nil
}func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {// 获取前缀为gw.target的所有Key的值,放到现有数组里resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())if gw.err = err; err != nil {return nil, err}updates := make([]*naming.Update, 0, len(resp.Kvs))for _, kv := range resp.Kvs {var jupdate naming.Updateif err := json.Unmarshal(kv.Value, &jupdate); err != nil {continue}updates = append(updates, &jupdate)}opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}// watch 监听这些Key的变化,包括前缀相同的新Key的加入gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)return updates, nil
}func (gw *gRPCWatcher) Close() { gw.cancel() }

这部分GRPCResolvergRPCWatcher类型的每个方法的功能和起到的作用都和RoundRobin这个gRPC Balancer结合地比较紧密,我准备放到下面和负载均衡的源码实现一起说明。

负载均衡

首先我们来看一下gRPC对负载均衡的接口定义:

type Balancer interface {Start(target string, config BalancerConfig) errorUp(addr Address) (down func(error))Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error)Notify() <-chan []Address// Close shuts down the balancer.Close() error
}

在gRPC 客户端与服务端之间建立连接时调用的Dail方法里可以用WithBalancer方法在DiaplOption里指定负载均衡组件:

  client, err := etcd.Client()...resolver := &naming.GRPCResolver{Client: client}b := grpc.RoundRobin(resolver)opt0 := grpc.WithBalancer(b)grpc.Dial(target, opt0 , opt1, ...) // 后面省略了

上面的例子使用了gRPC自带的Balancer实现RoundRobin,RoundRobin除了实现了Balancer接口外自己内置了Resolver用来从名字获取其后绑定的IP信息以及服务的更新事件(增加删除服务节点这些事件) 。上面的例子里给RoundRobin指定了Etcd提供的name.GRPCResolver做为它的命名解析器,这个命名解析器就是上一节说的Etcd软件包里提供的gRPCnaming.Resolver接口实现。

RoundRobin

下面我们研究一下gRPC包里提供的RoundRobin代码实现,主要关注负载均衡和利用Resolver进行服务发现及节点更新这两个功能的代码实现原理

RoundRobin结构体定义如下:

// 源码在:https://github.com/grpc/grpc-go/blob/v1.2.x/balancer.go
type roundRobin struct {r      naming.Resolverw      naming.Watcheraddrs  []*addrInfo // 客户端可以尝试连接的所有地址mu     sync.MutexaddrCh chan []Address // 用于通知gRPC内部的,客户端可连接地址的信道next   int            // index of the next address to return for Get()waitCh chan struct{}  // the channel to block when there is no connected address availabledone   bool           // The Balancer is closed.
}
  • r是命名解析器,可以定义自己的命名解析器,如Etcd命名解析器。如果r为nil,那么Dial中参数target将直接作为可请求地址添加到addrs中。

  • w是命名解析器Resolve方法返回的watcher,该watcher可以监听命名解析器发来的地址信息变化,通知roundRobin对addrs中的地址进行动态的增删。

  • addrs是从命名解析器获取地址信息数组,数组中每个地址不仅有地址信息,还有gRPC与该地址是否已经创建了ready状态的连接的标记。

  • addrCh是地址数组的Channel,该Channel会在每次命名解析器发来地址信息变化后,将所有地址更新通知到gRPC内部的lbWatcher,lbWatcher是统一管理地址连接状态的协程,负责新地址的连接与被删除地址的关闭操作。

  • next是roundRobin的Index,即轮询调度遍历到addrs数组中的哪个位置了。

  • waitCh是当addrs中地址为空时,grpc调用Get()方法希望获取到一个到target的连接,如果设置了gRPC的failfast为false,那么Get()方法会阻塞在此Channel上,直到有ready的连接。

启动RoundRobin

启动RoundRobin就是实现Balancer接口的Start方法,该方法是由一开始通过grpc.WithBalancer把负载均衡器指定给的BalancerWrapperBuilder在创建BalancerWrapper时触发的:

func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {// 这里触发Balancer的Start方法bwb.b.Start(opts.Target.Endpoint, BalancerConfig{DialCreds: opts.DialCreds,Dialer:    opts.Dialer,})_, pickfirst := bwb.b.(*pickFirst)bw := &balancerWrapper{......}cc.UpdateBalancerState(connectivity.Idle, bw)go bw.lbWatcher() // 监听Balancer 通知过来的地址变化return bw
}

Start方法其主要功能就是通过RoundRobin的命名解析器的Resolve方法拿到监听命名解析器后端变化的Watcher。与此同时还会新建一个addrChan用于向gRPC内部的lbWatcher推送Watcher监听到的地址变化。

func (rr *roundRobin) Start(target string, config BalancerConfig) error {rr.mu.Lock()defer rr.mu.Unlock()if rr.done {return ErrClientConnClosing}if rr.r == nil {// 如果没有解析器,那么直接将target加入addrs地址数组rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}})return nil}// Resolve接口会返回一个watcher,watcher可以监听解析器的地址变化w, err := rr.r.Resolve(target)if err != nil {return err}rr.w = w// 创建一个channel,当watcher监听到地址变化时,通知grpc内部lbWatcher去连接该地址rr.addrCh = make(chan []Address, 1)// go 创建新协程监听watcher,监听地址变化。go func() {for {if err := rr.watchAddrUpdates(); err != nil {return}}}()return nil
}

创建完addrCh后在Start方法最后会开启一个goroutine,这个goroutine会不停地循环调用watchAddrUpdates查询是否有命名解析器的Watcher传递过来的更新。

监听服务端地址的更新

watchAddrUpdates方法里就是通过上面Start方法里创建的Resolver Watcher的Next方法来监听Etcd上后端服务节点的更新,这个Watcher的实现就是上面服务发现章节里说的Etcd软件包里提供的gRPCWatcher类型,它的Next方法里会去通过监听Etcd上由服务名组成的Key的变化,然后在这里把这些信息传递给上面Start方法里创建好的addrChan通道。

func (rr *roundRobin) watchAddrUpdates() error {// watcher的next方法会阻塞,直至有地址变化信息过来,updates即为变化信息updates, err := rr.w.Next()if err != nil {return err}// 对于addrs地址数组的操作,显然是要加锁的,因为有多个goroutine在同时操作rr.mu.Lock()defer rr.mu.Unlock()for _, update := range updates {addr := Address{Addr:     update.Addr,Metadata: update.Metadata,}switch update.Op {case naming.Add://对于新增类型的地址,注意这里不会重复添加。var exist boolfor _, v := range rr.addrs {if addr == v.addr {exist = truebreak}}if exist {continue}rr.addrs = append(rr.addrs, &addrInfo{addr: addr})case naming.Delete://对于删除的地址,直接在addrs中删除就行了for i, v := range rr.addrs {if addr == v.addr {copy(rr.addrs[i:], rr.addrs[i+1:])rr.addrs = rr.addrs[:len(rr.addrs)-1]break}}default:grpclog.Errorln("Unknown update.Op ", update.Op)}}// 这里复制了整个addrs地址数组,然后丢到addrCh channel中通知grpc内部lbWatcher,// lbWatcher会关闭删除的地址,连接新增的地址。// 连接ready后会有专门的goroutine调用Up方法修改addrs中地址的状态。open := make([]Address, len(rr.addrs))for i, v := range rr.addrs {open[i] = v.addr}if rr.done {return ErrClientConnClosing}select {case <-rr.addrCh:default:}rr.addrCh <- openreturn nil
}

建立连接

Up方法是gRPC内部负载均衡的watcher调用的,该watcher会读全局的连接状态队列,改变RoundRobin维护的连接列表的里连接的状态 (会有单独的goroutine向目标服务发起连接尝试,尝试成功后才会把连接对象的连接状态改为connected),如果是已连接状态的连接 ,会调用Up方法来改变addrs地址数组中该地址的状态为已连接

func (rr *roundRobin) Up(addr Address) func(error) {rr.mu.Lock()defer rr.mu.Unlock()var cnt int//将地址数组中的addr置为已连接状态,这样这个地址就可以被client使用了。for _, a := range rr.addrs {if a.addr == addr {if a.connected {return nil}a.connected = true}if a.connected {cnt++}}// 当有一个可用地址时,之前可能是0个,可能要很多client阻塞在获取连接地址上,这里通知所有的client有可用连接啦。// 为什么只等于1时通知?因为可用地址数量>1时,client是不会阻塞的。if cnt == 1 && rr.waitCh != nil {close(rr.waitCh)rr.waitCh = nil}//返回禁用该地址的方法return func(err error) {rr.down(addr, err)}
}

关闭连接

关闭连接使用的是Down方法,这个方法就简单, 直接找到addr置为不可用就行了。

func (rr *roundRobin) down(addr Address, err error) {rr.mu.Lock()defer rr.mu.Unlock()for _, a := range rr.addrs {if addr == a.addr {a.connected = falsebreak}}
}

客户端获取连接

客户端在调用gRPC具体MethodInvoke方法里,会去RoundRobin的连接池addrs里获取连接,如果addrs为空,或者addrs里的地址都不可用,Get()方法会返回错误。但是如果设置了failfast = falseGet()方法会阻塞在waitCh这个通道上,直至Up方法给到通知,然后轮询调度可用的地址。

func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {var ch chan struct{}rr.mu.Lock()if rr.done {rr.mu.Unlock()err = ErrClientConnClosingreturn}if len(rr.addrs) > 0 {// addrs的长度可能变化,如果next值超出了,就置为0,从头开始调度。if rr.next >= len(rr.addrs) {rr.next = 0}next := rr.next//遍历整个addrs数组,直到选出一个可用的地址for {a := rr.addrs[next]// next值加一,当然是循环的,到len(addrs)后,变为0next = (next + 1) % len(rr.addrs)if a.connected {addr = a.addrrr.next = nextrr.mu.Unlock()return}if next == rr.next {// 遍历完一圈了,还没找到,走下面逻辑break}}}if !opts.BlockingWait { //如果是非阻塞模式,如果没有可用地址,那么报错if len(rr.addrs) == 0 {rr.mu.Unlock()err = status.Errorf(codes.Unavailable, "there is no address available")return}// Returns the next addr on rr.addrs for failfast RPCs.addr = rr.addrs[rr.next].addrrr.next++rr.mu.Unlock()return}// Wait on rr.waitCh for non-failfast RPCs.// 如果是阻塞模式,那么需要阻塞在waitCh上,直到Up方法给通知if rr.waitCh == nil {ch = make(chan struct{})rr.waitCh = ch} else {ch = rr.waitCh}rr.mu.Unlock()for {select {case <-ctx.Done():err = ctx.Err()returncase <-ch:rr.mu.Lock()if rr.done {rr.mu.Unlock()err = ErrClientConnClosingreturn}if len(rr.addrs) > 0 {if rr.next >= len(rr.addrs) {rr.next = 0}next := rr.nextfor {a := rr.addrs[next]next = (next + 1) % len(rr.addrs)if a.connected {addr = a.addrrr.next = nextrr.mu.Unlock()return}if next == rr.next {// 遍历完一圈了,还没找到,可能刚Up的地址被down掉了,重新等待。break}}}// The newly added addr got removed by Down() again.if rr.waitCh == nil {ch = make(chan struct{})rr.waitCh = ch} else {ch = rr.waitCh}rr.mu.Unlock()}}
}

总结

整个gRPC基于Etcd实现服务注册/发现以及负载均衡的流程和关键的源码实现就梳理完了,其实源码实现的细节远比我这里列举的要复杂,这篇文章的目的也是希望能记录下一学习和实践gRPC的负载均衡和服务解析时的一些关键路径。另外需要注意的是本文里使用的是gRPC v1.2.x的代码,在1.3版本后官方包重新调整了目录和包名,与本文里列举的源码以及Balancer的使用上都会有些出入,不过原理还是大致一样的,只不过每一版都一直在此基础上演进。

- END -

关注公众号,获取更多精选技术原创文章

gRPC服务注册发现及负载均衡的实现方案与源码解析相关推荐

  1. e盾服务端源码_gRPC服务注册发现及负载均衡的实现方案与源码解析

    今天聊一下gRPC的服务发现和负载均衡原理相关的话题,不同于Nginx.Lvs或者F5这些服务端的负载均衡策略,gRPC采用的是客户端实现的负载均衡.什么意思呢,对于使用服务端负载均衡的系统,客户端会 ...

  2. RocketMQ:消费端的消息消息队列负载均衡与重新发布机制源码解析

    文章目录 前言 流程解析 总结 前言 在上一篇博客中我们了解到,PullMessageService线程主要是负责从pullRequestQueue中获得拉取消息请求并进行请求处理的. PullMes ...

  3. marathon的高可用服务自动发现和负载均衡

    上一篇我们说谈了docker+zookeeper+mesos+marathon集群,本篇我们来谈谈marathon的集群和自动发现服务. marathon的服务自动发现和负载均衡有两种,1是mesos ...

  4. 云原生服务网格Istio:原理、实践、架构与源码解析

    华为云原生团队600多页的Istio实战精华总结,云原生服务网格Istio:原理.实践.架构与源码解析的电子书. 图书介绍 <云原生服务网格Istio:原理.实践.架构与源码解析>分为原理 ...

  5. Nacos作为服务注册中心及负载均衡、服务流量权重设置

    如果我的博客对你有帮助,欢迎进行评论✏️✏️.点赞

  6. Spring RSocket:基于服务注册发现的 RSocket 负载均衡

    作者 | 雷卷 来源|阿里巴巴云原生公众号 RSocket 分布式通讯协议是 Spring Reactive 的核心内容,从 Spring Framework 5.2 开始,RSocket 已经是 S ...

  7. 服务发现与负载均衡 dubbo zk原理

    服务发现与负载均衡 拓展阅读 : dubbo 原理概念图 2016-03-03 杜亦舒 性能与架构 性能与架构 性能与架构 微信号 yogoup 功能介绍 网站性能提升与架构设计 内容整理自文章&qu ...

  8. 【详解】Ribbon 负载均衡服务调用原理及默认轮询负载均衡算法源码解析、手写

    Ribbon 负载均衡服务调用 一.什么是 Ribbon 二.LB负载均衡(Load Balancer)是什么 1.Ribbon 本地负载均衡客户端 VS Nginx 服务端负载均衡的区别 2.LB负 ...

  9. [云原生专题-33]:K8S - 核心概念 - 服务Service管理、服务发现、负载均衡

    作者主页(文火冰糖的硅基工坊):文火冰糖(王文兵)的博客_文火冰糖的硅基工坊_CSDN博客 本文网址:https://blog.csdn.net/HiWangWenBing/article/detai ...

最新文章

  1. Java 原子变量类
  2. SendMessage、PostMessage原理
  3. PyCharm中目录directory与包package的区别
  4. @Transactional 使用
  5. 后台UI专辑模板有这些就够了!
  6. linux netstat
  7. 一套完整自定义工作流的实现
  8. 更改自定义按钮显示值并对单元格赋值
  9. Physics Bodies(中文翻译)—UE4官方文档
  10. 【22】基于java的电影院售票管理系统
  11. vector2Drawable(批量将png图片转换成android使用的矢量图 )
  12. Delphi微信公众号开发
  13. postgres 命令行建数据库表_PostgreSQL 创建表格
  14. 大数据基础数据之中国法定节假日API
  15. Linux 容器化技术详解(虚拟化、容器化、Docker)
  16. 如何使用OBS开启直播
  17. 死亡细胞1.9最新辅助
  18. 前端基础 CSS 第十一章 使用CSS样式表 特效属性部分 ----暑假学习第九天
  19. 怎么发表计算机论文,潮州发表计算机论文写作方法,怎么发表论文
  20. mysql的报错日志哪里看_mysql错误日志

热门文章

  1. JavaScript文本特效实例小结【3个示例】
  2. 使用 ale.js 制作一个小而美的表格编辑器(1)
  3. 撩课-Web大前端每天5道面试题-Day35
  4. 在sphinx中处理使用特殊字符时所引起错误的办法
  5. 探寻完美 之 JavaScript继承
  6. 保持windows2003域控制器的安全
  7. [CTSC2008]网络管理Network
  8. 与女儿谈商业模式 (4):戴尔的成功秘诀
  9. Lock-Free 编程
  10. Uploadify 配置错误信息提示