设计

  • 该文章主要是花了比较长的时间解决AMQP 数据收发异步的问题。该问题主要在于PIKA包不支持异步的问题。收发数据使用同一个连接和同一个通道会在某些环境异常(Windows 和部分Linux没有出现,在生产环境的Linux下会产生异常) 。而且问题很多,其中一个就是 Stream connection lost: AssertionError((’_AsyncTransportBase._produce() tx buffer size underflow’, -275, 1),)。

设计一个AMQPClientUtil类 用户AMQPClient管理

  • AMQMClient管理 这里会创建两个AMQP对象,一个是用户定义的比如 hello_amqp,主要用户其他服务来请求数据,另一个有系统定义hello_amqprep,在用户定义的queue末尾增加req主要用户向其他服务请求数据。这样设计的目的在于将主动请求和主动接受分开。避免一个数据队列数据量过多
  • 外部通过AMQPClientUntil的对象调用 只存在启动时,调用run函数,已初始化AMQP连接。发送数据时调用 send

设计一个AMQPClient类

  • 该类的主要作用的是连接AMQP,并进行异步收发数据
  • 详情见源码内说明

源码

# coding=UTF-8
import pika
import json
import threading
import time
import zlib
import datetime
import sysKEEP_ACTIVE = "keep_active"def getlocaltime():'''格式化 输出时间 这一块主要用于日志输出'''return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")class AMQPClientUtil():'''AMQMClient管理 这里会创建两个AMQP对象,一个是用户定义的比如 hello_amqp,主要用户其他服务来请求数据,另一个有系统定义hello_amqprep,在用户定义的queue末尾增加req主要用户向其他服务请求数据。这样设计的目的在于将主动请求和主动接受分开。避免一个数据队列数据量过多'''def __init__(self, queue, exchange, cb_func, username, userpasswd, host, port):self.recvChannel = AMQPClient(queue, exchange, cb_func, username, userpasswd, host, port)req_queue = queue + "req"self.sendChannel = AMQPClient(req_queue, exchange, cb_func, username, userpasswd, host, port)passdef run(self):self.recvChannel.run()self.sendChannel.run()def Send(self, routing_key, type, strMsg, szip="false"):self.sendChannel.Send(routing_key, type, strMsg, szip)class AMQPClient():def __init__(self, queue, exchange, cb_func, username, userpasswd, host, port):''':param queue: 队列和RoutineKey:param exchange:  交换机:param cb_func:   接收消息的回调函数 函数原型 (  DealMesIn(self,  msgType, msg, routineKey)  )'''print(getlocaltime(), "正在初始化AMQP---- " + queue)self.queue = queueself.exchange = exchangeself.pid = 1self.cb_func = cb_funcself.timmer = Noneself.cur_reciveDateTime = datetime.datetime.now()self.channel = Noneself.connection = Noneself.channel_send = Noneself.connection_send = None# 全局化 PIKA 参数 , 因为PIKA不支持异步,所以后面会有多个连接self.properties = pika.BasicProperties(headers={})credentials = pika.PlainCredentials(username, userpasswd)self.parameters = pika.ConnectionParameters(host=host, port=port, credentials=credentials, heartbeat=0)# 起一个线程监控接收数据的情况thread = threading.Thread(target=self.__checkReciveTime)thread.start()def __checkReciveTime(self):while 1:cur_time = datetime.datetime.now()if (cur_time - self.cur_reciveDateTime).seconds > 600:# 表示数据已经断了print ('AMQP断了正在重新连接......')                self.__ConnectAMQP()passtime.sleep(61 * 5)def __getPid(self):# 与服务端程序设计有关,服务端收到数据之后会将PID返回,让客户端实现同步操作self.pid += 1return self.piddef run(self):# 启动线程连接AMQPthread = threading.Thread(target=self.__ConnectAMQP)thread.start()self.__keep_active()passdef __ConnectAMQP(self):try:print(getlocaltime(), "ConnectAMQP---- " + self.queue)self.createAMQP = False# 关闭该关闭的数据self.__reset()# 创建一个接收数据的 连接 和 通道self.connection, self.channel = self.__getAMQPConeAndChannel()# 创建一个发送数据的 连接 和 通道self.connection_send, self.channel_send = self.__getAMQPConeAndChannel()# 注: 上面两组连接与通道产生的原因是 PIKA 是不支持异步,那如果只有一个连接和通道用于接收和发送数据会造成start_consuming异常print(getlocaltime(), "初始化AMQP成功---- " + self.queue)self.createAMQP = Trueself.channel.start_consuming()except BaseException as e:# 关闭之前创建的连接self.__reset()if isinstance(e, KeyboardInterrupt):returnexc_type, exc_value, exc_obj = sys.exc_info()traceback.print_exception(exc_type,exc_value,exc_obj,limit=2,file=sys.stdout)print(getlocaltime(), "{}:{}".format(sys._getframe().f_code.co_name, sys._getframe().f_lineno), "AMQP异常, ", e)# AMQP断连之后的重连机制timmer = threading.Timer(10, AMQPClient.__ConnectAMQP, args=(self, ))timmer.start()def __reset(self):try:if self.connection:self.connection.close()if self.connection_send:self.connection_send.close()if self.channel:self.channel.stop_consuming()if self.channel:self.channel_send.stop_consuming()except Exception as e:passfinally:passdef __getAMQPConeAndChannel(self):# 创建连接connection = pika.BlockingConnection(self.parameters)# 创建通道channel = connection.channel()# 声明一个交换机  durable 持久化数据 auto_delete 绑定数据channel.exchange_declare(exchange=self.exchange,exchange_type='direct',passive=True,durable=False,auto_delete=True)# 绑定队列和路由self.bind_queue(self.queue, channel)return connection, channeldef bind_queue(self, queue,channel):# 这个参数主要适用于超时断连自动删除队列和路由键dic_args = {"x-expires":60000, "x-message-ttl":30000}channel.queue_declare(queue=queue, arguments=dic_args)channel.queue_bind(queue=queue, exchange=self.exchange, routing_key=queue)# prefetch_count 当队列中有最大多个数据没有被确认接收 ,不再接收其他数据,如果想实现同步调用可以将这个值设置为1channel.basic_qos(prefetch_count=54)# auto_ack=False 这个参数 是收到消息自动确认,如果设置为True则表示自动确认收到消息。想实现同步消息 需要 prefetch_count设置为1 并且 auto_ack=Truechannel.basic_consume(queue=queue, on_message_callback=self.DealMesIn, auto_ack=False)def __keep_active(self):# 定时发送消息 以保证没有收发消息的时候,队列和路由键不会被销毁self.Send(self.queue, KEEP_ACTIVE, KEEP_ACTIVE)if self.timmer != None:self.timmer.cancel()self.timmer = threading.Timer(15, AMQPClient.__keep_active, args=(self, ))self.timmer.start()def DealMesIn(self, ch, method, properties, body):      # 内部的一个消息接收函数,主要是内部处理掉 keep_activeself.cur_reciveDateTime = datetime.datetime.now()dic_args = {"x-expires": 60000, "x-message-ttl": 30000}queue_declare = self.channel.queue_declare(queue=self.queue, arguments=dic_args)headers = properties.headerstry:if headers["zip"] == 'false' and (body.decode() == KEEP_ACTIVE):print(getlocaltime(), body.decode())else:msg = ""if headers["zip"] == "true":decompress = zlib.decompressobj()msg = decompress.decompress(body)else:msg = body.decode()# 调用用户注册的消息回调self.cb_func(headers["type"], msg, headers["from"])except Exception as e:exc_type, exc_value, exc_obj = sys.exc_info()traceback.print_exception(exc_type,exc_value,exc_obj,limit=2,file=sys.stdout)print (getlocaltime(), "{}:{}".format(sys._getframe().f_code.co_name, sys._getframe().f_lineno), "recive...error:",e)finally:# 确认消息 如果auto_ack=True,则这个不是需要的,如果 auto_ack=False,是需要设置的,不然消息数量达到 prefetch_count之后,不再接收消息ch.basic_ack(delivery_tag=method.delivery_tag)def Send(self, routing_key, type, strMsg, szip="false"):if not self.createAMQP:returnif szip:# 压缩系统还没做好szip = "false"dict_header = {}dict_header["type"] = typedict_header["from"] = self.queuedict_header["pid"] = str(self.__getPid())dict_header["structlen"] = str(len(strMsg))dict_header["zip"] = sziptry:# 构建消息的头,用于服务端解析消息的来源self.properties.headers = dict_header# 发送消息self.channel_send.basic_publish(exchange=self.exchange, routing_key=routing_key, properties=self.properties, body=strMsg)except:print (getlocaltime(), "{}:{}".format(sys._getframe().f_code.co_name, sys._getframe().f_lineno), "send...error")finally:# connection.close()pass# 以下是测试案例
def OnMessage(ch, method, properties, body):headers = properties.headersif headers["zip"] == 'false':print(datetime.datetime.now().strftime("%H:%M:%S"), "nozip", body)else:print(zlib.decompress(body))passdef aaa():print (getlocaltime(), "{}:{}".format(sys._getframe().f_code.co_name, sys._getframe().f_lineno))if __name__ == '__main__':aaa()a = Nonetry:a = AMQPClientUtil("hello_test2", "exchange", OnMessage, "username", "password", "amqp地址", "port")a.run()dict_data = {}dict_data["SubscribeMarketData"] = ["AP101"]time.sleep(3)while 1:a.Send("hello_test1", "SubscribeMarketData", json.dumps(dict_data))time.sleep(60)passfinally:if a != None:pass

python AMQP 客户端连接相关推荐

  1. 0039-如何使用Python Impyla客户端连接Hive和Impala

    温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看. 1.文档编写目的 继上一章讲述如何在CDH集群安装Anaconda&搭建Python私有源后,本章节主要讲述如何使用Pyton ...

  2. python访问k8s的api_如何通过Python Kubernetes客户端连接microk8s API?

    我正在尝试连接到我的microk8s Kubernetes集群,它正在使用Python Kubernetes client监听端口16443:#!/usr/bin/python3 import kub ...

  3. u8系统怎么连接服务器,怎么U8客户端连接服务器

    怎么U8客户端连接服务器 内容精选 换一换 本章节为您介绍以下内容:准备弹性云服务器作为GDS服务器在使用GDS导入导出数据之前,需要准备一台或多台与GaussDB(DWS) 集群在相同VPC内的Li ...

  4. python实现socket多客户端连接

    socket实现同网络下不同机器之间互联,常规写法,直接创建后只能有一个客户端连接到服务端,无法实现多客户端连接.在这里使用多线程的方法创建多个监听来实现多客户端连接同一个服务端. server端代码 ...

  5. stm32f407+lan8720 和 python 实现多个TCP客户端连接的TCP服务器

    最近本人想用开发板来做服务器,所以就想到这个方法.对于写pc端服务器的童鞋来说,这应该是件很容易的事情,所以,这里主要分为两种实现方法: 第一种:在stm32f4开发板实现,基于lwip 硬件:正点原 ...

  6. python tcp多个客户端连接服务器

    一.传输层** 该层为两台主机上的应用程序提供端到端的通信.传输层有两个传输协议:TCP(传输控制协议)和 UDP(用户数据报协议).其中,TCP是一个可靠的面向连接的协议,udp是不可靠的或者说无连 ...

  7. 第四节 RabbitMQ在C#端的应用-客户端连接

    第四节 RabbitMQ在C#端的应用-客户端连接 原文:第四节 RabbitMQ在C#端的应用-客户端连接 版权声明:未经本人同意,不得转载该文章,谢谢 https://blog.csdn.net/ ...

  8. 005,使用Java客户端连接RabbitMQ,构造我们的第一个Publish和Consumer应用

    2019独角兽企业重金招聘Python工程师标准>>> 官网地址:http://www.rabbitmq.com/documentation.html    如下图: 在客户端连接服 ...

  9. 虚拟机客户端怎么连接服务器,虚拟机客户端连接服务器

    虚拟机客户端连接服务器 内容精选 换一换 本章节指导您使用MongoDB客户端和Robo 3T工具,通过公网连接集群实例.通过MongoDB客户端和Robo 3T工具连接实例的方式有普通连接和SSL连 ...

最新文章

  1. mv 重命名或移动文件
  2. 第十一届蓝桥杯校内赛题解
  3. 【原创翻译】如何阅读一个GO程序
  4. android新架构,Android新架构组件 LifeCycles 简介
  5. 前端初学者开发学习视频_初学者学习前端开发的实用指南
  6. 【Docker】Segmentation Fault or Critical Error encountered. Dumping core and abort
  7. 大数据应用智能交通有哪些意义
  8. 【心情】换个皮肤试试看
  9. LNMP详解(十四)——Nginx日志详解
  10. CocoaPods 的安装(第三方开源类库)
  11. 主引导记录 - 维基百科,自由的百科全书
  12. 【UVM基础】两种启动 sequence 的方式
  13. 一些常用的英文写作网站
  14. 从别人那copy点学习资料
  15. CSR867x — Headset项目评估总结
  16. Java 中代码优化的 30 个小技巧(下)
  17. DC Motor 参数的理解
  18. 盘点:当今十大备份应用软件
  19. 第904题 水果成篮
  20. xxxxxx 不在 sudoers 文件中。此事将被报告

热门文章

  1. windows如何使用bat快速安装计划任务?
  2. Android JetPack底部导航Navigation 组件的介绍与使用
  3. 树莓派Zero 2W python3.7 安装tensorflow2.2
  4. vue实现下拉二级联动_vue实现二级联动效果
  5. SpringBoot 内嵌容器
  6. 2019前端面试题记录(杂文)
  7. LUT查找表实现多路复选器原理精讲
  8. 2015年App Store审核被拒的23个理由
  9. 人民币小写转大写的一般方法
  10. iOS小技能:合并mp3格式的文件