本文我们讲一下zeromq的线/进程间通信方式。

在zeromq源代码分析1中我们分析了zeromq的基本工作流程。我们了解到了zeromq的线/进程间通信的方式为消息传递。

zeromq中的各线程间通信的消息称之为command_t类型:

     //  This structure defines the commands that can be sent between threads.struct command_t{//  Object to process the command.class object_t *destination;enum type_t{stop,plug,own,attach,bind,activate_reader,activate_writer,pipe_term,pipe_term_ack,term_req,term,term_ack,reap,reaped,done} type;union {//  Sent to I/O thread to let it know that it should//  terminate itself.struct {} stop;//  Sent to I/O object to make it register with its I/O thread.struct {} plug;//  Sent to socket to let it know about the newly created object.struct {class own_t *object;} own;//  Attach the engine to the session. If engine is NULL, it informs//  session that the connection have failed.struct {struct i_engine *engine;unsigned char peer_identity_size;unsigned char *peer_identity;} attach;//  Sent from session to socket to establish pipe(s) between them.//  Caller have used inc_seqnum beforehand sending the command.struct {class reader_t *in_pipe;class writer_t *out_pipe;unsigned char peer_identity_size;unsigned char *peer_identity;} bind;//  Sent by pipe writer to inform dormant pipe reader that there//  are messages in the pipe.struct {} activate_reader;//  Sent by pipe reader to inform pipe writer about how many//  messages it has read so far.struct {uint64_t msgs_read;} activate_writer;//  Sent by pipe reader to pipe writer to ask it to terminate//  its end of the pipe.struct {} pipe_term;//  Pipe writer acknowledges pipe_term command.struct {} pipe_term_ack;//  Sent by I/O object ot the socket to request the shutdown of//  the I/O object.struct {class own_t *object;} term_req;//  Sent by socket to I/O object to start its shutdown.struct {int linger;} term;//  Sent by I/O object to the socket to acknowledge it has//  shut down.struct {} term_ack;//  Transfers the ownership of the closed socket//  to the reaper thread.struct {class socket_base_t *socket;} reap;//  Closed socket notifies the reaper that it's already deallocated.struct {} reaped;//  Sent by reaper thread to the term thread when all the sockets//  are successfully deallocated.struct {} done;} args;};

由此可见一个command_t主要包括destination,type,args,其中destination表明要传递命令消息的目的对象,type和args则是命令消息的类型和参数。

而发送消息的代码如下:

 //  Derived object can use these functions to send commands//  to other objects.void send_stop ();void send_plug (class own_t *destination_,bool inc_seqnum_ = true);void send_own (class own_t *destination_,class own_t *object_);void send_attach (class session_t *destination_,struct i_engine *engine_, const blob_t &peer_identity_,bool inc_seqnum_ = true);void send_bind (class own_t *destination_,class reader_t *in_pipe_, class writer_t *out_pipe_,const blob_t &peer_identity_, bool inc_seqnum_ = true);void send_activate_reader (class reader_t *destination_);void send_activate_writer (class writer_t *destination_,uint64_t msgs_read_);void send_pipe_term (class writer_t *destination_);void send_pipe_term_ack (class reader_t *destination_);void send_term_req (class own_t *destination_,class own_t *object_);void send_term (class own_t *destination_, int linger_);void send_term_ack (class own_t *destination_);void send_reap (class socket_base_t *socket_);void send_reaped ();void send_done ();
void zmq::object_t::send_command (command_t &cmd_)
{ctx->send_command (cmd_.destination->get_tid (), cmd_);
}void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
{slots [tid_]->send (command_);
}

因为zeromq采用的模式基本上就是一个object和一个io_thread绑定起来,所以这里的意思就是我们将command发送给目的对象的io_thread的mailbox中去。

如果对ctx::slots不太懂,请回到zeromq源代码分析1并且结合源代码理解。

下面我们对mailbox来一个深入分析:

zmq::mailbox_t::mailbox_t ()
{
#ifdef PIPE_BUF//  Make sure that command can be written to the socket in atomic fashion.//  If this wasn't guaranteed, commands from different threads would be//  interleaved.zmq_assert (sizeof (command_t) <= PIPE_BUF);
#endif//  Create the socketpair for signaling.int rc = make_socketpair (&r, &w);errno_assert (rc == 0);//  Set the writer to non-blocking mode.int flags = fcntl (w, F_GETFL, 0);errno_assert (flags >= 0);rc = fcntl (w, F_SETFL, flags | O_NONBLOCK);errno_assert (rc == 0);#ifndef MSG_DONTWAIT//  Set the reader to non-blocking mode.flags = fcntl (r, F_GETFL, 0);errno_assert (flags >= 0);rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);errno_assert (rc == 0);
#endif
}

该函数创建了socketpair,并且设置了writer和reader都为非阻塞模式。

而socketpair在UNIX上是有原生的实现的,在windows下面通过自己创建一个一对socket连接,客户端当reader,服务端当writer。

void zmq::mailbox_t::send (const command_t &cmd_)
{//  Attempt to write an entire command without blocking.ssize_t nbytes;do {nbytes = ::send (w, &cmd_, sizeof (command_t), 0);} while (nbytes == -1 && errno == EINTR);//  Attempt to increase mailbox SNDBUF if the send failed.if (nbytes == -1 && errno == EAGAIN) {int old_sndbuf, new_sndbuf;socklen_t sndbuf_size = sizeof old_sndbuf;//  Retrieve current send buffer size.int rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &old_sndbuf,&sndbuf_size);errno_assert (rc == 0);new_sndbuf = old_sndbuf * 2;//  Double the new send buffer size.rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, sndbuf_size);errno_assert (rc == 0);//  Verify that the OS actually honored the request.rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, &sndbuf_size);errno_assert (rc == 0);zmq_assert (new_sndbuf > old_sndbuf);//  Retry the sending operation; at this point it must succeed.do {nbytes = ::send (w, &cmd_, sizeof (command_t), 0);} while (nbytes == -1 && errno == EINTR);}errno_assert (nbytes != -1);//  This should never happen as we've already checked that command size is//  less than PIPE_BUF.zmq_assert (nbytes == sizeof (command_t));
}

发送command,这边利用writer句柄调用send(4) syscall进行发送command。先尝试写,如果被中断则继续循环写。如果返回错误EAGAIN,由于是非阻塞模式写我们就会增加SND_BUF继续写,最后验证一下是否成功。

int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
{
#ifdef MSG_DONTWAIT//  Attempt to read an entire command. Returns EAGAIN if non-blocking//  mode is requested and a command is not available.ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t),block_ ? 0 : MSG_DONTWAIT);if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))return -1;
#else//  If required, set the reader to blocking mode.if (block_) {int flags = fcntl (r, F_GETFL, 0);errno_assert (flags >= 0);int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);errno_assert (rc == 0);}//  Attempt to read an entire command. Returns EAGAIN if non-blocking//  and a command is not available. Save value of errno if we wish to pass//  it to caller.int err = 0;ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))err = errno;//  Re-set the reader to non-blocking mode.if (block_) {int flags = fcntl (r, F_GETFL, 0);errno_assert (flags >= 0);int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);errno_assert (rc == 0);}//  If the recv failed, return with the saved errno if set.if (err != 0) {errno = err;return -1;}#endif//  Sanity check for success.errno_assert (nbytes != -1);//  Check whether we haven't got half of command.zmq_assert (nbytes == sizeof (command_t));return 0;

接收command则根据参数block_可以选择是否阻塞地读。这边之所以可以直接使用

    ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);

是因为对于线程间的通信木有字节序不同的问题。

下面我们来讲一下接收的函数调用过程。

由于我们在zeromq源代码分析1中已经说明io_thread在构造函数中会将自己的mailbox的句柄加入到poller中,设置事件处理器就是自身的函数in_event()/out_event(),并且激活读事件。因此当poller轮询到mailbox的读事件,也就是有人发消息给该io_thread的mailbox时候,就能通过in_event()函数去recv消息。

void zmq::io_thread_t::in_event ()
{//  TODO: Do we want to limit number of commands I/O thread can//  process in a single go?while (true) {//  Get the next command. If there is none, exit.command_t cmd;int rc = mailbox.recv (&cmd, false);if (rc != 0 && errno == EINTR)continue;if (rc != 0 && errno == EAGAIN)break;errno_assert (rc == 0);//  Process the command.cmd.destination->process_command (cmd);}
}

这边收到command之后我们处理该command,即调用cmd的destination的process_command(1)函数,也就是与该io_thread绑定的object。

//  These handlers can be overloaded by the derived objects. They are//  called when command arrives from another thread.virtual void process_stop ();virtual void process_plug ();virtual void process_own (class own_t *object_);virtual void process_attach (struct i_engine *engine_,const blob_t &peer_identity_);virtual void process_bind (class reader_t *in_pipe_,class writer_t *out_pipe_, const blob_t &peer_identity_);virtual void process_activate_reader ();virtual void process_activate_writer (uint64_t msgs_read_);virtual void process_pipe_term ();virtual void process_pipe_term_ack ();virtual void process_term_req (class own_t *object_);virtual void process_term (int linger_);virtual void process_term_ack ();virtual void process_reap (class socket_base_t *socket_);virtual void process_reaped ();void zmq::object_t::process_command (command_t &cmd_)
{switch (cmd_.type) {case command_t::activate_reader:process_activate_reader ();break;case command_t::activate_writer:process_activate_writer (cmd_.args.activate_writer.msgs_read);break;case command_t::stop:process_stop ();break;case command_t::plug:process_plug ();process_seqnum ();return;...

这些process_xxx()函数在子类中override,处理各种线程间发送的命令。

下一篇文章会介绍zmq_msg_t以及相关的函数,敬请期待。希望有兴趣的朋友可以和我联系,一起学习。 kaka11.chen@gmail.com

zeromq源代码分析2------线/进程间通信方式相关推荐

  1. Android系统进程间通信(IPC)机制Binder中的Server启动过程源代码分析

    原文地址: http://blog.csdn.net/luoshengyang/article/details/6629298 在前面一篇文章浅谈Android系统进程间通信(IPC)机制Binder ...

  2. Android系统进程间通信(IPC)机制Binder中的Client获得Server远程接口过程源代码分析(2)...

    注意,这里的参数reply = 0,表示这是一个BC_TRANSACTION命令.         前面我们提到,传给驱动程序的handle值为0,即这里的tr->target.handle = ...

  3. binder 从c到java_Android系统进程间通信Binder机制在应用程序框架层的Java接口源代码分析...

    在前面几篇文章中,我们详细介绍了Android系统进程间通信机制Binder的原理,并且深入分析了系统提供的Binder运行库和驱动程序的源代码.细心的读者会发现,这几篇文章分析的Binder接口都是 ...

  4. dat关闭某进程_超详细解析!工程师必会的Linux进程间通信方式和原理

    ▍进程的概念 · 进程是操作系统的概念,每当我们执行一个程序时,对于操作系统来讲就创建了一个进程,在这个过程中,伴随着资源的分配和释放.可以认为进程是一个程序的一次执行过程. ▍进程通信的概念 · 进 ...

  5. Android应用程序绑定服务(bindService)的过程源代码分析

    Android应用程序组件Service与Activity一样,既可以在新的进程中启动,也可以在应用程序进程内部启动:前面我们已经分析了在新的进程中启动Service的过程,本文将要介绍在应用程序内部 ...

  6. Hadoop源代码分析

    http://wenku.baidu.com/link?url=R-QoZXhc918qoO0BX6eXI9_uPU75whF62vFFUBIR-7c5XAYUVxDRX5Rs6QZR9hrBnUdM ...

  7. Android源代码分析之类方法与组件名词解释(持续更新)

    Android源代码分析方法组件详解 1. 类方法 1.1 Handler方法 1.2 Looper方法 1.3 Binder 1.4 Instrumentation 2. 组件 2.1 AMS.WM ...

  8. Android Applicaion组件创建的源代码分析(Android 9,含序列图)

    Application组件源代码分析 1. Applicaion启动流程源代码分析 2. 启动过程中应用进程.系统服务进程通信的分界点 3. 组件生命周期与系统服务的关系 4. Application ...

  9. Android系统默认Home应用程序(Launcher)的启动过程源代码分析

    在前面一篇文章中,我们分析了Android系统在启动时安装应用程序的过程,这些应用程序安装好之后,还需要有一个Home应用程序来负责把它们在桌面上展示出来,在Android系统中,这个默认的Home应 ...

最新文章

  1. sqlserver2000换成mysql_将Microsoft SQL Server 2000数据库转换成MySQL数据库
  2. python怎么显示结果_python中plot实现即时数据动态显示方法
  3. clickhouse原理解析与应用实践_编程好书推荐《Redis 深度历险:核心原理与应用实践》...
  4. python set()函数讲解
  5. c语言定义数组a10 指定各元素,C语言填空题.doc
  6. 趣图:21 副 GIF 动图让你了解各种数学概念
  7. devops handbook 读书笔记_DevOps教程:Azure DevOps
  8. 37线性映射04——像与核、核与像的计算、线性映射的维数公式
  9. Java Swing线程之SwingUtilities.invokeLater解释
  10. 一不小心就进入了P2P陷阱
  11. 德国铁路公司基于模型的铁路系统设计路线图 - 基于模型的系统开发在铁路部门的应用
  12. 淘宝客用微博推广方法
  13. 该怎么把光纤接入家里预埋的网线中?
  14. Java手写AVL树(非常详细)
  15. 3.2 腾讯云AI解决方案
  16. 解决关于win10下eclipse代码格式化不生效问题
  17. Element-UI实现对话框内播放视频
  18. 【QT实现TCP和UDP协议通信(一)】
  19. Python可视化数据分析02、Scrapy框架-强化测试Scrapy-CSS
  20. JVM学习笔记(上)

热门文章

  1. 【Python】利用高斯朴素贝叶斯模型实现光学字符识别
  2. 常用ASCII/ISO-8859-1/GB2312/GBK/UTF-8等字符编码梳理
  3. 内容平台的勾陈一:做好三件事,驶向星辰大海
  4. 多摄像头/跨境头多目标跟踪的简单实现
  5. oracle 连接数设置
  6. 魔方软件测试自学,魔方练习APP
  7. 相似问答检索——汽车之家的 Milvus 实践
  8. 【面试必备】java开发转算法工程师
  9. W5100硬件设计和调试要点
  10. 第二篇第三章建筑分类与耐火等级