【协程】MyCoroutine轻量级协程框架代码详细剖解
协程是什么
协程是用于解决IO密集型业务的轻量级框架。一个项目用到的IO读写非常多而且操作频繁,操作系统多次进行系统调用时,多个IO读写会出现其中一个IO如果出现长时间阻塞的时候,其他读写已经就绪的IO就无法操作,需要等待IO同步完成,降低效率和性能。为此出现协程这一东西,使用了同步的方式,做到了异步的性能和效率。
同步:检测IO和读写IO位于同一个流程。
异步:检测IO和读写IO不位于同一流程。
协程框架都有什么
协程用来干嘛
协程这里可以理解成线程,不过区别于线程位于进程中是并行执行,协程是在线程中串行执行,并非异步,只不过是将协程中的IO设置成非阻塞,当需要IO同步操作的时候,将IO(协程)统一交由调度器使用epoll(或poll)进行管理检测,当epoll发现IO可以读或者可以写的时候,调度器让出CPU恢复协程的运行。协程也有类似于线程的内容,拥有自己id,创建时间,fd,事件,栈,栈大小,协程状态以及对应队列(睡眠、等待、就绪、结束等等),所属调度器,切换现场时的寄存器值等等。
协程栈和线程栈的关系
协程栈是划分线程栈的,协程栈有两种划分:
- 独立栈:一个个协程按区域划分线程的栈,这里划分默认一个协程栈大小是4k。
- 共享栈:所有协程共用一个线程栈,这样实现需要加锁,因为共享栈带来的问题就是当多个协程对同一个内存(IO)读写的时候会出现数据读写顺序错乱。所以需要加锁,加互斥锁和自旋锁。
协程的栈如何分配
int ret = posix_memalign(&co->stack, getpagesize(), sched->stack_size); //协程和调度器的栈大小一样
协程定义
typedef struct _nty_coroutine {//privatenty_cpu_ctx ctx; //协程的寄存器proc_coroutine func; //协程实现的内容void *arg; //用于传参,调用协程的时候传入arg,协程进入的地方co->func(arg),位于_exec;void *data; //协程数据size_t stack_size; //栈size_t last_stack_size; //协程栈的大小nty_coroutine_status status; //协程所处于的状态nty_schedule *sched; //协程所处于的状态uint64_t birth; //协程创建的时间,使用的是gettimeofdayuint64_t id; //协程id
#if CANCEL_FD_WAIT_UINT64int fd;unsigned short events; //POLL_EVENT
#elseint64_t fd_wait;
#endifchar funcname[64];struct _nty_coroutine *co_join;void **co_exit_ptr;void *stack;void *ebp;uint32_t ops;uint64_t sleep_usecs; //睡眠时间,睡多久RB_ENTRY(_nty_coroutine) sleep_node; //睡眠红黑树RB_ENTRY(_nty_coroutine) wait_node; //等待红黑树LIST_ENTRY(_nty_coroutine) busy_next;TAILQ_ENTRY(_nty_coroutine) ready_next; //就绪队列TAILQ_ENTRY(_nty_coroutine) defer_next;TAILQ_ENTRY(_nty_coroutine) cond_next;TAILQ_ENTRY(_nty_coroutine) io_next;TAILQ_ENTRY(_nty_coroutine) compute_next;struct {void *buf;size_t nbytes;int fd;int ret;int err;} io; //io的内容struct _nty_coroutine_compute_sched *compute_sched;int ready_fds;struct pollfd *pfds; //对应epollfdnfds_t nfds;
} nty_coroutine;
创建并初始化协程
创建协程:
- 先获取系统当前的调度器
- 为协程分配内存
- 分配协程栈(posix_memalign)
- 给协程赋值属性
- 将协程加入就绪队列
int qs_coroutine_create(qs_coroutine **new_co, proc_coroutine func, void *arg){//部分代码/*1、先获取调度器*/assert(pthread_once(&sched_key_once, qs_coroutine_sched_key_creator)==0);//确保qs_coroutine_sched_key_creator 只被执行一次qs_schedule *sched = qs_coroutine_get_sched();//通过pthread_getspecific获取全局键值调度器if(sched == NULL){ //如果调度器未被创建或者获取失败qs_schedule_create(0); //重新创建if(sched==NULL){printf("Failed to create shceduler\n");return -1;}}/*2、为协程分配内存*//*3、分配协程栈*/int ret = posix_memalign(&co->stack, getpagesize(), sched->stack_size); //将协程栈分配为一个页的大小/*4、给协程赋值属性*/ /*5、将协程加入就绪队列*/TALIQ_INSERT_TAIL(&co->shced->ready, co, ready_next); //将协程加入就绪队列
}
初始化协程
- 获取协程栈顶
- 将协程结构体作为参数压入栈中
- 确定协程的栈顶、栈底指针
- 将协程的寄存器结构体的**eip指令指针_exec,_exec最后会进入协程函数。**co->func(arg),位于_exec中;
- 将协程状态设置为就绪
static void nty_coroutine_init(nty_coroutine *co) {void **stack = (void **)(co->stack + co->stack_size); //栈底,高地址stack[-3] = NULL; //stack[-2] = (void *)co; //参数co->ctx.esp = (void*)stack - (4 * sizeof(void*)); co->ctx.ebp = (void*)stack - (3 * sizeof(void*));co->ctx.eip = (void*)_exec;co->status = BIT(NTY_COROUTINE_STATUS_READY);
}
调度器用来干嘛
调度器的工作内容主要有两个
- 检测协程的IO
- 调度睡眠、就绪、等待区中的协程
调度器定义
typedef struct _nty_schedule {uint64_t birth; //调度器创建时间nty_cpu_ctx ctx; //CPU寄存器void *stack; //栈size_t stack_size; int spawned_coroutines; //正在调度的协程数量uint64_t default_timeout; //默认超时时间struct _nty_coroutine *curr_thread; //正在运行的协程int page_size;int poller_fd; //epollfdint eventfd; //eventstruct epoll_event eventlist[NTY_CO_MAX_EVENTS]; //epoll事件集合int nevents; //events数量int num_new_events;pthread_mutex_t defer_mutex;nty_coroutine_queue ready; //就绪队列nty_coroutine_queue defer;nty_coroutine_link busy;nty_coroutine_rbtree_sleep sleeping; //睡眠区,红黑树nty_coroutine_rbtree_wait waiting; //等待红黑树
#if COROUTINE_MP
#endif
//private
} nty_schedule;
调度器如何检测协程的IO
1、协程遇到IO读写之前会将自己的fd添加到poll或者epoll
- 如果需要阻塞检测这个协程的IO时,就使用poll,若不是则使用epoll。
- 随后获取调度器中的epollfd,并设置epoll属性,然后添加自己的fd到epoll中。
- 添加之后将协程结构体放入睡眠红黑树和等待红黑树上。
- 其中加入睡眠红黑树时,会设置睡眠时间,睡眠时间=当前的时间 - 调度器创建的时间 + timeout=timeout
- 如果树上出现相同时间则timeout++(很微小),防止冲突。
- 最后协程保留现场将CPU让出给调度器。
- 当调度器将CPU给回协程时将协程从睡眠树和等待树上移除,将fd从epollfd中移除。
static int nty_poll_inner(struct pollfd *fds, nfds_t nfds, int timeout) {if (timeout == 0)return poll(fds, nfds, timeout); //如果需要阻塞检测这个协程的IOif (timeout < 0)timeout = INT_MAX;nty_schedule *sched = nty_coroutine_get_sched(); //获取调度器nty_coroutine *co = sched->curr_thread;for (int i = 0;i < nfds;i ++) { //将fd添加到epoll中进行管理struct epoll_event ev;ev.events = nty_pollevent_2epoll(fds[i].events);ev.data.fd = fds[i].fd;epoll_ctl(sched->poller_fd, EPOLL_CTL_ADD, fds[i].fd, &ev);co->events = fds[i].events;nty_schedule_sched_wait(co, fds[i].fd, fds[i].events, timeout); //将协程加入睡眠和等待红黑树上}nty_coroutine_yield(co); //让出CPU/*调度器将CPU给回协程*/for (i = 0;i < nfds;i ++) { //将fd从epoll中移除struct epoll_event ev;ev.events = nty_pollevent_2epoll(fds[i].events);ev.data.fd = fds[i].fd;epoll_ctl(sched->poller_fd, EPOLL_CTL_DEL, fds[i].fd, &ev);nty_schedule_desched_wait(fds[i].fd); //将协程从等待和睡眠红黑树上移除}return nfds;
2、调度器如何管理epoll
- 先计算出epoll的等待时间
- 如果就绪队列为空就进行epoll_wait
- 等到了调度器就保留信息
static int nty_schedule_epoll(nty_schedule *sched) {sched->num_new_events = 0;struct timespec t = {0, 0};uint64_t usecs = nty_schedule_min_timeout(sched); //先取出等待列中时间最小的协程,//然后计算出当前时间距离最小时间还有多久并返回,如果等待列中没有协程则返回0if (usecs && TAILQ_EMPTY(&sched->ready)) { //如果就绪就绪队列是空的,就返回0,让就绪队列执行t.tv_sec = usecs / 1000000u;if (t.tv_sec != 0) {t.tv_nsec = (usecs % 1000u) * 1000u;} else {t.tv_nsec = usecs * 1000u;}} else {return 0;}int nready = 0;while (1) {nready = nty_epoller_wait(t); //epoll_wait, t是距离最近协程的时间if (nready == -1) {if (errno == EINTR) continue;else assert(0);}break;}//等到了调度器就保留epoll的信息sched->nevents = 0;sched->num_new_events = nready;return 0;
}
重点:调度器如何调度正在等待、睡眠、就绪的协程?
void nty_schedule_run(void)
睡眠
根据红黑树上协程设置的睡眠时间来判断是否要恢复现场。
nty_coroutine *expired = NULL;while ((expired = nty_schedule_expired(sched)) != NULL) { //获取已经完成睡眠时间的协程nty_coroutine_resume(expired); //恢复协程运行}
static nty_coroutine *nty_schedule_expired(nty_schedule *sched) {uint64_t t_diff_usecs = nty_coroutine_diff_usecs(sched->birth, nty_coroutine_usec_now());nty_coroutine *co = RB_MIN(_nty_coroutine_rbtree_sleep, &sched->sleeping); //拿最小的if (co == NULL) return NULL;if (co->sleep_usecs <= t_diff_usecs) { //如果已经到了设置的时间返回coRB_REMOVE(_nty_coroutine_rbtree_sleep, &co->sched->sleeping, co);return co;}return NULL;
}
就绪
很直观,如果就绪队列上有就绪协程就直接恢复运行。
nty_coroutine *last_co_ready = TAILQ_LAST(&sched->ready, _nty_coroutine_queue);while (!TAILQ_EMPTY(&sched->ready)) {nty_coroutine *co = TAILQ_FIRST(&sched->ready);TAILQ_REMOVE(&co->sched->ready, co, ready_next);if (co->status & BIT(NTY_COROUTINE_STATUS_FDEOF)) {nty_coroutine_free(co);break;}nty_coroutine_resume(co);if (co == last_co_ready) break;}
等待
使用epoll进行管理,文章上面也提到了。使用epoll获取已经就绪的fd,然后通过fd再红黑树上查找到对应的协程,最后恢复协程运行。
nty_schedule_epoll(sched); //epollwhile (sched->num_new_events) {//如果epoll检测到有IO就绪int idx = --sched->num_new_events;struct epoll_event *ev = sched->eventlist+idx;int fd = ev->data.fd;int is_eof = ev->events & EPOLLHUP;if (is_eof) errno = ECONNRESET;nty_coroutine *co = nty_schedule_search_wait(fd);if (co != NULL) {if (is_eof) {co->status |= BIT(NTY_COROUTINE_STATUS_FDEOF);}nty_coroutine_resume(co);}is_eof = 0;}
至此,调度器对协程的所有调度就这么多。
总的来说
总的来说协程和调度器的运行关系就是
- 一旦协程遇到IO就保留现场,将fd交由epoll管理,并让出CPU给调度器。
- 调度器对等待区、就绪区、睡眠区的协程进行管理。
- 对等待区的协程是使用epoll管理,如果协程的IO就绪了,就恢复运行协程。
- 对睡眠区的协程是使用睡眠时间管理,如果时间到了就恢复协程运行。
- 对就绪区的协程是就绪队列有协程就直接运行协程。
协程和调度器之间如何切换工作环境
将工作环境保存再协程和调度器内部,工作环境就是指CPU的寄存器,这里使用一个结构体。
typedef struct _nty_cpu_ctx {void *esp; //void *ebp;void *eip;void *edi;void *esi;void *ebx;void *r1;void *r2;void *r3;void *r4;void *r5;
} nty_cpu_ctx;
实现切换就是改变寄存器的值,使用mov指令。先保留当前的CPU寄存器,再将新的CPU覆盖原来的寄存器。
#elif defined(__x86_64__)
__asm__ (
" .text \n"
" .p2align 4,,15 \n"
".globl _switch \n"
".globl __switch \n"
"_switch: \n"
"__switch: \n" "#保留旧的工作环境 "
"#将rsi(cur_ctx)放入寄存器"
" rsi-->cur_ctx 数字是对应偏移字节数"" movq %rsp, 0(%rsi) # esp = rsp 栈顶 \n"
" movq %rbp, 8(%rsi) # ebp = rbp 栈底 \n"
"#rsi 偏移8, 对应cur_ctx 的第2个值栈底指针"
"#rsi 偏移0, 对应cur_ctx 的第1个值 栈顶指针"" movq (%rsp), %rax # save insn_pointer \n"
"#将栈顶指针赋值给函数返回值,ret后出栈,执行栈顶""#rax = eip 函数返回值"
"#保留rbx r12-r15"
" movq %rax, 16(%rsi) \n"
" movq %rbx, 24(%rsi) # 保存rbx,r12-r15 \n"
" movq %r12, 32(%rsi) \n"
" movq %r13, 40(%rsi) \n"
" movq %r14, 48(%rsi) \n"
" movq %r15, 56(%rsi) \n" "#新的工作环境"
" rdi-->new_ctx "
" movq 56(%rdi), %r15 \n"
" movq 48(%rdi), %r14 \n"
" movq 40(%rdi), %r13 # restore rbx,r12-r15 \n"
" movq 32(%rdi), %r12 \n"
" movq 24(%rdi), %rbx \n"
" movq 8(%rdi), %rbp # restore frame_pointer \n"
" movq 0(%rdi), %rsp # restore stack_pointer \n"
" movq 16(%rdi), %rax # restore insn_pointer \n" "#将函数返回值赋值给栈顶寄存器"
" movq %rax, (%rsp) \n""#跳转执行栈顶,即进入新的工作空间"
" ret \n"
);
#endif
hook重写系统调用
使用dlsym,这部分实现简单。具体实现看代码,主要是将socket设置成非阻塞的,socket地址可以复用。
int init_hook(void) {socket_f = (socket_t)dlsym(RTLD_NEXT, "socket");//read_f = (read_t)dlsym(RTLD_NEXT, "read");recv_f = (recv_t)dlsym(RTLD_NEXT, "recv");recvfrom_f = (recvfrom_t)dlsym(RTLD_NEXT, "recvfrom");//write_f = (write_t)dlsym(RTLD_NEXT, "write");send_f = (send_t)dlsym(RTLD_NEXT, "send");sendto_f = (sendto_t)dlsym(RTLD_NEXT, "sendto");accept_f = (accept_t)dlsym(RTLD_NEXT, "accept");close_f = (close_t)dlsym(RTLD_NEXT, "close");connect_f = (connect_t)dlsym(RTLD_NEXT, "connect");
}
代码自行Git
git@github.com:qiushii/fastDFS.git
【协程】MyCoroutine轻量级协程框架代码详细剖解相关推荐
- Python 进程、线程、协程傻傻分不清楚?详细总结(附代码)
目录 1 什么是并发编程? 2 进程与多进程 3 线程与多线程 4 协程与多协程 5 总结 1 什么是并发编程? 并发编程是实现多任务协同处理,改善系统性能的方式.Python中实现并发编程主要依靠 ...
- 干货 | 携程基于Quasar协程的NIO实践
作者简介 Ryan,携程Java开发工程师,对高并发.网络编程等领域有浓厚兴趣. IO密集型系统在高并发场景下,会有大量线程处于阻塞状态,性能低下,JAVA上成熟的非阻塞IO(NIO)技术可解决该问题 ...
- 携程基于Quasar协程的NIO实践
IO密集型系统在高并发场景下,会有大量线程处于阻塞状态,性能低下,JAVA上成熟的非阻塞IO(NIO)技术可解决该问题.目前Java项目对接NIO的方式主要依靠回调,代码复杂度高,降低了代码可读性与可 ...
- java 修改最大nio连接数_携程基于Quasar协程的NIO实践
IO密集型系统在高并发场景下,会有大量线程处于阻塞状态,性能低下,JAVA上成熟的非阻塞IO(NIO)技术可解决该问题.目前Java项目对接NIO的方式主要依靠回调,代码复杂度高,降低了代码可读性与可 ...
- python多线程调用携程,Python 协程,Python携程
Python 协程,Python携程 协程 进程:操作系统中存在 线程:操作系统中存在 协程:是微线程 模块(greenlet) 协程不是一个真实存在的东西,是由程序员创造出来的 协程,是对一个线程分 ...
- 深入浅出c++协程丨C++协程实现
1|0一些实现的c++协程 C++协程实现相关视频讲解:(视频代码资料点击 正在跳转 获取) 协程的实现与原理剖析(上) 协程的实现与原理剖析(下) 协程是一种函数对象,可以设置锚点做暂停,然后再该锚 ...
- python 协程可以嵌套协程吗_Python线程、协程探究(2)——揭开协程的神秘面纱...
一.上集回顾 在上一篇中我们主要研究了python的多线程困境,发现多核情况下由于GIL的存在,python的多线程程序无法发挥多线程该有的并行威力.在文章的结尾,我们提出如下需求: 既然python ...
- python 协程可以嵌套协程吗_Python | 详解Python中的协程,为什么说它的底层是生成器?...
今天是Python专题的第26篇文章,我们来聊聊Python当中的协程. 我们曾经在golang关于goroutine的文章当中简单介绍过协程的概念,我们再来简单review一下.协程又称为是微线程, ...
- python 协程库_python --- 协程编程(第三方库gevent的使用)
1. 什么是协程? 协程(coroutine),又称微线程.协程不是线程也不是进程,它的上下文关系切换不是由CPU控制,一个协程由当前任务切换到其他任务由当前任务来控制.一个线程可以包含多个协程,对于 ...
最新文章
- python安装步骤电脑版-超详细的小白python3.X安装教程|Python安装
- Android 经典欧美小游戏 guess who
- 收藏 | 9 个技巧让你的 PyTorch 模型训练变得飞快!
- 《机器学习实战》学习笔记第七章 —— AdaBoost元算法
- 6. Zend_Uri
- mysql rownum写法_mysql类似oracle rownum写法实例详解
- Chrome 里的小恐龙游戏是怎么做出来的?
- 百度地图三维效果实现
- 【2016NOIP十连测】【test4】【状压DP】【容斥原理】巨神兵
- 通过一道CTF题,学习pillow模块切割、合并图片
- python怎么输出变量加文字书名_python的交互模式怎么输出名文汉字
- 云开发电商小程序实战教程-篇首语
- fifa18怎么改服务器位置,fifa18 球员职业生涯怎么改位置 | 手游网游页游攻略大全...
- Pycharm Debugger - Frames Not Available
- python爬取微博数据词云_爬虫篇:使用Python动态爬取某大V微博,再用词云分析...
- The Generalized Detection Method for the Dim Small Targets by Faster R-CNN Integrated with GAN 论文翻译
- PMOS管/NMOS管控制供电电路
- [Ynoi2015]纵使日薄西山
- webstorm快捷键问题,求大神赐教
- 教育数据大全 1949-2021年全国省级地级市人力资本受教育年限 上市公司教育背景学历结构 教育支出 学校教职工学生