作者:Zarten知乎专栏:框架工具篇详解知乎ID: Zarten简介: 互联网一线工作者,尊重原创并欢迎评论留言指出不足之处,也希望多些关注和点赞是给作者最好的鼓励 !

1-概述

gRPC框架是一个性能很好的rpc框架,但框架中没有实现分布式服务器负载均衡的方法,只是给出实现方案,需要我们自己实现。官方推荐是客户端负载均衡的方案,也就是由客户端主动选择路由,这样的好处是不用给代理服务器造成压力。

分布式场景下一般采用etcd、consul、zookeeper等分布式系统,这里采用etcd,etcd是go语言实现,在github可看到开源的全部实现代码。

目前网上都是go语言写的关于基于etcd的grpc分布式服务器的实现,几乎没有python版的grpc服务器实现,于是本人就写了一个python版的服务器和不同客户端(python、go)版的全套实现方案。

注:全部完整代码已上传至github:Zartenc/grpc_etcd_ms_py

2-实现思想

2-1-服务器端实现思想

服务器端思想主旨是:每个gRPC服务器启动(上线)都会在etcd的key中注册自己在本机中对外暴露的ip-port,一旦gRPC服务停止(下线、意外挂掉、租约到期等)会在etcd中注销掉自己的信息。

2-2-客户端实现思想

客户端思想主旨是:只需连接到etcd服务器产生一个etcdClient对象,长期维护这个对象即可,通过这个对象的负载均衡策略可获取众多gRPC服务器中的其中一个进行连接。在这个etcdClient对象内部会对etcd的key添加一个监视(watch)并维护一个可用gRPC服务器信息的集合。

3-具体实现核心代码

前提已部署并启动etcd服务器,若没部署请参考这里。

3-1-服务器端实现核心代码

服务器端采用python编写。

由于采用etcd分布式框架,这里首先实现一个etcd客户端的类EtcdClient,类方法有:

  • get_values_by_key():通过etcd的key获取服务器信息。
  • put_values_by_key():添加服务器信息到etcd的key中。
class EtcdClient(etcd3.Etcd3Client):def get_values_by_key(self, key, **kwargs):values, _ = self.get(key, **kwargs)values_list = []if values is not None:try:values_list = json.loads(values.decode('utf-8'))if not isinstance(values_list, list):raise TypeError()except:raise Exception()return values_listdef put_values_by_key(self, key, values):if not isinstance(values, list):raise Exception()self.put(key, json.dumps(values))

再实现一个此服务器对etcd操作的类EtcdHandleServ,类方法有:

register_service():注册本机信息到etcd的key中。

logout_service:从etcd的key中注销本机信息。

class EtcdHandleServ():def __init__(self, service_port, etcd_ip, etcd_port, etcd_prefix):self.etcd_ip = etcd_ipself.etcd_port = etcd_portself.etcd_prefix = etcd_prefix# service_ip = get_outside_ip()service_ip = '127.0.0.1' #在本机机器作实验使用self.endpoint = f'{service_ip}:{service_port}'def register_service(self):etcd_client = EtcdClient(host=self.etcd_ip, port=self.etcd_port)key_name = f'{self.etcd_prefix}/grpc'with etcd_client.lock(key_name):value_list = etcd_client.get_values_by_key(key_name)if self.endpoint not in value_list:value_list.append(self.endpoint)etcd_client.put_values_by_key(key_name, value_list)def logout_service(self):etcd_client = EtcdClient(host=self.etcd_ip, port=self.etcd_port)key_name = f'{self.etcd_prefix}/grpc'with etcd_client.lock(key_name):value_list = etcd_client.get_values_by_key(key_name)if self.endpoint in value_list:value_list.remove(self.endpoint)etcd_client.put_values_by_key(key_name, value_list)

最后在主函数中进行相关的注册和注销操作并监控程序停止信号。

*注意:在docker启动的操作事项

在下面代码中,若服务器要在docker中启动需要考虑2个问题:

  • 1.在docker run命令启动时如何将参数从外部传到容器内?
  • 2.docker stop命令停止时如何程序内部如何接收停止信号?

上面2个问题共同之处是在Dockerfile文件中的ENTRYPOINT命令:

ENTRYPOINT ["python3", "-u", "main.py"]

在Dockerfile文件中启动命名使用ENTRYPOINT的exec模式,这样程序在容器内为1号进程,可接收停止信号(若为shell模式,也可处理,但麻烦一些,后面给出解决方案)。

docker启动时从外部传参,只需在后面跟上需要传入的参数即可:

docker run -d -p 65510:65510 zartenImage:v ----service_port 65510 --etcd_ip xxxx --etcd_prefix /zarten

上面给出了2个问题的解决方案,具体更全面的方案,请参考本人之前写的一篇docker使用详解的文章,并在文章最后的常见问题中有提到解决方案。

def main(service_ip, service_port, etcd_ip, etcd_port, etcd_prefix):print('***service is starting...')grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=500))zarten_pb2_grpc.add_ZartenServicer_to_server(ZartenServ(), grpc_server)grpc_server.add_insecure_port(f'{service_ip}:{service_port}')grpc_server.start()etcd_handle_serv = EtcdHandleServ(service_port=service_port, etcd_ip=etcd_ip, etcd_port=etcd_port, etcd_prefix=etcd_prefix)etcd_handle_serv.register_service()event = threading.Event()def signal_handler(*args):etcd_handle_serv.logout_service()event.set()signal.signal(signal.SIGINT, signal_handler)signal.signal(signal.SIGTERM, signal_handler)print("***serveice started")try:while True:time.sleep(60 * 60 * 24)except KeyboardInterrupt:etcd_handle_serv.logout_service()grpc_server.stop(0)

最后采用命令方式启动。

# python main.py --service_port 65510 --etcd_ip xxxx --etcd_prefix /zarten
if __name__ == '__main__':parser = get_arguments_parser()args = parser.parse_args()main(args.service_ip, args.service_port, args.etcd_ip, args.etcd_port, args.etcd_prefix)

3-2-客户端实现核心代码

客户端采用python和go编写。也可采用其他语言实现,这里就不作展示了,知道实现思想即可自己实现。

gRPC客户端中的负载均衡是以每次调用为基础而不是以每个连接为基础,即只需维护一个连接对象,每次调用都是连接不同gRPC的服务器。官方文档是这样描述的:

“It is worth noting that load-balancing within gRPC happens on a per-call basis, not a per-connection basis. In other words, even if all requests come from a single client, we still want them to be load-balanced across all servers.”

3-2-1-python客户端

首先同样是一个连接etcd的客户端类,跟服务器端代码差不多。

class EtcdClient(etcd3.Etcd3Client):def get_values_by_key(self, key, **kwargs):values, _ = self.get(key, **kwargs)values_list = []if values is not None:try:values_list = json.loads(values.decode('utf-8'))if not isinstance(values_list, list):raise TypeError()except:raise Exception()return values_listdef put_values_by_key(self, key, values):if not isinstance(values, list):raise Exception()self.put(key, json.dumps(values))

其次是此客户端对etcd的操作类,采用单例模式,主要方法就是监视etcd并进行回调处理。

class EtcdHandleClient(EtcdClient):_singleton = Nonedef __new__(cls, *args, **kwargs):if not cls._singleton:cls._singleton = super().__new__(cls)return cls._singletondef __init__(self, etcd_ip, etcd_port, etcd_prefix):self.etcd_ip = etcd_ipself.etcd_port = etcd_portself.etcd_prefix = etcd_prefixsuper().__init__(host=etcd_ip, port=etcd_port)self.endpoints_list = self.get_values_by_key(f'{self.etcd_prefix}/grpc')self.watched_id = self.add_watch_callback(key=f'{self.etcd_prefix}/grpc', callback=self._update_endpoints)def __del__(self):self.cancel_watch(self.watched_id)def get_grpc_serv_ip(self):endpoints_nums = len(self.endpoints_list)if endpoints_nums <= 0:raise RuntimeError('No grpc services are available.Please notify the administrator to start the grpc service')select_id = random.randint(0, len(self.endpoints_list)-1)return self.endpoints_list[select_id]def _update_endpoints(self, watched_response):watched_event = watched_response.events[0]try:update_endpoint_list = json.loads(watched_event.value)if not isinstance(update_endpoint_list, list):raise TypeErrorexcept Exception as e:print(e)returnself.endpoints_list = update_endpoint_list

最后main函数中只需长期维护一个EtcdHandleClient对象即可。

def main():etcd_client = EtcdHandleClient(etcd_ip='xxxx', etcd_port=2379, etcd_prefix='/zarten')endpoint = etcd_client.get_grpc_serv_ip()print('endpoint:', endpoint)with grpc.insecure_channel(endpoint) as channel:stub = zarten_pb2_grpc.ZartenStub(channel)response = stub.GetInfo(zarten_pb2.ZartenRequest(zhihu_name='Zarten123'))print(f'receive response: {response}')

3-2-2-go客户端

go语言客户思想跟python一样,只是代码不同而已。

首先定义一个GrpcClient结构体,包括一个etcd连接对象和一个可用gRPC服务器信息数组。

type GrpcClient struct {Etcd3Client *clientv3.ClientGrpcEndpoints []string
}

其次是初始化GrpcClient结构体的函数NewGrpcClient(),此函数中会调用一个协程来监视etcd的变动。

GrpcClient结构体的方法只有一个GetRrpcServIp()对外开放函数来获取某个gRPC服务器的信息。

func NewGrpcClient(EtcdIp string, EtcdPort int, EtcdPrefix string) *GrpcClient{keyName := EtcdPrefix+"/grpc"grpcClient := new(GrpcClient)cli, err := clientv3.New(clientv3.Config{Endpoints:   []string{EtcdIp + ":" + strconv.Itoa(EtcdPort)},DialTimeout: 10 * time.Second,})if err != nil {log.Fatal(err)}res, err := cli.Get(context.Background(), keyName)if err != nil{log.Fatal(err)}for _, ev := range res.Kvs {endPoints := ev.Valueerr := json.Unmarshal(endPoints, &grpcClient.GrpcEndpoints)if err != nil{log.Fatal(err)}break}if len(grpcClient.GrpcEndpoints) <= 0{log.Fatal("No grpc services are available.Please notify the administrator to start the grpc service")}grpcClient.Etcd3Client = clirch := cli.Watch(context.Background(), keyName)go func() {for wresp := range rch {for _, ev := range wresp.Events {fmt.Printf("%s %q : %qn", ev.Type, ev.Kv.Key, ev.Kv.Value)mu.Lock()err := json.Unmarshal(ev.Kv.Value, &grpcClient.GrpcEndpoints)mu.Unlock()if err != nil{log.Fatal(err)}fmt.Println(grpcClient.GrpcEndpoints)}}}()return grpcClient
}func (g *GrpcClient) GetRrpcServIp() string{rand.Seed(time.Now().Unix())n := len(g.GrpcEndpoints)return g.GrpcEndpoints[rand.Intn(n)]
}

main函数中只需维护一个NewGrpcClient对象即可。

func main() {grpcClient := client_center.NewGrpcClient("xxxx", 2379, "/zarten")ip := grpcClient.GetRrpcServIp()fmt.Println(ip)conn, err := grpc.Dial(grpcClient.GetRrpcServIp(), grpc.WithInsecure(), grpc.WithBlock())if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()client := go_protoc.NewZartenClient(conn)res, _ := client.GetInfo(context.Background(), &go_protoc.ZartenRequest{ZhihuName:"zarten456"})fmt.Println(res.Name)fmt.Println(res.Homepage)}

4-演示结果

在本地机器启动3个服务器端,端口分别为65510、65511、65512.

多次调用python和go版的客户端,都是使用不同gRPC服务器并成功返回信息。部分截图如下所示:

5-总结

主旨思想是通过etcd来进行交互来共享所有的gRPC服务器信息。

在服务器端的实现为了能更加健全,还需考虑加入租约功能,此功能主要用于服务器端自身挂掉后无法及时通知etcd来注销掉自己信息。此功能以后有时间会在github中更新,敬请期待!

go使用grpc实现异步_(python、go)基于ETCD的gRPC分布式服务器实现详解相关推荐

  1. python模拟手写笔迹_Python实现基于KNN算法的笔迹识别功能详解

    本文实例讲述了Python实现基于KNN算法的笔迹识别功能.分享给大家供大家参考,具体如下: 需要用到: Numpy库 Pandas库 手写识别数据 点击此处本站下载. 数据说明: 数据共有785列, ...

  2. pythondifflib详解_用python标准库difflib比较两份文件的异同详解

    [需求背景] 有时候我们要对比两份配置文件是不是一样,或者比较两个文本是否异样,可以使用linux命令行工具diff a_file b_file,但是输出的结果读起来不是很友好.这时候使用python ...

  3. python os模块安装方法_基于python中pygame模块的Linux下安装过程(详解)

    一.使用pip安装Python包 大多数较新的Python版本都自带pip,因此首先可检查系统是否已经安装了pip.在Python3中,pip有时被称为pip3. 1.在Linux和OS X系统中检查 ...

  4. python 生成器装饰器_对Python生成器、装饰器、递归的使用详解

    1.Python生成器表达式 1).Python生成器表达式 语法格式: (expr for iter_var in iterable) (expr for iter_var in iterable ...

  5. python二元多次函数拟合_对python实现二维函数高次拟合的示例详解

    在参加"数据挖掘"比赛中遇到了关于函数高次拟合的问题,然后就整理了一下源码,以便后期的学习与改进. 在本次"数据挖掘"比赛中感觉收获最大的还是对于神经网络的认识 ...

  6. python数组相减_对Python 中矩阵或者数组相减的法则详解

    对Python 中矩阵或者数组相减的法则详解 最近在做编程练习,发现有些结果的值与答案相差较大,通过分析比较得出结论,大概过程如下: 定义了一个计算损失的函数: def error(yhat,labe ...

  7. python telnet线程锁_对python使用telnet实现弱密码登录的方法详解

    系统环境: 64位win7企业版 python2.7.10 2016.08.16修改内容: 1)read_until()函数是可以设置timeout的,之前不能获取到password之后的返回是因为调 ...

  8. 百家号 python高手养成_【一点资讯】Python使用PyQt5进行图形界面GUI编程之详解QWidget类的坐标体系 www.yidianzixun.com...

    Python使用PyQt5进行图形界面GUI编程之详解QWidget类的坐标体系 上篇内容,我们详细介绍了使用PyCharm中LiveTemplate功能快速输入一个GUI应用框架的办法.(参见学会这 ...

  9. python url解码_对python中url参数编码与解码的实例详解

    一.简介 在python中url,对于中文等非ascii码字符,需要进行参数的编码与解码. 二.关键代码 1.url编码 对字符串编码用urllib.parse包下的quote(string, saf ...

最新文章

  1. mac mysql 可视化工具_推荐3款好用的Redis、MySQL和MongoDB可视化管理工具
  2. 【转载】关于Java堆和栈的解释,收藏下来以后学习
  3. C#设计模式(7)-Singleton Pattern
  4. Linux学习之基本介绍
  5. java json 去除空_详解Java去除json数据中的null空值问题
  6. java spark on hive_hive-on-spark 安装 以及 scala 实例
  7. 释放Ubuntu/Linux系统cache,增加可用内存空间
  8. php图像无法显示,php – 无法显示图像,因为它包含错误[图像生成器]
  9. linux 命令 读phy_Linux PHY几个状态的跟踪
  10. 星际二 地图制作过程
  11. ORB-SLAM3论文解读
  12. socketpair机制
  13. ARINC429硬件层初探
  14. 单组学的多变量分析|1.PCA和PLS-DA
  15. jQuery中的get和post请求
  16. win11更新安装错误0x80073701解决方法
  17. DBlink 创建 删除 脚本
  18. COSCon'22@Beijing | 北京分会场等你赴约
  19. maven强制刷新本地包:(用于打包后重新加载)
  20. 4.2.3 适配GNURadio的USRP驱动安装与检测过程

热门文章

  1. ASP.NET MVC 3 Beta 发布了
  2. Python3使用md5
  3. vue组件开发脚手架(vue-sfc-rollup),开发组件并发布至npm的利器
  4. Redis入门(暂不更新)
  5. Python3的urllib.parse常用函数小结
  6. 如何在Amazon AWS上设置一台Linux服务器
  7. linux vi修改后如何保存
  8. ORACLE EBS财务科目FLEX FIELD的添加
  9. [置顶]       设计模式之结构类模式——桥梁模式
  10. 润乾报表与DERBY数据库的创建连接详解