目录

1 ZeroMQ概述

2 ZeroMQ工作模式

2.1 请求-应答

2.2 发布-订阅

2.3 并行管道

3 参考


1 ZeroMQ概述

       ZeroMQ看起来想一个可嵌入的网络库,但其作用就像是一个并发框架。它为你提供了各种传输工具,如进程内,进程间,TCP和组播中进行原子消息传递的套接字。你可以使用各种模式实现N对N的套接字连接,这些模式包括发布订阅,请求应答,管道模式。它的速度足够快,因此可以充当集群产品的结构,他的异步IO模型提供了可扩展的多核应用程序,用异步消息来处理任务。

2 ZeroMQ工作模式

2.1 请求-应答

让我们从简单的代码开始,一段传统的Hello World程序。我们会创建一个客户端和一个服务端,客户端发送Hello给服务端,服务端返回World。下文是C语言编写的服务端,它在5555端口打开一个ZMQ套接字,等待请求,收到后应答World。

服务端:zmhwserver.cpp

#include <zmq.hpp>
#include <string>
#include <iostream>
#ifndef _WIN32
#include <unistd.h>
#else
#include <windows.h>#define sleep(n)    Sleep(n)
#endifint main () {//  Prepare our context and socketzmq::context_t context (1);zmq::socket_t socket (context, ZMQ_REP);socket.bind ("tcp://*:5555");while (true) {zmq::message_t request;//  Wait for next request from clientsocket.recv (&request);std::cout << "Received Hello" << std::endl;//  Do some 'work'sleep(1);//  Send reply back to clientzmq::message_t reply (5);memcpy (reply.data (), "World", 5);socket.send (reply);}return 0;
}
  • 编译:g++  zmhwserver.cpp -o zmhwserver  -lzmq
  • 运行:./zmhwserver

客户端:zmhwclient.cpp

#include <zmq.hpp>
#include <string>
#include <iostream>int main ()
{//  Prepare our context and socketzmq::context_t context (1);zmq::socket_t socket (context, ZMQ_REQ);std::cout << "Connecting to hello world server…" << std::endl;socket.connect ("tcp://localhost:5555");//  Do 10 requests, waiting each time for a responsefor (int request_nbr = 0; request_nbr != 10; request_nbr++) {zmq::message_t request (5);memcpy (request.data (), "Hello", 5);std::cout << "Sending Hello " << request_nbr << "…" << std::endl;socket.send (request);//  Get the reply.zmq::message_t reply;socket.recv (&reply);std::cout << "Received World " << request_nbr << std::endl;}return 0;
}
  • 编译:g++  zmhwclient.cpp -o zmhwclient  -lzmq
  • 运行:./zmhwclient

2.2 发布-订阅

“发布-订阅”模式下,“发布者”绑定一个指定的地址,例如“192.168.10.1:5500”,“订阅者”连接到该地址。该模式下消息流是单向的,只允许从“发布者”流向“订阅者”。且“发布者”只管发消息,不理会是否存在“订阅者”。上图只是“发布-订阅”的最基本的模型,一个“发布者”可以拥有多个订阅者,同样的,一个“订阅者”也可订阅多个发布者。

让我们看一个示例,该示例推出由邮政编码,温度和相对湿度组成的天气更新。就像真实的气象站一样,我们将生成随机值。

服务器应用程序:将为该应用程序使用端口5556:

#include <zmq.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>#if (defined (WIN32))
#include <zhelpers.hpp>
#endif#define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))int main () {//  Prepare our context and publisherzmq::context_t context (1);zmq::socket_t publisher (context, ZMQ_PUB);publisher.bind("tcp://*:5556");publisher.bind("ipc://weather.ipc");                // Not usable on Windows.//  Initialize random number generatorsrandom ((unsigned) time (NULL));while (1) {int zipcode, temperature, relhumidity;//  Get values that will fool the bosszipcode     = within (100000);temperature = within (215) - 80;relhumidity = within (50) + 10;//  Send message to all subscriberszmq::message_t message(20);snprintf ((char *) message.data(), 20 ,"%05d %d %d", zipcode, temperature, relhumidity);publisher.send(message);}return 0;
}

 客户端应用程序:它监听更新流并获取与指定邮政编码相关的任何内容,默认情况下是纽约市

#include <zmq.hpp>
#include <iostream>
#include <sstream>int main (int argc, char *argv[])
{zmq::context_t context (1);//  Socket to talk to serverstd::cout << "Collecting updates from weather server…\n" << std::endl;zmq::socket_t subscriber (context, ZMQ_SUB);subscriber.connect("tcp://localhost:5556");//  Subscribe to zipcode, default is NYC, 10001const char *filter = (argc > 1)? argv [1]: "10001 ";subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));//  Process 100 updatesint update_nbr;long total_temp = 0;for (update_nbr = 0; update_nbr < 100; update_nbr++) {zmq::message_t update;int zipcode, temperature, relhumidity;subscriber.recv(&update);std::istringstream iss(static_cast<char*>(update.data()));iss >> zipcode >> temperature >> relhumidity ;total_temp += temperature;}std::cout     << "Average temperature for zipcode '"<< filter<<"' was "<<(int) (total_temp / update_nbr) <<"F"<< std::endl;return 0;
}

需要注意的是,在使用SUB套接字时,必须使用zmq_setsockopt()方法来设置订阅的内容。如果你不设置订阅内容,那将什么消息都收不到,新手很容易犯这个错误。订阅信息可以是任何字符串,可以设置多次。只要消息满足其中一条订阅信息,SUB套接字就会收到。订阅者可以选择不接收某类消息,也是通过zmq_setsockopt()方法实现的。

PUB-SUB套接字组合是异步的。客户端在一个循环体中使用zmq_recv()接收消息,如果向SUB套接字发送消息则会报错;类似地,服务端可以不断地使用zmq_send()发送消息,但不能在PUB套接字上使用zmq_recv()。

关于PUB-SUB套接字,还有一点需要注意:你无法得知SUB是何时开始接收消息的。就算你先打开了SUB套接字,后打开PUB发送消息,这时SUB还是会丢失一些消息的,因为建立连接是需要一些时间的。很少,但并不是零。

关于发布-订阅模式的几点说明:

  • 订阅者可以连接多个发布者,轮流接收消息;
  • 如果发布者没有订阅者与之相连,那它发送的消息将直接被丢弃;
  • 如果你使用TCP协议,那当订阅者处理速度过慢时,消息会在发布者处堆积。以后我们会讨论如何使用阈值(HWM)来保护发布者。
  • 在目前版本的ZMQ中,消息的过滤是在订阅者处进行的。也就是说,发布者会向订阅者发送所有的消息,订阅者会将未订阅的消息丢弃。

2.3 并行管道

下面一个示例程序中,我们将使用ZMQ进行超级计算,也就是并行处理模型:

  • 任务分发器会生成大量可以并行计算的任务;
  • 有一组worker会处理这些任务;
  • 结果收集器会在末端接收所有worker的处理结果,进行汇总。

现实中,worker可能散落在不同的计算机中,利用GPU(图像处理单元)进行复杂计算。下面是任务分发器的代码,它会生成100个任务,任务内容是让收到的worker延迟若干毫秒。

taskvent: Parallel task ventilator in C++

#include <zmq.hpp>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <iostream>#define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))int main (int argc, char *argv[])
{zmq::context_t context (1);//  Socket to send messages onzmq::socket_t  sender(context, ZMQ_PUSH);sender.bind("tcp://*:5557");std::cout << "Press Enter when the workers are ready: " << std::endl;getchar ();std::cout << "Sending tasks to workers…\n" << std::endl;//  The first message is "0" and signals start of batchzmq::socket_t sink(context, ZMQ_PUSH);sink.connect("tcp://localhost:5558");zmq::message_t message(2);memcpy(message.data(), "0", 1);sink.send(message);//  Initialize random number generatorsrandom ((unsigned) time (NULL));//  Send 100 tasksint task_nbr;int total_msec = 0;     //  Total expected cost in msecsfor (task_nbr = 0; task_nbr < 100; task_nbr++) {int workload;//  Random workload from 1 to 100msecsworkload = within (100) + 1;total_msec += workload;message.rebuild(10);memset(message.data(), '\0', 10);sprintf ((char *) message.data(), "%d", workload);sender.send(message);}std::cout << "Total expected cost: " << total_msec << " msec" << std::endl;sleep (1);              //  Give 0MQ time to deliverreturn 0;
}

下面是worker的代码,它接受信息并延迟指定的毫秒数,并发送执行完毕的信号:

taskwork: Parallel task worker in C++

#include "zhelpers.hpp"
#include <string>int main (int argc, char *argv[])
{zmq::context_t context(1);//  Socket to receive messages onzmq::socket_t receiver(context, ZMQ_PULL);receiver.connect("tcp://localhost:5557");//  Socket to send messages tozmq::socket_t sender(context, ZMQ_PUSH);sender.connect("tcp://localhost:5558");//  Process tasks foreverwhile (1) {zmq::message_t message;int workload;           //  Workload in msecsreceiver.recv(&message);std::string smessage(static_cast<char*>(message.data()), message.size());std::istringstream iss(smessage);iss >> workload;//  Do the works_sleep(workload);//  Send results to sinkmessage.rebuild();sender.send(message);//  Simple progress indicator for the viewerstd::cout << "." << std::flush;}return 0;
}

下面是结果收集器的代码。它会收集100个处理结果,并计算总的执行时间,让我们由此判别任务是否是并行计算的。

tasksink: Parallel task sink in C++

#include <zmq.hpp>
#include <time.h>
#include <sys/time.h>
#include <iostream>int main (int argc, char *argv[])
{//  Prepare our context and socketzmq::context_t context(1);zmq::socket_t receiver(context,ZMQ_PULL);receiver.bind("tcp://*:5558");//  Wait for start of batchzmq::message_t message;receiver.recv(&message);//  Start our clock nowstruct timeval tstart;gettimeofday (&tstart, NULL);//  Process 100 confirmationsint task_nbr;int total_msec = 0;     //  Total calculated cost in msecsfor (task_nbr = 0; task_nbr < 100; task_nbr++) {receiver.recv(&message);if ((task_nbr / 10) * 10 == task_nbr)std::cout << ":" << std::flush;elsestd::cout << "." << std::flush;}//  Calculate and report duration of batchstruct timeval tend, tdiff;gettimeofday (&tend, NULL);if (tend.tv_usec < tstart.tv_usec) {tdiff.tv_sec = tend.tv_sec - tstart.tv_sec - 1;tdiff.tv_usec = 1000000 + tend.tv_usec - tstart.tv_usec;}else {tdiff.tv_sec = tend.tv_sec - tstart.tv_sec;tdiff.tv_usec = tend.tv_usec - tstart.tv_usec;}total_msec = tdiff.tv_sec * 1000 + tdiff.tv_usec / 1000;std::cout << "\nTotal elapsed time: " << total_msec << " msec\n" << std::endl;return 0;
}

一组任务的平均执行时间在5秒左右,以下是分别开始1个、2个、4个worker时的执行结果:

#   1 worker
Total elapsed time: 5034 msec
#   2 workers
Total elapsed time: 2421 msec
#   4 workers
Total elapsed time: 1018 msec

关于这段代码的几个细节:

  • worker上游和任务分发器相连,下游和结果收集器相连,这就意味着你可以开启任意多个worker。但若worker是绑定至端点的,而非连接至端点,那我们就需要准备更多的端点,并配置任务分发器和结果收集器。所以说,任务分发器和结果收集器是这个网络结构中较为稳定的部分,因此应该由它们绑定至端点,而非worker,因为它们较为动态。

  • 我们需要做一些同步的工作,等待worker全部启动之后再分发任务。这点在ZMQ中很重要,且不易解决。连接套接字的动作会耗费一定的时间,因此当第一个worker连接成功时,它会一下收到很多任务。所以说,如果我们不进行同步,那这些任务根本就不会被并行地执行。你可以自己试验一下。

  • 任务分发器使用PUSH套接字向worker均匀地分发任务(假设所有的worker都已经连接上了),这种机制称为_负载均衡_,以后我们会见得更多。

  • 结果收集器的PULL套接字会均匀地从worker处收集消息,这种机制称为_公平队列_:

管道模式也会出现慢连接的情况,让人误以为PUSH套接字没有进行负载均衡。如果你的程序中某个worker接收到了更多的请求,那是因为它的PULL套接字连接得比较快,从而在别的worker连接之前获取了额外的消息。

3 参考

1、zmq指南

2、zmq官方教程

消息队列(MQ):ZeroMQ基本原理相关推荐

  1. 消息队列的使用场景_消息队列MQ的特点、选型及应用场景

    一.什么是消息队列 消息队列(Message Queue,简称MQ),指保存消息的一个容器,本质是个队列. 消息(Message)是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可 ...

  2. 消息中间件系列(四):消息队列MQ的特点、选型、及应用场景详解

    前面集中谈了分布式缓存Redis系列: 高并发架构系列:分布式锁的由来.特点.及Redis分布式锁的实现详解 高并发架构系列:Redis并发竞争key的解决方案详解 高并发架构系列:Redis缓存和M ...

  3. 消息队列MQ夺命连环11问:kafka、rabbitmq、rocketmq、activemq

    <消息队列MQ如何保证消息的幂等性> <RabbitMQ架构> <ZeroMQ简介:一种高性能的异步消息传递库> <Rocketmq原理&最佳实践&g ...

  4. 消息队列MQ与微消息队列MQTT

    文章目录 参考文章 什么是消息队列,什么是RPC 为什么要使用MQ消息队列 1. 解耦(可用性) 2. 流量削峰 3. 数据分发 消息队列的缺点 多种主流传统消息队列MQ对比 传统消息队列Rocket ...

  5. java队列_RPC远程调用和消息队列MQ的区别

    RPC和MQ同样都是用于分布式系统的两个很重要的技术,都有服务提供者.消费者的概念,可在一定程度上对系统进行解耦.但两者之间还是有区别的,本篇简单介绍~ 一.RPC RPC(Remote Proced ...

  6. 后端技术:消息队列MQ/JMS/Kafka相关知识介绍

    ?今天给大家分享消息队列MQ/JMS/Kafka相关知识介绍 1.消息队列介绍 首先举个收快递的栗子,传统的收快递,快递小哥把我们的快递送到我们的手里.他需要什么条件嗯? 快递小哥有时间送, 我们有时 ...

  7. 详解RPC远程调用和消息队列MQ的区别

    谈到分布式架构,就不得不谈到分布式架构的基石RPC. 什么是RPC RPC(Remote Procedure Call)远程过程调用,主要解决远程通信间的问题,不需要了解底层网络的通信机制. RPC服 ...

  8. 消息队列和ZeroMQ原理和应用

    一.定义 消息队列(message queue)本质就是个队列,先进先出FIFO. 利用FIFO先进先出的特性,可以保证消息的顺序性. 主要用途:不同服务server.进程process.线程thre ...

  9. 阿里云消息队列MQ学习—阿里云大学视频课

    在刷ACE题的过程中,感觉对于消息队列部分的理解不是很深刻,这里来学习一下. 例行还是先走一遍阿里云大学的一些视频课程扫扫盲,选择如下课程: 阿里消息队列MQ简介:阿里巴巴中间件技术部自主研发的专业消 ...

  10. 消息队列MQ 之 Kafka

    目录 前言 一.消息队列 MQ 为什么需要消息队列(MQ) 使用消息队列的好处 消息队列的两种模式 二.Kafka 概述 Kafka 简介 Kafka 的特性 三 实验 前言 一.消息队列 MQ MQ ...

最新文章

  1. [转载]C++ 面试
  2. VLC播放器如何录制rtsp流生成视频文件?
  3. OpenCV坎尼探测器Canny Detector的实例(附完整代码)
  4. php直销二叉树,PHP二叉树递归算法
  5. 电脑是否存在内存泄漏_STM32裸机内存管理解析
  6. php数组添加省会城市,【JSON数据】中国各省份省会城市经纬度 JSON
  7. 网站攻击软件_如何防止网站建设中出现安全问题?
  8. mysql 空闲几分钟速度变慢,MYSQL 运作一小段时间后,速度变得奇慢。而CPU基本空闲状态...
  9. 用博弈论的思想玩游戏(洛谷P3150题题解,Java语言描述)
  10. InnoSQL/MySQL并行复制的实现与配置
  11. Java Web应用小案例:查询城市天气信息
  12. mysql 编码种类_MySQL 编码
  13. bzoj4850 [JSOI2016]灯塔
  14. mysql 行级锁 where_mysql 行级锁的使用以及死锁的预防
  15. mysql cluster推荐配置
  16. H264编码质量屏幕截图
  17. 数据导入时出现的问题:
  18. 工厂供电技术实训设备QY-PGD19
  19. Hexo文章中图片点击实现全屏查看
  20. ESP8266制作创意时钟(DS1302+TM1637)

热门文章

  1. MySQL数据库操作指令
  2. app 服务器 运营 维护,app服务器维护
  3. Android数据的四种存储方式
  4. 我的RTOS 之二 --Threadx在skyeye上仿真測试(基于2410)
  5. 修改Eclipse的WorkSpace保持数[转载]
  6. DataBindings的用法
  7. 在Web.Config中指定页面的基类
  8. 建议简书评论区升级筛选/排序功能
  9. 基于GDAL的一个通用的3×3模板函数
  10. GDAL源码剖析(八)之编译GEOS和PROJ4库