spp worker Demo

spp_rpc/src/worker/main.cpp
main函数执行入口:

#include "defaultworker.h"
#include "comm_def.h"using namespace spp;
using namespace spp::worker;CServerBase* g_worker = NULL;// ...int main(int argc, char* argv[])
{// 生成workerg_worker = new CDefaultWorker;if (g_worker){// worker 运行并且阻塞g_worker->run(argc, argv);// worker 结束运行释放内存delete g_worker;}return 0;
}

spp_rpc/src/worker/defaultworker.h

namespace spp
{namespace worker{class CDefaultWorker : public CServerBase, public CFrame{public:// 构造函数+析构函数// 获取是否启用微线程bool get_mt_flag();// 微线程切换函数void handle_switch(bool block);// 重写CServerBase的realrun函数void realrun(int argc, char* argv[]);// 定义服务类型// 注册spp框架信号处理函数void assign_signal(int signo);// 框架循环调用的逻辑,用于config reloadint loop();//初始化配置int initconf(bool reload = false);static void shm_delay_stat(int64_t time_delay){if(time_delay <= 1){MONITOR(MONITOR_WORKER_RECV_DELAY_1);}else if(time_delay <= 10){MONITOR(MONITOR_WORKER_RECV_DELAY_10);}else if(time_delay <= 50){MONITOR(MONITOR_WORKER_RECV_DELAY_50);}else if(time_delay <= 100){MONITOR(MONITOR_WORKER_RECV_DELAY_100);}else{MONITOR(MONITOR_WORKER_RECV_DELAY_XXX);}}//一些回调函数static int ator_recvdata(unsigned flow, void* arg1, void* arg2);    //必要static int ator_recvdata_v2(unsigned flow, void* arg1, void* arg2); //必要static int ator_senddata(unsigned flow, void* arg1, void* arg2);    //非必要static int ator_overload(unsigned flow, void* arg1, void* arg2);    //非必要static int ator_senderror(unsigned flow, void* arg1, void* arg2);   //必要//接受者CTCommu* ator_;inline int get_TOS(){return TOS_;}private:unsigned msg_timeout_;int TOS_;int notify_fd_;     // socket commu need notify mircro thread};}
}

spp_rpc/src/comm/serverbase.h

namespace spp {namespace comm {// 服务器程序基础类,包含运行环境初始化、日志、统计、监控对象class CServerBase {public:// 构造函数+析构函数// 可重写的虚函数virtual void run(int argc, char *argv[]);virtual void startup(bool bg_run = true);virtual void realrun(int argc, char *argv[]) {}// 业务名和服务类型的描述,估计会和业务监控日志以及部署相关联// 业务日志CTLog log_;// 统计CTStat stat_;// 监控CTProcMonCli moncli_;protected:// 内部监控时间间隔public:///// 服务reload退出以及相关信号处理static bool reload();static bool quit();static void sigusr1_handle(int signo);static void sigusr2_handle(int signo);};}
}

看看比较重要的realrun虚函数的实现
spp_rpc/src/worker/defaultworker.cpp

void CDefaultWorker::realrun(int argc, char* argv[])
{// 初始化配置SingleTon<CTLog, CreateByProto>::SetProto(&flog_);initconf(false);time_t  nowtime = time(NULL), montime = 0;int64_t now_ms = 0;while (true){///< start: micro thread handle loop entry add 20130715if (sppdll.spp_handle_loop){sppdll.spp_handle_loop(this);   }            ///< end: micro thread handle loop entry 20130715// == 0 时,表示没取到请求,进入较长时间异步等待bool isBlock = (ator_->poll(false) == 0);static CSyncFrame* sync_frame = CSyncFrame::Instance();sync_frame->HandleSwitch(isBlock);// Check and reconnect net proxy, default 10 ms now_ms = get_time_ms();// 检查quit信号if (unlikely(CServerBase::quit()) || unlikely(CServerBase::reload())){now_ms = get_time_ms();// 保证剩下的请求都处理完if (unlikely(CServerBase::quit())){        flog_.LOG_P_PID(LOG_FATAL, "recv quit signal at %u\n", now_ms);ator_->poll(true);}else{           flog_.LOG_P_PID(LOG_FATAL, "recv reload signal at %u\n", now_ms);}int timeout = 0;//微线程while (CSyncFrame::Instance()->GetThreadNum() > 1 && timeout < 1000){CSyncFrame::Instance()->sleep(10000);timeout += 10;}now_ms = get_time_ms();flog_.LOG_P_PID(LOG_FATAL, "exit at %u\n", now_ms);break;}//监控信息上报nowtime = time(NULL);if ( unlikely(nowtime - montime > ix_->moni_inter_time_) ){CLI_SEND_INFO(&moncli_)->timestamp_ = nowtime;moncli_.run();montime = nowtime;flog_.LOG_P_PID(LOG_DEBUG, "moncli run!\n");}loop();}g_check_point = CoreCheckPoint_HandleFini;           // 设置待调用插件的CheckPointif (sppdll.spp_handle_fini != NULL)// 调用spp_handle_fini函数释放资源sppdll.spp_handle_fini(NULL, this);g_check_point = CoreCheckPoint_SppFrame;                // 恢复CheckPoint,重置为CoreCheckPoint_SppFrameCStatMgrInstance::destroy();
}

ator_ 类型为CTCommu*,负责接受信号。

        //通讯类抽象接口class CTCommu{public:CTCommu() {memset(func_list_, 0, sizeof(cb_func) *(CB_TIMEOUT + 1));memset(func_args_, 0, sizeof(void*) *(CB_TIMEOUT + 1));}virtual ~CTCommu() {}//初始化//config:配置文件名或者配置参数内存指针virtual int init(const void* config) = 0;//轮询,收发数据//block: true表示使用阻塞模式,否则非阻塞模式virtual int poll(bool block = false) = 0;//发送数据提交//flow: 数据包唯一标示//arg1: 通用参数指针1, 一般指向数据blob//arg2: 通用参数指针2,保留virtual int sendto(unsigned flow, void* arg1, void* arg2) = 0;//控制接口//flow: 数据包唯一标示//type: 控制命令//arg1: 通用参数指针1,具体组件有具体的含义//arg2: 通用参数指针2,具体组件有具体的含义virtual int ctrl(unsigned flow, ctrl_type type, void* arg1, void* arg2) = 0;//注册回调//type: 回调函数类型//func: 回调函数//args: 用户自定义参数指针, 作为回调函数的第2个通用参数传递virtual int reg_cb(cb_type type, cb_func func, void* args = NULL) {if (type <= CB_TIMEOUT) {func_list_[type] = func;func_args_[type] = args;return 0;} else {return -1;}}//清空所有共享内存队列,仅供proxy启动时使用virtual int clear() = 0;protected:cb_func func_list_[CB_TIMEOUT + 1];void* func_args_[CB_TIMEOUT + 1];//释放资源virtual void fini() = 0;};

ator_->poll(false) 按照注解说明,使用非阻塞模式接受数据。
那么ator 具体实现是initconf中
spp_rpc/src/worker/defaultworker.cpp
类比golang的gpm模型

// 使用CTShmCommu实现
ator_ = new CTShmCommu;
ret = ator_->init(&shm);// 注册了一些数据操作的回调
...// 调用了spp_handle_init函数
handle_init_ret = sppdll.spp_handle_init((void*)module_etc.c_str(), this);

spp_rpc/src/comm/tbase/tshmcommu.cpp

有空再分析如何实现poll。
[poll 实现](https://github.com/Tencent/MSEC/blob/master/document/msec/cpp_dev_manual.md)

spp_rpc/src/comm/serverbase.cpp
run->startup+realrun

void CServerBase::run(int argc, char* argv[])
{// 启动参数解析startup(true);realrun(argc, argv);
}// 信号处理
void CServerBase::startup(bool bg_run)
{//默认需要root权限才能setrlimitstruct rlimit rlim;if (0 == getrlimit(RLIMIT_NOFILE, &rlim)){rlim.rlim_cur = rlim.rlim_max;setrlimit(RLIMIT_NOFILE, &rlim);if (rlim.rlim_cur < 100000)     // fix for limits over 100000{rlim.rlim_cur = 100000;rlim.rlim_max = 100000;setrlimit(RLIMIT_NOFILE, &rlim);}}mallopt(M_MMAP_THRESHOLD, 1024*1024);   // 1MB,防止频繁mmapmallopt(M_TRIM_THRESHOLD, 8*1024*1024); // 8MB,防止频繁brksignal(SIGHUP, SIG_IGN);signal(SIGPIPE, SIG_IGN);signal(SIGTTOU, SIG_IGN);signal(SIGTTIN, SIG_IGN);signal(SIGCHLD, SIG_IGN);if (bg_run){signal(SIGINT,  SIG_IGN);signal(SIGTERM, SIG_IGN);daemon(1, 1);}CServerBase::flag_ = 0;//signal(SIGSEGV, CServerBase::sigsegv_handle);signal(SIGUSR1, CServerBase::sigusr1_handle);signal(SIGUSR2, CServerBase::sigusr2_handle);
}

启动流程大体走一遍了,那么接受到数据会进行怎样的回调呢?
spp_rpc/src/worker/defaultworker.h

            // 回调入口static int ator_recvdata(unsigned flow, void* arg1, void* arg2);    //必要static int ator_recvdata_v2(unsigned flow, void* arg1, void* arg2); //必要static int ator_senderror(unsigned flow, void* arg1, void* arg2);   //必要

spp_rpc/src/worker/defaultworker.cpp

int CDefaultWorker::ator_recvdata_v2(unsigned flow, void* arg1, void* arg2)
{blob_type* blob = (blob_type*)arg1;CDefaultWorker* worker = (CDefaultWorker*)arg2;if (likely(blob->len > 0)){TConnExtInfo* ptr = NULL;MONITOR(MONITOR_WORKER_FROM_PROXY);blob->len -= sizeof(TConnExtInfo);blob->extdata = blob->data + blob->len;ptr = (TConnExtInfo*)blob->extdata;int64_t recv_ms = int64_t(ptr->recvtime_) * 1000 + ptr->tv_usec / 1000;int64_t now = get_time_ms();int64_t time_delay = now - recv_ms;worker->fstat_.op(WIDX_MSG_SHM_TIME, time_delay);add_memlog(blob->data, blob->len);// 超时处理worker->flog_.LOG_P_FILE(LOG_DEBUG, "ator recvdone, flow:%u, blob len:%d\n", flow, blob->len);worker->fstat_.op(WIDX_SHM_RX_BYTES, blob->len); // 累加接收字节数g_check_point = CoreCheckPoint_HandleProcess;           // 设置待调用插件的CheckPoint// 调用spp_handle_process函数int ret = sppdll.spp_handle_process(flow, arg1, arg2);g_check_point = CoreCheckPoint_SppFrame;                // 恢复CheckPoint,重置为CoreCheckPoint_SppFrameif (likely(!ret)){MONITOR(MONITOR_WORKER_PROC_SUSS);return 0;}else{MONITOR(MONITOR_WORKER_PROC_FAIL);CTCommu* commu = (CTCommu*)blob->owner;blob_type rspblob;rspblob.len = 0;rspblob.data = NULL;commu->sendto(flow, &rspblob, NULL);}}return -1;
}

具体几个spp函数的实现:
spp_rpc/src/module/example/simple/echo_example.cpp


/*** Tencent is pleased to support the open source community by making MSEC available.** Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.** Licensed under the GNU General Public License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. You may * obtain a copy of the License at**     https://opensource.org/licenses/GPL-2.0** Unless required by applicable law or agreed to in writing, software distributed under the * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,* either express or implied. See the License for the specific language governing permissions* and limitations under the License.*///必须包含spp的头文件
#include "sppincl.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>//格式化时间输出
char *format_time( time_t tm);//初始化方法(可选实现)
//arg1: 配置文件
//arg2: 服务器容器对象
//返回0成功,非0失败
extern "C" int spp_handle_init(void* arg1, void* arg2)
{//插件自身的配置文件const char* etc = (const char*)arg1;//服务器容器对象CServerBase* base = (CServerBase*)arg2;base->log_.LOG_P_PID(LOG_DEBUG, "spp_handle_init, config:%s, servertype:%d\n", etc, base->servertype());return 0;
}//数据接收(必须实现)
//flow: 请求包标志
//arg1: 数据块对象
//arg2: 服务器容器对象
//返回值:> 0 表示数据已经接收完整且该值表示数据包的长度
// == 0 表示数据包还未接收完整
// < 0 负数表示出错,将会断开连接
extern "C" int spp_handle_input(unsigned flow, void* arg1, void* arg2)
{//数据块对象,结构请参考tcommu.hblob_type* blob = (blob_type*)arg1;//extinfo有扩展信息TConnExtInfo* extinfo = (TConnExtInfo*)blob->extdata;//服务器容器对象CServerBase* base = (CServerBase*)arg2;base->log_.LOG_P(LOG_DEBUG, "spp_handle_input[recv time:%s] flow:%d, buffer len:%d, client ip:%s\n",format_time(extinfo->recvtime_),flow,blob->len,inet_ntoa(*(struct in_addr*)&extinfo->remoteip_));return blob->len;
}//路由选择(可选实现)
//flow: 请求包标志
//arg1: 数据块对象
//arg2: 服务器容器对象
//返回值表示worker的组号
extern "C" int spp_handle_route(unsigned flow, void* arg1, void* arg2)
{//服务器容器对象CServerBase* base = (CServerBase*)arg2;base->log_.LOG_P_FILE(LOG_DEBUG, "spp_handle_route, flow:%d\n", flow);return 1;
}//数据处理(必须实现)
//flow: 请求包标志
//arg1: 数据块对象
//arg2: 服务器容器对象
//返回0表示成功,非0失败(将会主动断开连接)
extern "C" int spp_handle_process(unsigned flow, void* arg1, void* arg2)
{//数据块对象,结构请参考tcommu.hblob_type* blob = (blob_type*)arg1;//数据来源的通讯组件对象CTCommu* commu = (CTCommu*)blob->owner;//extinfo有扩展信息TConnExtInfo* extinfo = (TConnExtInfo*)blob->extdata;//服务器容器对象CServerBase* base = (CServerBase*)arg2;base->log_.LOG_P_PID(LOG_DEBUG, "spp_handle_process[recv time:%s] flow:%d, buffer len:%d, client ip:%s\n",format_time(extinfo->recvtime_),flow,blob->len,inet_ntoa(*(struct in_addr*)&extinfo->remoteip_));//echo logicint ret = commu->sendto(flow, arg1, arg2 );if (ret != 0){base->log_.LOG_P_PID(LOG_ERROR, "send response error, ret:%d\n", ret);return ret;}//if all responses were sent, send a NULL blob to release flow//blob_type release_cmd;//release_cmd.data = NULL;//release_cmd.len = 0;//commu->sendto(flow, &release_cmd, arg2);return 0;
}//析构资源(可选实现)
//arg1: 保留参数
//arg2: 服务器容器对象
extern "C" void spp_handle_fini(void* arg1, void* arg2)
{//服务器容器对象CServerBase* base = (CServerBase*)arg2;base->log_.LOG_P_PID(LOG_DEBUG, "spp_handle_fini\n");
}char *format_time( time_t tm)
{static char str_tm[1024];struct tm tmm;memset(&tmm, 0, sizeof(tmm) );localtime_r((time_t *)&tm, &tmm);snprintf(str_tm, sizeof(str_tm), "[%04d-%02d-%02d %02d:%02d:%02d]",tmm.tm_year + 1900, tmm.tm_mon + 1, tmm.tm_mday,tmm.tm_hour, tmm.tm_min, tmm.tm_sec);return str_tm;
}

spp worker 源码分析相关推荐

  1. 3supervisor启动worker源码分析-worker.clj

    supervisor启动worker源码分析-worker.clj supervisor通过调用sync-processes函数来启动worker,关于sync-processes函数的详细分析请参见 ...

  2. supervisor启动worker源码分析-worker.clj

    supervisor通过调用sync-processes函数来启动worker,关于sync-processes函数的详细分析请参见"storm启动supervisor源码分析-superv ...

  3. Nginx源码分析:master/worker工作流程概述

    nginx源码分析 nginx-1.11.1 参考书籍<深入理解nginx模块开发与架构解析> Nginx的master与worker工作模式 在生成环境中的Nginx启动模式基本都是以m ...

  4. celery源码分析-worker初始化分析(下)

    celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery的worker启动 在上文中分析到了Hub类的初始化,接下来继续分析Pool类的 ...

  5. caffe.proto源码分析

    一什么是protocol buffer 二caffeproto中的几个重要数据类型 三caffeproto源码 分析caffe源码,看首先看caffe.proto,是明智的选择.好吧,我不是创造者,只 ...

  6. Yolov3Yolov4网络结构与源码分析

    Yolov3&Yolov4网络结构与源码分析 从2018年Yolov3年提出的两年后,在原作者声名放弃更新Yolo算法后,俄罗斯的Alexey大神扛起了Yolov4的大旗. 文章目录 论文汇总 ...

  7. Spark源码分析之七:Task运行(一)

    在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在T ...

  8. kazoo源码分析:Zookeeper客户端start概述

    kazoo源码分析 kazoo-2.6.1 kazoo客户端 kazoo是一个由Python编写的zookeeper客户端,实现了zookeeper协议,从而提供了Python与zookeeper服务 ...

  9. Nginx源码分析:epoll事件处理模块概述

    nginx源码分析 nginx-1.11.1 参考书籍<深入理解nginx模块开发与架构解析> 事件处理模块概述 Nginx的高效请求的处理依赖于事件管理机制,本次默认的场景是Linux操 ...

最新文章

  1. java.util.date_关于java中java.util.Date(急)
  2. Python rstrip()方法 删除 string 字符串末尾的指定字符(默认为空格).
  3. 安装库_免费软件安装库
  4. cpu进程调度---RT Throttling【转】
  5. 2022年全球及中国豪华商业墙纸行业运营规模状况与投资产值预测报告
  6. 在 XML 中添加实体
  7. 原来每天喝它有助于大脑开发?
  8. 【高斯和拉普拉斯为什么分别对应L2和L1?】差分隐私系统学习记录(五)
  9. Hive 之 用户自定义函数 UDF UDAF UDTF
  10. MFC 获取其他窗口的Edit文本和单击Button
  11. python知识点总结_20211231
  12. 3d激光雷达开发(ransac的思想)
  13. 5年级用计算机器探索规律,《小数除法》用计算器探索规律
  14. css 链接设计,css将超链接a设计成按钮样式实例
  15. win10分辨率不能调整_win10常规问题解决方案
  16. Vue提高20 日期选择器插件
  17. 抱歉,我又可以了。。。
  18. 给想去阿里面试的同学一些意见
  19. Ubuntu22.04分区设置
  20. GMK4045-ASEMI光伏逆变器二极管GMK4045

热门文章

  1. 图片免费转pdf图片、图片免费转成word、图片免费转excel表格
  2. 最近朋友民间借贷起诉,聊天记录内容过多,聊天长截图需要处理成A4纸上,方便打印
  3. SketchUp:SketchUp草图大师软件界面介绍之详细攻略
  4. RDF查询语言SPARQL
  5. 第一届LeetCode刷题打卡赢现金活动开始啦,助力每一位想拿大厂offer的小伙伴!
  6. Vue禁用button
  7. 高分7(GF7)卫星数据制作DEM
  8. 设计师常用设计尺寸有哪些
  9. iOS启动优化之——如何使用Xcode Log、App Launch、代码来计算启动时间 Launch Time
  10. Cortex内核的比较(M3和M4)