协程是什么

协程是用于解决IO密集型业务的轻量级框架。一个项目用到的IO读写非常多而且操作频繁,操作系统多次进行系统调用时,多个IO读写会出现其中一个IO如果出现长时间阻塞的时候,其他读写已经就绪的IO就无法操作,需要等待IO同步完成,降低效率和性能。为此出现协程这一东西,使用了同步的方式,做到了异步的性能和效率

同步:检测IO和读写IO位于同一个流程。
异步:检测IO和读写IO不位于同一流程。

协程框架都有什么

协程用来干嘛

协程这里可以理解成线程,不过区别于线程位于进程中是并行执行,协程是在线程中串行执行,并非异步,只不过是将协程中的IO设置成非阻塞当需要IO同步操作的时候将IO(协程)统一交由调度器使用epoll(或poll)进行管理检测,当epoll发现IO可以读或者可以写的时候,调度器让出CPU恢复协程的运行。协程也有类似于线程的内容,拥有自己id,创建时间,fd事件,栈大小,协程状态以及对应队列(睡眠、等待、就绪、结束等等),所属调度器,切换现场时的寄存器值等等。

协程栈和线程栈的关系

协程栈是划分线程栈的,协程栈有两种划分:

  • 独立栈:一个个协程按区域划分线程的栈,这里划分默认一个协程栈大小是4k。
  • 共享栈:所有协程共用一个线程栈,这样实现需要加锁,因为共享栈带来的问题就是当多个协程对同一个内存(IO)读写的时候会出现数据读写顺序错乱。所以需要加锁,加互斥锁和自旋锁。

协程的栈如何分配

int ret = posix_memalign(&co->stack, getpagesize(), sched->stack_size); //协程和调度器的栈大小一样

协程定义

typedef struct _nty_coroutine {//privatenty_cpu_ctx ctx; //协程的寄存器proc_coroutine func; //协程实现的内容void *arg;            //用于传参,调用协程的时候传入arg,协程进入的地方co->func(arg),位于_exec;void *data;            //协程数据size_t stack_size;    //栈size_t last_stack_size;  //协程栈的大小nty_coroutine_status status; //协程所处于的状态nty_schedule *sched; //协程所处于的状态uint64_t birth;       //协程创建的时间,使用的是gettimeofdayuint64_t id;           //协程id
#if CANCEL_FD_WAIT_UINT64int fd;unsigned short events;  //POLL_EVENT
#elseint64_t fd_wait;
#endifchar funcname[64];struct _nty_coroutine *co_join;void **co_exit_ptr;void *stack;void *ebp;uint32_t ops;uint64_t sleep_usecs;      //睡眠时间,睡多久RB_ENTRY(_nty_coroutine) sleep_node; //睡眠红黑树RB_ENTRY(_nty_coroutine) wait_node; //等待红黑树LIST_ENTRY(_nty_coroutine) busy_next;TAILQ_ENTRY(_nty_coroutine) ready_next; //就绪队列TAILQ_ENTRY(_nty_coroutine) defer_next;TAILQ_ENTRY(_nty_coroutine) cond_next;TAILQ_ENTRY(_nty_coroutine) io_next;TAILQ_ENTRY(_nty_coroutine) compute_next;struct {void *buf;size_t nbytes;int fd;int ret;int err;} io; //io的内容struct _nty_coroutine_compute_sched *compute_sched;int ready_fds;struct pollfd *pfds;  //对应epollfdnfds_t nfds;
} nty_coroutine;

创建并初始化协程

创建协程:

  1. 先获取系统当前的调度器
  2. 为协程分配内存
  3. 分配协程栈(posix_memalign)
  4. 给协程赋值属性
  5. 将协程加入就绪队列
int qs_coroutine_create(qs_coroutine **new_co, proc_coroutine func, void *arg){//部分代码/*1、先获取调度器*/assert(pthread_once(&sched_key_once, qs_coroutine_sched_key_creator)==0);//确保qs_coroutine_sched_key_creator 只被执行一次qs_schedule *sched = qs_coroutine_get_sched();//通过pthread_getspecific获取全局键值调度器if(sched == NULL){ //如果调度器未被创建或者获取失败qs_schedule_create(0);  //重新创建if(sched==NULL){printf("Failed to create shceduler\n");return -1;}}/*2、为协程分配内存*//*3、分配协程栈*/int ret = posix_memalign(&co->stack, getpagesize(), sched->stack_size); //将协程栈分配为一个页的大小/*4、给协程赋值属性*/   /*5、将协程加入就绪队列*/TALIQ_INSERT_TAIL(&co->shced->ready, co, ready_next); //将协程加入就绪队列
}

初始化协程

  1. 获取协程栈顶
  2. 将协程结构体作为参数压入栈中
  3. 确定协程的栈顶、栈底指针
  4. 将协程的寄存器结构体的**eip指令指针_exec,_exec最后会进入协程函数。**co->func(arg),位于_exec中;
  5. 将协程状态设置为就绪
static void nty_coroutine_init(nty_coroutine *co) {void **stack = (void **)(co->stack + co->stack_size);     //栈底,高地址stack[-3] = NULL;                   //stack[-2] = (void *)co;          //参数co->ctx.esp = (void*)stack - (4 * sizeof(void*)); co->ctx.ebp = (void*)stack - (3 * sizeof(void*));co->ctx.eip = (void*)_exec;co->status = BIT(NTY_COROUTINE_STATUS_READY);
}

调度器用来干嘛

调度器的工作内容主要有两个

  1. 检测协程的IO
  2. 调度睡眠、就绪、等待区中的协程

调度器定义

typedef struct _nty_schedule {uint64_t birth;    //调度器创建时间nty_cpu_ctx ctx;   //CPU寄存器void *stack;        //栈size_t stack_size;   int spawned_coroutines; //正在调度的协程数量uint64_t default_timeout;    //默认超时时间struct _nty_coroutine *curr_thread;     //正在运行的协程int page_size;int poller_fd;       //epollfdint eventfd;           //eventstruct epoll_event eventlist[NTY_CO_MAX_EVENTS]; //epoll事件集合int nevents;         //events数量int num_new_events;pthread_mutex_t defer_mutex;nty_coroutine_queue ready;     //就绪队列nty_coroutine_queue defer;nty_coroutine_link busy;nty_coroutine_rbtree_sleep sleeping;    //睡眠区,红黑树nty_coroutine_rbtree_wait waiting;      //等待红黑树
#if COROUTINE_MP
#endif
//private
} nty_schedule;

调度器如何检测协程的IO

1、协程遇到IO读写之前会将自己的fd添加到poll或者epoll

  1. 如果需要阻塞检测这个协程的IO时,就使用poll,若不是则使用epoll
  2. 随后获取调度器中的epollfd,并设置epoll属性,然后添加自己的fd到epoll中。
  3. 添加之后将协程结构体放入睡眠红黑树和等待红黑树上。
  4. 其中加入睡眠红黑树时,会设置睡眠时间睡眠时间=当前的时间 - 调度器创建的时间 + timeout=timeout
  5. 如果树上出现相同时间则timeout++(很微小),防止冲突
  6. 最后协程保留现场将CPU让出给调度器。
  7. 当调度器将CPU给回协程时将协程从睡眠树和等待树上移除,将fd从epollfd中移除
static int nty_poll_inner(struct pollfd *fds, nfds_t nfds, int timeout) {if (timeout == 0)return poll(fds, nfds, timeout);     //如果需要阻塞检测这个协程的IOif (timeout < 0)timeout = INT_MAX;nty_schedule *sched = nty_coroutine_get_sched();    //获取调度器nty_coroutine *co = sched->curr_thread;for (int i = 0;i < nfds;i ++) {         //将fd添加到epoll中进行管理struct epoll_event ev;ev.events = nty_pollevent_2epoll(fds[i].events);ev.data.fd = fds[i].fd;epoll_ctl(sched->poller_fd, EPOLL_CTL_ADD, fds[i].fd, &ev);co->events = fds[i].events;nty_schedule_sched_wait(co, fds[i].fd, fds[i].events, timeout);  //将协程加入睡眠和等待红黑树上}nty_coroutine_yield(co);        //让出CPU/*调度器将CPU给回协程*/for (i = 0;i < nfds;i ++) {     //将fd从epoll中移除struct epoll_event ev;ev.events = nty_pollevent_2epoll(fds[i].events);ev.data.fd = fds[i].fd;epoll_ctl(sched->poller_fd, EPOLL_CTL_DEL, fds[i].fd, &ev);nty_schedule_desched_wait(fds[i].fd);            //将协程从等待和睡眠红黑树上移除}return nfds;

2、调度器如何管理epoll

  1. 先计算出epoll的等待时间
  2. 如果就绪队列为空就进行epoll_wait
  3. 等到了调度器就保留信息
static int nty_schedule_epoll(nty_schedule *sched) {sched->num_new_events = 0;struct timespec t = {0, 0};uint64_t usecs = nty_schedule_min_timeout(sched); //先取出等待列中时间最小的协程,//然后计算出当前时间距离最小时间还有多久并返回,如果等待列中没有协程则返回0if (usecs && TAILQ_EMPTY(&sched->ready)) {  //如果就绪就绪队列是空的,就返回0,让就绪队列执行t.tv_sec = usecs / 1000000u;if (t.tv_sec != 0) {t.tv_nsec = (usecs % 1000u) * 1000u;} else {t.tv_nsec = usecs * 1000u;}} else {return 0;}int nready = 0;while (1) {nready = nty_epoller_wait(t);  //epoll_wait, t是距离最近协程的时间if (nready == -1) {if (errno == EINTR) continue;else assert(0);}break;}//等到了调度器就保留epoll的信息sched->nevents = 0;sched->num_new_events = nready;return 0;
}

重点:调度器如何调度正在等待、睡眠、就绪的协程?

void nty_schedule_run(void)

睡眠

根据红黑树上协程设置的睡眠时间来判断是否要恢复现场。

     nty_coroutine *expired = NULL;while ((expired = nty_schedule_expired(sched)) != NULL) {  //获取已经完成睡眠时间的协程nty_coroutine_resume(expired);   //恢复协程运行}
static nty_coroutine *nty_schedule_expired(nty_schedule *sched) {uint64_t t_diff_usecs = nty_coroutine_diff_usecs(sched->birth, nty_coroutine_usec_now());nty_coroutine *co = RB_MIN(_nty_coroutine_rbtree_sleep, &sched->sleeping); //拿最小的if (co == NULL) return NULL;if (co->sleep_usecs <= t_diff_usecs) {   //如果已经到了设置的时间返回coRB_REMOVE(_nty_coroutine_rbtree_sleep, &co->sched->sleeping, co);return co;}return NULL;
}

就绪

很直观,如果就绪队列上有就绪协程就直接恢复运行。

     nty_coroutine *last_co_ready = TAILQ_LAST(&sched->ready, _nty_coroutine_queue);while (!TAILQ_EMPTY(&sched->ready)) {nty_coroutine *co = TAILQ_FIRST(&sched->ready);TAILQ_REMOVE(&co->sched->ready, co, ready_next);if (co->status & BIT(NTY_COROUTINE_STATUS_FDEOF)) {nty_coroutine_free(co);break;}nty_coroutine_resume(co);if (co == last_co_ready) break;}

等待

使用epoll进行管理,文章上面也提到了。使用epoll获取已经就绪的fd,然后通过fd再红黑树上查找到对应的协程,最后恢复协程运行。

     nty_schedule_epoll(sched);      //epollwhile (sched->num_new_events) {//如果epoll检测到有IO就绪int idx = --sched->num_new_events;struct epoll_event *ev = sched->eventlist+idx;int fd = ev->data.fd;int is_eof = ev->events & EPOLLHUP;if (is_eof) errno = ECONNRESET;nty_coroutine *co = nty_schedule_search_wait(fd);if (co != NULL) {if (is_eof) {co->status |= BIT(NTY_COROUTINE_STATUS_FDEOF);}nty_coroutine_resume(co);}is_eof = 0;}

至此,调度器对协程的所有调度就这么多。

总的来说

总的来说协程和调度器的运行关系就是

  1. 一旦协程遇到IO就保留现场,将fd交由epoll管理,并让出CPU给调度器。
  2. 调度器对等待区、就绪区、睡眠区的协程进行管理。
  3. 对等待区的协程是使用epoll管理,如果协程的IO就绪了,就恢复运行协程。
  4. 对睡眠区的协程是使用睡眠时间管理,如果时间到了就恢复协程运行。
  5. 对就绪区的协程是就绪队列有协程就直接运行协程。

协程和调度器之间如何切换工作环境

将工作环境保存再协程和调度器内部,工作环境就是指CPU的寄存器,这里使用一个结构体。

typedef struct _nty_cpu_ctx {void *esp; //void *ebp;void *eip;void *edi;void *esi;void *ebx;void *r1;void *r2;void *r3;void *r4;void *r5;
} nty_cpu_ctx;

实现切换就是改变寄存器的值,使用mov指令。先保留当前的CPU寄存器,再将新的CPU覆盖原来的寄存器。

#elif defined(__x86_64__)
__asm__ (
"    .text                                  \n"
"       .p2align 4,,15                                   \n"
".globl _switch                                          \n"
".globl __switch                                         \n"
"_switch:                                                \n"
"__switch:                                               \n"    "#保留旧的工作环境         "
"#将rsi(cur_ctx)放入寄存器"
" rsi-->cur_ctx  数字是对应偏移字节数""       movq %rsp, 0(%rsi)      # esp = rsp   栈顶    \n"
"       movq %rbp, 8(%rsi)      # ebp = rbp   栈底     \n"
"#rsi 偏移8,  对应cur_ctx   的第2个值栈底指针"
"#rsi 偏移0,  对应cur_ctx   的第1个值 栈顶指针""       movq (%rsp), %rax       # save insn_pointer      \n"
"#将栈顶指针赋值给函数返回值,ret后出栈,执行栈顶""#rax = eip 函数返回值"
"#保留rbx r12-r15"
"       movq %rax, 16(%rsi)                              \n"
"       movq %rbx, 24(%rsi)     # 保存rbx,r12-r15 \n"
"       movq %r12, 32(%rsi)                              \n"
"       movq %r13, 40(%rsi)                              \n"
"       movq %r14, 48(%rsi)                              \n"
"       movq %r15, 56(%rsi)                              \n"    "#新的工作环境"
" rdi-->new_ctx    "
"       movq 56(%rdi), %r15                              \n"
"       movq 48(%rdi), %r14                              \n"
"       movq 40(%rdi), %r13     # restore rbx,r12-r15    \n"
"       movq 32(%rdi), %r12                              \n"
"       movq 24(%rdi), %rbx                              \n"
"       movq 8(%rdi), %rbp      # restore frame_pointer  \n"
"       movq 0(%rdi), %rsp      # restore stack_pointer  \n"
"       movq 16(%rdi), %rax     # restore insn_pointer   \n" "#将函数返回值赋值给栈顶寄存器"
"       movq %rax, (%rsp)                                \n""#跳转执行栈顶,即进入新的工作空间"
"       ret                                              \n"
);
#endif

hook重写系统调用

使用dlsym,这部分实现简单。具体实现看代码,主要是将socket设置成非阻塞的,socket地址可以复用。

int init_hook(void) {socket_f = (socket_t)dlsym(RTLD_NEXT, "socket");//read_f = (read_t)dlsym(RTLD_NEXT, "read");recv_f = (recv_t)dlsym(RTLD_NEXT, "recv");recvfrom_f = (recvfrom_t)dlsym(RTLD_NEXT, "recvfrom");//write_f = (write_t)dlsym(RTLD_NEXT, "write");send_f = (send_t)dlsym(RTLD_NEXT, "send");sendto_f = (sendto_t)dlsym(RTLD_NEXT, "sendto");accept_f = (accept_t)dlsym(RTLD_NEXT, "accept");close_f = (close_t)dlsym(RTLD_NEXT, "close");connect_f = (connect_t)dlsym(RTLD_NEXT, "connect");
}

代码自行Git

git@github.com:qiushii/fastDFS.git

【协程】MyCoroutine轻量级协程框架代码详细剖解相关推荐

  1. Python 进程、线程、协程傻傻分不清楚?详细总结(附代码)

    目录 1 什么是并发编程? 2 进程与多进程 3 线程与多线程 4 协程与多协程 5 总结 1 什么是并发编程? 并发编程是实现多任务协同处理,改善系统性能的方式.Python中实现并发编程主要依靠 ...

  2. 干货 | 携程基于Quasar协程的NIO实践

    作者简介 Ryan,携程Java开发工程师,对高并发.网络编程等领域有浓厚兴趣. IO密集型系统在高并发场景下,会有大量线程处于阻塞状态,性能低下,JAVA上成熟的非阻塞IO(NIO)技术可解决该问题 ...

  3. 携程基于Quasar协程的NIO实践

    IO密集型系统在高并发场景下,会有大量线程处于阻塞状态,性能低下,JAVA上成熟的非阻塞IO(NIO)技术可解决该问题.目前Java项目对接NIO的方式主要依靠回调,代码复杂度高,降低了代码可读性与可 ...

  4. java 修改最大nio连接数_携程基于Quasar协程的NIO实践

    IO密集型系统在高并发场景下,会有大量线程处于阻塞状态,性能低下,JAVA上成熟的非阻塞IO(NIO)技术可解决该问题.目前Java项目对接NIO的方式主要依靠回调,代码复杂度高,降低了代码可读性与可 ...

  5. python多线程调用携程,Python 协程,Python携程

    Python 协程,Python携程 协程 进程:操作系统中存在 线程:操作系统中存在 协程:是微线程 模块(greenlet) 协程不是一个真实存在的东西,是由程序员创造出来的 协程,是对一个线程分 ...

  6. 深入浅出c++协程丨C++协程实现

    1|0一些实现的c++协程 C++协程实现相关视频讲解:(视频代码资料点击 正在跳转 获取) 协程的实现与原理剖析(上) 协程的实现与原理剖析(下) 协程是一种函数对象,可以设置锚点做暂停,然后再该锚 ...

  7. python 协程可以嵌套协程吗_Python线程、协程探究(2)——揭开协程的神秘面纱...

    一.上集回顾 在上一篇中我们主要研究了python的多线程困境,发现多核情况下由于GIL的存在,python的多线程程序无法发挥多线程该有的并行威力.在文章的结尾,我们提出如下需求: 既然python ...

  8. python 协程可以嵌套协程吗_Python | 详解Python中的协程,为什么说它的底层是生成器?...

    今天是Python专题的第26篇文章,我们来聊聊Python当中的协程. 我们曾经在golang关于goroutine的文章当中简单介绍过协程的概念,我们再来简单review一下.协程又称为是微线程, ...

  9. python 协程库_python --- 协程编程(第三方库gevent的使用)

    1. 什么是协程? 协程(coroutine),又称微线程.协程不是线程也不是进程,它的上下文关系切换不是由CPU控制,一个协程由当前任务切换到其他任务由当前任务来控制.一个线程可以包含多个协程,对于 ...

最新文章

  1. python安装步骤电脑版-超详细的小白python3.X安装教程|Python安装
  2. Android 经典欧美小游戏 guess who
  3. 收藏 | 9 个技巧让你的 PyTorch 模型训练变得飞快!
  4. 《机器学习实战》学习笔记第七章 —— AdaBoost元算法
  5. 6. Zend_Uri
  6. mysql rownum写法_mysql类似oracle rownum写法实例详解
  7. Chrome 里的小恐龙游戏是怎么做出来的?
  8. 百度地图三维效果实现
  9. 【2016NOIP十连测】【test4】【状压DP】【容斥原理】巨神兵
  10. 通过一道CTF题,学习pillow模块切割、合并图片
  11. python怎么输出变量加文字书名_python的交互模式怎么输出名文汉字
  12. 云开发电商小程序实战教程-篇首语
  13. fifa18怎么改服务器位置,fifa18 球员职业生涯怎么改位置 | 手游网游页游攻略大全...
  14. Pycharm Debugger - Frames Not Available
  15. python爬取微博数据词云_爬虫篇:使用Python动态爬取某大V微博,再用词云分析...
  16. The Generalized Detection Method for the Dim Small Targets by Faster R-CNN Integrated with GAN 论文翻译
  17. PMOS管/NMOS管控制供电电路
  18. [Ynoi2015]纵使日薄西山
  19. webstorm快捷键问题,求大神赐教
  20. 教育数据大全 1949-2021年全国省级地级市人力资本受教育年限 上市公司教育背景学历结构 教育支出 学校教职工学生

热门文章

  1. bsoj 1512 金明的预算方案(树型DP)
  2. 超出预算,他的处理的方式对吗? | 每天成就更大成功
  3. JavaScript高级第04天笔记
  4. 最新MTK芯片型号汇总,MTK开发资料大全下载
  5. 开心下单助手v1.0免费版
  6. 计算机系女学霸男生追,杨紫李现解锁恋爱新姿势:吃最甜的糖,追最燃的梦
  7. Markdown——图片、文字显示居中的一种方法
  8. 计算机视觉和模式识别的code
  9. 计算机毕业设计ssm+vue基本微信小程序的拼车自助服务小程序-网约车拼车系统
  10. 知识工程重点知识介绍-1