From: http://www.cfanz.cn/?c=article&a=read&id=34821

作者:yurunsun@gmail.com 新浪微博@孙雨润 新浪博客 CSDN博客日期:2012年11月28日

1. 背景介绍

1.1 什么是asio

2012年从5月份开始我主持了webyy服务器项目(http://www.yy.com/webyy.html),项目中没有按照惯例使用公司既有的基于epoll的网络框架,而是尝试了C++ tr2标准中的实验网络库asio,无论从开发效率、程序性能、稳定性上来说,都是一次成功的尝试。虽然是商业项目,但使用了linux、asio、protobuf等大量开源项目,开发过程共也借鉴了其他一些开源项目,因此我决定把与公司无关的部分剥离一下,分享出来,尽到使用自由软件的义务。

asio由Christopher M. Kohlhoff大牛从2003年着手开发,2006年申请加入C++ tr1,2008年3月份加入boost1.35.0,按照boost与C++标准库的发展惯例,预测很快会加入C++标准库中。其中的async调用方式已经作为非常重要的新特性,加入到C++0x标准库。

1.2 asio的相关资料

asio官方提供了及其详细的文档、例子、教程,没有必要再累赘地将其转述一遍。如果有朋友对英文有些吃力,网上也早有很多翻译版。这里提供一些官方的文档资料:

  • 非boost版本的asio —— http://think-async.com/

    与boost::asio的主要区别就是名字空间是boost::asio还是asio。

  • boost::asio —— http://www.boost.org/doc/libs/1520/doc/html/boost_asio.html

  • 申请加入tr2时的申报材料 —— 猛击此处下载高清pdf

  • proactor模式的首篇论文 —— 猛击此处下载高清pdf

  • 使用asio的大型开源网络项目sip —— https://svn.resiprocate.org/rep/resiprocate/main

2. 源码参考

由于代码使用了一点其他的工具,所以并没有想让读者能够编译通过。但是对从头开始搭建服务器的朋友来说,一定是一份非常有价值的参考。

2.1 作为Client的模块

这部分供作为Client去连接其他服务器时使用。给出的源码中有三个类:TcpConnection, BizConnection, Client. 其中

  • TcpConnection 提供了与协议无关的tcp连接,异步操作的结果以虚函数方式供派生类使用

  • BizConnection 继承自TcpConnection,使用具体的协议解析报文

  • Client使用BizConnection,并提供了等待具体某条消息的wait_for、心跳、延迟等功能

2.1.1 tcpconnection.h

#ifndef TCPCONNECTION_H
#define TCPCONNECTION_H/*** @author yurunsun@gmail.com*/#include <asio.hpp>
#include <asio/deadline_timer.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/timer.hpp>
#include <sstream>
#include "safehandler.h"class TcpConnection: public boost::enable_shared_from_this<TcpConnection>, private boost::noncopyable
{
public:typedef std::vector<uint8_t> DataBuffer;typedef boost::shared_ptr<TcpConnection> TcpPtr;static TcpPtr create(asio::io_service& io_service, const string& name){return TcpPtr(new TcpConnection(io_service, name));}virtual ~TcpConnection();void start(const string& ip, const string& port);void start(unsigned ip, uint16_t port);void start(const string& ip, uint16_t port);void stop();bool isConnected() {return m_socket.is_open();}/// Getters and Settersvoid setName(const string& name) {m_name = name;}const string& getName() {return m_name;}void setHeadLength(uint32_t size) {m_headLength = size;}uint32_t getHeadLength() {return m_headLength;}void setConnectTimeoutSec(uint32_t sec) {m_connectTimeoutSec = sec;}uint32_t getConnectTimeoutSec() {return m_connectTimeoutSec;}const string& getip() {return m_ip;}uint16_t getport() {return m_port;}string getFarpointInfo() { stringstream ss; ss << m_name << " " << m_ip << ":" << m_port << " "; return ss.str(); }protected:explicit TcpConnection(asio::io_service& io_service, const string& name);/// Provide for derived classvoid connect(asio::ip::tcp::endpoint endpoint);void receiveHead();void receiveBody(uint32_t bodyLength);void send(const void *data, uint32_t length);/// Class override callbacksvirtual void onConnectSuccess() { assert(false); }virtual void onConnectFailure(const asio::error_code& e) { (void)e; assert(false); }virtual void onReceiveHeadSuccess(DataBuffer& data) { (void)data; assert(false); }virtual void onReceiveBodySuccess(DataBuffer& data) { (void)data; assert(false); }virtual void onReceiveFailure(const asio::error_code& e) { (void)e; assert(false); }virtual void onSendSuccess() { assert(false); }virtual void onSendFailure(const asio::error_code& e) { (void)e; assert(false); }virtual void onTimeoutFailure(const asio::error_code& e) { (void)e; assert(false); }virtual void onCommonError(uint32_t ec, const string& em) { (void)ec; (void)em; assert(false); }private:void checkDeadline(const asio::error_code& e);void handleConnect(const asio::error_code &e);void handleReceiveHead(const asio::error_code& e);void handleReceiveBody(const asio::error_code& e);void handleSend(const asio::error_code& e);typedef TcpConnection this_type;asio::ip::tcp::socket m_socket;asio::deadline_timer m_deadline;bool m_stopped;DataBuffer m_readBuf;string m_name;boost::shared_ptr<Probe> m_probe;uint32_t m_headLength;uint32_t m_connectTimeoutSec;string m_ip;uint16_t m_port;
};#endif // TCPCONNECTION_H

2.1.2 tcpconnection.cpp

#include "stdafx.h"
#include "tcpconnection.h"using asio::ip::tcp;TcpConnection::TcpConnection(asio::io_service &io_service, const string& name): m_socket(io_service), m_deadline(io_service), m_stopped(false), m_name(name), m_probe(new Probe), m_headLength(10), m_connectTimeoutSec(5), m_ip(""), m_port(0)
{
}TcpConnection::~TcpConnection()
{stop();
}void TcpConnection::start(const string &ip, const string &port)
{start(ip, atoi(port.c_str()));
}void TcpConnection::start(unsigned ip, uint16_t port)
{connect(tcp::endpoint(asio::ip::address_v4(ip), port));
}void TcpConnection::start(const string &ip, uint16_t port)
{asio::ip::address_v4 addr_v4 = asio::ip::address_v4::from_string(ip);connect(tcp::endpoint(addr_v4, port));
}void TcpConnection::stop()
{if (!m_stopped) {m_stopped = true;try {m_readBuf.clear();asio::error_code ignored;m_socket.shutdown(tcp::socket::shutdown_both, ignored);m_socket.close(ignored);m_deadline.cancel();} catch (const asio::system_error& err) {FATAL("asio::system_error em %s", err.what());}}
}void TcpConnection::connect(tcp::endpoint endpoint)
{m_stopped = false;m_ip = endpoint.address().to_string();m_port = endpoint.port();INFO("Trying connect %s:%u ...%s", STR(m_ip), m_port, STR(m_name));m_deadline.expires_from_now(boost::posix_time::seconds(m_connectTimeoutSec));m_socket.async_connect(endpoint, boost::bind(&TcpConnection::handleConnect, shared_from_this(), asio::placeholders::error));m_deadline.async_wait(SafeHandler1<this_type, const asio::error_code&>(&this_type::checkDeadline, this, m_probe));
}void TcpConnection::receiveHead()
{m_readBuf.resize(m_headLength);asio::async_read(m_socket, asio::buffer(&m_readBuf[0], m_headLength),boost::bind(&TcpConnection::handleReceiveHead, shared_from_this(), asio::placeholders::error));
}void TcpConnection::receiveBody(uint32_t bodyLength)
{if (!m_stopped){if ((bodyLength <= MAX_BUFFER_SIZE) && (bodyLength > 0)) {m_readBuf.resize(bodyLength + m_headLength);asio::async_read(m_socket, asio::buffer(&m_readBuf[m_headLength], bodyLength),boost::bind(&TcpConnection::handleReceiveBody, shared_from_this(), asio::placeholders::error));} else {onCommonError(S_FATAL, "illegal bodyLength to call receiveBody");}} else {onCommonError(S_ERROR, "illegal to call receiveBody while tcp is not connected");}
}void TcpConnection::send(const void* data, uint32_t length)
{if (!m_stopped) {if (length <= MAX_BUFFER_SIZE) {asio::async_write(m_socket, asio::const_buffers_1(data, length),boost::bind(&TcpConnection::handleSend, shared_from_this(), asio::placeholders::error));} else {onCommonError(S_ERROR, "too big length to call send");}} else {onCommonError(S_ERROR, "illegal to call send while tcp is not connected");}
}/*** @brief TcpConnection::checkDeadline* @param e* case1: m_stopped == true which means user canceled* case2: m_deadline.expires_at() <= asio::deadline_timer::traits_type::now()*          Check whether the deadline has passed. We compare the deadline againstthe current time since a new asynchronous operation may have moved thedeadline before this actor had a chance to run.*/
void TcpConnection::checkDeadline(const asio::error_code& e)
{if (!m_stopped) {if (m_deadline.expires_at() <= asio::deadline_timer::traits_type::now()) {onTimeoutFailure(e);}}
}void TcpConnection::handleConnect(const asio::error_code &e)
{if (!m_stopped) {if (!e) {m_deadline.cancel();onConnectSuccess();} else {onConnectFailure(e);}} else {INFO("%s %s %u user's canceled by stop()", STR(m_name), STR(m_ip), m_port);}
}void TcpConnection::handleReceiveHead(const asio::error_code &e)
{if (!m_stopped) {if (!e) {onReceiveHeadSuccess(m_readBuf);} else if (isConnected()){onReceiveFailure(e);}} else {INFO("%s %s %u user's canceled by stop()", STR(m_name), STR(m_ip), m_port);}
}void TcpConnection::handleReceiveBody(const asio::error_code &e)
{if (!m_stopped) {if (!e) {onReceiveBodySuccess(m_readBuf);} else if (isConnected()){onReceiveFailure(e);}} else {INFO("%s %s %u user's canceled by stop()", STR(m_name), STR(m_ip), m_port);}
}void TcpConnection::handleSend(const asio::error_code &e)
{if (!m_stopped) {if (!e) {//onSendSuccess();} else if (isConnected()){onSendFailure(e);}} else {INFO("%s %s %u user's canceled by stop()", STR(m_name), STR(m_ip), m_port);}
}

2.1.3 bizconnection.h

#ifndef BIZCONNECTION_H
#define BIZCONNECTION_H/*** @author yurunsun@gmail.com*/#include <asio.hpp>
#include <cstdio>
#include <stdexcept>
#include "sigslot/sigslot.h"#include "tcpconnection.h"class BizConnection: public TcpConnection
{
public:typedef boost::shared_ptr<BizConnection> BizPtr;static BizPtr create(asio::io_service& io_service, const string& name = string("")){return BizPtr(new BizConnection(io_service, name));}void sendBizMsg(uint32_t uri, const BizPackage& pkg);sigslot::signal0<> BizConnected;sigslot::signal2<uint32_t, const string&> BizError;sigslot::signal0<> BizClosed;sigslot::signal1<BizPackage&> BizMsgArrived;protected:explicit BizConnection(asio::io_service& io_service, const string& name);/// Implement callbacks in base classvirtual void onConnectSuccess();virtual void onConnectFailure(const asio::error_code& e);virtual void onReceiveHeadSuccess(DataBuffer& data);virtual void onReceiveBodySuccess(DataBuffer& data);virtual void onReceiveFailure(const asio::error_code& e);virtual void onSendSuccess();virtual void onSendFailure(const asio::error_code& e);virtual void onTimeoutFailure(const asio::error_code &e);virtual void onCommonError(uint32_t ec, const string &em);static void initNeedErrorSet();static std::set<uint32_t> m_needError;private:inline bool peekLength(void* data, uint32_t length, uint32_t& outputi32);void handleError(const string& from, uint32_t ec, const asio::error_code& e = asio::error_code());
};inline bool BizConnection::peekLength(void* data, uint32_t length, uint32_t& outputi32)
{if (length >= 4) {memcpy(&outputi32, data, sizeof(uint32_t));return true;} else {return false;}
}#endif // BIZCONNECTION_H

2.1.4 bizconnection.cpp

#include "stdafx.h"
#include "bizconnection.h"
#include "tcpconnection.h"std::set<uint32_t> BizConnection::m_needError;BizConnection::BizConnection(asio::io_service &io_service, const string& name): TcpConnection(io_service, name)
{
}void BizConnection::sendBizMsg(unsigned uri, const BizPackage &pkg)
{/// TODO: usually this BizPackage contains the buffer of stream data to be sent to farpoint/// You should implement this by retriving buffer in BizPackage then call TcpConnection::send();
}void BizConnection::onConnectSuccess()
{receiveHead();BizConnected.emit();
}void BizConnection::onConnectFailure(const asio::error_code &e)
{if (e == asio::error::operation_aborted) {INFO("%s %s %u operation aborted... %s", STR(getName()), STR(getip()), getport(), STR(e.message()));}else if ((e == asio::error::already_connected) || (e == asio::error::already_open) || (e == asio::error::already_started)) {WARN("%s %s %u alread connected... %s", STR(getName()), STR(getip()), getport(), STR(e.message()));} else {handleError("onConnectFailure", S_FATAL, e);}
}void BizConnection::onReceiveHeadSuccess(TcpConnection::DataBuffer &data)
{uint32_t pkglen = 0;if (peekLength(data.data(), data.size(), pkglen)) {receiveBody(pkglen - getHeadLength());} else {handleError("peekLength", S_FATAL);}
}void BizConnection::onReceiveBodySuccess(TcpConnection::DataBuffer &data)
{/// This is simply an example, actually it's user's duty to unmarshal buffer to package.BizPackage msg;BizPackage.unserializeFrom(data);BizMsgArrived.emit(msg);receiveHead();
}void BizConnection::onReceiveFailure(const asio::error_code &e)
{if ((e == asio::error::operation_aborted)) {INFO("%s operation_aborted... %s", STR(getFarpointInfo()), STR(e.message()));} else {handleError("onReceiveFailure", S_FATAL, e);}
}void BizConnection::onSendSuccess()
{/// Leave it blank
}void BizConnection::onSendFailure(const asio::error_code &e)
{if ((e == asio::error::operation_aborted)) {INFO("%s operation_aborted... %s", STR(getFarpointInfo()), STR(e.message()));} else {handleError("onSendFailure", S_FATAL, e);}
}void BizConnection::onTimeoutFailure(const asio::error_code &e)
{if ((e == asio::error::operation_aborted)) {INFO("%s operation_aborted... %s", STR(getFarpointInfo()), STR(e.message()));} else {handleError("onTimeoutFailure", S_FATAL);}
}void BizConnection::onCommonError(uint32_t ec, const string &em)
{handleError(em, ec);
}void BizConnection::handleError(const string& from, uint32_t ec, const asio::error_code &e/* = asio::error_code()*/)
{stringstream ss;ss << getFarpointInfo() << " " << from;if (e) {ss << " asio " << e.value() << " " << e.message();}BizError.emit(ec, ss.str());
}

2.1.5 client.h

#ifndef CLIENT_H
#define CLIENT_H/*** @author yurunsun@gmail.com*/#include "bizconnection.h"
#include "handler.h"
#include "safehandler.h"#include <asio.hpp>
#include <boost/timer.hpp>class Client: public sigslot::has_slots<>
{
protected:BizConnection::BizPtr m_pBizConnection;public:typedef void (Client::*RequestPtr)(BizPackage&);typedef std::map<uint32_t, RequestPtr> RequestMap;typedef void (Client::*NotifyPtr)(BizPackage&);typedef std::map<uint32_t, NotifyPtr> NotifyMap;explicit Client(const string& name = string(""));/// 继承类需要实现的提供外部的方法virtual void startServer() = 0;virtual bool sendToServer(YProto &proto) = 0;virtual void stopServer() {clearWaitforTimer(); m_pBizConnection->stop();}protected:/// 继承类需要实现的初始化函数virtual void initRequestMap() {assert(false);}virtual void initNotifyMap() {assert(false);}virtual void initSignal();/// 继承类需要实现的钩子函数,用于处理网络事件virtual void onBizMsgArrived(core::Request& msg) = 0;virtual void onBizError(uint32_t ec, const string& em);virtual void onBizConnected() = 0;/// 继承类可以使用的工具方法/// 1. 心跳类void setKeepAliveSec(uint32_t sec) {m_keepAliveSec = sec;}uint32_t getKeepAliveSec() {return m_keepAliveSec;}void startKeepAlive();void keepAlive(const asio::error_code& e);virtual void onKeepAlive() {assert(false);}/// 2. 登陆状态类void setHasLogin(bool b) {m_hasLogin = b;}bool getHasLogin() {return m_hasLogin;}bool isOnline() { return (m_pBizConnection->isConnected() && m_hasLogin);}/// 3. 消息保存类template <typename Handler>bool savePendingCommand(Handler handler){if(m_pendingCmd.size() < MaxPendingCommandCount) {m_pendingCmd.push_back(Command(handler));return true;}return false;}void sendPendingCommand(){if (isOnline()) {vector<Command>::iterator it = m_pendingCmd.begin();for(; it != m_pendingCmd.end(); ++it) {(*it)();}m_pendingCmd.clear();}}/// 4. 延迟处理类typedef void (Client::*HoldonCallback)();void holdonSeconds(uint32_t sec, HoldonCallback func);void holdonHandler(HoldonCallback func, const asio::error_code &e);/// 5. waitfor 工具 处理异步消息超时typedef boost::shared_ptr<asio::deadline_timer> SharedTimerPtr;typedef boost::scoped_ptr<asio::deadline_timer> ScopedTimerPtr;typedef boost::shared_ptr<Probe> SharedProbe;typedef map<uint32_t, SharedTimerPtr> Uri2Timer;            /// 等待收到的包uri --> 这个时间timervoid waitfor(uint32_t uri, uint32_t sec);                   /// 在发送req的时候调用,sec 秒数 uri 等待收到的urivoid waitforTimeout(uint32_t uri, const asio::error_code& e); /// 所有waitfor超时都会自动回调这个函数virtual void onWaitforTimeout(uint32_t uri) {(void)uri; assert(false);}         /// 继承类覆盖这个钩子函数来进行错误处理void waitforReceived(uint32_t uri);                         /// 当响应函数handler被回调时,记得调用waitforReceived做清理工作void eraseWaitforTimer(uint32_t uri);void clearWaitforTimer();/// 继承类可以使用的工具成员:心跳 探针 请求阻塞typedef std::set<uint32_t> BlockReq;BlockReq m_block;SharedProbe m_probe;ScopedTimerPtr m_timer;uint32_t m_keepAliveSec;bool m_hasLogin;vector<Command> m_pendingCmd;static const uint32_t MaxPendingCommandCount = 20;ScopedTimerPtr m_holdonTimer;Uri2Timer m_uri2timer;
};#define BIND_REQ(m, uri, callback) \m[static_cast<uint32_t>(uri)] = static_cast<RequestPtr>(callback);#define BIND_NOTIFY(m, uri, callback) \m[static_cast<uint32_t>(uri)] = static_cast<NotifyPtr>(callback);#endif // CLIENT_H

2.1.6 client.cpp

#include "stdafx.h"
#include "client.h"Client::Client(const string& name): m_pBizConnection(BizConnection::create(ioService::instance(), name)), m_probe(new Probe), m_keepAliveSec(10), m_hasLogin(false)
{
}void Client::initSignal()
{m_pBizConnection->BizError.connect(this, &Client::onBizError);m_pBizConnection->BizMsgArrived.connect(this, &Client::onBizMsgArrived);m_pBizConnection->BizConnected.connect(this, &Client::onBizConnected);
}void Client::onBizError(uint32_t ec, const string &em)
{m_facade.serverError.emit(ec, em);
}void Client::startKeepAlive()
{m_timer.reset(new asio::deadline_timer(m_facade.io_service_ref));m_timer->expires_from_now(boost::posix_time::seconds(m_keepAliveSec));m_timer->async_wait(SafeHandler1<Client, const asio::error_code&>(&Client::keepAlive, this, m_probe));
}void Client::keepAlive(const asio::error_code &e)
{if (e != asio::error::operation_aborted) {FINE("%u send ping to %s %s:%u", m_facade.m_pInfo->uid, STR(m_pBizConnection->getName()), STR(m_pBizConnection->getip()), m_pBizConnection->getport());onKeepAlive();m_timer->expires_from_now(boost::posix_time::seconds(m_keepAliveSec ));m_timer->async_wait(SafeHandler1<Client, const asio::error_code&>(&Client::keepAlive, this, m_probe));}
}void Client::holdonSeconds(uint32_t sec, HoldonCallback func)
{m_holdonTimer.reset(new asio::deadline_timer(m_facade.io_service_ref));m_holdonTimer->expires_from_now(boost::posix_time::seconds(sec));SafeHandler1Bind1<Client, HoldonCallback, const asio::error_code&> h(&Client::holdonHandler, this, func, m_probe);m_holdonTimer->async_wait(h);
}void Client::holdonHandler(HoldonCallback func, const asio::error_code &e)
{if (!e) {if (m_holdonTimer != NULL)m_holdonTimer->cancel();(this->*func)();} else {WARN("error: %s", STR(e.message()));}
}void Client::waitfor(uint32_t uri, uint32_t sec)
{SharedTimerPtr t(new asio::deadline_timer(m_facade.io_service_ref));t->expires_from_now(boost::posix_time::seconds(sec));t->async_wait(SafeHandler1Bind1<Client, uint32_t, const asio::error_code&>(&Client::waitforTimeout, this, uri, m_probe));m_uri2timer[uri] = t;
}void Client::waitforTimeout(uint32_t uri, const asio::error_code &e)
{if (e != asio::error::operation_aborted) {FATAL("%s waitfor uri %u timeout", STR(m_pBizConnection->getName()), uri);eraseWaitforTimer(uri);onWaitforTimeout(uri);}
}void Client::waitforReceived(uint32_t uri)
{eraseWaitforTimer(uri);
}void Client::eraseWaitforTimer(uint32_t uri)
{Uri2Timer::iterator it = m_uri2timer.find(uri);if (it != m_uri2timer.end()) {SharedTimerPtr& t = it->second;if (t) {asio::error_code e;t->cancel(e);t.reset();}m_uri2timer.erase(it);}
}void Client::clearWaitforTimer()
{Uri2Timer::iterator it = m_uri2timer.begin();for (; it != m_uri2timer.end(); ++it) {SharedTimerPtr& t = it->second;if (t) {asio::error_code e;t->cancel(e);t.reset();}}m_uri2timer.clear();
}

2.2 作为server模块

作为server模块由于涉及公司的业务比较多,这里剥离出一个作为crossdomain服务器的部分,功能很简单:flash客户端通过socket请求crossdomain配置文件,server返回给定的字符串。这里使用了比较著名的pimpl模式,将实现完全隐藏在cpp文件中。

2.2.1 crossdomain.h

#ifndef CROSSDOMAIN_H
#define CROSSDOMAIN_H#include <string>
#include <boost/shared_ptr.hpp>
#include <asio.hpp>/*** @author yurunsun@gmail.com*/class CrossDomain
{
private:struct Server;boost::shared_ptr<Server> m_pserver;CrossDomain(asio::io_service& io_service, const std::string& local_port);static CrossDomain* s_instance;public:static void create(asio::io_service& io_service, const std::string& local_port){s_instance = new CrossDomain(io_service, local_port);}static CrossDomain* instance();void start_server();
};#endif // CROSSDOMAIN_H

2.2.2 crossdomain.cpp

#include "stdafx.h"
#include "crossdomain.h"using asio::ip::tcp;
using boost::uint8_t;
CrossDomain* CrossDomain::s_instance = NULL;struct CrossDomainImpl : public boost::enable_shared_from_this<CrossDomainImpl>
{
public:static const unsigned MaxReadSize = 22;typedef boost::shared_ptr<CrossDomainImpl> CrossDomainImplPtr;static CrossDomainImplPtr create(asio::io_service& io_service) {return CrossDomainImplPtr(new CrossDomainImpl(io_service));}tcp::socket& get_socket() {return m_socket;}void start() {start_read_some();}~CrossDomainImpl() {close();}void close() {if (m_socket.is_open()) {m_socket.close();}}private:CrossDomainImpl(asio::io_service& io_service): m_socket(io_service){}void start_read_some() {m_socket.async_read_some(asio::buffer(m_readbuf, MaxReadSize),boost::bind(&CrossDomainImpl::handle_read_some, shared_from_this(), asio::placeholders::error()));}void handle_read_some(const asio::error_code& err) {if (!err) {string str(m_readbuf);string reply("invalid");if (str == "<policy-file-request/>") {reply = "anything you wanna send back to client...";}asio::async_write(m_socket, asio::buffer(ref),boost::bind(&CrossDomainImpl::handle_write, shared_from_this(), asio::placeholders::error));}}void handle_write(const asio::error_code& error) {FINE("CrossDomain handle_write, gonna close");close();}tcp::socket m_socket;char m_readbuf[MaxReadSize];
};struct CrossDomain::Server
{
private:CrossDomain *m_facade;tcp::acceptor m_acceptor;bool m_listened;string m_local_port;public:Server(asio::io_service& io_service, const string &local_port): m_acceptor(io_service), m_listened(false), m_local_port(local_port){// intend to leave it blank}~Server() {if (m_acceptor.is_open()) {INFO("close server acceptor");m_acceptor.close();}}void start_server() {FINE("CrossDomain start_server....");if (!m_listened) {FINE("Try to listen...");try {tcp::endpoint ep(tcp::endpoint(tcp::v4(), atoi(m_local_port.c_str())));m_acceptor.open(ep.protocol());m_acceptor.bind(ep);m_acceptor.listen();} catch (const asio::system_error& ec) {WARN("Port %s already in use! Fail to listen...", STR(m_local_port));return;} catch (...) {WARN("Unknown error while trying to listen...");return;}m_listened = true;FINE("Listen port %s succesfully!", STR(m_local_port));}CrossDomainImpl::CrossDomainImplPtr new_server_impl = CrossDomainImpl::create(m_acceptor.get_io_service());m_acceptor.async_accept(new_server_impl->get_socket(),boost::bind(&Server::handle_accept, this, new_server_impl, asio::placeholders::error));}private:void handle_accept(CrossDomainImpl::CrossDomainImplPtr pserver_impl, const asio::error_code& err) {FINE("CrossDomain handle_accpet....");if (!err) {FINE("CrossDomain everything ok, start...");pserver_impl->start();    // start this serverstart_server();           // waiting for another Tuna Connection} else {pserver_impl->close();}}
};CrossDomain::CrossDomain(asio::io_service &io_service, const std::string &local_port): m_pserver(new Server(io_service, local_port))
{
}CrossDomain *CrossDomain::instance()
{if (!s_instance) {return NULL;}return s_instance;
}void CrossDomain::start_server()
{m_pserver->start_server();
}

3. 使用asio的陷阱

上边代码其实有几点漏洞:

3.1 std::vector<uint_8>不适合作为buffer

vector<uint8_t>不适合做buffer的原因是,sgi的内存分配器会以2倍的形式增长vector的内存,例如这个buffer要求100K,但当前vector的capability只有90K,那么sgi默认内存分配器会将vector的capability增长到180K。注意capability与size的区别。这就导致vector的内存占用依赖最大buffer的size,这是很危险的。

推荐使用boost的circular_buffer作为buffer,能有效避免内存碎片、隐式内存泄露等问题。

3.2 asio::const_buffer拷贝构造函数没有深拷贝

const_buffer系列静态buffer只能从mutable_buffermerge过来,但是从const_buffer的拷贝构造函数源码能看到,他并不对buffer做深拷贝。所以试图将其放到队列或者容器中,期待产生buffer的拷贝,是错误的。

3.3. async_write可能会拆包发送

例如先调用async_write发送一个100K的大包,再马上调用async_write发送一个8字节的ping包,非常可能出现问题。async_write函数的实现是循环调用async_write_some,对于大包会将其拆分成几个小报文。如果此时收到用户一个新的async_write调用,非常可能将小包夹在大包的几个部分中间发送,导致接收端出现异常。

解决的办法可以直接操作async_write_some,代替async_write。但更方便的办法是创建一个发送队列。实际上asio会准确地将发送成功的通知发送给用户,例如刚刚100K的打包,直到所有100K全部发送完成,才会调用handle回调。因此可以在发送时将报文入队列,回调函数里将报文出队列,发送下一个小报时判断队列是否为空,如果非空说明100K的包还没有发完。示例代码如下:

///发送时入队列
void TcpConnection::send(const void* data, uint32_t length)
{if (!m_stopped) {if (length <= MAX_BUFFER_SIZE) {const char* begin = (const char*)data;vector<char> vec(begin, begin + length);bool isLastComplete = m_bufQueue.empty();m_bufQueue.push_back(vec);/// 如果没有残余的包,就直接发送if (isLastComplete) {vector<char>& b(m_bufQueue.front());send(b);}} else {onCommonError(S_ERROR, "too big length to call send");}} else {onCommonError(S_ERROR, "illegal to call send while tcp is not connected");}
}void TcpConnection::send(const std::vector<char>& vec)
{if (!m_stopped) {asio::async_write(m_socket, asio::buffer(&vec[0], vec.size()), asio::transfer_all(), boost::bind(&TcpConnection::handleSend, shared_from_this(), asio::placeholders::error));} else {onCommonError(S_ERROR, "illegal to call send while tcp is not connected");}
}///回调函数将之前的buffer出队列,同时检查是否有后来的包
void TcpConnection::handleSend(const asio::error_code &e)
{if (!m_stopped) {if (!e) {m_bufQueue.pop_front();if (!m_bufQueue.empty()) {std::vector<char>& b(m_bufQueue.front());send(b);}//onSendSuccess();} else if (isConnected()){onSendFailure(e);}} else {INFO("%s %s %u user's canceled by stop()", STR(m_name), STR(m_ip), m_port);}
}

[Boost.asio] 深入linux网络编程(四):使用asio搭建商用服务器相关推荐

  1. linux网络编程(二)高并发服务器

    linux网络编程(二)高并发服务器 错误处理 高并发服务器 多进程并发服务器 客户端 错误处理 #include "wrap.h"int Bind(int fd, const s ...

  2. Linux网络编程(六)-高并发服务器03-I/O多路复用03:epoll【红黑树;根节点为监听节点】【无宏FD_SETSIZE限制;不需每次都将要监听的文件描述符从应用层拷贝到内核;不需遍历树】

    一.epoll概述 epoll的本质是一个[红黑树].监听结点为根节点. 大量并发,少量活跃效率高. epoll是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并 ...

  3. Linux网络编程3——多进/线程并发服务器

    视频链接 黑马程序员-Linux网络编程_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1iJ411S7UA?p=37 目录 一.高并发服务器 1.1 图 ...

  4. Linux 网络编程四(socket多线程升级版)

    //网络编程--客户端 #include <stdio.h> #include <stdlib.h> #include <string.h> #include &l ...

  5. Unix 网络编程(四)- 典型TCP客服服务器程序开发实例及基本套接字API介绍

    转载:http://blog.csdn.net/michael_kong_nju/article/details/43457393 写在开头: 在上一节中我们学习了一些基础的用来支持网络编程的API, ...

  6. python: 网络编程及fastapi快速搭建web服务器

    一.网络编程 1.网络编程三要素 ip地址.端口.TCP协议 ip地址 ip地址作用: 根据ip地址能够找到网络中的具体设备(电脑,打印机) ip地址概念: ip地址是网络设备的唯一标识 ip地址分类 ...

  7. linux网络编程之多路I/O转接服务器poll函数

    (1)poll函数 头文件:#include<poll.h> int  poll(struct  pollfd*fds, nfds_t nfds,int timeout); struct  ...

  8. linux网络编程之多路I/o转接服务器select

    (1)多路IO转接服务器也叫做多任务IO服务器,其主要思想是不再由程序自己监听客户端连接,取而代之的是由内核替应用程序监视文件,具体实现模型如图所示: 当客户端请求和服务器连接时,内核接收到连接指令, ...

  9. Linux网络编程-五

    Linux网络编程-五 1 线程池并发服务器 2 UDP通信 2.1TCP和UDP的区别 2.2 UDP通信相关函数介绍: 2.3 UDP的服务器和客户端编码流程 2.4 代码练习 3 本地socke ...

  10. 【Linux网络编程】TCP三次握手和四次挥手

    00. 目录 文章目录 00. 目录 01. 三次握手 02. 四次挥手 03. 三次握手和四次挥手原因 04. 2MSL 05. 附录 01. 三次握手 在 TCP/IP 协议中,TCP 协议提供可 ...

最新文章

  1. 由PostgreSQL的区域与字符集说起
  2. Java线程的概念:什么是线程?
  3. element ui登录界面_Vue和Element-UI做一个简单的登录页面
  4. 你必须搞清楚的String,StringBuilder,StringBuffer
  5. js正则匹配闭合标签_正则表达式匹配封闭html标签
  6. 7、Java格式注意要点
  7. git学习 add - commit - init
  8. python编写鸡兔同笼程序设计_Python少儿编程:鸡兔同笼,涨知识了
  9. 模板题——图论相关(2)
  10. [STL源码剖析]RB-tree的插入操作
  11. 10年老技术员教你免费的、完整的把 PDF 转换为 Word
  12. 网络工程制图论文计算机,计算机工程制图教学的课业评价的论文
  13. JAVA远程声卡,Delphi带多声道声卡(ASIO)
  14. C++ Primer 5th - 1.1 编写一个简单的C++程序
  15. 补题:HOJ吉林selection B-Bribing Eve(Gym-101174B) (象限极角排序)
  16. 华为的少将人才选拔方法
  17. android 音频转mp3格式,音频 (六)- 安卓 ndk 将 pcm 转换为 mp3
  18. NXP i.MX 8处理器再扩军!全新i.MX 8X处理器剑指工业与汽车应用
  19. libvirt介绍和使用
  20. python编程里的幂怎么表示

热门文章

  1. 时分秒毫秒 正则表达式
  2. 360安全路由器外网连内网(端口映射)的设置方法
  3. 计算机课评课意见,信息技术应用 用计算机画函数图象第一课时评课稿
  4. OA办公系统能帮助企业做些什么?
  5. 电力-开闭所/配电房/变电所/变电站
  6. 『vulnhub系列』dpwwn-1—Linux计划任务提权
  7. Mac系统如何安装Eclipse并搭建Android开发环境
  8. Kali Linux学习入门
  9. python怎么判断质数和合数_什么是质数和合数以及判断方法介绍
  10. 笔记本如何正确安装对应显卡驱动