zmq pub/sub使用详解
关于zmq的基本简介,请参考ZeroMQ基础入门。
pub/sub模式介绍
发布/订阅模式,全称为Publish/Subscribe,支持多个发布者/多订阅者,使用在消息单向传输的应用场景,消息总是从发布者发送到订阅者。
一般的使用流程为:
pub端:
- 创建context
- 创建socket,设置ZMQ_PUB模式
- bind端口
- 循环发布消息send
socket特性:
特性 | 值 |
---|---|
兼容的对端套接字 | ZMQ_SUB |
方向性 | 单向 |
发送/接收模式 | 仅发送 |
进入路由策略 | 无 |
流出路由策略 | 扇出(呈扇形发出) |
ZMQ_HWM 选项行为 | 丢弃 |
sub端:
- 创建context
- 创建socket,设置ZMQ_SUB模式
- connect到pub端
- setsockopt设置ZMQ_SUBSCRIBE订阅的消息
- 循环接收recv
特性 | 值 |
---|---|
兼容的对端套接字 | ZMQ_PUB |
方向性 | 单向 |
发送/接收模式 | 仅接收 |
进入路由策略 | 平等排队 |
流出路由策略 | 无 |
ZMQ_HWM 选项行为 | 丢弃 |
注意事项
- pub端socket不能使用recv函数,同样,sub端不能使用send函数
- 当pub端由于到达了高水位而使ZMQ_PUB套接字进入静默模式的时候,所有发送到这个有问题的订阅者的消息都会被丢弃,直到静默模式终止
- pub端socket的zmq_send()函数永远不会阻塞
- sub端刚创建socket后是无法订阅到任何消息的,必须使用setsockopt设置订阅的消息后才能接收到
- sub端是根据参数前缀进行过滤的。订阅的内容以pub端发出的内容从头开始匹配为过滤条件,完全匹配订阅内容的消息会被sub接收,如订阅内容为
"a"
时,所有以a开头的消息都会接收 - 当订阅内容为
""
,长度为0时为订阅所有内容,因为所有消息都匹配成功 - 如果存在某个pub没有被任何sub连接,则该pub会丢弃所有的消息
- 如果采用tcp的连接方式,sub很慢时,消息将会堆积在pub,可以通过设置ZMQ_HWM选项来保护发布者,对于重要的消息,也可以写入硬盘等待发送
- pub和sub谁bind谁connect并无严格要求(虽本质并无区别),但仍建议pub使用bind,sub使用connect,在实际测试中,使用sub绑定而pub connect时,sub端无法接收到消息
- 一个显著的问题是,“slow joiner”可能导致发布者的第一笔消息总是丢失,下文会进一步说明该问题
- 一个订阅者(subcriber)可以链接超过一个发布者(publisher)。数据到达后将交叉存取(公平队列),以保证发布者之间的数据不会淹没
- 从ZeroMQ v3.x版本开始,使用tcp://或者ipc://协议连接时会在发布者进行消息过滤,使用epgm://协议仍在订阅者过滤;在ZeroMQ v2.x,所有消息过滤都发生在订阅者
关于slow joiner问题
一般描述为,即使是先启动订阅者,再启动发布者,订阅者也有可能会丢失前一部分数据。你无法得知SUB是何时开始接收消息的。
这是因为订阅者向发布者建立连接也是耗费时间的,虽然时间极短,但不为0。这个时间内发布者发布的内容将没有订阅者能够接收。
几种处理方式:
- 不处理
对于可以容忍数据丢失的应用来说,不必理会丢失的数据。比如接收天气信息的应用,你可能只关注最新的天气信息。
- 使用延时发布策略
因为数据丢失的原因是发布者在没有稳定接收者的情况下仍然发送了数据,所以可以让发布者等待一段时间再发送数据。
最简单的方式是sleep一段时间。
缺点是:
- 不知道sleep多久合适
- 即使sleep也不能保证sub者一定就准备好了
- 在程序中加sleep不是一种优雅的设计
- 通知发布者模式
发布者可以在确保订阅者已经启动成功的情况下再发送数据,只需要订阅者在准备好后通知发布者。
这个不难实现,订阅者在准备好后,首先使用req/rep模式向发布者发送一个特定的请求,发布者接收请求并应答,然后再发布真正的数据。
这种方式增加了简单一步操作,但保证了数据的完整。
多线程发送问题
注意:context是线程安全的,但socket非线程安全。在多个线程中使用同一个socket会导致程序崩溃(不提倡使用锁,它会导致竞争并减慢性能,不符合zmq的设计理念)。
- 使用proxy
一种比较常见的场景是,发布者使用多个线程来发布不同的数据,所有的数据通过一个endpoint发送。
这与动态发现问题类似,对于一个应用场景,可能会随时增加发布者或者订阅者,构建一个合适的系统可以减小编码和出错的机会。
如图所示,使用一个proxy可以轻松解决这个问题。
- 增加了中间层。中间层是静态的,它不会经常变动
- pub可以随意增减,它们都connect到xsub
- sub也可以随意增减,它们都connect到xpub
- xsub收到的所有数据会通过proxy转发到xpub,从而发布到各sub
对于这种情况,可以在多个发布线程中分别创建socket,connect到xsub,从而避免多线程共用socket。
这种情况下,当发布线程较多时,会导致socket堆积,最终导致系统文件描述符过多而失败。可以使用线程池,每个线程使用自己的socket。
还有一种使用代理的情况是:
这里proxy类似于一个桥,连接了两个不同的网络。也可以作为协议网关,用于连接两个使用不同协议的网络。
结合天气服务,实现一个Proxy的例程如下:
//
// Weather proxy device C++
//
// Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
//#include "zhelpers.hpp"int main (int argc, char *argv[])
{zmq::context_t context(1);// This is where the weather server sitszmq::socket_t frontend(context, ZMQ_SUB);frontend.connect("tcp://192.168.55.210:5556");// This is our public endpoint for subscriberszmq::socket_t backend (context, ZMQ_PUB);backend.bind("tcp://10.1.1.0:8100");// Subscribe on everythingfrontend.setsockopt(ZMQ_SUBSCRIBE, "", 0);// Shunt messages out to our own subscriberswhile (1) {while (1) {zmq::message_t message;int more;size_t more_size = sizeof (more);// Process all parts of the messagefrontend.recv(&message);frontend.getsockopt( ZMQ_RCVMORE, &more, &more_size);backend.send(message, more? ZMQ_SNDMORE: 0);if (!more)break; // Last message part}}return 0;
}
- 使用push/pull
使用push/pull建立一个发布端模型,pull接收到所有数据通过proxy转到pub,再发送出去。
这种方式是使用xpub/xsub的变体。因为实际使用中,在单进程内使用sub绑定,pub连接的方式无法接收到数据,而push/pull是一个可行的替代。
启动代理:
// 代码片断
void StartPubProxy(string& port)
{try{// 面向client的socket,多线程发来所有数据zmq::socket_t frontend(m_ctx, ZMQ_PULL);frontend.bind("inproc://#0");// 面向services的socket,提供对外端口,并实际发布数据zmq::socket_t backend(m_ctx, ZMQ_PUB);string strBind = "tcp://*:" + port;backend.bind(strBind);// 创建proxyzmq::proxy(static_cast<void*>(frontend), static_cast<void*>(backend), nullptr);}catch(const std::exception& e){std::cerr <<"zmq启动转发代理失败:" << e.what() << '\n';}
}
原发布线程向"inproc://#0"
push数据即可。
- 使用boost asio单线程处理数据
在有些情况下,加锁的多线程未必比无锁单线程更快。
对于多线程发布模型来说,可以把它们要发送的数据通过strand.post到io_service的单线程队列里,由工作线程异步处理。
这种方式简单方便,关于使用asio创建线程的使用可以参考:boost::asio::io_service创建线程池简单实例。
保护发布者
前方讲述,当sub端处理较慢时,pub端在到达高水位线后会丢弃数据。对于重要的应用,由于不能对sub端的性能作出任何假设,所以需要一定的策略来保证。
ZMQ_HWM
ZMQ_SNDHWM:对向外发送的消息设置高水位(最大缓存量),ZMQ_RCVHWM:对进入socket的消息设置高水位。
ZMQ_SNDHWM属性将会设置socket参数指定的socket对外发送的消息的高水位。高水位是一个硬限制,它会限制每一个与此socket相连的在内存中排队的未处理的消息数目的最大值。
0值代表着没有限制,默认值为1000,就在bind/connect之前设置该属性。如果设置为无限,可能会导致发布者崩溃。
如果已经到达了规定的限制,socket就需要进入一种异常的状态,表现形式因socket类型而异。socket会进行适当的调节,比如阻塞或者丢弃已发送的消息。
总是给套接字设置一个基于期望的订阅方数量的最大值,你打算用于队列的内存的数量,和一个消息平均大小的高水位线。例如,如果你希望有5000个订阅方,有1G的内存可有,消息平均200字节,那么一个安全的高水位线应该是(1000000000/200/5000)=1000.
参考资料
?MQ - The Guide
Unable to receive messages when binding subscriber socket
What is a simple example of a working XSUB / XPUB proxy in zeromq
How to implement Pub-Sub Network with a Proxy by using XPUB and XSUB in ZeroMQ(C++)?
Proxy with inproc frontend
ZMQ模式详解——发布/订阅模式
ZeroMQ基础入门
zmq pub/sub使用详解相关推荐
- Linux中级实战专题篇:rabbitmq(消息中间件p2p模式和pub模式,消息队列rabbitmq详解,单机安装,集群部署以及配置实战)
一.消息中间件相关概念 1.简介 消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台相关 的数据交流,并基于数据通信来进行分布式系统的集成.通过提供消息传递和消息 队列模型,可以在分布 ...
- linux卸载hadoop版本,centos6.5 安装hadoop1.2.1的教程详解【亲测版】
本篇只简单介绍安装步骤 1. 角色分配 10.11.84.4 web-crawler--1.novalocal master/slave 10.11.84.5 web-crawler--2.noval ...
- linux yum命令详解
yum(全称为 Yellow dog Updater, Modified)是一个在Fedora和RedHat以及SUSE中的Shell前端软件包管理器.基於RPM包管理,能够从指定的服务器自动下载RP ...
- x264代码剖析(一):图文详解x264在Windows平台上的搭建
x264代码剖析(一):图文详解x264在Windows平台上的搭建 X264源码下载地址:http://ftp.videolan.org/pub/videolan/x264/ 平台:win7 PC. ...
- Nmap扫描教程之基础扫描详解
Nmap扫描教程之基础扫描详解 Nmap扫描基础扫描 当用户对Nmap工具了解后,即可使用该工具实施扫描.通过上一章的介绍,用户可知Nmap工具可以分别对主机.端口.版本.操作系统等实施扫描.但是,在 ...
- ansible 详解
文章目录 一.ansible 简介 1.1 ansible 是什么? 1.2 ansible 特点 1.3 ansible 架构图 二.ansible 任务执行 2.1 ansible 任务执行模式 ...
- 红帽企业集群和存储管理之DRBD+Heartbeat+NFS实现详解
红帽企业集群和存储管理之 DRBD+Heartbeat+NFS实现详解 案例应用背景 本实验部署DRBD + HEARDBEAT + NFS 环境,建立一个高可用(HA)的文件服务器集群.在方案中,通 ...
- Linux 高可用(HA)集群之keepalived详解
大纲 一.前言 二.Keepalived 详解 三.环境准备 四.LVS+Keepalived 实现高可用的前端负载均衡器 一.前言 这篇文章是前几篇文章的总结,我们先简单的总结一下我们前面讲解的内容 ...
- Maven settings.xml配置详解
首先:Maven中央仓库的搜索全部公共jar包的地址是,http://search.maven.org/ ===Maven基础-默认中央仓库============================== ...
- Java消息服务JMS详解
2019独角兽企业重金招聘Python工程师标准>>> JMS: Java消息服务(Java Message Service) JMS是用于访问企业消息系统的开发商中立的API.企业 ...
最新文章
- mysql分页案例_php+mysql 进行分页案例
- SQL Server 数据岸问题
- 基于Chrome开源提取的界面开发框架 三
- Attention Is All You Need (transformer)
- 软件项目管理0716:责任分工明确
- Arduino--库函数头文件
- labview如何加载库_迈德威视工业相机LabView 开发指导
- [Android Pro] 完美解决隐藏Listview和RecyclerView去掉滚动条和滑动到边界阴影的方案...
- 用 SAP ABAP 编写的俄罗斯游戏
- 求质数算法的N种境界 (N 10) zz
- Spring Cloud构建微服务架构:消息驱动的微服务(核心概念)【Dalston版】
- C语言:用二维字符数组的每行存储键盘输入的字符串,将这些字符串按字典顺序升序排序,输出排序后的结果。
- 修改android的avd路径方法
- Fiddler中文版 软件分享(亲测可用!)
- 创建一个urdf机器人_ROS机器人Diego 1#制作(十六)创建机器人的urdf模型描述文件详解...
- Google引擎搜索技巧
- 入门SAP MM的学习流程
- HBase --------- 深入了解HBase架构(架构组建及HBase工作原理)
- R语言使用循环语句一次性画出多幅图
- PyCharm + PySide2/PySide6 外部工具配置