Chapter 2 - Sockets and Patterns

Table of Contents

  • Chapter 2 - Sockets and Patterns
    • 1. The Socket API
      • 1.1 Plugging Sockets into the Topology
      • 1.2 Sending and Receiving Messages
      • 1.3 Unicast Transports
      • 1.4 ZeroMQ is Not a Neutral Carrier
      • 1.5 I/O Threads
    • 2 Message Patterns
      • 2.1 High-Level Messaging Patterns
      • 2.2 Working with Messages
      • 2.3 Handling Multiple Sockets
      • 2.4 Multipart Messages
      • Intermediaries and Proxies
      • The Dynamic Discovery Problem
      • Shared Queue (DEALER and ROUTER sockets)
      • ZeroMQ’s Built-In Proxy Function
    • Transport Bridging
    • Handling Errors and ETERM
    • Handling Interrupt Signals
    • Multithreading with ZeroMQ

In Chapter 1 - Basics, we worked through some basic examples of the main ZeroMQ patterns: request-reply, pub-sub, and pipeline. In this chapter, we start learning how to use these tools in real programs.

1. The Socket API

ZeroMQ presents a familiar socket-based API. The life of a socket has four parts, as with BSD sockets:

  • Creating and destroying sockets, which together form the socket life cycle (see zmq_socket(), zmq_close()).
  • Configuring sockets by setting options on them, and checking those option values when necessary (see zmq_setsockopt(), zmq_getsockopt()).
  • Plugging sockets into the network topology by creating ZeroMQ connections (see zmq_bind(), zmq_connect()).
  • Using the sockets to carry data by writing and receiving messages on them.

Note that sockets are always void pointers, while messages are structures. So in C you pass the socket itself, but you pass the address of the message in all message-related functions, such as zmq_msg_send() and zmq_msg_recv(). Remember: in ZeroMQ all your sockets belong to ZeroMQ, but messages are things you actually own in your code.

Creating, destroying, and configuring sockets works as you would expect for any object. But remember that ZeroMQ is asynchronous and elastic, and this has some impact on how we plug sockets into the network topology and how we use the sockets after that.

1.1 Plugging Sockets into the Topology

To create a connection between two nodes, you use zmq_bind() in one node and zmq_connect() in the other. As a general rule, the node that does zmq_bind() is a "server", sitting on a well-known network address, and the node that does zmq_connect() is a "client", with an unknown or arbitrary network address. Thus we say that we "bind a socket to an endpoint" and "connect a socket to an endpoint", the endpoint being that well-known network address.

ZeroMQ connections are somewhat different from classic TCP connections. The main notable differences are:

  • They go across an arbitrary transport (inproc, ipc, tcp, pgm, or epgm). See zmq_inproc(), zmq_ipc(), zmq_tcp(), zmq_pgm(), and zmq_epgm().
  • One socket may have many outgoing and many incoming connections.
  • There is no zmq_accept() method. When a socket is bound to an endpoint, it automatically starts accepting connections.
  • The network connection itself happens in the background, and ZeroMQ will automatically reconnect if the network connection is broken (e.g., if the peer disappears and then comes back).
  • Your application code cannot work with these connections directly; they are encapsulated under the socket.

Many architectures follow some kind of client/server model, where the server is the more static component and the clients are the more dynamic components, i.e., they come and go. There are sometimes issues of addressing: servers will be visible to clients, but not necessarily vice versa. So mostly it is obvious which node should be doing zmq_bind() (the server) and which should be doing zmq_connect() (the client). It also depends on the kind of sockets you are using, with some exceptions for unusual network architectures. We will look at socket types later.

Now, imagine we start the client before we start the server. In traditional networking, we get a big red Fail flag. But ZeroMQ lets us start and stop pieces arbitrarily. As soon as the client node does zmq_connect(), the connection exists and the node can start writing messages to the socket. At some stage (hopefully before messages queue up so much that they start to get discarded, or the client blocks), the server comes alive, does a zmq_bind(), and ZeroMQ starts to deliver messages.

A server node can bind to many endpoints (a combination of protocol and address), and it can do this using a single socket. This means it will accept connections across different transports:

 zmq_bind (socket, "tcp://*:5555");
 zmq_bind (socket, "tcp://*:9999");
 zmq_bind (socket, "inproc://somename");

With most transports, you cannot bind to the same endpoint twice, unlike for example in UDP. The ipc transport does, however, let one process bind to an endpoint already used by a first process. It's meant to allow a process to recover after a crash.

With most transports, you cannot bind the same ip:port more than once, because an ip:port endpoint can be owned by only one application at a time, which receives the network data sent to it.

Although ZeroMQ tries to be neutral about which side binds and which side connects, there are differences. We'll see these in more detail later. The upshot is that you should usually think in terms of "servers" as static parts of your topology that bind to more or less fixed endpoints, and "clients" as dynamic parts that come and go and connect to these endpoints. Then design your application around this model. The chances that it will "just work" are much better like that.

The server is the relatively static part; the client is the relatively dynamic part.

Sockets have types. The socket type defines the semantics of the socket, its policies for routing messages inwards and outwards, queuing, etc. You can connect certain types of socket together, e.g. a publisher socket and a subscriber socket. Sockets work together in "messaging patterns". We'll look at this in more detail later.

  • A ZeroMQ socket has a socket type, but unlike a Unix socket the type is not tied to a network protocol.
  • For two sockets to connect, carry data, and close correctly, their types must form a valid combination under the "messaging patterns".

It is the ability to connect sockets in these different ways that gives ZeroMQ its basic power as a message queuing system. There are layers on top of this, such as proxies, which we’ll get to later. But essentially, with ZeroMQ you define your network architecture by plugging pieces together like a child’s construction toy.

1.2 Sending and Receiving Messages

To send and receive messages you use the zmq_msg_send() and zmq_msg_recv() methods. The names are conventional, but ZeroMQ's I/O model is different enough from the classic TCP model that you will need time to get your head around it.

  • Use zmq_msg_send() and zmq_msg_recv() to send and receive data.

Figure 9 - TCP sockets are 1 to 1

Let's look at the main differences between TCP sockets and ZeroMQ sockets when it comes to working with data:

  • ZeroMQ sockets carry messages, like UDP, rather than a stream of bytes as TCP does. A ZeroMQ message is length-specified binary data. We'll come to messages shortly; their design is optimized for performance and so is a little tricky.
  • A ZeroMQ socket carries messages, not a byte stream.
  • A ZeroMQ message is length-specified binary data.
  • ZeroMQ sockets do their I/O in a background thread. This means that messages arrive in local input queues and are sent from local output queues, no matter what your application is busy doing.
  • A ZeroMQ socket's I/O happens in a background thread.
  • A ZeroMQ socket sends and receives messages through local input/output queues.
  • ZeroMQ sockets have one-to-N routing behavior built-in, according to the socket type.

The zmq_send() method does not actually send the message to the socket connection(s). It queues the message so that the I/O thread can send it asynchronously. It does not block except in some exception cases. So the message is not necessarily sent when zmq_send() returns to your application.

  • zmq_send() only enqueues the message; actually sending it to the socket connection is performed by the background I/O thread.

1.3 Unicast Transports

ZeroMQ provides a set of unicast transports (inproc, ipc, tcp) and multicast transports (epgm, pgm). Multicast is an advanced technique that we'll come to later. Don't even start using it unless you know that your fan-out ratios will make 1-to-N unicast impossible.

  • ZeroMQ provides unicast transports (inproc, ipc, tcp) and multicast transports (epgm, pgm).

For most common cases, use tcp, which is a disconnected TCP transport. It is elastic, portable, and fast enough for most cases. We call this disconnected because ZeroMQ's tcp transport doesn't require that the endpoint exists before you connect to it. Clients and servers can connect and bind at any time, can go and come back, and it remains transparent to applications.

  • ZeroMQ sockets can connect at any time, can go away and come back later, and this remains transparent to applications.

The inter-process ipc transport is disconnected, like tcp. It has one limitation: it does not yet work on Windows. By convention we use endpoint names with an ".ipc" extension to avoid potential conflict with other file names. On Unix systems, if you use ipc endpoints you need to create these with appropriate permissions otherwise they may not be shareable between processes running under different user IDs. You must also make sure all processes can access the files, e.g. by running in the same working directory.

  • Windows does not support the ipc transport.
  • On Unix, ipc is implemented over UNIX domain sockets, not FIFOs.

The inter-thread transport, inproc, is a connected signaling transport. It is much faster than tcp or ipc. The transport has a specific limitation compared to tcp and ipc: the server must issue a bind before any client issues a connect. This is something future versions of ZeroMQ may fix, but at present this defines how we use inproc sockets. We create and bind one socket and start the child threads, which create and connect the other sockets.

1.4 ZeroMQ is Not a Neutral Carrier

A common question that newcomers to ZeroMQ ask is, "how do I write an XYZ server in ZeroMQ?" For example, "how do I write an HTTP server in ZeroMQ?" The implication is that if we use normal sockets to carry HTTP requests and responses, we should be able to use ZeroMQ sockets to do the same, only much faster and better.

The answer used to be “this is not how it works”. ZeroMQ is not a neutral carrier: it imposes a framing on the transport protocols it uses. This framing is not compatible with existing protocols, which tend to use their own framing. For example, compare an HTTP request and a ZeroMQ request, both over TCP/IP.

Figure 10 - HTTP on the Wire

The HTTP request uses CR-LF as its simplest framing delimiter, whereas ZeroMQ uses a length-specified frame. So you could write an HTTP-like protocol using ZeroMQ, using for example the request-reply socket pattern. But it would not be HTTP.

Figure 11 - ZeroMQ on the Wire

Since v3.3, however, ZeroMQ has a socket option called ZMQ_ROUTER_RAW that lets you read and write data without the ZeroMQ framing. You could use this to read and write proper HTTP requests and responses. Hardeep Singh contributed this change so that he could connect to Telnet servers from his ZeroMQ application. At time of writing this is still somewhat experimental, but it shows how ZeroMQ keeps evolving to solve new problems. Maybe the next patch will be yours.

ZeroMQ has its own framing: it sends length-specified binary data, so in effect it defines its own message frame format on the wire.

1.5 I/O Threads

We said that ZeroMQ does I/O in a background thread. One I/O thread (for all sockets) is sufficient for all but the most extreme applications. When you create a new context, it starts with one I/O thread. The general rule of thumb is to allow one I/O thread per gigabyte of data in or out per second. To raise the number of I/O threads, use the zmq_ctx_set() call before creating any sockets:

int io_threads = 4;
void *context = zmq_ctx_new ();
zmq_ctx_set (context, ZMQ_IO_THREADS, io_threads);
assert (zmq_ctx_get (context, ZMQ_IO_THREADS) == io_threads);

The background I/O thread(s) are created along with the context; use zmq_ctx_set() to set the number of background I/O threads.

We’ve seen that one socket can handle dozens, even thousands of connections at once. This has a fundamental impact on how you write applications. A traditional networked application has one process or one thread per remote connection, and that process or thread handles one socket. ZeroMQ lets you collapse this entire structure into a single process and then break it up as necessary for scaling.

If you are using ZeroMQ for inter-thread communications only (i.e., a multithreaded application that does no external socket I/O) you can set the I/O threads to zero. It’s not a significant optimization though, more of a curiosity.

2 Message Patterns

Underneath the brown paper wrapping of ZeroMQ’s socket API lies the world of messaging patterns. If you have a background in enterprise messaging, or know UDP well, these will be vaguely familiar. But to most ZeroMQ newcomers, they are a surprise. We’re so used to the TCP paradigm where a socket maps one-to-one to another node.

Let’s recap briefly what ZeroMQ does for you. It delivers blobs of data (messages) to nodes, quickly and efficiently. You can map nodes to threads, processes, or nodes. ZeroMQ gives your applications a single socket API to work with, no matter what the actual transport (like in-process, inter-process, TCP, or multicast). It automatically reconnects to peers as they come and go. It queues messages at both sender and receiver, as needed. It limits these queues to guard processes against running out of memory. It handles socket errors. It does all I/O in background threads. It uses lock-free techniques for talking between nodes, so there are never locks, waits, semaphores, or deadlocks.

  • ZeroMQ delivers messages to nodes quickly and efficiently.
  • You can map nodes to threads, processes, or nodes.
  • ZeroMQ gives your applications a single socket API, whatever the actual transport (in-process, inter-process, TCP, or multicast).
  • ZeroMQ automatically reconnects to peers.
  • ZeroMQ queues messages at both the sender and the receiver.
  • ZeroMQ limits these queues to guard processes against running out of memory.
  • ZeroMQ handles socket errors.
  • ZeroMQ does all I/O in background threads.
  • ZeroMQ uses lock-free techniques for communication between nodes, so there are never locks, waits, semaphores, or deadlocks.

But cutting through that, it routes and queues messages according to precise recipes called patterns. It is these patterns that provide ZeroMQ’s intelligence. They encapsulate our hard-earned experience of the best ways to distribute data and work. ZeroMQ’s patterns are hard-coded but future versions may allow user-definable patterns.

ZeroMQ patterns are implemented by pairs of sockets with matching types. In other words, to understand ZeroMQ patterns you need to understand socket types and how they work together. Mostly, this just takes study; there is little that is obvious at this level.

The built-in core ZeroMQ patterns are:

  • Request-reply, which connects a set of clients to a set of services. This is a remote procedure call and task distribution pattern.
  • Pub-sub, which connects a set of publishers to a set of subscribers. This is a data distribution pattern.
  • Pipeline, which connects nodes in a fan-out/fan-in pattern that can have multiple steps and loops. This is a parallel task distribution and collection pattern.
  • Exclusive pair, which connects two sockets exclusively. This is a pattern for connecting two threads in a process, not to be confused with “normal” pairs of sockets.

We looked at the first three of these in Chapter 1 - Basics, and we’ll see the exclusive pair pattern later in this chapter. The zmq_socket() man page is fairly clear about the patterns – it’s worth reading several times until it starts to make sense. These are the socket combinations that are valid for a connect-bind pair (either side can bind):

* PUB and SUB
* REQ and REP
* REQ and ROUTER (take care, REQ inserts an extra null frame)
* DEALER and REP (take care, REP assumes a null frame)
* DEALER and ROUTER
* DEALER and DEALER
* ROUTER and ROUTER
* PUSH and PULL
* PAIR and PAIR

You’ll also see references to XPUB and XSUB sockets, which we’ll come to later (they’re like raw versions of PUB and SUB). Any other combination will produce undocumented and unreliable results, and future versions of ZeroMQ will probably return errors if you try them. You can and will, of course, bridge other socket types via code, i.e., read from one socket type and write to another.

2.1 High-Level Messaging Patterns

These four core patterns are cooked into ZeroMQ. They are part of the ZeroMQ API, implemented in the core C++ library, and are guaranteed to be available in all fine retail stores.

On top of those, we add high-level messaging patterns. We build these high-level patterns on top of ZeroMQ and implement them in whatever language we’re using for our application. They are not part of the core library, do not come with the ZeroMQ package, and exist in their own space as part of the ZeroMQ community. For example the Majordomo pattern, which we explore in Chapter 4 - Reliable Request-Reply Patterns, sits in the GitHub Majordomo project in the ZeroMQ organization.

One of the things we aim to provide you with in this book is a set of such high-level patterns, both small (how to handle messages sanely) and large (how to make a reliable pub-sub architecture).

2.2 Working with Messages

The libzmq core library has in fact two APIs to send and receive messages. The zmq_send() and zmq_recv() methods that we’ve already seen and used are simple one-liners. We will use these often, but zmq_recv() is bad at dealing with arbitrary message sizes: it truncates messages to whatever buffer size you provide. So there’s a second API that works with zmq_msg_t structures, with a richer but more difficult API:

  • Initialise a message: zmq_msg_init(), zmq_msg_init_size(), zmq_msg_init_data().
  • Sending and receiving a message: zmq_msg_send(), zmq_msg_recv().
  • Release a message: zmq_msg_close().
  • Access message content: zmq_msg_data(), zmq_msg_size(), zmq_msg_more().
  • Work with message properties: zmq_msg_get(), zmq_msg_set().
  • Message manipulation: zmq_msg_copy(), zmq_msg_move().

On the wire, ZeroMQ messages are blobs of any size from zero upwards that fit in memory. You do your own serialization using protocol buffers, msgpack, JSON, or whatever else your applications need to speak. It’s wise to choose a data representation that is portable, but you can make your own decisions about trade-offs.

A ZeroMQ message is a blob (binary data block) of any size from zero upwards.
Serialization of the binary data is something you have to do yourself.

In memory, ZeroMQ messages are zmq_msg_t structures (or classes depending on your language). Here are the basic ground rules for using ZeroMQ messages in C:

  • You create and pass around zmq_msg_t objects, not blocks of data.
  • To read a message, you use zmq_msg_init() to create an empty message, and then you pass that to zmq_msg_recv().
  • To write a message from new data, you use zmq_msg_init_size() to create a message and at the same time allocate a block of data of some size. You then fill that data using memcpy, and pass the message to zmq_msg_send().
  • To release (not destroy) a message, you call zmq_msg_close(). This drops a reference, and eventually ZeroMQ will destroy the message.
  • To access the message content, you use zmq_msg_data(). To know how much data the message contains, use zmq_msg_size().
  • Do not use zmq_msg_move(), zmq_msg_copy(), or zmq_msg_init_data() unless you read the man pages and know precisely why you need these.
  • After you pass a message to zmq_msg_send(), ØMQ will clear the message, i.e., set the size to zero. You cannot send the same message twice, and you cannot access the message data after sending it.
  • These rules don’t apply if you use zmq_send() and zmq_recv(), to which you pass byte arrays, not message structures.
  • zmq_msg_init() creates an empty message, which you can use for reading by passing it to zmq_msg_recv().
  • zmq_msg_init_size() creates a message and allocates a data block of the given size; you then fill it with memcpy and send it with zmq_msg_send().
  • zmq_msg_close() releases a message rather than destroying it: internally it drops a reference count, and the final destroy is done by ZeroMQ.
  • zmq_msg_data() accesses the message content; zmq_msg_size() returns its size.
  • zmq_msg_t follows ownership-transfer semantics: zmq_msg_send() takes ownership of the zmq_msg_t and invalidates the original (e.g., sets its size to zero), so after sending a message you cannot access it again.

Here is a typical chunk of code working with messages that should be familiar if you have been paying attention. This is from the zhelpers.h file we use in all the examples:

// Receive 0MQ string from socket and convert into C string
static char *
s_recv (void *socket) {
    zmq_msg_t message;
    zmq_msg_init (&message);
    int size = zmq_msg_recv (&message, socket, 0);
    if (size == -1)
        return NULL;
    char *string = malloc (size + 1);
    memcpy (string, zmq_msg_data (&message), size);
    zmq_msg_close (&message);
    string [size] = 0;
    return (string);
}
// Convert C string to 0MQ string and send to socket
static int
s_send (void *socket, char *string) {
    zmq_msg_t message;
    zmq_msg_init_size (&message, strlen (string));
    memcpy (zmq_msg_data (&message), string, strlen (string));
    int size = zmq_msg_send (&message, socket, 0);
    zmq_msg_close (&message);
    return (size);
}

You can easily extend this code to send and receive blobs of arbitrary length.

If you want to send the same message more than once, and it's sizable, create a second message, initialize it using zmq_msg_init(), and then use zmq_msg_copy() to create a copy of the first message. This does not copy the data but copies a reference. You can then send the message twice (or more, if you create more copies) and the message will only be finally destroyed when the last copy is sent or closed.

ZeroMQ also supports multipart messages, which let you send or receive a list of frames as a single on-the-wire message. This is widely used in real applications and we’ll look at that later in this chapter and in Chapter 3 - Advanced Request-Reply Patterns.

Frames (also called “message parts” in the ZeroMQ reference manual pages) are the basic wire format for ZeroMQ messages. A frame is a length-specified block of data. The length can be zero upwards. If you’ve done any TCP programming you’ll appreciate why frames are a useful answer to the question “how much data am I supposed to read of this network socket now?”

  • Frames are length-specified blocks of data.

There is a wire-level protocol called ZMTP that defines how ZeroMQ reads and writes frames on a TCP connection. If you’re interested in how this works, the spec is quite short.

Originally, a ZeroMQ message was one frame, like UDP. We later extended this with multipart messages, which are quite simply series of frames with a “more” bit set to one, followed by one with that bit set to zero. The ZeroMQ API then lets you write messages with a “more” flag and when you read messages, it lets you check if there’s “more”.

  • Originally, a ZeroMQ message was one frame, like UDP.
  • Later, ZeroMQ messages were extended to multipart messages. Each frame carries a "more" bit: when more is 1, another frame follows; when more is 0, the message ends.

In the low-level ZeroMQ API and the reference manual, therefore, there's some fuzziness about messages versus frames. So here's a useful lexicon:

  • A message can be one or more parts.
  • These parts are also called “frames”.
  • Each part is a zmq_msg_t object.
  • You send and receive each part separately, in the low-level API.
  • Higher-level APIs provide wrappers to send entire multipart messages.

A complete message consists of one or more frames; in ZeroMQ each frame is a zmq_msg_t object. See zmq_msg_set() for setting frame properties.

Some other things that are worth knowing about messages:

  • You may send zero-length messages, e.g., for sending a signal from one thread to another.
  • ZeroMQ guarantees to deliver all the parts (one or more) for a message, or none of them.
  • ZeroMQ does not send the message (single or multipart) right away, but at some indeterminate later time. A multipart message must therefore fit in memory.
  • A message (single or multipart) must fit in memory. If you want to send files of arbitrary sizes, you should break them into pieces and send each piece as separate single-part messages. Using multipart data will not reduce memory consumption.
  • You must call zmq_msg_close() when finished with a received message, in languages that don’t automatically destroy objects when a scope closes. You don’t call this method after sending a message.

After you are finished with a received message, you must call zmq_msg_close() to drop its reference; after sending a message, you do not need to call it.

And to be repetitive, do not use zmq_msg_init_data() yet. This is a zero-copy method and is guaranteed to create trouble for you. There are far more important things to learn about ZeroMQ before you start to worry about shaving off microseconds.

This rich API can be tiresome to work with. The methods are optimized for performance, not simplicity. If you start using these you will almost definitely get them wrong until you’ve read the man pages with some care. So one of the main jobs of a good language binding is to wrap this API up in classes that are easier to use.

A good language binding wraps the ZeroMQ API in classes that are easier to use.

2.3 Handling Multiple Sockets

In all the examples so far, the main loop of most examples has been:

  1. Wait for message on socket.
  2. Process message.
  3. Repeat.

What if we want to read from multiple endpoints at the same time? The simplest way is to connect one socket to all the endpoints and get ZeroMQ to do the fan-in for us. This is legal if the remote endpoints are in the same pattern, but it would be wrong to connect a PULL socket to a PUB endpoint.

To actually read from multiple sockets all at once, use zmq_poll(). An even better way might be to wrap zmq_poll() in a framework that turns it into a nice event-driven reactor, but it’s significantly more work than we want to cover here.

Let’s start with a dirty hack, partly for the fun of not doing it right, but mainly because it lets me show you how to do nonblocking socket reads. Here is a simple example of reading from two sockets using nonblocking reads. This rather confused program acts both as a subscriber to weather updates, and a worker for parallel tasks:

//
// Reading from multiple sockets
// This version uses a simple recv loop
//
#include "zhelpers.h"
int main (void)
{
    // Prepare our context and sockets
    void *context = zmq_ctx_new ();

    // Connect to task ventilator
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_connect (receiver, "tcp://localhost:5557");

    // Connect to weather server
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    zmq_connect (subscriber, "tcp://localhost:5556");
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);

    // Process messages from both sockets
    // We prioritize traffic from the task ventilator
    while (1) {
        // Process any waiting tasks
        int rc;
        for (rc = 0; !rc; ) {
            zmq_msg_t task;
            zmq_msg_init (&task);
            if ((rc = zmq_msg_recv (&task, receiver, ZMQ_DONTWAIT)) != -1) {
                // Process task
            }
            zmq_msg_close (&task);
        }
        // Process any waiting weather updates
        for (rc = 0; !rc; ) {
            zmq_msg_t update;
            zmq_msg_init (&update);
            if ((rc = zmq_msg_recv (&update, subscriber, ZMQ_DONTWAIT)) != -1) {
                // Process weather update
            }
            zmq_msg_close (&update);
        }
        // No activity, so sleep for 1 msec
        s_sleep (1);
    }
    // We never get here, but clean up anyhow
    zmq_close (receiver);
    zmq_close (subscriber);
    zmq_ctx_destroy (context);
    return 0;
}

The cost of this approach is some additional latency on the first message (the sleep at the end of the loop, when there are no waiting messages to process). This would be a problem in applications where submillisecond latency was vital. Also, you need to check the documentation for nanosleep() or whatever function you use to make sure it does not busy-loop.

  • In this nonblocking-read version, when a pass through the loop finds no waiting messages, it sleeps for 1 msec (nanosleep) to avoid a busy-loop.

You can treat the sockets fairly by reading first from one, then the second rather than prioritizing them as we did in this example.
Example 2-2 shows the same little senseless application done right, using zmq_poll().
Example 2-2. Multiple socket poller (mspoller.c)

//
// Reading from multiple sockets
// This version uses zmq_poll()
//
#include "zhelpers.h"
int main (void)
{
    void *context = zmq_ctx_new ();

    // Connect to task ventilator
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_connect (receiver, "tcp://localhost:5557");

    // Connect to weather server
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    zmq_connect (subscriber, "tcp://localhost:5556");
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);

    // Initialize poll set
    zmq_pollitem_t items [] = {
        { receiver,   0, ZMQ_POLLIN, 0 },
        { subscriber, 0, ZMQ_POLLIN, 0 }
    };
    // Process messages from both sockets
    while (1) {
        zmq_msg_t message;
        zmq_poll (items, 2, -1);
        if (items [0].revents & ZMQ_POLLIN) {
            zmq_msg_init (&message);
            zmq_msg_recv (&message, receiver, 0);
            // Process task
            zmq_msg_close (&message);
        }
        if (items [1].revents & ZMQ_POLLIN) {
            zmq_msg_init (&message);
            zmq_msg_recv (&message, subscriber, 0);
            // Process weather update
            zmq_msg_close (&message);
        }
    }
    // We never get here
    zmq_close (receiver);
    zmq_close (subscriber);
    zmq_ctx_destroy (context);
    return 0;
}

The items structure has these four members:

typedef struct {
    void *socket;       //  ZeroMQ socket to poll on
    int fd;             //  OR, native file handle to poll on
    short events;       //  Events to poll on
    short revents;      //  Events returned after poll
} zmq_pollitem_t;

2.4 Multipart Messages

ZeroMQ lets us compose a message out of several frames, giving us a “multipart message”. Realistic applications use multipart messages heavily, both for wrapping messages with address information and for simple serialization. We’ll look at reply envelopes later.

  • ZeroMQ composes several frames into one message, giving a "multipart message".
  • Multipart messages are used both for wrapping messages with address information and for simple serialization.

What we’ll learn now is simply how to blindly and safely read and write multipart messages in any application (such as a proxy) that needs to forward messages without inspecting them.

  • A proxy forwards messages directly, without inspecting them.

When you work with multipart messages, each part is a zmq_msg item. E.g., if you are sending a message with five parts, you must construct, send, and destroy five zmq_msg items. You can do this in advance (and store the zmq_msg items in an array or other structure), or as you send them, one-by-one.

Here is how we send the frames in a multipart message (we receive each frame into a message object):

zmq_msg_send (&message, socket, ZMQ_SNDMORE);
...
zmq_msg_send (&message, socket, ZMQ_SNDMORE);
...
zmq_msg_send (&message, socket, 0);

Here is how we receive and process all the parts in a message, be it single part or multipart:

while (1) {
    zmq_msg_t message;
    zmq_msg_init (&message);
    zmq_msg_recv (&message, socket, 0);
    //  Process the message frame
    ...
    int more = zmq_msg_more (&message);     //  check before closing
    zmq_msg_close (&message);
    if (!more)
        break;      //  Last message frame
}

Some things to know about multipart messages:

  • When you send a multipart message, the first part (and all following parts) are only actually sent on the wire when you send the final part.
  • If you are using zmq_poll(), when you receive the first part of a message, all the rest has also arrived.
  • You will receive all parts of a message, or none at all.
  • Each part of a message is a separate zmq_msg item.
  • You will receive all parts of a message whether or not you check the more property.
  • On sending, ZeroMQ queues message frames in memory until the last is received, then sends them all.
  • There is no way to cancel a partially sent message, except by closing the socket.

发送"muiltipart message"时,仅当最后一个消息被发送时,调用zmq_msg_send(),才是真正进行I/O操作,向对端按序发送所有的"muiltipart message",在这之前,frame存储在 ZeroMQ 队列中。

Intermediaries and Proxies

ZeroMQ aims for decentralized intelligence, but that doesn't mean your network is empty space in the middle. It's filled with message-aware infrastructure and quite often, we build that infrastructure with ZeroMQ. The ZeroMQ plumbing can range from tiny pipes to full-blown service-oriented brokers. The messaging industry calls this intermediation, meaning that the stuff in the middle deals with either side. In ZeroMQ, we call these proxies, queues, forwarders, devices, or brokers, depending on the context.

ZeroMQ aims for decentralized intelligence.

This pattern is extremely common in the real world and is why our societies and economies are filled with intermediaries who have no other real function than to reduce the complexity and scaling costs of larger networks. Real-world intermediaries are typically called wholesalers, distributors, managers, and so on.

Intermediaries reduce the complexity and scaling costs of larger networks.

The Dynamic Discovery Problem

One of the problems you will hit as you design larger distributed architectures is discovery. That is, how do pieces know about each other? It’s especially difficult if pieces come and go, so we call this the “dynamic discovery problem”.

When designing a larger distributed architecture, you will hit the "dynamic discovery problem": how do the pieces of the network find each other, especially when they come and go?

There are several solutions to dynamic discovery. The simplest is to entirely avoid it by hard-coding (or configuring) the network architecture so discovery is done by hand. That is, when you add a new piece, you reconfigure the network to know about it.

With hard-coding (or configuration files) the network architecture is fixed, so discovery is done by hand; but every time a new piece joins, you have to reconfigure the network.

Figure 12 - Small-Scale Pub-Sub Network

In practice, this leads to increasingly fragile and unwieldy architectures. Let’s say you have one publisher and a hundred subscribers. You connect each subscriber to the publisher by configuring a publisher endpoint in each subscriber. That’s easy. Subscribers are dynamic; the publisher is static. Now say you add more publishers. Suddenly, it’s not so easy any more. If you continue to connect each subscriber to each publisher, the cost of avoiding dynamic discovery gets higher and higher.

Figure 13 - Pub-Sub Network with a Proxy

There are quite a few answers to this, but the very simplest answer is to add an intermediary; that is, a static point in the network to which all other nodes connect. In classic messaging, this is the job of the message broker. ZeroMQ doesn’t come with a message broker as such, but it lets us build intermediaries quite easily.

The simplest answer is to add an intermediary: a static node in the network that all other nodes can connect to. In classic messaging, this is the job of the message broker. ZeroMQ doesn't come with such a message broker, but it makes it very easy for us to build intermediaries.

You might wonder, if all networks eventually get large enough to need intermediaries, why don’t we simply have a message broker in place for all applications? For beginners, it’s a fair compromise. Just always use a star topology, forget about performance, and things will usually work. However, message brokers are greedy things; in their role as central intermediaries, they become too complex, too stateful, and eventually a problem.

A message broker uses a star topology. As the central intermediary, it becomes too complex and too stateful, which eventually becomes a problem: its state affects every other node on the network.

It’s better to think of intermediaries as simple stateless message switches. A good analogy is an HTTP proxy; it’s there, but doesn’t have any special role. Adding a pub-sub proxy solves the dynamic discovery problem in our example. We set the proxy in the “middle” of the network. The proxy opens an XSUB socket, an XPUB socket, and binds each to well-known IP addresses and ports. Then, all other processes connect to the proxy, instead of to each other. It becomes trivial to add more subscribers or publishers.


Figure 14 - Extended Pub-Sub

We need XPUB and XSUB sockets because ZeroMQ does subscription forwarding from subscribers to publishers. XSUB and XPUB are exactly like SUB and PUB except they expose subscriptions as special messages. The proxy has to forward these subscription messages from subscriber side to publisher side, by reading them from the XPUB socket and writing them to the XSUB socket. This is the main use case for XSUB and XPUB.


Shared Queue (DEALER and ROUTER sockets)

In the Hello World client/server application, we have one client that talks to one service. However, in real cases we usually need to allow multiple services as well as multiple clients. This lets us scale up the power of the service (many threads or processes or nodes rather than just one). The only constraint is that services must be stateless, all state being in the request or in some shared storage such as a database.


Figure 15 - Request Distribution

There are two ways to connect multiple clients to multiple servers. The brute force way is to connect each client socket to multiple service endpoints. One client socket can connect to multiple service sockets, and the REQ socket will then distribute requests among these services. Let’s say you connect a client socket to three service endpoints; A, B, and C. The client makes requests R1, R2, R3, R4. R1 and R4 go to service A, R2 goes to B, and R3 goes to service C.


This design lets you add more clients cheaply. You can also add more services. Each client will distribute its requests to the services. But each client has to know the service topology. If you have 100 clients and then you decide to add three more services, you need to reconfigure and restart 100 clients in order for the clients to know about the three new services.

That’s clearly not the kind of thing we want to be doing at 3 a.m. when our supercomputing cluster has run out of resources and we desperately need to add a couple of hundred of new service nodes. Too many static pieces are like liquid concrete: knowledge is distributed and the more static pieces you have, the more effort it is to change the topology. What we want is something sitting in between clients and services that centralizes all knowledge of the topology. Ideally, we should be able to add and remove services or clients at any time without touching any other part of the topology.


So we’ll write a little message queuing broker that gives us this flexibility. The broker binds to two endpoints, a frontend for clients and a backend for services. It then uses zmq_poll() to monitor these two sockets for activity and when it has some, it shuttles messages between its two sockets. It doesn’t actually manage any queues explicitly–ZeroMQ does that automatically on each socket.


When you use REQ to talk to REP, you get a strictly synchronous request-reply dialog. The client sends a request. The service reads the request and sends a reply. The client then reads the reply. If either the client or the service try to do anything else (e.g., sending two requests in a row without waiting for a response), they will get an error.


But our broker has to be nonblocking. Obviously, we can use zmq_poll() to wait for activity on either socket, but we can’t use REP and REQ.

Figure 16 - Extended Request-Reply

Luckily, there are two sockets called DEALER and ROUTER that let you do nonblocking request-response. You’ll see in Chapter 3 - Advanced Request-Reply Patterns how DEALER and ROUTER sockets let you build all kinds of asynchronous request-reply flows. For now, we’re just going to see how DEALER and ROUTER let us extend REQ-REP across an intermediary, that is, our little broker.

In this simple extended request-reply pattern, REQ talks to ROUTER and DEALER talks to REP. In between the DEALER and ROUTER, we have to have code (like our broker) that pulls messages off the one socket and shoves them onto the other.

The request-reply broker binds to two endpoints, one for clients to connect to (the frontend socket) and one for workers to connect to (the backend). To test this broker, you will want to change your workers so they connect to the backend socket. Here is a client that shows what I mean:

//
//  Hello World client
//  Connects REQ socket to tcp://localhost:5559
//  Sends "Hello" to server, expects "World" back
//
#include "zhelpers.h"

int main (void)
{
    void *context = zmq_ctx_new ();

    //  Socket to talk to server
    void *requester = zmq_socket (context, ZMQ_REQ);
    zmq_connect (requester, "tcp://localhost:5559");

    int request_nbr;
    for (request_nbr = 0; request_nbr != 10; request_nbr++) {
        s_send (requester, "Hello");
        char *string = s_recv (requester);
        printf ("Received reply %d [%s]\n", request_nbr, string);
        free (string);
    }
    zmq_close (requester);
    zmq_ctx_destroy (context);
    return 0;
}
//
//  Hello World worker
//  Connects REP socket to tcp://localhost:5560
//  Expects "Hello" from client, replies with "World"
//
#include "zhelpers.h"

int main (void)
{
    void *context = zmq_ctx_new ();

    //  Socket to talk to clients
    void *responder = zmq_socket (context, ZMQ_REP);
    zmq_connect (responder, "tcp://localhost:5560");

    while (1) {
        //  Wait for next request from client
        char *string = s_recv (responder);
        printf ("Received request: [%s]\n", string);
        free (string);

        //  Do some 'work'
        sleep (1);

        //  Send reply back to client
        s_send (responder, "World");
    }
    //  We never get here, but clean up anyhow
    zmq_close (responder);
    zmq_ctx_destroy (context);
    return 0;
}
//
//  Simple request-reply broker
//
#include "zhelpers.h"

int main (void)
{
    //  Prepare our context and sockets
    void *context = zmq_ctx_new ();
    void *frontend = zmq_socket (context, ZMQ_ROUTER);
    void *backend = zmq_socket (context, ZMQ_DEALER);
    zmq_bind (frontend, "tcp://*:5559");
    zmq_bind (backend, "tcp://*:5560");

    //  Initialize poll set
    zmq_pollitem_t items [] = {
        { frontend, 0, ZMQ_POLLIN, 0 },
        { backend, 0, ZMQ_POLLIN, 0 }
    };
    //  Switch messages between sockets
    while (1) {
        zmq_msg_t message;
        int more;               //  Multipart detection

        zmq_poll (items, 2, -1);
        if (items [0].revents & ZMQ_POLLIN) {
            while (1) {
                //  Process all parts of the message
                zmq_msg_init (&message);
                zmq_msg_recv (&message, frontend, 0);
                size_t more_size = sizeof (more);
                zmq_getsockopt (frontend, ZMQ_RCVMORE, &more, &more_size);
                zmq_msg_send (&message, backend, more? ZMQ_SNDMORE: 0);
                zmq_msg_close (&message);
                if (!more)
                    break;      //  Last message part
            }
        }
        if (items [1].revents & ZMQ_POLLIN) {
            while (1) {
                //  Process all parts of the message
                zmq_msg_init (&message);
                zmq_msg_recv (&message, backend, 0);
                size_t more_size = sizeof (more);
                zmq_getsockopt (backend, ZMQ_RCVMORE, &more, &more_size);
                zmq_msg_send (&message, frontend, more? ZMQ_SNDMORE: 0);
                zmq_msg_close (&message);
                if (!more)
                    break;      //  Last message part
            }
        }
    }
    //  We never get here, but clean up anyhow
    zmq_close (frontend);
    zmq_close (backend);
    zmq_ctx_destroy (context);
    return 0;
}


Figure 17 - Request-Reply Broker

Using a request-reply broker makes your client/server architectures easier to scale because clients don’t see workers, and workers don’t see clients. The only static node is the broker in the middle.

ZeroMQ’s Built-In Proxy Function

It turns out that the core loop in the previous section’s rrbroker is very useful, and reusable. It lets us build pub-sub forwarders and shared queues and other little intermediaries with very little effort. ZeroMQ wraps this up in a single method, zmq_proxy():

zmq_proxy(frontend, backend, capture);

The two (or three, if we want to capture data) sockets must be properly connected, bound, and configured. When we call the zmq_proxy method, it’s exactly like starting the main loop of rrbroker. Let’s rewrite the request-reply broker to call zmq_proxy, and re-badge this as an expensive-sounding “message queue” (people have charged houses for code that did less):

Example 2-6. Message queue broker (msgqueue.c)

//
//  Simple message queuing broker
//  Same as request-reply broker but using QUEUE device
//
#include "zhelpers.h"

int main (void)
{
    void *context = zmq_ctx_new ();

    //  Socket facing clients
    void *frontend = zmq_socket (context, ZMQ_ROUTER);
    int rc = zmq_bind (frontend, "tcp://*:5559");
    assert (rc == 0);

    //  Socket facing services
    void *backend = zmq_socket (context, ZMQ_DEALER);
    rc = zmq_bind (backend, "tcp://*:5560");
    assert (rc == 0);

    //  Start the proxy
    zmq_proxy (frontend, backend, NULL);

    //  We never get here...
    zmq_close (frontend);
    zmq_close (backend);
    zmq_ctx_destroy (context);
    return 0;
}

If you’re like most ØMQ users, at this stage you’re starting to think, “What kind of evil stuff can I do if I plug random socket types into the proxy?” The short answer is: try it and work out what is happening. In practice, you would usually stick to ROUTER/DEALER, XSUB/XPUB, or PULL/PUSH.

Transport Bridging

A frequent request from ØMQ users is, “How do I connect my ØMQ network with technology X?” where X is some other networking or messaging technology.

The simple answer is to build a “bridge.” A bridge is a small application that speaks one protocol at one socket, and converts to/from a second protocol at another socket. A protocol interpreter, if you like. A common bridging problem in ØMQ is to bridge two transports or networks.

As an example, we’re going to write a little proxy (Example 2-7) that sits in between a publisher and a set of subscribers, bridging two networks. The frontend socket (SUB) faces the internal network where the weather server is sitting, and the backend (PUB) faces subscribers on the external network. It subscribes to the weather service on the frontend socket, and republishes its data on the backend socket (Figure 2-10).

Example 2-7. Weather update proxy (wuproxy.c)

//
//  Weather proxy device
//
#include "zhelpers.h"

int main (void)
{
    void *context = zmq_ctx_new ();

    //  This is where the weather server sits
    void *frontend = zmq_socket (context, ZMQ_XSUB);
    zmq_connect (frontend, "tcp://192.168.55.210:5556");

    //  This is our public endpoint for subscribers
    void *backend = zmq_socket (context, ZMQ_XPUB);
    zmq_bind (backend, "tcp://10.1.1.0:8100");

    //  Run the proxy until the user interrupts us
    zmq_proxy (frontend, backend, NULL);

    zmq_close (frontend);
    zmq_close (backend);
    zmq_ctx_destroy (context);
    return 0;
}

It looks very similar to the earlier proxy example, but the key part is that the frontend and backend sockets are on two different networks. We can use this model for example to connect a multicast network (pgm transport) to a tcp publisher.

Handling Errors and ETERM

ZeroMQ’s error handling philosophy is a mix of fail-fast and resilience. Processes, we believe, should be as vulnerable as possible to internal errors, and as robust as possible against external attacks and errors. To give an analogy, a living cell will self-destruct if it detects a single internal error, yet it will resist attack from the outside by all means possible.

Assertions, which pepper the ZeroMQ code, are absolutely vital to robust code; they just have to be on the right side of the cellular wall. And there should be such a wall. If it is unclear whether a fault is internal or external, that is a design flaw to be fixed. In C/C++, assertions stop the application immediately with an error. In other languages, you may get exceptions or halts.

When ZeroMQ detects an external fault it returns an error to the calling code. In some rare cases, it drops messages silently if there is no obvious strategy for recovering from the error.

In most of the C examples we’ve seen so far there’s been no error handling. Real code should do error handling on every single ZeroMQ call. If you’re using a language binding other than C, the binding may handle errors for you. In C, you do need to do this yourself. There are some simple rules, starting with POSIX conventions:

  • Methods that create objects return NULL if they fail.
  • Methods that process data may return the number of bytes processed, or -1 on an error or failure.
  • Other methods return 0 on success and -1 on an error or failure.
  • The error code is provided in errno or zmq_errno().
  • A descriptive error text for logging is provided by zmq_strerror().

For example:

void *context = zmq_ctx_new ();
assert (context);
void *socket = zmq_socket (context, ZMQ_REP);
assert (socket);
int rc = zmq_bind (socket, "tcp://*:5555");
if (rc == -1) {
    printf ("E: bind failed: %s\n", strerror (errno));
    return -1;
}

There are two main exceptional conditions that you should handle as nonfatal:

  • When your code receives a message with the ZMQ_DONTWAIT option and there is no waiting data, ZeroMQ will return -1 and set errno to EAGAIN.
  • When one thread calls zmq_ctx_destroy(), and other threads are still doing blocking work, the zmq_ctx_destroy() call closes the context and all blocking calls exit with -1, and errno set to ETERM.

Figure 19 - Parallel Pipeline with Kill Signaling

Let’s see how to shut down a process cleanly. We’ll take the parallel pipeline example from the previous section. If we’ve started a whole lot of workers in the background, we now want to kill them when the batch is finished. Let’s do this by sending a kill message to the workers. The best place to do this is the sink because it really knows when the batch is done.

How do we connect the sink to the workers? The PUSH/PULL sockets are one-way only. We could switch to another socket type, or we could mix multiple socket flows. Let’s try the latter: using a pub-sub model to send kill messages to the workers:

  • The sink creates a PUB socket on a new endpoint.
  • Workers connect their input socket to this endpoint.
  • When the sink detects the end of the batch, it sends a kill to its PUB socket.
  • When a worker detects this kill message, it exits.

It doesn’t take much new code in the sink:

void *controller = zmq_socket (context, ZMQ_PUB);
zmq_bind (controller, "tcp://*:5559");
...
//  Send kill signal to workers
s_send (controller, "KILL");

Here is the worker process, which manages two sockets (a PULL socket getting tasks, and a SUB socket getting control commands), using the zmq_poll() technique we saw earlier:

Example 2-8. Parallel task worker with kill signaling (taskwork2.c)

//
//  Task worker - design 2
//  Adds pub-sub flow to receive and respond to kill signal
//
#include "zhelpers.h"

int main (void)
{
    void *context = zmq_ctx_new ();

    //  Socket to receive messages on
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_connect (receiver, "tcp://localhost:5557");

    //  Socket to send messages to
    void *sender = zmq_socket (context, ZMQ_PUSH);
    zmq_connect (sender, "tcp://localhost:5558");

    //  Socket for control input
    void *controller = zmq_socket (context, ZMQ_SUB);
    zmq_connect (controller, "tcp://localhost:5559");
    zmq_setsockopt (controller, ZMQ_SUBSCRIBE, "", 0);

    //  Process messages from both sockets
    zmq_pollitem_t items [] = {
        { receiver, 0, ZMQ_POLLIN, 0 },
        { controller, 0, ZMQ_POLLIN, 0 }
    };
    while (1) {
        zmq_msg_t message;
        zmq_poll (items, 2, -1);
        if (items [0].revents & ZMQ_POLLIN) {
            zmq_msg_init (&message);
            zmq_msg_recv (&message, receiver, 0);

            //  Do the work
            s_sleep (atoi ((char *) zmq_msg_data (&message)));

            //  Send results to sink
            zmq_msg_init (&message);
            zmq_msg_send (&message, sender, 0);

            //  Simple progress indicator for the viewer
            printf (".");
            fflush (stdout);

            zmq_msg_close (&message);
        }
        //  Any controller command acts as 'KILL'
        if (items [1].revents & ZMQ_POLLIN)
            break;              //  Exit loop
    }
    //  Finished
    zmq_close (receiver);
    zmq_close (sender);
    zmq_close (controller);
    zmq_ctx_destroy (context);
    return 0;
}

Example 2-9 shows the modified sink application. When it’s finished collecting results, it broadcasts a kill message to all workers.

Example 2-9. Parallel task sink with kill signaling (tasksink2.c)

//
//  Task sink - design 2
//  Adds pub-sub flow to send kill signal to workers
//
#include "zhelpers.h"

int main (void)
{
    void *context = zmq_ctx_new ();

    //  Socket to receive messages on
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_bind (receiver, "tcp://*:5558");

    //  Socket for worker control
    void *controller = zmq_socket (context, ZMQ_PUB);
    zmq_bind (controller, "tcp://*:5559");

    //  Wait for start of batch
    char *string = s_recv (receiver);
    free (string);

    //  Start our clock now
    int64_t start_time = s_clock ();

    //  Process 100 confirmations
    int task_nbr;
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {
        char *string = s_recv (receiver);
        free (string);
        if ((task_nbr / 10) * 10 == task_nbr)
            printf (":");
        else
            printf (".");
        fflush (stdout);
    }
    printf ("Total elapsed time: %d msec\n",
        (int) (s_clock () - start_time));

    //  Send kill signal to workers
    s_send (controller, "KILL");

    //  Finished
    sleep (1);                  //  Give 0MQ time to deliver
    zmq_close (receiver);
    zmq_close (controller);
    zmq_ctx_destroy (context);
    return 0;
}

Handling Interrupt Signals

Multithreading with ZeroMQ

When writing multithreaded code with ZeroMQ, you should follow a few rules:

  • Isolate data privately within its thread and never share data across threads. The only exception is ZeroMQ contexts, which are threadsafe.
  • Stay away from the classic concurrency mechanisms such as mutexes, critical sections, semaphores, and so on. These are an anti-pattern in ZeroMQ applications.
  • Create one ZeroMQ context at the start of your process, and pass it to every thread you want to connect via inproc.
  • Use attached threads to create structure within your application, and connect them to their parent threads using PAIR sockets over inproc. The pattern is: bind the parent socket, then create the child thread, which connects to it.
  • Use detached threads to simulate independent tasks, with their own contexts. Connect these over tcp; later you can move them to stand-alone processes without significantly changing the code.
  • All interaction between threads happens as ZeroMQ messages, which you can define more or less formally.
  • Don’t share ZeroMQ sockets between threads. ZeroMQ sockets are not threadsafe.
