ZeroMQ XPub/XSub模式


Motivation

相对于XPub/XSub模式,我们很容易想到Pub/Sub模式,即订阅发布模式。当我们使用ZeroMQ创建一个包含订阅发布模式的系统时,我们通常创建一个消息的发布者,即Publisher,和若干个消息订阅者(Subscriber)。消息的发布者绑定端口,订阅者通过发布者的IP和端口连接发布者,并且注册消息主题(Topic),然后进行接收匹配主题的消息。整体结构如下图:

在订阅发布模式下,订阅者可以动态加入,随时连接消息的发布者,然后接收消息。但是,在这种结构中,如果有新的Publisher加入,那么所有订阅者都需要连接到这个Publisher上。如果系统中有成百上千的订阅者,每一个新的Publisher的加入都会给系统造成很大的操作成本,这显然限制了系统规模。要解决这个问题,也很简单,就像只有一个发布者情况,所有的订阅者都只与这一个消息发布者交互,不管是Publisher内部发生什么变化,Subscriber都可以动态感知这种变化。所以很容易我们可以想到创建一个中间件来解耦Publishers和Subscribers,所有Subscriber都只与这一个中间件交互,换句话说,这个中间件从很多个Publisher那里接收消息,然后转发给Subscibers。事实上,有了这个中间件,我们可以做很多Pub/Sub模式做不了的事情,比如说对传送过程中的消息进行管理,重构,或者对系统进行负载均衡等等。我们把这个中间件称为Broker,上面说的这种模式,我们称之为XPub/XSub模式

XPub/XSub

在XPub/XSub模式中,对Publisher来说由Pub/Sub模式中的bind操作变成了connect操作,connect的对象为Broker中的XSub端口。对Subscriber而言,和Publisher的操作一样,只不过connect的是Broker的XPub端口。在Broker中我们绑定XSub和XPub这两个端口。Proxy的作用即为中转消息,在ZMQ的API中提供了zmq.proxy方法来中转消息,其实Proxy就是一个代码块,在这个代码块中可以做任何我们想做的操作。后面会介绍一个简单的例子。从XPub/XSub这个模式中,我们可以发现,不管是Publisher还是Subscriber,它们的加入和离开都可以被系统动态发现。

XPub/XSub例子

在这个例子中,我们让Publisher从CSV文件中读取数据,在Broker中维护一个buffer,如果有Subscriber加入,我们首先发送缓冲区的历史消息,然后转发新消息给Subscribers。

封装ZMQ API

# -*- coding: utf-8 -*-
# utl.py
import zmqdef get_publisher(address, port):context = zmq.Context()socket = context.socket(zmq.PUB)connect_addr = 'tcp://%s:%s' % (address, port)socket.connect(connect_addr)return socketdef get_subscriber(address, port, topics):# Subscriber can register one more topics oncecontext = zmq.Context()socket = context.socket(zmq.SUB)connect_addr = 'tcp://%s:%s' % (address, port)socket.connect(connect_addr)if isinstance(topics, str):socket.subscribe(topics)elif isinstance(topics, list):[socket.subscribe(topic) for topic in topics]return socketdef get_broker(xsub_port, xpub_port):context = zmq.Context()xsub_socket = context.socket(zmq.XSUB)xsub_addr = 'tcp://*:%s' % xsub_portxsub_socket.bind(xsub_addr)# make xsub receive any messagexsub_socket.send(b'\x01')xpub_addr = 'tcp://*:%s' % xpub_portxpub_socket = context.socket(zmq.XPUB)xpub_socket.bind(xpub_addr)# make xpub receive verbose messagesxpub_socket.setsockopt(zmq.XPUB_VERBOSE, 1)# zmq.proxy(xsub_socket, xpub_socket)return xsub_socket, xpub_socket

Publisher类

# -*- coding: utf-8 -*-
# Publisher.pyimport csv
import time
import argparse
import utlclass Publisher(object):def __init__(self, topic, broker_address, broker_port, data, rate):''':param topic: the topic associated with messages:param broker_address: broker public IP:param broker_port: XSub port number:param data: csv file path:param rate: publishing rate, unit is second'''self.topic = topicself.pub_socket = utl.get_publisher(broker_address, broker_port)self.data = dataself.rate = ratedef publish_data(self):with open(self.data, newline='') as csv_file:reader = csv.reader(csv_file, delimiter=',')for row in reader:row.insert(0, self.topic)record = ','.join(row)self.pub_socket.send_string(record)print('[Publisher] Published message: %s' % record)time.sleep(self.rate)if __name__ == '__main__':parser = argparse.ArgumentParser()parser.add_argument('-t', '--topic', type=str, help='Topic')parser.add_argument('-a', '--address', type=str, help='Broker public IP address')parser.add_argument('-p', '--port', type=str, help='Broker XSub port number')parser.add_argument('-f', '--file', type=str, help='Data file path')parser.add_argument('-r', '--rate', type=int, help='Publishing rate in second')args = parser.parse_args()pub = Publisher(args.topic, args.address, args.port, args.file, args.rate)pub.publish_data()

Broker类

# -*- coding: utf-8 -*-
# Broker.pyimport sys
import time
import utl
import zmqclass Broker(object):def __init__(self, xsub_port, xpub_port):self.xsub_socket, self.xpub_socket = utl.get_broker(xsub_port, xpub_port)self.poller = zmq.Poller()self.poller.register(socket=self.xpub_socket, flags=zmq.POLLIN)self.poller.register(socket=self.xsub_socket, flags=zmq.POLLIN)self.buffer = {}def update_buffer(self, msg):topic = msg.split(',')[0]if topic in self.buffer:self.buffer[topic].append(msg)else:self.buffer.update({topic: [msg]})def handler(self):while True:events = dict(self.poller.poll(1000))# events from publishersif self.xsub_socket in events:msg = self.xsub_socket.recv_string()self.xpub_socket.send_string(msg)print('[Broker] Forwarded message: %s' % msg)self.update_buffer(msg)# events from subscribersif self.xpub_socket in events:topic = ''.join(list(self.xpub_socket.recv_string())[1:])if topic in self.buffer:# send history messages[self.xpub_socket.send_string(item) for item in self.buffer[topic]]else:self.xsub_socket.send_string(topic)if __name__ == '__main__':# The 1st argument is XSub port number, the 2nd is XPub port numberbroker = Broker(sys.argv[1], sys.argv[2])broker.handler()

Subscriber类

# -*- coding: utf-8 -*-
# Subscriber.pyimport argparse
import utlclass Subscriber(object):def __init__(self, broker_address, broker_port, topics):self.topics = topicsself.socket = utl.get_subscriber(broker_address, broker_port, topics)def subscribe(self):while True:msg = self.socket.recv_string()print('[Subscriber] Received message: %s' % msg)if __name__ == '__main__':parser = argparse.ArgumentParser()parser.add_argument('-t', '--topics', type=str, help='Topics separated by comma')parser.add_argument('-a', '--address', type=str, help='Broker address')parser.add_argument('-p', '--port', type=str, help='Broker port number')args = parser.parse_args()topics = args.topics.split(',')sub = Subscriber(args.address, args.port, topics)sub.subscribe()

ZeroMQ XPub/XSub模式相关推荐

  1. zeromq源代码分析2------线/进程间通信方式

    本文我们讲一下zeromq的线/进程间通信方式. 在zeromq源代码分析1中我们分析了zeromq的基本工作流程.我们了解到了zeromq的线/进程间通信的方式为消息传递. zeromq中的各线程间 ...

  2. gearman入门简介

    一.Gearman介绍 gearman,从名字上看叫做"齿轮工",就是通过齿轮把不同的组件组合在一起. 通常,多语言多系统之间的集成是项目开发中一个比较头疼的问题. 一般会采用RP ...

  3. ZMQ — 基本使用与工具类

    ZMQ - 基本使用与工具类 一.简介 官网:https://zeromq.org/ ZeroMQ(简称ZMQ)是一个基于消息队列的多线程网络库,其对套接字类型.连接处理.帧.甚至路由的底层细节进行抽 ...

  4. 大淘宝服务端技术干货沉淀和总结

    网络基础 TCP三次握手 三次握手过程 客户端--发送带有SYN标志的数据包--服务端 一次握手 Client进入syn_sent状态 服务端--发送带有SYN/ACK标志的数据包--客户端 二次握手 ...

  5. python 各种模块学习

    from:https://blog.csdn.net/weiwangchao_/article/details/70570508 转载:.... Python的模块大全,很全,有详细介绍! 另外附Py ...

  6. ZeroMQ学习笔记(2)——套接字和模式

    文章目录 一.套接字(socket)API 1.套接字接入网路拓扑 2.用套接字传输数据 3.单播传输 4.I/O线程 二.消息传递模式 1.处理消息 2.处理多个套接字 3.多部分消息 4.中间层 ...

  7. (转)ZeroMQ的模式-Requset-Reply

    2019独角兽企业重金招聘Python工程师标准>>> 我们先来看看第一种模式:Request-Reply Pattern. 请求应答模式. Request-Reply这个名字很直白 ...

  8. [zz]ZeroMQ 的模式

    在需要并行化处理数据的时候,采用消息队列通讯的方式来协作,比采用共享状态的方式要好的多.Erlang ,Go 都使用这一手段来让并行任务之间协同工作. 最近读完了 ZeroMQ 的 Guide.写的很 ...

  9. ZeroMQ 中文指南 第四章 可靠的请求-应答模式【转载】

    此文章转载自GitHub : https://github.com/anjuke/zguide-cn 作者信息如下. ZMQ 指南 作者: Pieter Hintjens ph@imatix.com, ...

  10. 传输层的各种模式——ZeroMQ 库的使用 .

    最近在研究 ZeroMQ 库的使用,所以在这里总结一下各种模式,以便日后拿来使用. 关于 ZeroMQ 库,我就不多介绍了,大家可以参考下面一些文章,以及他的官网.使用指南.API 参考.项目仓库等内 ...

最新文章

  1. 网络营销外包专员浅析企业网络营销外包整合关键点有哪些?
  2. 盛大创新院赞助首届.NET技术交流会即将召开
  3. 泛型参数怎么new_泛型编程,你不知道?(基础篇)
  4. 当程序员那么痛苦,我来告诉你他们为什么还没放弃?
  5. Java中的类变量(静态变量static的具体用法快速入门))
  6. 电商面试经验(mybatis)
  7. python异步调用_python如何实现异步调用函数执行
  8. collector_使用Data Collector进行SQL Server性能监视–第3部分–阅读报告
  9. 云服务器之间进行文件转移,windows服务器之间文件如何转移
  10. 6.4(反向显示一个整数)
  11. 在中国云市场淘金?看国际云大佬如何“软着陆”
  12. GOP I帧和IDR帧
  13. C++实现骰子涂色算法
  14. 各种经典英美剧中英字幕word文档分享
  15. VCC、 VDD、VEE、VSS 电压理解
  16. 【苹果推相册软件】imessage群发arrangesAllSubviews安装
  17. 微信公众号 - 网页服务 - 分享接口
  18. 昨晚开始了为期3个月的初级德语课,课上大家跟老师咿咿呀呀,仿佛回到了蒙学时代,感觉还是不错的!在blog里增加一个GERMAN随笔分类主要是方便自己随时学习,勿怪!Vielen Dank!...
  19. 机器人小农在CSDN的第一篇文
  20. Elasticsearch入门进阶篇

热门文章

  1. 搜狗主动提交url并反馈快照更新软件(含源码)
  2. (JAVA编程练习):输入两个正整数m和n,求其最大公约数和最小公倍数。
  3. 三年经验前端开发面试总结
  4. 计算机科任学 排名,2018软科中国最好学科排名正式发布
  5. 数据压缩算法—LZ77 vs LZ78
  6. [极客时间] 时间复杂度和空间复杂度分析
  7. 土地购买(USACO 2008 March Gold)
  8. GICv3软件overview手册之配置GIC
  9. python第八天 运算符的使用
  10. 【华为OD机试真题 JS】事件推送