项目源码:https://github.com/wwcd/grpc-lb

项目文档:https://segmentfault.com/a/1190000008672912

我们先把项目down下来,它的目录结构如下:

我们先去掉其他组件,单来看下gRPC的调用流程,下图是官方文档中的调用流程图:

  • 首先客户端(gRPC stub)调用A方法,发起RPC调用;

  • 对请求信息使用Protobuf进行对象序列化压缩;

  • 然后在服务端(gRPC Sever)接收到请求后,解码请求体,进行业务处理逻辑并返回;

  • 对响应结果使用Protobuf进行对象序列化压缩;

  • 客户端接收到服务端响应,解码请求体。回调被调用的A方法,唤醒正在等待响应(阻塞)的客户端调用并响应结果。

gRPC使用流程如图:

我们先定义一个helloworld.proto,然后自动用protoc-gen-go生成go代码。以下是proto:

service Greeter {rpc SayHello (HelloRequest) returns (HelloReply) {option (google.api.http) = {post: "/hello"body: "*"};}
}message HelloRequest {string name = 1;
}message HelloReply {string message = 1;
}

首先在服务启动后gRPC客户端向命名服务器发出名称解析请求,名称将解析为一个或多个IP地址,对应cli.go。

package mainimport ("context""flag""strconv""time""google.golang.org/grpc""google.golang.org/grpc/balancer/roundrobin""google.golang.org/grpc/resolver""github.com/sirupsen/logrus"pb "github.com/wwcd/grpc-lb/cmd/helloworld"grpclb "github.com/wwcd/grpc-lb/etcdv3"
)var (svc = flag.String("service", "hello_service", "service name")reg = flag.String("reg", "http://localhost:2379", "register etcd address")
)func main() {flag.Parse()r := grpclb.NewResolver(*reg, *svc)resolver.Register(r)ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)conn, err := grpc.DialContext(ctx, r.Scheme()+"://authority/"+*svc, grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithBlock())cancel()if err != nil {panic(err)}ticker := time.NewTicker(1000 * time.Millisecond)for t := range ticker.C {client := pb.NewGreeterClient(conn)resp, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: "world " + strconv.Itoa(t.Second())})if err == nil {logrus.Infof("%v: Reply is %s\n", t, resp.Message)}}
}

DialContext为给定目标创建客户端连接,方法默认是非阻塞的,即该功能不会等待建立连接,连接在后台进行,可以使用WithBlock()改为阻塞。非阻塞情况下,ctx不会对连接起作用只用作设置;阻塞情况可以使用ctx取消或终止挂起的连接。

NewTicker设置一个滴答时钟,用来调整时间间隔和发送速度,返回一个包含时间channel的结构体。声明一个变量接收已建立连接的客户端GreeterClient对象,客户端对象开始调用SayHello方法。

客户端调用方式我们大概有数了,对应服务端svr.go代码如下。

package mainimport ("context""flag""net""os""os/signal""syscall""time""google.golang.org/grpc""github.com/sirupsen/logrus"pb "github.com/wwcd/grpc-lb/cmd/helloworld"grpclb "github.com/wwcd/grpc-lb/etcdv3"
)var (serv = flag.String("service", "hello_service", "service name")host = flag.String("host", "localhost", "listening host")port = flag.String("port", "50001", "listening port")reg  = flag.String("reg", "http://localhost:2379", "register etcd address")
)func main() {flag.Parse()lis, err := net.Listen("tcp", net.JoinHostPort(*host, *port))if err != nil {panic(err)}err = grpclb.Register(*reg, *serv, *host, *port, time.Second*10, 15)if err != nil {panic(err)}ch := make(chan os.Signal, 1)signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT)go func() {s := <-chlogrus.Infof("receive signal '%v'", s)grpclb.UnRegister()os.Exit(1)}()logrus.Infof("starting hello service at %s", *port)s := grpc.NewServer()pb.RegisterGreeterServer(s, &server{})s.Serve(lis)
}type server struct{}func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {logrus.Infof("%v: Receive is %s\n", time.Now(), in.Name)return &pb.HelloReply{Message: "Hello " + in.Name + " from " + net.JoinHostPort(*host, *port)}, nil
}

JoinHostPort将主机和端口组合成“host: port”形式的网络地址,用来作为监听对象;

Etcd中的Register方法将“host: port”形式的网络监听信息用KV形式存储,之后进行建立租约、存放键值、赋予租约永久有效操作。

NewServer创建一个gRPC服务器,该服务器没有注册服务,并且还没有开始接受请求,调用注册GreeterServer方法后,Server方法为每个监听的连接创建一个新的ServerTransport和service goroutine。在func SayHello中补充了业务逻辑处理。

客户端和服务端的逻辑大概了解了,我们知道gRPC是支持多路复用技术的,我们在网关处创建一个映射为空的ServeMux,然后将handlers处理程序注册到里面,对应gw.go。

package mainimport ("context""flag""net""net/http""time""github.com/grpc-ecosystem/grpc-gateway/runtime""github.com/sirupsen/logrus""google.golang.org/grpc""google.golang.org/grpc/balancer/roundrobin""google.golang.org/grpc/resolver"pb "github.com/wwcd/grpc-lb/cmd/helloworld"grpclb "github.com/wwcd/grpc-lb/etcdv3"
)var (svc  = flag.String("service", "hello_service", "service name")host = flag.String("host", "localhost", "listening host")port = flag.String("port", "60001", "listening port")reg  = flag.String("reg", "http://localhost:2379", "register etcd address")
)func main() {flag.Parse()r := grpclb.NewResolver(*reg, *svc)resolver.Register(r)ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)// https://github.com/grpc/grpc/blob/master/doc/naming.md// The gRPC client library will use the specified scheme to pick the right resolver plugin and pass it the fully qualified name string.conn, err := grpc.DialContext(ctx, r.Scheme()+"://authority/"+*svc, grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithBlock())cancel()if err != nil {panic(err)}mux := runtime.NewServeMux()err = pb.RegisterGreeterHandler(ctx, mux, conn)if err != nil {panic(err)}// Start HTTP server (and proxy calls to gRPC server endpoint)logrus.Fatal(http.ListenAndServe(net.JoinHostPort(*host, *port), mux))
}

到此我们详细看了客户端及服务端的具体实现。在gRPC的设计文档中提供了服务注册及服务发现的思路,也为不同语言提供了命名解析和负载均衡接口供扩展。

其基本实现原理:

  1. gRPC客户端向命名服务器(resolver)发出名称解析请求,名称将解析为一个或者多个IP,每个IP标识它是服务器地址还是负载均衡器地址,以及标识要使用哪个客户端服务配置或负载均衡策略。

  2. 客户端实例化负载均衡策略,如果解析返回负载均衡地址,客户端将使用grpclb策略,否则客户端使用服务配置请求的负载均衡策略。

  3. 负载均衡策略为每个服务器地址创建一个子通道(channel)。

  4. 当有rpc请求时,负载均衡策略决定哪个子通道即grpc服务器将接收请求,当可用服务器为空时客户端的请求将被阻塞。

我们接着对基于Etcd3的服务发现部分代码进行深入,对应etcdv3 resolver.go中的watch(),方法中调用了etcd提供的clientv3.Watcher。

func (r *Resolver) watch(prefix string) {addrDict := make(map[string]resolver.Address)update := func() {addrList := make([]resolver.Address, 0, len(addrDict))for _, v := range addrDict {addrList = append(addrList, v)}r.cc.UpdateState(resolver.State{Addresses: addrList})}resp, err := r.cli.Get(context.Background(), prefix, clientv3.WithPrefix())if err == nil {for i := range resp.Kvs {addrDict[string(resp.Kvs[i].Value)] = resolver.Address{Addr: string(resp.Kvs[i].Value)}}}update()rch := r.cli.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithPrevKV())for n := range rch {for _, ev := range n.Events {switch ev.Type {case mvccpb.PUT:addrDict[string(ev.Kv.Key)] = resolver.Address{Addr: string(ev.Kv.Value)}case mvccpb.DELETE:delete(addrDict, string(ev.PrevKv.Key))}}update()}
}

主要方法在cli.Get()以及cli.Watch()方法,先来看cli.Get(),它的作用是检索key对应的value。

再来看cli.Watch()方法

启动测试程序:

*注: golang1.11以上版本进行测试*
# 分别启动服务端go run -mod vendor cmd/svr/svr.go -port 50001go run -mod vendor cmd/svr/svr.go -port 50002go run -mod vendor cmd/svr/svr.go -port 50003
# 启动客户端go run -mod vendor cmd/cli/cli.go# 启动grpc-gateway代理,提供HTTP-RESTful服务go run -mod vendor cmd/gw/gw.gocurl -X POST http://localhost:60001/hello -d '{"name": "fromGW"}'

运行三个服务端,一个客户端。经测试我们发现:

  • 各服务端接收的请求数相等;

  • 关闭一个服务端S1,请求会转到另外两个服务端;

  • 重启S1,请求会重新平均分到S1;

  • 关闭etcd3服务器,客户端与服务端仍正常通信,但新服务端不会注册进来,服务端掉线了也无法摘除掉;

  • 重新启动Etcd3服务器,服务端上下线可自动恢复正常;

  • 关闭所有服务端,客户端请求将被阻塞。

参考:

https://segmentfault.com/a/1190000008672912

https://www.grpc.io/doc

https://mp.weixin.qq.com/s/yWwbbBP-n1BjLwlCd_74RA

欢迎关注公众号才浅coding攻略,本人是热爱技术干货的程序媛,从事游戏及微服务后端开发,分享Go、微服务、云原生及Python、网络及算法等相关内容。日拱一卒。欢迎各位催更扯淡一条龙!

读猿码系列——1. gRPC+Etcd3的服务发现负载均衡相关推荐

  1. .net core grpc consul 实现服务注册 服务发现 负载均衡(二)

    在上一篇 .net core grpc 实现通信(一) 中,我们实现的grpc通信在.net core中的可行性,但要在微服务中真正使用,还缺少 服务注册,服务发现及负载均衡等,本篇我们将在 .net ...

  2. Spring读源码系列之AOP--03---aop底层基础类学习

    Spring读源码系列之AOP--03---aop底层基础类学习 引子 Spring AOP常用类解释 AopInfrastructureBean---免被AOP代理的标记接口 ProxyConfig ...

  3. nginx系列之八:使用upsync模块实现负载均衡

    ** 前言 ** nginx系列之一:nginx入门 nginx系列之二:配置文件解读 nginx系列之三:日志配置 nginx系列之四:web服务器 nginx系列之五: 负载均衡 nginx系列之 ...

  4. 服务网关Ocelot 入门Demo系列(01-Ocelot极简单Demo及负载均衡的配置)

    服务网关Ocelot 入门Demo系列(01-Ocelot极简单Demo及负载均衡的配置) 原文:服务网关Ocelot 入门Demo系列(01-Ocelot极简单Demo及负载均衡的配置) [前言] ...

  5. 扩展gRPC支持consul服务发现和Polly策略

    gRPC由于需要用工具生成代码实现,可开发性不是很高,在扩展这方面不是很友好 最近研究了下,进行了扩展,不需要额外的工具生成,直接使用默认Grpc.Tools生成的代理类即可 相关源码在文章底部 客户 ...

  6. 10x系列之Clay.io的服务发现

    本文讲的是10x系列之Clay.io的服务发现,[编者的话]Clay.io的Zoli Kahan撰写了"10X"系列博文,分享如何只使用一个很小的团队支撑Clay.io的大规模应用 ...

  7. Ambassador系列-03-服务配置和服务发现

    Ambassador 服务配置 Ambassador提供了三种服务配置方法. CRDs方式:Customer Resource Definitions 注解方式:Kubernetes Service ...

  8. 鸟枪换炮读源码系列之ArrayList(java11)

    经常用到ArrayList,知道和LinkedList的对比优缺点,但是没那么读过源码. 首先它继承了AbstractList,实现了List接口,RandomAccess接口(支持快速随机访问,查询 ...

  9. 干货实操:微服务Spring Cloud 系列(二) Eureka服务发现与服务注册(strand alone)

    此篇主要实操Eureka 服务端的服务注册,以及服务发现,并需要认证才能访问控制中心. 分五个部分说明: 一.  认识 Eureka 二.  Eureka  服务端开发 三.  Eureka 客户端开 ...

最新文章

  1. FortiGate基本信息
  2. R语言ggplot2可视化图例放置在图像底部(bottom)并分两行显示实战
  3. vc++ 显式链接dll
  4. python常用模块实例_python中常用的各种数据库操作模块和连接实例
  5. 作业收缴系统使用手册(自写开源小系统)
  6. idea编辑器关闭重复代码检查
  7. math.floor java_Java Math.floor() 方法
  8. java 虚基类_重拾C++之虚函数和虚基类以及抽象类
  9. android+read_logs这权限有什么用,READ_LOGS是正常或危险的Android权限吗?
  10. vb html table,VB6.0 如何是用 datatable
  11. laravel redis_php session 存储到redis里
  12. comsol积分函数_COMSOL教程- 巧用PDE、ODE耦合方程实现变量的时间积分或者空间微分...
  13. Linux下的Libsvm使用历程录
  14. JAVA 实现《中国象棋》游戏
  15. 微信小程序怎么做店铺_微信小程序如何开通店铺
  16. InnoDB数据恢复的工具——TwinDB介绍
  17. 在Visual Studio 2019 搭建 QGIS3.22 二次开发环境
  18. 计算机基础课程听课记录,听课记录-计算机应用基础
  19. 【MySQL综合练习1】
  20. LogicFlow(Vue3)

热门文章

  1. 张朝阳先生,您能不能踏实一点?
  2. GIS底层开发、GIS前端开发和GIS后端开发有什么区别?
  3. 【沐风老师】3dMax创建缝线插件StitchLines使用方法详解
  4. 组织机构对象模型设计及实现
  5. 传统企业怎样利用矩阵型组织结构转型?
  6. 投票助力源码_公众号投票源码_女神来了投票5.2.0
  7. 华为手机7个超实用的功能,关键时刻帮你大忙,赶紧开启吧!
  8. cocos2d-x 3 0 制作横版格斗游戏
  9. jmeter自定义java_【脚本开发】:在jmeter中使用自定义的java脚本
  10. JavaScript高级 |如何玩转箭头函数?