skynet入口解析

  • skynet入口
    • skynet总体架构
      • skynet入口函数
        • 具体会有如下线程:
        • 网络线程工作流程:
        • 工作线程工作流程:
        • 定时器线程工作流程:
        • 监视器线程工作流程:
      • 一、skynet网络线程入口
      • 二、skynet工作线程入口
      • 三、skynet定时器线程入口
      • 四、skynet监视器线程入口
  • 如何查找skynet的C源码位置

skynet入口

skynet总体架构

skynet是一个多线程的服务端架构。

skynet入口函数

总入口

void
skynet_start(struct skynet_config * config) {// register SIGHUP for log file reopenstruct sigaction sa;sa.sa_handler = &handle_hup;sa.sa_flags = SA_RESTART;sigfillset(&sa.sa_mask);sigaction(SIGHUP, &sa, NULL);if (config->daemon) {if (daemon_init(config->daemon)) {exit(1);}}skynet_harbor_init(config->harbor);skynet_handle_init(config->harbor);skynet_mq_init();skynet_module_init(config->module_path);skynet_timer_init();skynet_socket_init();skynet_profile_enable(config->profile);struct skynet_context *ctx = skynet_context_new(config->logservice, config->logger);if (ctx == NULL) {fprintf(stderr, "Can't launch %s service\n", config->logservice);exit(1);}skynet_handle_namehandle(skynet_context_handle(ctx), "logger");bootstrap(ctx, config->bootstrap);start(config->thread);// harbor_exit may call socket send, so it should exit before socket_freeskynet_harbor_exit();skynet_socket_free();if (config->daemon) {daemon_exit(config->daemon);}
}

创建上下文环境

static void
bootstrap(struct skynet_context * logger, const char * cmdline) {int sz = strlen(cmdline);char name[sz+1];char args[sz+1];int arg_pos;sscanf(cmdline, "%s", name);  arg_pos = strlen(name);if (arg_pos < sz) {while(cmdline[arg_pos] == ' ') {arg_pos++;}strncpy(args, cmdline + arg_pos, sz);} else {args[0] = '\0';}struct skynet_context *ctx = skynet_context_new(name, args);if (ctx == NULL) {skynet_error(NULL, "Bootstrap error : %s\n", cmdline);skynet_context_dispatchall(logger);exit(1);}
}

创建线程:创建一个socket、timer、monitor线程和n个工作线程。工作线程的个数由启动时配置的参数决定。

static void
start(int thread) {pthread_t pid[thread+3];struct monitor *m = skynet_malloc(sizeof(*m));memset(m, 0, sizeof(*m));m->count = thread;m->sleep = 0;m->m = skynet_malloc(thread * sizeof(struct skynet_monitor *));int i;for (i=0;i<thread;i++) {m->m[i] = skynet_monitor_new();}if (pthread_mutex_init(&m->mutex, NULL)) {fprintf(stderr, "Init mutex error");exit(1);}if (pthread_cond_init(&m->cond, NULL)) {fprintf(stderr, "Init cond error");exit(1);}create_thread(&pid[0], thread_monitor, m);create_thread(&pid[1], thread_timer, m);create_thread(&pid[2], thread_socket, m);static int weight[] = { -1, -1, -1, -1, 0, 0, 0, 0,1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, };struct worker_parm wp[thread];for (i=0;i<thread;i++) {wp[i].m = m;wp[i].id = i;if (i < sizeof(weight)/sizeof(weight[0])) {wp[i].weight= weight[i];} else {wp[i].weight = 0;}create_thread(&pid[i+3], thread_worker, &wp[i]);}for (i=0;i<thread+3;i++) {pthread_join(pid[i], NULL); }free_monitor(m);
}
具体会有如下线程:
  1. 一个网络线程
  2. 多个工作线程(可通过配置修改,取值一般是CPU的核心数)
  3. 一个定时器线程
  4. 一个监视器线程
网络线程工作流程:
  1. 调用多路复用API创建epoll管理多个客户端socket
  2. 当收到客户端消息,把消息插入到客户端对应的服务的消息队列中。
工作线程工作流程:
  1. 从全局队列获取有消息要处理的服务
  2. 把服务从全局队列中删掉,然后处理服务里的消息队列的消息。注意消息队列里的每个消息都会创建一个协程来处理,提高当前线程的并发性
定时器线程工作流程:
  1. 线程每隔一段时间休眠一次
  2. 线程唤醒后,查看是否有到期的消息,如果有,把到期的消息插入到对应的服务的消息队列中
监视器线程工作流程:
  1. 线程每隔5秒休眠一次
  2. 线程唤醒后,查看工作线程是否陷入死循环。(注:每个工作线程对应一个监视)

一、skynet网络线程入口

网络线程入口函数如下,调用函数socket_server_poll创建多路复用IO监听socket IO事件。

//skynet_start.c
static void *
thread_socket(void *p) {struct monitor * m = p;skynet_initthread(THREAD_SOCKET);for (;;) {int r = skynet_socket_poll();if (r==0)break;if (r<0) {CHECK_ABORTcontinue;}wakeup(m,0);}return NULL;
}

socket_server_poll函数触发获得事件后,根据返回值事件类型type,调用消息创建函数forward_message创建消息。

//skynet_socket.c
int
skynet_socket_poll() {struct socket_server *ss = SOCKET_SERVER;assert(ss);struct socket_message result;int more = 1;int type = socket_server_poll(ss, &result, &more);switch (type) {case SOCKET_EXIT:return 0;case SOCKET_DATA:forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);break;case SOCKET_CLOSE:forward_message(SKYNET_SOCKET_TYPE_CLOSE, false, &result);break;case SOCKET_OPEN:forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);break;case SOCKET_ERR:forward_message(SKYNET_SOCKET_TYPE_ERROR, true, &result);break;case SOCKET_ACCEPT:forward_message(SKYNET_SOCKET_TYPE_ACCEPT, true, &result);break;case SOCKET_UDP:forward_message(SKYNET_SOCKET_TYPE_UDP, false, &result);break;case SOCKET_WARNING:forward_message(SKYNET_SOCKET_TYPE_WARNING, false, &result);break;default:skynet_error(NULL, "Unknown socket message type %d.",type);return -1;}if (more) {return -1;}return 1;
}

在创建消息函数中,构建struct skynet_message message消息对象,调用skynet_context_push函数将消息插入服务的消息队列。

//skynet_socket.c
static void
forward_message(int type, bool padding, struct socket_message * result) {struct skynet_socket_message *sm;size_t sz = sizeof(*sm);if (padding) {if (result->data) {size_t msg_sz = strlen(result->data);if (msg_sz > 128) {msg_sz = 128;}sz += msg_sz;} else {result->data = "";}}sm = (struct skynet_socket_message *)skynet_malloc(sz);sm->type = type;sm->id = result->id;sm->ud = result->ud;if (padding) {sm->buffer = NULL;memcpy(sm+1, result->data, sz - sizeof(*sm));} else {sm->buffer = result->data;}struct skynet_message message;message.source = 0;message.session = 0;message.data = sm;message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);if (skynet_context_push((uint32_t)result->opaque, &message)) {// todo: report somewhere to close socket// don't call skynet_socket_close here (It will block mainloop)skynet_free(sm->buffer);skynet_free(sm);}
}

具体将信息插入服务的消息队列代码如下,注意加锁解锁操作,因为涉及多线程操作该消息队列(worker线程要从队列拿出信息消费,网络线程要插入信息到队列)

int
skynet_context_push(uint32_t handle, struct skynet_message *message) {struct skynet_context * ctx = skynet_handle_grab(handle);if (ctx == NULL) {return -1;}skynet_mq_push(ctx->queue, message);skynet_context_release(ctx);return 0;
}

二、skynet工作线程入口

skynet工作线程入口如下,调用函数skynet_context_message_dispatch从全局队列拿出服务然后消费,返回值q如果为NULL表示目前没有服务有消息要处理,然后通过条件变量+互斥锁阻塞当前的工作线程。

//skynet_start.c
static void *skynet_start.c
thread_worker(void *p) {struct worker_parm *wp = p;int id = wp->id;int weight = wp->weight;struct monitor *m = wp->m;struct skynet_monitor *sm = m->m[id];skynet_initthread(THREAD_WORKER);struct message_queue * q = NULL;while (!m->quit) {q = skynet_context_message_dispatch(sm, q, weight);if (q == NULL) {if (pthread_mutex_lock(&m->mutex) == 0) {++ m->sleep;// "spurious wakeup" is harmless,// because skynet_context_message_dispatch() can be call at any time.if (!m->quit)pthread_cond_wait(&m->cond, &m->mutex);-- m->sleep;if (pthread_mutex_unlock(&m->mutex)) {fprintf(stderr, "unlock mutex error");exit(1);}}}}return NULL;
}

消息处理逻辑如下。在for循环里,处理消息队列里指定数目的消息。调用函数dispatch_message,执行在服务skynet.start时候绑定的ctx->cb回调函数。在该回调函数中,会创建一个协程去执行,具体可以跟一下skynet.start创建回调函数的具体代码。

//skynet_server.c
struct message_queue *
skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {if (q == NULL) {q = skynet_globalmq_pop();if (q==NULL)return NULL;}uint32_t handle = skynet_mq_handle(q);struct skynet_context * ctx = skynet_handle_grab(handle);if (ctx == NULL) {struct drop_t d = { handle };skynet_mq_release(q, drop_message, &d);return skynet_globalmq_pop();}int i,n=1;struct skynet_message msg;for (i=0;i<n;i++) {if (skynet_mq_pop(q,&msg)) {skynet_context_release(ctx);return skynet_globalmq_pop();} else if (i==0 && weight >= 0) {n = skynet_mq_length(q);n >>= weight;}int overload = skynet_mq_overload(q);if (overload) {skynet_error(ctx, "May overload, message queue length = %d", overload);}skynet_monitor_trigger(sm, msg.source , handle);if (ctx->cb == NULL) {skynet_free(msg.data);} else {dispatch_message(ctx, &msg);}skynet_monitor_trigger(sm, 0,0);}assert(q == ctx->queue);struct message_queue *nq = skynet_globalmq_pop();if (nq) {// If global mq is not empty , push q back, and return next queue (nq)// Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)skynet_globalmq_push(q);q = nq;} skynet_context_release(ctx);return q;
}

三、skynet定时器线程入口

skynet定时器入口如下,就是在for死循环中,调用skynet_updatetimeskynet_socket_updatetime函数更新时间,然后调用usleep系统接口每隔一段时间阻塞睡眠当前线程,当CPU重新调度到该线程后,调用函数signal_hup处理到期时间逻辑。

//skynet_start.c
static void *
thread_timer(void *p) {struct monitor * m = p;skynet_initthread(THREAD_TIMER);for (;;) {skynet_updatetime();skynet_socket_updatetime();CHECK_ABORTwakeup(m,m->count-1);usleep(2500);if (SIG) {signal_hup();SIG = 0;}}// wakeup socket threadskynet_socket_exit();// wakeup all worker threadpthread_mutex_lock(&m->mutex);m->quit = 1;pthread_cond_broadcast(&m->cond);pthread_mutex_unlock(&m->mutex);return NULL;
}

处理到期时间逻辑如下,调用函数skynet_handle_findname获取到期的时间信息,然后调用函数skynet_context_push把到期时间的信息重新插入到其对应的服务的消息队列当中。

static void
signal_hup() {// make log file reopenstruct skynet_message smsg;smsg.source = 0;smsg.session = 0;smsg.data = NULL;smsg.sz = (size_t)PTYPE_SYSTEM << MESSAGE_TYPE_SHIFT;uint32_t logger = skynet_handle_findname("logger");if (logger) {skynet_context_push(logger, &smsg);}
}

四、skynet监视器线程入口

skynet监视器入口如下,就是在for死循环中,每隔5秒钟,调用skynet_monitor_check检查对应工作线程是否死循环(注:每个工作线程对应一个监视,因此m->count就是工作线程的个数)。

static void *
thread_monitor(void *p) {struct monitor * m = p;int i;int n = m->count;skynet_initthread(THREAD_MONITOR);for (;;) {CHECK_ABORTfor (i=0;i<n;i++) {skynet_monitor_check(m->m[i]);}for (i=0;i<5;i++) {CHECK_ABORTsleep(1);}}return NULL;
}

每个工作线程对应一个skynet_monitor结构,当工作线程处理消息前用skynet_monitor记录消息的来源和目标。处理完清除。监控线程每隔五秒检测。

struct skynet_monitor {int version;int check_version;uint32_t source;uint32_t destination;
};//监控线程检测是否陷入死循环
void
skynet_monitor_check(struct skynet_monitor *sm) {if (sm->version == sm->check_version) {if (sm->destination) {skynet_context_endless(sm->destination);skynet_error(NULL, "A message from [ :%08x ] to [ :%08x ] maybe in an endless loop (version = %d)", sm->source , sm->destination, sm->version);}} else {sm->check_version = sm->version;}
}
//工作线程记录消息执行记录
skynet_monitor_trigger(struct skynet_monitor *sm, uint32_t source, uint32_t destination) {sm->source = source;sm->destination = destination;ATOM_INC(&sm->version);
}
//调用消息回调前
skynet_monitor_trigger(sm, msg.source , handle);
//调用消息回调后
skynet_monitor_trigger(sm, 0,0);

如何判断是否陷入死循环?

假设 version = 0,check_version = 0. 工作线程处理消息前 在skynet_monitor_trigger中sm->version = sm->version + 1,五秒后监控线程执行sm->check_version = sm->version; version = 1,check_version = 1.五秒后,监控线程再次检测,如果在五秒内工作线程执行完消息那么工作线程会执行skynet_monitor_trigger sm->version = sm->version + 1 .version = 2,check_version = 1.没执行完则version = 1,check_version = 1,sm->destination != 0.则认为消息的执行超过了五秒,可能陷入死循环。

如何查找skynet的C源码位置

lua代码中像如下的源文件怎么查找呢

local socketdriver = require "skynet.socketdriver"

由于lua代码无法进行IO操作,因此lua的IO操作都是调用C接口进行的。

因此如上比较底层的需要和socket IO打交道的代码,一定是在C源码中。

我们可以利用全局搜索,如下所示。注意规律,前面加luaopen_,skynet和后面的库名之间的".“替换为”_"

luaopen_skynet_socketdriver

lua代码中像如下的源文件怎么查找呢

local c = require "skynet.core"

我们可以利用全局搜索,如下所示。注意规律,前面加luaopen_,skynet和后面的库名之间的".“替换为”_"

luaopen_skynet_core

【skynet】skynet入口解析相关推荐

  1. VNPY程序入口解析

    VNPY程序入口解析 一.run.py 二.引擎对象 一.run.py from vnpy.event import EventEngine from vnpy.trader.engine impor ...

  2. skynet源码解析(三)——启动流程

    对于你不了解的框架或者引擎,介绍再多的逻辑结构都好像有点茫然的感觉.所以小编认为,最有效的方式就是搞清楚框架启动流程的步骤,让自己心中有一条线可以牵引着. 当你在终端输入./skeynet examp ...

  3. skynet源码解析(一)——编译和运行skynet

    要想认识一个框架,首先要做的就是让它跑起来. skynet是一款基于C跟lua的开源服务端并发框架,这个框架是单进程多线程模型,主要应用于游戏服务端领域,是lua大神云风所写的.本文不涉及框架过多的理 ...

  4. 云风Skynet——skynet非官方网站

    http://skynetclub.github.io/ skynet非官方网站 skynet是云风编写的服务端底层管理框架,底层由C编写,配套lua作为脚本使用,可换python等其他脚本语言.sk ...

  5. skynet原理解析

    一.消息队列 上图摘自Actor模型解析,每个Actor都有一个专用的MailBox来接收消息,这也是Actor实现异步的基础.当一个Actor实例向另外一个Actor发消息的时候,并非直接调用Act ...

  6. 【Skynet】Skynet项目-球球作战实例

    Skynet项目-球球作战实例 一.拓扑结构 1.1 各服务功能 1.2 消息流程 1.3 设计要点 二.目录结构 2.1 项目根目录 2.2 service目录 2.3 lualib目录 2.4 l ...

  7. 【Skynet】Skynet入门实例

    Skynet入门实例 一.下载和编辑 二.运行解析 三.理解skynet 3.1 配置文件说明: 3.2 目录结构: 四.skynetAPI 五.skynet实例程序 4.1 PingPong 4.2 ...

  8. skynet源码赏析

    skynet源码赏析 对于skynet,需要屡清楚的几个问题 skynet本质上解决什么问题? skynet有哪些基本的数据结构? skynet有几类线程,他们分别的作用是什么? skynet如何启动 ...

  9. skynet cjson

    Skynet早期拥有lua-cjson库,后被sproto取代.由于Lua5.3开始支持整形,但cjson并没有适配Lua5.3,若直接编译会造成JSON字符串中的数字转换为浮点数的问题.因此风云为c ...

最新文章

  1. Linux中的文件描述符与打开文件之间的关系
  2. angularjs教程网址
  3. C# XML格式化显示
  4. 教育部正式宣布:9年义务教育大变动!与孩子息息相关
  5. redis学习-列表(list)常用命令
  6. My sql 常用函数
  7. atitit.窗体静听esc退出本窗体java swing c# .net php
  8. Julia: MFDCCA和MFCCA算法代码
  9. catia 二次开发:高版本的catia vba项目在低版本的catia上运行,报错
  10. c++调用powershell_告别 Windows 终端的难看难用,从改造 PowerShell 的外观开始
  11. MINI2440 TD35 P35触摸屏不能使用? 让我们来把一线触控改四线触控
  12. java高级工程师个人简历模板
  13. 微单相机和单反相机的区别?摄影入门第一课
  14. u盘安装centos8黑屏_求助啊为何装centos7一点安装就黑屏
  15. 线性代数笔记8:矩阵的对角化
  16. 计算机软件的著作权和专利权法律保护资料
  17. 八. geotrellis使用 矢量数据栅格化
  18. Twitter账号老被封?一文教会你怎么养号
  19. 模电(二)半导体二极管
  20. JZOJ2020年8月11日提高组T3 页

热门文章

  1. 数据库基础理论二——模式分解为主要导向
  2. 更新安全补丁后无法读取查询导入Excel问题解决说明书
  3. Unknown custom element did you register the component correctly
  4. 为什么说网络安全是风口行业?是IT行业最后的红利?
  5. MySQL拼接函数使用介绍
  6. 英语语法第二节(句子成分)
  7. 织梦安装,访问http://域名/install/index.php 出现空白页或Not Found 问题
  8. 计算机考试每一抖音的分数是,抖音权重1到10是什么意思?有什么用?
  9. 企业版Docker4——如何通过阿里云的镜像加速器快速拉取镜像到本地
  10. php调用layer alert弹窗