《ASCE1885的源码分析》の基于完成端口模型的TCP服务器框架
使用IOCP的TCP服务器使用过程大体如下:
1) 使用CreateIoCompletionPort函数创建完成端口,并以该I/O完成端口为参数创建多个服务线程;
2) 创建监听套接字;
3) 接收客户端连接请求,返回服务套接字;
4) 将服务套接字与完成端口绑定,并在该套接字上投递初始I/O操作请求;
5) 返回步骤3);
服务线程的流程如下:
1) 调用GetQueuedCompletionPort函数等待获取完成信息;
2) 根据需要对数据进行处理并投递后续的I/O操作请求;
3) 返回步骤1)。
程序代码及注释如下:
#include <stdio.h>
#include <winsock2.h>
#include <process.h>
#include <Windows.h>
#pragma comment(lib, "ws2_32.lib")
#define MAX_THREAD_NUM 24 //最大服务器线程数
#define MAX_BUF_LEN 5000 //最大服务器I/O缓冲区大小
//枚举类型,用于指示服务器I/O操作的类型
typedef enum _IO_OPER
{
SVR_IO_READ,
SVR_IO_WRITE
}IO_OPER, *LPIO_OPER;
//扩展重叠结构体,单I/O数据
typedef struct _OverLappedEx
{
OVERLAPPED OverLapped;
WSABUF wbuf; //I/O操作的数据对象
char data[MAX_BUF_LEN];//实际的数据缓冲区
IO_OPER oper; //用于标志I/O操作的类型
DWORD flags; //用于设定或者返回I/O操作的标志
} PER_IO_DATA, *LPPER_IO_DATA;
//完成键结构体,单句柄数据,对应每个服务套接字---每个连接
typedef struct _CONN_CTX
{
SOCKET sockAccept; //该连接的服务器侦听服务套接字
LPPER_IO_DATA pPerIOData; //指向该连接的I/O操作信息
struct _CONN_CTX *pPrev; //用于形成服务器当前所有连接信息的双向链表
struct _CONN_CTX *pNext; //分别指向链表中前一个节点和后一个节点
} CONN_CTX, *LPCONN_CTX;
CRITICAL_SECTION g_CriticalSection; //防止对连接信息链表的访问冲突
LPCONN_CTX g_ptrConnCtxHead = NULL; //指向双向链表头节点,用于对该链表的访问和维护
SOCKET g_sockListen = INVALID_SOCKET;
//该函数用于对控制台消息进行处理,当接收到CTRL_C_EVENT、
//CTRL_LOGOFF_EVENT、¡CTRL_SHUTDOWN_EVENT或者CTRL_CLOSE_EVENT
//事件时,服务器将关闭监听套接字,从而使主线程从接收连接的死循环中
//退出,并最终结束所有服务线程,释放连接等
BOOL WINAPI CtrlHandler(DWORD dwEvent);
//完成端口操作函数
HANDLE CreateNewIoCompletionPort(DWORD dwNumberOfConcurrentThreads);
BOOL AssociateWithIoCompletePort(HANDLE hComPort, HANDLE hDevice, DWORD dwCompKey);
//创建监听套接字
SOCKET CreateListenSock();
//当服务器接受了客户端连接请求后,将返回的服务套接字和完成端口
//作为参数调用该函数,该函数完成服务套接字与完成端口的绑定
//以及为该连接的相关信息分配存储区的工作
LPCONN_CTX CreateConnCtx(SOCKET sockAccept, HANDLE hIOCP);
//将新的连接信息加入到全局的连接信息链表中
void ConnListAdd(LPCONN_CTX lpConnCtx);
//将指定的连接信息从全局连接信息链表中删除,
//并关闭连接,释放相应的存储区资源
void ConnListRemove(LPCONN_CTX lpConnCtx);
//完成服务器退出时关闭连接、释放资源的工作
void ConnListClear();
//由于printf函数只能在用C运行库中函数创建的线程中使用
//因此,本程序重写自己的输出函数
int ASCEPrintf(const char* lpFormat, ...);
//工作线程函数
DWORD WINAPI WorkThread(LPVOID lpParam);
int main(int argc, char* argv[])
{
HANDLE hIOCP;
HANDLE hThreadHandles[MAX_THREAD_NUM];
int nThreadCount;
WSADATA wsaData;
if(WSAStartup(MAKEWORD(2,2), &wsaData) != 0)
{
ASCEPrintf("Winsock initialized failed.../n");
return -1;
}
int i;
for(i=0; i<MAX_THREAD_NUM; i++)
{
hThreadHandles[i] = NULL;
}
//设置控制台事件响应函数º
if(!SetConsoleCtrlHandler(CtrlHandler, TRUE))
{
ASCEPrintf("SetConsoleCtrlHandler:%d/n", GetLastError());
return -1;
}
InitializeCriticalSection(&g_CriticalSection);
__try
{
//创建I/O完成端口
hIOCP = CreateNewIoCompletionPort(0);
if(hIOCP == NULL)
{
ASCEPrintf("CreateIoCompletionPort:%d/n", GetLastError());
__leave;
}
//创建多个工作线程
SYSTEM_INFO sysInfo;
GetSystemInfo(&sysInfo);
//将sysInfo.dwNumberOfProcessors*2+2和¨ª
//MAX_THREAD_NUM之间的较小者赋给nThreadCount
nThreadCount = (sysInfo.dwNumberOfProcessors*2+2) < MAX_THREAD_NUM
? (sysInfo.dwNumberOfProcessors*2+2) : MAX_THREAD_NUM;
for(int i=0; i<nThreadCount; i++)
{
HANDLE hThread = CreateThread(NULL, 0, WorkThread, hIOCP, 0, NULL);
if(hThread == NULL)
{
ASCEPrintf("CreateThread:%d/n", GetLastError());
__leave;
} else {
hThreadHandles[i] = hThread;
}
}
g_sockListen = CreateListenSock();
if(g_sockListen == INVALID_SOCKET)
__leave;
SOCKET sockAccept;
LPCONN_CTX lpConnCtx;
int nResult;
while(true)
{
sockAccept = accept(g_sockListen, NULL, NULL);
if(sockAccept == INVALID_SOCKET)
__leave;
lpConnCtx = CreateConnCtx(sockAccept, hIOCP);
if(lpConnCtx == NULL)
_leave;
else
ConnListAdd(lpConnCtx);
//投递初始I/O操作
nResult = WSARecv(sockAccept,
&(lpConnCtx->pPerIOData->wbuf),
1, NULL,
&(lpConnCtx->pPerIOData->flags),
&(lpConnCtx->pPerIOData->OverLapped),
NULL);
if((nResult == SOCKET_ERROR) &&
(WSAGetLastError() != ERROR_IO_PENDING))
{
ASCEPrintf("WSARecv:%d/n", WSAGetLastError());
ConnListRemove(lpConnCtx);
break;
}
}
} __finally {
if(hIOCP)
{
for(int i=0; i<nThreadCount; i++)
{
PostQueuedCompletionStatus(hIOCP, 0, 0, NULL);
}
}
//等待所有工作线程结束
if(WAIT_OBJECT_0 != WaitForMultipleObjects(nThreadCount,
hThreadHandles, TRUE, 1000))
ASCEPrintf("WaitForMultipleObjects failed:%d/n", GetLastError());
else {
for(int i=0; i<nThreadCount; i++)
{
if(hThreadHandles[i] != NULL)
{
if(!CloseHandle(hThreadHandles[i]))
ASCEPrintf("CloseHandle:%d/n", GetLastError());
}
hThreadHandles[i] = NULL;
}
}
if(hIOCP)
{
CloseHandle(hIOCP);
hIOCP = NULL;
}
if(g_sockListen != INVALID_SOCKET)
{
closesocket(g_sockListen);
g_sockListen = INVALID_SOCKET;
}
if(g_ptrConnCtxHead)
ConnListClear();
ASCEPrintf("...............Stopped./n");
DeleteCriticalSection(&g_CriticalSection);
SetConsoleCtrlHandler(CtrlHandler, FALSE);
WSACleanup();
return 0;
}
}
BOOL WINAPI CtrlHandler(DWORD dwEvent)
{
SOCKET sockTemp = INVALID_SOCKET;
switch(dwEvent)
{
case CTRL_C_EVENT:
case CTRL_LOGOFF_EVENT:
case CTRL_SHUTDOWN_EVENT:
case CTRL_CLOSE_EVENT:
ASCEPrintf("Server Stopping........./n");
sockTemp = g_sockListen;
g_sockListen = INVALID_SOCKET;
if(sockTemp != INVALID_SOCKET)
{
closesocket(sockTemp);
sockTemp = INVALID_SOCKET;
}
break;
default:
return FALSE;
}
return TRUE;
}
//创建I/O完成端口
HANDLE CreateNewIoCompletionPort(DWORD dwNumberOfConcurrentThreads)
{
return (CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0,
dwNumberOfConcurrentThreads));
}
//将套接字与完成端口关联
BOOL AssociateWithIoCompletionPort(HANDLE hComPort, HANDLE hDevice,
DWORD dwCompKey)
{
return (CreateIoCompletionPort(hDevice, hComPort, dwCompKey, 0)
== hComPort);
}
//创建服务器监听套接字
SOCKET CreateListenSock()
{
//创建WSA_FLAG_OVERLAPPED属性的套接字
SOCKET sock = WSASocket(AF_INET, SOCK_STREAM, 0,
NULL, 0, WSA_FLAG_OVERLAPPED);
if(sock == INVALID_SOCKET)
return sock;
BOOL bReuseAddr = true;
if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char*)&bReuseAddr,
sizeof(bReuseAddr)) == SOCKET_ERROR)
{
ASCEPrintf("setsockopt:%d/n", WSAGetLastError());
closesocket(sock);
return INVALID_SOCKET;
}
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_addr.s_addr = INADDR_ANY;
local.sin_family = AF_INET;
local.sin_port = htons(9999);
if(bind(sock, (struct sockaddr*)&local, sizeof(local)) == SOCKET_ERROR)
{
ASCEPrintf("bind:%d/n", WSAGetLastError());
closesocket(sock);
return INVALID_SOCKET;
}
if(listen(sock, 5) == SOCKET_ERROR)
{
ASCEPrintf("listen:%d/n", WSAGetLastError());
closesocket(sock);
return INVALID_SOCKET;
}
return sock;
}
LPCONN_CTX CreateConnCtx(SOCKET sockAccept, HANDLE hIOCP)
{
LPCONN_CTX lpConnCtx = (LPCONN_CTX)GlobalAlloc(GPTR, sizeof(CONN_CTX));
if(lpConnCtx == NULL)
return NULL;
lpConnCtx->pPerIOData = (LPPER_IO_DATA)GlobalAlloc(GPTR, sizeof(PER_IO_DATA));
if(lpConnCtx->pPerIOData == NULL)
{
GlobalFree(lpConnCtx);
lpConnCtx = NULL;
return NULL;
}
//赋值
lpConnCtx->pNext = NULL;
lpConnCtx->pPrev = NULL;
lpConnCtx->sockAccept = sockAccept;
ZeroMemory(lpConnCtx->pPerIOData, sizeof(PER_IO_DATA));
lpConnCtx->pPerIOData->OverLapped.hEvent = NULL;
lpConnCtx->pPerIOData->OverLapped.Internal = 0;
lpConnCtx->pPerIOData->OverLapped.InternalHigh = 0;
lpConnCtx->pPerIOData->OverLapped.Offset = 0;
lpConnCtx->pPerIOData->OverLapped.OffsetHigh = 0;
lpConnCtx->pPerIOData->wbuf.buf = (char*)lpConnCtx->pPerIOData->data;
lpConnCtx->pPerIOData->wbuf.len = MAX_BUF_LEN;
lpConnCtx->pPerIOData->oper = SVR_IO_READ;
lpConnCtx->pPerIOData->flags = 0;
//将套接字和完成端口绑定
if(!AssociateWithIoCompletionPort(hIOCP, (HANDLE)sockAccept,
(DWORD)lpConnCtx))
{
ASCEPrintf("AssociateWithIoCompletionPort:%d/n", GetLastError());
GlobalFree(lpConnCtx->pPerIOData);
GlobalFree(lpConnCtx);
lpConnCtx = NULL;
return NULL;
}
return lpConnCtx;
}
void ConnListAdd(LPCONN_CTX lpConnCtx)
{
LPCONN_CTX pTemp;
EnterCriticalSection(&g_CriticalSection);
if(g_ptrConnCtxHead == NULL)
{
//链表的第一个节点
lpConnCtx->pPrev = NULL;
lpConnCtx->pNext = NULL;
g_ptrConnCtxHead = lpConnCtx;
} else {
//加到链表头部
pTemp = g_ptrConnCtxHead;
g_ptrConnCtxHead = lpConnCtx;
lpConnCtx->pNext = pTemp;
lpConnCtx->pPrev = NULL;
pTemp->pPrev = lpConnCtx;
}
LeaveCriticalSection(&g_CriticalSection);
}
void ConnListRemove(LPCONN_CTX lpConnCtx)
{
LPCONN_CTX pPrev = NULL;
LPCONN_CTX pNext = NULL;
EnterCriticalSection(&g_CriticalSection);
if(lpConnCtx != NULL)
{
pPrev = lpConnCtx->pPrev;
pNext = lpConnCtx->pNext;
if((pPrev == NULL) && (pNext == NULL)) //链表唯一的节点
{
g_ptrConnCtxHead = NULL;
} else if((pPrev == NULL) && (pNext != NULL)){ //链表首节点
pNext->pPrev = NULL;
g_ptrConnCtxHead = pNext;
} else if((pPrev != NULL) && (pNext == NULL)){ //链表尾节点
pPrev->pNext = NULL;
} else if((pPrev && pNext)){ //链表中间节点
pPrev->pNext = pNext;
pNext->pPrev = pPrev;
}
//关闭连接,释放资源
closesocket(lpConnCtx->sockAccept);
GlobalFree(lpConnCtx->pPerIOData);
GlobalFree(lpConnCtx);
lpConnCtx = NULL;
}
LeaveCriticalSection(&g_CriticalSection);
return;
}
void ConnListClear()
{
LPCONN_CTX pTemp1, pTemp2;
EnterCriticalSection(&g_CriticalSection);
pTemp1 = g_ptrConnCtxHead;
while(pTemp1)
{
pTemp2 = pTemp1->pNext;
ConnListRemove(pTemp1);
pTemp1 = pTemp2;
}
LeaveCriticalSection(&g_CriticalSection);
return;
}
int ASCEPrintf(const char* lpFormat, ...)
{
int nLen = 0;
int nRet = 0;
char cBuffer[512];
va_list arglist;
HANDLE hOut = NULL;
ZeroMemory(cBuffer, sizeof(cBuffer));
va_start(arglist, lpFormat);
nLen = lstrlen(lpFormat);
nRet = wvsprintf(cBuffer, lpFormat, arglist);
if(nRet >= nLen || GetLastError() == 0)
{
hOut = GetStdHandle(STD_OUTPUT_HANDLE);
if(hOut != INVALID_HANDLE_VALUE)
{
WriteConsole(hOut, cBuffer, lstrlen(cBuffer),
(LPDWORD)&nLen, NULL);
}
}
return nLen;
}
DWORD WINAPI WorkThread(LPVOID lpParam)
{
HANDLE hIOCP = (HANDLE)lpParam;
BOOL bSuccess = FALSE;
DWORD dwIOSize;
LPPER_IO_DATA lpPerIOData;
LPOVERLAPPED lpOverLapped;
LPCONN_CTX lpConnCtx;
int nResult;
while(1)
{
bSuccess = GetQueuedCompletionStatus(hIOCP, &dwIOSize,
(LPDWORD)&lpConnCtx, &lpOverLapped, INFINITE);
if(!bSuccess)
{
ASCEPrintf("GetQueuedCompletionStatus:%d/n", GetLastError());
}
if(lpConnCtx == NULL)
{
return 1;
}
lpPerIOData = (LPPER_IO_DATA)(lpOverLapped);
if(!bSuccess || (bSuccess && (dwIOSize == 0)))
{
ConnListRemove(lpConnCtx);
continue;
}
#ifdef _DEBUG
ASCEPrintf("Different way to obtain PER_IO_DATA/n");
ASCEPrintf("The two one must be equal - A:%x/tB:%x/n",
lpConnCtx->pPerIOData, lpPerIOData);
#endif
switch(lpPerIOData->oper)
{
case SVR_IO_WRITE: //send then reveive
#ifdef _DEBUG
ASCEPrintf("Socket %d Send: %s/n", lpConnCtx->sockAccept,
lpPerIOData->wbuf.buf);
#endif
ZeroMemory(lpPerIOData, sizeof(PER_IO_DATA));
lpPerIOData->OverLapped.hEvent = NULL;
lpPerIOData->OverLapped.Internal = 0;
lpPerIOData->OverLapped.InternalHigh = 0;
lpPerIOData->OverLapped.Offset = 0;
lpPerIOData->OverLapped.OffsetHigh = 0;
lpPerIOData->wbuf.buf = (char*)&(lpPerIOData->data);
lpPerIOData->wbuf.len = MAX_BUF_LEN;
lpPerIOData->oper = SVR_IO_READ;
lpPerIOData->flags = 0;
nResult = WSARecv(lpConnCtx->sockAccept,
&(lpPerIOData->wbuf),
1, NULL, &(lpPerIOData->flags),
&(lpPerIOData->OverLapped),
NULL);
if(nResult == SOCKET_ERROR && WSAGetLastError() != ERROR_IO_PENDING)
{
ASCEPrintf("WSARecv:%d/n", WSAGetLastError());
ConnListRemove(lpConnCtx);
}
break;
case SVR_IO_READ: //receive then echo
#ifdef _DEBUG
ASCEPrintf("Socket %d recv:%s/n", lpConnCtx->sockAccept,
lpPerIOData->wbuf.buf);
#endif
lpPerIOData->wbuf.len = dwIOSize;
lpPerIOData->oper = SVR_IO_WRITE;
lpPerIOData->flags = 0;
nResult = WSASend(lpConnCtx->sockAccept,
&(lpPerIOData->wbuf),
1, NULL, lpPerIOData->flags,
&(lpPerIOData->OverLapped),
NULL);
if(nResult == SOCKET_ERROR &&
WSAGetLastError() != ERROR_IO_PENDING)
{
ASCEPrintf("WSASend:%d/n", WSAGetLastError());
ConnListRemove(lpConnCtx);
}
break;
default:
break;
}
}
return 0;
}
转载于:https://www.cnblogs.com/android-html5/archive/2010/09/15/2533983.html
《ASCE1885的源码分析》の基于完成端口模型的TCP服务器框架相关推荐
- AsyncHttpClient源码分析-基于Netty的连接池实现
原文地址:asynchttpclient源码分析-基于Netty的连接池实现 最近项目重构,有了个机会更多接触一个有别于HttpAsyncClient的异步网络框架AsyncHttpClient,是个 ...
- uboot源码分析(基于S5PV210)之启动第一阶段
目录 一.start.S引入 1.u-boot.lds中找到start.S入口 2.SourceInsight中如何找到文件 3.SI中找文件技巧 二.start.S解析 1.不简单的头文件包含 2. ...
- Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法
Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法 1.spreadOutApp尽量平均分配到每个executor上: 2.非spreadO ...
- iostat IO统计原理linux内核源码分析----基于单通道SATA盘
iostat IO统计原理linux内核源码分析----基于单通道SATA盘 先上一个IO发送submit_bio流程图,本文基本就是围绕该流程讲解. 内核版本 3.10.96 详细的源码注释:htt ...
- HashSet及LinkedHashSet源码分析(基于JDK1.6)
Java容器类的用途是"保存对象",分为两类:Map--存储"键值对"组成的对象:Collection--存储独立元素.Collection又可以分为List和 ...
- 吐血整理:Java线程池源码分析(基于JDK1.8建议收藏)
文章目录 一.引言 二.线程池的参数介绍 1.ThreadPoolExecutor的UML图 三.线程池的使用 1.线程池的工作原理 2.线程池类型 2.1.newCachedThreadPool使用 ...
- YYModel 源码分析:字典转模型
本文拿一个简单的例子,看 YYModel 字典转模型的源代码 有这么个模型 @interface Author : NSObject @property NSString *name; @proper ...
- AtomicInteger源码分析——基于CAS的乐观锁实现
原文出处: bestStyle 1. 悲观锁与乐观锁 我们都知道,cpu是时分复用的,也就是把cpu的时间片,分配给不同的thread/process轮流执行,时间片与时间片之间,需要进行cpu切换, ...
- android输入法源码分析,基于Android的输入法的设计与实现.doc
本 科 毕 业 论 文 基于Android的输入法的设计与实现 The Design and Realization of the Input Method Based on Android 学 院 ...
最新文章
- php删除指定对象的属性及属性值
- python中for循环的用法_浅谈Python的for循环
- /usr/bin/ld: cannot find -lltdl collect2: ld returned 1 exit status make: *** [sapi/cgi/php-cgi] Err
- 修改shell命令提示符和命令的输入颜色
- Linux Vim 光标错位,技术|Vim 复制粘帖格式错乱问题的解决办法
- 站在智能路由的风口,他选择把传统OA放进盒子
- MYSQL中5.7.10ROOT密码及创建用户
- MySQL Innodb Engine -- 文件格式(innodb_file_format)
- linux入门生信,优秀学员的学习方法展示
- 赵栋 201771010137 《面向对象程序设计(java)》第二周学习总结
- php系统变量有哪些,php预定义系统变量
- editplus 打开大文件_CorelDRAW文件损坏的几种解决方法
- 【通信原理 入坑之路】—— 深入理解奈奎斯特第一准则与码间串扰
- 盘口功夫——研判股价启动前的四种征兆----
- TikTok二面:“聊聊二维码扫码登录的原理”。
- Submissions in Production是什么意思?
- 皮克定理,多边形面积以及线段上整点个数
- Vanishing Point Constrained Lane DetectionWith a Stereo Camera (IEEE 2017)
- js处理移动端有虚拟按键影响页面布局的处理方法
- 安装惠普打印机显示等待php,安装惠普打印机出现“新设备现已连接”一直不动怎么办?...