BGCC源代码(一)
本文从整体上介绍下百度的通用通信组件, 需要下载源码的同学,请点这里 http://bgcc.baidu.com/
第一部分:服务端逻辑
1.线程池,在服务启动时创建线程,
2.一个线程池对应一个同步的任务队列 ,
3.线程池中的每个线程,初始时,都阻塞在任务队列, 等待唤醒,
4.主线程进入事件循环,监听事件
5.遇到读事件,调用添加事件时指定的回调函数, 默认为DataCallback, 如果是连接请求,调用AcceptDataCallback
6.DataCallback解析包头和包体, 得到processor名称, 找到对应的processor,
7.把processor、包体、序列化对象封装到成一个任务,push到同步队列中,并触发信号量 sem_post
8.线程池中的线程醒来, 调用processor处理task任务, 从包体中读出期望执行的函数名, 执行该函数(业务实现)
9.业务在该函数中实现功能后, 还需调用序列化类的writeMessageBegin, writeMessageEnd, 进而调用socket类中的write, 并最终调用send返回处理结果给客户端。
这里没有使用EpollOut事件, 当发送缓冲区满时, 会返回-1,errno=EAGAIN, 此时bgcc直接返回失败了, 没有继续处理。 这里如果了注册epoll_out事件,就可以处理了。
10.重新注册事件
看一个服务端使用的例子
1 Server* server;2 3 void* server_func(const bool* isstopped, void*) {4 SharedPointer<IProcessor> xp(5 new MathProcessor(6 SharedPointer<Math>(7 new MathImpl)));8 9 ServiceManager sm; 10 sm.add_service(xp); 11 12 ThreadPool tp; 13 tp.init(10); 14 15 server = new Server(&sm, &tp, 8321); 16 if (0 != server->serve()) { 17 return 0; 18 } 19 return NULL; 20 } 21 22 int main(int argc, char* argv[]) { 23 log_open("server.conf"); 24 Thread t(server_func); 25 t.start(); 26 27 return 0; 28 }
例子中, Server是bgcc::EpollServer类。定义如下
1 #ifndef _BGCC2_EPOLL_SERVER_H_2 #define _BGCC2_EPOLL_SERVER_H_3 4 #ifndef _WIN325 6 #include "bgcc_common.h"7 #include "server.h"8 #include "event_poll.h"9 #include "mempool.h" 10 #include "service_manager.h" 11 #include "thread_pool.h" 12 #include "server_task.h" 13 14 namespace bgcc { 15 16 class EpollServer : public IServer { 17 public: 18 EpollServer(ServiceManager* service_manager, 19 ThreadPool* thread_pool, 20 uint16_t port, 21 const std::string& node = ""); 22 23 virtual ~EpollServer() { 24 } 25 26 virtual int32_t init(); 27 virtual int32_t serve(); 28 virtual int32_t stop(); 29 30 ServiceManager* get_service_manager(); 31 ThreadPool* get_thread_pool(); 32 33 TaskAsso Tasks[MAXNFD]; 34 35 protected: 36 enum state_t { 37 S_UNINIT, 38 S_INIT, 39 S_SERVE, 40 S_STOPPED 41 }; 42 int32_t socket_init(); 43 44 ServiceManager* _service_manager; 45 ThreadPool* _thread_pool; 46 uint16_t _port; 47 state_t _state; 48 int32_t _listenfd; 49 EventLoop _loop; 50 std::string _node; 51 }; 52 53 typedef EpollServer Server; 54 } 55 56 #endif // _WIN32 57 58 #endif // _BGCC2_EPOLL_SERVER_H_
我们主要关心EpollServer::serve(), 其实现如下
1 int32_t EpollServer::serve() {2 3 int32_t ret;4 if (0 != (ret = init())) {5 return ret;6 }7 8 if (S_INIT != _state) {9 BGCC_NOTICE("bgcc", "Need to call `init' before `serve' on Instance of EpollServer\n"); 10 return E_BGCC_SERVER_NEED_INIT; 11 } 12 13 _listenfd = socket_init(); 14 if (INVALID_SOCKET == _listenfd) { 15 return E_BGCC_SERVER_CREATE_LISTENFD_FAILED; 16 } 17 18 Event e; 19 EventCallback::PrepareEvent(e, _listenfd, const_cast<EpollServer*>(this)); 20 e.read_cb = EventCallback::AcceptCallback; 21 _loop.add_event(&e); 22 23 BGCC_NOTICE("bgcc", "fd=%d Is Begin To Wait for accept new client On %s:%d", 24 _listenfd, (_node.empty()?"*":_node.c_str()), _port); 25 _state = S_SERVE; 26 27 return _loop.loop(); 28 }
其中_loop类型是bgcc::EventLoop, 其定义如下
1 /**2 * @brief 事件循环3 * @see4 * @note5 * @author liuxupeng(liuxupeng@baidu.com)6 * @date 2012年06月14日 20时05分36秒7 */8 class EventLoop {9 public: 10 /** 11 * @brief EventLoop 构造函数 12 * @see 13 * @note 14 * @author liuxupeng(liuxupeng@baidu.com) 15 * @date 2012年06月14日 20时19分50秒 16 */ 17 EventLoop(); 18 19 /** 20 * @brief create 创建内部epoll 21 * 22 * @return 成功返回0 23 * @see 24 * @note 25 * @author liuxupeng(liuxupeng@baidu.com) 26 * @date 2012年06月14日 20时19分59秒 27 */ 28 int32_t create(); 29 int32_t destroy(); 30 31 int32_t add_event(Event* event); 32 int32_t del_event(Event* event); 33 34 int32_t loop(); 35 int32_t unloop(); 36 bool is_stopped() const; 37 private: 38 enum state_t { 39 S_UNINIT, 40 S_INIT, 41 S_LOOP, 42 S_STOP, 43 S_DESTROYED 44 }; 45 private: 46 state_t _state; 47 volatile bool _stopped; 48 int32_t _epfd; 49 struct epoll_event _ep_events[MAXNFD]; 50 Event _events[MAXNFD]; 51 }; 52 }
其中loop()实现如下
1 int32_t EventLoop::loop() {2 if (S_INIT != _state) {3 return -1;4 }5 _state = S_LOOP;6 _stopped = false;7 8 while (!_stopped) {9 int32_t numevents; 10 while((numevents=epoll_wait(_epfd, _ep_events, MAXNFD, 200))==SOCKET_ERROR&&EINTR==errno); 11 12 if (numevents > 0) { 13 int j; 14 15 for (j = 0; j < numevents; j++) { 16 struct epoll_event* e = _ep_events + j; 17 int32_t fd = e->data.fd; 18 19 if (e->events & EPOLLIN) { 20 if (_events[fd].read_cb) 21 (_events[fd].read_cb)(this, fd, _events[fd].read_cb_arg); 22 } 23 if (e->events & EPOLLOUT) { 24 if (_events[fd].write_cb) 25 (_events[fd].write_cb)(this, fd, _events[fd].write_cb_arg); 26 } 27 if (e->events & EPOLLERR) { 28 if (_events[fd].error_cb) 29 (_events[fd].error_cb)(this, fd, _events[fd].error_cb_arg); 30 } 31 } 32 } 33 } 34 _state = S_STOP; 35 return 0;
从实现上看, 事件循环使用了epoll_wait,EventLoop类中有一个事件数组成员_events[], 所有fd事件都保存在该数组中。 增删fd事件,也都需要操作该数组。
添加事件
1 int32_t EventLoop::add_event(Event* event) {2 if (S_INIT != _state && S_LOOP != _state) {3 return -1;4 }5 6 if (NULL == event) {7 return 0;8 }9 10 int32_t fd = event->fd; 11 uint32_t mask = event->mask; 12 13 int32_t op; 14 if (EVENT_NONE == _events[fd].mask) { 15 op = EPOLL_CTL_ADD; 16 } 17 else { 18 op = EPOLL_CTL_MOD; 19 mask |= _events[fd].mask; 20 } 21 _events[fd].mask = mask; 22 23 struct epoll_event ee; 24 25 // To fix valgrind error: Syscall param epoll_ctl(event) points to uninitialised byte(s) 26 memset(&ee.data, 0, sizeof(ee.data)); 27 28 ee.data.fd = fd; 29 ee.events = 0; 30 31 if (mask & EVENT_READ) { 32 ee.events |= EPOLLIN; 33 _events[fd].read_cb = event->read_cb; 34 _events[fd].read_cb_arg = event->read_cb_arg; 35 } 36 37 if (mask & EVENT_WRITE) { 38 ee.events |= EPOLLOUT; 39 _events[fd].write_cb = event->write_cb; 40 _events[fd].write_cb_arg = event->write_cb_arg; 41 } 42 43 if (mask & EVENT_ERROR) { 44 ee.events |= EPOLLERR; 45 _events[fd].error_cb = event->error_cb; 46 _events[fd].error_cb_arg = event->error_cb_arg; 47 } 48 49 if(SocketTool::set_nonblock(fd, 1)!=0){ 50 BGCC_WARN("bgcc", "Before Add fd=%d to Epoll Set To Nonblock Failed(%d)", 51 fd, BgccGetLastError()); 52 return -1; 53 } 54 55 int32_t ret=epoll_ctl(_epfd, op, fd, &ee); 56 if(0!=ret&&EPOLL_CTL_ADD==op){ 57 if(SocketTool::set_nonblock(fd, 0)!=0){ 58 BGCC_WARN("bgcc", "Add fd=%d to Epoll Failed Set To Block Failed(%d)", 59 fd, BgccGetLastError()); 60 } 61 } 62 63 return ret; 64 }
删除事件
1 int32_t EventLoop::del_event(Event* event) {2 if (S_INIT != _state && S_LOOP != _state) {3 return -1;4 }5 6 if (NULL == event) {7 return 0;8 }9 10 int32_t fd = event->fd; 11 uint32_t mask = _events[fd].mask & (~event->mask); 12 _events[fd].mask = mask; 13 14 struct epoll_event ee; 15 ee.data.fd = fd; 16 ee.events = 0; 17 if (mask & EVENT_READ) ee.events |= EPOLLIN; 18 if (mask & EVENT_WRITE) ee.events |= EPOLLOUT; 19 20 int32_t op; 21 if (mask != EVENT_NONE) { 22 op = EPOLL_CTL_MOD; 23 } else { 24 op = EPOLL_CTL_DEL; 25 // _events[fd].Reset(); 26 } 27 28 int32_t ret=epoll_ctl(_epfd, op, fd, &ee); 29 if(EPOLL_CTL_DEL==op&&0==ret){ 30 if(SocketTool::set_nonblock(fd, 0)!=0){ 31 BGCC_WARN("bgcc", "Del fd=%d From Epoll Set To Block Failed(%d)", 32 fd, BgccGetLastError()); 33 } 34 } 35 return ret; 36 }
添加和删除事件,最终都是对epoll接口的封装,
接着看下_events[] 中的元素类型 Event, 其定义如下
1 /**2 * @brief 封装事件及事件处理函数3 * @see4 * @note5 * @author liuxupeng(liuxupeng@baidu.com)6 * @date 2012年06月14日 20时00分59秒7 */8 struct Event {9 /** 10 * @brief Event 事件类 11 * @see 12 * @note 13 * @author liuxupeng(liuxupeng@baidu.com) 14 * @date 2012年06月14日 20时04分59秒 15 */ 16 Event() { 17 Reset(); 18 } 19 20 void Reset(){ 21 fd=INVALID_SOCKET; 22 mask=EVENT_NONE; 23 read_cb=NULL; 24 write_cb=NULL; 25 error_cb=NULL; 26 read_cb_arg=NULL; 27 write_cb_arg=NULL; 28 error_cb_arg=NULL; 29 } 30 31 int32_t fd; /** 事件对应的fd*/ 32 uint32_t mask; /** 事件标识位*/ 33 callback_func_t read_cb; /** 读回调*/ 34 callback_func_t write_cb; /** 写回调*/ 35 callback_func_t error_cb; /** 错误回调*/ 36 void* read_cb_arg; 37 void* write_cb_arg; 38 void* error_cb_arg; 39 };
Event对象包含了事件fd、事件类型、以及回调函数、回调函数的参数, 这里有read_cb, write_cb, error_cb
回到EpollServer::serve(), 它调用了EpollServer::init()
转载于:https://my.oschina.net/u/4000302/blog/3100068
BGCC源代码(一)相关推荐
- idea中设置指向源代码Scala
1.到官网下载scala源代码 点击如下链接下载源码:http://www.scala-lang.org/download/all.html 选择需要的版本点击进行下载,我选择的是2.11.8版本,如 ...
- Android系统默认Home应用程序(Launcher)的启动过程源代码分析
在前面一篇文章中,我们分析了Android系统在启动时安装应用程序的过程,这些应用程序安装好之后,还需要有一个Home应用程序来负责把它们在桌面上展示出来,在Android系统中,这个默认的Home应 ...
- easymailobjects php,用easymailobject组件处理exchange邮件源代码(6)_asp实例
在ASP中用EasyMailObject处理Exchange邮件源代码-发送邮件的界面(sendmail1.asp) ************************************* 这个文 ...
- c语言编程学生管理系统的代码,C语言学生管理系统源代码.doc
C语言学生成绩管理系统源代码,保证能用-- #include "malloc.h" #include "stdio.h" #include "stdl ...
- java 流的方式抓取网页 但是显示不全_用java抓取网页源代码时总是无法获取完整的源代码信息,求指导...
该楼层疑似违规已被系统折叠 隐藏此楼查看此楼 无论是用urlconnection还是httpurlconnection都只能获得一部分网页源代码(即有的标签内容在网页上右键-查看源代码能看到,但是用下 ...
- linux下从git获取有权限的代码,linux下从源代码安装git
之所以有这样的需求,是因为部分预安装的git版本太低,很多功能没有并且安全性存在问题. 比如git submodule add xxx@host:yyy.git必须在父repo的root目录安装,而新 ...
- 【camera】自动泊车-视觉车位检测相关资料汇总(论文、数据集、源代码、相关博客、演示demo)(1)
[camera]自动泊车-视觉车位检测相关资料汇总(论文.数据集.源代码.相关博客.演示demo)parking slot detection 论文 2020论文 2019论文 2018论文 2017 ...
- 如何查看OpenCV自带函数的源代码
OpenCV提供的内部函数能实现好多图像处理功能,有时我们需要改进函数或者想看一下函数的具体实现,一般有以下两种方法来查看其内部函数代码: 方法一:在opencv的安装文件夹中找到 与头文件名字对应的 ...
- webstorm设置点击(单击)左侧项目资源管理器里面的文件,自动在右侧打开源代码文件
点击左侧"项目"右上角齿轮勾选"自动滚动到源代码"
最新文章
- dubbo-go 白话文 | 从零搭建 dubbogo 和 dubbo 的简单用例
- Bean的解析与注册
- 光伏业务爆发 同景新能源与信义光能签署103MW订单
- vue积累——另一种走马灯
- python合并路径和文件名_Python实例 分割路径和文件名
- zabbix监控MogDB之采集prometheus数据
- java 8 foreach获取索引
- 对数据库设计的一点感想
- java arrays_Java的Arrays方法分析
- 【C/C++】size_t 数据类型
- 热烈庆贺产品站点开通。正在建设中...
- Nexus下载安装及对接
- Cocos2d-x 3.17.2 集成X5WebView内核方法,完美运行
- qq空间音乐外链,音乐永久地址,连接dj,连接音乐,背景音乐,舞曲背景0sm.com
- 粒子滤波算法matlab代码,粒子滤波算法原理及Matlab程序(专题).ppt
- 随笔-人生第一份工作离职了
- gh-ost —— GitHub Online DDL 工具使用详解
- python中cfg_python操作cfg配置文件
- 图片鼠标移入图片改变颜色、显示另外一张图片(2种方式)
- bat批量安装软件,完成最后删除文件夹里所有安装包