ZMQ特点

普通的socket是端对端的关系,ZMQ是N:M的关系,socket的连接需要显式地建立连接,销毁连接,选择协议(TCP/UDP)和错误处理,ZMQ屏蔽了这些细节,像是一个封装了的socket库,让网络编程变得更简单。ZMQ不关用于主机与主机之间的socket通信,还可以是线程和进程之间的通信。ZMQ提供的套接字可以在多种协议中传输消息,线程间,进程间,TCP等。可以使用套接字创建多种消息模式,如‘请求-应答模式’,‘发布-订阅模式’,‘分布式模式’等。

组件来去自如,ZQM会负责自动重连,服务端和客户端可以随意的退出网络。tcp的话,必须先服务端启动,再启动客户端,否则会报错。

  1. ZMQ会在必要的情况下将消息放入队列中保存,一旦建立了连接就开始发送。
  2. ZMQ有阈值机制,当队列满的时候,可以自动阻塞发送者,或者丢弃部分消息。
  3. ZMQ可以使用不同的通信协议进行连接,TCP,进程间,线程间。
  4. ZMQ提供了多种模式进行消息路由。如请求-应答模式,发布-订阅模式等,这些模式可以用来搭建网络拓扑结构。
  5. ZMQ会在后台线程异步的处理I/O操作,他使用一种不会死锁的数据结构来存储消息。

ZMQ消息模式

Reuqest-Reply(请求-应答模式)

  1. 使用Request-Reply模式,需要遵循一定的规律。
  2. 客户端必要先发送消息,在接收消息;服务端必须先进行接收客户端发送过来的消息,在发送应答给客户端,如此循环
  3. 服务端和客户端谁先启动,效果都是一样的。
  4. 服务端在收到消息之前,会一直阻塞,等待客户端连上来。

server.py

import zmq
import timecontext = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
count = 0# 必须要先接收消息,然后再应答
if __name__ == '__main__':print('zmq server start....')while True:message = socket.recv().decode("UTF-8")count += 1print(f'received request. message:{message}; count:{count}')time.sleep(1)socket.send_string("world!")

client.py

import zmqcontext = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")# 客户端必须要先发送消息,然后在接收消息
if __name__ == '__main__':print('zmq client start....')for i in range(1, 10):socket.send_string("hello")message = socket.recv().decode("UTF-8")print(f'received response. message:{message}')

常用数据发送和接收:

# 发送数据
socket.send_json(data)                      # data 会被json序列化后进行传输 (json.dumps)
socket.send_string(data, encoding="utf-8")  # data为unicode字符串,会进行编码成子节再传输
socket.send_pyobj(obj)                      # obj为python对象,采用pickle进行序列化后传输
socket.send_multipart(msg_parts)            # msg_parts, 发送多条消息组成的迭代器序列,每条消息是子节类型,# 如[b"message1", b"message2", b"message2"]# 接收数据
socket.recv_json()
socket.recv_string()
socket.recv_pyobj()
socket.recv_multipart()

Publisher-Subscriber(发布-订阅模式)

Publisher-Subscriber模式,消息是单向流动的,发布者只能发布消息,不能接受消息;订阅者只能接受消息,不能发送消息(可参考 Redis 的发布和订阅方式)。服务端发布消息的过程中,如果有订阅者退出,不影响发布者继续发布消息,当订阅者再次连接上来,收到的消息是后来发布的消息。比较晚加入的订阅者,或者中途离开的订阅者,必然会丢掉一部分信息,如果发布者停止,所有的订阅者会阻塞,等发布者再次上线的时候回继续接受消息。

"慢连接": 我们不知道订阅者是何时开始接受消息的,就算启动"订阅者",再启动"发布者", "订阅者"还是会缺失一部分的消息,因为建立连接是需要时间的,虽然时间很短,但不是零。ZMQ在后台是进行异步的IO传输,在建立TCP连接的短时间段内,ZMQ就可以发送很多消息了。有种简单的方法来同步"发布者" 和"订阅者", 通过sleep让发布者延迟发布消息,等连接建立完成后再进行发送。

publisher.py

import zmq
import time
import randomcontext = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")if __name__ == '__main__':print("发布者启动.....")for i in range(1000):time.sleep(0.1)temperature = random.randint(-10, 40)message = f"我是publisher, 这是我发布给你们的第{i+1}个消息!今日温度{temperature}"socket.send_string(message)

subscriber.py

import zmqcontext = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")# 客户端需要设定一个过滤,否则收不到任何信息
socket.setsockopt_string(zmq.SUBSCRIBE, '')if __name__ == '__main__':print('订阅者一号启动....')while True:message = socket.recv_string()print(f"(subscriber1)接收到'发布者'发送的消息:{message}")

Push-Pull(平行管道模式/分布式处理)

Ventilator:任务发布器会生成大量可以并行运算的任务。

Worker:有一组worker会处理这些任务。

Sink:结果接收器会在末端接收所有的Worker的处理结果,进行汇总。

Worker上游和"任务发布器"相连,下游和"结果接收器"相连,"任务发布器" 和 "结果接收器"是这个网路结构中比较稳定的部分,由他们绑定至端点

Worker只是连接两个端点,需要等Worker全部启动后,再进行任务分发。Socket的连接会消耗一定时间(慢连接), 如果不进行同步的话,第一个Worker启动,会一下子接收很多任务。

"任务分发器" 会向Worker均匀的分发任务(负载均衡机制)

"结果接收器" 会均匀地从Worker处收集消息(公平队列机制)

ventilator.py

import zmq
import randomraw_input = input
context = zmq.Context()sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5557")sink = context.socket(zmq.PUSH)
sink.connect("tcp://localhost:5558")if __name__ == '__main__':# 同步操作print("Press Enter when the workers are ready: ")_ = raw_input()print("Sending tasks to workers…")sink.send_string('0')# 发送十个任务total_msec = 0for task_nbr in range(10):# 每个任务耗时为Nworkload = random.randint(1, 5)total_msec += workloadsender.send_string(f"{workload}")print("10个任务的总工作量: %s 秒" % total_msec)
Sending tasks to workers…
10个任务的总工作量: 25 秒

worker1.py

import time
import zmqcontext = zmq.Context()receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")if __name__ == '__main__':while True:s = receiver.recv().decode("UTF-8")print(f'work1 接收到一个任务... 需要{s}秒')# Do the worktime.sleep(int(s))# Send results to sinksender.send_string(f'work1 完成一个任务,耗时{s}秒')
work1 接收到一个任务... 需要2秒
work1 接收到一个任务... 需要1秒
work1 接收到一个任务... 需要3秒
work1 接收到一个任务... 需要5秒
work1 接收到一个任务... 需要3秒

worker2.py

import time
import zmqcontext = zmq.Context()receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")if __name__ == '__main__':while True:s = receiver.recv().decode("UTF-8")print(f'work2 接收到一个任务... 需要{s}秒')# Do the worktime.sleep(int(s))# Send results to sinksender.send_string(f'work2 完成一个任务,耗时{s}秒')
work2 接收到一个任务... 需要3秒
work2 接收到一个任务... 需要2秒
work2 接收到一个任务... 需要3秒
work2 接收到一个任务... 需要1秒
work2 接收到一个任务... 需要2秒

sink.py

import time
import zmqcontext = zmq.Context()receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")if __name__ == '__main__':s = receiver.recv()print('开始接收处理结果.....')# 计时,所有任务处理完一共需要多久start_time = time.time()# 接受十个任务的处理结果for task_nbr in range(10):s = receiver.recv_string()print(s)end_time = time.time()print("2个worker同时工作,耗时: %d 秒" % (end_time-start_time))
开始接收处理结果.....
work1 完成一个任务,耗时2秒
work2 完成一个任务,耗时3秒
work1 完成一个任务,耗时1秒
work2 完成一个任务,耗时2秒
work1 完成一个任务,耗时3秒
work2 完成一个任务,耗时3秒
work2 完成一个任务,耗时1秒
work1 完成一个任务,耗时5秒
work2 完成一个任务,耗时2秒
work1 完成一个任务,耗时3秒
2个worker同时工作,耗时: 14 秒

ZMQ超时重试

server.py

import zmq
import timecontext = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
count = 0# 必须要先接收消息,然后再应答
if __name__ == '__main__':print('zmq server start....')while True:message = socket.recv().decode("UTF-8")count += 1print(f'received request. message:{message}; count:{count}')time.sleep(1)socket.send_string("ping test success")

client.py

import zmq# 超时重连
class PingPort():def __init__(self):self.port = '5555'self.socket_req_url = 'tcp://localhost:{}'.format(self.port)self.socket_req = zmq.Context().socket(zmq.REQ)self.socket_req.connect(self.socket_req_url)self.poller = zmq.Poller()self.poller.register(self.socket_req, zmq.POLLIN)def ping(self):self.socket_req.send_string('ping test')if self.poller.poll(5555):resp = self.socket_req.recv().decode("UTF-8")print(resp)else:print('ping {} port fail, no response.'.format(self.port))self.socket_req.setsockopt(zmq.LINGER, 0)self.socket_req.close()self.poller.unregister(self.socket_req)print('-------------begin reconnect--------------------')self.socket_req = zmq.Context().socket(zmq.REQ)self.socket_req.connect(self.socket_req_url)self.poller = zmq.Poller()self.poller.register(self.socket_req, zmq.POLLIN)self.ping()if __name__ == '__main__':obj = PingPort()obj.ping()

未超时:

ping test success

已超时(若服务端未开启):

ping 5555 port fail, no response.
-------------begin reconnect--------------------
ping 5555 port fail, no response.
-------------begin reconnect--------------------

ZMQ特点及消息模式相关推荐

  1. zmq java 消息阻塞_ZMQ的三种消息模式

    一. ZMQ是什么? 普通的socket是端对端(1:1)的关系,ZMQ是N:M的关系,socket的连接需要显式地建立连接,销毁连接,选择协议(TCP/UDP)和 错误处理,ZQM屏蔽了这些细节,像 ...

  2. ZeroMQ接口函数之 :zmq - 0MQ 轻量级消息传输内核

    官方网址:http://api.zeromq.org/4-0:zmq zmq(7) 0MQ Manual - 0MQ/3.2.5 Name zmq – ØMQ 轻量级消息传输内核 Synopsis # ...

  3. Redis的两种消息模式

    Redis的两种消息模式 队列模式 发布订阅模式 队列模式 队列模式下每个消费者可以同时从多个服务器读取消息,但是每个消息只能被一个消费者读取. 在队列模式下其实每次插入的数据都是载入在最前面的,而先 ...

  4. 消息模式在实际开发应用中的优势

    曾经.NET面试过程中经常问的一个问题是,如果程序集A,引用B ,B 引用C,那么C怎么去访问A中的方法呢. 这个问题初学.net可能一时想不出该咋处理,这涉及到循环引用问题.但有点经验的可能就简单了 ...

  5. ActiveMQ之发布- 订阅消息模式实现

    一.概念 发布者/订阅者模型支持向一个特定的消息主题发布消息.0或多个订阅者可能对接收来自特定消息主题的消息感兴趣.在这种模型下,发布者和订阅者彼此不知道对方.这种模式好比是匿名公告板.这种模式被概括 ...

  6. jms消息模式和区别_JMS管理对象和JMS消息

    jms消息模式和区别 Before reading this post, please go through my previous at "JMS Messaging Models&quo ...

  7. 命名管道的使用方式:消息模式/字节模式

    转自:http://blog.sina.com.cn/s/blog_71b3a9690100usem.html 由于自己在写进程间通信的相关程序,查阅了关于资料.觉得命名管道方法实现通信是不错的选择, ...

  8. RabbitMQ之消息模式简单易懂,超详细分享

    前言 上一篇对RabbitMQ的流程和相关的理论进行初步的概述,如果小伙伴之前对消息队列不是很了解,那么在看理论时会有些困惑,这里以消息模式为切入点,结合理论细节和代码实践的方式一起来学习. 正文 常 ...

  9. 并发编程含义比较广泛,包含多线程编程、多进程编程及分布式程序等 目录 1. “共享内存系统”,消息传递系统”。 1 1.1. 共享模式 多进程 多线程 1 1.2. Actor消息模式 事件驱动 2

    并发编程含义比较广泛,包含多线程编程.多进程编程及分布式程序等 目录 1. "共享内存系统",消息传递系统". 1 1.1. 共享模式 多进程 多线程 1 1.2. Ac ...

最新文章

  1. 网易云游戏来了:手机电脑电视随时接入可玩,高流畅度低延迟,还能跨终端无缝切换...
  2. 基于matlab_simulink的捷联惯性导航系统仿真,基于MATLAB/Simulink的捷联惯性导航系统仿真...
  3. Json格式的netconf转成NormalizedNode
  4. 【linux】修改机器时间
  5. MySQL中事务控制语句_Mysql事务控制语言
  6. java线程间ThreadLocal的传递
  7. Node.js webpack 打包的入口与出口
  8. Windows下Weblogic 12c单机安装与部署
  9. 编程加速服务器_英特尔:将可编程加速进行到底
  10. Codeforces Round #661-C Boats Competition
  11. 扑克牌图片一张一张_扑克牌玩法 | 简单易上手的扑克游戏,重点是你没玩过!...
  12. N个结点不同结构的二叉树个数
  13. c语言调光程序,dmx512协议c语言编程
  14. python获取股票分时数据_AkShare-股票数据-分时数据
  15. 怎样才能够修改PDF文件中的文字大小
  16. macd底背离的python_java尝试编写macd,试验顶背离底背离
  17. vue中的key有什么作用?(key的内部原理)
  18. SQL----关于三种其他类型转字符串类型的函数
  19. 谷歌拼音输入法PinyinIME源码修改----随着Setting中中英文的切换对应改变软键盘中英文输入且字符变换
  20. openssl生成随机密码。

热门文章

  1. 使用Snake代理跳板
  2. 解决Ubuntu18.04搜狗拼音输入法选词面板出现乱码问题
  3. 【安卓开发】android studio 学习入门篇
  4. 读研期间发84篇SCI!研究生被怀疑有“背景”和学术造假,本人回应!
  5. 这才是数字孪生污水处理厂该有的样子 | 智慧水务
  6. NSIS终极篇(安装包、常用网站)
  7. Qt视频剪辑软件开发(一):开发步骤介绍
  8. 打飞机python(完整版)
  9. 北科大计算机与通信工程博士,2015年北科大计算机与通信工程学院考研拟录取名单...
  10. 计算机网络自顶向下方法,第7版—第1章习题