zeromq源代码分析2------线/进程间通信方式
本文我们讲一下zeromq的线/进程间通信方式。
在zeromq源代码分析1中我们分析了zeromq的基本工作流程。我们了解到了zeromq的线/进程间通信的方式为消息传递。
zeromq中的各线程间通信的消息称之为command_t类型:
// This structure defines the commands that can be sent between threads.struct command_t{// Object to process the command.class object_t *destination;enum type_t{stop,plug,own,attach,bind,activate_reader,activate_writer,pipe_term,pipe_term_ack,term_req,term,term_ack,reap,reaped,done} type;union {// Sent to I/O thread to let it know that it should// terminate itself.struct {} stop;// Sent to I/O object to make it register with its I/O thread.struct {} plug;// Sent to socket to let it know about the newly created object.struct {class own_t *object;} own;// Attach the engine to the session. If engine is NULL, it informs// session that the connection have failed.struct {struct i_engine *engine;unsigned char peer_identity_size;unsigned char *peer_identity;} attach;// Sent from session to socket to establish pipe(s) between them.// Caller have used inc_seqnum beforehand sending the command.struct {class reader_t *in_pipe;class writer_t *out_pipe;unsigned char peer_identity_size;unsigned char *peer_identity;} bind;// Sent by pipe writer to inform dormant pipe reader that there// are messages in the pipe.struct {} activate_reader;// Sent by pipe reader to inform pipe writer about how many// messages it has read so far.struct {uint64_t msgs_read;} activate_writer;// Sent by pipe reader to pipe writer to ask it to terminate// its end of the pipe.struct {} pipe_term;// Pipe writer acknowledges pipe_term command.struct {} pipe_term_ack;// Sent by I/O object ot the socket to request the shutdown of// the I/O object.struct {class own_t *object;} term_req;// Sent by socket to I/O object to start its shutdown.struct {int linger;} term;// Sent by I/O object to the socket to acknowledge it has// shut down.struct {} term_ack;// Transfers the ownership of the closed socket// to the reaper thread.struct {class socket_base_t *socket;} reap;// Closed socket notifies the reaper that it's already deallocated.struct {} reaped;// Sent by reaper thread to the term thread when all the sockets// are successfully deallocated.struct {} done;} args;};
由此可见一个command_t主要包括destination,type,args,其中destination表明要传递命令消息的目的对象,type和args则是命令消息的类型和参数。
而发送消息的代码如下:
// Derived object can use these functions to send commands// to other objects.void send_stop ();void send_plug (class own_t *destination_,bool inc_seqnum_ = true);void send_own (class own_t *destination_,class own_t *object_);void send_attach (class session_t *destination_,struct i_engine *engine_, const blob_t &peer_identity_,bool inc_seqnum_ = true);void send_bind (class own_t *destination_,class reader_t *in_pipe_, class writer_t *out_pipe_,const blob_t &peer_identity_, bool inc_seqnum_ = true);void send_activate_reader (class reader_t *destination_);void send_activate_writer (class writer_t *destination_,uint64_t msgs_read_);void send_pipe_term (class writer_t *destination_);void send_pipe_term_ack (class reader_t *destination_);void send_term_req (class own_t *destination_,class own_t *object_);void send_term (class own_t *destination_, int linger_);void send_term_ack (class own_t *destination_);void send_reap (class socket_base_t *socket_);void send_reaped ();void send_done ();
void zmq::object_t::send_command (command_t &cmd_)
{ctx->send_command (cmd_.destination->get_tid (), cmd_);
}void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
{slots [tid_]->send (command_);
}
因为zeromq采用的模式基本上就是一个object和一个io_thread绑定起来,所以这里的意思就是我们将command发送给目的对象的io_thread的mailbox中去。
如果对ctx::slots不太懂,请回到zeromq源代码分析1并且结合源代码理解。
下面我们对mailbox来一个深入分析:
zmq::mailbox_t::mailbox_t ()
{
#ifdef PIPE_BUF// Make sure that command can be written to the socket in atomic fashion.// If this wasn't guaranteed, commands from different threads would be// interleaved.zmq_assert (sizeof (command_t) <= PIPE_BUF);
#endif// Create the socketpair for signaling.int rc = make_socketpair (&r, &w);errno_assert (rc == 0);// Set the writer to non-blocking mode.int flags = fcntl (w, F_GETFL, 0);errno_assert (flags >= 0);rc = fcntl (w, F_SETFL, flags | O_NONBLOCK);errno_assert (rc == 0);#ifndef MSG_DONTWAIT// Set the reader to non-blocking mode.flags = fcntl (r, F_GETFL, 0);errno_assert (flags >= 0);rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);errno_assert (rc == 0);
#endif
}
该函数创建了socketpair,并且设置了writer和reader都为非阻塞模式。
而socketpair在UNIX上是有原生的实现的,在windows下面通过自己创建一个一对socket连接,客户端当reader,服务端当writer。
void zmq::mailbox_t::send (const command_t &cmd_)
{// Attempt to write an entire command without blocking.ssize_t nbytes;do {nbytes = ::send (w, &cmd_, sizeof (command_t), 0);} while (nbytes == -1 && errno == EINTR);// Attempt to increase mailbox SNDBUF if the send failed.if (nbytes == -1 && errno == EAGAIN) {int old_sndbuf, new_sndbuf;socklen_t sndbuf_size = sizeof old_sndbuf;// Retrieve current send buffer size.int rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &old_sndbuf,&sndbuf_size);errno_assert (rc == 0);new_sndbuf = old_sndbuf * 2;// Double the new send buffer size.rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, sndbuf_size);errno_assert (rc == 0);// Verify that the OS actually honored the request.rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, &sndbuf_size);errno_assert (rc == 0);zmq_assert (new_sndbuf > old_sndbuf);// Retry the sending operation; at this point it must succeed.do {nbytes = ::send (w, &cmd_, sizeof (command_t), 0);} while (nbytes == -1 && errno == EINTR);}errno_assert (nbytes != -1);// This should never happen as we've already checked that command size is// less than PIPE_BUF.zmq_assert (nbytes == sizeof (command_t));
}
发送command,这边利用writer句柄调用send(4) syscall进行发送command。先尝试写,如果被中断则继续循环写。如果返回错误EAGAIN,由于是非阻塞模式写我们就会增加SND_BUF继续写,最后验证一下是否成功。
int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
{
#ifdef MSG_DONTWAIT// Attempt to read an entire command. Returns EAGAIN if non-blocking// mode is requested and a command is not available.ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t),block_ ? 0 : MSG_DONTWAIT);if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))return -1;
#else// If required, set the reader to blocking mode.if (block_) {int flags = fcntl (r, F_GETFL, 0);errno_assert (flags >= 0);int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);errno_assert (rc == 0);}// Attempt to read an entire command. Returns EAGAIN if non-blocking// and a command is not available. Save value of errno if we wish to pass// it to caller.int err = 0;ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))err = errno;// Re-set the reader to non-blocking mode.if (block_) {int flags = fcntl (r, F_GETFL, 0);errno_assert (flags >= 0);int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);errno_assert (rc == 0);}// If the recv failed, return with the saved errno if set.if (err != 0) {errno = err;return -1;}#endif// Sanity check for success.errno_assert (nbytes != -1);// Check whether we haven't got half of command.zmq_assert (nbytes == sizeof (command_t));return 0;
接收command则根据参数block_可以选择是否阻塞地读。这边之所以可以直接使用
ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
是因为对于线程间的通信木有字节序不同的问题。
下面我们来讲一下接收的函数调用过程。
由于我们在zeromq源代码分析1中已经说明io_thread在构造函数中会将自己的mailbox的句柄加入到poller中,设置事件处理器就是自身的函数in_event()/out_event(),并且激活读事件。因此当poller轮询到mailbox的读事件,也就是有人发消息给该io_thread的mailbox时候,就能通过in_event()函数去recv消息。
void zmq::io_thread_t::in_event ()
{// TODO: Do we want to limit number of commands I/O thread can// process in a single go?while (true) {// Get the next command. If there is none, exit.command_t cmd;int rc = mailbox.recv (&cmd, false);if (rc != 0 && errno == EINTR)continue;if (rc != 0 && errno == EAGAIN)break;errno_assert (rc == 0);// Process the command.cmd.destination->process_command (cmd);}
}
这边收到command之后我们处理该command,即调用cmd的destination的process_command(1)函数,也就是与该io_thread绑定的object。
// These handlers can be overloaded by the derived objects. They are// called when command arrives from another thread.virtual void process_stop ();virtual void process_plug ();virtual void process_own (class own_t *object_);virtual void process_attach (struct i_engine *engine_,const blob_t &peer_identity_);virtual void process_bind (class reader_t *in_pipe_,class writer_t *out_pipe_, const blob_t &peer_identity_);virtual void process_activate_reader ();virtual void process_activate_writer (uint64_t msgs_read_);virtual void process_pipe_term ();virtual void process_pipe_term_ack ();virtual void process_term_req (class own_t *object_);virtual void process_term (int linger_);virtual void process_term_ack ();virtual void process_reap (class socket_base_t *socket_);virtual void process_reaped ();void zmq::object_t::process_command (command_t &cmd_)
{switch (cmd_.type) {case command_t::activate_reader:process_activate_reader ();break;case command_t::activate_writer:process_activate_writer (cmd_.args.activate_writer.msgs_read);break;case command_t::stop:process_stop ();break;case command_t::plug:process_plug ();process_seqnum ();return;...
这些process_xxx()函数在子类中override,处理各种线程间发送的命令。
下一篇文章会介绍zmq_msg_t以及相关的函数,敬请期待。希望有兴趣的朋友可以和我联系,一起学习。 kaka11.chen@gmail.com
zeromq源代码分析2------线/进程间通信方式相关推荐
- Android系统进程间通信(IPC)机制Binder中的Server启动过程源代码分析
原文地址: http://blog.csdn.net/luoshengyang/article/details/6629298 在前面一篇文章浅谈Android系统进程间通信(IPC)机制Binder ...
- Android系统进程间通信(IPC)机制Binder中的Client获得Server远程接口过程源代码分析(2)...
注意,这里的参数reply = 0,表示这是一个BC_TRANSACTION命令. 前面我们提到,传给驱动程序的handle值为0,即这里的tr->target.handle = ...
- binder 从c到java_Android系统进程间通信Binder机制在应用程序框架层的Java接口源代码分析...
在前面几篇文章中,我们详细介绍了Android系统进程间通信机制Binder的原理,并且深入分析了系统提供的Binder运行库和驱动程序的源代码.细心的读者会发现,这几篇文章分析的Binder接口都是 ...
- dat关闭某进程_超详细解析!工程师必会的Linux进程间通信方式和原理
▍进程的概念 · 进程是操作系统的概念,每当我们执行一个程序时,对于操作系统来讲就创建了一个进程,在这个过程中,伴随着资源的分配和释放.可以认为进程是一个程序的一次执行过程. ▍进程通信的概念 · 进 ...
- Android应用程序绑定服务(bindService)的过程源代码分析
Android应用程序组件Service与Activity一样,既可以在新的进程中启动,也可以在应用程序进程内部启动:前面我们已经分析了在新的进程中启动Service的过程,本文将要介绍在应用程序内部 ...
- Hadoop源代码分析
http://wenku.baidu.com/link?url=R-QoZXhc918qoO0BX6eXI9_uPU75whF62vFFUBIR-7c5XAYUVxDRX5Rs6QZR9hrBnUdM ...
- Android源代码分析之类方法与组件名词解释(持续更新)
Android源代码分析方法组件详解 1. 类方法 1.1 Handler方法 1.2 Looper方法 1.3 Binder 1.4 Instrumentation 2. 组件 2.1 AMS.WM ...
- Android Applicaion组件创建的源代码分析(Android 9,含序列图)
Application组件源代码分析 1. Applicaion启动流程源代码分析 2. 启动过程中应用进程.系统服务进程通信的分界点 3. 组件生命周期与系统服务的关系 4. Application ...
- Android系统默认Home应用程序(Launcher)的启动过程源代码分析
在前面一篇文章中,我们分析了Android系统在启动时安装应用程序的过程,这些应用程序安装好之后,还需要有一个Home应用程序来负责把它们在桌面上展示出来,在Android系统中,这个默认的Home应 ...
最新文章
- sqlserver2000换成mysql_将Microsoft SQL Server 2000数据库转换成MySQL数据库
- python怎么显示结果_python中plot实现即时数据动态显示方法
- clickhouse原理解析与应用实践_编程好书推荐《Redis 深度历险:核心原理与应用实践》...
- python set()函数讲解
- c语言定义数组a10 指定各元素,C语言填空题.doc
- 趣图:21 副 GIF 动图让你了解各种数学概念
- devops handbook 读书笔记_DevOps教程:Azure DevOps
- 37线性映射04——像与核、核与像的计算、线性映射的维数公式
- Java Swing线程之SwingUtilities.invokeLater解释
- 一不小心就进入了P2P陷阱
- 德国铁路公司基于模型的铁路系统设计路线图 - 基于模型的系统开发在铁路部门的应用
- 淘宝客用微博推广方法
- 该怎么把光纤接入家里预埋的网线中?
- Java手写AVL树(非常详细)
- 3.2 腾讯云AI解决方案
- 解决关于win10下eclipse代码格式化不生效问题
- Element-UI实现对话框内播放视频
- 【QT实现TCP和UDP协议通信(一)】
- Python可视化数据分析02、Scrapy框架-强化测试Scrapy-CSS
- JVM学习笔记(上)