1. 背景介绍

1.1 什么是asio

2012年从5月份开始我主持了webyy服务器项目(http://www.yy.com/webyy.html),项目中没有按照惯例使用公司既有的基于epoll的网络框架,而是尝试了C++ tr2标准中的实验网络库asio,无论从开发效率、程序性能、稳定性上来说,都是一次成功的尝试。虽然是商业项目,但使用了linux、asio、protobuf等大量开源项目,开发过程共也借鉴了其他一些开源项目,因此我决定把与公司无关的部分剥离一下,分享出来,尽到使用自由软件的义务。

asio由Christopher M. Kohlhoff大牛从2003年着手开发,2006年申请加入C++ tr1,2008年3月份加入boost1.35.0,按照boost与C++标准库的发展惯例,预测很快会加入C++标准库中。其中的async调用方式已经作为非常重要的新特性,加入到C++0x标准库。

1.2 asio的相关资料

asio官方提供了及其详细的文档、例子、教程,没有必要再累赘地将其转述一遍。如果有朋友对英文有些吃力,网上也早有很多翻译版。这里提供一些官方的文档资料:

  • 非boost版本的asio —— http://think-async.com/

    与boost::asio的主要区别就是名字空间是boost::asio还是asio。

  • boost::asio —— http://www.boost.org/doc/libs/1520/doc/html/boost_asio.html

  • 申请加入tr2时的申报材料 —— 猛击此处下载高清pdf

  • proactor模式的首篇论文 —— 猛击此处下载高清pdf

  • 使用asio的大型开源网络项目sip —— https://svn.resiprocate.org/rep/resiprocate/main

2. 源码参考

由于代码使用了一点其他的工具,所以并没有想让读者能够编译通过。但是对从头开始搭建服务器的朋友来说,一定是一份非常有价值的参考。

2.1 作为Client的模块

这部分供作为Client去连接其他服务器时使用。给出的源码中有三个类:TcpConnection, BizConnection, Client. 其中

  • TcpConnection 提供了与协议无关的tcp连接,异步操作的结果以虚函数方式供派生类使用

  • BizConnection 继承自TcpConnection,使用具体的协议解析报文

  • Client使用BizConnection,并提供了等待具体某条消息的wait_for、心跳、延迟等功能

2.1.1 tcpconnection.h

#ifndef TCPCONNECTION_H
#define TCPCONNECTION_H/*** @author yurunsun@gmail.com*/#include <asio.hpp>
#include <asio/deadline_timer.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/timer.hpp>
#include <sstream>
#include "safehandler.h"class TcpConnection: public boost::enable_shared_from_this<TcpConnection>, private boost::noncopyable
{
public:typedef std::vector<uint8_t> DataBuffer;typedef boost::shared_ptr<TcpConnection> TcpPtr;static TcpPtr create(asio::io_service& io_service, const string& name){return TcpPtr(new TcpConnection(io_service, name));}virtual ~TcpConnection();void start(const string& ip, const string& port);void start(unsigned ip, uint16_t port);void start(const string& ip, uint16_t port);void stop();bool isConnected() {return m_socket.is_open();}/// Getters and Settersvoid setName(const string& name) {m_name = name;}const string& getName() {return m_name;}void setHeadLength(uint32_t size) {m_headLength = size;}uint32_t getHeadLength() {return m_headLength;}void setConnectTimeoutSec(uint32_t sec) {m_connectTimeoutSec = sec;}uint32_t getConnectTimeoutSec() {return m_connectTimeoutSec;}const string& getip() {return m_ip;}uint16_t getport() {return m_port;}string getFarpointInfo() { stringstream ss; ss << m_name << " " << m_ip << ":" << m_port << " "; return ss.str(); }protected:explicit TcpConnection(asio::io_service& io_service, const string& name);/// Provide for derived classvoid connect(asio::ip::tcp::endpoint endpoint);void receiveHead();void receiveBody(uint32_t bodyLength);void send(const void *data, uint32_t length);/// Class override callbacksvirtual void onConnectSuccess() { assert(false); }virtual void onConnectFailure(const asio::error_code& e) { (void)e; assert(false); }virtual void onReceiveHeadSuccess(DataBuffer& data) { (void)data; assert(false); }virtual void onReceiveBodySuccess(DataBuffer& data) { (void)data; assert(false); }virtual void onReceiveFailure(const asio::error_code& e) { (void)e; assert(false); }virtual void onSendSuccess() { assert(false); }virtual void onSendFailure(const asio::error_code& e) { (void)e; assert(false); }virtual void onTimeoutFailure(const asio::error_code& e) { (void)e; assert(false); }virtual void onCommonError(uint32_t ec, const string& em) { (void)ec; (void)em; assert(false); }private:void checkDeadline(const asio::error_code& e);void handleConnect(const asio::error_code &e);void handleReceiveHead(const asio::error_code& e);void handleReceiveBody(const asio::error_code& e);void handleSend(const asio::error_code& e);typedef TcpConnection this_type;asio::ip::tcp::socket m_socket;asio::deadline_timer m_deadline;bool m_stopped;DataBuffer m_readBuf;string m_name;boost::shared_ptr<Probe> m_probe;uint32_t m_headLength;uint32_t m_connectTimeoutSec;string m_ip;uint16_t m_port;
};#endif // TCPCONNECTION_H

2.1.2 tcpconnection.cpp

#include "stdafx.h"
#include "tcpconnection.h"using asio::ip::tcp;TcpConnection::TcpConnection(asio::io_service &io_service, const string& name): m_socket(io_service), m_deadline(io_service), m_stopped(false), m_name(name), m_probe(new Probe), m_headLength(10), m_connectTimeoutSec(5), m_ip(""), m_port(0)
{
}TcpConnection::~TcpConnection()
{stop();
}void TcpConnection::start(const string &ip, const string &port)
{start(ip, atoi(port.c_str()));
}void TcpConnection::start(unsigned ip, uint16_t port)
{connect(tcp::endpoint(asio::ip::address_v4(ip), port));
}void TcpConnection::start(const string &ip, uint16_t port)
{asio::ip::address_v4 addr_v4 = asio::ip::address_v4::from_string(ip);connect(tcp::endpoint(addr_v4, port));
}void TcpConnection::stop()
{if (!m_stopped) {m_stopped = true;try {m_readBuf.clear();asio::error_code ignored;m_socket.shutdown(tcp::socket::shutdown_both, ignored);m_socket.close(ignored);m_deadline.cancel();} catch (const asio::system_error& err) {FATAL("asio::system_error em %s", err.what());}}
}void TcpConnection::connect(tcp::endpoint endpoint)
{m_stopped = false;m_ip = endpoint.address().to_string();m_port = endpoint.port();INFO("Trying connect %s:%u ...%s", STR(m_ip), m_port, STR(m_name));m_deadline.expires_from_now(boost::posix_time::seconds(m_connectTimeoutSec));m_socket.async_connect(endpoint, boost::bind(&TcpConnection::handleConnect, shared_from_this(), asio::placeholders::error));m_deadline.async_wait(SafeHandler1<this_type, const asio::error_code&>(&this_type::checkDeadline, this, m_probe));
}void TcpConnection::receiveHead()
{m_readBuf.resize(m_headLength);asio::async_read(m_socket, asio::buffer(&m_readBuf[0], m_headLength),boost::bind(&TcpConnection::handleReceiveHead, shared_from_this(), asio::placeholders::error));
}void TcpConnection::receiveBody(uint32_t bodyLength)
{if (!m_stopped){if ((bodyLength <= MAX_BUFFER_SIZE) && (bodyLength > 0)) {m_readBuf.resize(bodyLength + m_headLength);asio::async_read(m_socket, asio::buffer(&m_readBuf[m_headLength], bodyLength),boost::bind(&TcpConnection::handleReceiveBody, shared_from_this(), asio::placeholders::error));} else {onCommonError(S_FATAL, "illegal bodyLength to call receiveBody");}} else {onCommonError(S_ERROR, "illegal to call receiveBody while tcp is not connected");}
}void TcpConnection::send(const void* data, uint32_t length)
{if (!m_stopped) {if (length <= MAX_BUFFER_SIZE) {asio::async_write(m_socket, asio::const_buffers_1(data, length),boost::bind(&TcpConnection::handleSend, shared_from_this(), asio::placeholders::error));} else {onCommonError(S_ERROR, "too big length to call send");}} else {onCommonError(S_ERROR, "illegal to call send while tcp is not connected");}
}/*** @brief TcpConnection::checkDeadline* @param e* case1: m_stopped == true which means user canceled* case2: m_deadline.expires_at() <= asio::deadline_timer::traits_type::now()*          Check whether the deadline has passed. We compare the deadline againstthe current time since a new asynchronous operation may have moved thedeadline before this actor had a chance to run.*/
void TcpConnection::checkDeadline(const asio::error_code& e)
{if (!m_stopped) {if (m_deadline.expires_at() <= asio::deadline_timer::traits_type::now()) {onTimeoutFailure(e);}}
}void TcpConnection::handleConnect(const asio::error_code &e)
{if (!m_stopped) {if (!e) {m_deadline.cancel();onConnectSuccess();} else {onConnectFailure(e);}} else {INFO("%s %s %u user's canceled by stop()", STR(m_name), STR(m_ip), m_port);}
}void TcpConnection::handleReceiveHead(const asio::error_code &e)
{if (!m_stopped) {if (!e) {onReceiveHeadSuccess(m_readBuf);} else if (isConnected()){onReceiveFailure(e);}} else {INFO("%s %s %u user's canceled by stop()", STR(m_name), STR(m_ip), m_port);}
}void TcpConnection::handleReceiveBody(const asio::error_code &e)
{if (!m_stopped) {if (!e) {onReceiveBodySuccess(m_readBuf);} else if (isConnected()){onReceiveFailure(e);}} else {INFO("%s %s %u user's canceled by stop()", STR(m_name), STR(m_ip), m_port);}
}void TcpConnection::handleSend(const asio::error_code &e)
{if (!m_stopped) {if (!e) {//onSendSuccess();} else if (isConnected()){onSendFailure(e);}} else {INFO("%s %s %u user's canceled by stop()", STR(m_name), STR(m_ip), m_port);}
}

2.1.3 bizconnection.h

#ifndef BIZCONNECTION_H
#define BIZCONNECTION_H/*** @author yurunsun@gmail.com*/#include <asio.hpp>
#include <cstdio>
#include <stdexcept>
#include "sigslot/sigslot.h"#include "tcpconnection.h"class BizConnection: public TcpConnection
{
public:typedef boost::shared_ptr<BizConnection> BizPtr;static BizPtr create(asio::io_service& io_service, const string& name = string("")){return BizPtr(new BizConnection(io_service, name));}void sendBizMsg(uint32_t uri, const BizPackage& pkg);sigslot::signal0<> BizConnected;sigslot::signal2<uint32_t, const string&> BizError;sigslot::signal0<> BizClosed;sigslot::signal1<BizPackage&> BizMsgArrived;protected:explicit BizConnection(asio::io_service& io_service, const string& name);/// Implement callbacks in base classvirtual void onConnectSuccess();virtual void onConnectFailure(const asio::error_code& e);virtual void onReceiveHeadSuccess(DataBuffer& data);virtual void onReceiveBodySuccess(DataBuffer& data);virtual void onReceiveFailure(const asio::error_code& e);virtual void onSendSuccess();virtual void onSendFailure(const asio::error_code& e);virtual void onTimeoutFailure(const asio::error_code &e);virtual void onCommonError(uint32_t ec, const string &em);static void initNeedErrorSet();static std::set<uint32_t> m_needError;private:inline bool peekLength(void* data, uint32_t length, uint32_t& outputi32);void handleError(const string& from, uint32_t ec, const asio::error_code& e = asio::error_code());
};inline bool BizConnection::peekLength(void* data, uint32_t length, uint32_t& outputi32)
{if (length >= 4) {memcpy(&outputi32, data, sizeof(uint32_t));return true;} else {return false;}
}#endif // BIZCONNECTION_H

2.1.4 bizconnection.cpp

#include "stdafx.h"
#include "bizconnection.h"
#include "tcpconnection.h"std::set<uint32_t> BizConnection::m_needError;BizConnection::BizConnection(asio::io_service &io_service, const string& name): TcpConnection(io_service, name)
{
}void BizConnection::sendBizMsg(unsigned uri, const BizPackage &pkg)
{/// TODO: usually this BizPackage contains the buffer of stream data to be sent to farpoint/// You should implement this by retriving buffer in BizPackage then call TcpConnection::send();
}void BizConnection::onConnectSuccess()
{receiveHead();BizConnected.emit();
}void BizConnection::onConnectFailure(const asio::error_code &e)
{if (e == asio::error::operation_aborted) {INFO("%s %s %u operation aborted... %s", STR(getName()), STR(getip()), getport(), STR(e.message()));}else if ((e == asio::error::already_connected) || (e == asio::error::already_open) || (e == asio::error::already_started)) {WARN("%s %s %u alread connected... %s", STR(getName()), STR(getip()), getport(), STR(e.message()));} else {handleError("onConnectFailure", S_FATAL, e);}
}void BizConnection::onReceiveHeadSuccess(TcpConnection::DataBuffer &data)
{uint32_t pkglen = 0;if (peekLength(data.data(), data.size(), pkglen)) {receiveBody(pkglen - getHeadLength());} else {handleError("peekLength", S_FATAL);}
}void BizConnection::onReceiveBodySuccess(TcpConnection::DataBuffer &data)
{/// This is simply an example, actually it's user's duty to unmarshal buffer to package.BizPackage msg;BizPackage.unserializeFrom(data);BizMsgArrived.emit(msg);receiveHead();
}void BizConnection::onReceiveFailure(const asio::error_code &e)
{if ((e == asio::error::operation_aborted)) {INFO("%s operation_aborted... %s", STR(getFarpointInfo()), STR(e.message()));} else {handleError("onReceiveFailure", S_FATAL, e);}
}void BizConnection::onSendSuccess()
{/// Leave it blank
}void BizConnection::onSendFailure(const asio::error_code &e)
{if ((e == asio::error::operation_aborted)) {INFO("%s operation_aborted... %s", STR(getFarpointInfo()), STR(e.message()));} else {handleError("onSendFailure", S_FATAL, e);}
}void BizConnection::onTimeoutFailure(const asio::error_code &e)
{if ((e == asio::error::operation_aborted)) {INFO("%s operation_aborted... %s", STR(getFarpointInfo()), STR(e.message()));} else {handleError("onTimeoutFailure", S_FATAL);}
}void BizConnection::onCommonError(uint32_t ec, const string &em)
{handleError(em, ec);
}void BizConnection::handleError(const string& from, uint32_t ec, const asio::error_code &e/* = asio::error_code()*/)
{stringstream ss;ss << getFarpointInfo() << " " << from;if (e) {ss << " asio " << e.value() << " " << e.message();}BizError.emit(ec, ss.str());
}

2.1.5 client.h

#ifndef CLIENT_H
#define CLIENT_H/*** @author yurunsun@gmail.com*/#include "bizconnection.h"
#include "handler.h"
#include "safehandler.h"#include <asio.hpp>
#include <boost/timer.hpp>class Client: public sigslot::has_slots<>
{
protected:BizConnection::BizPtr m_pBizConnection;public:typedef void (Client::*RequestPtr)(BizPackage&);typedef std::map<uint32_t, RequestPtr> RequestMap;typedef void (Client::*NotifyPtr)(BizPackage&);typedef std::map<uint32_t, NotifyPtr> NotifyMap;explicit Client(const string& name = string(""));/// 继承类需要实现的提供外部的方法virtual void startServer() = 0;virtual bool sendToServer(YProto &proto) = 0;virtual void stopServer() {clearWaitforTimer(); m_pBizConnection->stop();}protected:/// 继承类需要实现的初始化函数virtual void initRequestMap() {assert(false);}virtual void initNotifyMap() {assert(false);}virtual void initSignal();/// 继承类需要实现的钩子函数,用于处理网络事件virtual void onBizMsgArrived(core::Request& msg) = 0;virtual void onBizError(uint32_t ec, const string& em);virtual void onBizConnected() = 0;/// 继承类可以使用的工具方法/// 1. 心跳类void setKeepAliveSec(uint32_t sec) {m_keepAliveSec = sec;}uint32_t getKeepAliveSec() {return m_keepAliveSec;}void startKeepAlive();void keepAlive(const asio::error_code& e);virtual void onKeepAlive() {assert(false);}/// 2. 登陆状态类void setHasLogin(bool b) {m_hasLogin = b;}bool getHasLogin() {return m_hasLogin;}bool isOnline() { return (m_pBizConnection->isConnected() && m_hasLogin);}/// 3. 消息保存类template <typename Handler>bool savePendingCommand(Handler handler){if(m_pendingCmd.size() < MaxPendingCommandCount) {m_pendingCmd.push_back(Command(handler));return true;}return false;}void sendPendingCommand(){if (isOnline()) {vector<Command>::iterator it = m_pendingCmd.begin();for(; it != m_pendingCmd.end(); ++it) {(*it)();}m_pendingCmd.clear();}}/// 4. 延迟处理类typedef void (Client::*HoldonCallback)();void holdonSeconds(uint32_t sec, HoldonCallback func);void holdonHandler(HoldonCallback func, const asio::error_code &e);/// 5. waitfor 工具 处理异步消息超时typedef boost::shared_ptr<asio::deadline_timer> SharedTimerPtr;typedef boost::scoped_ptr<asio::deadline_timer> ScopedTimerPtr;typedef boost::shared_ptr<Probe> SharedProbe;typedef map<uint32_t, SharedTimerPtr> Uri2Timer;            /// 等待收到的包uri --> 这个时间timervoid waitfor(uint32_t uri, uint32_t sec);                   /// 在发送req的时候调用,sec 秒数 uri 等待收到的urivoid waitforTimeout(uint32_t uri, const asio::error_code& e); /// 所有waitfor超时都会自动回调这个函数virtual void onWaitforTimeout(uint32_t uri) {(void)uri; assert(false);}         /// 继承类覆盖这个钩子函数来进行错误处理void waitforReceived(uint32_t uri);                         /// 当响应函数handler被回调时,记得调用waitforReceived做清理工作void eraseWaitforTimer(uint32_t uri);void clearWaitforTimer();/// 继承类可以使用的工具成员:心跳 探针 请求阻塞typedef std::set<uint32_t> BlockReq;BlockReq m_block;SharedProbe m_probe;ScopedTimerPtr m_timer;uint32_t m_keepAliveSec;bool m_hasLogin;vector<Command> m_pendingCmd;static const uint32_t MaxPendingCommandCount = 20;ScopedTimerPtr m_holdonTimer;Uri2Timer m_uri2timer;
};#define BIND_REQ(m, uri, callback) \m[static_cast<uint32_t>(uri)] = static_cast<RequestPtr>(callback);#define BIND_NOTIFY(m, uri, callback) \m[static_cast<uint32_t>(uri)] = static_cast<NotifyPtr>(callback);#endif // CLIENT_H

2.1.6 client.cpp

#include "stdafx.h"
#include "client.h"Client::Client(const string& name): m_pBizConnection(BizConnection::create(ioService::instance(), name)), m_probe(new Probe), m_keepAliveSec(10), m_hasLogin(false)
{
}void Client::initSignal()
{m_pBizConnection->BizError.connect(this, &Client::onBizError);m_pBizConnection->BizMsgArrived.connect(this, &Client::onBizMsgArrived);m_pBizConnection->BizConnected.connect(this, &Client::onBizConnected);
}void Client::onBizError(uint32_t ec, const string &em)
{m_facade.serverError.emit(ec, em);
}void Client::startKeepAlive()
{m_timer.reset(new asio::deadline_timer(m_facade.io_service_ref));m_timer->expires_from_now(boost::posix_time::seconds(m_keepAliveSec));m_timer->async_wait(SafeHandler1<Client, const asio::error_code&>(&Client::keepAlive, this, m_probe));
}void Client::keepAlive(const asio::error_code &e)
{if (e != asio::error::operation_aborted) {FINE("%u send ping to %s %s:%u", m_facade.m_pInfo->uid, STR(m_pBizConnection->getName()), STR(m_pBizConnection->getip()), m_pBizConnection->getport());onKeepAlive();m_timer->expires_from_now(boost::posix_time::seconds(m_keepAliveSec ));m_timer->async_wait(SafeHandler1<Client, const asio::error_code&>(&Client::keepAlive, this, m_probe));}
}void Client::holdonSeconds(uint32_t sec, HoldonCallback func)
{m_holdonTimer.reset(new asio::deadline_timer(m_facade.io_service_ref));m_holdonTimer->expires_from_now(boost::posix_time::seconds(sec));SafeHandler1Bind1<Client, HoldonCallback, const asio::error_code&> h(&Client::holdonHandler, this, func, m_probe);m_holdonTimer->async_wait(h);
}void Client::holdonHandler(HoldonCallback func, const asio::error_code &e)
{if (!e) {if (m_holdonTimer != NULL)m_holdonTimer->cancel();(this->*func)();} else {WARN("error: %s", STR(e.message()));}
}void Client::waitfor(uint32_t uri, uint32_t sec)
{SharedTimerPtr t(new asio::deadline_timer(m_facade.io_service_ref));t->expires_from_now(boost::posix_time::seconds(sec));t->async_wait(SafeHandler1Bind1<Client, uint32_t, const asio::error_code&>(&Client::waitforTimeout, this, uri, m_probe));m_uri2timer[uri] = t;
}void Client::waitforTimeout(uint32_t uri, const asio::error_code &e)
{if (e != asio::error::operation_aborted) {FATAL("%s waitfor uri %u timeout", STR(m_pBizConnection->getName()), uri);eraseWaitforTimer(uri);onWaitforTimeout(uri);}
}void Client::waitforReceived(uint32_t uri)
{eraseWaitforTimer(uri);
}void Client::eraseWaitforTimer(uint32_t uri)
{Uri2Timer::iterator it = m_uri2timer.find(uri);if (it != m_uri2timer.end()) {SharedTimerPtr& t = it->second;if (t) {asio::error_code e;t->cancel(e);t.reset();}m_uri2timer.erase(it);}
}void Client::clearWaitforTimer()
{Uri2Timer::iterator it = m_uri2timer.begin();for (; it != m_uri2timer.end(); ++it) {SharedTimerPtr& t = it->second;if (t) {asio::error_code e;t->cancel(e);t.reset();}}m_uri2timer.clear();
}

2.2 作为server模块

作为server模块由于涉及公司的业务比较多,这里剥离出一个作为crossdomain服务器的部分,功能很简单:flash客户端通过socket请求crossdomain配置文件,server返回给定的字符串。这里使用了比较著名的pimpl模式,将实现完全隐藏在cpp文件中。

2.2.1 crossdomain.h

#ifndef CROSSDOMAIN_H
#define CROSSDOMAIN_H#include <string>
#include <boost/shared_ptr.hpp>
#include <asio.hpp>/*** @author yurunsun@gmail.com*/class CrossDomain
{
private:struct Server;boost::shared_ptr<Server> m_pserver;CrossDomain(asio::io_service& io_service, const std::string& local_port);static CrossDomain* s_instance;public:static void create(asio::io_service& io_service, const std::string& local_port){s_instance = new CrossDomain(io_service, local_port);}static CrossDomain* instance();void start_server();
};#endif // CROSSDOMAIN_H

2.2.2 crossdomain.cpp

#include "stdafx.h"
#include "crossdomain.h"using asio::ip::tcp;
using boost::uint8_t;
CrossDomain* CrossDomain::s_instance = NULL;struct CrossDomainImpl : public boost::enable_shared_from_this<CrossDomainImpl>
{
public:static const unsigned MaxReadSize = 22;typedef boost::shared_ptr<CrossDomainImpl> CrossDomainImplPtr;static CrossDomainImplPtr create(asio::io_service& io_service) {return CrossDomainImplPtr(new CrossDomainImpl(io_service));}tcp::socket& get_socket() {return m_socket;}void start() {start_read_some();}~CrossDomainImpl() {close();}void close() {if (m_socket.is_open()) {m_socket.close();}}private:CrossDomainImpl(asio::io_service& io_service): m_socket(io_service){}void start_read_some() {m_socket.async_read_some(asio::buffer(m_readbuf, MaxReadSize),boost::bind(&CrossDomainImpl::handle_read_some, shared_from_this(), asio::placeholders::error()));}void handle_read_some(const asio::error_code& err) {if (!err) {string str(m_readbuf);string reply("invalid");if (str == "<policy-file-request/>") {reply = "anything you wanna send back to client...";}asio::async_write(m_socket, asio::buffer(ref),boost::bind(&CrossDomainImpl::handle_write, shared_from_this(), asio::placeholders::error));}}void handle_write(const asio::error_code& error) {FINE("CrossDomain handle_write, gonna close");close();}tcp::socket m_socket;char m_readbuf[MaxReadSize];
};struct CrossDomain::Server
{
private:CrossDomain *m_facade;tcp::acceptor m_acceptor;bool m_listened;string m_local_port;public:Server(asio::io_service& io_service, const string &local_port): m_acceptor(io_service), m_listened(false), m_local_port(local_port){// intend to leave it blank}~Server() {if (m_acceptor.is_open()) {INFO("close server acceptor");m_acceptor.close();}}void start_server() {FINE("CrossDomain start_server....");if (!m_listened) {FINE("Try to listen...");try {tcp::endpoint ep(tcp::endpoint(tcp::v4(), atoi(m_local_port.c_str())));m_acceptor.open(ep.protocol());m_acceptor.bind(ep);m_acceptor.listen();} catch (const asio::system_error& ec) {WARN("Port %s already in use! Fail to listen...", STR(m_local_port));return;} catch (...) {WARN("Unknown error while trying to listen...");return;}m_listened = true;FINE("Listen port %s succesfully!", STR(m_local_port));}CrossDomainImpl::CrossDomainImplPtr new_server_impl = CrossDomainImpl::create(m_acceptor.get_io_service());m_acceptor.async_accept(new_server_impl->get_socket(),boost::bind(&Server::handle_accept, this, new_server_impl, asio::placeholders::error));}private:void handle_accept(CrossDomainImpl::CrossDomainImplPtr pserver_impl, const asio::error_code& err) {FINE("CrossDomain handle_accpet....");if (!err) {FINE("CrossDomain everything ok, start...");pserver_impl->start();    // start this serverstart_server();           // waiting for another Tuna Connection} else {pserver_impl->close();}}
};CrossDomain::CrossDomain(asio::io_service &io_service, const std::string &local_port): m_pserver(new Server(io_service, local_port))
{
}CrossDomain *CrossDomain::instance()
{if (!s_instance) {return NULL;}return s_instance;
}void CrossDomain::start_server()
{m_pserver->start_server();
}

3. 使用asio的陷阱

上边代码其实有几点漏洞:

3.1 std::vector<uint_8>不适合作为buffer

vector<uint8_t>不适合做buffer的原因是,sgi的内存分配器会以2倍的形式增长vector的内存,例如这个buffer要求100K,但当前vector的capability只有90K,那么sgi默认内存分配器会将vector的capability增长到180K。注意capability与size的区别。这就导致vector的内存占用依赖最大buffer的size,这是很危险的。

推荐使用boost的circular_buffer作为buffer,能有效避免内存碎片、隐式内存泄露等问题。

3.2 asio::const_buffer拷贝构造函数没有深拷贝

const_buffer系列静态buffer只能从mutable_buffermerge过来,但是从const_buffer的拷贝构造函数源码能看到,他并不对buffer做深拷贝。所以试图将其放到队列或者容器中,期待产生buffer的拷贝,是错误的。

3.3. async_write可能会拆包发送

例如先调用async_write发送一个100K的大包,再马上调用async_write发送一个8字节的ping包,非常可能出现问题。async_write函数的实现是循环调用async_write_some,对于大包会将其拆分成几个小报文。如果此时收到用户一个新的async_write调用,非常可能将小包夹在大包的几个部分中间发送,导致接收端出现异常。

解决的办法可以直接操作async_write_some,代替async_write。但更方便的办法是创建一个发送队列。实际上asio会准确地将发送成功的通知发送给用户,例如刚刚100K的打包,直到所有100K全部发送完成,才会调用handle回调。因此可以在发送时将报文入队列,回调函数里将报文出队列,发送下一个小报时判断队列是否为空,如果非空说明100K的包还没有发完。示例代码如下:

///发送时入队列
void TcpConnection::send(const void* data, uint32_t length)
{if (!m_stopped) {if (length <= MAX_BUFFER_SIZE) {const char* begin = (const char*)data;vector<char> vec(begin, begin + length);bool isLastComplete = m_bufQueue.empty();m_bufQueue.push_back(vec);/// 如果没有残余的包,就直接发送if (isLastComplete) {vector<char>& b(m_bufQueue.front());send(b);}} else {onCommonError(S_ERROR, "too big length to call send");}} else {onCommonError(S_ERROR, "illegal to call send while tcp is not connected");}
}void TcpConnection::send(const std::vector<char>& vec)
{if (!m_stopped) {asio::async_write(m_socket, asio::buffer(&vec[0], vec.size()), asio::transfer_all(), boost::bind(&TcpConnection::handleSend, shared_from_this(), asio::placeholders::error));} else {onCommonError(S_ERROR, "illegal to call send while tcp is not connected");}
}///回调函数将之前的buffer出队列,同时检查是否有后来的包
void TcpConnection::handleSend(const asio::error_code &e)
{if (!m_stopped) {if (!e) {m_bufQueue.pop_front();if (!m_bufQueue.empty()) {std::vector<char>& b(m_bufQueue.front());send(b);}//onSendSuccess();} else if (isConnected()){onSendFailure(e);}} else {INFO("%s %s %u user's canceled by stop()", STR(m_name), STR(m_ip), m_port);}
}

转载于:https://my.oschina.net/u/1024573/blog/413756

使用asio搭建商用服务器相关推荐

  1. 微软.NET年芳15:我在Azure上搭建Photon服务器(C#.NET)

    摘录网上的".NET 15周年"信息如下: 微软的 .NET 框架本周迎来了 15 岁生日..NET 的第一个版本在 2002 年 2 月 13 日作为的 Visual Studi ...

  2. 商用服务器系统比较好,商用服务器操作系统都用哪种

    商用服务器操作系统都用哪种 内容精选 换一换 切换弹性云服务器操作系统.支持弹性云服务器数据盘不变的情况下,使用新镜像重装系统盘.调用该接口后,系统将卸载系统盘,然后使用新镜像重新创建系统盘,并挂载至 ...

  3. 使用Nginx搭建直播服务器(nginx-rtmp-module)

    使用Nginx搭建直播服务器 1 简介 2 RTMP Module 3 gcc 4 OpenSSL 5 g++ 6 pcre 7 zlib 8 安装nginx 9 配置文件 10 systemctl ...

  4. Centos7 搭建DNS服务器与原理配置详解

    在搭建我们自己DNS服务器之前,先必须了解下DNS服务器的作用和原理. DNS是在互联网上进行域名解析到对应IP地址的服务器,保存互联网上所有的IP与域名的对应信息,然后将我们对网址的访问,解析成IP ...

  5. 废旧Android手机搭建个人服务器:ksweb搭建Web服务器+Termux、Ngrok实现内网穿透

    写在前面: 本篇博客介绍利用废旧手机搭建一个服务器,可以放自己的网页搭建网站 之前闲着没事,低价买了个虚拟主机,放了自己的静态网页,搭建了个人博客引导网站,后来虚拟主机被打死,凑巧了解到相关知识,于是 ...

  6. 基于Boost.Asio的异步通信服务器设计与开发

     boost::asio 通讯服务器实践 1. 开发环境搭建 1.1. Asio准备 万事开头难.对于一个C++的陌生者,编译一个开源的代码并不是一件轻松愉快的事情.为使大家在审阅和检测本代码可使 ...

  7. 使用Docker搭建svn服务器教程

    使用Docker搭建svn服务器教程 svn简介 SVN是Subversion的简称,是一个开放源代码的版本控制系统,相较于RCS.CVS,它采用了分支管理系统,它的设计目标就是取代CVS.互联网上很 ...

  8. centos7 下搭建git服务器

    总的来说,搭建git和svn形式差不多 1.首先要在服务器安装git环境,创建用户密码和仓库等 2.然后个人主机(基本是window)安装git客户端 3.将个人主机客户端的秘钥写到git服务器的一个 ...

  9. 搭建Git服务器教程转载

    1. 在Windows下使用ssh+msysgit客户端搭建Git服务器 http://www.codeproject.com/Articles/296398/Step-by-Step-Setup-G ...

  10. 使用 Nginx 搭建图片服务器

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试资料 作者 | ITDragon龙 链接 | cnblogs.com/i ...

最新文章

  1. Drug Target Review | 虚拟现实(VR)用于新药设计
  2. 美国三院院士「迈克尔•乔丹」长文论述:为什么说「人工智能革命」尚未发生...
  3. YYCache 源码分析(一)
  4. c#中将对象序列化为xml(包括list)
  5. 一行上自动控制数据长度,并换行
  6. OWASP TOP 10 1
  7. python建立矩阵原理_怎么用python建立矩阵-问答-阿里云开发者社区-阿里云
  8. 以下不属于时序逻辑电路的有_静态时序分析圣经翻译计划——附录B:SDF
  9. poj 1703 并查集
  10. mongo与sql对比--来自网上
  11. linux脚本编写乘法口诀,shell脚本编写乘法口诀
  12. ns-3 教程 —— 入门
  13. 嵌入式工程师学习路线(软件类)
  14. 以DMA方式开启DAC输出正弦波
  15. Gateway一直访问本地
  16. 【网络安全】小白每天学一点之“监控应用程序行为” [process monitor]
  17. 软件测试之柠檬班python全栈自动化50期测试笔记
  18. 设计模式 ---建造者模式
  19. 【PyTorch】随机种子 与 网络初始化
  20. Linux 中 Netcat 工具的使用

热门文章

  1. 不同计算机的操作码完全相同,单片机课后习题答案
  2. 类文件解析及引申的一系列仇怨
  3. r语言 断轴 画图_R语言之画图(一)
  4. libmaxminddb
  5. PCM音频数据的声音分贝值计算
  6. 上海域格ASR平台4g模块低功耗应用指导
  7. android 行政区域,最新Android使用Jsoup获取省市县行政区划代码行政编码(附源码与Json数据)...
  8. excel-自定义函数及使用
  9. 3GPP TS 23502-h20 中英文对照 | 4.15.6.2 NEF service operations information flow
  10. 谷歌广告已拒登:恶意软件或垃圾软件 如何解决