今天聊一下gRPC的服务发现和负载均衡原理相关的话题,不同于NginxLvs或者F5这些服务端的负载均衡策略,gRPC采用的是客户端实现的负载均衡。什么意思呢,对于使用服务端负载均衡的系统,客户端会首先访问负载均衡的域名/IP,再由负载均衡按照策略分发请求到后端具体某个服务节点上。而对于客户端的负载均衡则是,客户端从可用的后端服务节点列表中根据自己的负载均衡策略选择一个节点直连后端服务器。

Etcd软件包的naming组件里提供了一个命名解析器(naming resolver)结合gRPC本身自带的RoundRobin 轮询调度负载均衡器,让使用者能方便地搭建起一套服务注册/发现和负载均衡体系。如果轮询调度满足不了调度需求或者不想使用Etcd作为服务的注册中心和命名解析器的话,可以通过写代码实现gRPC定义的ResolverBalancer接口来满足系统的自定义需求。

本文引用的源码对应的版本为:gRPC v1.2.x、 Etcd v3.3
如果你对gRPC和Etcd还不了解,可以先看看我很早之前写的gRPC入门和Etcd入门 系列的文章。

gRPC服务注册发现

先来简单的说明一下用Etcd实现服务注册和发现的原理。服务注册和发现这个流程可以用下面这个示意图简单描述出来:

上图的服务A包含了两个节点,服务在节点上启动后,会以包含服务名加节点IP的唯一标识作为Key(比如/service/a/114.128.45.117),服务节点IP和端口信息作为值存储到Etcd上。这些Key都是带租约的Key,需要我们的服务自己去定期续租,一旦服务节点本身宕掉,比如node2上的服务宕掉,无法完成续租后,那么它对应的Key:/service/a/114.128.45.117 就会过期,客户端也就无法再从Etcd上获取到这个服务节点的信息了。

与此同时客户端也会利用EtcdWatch功能监听以/servive/a为前缀的所有Key的变化,如果有新增或者删除节点Key的事件发生Etcd都会通过WatchChan发送给客户端,WatchChan在编程语言上的实现就是GoChannel

服务注册

关于Etcd的服务注册,官方提供的软件包里并没有提供统一的注册函数供调用。那么我们在新增服务节点后怎么把节点的信息存储到Etcd上并通知给命名解析器呢?在Etcd源码包的naming/grpc.go里可以发现提供了一个Update方法,这个Update既能执行添加也能执行删除操作:

func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts ...etcd.OpOption) (err error) {switch nm.Op {case naming.Add:var v []byteif v, err = json.Marshal(nm); err != nil {return status.Error(codes.InvalidArgument, err.Error())}_, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)case naming.Delete:_, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)default:return status.Error(codes.InvalidArgument, "naming: bad naming op")}return err
}

服务在启动完成后可以通过Update方法把自己的服务地址和端口Put到自定义的target为前缀的key里,针对上面图示里的例子,变量target就应该是我们定义的服务名/service/a。一般在具体实践里都是自己根据系统的需求封装Update方法完成服务注册,以及服务节点Key在Etcd上的定期续租,这块每个公司的实践都不一样,我就不放具体的代码了,一般续租都是通过Etcd租约里的KeepAlive方法实现的(Lease.KeepAlive)。

服务发现

在注册完新节点、或者是原来的节点停掉后,客户端是怎么知道的呢?这块就需要命名解析器Resolver来帮助实现了,Resolver的作用可以理解为从一个字符串映射到一组IP端口等信息。

gRPC对Resolver的接口定义如下:

type Resolver interface {// Resolve creates a Watcher for target.Resolve(target string) (Watcher, error)
}

命名解析器的Resolve方法会返回一个Watcher,这个Watcher可以监听命名解析器发来的target(类似上面例子里说的与服务名相对应的Key)对应的后端服务器地址信息变化,通知Balancer对自己维护的地址进行动态地增删。

Watcher接口的定义如下:

//源码地址 https://github.com/grpc/grpc-go/blob/v1.2.x/naming/naming.go
type Watcher interface {Next() ([]*Update, error)// Close closes the Watcher.Close()
}

Etcd为这两个接口都提供了实现:

// 源码地址:https://github.com/etcd-io/etcd/blob/release-3.3/clientv3/naming/grpc.go// GRPCResolver 实现了grpc的naming.Resolver接口
type GRPCResolver struct {// Client is an initialized etcd client.Client *etcd.Client
}func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) {ctx, cancel := context.WithCancel(context.Background())w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel}return w, nil
}// 实现了grpc的naming.Watcher接口
type gRPCWatcher struct {c      *etcd.Clienttarget stringctx    context.Contextcancel context.CancelFuncwch    etcd.WatchChanerr    error
}func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {if gw.wch == nil {// first Next() returns all addressesreturn gw.firstNext()}// process new events on target/*wr, ok := <-gw.wchif !ok {...updates := make([]*naming.Update, 0, len(wr.Events))for _, e := range wr.Events {var jupdate naming.Updatevar err errorswitch e.Type {case etcd.EventTypePut:err = json.Unmarshal(e.Kv.Value, &jupdate)jupdate.Op = naming.Addcase etcd.EventTypeDelete:err = json.Unmarshal(e.PrevKv.Value, &jupdate)jupdate.Op = naming.Deletedefault:continue}if err == nil {updates = append(updates, &jupdate)}}return updates, nil
}func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {// 获取前缀为gw.target的所有Key的值,放到现有数组里resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())if gw.err = err; err != nil {return nil, err}updates := make([]*naming.Update, 0, len(resp.Kvs))for _, kv := range resp.Kvs {var jupdate naming.Updateif err := json.Unmarshal(kv.Value, &jupdate); err != nil {continue}updates = append(updates, &jupdate)}opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}// watch 监听这些Key的变化,包括前缀相同的新Key的加入gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)return updates, nil
}func (gw *gRPCWatcher) Close() { gw.cancel() }

这部分GRPCResolvergRPCWatcher类型的每个方法的功能和起到的作用都和RoundRobin这个gRPC Balancer结合地比较紧密,我准备放到下面和负载均衡的源码实现一起说明。

负载均衡

首先我们来看一下gRPC对负载均衡的接口定义:

type Balancer interface {Start(target string, config BalancerConfig) errorUp(addr Address) (down func(error))Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error)Notify() <-chan []Address// Close shuts down the balancer.Close() error
}

在gRPC 客户端与服务端之间建立连接时调用的Dail方法里可以用WithBalancer方法在DiaplOption里指定负载均衡组件:

client, err := etcd.Client()...resolver := &naming.GRPCResolver{Client: client}b := grpc.RoundRobin(resolver)opt0 := grpc.WithBalancer(b)grpc.Dial(target, opt0 , opt1, ...) // 后面省略了

上面的例子使用了gRPC自带的Balancer实现RoundRobin,RoundRobin除了实现了Balancer接口外自己内置了Resolver用来从名字获取其后绑定的IP信息以及服务的更新事件(增加删除服务节点这些事件) 。上面的例子里给RoundRobin指定了Etcd提供的name.GRPCResolver做为它的命名解析器,这个命名解析器就是上一节说的Etcd软件包里提供的gRPCnaming.Resolver接口实现。

RoundRobin

下面我们研究一下gRPC包里提供的RoundRobin代码实现,主要关注负载均衡和利用Resolver进行服务发现及节点更新这两个功能的代码实现原理

RoundRobin结构体定义如下:

// 源码在:https://github.com/grpc/grpc-go/blob/v1.2.x/balancer.go
type roundRobin struct {r      naming.Resolverw      naming.Watcheraddrs  []*addrInfo // 客户端可以尝试连接的所有地址mu     sync.MutexaddrCh chan []Address // 用于通知gRPC内部的,客户端可连接地址的信道next   int            // index of the next address to return for Get()waitCh chan struct{}  // the channel to block when there is no connected address availabledone   bool           // The Balancer is closed.
}

  • r是命名解析器,可以定义自己的命名解析器,如Etcd命名解析器。如果r为nil,那么Dial中参数target将直接作为可请求地址添加到addrs中。
  • w是命名解析器Resolve方法返回的watcher,该watcher可以监听命名解析器发来的地址信息变化,通知roundRobin对addrs中的地址进行动态的增删。
  • addrs是从命名解析器获取地址信息数组,数组中每个地址不仅有地址信息,还有gRPC与该地址是否已经创建了ready状态的连接的标记。
  • addrCh是地址数组的Channel,该Channel会在每次命名解析器发来地址信息变化后,将所有地址更新通知到gRPC内部的lbWatcher,lbWatcher是统一管理地址连接状态的协程,负责新地址的连接与被删除地址的关闭操作。
  • next是roundRobin的Index,即轮询调度遍历到addrs数组中的哪个位置了。
  • waitCh是当addrs中地址为空时,grpc调用Get()方法希望获取到一个到target的连接,如果设置了gRPC的failfast为false,那么Get()方法会阻塞在此Channel上,直到有ready的连接。

启动RoundRobin

启动RoundRobin就是实现Balancer接口的Start方法,该方法是由一开始通过grpc.WithBalancer把负载均衡器指定给的BalancerWrapperBuilder在创建BalancerWrapper时触发的:

func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {// 这里触发Balancer的Start方法bwb.b.Start(opts.Target.Endpoint, BalancerConfig{DialCreds: opts.DialCreds,Dialer:    opts.Dialer,})_, pickfirst := bwb.b.(*pickFirst)bw := &balancerWrapper{......}cc.UpdateBalancerState(connectivity.Idle, bw)go bw.lbWatcher() // 监听Balancer 通知过来的地址变化return bw
}

Start方法其主要功能就是通过RoundRobin的命名解析器的Resolve方法拿到监听命名解析器后端变化的Watcher。与此同时还会新建一个addrChan用于向gRPC内部的lbWatcher推送Watcher监听到的地址变化。

func (rr *roundRobin) Start(target string, config BalancerConfig) error {rr.mu.Lock()defer rr.mu.Unlock()if rr.done {return ErrClientConnClosing}if rr.r == nil {// 如果没有解析器,那么直接将target加入addrs地址数组rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}})return nil}// Resolve接口会返回一个watcher,watcher可以监听解析器的地址变化w, err := rr.r.Resolve(target)if err != nil {return err}rr.w = w// 创建一个channel,当watcher监听到地址变化时,通知grpc内部lbWatcher去连接该地址rr.addrCh = make(chan []Address, 1)// go 创建新协程监听watcher,监听地址变化。go func() {for {if err := rr.watchAddrUpdates(); err != nil {return}}}()return nil
}

创建完addrCh后在Start方法最后会开启一个goroutine,这个goroutine会不停地循环调用watchAddrUpdates查询是否有命名解析器的Watcher传递过来的更新。

监听服务端地址的更新

watchAddrUpdates方法里就是通过上面Start方法里创建的Resolver Watcher的Next方法来监听Etcd上后端服务节点的更新,这个Watcher的实现就是上面服务发现章节里说的Etcd软件包里提供的gRPCWatcher类型,它的Next方法里会去通过监听Etcd上由服务名组成的Key的变化,然后在这里把这些信息传递给上面Start方法里创建好的addrChan通道。

func (rr *roundRobin) watchAddrUpdates() error {// watcher的next方法会阻塞,直至有地址变化信息过来,updates即为变化信息updates, err := rr.w.Next()if err != nil {return err}// 对于addrs地址数组的操作,显然是要加锁的,因为有多个goroutine在同时操作rr.mu.Lock()defer rr.mu.Unlock()for _, update := range updates {addr := Address{Addr:     update.Addr,Metadata: update.Metadata,}switch update.Op {case naming.Add://对于新增类型的地址,注意这里不会重复添加。var exist boolfor _, v := range rr.addrs {if addr == v.addr {exist = truebreak}}if exist {continue}rr.addrs = append(rr.addrs, &addrInfo{addr: addr})case naming.Delete://对于删除的地址,直接在addrs中删除就行了for i, v := range rr.addrs {if addr == v.addr {copy(rr.addrs[i:], rr.addrs[i+1:])rr.addrs = rr.addrs[:len(rr.addrs)-1]break}}default:grpclog.Errorln("Unknown update.Op ", update.Op)}}// 这里复制了整个addrs地址数组,然后丢到addrCh channel中通知grpc内部lbWatcher,// lbWatcher会关闭删除的地址,连接新增的地址。// 连接ready后会有专门的goroutine调用Up方法修改addrs中地址的状态。open := make([]Address, len(rr.addrs))for i, v := range rr.addrs {open[i] = v.addr}if rr.done {return ErrClientConnClosing}select {case <-rr.addrCh:default:}rr.addrCh <- openreturn nil
}

建立连接

Up方法是gRPC内部负载均衡的watcher调用的,该watcher会读全局的连接状态队列,改变RoundRobin维护的连接列表的里连接的状态 (会有单独的goroutine向目标服务发起连接尝试,尝试成功后才会把连接对象的连接状态改为connected),如果是已连接状态的连接 ,会调用Up方法来改变addrs地址数组中该地址的状态为已连接

func (rr *roundRobin) Up(addr Address) func(error) {rr.mu.Lock()defer rr.mu.Unlock()var cnt int//将地址数组中的addr置为已连接状态,这样这个地址就可以被client使用了。for _, a := range rr.addrs {if a.addr == addr {if a.connected {return nil}a.connected = true}if a.connected {cnt++}}// 当有一个可用地址时,之前可能是0个,可能要很多client阻塞在获取连接地址上,这里通知所有的client有可用连接啦。// 为什么只等于1时通知?因为可用地址数量>1时,client是不会阻塞的。if cnt == 1 && rr.waitCh != nil {close(rr.waitCh)rr.waitCh = nil}//返回禁用该地址的方法return func(err error) {rr.down(addr, err)}
}

关闭连接

关闭连接使用的是Down方法,这个方法就简单, 直接找到addr置为不可用就行了。

func (rr *roundRobin) down(addr Address, err error) {rr.mu.Lock()defer rr.mu.Unlock()for _, a := range rr.addrs {if addr == a.addr {a.connected = falsebreak}}
}

客户端获取连接

客户端在调用gRPC具体MethodInvoke方法里,会去RoundRobin的连接池addrs里获取连接,如果addrs为空,或者addrs里的地址都不可用,Get()方法会返回错误。但是如果设置了failfast = falseGet()方法会阻塞在waitCh这个通道上,直至Up方法给到通知,然后轮询调度可用的地址。

func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {var ch chan struct{}rr.mu.Lock()if rr.done {rr.mu.Unlock()err = ErrClientConnClosingreturn}if len(rr.addrs) > 0 {// addrs的长度可能变化,如果next值超出了,就置为0,从头开始调度。if rr.next >= len(rr.addrs) {rr.next = 0}next := rr.next//遍历整个addrs数组,直到选出一个可用的地址for {a := rr.addrs[next]// next值加一,当然是循环的,到len(addrs)后,变为0next = (next + 1) % len(rr.addrs)if a.connected {addr = a.addrrr.next = nextrr.mu.Unlock()return}if next == rr.next {// 遍历完一圈了,还没找到,走下面逻辑break}}}if !opts.BlockingWait { //如果是非阻塞模式,如果没有可用地址,那么报错if len(rr.addrs) == 0 {rr.mu.Unlock()err = status.Errorf(codes.Unavailable, "there is no address available")return}// Returns the next addr on rr.addrs for failfast RPCs.addr = rr.addrs[rr.next].addrrr.next++rr.mu.Unlock()return}// Wait on rr.waitCh for non-failfast RPCs.// 如果是阻塞模式,那么需要阻塞在waitCh上,直到Up方法给通知if rr.waitCh == nil {ch = make(chan struct{})rr.waitCh = ch} else {ch = rr.waitCh}rr.mu.Unlock()for {select {case <-ctx.Done():err = ctx.Err()returncase <-ch:rr.mu.Lock()if rr.done {rr.mu.Unlock()err = ErrClientConnClosingreturn}if len(rr.addrs) > 0 {if rr.next >= len(rr.addrs) {rr.next = 0}next := rr.nextfor {a := rr.addrs[next]next = (next + 1) % len(rr.addrs)if a.connected {addr = a.addrrr.next = nextrr.mu.Unlock()return}if next == rr.next {// 遍历完一圈了,还没找到,可能刚Up的地址被down掉了,重新等待。break}}}// The newly added addr got removed by Down() again.if rr.waitCh == nil {ch = make(chan struct{})rr.waitCh = ch} else {ch = rr.waitCh}rr.mu.Unlock()}}
}

总结

整个gRPC基于Etcd实现服务注册/发现以及负载均衡的流程和关键的源码实现就梳理完了,其实源码实现的细节远比我这里列举的要复杂,这篇文章的目的也是希望能记录下一学习和实践gRPC的负载均衡和服务解析时的一些关键路径。另外需要注意的是本文里使用的是gRPC v1.2.x的代码,在1.3版本后官方包重新调整了目录和包名,与本文里列举的源码以及Balancer的使用上都会有些出入,不过原理还是大致一样的,只不过每一版都一直在此基础上演进。

看到这里了,如果喜欢我的文章可以帮我点个赞,我会每周通过技术文章分享我的所学所见和第一手实践经验,感谢你的支持。微信搜索关注公众号「网管叨bi叨」第一时间获取我的文章推送。

e盾服务端源码_gRPC服务注册发现及负载均衡的实现方案与源码解析相关推荐

  1. gRPC服务注册发现及负载均衡的实现方案与源码解析

    今天聊一下gRPC的服务发现和负载均衡原理相关的话题,不同于Nginx.Lvs或者F5这些服务端的负载均衡策略,gRPC采用的是客户端实现的负载均衡.什么意思呢,对于使用服务端负载均衡的系统,客户端会 ...

  2. 【学习日记2023.6.9】之 SpringCloud入门(认识微服务_服务拆分和远程调用RestTemplate_Eureka注册中心_Ribbon负载均衡_Nacos注册中心)

    文章目录 SpringCloud 1. 认识微服务 1.1 单体架构 1.2 分布式架构 1.3 微服务 1.4 SpringCloud 1.5 总结 2. 服务拆分和远程调用 2.1 服务拆分原则 ...

  3. 服务发现与负载均衡 dubbo zk原理

    服务发现与负载均衡 拓展阅读 : dubbo 原理概念图 2016-03-03 杜亦舒 性能与架构 性能与架构 性能与架构 微信号 yogoup 功能介绍 网站性能提升与架构设计 内容整理自文章&qu ...

  4. EJB3.0学习笔记---理解远程调用服务端和本地调用服务端的区别

    项目目的:理解远程调用服务端和本地调用服务端的区别 1.异常:       javax.ejb.EJBException: Local and Remote Interfaces cannot hav ...

  5. Netty即是服务端又是客户端,服务端和客户端相互对应

    Netty即是服务端又是客户端,服务端和客户端相互对应. 具体功能细节是: 上游有一个服务,会主动发送消息给我中间件平台.中间件平台既有服务端也有客户端.通过下游客户端连接进来的客户端和中间件传下去的 ...

  6. Asp.net webApi 通过WebSocket推送消息给客户端,搭建一个即是服务端又是客户端的服务

    Asp.net webApi 通过WebSocket推送消息给客户端,搭建一个即是服务端又是客户端的服务_IT_ziliang的博客-CSDN博客 WebSocket是一种在单个TCP连接上进行全双工 ...

  7. Dubbo 框架设计与源码解读(配置解析优先级、线程分配、负载均衡、容错方案)

    整体框架设计 图例说明: 图中左边淡蓝背景的为服务消费⽅使⽤的接⼝,右边淡绿⾊背景的为服务提供⽅使⽤的接⼝,位于中轴线上的为双⽅都⽤到的接⼝. 图中从下⾄上分为⼗层,各层均为单向依赖,右边的⿊⾊箭头代 ...

  8. java服务架构 之MGW(美团点评高性能四层负载均衡)

    为什么80%的码农都做不了架构师?>>>    本文整理自美团点评技术沙龙第14期:美团背后的故事-你不知道的美团云. 美团点评技术沙龙由美团点评技术团队主办,每月一期.每期沙龙邀请 ...

  9. 【微服务】Eureka+Ribbon实现注册中心与负载均衡

    文章目录 前言 1.微服务引入 1.1.相关概念 1.2.软件架构的演进 1.2.1.单体架构 1.2.2.垂直架构 1.2.3.分布式架构 1.2.4.SOA架构 1.2.5.微服务架构 1.3.S ...

最新文章

  1. pytorch 与 numpy 的数组广播机制
  2. [导入][转]好企业是什么样?
  3. 界面原型创建工具Axure 基本操作
  4. java aar 文件,将本地.aar文件添加到我的gradle构建中
  5. SICP学习笔记(1.1.4~1.1.5)
  6. 弹性盒模型--新版与旧版比较(1)
  7. 使用maven构建的Spring boot项目在开始搭建的时候出的一些错误
  8. python串口数据绘图_使用Python串口实时显示数据并绘图的例子
  9. jquery获取设置元素宽高位置height()、width()、offset()、position()、scrollTop()、scrollLeft()
  10. v3是c语言吗 yolo_你真的明白yolo v3吗?
  11. AIX添加ASM的裸盘
  12. 三角函数正交性理解与Matlab分析
  13. c语言编程 等边三角形图形,c语言问题 打印图形,菜单包括:直角三角形、等腰三角形,输入图形...,c语言编程 打印图形,菜单包括:矩形,平行四边形,输入图形的...
  14. mysql数据库安装错误报错Falled不成功,感叹号
  15. treeset可以重复吗_买了好几份意外险,可以重复理赔吗?
  16. 【后缀自动机】Luogu P3975 [TJOI2015]弦论题解
  17. ...mapMutations的使用
  18. 计算机表示法是知识表示法么,知识表示方法比较.pdf
  19. 鼠标不听使唤,在屏幕上乱窜乱动怎么办?
  20. Mono及MonoDevelop介绍与安装

热门文章

  1. Django08-1:模型层(ORM)--聚合查询/分组查询/F与Q查询/开启事务/常用字段及参数/自定义字段/数据库查询优化
  2. Git 操作笔记/pip换源
  3. Redis 数据持久化的方案的实现
  4. Flutter之Container
  5. Android之如何用dextra.ELF64查看安卓手机“设置“图标的源代码
  6. linux之用一张图片说明文件系统结构
  7. Tree的前序序列化
  8. 我使用 html 反向输出自己打自己(7)
  9. 一个几何级数的无限和思考
  10. 看了《隐秘的角落》才知道,掉头发有多可怕!10个掉头发最快的专业!快看看你中枪了没有!...