最近一直在看游双的《高性能linux服务器编程》一书,下载链接: http://download.csdn.net/detail/analogous_love/9673008

书上是这么介绍Reactor模式的:

按照这个思路,我写个简单的练习:

/** *@desc:   用reactor模式练习服务器程序,main.cpp*@author: zhangyl*@date:   2016.11.23*/#include <iostream>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>  //for htonl() and htons()
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <signal.h>     //for signal()
#include <pthread.h>
#include <semaphore.h>
#include <list>
#include <errno.h>
#include <time.h>
#include <sstream>
#include <iomanip> //for std::setw()/setfill()
#include <stdlib.h>#define WORKER_THREAD_NUM   5#define min(a, b) ((a <= b) ? (a) : (b)) int g_epollfd = 0;
bool g_bStop = false;
int g_listenfd = 0;
pthread_t g_acceptthreadid = 0;
pthread_t g_threadid[WORKER_THREAD_NUM] = { 0 };
pthread_cond_t g_acceptcond;
pthread_mutex_t g_acceptmutex;pthread_cond_t g_cond /*= PTHREAD_COND_INITIALIZER*/;
pthread_mutex_t g_mutex /*= PTHREAD_MUTEX_INITIALIZER*/;pthread_mutex_t g_clientmutex;std::list<int> g_listClients;void prog_exit(int signo)
{::signal(SIGINT, SIG_IGN);//::signal(SIGKILL, SIG_IGN);//该信号不能被阻塞、处理或者忽略::signal(SIGTERM, SIG_IGN);std::cout << "program recv signal " << signo << " to exit." << std::endl;g_bStop = true;::epoll_ctl(g_epollfd, EPOLL_CTL_DEL, g_listenfd, NULL);//TODO: 是否需要先调用shutdown()一下?::shutdown(g_listenfd, SHUT_RDWR);::close(g_listenfd);::close(g_epollfd);::pthread_cond_destroy(&g_acceptcond);::pthread_mutex_destroy(&g_acceptmutex);::pthread_cond_destroy(&g_cond);::pthread_mutex_destroy(&g_mutex);::pthread_mutex_destroy(&g_clientmutex);
}bool create_server_listener(const char* ip, short port)
{g_listenfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);if (g_listenfd == -1)return false;int on = 1;::setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));::setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEPORT, (char *)&on, sizeof(on));struct sockaddr_in servaddr;memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = inet_addr(ip);servaddr.sin_port = htons(port);if (::bind(g_listenfd, (sockaddr *)&servaddr, sizeof(servaddr)) == -1)return false;if (::listen(g_listenfd, 50) == -1)return false;g_epollfd = ::epoll_create(1);if (g_epollfd == -1)return false;struct epoll_event e;memset(&e, 0, sizeof(e));e.events = EPOLLIN | EPOLLRDHUP;e.data.fd = g_listenfd;if (::epoll_ctl(g_epollfd, EPOLL_CTL_ADD, g_listenfd, &e) == -1)return false;return true;
}void release_client(int clientfd)
{if (::epoll_ctl(g_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1)std::cout << "release client socket failed as call epoll_ctl failed" << std::endl;::close(clientfd);
}void* accept_thread_func(void* arg)
{   while (!g_bStop){::pthread_mutex_lock(&g_acceptmutex);::pthread_cond_wait(&g_acceptcond, &g_acceptmutex);//::pthread_mutex_lock(&g_acceptmutex);//std::cout << "run loop in accept_thread_func" << std::endl;struct sockaddr_in clientaddr;socklen_t addrlen;int newfd = ::accept(g_listenfd, (struct sockaddr *)&clientaddr, &addrlen);::pthread_mutex_unlock(&g_acceptmutex);if (newfd == -1)continue;std::cout << "new client connected: " << ::inet_ntoa(clientaddr.sin_addr) << ":" << ::ntohs(clientaddr.sin_port) << std::endl;//将新socket设置为non-blockingint oldflag = ::fcntl(newfd, F_GETFL, 0);int newflag = oldflag | O_NONBLOCK;if (::fcntl(newfd, F_SETFL, newflag) == -1){std::cout << "fcntl error, oldflag =" << oldflag << ", newflag = " << newflag << std::endl;continue;}struct epoll_event e;memset(&e, 0, sizeof(e));e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;e.data.fd = newfd;if (::epoll_ctl(g_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1){std::cout << "epoll_ctl error, fd =" << newfd << std::endl;}}return NULL;
}void* worker_thread_func(void* arg)
{   while (!g_bStop){int clientfd;::pthread_mutex_lock(&g_clientmutex);while (g_listClients.empty())::pthread_cond_wait(&g_cond, &g_clientmutex);clientfd = g_listClients.front();g_listClients.pop_front();  pthread_mutex_unlock(&g_clientmutex);//gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来std::cout << std::endl;std::string strclientmsg;char buff[256];bool bError = false;while (true){memset(buff, 0, sizeof(buff));int nRecv = ::recv(clientfd, buff, 256, 0);if (nRecv == -1){if (errno == EWOULDBLOCK)break;else{std::cout << "recv error, client disconnected, fd = " << clientfd << std::endl;release_client(clientfd);bError = true;break;}}//对端关闭了socket,这端也关闭。else if (nRecv == 0){std::cout << "peer closed, client disconnected, fd = " << clientfd << std::endl;release_client(clientfd);bError = true;break;}strclientmsg += buff;}//出错了,就不要再继续往下执行了if (bError)continue;std::cout << "client msg: " << strclientmsg;//将消息加上时间标签后发回time_t now = time(NULL);struct tm* nowstr = localtime(&now);std::ostringstream ostimestr;ostimestr << "[" << nowstr->tm_year + 1900 << "-" << std::setw(2) << std::setfill('0') << nowstr->tm_mon + 1 << "-" << std::setw(2) << std::setfill('0') << nowstr->tm_mday << " "<< std::setw(2) << std::setfill('0') << nowstr->tm_hour << ":" << std::setw(2) << std::setfill('0') << nowstr->tm_min << ":" << std::setw(2) << std::setfill('0') << nowstr->tm_sec << "]server reply: ";strclientmsg.insert(0, ostimestr.str());while (true){int nSent = ::send(clientfd, strclientmsg.c_str(), strclientmsg.length(), 0);if (nSent == -1){if (errno == EWOULDBLOCK){::sleep(10);continue;}else{std::cout << "send error, fd = " << clientfd << std::endl;release_client(clientfd);break;}}          std::cout << "send: " << strclientmsg;strclientmsg.erase(0, nSent);if (strclientmsg.empty())break;}}return NULL;
}void daemon_run()
{int pid;signal(SIGCHLD, SIG_IGN);//1)在父进程中,fork返回新创建子进程的进程ID;//2)在子进程中,fork返回0;//3)如果出现错误,fork返回一个负值;pid = fork();if (pid < 0){std:: cout << "fork error" << std::endl;exit(-1);}//父进程退出,子进程独立运行else if (pid > 0) {exit(0);}//之前parent和child运行在同一个session里,parent是会话(session)的领头进程,//parent进程作为会话的领头进程,如果exit结束执行的话,那么子进程会成为孤儿进程,并被init收养。//执行setsid()之后,child将重新获得一个新的会话(session)id。//这时parent退出之后,将不会影响到child了。setsid();int fd;fd = open("/dev/null", O_RDWR, 0);if (fd != -1){dup2(fd, STDIN_FILENO);dup2(fd, STDOUT_FILENO);dup2(fd, STDERR_FILENO);}if (fd > 2)close(fd);}int main(int argc, char* argv[])
{  short port = 0;int ch;bool bdaemon = false;while ((ch = getopt(argc, argv, "p:d")) != -1){switch (ch){case 'd':bdaemon = true;break;case 'p':port = atol(optarg);break;}}if (bdaemon)daemon_run();if (port == 0)port = 12345;if (!create_server_listener("0.0.0.0", port)){std::cout << "Unable to create listen server: ip=0.0.0.0, port=" << port << "." << std::endl;return -1;}//设置信号处理signal(SIGCHLD, SIG_DFL);signal(SIGPIPE, SIG_IGN);signal(SIGINT, prog_exit);//signal(SIGKILL, prog_exit);//该信号不能被阻塞、处理或者忽略signal(SIGTERM, prog_exit);::pthread_cond_init(&g_acceptcond, NULL);::pthread_mutex_init(&g_acceptmutex, NULL);::pthread_cond_init(&g_cond, NULL);::pthread_mutex_init(&g_mutex, NULL);::pthread_mutex_init(&g_clientmutex, NULL);::pthread_create(&g_acceptthreadid, NULL, accept_thread_func, NULL);//启动工作线程for (int i = 0; i < WORKER_THREAD_NUM; ++i){::pthread_create(&g_threadid[i], NULL, worker_thread_func, NULL);}while (!g_bStop){       struct epoll_event ev[1024];int n = ::epoll_wait(g_epollfd, ev, 1024, 10);if (n == 0)continue;else if (n < 0){std::cout << "epoll_wait error" << std::endl;continue;}int m = min(n, 1024);for (int i = 0; i < m; ++i){//通知接收连接线程接收新连接if (ev[i].data.fd == g_listenfd)pthread_cond_signal(&g_acceptcond);//通知普通工作线程接收数据else{               pthread_mutex_lock(&g_clientmutex);              g_listClients.push_back(ev[i].data.fd);pthread_mutex_unlock(&g_clientmutex);pthread_cond_signal(&g_cond);//std::cout << "signal" << std::endl;}}}return 0;
}
//该信号不能被阻塞、处理或者忽略signal(SIGTERM, prog_exit);::pthread_cond_init(&g_acceptcond, NULL);::pthread_mutex_init(&g_acceptmutex, NULL);::pthread_cond_init(&g_cond, NULL);::pthread_mutex_init(&g_mutex, NULL);::pthread_mutex_init(&g_clientmutex, NULL);::pthread_create(&g_acceptthreadid, NULL, accept_thread_func, NULL);//启动工作线程for (int i = 0; i < WORKER_THREAD_NUM; ++i){::pthread_create(&g_threadid[i], NULL, worker_thread_func, NULL);}while (!g_bStop){       struct epoll_event ev[1024];int n = ::epoll_wait(g_epollfd, ev, 1024, 10);if (n == 0)continue;else if (n < 0){std::cout << "epoll_wait error" << std::endl;continue;}int m = min(n, 1024);for (int i = 0; i < m; ++i){//通知接收连接线程接收新连接if (ev[i].data.fd == g_listenfd)pthread_cond_signal(&g_acceptcond);//通知普通工作线程接收数据else{               pthread_mutex_lock(&g_clientmutex);              g_listClients.push_back(ev[i].data.fd);pthread_mutex_unlock(&g_clientmutex);pthread_cond_signal(&g_cond);//std::cout << "signal" << std::endl;}}}return 0;
}

程序的功能一个简单的echo服务:客户端连接上服务器之后,给服务器发送信息,服务器加上时间戳等信息后返回给客户端。使用到的知识点有:

1. 条件变量

2.epoll的边缘触发模式

程序的大致框架是:

1. 主线程只负责监听侦听socket上是否有新连接,如果有新连接到来,交给一个叫accept的工作线程去接收新连接,并将新连接socket绑定到主线程使用epollfd上去。

2. 主线程如果侦听到客户端的socket上有可读事件,则通知另外五个工作线程去接收处理客户端发来的数据,并将数据加上时间戳后发回给客户端。

3. 可以通过传递-p port来设置程序的监听端口号;可以通过传递-d来使程序以daemon模式运行在后台。这也是标准linux daemon模式的书写方法。

程序难点和需要注意的地方是:

1. 条件变量为了防止虚假唤醒,一定要在一个循环里面调用pthread_cond_wait()函数,我在worker_thread_func()中使用了:

while (g_listClients.empty())::pthread_cond_wait(&g_cond, &g_clientmutex);

在accept_thread_func()函数里面我没有使用循环,这样会有问题吗?

2. 使用条件变量pthread_cond_wait()函数的时候一定要先获得与该条件变量相关的mutex,即像下面这样的结构:

mutex_lock(...);while (condition is true)::pthread_cond_wait(...);//这里可以有其他代码...
mutex_unlock(...);//这里可以有其他代码...

因为pthread_cond_wait()如果阻塞的话,它解锁相关mutex和阻塞当前线程这两个动作加在一起是原子的。

3. 作为服务器端程序最好对侦听socket调用setsocketopt()设置SO_REUSEADDR和SO_REUSEPORT两个标志,因为服务程序有时候会需要重启(比如调试的时候就会不断重启),如果不设置这两个标志的话,绑定端口时就会调用失败。因为一个端口使用后,即使不再使用,因为四次挥手该端口处于TIME_WAIT状态,有大约2min的MSL(Maximum Segment Lifetime,最大存活期)。这2min内,该端口是不能被重复使用的。你的服务器程序上次使用了这个端口号,接着重启,因为这个缘故,你再次绑定这个端口就会失败(bind函数调用失败)。要不你就每次重启时需要等待2min后再试(这在频繁重启程序调试是难以接收的),或者设置这种SO_REUSEADDR和SO_REUSEPORT立即回收端口使用。

其实,SO_REUSEADDR在windows上和Unix平台上还有些细微的区别,我在libevent源码中看到这样的描述:

int evutil_make_listen_socket_reuseable(evutil_socket_t sock)
{
#ifndef WIN32int one = 1;/* REUSEADDR on Unix means, "don't hang on to this address after the* listener is closed."  On Windows, though, it means "don't keep other* processes from binding to this address while we're using it. */return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void*) &one,(ev_socklen_t)sizeof(one));
#elsereturn 0;
#endif
}

注意注释部分,在Unix平台上设置这个选项意味着,任意进程可以复用该地址;而在windows,不要阻止其他进程复用该地址。也就是在在Unix平台上,如果不设置这个选项,任意进程在一定时间内,不能bind该地址;在windows平台上,在一定时间内,其他进程不能bind该地址,而本进程却可以再次bind该地址。

4. epoll_wait对新连接socket使用的是边缘触发模式EPOLLET(edge trigger),而不是默认的水平触发模式(level trigger)。因为如果采取水平触发模式的话,主线程检测到某个客户端socket数据可读时,通知工作线程去收取该socket上的数据,这个时候主线程继续循环,只要在工作线程没有将该socket上数据全部收完,或者在工作线程收取数据的过程中,客户端有新数据到来,主线程会继续发通知(通过pthread_cond_signal())函数,再次通知工作线程收取数据。这样会可能导致多个工作线程同时调用recv函数收取该客户端socket上的数据,这样产生的结果将会导致数据错乱。

相反,采取边缘触发模式,只有等某个工作线程将那个客户端socket上数据全部收取完毕,主线程的epoll_wait才可能会再次触发来通知工作线程继续收取那个客户端socket新来的数据。

5. 代码中有这样一行:

//gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来std::cout << std::endl;

如果不加上这一行,正常运行服务器程序,程序中要打印到控制台的信息都会打印出来,但是如果用gdb调试状态下,程序的所有输出就不显示了。我不知道这是不是gdb的一个bug,所以这里加上std::endl来输出一个换行符并flush标准输出,让输出显示出来。(std::endl不仅是输出一个换行符而且是同时刷新输出,相当于fflush()函数)。

程序我部署起来了,你可以使用linux的nc命令或自己写程序连接服务器来查看程序效果,当然也可以使用telnet命令,方法:

linux:

nc 120.55.94.78 12345

telnet 120.55.94.78 12345

然后就可以给服务器自由发送数据了,服务器会给你发送的信息加上时间戳返回给你。效果如图:

另外我将这个代码改写了成纯C++11版本,使用CMake编译,为了支持编译必须加上这-std=c++11:

CMakeLists.txt代码如下:

cmake_minimum_required(VERSION 2.8)PROJECT(myreactorserver)AUX_SOURCE_DIRECTORY(./ SRC_LIST)
SET(EXECUTABLE_OUTPUT_PATH ./)ADD_DEFINITIONS(-g -W -Wall -Wno-deprecated -DLINUX -D_REENTRANT -D_FILE_OFFSET_BITS=64 -DAC_HAS_INFO -DAC_HAS_WARNING -DAC_HAS_ERROR -DAC_HAS_CRITICAL -DTIXML_USE_STL -DHAVE_CXX_STDHEADERS ${CMAKE_CXX_FLAGS} -std=c++11)INCLUDE_DIRECTORIES(
./
)
LINK_DIRECTORIES(
./
)set(
main.cpp
myreator.cpp
)ADD_EXECUTABLE(myreactorserver ${SRC_LIST})TARGET_LINK_LIBRARIES(myreactorserver pthread)

myreactor.h文件内容:

/**
*@desc: myreactor头文件, myreactor.h
*@author: zhangyl
*@date: 2016.12.03
*/
#ifndef __MYREACTOR_H__
#define __MYREACTOR_H__#include <list>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>#define WORKER_THREAD_NUM   5class CMyReactor
{
public:CMyReactor();~CMyReactor();bool init(const char* ip, short nport);bool uninit();bool close_client(int clientfd);static void* main_loop(void* p);private://no copyableCMyReactor(const CMyReactor& rhs);CMyReactor& operator = (const CMyReactor& rhs);bool create_server_listener(const char* ip, short port);static void accept_thread_proc(CMyReactor* pReatcor);static void worker_thread_proc(CMyReactor* pReatcor);private://C11语法可以在这里初始化int                           m_listenfd = 0;int                             m_epollfd  = 0;bool                        m_bStop    = false;std::shared_ptr<std::thread> m_acceptthread;std::shared_ptr<std::thread> m_workerthreads[WORKER_THREAD_NUM];std::condition_variable         m_acceptcond;std::mutex                     m_acceptmutex;std::condition_variable       m_workercond ;std::mutex                    m_workermutex;std::list<int>              m_listClients;
};#endif //!__MYREACTOR_H__

myreactor.cpp文件内容:

/** *@desc: myreactor实现文件, myreactor.cpp*@author: zhangyl*@date: 2016.12.03*/
#include "myreactor.h"
#include <iostream>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>  //for htonl() and htons()
#include <fcntl.h>
#include <sys/epoll.h>
#include <list>
#include <errno.h>
#include <time.h>
#include <sstream>
#include <iomanip> //for std::setw()/setfill()
#include <unistd.h>#define min(a, b) ((a <= b) ? (a) : (b))CMyReactor::CMyReactor()
{//m_listenfd = 0;//m_epollfd = 0;//m_bStop = false;
}CMyReactor::~CMyReactor()
{}bool CMyReactor::init(const char* ip, short nport)
{if (!create_server_listener(ip, nport)){std::cout << "Unable to bind: " << ip << ":" << nport << "." << std::endl;return false;}std::cout << "main thread id = " << std::this_thread::get_id() << std::endl;//启动接收新连接的线程m_acceptthread.reset(new std::thread(CMyReactor::accept_thread_proc, this));//启动工作线程for (auto& t : m_workerthreads){t.reset(new std::thread(CMyReactor::worker_thread_proc, this));}return true;
}bool CMyReactor::uninit()
{m_bStop = true;m_acceptcond.notify_one();m_workercond.notify_all();m_acceptthread->join();for (auto& t : m_workerthreads){t->join();}::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, m_listenfd, NULL);//TODO: 是否需要先调用shutdown()一下?::shutdown(m_listenfd, SHUT_RDWR);::close(m_listenfd);::close(m_epollfd);return true;
}bool CMyReactor::close_client(int clientfd)
{if (::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1){std::cout << "close client socket failed as call epoll_ctl failed" << std::endl;//return false;}::close(clientfd);return true;
}void* CMyReactor::main_loop(void* p)
{std::cout << "main thread id = " << std::this_thread::get_id() << std::endl;CMyReactor* pReatcor = static_cast<CMyReactor*>(p);while (!pReatcor->m_bStop){struct epoll_event ev[1024];int n = ::epoll_wait(pReatcor->m_epollfd, ev, 1024, 10);if (n == 0)continue;else if (n < 0){std::cout << "epoll_wait error" << std::endl;continue;}int m = min(n, 1024);for (int i = 0; i < m; ++i){//通知接收连接线程接收新连接if (ev[i].data.fd == pReatcor->m_listenfd)pReatcor->m_acceptcond.notify_one();//通知普通工作线程接收数据else{{std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);pReatcor->m_listClients.push_back(ev[i].data.fd);}pReatcor->m_workercond.notify_one();//std::cout << "signal" << std::endl;}// end if}// end for-loop}// end whilestd::cout << "main loop exit ..." << std::endl;return NULL;
}void CMyReactor::accept_thread_proc(CMyReactor* pReatcor)
{std::cout << "accept thread, thread id = " << std::this_thread::get_id() << std::endl;while (true){int newfd;struct sockaddr_in clientaddr;socklen_t addrlen;{std::unique_lock<std::mutex> guard(pReatcor->m_acceptmutex);pReatcor->m_acceptcond.wait(guard);if (pReatcor->m_bStop)break;//std::cout << "run loop in accept_thread_proc" << std::endl;newfd = ::accept(pReatcor->m_listenfd, (struct sockaddr *)&clientaddr, &addrlen);}if (newfd == -1)continue;std::cout << "new client connected: " << ::inet_ntoa(clientaddr.sin_addr) << ":" << ::ntohs(clientaddr.sin_port) << std::endl;//将新socket设置为non-blockingint oldflag = ::fcntl(newfd, F_GETFL, 0);int newflag = oldflag | O_NONBLOCK;if (::fcntl(newfd, F_SETFL, newflag) == -1){std::cout << "fcntl error, oldflag =" << oldflag << ", newflag = " << newflag << std::endl;continue;}struct epoll_event e;memset(&e, 0, sizeof(e));e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;e.data.fd = newfd;if (::epoll_ctl(pReatcor->m_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1){std::cout << "epoll_ctl error, fd =" << newfd << std::endl;}}std::cout << "accept thread exit ..." << std::endl;
}void CMyReactor::worker_thread_proc(CMyReactor* pReatcor)
{std::cout << "new worker thread, thread id = " << std::this_thread::get_id() << std::endl;while (true){int clientfd;{std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);while (pReatcor->m_listClients.empty()){if (pReatcor->m_bStop){std::cout << "worker thread exit ..." << std::endl;return;}pReatcor->m_workercond.wait(guard);}clientfd = pReatcor->m_listClients.front();pReatcor->m_listClients.pop_front();}//gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来std::cout << std::endl;std::string strclientmsg;char buff[256];bool bError = false;while (true){memset(buff, 0, sizeof(buff));int nRecv = ::recv(clientfd, buff, 256, 0);if (nRecv == -1){if (errno == EWOULDBLOCK)break;else{std::cout << "recv error, client disconnected, fd = " << clientfd << std::endl;pReatcor->close_client(clientfd);bError = true;break;}}//对端关闭了socket,这端也关闭。else if (nRecv == 0){std::cout << "peer closed, client disconnected, fd = " << clientfd << std::endl;pReatcor->close_client(clientfd);bError = true;break;}strclientmsg += buff;}//出错了,就不要再继续往下执行了if (bError)continue;std::cout << "client msg: " << strclientmsg;//将消息加上时间标签后发回time_t now = time(NULL);struct tm* nowstr = localtime(&now);std::ostringstream ostimestr;ostimestr << "[" << nowstr->tm_year + 1900 << "-"<< std::setw(2) << std::setfill('0') << nowstr->tm_mon + 1 << "-"<< std::setw(2) << std::setfill('0') << nowstr->tm_mday << " "<< std::setw(2) << std::setfill('0') << nowstr->tm_hour << ":"<< std::setw(2) << std::setfill('0') << nowstr->tm_min << ":"<< std::setw(2) << std::setfill('0') << nowstr->tm_sec << "]server reply: ";strclientmsg.insert(0, ostimestr.str());while (true){int nSent = ::send(clientfd, strclientmsg.c_str(), strclientmsg.length(), 0);if (nSent == -1){if (errno == EWOULDBLOCK){std::this_thread::sleep_for(std::chrono::milliseconds(10));continue;}else{std::cout << "send error, fd = " << clientfd << std::endl;pReatcor->close_client(clientfd);break;}}std::cout << "send: " << strclientmsg;strclientmsg.erase(0, nSent);if (strclientmsg.empty())break;}}
}bool CMyReactor::create_server_listener(const char* ip, short port)
{m_listenfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);if (m_listenfd == -1)return false;int on = 1;::setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));::setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEPORT, (char *)&on, sizeof(on));struct sockaddr_in servaddr;memset(&servaddr, 0, sizeof(servaddr));servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = inet_addr(ip);servaddr.sin_port = htons(port);if (::bind(m_listenfd, (sockaddr *)&servaddr, sizeof(servaddr)) == -1)return false;if (::listen(m_listenfd, 50) == -1)return false;m_epollfd = ::epoll_create(1);if (m_epollfd == -1)return false;struct epoll_event e;memset(&e, 0, sizeof(e));e.events = EPOLLIN | EPOLLRDHUP;e.data.fd = m_listenfd;if (::epoll_ctl(m_epollfd, EPOLL_CTL_ADD, m_listenfd, &e) == -1)return false;return true;
}

main.cpp文件内容:

/** *@desc:   用reactor模式练习服务器程序*@author: zhangyl*@date:   2016.12.03*/#include <iostream>
#include <signal.h>     //for signal()
#include<unistd.h>
#include <stdlib.h>       //for exit()
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "myreactor.h"CMyReactor g_reator;void prog_exit(int signo)
{std::cout << "program recv signal " << signo << " to exit." << std::endl; g_reator.uninit();
}void daemon_run()
{int pid;signal(SIGCHLD, SIG_IGN);//1)在父进程中,fork返回新创建子进程的进程ID;//2)在子进程中,fork返回0;//3)如果出现错误,fork返回一个负值;pid = fork();if (pid < 0){std:: cout << "fork error" << std::endl;exit(-1);}//父进程退出,子进程独立运行else if (pid > 0) {exit(0);}//之前parent和child运行在同一个session里,parent是会话(session)的领头进程,//parent进程作为会话的领头进程,如果exit结束执行的话,那么子进程会成为孤儿进程,并被init收养。//执行setsid()之后,child将重新获得一个新的会话(session)id。//这时parent退出之后,将不会影响到child了。setsid();int fd;fd = open("/dev/null", O_RDWR, 0);if (fd != -1){dup2(fd, STDIN_FILENO);dup2(fd, STDOUT_FILENO);dup2(fd, STDERR_FILENO);}if (fd > 2)close(fd);
}int main(int argc, char* argv[])
{  //设置信号处理signal(SIGCHLD, SIG_DFL);signal(SIGPIPE, SIG_IGN);signal(SIGINT, prog_exit);signal(SIGKILL, prog_exit);signal(SIGTERM, prog_exit);short port = 0;int ch;bool bdaemon = false;while ((ch = getopt(argc, argv, "p:d")) != -1){switch (ch){case 'd':bdaemon = true;break;case 'p':port = atol(optarg);break;}}if (bdaemon)daemon_run();if (port == 0)port = 12345;if (!g_reator.init("0.0.0.0", 12345))return -1;g_reator.main_loop(&g_reator);return 0;
}

完整实例代码下载地址:

普通版本:https://pan.baidu.com/s/1o82Mkno

C++11版本:https://pan.baidu.com/s/1dEJdrih

服务器端编程心得(二)—— Reactor模式相关推荐

  1. 服务器端编程心得(三)—— 一个服务器程序的架构介绍

    本文将介绍我曾经做过的一个项目的服务器架构和服务器编程的一些重要细节. 一.程序运行环境 操作系统:centos 7.0 编译器:gcc/g++ 4.8.3     cmake 2.8.11 mysq ...

  2. 服务器端编程心得(七)——开源一款即时通讯软件的源码

    服务器端编程心得(七)--开源一款即时通讯软件的源码 2017年04月06日 22:57:01 analogous_love 阅读数:30222更多 所属专栏: 高性能服务器编程实现细节详解 版权声明 ...

  3. 游戏杆编程心得二:如何判断按钮的有效按下

    作者:朱金灿 来源:http://blog.csdn.net/clever101 在游戏杆编程中通过一般需要获取按钮状态来执行特定事件,比如实现按下按钮1一次,变量num递增100,但在调试系统时往往 ...

  4. 服务器端编程心得(一)—— 主线程与工作线程的分工

    服务器端为了能流畅处理多个客户端链接,一般在某个线程A里面accept新的客户端连接并生成新连接的socket fd,然后将这些新连接的socketfd给另外开的数个工作线程B1.B2.B3.B4,这 ...

  5. reactor模式学习

    1.什么是reactor模式 2.reactor模式解决的问题是什么 3.reactor模式的实现方式有哪些 一. 什么是reactor模式 讲这个之前,先整体上看看,一般的服务端架构设计都是怎么做的 ...

  6. Linux网络编程 | 事件处理模式:Reactor模式、Proactor模式

    文章目录 Reactor模式 Proactor模式 同步I/O模型模拟Proactor模式 两者的优缺点 Reactor Proactor 随着网络设计模式的兴起,Reactor和Proactor事件 ...

  7. 文件服务器和客户模式有什么区别,客户端和服务器端编程有什么区别?

    皈依舞 我将尝试以简单的方式解释它.客户端是用户在浏览器上看到/看到的代码.客户端编程包括HTML(HTML,HTML5,DHTML),CSS(CSS,CSS3)和JavaScript(JavaScr ...

  8. (二)什么是Reactor模式

    Reactor模式(反应堆模式): 这便是libevent的中心思想.在常规的I/O多路复用中采用select和poll.epoll等来实现,而将这些机制封装而成的就是I/O多路复用模式,Reacto ...

  9. Java网络编程入门(二)之客户端与服务器端编程步骤详解

    1.2 网络编程技术 前面介绍了网络编程的相关基础知识,初步建立了网络编程的概念,但是实际学习网络编程还必须使用某种程序设计语言进行代码实现,下面就介绍一下网络编程的代码实现. 1.2.1 网络编程步 ...

最新文章

  1. 5G:这次中国说了算!
  2. ‘numpy.float64‘ object cannot be interpreted as an integer
  3. shell实例第2讲:获取随机字符串
  4. linux命令大写输入,Linux命令行:对内容进行大小写字符转换 ????
  5. 以太网应用于控制时存在的问题
  6. 网络基础---IP编址
  7. 移动webAPP前端开发技巧汇总
  8. 安全企业 Stormshield 披露数据泄露事件 源代码被盗
  9. paip.python错误解决9
  10. 吃豆人 博弈_强化吃豆人
  11. 黑客是如何入侵电脑的,没有互联网可以入侵电脑吗?
  12. IOS版aplayer使用教程_管家婆服装版使用教程,重庆任我行管家婆服装软件的使用方法...
  13. 【Android】_MediaServer_仿网易云音乐播放器1(指针和唱片)
  14. linux shell经典脚本,10个经典Linux Shell脚本
  15. css3实现3d正方体动画效果
  16. excel oss 上传_excel上传数据库失败
  17. vue中inject用法
  18. C# List集合快速拼接字符串
  19. 花2w培训数据分析真的值得吗?
  20. Android 一个像素几个字节,android 像素单位的一系列疑问困扰

热门文章

  1. SQL Server 2014 无法连接到服务器
  2. 如何从外网穿透到内网
  3. WebRTC[52] - WebRTC 带宽分配逻辑详解
  4. BI神器Power Query(16)-- PQ制作时间维度表(5)
  5. Google(谷歌)高级搜索
  6. QCA三天写论文!清晰集分析实战
  7. 两片8-3优先编码器转化为16-4线优先编码器真值表--python实现
  8. Tigo Energy通过Stark Renováveis安装案例向巴西安装商展示优化技术
  9. 研究7——发展与应用
  10. 资深SRE带你看阿里云香港故障