文章目录

  • 前言
    • 1. 背景
    • 2. 特性
  • 一、编译运行
    • 1. 编译
    • 2. 运行
  • 二、整体框架
    • 运行框架
    • 代码框架
      • 源码目录
      • 生成代码
    • 与svrkit的比较
  • 三、network
    • socket流
      • streambuf
        • 自定义缓冲区
        • 输入缓冲区
        • 输出缓冲区
      • BlockTcpStreamBuf
      • BlockTcpStream
      • BlockTcpUtils
      • Timer
    • ucontext协程
      • ucontext库
      • UThreadContext 协程基类
      • UThreadStackMemory 协程私有栈
      • UThreadContextSystem 协程实现
      • UThreadRuntime 协程调度
      • __uthread 协程操作重载
      • UThreadEpollScheduler Epoll驱动的协程调度器
  • 四、rpc 半同步半异步服务模型
    • 概述
    • DataFlow 数据中心,线程间数据交互
    • Worker 工作线程
    • WorkPool 工作线程池
    • HshaServerStat 统计线程
    • HshaServerQos 过载保护机制
    • HshaServerUnit 工作单元
    • HshaServerIO IO线程
    • HshaServerAcceptor Accept线程
    • HshaServer server服务对象
  • 五. sample 模板代码
    • 请求分发
    • 接口代码
    • 客户端
    • 配置文件
  • 六、总结
  • 参考文献

前言

1. 背景

  • PhxRPC是微信后台团队推出的一个非常简洁小巧的RPC框架,可以看作是微信内部svrkit框架的简化版。在此基础上又实现了PhxSQL和PhxQueue等开源框架。
  • 通过学习PhxRPC可以学到微信内部优秀的框架实现经验(快速拒绝、C++协程、HSHA、Back Request等)。
  • 看这个项目的初衷是因为内部的svrkit代码太多,想先从简单的下手。不过这个项目貌似也很久没维护过了,网上资料也不多,这里是自己学习过程的一些记录,写的一般,敬请斧正。

2. 特性

  • 使用Protobuf作为IDL,网络通信则使用HTTP协议。
  • 基于Protobuf自动生成Client和Server接口,不过因为基于的是HTTP协议,理论上也可以支持HTTP请求。
  • 服务端采用半同步半异步模式,多个独立IO线程,每个IO线程对应一个工作线程池,IO线程与工作线程通过内存队列交互。
  • 使用Epoll管理请求,堆实现的定时器来处理过期连接。
  • 提供过载保护机制,快速拒绝过载请求(内部过载保护机制复杂的多,可参考文献[4])。
  • 基于ucontext实现的协程库(svrkit使用的是libco)。
  • 基于lambda函数并发访问Server,实现Backup Requests 模式。
  • 只支持多线程不支持多进程(这里估计是为了简化实现)。

一、编译运行

1. 编译

PhxRPC依赖第三方库Protobuf和boost,不过这些操作都打包在了build.sh中,直接一键编译。

git clone https://github.com/Tencent/phxrpc.git
./build.sh

2. 运行

  1. 执行server
./search_main -c search_server.conf
  1. 执行client :
./search_tool_main -c search_client.conf -f PHXEcho -s "hello"

二、整体框架

运行框架

PhxRPC主要由四部分组成。

  • 基于输入输出流的socket函数族。
  • 基于HTTP和Protobuf协议的信息收发流。
  • 基于Epoll和协程的IO调度中心,任务通过UthreadEpoll调度中心调度。
  • 半同步半异步网络框架,一个accept线程,多个IO线程,每个IO线程对应一个工作线程池,IO线程和Worker线程间通过队列交互。

主要流程如下:

  1. 创建一个HshaServer对象,运行时会有一个accept线程,多个IO线程,多个worker线程,每个woker线程有多个协程。
HshaServer server(config, Dispatch, &config);
server.RunForever();
  1. server会创建多个HshaServerUnit
auto hsha_server_unit =
new HshaServerUnit(i, this, (int)worker_thread_count_per_io,config.GetWorkerUThreadCount(), worker_uthread_stack_size,               dispatch, args);

每个HshaServerUnit对应一个io线程和一个worker线程池。

class HshaServerUnit {...private:// epoll调度器UThreadEpollScheduler scheduler_;// 数据中心DataFlow data_flow_;// woker线程池WorkerPool worker_pool_;// io线程HshaServerIO hsha_server_io_;// 用于执行io线程,对应RunFuncstd::thread thread_;
};

服务采取半同步半异步模式,accept逻辑运行在主线程,同步读取新的连接,异步执行IO和Worker逻辑。IO线程和Accept线程通过accepted_fd_list_队列交互,IO线程和Worker线程通过DataFlow队列交互。

  1. UThreadEpollScheduler通过协程和Epoll调度任务,Epoll会监听管道fd读端,当IO线程或者Worker线程需要执行调度任务时会向管道写端写数据,这会激活Epoll,从而激活调度器执行协程调度任务。
// 代码省略过
bool UThreadEpollScheduler::Run() {... ConsumeTodoList(); // 消费任务for (; (run_forever_) || (!runtime_.IsAllDone());) {// epoll多路复用,检查需要io的fdint nfds = epoll_wait(epoll_fd_, events, max_task_, 4);if (nfds != -1) {for (int i = 0; i < nfds; i++) {UThreadSocket_t * socket = (UThreadSocket_t*) events[i].data.ptr;socket->waited_events = events[i].events;runtime_.Resume(socket->uthread_id);}// for server mode,if (active_socket_func_ != nullptr) {UThreadSocket_t * socket = nullptr;while ((socket = active_socket_func_()) != nullptr) {runtime_.Resume(socket->uthread_id);}}// for server uthread worker,worker线程使用,执行逻辑if (handler_new_request_func_ != nullptr) {handler_new_request_func_();}// IO线程函数,对应HshaServerIO::IOFunc,接收和回复请求if (handler_accepted_fd_func_ != nullptr) {handler_accepted_fd_func_();}if (closed_) {ResumeAll(UThreadEpollREvent_Close);break;}ConsumeTodoList();                  // 消费任务DealwithTimeout(next_timeout);       // 去除定时链接} else if (errno != EINTR) {ResumeAll(UThreadEpollREvent_Error);break;}StatEpollwaitEvents(nfds);}free(events);return true;
}

代码框架

源码目录

.
├── file
│   ├── config.cpp
│   ├── config.h            // 配置类
│   ├── file_utils.cpp
│   ├── file_utils.h        // 文件类
│   ├── log_utils.cpp
│   ├── log_utils.h     // 日志类
│   ├── opt_map.cpp
│   └── opt_map.h       // 参数类
├── file.h
├── http
│   ├── Makefile
│   ├── http_client.cpp
│   ├── http_client.h       // 发送HTTP请求
│   ├── http_msg.cpp
│   ├── http_msg.h      // HTTP请求回复相关类
│   ├── http_msg_handler.cpp
│   ├── http_msg_handler.h      // HttpMessage相关基类
│   ├── http_msg_handler_factory.cpp
│   ├── http_msg_handler_factory.h  // 生成msg类
│   ├── http_protocol.cpp
│   ├── http_protocol.h     // 实现HTTP协议
│   └── test_http_client.cpp
├── msg     // 信息收发接口基类
├── network
│   ├── Makefile
│   ├── socket_stream_base.cpp
│   ├── socket_stream_base.h            // socket函数基类,基于流实现
│   ├── socket_stream_block.cpp
│   ├── socket_stream_block.h           // 阻塞的socket_stream
│   ├── socket_stream_uthread.cpp
│   ├── socket_stream_uthread.h     // 协程实现socket_stream
│   ├── timer.cpp
│   ├── timer.h             // 定时器
│   ├── uthread_context_base.cpp
│   ├── uthread_context_base.h      // 协程接口基类
│   ├── uthread_context_system.cpp
│   ├── uthread_context_system.h    // 定义协程接口
│   ├── uthread_context_util.cpp
│   ├── uthread_context_util.h      // 协程私有栈
│   ├── uthread_epoll.cpp
│   ├── uthread_epoll.h     // 基于协程的Epoll的调度
│   ├── uthread_runtime.cpp
│   └── uthread_runtime.h       // 协程调度
├── network.h
├── rpc
│   ├── Makefile
│   ├── caller.cpp              // client rpc调用
│   ├── caller.h
│   ├── client_config.cpp   // client配置
│   ├── client_config.h
│   ├── client_monitor.cpp  // 调用监控
│   ├── client_monitor.h
│   ├── hsha_server.cpp // server相关类
│   ├── hsha_server.h
│   ├── monitor_factory.cpp
│   ├── monitor_factory.h   // monitor工厂
│   ├── phxrpc.proto
│   ├── server_base.cpp
│   ├── server_base.h
│   ├── server_config.cpp
│   ├── server_config.h     // server配置
│   ├── server_monitor.cpp
│   ├── server_monitor.h    // server监控
│   ├── socket_stream_phxrpc.cpp
│   ├── socket_stream_phxrpc.h      // socket_stream封装
│   ├── thread_queue.h      // 线程池
│   ├── uthread_caller.cpp      // 基于协程的client rpc调用方法
│   └── uthread_caller.h
└── rpc.h

生成代码

.
├── Makefile
├── phxrpc_search_dispatcher.cpp
├── phxrpc_search_dispatcher.h          // 请求分发
├── phxrpc_search_service.cpp
├── phxrpc_search_service.h         // service基类
├── phxrpc_search_stub.cpp
├── phxrpc_search_stub.h            // 用于client调用方法
├── phxrpc_search_tool.cpp
├── phxrpc_search_tool.h                // tool测试
├── regen.sh                                    // 基于proto生成pb文件
├── search.proto                            // proto文件
├── search_client.conf                  // 客户端配置文件
├── search_client.cpp
├── search_client.h                     // client类,用于客户端调用
├── search_main
├── search_main.cpp                 // 主文件,运行server
├── search_server.conf                  // 配置文件
├── search_server_config.cpp
├── search_server_config.h          // 配置
├── search_service_impl.cpp
├── search_service_impl.h           // RPC方法实现的函数,业务逻辑写在这下面
├── search_tool_impl.cpp                // 测试类
├── search_tool_impl.h
├── search_tool_main
└── search_tool_main.cpp            // main方法

与svrkit的比较

开发语言 跨语言 数据编码协议 是否支持REST 线程模型 并发模型 过载保护 协程框架 日志
PhxRPC C++ X protobuf 多线程/多协程 半同步半异步 基于平均响应时间随机拒绝 基于ucontext实现 同步日志
svrkit C++ protobuf 多线程/多进程/多协程 半同步半异步 结合队列平均等待时间、cpu使用率调整通过率削减请求,并基于业务优先级和用户uin快速拒绝 libco server写共享内存,mmsvrkitagent落盘

从实现角度来看,svrkit与PhxRPC基本一样,都是半同步半异步模型,支持多线程多协程,只不过svrkit能支持多进程功能而PhxRPC不行。从过载保护、日志等角度来看,svrkit实现的更加深入。可以将PhxRPC看作将svrkit网络实现抽出来的简化版本。

三、network

socket流

phxrpc用标准输入输出封装了socket函数,使用<<>>实现读写。
继承关系如下:
BlockTcpStream->BaseTcpStream->iostream
BlockTcpStreamBuf->BaseTcpStreamBuf->streambuf

streambuf

STL相关流类图:

streambuf是一个模板、虚基类,可以派生子类以便提供其他设备/数据输入的接口。每个流(cout/cin以及本例中的BlockTcpStream)都有自己的缓冲区streambuf。可以通过rdbuf函数来获取当前的streambuf,也可以设置新的streambuf。

streambuf可以看作是一块缓冲区,用于存储数据。平常使用标准输入输出流读取字符串的时候内部都会初始化一个streambuf,作为读写数据的缓冲。

自定义缓冲区

streambuf主要有三个指针函数

  1. pbase():输出缓冲流首指针
  2. pptr():当前可写指针位置
  3. epptr():输出缓冲流尾指针

pptr != epptr时,数据默认顺序更新到缓冲区,一直到pptr=epptrunderflow函数),标识缓冲区已满,并调用overflow函数将数据发送并清空缓冲区。

输入缓冲区

int BaseTcpStreamBuf::underflow() {int ret = precv(eback(), buf_size_, 0);if (ret > 0) {setg(eback(), eback(), eback() + ret);return traits_type::to_int_type(*gptr());} else {//phxrpc::log(LOG_ERR, "ret %d errno %d,%s", ret, errno, strerror(errno));return traits_type::eof();}
}

输出缓冲区

int BaseTcpStreamBuf::sync() {int sent = 0;int total = pptr() - pbase();while (sent < total) {int ret = psend(pbase() + sent, total - sent, 0);if (ret > 0) {sent += ret;} else {//phxrpc::log(LOG_ERR, "sync ret %d errno %d,%s", ret, errno, strerror(errno));return -1;}}setp(pbase(), pbase() + buf_size_);pbump(0);return 0;
}int BaseTcpStreamBuf::overflow(int c) {if (-1 == sync()) {return traits_type::eof();} else {if (!traits_type::eq_int_type(c, traits_type::eof())) {sputc(traits_type::to_char_type(c));}return traits_type::not_eof(c);}
}

BlockTcpStreamBuf

自定义缓冲区基础上进行网络IO

ssize_t BlockTcpStreamBuf::precv(void * buf, size_t len, int flags) {return recv(socket_, buf, len, flags);
}ssize_t BlockTcpStreamBuf::psend(const void *buf, size_t len, int flags) {return send(socket_, buf, len, flags);
}

BlockTcpStream

BlockTcpStream为阻塞TCP流对象,通过Attach函数调用rdbuf()绑定streambuf,网络IO交给BlockTcpStreamBuf处理

BlockTcpUtils

BlockTcpUtils封装了网络连接的函数库

  • Open:打开一个BlockTcpStream流对象,调用SetNonBlock将socketfd设置为非阻塞
  • Listen:监听连接
  • Poll:封装poll()函数

Timer

基于最小堆的定时器。最小堆是指每个节点值小于等于其儿子的二叉树,用于寻找前K小,可以做到O(logn)复杂度淘汰过期连接。heap_up时将节点向上调整,用于插入数据。heap_down是将节点向下调整,用于删除数据。

void Timer::heap_up(const size_t end_idx) {size_t now_idx = end_idx - 1;TimerObj obj = timer_heap_[now_idx];size_t parent_idx = (now_idx - 1) / 2;while (now_idx > 0 && parent_idx >= 0 && obj < timer_heap_[parent_idx]) {timer_heap_[now_idx] = timer_heap_[parent_idx];UThreadSocketSetTimerID(*timer_heap_[now_idx].socket_, now_idx + 1);now_idx = parent_idx;parent_idx = (now_idx - 1) / 2;}timer_heap_[now_idx] = obj;UThreadSocketSetTimerID(*timer_heap_[now_idx].socket_, now_idx + 1);
}void Timer::heap_down(const size_t begin_idx) {size_t now_idx = begin_idx;TimerObj obj = timer_heap_[now_idx];size_t child_idx = (now_idx + 1) * 2;while (child_idx <= timer_heap_.size()) {if (child_idx == timer_heap_.size()|| timer_heap_[child_idx - 1] < timer_heap_[child_idx]) {child_idx--;}if (obj < timer_heap_[child_idx]|| obj == timer_heap_[child_idx]) {break;}timer_heap_[now_idx] = timer_heap_[child_idx];UThreadSocketSetTimerID(*timer_heap_[now_idx].socket_, now_idx + 1);now_idx = child_idx;child_idx = (now_idx + 1) * 2;}timer_heap_[now_idx] = obj;UThreadSocketSetTimerID(*timer_heap_[now_idx].socket_, now_idx + 1);
}

ucontext协程

协程可以看作用户级的线程,线程/进程的调度由OS内核完成,一般采用公平调度。而协程则是自己实现调度算法,在IO等待的时候切换出去,IO完成再切换回来,适合IO密集型场景。

ucontext库

协程切换的关键就是保存和恢复上下文,PhxRPC使用了ucontext库。

typedef struct ucontext {struct ucontext *uc_link;  // 指向一个上下文该上下文执行完时要恢复的上下文sigset_t         uc_sigmask;  stack_t          uc_stack;  //使用的栈mcontext_t       uc_mcontext;  //保存各种寄存器信息...
} ucontext_t;
int getcontext(ucontext_t *ucp); //将当前上下文保存到ucp
int setcontext(const ucontext_t *ucp); //切换到上下文ucp
void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...); // 修改通过getcontext取得的上下文ucp,然后给该上下文指定一个栈空间ucp->stack,设置后继的上下文ucp->uc_link。上下文通过setcontext或者getcontext激活后执行函数func,然后进入继承上下文(为空则退出)。
int swapcontext(ucontext_t *oucp, ucontext_t *ucp); //保存当前上下文到oucp,切换到上下文ucp

简单的使用,获取上下文然后切换上下文,会无限输出Hello world。

#include <stdio.h>
#include <ucontext.h>
#include <unistd.h>int main(int argc, const char *argv[]){ucontext_t context;getcontext(&context);puts("Hello world");sleep(1);setcontext(&context);return 0;
}

makecontext函数使用

#include <ucontext.h>
#include <stdio.h>void print_child(void * arg)
{puts("child");
}
void context_test()
{char stack[102400];ucontext_t child, main;getcontext(&child); //获取当前上下文child.uc_stack.ss_sp = stack; //指定栈空间child.uc_stack.ss_size = sizeof(stack);//指定栈空间大小child.uc_stack.ss_flags = 0;child.uc_link = &main; //设置后继上下文makecontext(&child,(void (*)(void))print_child,0);//child上下文指向print_child函数swapcontext(&main,&child);//切换到child上下文,保存当前上下文到mainputs("main");//如果设置了后继上下文,print_child函数指向完后会返回此处
}int main()
{context_test();return 0;
}

UThreadContext 协程基类

UThreadContext是协程接口函数的基类。定义了静态函数用于创建上下文。对应子类UThreadContextSystem的DoCreate函数。

class UThreadContext {public:UThreadContext() { }virtual ~UThreadContext() { }static UThreadContext * Create(size_t stack_size, UThreadFunc_t func, void * args, UThreadDoneCallback_t callback, const bool need_stack_protect);  // 使用context_create_func_函数创建一个UThreadContext类。...
private:static ContextCreateFunc_t context_create_func_; // 静态函数,创建上下文。对应子类UThreadContextSystem的DoCreate函数。
};

UThreadStackMemory 协程私有栈

  • UThreadStackMemory定义了协程的私有栈,内存分配使用mmap。用need_protect_代表是否开启保护模式,如果开启保护模式则会在栈两段各多分配一页,并设置为PROT_NONE属性禁止访问。
  • mmap使用时设置了MAP_ANONYMOUS | MAP_PRIVATE属性,MAP_ANONYMOUS代表内存匿名映射。通常为了建立映射区需要open一个临时文件,mmap完成再ulink,使用匿名映射可以避免这个操作。MAP_PRIVATE建立了一个私有映射,对其他进程不可见。
if (need_protect) {raw_stack_ = mmap(NULL, stack_size_ + page_size * 2, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);assert(raw_stack_ != nullptr);PHXRPC_ASSERT(mprotect(raw_stack_, page_size, PROT_NONE) == 0);PHXRPC_ASSERT(mprotect((void *)((char *)raw_stack_ + stack_size_ + page_size), page_size, PROT_NONE) == 0);stack_ = (void *)((char *)raw_stack_ + page_size);} else {raw_stack_ = mmap(NULL, stack_size_, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);assert(raw_stack_ != nullptr);stack_ = raw_stack_;}

UThreadContextSystem 协程实现

UThreadContextSystem是协程接口函数的具体实现,本质是对ucontext库进行了一层封装。主要有三个方法:

  • Make(UThreadFunc_t func, void * args): 对本协程上下文设置执行的函数func,并将后继节点设置为主线程上下文
  • Resume: 切换到本协程上下文
  • Yield: 将本协程上下文切出

UThreadRuntime 协程调度

UThreadRuntime封装了协程的调度

__uthread 协程操作重载

  • __uthread重载了-,可以使用uthread_t增加协程调度任务。
  • uthread_s、uthread_end、uthread_s、uthread_end这个几个宏分别对应协程的开始、结束、调度以及创建。
class __uthread {public:__uthread(UThreadEpollScheduler &scheduler) : scheduler_(scheduler) { }template <typename Func>void operator-(Func const & func) {scheduler_.AddTask(func, nullptr);}private:UThreadEpollScheduler &scheduler_;
};
#define uthread_begin phxrpc::UThreadEpollScheduler _uthread_scheduler(64 * 1024, 300);
#define uthread_begin_withargs(stack_size, max_task) phxrpc::UThreadEpollScheduler _uthread_scheduler(stack_size, max_task);
#define uthread_s _uthread_scheduler
#define uthread_t phxrpc::__uthread(_uthread_scheduler)-
#define uthread_end _uthread_scheduler.Run();

UThreadEpollScheduler Epoll驱动的协程调度器

UThreadEpollScheduler封装了基于epoll多路复用的协程调度。

  • epoll_wake_up_.Run();epoll_wake_up_(EpollNotifier)本质对应一个pipe管道,执行Run函数时会将Func加入调度器,会检查管道的读端。当有任务可以执行的时候就会往管道写数据,这样可以激活epoll,类似条件变量。
  • ConsumeTodoList:读取任务队列中的任务并resume执行。
  • NotifyEpoll:往pipe写端里面写一个"a",目的是激活epoll。
  • Run:通过epoll监听响应的fd,这里的fd实际对应EpollNotifier的pipefd,如果有worker线程或者IO线程任务完成则会通过NotifyEpoll激活epoll,此时调度器会检查任务并执行对应的回调函数。
void UThreadEpollScheduler::RunForever() {run_forever_ = true;epoll_wake_up_.Run(); // 将Func加入调度器,会检查管道的读端。当有任务可以执行的时候就会往管道写数据,这样可以激活epoll,类似条件变量。Run(); // epoll_wait检查活动中的fd并resume执行
}bool UThreadEpollScheduler::Run() {... ConsumeTodoList(); // 消费任务for (; (run_forever_) || (!runtime_.IsAllDone());) {// epoll多路复用,检查需要io的fdint nfds = epoll_wait(epoll_fd_, events, max_task_, 4);if (nfds != -1) {for (int i = 0; i < nfds; i++) {UThreadSocket_t * socket = (UThreadSocket_t*) events[i].data.ptr;socket->waited_events = events[i].events;runtime_.Resume(socket->uthread_id);}// for server mode,if (active_socket_func_ != nullptr) {UThreadSocket_t * socket = nullptr;while ((socket = active_socket_func_()) != nullptr) {runtime_.Resume(socket->uthread_id);}}// for server uthread worker,worker线程使用,执行逻辑if (handler_new_request_func_ != nullptr) {handler_new_request_func_();}// IO线程函数,对应HshaServerIO::IOFunc,接收和回复请求if (handler_accepted_fd_func_ != nullptr) {handler_accepted_fd_func_();}if (closed_) {ResumeAll(UThreadEpollREvent_Close);break;}ConsumeTodoList();                 // 消费任务DealwithTimeout(next_timeout);       // 去除定时链接} else if (errno != EINTR) {ResumeAll(UThreadEpollREvent_Error);break;}StatEpollwaitEvents(nfds);}free(events);return true;
}

四、rpc 半同步半异步服务模型

概述

这一部分是核心的服务模块,包括服务的建立、协程/线程的创建、与调度器的交互。核心函数有两个:Worker::WorkerLogicHshaServerIO::IOFunc,分别代表工作线程逻辑和IO线程逻辑。在IOFunc函数执行到需要处理request请求的时候就会将协程切走给WorkerLogic,当WorkerLogic处理完request再把协程返回给IOFunc,任务调度切换通过UThreadEpollScheduler调度器完成,数据交互通过DataFlow完成。

你可能已经注意到这里的文件基本都用HsHa来作为前缀,这其实是Half-Sync/Half-Async(半同步半异步网络模型)的简写。这是一种著名的网络模型,主要结构为:异步IO层+队列层+同步处理层。同步指的是接收用户连接请求,异步指的是IO处理流程是异步的。

本框架中accept线程运行在主线程,IO线程处理IO请求是异步的,主线程与IO线程通过fd队列交互,IO线程与worker线程通过DataFlow交互,中间都有一层队列进行缓冲。

DataFlow 数据中心,线程间数据交互

数据中心,用阻塞队列(也叫线程池)in_queue_out_queue_保存了请求、回复数据。工作线程和IO线程通过DataFlow进行数据交互。

Worker 工作线程

工作线程,有线程模式和协程模式两种。

WorkPool 工作线程池

工作线程池,会启动多个工作线程,每个HshaServerUnit对应一个WorkPool

for (int i{0}; i < thread_count; ++i) {auto worker(new Worker(i, this, uthread_count_per_thread, uthread_stack_size));assert(worker != nullptr);worker_list_.push_back(worker);}

HshaServerStat 统计线程

统计运行状态,对应一个独立线程。通过HshaServerQos::CalFunc将运行状态上报到hsha_server_monitor_中。

HshaServerQos 过载保护机制

独立线程,根据运行状态来判断是否接收数据或者接收连接。

HshaServerQos::CalFunc实现了一个精简的负反馈过载保护机制,值得学习。原理是设置了一个快速拒绝比例,根据比例来拒绝请求。当系统负载高时快速拒绝比例高;系统负载降下来时快速拒绝比例降低。

  • 当平均等待时间大于预设的快速拒绝时间时,快速拒绝比例提高。
  • 当平均等待时间小于预设的快速拒绝时间时,快速拒绝比例降低。

不过这里的过载保护是对于全局进行的,实际场景中要复杂的多,比如:

  1. 有的业务会更加重要,当系统过载时应该优先满足重要业务。比如微信支付肯定要比朋友圈重要。
  2. 一个用户进行的一个操作会对应多个请求,一个请求失败整个操作也会失败。如果是随机比例拒绝,那么用户操作体验会非常差。

因此拒绝比例设置为二元维度更为合理 <business, uin>,第一维是业务,第二维是人。

详细可以参考
微信过载保护的实现原理

void HshaServerQos::CalFunc() {while (!break_out_) {unique_lock<mutex> lock(mutex_);cv_.wait_for(lock, chrono::seconds(1));// fast reject. 在每次平均耗时变化的时候调整快速拒绝比例if (hsha_server_stat_->inqueue_avg_wait_time_costs_per_second_cal_seq_!= inqueue_avg_wait_time_costs_per_second_cal_last_seq_) {// inqueue avg wait time refleshint avg_queue_wait_time = (hsha_server_stat_->inqueue_avg_wait_time_costs_per_second_+ hsha_server_stat_->outqueue_avg_wait_time_costs_per_second_) / 2;int rate = config_->GetFastRejectAdjustRate();// 过载了,提升快速拒绝比例if (avg_queue_wait_time > config_->GetFastRejectThresholdMS()) {if (enqueue_reject_rate_ != 99) {enqueue_reject_rate_ = enqueue_reject_rate_ + rate > 99 ? 99 : enqueue_reject_rate_ + rate;}} else { // 系统资源充足,拒绝比例降下来~.~if (enqueue_reject_rate_ != 0) {enqueue_reject_rate_ = enqueue_reject_rate_ - rate < 0 ? 0 : enqueue_reject_rate_ - rate;}}inqueue_avg_wait_time_costs_per_second_cal_last_seq_ =hsha_server_stat_->inqueue_avg_wait_time_costs_per_second_cal_seq_;}phxrpc::log(LOG_NOTICE, "[SERVER_QOS] accept_reject_qps %d queue_full_reject_qps %d"" fast_reject_qps %d fast_reject_rate %d",hsha_server_stat_->reject_qps_, hsha_server_stat_->queue_full_rejected_after_accepted_qps_,hsha_server_stat_->enqueue_fast_reject_qps_, enqueue_reject_rate_);}
}

HshaServerUnit 工作单元

工作单元,每个unit对应一个HshaServerIO、一个WorkerPool、一个DataFlow、一个UThreadEpollScheduler

HshaServerIO IO线程

IO线程,RPC模块最复杂的类。

首先看HsHaServerIO类,有5个函数

  • ActiveSocketFuncdata_flow_中提取就绪的fd。
  • AddAcceptedFd将已经连接的fd放入accepted_fd_list_。如果系统负载不高则唤醒调度器处理。
  • HandlerAcceptedFd会不断从accepted_fd_list_中提取fd然后加入到调度器中,并绑定IOFunc作为回调函数。
  • IOFunc执行请求IO逻辑;首先创建UThreadTcpStream关联fd,然后判断请求类型解析请求(PhxRPC目前只支持了HTTP协议);请求解析完毕以后导入data_flow_中。
    • 如果是协程模式还得调用worker_pool_->NotifyEpoll(),通知woker对应的UThreadEpollScheduler,接下来会调用UThreadWait将协程让出。
    • 同时UThreadEpollScheduler的Run函数会一直轮询现有的任务,执行回调函数handler_new_request_func_,从DataFlow拉取request并将任务push到调度器任务队列,这样会执行WorkerLogic函数,将response加入到DataFlow中。
    • WorkerLogic函数会执行 pool_->scheduler_->NotifyEpoll();,将流程转回到调度器的Run函数,执行active_socket_func_(实际对应HshaServerIO的ActiveSocketFunc)从DataFlow中取出response,最后执行Resume函数将协程转回到IOFunc并将response发送出去,完成整个数据收发流程。
  • RunForever函数绑定回调函数,并且启动Epoll调度器。
class HshaServerIO final {public:void RunForever();bool AddAcceptedFd(const int accepted_fd);void HandlerAcceptedFd();void IOFunc(int accept_fd);UThreadSocket_t *ActiveSocketFunc();...
};

RunForever函数

void HshaServerIO::RunForever() {scheduler_->SetHandlerAcceptedFdFunc(bind(&HshaServerIO::HandlerAcceptedFd, this));scheduler_->SetActiveSocketFunc(bind(&HshaServerIO::ActiveSocketFunc, this));scheduler_->RunForever();
}

HshaServerAcceptor Accept线程

监听接收client连接,工作在主线程。会循环accept新连接,通过轮询的方式将请求分发到各个工作单元上。

while (true) {struct sockaddr_in addr;socklen_t socklen = sizeof(addr);int accepted_fd{accept(listen_fd, (struct sockaddr *) &addr, &socklen)};if (accepted_fd >= 0) {idx_ %= hsha_server_->server_unit_list_.size();   // Round robin,idx_递增if (!hsha_server_->server_unit_list_[idx_++]->AddAcceptedFd(accepted_fd)) {hsha_server_->hsha_server_stat_.rejected_fds_++;log(LOG_ERR, "%s accept queue full, reject accept, fd %d", __func__, accepted_fd);close(accepted_fd);continue;}}}

小trick,Accept线程绑定在了特定CPU核,减少在不同核上调度的花费。

cpu_set_t mask;
CPU_ZERO(&mask);
CPU_SET(0, &mask);
pid_t thread_id = 0;
int ret{sched_setaffinity(thread_id, sizeof(mask), &mask)};
if (0 != ret) {printf("sched_setaffinity err\n");
}

HshaServer server服务对象

server对象,会创建多个HshaServerUnit。启动后会监听所有的新连接。

void HshaServer::RunForever() {hsha_server_acceptor_.LoopAccept(config_->GetBindIP(), config_->GetPort());
}

五. sample 模板代码

这是使用codegen基于proto生成的模板代码,主要有配置文件、接口代码、连接分发代码、客户端调用代码。

请求分发

search_main.cpp中定义了Dispatch方法,并在实例化HshaServer使用了Dispatch。

void Dispatch(const phxrpc::BaseRequest &req,phxrpc::BaseResponse *const resp,phxrpc::DispatcherArgs_t *const args) {ServiceArgs_t *service_args{(ServiceArgs_t *)(args->service_args)};SearchServiceImpl service(*service_args);SearchDispatcher dispatcher(service, args);phxrpc::BaseDispatcher<SearchDispatcher> base_dispatcher(dispatcher, SearchDispatcher::GetURIFuncMap());if (!base_dispatcher.Dispatch(req, resp)) {resp->SetFake(phxrpc::BaseResponse::FakeReason::DISPATCH_ERROR);}
}int main() {phxrpc::HshaServer server(config.GetHshaServerConfig(), Dispatch, &service_args);server.RunForever();
}

经过HshaServer->HshaServerUnit->WorkerPool->Worker这样的重重转发,最终在WorkerLogic函数中使用到了Dispatch函数,实际是通过uri与实际接口函数映射map定位到真实的接口函数处理执行。

void Worker::WorkerLogic(void *args, BaseRequest *req, int queue_wait_time_ms) {...BaseResponse *resp{req->GenResponse()};if (queue_wait_time_ms < MAX_QUEUE_WAIT_TIME_COST) {HshaServerStat::TimeCost time_cost;DispatcherArgs_t dispatcher_args(pool_->hsha_server_stat_->hsha_server_monitor_,worker_scheduler_, pool_->args_, args);pool_->dispatch_(*req, resp, &dispatcher_args);}...
}

接口代码

int SearchServiceImpl::PHXEcho(const google::protobuf::StringValue &req, google::protobuf::StringValue *resp) {resp->set_value(req.value());return 0;
}int SearchServiceImpl::Search(const search::SearchRequest &req, search::SearchResult *resp) {return -1;
}int SearchServiceImpl::Notify(const google::protobuf::StringValue &req, google::protobuf::Empty *resp) {return -1;
}

客户端

client同步调用

int SearchClient::PHXEcho(const google::protobuf::StringValue &req, google::protobuf::StringValue *resp)
{const phxrpc::Endpoint_t *ep{global_searchclient_config_.GetRandom()};if (ep) {phxrpc::BlockTcpStream socket;bool open_ret{phxrpc::PhxrpcTcpUtils::Open(&socket, ep->ip, ep->port,global_searchclient_config_.GetConnectTimeoutMS(), nullptr, 0,*(global_searchclient_monitor_.get()))};if (open_ret) {socket.SetTimeout(global_searchclient_config_.GetSocketTimeoutMS());phxrpc::HttpMessageHandlerFactory http_msg_factory;SearchStub stub(socket, *(global_searchclient_monitor_.get()), http_msg_factory);return stub.PHXEcho(req, resp);}}return -1;
}

client并发调用,使用协程向多个server并发发起调用。首先使用uthread_begin创建一个协程调度器,然后遍历服务器,使用uthread_t以lambda形式封装协程任务,作用是想当前服务器发送请求,并将任务加入到协程调度器。当有请求任务返回时会调用uthread_s结束协程任务。uthread_end是开始协程任务。

这里实现了Google提出的Backup Request调用,同时向多个Server发送请求,当有一个Server响应时请求结束,可以大幅度降低响应延迟问题。

int SearchClient::PHXBatchEcho(const google::protobuf::StringValue &req, google::protobuf::StringValue *resp)
{int ret{-1};size_t echo_server_count{2};uthread_begin;for (size_t i{0}; echo_server_count > i; ++i) {uthread_t [=, &uthread_s, &ret](void *) {const phxrpc::Endpoint_t *ep = global_searchclient_config_.GetByIndex(i);if (ep != nullptr) {phxrpc::UThreadTcpStream socket;if (phxrpc::PhxrpcTcpUtils::Open(&uthread_s, &socket, ep->ip, ep->port,global_searchclient_config_.GetConnectTimeoutMS(), *(global_searchclient_monitor_.get()))) {socket.SetTimeout(global_searchclient_config_.GetSocketTimeoutMS());phxrpc::HttpMessageHandlerFactory http_msg_factory;SearchStub stub(socket, *(global_searchclient_monitor_.get()), http_msg_factory);int this_ret{stub.PHXEcho(req, resp)};if (this_ret == 0) {ret = this_ret;uthread_s.Close();}}}};}uthread_end;return ret;
}

配置文件

search_client.conf, search_server.conf

[Server]
BindIP = 127.0.0.1              // Server IP
Port = 16161                    // Server Port
MaxThreads = 16                 // Worker 线程数
WorkerUThreadCount = 50         // 每个线程开启的协程数,采用-u生成的Server必须配置这一项
WorkerUThreadStackSize = 65536  // UThread worker的栈大小
IOThreadCount = 3               // IO线程数,针对业务请自行调节
PackageName = search            // Server 名字,用于自行实现的监控统计上报
MaxConnections = 800000         // 最大并发连接数
MaxQueueLength = 20480          // IO队列最大长度
FastRejectThresholdMS = 20      // 快速拒绝自适应调节阀值,建议保持默认20ms,不做修改[ServerTimeout]
SocketTimeoutMS = 5000          // Server读写超时,Worker处理超时

六、总结

阅读PhxRPC代码可以学到微信内部优秀的开发思路。但因为是轻量级框架,有些地方实现过于简化,比如日志、过载保护,可以在此基础上进一步的完善实现。

参考文献

[1] 微信后台 phxrpc (v0.8) 之 编译&&整体流程&&部分代码解析(一)
[2] 微信phxrpc源码分析
[3] phxrpc github
[4] 微信过载保护的实现原理
[5] c++ 流对象之streambuf
[6] PhxRPC源码分析(一、二、三)
[7] 云风coroutine源码分析
[8] ucontext-人人都可以实现的简单协程库
[9] std::streambuf从示例到应用

PhxRPC源码简析相关推荐

  1. 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析

    目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...

  2. django源码简析——后台程序入口

    django源码简析--后台程序入口 这一年一直在用云笔记,平时记录一些tips或者问题很方便,所以也就不再用博客进行记录,还是想把最近学习到的一些东西和大家作以分享,也能够对自己做一个总结.工作中主 ...

  3. (Ajax)axios源码简析(三)——请求与取消请求

    传送门: axios源码简析(一)--axios入口文件 axios源码简析(二)--Axios类与拦截器 axios源码简析(三)--请求与取消请求 请求过程 在Axios.prototype.re ...

  4. java ArrayList 概述 与源码简析

    ArrayList 概述 与源码简析 1 ArrayList 创建 ArrayList<String> list = new ArrayList<>(); //构造一个初始容量 ...

  5. Spring Boot源码简析 @EnableTransactionManagement

    相关阅读 Spring Boot源码简析 事务管理 Spring Boot源码简析 @EnableAspectJAutoProxy Spring Boot源码简析 @EnableAsync Sprin ...

  6. ffmpeg实战教程(十三)iJKPlayer源码简析

    要使用封装优化ijk就必须先了解ffmpeg,然后看ijk对ffmpeg的C层封装! 这是我看ijk源码时候的笔记,比较散乱.不喜勿喷~ ijk源码简析: 1.ijkplayer_jni.c 封装的播 ...

  7. 【Android项目】本地FM收音机开发及源码简析

    [Android项目]本地FM收音机开发及源码简析 目录 1.概述 2.收音机的基本原理 3.收音机其他信息 RDS功能 4.Android开发FM收音机源码解析 5.App层如何设计本地FM应用 6 ...

  8. Log-Pilot 源码简析

    Log-Pilot 源码简析 简单介绍 源码简析 Pilot结构体 Piloter接口 main函数 Pilot.Run Pilot.New Pilot.watch Pilot.processEven ...

  9. Spring Boot源码简析 @Qualifier

    源码 @Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.TYPE, ElementT ...

最新文章

  1. Java知多少(29)覆盖和重载
  2. 如何使用python找出CPU数量
  3. Laravel5.2目录结构及composer.json文件解析
  4. 阿里云超算异构Spot集群,助力深势科技30%成本驱动MDaaS海量算力
  5. SpringBoot 手动配置 @Enable 的秘密
  6. QYResearch回顾:2017年中国汽车语音识别系统产量为1413万
  7. C#中的线程二(Cotrol.BeginInvoke和Control.Invoke)
  8. 3.剑指Offer --- 高质量的代码
  9. 网站被黑被劫持跳转的症状与木马代码清除
  10. 硬件工程师岗位应聘为什么都要求精通CC++呢,这其中有什么说法吗
  11. html实体注册商标,html 注册商标,html 注册商标代码
  12. 直流无刷电机及Matlab/Simulink驱动仿真
  13. oracle中的dual详解
  14. 目标跟踪之LTMU:High-Performance Long-Term Tracking with Meta-Updater环境配置及代码运行
  15. cisco pkt 路由器配置基础及接口配置 路由协议与交换技术
  16. Hadoop集群环境搭建(超详细)
  17. 生成双色球号码,祝大家好运^_^
  18. xtu oj 1218
  19. 螺纹铣刀与丝锥攻丝有什么区别,谁的优势大呢?
  20. 【路径规划】基于matlab Hybrid A_Star算法机器人路径规划【含Matlab源码 1390期】

热门文章

  1. 身材与攻打全体变得有些扭曲怪僻文学会员手打
  2. wGlasses AR智能眼镜正式发布!影育科技带来端云协同、软硬件及资源零代码、一体化的AR生态科技盛宴!
  3. javascript---对象和函数的引用、浅拷贝、深拷贝、递归
  4. 笔记本电脑键盘被锁如何解锁
  5. c语言用随机投点法计算圆周率,(原创精品)用随机投点法计算π值【compute π with dartpoint randomly】...
  6. mycat分片规则详解+实例演示
  7. AI红包皮速领,人类现金速抽|祝大家新春快乐
  8. 我爱天文 - 月亮从哪边升出来?
  9. 计算机的正确配置文件,显示器颜色配置文件在win10电脑中设置正确配置的方法...
  10. 17 | 分布式安全:上百个分布式节点,不会出现“内奸”吗?