Table of Contents

  • Server-side usage
    • brpc::Server::AddService: initializing data structures
    • StartInternal: builtin services go through this function too
    • StartAccept: accepting connections on the listen socket
    • ResetFileDescriptor: configuring the socket and registering it with epoll
    • I/O multiplexing: the EventDispatcher::Run() event loop
      • OnNewConnections: accepting connection requests
    • OnNewMessages: handling read events
    • CutInputMessage: validating and cutting packets
      • QueueMessage: processing received messages in bthreads
    • ProcessHttpRequest: dispatch, with HTTP as the example
    • The CallMethod method in echo.pb.cc generated from echo.proto
    • EchoServiceImpl::Echo
    • Summary

Hi everyone, I'm dandyhuang. brpc is one of the strongest RPC frameworks in the C++ world, and this post walks through the server-side source code. Before diving in, set up a working build environment — it helps a lot with debugging and understanding. We'll follow the echo_c++ example shipped with brpc, and keep the intermediate files echo.pb.cc and echo.pb.h generated by protoc, which makes the code much easier to follow.

Server-side usage

namespace example {
class EchoServiceImpl : public EchoService {
public:
    EchoServiceImpl() {}
    virtual ~EchoServiceImpl() {}
    virtual void Echo(google::protobuf::RpcController* cntl_base,
                      const EchoRequest* request,
                      EchoResponse* response,
                      google::protobuf::Closure* done) {
        brpc::ClosureGuard done_guard(done);
        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
        response->set_message(request->message());
        if (FLAGS_echo_attachment) {
            cntl->response_attachment().append(cntl->request_attachment());
        }
    }
};
}  // namespace example

int main(int argc, char* argv[]) {
    // Parse gflags. We recommend you to use gflags as well.
    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
    brpc::Server server;
    // Instance of the service implementation generated from the proto file.
    example::EchoServiceImpl echo_service_impl;
    if (server.AddService(&echo_service_impl,
                          brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
        LOG(ERROR) << "Fail to add service";
        return -1;
    }
    butil::EndPoint point = butil::EndPoint(butil::IP_ANY, FLAGS_port);
    // Start the server.
    brpc::ServerOptions options;
    options.idle_timeout_sec = FLAGS_idle_timeout_s;
    if (server.Start(point, &options) != 0) {
        LOG(ERROR) << "Fail to start EchoServer";
        return -1;
    }
    // Wait until Ctrl-C is pressed, then Stop() and Join() the server.
    server.RunUntilAskedToQuit();
    return 0;
}

Overall the code is quite concise; it would be even better if the stubs could be generated automatically from the IDL. Without further ado, let's dive into the source.

brpc::Server::AddService: initializing data structures

int Server::AddServiceInternal(google::protobuf::Service* service,
                               bool is_builtin_service,
                               const ServiceOptions& svc_opt) {
    // If the implementation defines no methods, fail the check.
    const google::protobuf::ServiceDescriptor* sd = service->GetDescriptor();
    if (sd->method_count() == 0) {
        LOG(ERROR) << "service=" << sd->full_name()
                   << " does not have any method.";
        return -1;
    }
    // Initialize and register NamingService, LoadBalancer, CompressHandler,
    // protocols, etc. The packing/parsing handlers registered here are used
    // later on the whole send/receive path.
    if (InitializeOnce() != 0) {
        LOG(ERROR) << "Fail to initialize Server[" << version() << ']';
        return -1;
    }
    // Bail out if InitializeOnce failed.
    if (status() != READY) {
        LOG(ERROR) << "Can't add service=" << sd->full_name() << " to Server["
                   << version() << "] which is " << status_str(status());
        return -1;
    }
    // defined `option (idl_support) = true' or not.
    const bool is_idl_support = sd->file()->options().GetExtension(idl_support);
    Tabbed* tabbed = dynamic_cast<Tabbed*>(service);
    // Register every method of the service.
    for (int i = 0; i < sd->method_count(); ++i) {
        const google::protobuf::MethodDescriptor* md = sd->method(i);
        MethodProperty mp;
        mp.is_builtin_service = is_builtin_service;
        mp.own_method_status = true;
        mp.params.is_tabbed = !!tabbed;
        mp.params.allow_default_url = svc_opt.allow_default_url;
        mp.params.allow_http_body_to_pb = svc_opt.allow_http_body_to_pb;
        mp.params.pb_bytes_to_base64 = svc_opt.pb_bytes_to_base64;
        mp.service = service;
        mp.method = md;
        mp.status = new MethodStatus;
        _method_map[md->full_name()] = mp;
        if (is_idl_support && sd->name() != sd->full_name()/*has ns*/) {
            MethodProperty mp2 = mp;
            mp2.own_method_status = false;
            // have to map service_name + method_name as well because ubrpc
            // does not send the namespace before service_name.
            std::string full_name_wo_ns;
            full_name_wo_ns.reserve(sd->name().size() + 1 + md->name().size());
            full_name_wo_ns.append(sd->name());
            full_name_wo_ns.push_back('.');
            full_name_wo_ns.append(md->name());
            if (_method_map.seek(full_name_wo_ns) == NULL) {
                _method_map[full_name_wo_ns] = mp2;
            } else {
                LOG(ERROR) << '`' << full_name_wo_ns << "' already exists";
                RemoveMethodsOf(service);
                return -1;
            }
        }
    }
    const ServiceProperty ss = {
        is_builtin_service, svc_opt.ownership, service, NULL };
    _fullname_service_map[sd->full_name()] = ss;
    _service_map[sd->name()] = ss;
    if (is_builtin_service) {
        ++_builtin_service_count;
    } else {
        if (_first_service == NULL) {
            _first_service = service;
        }
    }
    butil::StringPiece restful_mappings = svc_opt.restful_mappings;
    restful_mappings.trim_spaces();
    // We skip the parsing of restful_mappings here; it mainly maps URLs to methods.
    if (!restful_mappings.empty()) {
        // Parse the mappings.
        ...
    }
    // Instantiated via AddBuiltinService; services generated from IDL are not tabbed.
    if (tabbed) {
        if (_tab_info_list == NULL) {
            _tab_info_list = new TabInfoList;
        }
        const size_t last_size = _tab_info_list->size();
        tabbed->GetTabInfo(_tab_info_list);
        const size_t cur_size = _tab_info_list->size();
        for (size_t i = last_size; i != cur_size; ++i) {
            const TabInfo& info = (*_tab_info_list)[i];
            if (!info.valid()) {
                LOG(ERROR) << "Invalid TabInfo: path=" << info.path
                           << " tab_name=" << info.tab_name;
                _tab_info_list->resize(last_size);
                RemoveService(service);
                return -1;
            }
        }
    }
    return 0;
}

AddServiceInternal handles validation and initialization for both user services generated from echo.proto and builtin services: it checks that the service actually defines methods, and stores the name-to-handler mappings in _method_map, _service_map, _fullname_service_map, and so on.
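
To make the mapping concrete, here is a minimal standalone sketch — with simplified stand-in types, not brpc's real MethodProperty or its FlatMap — of how a full method name such as "example.EchoService.Echo" becomes the dispatch key:

#include <iostream>
#include <map>
#include <string>

struct MethodProperty {           // simplified stand-in for brpc's MethodProperty
    std::string service_name;
    std::string method_name;
};

int main() {
    std::map<std::string, MethodProperty> method_map;
    // AddService walks ServiceDescriptor::method(i) and inserts one entry per method.
    method_map["example.EchoService.Echo"] = {"example.EchoService", "Echo"};

    // Later, the request's full method name is looked up to find the handler.
    auto it = method_map.find("example.EchoService.Echo");
    if (it != method_map.end()) {
        std::cout << "dispatch to " << it->second.method_name << '\n';
    }
    return 0;
}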

StartInternal: builtin services go through this function too

int Server::StartInternal(const butil::ip_t& ip,
                          const PortRange& port_range,
                          const ServerOptions *opt) {
    std::unique_ptr<Server, RevertServerStatus> revert_server(this);
    if (_failed_to_set_max_concurrency_of_method) {
        _failed_to_set_max_concurrency_of_method = false;
        LOG(ERROR) << "previous call to MaxConcurrencyOf() was failed, "
                      "fix it before starting server";
        return -1;
    }
    // Already initialized by AddService.
    if (InitializeOnce() != 0) {
        LOG(ERROR) << "Fail to initialize Server[" << version() << ']';
        return -1;
    }
    const Status st = status();
    // AddService's initialization succeeded and set the status to READY.
    if (st != READY) {
        if (st == RUNNING) {
            LOG(ERROR) << "Server[" << version() << "] is already running on "
                       << _listen_addr;
        } else {
            LOG(ERROR) << "Can't start Server[" << version()
                       << "] which is " << status_str(status());
        }
        return -1;
    }
    if (opt) {
        _options = *opt;
    } else {
        _options = ServerOptions();
    }
    // Init _keytable_pool always. If the server was stopped before, the pool
    // should be destroyed in Join().
    _keytable_pool = new bthread_keytable_pool_t;
    if (bthread_keytable_pool_init(_keytable_pool) != 0) {
        LOG(ERROR) << "Fail to init _keytable_pool";
        delete _keytable_pool;
        _keytable_pool = NULL;
        return -1;
    }
    _tl_options = ThreadLocalOptions();
    _concurrency = 0;
    if (_options.has_builtin_services &&
        _builtin_service_count <= 0 &&
        AddBuiltinServices() != 0) {
        LOG(ERROR) << "Fail to add builtin services";
        return -1;
    }
    // If a server is started/stopped for multiple times and one of the options
    // sets has_builtin_service to true, builtin services will be enabled for
    // any later re-start. Check this case and report to user.
    if (!_options.has_builtin_services && _builtin_service_count > 0) {
        LOG(ERROR) << "A server started/stopped for multiple times must be "
                      "consistent on ServerOptions.has_builtin_services";
        return -1;
    }
    // Prepare all restful maps
    for (ServiceMap::const_iterator it = _fullname_service_map.begin();
         it != _fullname_service_map.end(); ++it) {
        if (it->second.restful_map) {
            it->second.restful_map->PrepareForFinding();
        }
    }
    if (_global_restful_map) {
        _global_restful_map->PrepareForFinding();
    }
    // Set the bthread concurrency (by default tied to the number of CPU cores).
    if (_options.num_threads > 0) {
        if (FLAGS_usercode_in_pthread) {
            _options.num_threads += FLAGS_usercode_backup_threads;
        }
        if (_options.num_threads < BTHREAD_MIN_CONCURRENCY) {
            _options.num_threads = BTHREAD_MIN_CONCURRENCY;
        }
        bthread_setconcurrency(_options.num_threads);
    }
    // Configure per-method concurrency limiting: auto, constant or unlimited.
    for (MethodMap::iterator it = _method_map.begin();
         it != _method_map.end(); ++it) {
        if (it->second.is_builtin_service) {
            it->second.status->SetConcurrencyLimiter(NULL);
        } else {
            const AdaptiveMaxConcurrency* amc = &it->second.max_concurrency;
            if (amc->type() == AdaptiveMaxConcurrency::UNLIMITED()) {
                amc = &_options.method_max_concurrency;
            }
            ConcurrencyLimiter* cl = NULL;
            if (!CreateConcurrencyLimiter(*amc, &cl)) {
                LOG(ERROR) << "Fail to create ConcurrencyLimiter for method";
                return -1;
            }
            it->second.status->SetConcurrencyLimiter(cl);
        }
    }
    _listen_addr.ip = ip;
    for (int port = port_range.min_port; port <= port_range.max_port; ++port) {
        _listen_addr.port = port;
        // Create the listening socket.
        butil::fd_guard sockfd(tcp_listen(_listen_addr));
        if (sockfd < 0) {
            if (port != port_range.max_port) { // not the last port, try next
                continue;
            }
            if (port_range.min_port != port_range.max_port) {
                LOG(ERROR) << "Fail to listen " << ip
                           << ":[" << port_range.min_port << '-'
                           << port_range.max_port << ']';
            } else {
                LOG(ERROR) << "Fail to listen " << _listen_addr;
            }
            return -1;
        }
        if (_listen_addr.port == 0) {
            // port=0 makes kernel dynamically select a port from
            // https://en.wikipedia.org/wiki/Ephemeral_port
            _listen_addr.port = get_port_from_fd(sockfd);
            if (_listen_addr.port <= 0) {
                LOG(ERROR) << "Fail to get port from fd=" << sockfd;
                return -1;
            }
        }
        if (_am == NULL) {
            // Build the acceptor, which stores the handlers of all protocols
            // (parsing, packing, etc.).
            _am = BuildAcceptor();
            if (NULL == _am) {
                LOG(ERROR) << "Fail to build acceptor";
                return -1;
            }
        }
        // Set `_status' to RUNNING before accepting connections
        // to prevent requests being rejected as ELOGOFF
        _status = RUNNING;
        time(&_last_start_time);
        // Record some information into version.
        GenerateVersionIfNeeded();
        g_running_server_count.fetch_add(1, butil::memory_order_relaxed);
        // Start accepting: a bthread is started, epoll is created
        // (epoll_create) on demand, and client connections/requests are handled.
        if (_am->StartAccept(sockfd, _options.idle_timeout_sec,
                             _default_ssl_ctx) != 0) {
            LOG(ERROR) << "Fail to start acceptor";
            return -1;
        }
        sockfd.release();
        break; // stop trying
    }
    // The internal-port acceptor for builtin services is set up the same way.
    if (_options.internal_port >= 0 && _options.has_builtin_services) {
        // same handling as above
        ...
    }
    // Write the pid to a file.
    PutPidFileIfNeeded();
    // Update derived metrics, connection info, etc.
    CHECK_EQ(INVALID_BTHREAD, _derivative_thread);
    if (bthread_start_background(&_derivative_thread, NULL,
                                 UpdateDerivedVars, this) != 0) {
        LOG(ERROR) << "Fail to create _derivative_thread";
        return -1;
    }
    // Print tips to server launcher.
    int http_port = _listen_addr.port;
    std::ostringstream server_info;
    server_info << "Server[" << version() << "] is serving on port="
                << _listen_addr.port;
    if (_options.internal_port >= 0 && _options.has_builtin_services) {
        http_port = _options.internal_port;
        server_info << " and internal_port=" << _options.internal_port;
    }
    LOG(INFO) << server_info.str() << '.';
    if (_options.has_builtin_services) {
        LOG(INFO) << "Check out http://" << butil::my_hostname() << ':'
                  << http_port << " in web browser.";
    } else {
        LOG(WARNING) << "Builtin services are disabled according to "
                        "ServerOptions.has_builtin_services";
    }
    // Set the address reported for tracing (TrackMe).
    SetTrackMeAddress(butil::EndPoint(butil::my_ip(), http_port));
    revert_server.release();
    return 0;
}

In StartInternal, the listening socket and the acceptor are created. The core work happens in StartAccept.
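
The port-probing loop is easy to reproduce in isolation. Below is a hedged sketch using plain POSIX sockets (not brpc's tcp_listen) of trying each port in a range and keeping the first fd that listens successfully:

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdio>

// Try every port in [min_port, max_port]; return the first listening fd, or -1.
int listen_in_range(int min_port, int max_port) {
    for (int port = min_port; port <= max_port; ++port) {
        int fd = socket(AF_INET, SOCK_STREAM, 0);
        if (fd < 0) return -1;
        sockaddr_in addr = {};
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = htonl(INADDR_ANY);
        addr.sin_port = htons(port);
        if (bind(fd, (sockaddr*)&addr, sizeof(addr)) == 0 && listen(fd, 128) == 0) {
            std::printf("listening on port %d\n", port);
            return fd;  // success: stop trying, like the `break' in StartInternal
        }
        close(fd);      // this port failed, try the next one
    }
    return -1;
}

int main() {
    int fd = listen_in_range(8000, 8010);
    if (fd >= 0) close(fd);
    return 0;
}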

StartAccept: accepting connections on the listen socket

int Acceptor::StartAccept(int listened_fd, int idle_timeout_sec,
                          const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
    // Validate the listening fd.
    if (listened_fd < 0) {
        LOG(FATAL) << "Invalid listened_fd=" << listened_fd;
        return -1;
    }
    ...
    // Creation of _acception_id is inside lock so that OnNewConnections
    // (which may run immediately) should see sane fields set below.
    SocketOptions options;
    options.fd = listened_fd;
    options.user = this;
    // Register the callback: when epoll reports a connection event on this
    // fd, this function gets invoked.
    options.on_edge_triggered_events = OnNewConnections;
    if (Socket::Create(options, &_acception_id) != 0) {
        // Close-idle-socket thread will be stopped inside destructor
        LOG(FATAL) << "Fail to create _acception_id";
        return -1;
    }
    _listened_fd = listened_fd;
    _status = RUNNING;
    return 0;
}

int Socket::Create(const SocketOptions& options, SocketId* id) {
    butil::ResourceId<Socket> slot;
    // Fetch a Socket object from the resource pool.
    Socket* const m = butil::get_resource(&slot, Forbidden());
    if (m == NULL) {
        LOG(FATAL) << "Fail to get_resource<Socket>";
        return -1;
    }
    // Initialize the socket.
    g_vars->nsocket << 1;
    CHECK(NULL == m->_shared_part.load(butil::memory_order_relaxed));
    m->_nevent.store(0, butil::memory_order_relaxed);
    m->_keytable_pool = options.keytable_pool;
    m->_tos = 0;
    m->_remote_side = options.remote_side;
    m->_on_edge_triggered_events = options.on_edge_triggered_events;
    m->_this_id = MakeSocketId(
        VersionOfVRef(m->_versioned_ref.fetch_add(
            1, butil::memory_order_release)), slot);
    m->_preferred_index = -1;
    m->_hc_count = 0;
    CHECK(m->_read_buf.empty());
    const int64_t cpuwide_now = butil::cpuwide_time_us();
    m->_last_readtime_us.store(cpuwide_now, butil::memory_order_relaxed);
    m->reset_parsing_context(options.initial_parsing_context);
    m->_correlation_id = 0;
    m->_health_check_interval_s = options.health_check_interval_s;
    m->_ninprocess.store(1, butil::memory_order_relaxed);
    m->_auth_flag_error.store(0, butil::memory_order_relaxed);
    const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL);
    if (rc2) {
        LOG(ERROR) << "Fail to create auth_id: " << berror(rc2);
        m->SetFailed(rc2, "Fail to create auth_id: %s", berror(rc2));
        return -1;
    }
    // NOTE: last two params are useless in bthread > r32787
    const int rc = bthread_id_list_init(&m->_id_wait_list, 512, 512);
    if (rc) {
        LOG(ERROR) << "Fail to init _id_wait_list: " << berror(rc);
        m->SetFailed(rc, "Fail to init _id_wait_list: %s", berror(rc));
        return -1;
    }
    m->_last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed);
    m->_unwritten_bytes.store(0, butil::memory_order_relaxed);
    CHECK(NULL == m->_write_head.load(butil::memory_order_relaxed));
    // Must be last one! Internal fields of this Socket may be access
    // just after calling ResetFileDescriptor.
    // Configure the fd here and register it into the epoll set.
    if (m->ResetFileDescriptor(options.fd) != 0) {
        const int saved_errno = errno;
        PLOG(ERROR) << "Fail to ResetFileDescriptor";
        m->SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s",
                     berror(saved_errno));
        return -1;
    }
    *id = m->_this_id;
    return 0;
}

Socket::Create builds the Socket object; the fds produced by accept, connect, etc. all go through this function. on_edge_triggered_events is the event callback: when a client connection event arrives on the listening fd, OnNewConnections is invoked.
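
The pattern is simply a stored function pointer that the event loop invokes later. A minimal sketch — MiniSocket and the names below are stand-ins, not brpc types:

#include <cstdio>

struct MiniSocket;
typedef void (*EventCallback)(MiniSocket*);

struct MiniSocket {
    int fd;
    EventCallback on_edge_triggered_events;  // e.g. OnNewConnections for the listen fd
};

static void OnNewConnections(MiniSocket* s) {
    std::printf("readable event on listen fd=%d, accept() here\n", s->fd);
}

int main() {
    MiniSocket listen_sock{3, OnNewConnections};
    // When epoll reports EPOLLIN on listen_sock.fd, the dispatcher calls:
    listen_sock.on_edge_triggered_events(&listen_sock);
    return 0;
}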

ResetFileDescriptor: configuring the socket and registering it with epoll

int Socket::ResetFileDescriptor(int fd) {
    // Reset message sizes when fd is changed.
    _last_msg_size = 0;
    _avg_msg_size = 0;
    _fd.store(fd, butil::memory_order_release);
    _reset_fd_real_us = butil::gettimeofday_us();
    // Check that the fd is valid.
    if (!ValidFileDescriptor(fd)) {
        return 0;
    }
    // Fetch the local address.
    if (butil::get_local_side(fd, &_local_side) != 0) {
        _local_side = butil::EndPoint();
    }
    // Close-on-exec: the fd is not leaked into child processes.
    butil::make_close_on_exec(fd);
    // Set non-blocking mode.
    if (butil::make_non_blocking(fd) != 0) {
        PLOG(ERROR) << "Fail to set fd=" << fd << " to non-blocking";
        return -1;
    }
    // Disable Nagle's algorithm.
    butil::make_no_delay(fd);
    if (_tos > 0 &&
        setsockopt(fd, IPPROTO_IP, IP_TOS, &_tos, sizeof(_tos)) < 0) {
        PLOG(FATAL) << "Fail to set tos of fd=" << fd << " to " << _tos;
    }
    // Set the send buffer size.
    if (FLAGS_socket_send_buffer_size > 0) {
        int buff_size = FLAGS_socket_send_buffer_size;
        socklen_t size = sizeof(buff_size);
        if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buff_size, size) != 0) {
            PLOG(FATAL) << "Fail to set sndbuf of fd=" << fd << " to " << buff_size;
        }
    }
    // Set the receive buffer size.
    if (FLAGS_socket_recv_buffer_size > 0) {
        int buff_size = FLAGS_socket_recv_buffer_size;
        socklen_t size = sizeof(buff_size);
        if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buff_size, size) != 0) {
            PLOG(FATAL) << "Fail to set rcvbuf of fd=" << fd << " to " << buff_size;
        }
    }
    // On first use this creates the epoll instance (epoll_create) and then
    // adds the fd to it with EPOLL_CTL_ADD.
    if (_on_edge_triggered_events) {
        if (GetGlobalEventDispatcher(fd).AddConsumer(id(), fd) != 0) {
            PLOG(ERROR) << "Fail to add SocketId=" << id()
                        << " into EventDispatcher";
            _fd.store(-1, butil::memory_order_release);
            return -1;
        }
    }
    return 0;
}

Here the fd is configured, a bthread is started for the event dispatcher, the epoll instance is created, and the fd is registered in edge-triggered (ET) mode. From this point on we can accept requests from clients.
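
Under the hood, AddConsumer boils down to an epoll_ctl registration in edge-triggered mode, carrying the SocketId in data.u64 (which the Run() loop below reads back). A minimal Linux sketch of that idea — an assumption-level illustration, not brpc's exact code:

#include <stdint.h>
#include <sys/epoll.h>
#include <cstdio>

// Register a fd in epoll with edge-triggered read events; the 64-bit
// socket_id rides along in the event so the Socket can be recovered later.
int add_consumer(int epfd, uint64_t socket_id, int fd) {
    epoll_event evt;
    evt.events = EPOLLIN | EPOLLET;  // edge-triggered read events
    evt.data.u64 = socket_id;        // recovered in the event loop
    return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &evt);
}

int main() {
    int epfd = epoll_create1(0);
    if (epfd < 0) { std::perror("epoll_create1"); return 1; }
    // add_consumer(epfd, some_socket_id, some_fd);
    return 0;
}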

I/O multiplexing: the EventDispatcher::Run() event loop

void EventDispatcher::Run() {
    while (!_stop) {
        epoll_event e[32];
        // Non-blocking poll first: returns immediately.
        int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), 0);
        if (n == 0) {
            // Nothing ready: block until an event arrives.
            n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);
        }
        if (_stop) {
            break;
        }
        if (n < 0) {
            if (EINTR == errno) {
                // We've checked _stop, no wake-up will be missed.
                continue;
            }
            PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd;
            break;
        }
        // EPOLLIN events.
        for (int i = 0; i < n; ++i) {
            if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)
                || (e[i].events & has_epollrdhup)) {
                // We don't care about the return value.
                Socket::StartInputEvent(e[i].data.u64, e[i].events,
                                        _consumer_thread_attr);
            }
        }
        // EPOLLOUT events.
        for (int i = 0; i < n; ++i) {
            if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
                // We don't care about the return value.
                Socket::HandleEpollOut(e[i].data.u64);
            }
        }
    }
}

When a connection event arrives, EPOLLIN fires and StartInputEvent is called.

int Socket::StartInputEvent(SocketId id, uint32_t events,
                            const bthread_attr_t& thread_attr) {
    SocketUniquePtr s;
    // Map the id registered earlier via AddConsumer back to the Socket
    // created by Socket::Create.
    if (Address(id, &s) < 0) {
        return -1;
    }
    // Make sure the callback is set; here it is OnNewConnections.
    if (NULL == s->_on_edge_triggered_events) {
        return 0;
    }
    if (s->fd() < 0) {
        CHECK(!(events & EPOLLIN)) << "epoll_events=" << events;
        return -1;
    }
    // Even if multiple events arrive, only one bthread per fd is created to
    // process them, which keeps the handling of a single fd race-free.
    if (s->_nevent.fetch_add(1, butil::memory_order_acq_rel) == 0) {
        g_vars->neventthread << 1;
        bthread_t tid;
        // transfer ownership as well, don't use s anymore!
        Socket* const p = s.release();
        bthread_attr_t attr = thread_attr;
        attr.keytable_pool = p->_keytable_pool;
        if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) {
            LOG(FATAL) << "Fail to start ProcessEvent";
            ProcessEvent(p);
        }
    }
    return 0;
}

void* Socket::ProcessEvent(void* arg) {
    SocketUniquePtr s(static_cast<Socket*>(arg));
    // Invoke the callback, e.g. OnNewConnections.
    s->_on_edge_triggered_events(s.get());
    return NULL;
}

Address resolves the SocketId back to the Socket of the listening fd; then a bthread is started that invokes the OnNewConnections callback.
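
The 0 → 1 transition on _nevent is what guarantees a single worker per fd. Below is a standalone sketch of that guard, using std::atomic and std::thread in place of bthread:

#include <atomic>
#include <chrono>
#include <cstdio>
#include <thread>

std::atomic<int> nevent{0};

void process_events() {
    std::puts("one worker drains all pending events for this fd");
    nevent.store(0);  // simplified reset; brpc decrements as events are consumed
}

void on_epoll_event() {
    // Only the caller that flips the counter from 0 to 1 spawns a worker.
    if (nevent.fetch_add(1, std::memory_order_acq_rel) == 0) {
        std::thread(process_events).detach();  // brpc: bthread_start_urgent
    }  // otherwise a worker is already running and will see the new count
}

int main() {
    on_epoll_event();
    on_epoll_event();  // coalesced into the already-running worker
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    return 0;
}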

OnNewConnections: accepting connection requests

void Acceptor::OnNewConnections(Socket* acception) {
    int progress = Socket::PROGRESS_INIT;
    do {
        while (1) {
            struct sockaddr in_addr;
            socklen_t in_len = sizeof(in_addr);
            // Accept a connection initiated by a client.
            butil::fd_guard in_fd(accept(acception->fd(), &in_addr, &in_len));
            if (in_fd < 0) {
                // Non-blocking fd: EAGAIN means all pending connections have
                // been handled; fall through to the MoreReadEvents check.
                if (errno == EAGAIN) {
                    break;
                }
                // Other errors: keep accepting.
                continue;
            }
            Acceptor* am = dynamic_cast<Acceptor*>(acception->user());
            if (NULL == am) {
                LOG(FATAL) << "Impossible! acception->user() MUST be Acceptor";
                acception->SetFailed(EINVAL,
                    "Impossible! acception->user() MUST be Acceptor");
                return;
            }
            SocketId socket_id;
            SocketOptions options;
            options.keytable_pool = am->_keytable_pool;
            options.fd = in_fd;
            options.remote_side = butil::EndPoint(*(sockaddr_in*)&in_addr);
            options.user = acception->user();
            // Callback that reads the data from this fd's input buffer.
            options.on_edge_triggered_events = InputMessenger::OnNewMessages;
            options.initial_ssl_ctx = am->_ssl_ctx;
            // Same treatment as listen_fd: create a Socket for the accepted fd
            // and register it into epoll; a bthread runs when the client sends data.
            if (Socket::Create(options, &socket_id) != 0) {
                LOG(ERROR) << "Fail to create Socket";
                continue;
            }
            in_fd.release(); // transfer ownership to socket_id
            SocketUniquePtr sock;
            // Recycle the socket if the acceptor has been stopped meanwhile.
            if (Socket::AddressFailedAsWell(socket_id, &sock) >= 0) {
                bool is_running = true;
                {
                    BAIDU_SCOPED_LOCK(am->_map_mutex);
                    is_running = (am->status() == RUNNING);
                    // Track the fd for statistics and health checking.
                    am->_socket_map.insert(socket_id, ConnectStatistics());
                }
                if (!is_running) {
                    LOG(WARNING) << "Acceptor on fd=" << acception->fd()
                                 << " has been stopped, discard newly created "
                                 << *sock;
                    sock->SetFailed(ELOGOFF,
                        "Acceptor on fd=%d has been stopped, "
                        "discard newly created %s", acception->fd(),
                        sock->description().c_str());
                    return;
                }
            }
        }
        if (acception->Failed()) {
            return;
        }
        // Needed because only one bthread handles the events of a given fd
        // at a time: re-check for events that arrived meanwhile.
    } while (acception->MoreReadEvents(&progress));
}

Here accept picks up the connection requests initiated by clients, and each new fd is also registered into epoll.
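
With an edge-triggered, non-blocking listen fd, accept must be called in a loop until EAGAIN, or pending connections would sit unnoticed until the next event. A minimal sketch of that drain loop:

#include <cerrno>
#include <cstdio>
#include <sys/socket.h>

// Accept until the kernel backlog is drained.
void drain_accept(int listen_fd) {
    while (true) {
        sockaddr in_addr;
        socklen_t in_len = sizeof(in_addr);
        int in_fd = accept(listen_fd, &in_addr, &in_len);
        if (in_fd < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) return;  // all drained
            if (errno == EINTR) continue;   // interrupted, retry
            std::perror("accept");
            return;
        }
        std::printf("new connection fd=%d, register it into epoll\n", in_fd);
        // in brpc: Socket::Create(options, &socket_id) with
        // on_edge_triggered_events = InputMessenger::OnNewMessages
    }
}

int main() {
    // drain_accept(listen_fd);  // invoked from the listen fd's read-event callback
    return 0;
}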

OnNewMessages: handling read events

void InputMessenger::OnNewMessages(Socket* m) {
    InputMessenger* messenger = static_cast<InputMessenger*>(m->user());
    const InputMessageHandler* handlers = messenger->_handlers;
    int progress = Socket::PROGRESS_INIT;
    std::unique_ptr<InputMessageBase, RunLastMessage> last_msg;
    bool read_eof = false;
    while (!read_eof) {
        const int64_t received_us = butil::cpuwide_time_us();
        const int64_t base_realtime = butil::gettimeofday_us() - received_us;
        // Estimate how much to read this round from the average size of
        // previously received messages.
        size_t once_read = m->_avg_msg_size * 16;
        if (once_read < MIN_ONCE_READ) {
            once_read = MIN_ONCE_READ;
        } else if (once_read > MAX_ONCE_READ) {
            once_read = MAX_ONCE_READ;
        }
        // Read data from the fd.
        const ssize_t nr = m->DoRead(once_read);
        if (nr <= 0) {
            if (0 == nr) {
                // The peer closed the connection.
                read_eof = true;
            } else if (errno != EAGAIN) {
                if (errno == EINTR) {
                    continue;  // just retry
                }
                const int saved_errno = errno;
                PLOG(WARNING) << "Fail to read from " << *m;
                m->SetFailed(saved_errno, "Fail to read from %s: %s",
                             m->description().c_str(), berror(saved_errno));
                return;
            } else if (!m->MoreReadEvents(&progress)) {
                // EAGAIN/EWOULDBLOCK: nothing left to read in the buffer.
                return;
            } else {
                // Each fd is served by a single bthread, so keep reading.
                continue;
            }
        }
        // For statistics.
        m->AddInputBytes(nr);
        // Avoid this socket to be closed due to idle_timeout_s
        m->_last_readtime_us.store(received_us, butil::memory_order_relaxed);
        size_t last_size = m->_read_buf.length();
        int num_bthread_created = 0;
        while (1) {
            size_t index = 8888;
            // Parse the data sent by the client; index records which
            // protocol matched.
            ParseResult pr = messenger->CutInputMessage(m, &index, read_eof);
            if (!pr.is_ok()) {
                if (pr.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
                    // Half a packet: keep reading.
                    m->_last_msg_size += (last_size - m->_read_buf.length());
                    break;
                } else if (pr.error() == PARSE_ERROR_TRY_OTHERS) {
                    // No protocol recognized the data: close the connection.
                    LOG(WARNING)
                        << "Close " << *m << " due to unknown message: "
                        << butil::ToPrintable(m->_read_buf);
                    m->SetFailed(EINVAL, "Close %s due to unknown message",
                                 m->description().c_str());
                    return;
                } else {
                    // Other errors: close as well.
                    LOG(WARNING) << "Close " << *m << ": " << pr.error_str();
                    m->SetFailed(EINVAL, "Close %s: %s",
                                 m->description().c_str(), pr.error_str());
                    return;
                }
            }
            m->AddInputMessages(1);
            // Calculate average size of messages
            const size_t cur_size = m->_read_buf.length();
            if (cur_size == 0) {
                m->_read_buf.return_cached_blocks();
            }
            // Size of one message: last_size - cur_size.
            m->_last_msg_size += (last_size - cur_size);
            last_size = cur_size;
            const size_t old_avg = m->_avg_msg_size;
            if (old_avg != 0) {
                m->_avg_msg_size =
                    (old_avg * (MSG_SIZE_WINDOW - 1) + m->_last_msg_size)
                    / MSG_SIZE_WINDOW;
            } else {
                m->_avg_msg_size = m->_last_msg_size;
            }
            m->_last_msg_size = 0;
            if (pr.message() == NULL) { // the Process() step can be skipped.
                continue;
            }
            pr.message()->_received_us = received_us;
            pr.message()->_base_real_us = base_realtime;
            DestroyingPtr<InputMessageBase> msg(pr.message());
            // On the first round last_msg is NULL. When the loop exits,
            // RunLastMessage invokes ProcessInputMessage; QueueMessage also
            // starts bthreads that run ProcessInputMessage.
            QueueMessage(last_msg.release(), &num_bthread_created,
                         m->_keytable_pool);
            if (handlers[index].process == NULL) {
                LOG(ERROR) << "process of index=" << index << " is NULL";
                continue;
            }
            m->ReAddress(&msg->_socket);
            m->PostponeEOF();
            msg->_process = handlers[index].process;
            msg->_arg = handlers[index].arg;
            if (handlers[index].verify != NULL) {
                int auth_error = 0;
                if (0 == m->FightAuthentication(&auth_error)) {
                    // Get the right to authenticate
                    if (handlers[index].verify(msg.get())) {
                        m->SetAuthentication(0);
                    } else {
                        m->SetAuthentication(ERPCAUTH);
                        LOG(WARNING) << "Fail to authenticate " << *m;
                        m->SetFailed(ERPCAUTH, "Fail to authenticate %s",
                                     m->description().c_str());
                        return;
                    }
                } else {
                    LOG_IF(FATAL, auth_error != 0) <<
                        "Impossible! Socket should have been "
                        "destroyed when authentication failed";
                }
            }
            if (!m->is_read_progressive()) {
                // Hand msg over to last_msg.
                last_msg.reset(msg.release());
            } else {
                QueueMessage(msg.release(), &num_bthread_created,
                             m->_keytable_pool);
                bthread_flush();
                num_bthread_created = 0;
            }
        }
        if (num_bthread_created) {
            bthread_flush();
        }
    }
    if (read_eof) {
        m->SetEOF();
    }
}

DoRead pulls the bytes out of the kernel buffer, and CutInputMessage verifies that a complete, valid message has been received — we'll use HTTP as the example. Once a message passes the check, QueueMessage starts a bthread to process it.
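
The cutting logic is easiest to see with a toy protocol. This sketch uses a 4-byte big-endian length prefix (not any protocol brpc actually speaks); CutInputMessage plays the same role per real protocol (http, baidu_std, ...), returning the equivalent of PARSE_ERROR_NOT_ENOUGH_DATA on a half packet:

#include <arpa/inet.h>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>

enum ParseError { PARSE_OK, PARSE_NOT_ENOUGH_DATA };

// Cut one complete message off the front of buf, or report a half packet.
ParseError cut_message(std::string& buf, std::string* msg) {
    if (buf.size() < 4) return PARSE_NOT_ENOUGH_DATA;        // header incomplete
    uint32_t body_len;
    std::memcpy(&body_len, buf.data(), 4);
    body_len = ntohl(body_len);
    if (buf.size() < 4 + body_len) return PARSE_NOT_ENOUGH_DATA;  // half packet
    msg->assign(buf, 4, body_len);
    buf.erase(0, 4 + body_len);                               // consume it
    return PARSE_OK;
}

int main() {
    std::string buf;
    uint32_t n = htonl(5);
    buf.append(reinterpret_cast<char*>(&n), 4).append("hello");
    std::string msg;
    if (cut_message(buf, &msg) == PARSE_OK) std::cout << msg << '\n';
    return 0;
}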

CutInputMessage: validating and cutting packets

ParseResult InputMessenger::CutInputMessage(Socket* m, size_t* index, bool read_eof) {
    // preferred_index caches the protocol that last parsed this fd
    // successfully (-1 initially), which cuts down the number of trials.
    const int preferred = m->preferred_index();
    const int max_index = (int)_max_index.load(butil::memory_order_acquire);
    // Try the previously matched protocol first.
    if (preferred >= 0 && preferred <= max_index
        && _handlers[preferred].parse != NULL) {
        // Taking HTTP as the example, the parse function invoked here is
        // ParseHttpMessage.
        ParseResult result = _handlers[preferred].parse(
            &m->_read_buf, m, read_eof, _handlers[preferred].arg);
        // Return if parsing succeeded or the packet is incomplete.
        if (result.is_ok() ||
            result.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
            *index = preferred;
            return result;
        } else if (result.error() != PARSE_ERROR_TRY_OTHERS) {
            // Other errors: return directly.
            return result;
        }
        if (m->CreatedByConnect() &&
            (ProtocolType)preferred != PROTOCOL_BAIDU_STD) {
            // The protocol is fixed at client-side, no need to try others.
            LOG(ERROR) << "Fail to parse response from " << m->remote_side()
                       << " by " << _handlers[preferred].name << " at client-side";
            return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
        }
        // Clear context before trying next protocol which probably has
        // an incompatible context with the current one.
        if (m->parsing_context()) {
            m->reset_parsing_context(NULL);
        }
        m->set_preferred_index(-1);
    }
    // If the cached protocol did not match, try the protocols one by one
    // from index 0.
    for (int i = 0; i <= max_index; ++i) {
        if (i == preferred || _handlers[i].parse == NULL) {
            // Don't try preferred handler(already tried) or invalid handler
            continue;
        }
        // Execute the Parse of each Protocol.
        ParseResult result = _handlers[i].parse(
            &m->_read_buf, m, read_eof, _handlers[i].arg);
        if (result.is_ok() ||
            result.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
            m->set_preferred_index(i);
            *index = i;
            return result;
        } else if (result.error() != PARSE_ERROR_TRY_OTHERS) {
            return result;
        }
        if (m->parsing_context()) {
            m->reset_parsing_context(NULL);
        }
        // Try other protocols.
    }
    return MakeParseError(PARSE_ERROR_TRY_OTHERS);
}

The received bytes are tried against each registered protocol in turn, to determine which protocol the packet belongs to and whether it is well-formed.
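
Here is a compact sketch of the "preferred protocol first, then the rest" strategy. The parsers are toys keyed off magic prefixes; in brpc the handlers come from the protocol registry:

#include <functional>
#include <iostream>
#include <string>
#include <vector>

enum Result { OK, TRY_OTHERS };
using Parser = std::function<Result(const std::string&)>;

// Returns the index of the matching parser, caching it in `preferred'.
int cut(const std::vector<Parser>& handlers, int& preferred, const std::string& buf) {
    if (preferred >= 0 && handlers[preferred](buf) == OK) return preferred;
    for (int i = 0; i < (int)handlers.size(); ++i) {
        if (i == preferred) continue;           // already tried
        if (handlers[i](buf) == OK) { preferred = i; return i; }
    }
    return -1;  // corresponds to PARSE_ERROR_TRY_OTHERS at the messenger level
}

int main() {
    std::vector<Parser> handlers = {
        [](const std::string& b) { return b.rfind("PRPC", 0) == 0 ? OK : TRY_OTHERS; },
        [](const std::string& b) { return b.rfind("GET ", 0) == 0 ? OK : TRY_OTHERS; },
    };
    int preferred = -1;
    std::cout << "matched handler index " << cut(handlers, preferred, "GET /echo") << '\n';
    return 0;
}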

QueueMessage: processing received messages in bthreads

static void QueueMessage(InputMessageBase* to_run_msg,
                         int* num_bthread_created,
                         bthread_keytable_pool_t* keytable_pool) {
    if (!to_run_msg) {
        return;
    }
    bthread_t th;
    bthread_attr_t tmp = (FLAGS_usercode_in_pthread ?
                          BTHREAD_ATTR_PTHREAD :
                          BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
    tmp.keytable_pool = keytable_pool;
    // Start a bthread to run ProcessInputMessage.
    if (bthread_start_background(
            &th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
        ++*num_bthread_created;
    } else {
        ProcessInputMessage(to_run_msg);
    }
}

void* ProcessInputMessage(void* void_arg) {
    InputMessageBase* msg = static_cast<InputMessageBase*>(void_arg);
    // For HTTP this invokes ProcessHttpRequest.
    msg->_process(msg);
    return NULL;
}

A bthread is started to run ProcessInputMessage, which invokes the protocol's process callback.
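
One detail worth noting is BTHREAD_NOSIGNAL plus bthread_flush: bthreads are queued without waking worker threads, then signaled in one batch. A small sketch of that usage, assuming brpc's bthread headers are available (the include path below is my assumption):

#include <bthread/bthread.h>
#include <cstdio>

static void* work(void* arg) {
    std::printf("processing message %ld\n", (long)arg);
    return NULL;
}

int main() {
    bthread_attr_t attr = BTHREAD_ATTR_NORMAL | BTHREAD_NOSIGNAL;
    bthread_t tids[4];
    for (long i = 0; i < 4; ++i) {
        // NOSIGNAL defers waking worker threads until the flush below.
        bthread_start_background(&tids[i], &attr, work, (void*)i);
    }
    bthread_flush();  // wake workers once for the whole batch
    for (int i = 0; i < 4; ++i) {
        bthread_join(tids[i], NULL);
    }
    return 0;
}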

ProcessHttpRequest: dispatch, with HTTP as the example

void ProcessHttpRequest(InputMessageBase *msg) {
    const int64_t start_parse_us = butil::cpuwide_time_us();
    // ParseHttpMessage already stored the parsed data into HttpContext
    // via ParseFromIOBuf.
    DestroyingPtr<HttpContext> imsg_guard(static_cast<HttpContext*>(msg));
    SocketUniquePtr socket_guard(imsg_guard->ReleaseSocket());
    Socket* socket = socket_guard.get();
    const Server* server = static_cast<const Server*>(msg->arg());
    ScopedNonServiceError non_service_error(server);
    // Create the controller.
    Controller* cntl = new (std::nothrow) Controller;
    if (NULL == cntl) {
        LOG(FATAL) << "Fail to new Controller";
        return;
    }
    // The HTTP response is sent from this object's destructor.
    HttpResponseSender resp_sender(cntl);
    resp_sender.set_received_us(msg->received_us());
    // Check whether this is http2 (the transport used by gRPC).
    const bool is_http2 = imsg_guard->header().is_http2();
    if (is_http2) {
        H2StreamContext* h2_sctx = static_cast<H2StreamContext*>(msg);
        resp_sender.set_h2_stream_id(h2_sctx->stream_id());
    }
    ControllerPrivateAccessor accessor(cntl);
    HttpHeader& req_header = cntl->http_request();
    // HTTP header part.
    imsg_guard->header().Swap(req_header);
    // HTTP body part.
    butil::IOBuf& req_body = imsg_guard->body();
    butil::EndPoint user_addr;
    if (!GetUserAddressFromHeader(req_header, &user_addr)) {
        user_addr = socket->remote_side();
    }
    ServerPrivateAccessor server_accessor(server);
    const bool security_mode = server->options().security_mode() &&
                               socket->user() == server_accessor.acceptor();
    accessor.set_server(server)
        .set_security_mode(security_mode)
        .set_peer_id(socket->id())
        .set_remote_side(user_addr)
        .set_local_side(socket->local_side())
        .set_auth_context(socket->auth_context())
        .set_request_protocol(is_http2 ? PROTOCOL_H2 : PROTOCOL_HTTP)
        .set_begin_time_us(msg->received_us())
        .move_in_server_receiving_sock(socket_guard);
    // Header-driven handling such as setting log_id, trace_id, ...
    ...
    // Parse the HTTP URI and match it against the registered method map.
    const Server::MethodProperty* const sp =
        FindMethodPropertyByURI(path, server, &req_header._unresolved_path);
    if (NULL == sp) {
        if (security_mode) {
            std::string escape_path;
            WebEscape(path, &escape_path);
            cntl->SetFailed(ENOMETHOD, "Fail to find method on `%s'",
                            escape_path.c_str());
        } else {
            cntl->SetFailed(ENOMETHOD, "Fail to find method on `%s'",
                            path.c_str());
        }
        return;
    } else if (sp->service->GetDescriptor() == BadMethodService::descriptor()) {
        BadMethodRequest breq;
        BadMethodResponse bres;
        butil::StringSplitter split(path.c_str(), '/');
        breq.set_service_name(std::string(split.field(), split.length()));
        sp->service->CallMethod(sp->method, cntl, &breq, &bres, NULL);
        return;
    }
    // Switch to service-specific error.
    non_service_error.release();
    MethodStatus* method_status = sp->status;
    resp_sender.set_method_status(method_status);
    if (method_status) {
        int rejected_cc = 0;
        if (!method_status->OnRequested(&rejected_cc)) {
            cntl->SetFailed(
                ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
                sp->method->full_name().c_str(), rejected_cc);
            return;
        }
    }
    // Overload-protection checks.
    if (!sp->is_builtin_service && !sp->params.is_tabbed) {
        // Too many unwritten bytes piled up on this connection?
        if (socket->is_overcrowded()) {
            cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
                            butil::endpoint2str(socket->remote_side()).c_str());
            return;
        }
        // Server-level concurrency limiting.
        if (!server_accessor.AddConcurrency(cntl)) {
            cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
                            server->options().max_concurrency);
            return;
        }
        // Too many user-code tasks when running in pthread mode?
        if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
            cntl->SetFailed(ELIMIT, "Too many user code to run when"
                            " -usercode_in_pthread is on");
            return;
        }
    } else if (security_mode) {
        cntl->SetFailed(EPERM, "Not allowed to access builtin services, try "
                        "ServerOptions.internal_port=%d instead if you're in"
                        " internal network", server->options().internal_port);
        return;
    }
    // Resolve the rpc method to call and create its request/response messages.
    google::protobuf::Service* svc = sp->service;
    const google::protobuf::MethodDescriptor* method = sp->method;
    accessor.set_method(method);
    google::protobuf::Message* req = svc->GetRequestPrototype(method).New();
    resp_sender.own_request(req);
    google::protobuf::Message* res = svc->GetResponsePrototype(method).New();
    resp_sender.own_response(res);
    if (__builtin_expect(!req || !res, 0)) {
        PLOG(FATAL) << "Fail to new req or res";
        cntl->SetFailed("Fail to new req or res");
        return;
    }
    // Convert the HTTP body to a protobuf message.
    if (sp->params.allow_http_body_to_pb &&
        method->input_type()->field_count() > 0) {
        // A protobuf service. No matter if Content-type is set to
        // application/json or body is empty, we have to treat body as a json
        // and try to convert it to pb, which guarantees that a protobuf
        // service is always accessed with valid requests.
        if (req_body.empty()) {
            // Treat empty body specially since parsing it results in error
            if (!req->IsInitialized()) {
                cntl->SetFailed(EREQUEST, "%s needs to be created from a"
                                " non-empty json, it has required fields.",
                                req->GetDescriptor()->full_name().c_str());
                return;
            } // else all fields of the request are optional.
        } else {
            bool is_grpc_ct = false;
            const HttpContentType content_type =
                ParseContentType(req_header.content_type(), &is_grpc_ct);
            const std::string* encoding = NULL;
            // For gRPC, validate and parse the body according to headers
            // such as compression and encoding.
            if (is_http2) {
                if (is_grpc_ct) {
                    bool grpc_compressed = false;
                    if (!RemoveGrpcPrefix(&req_body, &grpc_compressed)) {
                        cntl->SetFailed(ERESPONSE, "Invalid gRPC response");
                        return;
                    }
                    if (grpc_compressed) {
                        encoding = req_header.GetHeader(common->GRPC_ENCODING);
                        if (encoding == NULL) {
                            cntl->SetFailed(
                                EREQUEST, "Fail to find header `grpc-encoding'"
                                " in compressed gRPC request");
                            return;
                        }
                    }
                    int64_t timeout_value_us = ConvertGrpcTimeoutToUS(
                        req_header.GetHeader(common->GRPC_TIMEOUT));
                    if (timeout_value_us >= 0) {
                        accessor.set_deadline_us(
                            butil::gettimeofday_us() + timeout_value_us);
                    }
                }
            } else {
                encoding = req_header.GetHeader(common->CONTENT_ENCODING);
            }
            if (encoding != NULL && *encoding == common->GZIP) {
                TRACEPRINTF("Decompressing request=%lu",
                            (unsigned long)req_body.size());
                butil::IOBuf uncompressed;
                if (!policy::GzipDecompress(req_body, &uncompressed)) {
                    cntl->SetFailed(EREQUEST, "Fail to un-gzip request body");
                    return;
                }
                req_body.swap(uncompressed);
            }
            if (content_type == HTTP_CONTENT_PROTO) {
                if (!ParsePbFromIOBuf(req, req_body)) {
                    cntl->SetFailed(EREQUEST, "Fail to parse http body as %s",
                                    req->GetDescriptor()->full_name().c_str());
                    return;
                }
            } else {
                butil::IOBufAsZeroCopyInputStream wrapper(req_body);
                std::string err;
                json2pb::Json2PbOptions options;
                options.base64_to_bytes = sp->params.pb_bytes_to_base64;
                cntl->set_pb_bytes_to_base64(sp->params.pb_bytes_to_base64);
                if (!json2pb::JsonToProtoMessage(&wrapper, req, options, &err)) {
                    cntl->SetFailed(EREQUEST, "Fail to parse http body as %s, %s",
                                    req->GetDescriptor()->full_name().c_str(),
                                    err.c_str());
                    return;
                }
            }
        }
    } else {
        // If the body is not converted to pb, keep the raw bytes.
        cntl->request_attachment().swap(req_body);
    }
    // Create the done closure.
    google::protobuf::Closure* done = new HttpResponseSenderAsDone(&resp_sender);
    imsg_guard.reset();  // optional, just release resource ASAP
    // Invoke the method generated from the proto file, e.g. echo.proto.
    if (!FLAGS_usercode_in_pthread) {
        return svc->CallMethod(method, cntl, req, res, done);
    }
    if (BeginRunningUserCode()) {
        svc->CallMethod(method, cntl, req, res, done);
        return EndRunningUserCodeInPlace();
    } else {
        return EndRunningCallMethodInPool(svc, method, cntl, req, res, done);
    }
}

The received data is converted, and the controller, request, response, and done objects are created; finally CallMethod is invoked.
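
The done closure is what eventually sends the response: HttpResponseSenderAsDone wraps resp_sender, and running it flushes the reply. This is the same contract your handler sees through brpc::ClosureGuard. A simplified stand-in sketch (these are not the real brpc/protobuf types):

#include <cstdio>

struct Closure { virtual void Run() = 0; virtual ~Closure() {} };

struct SendResponse : Closure {  // stand-in for HttpResponseSenderAsDone
    void Run() override { std::puts("serialize response and write to socket"); delete this; }
};

class ClosureGuard {  // mirrors the idea of brpc::ClosureGuard
public:
    explicit ClosureGuard(Closure* done) : done_(done) {}
    ~ClosureGuard() { if (done_) done_->Run(); }
private:
    Closure* done_;
};

void Echo(Closure* done) {
    ClosureGuard guard(done);   // the response goes out when guard leaves scope
    // ... fill the response here; early returns still trigger done->Run() ...
}

int main() {
    Echo(new SendResponse);
    return 0;
}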

The CallMethod method in echo.pb.cc generated from echo.proto

void EchoService::CallMethod(const ::google::protobuf::MethodDescriptor* method,
                             ::google::protobuf::RpcController* controller,
                             const ::google::protobuf::Message* request,
                             ::google::protobuf::Message* response,
                             ::google::protobuf::Closure* done) {
    GOOGLE_DCHECK_EQ(method->service(),
                     protobuf_echo_2eproto::file_level_service_descriptors[0]);
    switch (method->index()) {
        case 0:
            // Dispatch to the Echo method of the implementation.
            Echo(controller,
                 ::google::protobuf::down_cast<const ::example::EchoRequest*>(request),
                 ::google::protobuf::down_cast< ::example::EchoResponse*>(response),
                 done);
            break;
        default:
            GOOGLE_LOG(FATAL) << "Bad method index; this should never happen.";
            break;
    }
}

Calling Echo here lands in the virtual override, i.e. EchoServiceImpl::Echo.

EchoServiceImpl::Echo

class EchoServiceImpl : public EchoService {
public:
    EchoServiceImpl() {}
    virtual ~EchoServiceImpl() {}
    virtual void Echo(google::protobuf::RpcController* cntl,
                      const EchoRequest* request,
                      EchoResponse* response,
                      google::protobuf::Closure* done) {
        // Write your own business logic here.
    }
};

At this point the request has been received and handed over to our business logic.

Summary

That covers brpc's whole server-side receive path. If you've read this far carefully, you should have a solid picture of how the server works. In later posts we'll analyze the server-side response path, the client-side send/receive path, bthreads, and socket resource management. Follow me — I'm dandyhuang. You can also reach me on WeChat at dandyhuang_ if you'd like to discuss anything.
