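Chapter 4: Reliable Request-Reply Patterns

Lazy Pirate pattern: reliable request-reply from the client side.
Simple Pirate pattern: reliable request-reply using load balancing.
Paranoid Pirate pattern: reliable request-reply with heartbeating.
Majordomo pattern: service-oriented reliable queuing.
Titanic pattern: disk-based/disconnected reliable queuing.
Binary Star pattern: primary-backup server failover.
Freelance pattern: brokerless reliable request-reply.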
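I. What Is "Reliability"?
1. Definition:
We define reliability in terms of failure. If our code can handle a specific set of well-defined and understood failures, then it is reliable with respect to those failures. Put simply, reliability means "keeping things working properly when code freezes or crashes".
2. Causes of failure (in descending order of probability):
1. Application code (it can crash, freeze, run too slowly for its input, or exhaust memory).
2. System code, such as brokers we write with ZeroMQ (it can fail for the same reasons as application code).
3. Message queues can overflow.
4. Networks can fail.
5. Hardware can fail.
6. Networks can fail in exotic ways; for example, some ports on a switch may die.
7. Entire data centers can be struck by lightning, earthquakes, fire, or more mundane power or cooling failures.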
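II. Designing for Reliability
1. REQ-REP (request-reply):
If a server dies while processing a request, the client can tell, because it never gets a reply. It can then give up, wait and try again later, or look for another server. The basic REQ-REP pattern copes with failure poorly: if the server crashes while handling a request, or the network loses the request or the reply, the client hangs forever.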
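2. There are roughly three ways to connect clients to servers:

  1. Multiple clients talk directly to a single server.
    Failures to handle: server crashes and restarts, and network disconnects.
  2. Multiple clients talk to a broker proxy that distributes work to multiple workers.
    Failures to handle: worker crashes and restarts, worker overload, worker busy-looping, queue crashes and restarts, and network disconnects.
  3. Multiple clients talk to multiple servers with no intermediary broker.
    Failures to handle: service crashes and restarts, service busy-looping, service overload, and network disconnects.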
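3. Three ways to improve reliability:
1. Client-side reliability (Lazy Pirate pattern):
Rather than doing a blocking receive, the client:
• Polls the REQ socket and receives from it only when it is sure a reply has arrived.
• Resends the request if no reply arrives within a timeout period. Because a REQ socket cannot send twice in a row, it first closes the old socket and opens a new one.
• Abandons the transaction if there is still no reply after several attempts.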
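The three rules above can be separated from the socket code and tested on their own. The sketch below is hypothetical: `lazy_pirate_call`, `attempt` and `reconnect` are names made up for illustration and are not part of pyzmq. `attempt()` stands for "send the request and poll for up to the timeout", returning the reply or `None`. `reconnect()` stands for closing the stuck REQ socket and opening a new one. With `retries=3` it makes three attempts in total, which is the same number the client example below makes.

```python
from typing import Callable, Optional


def lazy_pirate_call(
    attempt: Callable[[], Optional[bytes]],
    reconnect: Callable[[], None],
    retries: int = 3,
) -> Optional[bytes]:
    """Retry a request up to `retries` times, then give up.

    attempt():   sends the request and waits up to the timeout;
                 returns the reply, or None if nothing arrived in time.
    reconnect(): replaces the unusable REQ socket with a fresh one.
    """
    for remaining in range(retries, 0, -1):
        reply = attempt()
        if reply is not None:
            return reply      # Reply arrived before the timeout
        if remaining > 1:
            reconnect()       # Socket is stuck; replace it before resending
    return None               # Server seems to be offline: abandon
```

In the real client, `attempt` would wrap `client.send(request)` followed by `client.poll(REQUEST_TIMEOUT)` and `client.recv()`.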

Example: Lazy Pirate client

#  Lazy Pirate client
#  Use zmq_poll to do a safe request-reply
#  To run, start lpserver and then randomly kill/restart it

import itertools
import logging
import sys
import zmq

logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO)

REQUEST_TIMEOUT = 2500  # msecs
# Number of retries before giving up
REQUEST_RETRIES = 3
SERVER_ENDPOINT = "tcp://localhost:5555"

context = zmq.Context()

logging.info("Connecting to server…")
client = context.socket(zmq.REQ)
client.connect(SERVER_ENDPOINT)

for sequence in itertools.count():
    request = str(sequence).encode()
    logging.info("Sending (%s)", request)
    # Send a request, then work hard to get a reply
    client.send(request)

    retries_left = REQUEST_RETRIES
    while True:
        # Poll the socket for a reply, with timeout
        if (client.poll(REQUEST_TIMEOUT) & zmq.POLLIN) != 0:
            reply = client.recv()
            if int(reply) == sequence:
                logging.info("Server replied OK (%s)", reply)
                break
            else:
                logging.error("Malformed reply from server: %s", reply)
                continue

        retries_left -= 1
        logging.warning("No response from server")
        # Close and remove the old socket
        client.setsockopt(zmq.LINGER, 0)
        client.close()
        if retries_left == 0:
            logging.error("Server seems to be offline, abandoning")
            sys.exit()

        logging.info("Reconnecting to server…")
        # Create a new socket and resend the request
        client = context.socket(zmq.REQ)
        client.connect(SERVER_ENDPOINT)
        logging.info("Resending (%s)", request)
        client.send(request)

Example: Lazy Pirate server

#  Lazy Pirate server
#  Binds REP socket to tcp://*:5555
#  Like hwserver except:
#   - echoes request as-is
#   - randomly runs slowly, or exits to simulate a server crash.

from random import randint
import itertools
import logging
import time
import zmq

logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO)

context = zmq.Context()
server = context.socket(zmq.REP)
server.bind("tcp://*:5555")

for cycles in itertools.count():
    request = server.recv()

    # Simulate various problems, after a few cycles
    if cycles > 3 and randint(0, 3) == 0:
        logging.info("Simulating a crash")
        break
    elif cycles > 3 and randint(0, 3) == 0:
        logging.info("Simulating CPU overload")
        time.sleep(2)

    logging.info("Normal request (%s)", request)
    time.sleep(1)  # Do some heavy work
    server.send(request)

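Advantages of the Lazy Pirate pattern:
• Simple to understand and implement.
• Works easily with existing client and server application code.
• ZeroMQ automatically retries the actual reconnection until it works.
Disadvantages of the Lazy Pirate pattern:
• It doesn't fail over to backup or alternate servers.
2. Basic reliable queuing (Simple Pirate pattern):
Extending Lazy Pirate with a queue broker lets us talk transparently, through the broker, to multiple servers, which we can more accurately call workers. We start with the most minimal design, the Simple Pirate pattern.
In all these Pirate patterns, workers are stateless. If the application needs some shared state, such as a shared database, that state lives outside the messaging framework we are designing. Having a queue broker means workers can come and go without clients knowing anything about it. If one worker crashes, another takes over.

Example: Simple Pirate queue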

#  Simple Pirate queue
#  This is identical to the load-balancing pattern, with no reliability
#  mechanisms. It depends on the client for recovery. Runs forever.

import zmq

LRU_READY = b"\x01"  # Signals worker is ready

context = zmq.Context(1)

frontend = context.socket(zmq.ROUTER)  # ROUTER
backend = context.socket(zmq.ROUTER)   # ROUTER
frontend.bind("tcp://*:5555")  # For clients
backend.bind("tcp://*:5556")   # For workers

poll_workers = zmq.Poller()
poll_workers.register(backend, zmq.POLLIN)

poll_both = zmq.Poller()
poll_both.register(frontend, zmq.POLLIN)
poll_both.register(backend, zmq.POLLIN)

workers = []

while True:
    if workers:
        socks = dict(poll_both.poll())
    else:
        socks = dict(poll_workers.poll())

    # Handle worker activity on backend
    if socks.get(backend) == zmq.POLLIN:
        # Use worker address for LRU routing
        msg = backend.recv_multipart()
        if not msg:
            break
        address = msg[0]
        workers.append(address)

        # Everything after the second (delimiter) frame is the reply
        reply = msg[2:]

        # Forward message to client if it's not a READY
        if reply[0] != LRU_READY:
            frontend.send_multipart(reply)

    if socks.get(frontend) == zmq.POLLIN:
        #  Get client request, route to first available worker
        msg = frontend.recv_multipart()
        request = [workers.pop(0), b''] + msg
        backend.send_multipart(request)

Example: Simple Pirate worker

#  Simple Pirate worker
#  Connects REQ socket to tcp://localhost:5556
#  Implements worker part of LRU queuing

from random import randint
import time
import zmq

LRU_READY = b"\x01"  # Signals worker is ready

context = zmq.Context(1)
worker = context.socket(zmq.REQ)

# Set a random identity to make tracing easier
identity = "%04X-%04X" % (randint(0, 0x10000), randint(0, 0x10000))
worker.setsockopt_string(zmq.IDENTITY, identity)
worker.connect("tcp://localhost:5556")

# Tell the broker we're ready for work
print("I: (%s) worker ready" % identity)
worker.send(LRU_READY)

cycles = 0
while True:
    msg = worker.recv_multipart()
    if not msg:
        break

    # Simulate various problems, after a few cycles
    cycles += 1
    if cycles > 0 and randint(0, 5) == 0:
        print("I: (%s) simulating a crash" % identity)
        break
    elif cycles > 3 and randint(0, 5) == 0:
        print("I: (%s) simulating CPU overload" % identity)
        time.sleep(3)
    print("I: (%s) normal reply" % identity)
    time.sleep(1)  # Do some heavy work
    worker.send_multipart(msg)

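This pattern works with any number of clients and workers. It does, however, have some drawbacks:
• It is not robust in the face of a queue crash and restart. The client recovers, but the workers don't. ZeroMQ reconnects the workers' sockets automatically, but the workers sent their READY signals to the old queue, so as far as the newly started queue is concerned they don't exist. The fix is heartbeating from queue to worker, so that a worker can detect when the queue has gone away.
• The queue does not detect worker failure. If a worker dies while idle, the queue can't remove it from its worker list until it sends that worker a request. Meanwhile the client waits and retries for nothing. The fix is heartbeating from worker to queue, so that the queue can detect a lost worker at any stage.
3. Robust reliable queuing (Paranoid Pirate pattern):
For the Paranoid Pirate worker, we switch to a DEALER socket. This lets us send and receive messages at any time, instead of following the lock-step send/receive that REQ enforces.

Example: Paranoid Pirate queue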

#  Paranoid Pirate queue

from collections import OrderedDict
import time

import zmq

HEARTBEAT_LIVENESS = 3     # 3..5 is reasonable
HEARTBEAT_INTERVAL = 1.0   # Seconds

#  Paranoid Pirate Protocol constants
PPP_READY = b"\x01"      # Signals worker is ready
PPP_HEARTBEAT = b"\x02"  # Signals worker heartbeat


class Worker(object):
    def __init__(self, address):
        self.address = address
        self.expiry = time.time() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS


class WorkerQueue(object):
    def __init__(self):
        self.queue = OrderedDict()

    def ready(self, worker):
        self.queue.pop(worker.address, None)
        self.queue[worker.address] = worker

    def purge(self):
        """Look for & kill expired workers."""
        t = time.time()
        expired = []
        for address, worker in self.queue.items():
            if t > worker.expiry:  # Worker expired
                expired.append(address)
        for address in expired:
            print("W: Idle worker expired: %s" % address)
            self.queue.pop(address, None)

    def next(self):
        address, worker = self.queue.popitem(False)
        return address


context = zmq.Context(1)

frontend = context.socket(zmq.ROUTER)  # ROUTER
backend = context.socket(zmq.ROUTER)   # ROUTER
frontend.bind("tcp://*:5555")  # For clients
backend.bind("tcp://*:5556")   # For workers

poll_workers = zmq.Poller()
poll_workers.register(backend, zmq.POLLIN)

poll_both = zmq.Poller()
poll_both.register(frontend, zmq.POLLIN)
poll_both.register(backend, zmq.POLLIN)

# List of available workers
workers = WorkerQueue()

# Send out heartbeats at regular intervals
heartbeat_at = time.time() + HEARTBEAT_INTERVAL

while True:
    if len(workers.queue) > 0:
        poller = poll_both
    else:
        poller = poll_workers
    socks = dict(poller.poll(HEARTBEAT_INTERVAL * 1000))

    # Handle worker activity on backend
    if socks.get(backend) == zmq.POLLIN:
        # Use worker address for load balancing
        frames = backend.recv_multipart()
        if not frames:
            break

        # Any sign of life from a worker means it's ready
        address = frames[0]
        workers.ready(Worker(address))

        # Validate control message, or return reply to client
        msg = frames[1:]
        if len(msg) == 1:
            if msg[0] not in (PPP_READY, PPP_HEARTBEAT):
                print("E: Invalid message from worker: %s" % msg)
        else:
            frontend.send_multipart(msg)

    if socks.get(frontend) == zmq.POLLIN:
        frames = frontend.recv_multipart()
        if not frames:
            break

        frames.insert(0, workers.next())
        backend.send_multipart(frames)

    # Send heartbeats to idle workers if it's time
    # (checked on every loop pass, not only after backend activity)
    if time.time() >= heartbeat_at:
        for worker in workers.queue:
            msg = [worker, PPP_HEARTBEAT]
            backend.send_multipart(msg)
        heartbeat_at = time.time() + HEARTBEAT_INTERVAL

    workers.purge()

Example: Paranoid Pirate worker

#  Paranoid Pirate worker

from random import randint
import time

import zmq

HEARTBEAT_LIVENESS = 3
HEARTBEAT_INTERVAL = 1
INTERVAL_INIT = 1
INTERVAL_MAX = 32

#  Paranoid Pirate Protocol constants
PPP_READY = b"\x01"      # Signals worker is ready
PPP_HEARTBEAT = b"\x02"  # Signals worker heartbeat


def worker_socket(context, poller):
    """Helper function that returns a new configured socket
       connected to the Paranoid Pirate queue"""
    worker = context.socket(zmq.DEALER)  # DEALER
    identity = b"%04X-%04X" % (randint(0, 0x10000), randint(0, 0x10000))
    worker.setsockopt(zmq.IDENTITY, identity)
    poller.register(worker, zmq.POLLIN)
    worker.connect("tcp://localhost:5556")
    worker.send(PPP_READY)
    return worker


context = zmq.Context(1)
poller = zmq.Poller()

liveness = HEARTBEAT_LIVENESS
interval = INTERVAL_INIT

heartbeat_at = time.time() + HEARTBEAT_INTERVAL

worker = worker_socket(context, poller)
cycles = 0
while True:
    socks = dict(poller.poll(HEARTBEAT_INTERVAL * 1000))

    # Handle activity from the queue
    if socks.get(worker) == zmq.POLLIN:
        #  Get message
        #  - 3-part envelope + content -> request
        #  - 1-part HEARTBEAT -> heartbeat
        frames = worker.recv_multipart()
        if not frames:
            break  # Interrupted

        if len(frames) == 3:
            # Simulate various problems, after a few cycles
            cycles += 1
            if cycles > 3 and randint(0, 5) == 0:
                print("I: Simulating a crash")
                break
            if cycles > 3 and randint(0, 5) == 0:
                print("I: Simulating CPU overload")
                time.sleep(3)
            print("I: Normal reply")
            worker.send_multipart(frames)
            liveness = HEARTBEAT_LIVENESS
            time.sleep(1)  # Do some heavy work
        elif len(frames) == 1 and frames[0] == PPP_HEARTBEAT:
            print("I: Queue heartbeat")
            liveness = HEARTBEAT_LIVENESS
        else:
            print("E: Invalid message: %s" % frames)
        interval = INTERVAL_INIT
    else:
        liveness -= 1
        if liveness == 0:
            print("W: Heartbeat failure, can't reach queue")
            print("W: Reconnecting in %0.2fs..." % interval)
            time.sleep(interval)

            if interval < INTERVAL_MAX:
                interval *= 2
            poller.unregister(worker)
            worker.setsockopt(zmq.LINGER, 0)
            worker.close()
            worker = worker_socket(context, poller)
            liveness = HEARTBEAT_LIVENESS
    if time.time() > heartbeat_at:
        heartbeat_at = time.time() + HEARTBEAT_INTERVAL
        print("I: Worker heartbeat")
        worker.send(PPP_HEARTBEAT)

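III. Heartbeating
Heartbeating tells a peer whether another peer is alive or dead. There are three main approaches to heartbeating with ZeroMQ:
1. Do nothing
The most common approach is to do no heartbeating at all and hope for the best.
2. One-way heartbeats
Every node sends a heartbeat message to its peers once a second. When a node hears nothing from a peer within some timeout (typically several seconds), it treats that peer as dead.
3. Ping-pong heartbeats
Use a ping-pong dialog: one peer sends a ping command to the other, which replies with a pong command.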
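Approach 2 can be sketched from one node's point of view. This is a minimal sketch that assumes timestamps are passed in explicitly (for example from `time.monotonic()`), so the logic can be tested without sockets. The class name `HeartbeatState` is hypothetical. The expiry rule, interval × liveness, mirrors the `Worker.expiry` calculation in the Paranoid Pirate queue above.

```python
class HeartbeatState:
    """One-way heartbeating for a single peer (hypothetical helper)."""

    def __init__(self, now: float, interval: float = 1.0, liveness: int = 3):
        self.interval = interval
        self.liveness = liveness
        # Peer is presumed dead if silent for `liveness` intervals
        self.peer_expiry = now + interval * liveness
        # When we must send our own next heartbeat
        self.heartbeat_at = now + interval

    def on_message(self, now: float) -> None:
        """Any message from the peer, heartbeat or not, proves it is alive."""
        self.peer_expiry = now + self.interval * self.liveness

    def peer_is_dead(self, now: float) -> bool:
        return now > self.peer_expiry

    def heartbeat_due(self, now: float) -> bool:
        """True when it is time to send a heartbeat; also schedules the next one."""
        if now >= self.heartbeat_at:
            self.heartbeat_at = now + self.interval
            return True
        return False
```

In an event loop, each `poll()` timeout would be followed by `heartbeat_due(now)` and `peer_is_dead(now)` checks, and every received message by `on_message(now)`.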
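Example: heartbeating for Paranoid Pirate (approach 2)
In the worker, we handle heartbeats from the queue as follows:

  1. We keep a liveness counter: the number of heartbeats we can miss before deciding the queue is dead. It starts at 3 and is decremented each time we miss a heartbeat.
  2. In the zmq_poll() loop, we wait for 1 second each time (the heartbeat interval).
  3. If any message arrives from the queue during that time, we reset liveness to 3.
  4. If no message arrives from the queue during that time, we decrement liveness.
  5. If liveness reaches zero, we consider the queue dead.
  6. If the queue is dead, we destroy our socket, create a new one, and reconnect.
  7. To avoid opening and closing too many sockets, we wait for some interval before reconnecting, and double the interval each time until it reaches 32 seconds.
  We send heartbeats to the queue as follows:
  1. We calculate when to send the next heartbeat. This is a single variable, because we are talking to only one peer, the queue.
  2. In the zmq_poll() loop, whenever we pass that time, we send a heartbeat to the queue.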
Example code: basic heartbeating in the worker
#define HEARTBEAT_LIVENESS  3       //  3-5 is reasonable
#define HEARTBEAT_INTERVAL  1000    //  msecs
#define INTERVAL_INIT       1000    //  Initial reconnect interval
#define INTERVAL_MAX       32000    //  Interval after exponential backoff
...
//  If liveness hits zero, queue is considered disconnected
size_t liveness = HEARTBEAT_LIVENESS;
size_t interval = INTERVAL_INIT;

//  Send out heartbeats at regular intervals
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;

while (true) {
    zmq_pollitem_t items [] = { { worker,  0, ZMQ_POLLIN, 0 } };
    int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);

    if (items [0].revents & ZMQ_POLLIN) {
        //  Received any message from queue
        liveness = HEARTBEAT_LIVENESS;
        interval = INTERVAL_INIT;
    }
    else if (--liveness == 0) {
        zclock_sleep (interval);
        if (interval < INTERVAL_MAX)
            interval *= 2;
        zsocket_destroy (ctx, worker);
        ...
        liveness = HEARTBEAT_LIVENESS;
    }
    //  Send heartbeat to queue if it's time
    if (zclock_time () > heartbeat_at) {
        heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
        //  Send heartbeat message to queue
    }
}
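The worker-side rules in steps 1-7 can be replayed in plain Python without sockets. In this sketch, the function name and the list-of-booleans input are assumptions made for illustration. Each element of `poll_results` is one poll interval: `True` if any message arrived from the queue, `False` if the poll timed out. The function returns the delay the worker would sleep before each reconnect. It uses the same constants as the C fragment above, expressed in seconds.

```python
HEARTBEAT_LIVENESS = 3  # Missed heartbeats tolerated before reconnecting
INTERVAL_INIT = 1       # Initial reconnect delay, seconds
INTERVAL_MAX = 32       # Cap for the exponential backoff, seconds


def reconnect_delays(poll_results):
    """Return the sleep used before each reconnect for a sequence of polls."""
    liveness = HEARTBEAT_LIVENESS
    interval = INTERVAL_INIT
    delays = []
    for got_message in poll_results:
        if got_message:
            # Any message from the queue resets both counters
            liveness = HEARTBEAT_LIVENESS
            interval = INTERVAL_INIT
        else:
            liveness -= 1
            if liveness == 0:
                delays.append(interval)  # zclock_sleep(interval) in the C code
                if interval < INTERVAL_MAX:
                    interval *= 2        # Exponential backoff
                # New socket, so start counting missed heartbeats again
                liveness = HEARTBEAT_LIVENESS
    return delays
```

For example, 21 consecutive timeouts produce seven reconnects with delays of 1, 2, 4, 8, 16, 32 and 32 seconds. The cap stops the doubling at 32.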

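IV. Contracts and Protocols
Definition: to guarantee interoperability, we need a kind of contract, an agreement that lets different teams, working at different times and in different places, write code that is guaranteed to work together. We call this a "protocol".

1. Service-oriented reliable queuing (Majordomo pattern)
The Majordomo Protocol (MDP) extends and improves on the Paranoid Pirate Protocol (PPP) in an interesting way. It adds a "service name" to the requests that clients send, and asks workers to register for specific services. Adding service names turns our Paranoid Pirate queue into a service-oriented broker. MDP is split into two halves, the client side and the worker side.
Client API:
mdcli_t *mdcli_new (char *broker);
void mdcli_destroy (mdcli_t **self_p);
zmsg_t *mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);
Worker API:
mdwrk_t *mdwrk_new (char *broker, char *service);
void mdwrk_destroy (mdwrk_t **self_p);
zmsg_t *mdwrk_recv (mdwrk_t *self, zmsg_t *reply);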
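The Python examples below `import MDP`, a small constants module that this chapter does not show. A minimal stand-in is sketched here. The header strings and one-byte command codes follow the 7/MDP specification as we understand it, so check them against the spec before relying on them. The two frame-building helpers are hypothetical and only illustrate the frame order the client and worker APIs use.

```python
# MDP.py: protocol constants assumed by the Majordomo examples.

C_CLIENT = b"MDPC01"   # Client protocol header (MDP/Client 0.1)
W_WORKER = b"MDPW01"   # Worker protocol header (MDP/Worker 0.1)

# Worker commands, each sent as a single-byte frame
W_READY = b"\x01"
W_REQUEST = b"\x02"
W_REPLY = b"\x03"
W_HEARTBEAT = b"\x04"
W_DISCONNECT = b"\x05"


def client_request(service, body):
    """Frames a REQ client sends; the REQ socket adds the empty delimiter."""
    return [C_CLIENT, service] + list(body)


def worker_message(command, *frames):
    """Frames a DEALER worker sends: empty delimiter, header, command, payload."""
    return [b"", W_WORKER, command] + list(frames)
```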
Example: Majordomo Protocol Client API

"""Majordomo Protocol Client API, Python version.

Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.
"""

import logging

import zmq

import MDP
from zhelpers import dump


class MajorDomoClient(object):
    """Majordomo Protocol Client API, Python version.

    Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.
    """
    broker = None
    ctx = None
    client = None
    poller = None
    timeout = 2500
    retries = 3
    verbose = False

    def __init__(self, broker, verbose=False):
        self.broker = broker
        self.verbose = verbose
        self.ctx = zmq.Context()
        self.poller = zmq.Poller()
        logging.basicConfig(format="%(asctime)s %(message)s",
                            datefmt="%Y-%m-%d %H:%M:%S",
                            level=logging.INFO)
        self.reconnect_to_broker()

    def reconnect_to_broker(self):
        """Connect or reconnect to broker"""
        if self.client:
            self.poller.unregister(self.client)
            self.client.close()
        self.client = self.ctx.socket(zmq.REQ)
        self.client.linger = 0
        self.client.connect(self.broker)
        self.poller.register(self.client, zmq.POLLIN)
        if self.verbose:
            logging.info("I: connecting to broker at %s...", self.broker)

    def send(self, service, request):
        """Send a request to the broker and get a reply.

        Returns the reply message, or None if there was no reply.
        """
        if not isinstance(request, list):
            request = [request]
        request = [MDP.C_CLIENT, service] + request
        if self.verbose:
            logging.warning("I: send request to '%s' service: ", service)
            dump(request)
        reply = None

        retries = self.retries
        while retries > 0:
            self.client.send_multipart(request)
            try:
                items = self.poller.poll(self.timeout)
            except KeyboardInterrupt:
                break  # interrupted

            if items:
                msg = self.client.recv_multipart()
                if self.verbose:
                    logging.info("I: received reply:")
                    dump(msg)

                # Don't try to handle errors, just assert noisily
                assert len(msg) >= 3

                header = msg.pop(0)
                assert MDP.C_CLIENT == header

                reply_service = msg.pop(0)
                assert service == reply_service

                reply = msg
                break
            else:
                retries -= 1
                if retries:
                    logging.warning("W: no reply, reconnecting...")
                    self.reconnect_to_broker()
                else:
                    logging.warning("W: permanent error, abandoning")
                    break
        return reply

    def destroy(self):
        self.ctx.destroy()
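The client API above imports `dump` from a `zhelpers` module that is not shown here, and the worker API and broker that follow do the same. If you want to run the examples without that module, a stand-in along these lines should work. It is our own sketch, not the zhelpers source, and it accepts only a list of frames.

```python
import binascii


def dump(msg):
    """Print each frame of a multipart message as '[length] content'.

    Printable ASCII frames are shown as text; anything else as hex.
    Returns the printed lines so the output can be inspected.
    """
    lines = ["-" * 40]
    for part in msg:
        if all(32 <= byte < 127 for byte in part):
            text = part.decode("ascii")  # Printable: show as text
        else:
            text = "0x" + binascii.hexlify(part).decode("ascii")  # Binary: show as hex
        lines.append("[%03d] %s" % (len(part), text))
    print("\n".join(lines))
    return lines
```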

Example: Majordomo Protocol client example

"""
Majordomo Protocol client example. Uses the mdcli API to hide all MDP aspects

Author : Min RK <benjaminrk@gmail.com>
"""

import sys
from mdcliapi import MajorDomoClient


def main():
    verbose = '-v' in sys.argv
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    count = 0
    while count < 100000:
        request = b"Hello world"
        try:
            reply = client.send(b"echo", request)
        except KeyboardInterrupt:
            break
        else:
            # also break on failure to reply:
            if reply is None:
                break
        count += 1
    print("%i requests/replies processed" % count)


if __name__ == '__main__':
    main()

Example: Majordomo Protocol Worker API

"""Majordomo Protocol Worker API, Python version

Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.

Author: Min RK <benjaminrk@gmail.com>
Based on Java example by Arkadiusz Orzechowski
"""

import logging
import time
import zmq

from zhelpers import dump
# Majordomo protocol constants:
import MDP


class MajorDomoWorker(object):
    """Majordomo Protocol Worker API, Python version

    Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
    """

    HEARTBEAT_LIVENESS = 3  # 3-5 is reasonable
    broker = None
    ctx = None
    service = None

    worker = None  # Socket to broker
    heartbeat_at = 0  # When to send HEARTBEAT (relative to time.time(), so in seconds)
    liveness = 0  # How many attempts left
    heartbeat = 2500  # Heartbeat delay, msecs
    reconnect = 2500  # Reconnect delay, msecs

    # Internal state
    expect_reply = False  # False only at start

    timeout = 2500  # Poller timeout, msecs
    verbose = False  # Print activity to stdout

    # Return address, if any
    reply_to = None

    def __init__(self, broker, service, verbose=False):
        self.broker = broker
        self.service = service
        self.verbose = verbose
        self.ctx = zmq.Context()
        self.poller = zmq.Poller()
        logging.basicConfig(format="%(asctime)s %(message)s",
                            datefmt="%Y-%m-%d %H:%M:%S",
                            level=logging.INFO)
        self.reconnect_to_broker()

    def reconnect_to_broker(self):
        """Connect or reconnect to broker"""
        if self.worker:
            self.poller.unregister(self.worker)
            self.worker.close()
        self.worker = self.ctx.socket(zmq.DEALER)
        self.worker.linger = 0
        self.worker.connect(self.broker)
        self.poller.register(self.worker, zmq.POLLIN)
        if self.verbose:
            logging.info("I: connecting to broker at %s...", self.broker)

        # Register service with broker
        self.send_to_broker(MDP.W_READY, self.service, [])

        # If liveness hits zero, queue is considered disconnected
        self.liveness = self.HEARTBEAT_LIVENESS
        self.heartbeat_at = time.time() + 1e-3 * self.heartbeat

    def send_to_broker(self, command, option=None, msg=None):
        """Send message to broker.

        If no msg is provided, creates one internally
        """
        if msg is None:
            msg = []
        elif not isinstance(msg, list):
            msg = [msg]

        if option:
            msg = [option] + msg

        msg = [b'', MDP.W_WORKER, command] + msg
        if self.verbose:
            logging.info("I: sending %s to broker", command)
            dump(msg)
        self.worker.send_multipart(msg)

    def recv(self, reply=None):
        """Send reply, if any, to broker and wait for next request."""
        # Format and send the reply if we were provided one
        assert reply is not None or not self.expect_reply

        if reply is not None:
            assert self.reply_to is not None
            reply = [self.reply_to, b''] + reply
            self.send_to_broker(MDP.W_REPLY, msg=reply)

        self.expect_reply = True

        while True:
            # Poll socket for a reply, with timeout
            try:
                items = self.poller.poll(self.timeout)
            except KeyboardInterrupt:
                break  # Interrupted

            if items:
                msg = self.worker.recv_multipart()
                if self.verbose:
                    logging.info("I: received message from broker: ")
                    dump(msg)

                self.liveness = self.HEARTBEAT_LIVENESS
                # Don't try to handle errors, just assert noisily
                assert len(msg) >= 3

                empty = msg.pop(0)
                assert empty == b''

                header = msg.pop(0)
                assert header == MDP.W_WORKER

                command = msg.pop(0)
                if command == MDP.W_REQUEST:
                    # We should pop and save as many addresses as there are
                    # up to a null part, but for now, just save one...
                    self.reply_to = msg.pop(0)
                    # pop empty
                    empty = msg.pop(0)
                    assert empty == b''

                    return msg  # We have a request to process
                elif command == MDP.W_HEARTBEAT:
                    # Do nothing for heartbeats
                    pass
                elif command == MDP.W_DISCONNECT:
                    self.reconnect_to_broker()
                else:
                    logging.error("E: invalid input message: ")
                    dump(msg)

            else:
                self.liveness -= 1
                if self.liveness == 0:
                    if self.verbose:
                        logging.warning("W: disconnected from broker - retrying...")
                    try:
                        time.sleep(1e-3 * self.reconnect)
                    except KeyboardInterrupt:
                        break
                    self.reconnect_to_broker()

            # Send HEARTBEAT if it's time
            if time.time() > self.heartbeat_at:
                self.send_to_broker(MDP.W_HEARTBEAT)
                self.heartbeat_at = time.time() + 1e-3 * self.heartbeat

        logging.warning("W: interrupt received, killing worker...")
        return None

    def destroy(self):
        # context.destroy depends on pyzmq >= 2.1.10
        self.ctx.destroy(0)

Example: Majordomo Protocol worker example

"""Majordomo Protocol worker example.

Uses the mdwrk API to hide all MDP aspects

Author: Min RK <benjaminrk@gmail.com>
"""

import sys
from mdwrkapi import MajorDomoWorker


def main():
    verbose = '-v' in sys.argv
    worker = MajorDomoWorker("tcp://localhost:5555", b"echo", verbose)
    reply = None
    while True:
        request = worker.recv(reply)
        if request is None:
            break  # Worker was interrupted
        reply = request  # Echo is complex... :-)


if __name__ == '__main__':
    main()

Example: Majordomo Protocol broker

"""
Majordomo Protocol broker
A minimal implementation of http://rfc.zeromq.org/spec:7 and spec:8

Author: Min RK <benjaminrk@gmail.com>
Based on Java example by Arkadiusz Orzechowski
"""

import logging
import sys
import time
from binascii import hexlify

import zmq

# local
import MDP
from zhelpers import dump


class Service(object):
    """a single Service"""
    name = None  # Service name
    requests = None  # List of client requests
    waiting = None  # List of waiting workers

    def __init__(self, name):
        self.name = name
        self.requests = []
        self.waiting = []


class Worker(object):
    """a Worker, idle or active"""
    identity = None  # hex Identity of worker
    address = None  # Address to route to
    service = None  # Owning service, if known
    expiry = None  # expires at this point, unless heartbeat

    def __init__(self, identity, address, lifetime):
        self.identity = identity
        self.address = address
        self.expiry = time.time() + 1e-3 * lifetime


class MajorDomoBroker(object):
    """
    Majordomo Protocol broker
    A minimal implementation of http://rfc.zeromq.org/spec:7 and spec:8
    """

    # We'd normally pull these from config data
    INTERNAL_SERVICE_PREFIX = b"mmi."
    HEARTBEAT_LIVENESS = 3  # 3-5 is reasonable
    HEARTBEAT_INTERVAL = 2500  # msecs
    HEARTBEAT_EXPIRY = HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS

    # ---------------------------------------------------------------------

    ctx = None  # Our context
    socket = None  # Socket for clients & workers
    poller = None  # our Poller

    heartbeat_at = None  # When to send HEARTBEAT
    services = None  # known services
    workers = None  # known workers
    waiting = None  # idle workers

    verbose = False  # Print activity to stdout

    # ---------------------------------------------------------------------

    def __init__(self, verbose=False):
        """Initialize broker state."""
        self.verbose = verbose
        self.services = {}
        self.workers = {}
        self.waiting = []
        self.heartbeat_at = time.time() + 1e-3 * self.HEARTBEAT_INTERVAL
        self.ctx = zmq.Context()
        self.socket = self.ctx.socket(zmq.ROUTER)
        self.socket.linger = 0
        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN)
        logging.basicConfig(format="%(asctime)s %(message)s",
                            datefmt="%Y-%m-%d %H:%M:%S",
                            level=logging.INFO)

    # ---------------------------------------------------------------------

    def mediate(self):
        """Main broker work happens here"""
        while True:
            try:
                items = self.poller.poll(self.HEARTBEAT_INTERVAL)
            except KeyboardInterrupt:
                break  # Interrupted
            if items:
                msg = self.socket.recv_multipart()
                if self.verbose:
                    logging.info("I: received message:")
                    dump(msg)

                sender = msg.pop(0)
                empty = msg.pop(0)
                assert empty == b''
                header = msg.pop(0)

                if MDP.C_CLIENT == header:
                    self.process_client(sender, msg)
                elif MDP.W_WORKER == header:
                    self.process_worker(sender, msg)
                else:
                    logging.error("E: invalid message:")
                    dump(msg)

            self.purge_workers()
            self.send_heartbeats()

    def destroy(self):
        """Disconnect all workers, destroy context."""
        while self.workers:
            # dict views are not indexable in Python 3
            self.delete_worker(next(iter(self.workers.values())), True)
        self.ctx.destroy(0)

    def process_client(self, sender, msg):
        """Process a request coming from a client."""
        assert len(msg) >= 2  # Service name + body
        service = msg.pop(0)
        # Set reply return address to client sender
        msg = [sender, b''] + msg
        if service.startswith(self.INTERNAL_SERVICE_PREFIX):
            self.service_internal(service, msg)
        else:
            self.dispatch(self.require_service(service), msg)

    def process_worker(self, sender, msg):
        """Process message sent to us by a worker."""
        assert len(msg) >= 1  # At least, command

        command = msg.pop(0)

        worker_ready = hexlify(sender) in self.workers

        worker = self.require_worker(sender)

        if MDP.W_READY == command:
            assert len(msg) >= 1  # At least, a service name
            service = msg.pop(0)
            # Not first command in session or Reserved service name
            if worker_ready or service.startswith(self.INTERNAL_SERVICE_PREFIX):
                self.delete_worker(worker, True)
            else:
                # Attach worker to service and mark as idle
                worker.service = self.require_service(service)
                self.worker_waiting(worker)

        elif MDP.W_REPLY == command:
            if worker_ready:
                # Remove & save client return envelope and insert the
                # protocol header and service name, then rewrap envelope.
                client = msg.pop(0)
                empty = msg.pop(0)  # ?
                msg = [client, b'', MDP.C_CLIENT, worker.service.name] + msg
                self.socket.send_multipart(msg)
                self.worker_waiting(worker)
            else:
                self.delete_worker(worker, True)

        elif MDP.W_HEARTBEAT == command:
            if worker_ready:
                worker.expiry = time.time() + 1e-3 * self.HEARTBEAT_EXPIRY
            else:
                self.delete_worker(worker, True)

        elif MDP.W_DISCONNECT == command:
            self.delete_worker(worker, False)
        else:
            logging.error("E: invalid message:")
            dump(msg)

    def delete_worker(self, worker, disconnect):
        """Deletes worker from all data structures, and deletes worker."""
        assert worker is not None
        if disconnect:
            self.send_to_worker(worker, MDP.W_DISCONNECT, None, None)

        # A busy worker is not on the waiting lists, so check first
        if worker.service is not None and worker in worker.service.waiting:
            worker.service.waiting.remove(worker)
        if worker in self.waiting:
            self.waiting.remove(worker)
        self.workers.pop(worker.identity)

    def require_worker(self, address):
        """Finds the worker (creates if necessary)."""
        assert address is not None
        identity = hexlify(address)
        worker = self.workers.get(identity)
        if worker is None:
            worker = Worker(identity, address, self.HEARTBEAT_EXPIRY)
            self.workers[identity] = worker
            if self.verbose:
                logging.info("I: registering new worker: %s", identity)

        return worker

    def require_service(self, name):
        """Locates the service (creates if necessary)."""
        assert name is not None
        service = self.services.get(name)
        if service is None:
            service = Service(name)
            self.services[name] = service

        return service

    def bind(self, endpoint):
        """Bind broker to endpoint, can call this multiple times.

        We use a single socket for both clients and workers.
        """
        self.socket.bind(endpoint)
        logging.info("I: MDP broker/0.1.1 is active at %s", endpoint)

    def service_internal(self, service, msg):
        """Handle internal service according to 8/MMI specification"""
        returncode = b"501"
        if b"mmi.service" == service:
            name = msg[-1]
            returncode = b"200" if name in self.services else b"404"
        msg[-1] = returncode

        # insert the protocol header and service name after the routing envelope ([client, ''])
        msg = msg[:2] + [MDP.C_CLIENT, service] + msg[2:]
        self.socket.send_multipart(msg)

    def send_heartbeats(self):
        """Send heartbeats to idle workers if it's time"""
        if time.time() > self.heartbeat_at:
            for worker in self.waiting:
                self.send_to_worker(worker, MDP.W_HEARTBEAT, None, None)

            self.heartbeat_at = time.time() + 1e-3 * self.HEARTBEAT_INTERVAL

    def purge_workers(self):
        """Look for & kill expired workers.

        Workers are oldest to most recent, so we stop at the first alive worker.
        """
        while self.waiting:
            w = self.waiting[0]
            if w.expiry < time.time():
                logging.info("I: deleting expired worker: %s", w.identity)
                # delete_worker also removes w from self.waiting
                self.delete_worker(w, False)
            else:
                break

    def worker_waiting(self, worker):
        """This worker is now waiting for work."""
        # Queue to broker and service waiting lists
        self.waiting.append(worker)
        worker.service.waiting.append(worker)
        worker.expiry = time.time() + 1e-3 * self.HEARTBEAT_EXPIRY
        self.dispatch(worker.service, None)

    def dispatch(self, service, msg):
        """Dispatch requests to waiting workers as possible"""
        assert service is not None
        if msg is not None:  # Queue message if any
            service.requests.append(msg)
        self.purge_workers()
        while service.waiting and service.requests:
            msg = service.requests.pop(0)
            worker = service.waiting.pop(0)
            self.waiting.remove(worker)
            self.send_to_worker(worker, MDP.W_REQUEST, None, msg)

    def send_to_worker(self, worker, command, option, msg=None):
        """Send message to worker.

        If message is provided, sends that message.
        """
        if msg is None:
            msg = []
        elif not isinstance(msg, list):
            msg = [msg]

        # Stack routing and protocol envelopes to start of message
        # and routing envelope
        if option is not None:
            msg = [option] + msg
        msg = [worker.address, b'', MDP.W_WORKER, command] + msg

        if self.verbose:
            logging.info("I: sending %r to worker", command)
            dump(msg)

        self.socket.send_multipart(msg)


def main():
    """create and start new broker"""
    verbose = '-v' in sys.argv
    broker = MajorDomoBroker(verbose)
    broker.bind("tcp://*:5555")
    broker.mediate()


if __name__ == '__main__':
    main()

2. Asynchronous Majordomo pattern
Example 1: Round-trip demonstrator

"""Round-trip demonstrator

While this example runs in a single process, that is just to make
it easier to start and stop the example. Client thread signals to
main when it's ready.
"""

import sys
import threading
import time

import zmq

from zhelpers import zpipe


def client_task(ctx, pipe):
    client = ctx.socket(zmq.DEALER)
    client.identity = b'C'
    client.connect("tcp://localhost:5555")

    print("Setting up test...")
    time.sleep(0.1)

    print("Synchronous round-trip test...")
    start = time.time()
    requests = 10000
    for r in range(requests):
        client.send(b"hello")
        client.recv()
    print(" %d calls/second" % (requests / (time.time() - start)))

    print("Asynchronous round-trip test...")
    start = time.time()
    for r in range(requests):
        client.send(b"hello")
    for r in range(requests):
        client.recv()
    print(" %d calls/second" % (requests / (time.time() - start)))

    # signal done:
    pipe.send(b"done")


def worker_task():
    ctx = zmq.Context()
    worker = ctx.socket(zmq.DEALER)
    worker.identity = b'W'
    worker.connect("tcp://localhost:5556")

    while True:
        msg = worker.recv_multipart()
        worker.send_multipart(msg)
    ctx.destroy(0)


def broker_task():
    # Prepare our context and sockets
    ctx = zmq.Context()
    frontend = ctx.socket(zmq.ROUTER)
    backend = ctx.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5555")
    backend.bind("tcp://*:5556")

    # Initialize poll set
    poller = zmq.Poller()
    poller.register(backend, zmq.POLLIN)
    poller.register(frontend, zmq.POLLIN)

    while True:
        try:
            items = dict(poller.poll())
        except (KeyboardInterrupt, zmq.ZMQError):
            break  # Interrupted

        if frontend in items:
            msg = frontend.recv_multipart()
            msg[0] = b'W'
            backend.send_multipart(msg)
        if backend in items:
            msg = backend.recv_multipart()
            msg[0] = b'C'
            frontend.send_multipart(msg)


def main():
    # Create threads
    ctx = zmq.Context()
    client, pipe = zpipe(ctx)

    client_thread = threading.Thread(target=client_task, args=(ctx, pipe))
    worker_thread = threading.Thread(target=worker_task)
    worker_thread.daemon = True
    broker_thread = threading.Thread(target=broker_task)
    broker_thread.daemon = True

    worker_thread.start()
    broker_thread.start()
    client_thread.start()

    # Wait for signal on client pipe
    client.recv()


if __name__ == '__main__':
    main()

Example 2: Majordomo Protocol Client API (asynchronous version)

"""Majordomo Protocol Client API, Python version.

Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.

Author: Min RK <benjaminrk@gmail.com>
Based on Java example by Arkadiusz Orzechowski
"""

import logging

import zmq

import MDP
from zhelpers import dump


class MajorDomoClient(object):
    """Majordomo Protocol Client API, Python version.

    Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.
    """
    broker = None
    ctx = None
    client = None
    poller = None
    timeout = 2500
    verbose = False

    def __init__(self, broker, verbose=False):
        self.broker = broker
        self.verbose = verbose
        self.ctx = zmq.Context()
        self.poller = zmq.Poller()
        logging.basicConfig(format="%(asctime)s %(message)s",
                            datefmt="%Y-%m-%d %H:%M:%S",
                            level=logging.INFO)
        self.reconnect_to_broker()

    def reconnect_to_broker(self):
        """Connect or reconnect to broker"""
        if self.client:
            self.poller.unregister(self.client)
            self.client.close()
        self.client = self.ctx.socket(zmq.DEALER)
        self.client.linger = 0
        self.client.connect(self.broker)
        self.poller.register(self.client, zmq.POLLIN)
        if self.verbose:
            logging.info("I: connecting to broker at %s...", self.broker)

    def send(self, service, request):
        """Send request to broker"""
        if not isinstance(request, list):
            request = [request]

        # Prefix request with protocol frames
        # Frame 0: empty (REQ emulation)
        # Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
        # Frame 2: Service name (printable string)
        request = [b'', MDP.C_CLIENT, service] + request
        if self.verbose:
            logging.warning("I: send request to '%s' service: ", service)
            dump(request)
        self.client.send_multipart(request)

    def recv(self):
        """Returns the reply message or None if there was no reply."""
        try:
            items = self.poller.poll(self.timeout)
        except KeyboardInterrupt:
            return None  # interrupted

        if items:
            # if we got a reply, process it
            msg = self.client.recv_multipart()
            if self.verbose:
                logging.info("I: received reply:")
                dump(msg)

            # Don't try to handle errors, just assert noisily
            assert len(msg) >= 4

            empty = msg.pop(0)
            header = msg.pop(0)
            assert MDP.C_CLIENT == header

            service = msg.pop(0)
            return msg
        else:
            logging.warning("W: permanent error, abandoning request")
            return None

Example 3: Majordomo Protocol client example (asynchronous version)

"""
Majordomo Protocol client example. Uses the mdcli API to hide all MDP aspects

Author : Min RK <benjaminrk@gmail.com>
"""

import sys
from mdcliapi2 import MajorDomoClient


def main():
    verbose = '-v' in sys.argv
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    requests = 100000
    for i in range(requests):
        request = b"Hello world"
        try:
            client.send(b"echo", request)
        except KeyboardInterrupt:
            print("send interrupted, aborting")
            return

    count = 0
    while count < requests:
        try:
            reply = client.recv()
        except KeyboardInterrupt:
            break
        else:
            # also break on failure to reply:
            if reply is None:
                break
        count += 1
    print("%i requests/replies processed" % count)


if __name__ == '__main__':
    main()

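We use the test programs above to measure the real cost of round-tripping. The test sends a batch of messages twice. The first time, it waits for each reply before sending the next request (synchronous). The second time, it sends every request first and then reads all the replies back as one batch (asynchronous). Both approaches do the same work, yet the results are very different.
Example 1: in the simplest case, the synchronous round-trip test is 20 times slower than the asynchronous one.
Example 3: over 100,000 request-reply cycles with one worker, the asynchronous client is twice as fast as the synchronous client.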
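A rough model helps explain why the two tests differ so much. Assume, as a simplification, that a synchronous client pays one full network round trip per request. An asynchronous client pays one round trip of latency overall, plus a fixed per-message cost for each request. The figures below are made up for illustration and are not measurements.

```python
def sync_seconds(n, rtt):
    """Synchronous: each request waits for its reply, so n full round trips."""
    return n * rtt


def async_seconds(n, rtt, per_msg):
    """Asynchronous: requests are pipelined, so roughly one round trip of
    latency plus the per-message processing cost of all n messages."""
    return rtt + n * per_msg


# Hypothetical figures, chosen only to illustrate the model
N = 10000
RTT = 50e-6       # 50 microseconds per network round trip
PER_MSG = 5e-6    # 5 microseconds of per-message work

speedup = sync_seconds(N, RTT) / async_seconds(N, RTT, PER_MSG)
print("speedup: %.2fx" % speedup)
```

Under this model, the speedup approaches `RTT / PER_MSG` as `N` grows. The gap is therefore largest when network latency dominates per-message work.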
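3. Service discovery
• When a client requests a service whose name starts with mmi., the broker does not route the request to a worker but handles it internally.
• The broker handles only one such service, mmi.service (the service-discovery service).
• The request payload is the name of an external service (a real one, provided by a worker).
• The broker returns "200" (OK) or "404" (Not found), depending on whether any worker has registered that service.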
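Here is a sketch of the broker-side decision. It is modeled on `service_internal` in the broker example above, but pulled out as pure functions for clarity. `registered` is assumed to be the set of service names known to the broker, and the function names are hypothetical. Any other `mmi.` name gets `501` (not implemented), which is also the default return code in the broker example.

```python
INTERNAL_SERVICE_PREFIX = b"mmi."


def is_internal(service: bytes) -> bool:
    """mmi.* requests are answered by the broker, never routed to a worker."""
    return service.startswith(INTERNAL_SERVICE_PREFIX)


def mmi_reply(service: bytes, payload: bytes, registered: set) -> bytes:
    """Return the status code the broker sends back for an mmi.* request."""
    if service == b"mmi.service":
        # payload is the name of the external service being looked up
        return b"200" if payload in registered else b"404"
    return b"501"  # Unknown internal service: not implemented
```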
Example: MMI echo query

"""
MMI echo query example

Author : Min RK <benjaminrk@gmail.com>
"""

import sys
from mdcliapi import MajorDomoClient


def main():
    verbose = '-v' in sys.argv
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    request = b"echo"
    reply = client.send(b"mmi.service", request)

    if reply:
        replycode = reply[0]
        print("Lookup echo service:", replycode)
    else:
        print("E: no response from broker, make sure it's running")


if __name__ == '__main__':
    main()

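4. Idempotent services
Idempotent means that an operation is safe to repeat. For example, checking the clock is idempotent; lending your credit card to a child is not.
When a server application is not idempotent, we have to think more carefully about exactly when it might crash. To handle non-idempotent operations, we detect and reject duplicate requests:

  1. The client must stamp every request with a unique client identifier and a unique message number.
  2. Before sending back a reply, the server stores it under a key made of the client ID and message number.
  3. When the server gets a request from a given client, it first checks whether it already has a reply for that client ID and message number. If it does, it does not process the request again but simply resends the stored reply.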
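The three steps above can be sketched as a small reply cache. This sketch assumes the transport has already extracted the client ID and message number. `DedupServer` and the `handler` callable, which stands in for the real non-idempotent work, are hypothetical names.

```python
# Sketch of duplicate-request detection for a non-idempotent service.
from typing import Callable, Dict, Tuple


class DedupServer:
    def __init__(self, handler: Callable[[bytes], bytes]):
        self.handler = handler  # performs the real (non-idempotent) work
        self.replies: Dict[Tuple[bytes, int], bytes] = {}

    def handle(self, client_id: bytes, msg_no: int, body: bytes) -> bytes:
        key = (client_id, msg_no)
        # Step 3: a retried request gets the stored reply, no re-execution
        if key in self.replies:
            return self.replies[key]
        reply = self.handler(body)
        # Step 2: store the reply under (client ID, message number) first
        self.replies[key] = reply
        return reply
```

As written, the cache grows without bound. A real server would also need a rule for discarding old entries, for example once the client acknowledges the reply.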

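5. Disconnected reliability (Titanic pattern)
In this pattern, messages are written to disk so that they are never lost, however sporadically clients and workers are connected. Titanic is layered on top of MDP rather than built as an extension of MDP.
This has several advantages:

  1. It divides and conquers: the broker handles message routing and the worker handles reliability.
  2. It lets us mix brokers written in one language with workers written in another.
  3. It lets us develop the fire-and-forget technology independently.

Titanic is therefore both a worker and a client.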
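Toward the broker, Titanic is a worker for three services. It is a client when it forwards stored requests. Its on-disk bookkeeping can be sketched without any sockets. The sketch below is a minimal version of what the titanic.request, titanic.reply and titanic.close services in the Titanic service example do with files. It assumes pickle files named by UUID, as that example uses. The `TitanicStore` class is hypothetical.

```python
# Sketch of Titanic's on-disk bookkeeping, minus all networking.
import os
import pickle
import uuid


class TitanicStore:
    def __init__(self, directory: str):
        self.dir = directory
        os.makedirs(self.dir, exist_ok=True)

    def _path(self, suuid: str, ext: str) -> str:
        return os.path.join(self.dir, "%s.%s" % (suuid, ext))

    def save_request(self, frames: list) -> str:
        """titanic.request: persist the request and hand back a UUID."""
        suuid = uuid.uuid4().hex
        with open(self._path(suuid, "req"), "wb") as f:
            pickle.dump(frames, f)
        return suuid

    def save_reply(self, suuid: str, frames: list) -> None:
        """Called by the dispatcher once a worker has answered."""
        with open(self._path(suuid, "rep"), "wb") as f:
            pickle.dump(frames, f)

    def lookup_reply(self, suuid: str) -> list:
        """titanic.reply: 200 + reply, 300 if pending, 400 if unknown."""
        if os.path.exists(self._path(suuid, "rep")):
            with open(self._path(suuid, "rep"), "rb") as f:
                return [b"200"] + pickle.load(f)
        if os.path.exists(self._path(suuid, "req")):
            return [b"300"]
        return [b"400"]

    def close(self, suuid: str) -> None:
        """titanic.close: forget both files; missing files are ignored."""
        for ext in ("req", "rep"):
            try:
                os.remove(self._path(suuid, ext))
            except FileNotFoundError:
                pass
```

Because every state change is a file on disk, a client can disconnect after receiving its UUID and ask for the reply much later.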
Example: Titanic client example

"""
Titanic client example
Implements client side of http://rfc.zeromq.org/spec:9

Author : Min RK <benjaminrk@gmail.com>
"""
import sys
import time

from mdcliapi import MajorDomoClient


def service_call(session, service, request):
    """Calls a TSP service

    Returns response if successful (status code 200 OK), else None
    """
    reply = session.send(service, request)
    if reply:
        status = reply.pop(0)
        if status == b"200":
            return reply
        elif status == b"400":
            print("E: client fatal error 400, aborting")
            sys.exit(1)
        elif status == b"500":
            print("E: server fatal error 500, aborting")
            sys.exit(1)
        # Any other status (e.g. 300 pending) falls through and returns None
    else:
        sys.exit(0)  # Interrupted or failed


def main():
    verbose = '-v' in sys.argv
    session = MajorDomoClient("tcp://localhost:5555", verbose)

    # 1. Send 'echo' request to Titanic
    request = [b"echo", b"Hello world"]
    reply = service_call(session, b"titanic.request", request)

    uuid = None

    if reply:
        uuid = reply.pop(0)
        print("I: request UUID ", uuid)

    # 2. Wait until we get a reply
    while True:
        time.sleep(.1)
        request = [uuid]
        reply = service_call(session, b"titanic.reply", request)

        if reply:
            reply_string = reply[-1]
            print("I: reply:", reply_string)

            # 3. Close request
            request = [uuid]
            reply = service_call(session, b"titanic.close", request)
            break
        else:
            print("I: no reply yet, trying again...")
            time.sleep(5)  # Try again in 5 seconds
    return 0


if __name__ == '__main__':
    main()

Example: Titanic service
"""
Titanic service

Implements server side of http://rfc.zeromq.org/spec:9

Author: Min RK <benjaminrk@gmail.com>
"""
import pickle
import os
import sys
import threading
import time
from uuid import uuid4

import zmq

from mdwrkapi import MajorDomoWorker
from mdcliapi import MajorDomoClient

from zhelpers import zpipe

TITANIC_DIR = ".titanic"


def request_filename(suuid):
    """Returns freshly allocated request filename for given UUID str"""
    return os.path.join(TITANIC_DIR, "%s.req" % suuid)


def reply_filename(suuid):
    """Returns freshly allocated reply filename for given UUID str"""
    return os.path.join(TITANIC_DIR, "%s.rep" % suuid)


# ---------------------------------------------------------------------
# Titanic request service

def titanic_request(pipe):
    worker = MajorDomoWorker("tcp://localhost:5555", b"titanic.request")

    reply = None

    while True:
        # Send reply if it's not null
        # And then get next request from broker
        request = worker.recv(reply)
        if not request:
            break  # Interrupted, exit

        # Ensure message directory exists (safe if another thread made it)
        os.makedirs(TITANIC_DIR, exist_ok=True)

        # Generate UUID and save message to disk
        suuid = uuid4().hex
        filename = request_filename(suuid)
        with open(filename, 'wb') as f:
            pickle.dump(request, f)

        # Send UUID through to message queue
        pipe.send_string(suuid)

        # Now send UUID back to client
        # Done by the worker.recv() at the top of the loop
        reply = [b"200", suuid.encode('utf-8')]


# ---------------------------------------------------------------------
# Titanic reply service

def titanic_reply():
    worker = MajorDomoWorker("tcp://localhost:5555", b"titanic.reply")
    reply = None

    while True:
        request = worker.recv(reply)
        if not request:
            break  # Interrupted, exit

        suuid = request.pop(0).decode('utf-8')
        req_filename = request_filename(suuid)
        rep_filename = reply_filename(suuid)
        if os.path.exists(rep_filename):
            with open(rep_filename, 'rb') as f:
                reply = pickle.load(f)
            reply = [b"200"] + reply
        else:
            if os.path.exists(req_filename):
                reply = [b"300"]  # pending
            else:
                reply = [b"400"]  # unknown


# ---------------------------------------------------------------------
# Titanic close service

def titanic_close():
    worker = MajorDomoWorker("tcp://localhost:5555", b"titanic.close")
    reply = None

    while True:
        request = worker.recv(reply)
        if not request:
            break  # Interrupted, exit

        suuid = request.pop(0).decode('utf-8')
        req_filename = request_filename(suuid)
        rep_filename = reply_filename(suuid)
        # should these be protected?  Does zfile_delete ignore files
        # that have already been removed?  That's what we are doing here.
        if os.path.exists(req_filename):
            os.remove(req_filename)
        if os.path.exists(rep_filename):
            os.remove(rep_filename)
        reply = [b"200"]


def service_success(client, suuid):
    """Attempt to process a single request, return True if successful"""
    # Load request message, service will be first frame
    filename = request_filename(suuid)

    # If the client already closed request, treat as successful
    if not os.path.exists(filename):
        return True

    with open(filename, 'rb') as f:
        request = pickle.load(f)
    service = request.pop(0)
    # Use MMI protocol to check if service is available
    mmi_request = [service]
    mmi_reply = client.send(b"mmi.service", mmi_request)
    service_ok = mmi_reply and mmi_reply[0] == b"200"

    if service_ok:
        reply = client.send(service, request)
        if reply:
            filename = reply_filename(suuid)
            with open(filename, "wb") as f:
                pickle.dump(reply, f)
            return True

    return False


def main():
    verbose = '-v' in sys.argv
    ctx = zmq.Context()

    # Create MDP client session with short timeout
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    client.timeout = 1000  # 1 sec
    client.retries = 1  # only 1 retry

    request_pipe, peer = zpipe(ctx)
    request_thread = threading.Thread(target=titanic_request, args=(peer,))
    request_thread.daemon = True
    request_thread.start()
    reply_thread = threading.Thread(target=titanic_reply)
    reply_thread.daemon = True
    reply_thread.start()
    close_thread = threading.Thread(target=titanic_close)
    close_thread.daemon = True
    close_thread.start()

    poller = zmq.Poller()
    poller.register(request_pipe, zmq.POLLIN)

    queue_filename = os.path.join(TITANIC_DIR, 'queue')

    # Main dispatcher loop
    while True:
        # Ensure message directory and queue file exist; the request
        # thread may already have created the directory on its own
        os.makedirs(TITANIC_DIR, exist_ok=True)
        if not os.path.exists(queue_filename):
            open(queue_filename, 'wb').close()
        # We'll dispatch once per second, if there's no activity
        try:
            items = poller.poll(1000)
        except KeyboardInterrupt:
            break  # Interrupted

        if items:
            # Append UUID to queue, prefixed with '-' for pending
            suuid = request_pipe.recv().decode('utf-8')
            with open(queue_filename, 'ab') as f:
                line = "-%s\n" % suuid
                f.write(line.encode('utf-8'))

        # Brute-force dispatcher
        with open(queue_filename, 'rb+') as f:
            offset = 0  # byte offset of the current queue entry
            for raw in f.readlines():
                entry = raw.decode('utf-8')
                # UUID is prefixed with '-' if still waiting
                if entry[0] == '-':
                    suuid = entry[1:].rstrip()  # rstrip '\n' etc.
                    print("I: processing request %s" % suuid)
                    if service_success(client, suuid):
                        # Mark queue entry as processed: overwrite its
                        # '-' prefix with '+' at the remembered offset
                        here = f.tell()
                        f.seek(offset, os.SEEK_SET)
                        f.write(b'+')
                        f.seek(here, os.SEEK_SET)
                offset += len(raw)


if __name__ == '__main__':
    main()

6. High-availability pair (Binary Star pattern)

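The Binary Star pattern configures two servers as a primary/backup high-availability pair. At any given time, one of them (the active server) accepts connections from client applications. The other (the passive server) does nothing except monitor its peer; the two servers watch each other. If the active server disappears from the network, the passive server takes over as active after a certain time.
High-availability pair in normal operation

High-availability pair during failover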
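The core takeover rule can be isolated from the sockets. The sketch assumes times in milliseconds and the 2 × HEARTBEAT peer-expiry window used by the Binary Star server example. The passive server becomes active only when the peer has been silent past its expiry and a client request arrives. `PassiveServer` is a hypothetical name.

```python
# Sketch of the passive server's takeover rule (times in milliseconds).
HEARTBEAT = 1000  # ms between state broadcasts from the peer


class PassiveServer:
    def __init__(self, now_ms: int):
        self.state = "passive"
        self.peer_expiry = now_ms + 2 * HEARTBEAT

    def on_peer_heartbeat(self, now_ms: int) -> None:
        # Every state message from the active peer pushes the expiry forward
        self.peer_expiry = now_ms + 2 * HEARTBEAT

    def on_client_request(self, now_ms: int) -> bool:
        """Return True if the request is accepted (we are/become active)."""
        if self.state == "active":
            return True
        if now_ms > self.peer_expiry:
            # Peer silent too long AND a client is asking us: fail over
            self.state = "active"
            return True
        return False  # peer still alive: reject, the client will retry
```

Requiring a client request as well as a silent peer means that a broken link between the two servers alone does not make both of them active.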

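Recovery from failover works as follows:
• The operator fixes the problem that made the primary server disappear from the network, and restarts it.
• The operator stops the backup server at a moment when this will cause minimal disruption to applications.
• When applications have reconnected to the primary server, the operator restarts the backup server.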
Example: Binary Star server

# Binary Star Server
#
# Author: Dan Colish <dcolish@gmail.com>

from argparse import ArgumentParser
import time

from zhelpers import zmq

STATE_PRIMARY = 1
STATE_BACKUP = 2
STATE_ACTIVE = 3
STATE_PASSIVE = 4

PEER_PRIMARY = 1
PEER_BACKUP = 2
PEER_ACTIVE = 3
PEER_PASSIVE = 4
CLIENT_REQUEST = 5

HEARTBEAT = 1000


class BStarState(object):
    def __init__(self, state, event, peer_expiry):
        self.state = state
        self.event = event
        self.peer_expiry = peer_expiry


class BStarException(Exception):
    pass


fsm_states = {
    STATE_PRIMARY: {
        PEER_BACKUP: ("I: connected to backup (slave), ready as master",
                      STATE_ACTIVE),
        PEER_ACTIVE: ("I: connected to backup (master), ready as slave",
                      STATE_PASSIVE)
    },
    STATE_BACKUP: {
        PEER_ACTIVE: ("I: connected to primary (master), ready as slave",
                      STATE_PASSIVE),
        CLIENT_REQUEST: ("", False)
    },
    STATE_ACTIVE: {
        PEER_ACTIVE: ("E: fatal error - dual masters, aborting", False)
    },
    STATE_PASSIVE: {
        PEER_PRIMARY: ("I: primary (slave) is restarting, ready as master",
                       STATE_ACTIVE),
        PEER_BACKUP: ("I: backup (slave) is restarting, ready as master",
                      STATE_ACTIVE),
        PEER_PASSIVE: ("E: fatal error - dual slaves, aborting", False),
        CLIENT_REQUEST: (CLIENT_REQUEST, True)  # Say true, check peer later
    }
}


def run_fsm(fsm):
    # There are some transitional states we do not want to handle
    state_dict = fsm_states.get(fsm.state, {})
    res = state_dict.get(fsm.event)
    if res:
        msg, state = res
    else:
        return
    if state is False:
        raise BStarException(msg)
    elif msg == CLIENT_REQUEST:
        assert fsm.peer_expiry > 0
        if int(time.time() * 1000) > fsm.peer_expiry:
            fsm.state = STATE_ACTIVE
        else:
            raise BStarException()
    else:
        print(msg)
        fsm.state = state


def main():
    parser = ArgumentParser()
    group = parser.add_mutually_exclusive_group()
    group.add_argument("-p", "--primary", action="store_true", default=False)
    group.add_argument("-b", "--backup", action="store_true", default=False)
    args = parser.parse_args()

    ctx = zmq.Context()
    statepub = ctx.socket(zmq.PUB)
    statesub = ctx.socket(zmq.SUB)
    statesub.setsockopt_string(zmq.SUBSCRIBE, u"")
    frontend = ctx.socket(zmq.ROUTER)

    fsm = BStarState(0, 0, 0)

    if args.primary:
        print("I: Primary master, waiting for backup (slave)")
        frontend.bind("tcp://*:5001")
        statepub.bind("tcp://*:5003")
        statesub.connect("tcp://localhost:5004")
        fsm.state = STATE_PRIMARY
    elif args.backup:
        print("I: Backup slave, waiting for primary (master)")
        frontend.bind("tcp://*:5002")
        statepub.bind("tcp://*:5004")
        statesub.connect("tcp://localhost:5003")
        statesub.setsockopt_string(zmq.SUBSCRIBE, u"")
        fsm.state = STATE_BACKUP
    else:
        parser.error("specify -p/--primary or -b/--backup")

    send_state_at = int(time.time() * 1000 + HEARTBEAT)
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(statesub, zmq.POLLIN)

    while True:
        time_left = send_state_at - int(time.time() * 1000)
        if time_left < 0:
            time_left = 0
        socks = dict(poller.poll(time_left))
        if socks.get(frontend) == zmq.POLLIN:
            msg = frontend.recv_multipart()
            fsm.event = CLIENT_REQUEST
            try:
                run_fsm(fsm)
                frontend.send_multipart(msg)
            except BStarException:
                del msg

        if socks.get(statesub) == zmq.POLLIN:
            msg = statesub.recv()
            fsm.event = int(msg)
            del msg
            try:
                run_fsm(fsm)
                fsm.peer_expiry = int(time.time() * 1000) + (2 * HEARTBEAT)
            except BStarException:
                break
        if int(time.time() * 1000) >= send_state_at:
            # Broadcast our state; send_string encodes the str for Python 3
            statepub.send_string("%d" % fsm.state)
            send_state_at = int(time.time() * 1000) + HEARTBEAT


if __name__ == '__main__':
    main()

Example: Binary Star client
from time import sleep

import zmq

REQUEST_TIMEOUT = 1000  # msecs
SETTLE_DELAY = 2000  # before failing over


def main():
    server = ['tcp://localhost:5001', 'tcp://localhost:5002']
    server_nbr = 0
    ctx = zmq.Context()
    client = ctx.socket(zmq.REQ)
    client.connect(server[server_nbr])
    poller = zmq.Poller()
    poller.register(client, zmq.POLLIN)

    sequence = 0
    while True:
        client.send_string("%s" % sequence)

        expect_reply = True
        while expect_reply:
            socks = dict(poller.poll(REQUEST_TIMEOUT))
            if socks.get(client) == zmq.POLLIN:
                reply = client.recv_string()
                if int(reply) == sequence:
                    print("I: server replied OK (%s)" % reply)
                    expect_reply = False
                    sequence += 1
                    sleep(1)
                else:
                    print("E: malformed reply from server: %s" % reply)
            else:
                print("W: no response from server, failing over")
                sleep(SETTLE_DELAY / 1000)
                poller.unregister(client)
                client.setsockopt(zmq.LINGER, 0)  # discard any unsent request
                client.close()
                server_nbr = (server_nbr + 1) % 2
                print("I: connecting to server at %s.." % server[server_nbr])
                client = ctx.socket(zmq.REQ)
                poller.register(client, zmq.POLLIN)
                # reconnect and resend request
                client.connect(server[server_nbr])
                client.send_string("%s" % sequence)


if __name__ == '__main__':
    main()

Binary Star finite state machine

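V. Brokerless reliability (Freelance pattern)
ZeroMQ does not impose a broker-centric architecture on you, but it does give you the tools to build brokers. Here we deconstruct the broker-based reliability built so far and turn it into a distributed peer-to-peer architecture, which we call the Freelance pattern.
Model 1: simple retry and failover
Example: Freelance server - Model 1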

#
# Freelance server - Model 1
# Trivial echo service
#
# Author: Daniel Lundin <dln(at)eintr(dot)org>
#
import sys

import zmq

if len(sys.argv) < 2:
    print("I: Syntax: %s <endpoint>" % sys.argv[0])
    sys.exit(0)

endpoint = sys.argv[1]
context = zmq.Context()
server = context.socket(zmq.REP)
server.bind(endpoint)

print("I: Echo service is ready at %s" % endpoint)
while True:
    msg = server.recv_multipart()
    if not msg:
        break  # Interrupted
    server.send_multipart(msg)

server.setsockopt(zmq.LINGER, 0)  # Terminate immediately

Example: Freelance client - Model 1
#
# Freelance Client - Model 1
# Uses REQ socket to query one or more services
#
# Author: Daniel Lundin <dln(at)eintr(dot)org>
#
import sys
import time

import zmq

REQUEST_TIMEOUT = 1000  # ms
MAX_RETRIES = 3  # Before we abandon


def try_request(ctx, endpoint, request):
    print("I: Trying echo service at %s..." % endpoint)
    client = ctx.socket(zmq.REQ)
    client.setsockopt(zmq.LINGER, 0)  # Terminate early
    client.connect(endpoint)
    client.send(request)
    poll = zmq.Poller()
    poll.register(client, zmq.POLLIN)
    socks = dict(poll.poll(REQUEST_TIMEOUT))
    if socks.get(client) == zmq.POLLIN:
        reply = client.recv_multipart()
    else:
        reply = []  # empty list signals "no reply"
    poll.unregister(client)
    client.close()
    return reply


context = zmq.Context()
request = b"Hello world"
reply = None

endpoints = len(sys.argv) - 1
if endpoints == 0:
    print("I: syntax: %s <endpoint> ..." % sys.argv[0])
elif endpoints == 1:
    # For one endpoint, we retry N times
    endpoint = sys.argv[1]
    for retries in range(MAX_RETRIES):
        reply = try_request(context, endpoint, request)
        if reply:
            break  # Success
        print("W: No response from %s, retrying" % endpoint)
else:
    # For multiple endpoints, try each at most once
    for endpoint in sys.argv[1:]:
        reply = try_request(context, endpoint, request)
        if reply:
            break  # Success
        print("W: No response from %s" % endpoint)

if reply:
    print("Service is running OK")
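Model 1's client policy does not depend on the transport. With one endpoint it retries up to MAX_RETRIES times; with several it tries each endpoint once, in order. Below is a minimal sketch of just that policy. It assumes a hypothetical `attempt(endpoint)` callable that stands in for `try_request()` and returns a reply or None.

```python
# Transport-free sketch of Model 1's retry/failover policy.
from typing import Callable, List, Optional

MAX_RETRIES = 3


def freelance_model1(endpoints: List[str],
                     attempt: Callable[[str], Optional[bytes]]) -> Optional[bytes]:
    if len(endpoints) == 1:
        # One endpoint: retry it up to MAX_RETRIES times
        plan = endpoints * MAX_RETRIES
    else:
        # Several endpoints: try each one at most once, in order
        plan = list(endpoints)
    for endpoint in plan:
        reply = attempt(endpoint)
        if reply:
            return reply
    return None
```

The real client opens a fresh REQ socket for each attempt. A REQ socket that has sent a request cannot send again until it receives a reply.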

Model 2: brutal shotgun massacre
Example: Freelance server - Model 2

#
# Freelance server - Model 2
# Does some work, replies OK, with message sequencing
#
# Author: Daniel Lundin <dln(at)eintr(dot)org>
#
import sys

import zmq

if len(sys.argv) < 2:
    print("I: Syntax: %s <endpoint>" % sys.argv[0])
    sys.exit(0)

endpoint = sys.argv[1]
context = zmq.Context()
server = context.socket(zmq.REP)
server.bind(endpoint)

print("I: Service is ready at %s" % endpoint)
while True:
    request = server.recv_multipart()
    if not request:
        break  # Interrupted
    # Fail nastily if run against wrong client
    assert len(request) == 2

    # Frame 0 is the client's sequence number; echo it back with OK
    address = request[0]
    reply = [address, b"OK"]
    server.send_multipart(reply)

server.setsockopt(zmq.LINGER, 0)  # Terminate early

Example: Freelance client - Model 2

#
# Freelance Client - Model 2
# Uses DEALER socket to blast one or more services
#
# Author: Daniel Lundin <dln(at)eintr(dot)org>
#
import sys
import time

import zmq

GLOBAL_TIMEOUT = 2500  # ms


class FLClient(object):
    def __init__(self):
        self.servers = 0
        self.sequence = 0
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.DEALER)  # DEALER

    def destroy(self):
        self.socket.setsockopt(zmq.LINGER, 0)  # Terminate early
        self.socket.close()
        self.context.term()

    def connect(self, endpoint):
        self.socket.connect(endpoint)
        self.servers += 1
        print("I: Connected to %s" % endpoint)

    def request(self, *request):
        # Prefix request with empty envelope and sequence number
        self.sequence += 1
        msg = [b'', str(self.sequence).encode()] + list(request)

        # Blast the request to all connected servers
        # (DEALER round-robins, so one send per server reaches each once)
        for server in range(self.servers):
            self.socket.send_multipart(msg)

        # Wait for a matching reply to arrive from anywhere
        # Since we can poll several times, calculate each timeout
        poll = zmq.Poller()
        poll.register(self.socket, zmq.POLLIN)

        endtime = time.time() + GLOBAL_TIMEOUT / 1000
        while time.time() < endtime:
            timeout_ms = max(0, (endtime - time.time()) * 1000)
            socks = dict(poll.poll(timeout_ms))
            if socks.get(self.socket) == zmq.POLLIN:
                reply = self.socket.recv_multipart()
                assert len(reply) == 3
                sequence = int(reply[1])
                if sequence == self.sequence:
                    return reply
                # Otherwise it is a late reply to an earlier request: drop it
        return None


if len(sys.argv) == 1:
    print("I: Usage: %s <endpoint> ..." % sys.argv[0])
    sys.exit(0)

# Create new freelance client object
client = FLClient()

for endpoint in sys.argv[1:]:
    client.connect(endpoint)

requests = 10000
start = time.time()
for i in range(requests):
    request = b"random name"
    reply = client.request(request)
    if not reply:
        print("E: Name service not available, aborting")
        break
print("Average round trip cost: %i usec" % (1e6 * (time.time() - start) / requests))
client.destroy()
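In Model 2, every request goes to all servers and the first reply that carries the current sequence number wins. Replies to older requests can still trickle in and must be ignored. The matching rule can be sketched without sockets, assuming replies are modelled as (sequence, body) pairs in arrival order. `first_matching` is a hypothetical helper.

```python
# Transport-free sketch of Model 2's reply matching.
from typing import Iterable, Optional, Tuple


def first_matching(current_seq: int,
                   arrivals: Iterable[Tuple[int, bytes]]) -> Optional[bytes]:
    """Return the first reply for current_seq, skipping stale ones."""
    for seq, body in arrivals:
        if seq == current_seq:
            return body  # the fastest server wins
        # Any other seq is a slow server answering an older request
    return None  # timed out: nobody answered this request in time
```

The price of this speed is that every server does the work for every request. The shotgun approach therefore suits cheap requests that are safe to execute on several servers at once.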

Model 3: complex and nasty

Example: Freelance server - Model 3
"""Freelance server - Model 3
Uses an ROUTER/ROUTER socket but just one thread

Author: Min RK <benjaminrk@gmail.com>
"""

import sys

import zmq

from zhelpers import dump


def main():
    verbose = '-v' in sys.argv

    ctx = zmq.Context()
    # Prepare server socket with predictable identity
    bind_endpoint = "tcp://*:5555"
    connect_endpoint = "tcp://localhost:5555"
    server = ctx.socket(zmq.ROUTER)
    # Clients address this server by its endpoint string, so use it as identity
    server.identity = connect_endpoint.encode()
    server.bind(bind_endpoint)
    print("I: service is ready at", bind_endpoint)

    while True:
        try:
            request = server.recv_multipart()
        except (KeyboardInterrupt, zmq.ContextTerminated):
            break  # Interrupted
        # Frame 0: identity of client
        # Frame 1: PING, or client control frame
        # Frame 2: request body
        address, control = request[:2]
        reply = [address, control]
        if control == b"PING":
            reply[1] = b"PONG"
        else:
            reply.append(b"OK")
        if verbose:
            dump(reply)
        server.send_multipart(reply)
    print("W: interrupted")


if __name__ == '__main__':
    main()

Example: Freelance client - Model 3
"""
Freelance client - Model 3
Uses flcliapi class to encapsulate Freelance pattern

Author : Min RK <benjaminrk@gmail.com>
"""

import time

from flcliapi import FreelanceClient


def main():
    # Create new freelance client object
    client = FreelanceClient()

    # Connect to several endpoints
    client.connect("tcp://localhost:5555")
    client.connect("tcp://localhost:5556")
    client.connect("tcp://localhost:5557")

    # Send a bunch of name resolution 'requests', measure time
    requests = 10000
    start = time.time()
    for i in range(requests):
        request = [b"random name"]
        reply = client.request(request)
        if not reply:
            print("E: name service not available, aborting")
            return

    print("Average round trip cost: %d usec" % (1e6 * (time.time() - start) / requests))


if __name__ == '__main__':
    main()

Example: Freelance pattern agent class

"""
flcliapi - Freelance Pattern agent class
Model 3: uses ROUTER socket to address specific services

Author: Min RK <benjaminrk@gmail.com>
"""

import threading
import time

import zmq

from zhelpers import zpipe

# If no server replies within this time, abandon request
GLOBAL_TIMEOUT = 3000    # msecs
# PING interval for servers we think are alive
PING_INTERVAL = 2000     # msecs
# Server considered dead if silent for this long
SERVER_TTL = 6000        # msecs


# =====================================================================
# Synchronous part, works in our application thread

class FreelanceClient(object):
    ctx = None      # Our Context
    pipe = None     # Pipe through to flcliapi agent
    agent = None    # agent in a thread

    def __init__(self):
        self.ctx = zmq.Context()
        self.pipe, peer = zpipe(self.ctx)
        self.agent = threading.Thread(target=agent_task, args=(self.ctx, peer))
        self.agent.daemon = True
        self.agent.start()

    def connect(self, endpoint):
        """Connect to new server endpoint
        Sends [CONNECT][endpoint] to the agent
        """
        self.pipe.send_multipart([b"CONNECT", endpoint.encode()])
        time.sleep(0.1)  # Allow connection to come up

    def request(self, msg):
        "Send request, get reply"
        request = [b"REQUEST"] + msg
        self.pipe.send_multipart(request)
        reply = self.pipe.recv_multipart()
        status = reply.pop(0)
        if status != b"FAILED":
            return reply
        return None


# =====================================================================
# Asynchronous part, works in the background

# ---------------------------------------------------------------------
# Simple class for one server we talk to

class FreelanceServer(object):
    endpoint = None         # Server identity/endpoint (bytes)
    alive = True            # True if known to be alive
    ping_at = 0             # Next ping at this time
    expires = 0             # Expires at this time

    def __init__(self, endpoint):
        self.endpoint = endpoint
        self.alive = True
        self.ping_at = time.time() + 1e-3*PING_INTERVAL
        self.expires = time.time() + 1e-3*SERVER_TTL

    def ping(self, socket):
        if time.time() > self.ping_at:
            socket.send_multipart([self.endpoint, b'PING'])
            self.ping_at = time.time() + 1e-3*PING_INTERVAL

    def tickless(self, tickless):
        if tickless > self.ping_at:
            tickless = self.ping_at
        return tickless


# ---------------------------------------------------------------------
# Simple class for one background agent

class FreelanceAgent(object):
    ctx = None              # Own context
    pipe = None             # Socket to talk back to application
    router = None           # Socket to talk to servers
    servers = None          # Servers we've connected to
    actives = None          # Servers we know are alive
    sequence = 0            # Number of requests ever sent
    request = None          # Current request if any
    reply = None            # Current reply if any
    expires = 0             # Timeout for request/reply

    def __init__(self, ctx, pipe):
        self.ctx = ctx
        self.pipe = pipe
        self.router = ctx.socket(zmq.ROUTER)
        self.servers = {}
        self.actives = []

    def control_message(self):
        msg = self.pipe.recv_multipart()
        command = msg.pop(0)

        if command == b"CONNECT":
            endpoint = msg.pop(0)
            print("I: connecting to %s..." % endpoint.decode())
            self.router.connect(endpoint.decode())
            server = FreelanceServer(endpoint)
            self.servers[endpoint] = server
            self.actives.append(server)
            # these are in the C case, but seem redundant:
            server.ping_at = time.time() + 1e-3*PING_INTERVAL
            server.expires = time.time() + 1e-3*SERVER_TTL
        elif command == b"REQUEST":
            assert not self.request    # Strict request-reply cycle
            # Prefix request with sequence number and empty envelope
            self.sequence += 1
            self.request = [str(self.sequence).encode(), b''] + msg

            # Request expires after global timeout
            self.expires = time.time() + 1e-3*GLOBAL_TIMEOUT

    def router_message(self):
        reply = self.router.recv_multipart()
        # Frame 0 is server that replied
        endpoint = reply.pop(0)
        server = self.servers[endpoint]
        if not server.alive:
            self.actives.append(server)
            server.alive = True

        server.ping_at = time.time() + 1e-3*PING_INTERVAL
        server.expires = time.time() + 1e-3*SERVER_TTL

        # Frame 1 is the sequence number of a reply, or PONG
        sequence = reply.pop(0)
        if (self.request and sequence.isdigit()
                and int(sequence) == self.sequence):
            reply = [b"OK"] + reply
            self.pipe.send_multipart(reply)
            self.request = None


# ---------------------------------------------------------------------
# Asynchronous agent manages server pool and handles request/reply
# dialog when the application asks for it.

def agent_task(ctx, pipe):
    agent = FreelanceAgent(ctx, pipe)
    poller = zmq.Poller()
    poller.register(agent.pipe, zmq.POLLIN)
    poller.register(agent.router, zmq.POLLIN)

    while True:
        # Calculate tickless timer, up to 1 hour
        tickless = time.time() + 3600
        if agent.request and tickless > agent.expires:
            tickless = agent.expires
        for server in agent.servers.values():
            tickless = server.tickless(tickless)
        try:
            # Never pass a negative timeout: pyzmq treats it as "wait forever"
            timeout_ms = max(0, 1000 * (tickless - time.time()))
            items = dict(poller.poll(timeout_ms))
        except zmq.ZMQError:
            break  # Context has been shut down

        if agent.pipe in items:
            agent.control_message()

        if agent.router in items:
            agent.router_message()

        # If we're processing a request, dispatch to next server
        if agent.request:
            if time.time() >= agent.expires:
                # Request expired, kill it
                agent.pipe.send(b"FAILED")
                agent.request = None
            else:
                # Find server to talk to, remove any expired ones
                while agent.actives:
                    server = agent.actives[0]
                    if time.time() >= server.expires:
                        server.alive = False
                        agent.actives.pop(0)
                    else:
                        request = [server.endpoint] + agent.request
                        agent.router.send_multipart(request)
                        break

        # Send heartbeats to idle servers if needed
        for server in agent.servers.values():
            server.ping(agent.router)
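For every server, Model 3's agent keeps a next-ping time and an expiry time. Any reply, including a PONG, pushes both forward. A quiet server is pinged every PING_INTERVAL, and a server that stays silent for SERVER_TTL is dropped from the active list. The agent sleeps until the earliest of these deadlines. Below is a minimal transport-free sketch of that bookkeeping, assuming an injected clock in seconds. `ServerLiveness` and `next_wakeup` are hypothetical names, not part of flcliapi.

```python
# Sketch of Model 3's per-server liveness bookkeeping (times in seconds).
PING_INTERVAL = 2.0   # seconds between pings to a quiet server
SERVER_TTL = 6.0      # server considered dead after this much silence


class ServerLiveness:
    def __init__(self, now: float):
        self.alive = True
        self.ping_at = now + PING_INTERVAL
        self.expires = now + SERVER_TTL

    def heard_from(self, now: float) -> None:
        """Any reply (including PONG) proves the server is alive."""
        self.alive = True
        self.ping_at = now + PING_INTERVAL
        self.expires = now + SERVER_TTL

    def needs_ping(self, now: float) -> bool:
        return now > self.ping_at

    def sent_ping(self, now: float) -> None:
        # Sending a ping reschedules the next ping but does not extend expiry
        self.ping_at = now + PING_INTERVAL

    def check_expired(self, now: float) -> bool:
        if now >= self.expires:
            self.alive = False
        return not self.alive


def next_wakeup(now: float, servers, request_expires=None) -> float:
    """Tickless timer: sleep until the earliest pending deadline (max 1 h)."""
    wakeup = now + 3600
    if request_expires is not None:
        wakeup = min(wakeup, request_expires)
    for s in servers:
        wakeup = min(wakeup, s.ping_at)
    return wakeup
```

A tickless timer like this lets the agent block in `poll` for exactly as long as nothing needs doing, instead of waking up on a fixed tick.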
