消息队列(MQ):ZeroMQ基本原理
目录
1 ZeroMQ概述
2 ZeroMQ工作模式
2.1 请求-应答
2.2 发布-订阅
2.3 并行管道
3 参考
1 ZeroMQ概述
ZeroMQ看起来想一个可嵌入的网络库,但其作用就像是一个并发框架。它为你提供了各种传输工具,如进程内,进程间,TCP和组播中进行原子消息传递的套接字。你可以使用各种模式实现N对N的套接字连接,这些模式包括发布订阅,请求应答,管道模式。它的速度足够快,因此可以充当集群产品的结构,他的异步IO模型提供了可扩展的多核应用程序,用异步消息来处理任务。
2 ZeroMQ工作模式
2.1 请求-应答
让我们从简单的代码开始,一段传统的Hello World程序。我们会创建一个客户端和一个服务端,客户端发送Hello给服务端,服务端返回World。下文是C语言编写的服务端,它在5555端口打开一个ZMQ套接字,等待请求,收到后应答World。
服务端:zmhwserver.cpp
#include <zmq.hpp>
#include <string>
#include <iostream>
#ifndef _WIN32
#include <unistd.h>
#else
#include <windows.h>#define sleep(n) Sleep(n)
#endifint main () {// Prepare our context and socketzmq::context_t context (1);zmq::socket_t socket (context, ZMQ_REP);socket.bind ("tcp://*:5555");while (true) {zmq::message_t request;// Wait for next request from clientsocket.recv (&request);std::cout << "Received Hello" << std::endl;// Do some 'work'sleep(1);// Send reply back to clientzmq::message_t reply (5);memcpy (reply.data (), "World", 5);socket.send (reply);}return 0;
}
- 编译:g++ zmhwserver.cpp -o zmhwserver -lzmq
- 运行:./zmhwserver
客户端:zmhwclient.cpp
#include <zmq.hpp>
#include <string>
#include <iostream>int main ()
{// Prepare our context and socketzmq::context_t context (1);zmq::socket_t socket (context, ZMQ_REQ);std::cout << "Connecting to hello world server…" << std::endl;socket.connect ("tcp://localhost:5555");// Do 10 requests, waiting each time for a responsefor (int request_nbr = 0; request_nbr != 10; request_nbr++) {zmq::message_t request (5);memcpy (request.data (), "Hello", 5);std::cout << "Sending Hello " << request_nbr << "…" << std::endl;socket.send (request);// Get the reply.zmq::message_t reply;socket.recv (&reply);std::cout << "Received World " << request_nbr << std::endl;}return 0;
}
- 编译:g++ zmhwclient.cpp -o zmhwclient -lzmq
- 运行:./zmhwclient
2.2 发布-订阅
“发布-订阅”模式下,“发布者”绑定一个指定的地址,例如“192.168.10.1:5500”,“订阅者”连接到该地址。该模式下消息流是单向的,只允许从“发布者”流向“订阅者”。且“发布者”只管发消息,不理会是否存在“订阅者”。上图只是“发布-订阅”的最基本的模型,一个“发布者”可以拥有多个订阅者,同样的,一个“订阅者”也可订阅多个发布者。
让我们看一个示例,该示例推出由邮政编码,温度和相对湿度组成的天气更新。就像真实的气象站一样,我们将生成随机值。
服务器应用程序:将为该应用程序使用端口5556:
#include <zmq.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>#if (defined (WIN32))
#include <zhelpers.hpp>
#endif#define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))int main () {// Prepare our context and publisherzmq::context_t context (1);zmq::socket_t publisher (context, ZMQ_PUB);publisher.bind("tcp://*:5556");publisher.bind("ipc://weather.ipc"); // Not usable on Windows.// Initialize random number generatorsrandom ((unsigned) time (NULL));while (1) {int zipcode, temperature, relhumidity;// Get values that will fool the bosszipcode = within (100000);temperature = within (215) - 80;relhumidity = within (50) + 10;// Send message to all subscriberszmq::message_t message(20);snprintf ((char *) message.data(), 20 ,"%05d %d %d", zipcode, temperature, relhumidity);publisher.send(message);}return 0;
}
客户端应用程序:它监听更新流并获取与指定邮政编码相关的任何内容,默认情况下是纽约市
#include <zmq.hpp>
#include <iostream>
#include <sstream>int main (int argc, char *argv[])
{zmq::context_t context (1);// Socket to talk to serverstd::cout << "Collecting updates from weather server…\n" << std::endl;zmq::socket_t subscriber (context, ZMQ_SUB);subscriber.connect("tcp://localhost:5556");// Subscribe to zipcode, default is NYC, 10001const char *filter = (argc > 1)? argv [1]: "10001 ";subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));// Process 100 updatesint update_nbr;long total_temp = 0;for (update_nbr = 0; update_nbr < 100; update_nbr++) {zmq::message_t update;int zipcode, temperature, relhumidity;subscriber.recv(&update);std::istringstream iss(static_cast<char*>(update.data()));iss >> zipcode >> temperature >> relhumidity ;total_temp += temperature;}std::cout << "Average temperature for zipcode '"<< filter<<"' was "<<(int) (total_temp / update_nbr) <<"F"<< std::endl;return 0;
}
需要注意的是,在使用SUB套接字时,必须使用zmq_setsockopt()方法来设置订阅的内容。如果你不设置订阅内容,那将什么消息都收不到,新手很容易犯这个错误。订阅信息可以是任何字符串,可以设置多次。只要消息满足其中一条订阅信息,SUB套接字就会收到。订阅者可以选择不接收某类消息,也是通过zmq_setsockopt()方法实现的。
PUB-SUB套接字组合是异步的。客户端在一个循环体中使用zmq_recv()接收消息,如果向SUB套接字发送消息则会报错;类似地,服务端可以不断地使用zmq_send()发送消息,但不能在PUB套接字上使用zmq_recv()。
关于PUB-SUB套接字,还有一点需要注意:你无法得知SUB是何时开始接收消息的。就算你先打开了SUB套接字,后打开PUB发送消息,这时SUB还是会丢失一些消息的,因为建立连接是需要一些时间的。很少,但并不是零。
关于发布-订阅模式的几点说明:
- 订阅者可以连接多个发布者,轮流接收消息;
- 如果发布者没有订阅者与之相连,那它发送的消息将直接被丢弃;
- 如果你使用TCP协议,那当订阅者处理速度过慢时,消息会在发布者处堆积。以后我们会讨论如何使用阈值(HWM)来保护发布者。
- 在目前版本的ZMQ中,消息的过滤是在订阅者处进行的。也就是说,发布者会向订阅者发送所有的消息,订阅者会将未订阅的消息丢弃。
2.3 并行管道
下面一个示例程序中,我们将使用ZMQ进行超级计算,也就是并行处理模型:
- 任务分发器会生成大量可以并行计算的任务;
- 有一组worker会处理这些任务;
- 结果收集器会在末端接收所有worker的处理结果,进行汇总。
现实中,worker可能散落在不同的计算机中,利用GPU(图像处理单元)进行复杂计算。下面是任务分发器的代码,它会生成100个任务,任务内容是让收到的worker延迟若干毫秒。
taskvent: Parallel task ventilator in C++
#include <zmq.hpp>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <iostream>#define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))int main (int argc, char *argv[])
{zmq::context_t context (1);// Socket to send messages onzmq::socket_t sender(context, ZMQ_PUSH);sender.bind("tcp://*:5557");std::cout << "Press Enter when the workers are ready: " << std::endl;getchar ();std::cout << "Sending tasks to workers…\n" << std::endl;// The first message is "0" and signals start of batchzmq::socket_t sink(context, ZMQ_PUSH);sink.connect("tcp://localhost:5558");zmq::message_t message(2);memcpy(message.data(), "0", 1);sink.send(message);// Initialize random number generatorsrandom ((unsigned) time (NULL));// Send 100 tasksint task_nbr;int total_msec = 0; // Total expected cost in msecsfor (task_nbr = 0; task_nbr < 100; task_nbr++) {int workload;// Random workload from 1 to 100msecsworkload = within (100) + 1;total_msec += workload;message.rebuild(10);memset(message.data(), '\0', 10);sprintf ((char *) message.data(), "%d", workload);sender.send(message);}std::cout << "Total expected cost: " << total_msec << " msec" << std::endl;sleep (1); // Give 0MQ time to deliverreturn 0;
}
下面是worker的代码,它接受信息并延迟指定的毫秒数,并发送执行完毕的信号:
taskwork: Parallel task worker in C++
#include "zhelpers.hpp"
#include <string>int main (int argc, char *argv[])
{zmq::context_t context(1);// Socket to receive messages onzmq::socket_t receiver(context, ZMQ_PULL);receiver.connect("tcp://localhost:5557");// Socket to send messages tozmq::socket_t sender(context, ZMQ_PUSH);sender.connect("tcp://localhost:5558");// Process tasks foreverwhile (1) {zmq::message_t message;int workload; // Workload in msecsreceiver.recv(&message);std::string smessage(static_cast<char*>(message.data()), message.size());std::istringstream iss(smessage);iss >> workload;// Do the works_sleep(workload);// Send results to sinkmessage.rebuild();sender.send(message);// Simple progress indicator for the viewerstd::cout << "." << std::flush;}return 0;
}
下面是结果收集器的代码。它会收集100个处理结果,并计算总的执行时间,让我们由此判别任务是否是并行计算的。
tasksink: Parallel task sink in C++
#include <zmq.hpp>
#include <time.h>
#include <sys/time.h>
#include <iostream>int main (int argc, char *argv[])
{// Prepare our context and socketzmq::context_t context(1);zmq::socket_t receiver(context,ZMQ_PULL);receiver.bind("tcp://*:5558");// Wait for start of batchzmq::message_t message;receiver.recv(&message);// Start our clock nowstruct timeval tstart;gettimeofday (&tstart, NULL);// Process 100 confirmationsint task_nbr;int total_msec = 0; // Total calculated cost in msecsfor (task_nbr = 0; task_nbr < 100; task_nbr++) {receiver.recv(&message);if ((task_nbr / 10) * 10 == task_nbr)std::cout << ":" << std::flush;elsestd::cout << "." << std::flush;}// Calculate and report duration of batchstruct timeval tend, tdiff;gettimeofday (&tend, NULL);if (tend.tv_usec < tstart.tv_usec) {tdiff.tv_sec = tend.tv_sec - tstart.tv_sec - 1;tdiff.tv_usec = 1000000 + tend.tv_usec - tstart.tv_usec;}else {tdiff.tv_sec = tend.tv_sec - tstart.tv_sec;tdiff.tv_usec = tend.tv_usec - tstart.tv_usec;}total_msec = tdiff.tv_sec * 1000 + tdiff.tv_usec / 1000;std::cout << "\nTotal elapsed time: " << total_msec << " msec\n" << std::endl;return 0;
}
一组任务的平均执行时间在5秒左右,以下是分别开始1个、2个、4个worker时的执行结果:
# 1 worker
Total elapsed time: 5034 msec
# 2 workers
Total elapsed time: 2421 msec
# 4 workers
Total elapsed time: 1018 msec
关于这段代码的几个细节:
worker上游和任务分发器相连,下游和结果收集器相连,这就意味着你可以开启任意多个worker。但若worker是绑定至端点的,而非连接至端点,那我们就需要准备更多的端点,并配置任务分发器和结果收集器。所以说,任务分发器和结果收集器是这个网络结构中较为稳定的部分,因此应该由它们绑定至端点,而非worker,因为它们较为动态。
我们需要做一些同步的工作,等待worker全部启动之后再分发任务。这点在ZMQ中很重要,且不易解决。连接套接字的动作会耗费一定的时间,因此当第一个worker连接成功时,它会一下收到很多任务。所以说,如果我们不进行同步,那这些任务根本就不会被并行地执行。你可以自己试验一下。
任务分发器使用PUSH套接字向worker均匀地分发任务(假设所有的worker都已经连接上了),这种机制称为_负载均衡_,以后我们会见得更多。
结果收集器的PULL套接字会均匀地从worker处收集消息,这种机制称为_公平队列_:
管道模式也会出现慢连接的情况,让人误以为PUSH套接字没有进行负载均衡。如果你的程序中某个worker接收到了更多的请求,那是因为它的PULL套接字连接得比较快,从而在别的worker连接之前获取了额外的消息。
3 参考
1、zmq指南
2、zmq官方教程
消息队列(MQ):ZeroMQ基本原理相关推荐
- 消息队列的使用场景_消息队列MQ的特点、选型及应用场景
一.什么是消息队列 消息队列(Message Queue,简称MQ),指保存消息的一个容器,本质是个队列. 消息(Message)是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可 ...
- 消息中间件系列(四):消息队列MQ的特点、选型、及应用场景详解
前面集中谈了分布式缓存Redis系列: 高并发架构系列:分布式锁的由来.特点.及Redis分布式锁的实现详解 高并发架构系列:Redis并发竞争key的解决方案详解 高并发架构系列:Redis缓存和M ...
- 消息队列MQ夺命连环11问:kafka、rabbitmq、rocketmq、activemq
<消息队列MQ如何保证消息的幂等性> <RabbitMQ架构> <ZeroMQ简介:一种高性能的异步消息传递库> <Rocketmq原理&最佳实践&g ...
- 消息队列MQ与微消息队列MQTT
文章目录 参考文章 什么是消息队列,什么是RPC 为什么要使用MQ消息队列 1. 解耦(可用性) 2. 流量削峰 3. 数据分发 消息队列的缺点 多种主流传统消息队列MQ对比 传统消息队列Rocket ...
- java队列_RPC远程调用和消息队列MQ的区别
RPC和MQ同样都是用于分布式系统的两个很重要的技术,都有服务提供者.消费者的概念,可在一定程度上对系统进行解耦.但两者之间还是有区别的,本篇简单介绍~ 一.RPC RPC(Remote Proced ...
- 后端技术:消息队列MQ/JMS/Kafka相关知识介绍
?今天给大家分享消息队列MQ/JMS/Kafka相关知识介绍 1.消息队列介绍 首先举个收快递的栗子,传统的收快递,快递小哥把我们的快递送到我们的手里.他需要什么条件嗯? 快递小哥有时间送, 我们有时 ...
- 详解RPC远程调用和消息队列MQ的区别
谈到分布式架构,就不得不谈到分布式架构的基石RPC. 什么是RPC RPC(Remote Procedure Call)远程过程调用,主要解决远程通信间的问题,不需要了解底层网络的通信机制. RPC服 ...
- 消息队列和ZeroMQ原理和应用
一.定义 消息队列(message queue)本质就是个队列,先进先出FIFO. 利用FIFO先进先出的特性,可以保证消息的顺序性. 主要用途:不同服务server.进程process.线程thre ...
- 阿里云消息队列MQ学习—阿里云大学视频课
在刷ACE题的过程中,感觉对于消息队列部分的理解不是很深刻,这里来学习一下. 例行还是先走一遍阿里云大学的一些视频课程扫扫盲,选择如下课程: 阿里消息队列MQ简介:阿里巴巴中间件技术部自主研发的专业消 ...
- 消息队列MQ 之 Kafka
目录 前言 一.消息队列 MQ 为什么需要消息队列(MQ) 使用消息队列的好处 消息队列的两种模式 二.Kafka 概述 Kafka 简介 Kafka 的特性 三 实验 前言 一.消息队列 MQ MQ ...
最新文章
- [转载]C++ 面试
- VLC播放器如何录制rtsp流生成视频文件?
- OpenCV坎尼探测器Canny Detector的实例(附完整代码)
- php直销二叉树,PHP二叉树递归算法
- 电脑是否存在内存泄漏_STM32裸机内存管理解析
- php数组添加省会城市,【JSON数据】中国各省份省会城市经纬度 JSON
- 网站攻击软件_如何防止网站建设中出现安全问题?
- mysql 空闲几分钟速度变慢,MYSQL 运作一小段时间后,速度变得奇慢。而CPU基本空闲状态...
- 用博弈论的思想玩游戏(洛谷P3150题题解,Java语言描述)
- InnoSQL/MySQL并行复制的实现与配置
- Java Web应用小案例:查询城市天气信息
- mysql 编码种类_MySQL 编码
- bzoj4850 [JSOI2016]灯塔
- mysql 行级锁 where_mysql 行级锁的使用以及死锁的预防
- mysql cluster推荐配置
- H264编码质量屏幕截图
- 数据导入时出现的问题:
- 工厂供电技术实训设备QY-PGD19
- Hexo文章中图片点击实现全屏查看
- ESP8266制作创意时钟(DS1302+TM1637)