重头戏!ZeroMQ的发布-订阅模式详解:ZMQ_PUB、ZMQ_SUB
一、ØMQ模式总览
- ØMQ支持多种模式,具体可以参阅:https://blog.csdn.net/qq_41453285/article/details/106865539
- 本文介绍ØMQ的“发布-订阅”模式
二、发布-订阅模式
- 发布-订阅模式由https://rfc.zeromq.org/spec/29/正式定义
- 在发布-订阅模式中,有一个发布者用来发送消息,该模式中有很多订阅者会接收发布者发布的消息
- “发布-订阅”模型支持的套接字类型有4种:
- ZMQ_PUB
- ZMQ_SUB
- ZMQ_XPUB
- ZMQ_XSUB
三、“PUB-SUB”套接字类型
- PUB就是发布者,SUB就是订阅者
ZMQ_PUB
- 发布者使用类型为ZMQ_PUB的套接字来分发数据。发送的消息以扇出方式分发给所有连接的对等方
- 在ZMQ_PUB类型的套接字上不能执行zmq_msg_recv()等接收数据的函数
- 当ZMQ_PUB套接字由于已达到订阅者的高水位标记而进入静音状态时,将发送给有问题的订阅者的任何消息都将被丢弃,直到静音状态结束为止。关于“高水位标记”请参阅:
- 对于该套接字类型,zmq_msg_send()函数将永远不会阻塞
ZMQ_PUB特性摘要 兼容的对等套接字 ZMQ_SUB、ZMQ_XSUB 方向 单向 发送/接收模式 仅发送 入网路由策略 不适用(N/A) 外发路由策略
扇出(Fan out) 静音状态下的操作 丢弃 ZMQ_SUB
- 订阅者使用ZMQ_SUB类型的套接字来订阅发布者分发的数据
- ZMQ_SUB套接字创建完成之后,ZMQ_SUB套接字未订阅任何消息,请使用zmq_setsockopt()的ZMQ_SUBSCRIBE选项指定要订阅的消息
- 在ZMQ_PUB类型的套接字上不能执行zmq_msg_recv()等接收数据的函数
ZMQ_SUB特性摘要 兼容的对等套接字 ZMQ_PUB、ZMQ_XPUB 方向 单向 发送/接收模式 仅接收 入网路由策略 公平排队 外发路由策略
不适用(N/A)
- 下面编写一个使用“SUB-PUB”的发布订阅演示案例:
- 发布者:类似于一个天气更新服务器,向订阅者发送天气更新,内容包括邮政编码、温度、湿度等信息
- 订阅者:它监听发布者更新的数据流,过滤只接收与特定邮政编码相关的天气信息,默认接收接收10条数据
- 发布者代码如下:
// 源码链接:https://github.com/dongyusheng/csdn-code/blob/master/ZeroMQ/wuserver.c
// wuserver.c
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <time.h>
#include <zmq.h>// 随机生成0...num-1的随机数
#define randof(num) (int) ((float) (num) * random () / (RAND_MAX + 1.0))// 将string消息格式化为zmq_meg_t对象, 然后发往socket套接字上
static int s_send(void *socket, char *string);int main()
{// 1.初始化上下文void *context = zmq_ctx_new();// 2.创建、绑定套接字void *publisher = zmq_socket(context, ZMQ_PUB);assert(publisher != NULL);// 此处我们将发布者绑定到一个tcp节点上和一个ipc节点上, 但是本案例我们只使用tcp, ipc那个只是演示说明zmq的套接字可以绑定到多个节点上int rc = zmq_bind(publisher, "tcp://*:5555");assert(rc == 0);rc = zmq_bind(publisher, "ipc://weather.ipc");assert(rc == 0);// 3.初始化随机数发生器srandom((unsigned)time(NULL));// 4.循环发送数据while(1){// 5.随机生成邮政编码、温度、适度int zipcode, temperature, relhumidity;zipcode = randof(100000);temperature = randof(215) - 80;relhumidity = randof(50) + 10;// 6.将消息发送给所有的订阅者char update[20];sprintf(update, "%05d %d %d", zipcode, temperature, relhumidity);rc = s_send(publisher, update);assert(rc);}// 7.关闭套接字、销毁上下文zmq_close(publisher);zmq_ctx_destroy(context);return 0;
}static int s_send(void *socket, char *string)
{// 初始化一个zmq_msg_t对象, 分配的大小为string的大小zmq_msg_t msg;zmq_msg_init_size(&msg, strlen(string));memcpy(zmq_msg_data(&msg), string, strlen(string));// 发送数据int rc = zmq_msg_send(&msg, socket, 0);// 关闭zmq_msg_t对象zmq_msg_close(&msg);return rc;
}
- 订阅者代码如下:
// 源码链接:https://github.com/dongyusheng/csdn-code/blob/master/ZeroMQ/wuclient.c
// wuclient.c
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <zmq.h>// 从socket接收数据, 并将数据返回
char *s_recv(void *socket);int main(int argc, char *argv[])
{// 1.初始化上下文void *context = zmq_ctx_new();// 2.创建套接字、连接发布者void *subscriber = zmq_socket(context, ZMQ_SUB);assert(subscriber != NULL);int rc = zmq_connect(subscriber, "tcp://localhost:5555");assert(rc == 0);// 3.因为自己是订阅者, 因此需要使用设置过滤器, 显式指定自己是否需要接收什么类型的消息// 程序运行时可以输入参数, 参数代表邮政编码, 如果参数为空, 那么就过滤10001的消息char *filter = (argc > 1) ? argv[1] : "10001";rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter));assert( rc == 0);// 4.从发布者那里接收消息, 接收10条自己想要的数据int update_nbr;long total_temp = 0;for(update_nbr = 0; update_nbr < 10; update_nbr++){// 5.接收数据char *string = s_recv(subscriber);assert(string != NULL);// 6.将数据中的邮政编码、温度、适度分别存储变量中int zipcode, temperature, relhumidity;sscanf(string, "%d %d %d", &zipcode, &temperature, &relhumidity);total_temp += temperature;free(string);}// 7.接收完成之后, 打印一下平均温度printf("Average tempature for zipcode '%s' was %dF\n", filter, (int)(total_temp / update_nbr));// 8.关闭套接字、销毁上下文zmq_close(subscriber);zmq_ctx_destroy(context);return 0;
}char *s_recv(void *socket)
{// 创建zmq_msg_t对象接收数据zmq_msg_t msg;zmq_msg_init(&msg);int size = zmq_msg_recv(&msg, socket, 0);if(size == -1){return NULL;}// 将zmq_msg_t对象中的数据保存到字符串中char *string = (char*)malloc(size + 1);memcpy(string, zmq_msg_data(&msg), size);zmq_msg_close(&msg);string[size] = 0;return string;
}
- 编译如下:
gcc -o wuserver wuserver.c -lzmq
gcc -o wuclient wuclient.c -lzmq
- 一次运行如下,左侧为发布者,右侧为订阅者,订阅者没有传入参数,因此默认订阅的是邮政编码为“10001”的数据
- 又运行一次如下,订阅者传入的参数为“10002”,因此订阅的是邮政编码为“10002”的数据
四、“XPUB-XSUB”套接字类型
- “XPUB-XSUB”套接字类型与“PUB-SUB”套接字类型相同,也是属于发布-订阅
- 在“PUB-SUB”中,订阅者通过zmq_connect()向发布者发起订阅;但是“XPUB-XSUB”套接字类型允许订阅者通过发送一条订阅信息到发布者来完成订阅
ZMQ_XPUB
- 用法与ZMQ_PUB大部分相同
- 但是有一点与ZMQ_PUB不同:ZMQ_XPUB(自己)的订阅方可以向自己发送一个订阅信息来进行订阅。订阅消息是字节1(用于订阅)或字节0(用于取消订阅),后跟订阅主体。也接收不带子/取消订阅前缀的消息,但对订阅状态没有影响
ZMQ_XPUB特性摘要 兼容的对等套接字 ZMQ_SUB、ZMQ_XSUB 方向 单向 发送/接收模式 发送消息,接收订阅 入网路由策略 不适用(N/A) 外发路由策略
扇出(Fan out) 静音状态下的操作 丢弃 ZMQ_XSUB
- 用法与ZMQ_SUB大部分相同
- 但是有一点与ZMQ_SUB不同:自己可以向发布者发送一条订阅信息来进行订阅。订阅消息是字节1(用于订阅)或字节0(用于取消订阅),后跟订阅主体。也接收不带子/取消订阅前缀的消息,但对订阅状态没有影响
ZMQ_XSUB特性摘要 兼容的对等套接字 ZMQ_PUB、ZMQ_XPUB 方向 单向 发送/接收模式 接收消息,发送订阅 入网路由策略 公平排队 外发路由策略
不适用(N/A) 静音状态下的操作 丢弃
XSUB、XPUB应用之“代理”
动态发现问题:在设计大型的分布式体系结构时,会遇到的问题之一是——“发现”。也就是说,部件如何认识对象,以及部件增减或减少时,如何更新这些消息,因此,我们称之为“动态发现”
动态发现解决方案①:简单的方式是通过硬编码(或配置)的网络架构来完全避免,一般通过手工操作。但是这种方案导致体系结构变得脆弱和笨重。例如一个系统有一个发布者和一百个订阅者,你需要对每一个订阅者配置发布者端点来让每个订阅者连接到发布者服务器。订阅者是动态的,发布者是静态的,如果你又新增了新的发布者,那么就需要再配置这一百个订阅者,工作量相当的大
- 动态发现解决方案②:通过中间层(代理)来实现,这种方法比较推荐
- 在“发布-订阅”模型中,我们可以在中间增加一个新的代理节点,该节点绑定了XSUB套接字和XPUB套接字,发布者连接到XSUB中,订阅者连接到XPUB中。这样一来添加或者删除发布者或订阅者节点就变得微不足道了
- 对于代理节点,其需要执行订阅转发:SUB套接字需要将自己的订阅信息作为特殊消息发送到代理的XPUB端点上,代理转发这些订阅消息到XSUB上,然后XSUB再将消息发送到PUB,从而最终完成SUB到PUB的订阅
- 当完成订阅之后,PUB直接发送消息,SUB可以直接收到,不需要代理进行转发
- ØMQ自己提供了代理的接口,可以省略代码的书写,详情可参阅:https://blog.csdn.net/qq_41453285/article/details/106887035
五、发布-订阅的相关注意事项
关于“订阅”的说明(对于SUB、SUBX端来说)
- 当你使用一个SUB、XSUB套接字时(订阅方)必须使用zmq_setsockopt()和SUBSCRIBE设置一个订阅(例如上面的客户端的代码所示)。如果你创建了一个SUB套接字,但是没有设置任何订阅,那么就不会得到任何消息
- 订阅者可以设置许多的订阅,它们被累加在一起。也就是说,如果某个更新匹配任何订阅,那么订阅者都会接收到它
- 订阅者也可以取消特定的订阅
- 订阅经常但不一定是一个可打印的字符串
- 要了解这是如何工作的,请参见zmq_setsockopt()和下面的演示案例
“慢木匠”症状
- 如果发布者开启之后再启动订阅者,那么在订阅者启动的这段时间内,发布者发送的消息订阅者就接收不到了
- “满木匠”症状:即使先启动订阅者,再启动发布者,订阅者也可能错过发布者发送的消息:因此当订阅者连接到发布者时(这需要的时间很短,但非0),发布者可能已经将消息发送出去了
- “满木匠”症状案例:
- ZeroMQ在后台执行异步I/O。假设你有两个节点执行次操作,顺序如下:
- 订阅者接连到一个端点,并接收和处理消息
- 发布者绑定到一个端点,并立即发送10000条消息
- 订阅者很可能不会收到任何东西(在设置了正确的过滤器的情况下)
- 建立TCP连接包含会花几毫秒的握手,这取决于你的网络和节点间的跳数。在这段时间里,ZeroMQ可以发送很多消息
- 解决方法:
- 在后面的文章中,我们将解释如何来同步发布者和订阅者,这样就不会启动数据发布,直到订阅者真正连接并准备就绪
- 有一个简单(愚蠢)的方式来延迟发布,就是休眠。但是,在实际中不建议这么做
- 同步的替代办法是简单地假定发布的数据流是无限的,它没有起点也没有终点。人们还假设订阅者不关心它启动前发生了什么事情
- 关于发布-订阅模式的几个要点:
- 订阅者不可以使用zmq_msg_send()等发送消息的函数,发布者不可以使用zmq_msg_recv)等接收消息的函数
- 一个订阅者可以连接到多个发布者,每次使用一个connect调用。那么数据将交错到达(“公平排队”)
- 如果一个发布者没有连接的订阅者,那么它简单地丢弃所有消息
- 如果你使用的是TCP并且订阅者是慢速的,那么消息将在发布方排队。我们将在后面的文章中介绍如何通过使用“高水位线”来针对这种情况保护发布者
- 从ZeroMQ v3.x开始,在使用连接的协议(tcp或ipc)时,过滤发生在发布方。使用epgm协议,过滤发生在订阅方。但在ZeroMQ v2.x版本中,所有过滤都发生在订阅方。
- 我是小董,V公众点击"笔记白嫖"解锁更多【ZeroMQ】资料内容。
重头戏!ZeroMQ的发布-订阅模式详解:ZMQ_PUB、ZMQ_SUB相关推荐
- redis发布订阅模式详解
文章目录 写在前面 发布订阅的使用 SUBSCRIBE命令 PUBLISH命令 注意发布.订阅客户端启动顺序! PUBSUB命令 PUNSUBSCRIBE命令 UNSUBSCRIBE命令 PSUBSC ...
- javascript 观察者(发布订阅)模式详解
写给读者的话 本人是千千万万前端小白中的一员,所以对前端小白的痛苦感同身受,面对一个新的知识点,很多时候感到束手无策.网上搜资料,有的不全,有的看不懂,所以本人作为小白,很有义务将自己觉得理解了的知识 ...
- [设计模式] ------ 观察者模式和他的升级版发布订阅模式
概念 观察者模式,原理很简单,把A类的子类分别注入到B类中,通过用B类调用方法,循环调用A类的方法,就是所谓观察者模式 伪代码如下,最快的速度理解观察者模式: 接口 A{// 观察者接口notify( ...
- ZMQ模式详解——发布/订阅模式
一个例子 1.一个服务器负责生成天气相关数据(邮编.温度.湿度),然后将这些数据发布到所有需要知悉天气的客户端: 2.一个客户端需要时刻更新和获取最新的邮政编码,在没有获取之前可默认为纽约地区. ## ...
- linux发布微软消息队列,消息队列RabbitMQ入门与5种模式详解
1.RabbitMQ概述 简介: MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法: RabbitMQ是开源的,实现了AMQP协议的,采用Erlang(面向并发编程语言 ...
- RabbitMQ工作流程和工作模式详解
RabbitMQ工作流程 生产者发送消息的流程 生产者连接RabbitMQ,建立TCP连接( Connection),开启信道(Channel) 生产者声明一个Exchange(交换器),并设置相关属 ...
- Spotify敏捷模式详解三部曲第二篇:研发过程
本文转自:Scrum 中文网 引言 在本系列文章的第一篇,我们介绍了Spotify的敏捷研发团队,以及它独特的组织架构.Spotify的研发团队采用的是一种非常独特的组织架构,如下图所示: 整个研发组 ...
- Spotify敏捷模式详解三部曲第一篇:研发团队
本文转自:Scrum中文网 引言 2018年4月,来自北欧瑞典的音乐流媒体公司.百亿美元独角兽Spotify创造了历史,它成为了当代上市公司当中,第一家通过"直接上市"的方式在美国 ...
- 点击事件调用匿名函数如何传参_事件发布/订阅模式的简单实现
这是一种广泛应用于异步编程的模式,是回调函数的事件化,常常用来解耦业务逻辑.事件的发布者无需关注订阅的侦听器如何实现业务逻辑,甚至不用关注有多少个侦听器存在.数据通过消息的方式可以灵活的传递. --& ...
- 敏捷开发系列学习总结(14)——Spotify敏捷模式详解三部曲第二篇:研发过程
分享一个大神的人工智能教程.零基础!通俗易懂!风趣幽默!还带黄段子!希望你也加入到人工智能的队伍中来!点击浏览教程 摘要 在本系列文章的第一篇,我们介绍了Spotify的敏捷研发团队,以及它独特的组织 ...
最新文章
- 网络上可供测试的Web Service
- C编译器、链接器、加载器详解
- 蓝桥杯 试题 基础练习 特殊回文数——16行代码AC
- 浅谈SQLiteOpenHelper之onUpgrade例子
- 赞!苏州大学95后硕士一作发《Nature》!
- 1.4 为什么深度学习会兴起?(Why is Deep Learning taking off?)
- 【译】WebSocket协议第四章——连接握手(Opening Handshake)
- Unity-中英对照汉化
- PCIe5.0 协议
- linux 密码字典生成,Linux下的字典生成工具Crunch 创造自己的专属字典
- 大规模知识图谱数据存储实战解析
- Java---SSM---Spring(1)
- 常用的无线充发射IC芯片
- 解决electron-vue打包错误问题,nsis和winCodeSign下载失败问题
- scrapy源码学习 - 启动一个crawl命令
- redis 穿透、雪崩、击穿
- 黑吧安全网--古墓探秘
- DIDI3(数字转为TP,小写转换大写)
- 为什么停用CentOS?
- EXCEL条件格式(一)