本项目将从零开始搭建出一个基于协程的异步RPC框架。

学习本项目需要有一定的C++,网络,RPC知识

项目地址:zavier-wong/acid: A high performance fiber RPC network framework. 高性能协程RPC网络框架

RPC框架设计

本项目将从零开始搭建出一个基于协程的异步RPC框架。

通过本项目你将学习到:

  • 协程同步原语
  • 序列化协议
  • 通信协议
  • 连接复用
  • 服务注册
  • 服务发现
  • 负载均衡
  • 健康检查
  • 三种异步调用方式

相信大家对RPC的相关概念都已经很熟悉了,这里不做过多介绍,直接进入重点。
本文档将简单介绍框架的设计,在最后的 examples 会给出完整的使用范例。
更多的细节需要仔细阅读源码。

本RPC框架主要有网络模块, 序列化模块,通信协议模块,客户端模块,服务端模块,服务注册中心模块,负载均衡模块

主要有以下三个角色:

注册中心 Registry

主要是用来完成服务注册和服务发现的工作。同时需要维护服务下线机制,管理了服务器的存活状态。

服务提供方 Service Provider

其需要对外提供服务接口,它需要在应用启动时连接注册中心,将服务名发往注册中心。服务端还需要启动Socket服务监听客户端请求。

服务消费方 Service Consumer

客户端需要有从注册中心获取服务的基本能力,它需要在发起调用时,
从注册中心拉取开放服务的服务器地址列表存入本地缓存,
然后选择一种负载均衡策略从本地缓存中筛选出一个目标地址发起调用,
并将这个连接存入连接池等待下一次调用。

协程同步原语

我们都知道,一旦协程阻塞后整个协程所在的线程都将阻塞,这也就失去了协程的优势。编写协程程序时难免会对一些数据进行同步,而Linux下常见的同步原语互斥量、条件变量、信号量等基本都会堵塞整个线程,使用原生同步原语协程性能将大幅下降,甚至发生死锁的概率大大增加。

框架实现了一套协程同步原语来解决原生同步原语带来的阻塞问题,在协程同步原语之上实现更高层次同步的抽象——Channel用于协程之间的便捷通信。

具体设计思路见 通用协程同步原语设计

序列化协议

本模块支持了基本类型以及标准库容器的序列化,包括:

  • 顺序容器:string, list, vector
  • 关联容器:set, multiset, map, multimap
  • 无序容器:unordered_set, unordered_multiset, unordered_map, unordered_multimap
  • 异构容器:tuple

以及通过以上任意组合嵌套类型的序列化

序列化有以下规则:

  1. 默认情况下序列化,8,16位类型以及浮点数不压缩,32,64位有符号/无符号数采用 zigzag 和 varints 编码压缩
  2. 针对 std::string 会将长度信息压缩序列化作为元数据,然后将原数据直接写入。char数组会先转换成 std::string 后按此规则序列化
  3. 调用 writeFint 将不会压缩数字,调用 writeRowData 不会加入长度信息

对于任意用户自定义类型,只要实现了以下的重载,即可参与传输时的序列化。

template<typename T>
Serializer &operator >> (Serializer& in, T& i){return *this;
}
template<typename T>
Serializer &operator << (Serializer& in, T i){return *this;
}

rpc调用过程:

  • 调用方发起过程调用时,自动将参数打包成tuple,然后序列化传输。
  • 被调用方收到调用请求时,先将参数包反序列回tuple,再解包转发给函数。

通信协议

+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
|  BYTE  |        |        |        |        |        |        |        |        |        |        |             ........                                                           |
+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
|  magic | version|  type  |          sequence id              |          content length           |             content byte[]                                                     |
+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+

封装通信协议,使RPC Server和RPC Client 可以基于同一协议通信。

采用私有通信协议,协议如下:

第一个字节是魔法数。

第二个字节代表协议版本号,以便对协议进行扩展,使用不同的协议解析器。

第四个字节开始是一个32位序列号,用来识别请求顺序。

第七个字节开始的四字节表示消息长度,即后面要接收的内容长度。

除了协议头固定7字节,消息不定长。

目前提供了以下几种请求

enum class MsgType : uint8_t {HEARTBEAT_PACKET,       // 心跳包RPC_PROVIDER,           // 向服务中心声明为providerRPC_CONSUMER,           // 向服务中心声明为consumerRPC_REQUEST,            // 通用请求RPC_RESPONSE,           // 通用响应RPC_METHOD_REQUEST ,    // 请求方法调用RPC_METHOD_RESPONSE,    // 响应方法调用RPC_SERVICE_REGISTER,   // 向中心注册服务RPC_SERVICE_REGISTER_RESPONSE,RPC_SERVICE_DISCOVER,   // 向中心请求服务发现RPC_SERVICE_DISCOVER_RESPONSE
};

连接复用

对于短连接来说,每次发起rpc调用就创建一条连接,由于没有竞争实现起来比较容易,但开销太大。所以本框架实现了rpc连接复用来支持更高的并发。

连接复用的问题在于,在一条连接上可以有多个并发的调用请求,由于服务器也是并发处理这些请求的,所以导致了服务器返回的响应顺序与请求顺序不一致。

具体的解决方法见 rpc连接复用设计

服务注册

每一个服务提供者对应的机器或者实例在启动运行的时候,
都去向注册中心注册自己提供的服务以及开放的端口。
注册中心维护一个服务名到服务地址的多重映射,一个服务下有多个服务地址,
同时需要维护连接状态,断开连接后移除服务。

/*** 维护服务名和服务地址列表的多重映射* serviceName -> serviceAddress1*             -> serviceAddress2*             ...*/
std::multimap<std::string, std::string> m_services;

服务发现

虽然服务调用是服务消费方直接发向服务提供方的,但是分布式的服务,都是集群部署的,
服务的提供者数量也是动态变化的,
所以服务的地址也就无法预先确定。
因此如何发现这些服务就需要一个统一注册中心来承载。

客户端从注册中心获取服务,它需要在发起调用时,
从注册中心拉取开放服务的服务器地址列表存入本地缓存,

负载均衡

实现通用类型负载均衡路由引擎(工厂)。
通过路由引擎获取指定枚举类型的负载均衡器,
降低了代码耦合,规范了各个负载均衡器的使用,减少出错的可能。

提供了三种路由策略(随机、轮询、一致性哈希),
由客户端使用,在客户端实现负载均衡


/*** @brief: 路由均衡引擎*/
template<class T>
class RouteEngine {public:static typename RouteStrategy<T>::ptr queryStrategy(typename RouteStrategy<T>::Strategy routeStrategyEnum) {switch (routeStrategyEnum){case RouteStrategy<T>::Random:return s_randomRouteStrategy;case RouteStrategy<T>::Polling:return std::make_shared<impl::PollingRouteStrategyImpl<T>>();case RouteStrategy<T>::HashIP:return s_hashIPRouteStrategy ;default:return s_randomRouteStrategy ;}}
private:static typename RouteStrategy<T>::ptr s_randomRouteStrategy;static typename RouteStrategy<T>::ptr s_hashIPRouteStrategy;
};

选择客户端负载均衡策略,根据路由策略选择服务地址。默认随机策略。

RouteStrategy<int>::ptr strategy =RouteEngine<int>::queryStrategy(Strategy::Random);

客户端同时会维护RPC连接池,以及服务发现结果缓存,减少频繁建立连接。

通过上述策略尽量消除或减少系统压力及系统中各节点负载不均衡的现象。

健康检查

服务中心必须管理服务器的存活状态,也就是健康检查。
注册服务的这一组机器,当这个服务组的某台机器如果出现宕机或者服务死掉的时候,
就会剔除掉这台机器。这样就实现了自动监控和管理。

项目采用了心跳发送的方式来检查健康状态。

服务器端:
开启一个定时器,定期给注册中心发心跳包,注册中心会回一个心跳包。

注册中心:
开启一个定时器倒计时,每次收到一个消息就更新一次定时器。如果倒计时结束还没有收到任何消息,则判断服务掉线。

三种异步调用方式

整个框架最终都要落实在服务消费者。为了方便用户,满足用户的不同需求,项目设计了三种异步调用方式。
三种调用方式的模板参数都是返回值类型,对void类型会默认转换uint8_t 。

  1. 以同步的方式异步调用

整个框架本身基于协程池,所以在遇到阻塞时会自动调度实现以同步的方式异步调用

Result<int> res = con->call<int>("add", 123, 321);
ACID_LOG_INFO(g_logger) << res.getVal();
  1. future 形式的异步调用

调用时会立即返回一个future

future<Result<int>> res = con->async_call<int>("add", 123, 321);
ACID_LOG_INFO(g_logger) << res.get().getVal();
  1. 异步回调

async_call的第一个参数为函数时,启用回调模式,回调参数必须是返回类型的包装。收到消息时执行回调。

con->async_call<int>([](Result<int> res){ACID_LOG_INFO(g_logger) << res.getVal();
}, "add", 123, 321);

对调用结果及状态的封装如下

/*** @brief RPC调用状态*/
enum RpcState{RPC_SUCCESS = 0,    // 成功RPC_FAIL,           // 失败RPC_NO_METHOD,      // 没有找到调用函数RPC_CLOSED,         // RPC 连接被关闭RPC_TIMEOUT         // RPC 调用超时
};/*** @brief 包装 RPC调用结果*/
template<typename T = void>
class Result {...
private:/// 调用状态code_type m_code = 0;/// 调用消息msg_type m_msg;/// 调用结果type m_val;
}

最后

通过以上介绍,我们粗略地了解了分布式服务的大概流程。但篇幅有限,无法面面俱到,
更多细节就需要去阅读代码来理解了。

这并不是终点,项目只是实现了简单的服务注册、发现。后续将考虑加入注册中心集群,限流,熔断,监控节点等。

示例

rpc服务注册中心

#include "acid/rpc/rpc_service_registry.h"// 服务注册中心
void Main() {acid::Address::ptr address = acid::Address::LookupAny("127.0.0.1:8080");acid::rpc::RpcServiceRegistry::ptr server = std::make_shared<acid::rpc::RpcServiceRegistry>();// 服务注册中心绑定在8080端口while (!server->bind(address)){sleep(1);}server->start();
}int main() {acid::IOManager loop;loop.submit(Main);
}

rpc 服务提供者

#include "acid/rpc/rpc_server.h"int add(int a,int b){return a + b;
}// 向服务中心注册服务,并处理客户端请求
void Main() {int port = 9000;acid::Address::ptr local = acid::IPv4Address::Create("127.0.0.1",port);acid::Address::ptr registry = acid::Address::LookupAny("127.0.0.1:8080");acid::rpc::RpcServer::ptr server = std::make_shared<acid::rpc::RpcServer>();;// 注册服务,支持函数指针server->registerMethod("add",add);// 支持函数对象server->registerMethod("echo", [](std::string str){return str;});// 支持标准库容器server->registerMethod("revers", [](std::vector<std::string> vec) -> std::vector<std::string>{std::reverse(vec.begin(), vec.end());return vec;});// 先绑定本地地址while (!server->bind(local)){sleep(1);}// 绑定服务注册中心server->bindRegistry(registry);// 开始监听并处理服务请求server->start();
}int main() {acid::IOManager loop;loop.submit(Main);
}

rpc 服务消费者,并不直接用RpcClient,而是采用更高级的封装,RpcConnectionPool。
提供了连接池和服务地址缓存。

#include "acid/log.h"
#include "acid/rpc/rpc_connection_pool.h"static acid::Logger::ptr g_logger = ACID_LOG_ROOT();// 连接服务中心,自动服务发现,执行负载均衡决策,同时会缓存发现的结果
void Main() {acid::Address::ptr registry = acid::Address::LookupAny("127.0.0.1:8080");// 设置连接池的数量acid::rpc::RpcConnectionPool::ptr con = std::make_shared<acid::rpc::RpcConnectionPool>();// 连接服务中心con->connect(registry);// 第一种调用接口,以同步的方式异步调用,原理是阻塞读时会在协程池里调度acid::rpc::Result<int> sync_call = con->call<int>("add", 123, 321);ACID_LOG_INFO(g_logger) << sync_call.getVal();// 第二种调用接口,future 形式的异步调用,调用时会立即返回一个futurestd::future<acid::rpc::Result<int>> async_call_future = con->async_call<int>("add", 123, 321);ACID_LOG_INFO(g_logger) << async_call_future.get().getVal();// 第三种调用接口,异步回调con->async_call<int>([](acid::rpc::Result<int> res){ACID_LOG_INFO(g_logger) << res.getVal();}, "add", 123, 321);// 测试并发int n=0;while(n != 1000) {n++;con->async_call<int>([](acid::rpc::Result<int> res){ACID_LOG_INFO(g_logger) << res.getVal();}, "add", 0, n);}
}int main() {acid::IOManager loop;loop.submit(Main);
}

C++高性能协程分布式服务框架设计相关推荐

  1. RSF 分布式服务框架设计

    为什么80%的码农都做不了架构师?>>>    是时候设计一个分布式服务框架了.我先将它定名为 Hasor-RSF,"RSF"为 Remote Service F ...

  2. 分布式服务框架设计笔记一

    一.服务发展历程: 传统MVC->  SOA ->微服务 SOA也是服务化一种,它与微服务的区分主要在:1.服务化粒度  2.微服务强调独立布署独立测试及独立发布 3.注册服务微服务是动态 ...

  3. RSF 分布式服务框架设计:线程模型

    为什么80%的码农都做不了架构师?>>>    RSF 的线程模型 使用了 RSF 框架之后系统一共会产生至少 7 条线程,有些功能的线程可能会产生多个.我们先来鸟瞰一下所有的线程和 ...

  4. RSF 分布式服务框架-传输协议层设计

    为什么80%的码农都做不了架构师?>>>    这是接上一篇文章<RSF 分布式服务框架设计>之后的续作,主要是 Hasor-RSF 协议层的设计. RSF 的协议层设计 ...

  5. RSF 分布式服务框架-服务端工作原理

    为什么80%的码农都做不了架构师?>>>    这是接上一篇文章<RSF 分布式服务框架设计>之后的续作,主要是 Hasor-RSF 的请求响应工作原理以及设计思路. 非 ...

  6. 分布式服务框架学习笔记2 常用的分布式服务框架 与 通信框架选择

    传统垂直架构改造的核心就是要对应用进行服务化,服务化改造使用到的核心技术就是分布式服务框架. 分布式服务框架演进 应用从集中式走向分布式 大规模系统架构的设计一般原则就是尽可能地拆分,以达到更好的独立 ...

  7. 分布式服务框架之服务化最佳实践

    在服务化之前,业务通常都是本地API调用,本地方法调用性能损耗较小.服务化之后,服务提供者和消费者之间采用远程网络通信,增加了额外的性能损耗,业务调用的时延将增大,同时由于网络闪断等原因,分布式调用失 ...

  8. 分布式服务框架(一)

    转载自:https://www.cnblogs.com/jiyukai/p/9459983.html 分布式服务框架(一) 一.RPC RPC(Remote Process Call),即远程服务调用 ...

  9. 华为18级大牛倾情奉送:分布式服务框架和微服务设计原理实战文档,啃完发现涨薪如此简单

    前言 分布式服务框架不仅仅包含核心的运行时类库,还包括服务划分原则.服务化最佳实践.服务治理.服务监控.服务开发框架等,它是一套完整的解决方案,用来协助应用做服务化改造,以及指导用户如何构建适合自己业 ...

最新文章

  1. 为什么很多SpringBoot开发者放弃了Tomcat,选择了Undertow?
  2. Android之ListView优化
  3. HDLBits答案(21)_Verilog有限状态机(8)
  4. 手动挡五个档位示意图_汽车档位越多越好?听听专业回答
  5. mysql查询没有权限试图_MySQL迁移后提示查询view权限不足的处理
  6. java中对时间的操作
  7. python写日志到文件_Python日志文件没有正确地写入日志消息,只有格式
  8. Python 进阶 —— 装饰器函数的使用
  9. opencv 缺少boostdesc_bgm.i等文件
  10. java路径通配符_java实现路径通配符*,**,?
  11. 使用 Python 进行双重退火优化
  12. 8000401a 错误及解决办法
  13. rollup函数(分组后对每组数据分别合计)
  14. C#+ AE实现地图注记功能
  15. Python爬虫前置知识
  16. 2022年江西二级建造师矿业工程施工技术综合测试题及答案
  17. Kanzi常用操作4
  18. Python爬虫练习:爬取猫眼电影实时票房
  19. 一文带你学会使用小程序CMS内容管理
  20. 国际短信平台短信路由搭建后台软件定制-移讯云短信系统

热门文章

  1. 谷歌邮件曝光:无人车项目提防百度、滴滴、Uber
  2. C语言——快速排序(前后指针法)
  3. php实现爬取知乎神回复数据——做成小程序上线
  4. vscode显示函数列表
  5. 【Excel】Excel清空空格位置
  6. LeetCode 636 Exclusive Time of Functions
  7. 责任链模式 - Unity
  8. map 转 json 字段为null的数据丢失
  9. 系统学习大模型的20篇论文
  10. 判断对象是否为空 三种方法