// ACE_Proactor_Client.cpp : 定义控制台应用程序的入口点。
//#include "stdafx.h"#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/OS.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Connector.h"
#include "ace/SOCK_SEQPACK_Association.h"#pragma comment(lib,"ACEd.lib")class Service_Handler : public ACE_Service_Handler
{
public:Service_Handler(){//ACE_OS::printf("Service_Handler constructed for connector \n");}~Service_Handler (){if (this->handle () != ACE_INVALID_HANDLE)ACE_OS::closesocket (this->handle ());//ACE_OS::printf("one Service_Handler for connecter destructed");}void post_send(void){do {time_t now = ACE_OS::gettimeofday().sec();ACE_Message_Block *mb = new ACE_Message_Block(128);char buff[64];ACE_INET_Addr addr;ACE_SOCK_SEQPACK_Association ass=ACE_SOCK_SEQPACK_Association(this->handle());size_t addr_size=sizeof ACE_INET_Addr;ass.get_local_addrs(&addr,addr_size);//ACE_OS::printf("fd:%d ip:%d port:%d\n",(int)this->handle(), addr.get_ip_address(), addr.get_port_number());sprintf(buff,"%d",addr.get_port_number());mb->copy(buff/*ctime(now)*/);if (this->writer_.write(*mb,mb->length()) !=0){ACE_OS::printf("Begin write fail in open\n");delete this;break;}else{ACE_OS::printf("sended:%s\n",mb->rd_ptr());}} while (0);}void post_recv(void){do {ACE_Message_Block *mb = new ACE_Message_Block(buffer,128);if (this->reader_.read (*mb, mb->space ()) != 0){ACE_OS::printf("Begin read fail\n");delete this;break;}} while (0);}virtual void open (ACE_HANDLE h, ACE_Message_Block&){do {this->handle (h);if (this->writer_.open (*this) != 0 ){ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),ACE_TEXT ("Service_Handler open")));delete this;break;}post_send();if (this->reader_.open (*this) != 0 ){ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),ACE_TEXT ("Service_Handler open")));delete this;break;}post_recv();} while (0);}virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result){result.message_block ().release();//ACE_OS::sleep(1);post_send();}virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result){do {ACE_Message_Block &mb = result.message_block ();if (!result.success () || result.bytes_transferred () == 0){mb.release ();delete this;break;}ACE_OS::printf("received:%s\n",mb.rd_ptr());mb.release();post_recv();} while (0);}
private:ACE_Asynch_Write_Stream writer_;ACE_Asynch_Read_Stream reader_;char buffer[128];
};#include <ace/OS.h>
#include <ace/Task.h>class TTcpNetThread : public ACE_Task_Base
{
public:/// 运行int open();/// 停止运行int close();
protected:/// 线程函数virtual int svc();
};int TTcpNetThread::open() { return this->activate(); }int TTcpNetThread::close()
{ACE_Proactor::instance()->proactor_end_event_loop(); // 终止ACE_Proactor循环this->wait(); // 等待清理现场return 0;
}int TTcpNetThread::svc()
{/*ACE_INET_Addr listenAddr(4567); // 默认监听地址TTcpAcceptor tcpAcceptor; // 接收器// 演出开始if (tcpAcceptor.open(listenAddr, 0, 1, 5, 1, 0, 0) != 0)ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("failed to open TcpAcceptor errno=%i\n"), errno), -1);*/// Proactor的事件循环开始ACE_Proactor::instance()->proactor_run_event_loop();ACE_DEBUG((LM_DEBUG, ACE_TEXT("Network fin\n")));return 0;
}#define TCP_CLIENT_THREAD_SEND 0x777const int CLIENT_CONNECTION_NUM_OF_PER_THREAD = 1; //< 客户端每个线程的连接数#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Stream.h"
/**
* @class TTcpClientThread
* @brief TCP客户端测试线程
*/
class TTcpClientThread : public ACE_Task<ACE_MT_SYNCH>
{ACE_SOCK_Connector connector[CLIENT_CONNECTION_NUM_OF_PER_THREAD]; //< 连接器ACE_SOCK_Stream peerStream[CLIENT_CONNECTION_NUM_OF_PER_THREAD]; //< 流对象public:/// ctor~TTcpClientThread();/// 运行int open();/// 停止运行int close();
private:/// 线程函数virtual int svc();
};TTcpClientThread::~TTcpClientThread()
{for(int i = 0; i < CLIENT_CONNECTION_NUM_OF_PER_THREAD; i++)peerStream[i].close();
}int TTcpClientThread::open() { return this->activate(); }int TTcpClientThread::close()
{ACE_TRACE("TTcpClientThread::close");ACE_Message_Block* termBlock;ACE_NEW_NORETURN(termBlock, ACE_Message_Block(0, ACE_Message_Block::MB_HANGUP));if (!termBlock)ACE_DEBUG((LM_ERROR, ACE_TEXT("Allocate failed %i"), errno));else{putq(termBlock);wait();}return 0;
}int TTcpClientThread::svc()
{ACE_INET_Addr srvAddr(7878, "127.0.0.1");for(int i = 0; i < CLIENT_CONNECTION_NUM_OF_PER_THREAD; i++){if (connector[i].connect(peerStream[i], srvAddr) == -1){ACE_ERROR((LM_ERROR, ACE_TEXT("%i Failed to connect server errno=%i\n"), i, errno));}Sleep(100);}struct TPack{
#pragma pack(push)
#pragma pack(1)unsigned int seq;unsigned short len;char data [128];
#pragma pack(pop)};ACE_Message_Block* msg = 0;ACE_INET_Addr localAddr;ACE_TCHAR localAddrStr[128];peerStream[0].get_local_addr(localAddr);localAddr.addr_to_string(localAddrStr, sizeof(localAddrStr) / sizeof(ACE_TCHAR));TPack data;int len = sizeof(unsigned int) + sizeof(unsigned short);data.seq = 0;data.len = strlen(localAddrStr) + 1;strcpy(data.data, localAddrStr);len += data.len;char tmp[sizeof(TPack)];char buf[256];memcpy(tmp, &data, len);while(true) // 线程循环{if (getq(msg) != -1){switch(msg->msg_type()){case ACE_Message_Block::MB_HANGUP:{msg->release();return 0;}break;default:{for(int i = 0; i < CLIENT_CONNECTION_NUM_OF_PER_THREAD; i++){peerStream[i].send(tmp, 5);Sleep(100);peerStream[i].send(tmp + 5, len - 5);Sleep(100);ACE_Time_Value timeout(2);int recvLen =  peerStream[i].recv_n(buf, sizeof(unsigned int) + sizeof(unsigned short), 0, &timeout);if (recvLen == sizeof(unsigned int) + sizeof(unsigned short)){short dataLen = *(short *)(buf + 4);if (dataLen > 256)dataLen = 256;recvLen = peerStream[i].recv_n(buf, dataLen, 0, &timeout);if (recvLen != dataLen)ACE_DEBUG((LM_INFO, ACE_TEXT("Failed to recv data, length is %i, but only get %i\n"), dataLen, recvLen));elseACE_DEBUG((LM_INFO, ACE_TEXT("Client get data: len=%i data=%s\n"), recvLen, buf));} // if recvLen} // for} // defaultbreak;} // switchmsg->release();} // if getq} // whileACE_DEBUG((LM_INFO, ACE_TEXT("Exit client thread")));return 0;
}#include <vector>
#define CLIENT_THREAD_NUM 4
int main(int argc, char *argv[])
{ACE_INET_Addr remote_addr(4567,ACE_LOCALHOST); std::vector<ACE_Asynch_Connector<Service_Handler> *> vtconnector;for (int i=0;i<2000;i++){ACE_INET_Addr local_addr(10000+i,ACE_LOCALHOST); ACE_Asynch_Connector<Service_Handler> *connector = new ACE_Asynch_Connector<Service_Handler>;connector->open();if (connector->connect(remote_addr,local_addr) == -1)return -1;vtconnector.push_back(connector);}TTcpNetThread netThread[CLIENT_THREAD_NUM];for(int i = 0; i < CLIENT_THREAD_NUM; i++){netThread[i].open();}while (getchar()){ACE_OS::sleep(1);}//ACE_Proactor::instance ()->proactor_run_event_loop();return 0;
}
// ACE_Proactor_Server.cpp : 定义控制台应用程序的入口点。
//#include "stdafx.h"#include "ace/Asynch_IO.h"
#include "ace/OS_main.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/INET_Addr.h"
#include "ace/OS.h"
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/Message_Block.h"
#include "ace/Containers.h"
#include "ace/SOCK_SEQPACK_Association.h"ACE_DLList<ACE_Asynch_Write_Stream> wList;class Service_Handler:public ACE_Service_Handler
{
public:Service_Handler(){}~Service_Handler(void){if(this->handle()!=ACE_INVALID_HANDLE)ACE_OS::closesocket(this->handle());}virtual void open(ACE_HANDLE h,ACE_Message_Block &message_block){//handle_= h;//this->handle(h);if(rs_.open(*this,h)){ACE_ERROR ((LM_ERROR,"%p/n","ACE_Asynch_Read_Stream::open"));return;}if(ws_.open(*this)){ACE_ERROR ((LM_ERROR,"%p/n","ACE_Asynch_Write_Stream::open"));return;}if (post_recv()==-1)return;//wList.insert_tail(&ws_);addresses(remote_address,local_address);remote_address.addr_to_string(peer_name,MAXHOSTNAMELEN);ACE_INET_Addr addr;ACE_SOCK_SEQPACK_Association ass=ACE_SOCK_SEQPACK_Association(h);size_t addr_size=sizeof ACE_INET_Addr;ass.get_remote_addrs(&addr,addr_size);ACE_OS::printf("fd:%d ip:%d port:%d\n",(int)h, addr.get_ip_address(), addr.get_port_number());//ACE_DEBUG((LM_DEBUG,ACE_TEXT("peer:%s\n"),peer_name));}
protected:int post_recv(void){ACE_Message_Block *mb=0;ACE_NEW_RETURN(mb,ACE_Message_Block(512),-1);if(rs_.read(*mb,mb->space())==-1){ACE_ERROR_RETURN((LM_ERROR,"%p/n","ACE_Asynch_Read_Stream::read"),-1);}return 0;}virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result){//ACE_HANDLE h = result.handle();ACE_Message_Block &mb = result.message_block ();if (result.success()&&result.bytes_transferred()!=0){ACE_DEBUG((LM_DEBUG,ACE_TEXT("recv:%s\n"),mb.rd_ptr()));if (ws_.write(*mb.duplicate(),result.message_block().length())==-1){ACE_ERROR ((LM_ERROR,"%p\n","ACE_Asynch_Write_Stream::write"));}/*ACE_DLList_Iterator<ACE_Asynch_Write_Stream> iter(wList);while(!iter.done()){if (iter.next()->write(*result.message_block().duplicate(),result.message_block().length())==-1){ACE_ERROR ((LM_ERROR,"%p/n","ACE_Asynch_Write_Stream::write"));}iter++;}*/mb.release();post_recv();}else{mb.release();/*ACE_DLList_Iterator<ACE_Asynch_Write_Stream> iter(wList);while (!iter.done ()){if(&ws_==iter.next()){iter.remove();break;}iter++;}*/delete this;}}virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result){//ACE_OS::printf("write complete:%d %d\n", result.success(),result.bytes_transferred());result.message_block().release();}
private:ACE_Asynch_Read_Stream rs_;ACE_Asynch_Write_Stream ws_;ACE_HANDLE handle_;ACE_TCHAR peer_name[MAXHOSTNAMELEN];ACE_INET_Addr remote_address;ACE_INET_Addr local_address;
};#include <ace/OS.h>
#include <ace/Task.h>class TTcpNetThread : public ACE_Task_Base
{
public:/// 运行int open();/// 停止运行int close();
protected:/// 线程函数virtual int svc();
};int TTcpNetThread::open() { return this->activate(); }int TTcpNetThread::close()
{ACE_Proactor::instance()->proactor_end_event_loop(); // 终止ACE_Proactor循环this->wait(); // 等待清理现场return 0;
}int TTcpNetThread::svc()
{/*ACE_INET_Addr listenAddr(4567); // 默认监听地址TTcpAcceptor tcpAcceptor; // 接收器// 演出开始if (tcpAcceptor.open(listenAddr, 0, 1, 5, 1, 0, 0) != 0)ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("failed to open TcpAcceptor errno=%i\n"), errno), -1);*/// Proactor的事件循环开始ACE_Proactor::instance()->proactor_run_event_loop();ACE_DEBUG((LM_DEBUG, ACE_TEXT("Network fin\n")));return 0;
}#pragma comment(lib,"ACEd.lib")#define CLIENT_THREAD_NUM 4int main(int argc,char *argv[])
{ACE_Asynch_Acceptor<Service_Handler> acceptor;if(acceptor.open(ACE_INET_Addr(4567),0,1) == -1){return -1;}TTcpNetThread netThread[CLIENT_THREAD_NUM];for(int i = 0; i < CLIENT_THREAD_NUM; i++){netThread[i].open();}while (getchar()){ACE_OS::sleep(1);}//ACE_Proactor::instance()->proactor_run_event_loop();return 0;
};

ACE之Proactor模式使用实例相关推荐

  1. ACE之Reactor模式使用实例

    // ACE_Reactor_Client.cpp : 定义控制台应用程序的入口点. //#include "stdafx.h"#include "ace/Reactor ...

  2. ACE前摄器Proactor模式

    版权声明:转载时请以超链接形式标明文章原始出处和作者信息及本声明 http://egeho123.blogbus.com/logs/10780720.html ACE前摄器Proactor模式 当 O ...

  3. ACE网络编程模式比较

    ACE将网络编程进行了模式化,以便你不必每次都重复相同的代码. 网络编程需要处理的事情多括中断,并发,多线程等,程序格式相对固定,但是健壮的网络程序则相对复杂.为了处理这些情形,ACE内建了几个网络编 ...

  4. Reactor模式与Proactor模式

    博主一脚刚踏进分布式的大门(看<分布式Java应用>,如果大家有啥推荐的书欢迎留言~),发现书中对NIO采用的Reactor模式.AIO采用的Proactor模式一笔带过,好奇心趋势我找了 ...

  5. reactor与proactor模式

    首先来看看Reactor模式,Reactor模式应用于同步I/O的场景.我们以读操作为例来看看Reactor中的具体步骤: 读取操作: 1. 应用程序注册读就需事件和相关联的事件处理器 2. 事件分离 ...

  6. 高性能IO设计中的Reactor模式与Proactor模式

    为什么80%的码农都做不了架构师?>>>    在高性能的IO设计中,有两个比较著名的模式Reactor和Proactor模式,其中Reactor模式用于同步I/O,而Proacto ...

  7. 在WildFly中运行多个standalone模式的实例

    WildFly作为一款优秀的EJB容器,其前身为JBoss AS.JBoss作为一款开源的应用服务器,被广泛的应用在各种项目当中.假设我们现在有这样一个项目,他是以standalone的模式运行在Wi ...

  8. 用ACE的Reactor模式实现网络通讯的例子

    用ACE的Reactor模式实现网络通讯的例子,不罗嗦,直接上代码. 服务器代码: [cpp] view plain copy #include <ace/Reactor.h> #incl ...

  9. OpenGL Subdivision Modes细分模式的实例

    OpenGL Subdivision Modes细分模式的实例 先上图,再解答. 完整主要的源代码 源代码剖析 先上图,再解答. 完整主要的源代码 #include <sb7.h> #&l ...

最新文章

  1. linux的cat命令
  2. linux GPIO驱动详解
  3. CodeForces - 1141D Colored Boots(暴力+水题)
  4. HDU 1059 Dividing
  5. javafx隐藏_JavaFX技巧14:StackPane子项-隐藏但不消失
  6. springcloud Feign工程熔断器Hystrix
  7. UVA474 Heads / Tails Probability【数学】
  8. 一个人独立完成一个网站上线的前前后后
  9. 与其他的PDF编辑器相比,迅捷PDF编辑器实在好用太多了
  10. 机器学习与数据挖掘的区别(一点个人理解)
  11. 解决 Could not find common.jar (android.arch.core:common:1.0.0). 错误
  12. 恩尼格玛计划续章…以及,我们正在招贤纳士
  13. 操作系统的主要功能是什么
  14. 大部分人都容易焦虑,那么应该如何对待焦虑呢?
  15. android修改图片(修改图片大小,图片旋转,图片平移)
  16. java path 注解_Java内置系统注解和元注解
  17. Repository “http://xxx@git.xxx.net/xxx/xxx.git”not found 解决
  18. static 函数和变量
  19. 2021年Vue最常见的面试题以及答案(面试必过)
  20. 抓包工具--Fiddler

热门文章

  1. android 自定义view 动画效果,Android自定义view----音乐播放动画
  2. python读取txt行问题
  3. C++单链表学习随想
  4. vs2010MFC D3D播放YUV格式视频详细制作全过程
  5. log4j2入门(四) log4j2.xml配置文件详细实例
  6. jqGrid使用整理
  7. 2016年计算机视觉和图像处理相关的国际会议一览表
  8. Python配置OpenCV时报错:ImportError DLL load failed: %1 不是有效的 Win32 应用程序
  9. 《Effective STL》学习笔记(第三部分)
  10. ‘聪明的搜索算法’ A*算法