gRPC在Python的异步非阻塞实现方式

  • 前言
  • 问题&分析
    • 问题阐述
    • 原因分析
  • 解决方案
    • 服务端
      • 原服务端实现方式
      • aio的服务端实现方式:
    • 客户端
      • 异步非阻塞方式
      • 同步阻塞方式
    • 性能优化效果
  • 最佳实践
  • 参考文献

前言

之前写过两篇文章讲述了RPC服务的概念和gRPC的基本使用、proto语法、TLS认证、异常处理和重连重试等等的教程。两篇文章传送入口:

  1. gRPC基本使用教程
  2. gRPC身份认证与流式通信

当我们真正把gRPC服务部署到生产环境上的时候,除了上诉这些基本使用和安全认证之外,我们还要考虑性能问题。本文主要探讨一下如何解决在请求并发数较高时如何保证gRPC服务的性能。

问题&分析

问题阐述

  1. 在生产环境中,api的gRPC客户端在连接和调用gRPC服务端的时候出现性能瓶颈,尤其是api服务在QPS陡增的时候,服务实例数扩容到了最大值,仍然有一些请求出现502,即由于阻塞gRPC请求导致请求队列中的一些请求响应返回给负载均衡器的时间超过了负载均衡器的超时时限;
  2. 定时脚本在连接和调研gRPC服务端时也是阻塞式请求,处理效率低。

原因分析

Python由于“大家都懂”的GIL问题,在使用多线程时CPU操作是无法分配在多核进行的,故而导致很多框架多线程实际上是阻塞的,但是在进行一些机器I/O或者网络I/O请求时,python的多线程或者协程是可以不阻塞的,也就是说GIL锁限制的是CPU密集型的服务的性能,而不会限制I/O密集型的服务性能。所以我们在使用gRPC这种网络I/O服务的时候,也可以达到非阻塞的效果。本文的问题即是如何让gRPC服务达到非阻塞的效果。

解决方案

由于asyncio的引入,使得python目前的协程得到较为完善的补充。在进行I/O操作时可以采用协程的方式,不占用I/O等待时间,让协程去处理I/O请求,主进程可以继续监听下一个请求,等协程的请求回调了再交付给主进程继续执行下去,以达到提高性能的目的,这就是异步非阻塞(aio)的方式。
之前的文章讲述过一些常用的支持aio的库如aiohttp、aiomysql等等,本文我们采用的是aio版本的gRPC库,目前在pypi上的grpcio库,1.35.0版本开始已经可以支持asyncio的方式,GitHub链接:https://github.com/grpc/grpc

完整gRPC asyncio api文档:https://grpc.github.io/grpc/python/grpc_asyncio.html
我们分别通过在服务端和客户端来实践一下gRPC的aio实现:

服务端

原服务端实现方式

import os
import sys
import time
import grpc
import asyncio
from concurrent import futuresasync def start_server():# start rpc serviceserver = grpc.server(futures.ThreadPoolExecutor(max_workers=40), options=[('grpc.max_send_message_length', 100 * 1024 * 1024),('grpc.max_receive_message_length', 100 * 1024 * 1024),('grpc.enable_retries', 1),])xxx_pb2_grpc.add_xxx_to_server(<rpc_function>, server) # 加入服务server.add_insecure_port('[::]:50051')server.start()# since server.start() will not block,# a sleep-loop is added to keep alivetry:while True:time.sleep(86400)except KeyboardInterrupt:server.stop(0)if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait([start_server()]))loop.close()

aio的服务端实现方式:

import os
import sys
import time
import grpc
from grpc.experimental import aio
import asyncio
from concurrent import futuresasync def start_server():# start rpc serviceserver = aio.server(futures.ThreadPoolExecutor(max_workers=40), options=[('grpc.so_reuseport', 0),('grpc.max_send_message_length', 100 * 1024 * 1024),('grpc.max_receive_message_length', 100 * 1024 * 1024),('grpc.enable_retries', 1),])xxx_pb2_grpc.add_xxx_to_server(<rpc_function>, server)  # 加入服务server.add_insecure_port('[::]:50051')await server.start()# since server.start() will not block,# a sleep-loop is added to keep alivetry:await server.wait_for_termination()except KeyboardInterrupt:await server.stop(None)if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait([start_server()]))loop.close()

采用aio的实现方式,只需要使用1.35.0版本及以上的grpcio库,在创建服务端的时候将grpc.server()改为aio.server(),服务启动的类方法采用async和await的方式即可完成。

注:服务端开启了aio的支持,是既可以支持阻塞式的客户端请求,也支持客户端非阻塞式请求的,所以对于旧版本客户端是直接兼容的

客户端

服务端增加了非阻塞式请求的支持,客户端则可以采用阻塞和非阻塞的方式来调用:

异步非阻塞方式

import grpc
import sys
import os
from . import rpc_configclass RpcClient(object):# rpc_client = {}rpc_client = None@staticmethoddef get_rpc_channel(host, port):options = rpc_config.RPC_OPTIONS# OPTIONS配置可根据需要自行设置:#RPC_OPTIONS = [('grpc.max_send_message_length', 100 * 1024 * 1024),#       ('grpc.max_receive_message_length', 100 * 1024 * 1024),#       ('grpc.enable_retries', 1),#       ('grpc.service_config',#        '{"retryPolicy": {"maxAttempts": 4, "initialBackoff": "0.1s", '#        '"maxBackoff": "1s", "backoffMutiplier": 2, '#        '"retryableStatusCodes": ["UNAVAILABLE"]}}'),#       ]channel = grpc.insecure_channel("{}:{}".format(host, port),options=options)return channeldef load_sub_rpc(self, platform, host, port, db_type):"""function return rpc instance:param platform:param host:param port:param db_type:return: instance"""channel = self.get_rpc_channel(host, port)stub = xxx_pb2_grpc.xxxStub(channel)return stub

同步阻塞方式

import grpc
import sys
import os
from . import rpc_configclass RpcClient(object):rpc_client = {}@staticmethoddef get_rpc_channel(host, port, is_aio=False):options = rpc_config.RPC_OPTIONS# OPTIONS配置可根据需要自行设置:#RPC_OPTIONS = [('grpc.max_send_message_length', 100 * 1024 * 1024),#       ('grpc.max_receive_message_length', 100 * 1024 * 1024),#       ('grpc.enable_retries', 1),#       ('grpc.service_config',#        '{"retryPolicy": {"maxAttempts": 4, "initialBackoff": "0.1s", '#        '"maxBackoff": "1s", "backoffMutiplier": 2, '#        '"retryableStatusCodes": ["UNAVAILABLE"]}}'),#       ]if is_aio:channel = grpc.aio.insecure_channel("{}:{}".format(host, port),options=options)else:channel = grpc.insecure_channel("{}:{}".format(host, port),options=options)return channeldef load_sub_rpc(self, platform, host, port, db_type, is_aio=False):"""function return rpc instance:param platform:param host:param port:param db_type:param is_aio:return: instance"""channel = self.get_rpc_channel(host, port, is_aio)stub = xxxe_pb2_grpc.xxxStub(channel)return stub

可以看出,aio的客户端实现方式(grpc.aio.insecure_channel)也只比原来的方式(grpc.insecure_channel)多了一步获取aio的对象进行连接而已,这里做简单的案例采用未认证的insecure_channel方式,生产环境建议使用secure_channel方式。

性能优化效果

我在Google Cloud Platform上做了个简单的压测进行性能对比,分别启动同步阻塞模型和重启建立非阻塞模型得到502分布如下:

最佳实践

  1. 通过aio方式实现gRPC客户端和服务端;
  2. 采用TLS或SSL加密的gRPC连接;
  3. 配置合适的请求最大字节数、连接数等资源;
  4. 建立连接的options参数里面加入重试、重连机制;

参考文献

  1. https://grpc.github.io/grpc/python/grpc_asyncio.html
  2. https://github.com/grpc/grpc
  3. https://github.com/grpc/proposal/pull/155

【Python进阶学习】gRPC在Python的异步非阻塞实现方式相关推荐

  1. tornado异步非阻塞实现方式

    目录 tornado异步非阻塞实现方式 1.多线程 2.老版协程 3.新版协程 实验 1.完全阻塞,同步代码 2.老版本协程 3新版本协程 4 多线程+新版协程,线程函数失效 5.线程+不声明异步,可 ...

  2. Python web框架 Tornado(二)异步非阻塞使用以及原理

    原文: http://www.liangxiansen.cn/2018/04/11/tornado/ 作者: 梁先森 稍有改动 Tornado默认是单进程单线程.实时的web特性通常需要为每个用户一个 ...

  3. 200行自定义异步非阻塞Web框架

    Python的Web框架中Tornado以异步非阻塞而闻名.本篇将使用200行代码完成一个微型异步非阻塞Web框架:Snow. 一.源码 本文基于非阻塞的Socket以及IO多路复用从而实现异步非阻塞 ...

  4. 真正的 Tornado 异步非阻塞

    其中 Tornado 的定义是 Web 框架和异步网络库,其中他具备有异步非阻塞能力,能解决他两个框架请求阻塞的问题,在需要并发能力时候就应该使用 Tornado. 但是在实际使用过程中很容易把 To ...

  5. Nginx的异步非阻塞

    转载地址:https://blog.csdn.net/dutsoft/article/details/55224755 同步与异步 同步与异步的理解 同步与异步的重点在消息通知的方式上,也就是调用结果 ...

  6. 面试问题什么是异步非阻塞

    一.同步与异步 同步/异步, 它们是消息的通知机制 1. 概念解释 A. 同步 所谓同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不返回. 按照这个定义,其实绝大多数函数都是同步调用(例 ...

  7. nginx异步非阻塞 解析

    同步与异步的理解 同步与异步的重点在消息通知的方式上,也就是调用结果通知的方式. 同步:当一个同步调用发出去后,调用者要一直等待调用结果的通知后,才能进行后续的执行. 异步:当一个异步调用发出去后,调 ...

  8. leetcode与python进阶学习总结

    转自:leetcode与python进阶学习总结 l1是一个链表型,val是其属性,以下句子意义为如果l1不为空则取l1.val否则取0,节省代码空间,干净利落 x= l1.val if l1 els ...

  9. python进阶学习--- django框架解析 ---领悟编程语言共性与特性【后续详解】

    python进阶学习---> django框架解析 --->领悟编程语言共性与特性 1.python语言介绍   python解释型脚本语言 2.python执行原理   python解释 ...

最新文章

  1. legacy引导gpt分区_windows分区模式和启动模式(UEFI+GPT或Legacy+MBR组合)
  2. f(f(x)) = -x
  3. 修改RAC VIP IP
  4. MPAI正式启动端到端的AI编码标准
  5. 阿里云应用高可用服务公测发布
  6. mysql 主从同步 阻塞_如何解决主从数据库同步延迟问题?
  7. k8s核心技术-Controller(Deployment)控制器对pod的管理实现_升级回滚和弹性伸缩---K8S_Google工作笔记0030
  8. mybatis使用和分析
  9. iOS开发之使用Runtime给Model类赋值
  10. access查询出生年月大于,access选择题题库
  11. 微信小程序蓝牙通讯、串口通讯、调试助手(HC-08等 )
  12. 高性能密码适用性分析
  13. 对接阿里云的短信接口发送手机验证码
  14. 了解C语言中的exec函数家族
  15. 山东理工ACM 1151 C语言实验——输出字符串
  16. Java实现打开浏览器的N种办法
  17. 使用MultipartFile+ElementUi(el-upload)实现前端向后端传图片
  18. 2018版 主流SDR设备横向比较
  19. java基础面试题-
  20. 计算机初学者的干货(写的非常好本人推荐)

热门文章

  1. 2023年Android黑科技保活方案,应用永生,拒绝强制杀死 最高适配Android 13 小米 华为 Oppo vivo 等最新机型 拒绝强杀 开机自启动 附demo apk 附研究资料
  2. [译] 如何阅读苹果开发文档
  3. ArrayList取值和HashMapget取值谁速度更快
  4. activiti5、activiti6、activiti7、flowable、camunda7、camunda8流程引擎对比分析和选型参考
  5. 关于OBS无法获取酷狗音乐窗口问题
  6. 什么是函数柯里化,函数柯里化的应用场景,函数柯里化的优缺点
  7. 从主流剪辑软件与配置标准,聊聊剪辑视频的电脑
  8. 苹果—Mac电脑关于电源不充电问题
  9. PostgreSQL 怎么通过vacuum 加速事务ID回收的速度 (翻译)
  10. python动态交互式图表库_五个创建交互式图表的Python库