3.RPC引入

上篇回顾:万物互联之~深入篇

Code:https://github.com/lotapp/BaseCode/tree/master/python/6.net/6.rpc/

其他专栏最新篇:协程加强之~兼容答疑篇 | 聊聊数据库~SQL环境篇

3.1.概念

RPC(Remote Procedure Call):分布式系统常见的一种通信方法(远程过程调用),通俗讲:可以一台计算机的程序调用另一台计算机的子程序(可以把它看成之前我们说的进程间通信,只不过这一次的进程不在同一台PC上了)

PS:RPC的设计思想是力图使远程调用中的通讯细节对于使用者透明,调用双方无需关心网络通讯的具体实现

引用一张网上的图:

HTTP有点相似,你可以这样理解:

  1. 老版本的HTTP/1.0是短链接,而RPC是长连接进行通信

    • HTTP协议(header、body),RPC可以采取HTTP协议,也可以自定义二进制格式
  2. 后来HTTP/1.1支持了长连接(Connection:keep-alive),基本上和RPC差不多了
    • keep-alive一般都限制有最长时间,或者最多处理的请求数,而RPC是基于长连接的,基本上没有这个限制
  3. 后来谷歌直接基于HTTP/2.0建立了gRPC,它们之间的基本上也就差不多了
    • 如果硬是要区分就是:HTTP-普通话RPC-方言的区别了
    • RPC高效而小众,HTTP效率没RPC高,但更通用
  4. PS:RPCHTTP调用不用经过中间件,而是端到端的直接数据交互
    • 网络交互可以理解为基于Socket实现的(RPCHTTP都是Socket的读写操作)

简单概括一下RPC的优缺点就是:

  1. 优点:

    1. 效率更高(可以自定义二进制格式)
    2. 发起RPC调用的一方,在编写代码时可忽略RPC的具体实现(跟编写本地函数调用一般
  2. 缺点:
    • 通用性不如HTTP(方言普及程度肯定不如普通话),如果传输协议不是HTTP协议格式,调用双方就需要专门实现通信库

PS:HTTP更多是ClientServer的通讯;RPC更多是内部服务器间的通讯

3.2.引入

上面说这么多,可能还没有来个案例实在,我们看个案例:

本地调用sum()

def sum(a, b):"""return a+b"""return a + bdef main():result = sum(1, 2)print(f"1+2={result}")if __name__ == "__main__":main()

输出:(这个大家都知道)

1+2=3

1.xmlrpc案例

官方文档:

https://docs.python.org/3/library/xmlrpc.client.html
https://docs.python.org/3/library/xmlrpc.server.html

都说RPC用起来就像本地调用一样,那么用起来啥样呢?看个案例:

服务端:(CentOS7:192.168.36.123:50051)

from xmlrpc.server import SimpleXMLRPCServerdef sum(a, b):"""return a+b"""return a + b# PS:50051是gRPC默认端口
server = SimpleXMLRPCServer(('', 50051))
# 把函数注册到RPC服务器中
server.register_function(sum)
print("Server启动ing,Port:50051")
server.serve_forever()

客户端:(Win10:192.168.36.144

from xmlrpc.client import ServerProxystub = ServerProxy("http://192.168.36.123:50051")
result = stub.sum(1, 2)
print(f"1+2={result}")

输出:(Client用起来是不是和本地差不多?就是通过代理访问了下RPCServer而已)

1+2=3

PS:CentOS服务器不是你绑定个端口就一定能访问的,如果不能记让防火墙开放对应的端口

这个之前在说MariaDB环境的时候有详细说:https://www.cnblogs.com/dotnetcrazy/p/9887708.html#_map4

# 添加 --permanent永久生效(没有此参数重启后失效)
firewall-cmd --zone=public --add-port=80/tcp --permanent

2.ZeroRPC案例:

zeroRPC用起来和这个差不多,也简单举个例子吧:

把服务的某个方法注册到RPCServer中,供外部服务调用

import zerorpcclass Test(object):def say_hi(self, name):return f"Hi,My Name is{name}"# 注册一个Test的实例
server = zerorpc.Server(Test())
server.bind("tcp://0.0.0.0:50051")
server.run()

调用服务端代码

import zerorpcclient = zerorpc.Client("tcp://192.168.36.123:50051")
result = client.say_hi("RPC")
print(result)

3.3.简单版自定义RPC

看了上面的引入案例,是不是感觉RPC不过如此?NoNoNo,要是真这么简单也就谈不上RPC架构了,上面两个是最简单的RPC服务了,可以这么说:生产环境基本上用不到,只能当案例练习罢了,对Python来说,最常用的RPC就两个gRPC and Thrift

PS:国产最出名的是Dubbo and Tars,Net最常用的是gRPCThriftSurging

1.RPC服务的流程

要自己实现一个RPC Server那么就得了解整个流程了:

  1. Client(调用者)以本地调用的方式发起调用
  2. 通过RPC服务进行远程过程调用(RPC的目标就是要把这些步骤都封装起来,让使用者感觉不到这个过程)
    1. 客户端的RPC Proxy组件收到调用后,负责将被调用的方法名、参数等打包编码成自定义的协议
    2. 客户端的RPC Proxy组件在打包完成后通过网络把数据包发送给RPC Server
    3. 服务端的RPC Proxy组件把通过网络接收到的数据包按照相应格式进行拆包解码,获取方法名和参数
    4. 服务端的RPC Proxy组件根据方法名和参数进行本地调用
    5. RPC Server(被调用者)本地执行后将结果返回给服务端的RPC Proxy
    6. 服务端的RPC Proxy组件将返回值打包编码成自定义的协议数据包,并通过网络发送给客户端的RPC Proxy组件
    7. 客户端的RPC Proxy组件收到数据包后,进行拆包解码,把数据返回给Client
  3. Client(调用者)得到本次RPC调用的返回结果

用一张时序图来描述下整个过程:

PS:RPC Proxy有时候也叫Stub(存根):(Client Stub,Server Stub)

为屏蔽客户调用远程主机上的对象,必须提供某种方式来模拟本地对象,这种本地对象称为存根(stub),存根负责接收本地方法调用,并将它们委派给各自的具体实现对象

PRC服务实现的过程中其实就两核心点:

  1. 消息协议:客户端调用的参数和服务端的返回值这些在网络上传输的数据以何种方式打包编码和拆包解码

    • 经典代表:Protocol Buffers
  2. 传输控制:在网络中数据的收发传输控制具体如何实现(TCP/UDP/HTTP

2.手写RPC

下面我们就根据上面的流程来手写一个简单的RPC:

1.Client调用:

# client.py
from client_stub import ClientStubdef main():stub = ClientStub(("192.168.36.144", 50051))result = stub.get("sum", (1, 2))print(f"1+2={result}")result = stub.get("sum", (1.1, 2))print(f"1.1+2={result}")time_str = stub.get("get_time")print(time_str)if __name__ == "__main__":main()

输出:

1+2=3
1.1+2=3.1
Wed Jan 16 22

2.Client Stub,客户端存根:(主要有打包解包、和RPC服务器通信的方法)

# client_stub.py
import socketclass ClientStub(object):def __init__(self, address):"""address ==> (ip,port)"""self.socket = socket.socket()self.socket.connect(address)def convert(self, obj):"""根据类型转换成对应的类型编号"""if isinstance(obj, int):return 1if isinstance(obj, float):return 2if isinstance(obj, str):return 3def pack(self, func, args):"""打包:把方法和参数拼接成自定义的协议格式:func:函数名@params:类型-参数,类型2-参数2..."""result = f"func:{func}"if args:params = ""# params:类型-参数,类型2-参数2...for item in args:params += f"{self.convert(item)}-{item},"# 去除最后一个,result += f"@params:{params[:-1]}"# print(result)  # log 输出return result.encode("utf-8")def unpack(self, data):"""解包:获取返回结果"""msg = data.decode("utf-8")# 格式应该是"data:xxxx"params = msg.split(":")if len(params) > 1:return params[1]return Nonedef get(self, func, args=None):"""1.客户端的RPC Proxy组件收到调用后,负责将被调用的方法名、参数等打包编码成自定义的协议"""data = self.pack(func, args)# 2.客户端的RPC Proxy组件在打包完成后通过网络把数据包发送给RPC Serverself.socket.send(data)# 等待服务端返回结果data = self.socket.recv(2048)if data:return self.unpack(data)return None

简要说明下:(我根据流程在Code里面标注了,看起来应该很轻松)

之前有说到核心其实就是消息协议and传输控制,我客户端存根的消息协议是自定义的格式(后面会说简化方案):func:函数名@params:类型-参数,类型2-参数2...,传输我是基于TCP进行了简单的封装


3.Server端:(实现很简单)

# server.py
import socket
from server_stub import ServerStubclass RPCServer(object):def __init__(self, address, mycode):self.mycode = mycode# 服务端存根(RPC Proxy)self.server_stub = ServerStub(mycode)# TCP Socketself.socket = socket.socket()# 端口复用self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)# 绑定端口self.socket.bind(address)def run(self):self.socket.listen()while True:# 等待客户端连接client_socket, client_addr = self.socket.accept()print(f"来自{client_addr}的请求:\n")# 交给服务端存根(Server Proxy)处理self.server_stub.handle(client_socket, client_addr)if __name__ == "__main__":from server_code import MyCodeserver = RPCServer(('', 50051), MyCode())print("Server启动ing,Port:50051")server.run()

为了简洁,服务端代码我单独放在了server_code.py中:

# 5.RPC Server(被调用者)本地执行后将结果返回给服务端的RPC Proxy
class MyCode(object):def sum(self, a, b):return a + bdef get_time(self):import timereturn time.ctime()

4.然后再看看重头戏Server Stub:

# server_stub.py
import socketclass ServerStub(object):def __init__(self, mycode):self.mycode = mycodedef convert(self, num, obj):"""根据类型编号转换类型"""if num == "1":obj = int(obj)if num == "2":obj = float(obj)if num == "3":obj = str(obj)return objdef unpack(self, data):"""3.服务端的RPC Proxy组件把通过网络接收到的数据包按照相应格式进行拆包解码,获取方法名和参数"""msg = data.decode("utf-8")# 格式应该是"格式:func:函数名@params:类型编号-参数,类型编号2-参数2..."array = msg.split("@")func = array[0].split(":")[1]if len(array) > 1:args = list()for item in array[1].split(":")[1].split(","):temps = item.split("-")# 类型转换args.append(self.convert(temps[0], temps[1]))return (func, tuple(args))  # (func,args)return (func, )def pack(self, result):"""打包:把方法和参数拼接成自定义的协议"""# 格式:"data:返回值"return f"data:{result}".encode("utf-8")def exec(self, func, args=None):"""4.服务端的RPC Proxy组件根据方法名和参数进行本地调用"""# 如果没有这个方法则返回Nonefunc = getattr(self.mycode, func, None)if args:return func(*args)  # 解包else:return func()  # 无参函数def handle(self, client_socket, client_addr):while True:# 获取客户端发送的数据包data = client_socket.recv(2048)if data:try:data = self.unpack(data)  # 解包if len(data) == 1:data = self.exec(data[0])  # 执行无参函数elif len(data) > 1:data = self.exec(data[0], data[1])  # 执行带参函数else:data = "RPC Server Error Code:500"except Exception as ex:data = "RPC Server Function Error"print(ex)# 6.服务端的RPC Proxy组件将返回值打包编码成自定义的协议数据包,并通过网络发送给客户端的RPC Proxy组件data = self.pack(data)  # 把函数执行结果按指定协议打包# 把处理过的数据发送给客户端client_socket.send(data)else:print(f"客户端:{client_addr}已断开\n")break

再简要说明一下:里面方法其实主要就是解包执行函数返回值打包

输出图示:

再贴一下上面的时序图:

课外拓展:

HTTP1.0、HTTP1.1 和 HTTP2.0 的区别
https://www.cnblogs.com/heluan/p/8620312.html简述分布式RPC框架
https://blog.csdn.net/jamebing/article/details/79610994分布式基础—RPC
http://www.dataguru.cn/article-14244-1.html

4.RPC简化与提炼

上篇回顾:万物互联之~RPC专栏 https://www.cnblogs.com/dunitian/p/10279946.html

上节课解答

之前有网友问,很多开源的RPC中都是使用路由表,这个怎么实现?

其实路由表实现起来也简单,代码基本上不变化,就修改一下server_stub.py__init__exe两个方法就可以了:

class ServerStub(object):def __init__(self, mycode):self.func_dict = dict()# 初始化一个方法名和方法的字典({func_name:func})for item in mycode.__dir__():if not item.startswith("_"):self.func_dict[item] = getattr(mycode, item)def exec(self, func, args=None):"""4.服务端的RPC Proxy组件根据方法名和参数进行本地调用"""# 如果没有这个方法则返回None# func = getattr(self.mycode, func, None)func = self.func_dict[func]if args:return func(*args)  # 解包else:return func()  # 无参函数

4.1.Json序列化

Python比较6的同志对上节课的Code肯定嗤之以鼻,上次自定义协议是同的通用方法,这节课我们先来简化下代码:

再贴一下上节课的时序图:

1.Json知识点

官方文档:https://docs.python.org/3/library/json.html

# 把字典对象转换为Json字符串
json_str = json.dumps({"func": func, "args": args})# 把Json字符串重新变成字典对象
data = json.loads(data)
func, args = data["func"], data["args"]

需要注意的就是类型转换了(eg:python tuple ==> json array

Python JSON
dict object
list, tuple array
str string
int, float number
True true
False false
None null

PS:序列化:json.dumps(obj),反序列化:json.loads(json_str)

2.消息协议采用Json格式

在原有基础上只需要修改下Stubpackunpack方法即可

Client_Stub(类型转换都省掉了)

import json
import socketclass ClientStub(object):def pack(self, func, args):"""打包:把方法和参数拼接成自定义的协议格式:{"func": "sum", "args": [1, 2]}"""json_str = json.dumps({"func": func, "args": args})# print(json_str)  # log 输出return json_str.encode("utf-8")def unpack(self, data):"""解包:获取返回结果"""data = data.decode("utf-8")# 格式应该是"{data:xxxx}"data = json.loads(data)# 获取不到就返回Nonereturn data.get("data", None)# 其他Code我没有改变

Server Stub()

import json
import socketclass ServerStub(object):def unpack(self, data):"""3.服务端的RPC Proxy组件把通过网络接收到的数据包按照相应格式进行拆包解码,获取方法名和参数"""data = data.decode("utf-8")# 格式应该是"格式:{"func": "sum", "args": [1, 2]}"data = json.loads(data)func, args = data["func"], data["args"]if args:return (func, tuple(args))  # (func,args)return (func, )def pack(self, result):"""打包:把方法和参数拼接成自定义的协议"""# 格式:"data:返回值"json_str = json.dumps({"data": result})return json_str.encode("utf-8")# 其他Code我没有改变

输出图示:

4.2.Buffer序列化

RPC其实更多的是二进制的序列化方式,这边简单介绍下

1.pickle知识点

官方文档:https://docs.python.org/3/library/pickle.html

用法和Json类似,PS:序列化:pickle.dumps(obj),反序列化:pickle.loads(buffer)

2.简单案例

和Json案例类似,也只是改了packunpack,我这边就贴一下完整代码(防止被吐槽)

1.Client

# 和上一节一样
from client_stub import ClientStubdef main():stub = ClientStub(("192.168.36.144", 50051))result = stub.get("sum", (1, 2))print(f"1+2={result}")result = stub.get("sum", (1.1, 2))print(f"1.1+2={result}")time_str = stub.get("get_time")print(time_str)if __name__ == "__main__":main()

2.ClientStub

import socket
import pickleclass ClientStub(object):def __init__(self, address):"""address ==> (ip,port)"""self.socket = socket.socket()self.socket.connect(address)def pack(self, func, args):"""打包:把方法和参数拼接成自定义的协议"""return pickle.dumps((func, args))def unpack(self, data):"""解包:获取返回结果"""return pickle.loads(data)def get(self, func, args=None):"""1.客户端的RPC Proxy组件收到调用后,负责将被调用的方法名、参数等打包编码成自定义的协议"""data = self.pack(func, args)# 2.客户端的RPC Proxy组件在打包完成后通过网络把数据包发送给RPC Serverself.socket.send(data)# 等待服务端返回结果data = self.socket.recv(2048)if data:return self.unpack(data)return None

3.Server

# 和上一节一样
import socket
from server_stub import ServerStubclass RPCServer(object):def __init__(self, address, mycode):self.mycode = mycode# 服务端存根(RPC Proxy)self.server_stub = ServerStub(mycode)# TCP Socketself.socket = socket.socket()# 端口复用self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)# 绑定端口self.socket.bind(address)def run(self):self.socket.listen()while True:# 等待客户端连接client_socket, client_addr = self.socket.accept()print(f"来自{client_addr}的请求:\n")try:# 交给服务端存根(Server Proxy)处理self.server_stub.handle(client_socket, client_addr)except Exception as ex:print(ex)if __name__ == "__main__":from server_code import MyCodeserver = RPCServer(('', 50051), MyCode())print("Server启动ing,Port:50051")server.run()

4.ServerCode

# 和上一节一样
# 5.RPC Server(被调用者)本地执行后将结果返回给服务端的RPC Proxy
class MyCode(object):def sum(self, a, b):return a + bdef get_time(self):import timereturn time.ctime()

5.ServerStub

import socket
import pickleclass ServerStub(object):def __init__(self, mycode):self.mycode = mycodedef unpack(self, data):"""3.服务端的RPC Proxy组件把通过网络接收到的数据包按照相应格式进行拆包解码,获取方法名和参数"""func, args = pickle.loads(data)if args:return (func, args)  # (func,args)return (func, )def pack(self, result):"""打包:把方法和参数拼接成自定义的协议"""return pickle.dumps(result)def exec(self, func, args=None):"""4.服务端的RPC Proxy组件根据方法名和参数进行本地调用"""# 如果没有这个方法则返回Nonefunc = getattr(self.mycode, func)if args:return func(*args)  # 解包else:return func()  # 无参函数def handle(self, client_socket, client_addr):while True:# 获取客户端发送的数据包data = client_socket.recv(2048)if data:try:data = self.unpack(data)  # 解包if len(data) == 1:data = self.exec(data[0])  # 执行无参函数elif len(data) > 1:data = self.exec(data[0], data[1])  # 执行带参函数else:data = "RPC Server Error Code:500"except Exception as ex:data = "RPC Server Function Error"print(ex)# 6.服务端的RPC Proxy组件将返回值打包编码成自定义的协议数据包,并通过网络发送给客户端的RPC Proxy组件data = self.pack(data)  # 把函数执行结果按指定协议打包# 把处理过的数据发送给客户端client_socket.send(data)else:print(f"客户端:{client_addr}已断开\n")break

输出图示:

然后关于RPC高级的内容(会涉及到注册中心),咱们后面说架构的时候继续,网络这边就说到这

转载于:https://www.cnblogs.com/dunitian/p/10279946.html

万物互联之~RPC专栏相关推荐

  1. “万物互联·泛在智能” 2019 嵌入式智能国际大会烧脑开幕!

    12月6日,以"万物互联·泛在智能"为主题的「2019嵌入式智能国际大会」在深圳市人才研修院隆重开幕. 本次大会由哈尔滨工业大学(深圳).清华大学国际研究生院.CSDN.嵌入式视觉 ...

  2. HarmonyOS IoT首著,走进万物互联的世界!(附送书)

    关注+星标公众号,不错过精彩内容 来源 | 博文视点 编排 | strongerHuang 1 鸿蒙来了 6月2日,华为发布HarmonyOS 2(鸿蒙)操作系统,连带多款搭载新系统的手机.平板电脑和 ...

  3. 万物互联之~网络编程基础篇

    入门篇¶ 官方文档:https://docs.python.org/3/library/ipc.html(进程间通信和网络) 实例代码:https://github.com/lotapp/BaseCo ...

  4. 万物互联!盘点国内八大物联网平台

    本文是对马智撰写的<国内物联网平台初探>系列文章的整理. 作者:马智.北京资信物联科技有限公司联合创始人兼研发总监. 责编:贾维娣.关注物联网领域,寻求报道或投稿请邮件联系 jiawd@c ...

  5. 物联网布局卡位战:东芝的万物互联梦想

    万物互联的趋势变得越来越明显,物联网(IoT)正飞速发展,继计算机.互联网之后,世界信息产业发展的第三次浪潮正悄然袭来. 近一两年,已经有越来越多的IoT产品开始逐渐渗透到人们的日常生活当中,IoT的 ...

  6. 鸿蒙OS:万物互联,方舟Compiler

    鸿蒙OS:万物互联,方舟Compiler 1.方舟JS运行时组件 简介 方舟JS运行时(ARK JavaScript Runtime)是OpenHarmony上JS应用使用的运行时.包含JS对象的分配 ...

  7. 5g信号云端服务器,随着5G万物互联的到来,云服务器迎来新机遇

    (文章来源:砍柴网) 近日,金山云宣布基于全新Intel Cascade Lake CPU.最新六通道DDR4内存的第四代云服务器正式上线. 该云服务器整体性能出色,无论是运算能力.还是网络收发包能力 ...

  8. 华为出鸿蒙是不是给人看的,谁来成为鸿蒙OS失去的“躯壳” 鸿蒙OS(HarmonyOS),在很多人眼中,是华为万物互联的起点,也是反抗之下诞生出的杰作,亦是中国科技史上重要的里程... - 雪球...

    来源:雪球App,作者: 速途网,(https://xueqiu.com/2989821209/181729468) 鸿蒙OS(HarmonyOS),在很多人眼中,是华为万物互联的起点,也是反抗之下诞 ...

  9. 万物生长,万物互联的时代来了

    一,题记 互联网原住民的四肢已经被技术解放 大脑也已经被科技加持 这个充满奇迹的失控的世界,从此进入了指数增长的人类高能时代 然而我们深知,智能是人类智慧的外延 我们依旧对人性充满敬畏 未来世界是冒险 ...

最新文章

  1. Linux vsftp配置详解
  2. linux上安装nero4j 关系数据库
  3. python虚拟环境的作用_python虚拟环境搭建
  4. Linux系统编程——基于文件描述符的文件操作(1)
  5. 【单片机相关】的网站
  6. leetcode 813. Largest Sum of Averages | 813. 最大平均值和的分组(暴力递归->傻缓存->DP)
  7. 美工一流的个人网站源码系列(2),不漂亮你可以不下载!
  8. 【C++深度剖析教程26】父子间的冲突
  9. 解决 IE8下 vs2008 无法调试
  10. 隐藏的图片在浏览器中的请求
  11. Lua: 给 Redis 用户的入门指导
  12. (转载)设计模式之-策略模式(Strategy)
  13. Python3.x建立服务器自动监测端口数据,客户端测试服务器
  14. Kubernetes 开发流程中的三个关键步骤
  15. HNUCM 1325:fps游戏
  16. 服务器系统打不上网卡驱动,服务器网卡驱动程序不能正确加载
  17. 从appfuse开始学习Spring和Hibernate - (1)构建项目
  18. GIF微信表情如何制作
  19. 自动设置为兼容模式html,什么是兼容模式?
  20. 数据库分类和负载均衡方案

热门文章

  1. java mvc引擎_SpringMvc+JavaConfig+Idea 搭建项目
  2. apache 设置404 页面_SpringBoot自定义错误页面
  3. 中文TeX的编辑环境推荐
  4. MySQL 8.0索引合并
  5. GIT-Linux(CentOS7)系统部署git服务器
  6. 练习2-1 Programming in C is fun!
  7. java如何实现Socket的长连接和短连接
  8. mysql5.7半自动同步设置【转】
  9. juce中的BailOutChecker
  10. 用手机EchoEcho问询朋友所在的位置