srs源码分析2-浅析state_threads
本文分析的state-threads的版本是1.9
。
srs源码分析1-搭建环境
srs源码分析2-浅析state_threads
srs源码分析3-srs的启动
srs源码分析4-客户端的连接
srs源码分析5-handshake
srs源码分析6-connect
以下正在写作中。。。
srs源码分析7-create stream
srs源码分析8-推流-publish
srs源码分析9-推流-unpublish
srs源码分析10-拉流-play
srs源码分析11-拉流-pause
srs源码分析12-转发-forward
srs是基于协程开发的,底层使用了state_threads协程库。为了更好的理解srs,所以需要先熟悉state_threads。这里并不会介绍协程的相关概念,只是简单的介绍一下state_threads的核心逻辑。
以下state_thread会被简称为st。
使用示例-echo server
使用st实现了一个简单的echo服务器,以下代码写的很简单,重点是理解st的使用。
#include <arpa/inet.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <st.h>#define LISTEN_PORT 9000#define ERR_EXIT(m) \do { \perror(m); \exit(-1); \} while (0)void *client_thread(void *arg) {st_netfd_t client_st_fd = (st_netfd_t)arg;int client_fd = st_netfd_fileno(client_st_fd);sockaddr_in client_addr;socklen_t client_addr_len = sizeof(client_addr);int ret = getpeername(client_fd, (sockaddr *)&client_addr, &client_addr_len);if (ret == -1) {printf("[WARN] Failed to get client ip: %s\n", strerror(ret));}char ip_buf[INET_ADDRSTRLEN];bzero(ip_buf, sizeof(ip_buf));inet_ntop(client_addr.sin_family, &client_addr.sin_addr, ip_buf,sizeof(ip_buf));while (1) {char buf[1024] = {0};ssize_t ret = st_read(client_st_fd, buf, sizeof(buf), ST_UTIME_NO_TIMEOUT);if (ret == -1) {printf("client st_read error\n");break;} else if (ret == 0) {printf("client quit, ip = %s\n", ip_buf);break;}printf("recv from %s, data = %s", ip_buf, buf);ret = st_write(client_st_fd, buf, ret, ST_UTIME_NO_TIMEOUT);if (ret == -1) {printf("client st_write error\n");}}
}void *listen_thread(void *arg) {while (1) {st_netfd_t client_st_fd =st_accept((st_netfd_t)arg, NULL, NULL, ST_UTIME_NO_TIMEOUT);if (client_st_fd == NULL) {continue;}printf("get a new client, fd = %d\n", st_netfd_fileno(client_st_fd));st_thread_t client_tid =st_thread_create(client_thread, (void *)client_st_fd, 0, 0);if (client_tid == NULL) {printf("Failed to st create client thread\n");}}
}int main() {int ret = st_set_eventsys(ST_EVENTSYS_ALT);if (ret == -1) {printf("st_set_eventsys use linux epoll failed\n");}ret = st_init();if (ret != 0) {printf("st_init failed. ret = %d\n", ret);return -1;}int listen_fd = socket(AF_INET, SOCK_STREAM, 0);if (listen_fd == -1) {ERR_EXIT("socket");}int reuse_socket = 1;ret = setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket,sizeof(int));if (ret == -1) {ERR_EXIT("setsockopt");}struct sockaddr_in server_addr;server_addr.sin_family = AF_INET;server_addr.sin_port = htons(LISTEN_PORT);server_addr.sin_addr.s_addr = INADDR_ANY;ret =bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr));if (ret == -1) {ERR_EXIT("bind");}ret = listen(listen_fd, 128);if (ret == -1) {ERR_EXIT("listen");}st_netfd_t st_listen_fd = st_netfd_open_socket(listen_fd);if (!st_listen_fd) {printf("st_netfd_open_socket open socket failed.\n");return -1;}st_thread_t listen_tid =st_thread_create(listen_thread, (void *)st_listen_fd, 1, 0);if (listen_tid == NULL) {printf("Failed to st create listen thread\n");}while (1) {st_sleep(1); /*用于让出CPU执行权,重新调度就绪的协程。*/}return 0;
}
root@learner:~/tmp/st# gcc main.cpp -lst
root@learner-Lenovo:~/tmp/st# ./a.out
get a new client, fd = 4
recv from 192.168.30.17, data = hello world
client quit, ip = 192.168.30.17
^C
root@learner:~# nc 192.168.30.17 9000
hello world
hello world
^C
创建一个listen协程,用于监听客户端的连接,当客户端连接服务后,会为此客户端创建一个client协程,用于处理此客户端的所有请求。
协程的切换
st中协程的切换提供了两种方式:一种是使用系统提供的setjmp
和longjmp
接口,另一种是使用汇编实现的_st_md_cxt_save
和_st_md_cxt_restore
接口,这两个函数从用法上同setjmp和longjmp。
这两种方式的切换本质上都是栈帧的切换。
setjmp和longjmp
C语言中的goto语句只能在当前函数内跳转,而不能在函数间跳转。setjmp()和longjmp()可以执行非局部跳转,即跳转的目标为当前执行函数之外的某个位置。
setjmp()函数为后续由longjmp()调用执行的跳转确立了跳转目标,该目标正是程序发起setjmp()调用的位置。从编程角度看来,调用longjmp()函数后,看起来就和从第二次调用setjmp()返回时完全一样。通过setjmp()的返回值,可以区分setjmp()调用是初始返回还是第二次返回。初始调用返回值为0,后续“伪返回”的返回值为longjmp()调用中val参数所指定的任意值。通过对val参数使用不同值,能够区分程序中跳转至同一目标的不同起跳位置。更多相关setjmp()、longjmp()的介绍,可以参考《Linux/UNIX系统编程手册》上册第106页。
以下是从《Linux/UNIX系统编程手册》摘抄的示例:
#include <stdio.h>
#include <stdlib.h>
#include <setjmp.h>jmp_buf env;void f2(int num)
{longjmp(env, num);
}void f1(int num)
{if(num == 1){longjmp(env, num);}f2(num);
}int main(int argc, char** argv)
{if(argc != 2){printf("Usage: %s [1|2]\n", argv[0]);return -1;}switch(setjmp(env)){case 0:printf("Calling f1() after initial setjmp()\n");f1(atoi(argv[1]));break;case 1:printf("We jumped back from f1()\n");break;case 2:printf("We jumped back from f2()\n");break;}return 0;
}
这个示例我稍微做了一些修改,运行结果及分析如下:
root@learner:~/tmp# ./a.out 1
Calling f1() after initial setjmp()
We jumped back from f1()
root@learner:~/tmp# ./a.out 2
Calling f1() after initial setjmp()
We jumped back from f2()
_st_md_cxt_save和_st_md_cxt_restore
这两个函数是通过汇编实现的,代码如下:
#define JB_BX 0
#define JB_SI 1
#define JB_DI 2
#define JB_BP 3
#define JB_SP 4
#define JB_PC 5.file "md.S"
.text/* _st_md_cxt_save(__jmp_buf env) 存储函数栈帧 */
.globl _st_md_cxt_save.type _st_md_cxt_save, @function.align 16
_st_md_cxt_save:movl 4(%esp), %eax /*取得参数env的地址,保存到eax中。*/movl %ebx, (JB_BX*4)(%eax) /*保存ebx*/movl %esi, (JB_SI*4)(%eax) /*保存esi*/movl %edi, (JB_DI*4)(%eax) /*保存edi*//*保存esp,即栈顶,保存的栈顶是没有调用_st_md_cxt_save()函数之前的栈顶*/leal 4(%esp), %ecx /movl %ecx, (JB_SP*4)(%eax) /*保存ecx*/movl 0(%esp), %ecx movl %ecx, (JB_PC*4)(%eax) /*保存引用计数器pc*/movl %ebp, (JB_BP*4)(%eax) /*保存ebp 即调用_st_md_cxt_save()的函数的ebp*/xorl %eax, %eax /*清空eax 作为_st_md_cxt_save()的返回值*/ret
.size _st_md_cxt_save, .-_st_md_cxt_save/* _st_md_cxt_restore(__jmp_buf env, int val) 恢复函数栈帧 */
.globl _st_md_cxt_restore.type _st_md_cxt_restore, @function.align 16
_st_md_cxt_restore:movl 4(%esp), %ecx /*获取第一个参数的地址,即env的地址。*/movl 8(%esp), %eax /*获取第二个参数的地址,即val的地址。*/movl (JB_PC*4)(%ecx), %edx /*将原pc寄存器的值保存到edx中*/movl (JB_BX*4)(%ecx), %ebx /*恢复ebx*/movl (JB_SI*4)(%ecx), %esi /*恢复esi*/movl (JB_DI*4)(%ecx), %edi /*恢复edi*/movl (JB_BP*4)(%ecx), %ebp /*恢复ebp*/movl (JB_SP*4)(%ecx), %esp /*恢复esp*/testl %eax, %eax /*测试eax的值是否为0,也就是第二个参数是否为0。*/jnz 1f /*如果第二个参数不为0,则直接跳转到1:执行。*/incl %eax /*将返回值置为1*/1: jmp *%edx /*跳转到之前pc处*/
.size _st_md_cxt_restore, .-_st_md_cxt_restore
_st_md_cxt_save(__jmp_buf env)用于保存栈帧,_st_md_cxt_restore(__jmp_buf env, int val)用于恢复栈帧。
st中协程的切换宏
#if defined(MD_USE_BUILTIN_SETJMP) && !defined(USE_LIBC_SETJMP) #define MD_SETJMP(env) _st_md_cxt_save(env)#define MD_LONGJMP(env, val) _st_md_cxt_restore(env, val)extern int _st_md_cxt_save(jmp_buf env);extern void _st_md_cxt_restore(jmp_buf env, int val);
#else#define MD_SETJMP(env) setjmp(env) #define MD_LONGJMP(env, val) longjmp(env, val)
#endif
如果定义了MD_USE_BUILTIN_SETJMP
宏,且没有定义USE_LIBC_SETJMP
宏,则使用自定义的栈帧存取函数。否则使用系统提供的setjmp和longjmp切换栈帧。
#define _ST_SWITCH_CONTEXT(_thread) \ST_BEGIN_MACRO \ST_SWITCH_OUT_CB(_thread); \if (!MD_SETJMP((_thread)->context)) { \ /*调出协程返回0,调入协程返回1。*/ _st_vp_schedule(); \ /*选择下一个需要调度的协程*/} \ST_DEBUG_ITERATE_THREADS(); \ST_SWITCH_IN_CB(_thread); \ST_END_MACRO
_ST_SWITCH_CONTEXT
用于将协程的CPU执行权让出去,重新调度一个新的协程。
当协程调用_ST_SWITCH_CONTEXT
时,此时MD_SETJMP会返回0,则进入协程调度函数_st_vp_schedule(),CPU的执行权转移到其他协程。此时相当于在本协程中打上了一个切换点。当本协程将再次获得CPU执行权时,在_st_vp_schedule()中调用_ST_RESTORE_CONTEXT宏函数,会通过MD_SETJMP再次返回,此时返回值为1,跳过if语句返回到本协程调用_ST_SWITCH_CONTEXT的位置,继续往下执行。
#define _ST_RESTORE_CONTEXT(_thread) \ST_BEGIN_MACRO \_ST_SET_CURRENT_THREAD(_thread); \ /*标记此协程为当前运行的协程*/MD_LONGJMP((_thread)->context, 1); \ /*执行协程切换 恢复之前挂起的协程*/ST_END_MACRO
_ST_RESTORE_CONTEXT
用于恢复指定的协程,通过MD_LONGJMP宏,返回到MD_SETJMP打的断点处,从MD_SETJMP再次返回,从而再次获取到CPU的执行权。
void _st_vp_schedule(void)
{_st_thread_t *thread;/*从就绪的协程队列中取出一个协程*/if (_ST_RUNQ.next != &_ST_RUNQ) {thread = _ST_THREAD_PTR(_ST_RUNQ.next);_ST_DEL_RUNQ(thread); /*从就绪协程队列删除*/} else { /*如果就绪的协程队列为空,说明所有的就绪协程都处理完毕了。*/thread = _st_this_vp.idle_thread; /*现在切换至idle协程*/}ST_ASSERT(thread->state == _ST_ST_RUNNABLE); /*该协程必须处于可运行状态*/thread->state = _ST_ST_RUNNING; /*将即将运行协程的状态标记为正在运行*/_ST_RESTORE_CONTEXT(thread); /*切换至新的协程*/
}
在切换协程时,会从就绪的协程队列中取出一个协程,然后切换至该协程。如果就绪队列中没有可切换的协程,则说明没有协程需要处理,此时会切换至idle协程。返回idle协程后,会重新进入epoll_wait,重新开始监听待发生的事件和处理定时事件。
调度器
所有的协程都是在一个单线程中执行的,所以需要有一个调度器来调度所有的协程,以便需要执行权限的协程能够获取到CPU。通常协程在发生读事件
、写事件
、定时器事件
时才需要执行权限,也就是发生这些事件后,需要将协程调度到CPU上,让其获得CPU的执行权,处理对应的事情。
st中对读写事件的监控是通过epoll实现的,而定时器事件通过最小堆配合epoll的超时实现的。
typedef struct _st_eventsys_ops {const char *name; /* Name of this event system */int val; /* Type of this event system */int (*init)(void); /* Initialization */void (*dispatch)(void); /* Dispatch function */int (*pollset_add)(struct pollfd *, int); /* Add descriptor set */void (*pollset_del)(struct pollfd *, int); /* Delete descriptor set */int (*fd_new)(int); /* New descriptor allocated */int (*fd_close)(int); /* Descriptor closed */int (*fd_getlimit)(void); /* Descriptor hard limit */
} _st_eventsys_t;
这是调度器的接口,可以使用epoll实现,也可以使用select和poll实现。
static _st_eventsys_t _st_epoll_eventsys = {"epoll",ST_EVENTSYS_ALT,_st_epoll_init,_st_epoll_dispatch,_st_epoll_pollset_add,_st_epoll_pollset_del,_st_epoll_fd_new,_st_epoll_fd_close,_st_epoll_fd_getlimit
};
st中通过epoll实现了调度器,实现的这些函数作为回调函数封装到了结构体中。
ST_HIDDEN void _st_epoll_dispatch(void)
{...if (_ST_SLEEPQ == NULL) { /* 定时队列为空,说明没有定时器事件,则epoll_wait的超时时间为-1,即没有事件触发时,epoll_wait一直阻塞。*/timeout = -1;} else { /*从定时队列获取最小定时器*/min_timeout = (_ST_SLEEPQ->due <= _ST_LAST_CLOCK) ? 0 : (_ST_SLEEPQ->due - _ST_LAST_CLOCK);/*换算epoll_wait的超时时间 单位:us */timeout = (int) (min_timeout / 1000);
...}/*进入epoll等待事件的触发,也可能因为超时而退出。*/nfd = epoll_wait(..., ..., ..., timeout);
...pq->thread->state = _ST_ST_RUNNABLE; /*把协程的状态设置为可运行状态*/_ST_ADD_RUNQ(pq->thread); /*将协程添加到运行队列,等待新一轮的调度。*/
...
}
在进入epoll_wait之前,先从最小堆中获取最近一个定时器触发的时间,将此时间作为epoll_wait的超时时间,如果在这个超时时间之内发生了读写事件,则epoll_wait返回处理读写事件;如果段超时时间之内没有发生读写事件,epoll_wait会因为超时而退出,此时返回正好处理定时事件。
若不是因为超时而从epoll_wait返回,说明有的协程读写事件触发了,此时需要将触发事件的协程保存到可运行队列中,等待新一轮的调度。
创建协程
_st_thread_t *st_thread_create(void *(*start)(void *arg), void *arg, int joinable, int stk_size)
{_st_thread_t *thread;_st_stack_t *stack;void **ptds;char *sp;/* Adjust stack size 调整栈大小*/if (stk_size == 0)stk_size = ST_DEFAULT_STACK_SIZE; /*默认栈大小是128KB*//*页大小对齐*/stk_size = ((stk_size + _ST_PAGE_SIZE - 1) / _ST_PAGE_SIZE) * _ST_PAGE_SIZE; /*申请栈空间*/stack = _st_stack_new(stk_size);if (!stack)return NULL;sp = stack->stk_top; /*栈顶*/sp = sp - (ST_KEYS_MAX * sizeof(void *));/*栈顶空出一块区域,用于存放私有的数据。*/ptds = (void **) sp;sp = sp - sizeof(_st_thread_t); /*再空出一个_st_thread_t大小*/thread = (_st_thread_t *) sp;if ((unsigned long)sp & 0x3f)sp = sp - ((unsigned long)sp & 0x3f);stack->sp = sp - _ST_STACK_PAD_SIZE; /*栈顶再空出128字节的填充区域*/memset(thread, 0, sizeof(_st_thread_t));memset(ptds, 0, ST_KEYS_MAX * sizeof(void *));thread->private_data = ptds; /*指向协程私有数据*/thread->stack = stack; /*指向协程栈*/thread->start = start; /*协程入口函数*/thread->arg = arg; /*入口函数参数*//*保存切换上下文,打上还原点,当本协程下次获取到执行权限时,从这个还原点接着执行。*/_ST_INIT_CONTEXT(thread, stack->sp, _st_thread_main);/*如果需要主动回收协程,则需要协程创建一个条件变量,用于阻塞等待回收协程。*/if (joinable) { thread->term = st_cond_new();if (thread->term == NULL) {_st_stack_free(thread->stack);return NULL;}}thread->state = _ST_ST_RUNNABLE; /*标记协程为可运行状态*/_st_active_count++; /*增加活跃协程的个数*/_ST_ADD_RUNQ(thread); /*将协程插入到运行队列*/return thread;
}
创建一个新的协程,在创建的过程中,会将这个协程放到可运行队列,等待着调度。在调度到这个新的协程时,就可以获得CPU的执行权。
除了主协程外,其他协程的栈都是在堆上申请的空间,默认大小时128KB。
#define _ST_INIT_CONTEXT MD_INIT_CONTEXT#define MD_INIT_CONTEXT(_thread, _sp, _main) \ST_BEGIN_MACRO \if (MD_SETJMP((_thread)->context)) \ /*设置还原点,或从还原点返回。*/_main(); \ MD_GET_SP(_thread) = (long) (_sp); \ /*设置ctx中sp寄存器的值,设置新的栈帧*/ST_END_MACRO
在创建新协程时,会通过上面的宏函数设置还原点,当执行到MD_SETJMP时,会返回0,此时_main()函数不会被执行。当协程再次获取执行权时,会再次从MD_SETJMP返回,此时返回值为1,则进入_main()函数,也就是_st_thread_main()
函数。
void _st_thread_main(void)
{_st_thread_t *thread = _ST_CURRENT_THREAD(); /*获取当前协程的句柄*/MD_CAP_STACK(&thread);thread->retval = (*thread->start)(thread->arg); /*执行协程入口函数*/st_thread_exit(thread->retval); /*协程退出*/
}
新的协程创建后,并不会立即被执行,需要先打上还原点,然后放入可执行队列中。当调度器调度到这个新线程后才会真正获取到CPU的执行权,在MD_SETJMP返回后,进入这个函数,在此函数中才会进入协程的入口函数。协程入口函数处理完毕后,会进入协程退出函数,这个稍后分析。
st的初始化
int st_init(void)
{_st_thread_t *thread;if (_st_active_count) { /*如果已经初始化,则直接返回。*/return 0;}st_set_eventsys(ST_EVENTSYS_DEFAULT); /*设置epoll封装的接口 */if (_st_io_init() < 0)return -1;memset(&_st_this_vp, 0, sizeof(_st_vp_t));/*三个队列的初始化*/ST_INIT_CLIST(&_ST_RUNQ); /*可运行队列*/ST_INIT_CLIST(&_ST_IOQ); /*io队列*/ST_INIT_CLIST(&_ST_ZOMBIEQ); /*僵尸态队列*/if ((*_st_eventsys->init)() < 0) /*epoll的初始化*/return -1;_st_this_vp.pagesize = getpagesize(); /*页大小*/_st_this_vp.last_clock = st_utime(); /*时钟时间*//* 创建一个idle协程 */_st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start, NULL, 0, 0);if (!_st_this_vp.idle_thread)return -1;_st_this_vp.idle_thread->flags = _ST_FL_IDLE_THREAD; /*标识为idle协程*/_st_active_count--; _ST_DEL_RUNQ(_st_this_vp.idle_thread); /*从可运行队列中删除idle协程*//*为主协程封装一个_st_thread_t */thread = (_st_thread_t *) calloc(1, sizeof(_st_thread_t) + (ST_KEYS_MAX * sizeof(void *))); if (!thread)return -1;thread->private_data = (void **) (thread + 1); /*指向协程私有数据*/thread->state = _ST_ST_RUNNING; /*设置协程为可运行状态*/thread->flags = _ST_FL_PRIMORDIAL; /*标识为主协程*/_ST_SET_CURRENT_THREAD(thread); /*设置当前工作的协程*/_st_active_count++; /*增加活跃协程个数*/return 0;
}
在使用st时,首先需要调用st_init()函数对st进行初始化。这个函数有三个作用:1、做一些初始化工作 2、创建idle协程 3、将主线程封装为主协程
主线程也是一条可执行流,需要将主线程封装成主协程,以便能够在调度器中进行调度。
idle协程是非常核心的,当就绪队列中没有可运行的协程时,会将CPU的执行权限调度到idle协程。在idle协程中重新开始监听读、写、定时器事件。
void *_st_idle_thread_start(void *arg)
{_st_thread_t *me = _ST_CURRENT_THREAD();while (_st_active_count > 0) {_ST_VP_IDLE(); /*进入epoll_wait,监听读写事件*/_st_vp_check_clock(); /*处理定时器事件*/me->state = _ST_ST_RUNNABLE; /*将idle线程标记为可运行状态*/_ST_SWITCH_CONTEXT(me); /*让出CPU执行权,重新开始调度。*/}exit(0);return NULL;
}
当就绪队列为空时,调度会进入idle线程,在idle线程中,会进入epoll_wait监听读写事件,有读写事件触发时,会将协程保存到就绪队列中;从epoll_wait返回后,查看是否有定时器触发,若有定时器触发,则将协程保存到就绪队列中。处理完读写事件和定时器事件后,idle协程让出CPU执行权,开始依次调度所有的就绪协程,所有的就绪协程处理完毕后,会再次进入idle协程,之后都是这样循环往复。
#define _ST_VP_IDLE() (*_st_eventsys->dispatch)()
_st_eventsys->dispatch是回调函数,这个函数指针实际指向_st_epoll_dispatch。
void _st_vp_check_clock(void)
{_st_thread_t *thread;st_utime_t now;now = st_utime(); /*获取当前时间*/_ST_LAST_CLOCK = now;if (_st_curr_time && now - _st_last_tset > 999000) {_st_curr_time = time(NULL);_st_last_tset = now;}while (_ST_SLEEPQ != NULL) { /*睡眠队列不为空*/thread = _ST_SLEEPQ; /*获取最小堆上的最小的定时器*/ST_ASSERT(thread->flags & _ST_FL_ON_SLEEPQ);if (thread->due > now) break; /*协程的定时器还没有到,立即返回。*//*协程的定时器触发了*/_ST_DEL_SLEEPQ(thread); /*从睡眠队列中删除*//*协程是因为条件变量而睡眠的,现在条件变量超时了。*/if (thread->state == _ST_ST_COND_WAIT) thread->flags |= _ST_FL_TIMEDOUT; ST_ASSERT(!(thread->flags & _ST_FL_IDLE_THREAD));thread->state = _ST_ST_RUNNABLE; /*标记协程为可运行状态*/_ST_INSERT_RUNQ(thread); /*将协程送至就绪队列,等待调度。*/}
}
从epoll_wait返回后,检查睡眠队列中的协程,当其定时器到了,则将协程送至就绪队列,等待新一轮的调度。
所有的定时器都放在最小堆中,从最小堆中获取到的是所有定时器的最小值。如果当前时间超过了最小堆中的定时器,说明定时器触发了。通过while循环将最小堆中的所有该触发的定时器全部都保存到就绪队列中。
协程的exit、join和yield
_st_thread_t *st_thread_create(void *(*start)(void *arg), void *arg, int joinable, int stk_size)
{...if (joinable) { /*如果协程需要主动回收,则为协程创建一个条件变量。*/thread->term = st_cond_new(); /*创建条件变量*/if (thread->term == NULL) {_st_stack_free(thread->stack);return NULL;}}
...
}
在创建协程的时候,需要指明是否会主动回收协程。如果需要主动回收协程,则需要为协程创建一个条件变量,以便其他协程阻塞的回收该协程。
void _st_thread_main(void)
{_st_thread_t *thread = _ST_CURRENT_THREAD(); /*获取当前协程的句柄*/MD_CAP_STACK(&thread);thread->retval = (*thread->start)(thread->arg); /*执行协程入口函数*/st_thread_exit(thread->retval); /*退出协程*/
}
当协程的主体函数执行完毕后,会进入st_thread_exit函数,用于退出协程。
void st_thread_exit(void *retval)
{_st_thread_t *thread = _ST_CURRENT_THREAD(); /*获取当前协程句柄*/thread->retval = retval; /*保存返回值*/_st_thread_cleanup(thread); /*释放协程的私有数据*/_st_active_count--; /*活跃协程数减一*/if (thread->term) { /*如果需要主动回收此协程*/thread->state = _ST_ST_ZOMBIE; /*设置协程为僵尸态*/_ST_ADD_ZOMBIEQ(thread); /*添加到僵尸态队列*/st_cond_signal(thread->term); /*通知阻塞等待回收的协程*/_ST_SWITCH_CONTEXT(thread); /*让出执行权*/st_cond_destroy(thread->term); /*销毁条件变量*/thread->term = NULL;}/*如果是主协程,则无需释放其对应的栈,否则释放在堆上申请的栈空间。*/if (!(thread->flags & _ST_FL_PRIMORDIAL))_st_stack_free(thread->stack);_ST_SWITCH_CONTEXT(thread); /*销毁完毕让出CPU执行权*/
}
若协程是主协程,则无需释放堆空间,否则需要释放在堆上申请的用于栈的空间。thread->term不为NULL,说明这个协程需要主动的回收,此时需要将协程设置为僵尸态,并加入僵尸态队列。同时通知阻塞等待回收的协程。
int st_thread_join(_st_thread_t *thread, void **retvalp)
{_st_cond_t *term = thread->term; /*获取协程的条件变量*/if (term == NULL) { errno = EINVAL;return -1;}if (_ST_CURRENT_THREAD() == thread) { /*不能是当前协程*/errno = EDEADLK;return -1;}/*不能多个线程回收同时回收同一个协程*/if (term->wait_q.next != &term->wait_q) {errno = EINVAL;return -1;}/*如果协程的状态不是僵尸态,则用于回收的线程将进入条件变量等待。*/while (thread->state != _ST_ST_ZOMBIE) {if (st_cond_timedwait(term, ST_UTIME_NO_TIMEOUT) != 0)return -1;}if (retvalp)*retvalp = thread->retval; /*获取待回收协程的返回值*/thread->state = _ST_ST_RUNNABLE; /*将待回收的协程设置为可运行状态*/_ST_DEL_ZOMBIEQ(thread); /*从僵尸态队列删除*/_ST_ADD_RUNQ(thread); /*加入就绪运行队列*/return 0;
}
协程在回收其他协程,此时待回收的协程还没有退出,主动回收的协程将进入条件变量等待。当待回收的协程退出时,会激活条件变量上的协程。
主动回收的协程从条件变量返回后,此时待回收的协程处于僵尸态,获取返回值后,此时需要再次将待回收的协程置为可运行状态,并加入就绪运行队列。待回收协程会再次进入st_thread_exit()
函数,从_ST_SWITCH_CONTEXT返回,主动销毁条件变量和栈空间,最后通过_ST_SWITCH_CONTEXT让出执行权,这时协程才算退出。
void st_thread_yield()
{_st_thread_t *me = _ST_CURRENT_THREAD(); /*获取当前协程句柄*//*检查是否有定时器事件触发*/_st_vp_check_clock();/*就绪队列为空,则直接返回。*/if (_ST_RUNQ.next == &_ST_RUNQ) {return;}me->state = _ST_ST_RUNNABLE; /*将本协程标记为可运行状态*/_ST_ADD_RUNQ(me); /*把本协程添加到就绪队列中*//*将执行权切换给就绪队列中的其他协程*/_ST_SWITCH_CONTEXT(me);
}
协程在运行的过程中,可以主动的让出执行权。在让出执行权的时候,需要将自己主动加入到就绪队列中,等待再次被调度。
socket的处理
int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
{struct pollfd *pd;struct pollfd *epd = pds + npds; /*指向数组末尾*/_st_pollq_t pq;_st_thread_t *me = _ST_CURRENT_THREAD();int n;if (me->flags & _ST_FL_INTERRUPT) {me->flags &= ~_ST_FL_INTERRUPT;errno = EINTR;return -1;}if ((*_st_eventsys->pollset_add)(pds, npds) < 0)return -1;pq.pds = pds;pq.npds = npds;pq.thread = me;pq.on_ioq = 1;_ST_ADD_IOQ(pq);if (timeout != ST_UTIME_NO_TIMEOUT)_ST_ADD_SLEEPQ(me, timeout);me->state = _ST_ST_IO_WAIT;/*主动切出协程,交出执行权。*/_ST_SWITCH_CONTEXT(me);n = 0;if (pq.on_ioq) {_ST_DEL_IOQ(pq);(*_st_eventsys->pollset_del)(pds, npds);} else {for (pd = pds; pd < epd; pd++) {if (pd->revents)n++;}}if (me->flags & _ST_FL_INTERRUPT) {me->flags &= ~_ST_FL_INTERRUPT;errno = EINTR;return -1;}return n;
}
注册需要监听的事件,然后让出CPU执行权,当事件触发后再次从_ST_SWITCH_CONTEXT返回继续处理。
int st_netfd_poll(_st_netfd_t *fd, int how, st_utime_t timeout)
{struct pollfd pd;int n;pd.fd = fd->osfd;pd.events = (short) how;pd.revents = 0;if ((n = st_poll(&pd, 1, timeout)) < 0) /*单一fd*/return -1;if (n == 0) {errno = ETIME;return -1;}if (pd.revents & POLLNVAL) {errno = EBADF;return -1;}return 0;
}
对监听一个文件描述符的封装
_st_netfd_t *st_accept(_st_netfd_t *fd, struct sockaddr *addr, int *addrlen, st_utime_t timeout)
{int osfd, err;_st_netfd_t *newfd;/*执行accept函数,如果没有client连接,则accept立即返回。*/while ((osfd = accept(fd->osfd, addr, (socklen_t *)addrlen)) < 0) {if (errno == EINTR)continue;if (!_IO_NOT_READY_ERROR)return NULL;/*进入poll函数,注册读事件,同时让出CPU的执行权,等待读事件触发。*/if (st_netfd_poll(fd, POLLIN, timeout) < 0)return NULL;}/*accept返回的client socket fd,将其进行封装。*/newfd = _st_netfd_new(osfd, 1, 1);if (!newfd) {err = errno;close(osfd);errno = err;}return newfd;
}
fd
被设置为了非阻塞,调用accept()函数后,若没有客户端请求连接,则立即从accept返回,若errno为EAGAIN或EWOULDBLOCK,说明没有客户端连接,然后执行st_netfd_poll()函数,在此函数内会为fd
注册读事件,同时会让出CPU的执行权。当fd
的读事件触发后,本协程会再次被调度从而获得CPU执行权,接着往下执行。
ssize_t st_read(_st_netfd_t *fd, void *buf, size_t nbyte, st_utime_t timeout)
{ssize_t n;while ((n = read(fd->osfd, buf, nbyte)) < 0) { /*非阻塞的读取*/if (errno == EINTR) /*被信号中断了*/continue;if (!_IO_NOT_READY_ERROR) /*不是EAGAIN或EWOULDBLOCK错误*/return -1;/*执行到这里说明发生了EAGAIN或EWOULDBLOCK错误,此时没有数据可读,让出执行权。*/if (st_netfd_poll(fd, POLLIN, timeout) < 0)return -1;}return n;
}
read的原理同accept,不再赘述。
srs源码分析2-浅析state_threads相关推荐
- srs源码分析3-srs的启动
本文分析的srs版本是0.6.0 srs源码分析1-搭建环境 srs源码分析2-浅析state_threads srs源码分析3-srs的启动 srs源码分析4-客户端的连接 srs源码分析5-han ...
- SRS源码分析-rtmp转rtc流程
前言 SRS4.0支持将RTMP流转换成RTC流,本文将结合源码分析下这个过程. 配置 首先,需要在SRS4.0的启动配置文件里面开启RTC Server和RTC 能力,可以参考官方提供的配置文件./ ...
- Hadoop 3.2.1 【 YARN 】源码分析 : DefaultContainerExecutor 浅析
一 .前言 DefaultContainerExecuter 类提供通用的container 执行服务. 负责启动Container . 是默认实现, 未提供任何权安全措施, 它以NodeManage ...
- Hadoop3.2.1 【 YARN 】源码分析 : ContainerManager浅析
一. 前言 Container启动命令是由各个ApplicationMaster通过RPC函数ContainerManagementProtocol#startContainer向NodeManage ...
- Hadoop3.2.1 【 YARN 】源码分析 : LinuxContainerExecutor 浅析 [ 一 ]
一 .前言 而LinuxContainerExecutor则以应用程序拥有者的身份启动和停止Container, 因此更加安全, 此外, LinuxContainerExecutor允许用户通过Cgr ...
- SRS(simple-rtmp-server)流媒体服务器源码分析--启动
SRS(simple-rtmp-server)流媒体服务器源码分析--系统启动 一.前言 小卒最近看SRS源码,随手写下博客,其一为了整理思路,其二也是为日后翻看方便.如果不足之处,请指教! 首先总结 ...
- SRS流媒体服务器架构设计及源码分析丨音视频开发丨C/C++音视频丨Android开发丨嵌入式开发
SRS流媒体服务器架构设计及源码分析 1.SRS流媒体服务器架构设计 2.协程-连接之间的关系 3.推流-转发-拉流之间的关系 4.如何手把手调试SRS源码 视频讲解如下,点击观看: SRS流媒体服务 ...
- SRS流媒体服务器源码分析(一):Rtmp publish流程
1.线程模型 srs使用了state-threads协程库,是单线程多协程模型. 这个协程的概念类似于lua的协程,都是单线程中可以创建多个协程.而golang中的goroutine协程是多线程并发的 ...
- Django源码分析8:单元测试test命令浅析
django源码分析 本文环境python3.5.2,django1.10.x系列 django源码分析-test命令分析 Django项目中提供了,test命令行命令来执行django的单元测试,该 ...
- Django源码分析7:migrate命令的浅析
django源码分析 本文环境python3.5.2,django1.10.x系列 django源码分析-migrate命令分析 Django项目中提供了,通过migrations操作数据库的结构的命 ...
最新文章
- HDU 4630 No Pain No Game(树状数组)
- memcache php windows,windows系统下安装memcache
- oracle 迁移用户信息,Oracle备份一个用户并迁移
- IE6 式样表 Bug
- 代码款空题 包的使用
- UVA10033 Interpreter【模拟】
- origin调整画板大小
- 『编程题全队』Alpha 阶段冲刺博客集合
- win10 完全卸载 MYsql8.0
- Windows 10 不同版本WHQL认证驱动数字签名兼容问题
- connect etimedout php,npm error: tunneling socket could not be established, cause=connect ETIMEDOUT
- 无人驾驶-控制-阿克曼模型
- item_password-获得淘口令真实url接口,淘宝app短链接商品接口,1688商品淘口令url接口
- 来,我教你怎么优雅删除数据
- NRF52840学习历程(四)定时器
- 啁啾信号chirp(扫频余弦信号)
- 无人机飞行控制实验平台
- nginx php 0xc0000409,卡巴斯基2016导致爱奇艺PPS影音客户端播放故障
- 广州市车联网车联网先导区 V2X 云控基础平台技术规范
- 程序包lombok不存在