1.ACE反应器框架简介

反应器(Reactor):用于事件多路分离和分派的体系结构模式

通常的,对一个文件描述符指定的文件或设备, 有两种工作方式: 阻塞与非阻塞。所谓阻塞方式的意思是指, 当试图对该文件描述符进行读写时,如果当时没有东西可读,或者暂时不可写, 程序就进入等待状态, 直到有东西可读或者可写为止。而对于非阻塞状态,如果没有东西可读, 或者不可写, 读写函数马上返回,而不会等待。

在前面的章节中提到的Tcp通信的例子中,就是采用的阻塞式的工作方式:当接收tcp数据时,如果远端没有数据可以读,则会一直阻塞到读到需要的数据为止。这种方式的传输和传统的被动方法的调用类似,非常直观,并且简单有效,但是同样也存在一个效率问题,如果你是开发一个面对着数千个连接的服务器程序,对每一个客户端都采用阻塞的方式通信,如果存在某个非常耗时的读写操作时,其它的客户端通信将无法响应,效率非常低下。

一种常用做法是:每建立一个Socket连接时,同时创建一个新线程对该Socket进行单独通信(采用阻塞的方式通信)。这种方式具有很高的响应速度,并且控制起来也很简单,在连接数较少的时候非常有效,但是如果对每一个连接都产生一个线程的无疑是对系统资源的一种浪费,如果连接数较多将会出现资源不足的情况。

另一种较高效的做法是:服务器端保存一个Socket连接列表,然后对这个列表进行轮询,如果发现某个Socket端口上有数据可读时(读就绪),则调用该socket连接的相应读操作;如果发现某个Socket端口上有数据可写时(写就绪),则调用该socket连接的相应写操作;如果某个端口的Socket连接已经中断,则调用相应的析构方法关闭该端口。这样能充分利用服务器资源,效率得到了很大提高。

在Socket编程中就可以通过select等相关API实现这一方式。但直接用这些API控制起来比较麻烦,并且也难以控制和移植,在ACE中可以通过Reactor模式简化这一开发过程。

反应器本质上提供一组更高级的编程抽象,简化了事件驱动的分布式应用的设计和实现。除此而外,反应器还将若干不同种类的事件的多路分离集成到易于使用的API中。特别地,反应器对基于定时器的事件、信号事件、基于I/O端口监控的事件和用户定义的通知进行统一地处理。

ACE中的反应器与若干内部和外部组件协同工作。其基本概念是反应器框架检测事件的发生(通过在OS事件多路分离接口上进行侦听),并发出对预登记事件处理器(eventhandler)对象中的方法的"回调"(callback)。该方法由应用开发者实现,其中含有应用处理此事件的特定代码。

使用ACE的反应器,只需如下几步:

  1. 创建事件处理器,以处理他所感兴趣的某事件。
  2. 在反应器上登记,通知说他有兴趣处理某事件,同时传递他想要用以处理此事件的事件处理器的指针给反应器。

随后反应器框架将自动地:

  1. 在内部维护一些表,将不同的事件类型与事件处理器对象关联起来。
  2. 在用户已登记的某个事件发生时,反应器发出对处理器中相应方法的回调。

反应器模式在ACE中被实现为ACE_Reactor类,它提供反应器框架的功能接口。

如上面所提到的,反应器将事件处理器对象作为服务提供者使用。反应器内部记录某个事件处理器的特定事件的相关回调方法。当这些事件发生时,反应器会创建这种事件和相应的事件处理器的关联。

  1. 事件处理器
    事件处理器就是需要通过轮询发生事件改变的对象列表中的对象,如在上面的例子中就是连接的客户端,每个客户端都可以看成一个事件处理器。
  2. 回调事件
    就是反应器支持的事件,如Socket读就绪,写就绪。拿上面的例子来说,如果某个客户端(事件处理器)在反应器中注册了读就绪事件,当客户端给服务器发送一条消息的时候,就会触发这个客户端的数据可读的回调函数。

在反应器框架中,所有应用特有的事件处理器都必须由ACE_Event_Handler的抽象接口类派生。可以通过重载相应的"handle_"方法实现相关的回调方法。

使用ACE_Reactor基本上有三个步骤:

  1. 创建ACE_Event_Handler的子类,并在其中实现适当的"handle_"方法,以处理你想要此事件处理器为之服务的事件类型。
  2. 通过调用反应器对象的register_handler(),将你的事件处理器登记到反应器。
  3. 在事件发生时,反应器将自动回调相应的事件处理器对象的适当的handle_"方法。

下面我就以一个Socket客户端的例子为例简单的说明反应器的基本用法。

Cpp代码  
  1. #include <ace/OS.h>
  2. #include <ace/Reactor.h>
  3. #include <ace/SOCK_Connector.h>
  4. #include <string>
  5. #include <iostream>
  6. using namespace std;
  7. class MyClient:public ACE_Event_Handler
  8. {
  9. public:
  10. bool open()
  11. {
  12. ACE_SOCK_Connector connector;
  13. ACE_INET_Addr addr(3000,"127.0.0.1");
  14. ACE_Time_Value timeout(5,0);
  15. if(connector.connect(peer,addr,&timeout) != 0)
  16. {
  17. cout<<endl<<"connecetd fail";
  18. return false;
  19. }
  20. ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK);
  21. cout<<endl<<"connecetd ";
  22. return true;
  23. }
  24. ACE_HANDLE get_handle(void) const
  25. {
  26. return peer.get_handle();
  27. }
  28. int handle_input (ACE_HANDLE fd)
  29. {
  30. int rev=0;
  31. ACE_Time_Value timeout(5,0);
  32. if((rev=peer.recv(buffer,1000,&timeout))>0)
  33. {
  34. buffer[rev]='\0';
  35. cout<<endl<<"rev:\t"<<buffer<<endl;
  36. }
  37. return 3;
  38. }
  39. private:
  40. ACE_SOCK_Stream peer;
  41. char buffer[1024];
  42. };
  43. int main(int argc, char *argv[])
  44. {
  45. MyClient client;
  46. client.open();
  47. while(true)
  48. {
  49. ACE_Reactor::instance()->handle_events();
  50. }
  51. return 0;
  52. }

在这个例子中,客户端连接上服务器后,通过ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK)注册了一个读就绪的回调函数,当服务器端给客户端发消息的时候,会自动触发handle_input()函数,将接收到的信息打印出来。

这个例子只是为了演示反应器的基本用法,并不完善,我将在下一节中对如何在Socket通信中使用反应器做进一步的介绍。

在Socket编程中,常见的事件就是"读就绪","写就绪",通过对这两个事件的捕获分发,可以实现Socket中的异步操作。

Socket编程中的事件处理器

在前面我们已经介绍过,在ACE反应器框架中,任何都必须派生自ACE_Event_Handler类,并通过重载其相应会调事件处理函数来实现相应的回调处理的。在Socket编程中,我们通常需要重载的函数有

  1. handle_input()
    当I/O句柄(比如UNIX中的文件描述符)上的输入可用时,反应器自动回调该方法。
  2. handle_output()
    当I/O设备的输出队列有可用空间时,反应器自动回调该方法。
  3. handle_close()
    当事件处理器中的事件从Reactor中移除的时候调用。

此外,为了使Reactor能通过I/O句柄找到对应的事件处理器,还必须重载其get_handle()方法以使得Reactor建立起I/O句柄和事件处理器的关联。

使用Reactor框架。

下面我们将以一个客户端的程序为例,介绍如何在Socket编程中使用Reactor框架。

一.建立一个客户端对象(事件处理器)。

客户端对象就是一个事件处理器,其声明如下:

Cpp代码  
  1. class Client:public ACE_Event_Handler
  2. {
  3. public:
  4. ACE_HANDLE get_handle(void) const;
  5. int handle_input (ACE_HANDLE fd);
  6. int handle_close (ACE_HANDLE handle,
  7. ACE_Reactor_Mask close_mask);
  8. ACE_SOCK_Stream& Peer();
  9. private:
  10. ACE_SOCK_Stream peer;
  11. };

在Client端中我只关心"读就绪"事件,故只重载了handle_input函数(大多数应用下只需要重载handle_input函数)。另外,在客户端还保存了一个ACE_SOCK_Stream的peer对象用来进行Socket通信,同时封装了一个Peer()函数返回它的引用。

二.重载相应回调处理函数

Cpp代码  
  1. ACE_SOCK_Stream& Client::Peer()
  2. {
  3. return peer;
  4. }
  5. ACE_HANDLE Client::get_handle(void) const
  6. {
  7. return peer.get_handle();
  8. }
  9. int Client::handle_input (ACE_HANDLE fd)
  10. {
  11. int rev=0;
  12. if((rev = peer.recv(buffer,1000))>0)
  13. {
  14. buffer[rev]='\0';
  15. cout<<endl<<"rev:\t"<<buffer<<endl;
  16. return 0;
  17. }
  18. else    //Socket连接发生错误,返回-1,在Reactor中注销事件,触发handle_close函数
  19. {
  20. return -1;
  21. }
  22. }
  23. int Client::handle_close (ACE_HANDLE handle,
  24. ACE_Reactor_Mask close_mask)
  25. {
  26. cout<<endl<<"connecetd closed";
  27. return ACE_Event_Handler::handle_close(handle,close_mask);
  28. }

几个函数的功能都非常简单,这里就不多做介绍了。

三.在Reactor中注册事件

首先让我们来看看相应的main函数的代码:

Cpp代码  
  1. int main(int argc, char *argv[])
  2. {
  3. Client client;
  4. ACE_SOCK_Connector connector;
  5. ACE_INET_Addr addr(3000,"127.0.0.1");
  6. ACE_Time_Value timeout(5,0);
  7. if(connector.connect(client.Peer(),addr,&timeout) != 0)
  8. {
  9. cout<<endl<<"connecetd fail";
  10. return 0;
  11. }
  12. ACE_Reactor::instance()->register_handler(&client,ACE_Event_Handler::READ_MASK);
  13. while(true)
  14. {
  15. ACE_Reactor::instance()->handle_events();
  16. }
  17. return 0;
  18. }

在这里可以看到,使用Reactor框架后,依然首先通过ACE_SOCK_Connector的connect函数来建立连接。建立连接后,可以通过ACE_Reactor::instance()->register_handler函数来实现Reactor的注册,实现I/O事件和Client对象的handle_input方法相关联,它的第一个参数是事件处理器的地址,第二个参数是事件类型,由于这里只关心读就绪事件,故注册的事件类型是ACE_Event_Handler::READ_MASK。

四.启动Reactor事件循环

通过如上设置后,我们就可以通过ACE_Reactor::instance()->handle_events()启动Reactor循环了,这样,每当服务器端有数据发送给客户端时,当客户端的数据就绪时,就回触发Client对象的handle_input函数,将接收的数据打印出来。

通常的做法是,将Reactor事件循环作为一个单独的线程来处理,这样就不会阻塞main函数。

五.注销Reactor事件

Reactor事件的注销一般有两种方式,显式和隐式,下面将分别给予介绍。

  1. 隐式注销。
    当Reactor捕获事件后,会触发相应的"handle_"处理函数,当"handle_"处理函数返回值大于或等于0时,表示处理事件成功,当返回值小于0时,表示处理事件失败,这时Reactor会自动注销该句柄所注册的所有事件,并触发handle_close函数,以执行相应的资源清理工作。
    在本例中,当handle_input函数里的recv函数接收到0个数时,说明socket发生错误(大多为Socket连接中断),此时返回-1,则系统自动注销相应事件。
  2. 显示注销。
    调用Reactor对象的remove_handler方法移除,它有两个参数,第一个是所注册的事件反应器对象,第二个是所要注销的事件。

在这个示例程序里,连接方只有一个Socket连接,Reactor的优势并没有体现出来,但在一些网络管理系统里,连接方需要对多个需要管理的设备(服务器端)进行连接,在这种情况下使用Reactor模式,只需要多开一个Reactor事件循环线程就能实现事件多路分发复用,并且不会阻塞,通过面向对象的回调方式管理,使用起来非常方便。

Reactor框架的另外一个常用的地方就是服务器端,一般是一个服务器端对应多个客户端,这样用Reactor模式能大幅提高并发能力,这方面的编程方法将在下一章给与介绍。

在服务器端使用Reactor框架

使用Reactor框架的服务器端结构如下:

服务器端注册两种事件处理器,ClientAcceptor和ClientService,ClientService类负责和客户端的通信,每一个ClientService对象对应一个客户端的Socket连接。ClientAcceptor专门负责被动接受客户端的连接,并创建ClientService对象。这样,在一个N个Socket连接的服务器程序中,将存在1个ClientAcceptor对象和N个ClientService对象。

整个服务器端流程如下:

  1. 首先创建一个ClientAcceptor对象,该对象在Reactor上注册ACCEPT_MASK事件,Reactor将自动在监听端口建立Socket监听。
  2. 如果有对该端口的Socket连接时,Reactor将自动回调handle_input方法,ClientAcceptor重载此方法,并创建一个ClientService对象,用于处理和Client的通信。
  3. ClientService对象根据服务器的具体功能实现,其处理过程和客户端程序类似,注册相应的回调事件并分发即可。

代码如下:

Cpp代码  
  1. #include <ace/OS.h>
  2. #include <ace/Reactor.h>
  3. #include <ace/SOCK_Connector.h>
  4. #include <ace/SOCK_Acceptor.h>
  5. #include <ace/Auto_Ptr.h>
  6. class ClientService : public ACE_Event_Handler
  7. {
  8. public:
  9. ACE_SOCK_Stream &peer (void) { return this->sock_; }
  10. int open (void)
  11. {
  12. //注册读就绪回调函数
  13. return this->reactor ()->register_handler(this, ACE_Event_Handler::READ_MASK);
  14. }
  15. virtual ACE_HANDLE get_handle (void) const { return this->sock_.get_handle (); }
  16. virtual int handle_input (ACE_HANDLE fd )
  17. {
  18. //一个简单的EchoServer,将客户端的信息返回
  19. int rev = peer().recv(buf,100);
  20. if(rev<=0)
  21. return -1;
  22. peer().send(buf,rev);
  23. return 0;
  24. }
  25. // 释放相应资源
  26. virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask mask)
  27. {
  28. if (mask == ACE_Event_Handler::WRITE_MASK)
  29. return 0;
  30. mask = ACE_Event_Handler::ALL_EVENTS_MASK |
  31. ACE_Event_Handler::DONT_CALL;
  32. this->reactor ()->remove_handler (this, mask);
  33. this->sock_.close ();
  34. delete this;    //socket出错时,将自动删除该客户端,释放相应资源
  35. return 0;
  36. }
  37. protected:
  38. char buf[100];
  39. ACE_SOCK_Stream sock_;
  40. };
  41. class ClientAcceptor : public ACE_Event_Handler
  42. {
  43. public:
  44. virtual ~ClientAcceptor (){this->handle_close (ACE_INVALID_HANDLE, 0);}
  45. int open (const ACE_INET_Addr &listen_addr)
  46. {
  47. if (this->acceptor_.open (listen_addr, 1) == -1)
  48. {
  49. ACE_OS::printf("open port fail");
  50. return -1;
  51. }
  52. //注册接受连接回调事件
  53. return this->reactor ()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK);
  54. }
  55. virtual ACE_HANDLE get_handle (void) const
  56. { return this->acceptor_.get_handle (); }
  57. virtual int handle_input (ACE_HANDLE fd )
  58. {
  59. ClientService *client = new ClientService();
  60. auto_ptr<ClientService> p (client);
  61. if (this->acceptor_.accept (client->peer ()) == -1)
  62. {
  63. ACE_OS::printf("accept client fail");
  64. return -1;
  65. }
  66. p.release ();
  67. client->reactor (this->reactor ());
  68. if (client->open () == -1)
  69. client->handle_close (ACE_INVALID_HANDLE, 0);
  70. return 0;
  71. }
  72. virtual int handle_close (ACE_HANDLE handle,
  73. ACE_Reactor_Mask close_mask)
  74. {
  75. if (this->acceptor_.get_handle () != ACE_INVALID_HANDLE)
  76. {
  77. ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK |
  78. ACE_Event_Handler::DONT_CALL;
  79. this->reactor ()->remove_handler (this, m);
  80. this->acceptor_.close ();
  81. }
  82. return 0;
  83. }
  84. protected:
  85. ACE_SOCK_Acceptor acceptor_;
  86. };
  87. int main(int argc, char *argv[])
  88. {
  89. ACE_INET_Addr addr(3000,"192.168.1.142");
  90. ClientAcceptor server;
  91. server.reactor(ACE_Reactor::instance());
  92. server.open(addr);
  93. while(true)
  94. {
  95. ACE_Reactor::instance()->handle_events();
  96. }
  97. return 0;
  98. }

代码功能比较简单,需要注意以下几点:

  1. 这里注册事件的方式和前面的文章中方式不一样,是通过ACE_Event_Handler类的reactor()方法设置和获取reactor的指针,比较直观和方便。前面的文章是通过ACE_Reactor::instance()来获取的一个单体reactor的指针。
  2. 当客户端Socket连接关闭时,需要释放相应资源,需要注意一下ClientService对象的handle_close方法中释放资源的相应代码。


定时器的实现

通过Reactor机制,还可以很容易的实现定时器的功能,使用方式如下。

  1. 编写一个事件反应器,重载handle_timeout()方法,该方法是定时器的触发时间到时,会自动触发该方法。
  2. 通过Reactor的schedule_timer()方法注册定时器。
  3. 启动reacotr的handle_events()事件分发循环。
  4. 当不想使用定时器时,可以通过Reactor的cancel_timer()方法注销定时器。

下面的代码简单的实现了一个定时器,并具有基本的开启,关闭功能。

Cpp代码  
  1. #include <ace/OS.h>
  2. #include <ace/Reactor.h>
  3. class MyTimerHandler : public ACE_Event_Handler
  4. {
  5. private:
  6. int inteval;    //执行时间间隔
  7. int delay;        //延迟执行时间
  8. int timerid;
  9. public:
  10. MyTimerHandler(int delay,int inteval)
  11. {
  12. this->delay=delay;
  13. this->inteval=inteval;
  14. }
  15. int open()    //注册定时器
  16. {
  17. ACE_Time_Value delaytime(inteval);
  18. ACE_Time_Value intevaltime(inteval);
  19. timerid = reactor()->schedule_timer(this,
  20. 0,    //传递handle_timeout给的参数
  21. delaytime,
  22. intevaltime);
  23. return timerid;
  24. }
  25. int close()    //取消定时器
  26. {
  27. return reactor()->cancel_timer(timerid);
  28. }
  29. //定时器回调函数
  30. int handle_timeout (const ACE_Time_Value &current_time,
  31. const void * = 0)
  32. {
  33. time_t epoch = ((timespec_t)current_time).tv_sec;
  34. ACE_DEBUG ((LM_INFO,
  35. ACE_TEXT ("handle_timeout: %s\n"),
  36. ACE_OS::ctime (&epoch)));
  37. return 0;
  38. }
  39. };
  40. int main(int argc, char *argv[])
  41. {
  42. MyTimerHandler * timer = new MyTimerHandler (3,5);
  43. timer->reactor(ACE_Reactor::instance());
  44. timer->open();
  45. for(int i=0;i<2;i++)    //触发次handle_timeout事件
  46. {
  47. ACE_OS::printf("\n%d\n",i);
  48. ACE_Reactor::instance()->handle_events();
  49. }
  50. timer->close();
  51. ACE_OS::printf("cancel timer");
  52. while(true)
  53. ACE_Reactor::instance()->handle_events();
  54. return 0;
  55. }

代码功能比较简单,这里就不多做介绍了。

ACE反应器(Reactor)模式相关推荐

  1. 【Netty】反应器 Reactor 模式 ( 单反应器 Reactor 单线程 | 单反应器 Reactor 多线程 )

    文章目录 一. 反应器 ( Reactor ) 模式 二. 反应器 ( Reactor ) 模式两大组件 三. 单反应器 ( Reactor ) 单线程 四. 单反应器 ( Reactor ) 单线程 ...

  2. 用ACE的Reactor模式实现网络通讯的例子

    用ACE的Reactor模式实现网络通讯的例子,不罗嗦,直接上代码. 服务器代码: [cpp] view plain copy #include <ace/Reactor.h> #incl ...

  3. ACE之Reactor模式使用实例

    // ACE_Reactor_Client.cpp : 定义控制台应用程序的入口点. //#include "stdafx.h"#include "ace/Reactor ...

  4. ACE - Reactor模式源码剖析及具体实现(大量源码慎入)

    原文出自http://www.cnblogs.com/binchen-china,禁止转载. 在之前的文章中提到过Reactor模式和Preactor模式,现在利用ACE的Reactor来实现一个基于 ...

  5. 【Netty】Netty 简介 ( 原生 NIO 弊端 | Netty 框架 | Netty 版本 | 线程模型 | 线程 阻塞 IO 模型 | Reactor 模式引入 )

    文章目录 一. NIO 原生 API 弊端 二. Netty 简介 三. Netty 架构 四. Netty 版本 五. Netty 线程模型 六. 阻塞 IO 线程模型 七. 反应器 ( React ...

  6. 反应器(Reactor)模式-golang探索

    反应器模式 在以前的博文模式设计概述:反应器(Reactor)模式介绍过相关的概念和流程,当时使用了python但是从结果上来看并没有起到很明显的效果.最近在处理有关proxy的项目中,刚刚好涉及到有 ...

  7. Reactor模式:反应器模式

    Reactor这个词译成汉语还真没有什么合适的,很多地方叫反应器模式,但更多好像就直接叫reactor模式了,其实我觉着叫应答者模式更好理解一些.通过了解,这个模式更像一个侍卫,一直在等待你的召唤,或 ...

  8. Java设计模式之Reactor(反应器)模式初探

    文章来源: https://blog.csdn.net/pistolove/article/details/53152708 http://www.blogjava.net/DLevin/archiv ...

  9. ACE网络编程模式比较

    ACE将网络编程进行了模式化,以便你不必每次都重复相同的代码. 网络编程需要处理的事情多括中断,并发,多线程等,程序格式相对固定,但是健壮的网络程序则相对复杂.为了处理这些情形,ACE内建了几个网络编 ...

最新文章

  1. 确认了!MySQL 狠甩 Oracle 稳居 Top1!
  2. linux 线程库在哪里,linux线程库
  3. springmvc请求参数异常处理
  4. 迪拜宣布亿航“无人机的士”计划,将于7月份正式运营
  5. 通过FILETIME得到时间
  6. GDCM:gdcm::File的测试程序
  7. git clone 所有远程分支
  8. 1.6-1.7配置IP1.8网络问题排查
  9. java jdk 的环境变量_Java JDK14(Java 14)在Windows上安装与环境变量配置
  10. linux centos7杀进程,centos7 nginx 启动/进程状态/杀掉进程
  11. 咏南中间件D7客户端演示
  12. 生物信息学概论_英国爱丁堡大学生物相关硕士- 系统与合成生物学理学硕士详解+案例分享...
  13. TcaplusDB X 黎明觉醒,探索不止,黎明将至
  14. 2022.10.23 英语背诵
  15. 8086汇编学习之[BX],CX寄存器与loop指令,ES寄存器等
  16. Suzy找到实习了吗Day 17 | 二叉树进行中:110. 平衡二叉树,257 二叉树的所有路径,404. 左叶子之和
  17. 连接校园网的路由器为啥老是服务器没响应,校园网路由器不能使用怎么办?
  18. 在职可以考计算机吗,沈阳师范大学在职研可以考计算机证吗
  19. 1.电磁波传播原理,慢衰落、快衰落、阴影效应、多径传播、多普勒效应、塔下黑。
  20. 掌握PS制作,实时预览你的精彩作品

热门文章

  1. 'React/RCTBridgeDelegate.h' file not found
  2. 如何升级浏览器_绿茶浏览器app下载安装_绿茶浏览器软件最新版免费下载
  3. oracle遍历表做查询,oracle 语句之对数据库的表名就行模糊查询,对查询结果进行遍历,依次获取每个表名结果中的每个字段(存储过程)...
  4. 百度翻译API的使用
  5. 6.神操作(把master上的三个安装包scp给slave)—Hadoop完全分布式搭建完成
  6. 三年程序员之后的思考
  7. 【opencv】2.opencv绘图、视频等
  8. Python下opencv使用笔记系列
  9. sql必知必会(第四版) 学习笔记一
  10. Python基础知识实例讲解