把这两天做Proactor的一些经验和心得写一下,可能会给一些人帮助。
    Proactor是异步模式的网络处理器,ACE中叫做“前摄器”。
    先讲几个概念:
    前摄器(Proactor)-异步的事件多路分离器、处理器,是核心处理类。启动后由3个线程组成(你不需要关心这三个线程,我只是让你知道一下有这回事存在)。
    接受器(Acceptor)-用于服务端,监听在一个端口上,接受用户的请求。
    连接器(Connector)-用于客户端,去连接远程的监听。当然,如果远程是ACE写的,就是Acceptor。
    异步模式-即非阻塞模式。网络的传输速度一般来讲为10Mbps、100Mbps、1000Mbps。拿千兆网来说,实际的传输速度为1000Mbps/8大概为128KB左右。我们的CPU一般为P4 3.0GHZ,如果是32位的处理器,一秒钟大概可以处理6G的字节,那么,128KB的网络速度是远远及不上处理器的速度的。网络发送数据是一位一位发送出去的,如果CPU等在这里,发送完成函数才结束,那么,处理器浪费了大量时间在网络传输上。
    操作系统提供了异步的模式来传输网络数据,工作模式即:应用程序把要发送的数据交给操作系统,操作系统把数据放在系统缓冲区后就告诉应用程序OK了,我帮你发,应用程序该干嘛干嘛去。操作系统发送完成后,会给应用系统一个回执,告诉应用程序:刚才那个包发送完成了!
   举个例子:你有几封邮件和包裹要发,最有效率的办法是什么?你把邮件和包裹及交给总台,总台MM说,好了,你帮你发,你忙去吧!然后你去工作了。过了一会,总台MM打电话告诉你:“刚才我叫快递公司的人来了,把你的包裹发出去了。邮局的人也来了,取走了邮件,放心好了”。同样,如果你知道今天会有包裹来,比如你在淘宝上购物了,你能成天等在总台?你应该告诉总台MM:“今天可能有我的一个快递,你帮我收一下,晚上请你肯德基!”。MM:“看在肯得基的面子上,帮你收了”。某个时间,MM打电话来了:“帅哥,你的包裹到了,我帮你签收了,快来拿吧。”
   因为操作系统是很有效率的,所有,他在后台收发是很快的。应用程序也很简单。Proactor就是这种异步模式的。Proactor就是总台MM;ACE_Service_Handle就是总台代为收发邮件的公司流程。

我们看一个实例:

//***********************************************************
class TPTCPAsynchServerImpl : public ACE_Service_Handler
{
public:
 TPTCPAsynchServerImpl(void);
 ~TPTCPAsynchServerImpl(void);

virtual void open (ACE_HANDLE handle, ACE_Message_Block &message_block); 
 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);

virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);

virtual void  handle_time_out (const ACE_Time_Value &tv, const void *act=0);

private:
 int initiate_read_stream (const ACE_Asynch_Read_Stream::Result &result);

ACE_Asynch_Read_Stream rs_;
 ACE_Asynch_Write_Stream ws_;

};

这个例子从ACE_Service_Handler继承过来,ACE_Service_Handle主要就是定义了一些回调函数。
1、 virtual void open (ACE_HANDLE handle, ACE_Message_Block &message_block);
  当有客户端连接上来,连接建立成功后Proactor会调用这个方法。

2、 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
当用户要读的数据读好了后,调用这个方法

3、virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
当用户要写的数据在网卡上发送成功后,Proactor会回调这个方法

4、 virtual void  handle_time_out (const ACE_Time_Value &tv, const void *act=0);
当用户设定的时钟到期了,这个方法会被调用。

这跟和总台MM的联络方法是不是一样的?

对还缺点东西,缺少怎么向总台MM交待任务的方法。下面看看:

首先,创建一个监听器。

ACE_Asynch_Acceptor<TPTCPAsynchServerImpl> acceptor_;
看到没,就是我们刚才写的类,因为他继承了回调接口,并实现了自已的代码,模板中ACE_Asynch_Acceptor会在合适的时候回调这些方法。

//创建一个地址对象
 ACE_INET_Addr addr(port, ip);
acceptor_.open (addr, 8 * 1024, 1);

Open后,就开始监听了。其它的,向Proactor注册一些事件的事模板类中都替你做了,你不需要做很多事。
    那么,已经开始监听了,我的程序从哪里开始呢?对于一个服务程序来讲,程序是被用户的连接驱动的,一个用户程序想和通讯,必须先创建连接,就是Socket中的connect操作。这个操作Proactor会替我们做一些工作,当连接创建完成后,上面讲的Open方法会被调用,我们看看Open方法中都有些什么代码:

void TPTCPAsynchServerImpl::open (ACE_HANDLE handle, ACE_Message_Block &message_block)
{
 ACE_DEBUG ((LM_DEBUG, "%N:%l:TPTCPAsynchServerImpl::open()..."));

//构造读流
 if (rs_.open (*this, handle) == -1)
 {
  ACE_ERROR ((LM_ERROR, "%N:%l:", "TPTCPAsynchServerImpl::open() Error"));
  return;
    }

//构造写流
 if (ws_.open(*this, handle) == -1)
 {
  ACE_ERROR ((LM_ERROR, "%N:%l:", "TPTCPAsynchServerImpl::open() Error"));
  return;
    }

//获取客户端连接地址和端口
 ACE_INET_Addr addr; 
 ACE_SOCK_SEQPACK_Association ass=ACE_SOCK_SEQPACK_Association(handle); 
 size_t addr_size=1; 
 ass.get_local_addrs(&addr,addr_size);

this->server_->onClientConnect((int)handle, addr.get_ip_address(), addr.get_port_number());

//如果客户连接时同时提交了数据,需要伪造一个结果,然后呼叫读事件
 if (message_block.length () != 0)
 {
 // ACE_DEBUG((LM_DEBUG, "message_block.length() != 0 "));
  // 复制消息块
  ACE_Message_Block &duplicate =  *message_block.duplicate ();

// 伪造读结果,以便进行读完成回调
  ACE_Asynch_Read_Stream_Result_Impl *fake_result =
        ACE_Proactor::instance ()->create_asynch_read_stream_result (this->proxy (),
                                                                     this->handle_,
                                                                     duplicate,
                                                                     1024,
                                                                     0,
                                                                     ACE_INVALID_HANDLE,
                                                                     0,
                                                                     0);

size_t bytes_transferred = message_block.length ();

// Accept事件处理完成,wr_ptr指针会被向前移动,将其移动到开始位置
  duplicate.wr_ptr (duplicate.wr_ptr () - bytes_transferred);

// 这个方法将调用回调函数
  fake_result->complete (message_block.length (), 1, 0);

// 销毁伪造的读结果
  delete fake_result;
 }

// 否则,通知底层,准备读取用户数据
 //创建一个消息块。这个消息块将用于从套接字中异步读 
 ACE_Message_Block *mb = 0;
  ACE_NEW (mb, ACE_Message_Block (_bufSize));

if (rs_.read (*mb, mb->size () - 1) == -1)
 {
  delete mb;
  ACE_ERROR ((LM_ERROR, "%N:%l:open init read failed!"));
  return;
 }
}

我们看到,首先创建了两个流,就是前面类定义中定义的一个异步写流,一个异步读流。以后对网络的读和写就通过这两个流进行。我还给出了一段读客户端地址和端口的代码。然后是读取客户Connect可能附带的数据,那段代码不用看懂,以后使用照抄就行。然后就是

 if (rs_.read (*mb, mb->size () - 1) == -1)
 {
  delete mb;
  ACE_ERROR ((LM_ERROR, "%N:%l:open init read failed!"));
  return;
 }

这段代码使用读流读一段数据。这段代码就是向总台MM交待:我要收包裹,收好了叫我!
也就是说,这段代码99%的可能是读不出数据的,只是向Proactor注册读的事件,具体的等待、读取操作由Proactor读,读到了,就回调Handle_Read_Stream方法。ACE_Message_Block是消息块,数据就是存放在消息块中的。
下面看看Handle_Read_Stream方法的代码:

void TPTCPAsynchServerImpl::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
 result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '/0';

ACE_DEBUG ((LM_DEBUG, "********************/n"));
  ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "bytes_to_read", result.bytes_to_read ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "handle", result.handle ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "bytes_transfered", result.bytes_transferred ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "act", (u_long) result.act ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "success", result.success ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "completion_key", (u_long) result.completion_key ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "error", result.error ()));
  ACE_DEBUG ((LM_DEBUG, "********************/n"));

result.message_block().release();

if (this->initiate_read_stream (result) == -1)
 {
  ACE_ERROR((LM_ERROR, "%N:%l:read stream failed!connection closed, remove it:%d/n", result.handle()));
  closeConnection(result.handle());
 } 
}

这个函数被调用,就表明有数据已经读好了,包裹已经在总台了。Proactor比总台MM还好,给你送上门了,数据就在Result里,上面演示了Result中的数据。然后把消息块释放了,然后调用initiate_read_stream继续监听网络上可能到来的数据。看看initiate_read_stream好了:

int TPTCPAsynchServerImpl::initiate_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
 ACE_DEBUG((LM_TRACE, "%N:%l:TPTCPAsynchServerImpl::initiate_read_stream()"));
 //创建一个消息块。这个消息块将用于从套接字中异步读 
 ACE_Message_Block *mb = new ACE_Message_Block(_bufSize);
 if (mb == NULL)
 {
  ACE_DEBUG((LM_ERROR, "%N:%l:can't allock ACE_Message_Block. ")); 
  return -1;
 }

if (rs_.read (*mb, mb->size () - 1) == -1)
 {
  delete mb;
  ACE_ERROR_RETURN ((LM_ERROR, "%N:%l:rs->read() failed, clientID=%d", result.handle()),  -1);
 }

return 0;
}

代码很简单,就是创建一个新的消息块,然后使用读流注册一个读消息就可以了。

到此为止,Proactor的读流程很清楚了吧?

下面再说一个写流程。

写流程其实更简单,在任意想向客户端写数据的地方,调用相应代码就行了,比如,我们提供了SendData方法来发送数据,在任意想发送数据的地方调用SendData就行了,SendData的代码如下:

int TPTCPAsynchServerImpl::sendData(int clientID, const char *data, int dataLen, unsigned int &id)
{
 ACE_DEBUG((LM_DEBUG, "TPTCPAsynchServerImpl::sendData(void)"));

ACE_Message_Block *mb; 
 ACE_NEW_RETURN(mb, ACE_Message_Block(dataLen + 1), -1);

mb->wr_ptr((char*)data);                  
 ACE_OS::memcpy(mb->base(),(char*)data, dataLen);

id = GlobleSingleton::instance()->getIndex();
 mb->msg_type((int)id);

//向操作系统发送数据
 if (connection->ws->write (*mb , dataLen ) == -1)
 {
  ACE_ERROR_RETURN((LM_ERROR, "%N:%l:sendData failed! clientID=%d", clientID),-1);
 }

return 0;
}

简单说,就是创建了一个消息块,把用户数据拷贝进来,然后调用写流WS向Proactor发送一个Write事件就可以了,发送成功后,Handle_write_handle会被调用,看一下:

void
TPTCPAsynchServerImpl::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
  ACE_DEBUG ((LM_DEBUG,
              "handle_write_stream called"));

// Reset pointers.
  result.message_block ().rd_ptr (result.message_block ().rd_ptr () - result.bytes_transferred ());

ACE_DEBUG ((LM_DEBUG, "********************"));
  ACE_DEBUG ((LM_DEBUG, "%s = %d", "bytes_to_write", result.bytes_to_write ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d", "handle", result.handle ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d", "bytes_transfered", result.bytes_transferred ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d", "act", (u_long) result.act ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d", "success", result.success ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d", "completion_key", (u_long) result.completion_key ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d", "error", result.error ()));
  ACE_DEBUG ((LM_DEBUG, "********************"));
#if 0
  ACE_DEBUG ((LM_DEBUG, "%s = %s", "message_block", result.message_block ().rd_ptr ()));
#endif

// Release message block.
  result.message_block ().release ();
}

代码中使用了result中发数据,然后把消息块释放了,就这么简单。

这是简单的proactor用法,当然,复杂也基本就这样用。所谓不基本的不是Proactor的内容,而是服务器编程本身的麻烦。比如说,多个连接的管理、重发机制、发送队列等等,这都不是ACE的内容。这些要大家自己思考了,并添加。

在这里,我要说几个重要的问题:连接的管理。Acceptor是一个类,但是在每一个连接,Proactor都用了某种办法创建了一个实例,所以,连接管理的群集类一定不能在Acceptor类中,不然得到的结果就是始终只有一条记录。因为每个Acceptor都有一个实例,实例对应一个连接,群集类也就每个实例一个了。要采取的方法是一个全局的容器对象就可以了。比如我这个类:

typedef ACE_Map_Manager <ACE_HANDLE, ConnectionBean *, ACE_Null_Mutex> ConnectionMap;
typedef ACE_Map_Iterator<ACE_HANDLE, ConnectionBean *, ACE_Null_Mutex> ConnectionIterator;
typedef ACE_Map_Entry   <ACE_HANDLE, ConnectionBean *> ConnectionEntry;

class Globle
{
public:
 Globle(void);
 ~Globle(void);

ITPServer* server_;
 ConnectionMap _connections;

unsigned getIndex(void); 
 long getTimerId(void);

private:
 unsigned int index_;

long timerId_;
};

typedef ACE_Singleton<Globle, ACE_Null_Mutex> GlobleSingleton;

我使用ACE的Singleton模板创建这个类,每一个Acceptor要使用ConnectionMap,都使用这里的_connections,方法如下 :
  GlobleSingleton::instance()->connection.bind()......

这个问题可是我花费了2天时间找出来的,诸位同仁不可不戒啊,给点掌声:)

转自: http://blog.newsfan.net/leelin/archive/2006/11/01/4159.aspx

ACE中的Proactor介绍和应用实例相关推荐

  1. (zt)ACE中的Proactor介绍和应用实例

    把这两天做Proactor的一些经验和心得写一下,可能会给一些人帮助.     Proactor是异步模式的网络处理器,ACE中叫做"前摄器".     先讲几个概念:     前 ...

  2. ACE中的Proactor和Reactor

    ACE中的Proactor和Reactor ACE_Select_Reactor是除Windows之外所有平台使用的默认反应器实现,在这些系统上最终会用select()系统函数进行等待.在Window ...

  3. php引用代码_PHP引用是什么?php中引用的介绍(代码实例)

    本篇文章给大家带来的内容是关于PHP引用是什么?php中引用的介绍(代码实例),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. 1. 什么是引用 在 PHP 中引用是指用不同的名字访问 ...

  4. php正则运用,php中常用的正则表达式的介绍及应用实例代码

    更全面的实例,可以参考 最常用的PHP正则表达式收集整理 //www.jb51.net/article/14049.htm php 正则表达式小结 //www.jb51.net/article/198 ...

  5. ACE中静态实例管理方式

    ACE中的很多类使用了单例模式,为了便于管理单例对象,ACE使用了一个组件--ACE_Framework_Component来专门管理. 我们以ACE_Reactor这个单例类的创建和释放为例. 1. ...

  6. Java XML解析工具 dom4j介绍及使用实例

    Java XML解析工具 dom4j介绍及使用实例 dom4j介绍 dom4j的项目地址:http://sourceforge.net/projects/dom4j/?source=directory ...

  7. python asyncio教程_python中使用asyncio实现异步IO实例分析

    1.说明 Python实现异步IO非常简单,asyncio是Python 3.4版本引入的标准库,直接内置了对异步IO的支持. asyncio的编程模型就是一个消息循环.我们从asyncio模块中直接 ...

  8. (三)AJAX基本介绍和简单实例03

    (三)AJAX基本介绍和简单实例03-----Ajax与数据库的动态应用 前台显示界面: 选择所有客户之后: 选择其中一个客户---杜森: Demo03.html代码 <html> < ...

  9. python怎么处理数据_python中scrapy处理项目数据的实例分析

    在我们处理完数据后,习惯把它放在原有的位置,但是这样也会出现一定的隐患.如果因为新数据的加入或者其他种种原因,当我们再次想要启用这个文件的时候,小伙伴们就会开始着急却怎么也翻不出来,似乎也没有其他更好 ...

最新文章

  1. shareSDK(分享第三方库)的 使用
  2. AT COMMAND的命令集
  3. 关于机器学习的一些感想
  4. LeetCode 865. 具有所有最深结点的最小子树(递归)
  5. python中前后端通信方法Ajax和ORM映射(form表单提交)
  6. api 微信内置浏览器js_微信小程序和HTML的区别
  7. byte数组转blob类型_Java类型相互转换byte[]类型,blob类型
  8. cmder全局添加右键菜单项
  9. vue-router 修改或添加新参数
  10. cad汉仪长仿宋体_长仿宋体字体下载 cad工程机械绘图工程制图国标字体下载
  11. python安装pygame教程_python-pygame安装教程
  12. excel删除无尽空白行_Word技巧:快速删除Word文档中的空行
  13. 易班开放平台第三方网站接入的Demo(Java_SDK)的步骤
  14. wingdings字体符号在哪_wingding、Wingdings2、wingdings3字体特殊符号与键盘字母相对应位置...
  15. python 使打开的浏览器最大化
  16. 抽35块树莓派新品单片机送给可爱的你们
  17. LeetCode hot-100 简单and中等难度,61-70.
  18. 新职业人才缺口近千万,90后最担心失业;字节跳动回应TikTok被收购传闻;Twitter公布账号劫持事故细节 | EA周报...
  19. 备份Linux到ntfs硬盘,Linux(SLES)挂载NTFS移动硬盘实践
  20. 大数据方面的核心技术

热门文章

  1. cuda-convnet2与caffe对比
  2. Programming Computer Vision with Python (学习笔记五)
  3. 图像检索:图像拷贝检索PHash改进方案
  4. JVM内幕:Java虚拟机详解
  5. Effective Java读书笔记四:通用程序设计
  6. Java并发编程(5):volatile变量修饰符—意料之外的问题(含代码)
  7. Python脚本图解
  8. 实现一个反向传播人工神经网络
  9. 数字图像处理:第七章 邻域运算
  10. JavaScript对象继续总结