文章目录

  • 一、基本设计思路
  • 二、实现细节
  • 三、总结

作为rpc服务器,在启动过后,最主要的一个过程就是收到请求后的处理,而这就牵涉到一个网络编程相关最基本的部分:如何有效地处理socket传过来地数据,这篇文章就来详细聊一聊brpc是怎么处理收到的请求的。

一、基本设计思路

作为服务器,业界最典型的实现就是区分I/O线程和工作线程,一个或多个I/O线程负责从socket读取数据放入一个队列,然后一堆worker线程来从队列里取数据并处理,或者I/O线程读完数据直接交给worker,此类严格区分I/O线程和worker线程的机制会有几种典型的问题:

1.一个I/O线程同时只能读取一个fd,I/O线程通常只有一个或者几个,如果某个fd上数据量很大,读取很耗时会容易导致其他fd数据的阻塞。
2.如果要操作队列会产生竞争,高并发的时候严重影响性能。
3.当前主流的多核cpu机制下,各个核之间进行cache同步是微妙级的,并不是很快,容易出现cache bouncing,I/O线程将任务交给worker涉及到跨核,效率不是很高,之间的同样cpu缓存到内存之间交换数据也比较慢。

和很多rpc框架不同,brpc有一套独特的机制来解决上述问题,总的来说就是,不区分I/O线程和worker线程,通过全局的EventDispatcher来负责监听fd上的事件,如果fd上已经有bthread在处理了直接返回,否则启动一个bthread处理,被EventDispatcher启动的bthread负责读取fd上的消息,每读到一条就启动bthread去处理,最后一条原地执行,直到读完为止,因为EventDispatcher本身既不负责读消息也不负责处理消息,自身吞吐量可以做得很大,fd间和fd内的消息都能获得并发,所有线程都是wait-free的,这使得brpc在高负载时仍能及时处理不同来源的消息。下面就结合源码详细阐述相关机制。

二、实现细节

在上篇文章里已经介绍过了,服务器启动后会由EventDispatcher监听epoll事件,作为服务器,收到一个新连接属于epoll_in事件,会调用Socket::StartInputEvent进行处理。StartInputEvent函数如下:

StartInputEvent函数里,首先是调用Address方法根据SocketId获取socket s,因为一个fd上会不断地发生事件,socket类里面设置了一个butil::atomic _nevent变量,用来保证对于一个fd,同时只会有一个bthread在处理,当收到事件时,EventDispatcher给nevent加1,只有当加1前的值是0时启动一个bthread处理对应fd上的数据,前值不为0说明已经有bthread在处理该fd上的数据了,可以直接返回,因为正在处理的bthread会一直读下去。总的来说StartInputEvent仅仅是负责判断发生事件的fd上有没有bthread在处理了,没有就原地启动一个,有就直接返回,注意调用的是bthread_start_urgent,这个函数启动bthread会让出当前bthread,也就是官方文档所说的,“EventDispatcher把所在的pthread让给了新建的bthread,使其有更好的cache locality,可以尽快地读取fd上的数据”,不需要pthread切换对性能有帮助”。如果因为某些原因bthread_start_urgent启动失败,则原地执行ProcessEvent兜底。

调用的ProcessEvent如下:

也就是执行socket 里的_on_edge_triggered_events,对于目前收到新连接请求的情况,事件发生在监听端口上,对应的_on_edge_triggered_events是上篇文章提到的StartAccept里赋值的OnNewConnection,也就是来了新连接要调用的回调函数

OnNewConnections内部调用OnNewConnectionsUntilEAGAIN函数,OnNewConnectionsUntilEAGAIN核心如下:

首先accept监听fd来得到clinet fd

然后调用Create新建一个socket来具体处理本次新连接的后续消息,在里面添加了一个新的epoll_in 事件,注册的处理函数是InputMessenger::OnNewMessages,注意和前面的OnNewConnections对比,前面是处理新连接,这个是新连接处理后来处理新连接上的新消息。在InputMessenger::OnNewMessages函数里面,核心是读取消息然后交由相应协议的处理函数,进而调用用户服务里的实际业务函数。

对于clinet fd上收到的epoll_in事件,入口仍然是StartInputEvent,只不过随后的ProcessEvent调用的处理函数是InputMessenger::OnNewMessages,核心部分代码如下:

一直读取直到读完。

做一次读取

messenger是InputMessenger对象,负责从fd上切割和处理消息。上篇文章介绍了brpc的自定义协议,每个协议其中最重要的就是ParseXXXmessage和ProcessXXX,分别对应处理消息的两个基本步骤,Parse和Process,Parse一般是把消息从二进制流上切割下来,运行时间较固定;Process则是进一步解析消息(比如反序列化为protobuf)后调用用户回调,时间不确定。因为brpc是支持单端口多协议的,因此InputMessenger会逐一尝试用户指定的种协议的回调,当某一个Parse成功切割下一个消息后,调用对应的ProcessXXX。由于一个连接上往往只有一种消息格式,InputMessenger会记录下上次的选择,而避免每次都重复尝试。


QueueMessage是启动一个bthread执行ProcessInputMessage来处理一个msg。若连续从某个fd读取出n个消息(n > 1),InputMessenger会启动n-1个bthread分别处理前n-1个消息,最后一个消息则会在原地被Process。
根据handler里的解析器处理消息得到解析后的结果
ProcessInputMessage如下,调用的是msg->_process,也就是对应协议的process_request来处理cut下来的message:

这里实现的关键就是last_msg,定义如下,是一个自定义deleter的unique_ptr:

每次循环都会去执行对last_msg的处理,如果last_msg非null,则新启一个bthread去处理,在第一次循环,也就是处理第一个msg的时候,last_msg为null,随后的每一次循环都会先调用QueueMessage处理last_msg,

随后last_msg置成这次循环的msg,待下次循环处理,分别会对msg->_process (消息处理函数)和msg->_arg 赋值供后续上述ProcessInputMessage调用

所以最后一次循环完了后还剩一个last_msg没有处理,怎么办呢,last_msg的deleter是RunlastMessage,如下:

RunlastMessage跟queueMessage一样都是调用的ProcessInputMessage,析构的时候会自动调用deleter,从而实现最后一个msg原地调用。
Msg->_process是对应协议的process_request请求处理函数,其中最终会调用用户函数,以http协议为例,Msg->_proces指向的是协议注册时我们看到的ProcessHttpRequest,

可以看到msg作为参数进来后,通过msg->arg()拿到了指向server的指针,msg->arg()返回的就是msg->_arg,前面图里可以看到msg->_arg 所赋予的是handler 的arg,回忆一下,handler 的arg其实就是所启动的server的指针,是在BuildAcceptor()里面添加各种handler的时候赋值的:

ProcessHttpRequest核心处理部分如下:
先拿到MethodProperty sp

得到service和method

调用业务代码,比如echo示例:


具体的打包发送则是通过调用done->Run(),在推荐用法里,由RAII机制保证,也就是上图的done_guard。

三、总结

brpc在处理接收到的请求的时候,不区分io线程核worker线程,二是利用EventDispatcher基于边缘触发的epoll做event的分发,这样并发可以做得很大,具体的消息切割、协议相关的请求处理、以及后续的用户逻辑均交由独立的bthread来处理,实现了wait-free,不用担心某个请求io耗时过大造成批量IO阻塞,也没有生产者消费者队列所带来的瓶颈。并且是让出当前pthread的方式进行调度,保证了cache locality。以上消息处理上的优势也是brpc高性能的基石之一。

brpc源码解析(二)—— brpc收到请求的处理过程相关推荐

  1. 【深度学习模型】智云视图中文车牌识别源码解析(二)

    [深度学习模型]智云视图中文车牌识别源码解析(二) 感受 HyperLPR可以识别多种中文车牌包括白牌,新能源车牌,使馆车牌,教练车牌,武警车牌等. 代码不可谓不混乱(别忘了这是职业公司的准产品级代码 ...

  2. gin 源码解析 - 详解http请求在gin中的流转过程

    本篇文章是 gin 源码分析系列的第二篇,这篇文章我们主要弄清一个问题:一个请求通过 net/http 的 socket 接收到请求后, 是如何回到 gin 中处理逻辑的? 我们仍然以 net/htt ...

  3. android网络框架retrofit源码解析二

    注:源码解析文章参考了该博客:http://www.2cto.com/kf/201405/305248.html 前一篇文章讲解了retrofit的annotation,既然定义了,那么就应该有解析的 ...

  4. erlang下lists模块sort(排序)方法源码解析(二)

    上接erlang下lists模块sort(排序)方法源码解析(一),到目前为止,list列表已经被分割成N个列表,而且每个列表的元素是有序的(从大到小) 下面我们重点来看看mergel和rmergel ...

  5. Kubernetes学习笔记之Calico CNI Plugin源码解析(二)

    女主宣言 今天小编继续为大家分享Kubernetes Calico CNI Plugin学习笔记,希望能对大家有所帮助. PS:丰富的一线技术.多元化的表现形式,尽在"360云计算" ...

  6. Mobx 源码解析 二(autorun)

    前言 我们在Mobx 源码解析 一(observable)已经知道了observable 做的事情了, 但是我们的还是没有讲解明白在我们的Demo中,我们在Button 的Click 事件中只是对ba ...

  7. Thrift源码解析(二)序列化协议

    概述 对于一个RPC框架,定义好网络数据的序列化协议是最基本的工作,thrift的序列化协议主要包含如下几种: TBinaryProtocol TCompactProtocol TJSONProtoc ...

  8. brpc源码解析(一)—— rpc服务添加以及服务器启动主要过程

    目录 1.往Server里添加Service(业务代码) 2.设置服务器参数 3.启动服务器 平时的工作用到了baidu-rpc搭建rpc服务,作为戈君大神的大作,在没有开源的时候,这个c++ 的rp ...

  9. brpc源码解析(四)—— Bthread机制

    目录 一.概述 二.启动入口函数 三.内部启动函数 四.worker工作入口 五.总结 Bthread是brpc用到的一个线程库,也是brpc的核心之一,默认情况下,包括用户代码在内的绝大部分代码都是 ...

最新文章

  1. Android-环境问题
  2. 利用开源社区打造微服务生态体系
  3. 实际应用中git(合并本地与服务器项目)
  4. Activity和Service通信 使用BroadcastReceiver
  5. Git 添加空文件夹的方法
  6. sar图像去噪matlab,一种基于总曲率的SAR图像变分去噪方法与流程
  7. tess4j 注意事项
  8. OpenCV 4.5 发布!
  9. vscode无法打开源文件iostream_C++的iostream标准库介绍(1)
  10. Winform/WPF国际化处理
  11. java 某年某月的天数_Java练习 SDUT-1160_某年某月的天数
  12. tenacity 报错_Python tenacity源码分析(一个专门用来做重试的库)v1.0
  13. 在ubuntu11.10上安装6款顶级漂亮的BURG主题
  14. 学会Python有哪些可以做的兼职?所有途径全在这里了...
  15. TeamViewer和向日葵远控软件的个人使用感觉
  16. python 自动执行 apdl_【转载】利用VB生成APDL文件 和Python文件的方法
  17. JavaWeb:HTML
  18. clover安装黑苹果10.15.3常见问题集合
  19. 推荐系统之DIN模型(注意力机制对业务的理解)
  20. 论文排版中的页眉页脚设置

热门文章

  1. Dubbo异常 Can not lock the registry cache file解决方案
  2. 弹性云服务器(Elastic Cloud Server,ECS)
  3. 电源系统该如何选择电容大小及数量
  4. mysql 小版本直接升级
  5. 数据挖掘之坦坦尼克号获救情况分析
  6. 如何可以同时登录多个账户:无痕模式的使用
  7. Idea设置代码自动提示快捷键
  8. .Net MAUI 安卓状态栏透明、半透明、全屏
  9. 建立一个学生数据库/表
  10. 银行软件测试面试题目总结,希望可以帮到你