【Boost】boost库asio详解9——TCP的简单例子2
客户端:
// Client.cpp : 定义控制台应用程序的入口点。
//#include "stdafx.h"
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>using boost::asio::ip::tcp;
using boost::asio::ip::address;class session: public boost::enable_shared_from_this<session> {
public:session(boost::asio::io_service &io_service, tcp::endpoint &endpoint): io_service_(io_service), socket_(io_service), endpoint_(endpoint){ } void start() {socket_.async_connect(endpoint_,boost::bind(&session::handle_connect, shared_from_this(), boost::asio::placeholders::error));} private:void handle_connect(const boost::system::error_code &error) {if (error) {if (error.value() != boost::system::errc::operation_canceled) {std::cerr << boost::system::system_error(error).what() << std::endl;} socket_.close();return;} static tcp::no_delay option(true);socket_.set_option(option);strcpy(buf, "Hello World!\n");boost::asio::async_write(socket_, boost::asio::buffer(buf, strlen(buf)),boost::bind(&session::handle_write, shared_from_this(), boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred));} void handle_write(const boost::system::error_code& error, size_t bytes_transferred) {memset(buf, sizeof(buf), 0); boost::asio::async_read_until(socket_, sbuf,"\n",boost::bind(&session::handle_read,shared_from_this(),boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred));}void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {std::cout << buf << std::endl;}private:boost::asio::io_service &io_service_;tcp::socket socket_;tcp::endpoint &endpoint_;char buf[1024];boost::asio::streambuf sbuf;
};typedef boost::shared_ptr<session> session_ptr;int main(int argc, char* argv[])
{boost::asio::io_service io_service;tcp::endpoint endpoint(address::from_string("172.16.6.70"), 10028);session_ptr new_session(new session(io_service, endpoint));new_session->start();io_service.run();return 0;
}
服务器:
#include <string.h>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>using boost::asio::ip::tcp;
using boost::asio::ip::address;class session: public boost::enable_shared_from_this<session> {
public:session(boost::asio::io_service &io_service): socket_(io_service){ } void start() {static tcp::no_delay option(true);socket_.set_option(option);boost::asio::async_read_until(socket_, sbuf_,"\n",boost::bind(&session::handle_read, shared_from_this(), boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred));} tcp::socket &socket() {return socket_;} private:void handle_write(const boost::system::error_code& error, size_t bytes_transferred) {boost::asio::async_read_until(socket_, sbuf_,"\n",boost::bind(&session::handle_read, shared_from_this(), boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred));} void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {boost::asio::async_write(socket_, sbuf_,boost::bind(&session::handle_write, shared_from_this(), boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred));std::ostream os(&sbuf_);printf("%s \n",sbuf_.data());}private:tcp::socket socket_;boost::asio::streambuf sbuf_;
};typedef boost::shared_ptr<session> session_ptr;class server {
public:server(boost::asio::io_service &io_service, tcp::endpoint &endpoint): io_service_(io_service), acceptor_(io_service, endpoint){session_ptr new_session(new session(io_service_));acceptor_.async_accept(new_session->socket(),boost::bind(&server::handle_accept,this,new_session,boost::asio::placeholders::error));}void handle_accept(session_ptr new_session, const boost::system::error_code& error) {if (error) {return;}new_session->start();new_session.reset(new session(io_service_));acceptor_.async_accept(new_session->socket(),boost::bind(&server::handle_accept,this,new_session,boost::asio::placeholders::error));}void run() {io_service_.run();}private:boost::asio::io_service &io_service_;tcp::acceptor acceptor_;
};int main(int argc, char* argv[])
{boost::asio::io_service io_service;//tcp::endpoint endpoint(address::from_string("172.16.6.70"), 10028);tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), 10028);server s(io_service, endpoint);s.run();return 0;
}
编译:
g++ -Wall -o client client.cpp -lboost_system
g++ -Wall -o server server.cpp -lboost_system
这里需要注意的是: async_write, async_read, async_read_until 都是组合操作, 需要达到特定条件(数据全部写完、缓冲区读满或读到分隔符)才会调用回调函数.
在回调函数被调用之前, 不能对同一个套接字再次发起同类操作, 否则发送或接收到的数据很可能是乱的. 所以, 在实际代码当中, 会有一个写缓冲队列: 当需要write时, 先把数据放到队列中, 如果放入后队列个数为1, 则调用async_write, 否则等待函数回调; 当函数回调时将首个元素从队列中移除, 然后接着发送下一个, 直到队列为空.
对于client, 由于套接字的is_open()在调用async_connect之后就已经是true了(它只表示套接字已打开, 并不表示连接已经建立), 因此在async_connect的回调被调用之前没有办法知道是否已经连接成功. 实际代码当中一般会增加一个变量(在回调中设置), 以表示该套接字是否已经允许发送数据.
============================================================================
服务器:
// Size in bytes of each connection's receive buffer. The expression is
// parenthesised so the macro behaves as a single value inside larger
// expressions (e.g. `x % PACK_MAX_SIZE`).
#define PACK_MAX_SIZE (16 * 1024)
- class TCPConnection: public boost::enable_shared_from_this<TCPConnection> {
- public:
- static boost::shared_ptr<TCPConnection> create(IoService& ioService);
- tcp::socket& getSocket();
- void start();
- private:
- TCPConnection(IoService& ioService);
- void handleRead(const boost::system::error_code& e,size_t bytesTransferred);
- void handleWrite(const boost::system::error_code& e,size_t bytesTransferred);
- //TCP的socket
- tcp::socket socket;
- //接收和发送的缓冲区
- char m_buffer[PACK_MAX_SIZE];
- size_t m_bytesReceived;
- };
- typedef boost::shared_ptr<TCPConnection> pointer;
- class TCPServer {
- public:
- TCPServer(IoService& ioService, int port);
- public:
- private:
- void startAccept();
- void handleAccept(boost::shared_ptr<TCPConnection> newConnection,
- const boost::system::error_code& error);
- private:
- tcp::acceptor acceptor;
- };
- TCPConnection::TCPConnection(IoService& ioService) :socket(ioService) {
- m_bytesReceived = 0;
- }
- boost::shared_ptr<TCPConnection> TCPConnection::create(IoService& ioService) {
- return pointer(new TCPConnection(ioService));
- }
- tcp::socket& TCPConnection::getSocket() {
- return socket;
- }
- void TCPConnection::start() {
- static tcp::no_delay option( true );
- socket.set_option( option );
- socket.set_option(boost::asio::socket_base::keep_alive(true));
- memset(m_buffer, 0x0, 2048);
- socket.async_read_some(boost::asio::buffer(m_buffer),
- boost::bind(&TCPConnection::handleRead, shared_from_this(),
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
- }
- void TCPConnection::sendTermMessage( long phoneNum, unsigned short serialNum, unsigned char cmd, void* rawData )
- {
- socket.async_write_some(
- boost::asio::buffer( resp->mem, resp->size ),
- boost::bind(
- &TCPConnection::handleWrite,
- shared_from_this(),
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred
- )
- );
- }
- void TCPConnection::handleRead(const boost::system::error_code& error, size_t bytesTransferred)
- {
- //cout <<"************************socket read " << bytesTransferred << " bytes codes ***************************"<< endl;
- if( error != 0 || bytesTransferred == 0 )
- {
- std::cout << "!!! network exception: err(" << error.value() << "), " << boost::system::system_error( error ).what() << " !!!" << std::endl;
- socket.close();
- CmdQueue::getInstance()->cancelSubscribe( this );
- return;
- }
- std::cout << "received some from remote serivce";
- time_t nowTime = time(NULL);
- std::cout << ":(time:"<<nowTime<<")" << SQ::Utils::bytes2HexString( (SQ::Byte *) m_buffer, bytesTransferred<=40?bytesTransferred:40);
- if (bytesTransferred<=40)
- {
- std::cout<< std::endl;
- }
- else
- {
- cout << "..." <<endl;
- }
- //解析消息(主要考虑到TCP分包和TCP粘包)
- size_t m_bytesReceived = 0;
- m_bytesReceived += bytesTransferred;
- size_t dealDataSize = 0;
- if (m_bytesReceived>=sizeof(TCPHead))
- {
- int offset = 0;
- while (true)//考虑到TCP粘包问题,做个循环
- {
- //剩余缓冲区不足消息头
- offset = dealDataSize;
- if (m_bytesReceived-offset < sizeof(TCPHead)) break;
- TCPHead* head = (TCPHead*)(m_buffer + offset);
- unsigned long phoneNum = SQ::Utils::BCD2long( head->terminalID, PHONENUM_LENGTH );
- offset += PHONENUM_LENGTH;
- unsigned short serialNum = ntohs( head->serialNum );
- offset += 2;
- unsigned char cmd = head->cmd ;
- offset += 1;
- int bodyLen = ntohl( head->bodyLen );
- offset += 4;
- if (m_bytesReceived-offset < bodyLen) break;
- SQ::Byte *pRawData = (SQ::Byte *)m_buffer + offset;
- sendUDPPackToTerminal(phoneNum, pRawData, bodyLen);
- dealDataSize += (sizeof(TCPHead)+bodyLen);
- }
- }
- //处理数据之后刷新缓冲区
- if (dealDataSize > 0)
- {
- memmove(m_buffer,m_buffer+dealDataSize,m_bytesReceived-dealDataSize);
- m_bytesReceived -= dealDataSize;
- }
- socket.async_read_some(boost::asio::buffer(m_buffer + m_bytesReceived, PACK_MAX_SIZE-m_bytesReceived),
- boost::bind(&TCPConnection::handleRead, shared_from_this(),
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
- return;
- }
- void TCPConnection::handleWrite( const boost::system::error_code& error, size_t bytesTransferred)
- {
- if( error != 0 )
- {
- std::cout << "write error." << std::endl;
- }
- }
- TCPServer::TCPServer(IoService& ioService, int port) :
- acceptor(ioService, tcp::endpoint(tcp::v4(), port)) {
- startAccept();
- }
- void TCPServer::startAccept() {
- boost::shared_ptr<TCPConnection> newConnection = TCPConnection::create(
- acceptor.get_io_service());
- acceptor.async_accept(newConnection->getSocket(),
- boost::bind(&TCPServer::handleAccept, this, newConnection,
- boost::asio::placeholders::error));
- }
- void TCPServer::handleAccept(boost::shared_ptr<TCPConnection> newConnection,
- const boost::system::error_code& error) {
- if (!error) {
- newConnection->start();
- startAccept();
- }
- }
客户端:
- class CommonSession : public boost::enable_shared_from_this<CommonSession>
- {
- //外部接口
- public:
- CommonSession( boost::asio::io_service *io_service, const std::string &ip, short port);
- void start();
- void sendDataBytes(SQ::Byte *pByte, int len);
- bool getConnectState() {return m_isConnected;}
- //内部接口
- protected:
- void onConnect( const boost::system::error_code &error );
- void onWrite( const boost::system::error_code& error, size_t bytes_transferred );
- void onRead( const boost::system::error_code& error, size_t bytes_transferred );
- private:
- tcp::socket m_socket; //套接字
- char m_buffer[PACK_MAX_SIZE]; //接收缓冲区
- bool m_isConnected; //连接状态
- std::string m_ip; //目标 I P
- short m_port; //目标端口
- size_t m_bytesReceived; //收字节数
- unsigned short m_serialNum; //发流水号
- boost::recursive_mutex m_mutex; //互斥体
- boost::asio::io_service* m_io_service; //I O 服务
- };
- CommonSession::CommonSession( boost::asio::io_service *io_service, const std::string &ip, short port )
- : m_io_service( io_service ), m_socket( *io_service ), m_ip( ip ), m_port( port )
- {
- m_bytesReceived = 0;
- m_isConnected = false;
- haveLog = false;
- }
- void CommonSession::onConnect( const boost::system::error_code & error )
- {
- if( error )
- {
- //错误LOG只输出一次
- if (!haveLog)
- {
- haveLog = true;
- //连接失败
- std::cout << "connect to " << m_ip << ":" << m_port << " failed!" << " -> ERR : ";
- if( error.value() != boost::system::errc::operation_canceled )
- {
- std::cerr << boost::system::system_error( error ).what() << std::endl;
- }
- }
- //3秒后重新连接
- m_socket.close();
- boost::this_thread::sleep( boost::posix_time::seconds( 3 ) );
- start();
- }
- else
- {
- std::cout << "connect to " << m_ip << ":" << m_port << " successed! (time:"<< time(NULL)<<")" << std::endl;
- m_isConnected = true;
- static tcp::no_delay option( true );
- m_socket.set_option( option );
- m_socket.set_option( boost::asio::socket_base::keep_alive( true ) );
- //重发未发送的消息
- //发起读事件
- m_socket.async_read_some( boost::asio::buffer( m_buffer ),
- boost::bind( &CommonSession::onRead,
- shared_from_this(),
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred ) );
- }
- }
- void CommonSession::onWrite( const boost::system::error_code& error, size_t bytes_transferred )
- {
- if( error != 0 )
- {
- std::cout << "!!! send error, code is " << error<<" !!!"<<std::endl;
- std::cerr << boost::system::system_error( error ).what() << std::endl;
- }
- }
- void CommonSession::onRead( const boost::system::error_code& error, size_t bytes_transferred )
- {
- //cout <<"************************socket read " << bytes_transferred << " bytes codes ***************************"<< endl;
- //出错检查
- if( error != 0 )
- {
- std::cout << "!!! onRead error, code is " << error<<" !!!"<<std::endl;
- std::cerr << boost::system::system_error( error ).what() << std::endl;
- m_socket.close();
- //3秒后重新连接
- boost::this_thread::sleep( boost::posix_time::seconds( 3 ) );
- m_isConnected = false;
- start();
- return;
- }
- else if( bytes_transferred == 0 )
- {
- m_socket.close();
- //3秒后重新连接
- boost::this_thread::sleep( boost::posix_time::seconds( 3 ) );
- m_isConnected = false;
- start();
- return;
- }
- //解析消息(主要考虑到TCP分包和TCP粘包)
- size_t m_bytesReceived = 0;
- m_bytesReceived += bytes_transferred;
- size_t dealDataSize = 0;
- if (m_bytesReceived>=sizeof(TCPHead))
- {
- int offset = 0;
- while (true)//考虑到TCP粘包问题,做个循环
- {
- //剩余缓冲区不足消息头
- offset = dealDataSize;
- if (m_bytesReceived-offset < sizeof(SQ::TCPHead)) break;
- SQ::TCPHead* head = (SQ::TCPHead*)(m_buffer + offset);
- unsigned long phoneNum = SQ::Utils::BCD2long( head->terminalID, PHONENUM_LENGTH );
- offset += PHONENUM_LENGTH;
- unsigned short serialNum = ntohs( head->serialNum );
- offset += 2;
- unsigned char cmd = head->cmd ;
- offset += 1;
- int bodyLen = ntohl( head->bodyLen );
- offset += 4;
- if (bodyLen == 0)//处理来自网关的数据,而不是终端
- {
- std::cout<< "receive(time:"<< time(NULL)<<") command from gate, cmd:" << (int)cmd << " phoneNum:"<<phoneNum<<std::endl;
- dealCmd(cmd, phoneNum);
- dealDataSize += sizeof(SQ::TCPHead);
- }
- else//处理来自终端的消息
- {
- //剩余缓冲区不足消息体
- if (m_bytesReceived-offset < bodyLen) break;
- //取出808消息体
- char *pRawData = m_buffer + offset;
- //解开消息头
- CodecMsgHead csMsgHead;
- AbstractCodec tmpCode;
- tmpCode.decodeHead((SQ::Byte*)pRawData, &csMsgHead);
- //处理消息长度
- dealDataSize += (sizeof(SQ::TCPHead)+bodyLen);
- }
- }
- }
- //处理数据之后刷新缓冲区
- if (dealDataSize > 0)
- {
- memmove(m_buffer,m_buffer+dealDataSize,m_bytesReceived-dealDataSize);
- m_bytesReceived -= dealDataSize;
- }
- //发起读事件
- m_socket.async_read_some( boost::asio::buffer(m_buffer + m_bytesReceived, PACK_MAX_SIZE-m_bytesReceived),
- boost::bind( &CommonSession::onRead,
- shared_from_this(),
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred ) );
- }
- void CommonSession::start()
- {
- tcp::endpoint ep = tcp::endpoint( address::from_string( m_ip ), m_port );
- m_socket.async_connect( ep,
- boost::bind( &CommonSession::onConnect,
- shared_from_this(),
- boost::asio::placeholders::error ) );
- }
- //发送字节流,编码808协议之后的字节流
- void CommonSession::sendDataBytes(SQ::Byte *pByte, int len)
- {
- //检测连接
- if (!m_isConnected)
- {
- std::cout<< "connection(to "<<m_ip<<":"<< m_port<<") is not established, so sendDataBytes faild" <<std::endl;
- return;
- }
- //发送消息
- time_t nowTime = time(NULL);
- std::cout << "send:(time:"<<nowTime<<")" << SQ::Utils::bytes2HexString( pByte, len<=40?len:40);
- if (len<=40)
- {
- std::cout<< std::endl;
- }
- else
- {
- cout << "..." <<endl;
- }
- // std::cout << "send:(time:"<<nowTime<<")" << SQ::Utils::bytes2HexString( pByte, len) <<endl;
- boost::asio::async_write(
- m_socket,
- boost::asio::buffer( pByte, len),
- boost::bind(&CommonSession::onWrite,shared_from_this(),boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred)
- );
- }
【Boost】boost库asio详解9——TCP的简单例子2相关推荐
- 【Boost】boost库asio详解8——TCP的简单例子1
摘于boost官网的几个例子, 做了点小修改, 笔记之. 同步客户端 [cpp] view plain copy print? void test_asio_synclient() { typede ...
- 【Boost】boost库asio详解9——UDP的简单例子1
服务器: #include "stdafx.h" #include <iostream> #include <boost/asio.hpp> #includ ...
- 【Boost】boost库asio详解7——boost::asio::buffer用法
1. asio::buffer常用的构造方法 asio::buffer有多种的构造方法,而且buffer大小是自动管理的 1.1 字符数组 [cpp] view plain copy print? ...
- 【Boost】boost库asio详解3——io_service作为work pool
无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE. 使用io_service作为处理工作的work pool,可 ...
- 【Boost】boost库asio详解5——resolver与endpoint使用说明
tcp::resolver一般和tcp::resolver::query结合用,通过query这个词顾名思义就知道它是用来查询socket的相应信息,一般而言我们关心socket的东东有address ...
- 【Boost】boost库asio详解2——io_service::run函数无任务时退出的问题
io_service::work类可以使io_service::run函数在没有任务的时候仍然不返回,直至work对象被销毁. [cpp] view plaincopy print? void tes ...
- 【Boost】boost库asio详解6——boost::asio::error的用法浅析
1. 概述 一般而言我们创建用于接收error的类型大多声明如下: [cpp] view plain copy print? boost::system::error_code error 我们用这 ...
- 【Boost】boost库asio详解4——deadline_timer使用说明
deadline_timer和socket一样,都用io_service作为构造函数的参数.也即,在其上进行异步操作,都将导致和io_service所包含的iocp相关联.这同样意味着在析构 io_s ...
- 【Boost】boost库asio详解2——strand与io_service区别
[cpp] view plain copy print? namespace { // strand提供串行执行, 能够保证线程安全, 同时被post或dispatch的方法, 不会被并发的执行. ...
最新文章
- python入门视频教程推荐-python入门学习哪个书比较好(python视频教程知乎)
- shell脚本[] [[]] -n -z 的含义解析
- MVC设计模式学习总结
- poj 2388 排序的水题
- Spark源码分析之SparkContext
- Linux内核深入理解系统调用(3):open 系统调用实现以及资源限制(setrlimit/getrlimit/prlimit)
- csrf 攻击及防御
- 奥威尔:老大哥在看着你-软件公司十诫
- vue中的阿里巴巴矢量图标使用
- 步进电机 高速光耦_光耦的参数以及高速光耦如何选型!-先进光半导体
- 时分秒倒计时的js实现
- 【小白向】利用笔记本+网线让台式机上网
- iOS 玩不转的GCD第0课时
- 【它山之石,可以攻玉】关于求职(实习)面试经验(2)
- Android studio 改app图标,名字及一键拨号
- 易快借的额度有多少?易快借的利息怎么计算?
- 认识图,用矩阵表示图
- 使用endnote来查找目标期刊
- Mac上NVM 安装与使用教程
- fastadmin input rule 自定义