介绍

下面介绍 jupiter-0.2.7 版本中 grpc 通过 etcd 实现服务发现与注册。

服务发现与注册的实现解析

服务注册

服务注册的流程图:

etcd的服务注册代码模块在 jupiter/pkg/registry/etcdv3 中。

下面让我们来看看实际的代码

// Registry register/unregister service

// registry impl should control rpc timeout

type Registry interface {

RegisterService(context.Context, *server.ServiceInfo) error

UnregisterService(context.Context, *server.ServiceInfo) error

ListServices(context.Context, string, string) ([]*server.ServiceInfo, error)

WatchServices(context.Context, string, string) (chan Endpoints, error)

io.Closer

}

复制代码

在 pkg/registry/registry.go 中定义了注册服务对象的接口。不同的服务只要实现了这些接口,jupiter 就能使用。

首先我们来看看注册方法

// RegisterService register service to registry

func (reg *etcdv3Registry) RegisterService(ctx context.Context, info *server.ServiceInfo) error {

err := reg.registerBiz(ctx, info)

...

}

// 业务信息注册

func (reg *etcdv3Registry) registerBiz(ctx context.Context, info *server.ServiceInfo) error {

...

// 提交信息到 etcd

_, err := reg.client.Put(readCtx, key, val, opOptions...)

...

}

复制代码

这里主要的部分是 reg.client.Put()  将服务信息提交到 etcd 中。其中的租约机制我会在之后单独写一篇文章介绍。这里主要还是关注如何注册。

源码中还有个 registerMetric() 方法,这个方法的目的是将服务信息在提交到etcd的 prometheus 前缀目录下,用于服务监控,用的也是 client.Put() 方法。这里具体就不展示代码了,感兴趣的同学可以去源码库中查看。

服务退出

// 删除服务

func (reg *etcdv3Registry) unregister(ctx context.Context, key string) error {

...

// 删除服务信息

_, err := reg.client.Delete(ctx, key)

...

}

复制代码

这里通过 client.Delete()  方法将服务信息从 etcd 中删除掉。

获取服务列表

// ListServices list service registered in registry with name `name`

func (reg *etcdv3Registry) ListServices(ctx context.Context, name string, scheme string) (services []*server.ServiceInfo, err error) {

// 服务信息key的前缀

target := fmt.Sprintf("/%s/%s/providers/%s://", reg.Prefix, name, scheme)

// 获取相关前缀的所有信息

getResp, getErr := reg.client.Get(ctx, target, clientv3.WithPrefix())

...

}

复制代码

通过 client.Get()  方法获取到相同前缀的服务信息。

服务信息变动监控

// WatchServices watch service change event, then return address list

func (reg *etcdv3Registry) WatchServices(ctx context.Context, name string, scheme string) (chan registry.Endpoints, error) {

prefix := fmt.Sprintf("/%s/%s/", reg.Prefix, name)

// 通过etcd客户端创建一个监控通道

watch, err := reg.client.WatchPrefix(context.Background(), prefix)

if err != nil {

return nil, err

}

...

xgo.Go(func() {

// 不断接收etcd发送过来的变动事件

for event := range watch.C() {

switch event.Type {

case mvccpb.PUT:

updateAddrList(al, prefix, scheme, event.Kv)

case mvccpb.DELETE:

deleteAddrList(al, prefix, scheme, event.Kv)

}

out := al.DeepCopy()

fmt.Printf("al => %p\n", al.Nodes)

fmt.Printf("snapshot => %p\n", out.Nodes)

select {

// 将更新后的服务信息发送出去,接收方是 resolver

case addresses

default:

xlog.Warnf("invalid")

}

}

})

// 返回一个地址通道,用于传递

return addresses, nil

}

复制代码

WatchServices()  方法主要是监控信息的变动事件,并将变动后的服务信息重新返回给 resolver。具体思路是通过 etcdClient.Watch()  方法创建一个监控通道,然后放入一个 goroutine来不断接收 etcd 推送过来的事件,维护本地的服务信息,并通过 resolver 最终返回到 grpclb 负载均衡器进行服务地址信息的更新。

服务发现

服务发现流程图:

grpc 的 resolver 模块定义了两个接口

// Builder creates a resolver that will be used to watch name resolution updates.

type Builder interface {

Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)

Scheme() string

}

// Resolver watches for the updates on the specified target.

// Updates include address updates and service config updates.

type Resolver interface {

ResolveNow(ResolveNowOptions)

Close()

}

复制代码

首先我们来看看 Builder 接口的具体实现

type baseBuilder struct {

name string

reg  registry.Registry

}

// Build ...

func (b *baseBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {

endpoints, err := b.reg.WatchServices(context.Background(), target.Endpoint, "grpc")

if err != nil {

return nil, err

}

var stop = make(chan struct{})

xgo.Go(func() {

for {

select {

case endpoint :=

var state = resolver.State{

Addresses: make([]resolver.Address, 0),

...

}

for _, node := range endpoint.Nodes {

...

state.Addresses = append(state.Addresses, address)

}

cc.UpdateState(state)

case

return

}

}

})

return &baseResolver{

stop: stop,

}, nil

}

复制代码

这里Build 方法主要是通过 Registry 模块获得监控服务通道,然后将更新的服务信息再更新到 grpcClient 中去,保证 grpcClient 的负载均衡器的服务地址永远都是最新的。

如何将Builder的具体实现注册到 grpc 中

import "google.golang.org/grpc/resolver"

// Register ...

func Register(name string, reg registry.Registry) {

resolver.Register(&baseBuilder{

name: name,

reg:  reg,

})

}

复制代码

将 Registry模块注入到 Builder 对象中,然后注入到 grpc 的 resolver 模块中去。这样 grpcClient 在实际运行中就会调用 etcd 的服务发现功能了。

grpc 如何使用服务与发现的源码解析

这里在介绍一下jupiter框架在实际项目中如何使用服务发现与注册。

服务注册

func (app *Application) startServers() error {

var eg errgroup.Group

// start multi servers

for _, s := range app.servers {

s := s

eg.Go(func() (err error) {

_ = app.registerer.RegisterService(context.TODO(), s.Info())

defer app.registerer.UnregisterService(context.TODO(), s.Info())

...

})

}

return eg.Wait()

}

eng := engine.NewEngine()

eng.SetRegistry(compound_registry.New(

etcdv3_registry.StdConfig("default").Build(),

))

复制代码

在框架的 Application 模块中已经实现了服务的自动注册与删除。一般使用框架时不需要再调用。项目使用中只需要在创建 Application 对象时,将注册中心信息注入即可。

服务发现

// 服务发现需要初始化,拿到etcd中服务的信息

func (eng *Engine) initResolver() error {

resolver.Register("etcd", etcdv3.StdConfig("default").Build())

return nil

}

复制代码

服务发现也是类型的将注册中心信息注入即可。

文章系列

java如何通过grpc连接etcd_grpc通过 etcd 实现服务发现与注册-源码分析相关推荐

  1. 基于JAVA融呗智慧金融微资讯移动平台服务端计算机毕业设计源码+数据库+lw文档+系统+部署

    基于JAVA融呗智慧金融微资讯移动平台服务端计算机毕业设计源码+数据库+lw文档+系统+部署 基于JAVA融呗智慧金融微资讯移动平台服务端计算机毕业设计源码+数据库+lw文档+系统+部署 本源码技术栈 ...

  2. Java Review - 线程池中使用ThreadLocal不当导致的内存泄漏案例源码分析

    文章目录 概述 Why 内存泄露 ? 在线程池中使用ThreadLocal导致的内存泄漏 概述 ThreadLocal的基本使用我们就不赘述了,可以参考 每日一博 - ThreadLocal VS I ...

  3. JAVA并发:并发工具类CountDownLatch、CyclicBarrier、Semaphore使用及源码分析

    在 JUC 下包含了一些常用的同步工具类,今天就来详细介绍一下,CountDownLatch,CyclicBarrier,Semaphore 的使用方法以及它们之间的区别. 1 CountDownLa ...

  4. 【Java】NIO中Selector的select方法源码分析

    该篇博客的有些内容和在之前介绍过了,在这里再次涉及到的就不详细说了,如果有不理解请看[Java]NIO中Channel的注册源码分析, [Java]NIO中Selector的创建源码分析 Select ...

  5. Java的三种代理模式完整源码分析

    Java的三种代理模式&完整源码分析 Java的三种代理模式&完整源码分析 参考资料: 博客园-Java的三种代理模式 简书-JDK动态代理-超详细源码分析 [博客园-WeakCach ...

  6. Java的三种代理模式【附源码分析】

    Java的三种代理模式&完整源码分析 代理模式分为两种,静态代理和动态代理,动态代理包括JDK动态代理和Cglib动态代理. 静态代理 静态代理在使用时,需要定义接口或者父类,被代理对象与代理 ...

  7. 【Netty源码分析摘录】(八)新连接的接入

    文章目录 1.问题 2.检测新连接接入 3.创建客户端 channel 4. 绑定 NioEventLoop 4.1 register0 4.1.1 doRegister() 4.1.2 pipeli ...

  8. etcd学习和实战:4、Java使用etcd实现服务发现和管理

    etcd学习和实战:4.Java使用etcd实现服务发现和管理 文章目录 etcd学习和实战:4.Java使用etcd实现服务发现和管理 1. 前言 2. 代码 2.1 服务注册 2.2 服务发现 2 ...

  9. GRPC golang版源码分析之客户端(一)

    Table of Contents 1. 前言 2. 源码目录浏览 3. 客户端 4. 相关链接 1 前言 grpc是一个通用的rpc框架,用google实现,当然也有go语言的版本.在工作中主要用到 ...

最新文章

  1. 又爱又恨的 Microsoft Edge!
  2. JS学习笔记(第五章)(String类型)
  3. Linux小工具(3)之/proc目录详细介绍(上)
  4. VTK:网格之InterpolateFieldDataDemo
  5. Dotnet Core使用特定的SDKRuntime版本
  6. linux mysql 备份脚本_linux下mysql备份脚本
  7. CentOS7编译安装ntp
  8. C#动态调用WCF接口,两种方式任你选。
  9. redis事务冲突问题 - 乐观锁和悲观锁
  10. Centos6.9编译安装nginx1.14.0
  11. Javascript运行环境
  12. 美团内部讲座 | 清华大学崔鹏:因果推断技术最新的发展趋势
  13. 苹果xsmax是什么接口_除了苹果,越来越多笔记本都带雷电3了,它凭什么号称最强接口?...
  14. h5页面跳转微信小程序页面
  15. 学好java应该读的几本书
  16. 火云开发课堂 - 《Shader从入门到精通》系列 第九节:在Shader中实现马赛克滤镜
  17. centos 解除链接_KeyShot 9.2 新功能介绍!(附下载链接)
  18. android七大主流Android音乐播放器横向评测
  19. Java版学生信息管理系统 附源码(JavaFX图形界面)
  20. 斐波那契回调线怎么画_斐波那契回调线的标准画法(建议收藏)

热门文章

  1. 当Java 8 Streams API不够用时
  2. 计划Java EE 7批处理作业
  3. 在Spring MVC应用程序中使用Bean Validation 1.1获得更好的错误消息
  4. 在Spring启动时与mongodb一起摇摆
  5. 通过JNI使用C ++尖叫快速进行Lucene搜索
  6. 番石榴的ListenableFuture
  7. RESTEasy教程第2部分:Spring集成
  8. latex 多行公式_Markdown中输入多行并列的公式
  9. vim 底行命令模式下的全局命令 g(global)
  10. Mybatis实现CRUD操作