1. reactor模型,本质上讲管理网络IO

使用下面这个结构用来管理我们的IO

struct ntyreactor {int epfd;struct ntyevent *events;
};

下面这段代码比较核心(我们关心的事件与发生的事件发生时,才去调用)

if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) {ev->callback(ev->fd, events[i].events, ev->arg);
}
if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) {ev->callback(ev->fd, events[i].events, ev->arg);
}


2. 代码实现:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>#include <fcntl.h>
#include <unistd.h>
#include <errno.h>#define BUFFER_LENGTH        4096
#define MAX_EPOLL_EVENTS    1024
#define SERVER_PORT            8888typedef int NCALLBACK(int ,int, void*);struct ntyevent {int fd;int events;void *arg;int (*callback)(int fd, int events, void *arg);int status;char buffer[BUFFER_LENGTH];int length;long last_active;
};struct ntyreactor {int epfd;struct ntyevent *events;
};int recv_cb(int fd, int events, void *arg);
int send_cb(int fd, int events, void *arg);void nty_event_set(struct ntyevent *ev, int fd, NCALLBACK callback, void *arg) {ev->fd = fd;ev->callback = callback;ev->events = 0;ev->arg = arg;ev->last_active = time(NULL);return ;}//nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[clientfd]);
int nty_event_add(int epfd, int events, struct ntyevent *ev) {struct epoll_event ep_ev = {0, {0}};ep_ev.data.ptr = ev;ep_ev.events = ev->events = events;int op;if (ev->status == 1) {op = EPOLL_CTL_MOD;} else {op = EPOLL_CTL_ADD;ev->status = 1;}if (epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0) {printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);return -1;}return 0;
}int nty_event_del(int epfd, struct ntyevent *ev) {struct epoll_event ep_ev = {0, {0}};if (ev->status != 1) {return -1;}ep_ev.data.ptr = ev;ev->status = 0;epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);return 0;
}int recv_cb(int fd, int events, void *arg) {struct ntyreactor *reactor = (struct ntyreactor*)arg;struct ntyevent *ev = reactor->events+fd;int len = recv(fd, ev->buffer, BUFFER_LENGTH, 0);nty_event_del(reactor->epfd, ev);if (len > 0) {ev->length = len;ev->buffer[len] = '\0';printf("C[%d]:%s\n", fd, ev->buffer);nty_event_set(ev, fd, send_cb, reactor);nty_event_add(reactor->epfd, EPOLLOUT, ev);} else if (len == 0) {close(ev->fd);printf("[fd=%d] pos[%ld], closed\n", fd, ev-reactor->events);} else {close(ev->fd);printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));}return len;
}int send_cb(int fd, int events, void *arg) {struct ntyreactor *reactor = (struct ntyreactor*)arg;struct ntyevent *ev = reactor->events+fd;int len = send(fd, ev->buffer, ev->length, 0);if (len > 0) {printf("send[fd=%d], [%d]%s\n", fd, len, ev->buffer);nty_event_del(reactor->epfd, ev);nty_event_set(ev, fd, recv_cb, reactor);nty_event_add(reactor->epfd, EPOLLIN, ev);} else {close(ev->fd);nty_event_del(reactor->epfd, ev);printf("send[fd=%d] error %s\n", fd, strerror(errno));}return len;
}int accept_cb(int fd, int events, void *arg) {struct ntyreactor *reactor = (struct ntyreactor*)arg;if (reactor == NULL) return -1;struct sockaddr_in client_addr;socklen_t len = sizeof(client_addr);int clientfd;if ((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1) {if (errno != EAGAIN && errno != EINTR) {}printf("accept: %s\n", strerror(errno));return -1;}int i = 0;do {for (i = 0;i < MAX_EPOLL_EVENTS;i ++) {if (reactor->events[i].status == 0) {break;}}if (i == MAX_EPOLL_EVENTS) {printf("%s: max connect limit[%d]\n", __func__, MAX_EPOLL_EVENTS);break;}int flag = 0;if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLL_EVENTS);break;}nty_event_set(&reactor->events[clientfd], clientfd, recv_cb, reactor);//设置注册函数nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[clientfd]);} while (0);printf("new connect [%s:%d][time:%ld], pos[%d]\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), reactor->events[i].last_active, i);return 0;}int init_sock(short port) {int fd = socket(AF_INET, SOCK_STREAM, 0);fcntl(fd, F_SETFL, O_NONBLOCK);struct sockaddr_in server_addr;memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = htonl(INADDR_ANY);server_addr.sin_port = htons(port);bind(fd, (struct sockaddr*)&server_addr, sizeof(server_addr));if (listen(fd, 20) < 0) {printf("listen failed : %s\n", strerror(errno));}return fd;
}int ntyreactor_init(struct ntyreactor *reactor) {if (reactor == NULL) return -1;memset(reactor, 0, sizeof(struct ntyreactor));reactor->epfd = epoll_create(1);if (reactor->epfd <= 0) {printf("create epfd in %s err %s\n", __func__, strerror(errno));return -2;}reactor->events = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));if (reactor->events == NULL) {printf("create epfd in %s err %s\n", __func__, strerror(errno));close(reactor->epfd);return -3;}
}int ntyreactor_destory(struct ntyreactor *reactor) {close(reactor->epfd);free(reactor->events);}int ntyreactor_addlistener(struct ntyreactor *reactor, int sockfd, NCALLBACK *acceptor) {if (reactor == NULL) return -1;if (reactor->events == NULL) return -1;nty_event_set(&reactor->events[sockfd], sockfd, acceptor, reactor);//给监听事件+accpternty_event_add(reactor->epfd, EPOLLIN, &reactor->events[sockfd]);//将监听事件,添加到红黑树return 0;
}int ntyreactor_run(struct ntyreactor *reactor) {if (reactor == NULL) return -1;if (reactor->epfd < 0) return -1;if (reactor->events == NULL) return -1;struct epoll_event events[MAX_EPOLL_EVENTS+1];int checkpos = 0, i;while (1) {long now = time(NULL);for (i = 0;i < 100;i ++, checkpos ++) {if (checkpos == MAX_EPOLL_EVENTS) {checkpos = 0;}if (reactor->events[checkpos].status != 1) {continue;}long duration = now - reactor->events[checkpos].last_active;if (duration >= 60) {close(reactor->events[checkpos].fd);printf("[fd=%d] timeout\n", reactor->events[checkpos].fd);nty_event_del(reactor->epfd, &reactor->events[checkpos]);}}int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);if (nready < 0) {printf("epoll_wait error, exit\n");continue;}for (i = 0;i < nready;i ++) {struct ntyevent *ev = (struct ntyevent*)events[i].data.ptr;if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) {ev->callback(ev->fd, events[i].events, ev->arg);}if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) {ev->callback(ev->fd, events[i].events, ev->arg);}}}
}int main(int argc, char *argv[]) {unsigned short port = SERVER_PORT;if (argc == 2) {port = atoi(argv[1]);}int sockfd = init_sock(port);struct ntyreactor *reactor = (struct ntyreactor*)malloc(sizeof(struct ntyreactor));ntyreactor_init(reactor);//初始化reactorntyreactor_addlistener(reactor, sockfd, accept_cb);//添加监听事件ntyreactor_run(reactor);ntyreactor_destory(reactor);close(sockfd);return 0;
}

reactor ---- 反应堆模型相关推荐

  1. epoll服务器反应堆模型

    常规的epoll处理 epoll是io多路复用的一种实现方式,最开始我们使用epoll是对多个fd进行管理,当epoll_wait从内核的rdllist就绪链表中取出一定数量的poll_event时, ...

  2. 【Netty】主从反应器 ( Reactor ) 多线程模型

    文章目录 一. 主从 反应器 ( Reactor ) 多线程 模式 二. 主从 反应器 ( Reactor ) 多线程 工作流程 三. 主从 反应器 ( Reactor ) 多线程 优缺点分析 四. ...

  3. reactor线程模型_简单了解Java Netty Reactor三种线程模型

    1. Reactor三种线程模型 1.1. 单线程模型 Reactor单线程模型,指的是所有的IO操作都在同一个NIO线程上面完成,NIO线程的职责如下: 1)作为NIO服务端,接收客户端的TCP连接 ...

  4. reactor多线程模型_Netty运用Reactor模式到极致

    常见的reactor模式有以下三种 单线程reactor 多线程reactor 主从reactor 1.单线程reactor ractor 单线程模式是指所有的I/O操作都在一个NIO线程完成,该线程 ...

  5. 浅谈Reactor 线程模型

    Reactor 的线程模型有三种:单线程模型.多线程模型.主从多线程模型.首先来看一下单线程模型,如下图所示: 所谓单线程, 即Acceptor 处理和andler 处理都在同一个线程中处理.这个模型 ...

  6. reactor多线程模型_网络编程模型的演进之路

    在没有IO多路复用的模型的情况下,为了支持高并发采取以下网络模型 一:阻塞IO+多线程 client连接服务器,服务器有一个线程阻塞的调用accept,accept接收到连接后,创建一个线程来读写读写 ...

  7. Netty Reactor线程模型与EventLoop详解

    本文来说下Netty Reactor线程模型与EventLoop 文章目录 EventLoop事件循环 任务调度 线程管理 线程分配 非阻塞传输 阻塞传输 Netty线程模型 单Reactor单线程模 ...

  8. Reactor线程模型

    一 传统阻塞线程模型 第一:服务器端有一个Acceptor线程接收客户端请求 第二:Acceptor接收到每一个客户端请求后,为每一个线程分配一个线程处理客户端请求 缺点: 第一:当数据量很大或者客户 ...

  9. 主从reactor 多线程模型

    相比多线程reactor模型,主从reactor多线程模型拥有了一个独立处理 SocketChannel 连接的线程池,当客户端从Acceptor建立连接之后,便将该连接绑定到subreactor 线 ...

  10. reactor线程模型_面试一文搞定JAVA的网络IO模型

    1,最原始的BIO模型 该模型的整体思路是有一个独立的Acceptor线程负责监听客户端的链接,它接收到客户端链接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客 ...

最新文章

  1. 蔚来招聘|多传感器联合标定算法工程师
  2. 【数据结构与算法】之深入解析“两个数组的交集”的求解思路与算法示例
  3. windows下远程连接Linux桌面
  4. 生产环境下,oracle不同用户间的数据迁移。第三部分
  5. 程序员50题(JS版本)(二)
  6. 给用户添加sudo功能
  7. 类与类加载器---《深入理解java虚拟机》
  8. [LeetCode]两两交换链表中的节点(Swap Nodes in Pairs)
  9. ENSP路由交换机配置
  10. 可以免费测试的短信验证码接口接入
  11. 实现自己选取歌曲制作手机铃声
  12. 【转】redis利用姿势收集
  13. Unity(5.x)跑酷游戏 Ultimate Endless Runner Kit v1.03资源包
  14. python selenium下载电子书
  15. 使用ViKey加密狗实现Windows登陆的方法
  16. Facebook应用开发之应用后台配置,以及GraphAPI使用(PHP-SDKJS-SDK)
  17. 图文超详细教学解决windows右键没有没有显示git属性
  18. 算法模型是什么意思,算法模型定义介绍
  19. 天池数字生态创新大赛-遥感
  20. 网站虚拟空间购买指南

热门文章

  1. eclipse没有server选项怎么解决
  2. 7-7 整数的分类处理 (20 分)
  3. [导入][凤穿牡丹][2008精品年代剧][全38集][李小冉 应采儿]
  4. sitecore系统教程之使用修补程序文件自定义Sitecore配置
  5. React的单向数据流与组件间的沟通
  6. [bzoj 1855][SCOI2010]股票交易
  7. tld 第二个函数tldInitFirstFrame
  8. 如何配置cocos2d-x安卓开发环境?
  9. Snabbt.js – 极简的 JavaScript 动画库
  10. TCP的三次握手和四次挥手(超详解)