本文分析的state-threads的版本是1.9

srs源码分析1-搭建环境

srs源码分析2-浅析state_threads

srs源码分析3-srs的启动

srs源码分析4-客户端的连接

srs源码分析5-handshake

srs源码分析6-connect

以下正在写作中。。。

srs源码分析7-create stream

srs源码分析8-推流-publish

srs源码分析9-推流-unpublish

srs源码分析10-拉流-play

srs源码分析11-拉流-pause

srs源码分析12-转发-forward


srs是基于协程开发的,底层使用了state_threads协程库。为了更好的理解srs,所以需要先熟悉state_threads。这里并不会介绍协程的相关概念,只是简单的介绍一下state_threads的核心逻辑。

以下state_thread会被简称为st。

使用示例-echo server

使用st实现了一个简单的echo服务器,以下代码写的很简单,重点是理解st的使用。

#include <arpa/inet.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <st.h>#define LISTEN_PORT 9000#define ERR_EXIT(m) \do {              \perror(m);      \exit(-1);       \} while (0)void *client_thread(void *arg) {st_netfd_t client_st_fd = (st_netfd_t)arg;int client_fd = st_netfd_fileno(client_st_fd);sockaddr_in client_addr;socklen_t client_addr_len = sizeof(client_addr);int ret = getpeername(client_fd, (sockaddr *)&client_addr, &client_addr_len);if (ret == -1) {printf("[WARN] Failed to get client ip: %s\n", strerror(ret));}char ip_buf[INET_ADDRSTRLEN];bzero(ip_buf, sizeof(ip_buf));inet_ntop(client_addr.sin_family, &client_addr.sin_addr, ip_buf,sizeof(ip_buf));while (1) {char buf[1024] = {0};ssize_t ret = st_read(client_st_fd, buf, sizeof(buf), ST_UTIME_NO_TIMEOUT);if (ret == -1) {printf("client st_read error\n");break;} else if (ret == 0) {printf("client quit, ip = %s\n", ip_buf);break;}printf("recv from %s, data = %s", ip_buf, buf);ret = st_write(client_st_fd, buf, ret, ST_UTIME_NO_TIMEOUT);if (ret == -1) {printf("client st_write error\n");}}
}void *listen_thread(void *arg) {while (1) {st_netfd_t client_st_fd =st_accept((st_netfd_t)arg, NULL, NULL, ST_UTIME_NO_TIMEOUT);if (client_st_fd == NULL) {continue;}printf("get a new client, fd = %d\n", st_netfd_fileno(client_st_fd));st_thread_t client_tid =st_thread_create(client_thread, (void *)client_st_fd, 0, 0);if (client_tid == NULL) {printf("Failed to st create client thread\n");}}
}int main() {int ret = st_set_eventsys(ST_EVENTSYS_ALT);if (ret == -1) {printf("st_set_eventsys use linux epoll failed\n");}ret = st_init();if (ret != 0) {printf("st_init failed. ret = %d\n", ret);return -1;}int listen_fd = socket(AF_INET, SOCK_STREAM, 0);if (listen_fd == -1) {ERR_EXIT("socket");}int reuse_socket = 1;ret = setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket,sizeof(int));if (ret == -1) {ERR_EXIT("setsockopt");}struct sockaddr_in server_addr;server_addr.sin_family = AF_INET;server_addr.sin_port = htons(LISTEN_PORT);server_addr.sin_addr.s_addr = INADDR_ANY;ret =bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr));if (ret == -1) {ERR_EXIT("bind");}ret = listen(listen_fd, 128);if (ret == -1) {ERR_EXIT("listen");}st_netfd_t st_listen_fd = st_netfd_open_socket(listen_fd);if (!st_listen_fd) {printf("st_netfd_open_socket open socket failed.\n");return -1;}st_thread_t listen_tid =st_thread_create(listen_thread, (void *)st_listen_fd, 1, 0);if (listen_tid == NULL) {printf("Failed to st create listen thread\n");}while (1) {st_sleep(1);   /*用于让出CPU执行权,重新调度就绪的协程。*/}return 0;
}
root@learner:~/tmp/st# gcc main.cpp -lst
root@learner-Lenovo:~/tmp/st# ./a.out
get a new client, fd = 4
recv from 192.168.30.17, data = hello world
client quit, ip = 192.168.30.17
^C
root@learner:~# nc 192.168.30.17 9000
hello world
hello world
^C

创建一个listen协程,用于监听客户端的连接,当客户端连接服务后,会为此客户端创建一个client协程,用于处理此客户端的所有请求。

协程的切换

st中协程的切换提供了两种方式:一种是使用系统提供的setjmplongjmp接口,另一种是使用汇编实现的_st_md_cxt_save_st_md_cxt_restore接口,这两个函数从用法上同setjmp和longjmp。

这两种方式的切换本质上都是栈帧的切换。

setjmp和longjmp

C语言中的goto语句只能在当前函数内跳转,而不能在函数间跳转。setjmp()和longjmp()可以执行非局部跳转,即跳转的目标为当前执行函数之外的某个位置。

setjmp()函数为后续由longjmp()调用执行的跳转确立了跳转目标,该目标正是程序发起setjmp()调用的位置。从编程角度看来,调用longjmp()函数后,看起来就和从第二次调用setjmp()返回时完全一样。通过setjmp()的返回值,可以区分setjmp()调用是初始返回还是第二次返回。初始调用返回值为0,后续“伪返回”的返回值为longjmp()调用中val参数所指定的任意值。通过对val参数使用不同值,能够区分程序中跳转至同一目标的不同起跳位置。更多相关setjmp()、longjmp()的介绍,可以参考《Linux/UNIX系统编程手册》上册第106页。

以下是从《Linux/UNIX系统编程手册》摘抄的示例:

#include <stdio.h>
#include <stdlib.h>
#include <setjmp.h>jmp_buf env;void f2(int num)
{longjmp(env, num);
}void f1(int num)
{if(num == 1){longjmp(env, num);}f2(num);
}int main(int argc, char** argv)
{if(argc != 2){printf("Usage: %s [1|2]\n", argv[0]);return -1;}switch(setjmp(env)){case 0:printf("Calling f1() after initial setjmp()\n");f1(atoi(argv[1]));break;case 1:printf("We jumped back from f1()\n");break;case 2:printf("We jumped back from f2()\n");break;}return 0;
}

这个示例我稍微做了一些修改,运行结果及分析如下:

root@learner:~/tmp# ./a.out 1
Calling f1() after initial setjmp()
We jumped back from f1()

root@learner:~/tmp# ./a.out 2
Calling f1() after initial setjmp()
We jumped back from f2()

_st_md_cxt_save和_st_md_cxt_restore

这两个函数是通过汇编实现的,代码如下:

#define JB_BX  0
#define JB_SI  1
#define JB_DI  2
#define JB_BP  3
#define JB_SP  4
#define JB_PC  5.file "md.S"
.text/* _st_md_cxt_save(__jmp_buf env)              存储函数栈帧   */
.globl _st_md_cxt_save.type _st_md_cxt_save, @function.align 16
_st_md_cxt_save:movl 4(%esp), %eax                /*取得参数env的地址,保存到eax中。*/movl %ebx, (JB_BX*4)(%eax)        /*保存ebx*/movl %esi, (JB_SI*4)(%eax)        /*保存esi*/movl %edi, (JB_DI*4)(%eax)        /*保存edi*//*保存esp,即栈顶,保存的栈顶是没有调用_st_md_cxt_save()函数之前的栈顶*/leal 4(%esp), %ecx                /movl %ecx, (JB_SP*4)(%eax)        /*保存ecx*/movl 0(%esp), %ecx                      movl %ecx, (JB_PC*4)(%eax)        /*保存引用计数器pc*/movl %ebp, (JB_BP*4)(%eax)        /*保存ebp 即调用_st_md_cxt_save()的函数的ebp*/xorl %eax, %eax                   /*清空eax 作为_st_md_cxt_save()的返回值*/ret
.size _st_md_cxt_save, .-_st_md_cxt_save/* _st_md_cxt_restore(__jmp_buf env, int val)     恢复函数栈帧  */
.globl _st_md_cxt_restore.type _st_md_cxt_restore, @function.align 16
_st_md_cxt_restore:movl 4(%esp), %ecx            /*获取第一个参数的地址,即env的地址。*/movl 8(%esp), %eax            /*获取第二个参数的地址,即val的地址。*/movl (JB_PC*4)(%ecx), %edx    /*将原pc寄存器的值保存到edx中*/movl (JB_BX*4)(%ecx), %ebx    /*恢复ebx*/movl (JB_SI*4)(%ecx), %esi    /*恢复esi*/movl (JB_DI*4)(%ecx), %edi    /*恢复edi*/movl (JB_BP*4)(%ecx), %ebp    /*恢复ebp*/movl (JB_SP*4)(%ecx), %esp    /*恢复esp*/testl %eax, %eax              /*测试eax的值是否为0,也就是第二个参数是否为0。*/jnz  1f                       /*如果第二个参数不为0,则直接跳转到1:执行。*/incl %eax                     /*将返回值置为1*/1: jmp *%edx                  /*跳转到之前pc处*/
.size _st_md_cxt_restore, .-_st_md_cxt_restore

_st_md_cxt_save(__jmp_buf env)用于保存栈帧,_st_md_cxt_restore(__jmp_buf env, int val)用于恢复栈帧。

st中协程的切换宏

#if defined(MD_USE_BUILTIN_SETJMP) && !defined(USE_LIBC_SETJMP)   #define MD_SETJMP(env) _st_md_cxt_save(env)#define MD_LONGJMP(env, val) _st_md_cxt_restore(env, val)extern int _st_md_cxt_save(jmp_buf env);extern void _st_md_cxt_restore(jmp_buf env, int val);
#else#define MD_SETJMP(env) setjmp(env)  #define MD_LONGJMP(env, val) longjmp(env, val)
#endif

如果定义了MD_USE_BUILTIN_SETJMP宏,且没有定义USE_LIBC_SETJMP宏,则使用自定义的栈帧存取函数。否则使用系统提供的setjmp和longjmp切换栈帧。

#define _ST_SWITCH_CONTEXT(_thread)       \ST_BEGIN_MACRO                        \ST_SWITCH_OUT_CB(_thread);            \if (!MD_SETJMP((_thread)->context)) { \   /*调出协程返回0,调入协程返回1。*/ _st_vp_schedule();                \   /*选择下一个需要调度的协程*/}                                     \ST_DEBUG_ITERATE_THREADS();           \ST_SWITCH_IN_CB(_thread);             \ST_END_MACRO

_ST_SWITCH_CONTEXT用于将协程的CPU执行权让出去,重新调度一个新的协程。

当协程调用_ST_SWITCH_CONTEXT时,此时MD_SETJMP会返回0,则进入协程调度函数_st_vp_schedule(),CPU的执行权转移到其他协程。此时相当于在本协程中打上了一个切换点。当本协程将再次获得CPU执行权时,在_st_vp_schedule()中调用_ST_RESTORE_CONTEXT宏函数,会通过MD_SETJMP再次返回,此时返回值为1,跳过if语句返回到本协程调用_ST_SWITCH_CONTEXT的位置,继续往下执行。

#define _ST_RESTORE_CONTEXT(_thread)   \ST_BEGIN_MACRO                     \_ST_SET_CURRENT_THREAD(_thread);   \      /*标记此协程为当前运行的协程*/MD_LONGJMP((_thread)->context, 1); \      /*执行协程切换     恢复之前挂起的协程*/ST_END_MACRO

_ST_RESTORE_CONTEXT用于恢复指定的协程,通过MD_LONGJMP宏,返回到MD_SETJMP打的断点处,从MD_SETJMP再次返回,从而再次获取到CPU的执行权。

void _st_vp_schedule(void)
{_st_thread_t *thread;/*从就绪的协程队列中取出一个协程*/if (_ST_RUNQ.next != &_ST_RUNQ) {thread = _ST_THREAD_PTR(_ST_RUNQ.next);_ST_DEL_RUNQ(thread);    /*从就绪协程队列删除*/} else {   /*如果就绪的协程队列为空,说明所有的就绪协程都处理完毕了。*/thread = _st_this_vp.idle_thread;     /*现在切换至idle协程*/}ST_ASSERT(thread->state == _ST_ST_RUNNABLE);    /*该协程必须处于可运行状态*/thread->state = _ST_ST_RUNNING;     /*将即将运行协程的状态标记为正在运行*/_ST_RESTORE_CONTEXT(thread);        /*切换至新的协程*/
}

在切换协程时,会从就绪的协程队列中取出一个协程,然后切换至该协程。如果就绪队列中没有可切换的协程,则说明没有协程需要处理,此时会切换至idle协程。返回idle协程后,会重新进入epoll_wait,重新开始监听待发生的事件和处理定时事件。

调度器

所有的协程都是在一个单线程中执行的,所以需要有一个调度器来调度所有的协程,以便需要执行权限的协程能够获取到CPU。通常协程在发生读事件写事件定时器事件时才需要执行权限,也就是发生这些事件后,需要将协程调度到CPU上,让其获得CPU的执行权,处理对应的事情。

st中对读写事件的监控是通过epoll实现的,而定时器事件通过最小堆配合epoll的超时实现的。

typedef struct _st_eventsys_ops {const char *name;                          /* Name of this event system */int  val;                                  /* Type of this event system */int  (*init)(void);                        /* Initialization */void (*dispatch)(void);                    /* Dispatch function */int  (*pollset_add)(struct pollfd *, int); /* Add descriptor set */void (*pollset_del)(struct pollfd *, int); /* Delete descriptor set */int  (*fd_new)(int);                       /* New descriptor allocated */int  (*fd_close)(int);                     /* Descriptor closed */int  (*fd_getlimit)(void);                 /* Descriptor hard limit */
} _st_eventsys_t;

这是调度器的接口,可以使用epoll实现,也可以使用select和poll实现。

static _st_eventsys_t _st_epoll_eventsys = {"epoll",ST_EVENTSYS_ALT,_st_epoll_init,_st_epoll_dispatch,_st_epoll_pollset_add,_st_epoll_pollset_del,_st_epoll_fd_new,_st_epoll_fd_close,_st_epoll_fd_getlimit
};

st中通过epoll实现了调度器,实现的这些函数作为回调函数封装到了结构体中。

ST_HIDDEN void _st_epoll_dispatch(void)
{...if (_ST_SLEEPQ == NULL) {     /* 定时队列为空,说明没有定时器事件,则epoll_wait的超时时间为-1,即没有事件触发时,epoll_wait一直阻塞。*/timeout = -1;} else {                      /*从定时队列获取最小定时器*/min_timeout = (_ST_SLEEPQ->due <= _ST_LAST_CLOCK) ? 0 : (_ST_SLEEPQ->due - _ST_LAST_CLOCK);/*换算epoll_wait的超时时间  单位:us */timeout = (int) (min_timeout / 1000);
...}/*进入epoll等待事件的触发,也可能因为超时而退出。*/nfd = epoll_wait(..., ..., ..., timeout);
...pq->thread->state = _ST_ST_RUNNABLE;  /*把协程的状态设置为可运行状态*/_ST_ADD_RUNQ(pq->thread);             /*将协程添加到运行队列,等待新一轮的调度。*/
...
}

在进入epoll_wait之前,先从最小堆中获取最近一个定时器触发的时间,将此时间作为epoll_wait的超时时间,如果在这个超时时间之内发生了读写事件,则epoll_wait返回处理读写事件;如果段超时时间之内没有发生读写事件,epoll_wait会因为超时而退出,此时返回正好处理定时事件。

若不是因为超时而从epoll_wait返回,说明有的协程读写事件触发了,此时需要将触发事件的协程保存到可运行队列中,等待新一轮的调度。

创建协程

_st_thread_t *st_thread_create(void *(*start)(void *arg), void *arg, int joinable, int stk_size)
{_st_thread_t *thread;_st_stack_t *stack;void **ptds;char *sp;/* Adjust stack size   调整栈大小*/if (stk_size == 0)stk_size = ST_DEFAULT_STACK_SIZE;    /*默认栈大小是128KB*//*页大小对齐*/stk_size = ((stk_size + _ST_PAGE_SIZE - 1) / _ST_PAGE_SIZE) * _ST_PAGE_SIZE;    /*申请栈空间*/stack = _st_stack_new(stk_size);if (!stack)return NULL;sp = stack->stk_top;                     /*栈顶*/sp = sp - (ST_KEYS_MAX * sizeof(void *));/*栈顶空出一块区域,用于存放私有的数据。*/ptds = (void **) sp;sp = sp - sizeof(_st_thread_t);          /*再空出一个_st_thread_t大小*/thread = (_st_thread_t *) sp;if ((unsigned long)sp & 0x3f)sp = sp - ((unsigned long)sp & 0x3f);stack->sp = sp - _ST_STACK_PAD_SIZE;     /*栈顶再空出128字节的填充区域*/memset(thread, 0, sizeof(_st_thread_t));memset(ptds, 0, ST_KEYS_MAX * sizeof(void *));thread->private_data = ptds;     /*指向协程私有数据*/thread->stack = stack;           /*指向协程栈*/thread->start = start;           /*协程入口函数*/thread->arg = arg;               /*入口函数参数*//*保存切换上下文,打上还原点,当本协程下次获取到执行权限时,从这个还原点接着执行。*/_ST_INIT_CONTEXT(thread, stack->sp, _st_thread_main);/*如果需要主动回收协程,则需要协程创建一个条件变量,用于阻塞等待回收协程。*/if (joinable) {     thread->term = st_cond_new();if (thread->term == NULL) {_st_stack_free(thread->stack);return NULL;}}thread->state = _ST_ST_RUNNABLE;   /*标记协程为可运行状态*/_st_active_count++;                /*增加活跃协程的个数*/_ST_ADD_RUNQ(thread);              /*将协程插入到运行队列*/return thread;
}

创建一个新的协程,在创建的过程中,会将这个协程放到可运行队列,等待着调度。在调度到这个新的协程时,就可以获得CPU的执行权。

除了主协程外,其他协程的栈都是在堆上申请的空间,默认大小时128KB。

#define _ST_INIT_CONTEXT MD_INIT_CONTEXT#define MD_INIT_CONTEXT(_thread, _sp, _main) \ST_BEGIN_MACRO                           \if (MD_SETJMP((_thread)->context))       \ /*设置还原点,或从还原点返回。*/_main();                           \ MD_GET_SP(_thread) = (long) (_sp);       \ /*设置ctx中sp寄存器的值,设置新的栈帧*/ST_END_MACRO

在创建新协程时,会通过上面的宏函数设置还原点,当执行到MD_SETJMP时,会返回0,此时_main()函数不会被执行。当协程再次获取执行权时,会再次从MD_SETJMP返回,此时返回值为1,则进入_main()函数,也就是_st_thread_main()函数。

void _st_thread_main(void)
{_st_thread_t *thread = _ST_CURRENT_THREAD();      /*获取当前协程的句柄*/MD_CAP_STACK(&thread);thread->retval = (*thread->start)(thread->arg);   /*执行协程入口函数*/st_thread_exit(thread->retval);                   /*协程退出*/
}

新的协程创建后,并不会立即被执行,需要先打上还原点,然后放入可执行队列中。当调度器调度到这个新线程后才会真正获取到CPU的执行权,在MD_SETJMP返回后,进入这个函数,在此函数中才会进入协程的入口函数。协程入口函数处理完毕后,会进入协程退出函数,这个稍后分析。

st的初始化

int st_init(void)
{_st_thread_t *thread;if (_st_active_count) {                /*如果已经初始化,则直接返回。*/return 0;}st_set_eventsys(ST_EVENTSYS_DEFAULT);  /*设置epoll封装的接口 */if (_st_io_init() < 0)return -1;memset(&_st_this_vp, 0, sizeof(_st_vp_t));/*三个队列的初始化*/ST_INIT_CLIST(&_ST_RUNQ);        /*可运行队列*/ST_INIT_CLIST(&_ST_IOQ);         /*io队列*/ST_INIT_CLIST(&_ST_ZOMBIEQ);     /*僵尸态队列*/if ((*_st_eventsys->init)() < 0)  /*epoll的初始化*/return -1;_st_this_vp.pagesize = getpagesize();     /*页大小*/_st_this_vp.last_clock = st_utime();      /*时钟时间*//* 创建一个idle协程 */_st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start, NULL, 0, 0);if (!_st_this_vp.idle_thread)return -1;_st_this_vp.idle_thread->flags = _ST_FL_IDLE_THREAD;    /*标识为idle协程*/_st_active_count--;      _ST_DEL_RUNQ(_st_this_vp.idle_thread);    /*从可运行队列中删除idle协程*//*为主协程封装一个_st_thread_t */thread = (_st_thread_t *) calloc(1, sizeof(_st_thread_t) + (ST_KEYS_MAX * sizeof(void *)));    if (!thread)return -1;thread->private_data = (void **) (thread + 1);    /*指向协程私有数据*/thread->state = _ST_ST_RUNNING;                   /*设置协程为可运行状态*/thread->flags = _ST_FL_PRIMORDIAL;                /*标识为主协程*/_ST_SET_CURRENT_THREAD(thread);       /*设置当前工作的协程*/_st_active_count++;                   /*增加活跃协程个数*/return 0;
}

在使用st时,首先需要调用st_init()函数对st进行初始化。这个函数有三个作用:1、做一些初始化工作 2、创建idle协程 3、将主线程封装为主协程

主线程也是一条可执行流,需要将主线程封装成主协程,以便能够在调度器中进行调度。

idle协程是非常核心的,当就绪队列中没有可运行的协程时,会将CPU的执行权限调度到idle协程。在idle协程中重新开始监听读、写、定时器事件。

void *_st_idle_thread_start(void *arg)
{_st_thread_t *me = _ST_CURRENT_THREAD();while (_st_active_count > 0) {_ST_VP_IDLE();                /*进入epoll_wait,监听读写事件*/_st_vp_check_clock();         /*处理定时器事件*/me->state = _ST_ST_RUNNABLE;  /*将idle线程标记为可运行状态*/_ST_SWITCH_CONTEXT(me);       /*让出CPU执行权,重新开始调度。*/}exit(0);return NULL;
}

当就绪队列为空时,调度会进入idle线程,在idle线程中,会进入epoll_wait监听读写事件,有读写事件触发时,会将协程保存到就绪队列中;从epoll_wait返回后,查看是否有定时器触发,若有定时器触发,则将协程保存到就绪队列中。处理完读写事件和定时器事件后,idle协程让出CPU执行权,开始依次调度所有的就绪协程,所有的就绪协程处理完毕后,会再次进入idle协程,之后都是这样循环往复。

#define _ST_VP_IDLE()                   (*_st_eventsys->dispatch)()

_st_eventsys->dispatch是回调函数,这个函数指针实际指向_st_epoll_dispatch。

void _st_vp_check_clock(void)
{_st_thread_t *thread;st_utime_t now;now = st_utime();        /*获取当前时间*/_ST_LAST_CLOCK = now;if (_st_curr_time && now - _st_last_tset > 999000) {_st_curr_time = time(NULL);_st_last_tset = now;}while (_ST_SLEEPQ != NULL) {    /*睡眠队列不为空*/thread = _ST_SLEEPQ;        /*获取最小堆上的最小的定时器*/ST_ASSERT(thread->flags & _ST_FL_ON_SLEEPQ);if (thread->due > now)    break;                  /*协程的定时器还没有到,立即返回。*//*协程的定时器触发了*/_ST_DEL_SLEEPQ(thread);  /*从睡眠队列中删除*//*协程是因为条件变量而睡眠的,现在条件变量超时了。*/if (thread->state == _ST_ST_COND_WAIT)    thread->flags |= _ST_FL_TIMEDOUT;     ST_ASSERT(!(thread->flags & _ST_FL_IDLE_THREAD));thread->state = _ST_ST_RUNNABLE;    /*标记协程为可运行状态*/_ST_INSERT_RUNQ(thread);            /*将协程送至就绪队列,等待调度。*/}
}

从epoll_wait返回后,检查睡眠队列中的协程,当其定时器到了,则将协程送至就绪队列,等待新一轮的调度。

所有的定时器都放在最小堆中,从最小堆中获取到的是所有定时器的最小值。如果当前时间超过了最小堆中的定时器,说明定时器触发了。通过while循环将最小堆中的所有该触发的定时器全部都保存到就绪队列中。

协程的exit、join和yield

_st_thread_t *st_thread_create(void *(*start)(void *arg), void *arg, int joinable, int stk_size)
{...if (joinable) {     /*如果协程需要主动回收,则为协程创建一个条件变量。*/thread->term = st_cond_new();        /*创建条件变量*/if (thread->term == NULL) {_st_stack_free(thread->stack);return NULL;}}
...
}

在创建协程的时候,需要指明是否会主动回收协程。如果需要主动回收协程,则需要为协程创建一个条件变量,以便其他协程阻塞的回收该协程。

void _st_thread_main(void)
{_st_thread_t *thread = _ST_CURRENT_THREAD();      /*获取当前协程的句柄*/MD_CAP_STACK(&thread);thread->retval = (*thread->start)(thread->arg);   /*执行协程入口函数*/st_thread_exit(thread->retval);                   /*退出协程*/
}

当协程的主体函数执行完毕后,会进入st_thread_exit函数,用于退出协程。

void st_thread_exit(void *retval)
{_st_thread_t *thread = _ST_CURRENT_THREAD();    /*获取当前协程句柄*/thread->retval = retval;        /*保存返回值*/_st_thread_cleanup(thread);     /*释放协程的私有数据*/_st_active_count--;             /*活跃协程数减一*/if (thread->term) {    /*如果需要主动回收此协程*/thread->state = _ST_ST_ZOMBIE;     /*设置协程为僵尸态*/_ST_ADD_ZOMBIEQ(thread);           /*添加到僵尸态队列*/st_cond_signal(thread->term);      /*通知阻塞等待回收的协程*/_ST_SWITCH_CONTEXT(thread);        /*让出执行权*/st_cond_destroy(thread->term);     /*销毁条件变量*/thread->term = NULL;}/*如果是主协程,则无需释放其对应的栈,否则释放在堆上申请的栈空间。*/if (!(thread->flags & _ST_FL_PRIMORDIAL))_st_stack_free(thread->stack);_ST_SWITCH_CONTEXT(thread);            /*销毁完毕让出CPU执行权*/
}

若协程是主协程,则无需释放堆空间,否则需要释放在堆上申请的用于栈的空间。thread->term不为NULL,说明这个协程需要主动的回收,此时需要将协程设置为僵尸态,并加入僵尸态队列。同时通知阻塞等待回收的协程。

int st_thread_join(_st_thread_t *thread, void **retvalp)
{_st_cond_t *term = thread->term;         /*获取协程的条件变量*/if (term == NULL) {                  errno = EINVAL;return -1;}if (_ST_CURRENT_THREAD() == thread) {    /*不能是当前协程*/errno = EDEADLK;return -1;}/*不能多个线程回收同时回收同一个协程*/if (term->wait_q.next != &term->wait_q) {errno = EINVAL;return -1;}/*如果协程的状态不是僵尸态,则用于回收的线程将进入条件变量等待。*/while (thread->state != _ST_ST_ZOMBIE) {if (st_cond_timedwait(term, ST_UTIME_NO_TIMEOUT) != 0)return -1;}if (retvalp)*retvalp = thread->retval;       /*获取待回收协程的返回值*/thread->state = _ST_ST_RUNNABLE;     /*将待回收的协程设置为可运行状态*/_ST_DEL_ZOMBIEQ(thread);             /*从僵尸态队列删除*/_ST_ADD_RUNQ(thread);                /*加入就绪运行队列*/return 0;
}

协程在回收其他协程,此时待回收的协程还没有退出,主动回收的协程将进入条件变量等待。当待回收的协程退出时,会激活条件变量上的协程。

主动回收的协程从条件变量返回后,此时待回收的协程处于僵尸态,获取返回值后,此时需要再次将待回收的协程置为可运行状态,并加入就绪运行队列。待回收协程会再次进入st_thread_exit()函数,从_ST_SWITCH_CONTEXT返回,主动销毁条件变量和栈空间,最后通过_ST_SWITCH_CONTEXT让出执行权,这时协程才算退出。

void st_thread_yield()
{_st_thread_t *me = _ST_CURRENT_THREAD();    /*获取当前协程句柄*//*检查是否有定时器事件触发*/_st_vp_check_clock();/*就绪队列为空,则直接返回。*/if (_ST_RUNQ.next == &_ST_RUNQ) {return;}me->state = _ST_ST_RUNNABLE;  /*将本协程标记为可运行状态*/_ST_ADD_RUNQ(me);             /*把本协程添加到就绪队列中*//*将执行权切换给就绪队列中的其他协程*/_ST_SWITCH_CONTEXT(me);
}

协程在运行的过程中,可以主动的让出执行权。在让出执行权的时候,需要将自己主动加入到就绪队列中,等待再次被调度。

socket的处理

int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
{struct pollfd *pd;struct pollfd *epd = pds + npds;     /*指向数组末尾*/_st_pollq_t pq;_st_thread_t *me = _ST_CURRENT_THREAD();int n;if (me->flags & _ST_FL_INTERRUPT) {me->flags &= ~_ST_FL_INTERRUPT;errno = EINTR;return -1;}if ((*_st_eventsys->pollset_add)(pds, npds) < 0)return -1;pq.pds = pds;pq.npds = npds;pq.thread = me;pq.on_ioq = 1;_ST_ADD_IOQ(pq);if (timeout != ST_UTIME_NO_TIMEOUT)_ST_ADD_SLEEPQ(me, timeout);me->state = _ST_ST_IO_WAIT;/*主动切出协程,交出执行权。*/_ST_SWITCH_CONTEXT(me);n = 0;if (pq.on_ioq) {_ST_DEL_IOQ(pq);(*_st_eventsys->pollset_del)(pds, npds);} else {for (pd = pds; pd < epd; pd++) {if (pd->revents)n++;}}if (me->flags & _ST_FL_INTERRUPT) {me->flags &= ~_ST_FL_INTERRUPT;errno = EINTR;return -1;}return n;
}

注册需要监听的事件,然后让出CPU执行权,当事件触发后再次从_ST_SWITCH_CONTEXT返回继续处理。

int st_netfd_poll(_st_netfd_t *fd, int how, st_utime_t timeout)
{struct pollfd pd;int n;pd.fd = fd->osfd;pd.events = (short) how;pd.revents = 0;if ((n = st_poll(&pd, 1, timeout)) < 0)     /*单一fd*/return -1;if (n == 0) {errno = ETIME;return -1;}if (pd.revents & POLLNVAL) {errno = EBADF;return -1;}return 0;
}

对监听一个文件描述符的封装

_st_netfd_t *st_accept(_st_netfd_t *fd, struct sockaddr *addr, int *addrlen, st_utime_t timeout)
{int osfd, err;_st_netfd_t *newfd;/*执行accept函数,如果没有client连接,则accept立即返回。*/while ((osfd = accept(fd->osfd, addr, (socklen_t *)addrlen)) < 0) {if (errno == EINTR)continue;if (!_IO_NOT_READY_ERROR)return NULL;/*进入poll函数,注册读事件,同时让出CPU的执行权,等待读事件触发。*/if (st_netfd_poll(fd, POLLIN, timeout) < 0)return NULL;}/*accept返回的client socket fd,将其进行封装。*/newfd = _st_netfd_new(osfd, 1, 1);if (!newfd) {err = errno;close(osfd);errno = err;}return newfd;
}

fd被设置为了非阻塞,调用accept()函数后,若没有客户端请求连接,则立即从accept返回,若errno为EAGAIN或EWOULDBLOCK,说明没有客户端连接,然后执行st_netfd_poll()函数,在此函数内会为fd注册读事件,同时会让出CPU的执行权。当fd的读事件触发后,本协程会再次被调度从而获得CPU执行权,接着往下执行。

ssize_t st_read(_st_netfd_t *fd, void *buf, size_t nbyte, st_utime_t timeout)
{ssize_t n;while ((n = read(fd->osfd, buf, nbyte)) < 0) {   /*非阻塞的读取*/if (errno == EINTR)            /*被信号中断了*/continue;if (!_IO_NOT_READY_ERROR)      /*不是EAGAIN或EWOULDBLOCK错误*/return -1;/*执行到这里说明发生了EAGAIN或EWOULDBLOCK错误,此时没有数据可读,让出执行权。*/if (st_netfd_poll(fd, POLLIN, timeout) < 0)return -1;}return n;
}

read的原理同accept,不再赘述。

srs源码分析2-浅析state_threads相关推荐

  1. srs源码分析3-srs的启动

    本文分析的srs版本是0.6.0 srs源码分析1-搭建环境 srs源码分析2-浅析state_threads srs源码分析3-srs的启动 srs源码分析4-客户端的连接 srs源码分析5-han ...

  2. SRS源码分析-rtmp转rtc流程

    前言 SRS4.0支持将RTMP流转换成RTC流,本文将结合源码分析下这个过程. 配置 首先,需要在SRS4.0的启动配置文件里面开启RTC Server和RTC 能力,可以参考官方提供的配置文件./ ...

  3. Hadoop 3.2.1 【 YARN 】源码分析 : DefaultContainerExecutor 浅析

    一 .前言 DefaultContainerExecuter 类提供通用的container 执行服务. 负责启动Container . 是默认实现, 未提供任何权安全措施, 它以NodeManage ...

  4. Hadoop3.2.1 【 YARN 】源码分析 : ContainerManager浅析

    一. 前言 Container启动命令是由各个ApplicationMaster通过RPC函数ContainerManagementProtocol#startContainer向NodeManage ...

  5. Hadoop3.2.1 【 YARN 】源码分析 : LinuxContainerExecutor 浅析 [ 一 ]

    一 .前言 而LinuxContainerExecutor则以应用程序拥有者的身份启动和停止Container, 因此更加安全, 此外, LinuxContainerExecutor允许用户通过Cgr ...

  6. SRS(simple-rtmp-server)流媒体服务器源码分析--启动

    SRS(simple-rtmp-server)流媒体服务器源码分析--系统启动 一.前言 小卒最近看SRS源码,随手写下博客,其一为了整理思路,其二也是为日后翻看方便.如果不足之处,请指教! 首先总结 ...

  7. SRS流媒体服务器架构设计及源码分析丨音视频开发丨C/C++音视频丨Android开发丨嵌入式开发

    SRS流媒体服务器架构设计及源码分析 1.SRS流媒体服务器架构设计 2.协程-连接之间的关系 3.推流-转发-拉流之间的关系 4.如何手把手调试SRS源码 视频讲解如下,点击观看: SRS流媒体服务 ...

  8. SRS流媒体服务器源码分析(一):Rtmp publish流程

    1.线程模型 srs使用了state-threads协程库,是单线程多协程模型. 这个协程的概念类似于lua的协程,都是单线程中可以创建多个协程.而golang中的goroutine协程是多线程并发的 ...

  9. Django源码分析8:单元测试test命令浅析

    django源码分析 本文环境python3.5.2,django1.10.x系列 django源码分析-test命令分析 Django项目中提供了,test命令行命令来执行django的单元测试,该 ...

  10. Django源码分析7:migrate命令的浅析

    django源码分析 本文环境python3.5.2,django1.10.x系列 django源码分析-migrate命令分析 Django项目中提供了,通过migrations操作数据库的结构的命 ...

最新文章

  1. HDU 4630 No Pain No Game(树状数组)
  2. memcache php windows,windows系统下安装memcache
  3. oracle 迁移用户信息,Oracle备份一个用户并迁移
  4. IE6 式样表 Bug
  5. 代码款空题 包的使用
  6. UVA10033 Interpreter【模拟】
  7. origin调整画板大小
  8. 『编程题全队』Alpha 阶段冲刺博客集合
  9. win10 完全卸载 MYsql8.0
  10. Windows 10 不同版本WHQL认证驱动数字签名兼容问题
  11. connect etimedout php,npm error: tunneling socket could not be established, cause=connect ETIMEDOUT
  12. 无人驾驶-控制-阿克曼模型
  13. item_password-获得淘口令真实url接口,淘宝app短链接商品接口,1688商品淘口令url接口
  14. 来,我教你怎么优雅删除数据
  15. NRF52840学习历程(四)定时器
  16. 啁啾信号chirp(扫频余弦信号)
  17. 无人机飞行控制实验平台
  18. nginx php 0xc0000409,卡巴斯基2016导致爱奇艺PPS影音客户端播放故障
  19. 广州市车联网车联网先导区 V2X 云控基础平台技术规范
  20. 程序包lombok不存在

热门文章

  1. 开发与研发:区别很大(上)
  2. 鼠标双击成了查看属性是怎么回事?怎样解决?
  3. 2020年执业药师考试,5个锦囊助你做好最后冲刺!
  4. 以编程会安全,以安全辅未来——2017看雪安全开发者峰会 强势来袭!
  5. 人脸识别访客系统解决方案
  6. 蓝桥杯0027 通信密码
  7. 联想微型计算机怎么恢复系统,联想一体机系统还原的方法 联想一体机如何还原系统...
  8. 数据处理 | 一些野路子
  9. jQuery 遍历 - closest() 方法 is()方法
  10. 外贸企业如何选ERP管理软件