前面一篇文章实现了一个抢先式的用户级多线程框架,现在用那个框架编写一个echo服务,

因为只是个实验,所以代码写得比较杂乱,还有很多可能出错的情况也没有处理,这些在今后的进一

步研究中都会慢慢修改,下面是代码:

uthread.h

/** brief: 用ucontext实现的用户级线程框架* author: kenny huang* date: 2009/10/13* email: huangweilook@21cn.com*/#ifndef _UTHREAD_H#define _UTHREAD_H#include <ucontext.h>#include <stdio.h>#include <string.h>#include <list>#include <pthread.h>#include <signal.h>#include "socketwrapper.h"

// Maximum number of user-level threads the scheduler can manage.
#define MAX_UTHREAD 128

// SIGUSR1 handler installed by initcoroutine(); forwards to Scheduler::int_sig().
void int_signal_handler(int sig);

// Current state of a user-level thread.
enum thread_status{
    ACTIVED = 0,// runnable
    BLOCKED,    // blocked (e.g. waiting on socket I/O)
    SLEEP,      // voluntarily sleeping
    DIE,        // finished; waiting to be reclaimed by the scheduler
};

// Entry-function signature for a user-level thread.
typedef int (*uthread_func)(void*);

class Scheduler;
class u_thread;

// Identifier of a user-level thread.
typedef struct{
    int index;             // index into Scheduler::threads
    u_thread *p_uthread;   // owning thread object (0 for the scheduler itself)
    ucontext_t *p_context; // context to switch to
}uthread_id;

/** 用户态线程*/class u_thread{    friend class Scheduler;private:    u_thread(unsigned int ssize,int index,uthread_id parent)        :ssize(ssize),_status(BLOCKED),parent_context(parent.p_context)        {            stack = new char[ssize];            ucontext.uc_stack.ss_sp = stack;            ucontext.uc_stack.ss_size = ssize;            getcontext(&ucontext);            uid.index = index;            uid.p_uthread = this;            uid.p_context = &ucontext;

        }    ~u_thread()    {        delete []stack;    }static void star_routine(int uthread,int func,int arg);

public:    ucontext_t* GetParentContext()    {return parent_context;    }    ucontext_t *GetContext()    {return &ucontext;    }void SetStatus(thread_status _status)    {this->_status = _status;    }    thread_status GetStatus()    {return _status;    }    uthread_id GetUid()    {return uid;    }

private:    ucontext_t ucontext;    ucontext_t *parent_context;//父亲的context    char *stack;//coroutine使用的栈    unsigned int ssize;//栈的大小    thread_status _status;    uthread_id uid;};

// Public coroutine API -- thin wrappers over Scheduler's private statics.
void BeginRun();
bool cSpawn(uthread_func func,void *arg,unsigned int stacksize);
int  cRecv(int sock,char *buf,int len);
int  cSend(int sock,char *buf,int len);
int  cListen(int sock);
void cSleep(int t);

class beat;

/*
 * The task scheduler.  Everything is static: one scheduler per process,
 * driven by the thread that calls BeginRun().  The c* wrappers and beat
 * are friends so the entire implementation stays private.
 */
class Scheduler{
    friend class beat;
    friend class u_thread;
    friend void BeginRun();
    friend bool cSpawn(uthread_func func,void *arg,unsigned int stacksize);
    friend int  cRecv(int sock,char *buf,int len);
    friend int  cSend(int sock,char *buf,int len);
    friend int  cListen(int sock);
    friend void cSleep(int t);
public:
    static void scheduler_init();
    // Called from the SIGUSR1 handler: preempts the running coroutine.
    static void int_sig();

private:

    // Put the calling coroutine to sleep for t seconds.
    static void sleep(int t);
    static void check_Network();
    static void schedule();
    static bool spawn(uthread_func func,void *arg,unsigned int stacksize);
    static int recv(int sock,char *buf,int len);
    static int send(int sock,char *buf,int len);
    static int listen(int sock);

private:
    static std::list<u_thread*> activeList;// runnable coroutines

    static std::list<std::pair<u_thread*,time_t> > sleepList;// sleepers with absolute wake time
    static volatile bool block_signal; // true while a context switch is in flight; beat must not signal then
    static  char stack[4096]; // the scheduler's own stack

    static     ucontext_t ucontext;// the scheduler's context

    static uthread_id uid_current;// id of the coroutine currently running

    static uthread_id uid_self;// id representing the scheduler itself

    static  u_thread *threads[MAX_UTHREAD];
    static  int total_count;// number of live coroutines
    static  int epollfd;
    const static int maxsize = 10;// max events per epoll_wait
};

/*
 * Heartbeat emitter.  Must run on its own OS thread; at a fixed interval
 * it sends an interrupt signal (SIGUSR1) to the scheduler thread so the
 * running coroutine gets preempted.
 */
class beat{
public:
    beat(unsigned int interval):interval(interval)
    {}
    void setThread(pthread_t id)
    {
        thread_scheduler = id;
    }
    void loop()
    {
        while(true)
        {
            // wait the fixed interval, then interrupt the scheduler thread
            ::usleep(1000 * interval);
            while(1)
            {
                // spin until no context switch is in progress
                if(!Scheduler::block_signal)
                {
                    pthread_kill(thread_scheduler,SIGUSR1);
                    break;
                }
            }
        }
    }
private:
    unsigned int interval;// milliseconds between interrupts
    pthread_t thread_scheduler;// thread to signal (the scheduler thread)
};

// Install the SIGUSR1 handler, init the scheduler and start the heartbeat
// (interval in milliseconds).  Call once before cSpawn()/BeginRun().
bool initcoroutine(unsigned int interval);

#endif

uthread.cpp

#include "uthread.h"#include <assert.h>#include <stdlib.h>#include <time.h>#include <netinet/in.h>#include <sys/socket.h>#include <sys/time.h>#include <netdb.h>#include <stdio.h>#include <stdlib.h>#include <string.h>#include <unistd.h>#include <sys/epoll.h>#include "thread.h"

// --- Scheduler static member definitions ---
ucontext_t Scheduler::ucontext;

char Scheduler::stack[4096];

uthread_id Scheduler::uid_current;

uthread_id Scheduler::uid_self;

u_thread *Scheduler::threads[MAX_UTHREAD];

int Scheduler::total_count = 0;

int Scheduler::epollfd = 0;

// Start blocked: beat must not signal before the scheduler is running.
volatile bool Scheduler::block_signal = true;

std::list<u_thread*> Scheduler::activeList;

std::list<std::pair<u_thread*,time_t> > Scheduler::sleepList;

// Bookkeeping attached to an epoll event: which socket became ready and
// which coroutine is blocked waiting on it.
struct sock_struct{
    int sockfd;
    u_thread *uThread;
};

// SIGUSR1 handler installed by initcoroutine().  It does nothing itself;
// it simply hands control to the scheduler so the currently running
// coroutine is preempted.
void int_signal_handler(int sig)
{
    Scheduler::int_sig();
}

void u_thread::star_routine(int uthread,int func,int arg){    u_thread *self_uthread = (u_thread *)uthread;    assert(self_uthread);

    self_uthread->SetStatus(ACTIVED);    ucontext_t *self_context = self_uthread->GetContext();    swapcontext(self_context,self_uthread->GetParentContext());    Scheduler::block_signal = false;

    uthread_func _func = (uthread_func)func;void *_arg = (void*)arg;int ret = _func(_arg);    self_uthread->SetStatus(DIE);}void Scheduler::scheduler_init(){for(int i = 0; i < MAX_UTHREAD; ++i)        threads[i] = 0;    getcontext(&ucontext);    ucontext.uc_stack.ss_sp = stack;    ucontext.uc_stack.ss_size = sizeof(stack);    ucontext.uc_link = NULL;

//scheduler占用下标0    makecontext(&ucontext,schedule, 0);    

    uid_self.index = 0;    uid_self.p_uthread = 0;    uid_self.p_context = &ucontext;    uid_current = uid_self;

}int Scheduler::listen(int sock){    u_thread *self_thread = uid_current.p_uthread;

    epoll_event ev;    sock_struct *ss = new sock_struct;    ss->uThread = self_thread;    ss->sockfd = sock;    ev.data.ptr = (void*)ss;    ev.events = EPOLLIN;int ret;    TEMP_FAILURE_RETRY(ret = epoll_ctl(epollfd,EPOLL_CTL_ADD,sock,&ev));if(ret != 0)    {return -1;    }

    self_thread->SetStatus(BLOCKED);    Scheduler::block_signal = true;    swapcontext(uid_current.p_context,&Scheduler::ucontext);    Scheduler::block_signal = false;return 1;}

int Scheduler::recv(int sock,char *buf,int len){if(!buf || !(len > 0))return -1;    u_thread *self_thread = uid_current.p_uthread;     sock_struct *ss = new sock_struct;    ss->uThread = self_thread;    ss->sockfd = sock;    epoll_event ev;    ev.data.ptr = (void*)ss;    ev.events = EPOLLIN;int ret;    TEMP_FAILURE_RETRY(ret = epoll_ctl(epollfd,EPOLL_CTL_ADD,sock,&ev));if(ret != 0)return -1;

    self_thread->SetStatus(BLOCKED);    Scheduler::block_signal = true;    swapcontext(uid_current.p_context,&Scheduler::ucontext);    printf("recv return/n");    Scheduler::block_signal = false;    ret = read(sock,buf,len);return ret;}int Scheduler::send(int sock,char *buf,int len){if(!buf || !(len > 0))return -1;

    u_thread *self_thread = uid_current.p_uthread; 

    sock_struct *ss = new sock_struct;    ss->uThread = self_thread;    ss->sockfd = sock;    epoll_event ev;    ev.data.ptr = (void*)ss;    ev.events = EPOLLOUT;int ret;    TEMP_FAILURE_RETRY(ret = epoll_ctl(epollfd,EPOLL_CTL_ADD,sock,&ev));if(ret != 0)return -1;    self_thread->SetStatus(BLOCKED);    Scheduler::block_signal = true;    swapcontext(uid_current.p_context,&Scheduler::ucontext);    Scheduler::block_signal = false;    ret = write(sock,buf,len);return ret;    }void Scheduler::check_Network(){        epoll_event events[maxsize];    sock_struct *ss;int nfds = TEMP_FAILURE_RETRY(epoll_wait(epollfd,events,maxsize,0));for( int i = 0 ; i < nfds ; ++i)    {//套接口可读        if(events[i].events & EPOLLIN)        {                 ss = (sock_struct*)events[i].data.ptr;            printf("a sock can read/n");            ss->uThread->SetStatus(ACTIVED);            epoll_event ev;            ev.data.fd = ss->sockfd;if(0 != TEMP_FAILURE_RETRY(epoll_ctl(epollfd,EPOLL_CTL_DEL,ev.data.fd,&ev)))            {                printf("error here/n");                exit(0);            }            delete ss;continue;        }//套接口可写        if(events[i].events & EPOLLOUT)        {            ss = (sock_struct*)events[i].data.ptr;            printf("a sock can write/n");            ss->uThread->SetStatus(ACTIVED);            epoll_event ev;            ev.data.fd = ss->sockfd;            TEMP_FAILURE_RETRY(epoll_ctl(epollfd,EPOLL_CTL_DEL,ev.data.fd,&ev));            delete ss;continue;        }    }}void Scheduler::schedule(){    epollfd = TEMP_FAILURE_RETRY(epoll_create(maxsize)); 

if(epollfd<= 0)    {        printf("epoll init error/n");return;    }

while(total_count > 0)    {

//首先执行active列表中的uthread        std::list<u_thread*>::iterator it = activeList.begin();         std::list<u_thread*>::iterator end = activeList.end();for( ; it != end; ++it)        {if(*it && (*it)->GetStatus() == ACTIVED)            {                uid_current = (*it)->GetUid();                swapcontext(&ucontext,uid_current.p_context);                uid_current = uid_self;

int index = (*it)->GetUid().index;if((*it)->GetStatus() == DIE)                {                    printf("%d die/n",index);                    delete threads[index];                    threads[index] = 0;                    --total_count;                    activeList.erase(it);break;                }else if((*it)->GetStatus() == SLEEP)                {                    printf("%d sleep/n",index);                    activeList.erase(it);break;                }            }        }//检查网络,看看是否有套接口可以操作        check_Network();//看看Sleep列表中是否有uthread该醒来了        std::list<std::pair<u_thread*,time_t> >::iterator its = sleepList.begin();        std::list<std::pair<u_thread*,time_t> >::iterator ends = sleepList.end();        time_t now = time(NULL);for( ; its != ends; ++its)        {//可以醒来了            if(now >= its->second)            {                    u_thread *uthread = its->first;                uthread->SetStatus(ACTIVED);                activeList.push_back(uthread);                sleepList.erase(its);break;            }        }    }    printf("scheduler end/n");}

bool Scheduler::spawn(uthread_func func,void *arg,unsigned int stacksize){    printf("uthread_create/n");if(total_count >= MAX_UTHREAD)return false;int i = 1;for( ; i < MAX_UTHREAD; ++i)    {if(threads[i] == 0)        {            threads[i] = new u_thread(stacksize,i,uid_current);            ++total_count;            ucontext_t *cur_context = threads[i]->GetContext();            activeList.push_back(threads[i]);

            cur_context->uc_link = &ucontext;            makecontext(cur_context,(void (*)())u_thread::star_routine, 3,(int)&(*threads[i]),(int)func,(int)arg);            swapcontext(uid_current.p_context, cur_context);            printf("return from parent/n");return true;        }    }

return false;}void Scheduler::sleep(int t){    u_thread *self_thread = uid_current.p_uthread; 

    time_t now = time(NULL);    now += t;//插入到sleep列表中    sleepList.push_back(std::make_pair(self_thread,now));

//保存当前上下文切换回scheduler    self_thread->SetStatus(SLEEP);

    ucontext_t *cur_context = self_thread->GetContext();    Scheduler::block_signal = true;    swapcontext(cur_context,&Scheduler::ucontext);    Scheduler::block_signal = false;}void Scheduler::int_sig(){//printf("Scheduler::int_sig()%x/n",uid_current.p_context);    Scheduler::block_signal = true;    swapcontext(uid_current.p_context,&Scheduler::ucontext);    Scheduler::block_signal = false;}

class HeartBeat : public runnable{public:    HeartBeat(unsigned int interval)    {        _beat = new beat(interval);        _beat->setThread(pthread_self());    }

    ~HeartBeat()    {        delete _beat;    }

bool run()    {        _beat->loop();return true;    }

private:    beat *_beat;};

bool initcoroutine(unsigned int interval){//初始化信号    struct sigaction sigusr1;    sigusr1.sa_flags = 0;    sigusr1.sa_handler = int_signal_handler;    sigemptyset(&sigusr1.sa_mask);int status = sigaction(SIGUSR1,&sigusr1,NULL);if(status == -1)    {        printf("error sigaction/n");return false;    }

//首先初始化调度器    Scheduler::scheduler_init();

//启动心跳    static HeartBeat hb(interval);static Thread c(&hb);    c.start();return true;}

// --- public coroutine API: thin forwarding wrappers over Scheduler ---

// Enter the scheduler loop; returns only when every coroutine has died.
void BeginRun()
{
    Scheduler::schedule();
}

// Create a new coroutine running func(arg) on a stacksize-byte stack.
bool cSpawn(uthread_func func,void *arg,unsigned int stacksize)
{
    return Scheduler::spawn(func,arg,stacksize);
}

// Blockingly read up to len bytes from sock into buf.
int cRecv(int sock,char *buf,int len)
{
    return Scheduler::recv(sock,buf,len);
}

// Blockingly write up to len bytes of buf to sock.
int cSend(int sock,char *buf,int len)
{
    return Scheduler::send(sock,buf,len);
}

// Block until sock has an incoming connection ready to accept.
int cListen(int sock)
{
    return Scheduler::listen(sock);
}

// Put the calling coroutine to sleep for t seconds.
void cSleep(int t)
{
    Scheduler::sleep(t);
}

echoserver.c

// kcoroutine.cpp : 定义控制台应用程序的入口点。//#include "uthread.h"#include "thread.h"int port;int test(void *arg){char *name = (char*)arg;    unsigned long c = 0;while(1)    {if(c % 10000 == 0)        {            printf("%d/n",c);            cSleep(1);        }        ++c;    }}int echo(void *arg){int sock = *(int*)arg;while(1)    {char buf[1024];int ret = cRecv(sock,buf,1024);if(ret > 0)        {            printf("%s/n",buf);            ret = cSend(sock,buf,ret);        }    }}int listener(void *arg){struct sockaddr_in servaddr;int listenfd;if(0 > (listenfd = Tcp_Listen("127.0.0.1",port,servaddr,5)))    {        printf("listen error/n");return 0;    }while(1)    {if(cListen(listenfd) > 0)        {            printf("a user comming/n");struct sockaddr_in cliaddr;            socklen_t len;int sock = Accept(listenfd,(struct sockaddr*)NULL,NULL);if(sock >= 0)            {                cSpawn(echo,&sock,4096);            }

        }    }    }int main(int argc, char **argv){    port = atoi(argv[1]);if(!initcoroutine(20))return 0;    cSpawn(listener,0,4096);char name[10] = "test";    cSpawn(test,name,4096);

    printf("create finish/n");

//开始调度线程的运行    BeginRun();return 0;}

运行后会看到控制台里周期性地输出计数值,那是 test 协程在工作,

telnet几个客户端上去,就可以看到echo的效果了,总体来看效果还不错,

不过context的切换效率还没有测试过.

转载于:https://www.cnblogs.com/sniperHW/archive/2012/04/02/2429644.html

ucontext实现的用户级多线程框架3(实现echo服务器)相关推荐

  1. 内核级线程(KLT)和用户级线程(ULT)

    文章目录 进程和线程 内核级线程(Kemel-Level Threads, KLT 也有叫做内核支持的线程) 纯内核级线程特点 用户级线程(User-Level Threads ULT) 纯用户级线程 ...

  2. linux ucontext 类型,协程:posix::ucontext用户级线程实现原理分析 | WalkerTalking

    在听完leader的课程后,对其中协程的实现方式有了基本的了解,无论的POSIX的ucontex,boost::fcontext,还是libco,都是通过保存和恢复寄存器状态,来进行各个协程上下文的保 ...

  3. 利用用户级线程提高多线程应用的性能

    随着处理器往多核的发展,多线程被越来越多的应用到软件的开发中.但是如果没有正确的使用多线程,反而可能会导致软件性能的下降. 多线程程序中一个影响程序性能的因素就是同步.对于windows系统来说,最快 ...

  4. 微信亿级用户异常检测框架的设计与实践

    微信亿级用户异常检测框架的设计与实践 参考文章: (1)微信亿级用户异常检测框架的设计与实践 (2)https://www.cnblogs.com/qcloud1001/p/8351385.html ...

  5. Linux下的LWP(轻量级进程)、进程 、 线程、用户级线程、内核线程

    一.定义 再看正文之前我要先强调一下几点: 1. Linux中没有真正的线程,但windows中确实有线程 2. Linux中没有的线程是由进程来模拟实现的(又称作:轻量级进程) 3. 所以在Linu ...

  6. 百万用户级游戏服务器架构设计

    百万用户级游戏服务器架构设计 服务器结构探讨 -- 最简单的结构 所谓服务器结构,也就是如何将服务器各部分合理地安排,以实现最初的功能需求.所以,结构本无所谓正确与错误:当然,优秀的结构更有助于系统的 ...

  7. 百万用户级游戏服务器架构设计与游戏视频开发平台源码分享

    服务器结构探讨 -- 最简单的结构 所谓服务器结构,也就是如何将服务器各部分合理地安排,以实现最初的功能需求.所以,结构本无所谓正确与错误:当然,优秀的结构更有助于系统的搭建,对系统的可扩展性及可维护 ...

  8. 百万用户级游戏服务器架构介绍

    服务器结构探讨 -- 最简单的结构 所谓服务器结构,也就是如何将服务器各部分合理地安排,以实现最初的功能需求.所以,结构本无所谓正确与错误:当然,优秀的结构更有助于系统的搭建,对系统的可扩展性及可维护 ...

  9. 应用退出前不让线程切换_用户级线程和内核级线程,你分清楚了吗?

    前天晚上有个伙伴私信我说在学进程和线程,问我有没有好的方法和学习教程,刚好我最近也在备相关的课. 班上不少学生学的还是很不错的.拿班上小白和小明的例子吧(艺名哈).小明接受能力很强,小白则稍差些. 关 ...

最新文章

  1. 13款基于jQuery Mobile的布局插件和示例
  2. 接口经常超时?线程池+ FutureTask来解决!
  3. 网络工程中,VLAN到底有什么作用?
  4. 刷爆了!GitHub标星1.6W,这个 Python 项目太实用!
  5. SMT32H743+CubeMX-配置MPU后,在Keil上的程序卡死
  6. 小爱同学app安卓版_小爱课程表3.0全新升级 课表倒入更简单所有手机能用
  7. 基于visual Studio2013解决面试题之0410计算二进制中1的个数
  8. 【概率论基础】机器学习领域必知必会的12种概率分布(附Python代码实现)
  9. AndroidStudio修改布局文件运行无效
  10. c# RestClient 请求接口
  11. Opencv相机校准之棋盘格标定
  12. java里面有radix树吗_基数树(radix tree)
  13. 微信小程序学习之路——API用户信息
  14. 蓝桥java练习记录
  15. Unity中的角色属性芒星比例图
  16. barman备份的配置使用(备份+恢复)
  17. MATLAB求单位阶跃响应,并分析参量的影响。自控例题。
  18. 【Ubuntu 20.04 LTS】设置笔记本合并盖子不休眠
  19. 二分法查找(dichotomy)--python实现
  20. 【VCS】Git之无尽探索

热门文章

  1. 怎么关闭wordpress评论
  2. vc6静态库的生成和调用
  3. 【GStreamer】gstreamer工具详解之:gst-discoverer-1.0
  4. #每天一种设计模式# 观察者模式
  5. ue4蓝图节点手册中文_在UE4中播放视频
  6. python tkinter选择路径控件_Python3 Tkinter选择路径功能的实现方法
  7. python编写脚本方法_【Python】教你一步步编写banner获取脚本
  8. php好的mvc中index方法,创建一个mvc应用目录架构并创建入口文件index.php
  9. 【java】maven工程使用switch时不能使用String解决方法
  10. Express应用配置端口