基于 protobuf 的 RPC 可以说是五花八门,其中不乏非常优秀的代码例如 brpc, muduo-rpc 等。

protobuf 实现了序列化部分,并且预留了 RPC 接口,但是没有实现网络交互的部分。

本文想介绍下,如何实现基于 protobuf 实现一个极简版的 RPC ,这样有助于我们阅读 RPC 源码。

一次完整的 RPC 通信实际上是有三部分代码共同完成:

  • protobuf 自动生成的代码
  • RPC 框架
  • 用户填充代码

本文假设用户熟悉 protobuf 并且有 RPC 框架的使用经验。首先介绍下 protobuf 自动生成的代码,接着介绍下用户填充代码,然后逐步介绍下极简的 RPC 框架的实现思路,相关代码可以直接跳到文章最后。

1 proto

我们定义了EchoService, method 为Echo.

package echo;option cc_generic_services = true;message EchoRequest {required string msg = 1;
}message EchoResponse {required string msg = 2;
}service EchoService {rpc Echo(EchoRequest) returns (EchoResponse);
}

protoc 自动生成echo.pb.h echo.pb.cc两部分代码. 其中service EchoService这一句会生成EchoService EchoService_Stub两个类,分别是 server 端和 client 端需要关心的。

对 server 端,通过EchoService::Echo来处理请求,代码未实现,需要子类来 override.

class EchoService : public ::google::protobuf::Service {...virtual void Echo(::google::protobuf::RpcController* controller,const ::echo::EchoRequest* request,::echo::EchoResponse* response,::google::protobuf::Closure* done);
};void EchoService::Echo(::google::protobuf::RpcController* controller,const ::echo::EchoRequest*,::echo::EchoResponse*,::google::protobuf::Closure* done) {//代码未实现controller->SetFailed("Method Echo() not implemented.");done->Run();
}

对 client 端,通过EchoService_Stub来发送数据,EchoService_Stub::Echo调用了::google::protobuf::Channel::CallMethod,但是Channel是一个纯虚类,需要 RPC 框架在子类里实现需要的功能。

class EchoService_Stub : public EchoService {...void Echo(::google::protobuf::RpcController* controller,const ::echo::EchoRequest* request,::echo::EchoResponse* response,::google::protobuf::Closure* done);private:::google::protobuf::RpcChannel* channel_;
};void EchoService_Stub::Echo(::google::protobuf::RpcController* controller,const ::echo::EchoRequest* request,::echo::EchoResponse* response,::google::protobuf::Closure* done) {channel_->CallMethod(descriptor()->method(0),controller, request, response, done);
}

2 server && client

有过 RPC 使用经验的话,都了解 server 端代码类似于这样(参考brpc echo_c++ server.cpp)

//override Echo method
class MyEchoService : public echo::EchoService {public:virtual void Echo(::google::protobuf::RpcController* /* controller */,const ::echo::EchoRequest* request,::echo::EchoResponse* response,::google::protobuf::Closure* done) {std::cout << request->msg() << std::endl;response->set_msg(std::string("I have received '") + request->msg() + std::string("'"));done->Run();}
};//MyEchoServiceint main() {MyServer my_server;MyEchoService echo_service;my_server.add(&echo_service);my_server.start("127.0.0.1", 6688);return 0;
}

只要定义子类 service 实现 method 方法,再把 service 加到 server 里就可以了。

而 client 基本这么实现(参考brpc echo_c++ client.cpp)

int main() {MyChannel channel;channel.init("127.0.0.1", 6688);echo::EchoRequest request;echo::EchoResponse response;request.set_msg("hello, myrpc.");echo::EchoService_Stub stub(&channel);MyController cntl;stub.Echo(&cntl, &request, &response, NULL);std::cout << "resp:" << response.msg() << std::endl;return 0;
}

这样的用法看起来很自然,但是仔细想想背后的实现,肯定会有很多疑问:

为什么 server 端只需要实现MyEchoService::Echo函数,client端只需要调用EchoService_Stub::Echo就能发送和接收对应格式的数据?中间的调用流程是怎么样子的?

如果 server 端接收多种 pb 数据(例如还有一个 method rpc Post(DeepLinkReq) returns (DeepLinkResp);),那么怎么区分接收到的是哪个格式?

区分之后,又如何构造出对应的对象来?例如MyEchoService::Echo参数里的EchoRequest EchoResponse,因为 rpc 框架并不清楚这些具体类和函数的存在,框架并不清楚具体类的名字,也不清楚 method 名字,却要能够构造对象并调用这个函数?

可以推测答案在MyServer MyChannel MyController里,接下来我们逐步分析下。

3 处理流程

考虑下 server 端的处理流程

从对端接收数据
通过标识机制判断如何反序列化到 request 数据类型
生成对应的 response 数据类型
调用对应的 service-method ,填充 response 数据
序列化 response
发送数据回对端
具体讲下上一节提到的接口设计的问题,体现在2 3 4步骤里,还是上面 Echo 的例子,因为 RPC 框架并不能提前知道EchoService::Echo这个函数,怎么调用这个函数呢?

google/protobuf/service.h里::google::protobuf::Service的源码如下:

class LIBPROTOBUF_EXPORT Service {virtual void CallMethod(const MethodDescriptor* method,RpcController* controller,const Message* request,Message* response,Closure* done) = 0;
};//Service

Service 是一个纯虚类,CallMethod = 0,EchoService实现如下

void EchoService::CallMethod(const ::google::protobuf::MethodDescriptor* method,::google::protobuf::RpcController* controller,const ::google::protobuf::Message* request,::google::protobuf::Message* response,::google::protobuf::Closure* done) {GOOGLE_DCHECK_EQ(method->service(), EchoService_descriptor_);switch(method->index()) {case 0:Echo(controller,::google::protobuf::down_cast<const ::echo::EchoRequest*>(request),::google::protobuf::down_cast< ::echo::EchoResponse*>(response),done);break;default:GOOGLE_LOG(FATAL) << "Bad method index; this should never happen.";break;}
}

可以看到这里会有一次数据转化down_cast,因此框架可以通过调用::google::protobuf::Service::CallMethod函数来调用Echo,数据统一为Message*格式,这样就可以解决框架的接口问题了。

再考虑下 client 端处理流程。

EchoService_Stub::Echo的实现里:

channel_->CallMethod(descriptor()->method(0),controller, request, response, done);

因此先看下::google::protobuf::RpcChannel的实现:

// Abstract interface for an RPC channel.  An RpcChannel represents a
// communication line to a Service which can be used to call that Service's
// methods.  The Service may be running on another machine.  Normally, you
// should not call an RpcChannel directly, but instead construct a stub Service
// wrapping it.  Example:
//   RpcChannel* channel = new MyRpcChannel("remotehost.example.com:1234");
//   MyService* service = new MyService::Stub(channel);
//   service->MyMethod(request, &response, callback);
class LIBPROTOBUF_EXPORT RpcChannel {public:inline RpcChannel() {}virtual ~RpcChannel();// Call the given method of the remote service.  The signature of this// procedure looks the same as Service::CallMethod(), but the requirements// are less strict in one important way:  the request and response objects// need not be of any specific class as long as their descriptors are// method->input_type() and method->output_type().virtual void CallMethod(const MethodDescriptor* method,RpcController* controller,const Message* request,Message* response,Closure* done) = 0;private:GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(RpcChannel);
};

pb 的注释非常清晰,channel 可以理解为一个通道,连接了 rpc 服务的两端,本质上也是通过 socket 通信的。

但是RpcChannel也是一个纯虚类,CallMethod = 0。

因此我们需要实现一个子类,基类为RpcChannel,并且实现CallMethod方法,应该实现两个功能:

序列化 request ,发送到对端,同时需要标识机制使得对端知道如何解析(schema)和处理(method)这类数据。
接收对端数据,反序列化到 response
此外还有RpcController,也是一个纯虚类,是一个辅助类,用于获取RPC结果,对端IP等。

4 标识机制

上一节提到的所谓标识机制,就是当 client 发送一段数据流到 server ,server 能够知道这段 buffer 对应的数据格式,应该如何处理,对应的返回数据格式是什么样的。

最简单暴力的方式就是在每组数据里都标识下是什么格式的,返回值希望是什么格式的,这样一定能解决问题。

但是 pb 里明显不用这样,因为 server/client 使用相同(或者兼容)的 proto,只要标识下数据类型名就可以了。不过遇到相同类型的 method 也会有问题,例如

service EchoService {rpc Echo(EchoRequest) returns (EchoResponse);rpc AnotherEcho(EchoRequest) returns (EchoResponse)
}

因此可以使用 service 和 method 名字,通过 proto 就可以知道 request/response 类型了。

因此,结论是:我们在每次数据传递里加上service method名字就可以了。

pb 里有很多 xxxDescriptor 的类,service method也不例外。例如GetDescriptor可以获取ServiceDescriptor.

class LIBPROTOBUF_EXPORT Service {...// Get the ServiceDescriptor describing this service and its methods.virtual const ServiceDescriptor* GetDescriptor() = 0;
};//Service

通过ServiceDescriptor就可以获取对应的name及MethodDescriptor.

class LIBPROTOBUF_EXPORT ServiceDescriptor {public:// The name of the service, not including its containing scope.const string& name() const;...// The number of methods this service defines.int method_count() const;// Gets a MethodDescriptor by index, where 0 <= index < method_count().// These are returned in the order they were defined in the .proto file.const MethodDescriptor* method(int index) const;
};//ServiceDescriptor

而MethodDecriptor可以获取对应的name及从属的ServiceDescriptor

class LIBPROTOBUF_EXPORT MethodDescriptor {public:// Name of this method, not including containing scope.const string& name() const;...// Gets the service to which this method belongs.  Never NULL.const ServiceDescriptor* service() const;
};//MethodDescriptor

因此:

server 端传入一个::google::protobuf::Service时,我们可以记录 service name 及所有的 method name.
client 端调用virtual void CallMethod(const MethodDescriptor* method…时,也可以获取到 method name 及对应的 service name.
这样,就可以知道发送的数据类型了。

5 构造参数

前面还提到的一个问题,是如何构造具体参数的问题。实现 RPC 框架时,肯定是不知道EchoRequest EchoResponse类名的,但是通过::google::protobuf::Service的接口可以构造出对应的对象来

  //   const MethodDescriptor* method =//     service->GetDescriptor()->FindMethodByName("Foo");//   Message* request  = stub->GetRequestPrototype (method)->New();//   Message* response = stub->GetResponsePrototype(method)->New();//   request->ParseFromString(input);//   service->CallMethod(method, *request, response, callback);virtual const Message& GetRequestPrototype(const MethodDescriptor* method) const = 0;virtual const Message& GetResponsePrototype(const MethodDescriptor* method) const = 0;

而Message通过New可以构造出对应的对象

class LIBPROTOBUF_EXPORT Message : public MessageLite {public:inline Message() {}virtual ~Message();// Basic Operations ------------------------------------------------// Construct a new instance of the same type.  Ownership is passed to the// caller.  (This is also defined in MessageLite, but is defined again here// for return-type covariance.)virtual Message* New() const = 0;...

这样,我们就可以得到Service::Method需要的对象了。

6 Server/Channel/Controller子类实现

前面已经介绍了基本思路,本节介绍下具体的实现部分。

6.1 RpcMeta

RpcMeta用于解决传递 service-name method-name 的问题,定义如下

package myrpc;message RpcMeta {optional string service_name = 1;optional string method_name = 2;optional int32 data_size = 3;
}

其中data_size表示接下来要传输的数据大小,例如EchoRequest对象的大小。

同时我们还需要一个int来表示RpcMeta的大小,因此我们来看下Channel的实现

6.2 Channel

//继承自RpcChannel,实现数据发送和接收

class MyChannel : public ::google::protobuf::RpcChannel {public://init传入ip:port,网络交互使用boost.asiovoid init(const std::string& ip, const int port) {_io = boost::make_shared<boost::asio::io_service>();_sock = boost::make_shared<boost::asio::ip::tcp::socket>(*_io);boost::asio::ip::tcp::endpoint ep(boost::asio::ip::address::from_string(ip), port);_sock->connect(ep);}//EchoService_Stub::Echo会调用Channel::CallMethod//其中第一个参数MethodDescriptor* method,可以获取service-name method-namevirtual void CallMethod(const ::google::protobuf::MethodDescriptor* method,::google::protobuf::RpcController* /* controller */,const ::google::protobuf::Message* request,::google::protobuf::Message* response,::google::protobuf::Closure*) {//request数据序列化std::string serialzied_data = request->SerializeAsString();//获取service-name method-name,填充到rpc_metamyrpc::RpcMeta rpc_meta;rpc_meta.set_service_name(method->service()->name());rpc_meta.set_method_name(method->name());rpc_meta.set_data_size(serialzied_data.size());//rpc_meta序列化std::string serialzied_str = rpc_meta.SerializeAsString();//获取rpc_meta序列化数据大小,填充到数据头部,占用4个字节int serialzied_size = serialzied_str.size();serialzied_str.insert(0, std::string((const char*)&serialzied_size, sizeof(int)));//尾部追加request序列化后的数据serialzied_str += serialzied_data;//发送全部数据://|rpc_meta大小(定长4字节)|rpc_meta序列化数据(不定长)|request序列化数据(不定长)|_sock->send(boost::asio::buffer(serialzied_str));//接收4个字节:序列化的resp数据大小char resp_data_size[sizeof(int)];_sock->receive(boost::asio::buffer(resp_data_size));//接收N个字节:N=序列化的resp数据大小int resp_data_len = *(int*)resp_data_size;std::vector<char> resp_data(resp_data_len, 0);_sock->receive(boost::asio::buffer(resp_data));//反序列化到respresponse->ParseFromString(std::string(&resp_data[0], resp_data.size()));}private:boost::shared_ptr<boost::asio::io_service> _io;boost::shared_ptr<boost::asio::ip::tcp::socket> _sock;
};//MyChannel

通过实现Channel::CallMethod方法,我们就可以在调用子类方法,例如EchoService_Stub::Echo时自动实现数据的发送/接收、序列化/反序列化了。

6.3 Server

Server的实现会复杂一点,因为可能注册多个Service::Method,当接收到 client 端的数据,解析RpcMeta得到service-name method-name后,需要找到对应的Service::Method,注册时就需要记录这部分信息。

因此,我们先看下add方法的实现:

class MyServer {public:void add(::google::protobuf::Service* service) {ServiceInfo service_info;service_info.service = service;service_info.sd = service->GetDescriptor();for (int i = 0; i < service_info.sd->method_count(); ++i) {service_info.mds[service_info.sd->method(i)->name()] = service_info.sd->method(i);}_services[service_info.sd->name()] = service_info;}...
private:struct ServiceInfo{::google::protobuf::Service* service;const ::google::protobuf::ServiceDescriptor* sd;std::map<std::string, const ::google::protobuf::MethodDescriptor*> mds;};//ServiceInfo//service_name -> {Service*, ServiceDescriptor*, MethodDescriptor* []}std::map<std::string, ServiceInfo> _services;

我在实现里,_services记录了 service 及对应的ServiceDescriptor MethodDescriptor。而ServiceDescritpr::FindMethodByName方法可以查找 method ,因此不记录method_name也可以。不过出于性能考虑,我觉得还可以记录更多,例如 req/resp 数据类型等。

注册 service 后,就可以启动 server 监听端口和接收数据了

//监听ip:port,接收数据

void MyServer::start(const std::string& ip, const int port) {boost::asio::io_service io;boost::asio::ip::tcp::acceptor acceptor(io,boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(ip),port));while (true) {auto sock = boost::make_shared<boost::asio::ip::tcp::socket>(io);acceptor.accept(*sock);std::cout << "recv from client:"<< sock->remote_endpoint().address()<< std::endl;//接收4个字节:rpc_meta长度char meta_size[sizeof(int)];sock->receive(boost::asio::buffer(meta_size));int meta_len = *(int*)(meta_size);//接收rpc_meta数据std::vector<char> meta_data(meta_len, 0);sock->receive(boost::asio::buffer(meta_data));myrpc::RpcMeta meta;meta.ParseFromString(std::string(&meta_data[0], meta_data.size()));//接收req数据std::vector<char> data(meta.data_size(), 0);sock->receive(boost::asio::buffer(data));//数据处理dispatch_msg(meta.service_name(),meta.method_name(),std::string(&data[0], data.size()),sock);}
}

start启动一个循环,解析RpcMeta数据并接收 request 数据,之后交给 dispatch_msg 处理。

void MyServer::dispatch_msg(const std::string& service_name,const std::string& method_name,const std::string& serialzied_data,const boost::shared_ptr<boost::asio::ip::tcp::socket>& sock) {//根据service_name method_name查找对应的注册的Service*auto service = _services[service_name].service;auto md = _services[service_name].mds[method_name];std::cout << "recv service_name:" << service_name << std::endl;std::cout << "recv method_name:" << method_name << std::endl;std::cout << "recv type:" << md->input_type()->name() << std::endl;std::cout << "resp type:" << md->output_type()->name() << std::endl;//根据Service*生成req resp对象auto recv_msg = service->GetRequestPrototype(md).New();recv_msg->ParseFromString(serialzied_data);auto resp_msg = service->GetResponsePrototype(md).New();MyController controller;auto done = ::google::protobuf::NewCallback(this,&MyServer::on_resp_msg_filled,recv_msg,resp_msg,sock);//调用Service::Method(即用户实现的子类方法)service->CallMethod(md, &controller, recv_msg, resp_msg, done);
}

用户填充resp_msg后,会调用done指定的回调函数(也就是我们在 MyEchoService::Echo 代码里对应的done->Run()这一句)。

在用户填充数据后,on_resp_msg_filled用于完成序列化及发送的工作。


void MyServer::on_resp_msg_filled(::google::protobuf::Message* recv_msg,::google::protobuf::Message* resp_msg,const boost::shared_ptr<boost::asio::ip::tcp::socket> sock) {//avoid mem leakboost::scoped_ptr<::google::protobuf::Message> recv_msg_guard(recv_msg);boost::scoped_ptr<::google::protobuf::Message> resp_msg_guard(resp_msg);std::string resp_str;pack_message(resp_msg, &resp_str);sock->send(boost::asio::buffer(resp_str));
}

pack_message用于打包数据,其实就是在序列化数据前插入4字节长度数据

void pack_message(const ::google::protobuf::Message* msg,std::string* serialized_data) {int serialized_size = msg->ByteSize();serialized_data->assign((const char*)&serialized_size,sizeof(serialized_size));msg->AppendToString(serialized_data);
}

程序输出如下

$ ./client
resp:I have received 'hello, myrpc.'
$ ./server
recv from client:127.0.0.1
recv service_name:EchoService
recv method_name:Echo
recv type:EchoRequest
resp type:EchoResponse
hello, myrpc.

完整代码,打包放在了Tiny-Tools,使用 cmake 编译,注意指定 protobuf boost 库的路径。

如何基于protobuf实现一个极简版的RPC(转载)相关推荐

  1. python3web库_基于 Python3 写的极简版 webserver

    基于 Python3 写的极简版 webserver.用于学习 HTTP协议,及 WEB服务器 工作原理.笔者对 WEB服务器 的工作原理理解的比较粗浅,仅是基于个人的理解来写的,存在很多不足和漏洞, ...

  2. 很多小伙伴不太了解ORM框架的底层原理,这不,冰河带你10分钟手撸一个极简版ORM框架(赶快收藏吧)

    大家好,我是冰河~~ 最近很多小伙伴对ORM框架的实现很感兴趣,不少读者在冰河的微信上问:冰河,你知道ORM框架是如何实现的吗?比如像MyBatis和Hibernate这种ORM框架,它们是如何实现的 ...

  3. 10分钟手撸极简版ORM框架!

    最近很多小伙伴对ORM框架的实现很感兴趣,不少读者在冰河的微信上问:冰河,你知道ORM框架是如何实现的吗?比如像MyBatis和Hibernte这种ORM框架,它们是如何实现的呢? 为了能够让小伙伴们 ...

  4. 【Liunx】进程的程序替换——自定义编写极简版shell

    目录 进程程序替换[1~5] 1.程序替换的接口(加载器) 2.什么是程序替换? 3.进程替换的原理 4.引入多进程 5.系列程序替换接口的详细解析(重点!) 自定义编写一个极简版shell[6~8] ...

  5. Atlas 200 DK开发者套件环境部署(1.0.9.alpha)极简版

    Atlas 200 DK开发者套件环境部署(1.0.9.alpha)极简版 前言 Atlas 200 DK开发者套件介绍 环境部署介绍 资源要求 开发环境部署 安装Docker 获取镜像(两种方法任选 ...

  6. 7句话让Codex给我做了个小游戏,还是极简版塞尔达,一玩简直停不下来

    点击上方"视学算法",选择加"星标"或"置顶" 重磅干货,第一时间送达 梦晨 萧箫 发自 凹非寺 量子位 | 公众号 QbitAI 什么,7 ...

  7. Serverless 实战 —— 基于 Serverless 的 VuePress 极简静态网站

    基于 Serverless 的 VuePress 极简静态网站 作者: Aceyclee 之前用过 Docsify + Serverless Framework 快速创建个人博客系统,虽然 docsi ...

  8. koa2+html模板,lenneth -- 基于koa2 的web极简框架

    说明 封装 lenneth 旨在快速方便的搭建出一个 node web 应用,不过度封装也不随波逐流,koa 的 node 是简单的,lenneth 也是. 基于 ES6+typescript 的一些 ...

  9. 一个极简版本的 VUE SSR demo

    我本人在刚开始看 VUE SSR 官方文档的时候遇到很多问题,它一开始是建立在你有一个可运行的构建环境的,所以它直接讲代码的实现,但是对于刚接触的开发者来说并没有一个运行环境,所以所有的代码片段都无法 ...

  10. 极简linux版本,4MLinux 26.0发布,这是一个极简版本

    4MLinux 26.0版已经发布,这是一个极简版本,包括桌面版(带有JWM)和服务器版(具有完整的LAMP环境). 该项目的最新稳定版本附带升级包以及对现代图像和视频编码的支持: 4MLinux 2 ...

最新文章

  1. 百度、长沙加码自动驾驶,湖南阿波罗智行科技公司成立...
  2. SDK location not found. Define location with sdk.dir in the local.properties file or with an ANDROID
  3. spring boot apollo demo
  4. 用SMTP,POP3访问Exchange邮箱:Exchange2003系列之六
  5. Android View滚动、拉伸到顶/底部弹性回弹复位
  6. WIN10+VS2015环境下安装PCL1.8.1
  7. layoutIfNeeded 就这样把我害惨
  8. 2018Web前端面试题及答案大全
  9. python爬京东联盟_PHP调用京东联盟开普勒、宙斯API模板
  10. 拼多多进军社区团购 店宝宝:巨头竞争加剧
  11. R语言manova函数多元方差分析(MANOVA)、单因素多元方差分析的两个假设是多元正态性和方差-协方差矩阵的齐性、QQ图评估多元正态性、mvoutlier包中的aq.plot函数检验多变量异常值
  12. Python简答题编程题
  13. 良田高拍仪接口文档对接
  14. android时钟每秒 1,极简时钟
  15. 上楼梯问题+不死兔子
  16. 河工计院ACM2022寒假培训题单以及超详细题解
  17. BOM和DOM的区别和关联
  18. python制作闯关答题软件_闯关答题-可以用做问答互动的软件-闯关答题会议 微信 问答游戏GO互动智能现场...
  19. [Python]FPG(FP-growth)算法核心实现
  20. python将文字转换为语音_python把文字转成语音

热门文章

  1. Java并发面试,幸亏有点道行,不然又被忽悠了 1
  2. 跑步进入全站 HTTPS ,这些经验值得你看看
  3. Hadoop 面试题之七
  4. STM32F10x 学习笔记4(CRC计算单元 续)
  5. C++生成随机数:几何分布(geometric distribution)
  6. MOOON-agent系统设计与使用说明
  7. 精通MVC3摘译(5)-使用URL模式最佳实践
  8. ISA Server、虚拟机、托管服务器的使用
  9. Blog访问量提升秘笈
  10. 15.企业应用架构模式 --- 分布模式