最近在看memcached的源码,觉得它那种libevent+多线程的服务器模型真的很不错,我将这个模型封装成一个C++类,根据我的简单测试,这个模型的效率真的很不错,欢迎大家试用。

这个类的使用方法很简单(缺点是不太灵活),只要派生一个类,根据需要重写以下这几个虚函数就行了:

//新建连接成功后,会调用该函数
virtual void ConnectionEvent(Conn *conn) { }
//读取完数据后,会调用该函数
virtual void ReadEvent(Conn *conn) { }
//发送完成功后,会调用该函数(因为串包的问题,所以并不是每次发送完数据都会被调用)
virtual void WriteEvent(Conn *conn) { }
//断开连接(客户自动断开或异常断开)后,会调用该函数
virtual void CloseEvent(Conn *conn, short events) { }
//发生致命错误(如果创建子线程失败等)后,会调用该函数
//该函数的默认操作是输出错误提示,终止程序
virtual void ErrorQuit(const char *str);

如果大家有什么建议或意见,欢迎给我发邮件:aa1080711@163.com

上代码:

头文件:TcpEventServer.h

//TcpEventServer.h
#ifndef TCPEVENTSERVER_H_
#define TCPEVENTSERVER_H_#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <time.h>
#include <pthread.h>
#include <fcntl.h>#include <map>
using std::map;#include <event.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/listener.h>
#include <event2/util.h>
#include <event2/event.h>class TcpEventServer;
class Conn;
class ConnQueue;
struct LibeventThread;//这个类一个链表的结点类,结点里存储各个连接的信息,
//并提供了读写数据的接口
class Conn
{
  //此类只能由TcpBaseServer创建,
  //并由ConnQueue类管理
  friend class ConnQueue;
  friend class TcpEventServer;private:
  const int m_fd;               //socket的ID
  evbuffer *m_ReadBuf;      //读数据的缓冲区
  evbuffer *m_WriteBuf;     //写数据的缓冲区  Conn *m_Prev;                //前一个结点的指针
  Conn *m_Next;             //后一个结点的指针
  LibeventThread *m_Thread;  Conn(int fd=0);
  ~Conn();public:
  LibeventThread *GetThread() { return m_Thread; }
  int GetFd() { return m_fd; }  //获取可读数据的长度
  int GetReadBufferLen()
  { return evbuffer_get_length(m_ReadBuf); }  //从读缓冲区中取出len个字节的数据,存入buffer中,若不够,则读出所有数据
  //返回读出数据的字节数
  int GetReadBuffer(char *buffer, int len)
  { return evbuffer_remove(m_ReadBuf, buffer, len); }  //从读缓冲区中复制出len个字节的数据,存入buffer中,若不够,则复制出所有数据
  //返回复制出数据的字节数
  //执行该操作后,数据还会留在缓冲区中,buffer中的数据只是原数据的副本
  int CopyReadBuffer(char *buffer, int len)
  { return evbuffer_copyout(m_ReadBuf, buffer, len); }  //获取可写数据的长度
  int GetWriteBufferLen()
  { return evbuffer_get_length(m_WriteBuf); }  //将数据加入写缓冲区,准备发送
  int AddToWriteBuffer(char *buffer, int len)
  { return evbuffer_add(m_WriteBuf, buffer, len); }  //将读缓冲区中的数据移动到写缓冲区
  void MoveBufferData()
  { evbuffer_add_buffer(m_WriteBuf, m_ReadBuf); }};//带头尾结点的双链表类,每个结点存储一个连接的数据
class ConnQueue
{
private:
  Conn *m_head;
  Conn *m_tail;
public:
  ConnQueue();
  ~ConnQueue();
  Conn *InsertConn(int fd, LibeventThread *t);
  void DeleteConn(Conn *c);
  //void PrintQueue();
};//每个子线程的线程信息
struct LibeventThread
{
  pthread_t tid;                //线程的ID
  struct event_base *base;  //libevent的事件处理机
  struct event notifyEvent; //监听管理的事件机
  int notifyReceiveFd;      //管理的接收端
  int notifySendFd;         //管道的发送端
  ConnQueue connectQueue;       //socket连接的链表  //在libevent的事件处理中要用到很多回调函数,不能使用类隐含的this指针
  //所以用这样方式将TcpBaseServer的类指针传过去
  TcpEventServer *tcpConnect;    //TcpBaseServer类的指针
};class TcpEventServer
{
private:
  int m_ThreadCount;                    //子线程数
  int m_Port;                           //监听的端口
  LibeventThread *m_MainBase;           //主线程的libevent事件处理机
  LibeventThread *m_Threads;            //存储各个子线程信息的数组
  map<int, event*> m_SignalEvents;    //自定义的信号处理public:
  static const int EXIT_CODE = -1;private:
  //初始化子线程的数据
  void SetupThread(LibeventThread *thread);  //子线程的入门函数
  static void *WorkerLibevent(void *arg);
  //(主线程收到请求后),对应子线程的处理函数
  static void ThreadProcess(int fd, short which, void *arg);
  //被libevent回调的各个静态函数
  static void ListenerEventCb(evconnlistener *listener, evutil_socket_t fd,
    sockaddr *sa, int socklen, void *user_data);
  static void ReadEventCb(struct bufferevent *bev, void *data);
  static void WriteEventCb(struct bufferevent *bev, void *data);
  static void CloseEventCb(struct bufferevent *bev, short events, void *data);protected:
  //这五个虚函数,一般是要被子类继承,并在其中处理具体业务的  //新建连接成功后,会调用该函数
  virtual void ConnectionEvent(Conn *conn) { }  //读取完数据后,会调用该函数
  virtual void ReadEvent(Conn *conn) { }  //发送完成功后,会调用该函数(因为串包的问题,所以并不是每次发送完数据都会被调用)
  virtual void WriteEvent(Conn *conn) { }  //断开连接(客户自动断开或异常断开)后,会调用该函数
  virtual void CloseEvent(Conn *conn, short events) { }  //发生致命错误(如果创建子线程失败等)后,会调用该函数
  //该函数的默认操作是输出错误提示,终止程序
  virtual void ErrorQuit(const char *str);public:
  TcpEventServer(int count);
  ~TcpEventServer();  //设置监听的端口号,如果不需要监听,请将其设置为EXIT_CODE
  void SetPort(int port)
  { m_Port = port; }  //开始事件循环
  bool StartRun();
  //在tv时间里结束事件循环
  //否tv为空,则立即停止
  void StopRun(timeval *tv);  //添加和删除信号处理事件
  //sig是信号,ptr为要回调的函数
  bool AddSignalEvent(int sig, void (*ptr)(int, short, void*));
  bool DeleteSignalEvent(int sig);  //添加和删除定时事件
  //ptr为要回调的函数,tv是间隔时间,once决定是否只执行一次
  event *AddTimerEvent(void(*ptr)(int, short, void*),
    timeval tv, bool once);
  bool DeleteTImerEvent(event *ev);
};#endif

实现文件:TcpEventServer.cpp

//TcpEventServer.cpp
#include "TcpEventServer.h"Conn::Conn(int fd) : m_fd(fd)
{
  m_Prev = NULL;
  m_Next = NULL;
}Conn::~Conn()
{}ConnQueue::ConnQueue()
{
  //建立头尾结点,并调整其指针
  m_head = new Conn(0);
  m_tail = new Conn(0);
  m_head->m_Prev = m_tail->m_Next = NULL;
  m_head->m_Next = m_tail;
  m_tail->m_Prev = m_head;
}ConnQueue::~ConnQueue()
{
  Conn *tcur, *tnext;
  tcur = m_head;
  //循环删除链表中的各个结点
  while( tcur != NULL )
  {
    tnext = tcur->m_Next;
    delete tcur;
    tcur = tnext;
  }
}Conn *ConnQueue::InsertConn(int fd, LibeventThread *t)
{
  Conn *c = new Conn(fd);
  c->m_Thread = t;
  Conn *next = m_head->m_Next;  c->m_Prev = m_head;
  c->m_Next = m_head->m_Next;
  m_head->m_Next = c;
  next->m_Prev = c;
  return c;
}void ConnQueue::DeleteConn(Conn *c)
{
  c->m_Prev->m_Next = c->m_Next;
  c->m_Next->m_Prev = c->m_Prev;
  delete c;
}/*
void ConnQueue::PrintQueue()
{
  Conn *cur = m_head->m_Next;
  while( cur->m_Next != NULL )
  {
    printf("%d ", cur->m_fd);
    cur = cur->m_Next;
  }
  printf("\n");
}
*/TcpEventServer::TcpEventServer(int count)
{
  //初始化各项数据
  m_ThreadCount = count;
  m_Port = -1;
  m_MainBase = new LibeventThread;
  m_Threads = new LibeventThread[m_ThreadCount];
  m_MainBase->tid = pthread_self();
  m_MainBase->base = event_base_new();  //初始化各个子线程的结构体
  for(int i=0; i<m_ThreadCount; i++)
  {
    SetupThread(&m_Threads[i]);
  }}TcpEventServer::~TcpEventServer()
{
  //停止事件循环(如果事件循环没开始,则没效果)
  StopRun(NULL);  //释放内存
  event_base_free(m_MainBase->base);
  for(int i=0; i<m_ThreadCount; i++)
    event_base_free(m_Threads[i].base);  delete m_MainBase;
  delete [] m_Threads;
}void TcpEventServer::ErrorQuit(const char *str)
{
  //输出错误信息,退出程序
  fprintf(stderr, "%s", str);
  if( errno != 0 )
    fprintf(stderr, " : %s", strerror(errno));
  fprintf(stderr, "\n");
  exit(1);
}void TcpEventServer::SetupThread(LibeventThread *me)
{
  //建立libevent事件处理机制
  me->tcpConnect = this;
  me->base = event_base_new();
  if( NULL == me->base )
    ErrorQuit("event base new error");  //在主线程和子线程之间建立管道
  int fds[2];
  if( pipe(fds) )
    ErrorQuit("create pipe error");
  me->notifyReceiveFd = fds[0];
  me->notifySendFd = fds[1];  //让子线程的状态机监听管道
  event_set( &me->notifyEvent, me->notifyReceiveFd,
    EV_READ | EV_PERSIST, ThreadProcess, me );
  event_base_set(me->base, &me->notifyEvent);
  if ( event_add(&me->notifyEvent, 0) == -1 )
    ErrorQuit("Can't monitor libevent notify pipe\n");
}void *TcpEventServer::WorkerLibevent(void *arg)
{
  //开启libevent的事件循环,准备处理业务
  LibeventThread *me = (LibeventThread*)arg;
  //printf("thread %u started\n", (unsigned int)me->tid);
  event_base_dispatch(me->base);
  //printf("subthread done\n");
}bool TcpEventServer::StartRun()
{
  evconnlistener *listener;  //如果端口号不是EXIT_CODE,就监听该端口号
  if( m_Port != EXIT_CODE )
  {
    sockaddr_in sin;
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(m_Port);
    listener = evconnlistener_new_bind(m_MainBase->base,
      ListenerEventCb, (void*)this,
      LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1,
      (sockaddr*)&sin, sizeof(sockaddr_in));
    if( NULL == listener )
      ErrorQuit("TCP listen error");
  }  //开启各个子线程
  for(int i=0; i<m_ThreadCount; i++)
  {
    pthread_create(&m_Threads[i].tid, NULL,
      WorkerLibevent, (void*)&m_Threads[i]);
  }  //开启主线程的事件循环
  event_base_dispatch(m_MainBase->base);  //事件循环结果,释放监听者的内存
  if( m_Port != EXIT_CODE )
  {
    //printf("free listen\n");
    evconnlistener_free(listener);
  }
}void TcpEventServer::StopRun(timeval *tv)
{
  int contant = EXIT_CODE;
  //向各个子线程的管理中写入EXIT_CODE,通知它们退出
  for(int i=0; i<m_ThreadCount; i++)
  {
    write(m_Threads[i].notifySendFd, &contant, sizeof(int));
  }
  //结果主线程的事件循环
  event_base_loopexit(m_MainBase->base, tv);
}void TcpEventServer::ListenerEventCb(struct evconnlistener *listener,
                  evutil_socket_t fd,
                  struct sockaddr *sa,
                  int socklen,
                  void *user_data)
{
  TcpEventServer *server = (TcpEventServer*)user_data;  //随机选择一个子线程,通过管道向其传递socket描述符
  int num = rand() % server->m_ThreadCount;
  int sendfd = server->m_Threads[num].notifySendFd;
  write(sendfd, &fd, sizeof(evutil_socket_t));
}void TcpEventServer::ThreadProcess(int fd, short which, void *arg)
{
  LibeventThread *me = (LibeventThread*)arg;  //从管道中读取数据(socket的描述符或操作码)
  int pipefd = me->notifyReceiveFd;
  evutil_socket_t confd;
  read(pipefd, &confd, sizeof(evutil_socket_t));  //如果操作码是EXIT_CODE,则终于事件循环
  if( EXIT_CODE == confd )
  {
    event_base_loopbreak(me->base);
    return;
  }  //新建连接
  struct bufferevent *bev;
  bev = bufferevent_socket_new(me->base, confd, BEV_OPT_CLOSE_ON_FREE);
  if (!bev)
  {
    fprintf(stderr, "Error constructing bufferevent!");
    event_base_loopbreak(me->base);
    return;
  }  //将该链接放入队列
  Conn *conn = me->connectQueue.InsertConn(confd, me);  //准备从socket中读写数据
  bufferevent_setcb(bev, ReadEventCb, WriteEventCb, CloseEventCb, conn);
  bufferevent_enable(bev, EV_WRITE);
  bufferevent_enable(bev, EV_READ);  //调用用户自定义的连接事件处理函数
  me->tcpConnect->ConnectionEvent(conn);
}void TcpEventServer::ReadEventCb(struct bufferevent *bev, void *data)
{
  Conn *conn = (Conn*)data;
  conn->m_ReadBuf = bufferevent_get_input(bev);
  conn->m_WriteBuf = bufferevent_get_output(bev);  //调用用户自定义的读取事件处理函数
  conn->m_Thread->tcpConnect->ReadEvent(conn);
} void TcpEventServer::WriteEventCb(struct bufferevent *bev, void *data)
{
  Conn *conn = (Conn*)data;
  conn->m_ReadBuf = bufferevent_get_input(bev);
  conn->m_WriteBuf = bufferevent_get_output(bev);  //调用用户自定义的写入事件处理函数
  conn->m_Thread->tcpConnect->WriteEvent(conn);}void TcpEventServer::CloseEventCb(struct bufferevent *bev, short events, void *data)
{
  Conn *conn = (Conn*)data;
  //调用用户自定义的断开事件处理函数
  conn->m_Thread->tcpConnect->CloseEvent(conn, events);
  conn->GetThread()->connectQueue.DeleteConn(conn);
  bufferevent_free(bev);
}bool TcpEventServer::AddSignalEvent(int sig, void (*ptr)(int, short, void*))
{
  //新建一个信号事件
  event *ev = evsignal_new(m_MainBase->base, sig, ptr, (void*)this);
  if ( !ev ||
    event_add(ev, NULL) < 0 )
  {
    event_del(ev);
    return false;
  }  //删除旧的信号事件(同一个信号只能有一个信号事件)
  DeleteSignalEvent(sig);
  m_SignalEvents[sig] = ev;  return true;
}bool TcpEventServer::DeleteSignalEvent(int sig)
{
  map<int, event*>::iterator iter = m_SignalEvents.find(sig);
  if( iter == m_SignalEvents.end() )
    return false;  event_del(iter->second);
  m_SignalEvents.erase(iter);
  return true;
}event *TcpEventServer::AddTimerEvent(void (*ptr)(int, short, void *),
                  timeval tv, bool once)
{
  int flag = 0;
  if( !once )
    flag = EV_PERSIST;  //新建定时器信号事件
  event *ev = new event;
  event_assign(ev, m_MainBase->base, -1, flag, ptr, (void*)this);
  if( event_add(ev, &tv) < 0 )
  {
    event_del(ev);
    return NULL;
  }
  return ev;
}bool TcpEventServer::DeleteTImerEvent(event *ev)
{
  int res = event_del(ev);
  return (0 == res);
}

测试文件:test.cpp

/*
这是一个测试用的服务器,只有两个功能:
1:对于每个已连接客户端,每10秒向其发送一句hello, world
2:若客户端向服务器发送数据,服务器收到后,再将数据回发给客户端
*/
//test.cpp
#include "TcpEventServer.h"
#include <set>
#include <vector>
using namespace std;//测试示例
class TestServer : public TcpEventServer
{
private:
  vector<Conn*> vec;
protected:
  //重载各个处理业务的虚函数
  void ReadEvent(Conn *conn);
  void WriteEvent(Conn *conn);
  void ConnectionEvent(Conn *conn);
  void CloseEvent(Conn *conn, short events);
public:
  TestServer(int count) : TcpEventServer(count) { }
  ~TestServer() { } 

  //退出事件,响应Ctrl+C
  static void QuitCb(int sig, short events, void *data);
  //定时器事件,每10秒向所有客户端发一句hello, world
  static void TimeOutCb(int id, int short events, void *data);
};void TestServer::ReadEvent(Conn *conn)
{
  conn->MoveBufferData();
}void TestServer::WriteEvent(Conn *conn)
{}void TestServer::ConnectionEvent(Conn *conn)
{
  TestServer *me = (TestServer*)conn->GetThread()->tcpConnect;
  printf("new connection: %d\n", conn->GetFd());
  me->vec.push_back(conn);
}void TestServer::CloseEvent(Conn *conn, short events)
{
  printf("connection closed: %d\n", conn->GetFd());
}void TestServer::QuitCb(int sig, short events, void *data)
{
  printf("Catch the SIGINT signal, quit in one second\n");
  TestServer *me = (TestServer*)data;
  timeval tv = {1, 0};
  me->StopRun(&tv);
}void TestServer::TimeOutCb(int id, short events, void *data)
{
  TestServer *me = (TestServer*)data;
  char temp[33] = "hello, world\n";
  for(int i=0; i<me->vec.size(); i++)
    me->vec[i]->AddToWriteBuffer(temp, strlen(temp));
}int main()
{
  printf("pid: %d\n", getpid());
  TestServer server(3);
  server.AddSignalEvent(SIGINT, TestServer::QuitCb);
  timeval tv = {10, 0};
  server.AddTimerEvent(TestServer::TimeOutCb, tv, false);
  server.SetPort(2111);
  server.StartRun();
  printf("done\n");

  return 0;
}

编译与运行命令:

qch@LinuxMint ~/program/ztemp $ g++ TcpEventServer.cpp test.cpp -o test -levent -pthread
qch@LinuxMint ~/program/ztemp $ ./test
pid: 20264
new connection: 22
connection closed: 22
^CCatch the SIGINT signal, quit in one second
done

libevent+多线程的服务器模型相关推荐

  1. 使用libevent多线程验证Linux上的服务器惊群现象

    什么是惊群现象? 惊群(thundering herd)是指,只有一个子进程能获得连接,但所有N个子进程却都被唤醒了,这种情况将使性能受损. 举一个很简单的例子,当你往一群鸽子中间扔一块食物,虽然最终 ...

  2. libevent多线程

    利用libevent和多线程 实现多并发的服务器的设计.主进程监听连接的到来使用一个base,进行事件循环.每当 一个连接进来时,创建一个新的线程实现与客户端之间的通信,子线程建立一个base,进行事 ...

  3. libevent多线程使用bufferevent的那些事

    void do_accept(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sa, int socklen ...

  4. linux下多进程聊天室,从0实现基于Linux socket聊天室-多线程服务器模型-1

    前言 Socket在实际系统程序开发张中,应用非常广泛,也非常重要.实际应用中服务器经常需要支持多个客户端连接,实现高并发服务器模型显得尤为重要.高并发服务器从简单的循环服务器模型处理少量网络并发请求 ...

  5. 服务器模型---总结

    前言 事件驱动为广大的程序员所熟悉,其最为人津津乐道的是在图形化界面编程中的应用:事实上,在网络编程中事件驱动也被广泛使用,并大规模部署在高连接 数高吞吐量的服务器程序中,如 http 服务器程序.f ...

  6. 网络IO管理-简单一问一答、多线程方式

    思考 1. 那网络中进程之间如何通信,浏览器的进程怎么与web服务器通信的? 2. 什么时候用一请求一线程的方式? 3. 什么时候用select/poll? 4. 什么时候用epoll? 准备工作 下 ...

  7. mysql io模型_5种网络IO模型

    同步(synchronous) IO和异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分别是什么,到底有什么区别?这个问题其实不同的人给出 ...

  8. 几种经典的网络服务器架构模型的分析与比较

    前言 事件驱动为广大的程序员所熟悉,其最为人津津乐道的是在图形化界面编程中的应用:事实上,在网络编程中事件驱动也被广泛使用,并大规模部署在高连接数高吞吐量的服务器程序中,如 http 服务器程序.ft ...

  9. 5种网络IO模型:阻塞IO、非阻塞IO、异步IO、多路复用IO、信号驱动IO

    目录 前言 阻塞IO(blocking IO) 非阻塞IO(non-blocking IO) 多路复用IO(IO multiplexing) 异步IO(Asynchronous I/O) 模型间的区别 ...

最新文章

  1. 根据JSON自动生成select联动
  2. VirtualBox上装CentOS5.8网络不通问题
  3. cent os mysql下载_Cent OS 6.4安装mysql
  4. 【Android 逆向】Android 逆向通用工具开发 ( 网络模块开发 | 配置头文件 | 配置编译参数 | 网络初始化 WSAStartup 与清理 WSACleanup 操作 )
  5. c#学习笔记之第一个程序“Hello world”
  6. 1MySQL是面向对象型数据库_数据库及MySQL基础(1)
  7. 使用Area(区域)会遇到的问题
  8. 一个有关Update类型的存储过程的问题
  9. python对初学者的看法_python学习之道(1)——新手小白对print()函数的理解,Python,之路,一,浅谈...
  10. 液冷计算机组装,电脑水冷散热器原理解密及安装方法
  11. RabbitMQ路由模式(direct)
  12. python和对象复习_python 面向对象基础和高级复习
  13. python 数组中取出最小值_Python 数组中的冒号使用
  14. ClassGraph使用
  15. 强化学习从入门到放弃的资料
  16. [免费专栏] Android安全之Drozer安全测试详细使用教程
  17. win10快捷键整理记录
  18. 各种音视频编解码学习详解 h264 ,mpeg4 ,aac 等所有音视频格式
  19. Luogu P4231 三步必杀 (差分)
  20. Chapter9.2:线性系统的状态空间分析与综合(上)

热门文章

  1. SpringBoot + Redis 解决海量重复提交问题
  2. Spring Boot如何实现在线预览?这个开源项目可以学习一下,支持99%常用文件!...
  3. 皮一皮:大义灭亲啊这是...
  4. 你要的Spring Boot多图片上传回显功能已经实现了,赶紧收藏吃灰~
  5. 想让好友不停地擦手机屏幕?微信头像这样设置就行了!
  6. 为什么阿里如此钟爱Flink?
  7. 两个奇技淫巧,将 Docker 镜像体积减小 99%
  8. 详细解读 Prometheus 的指标类型
  9. 分享一套主流框架源码资料,征服阿里 P7 面试必备!
  10. 使用Consul做服务发现的若干姿势