ucontext实现的用户级多线程框架3(实现echo服务器)
前面一篇文章实现了一个抢先式的用户级多线程框架,现在用那个框架编写一个echo服务,
因为只是个实验,所以代码写得比较杂乱,还有很多可能出错的情况也没有处理,这些在今后的进一
步研究中都会慢慢修改,下面是代码:
uthread.h
/** brief: 用ucontext实现的用户级线程框架* author: kenny huang* date: 2009/10/13* email: huangweilook@21cn.com*/#ifndef _UTHREAD_H#define _UTHREAD_H#include <ucontext.h>#include <stdio.h>#include <string.h>#include <list>#include <pthread.h>#include <signal.h>#include "socketwrapper.h" #define MAX_UTHREAD 128 void int_signal_handler(int sig);//用户态线程的当前状态enum thread_status{ ACTIVED = 0,//可运行的 BLOCKED,//被阻塞 SLEEP,//主动休眠 DIE,//死死亡}; typedef int (*uthread_func)(void*); class Scheduler;class u_thread; typedef struct{int index;//在Scheduler::threads中的下标 u_thread *p_uthread; ucontext_t *p_context; }uthread_id; /** 用户态线程*/class u_thread{ friend class Scheduler;private: u_thread(unsigned int ssize,int index,uthread_id parent) :ssize(ssize),_status(BLOCKED),parent_context(parent.p_context) { stack = new char[ssize]; ucontext.uc_stack.ss_sp = stack; ucontext.uc_stack.ss_size = ssize; getcontext(&ucontext); uid.index = index; uid.p_uthread = this; uid.p_context = &ucontext; } ~u_thread() { delete []stack; }static void star_routine(int uthread,int func,int arg); public: ucontext_t* GetParentContext() {return parent_context; } ucontext_t *GetContext() {return &ucontext; }void SetStatus(thread_status _status) {this->_status = _status; } thread_status GetStatus() {return _status; } uthread_id GetUid() {return uid; } private: ucontext_t ucontext; ucontext_t *parent_context;//父亲的context char *stack;//coroutine使用的栈 unsigned int ssize;//栈的大小 thread_status _status; uthread_id uid;}; void BeginRun();bool cSpawn(uthread_func func,void *arg,unsigned int stacksize);int cRecv(int sock,char *buf,int len);int cSend(int sock,char *buf,int len);int cListen(int sock);void cSleep(int t); class beat; /** 任务调度器*/class Scheduler{ friend class beat; friend class u_thread; friend void BeginRun(); friend bool cSpawn(uthread_func func,void *arg,unsigned int stacksize); friend int cRecv(int sock,char *buf,int len); friend int cSend(int sock,char *buf,int len); friend int cListen(int sock); friend void cSleep(int t);public:static void scheduler_init();static void int_sig(); private: //休眠time时间 static void sleep(int t);static void check_Network();static void schedule();static bool spawn(uthread_func func,void *arg,unsigned int stacksize);static int recv(int sock,char *buf,int len);static int send(int sock,char *buf,int len);static int listen(int sock); private:static std::list<u_thread*> activeList;//可运行uthread列表 static std::list<std::pair<u_thread*,time_t> > sleepList;//正在睡眠uthread列表 static volatile bool block_signal; static char stack[4096]; static ucontext_t ucontext;//Scheduler的context static uthread_id uid_current;//当前正获得运行权的context static uthread_id uid_self; static u_thread *threads[MAX_UTHREAD];static int total_count;static int epollfd;const static int maxsize = 10;};/*心跳发射器,发射器必须运行在一个独立的线程中,以固定的间隔* 往所有运行着coroutine的线程发送中断信号*/class beat{public: beat(unsigned int interval):interval(interval) {}void setThread(pthread_t id) { thread_scheduler = id; }void loop() {while(true) {//每隔固定时间向所有线程发中断信号 ::usleep(1000 * interval);while(1) {if(!Scheduler::block_signal) { pthread_kill(thread_scheduler,SIGUSR1);break; } } } }private: unsigned int interval;//发送中断的间隔(豪秒) pthread_t thread_scheduler;}; bool initcoroutine(unsigned int interval); #endif
uthread.cpp
#include "uthread.h"#include <assert.h>#include <stdlib.h>#include <time.h>#include <netinet/in.h>#include <sys/socket.h>#include <sys/time.h>#include <netdb.h>#include <stdio.h>#include <stdlib.h>#include <string.h>#include <unistd.h>#include <sys/epoll.h>#include "thread.h" ucontext_t Scheduler::ucontext; char Scheduler::stack[4096]; uthread_id Scheduler::uid_current; uthread_id Scheduler::uid_self; u_thread *Scheduler::threads[MAX_UTHREAD]; int Scheduler::total_count = 0; int Scheduler::epollfd = 0; volatile bool Scheduler::block_signal = true; std::list<u_thread*> Scheduler::activeList; std::list<std::pair<u_thread*,time_t> > Scheduler::sleepList; struct sock_struct{int sockfd; u_thread *uThread;}; void int_signal_handler(int sig){ Scheduler::int_sig();} void u_thread::star_routine(int uthread,int func,int arg){ u_thread *self_uthread = (u_thread *)uthread; assert(self_uthread); self_uthread->SetStatus(ACTIVED); ucontext_t *self_context = self_uthread->GetContext(); swapcontext(self_context,self_uthread->GetParentContext()); Scheduler::block_signal = false; uthread_func _func = (uthread_func)func;void *_arg = (void*)arg;int ret = _func(_arg); self_uthread->SetStatus(DIE);}void Scheduler::scheduler_init(){for(int i = 0; i < MAX_UTHREAD; ++i) threads[i] = 0; getcontext(&ucontext); ucontext.uc_stack.ss_sp = stack; ucontext.uc_stack.ss_size = sizeof(stack); ucontext.uc_link = NULL; //scheduler占用下标0 makecontext(&ucontext,schedule, 0); uid_self.index = 0; uid_self.p_uthread = 0; uid_self.p_context = &ucontext; uid_current = uid_self; }int Scheduler::listen(int sock){ u_thread *self_thread = uid_current.p_uthread; epoll_event ev; sock_struct *ss = new sock_struct; ss->uThread = self_thread; ss->sockfd = sock; ev.data.ptr = (void*)ss; ev.events = EPOLLIN;int ret; TEMP_FAILURE_RETRY(ret = epoll_ctl(epollfd,EPOLL_CTL_ADD,sock,&ev));if(ret != 0) {return -1; } self_thread->SetStatus(BLOCKED); Scheduler::block_signal = true; swapcontext(uid_current.p_context,&Scheduler::ucontext); Scheduler::block_signal = false;return 1;} int Scheduler::recv(int sock,char *buf,int len){if(!buf || !(len > 0))return -1; u_thread *self_thread = uid_current.p_uthread; sock_struct *ss = new sock_struct; ss->uThread = self_thread; ss->sockfd = sock; epoll_event ev; ev.data.ptr = (void*)ss; ev.events = EPOLLIN;int ret; TEMP_FAILURE_RETRY(ret = epoll_ctl(epollfd,EPOLL_CTL_ADD,sock,&ev));if(ret != 0)return -1; self_thread->SetStatus(BLOCKED); Scheduler::block_signal = true; swapcontext(uid_current.p_context,&Scheduler::ucontext); printf("recv return/n"); Scheduler::block_signal = false; ret = read(sock,buf,len);return ret;}int Scheduler::send(int sock,char *buf,int len){if(!buf || !(len > 0))return -1; u_thread *self_thread = uid_current.p_uthread; sock_struct *ss = new sock_struct; ss->uThread = self_thread; ss->sockfd = sock; epoll_event ev; ev.data.ptr = (void*)ss; ev.events = EPOLLOUT;int ret; TEMP_FAILURE_RETRY(ret = epoll_ctl(epollfd,EPOLL_CTL_ADD,sock,&ev));if(ret != 0)return -1; self_thread->SetStatus(BLOCKED); Scheduler::block_signal = true; swapcontext(uid_current.p_context,&Scheduler::ucontext); Scheduler::block_signal = false; ret = write(sock,buf,len);return ret; }void Scheduler::check_Network(){ epoll_event events[maxsize]; sock_struct *ss;int nfds = TEMP_FAILURE_RETRY(epoll_wait(epollfd,events,maxsize,0));for( int i = 0 ; i < nfds ; ++i) {//套接口可读 if(events[i].events & EPOLLIN) { ss = (sock_struct*)events[i].data.ptr; printf("a sock can read/n"); ss->uThread->SetStatus(ACTIVED); epoll_event ev; ev.data.fd = ss->sockfd;if(0 != TEMP_FAILURE_RETRY(epoll_ctl(epollfd,EPOLL_CTL_DEL,ev.data.fd,&ev))) { printf("error here/n"); exit(0); } delete ss;continue; }//套接口可写 if(events[i].events & EPOLLOUT) { ss = (sock_struct*)events[i].data.ptr; printf("a sock can write/n"); ss->uThread->SetStatus(ACTIVED); epoll_event ev; ev.data.fd = ss->sockfd; TEMP_FAILURE_RETRY(epoll_ctl(epollfd,EPOLL_CTL_DEL,ev.data.fd,&ev)); delete ss;continue; } }}void Scheduler::schedule(){ epollfd = TEMP_FAILURE_RETRY(epoll_create(maxsize)); if(epollfd<= 0) { printf("epoll init error/n");return; } while(total_count > 0) { //首先执行active列表中的uthread std::list<u_thread*>::iterator it = activeList.begin(); std::list<u_thread*>::iterator end = activeList.end();for( ; it != end; ++it) {if(*it && (*it)->GetStatus() == ACTIVED) { uid_current = (*it)->GetUid(); swapcontext(&ucontext,uid_current.p_context); uid_current = uid_self; int index = (*it)->GetUid().index;if((*it)->GetStatus() == DIE) { printf("%d die/n",index); delete threads[index]; threads[index] = 0; --total_count; activeList.erase(it);break; }else if((*it)->GetStatus() == SLEEP) { printf("%d sleep/n",index); activeList.erase(it);break; } } }//检查网络,看看是否有套接口可以操作 check_Network();//看看Sleep列表中是否有uthread该醒来了 std::list<std::pair<u_thread*,time_t> >::iterator its = sleepList.begin(); std::list<std::pair<u_thread*,time_t> >::iterator ends = sleepList.end(); time_t now = time(NULL);for( ; its != ends; ++its) {//可以醒来了 if(now >= its->second) { u_thread *uthread = its->first; uthread->SetStatus(ACTIVED); activeList.push_back(uthread); sleepList.erase(its);break; } } } printf("scheduler end/n");} bool Scheduler::spawn(uthread_func func,void *arg,unsigned int stacksize){ printf("uthread_create/n");if(total_count >= MAX_UTHREAD)return false;int i = 1;for( ; i < MAX_UTHREAD; ++i) {if(threads[i] == 0) { threads[i] = new u_thread(stacksize,i,uid_current); ++total_count; ucontext_t *cur_context = threads[i]->GetContext(); activeList.push_back(threads[i]); cur_context->uc_link = &ucontext; makecontext(cur_context,(void (*)())u_thread::star_routine, 3,(int)&(*threads[i]),(int)func,(int)arg); swapcontext(uid_current.p_context, cur_context); printf("return from parent/n");return true; } } return false;}void Scheduler::sleep(int t){ u_thread *self_thread = uid_current.p_uthread; time_t now = time(NULL); now += t;//插入到sleep列表中 sleepList.push_back(std::make_pair(self_thread,now)); //保存当前上下文切换回scheduler self_thread->SetStatus(SLEEP); ucontext_t *cur_context = self_thread->GetContext(); Scheduler::block_signal = true; swapcontext(cur_context,&Scheduler::ucontext); Scheduler::block_signal = false;}void Scheduler::int_sig(){//printf("Scheduler::int_sig()%x/n",uid_current.p_context); Scheduler::block_signal = true; swapcontext(uid_current.p_context,&Scheduler::ucontext); Scheduler::block_signal = false;} class HeartBeat : public runnable{public: HeartBeat(unsigned int interval) { _beat = new beat(interval); _beat->setThread(pthread_self()); } ~HeartBeat() { delete _beat; } bool run() { _beat->loop();return true; } private: beat *_beat;}; bool initcoroutine(unsigned int interval){//初始化信号 struct sigaction sigusr1; sigusr1.sa_flags = 0; sigusr1.sa_handler = int_signal_handler; sigemptyset(&sigusr1.sa_mask);int status = sigaction(SIGUSR1,&sigusr1,NULL);if(status == -1) { printf("error sigaction/n");return false; } //首先初始化调度器 Scheduler::scheduler_init(); //启动心跳 static HeartBeat hb(interval);static Thread c(&hb); c.start();return true;} void BeginRun(){ Scheduler::schedule();} bool cSpawn(uthread_func func,void *arg,unsigned int stacksize){return Scheduler::spawn(func,arg,stacksize); } int cRecv(int sock,char *buf,int len){return Scheduler::recv(sock,buf,len); } int cSend(int sock,char *buf,int len){return Scheduler::send(sock,buf,len); } int cListen(int sock){return Scheduler::listen(sock);} void cSleep(int t){return Scheduler::sleep(t);}
echoserver.c
// kcoroutine.cpp : 定义控制台应用程序的入口点。//#include "uthread.h"#include "thread.h"int port;int test(void *arg){char *name = (char*)arg; unsigned long c = 0;while(1) {if(c % 10000 == 0) { printf("%d/n",c); cSleep(1); } ++c; }}int echo(void *arg){int sock = *(int*)arg;while(1) {char buf[1024];int ret = cRecv(sock,buf,1024);if(ret > 0) { printf("%s/n",buf); ret = cSend(sock,buf,ret); } }}int listener(void *arg){struct sockaddr_in servaddr;int listenfd;if(0 > (listenfd = Tcp_Listen("127.0.0.1",port,servaddr,5))) { printf("listen error/n");return 0; }while(1) {if(cListen(listenfd) > 0) { printf("a user comming/n");struct sockaddr_in cliaddr; socklen_t len;int sock = Accept(listenfd,(struct sockaddr*)NULL,NULL);if(sock >= 0) { cSpawn(echo,&sock,4096); } } } }int main(int argc, char **argv){ port = atoi(argv[1]);if(!initcoroutine(20))return 0; cSpawn(listener,0,4096);char name[10] = "test"; cSpawn(test,name,4096); printf("create finish/n"); //开始调度线程的运行 BeginRun();return 0;}
运行后会看到控制台中不断的输出1,那是runable_test在工作,
telnet几个客户端上去,就可以看到echo的效果了,总体来看效果还不错,
不过context的切换效率还没有测试过.
转载于:https://www.cnblogs.com/sniperHW/archive/2012/04/02/2429644.html
ucontext实现的用户级多线程框架3(实现echo服务器)相关推荐
- 内核级线程(KLT)和用户级线程(ULT)
文章目录 进程和线程 内核级线程(Kemel-Level Threads, KLT 也有叫做内核支持的线程) 纯内核级线程特点 用户级线程(User-Level Threads ULT) 纯用户级线程 ...
- linux ucontext 类型,协程:posix::ucontext用户级线程实现原理分析 | WalkerTalking
在听完leader的课程后,对其中协程的实现方式有了基本的了解,无论的POSIX的ucontex,boost::fcontext,还是libco,都是通过保存和恢复寄存器状态,来进行各个协程上下文的保 ...
- 利用用户级线程提高多线程应用的性能
随着处理器往多核的发展,多线程被越来越多的应用到软件的开发中.但是如果没有正确的使用多线程,反而可能会导致软件性能的下降. 多线程程序中一个影响程序性能的因素就是同步.对于windows系统来说,最快 ...
- 微信亿级用户异常检测框架的设计与实践
微信亿级用户异常检测框架的设计与实践 参考文章: (1)微信亿级用户异常检测框架的设计与实践 (2)https://www.cnblogs.com/qcloud1001/p/8351385.html ...
- Linux下的LWP(轻量级进程)、进程 、 线程、用户级线程、内核线程
一.定义 再看正文之前我要先强调一下几点: 1. Linux中没有真正的线程,但windows中确实有线程 2. Linux中没有的线程是由进程来模拟实现的(又称作:轻量级进程) 3. 所以在Linu ...
- 百万用户级游戏服务器架构设计
百万用户级游戏服务器架构设计 服务器结构探讨 -- 最简单的结构 所谓服务器结构,也就是如何将服务器各部分合理地安排,以实现最初的功能需求.所以,结构本无所谓正确与错误:当然,优秀的结构更有助于系统的 ...
- 百万用户级游戏服务器架构设计与游戏视频开发平台源码分享
服务器结构探讨 -- 最简单的结构 所谓服务器结构,也就是如何将服务器各部分合理地安排,以实现最初的功能需求.所以,结构本无所谓正确与错误:当然,优秀的结构更有助于系统的搭建,对系统的可扩展性及可维护 ...
- 百万用户级游戏服务器架构介绍
服务器结构探讨 -- 最简单的结构 所谓服务器结构,也就是如何将服务器各部分合理地安排,以实现最初的功能需求.所以,结构本无所谓正确与错误:当然,优秀的结构更有助于系统的搭建,对系统的可扩展性及可维护 ...
- 应用退出前不让线程切换_用户级线程和内核级线程,你分清楚了吗?
前天晚上有个伙伴私信我说在学进程和线程,问我有没有好的方法和学习教程,刚好我最近也在备相关的课. 班上不少学生学的还是很不错的.拿班上小白和小明的例子吧(艺名哈).小明接受能力很强,小白则稍差些. 关 ...
最新文章
- 13款基于jQuery Mobile的布局插件和示例
- 接口经常超时?线程池+ FutureTask来解决!
- 网络工程中,VLAN到底有什么作用?
- 刷爆了!GitHub标星1.6W,这个 Python 项目太实用!
- SMT32H743+CubeMX-配置MPU后,在Keil上的程序卡死
- 小爱同学app安卓版_小爱课程表3.0全新升级 课表倒入更简单所有手机能用
- 基于visual Studio2013解决面试题之0410计算二进制中1的个数
- 【概率论基础】机器学习领域必知必会的12种概率分布(附Python代码实现)
- AndroidStudio修改布局文件运行无效
- c# RestClient 请求接口
- Opencv相机校准之棋盘格标定
- java里面有radix树吗_基数树(radix tree)
- 微信小程序学习之路——API用户信息
- 蓝桥java练习记录
- Unity中的角色属性芒星比例图
- barman备份的配置使用(备份+恢复)
- MATLAB求单位阶跃响应,并分析参量的影响。自控例题。
- 【Ubuntu 20.04 LTS】设置笔记本合并盖子不休眠
- 二分法查找(dichotomy)--python实现
- 【VCS】Git之无尽探索
热门文章
- 怎么关闭wordpress评论
- vc6静态库的生成和调用
- 【GStreamer】gstreamer工具详解之:gst-discoverer-1.0
- #每天一种设计模式# 观察者模式
- ue4蓝图节点手册中文_在UE4中播放视频
- python tkinter选择路径控件_Python3 Tkinter选择路径功能的实现方法
- python编写脚本方法_【Python】教你一步步编写banner获取脚本
- php好的mvc中index方法,创建一个mvc应用目录架构并创建入口文件index.php
- 【java】maven工程使用switch时不能使用String解决方法
- Express应用配置端口