文章目录

  • 一、套接字(socket)API
    • 1.套接字接入网路拓扑
    • 2.用套接字传输数据
    • 3.单播传输
    • 4.I/O线程
  • 二、消息传递模式
    • 1.处理消息
    • 2.处理多个套接字
    • 3.多部分消息
    • 4.中间层
    • 5.动态发现问题
    • 6.共享队列(DEALER与ROUTER套接字)
    • 7.zmq内置代理功能
    • 8.传输桥接
  • 三、处理错误和ETERM
  • 四、处理信号中断
  • 五、检测内存泄漏
  • 六、用zmq多线程
  • 七、线程间信令(PAIR套接字)
  • 八、节点协调
  • 九、零拷贝
  • 十、发布-订阅消息封包
  • 十一、高水位标记
  • 十二、信息丢失的解决方案

一、套接字(socket)API

Socket是对网络中不同主机上的应用进程间进行双向通信的端点抽象。

套接字可以看成是两个网络应用程序进行通信时,各自通信连接中的端点,这是一个逻辑上的概念。它是网络环境中进程间通信的API,也是可以被命名和寻址的通信端点,使用中的每一个套接字都有其类型和一个与之相连进程。通信时其中一个网络应用程序将要传输的一段信息写入它所在主机的 Socket中,该Socket通过与网络接口卡相连的传输介质将这段信息送到另外一台主机的 Socket中,使对方能够接收到这段信息。

zmq套接字的四个部分生存期(所给python代码)

  1. 创建和销毁,如socket = context.socket()
  2. 对其进行配置,如socket.setsockopt()
  3. 创建连接与绑定,如socket.connect()
  4. 写入与接收消息,如socket.send(),socket.recv()

1.套接字接入网路拓扑

两节点创建连接:
在一个节点(服务器端:静态)使用zmq_bind()把一个套接字绑定到端点,
另一个节点(客户端:动态)使用zmq_connect()把一个套接字连接到终端。

注:
1)不存在zmq_accept()方法,因为当一个套接字被绑定到一个端点的时候,会自动接受连接。
2)网络连接是在后台进行的,就算网络断开也会自动重新连接。
3)可以跨任意连接方式(inproc、ipc、tcp、pgm、epgm)
4)一个套接字不止一个输入或输出的连接
5)用用程序不可和这些连接直接交流,他们被封装在套接字之下
zmq准许任意启动和停止各个部件,比如客户端可在服务端zmq_bind()之前zmq_connect(),执行之后建立连接写入套接字,等待服务端在zmq_bind()后发送消息。

2.用套接字传输数据

用zmq_msg_send()和zmq_msg_recv()发送和接收消息。其中zmq_msg_send()方法实际上并没把消息发送到套接字连接,他会将消息排
队,这样I/O线程可以将其异步发送。所以当zmq_msg_send()返回应用时,该消息未必会被发送。
tcp套接字与zmq套接字区别:

  1. tcp传递字节流,zmq消息是指定长度的二进制数据。
  2. zmq套接字在后台线程执行自己的I/O,所以不管应用在干嘛,消息都是在本地输入队列到本地输出队列被发送。
  3. zmq套接字根据套接字类型具有内置的1对n的路由行为。

3.单播传输

tcp 、ipc、inproc都是zmq提供的单播传输。

tcp:断开连接的tcp传输,可伸缩、可移植、大多数情况也足够快。客户端和服务端可以再任何时候连接与绑定并且对应用程序保持透明。
注:“断开连接”指因为zmq的tcp传输在你连接到端点之前,无需端点存在。

ipc:进程之间的ipc传输亦是断开连接。但他不能再windows上工作。

Inproc:在线程间的传输中它是一个连接的信令传送,要比tcp与ipc快很多。但服务器必须在所有客户端发出连接之前发出一个绑定请求。

4.I/O线程

zmq在一个后台线程中执行I/O,一个I/O线程足以提供所有应用程序使用。一般让1GB/s的输入或输出数据使用一个I/O线程。可以在创建任何套接字之前调用zmq_ctx_set()。

二、消息传递模式

ZeroMQ 内置的有效绑定对:
PUB and SUB
REQ and REP
REQ and XREP
XREQ and REP
XREQ and XREP
XREQ and XREQ
XREP and XREP
PUSH and PULL
PAIR and PAIR
非正常匹配会出现意料之外的问题(未必报错,但可能数据不通路什么的,官方 说法是未来可能会有统一错误提示吧),

1.处理消息

由于zmq的发送机制,发送数据有两种状态(是否Copy),在非Copy下一旦发送成功,发送端将不能访问该数据,Copy状态则可以(主要用于重复发送)。还有就是所发送的信息都是保持在内存的,故不能随意发送大数据(以防溢出,可把大包拆开逐条发送.)

2.处理多个套接字

使用zmq_poll()可以一次性从多个套接字读取。可把zmq_poll()包装到一个框架中。
在官方文档中给出相应的封装示例代码:

import zmq# 创建上下文和套接字
context = zmq.Context()
# 连接任务发生器
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")# 连接天气服务器
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")# 筛选以10001开头的消息
subscriber.setsockopt(zmq.SUBSCRIBE, b"10001")#初始化轮询池
poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)# 处理来自两个套接字的消息
while True:try:socks = dict(poller.poll())except KeyboardInterrupt:break
#处理任务消息if receiver in socks:message = receiver.recv()
#处理天气服务器消息if subscriber in socks:message = subscriber.recv()

3.多部分消息

zmq允许有多个帧组成单个消息。
a) 当发送多部分消息时,只有发送最后的部分时,第一部分和以下的所有部分才会在线程上发送。
b) 如果使用zmq_poll(),则在收到第一部分时,其余所有部分皆以收到。
c) 不是全收,就是全没收到。
d) 消息的每个部分都是单独的zmq_msg。
e) 无论设置否,都会收到消息的所有部分。
f) 发送时,zmq消息帧在内存中排队,直到接收到最后一个部分才全部发送。
g) 除非关闭套接字,否则无法取消部分发送的消息。

4.中间层

zmq中的中间层根据山下文称为代理(proxy)、队列、转发器、设备或broker

5.动态发现问题

动态发现:部件如何认识对象,以及部件增减或减少时,如何更新这些消息。
zmq中的解决方案:通过中间层(代理)来实现。
在“发布/订阅”模型中增加代理节点如图所示,该节点绑定XSUB套接字和XPUB套接字,PUB连接到XSUB中,SUB连接到XPUB中。
SUB套接字将自己的订阅信息作为特殊消息发送到代理的XPUB端点上,代理转发这些订阅消息到XSUB上,然后XSUB再将消息发送到PUB,从而最终完成SUB到PUB的订阅,当完成订阅之后,PUB直接发送消息,SUB可以直接收到,不需要代理进行转发.
XPUB-XSUB套接字类型与PUB-SUB套接字类型相同,属于发布-订阅,在PUB-SUB套接字中,订阅者通过zmq_connect()向发布者发起订阅;但是XPUB-XSUB套接字类型允许订阅者通过发送一条订阅信息到发布者来完成订阅。
ZMQ_XPUB用法与ZMQ_PUB大部分相同,但ZMQ_XPUB(自己)的订阅方可以向自己发送一个订阅信息来进行订阅。订阅消息是字节1(用于订阅)或字节0(用于取消订阅),后跟订阅主体。
ZMQ_XSUB自己可以向发布者发送一条订阅信息来进行订阅。订阅消息是字节1(用于订阅)或字节0(用于取消订阅),后跟订阅主体。

6.共享队列(DEALER与ROUTER套接字)

通过加入代理来解决请求-响应模式中多个客户端和多个服务端之间相互交流的问题,如图所示。
将代理绑定到了两个端点,一个用于客户端的前端(ZMQ_ROUTER),另一个用于服务端的后端(ZMQ_DEALER)。然后带来使用zmq_poll()来轮询这两个套接字的活动,当有消息时,代理会将消息在两个套接字之间输送.

ROUTER类型的套接字是请求/回复模式的一种升级。当ZMQ_ROUTER收到一个消息的时候,会自动在消息前面添加一帧,这一帧用来识别发送端的地址。
当发送一个消息的时候,需要先发送一帧对端的地址,然后再发送消息,如果目的地址指向的对端不存在了,这个消息就会被丢弃。对端的地址默认情况下由ZMQ来产生一个唯一标识UUID。
DEALER可以任意读写,不需要额外的地址帧,当有多个对端的时候,循环给单个对端发送消息。

7.zmq内置代理功能

把上述6.共享队列封装在zmq_proxy()方法中,示例代码如下:

import zmqdef main():#创建上下文
context = zmq.Context()#面对客户端套接字
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")#面对服务端套接字
backend  = context.socket(zmq.DEALER)
backend.bind("tcp://*:5560")#启动代理
zmq.proxy(frontend, backend)#清理
frontend.close()
backend.close()
context.term()if __name__ == "__main__":
main()

8.传输桥接

zmq中一个常见的桥接问题是衔接两种传输协议或网络。它用一种协议与一个套接字交流,并将其转换为另一个套接字的另一种协议,
例如:一个发布者和一组订阅者之间衔接两个网络如图,关键的部分在于,前端和后端套接字在两个不同的网络上。

三、处理错误和ETERM

错误一般分为:内部错误、外部错误
内部错误:一般是由自身程序导致的,程序绝不允许有内部错误的存在。
外部错误:一般由外部传入,当程序接收到外部错误之后需要自己做处理(处理或是丢弃)。
在接收端通过发送一个kill消息给工人清除他们,
使用pub-sub模型向工作人员发送终止消息:
a) 接收器在新端点上创建一个PUB套接字。
b) 工作人员将其输入套接字连接到此端点。
c) 当接收器检测到批处理结束时,它将终止发送到其PUB套接字。
d) 当工作人员检测到此终止消息时,它将退出。

四、处理信号中断

捕获到trl-c(SIGINT)和SIGTERM信号后,执行退出。

import signal
import time
import zmqcontext = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5558")
# SIGINT通常会引发键盘中断,就像任何其他Python调用一样
try:socket.recv()
except KeyboardInterrupt:print("接收到中断,正在停止。。。")
finally:# 清理socket.close()
context.term()

五、检测内存泄漏

官方推荐使用valgrind,直接安装使用。

六、用zmq多线程

zmq的多线程中一条数据在同一时刻只允许被一个线程持有(而传统的是:只允许被一个线程操作)如图4所示。
多线程服务(MT版本)示例:

import time
import threading
import zmqdef worker_routine(worker_url, context=None):context = context or zmq.Context.instance()# 和调度器交流的套接字socket = context.socket(zmq.REP)socket.connect(worker_url)while True:string  = socket.recv()print("Received request: [ %s ]" % (string))time.sleep(1)#将应答发回给客户端socket.send(b"World")def main():url_worker = "inproc://workers"url_client = "tcp://*:5555"# Prepare our context and socketscontext = zmq.Context.instance()# 与客户端交流的套接字clients = context.socket(zmq.ROUTER)clients.bind(url_client)# 与工人交流的套接字workers = context.socket(zmq.DEALER)workers.bind(url_worker)# 工人线程投放池for i in range(5):thread = threading.Thread(target=worker_routine, args=(url_worker,))thread.daemon = Truethread.start()zmq.proxy(clients, workers)# 清理clients.close()workers.close()context.term()if __name__ == "__main__":main()

1、服务器启动一组工作线程。每个线程都会创建一个REP套接字然后在此套接字上处理请求。
2、服务器创建一个ROUTER套接字与客户端交流并将其绑定到其外部接口(通过tcp)。
3、服务器创建一个DEALER套接字与工人交流并将其绑定到其内部接口(通过inproc)。
4、服务器启动连接两个套接字的代理。代理公平地从所有客户端提取请求并将其分发给工人。

七、线程间信令(PAIR套接字)

PAIR线程间的信令这个套接字,是仅仅存在于inproc的协议当中的,也就是说,线程间的信令是没有办法拓展到ipc 进程间,并且不实现自动重新连接等功能。
PAIR应用:协调线程,可以使用PAIR套接字进行进程之间的通信。
示例:下面创建三个PAIR套接字:
PAIR3:主线程中的PAIR套接字,等待PAIR2发来通知消息。
PAIR2:创建线程,在线程的回调函数中创建PAIR2套接字,该套接字等待PAIR1发来通知消息。
PAIR1:在线程的回调函数中创建PAIR1套接字,PAIR1会向PAIR2发送消息。
整体的流程就是:PAIR1发送消息给PAIR2,PAIR2接收到PAIR1的消息之后再发送消息给PAIR3,PAIR3接收到PAIR2的消息之后退出程序.
.

八、节点协调

线程和节点之间的区别主要有:
1.节点可以加入或退出,是动态的;而线程的数量通常是固定的,是静态的。
2.节点要求退出后重新打开还可以建立连接,而PAIR套接字不能重新连接。
在以往的发布-订阅代码中,发布者对订阅者的数量没有限制,其只负责发布消息,根本不关心订阅者的数量与身份。
然现在程序的工作方式是:
1、在发布者的代码中定义一个宏,来用指定订阅者的数量。
2、发布者启动之后,创建PUB套接字和一个REP套接字其中REP套接字用来接收订阅者给自己发来消息(请求订阅的消息)。
3、订阅者也启动程序,创建SUB套接字和一个REQ套接字,其中REQ套接字用来给发布者发送订阅请求。
4、当发布者接收到指定数量的订阅者发来请求订阅消息之后,开始使用PUB套接字发布数据;订阅者使用SUB套接字接收发布数据。

发布者的代码如下:

import zmq# 十个订阅者
SUBSCRIBERS_EXPECTED = 10
def main():#创建上下文context = zmq.Context()# 创建PUB套接字publisher = context.socket(zmq.PUB)
#设置SNDHWM,防止速度慢的订阅者丢包(ZMQ_SNDHWM选项为设置套接字上#出站消息的高水位标志)publisher.sndhwm = 1100000publisher.bind('tcp://*:5561')# 设置REP套接字syncservice = context.socket(zmq.REP)syncservice.bind('tcp://*:5562')# 从订阅服务器获取同步subscribers = 0while subscribers < SUBSCRIBERS_EXPECTED:# 从订阅服务器获取同步等待同步请求       msg = syncservice.recv()#发送同步回复syncservice.send(b'')subscribers += 1print("+1 subscriber (%i/%i)" % (subscribers, SUBSCRIBERS_EXPECTED))# 发送1000000次后结束for i in range(1000000):publisher.send(b'Rhubarb')publisher.send(b'END')if __name__ == '__main__':
main()

订阅者的代码如下:

import time
import zmqdef main():
#创建上下文context = zmq.Context()# 连接SUB套接字subscriber = context.socket(zmq.SUB)
subscriber.connect('tcp://localhost:5561')#全部接收subscriber.setsockopt(zmq.SUBSCRIBE, b'')time.sleep(1)# 与发布服务器同步syncclient = context.socket(zmq.REQ)syncclient.connect('tcp://localhost:5562')# 发送同步请求
syncclient.send(b'')# 等待同步回复
syncclient.recv()# 接收消息nbr = 0while True:msg = subscriber.recv()if msg == b'END':breaknbr += 1print ('接收%d个' % nbr)if __name__ == '__main__':
main()

九、零拷贝

何为零拷贝?
零拷贝就是一种避免 CPU 将数据从一块存储拷贝到另外一块存储的技术。
ØMQ的消息API可让你直接从应用程序缓冲区发送和接收消息,而不用复制数据。可以在某些应用程序中用它来提高性能。
注:在接收的时候没有办法做到零拷贝。

十、发布-订阅消息封包

在发布-订阅模式中,在消息中加入“键”与实际消息的帧分割开来称之为封包,如图能被更形象的理解。

发布者示例如下:

import time
import zmqdef main():# 创建上下文和PUBcontext   = zmq.Context()publisher = context.socket(zmq.PUB)publisher.bind("tcp://*:5563")while True:#写两消息,每条消息都有键和内容publisher.send_multipart([b"A", b"We don't want to see this"])publisher.send_multipart([b"B", b"We would like to see this"])time.sleep(1)#清理publisher.close()
context.term()if __name__ == "__main__":
main()

订阅者示例如下:

import zmqdef main():# 创建上下文和SUBcontext    = zmq.Context()subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5563")
#筛选头尾A的消息subscriber.setsockopt(zmq.SUBSCRIBE, b"B")while True:# 阅读带地址的消息[address, contents] = subscriber.recv_multipart()print("[%s] %s" % (address, contents))# 清理subscriber.close()context.term()if __name__ == "__main__":main()

发送两种消息A和B,消息分为两部分,第一部分为消息的类型A或B,第二部分为消息的实际信息。订阅者根据类型(键)选择想要接受的消息。
注:封包的格式可以自由设计。

十一、高水位标记

所谓高水位标记(HWM)就是用来定义内部管道(缓冲区)容量的。
可以通过setsocketopt函数来设置HWM的值,当数据填满管道达到HWM时,不同的socket也有不同的表现,PUB 和 ROUTER 会丢弃数据其他的socket会阻塞。
如果连接没有建立,在填满本地缓存后,阻塞或丢弃数据,如果建立了连接,在填满本地和对端缓存后,阻塞或丢弃数据。

十二、信息丢失的解决方案

ZeroMQ学习笔记(2)——套接字和模式相关推荐

  1. 2021版!万字UNIX网络编程学习笔记(套接字篇)

    目录 1.一个简单的时间获取服务器的程序 2.套接字篇 2.1 套接字简介 2.2 套接字中常用的函数 2.3 基本TCP套接字编程 2.3.1.socket函数 2.3.2 connect函数 2. ...

  2. ZeroMQ学习笔记(4)——可靠的请求-应答模式

    第四章 可靠的请求-应答模式 懒惰海盗模式:来自客户端的可靠的请求-应答. 简单海盗模式:使用负载均衡的可靠的请求-应答. 偏执海盗模式:使用信号检测的可靠的请求-应答. 管家模式:面向服务的可靠排队 ...

  3. think in java 读书笔记 2 —— 套接字

    目录 think in java 读书笔记 1 --移位 think in java 读书笔记 2 -- 套接字 think in java 读书笔记 3 -- 数据报 概要 1. 套接字基本知识 2 ...

  4. ZeroMQ学习笔记(7)——使用zmq高级框架

    第七章 使用zmq高级框架 ·如何安全地从创意过渡到能工作的原型(MOPED模式) ·将的数据作为zmq消息序列化的不同方式 ·如何用代码生成二进制序列化的编解码器 ·如何使用GSL工具来建立自定义的 ...

  5. Redis运维和开发学习笔记(5) 主从复制和sentinel哨兵模式

    Redis运维和开发学习笔记(5) 主从复制和sentinel哨兵模式 主从复制 将主节点的数据改变同步给从节点 作用 备份数据 读写分离 存在的问题: 手动干预切主等操作 主节点的写能力受到单机限制 ...

  6. 【OS学习笔记】三十八 保护模式十:中断和异常的处理与抢占式多任务对应的汇编代码----微型内核汇代码

    本文是以下几篇文章对应的微型内核代码汇编代码: [OS学习笔记]三十四 保护模式十:中断和异常区别 [OS学习笔记]三十五 保护模式十:中断描述符表.中断门和陷阱门 [OS学习笔记]三十六 保护模式十 ...

  7. 【OS学习笔记】二十八 保护模式八:任务切换对应的汇编代码之内核代码

    本汇编代码对应以下两篇文章对应的内核汇编代码: OS学习笔记]二十六 保护模式八:任务门-任务切换 [OS学习笔记]二十七 保护模式八:任务切换的方法之----jmp与call的区别以及任务的中断嵌套 ...

  8. 【OS学习笔记】二十五 保护模式七:任务和特权级保护对应的汇编源代码

    本汇编代码是以下两篇文章讲解的内容的内核代码; [OS学习笔记]二十三 保护模式七:保护模式下任务的隔离与任务的特权级概念 [OS学习笔记]二十四 保护模式七:调用门与依从的代码段----特权级保护 ...

  9. 设计模式学习笔记(十七)——Command命令模式

    设计模式学习笔记(十七)--Command命令模式 Command命令模式介绍: Command命令模式是一种对象行为型模式,它主要解决的问题是:在软件构建过程中,"行为请求者"与 ...

最新文章

  1. swift支持多线程操作数据库类库-CoreDataManager
  2. sqlserver数据库迁移mysql_在项目中迁移MS SQLServer到Mysql数据库,实现MySQL数据库的快速整合...
  3. Aligning Plots in a Column作图列对齐
  4. 人类繁荣的数学:数学的哈欠
  5. spring自动装配、注解
  6. rabbitMQ 常用api翻译
  7. 全球首发!计算机视觉Polygon Mesh Processing总结7——Remeshing Local Structure
  8. 25 亿条/秒消息处理!Flink 又双叒叕被 Apache 官方提名
  9. 【visio 绘图矢量图素材网站】
  10. 京东 php,[分享] 京东接口2.0 PHP版SDK
  11. 实现一个简单的类似spring的pointcut正则表达式
  12. 数据分析流程——业务需求分析
  13. Python爬虫——selenium爬取网易云评论并做词云
  14. 微信属于计算机操作系统吗,一款国产操作系统的微信电脑版使用体验
  15. windows下Linux系统U盘启动盘制作与系统安装图文教程
  16. github+hexo搭建自己的博客【真正的从0到1】20180122为准
  17. C语言函数大全-- s 开头的函数(3)
  18. epson r330语言设置_爱普生打印机使用方法 爱普生r330打印机故障
  19. ndk开发教程,Android工程师面试该怎么准备?真香!
  20. 006.Sql条件查询

热门文章

  1. linux pps 包 网卡,ubuntu linux下安装和使用PPS的详细步骤,包括解决无声的问题
  2. 中职计算机PPT触发器使用的教案,PPT触发器使用全攻略-实例讲解PPT触发器的设置和使用方法(2)...
  3. 南开02-06经济学考研真题和我的一点考研心得
  4. 高通主动降噪无线调试
  5. 神经网络波士顿房价预测
  6. 什么是战略实施、重点、关键步骤?创建和管理以客户为导向的 ITIL 服务策略
  7. unity项目——德军总部(巡逻兵)
  8. IP-Tools网络工具使用说明书
  9. Xilinx远程更新之Flash加载时间/Flash区域划分技巧
  10. 场地测量的方法和程序_一种场地测量装置的制作方法