一,编写rabbitmq基础模块类

1,安装pika模块
pip install pika

这里需要注意的是: pika官网明确说明 pika==0.11.0版本只支持python2.6以前的版本
重点: 在下载时可以进入官网确定你的版本所需要的pika版本号。
pika官网地址:https://pypi.org/project/pika/

2,实现rabbitmq基础模块类的编写

这里实现了Rabbitmq对象初始化、连接mq、发送mq消息、阻塞监听消息并回调。
本模块代码作为rabbitmq基础模块类,为业务模块调用提供方法。
对于代码的详解已经写道注释中了。

这里的一个connection就是一个tcp连接。为了提升tcp连接复用性,在每个连接基础上可以建立多个channel信道,每个信道都会被指派一个唯一的 ID。同时 RabbitMQ 可以确保每个线程的私密性,就像拥有独立的连接一样。但考虑到如果数据量过大,会导致连接阻塞,最终这里选择一个connect连接只对应了一个channel信道。
关于RabbitMQ 中 Connection 和 Channel 详解:https://www.cnblogs.com/eleven24/p/10326718.html
类似于下图:

模块代码如下:

import pika
import time
import logging
logger = logging.getLogger('mydjango')
# import sys
from django.conf import settings
from retrying import retryclass RabbitmqServer(object):def __init__(self,username,password,serverip,port,virtual_host):self.username =usernameself.password = passwordself.serverip = serveripself.port = portself.virtual_host = virtual_host# def connent(self):#    for i in range(3):#         try:#             logger.info("into mq connet")#             user_pwd = pika.PlainCredentials(self.username,self.password)#             logger.info("create mq ...")#             logger.info("%s,%s,%s,%s,%s"%(self.virtual_host,self.serverip,self.port,self.password,self.username))#             s_conn = pika.BlockingConnection(pika.ConnectionParameters(virtual_host=self.virtual_host,host= self.serverip,port=self.port, credentials=user_pwd))  # 创建连接#             logger.info('create channel...')#             self.channel = s_conn.channel()#             logger.info('connect successful')#             break#         except Exception as e:#             logger.info("连接mq失败,沉睡10s再试,共沉睡三次,失败原因:%s",e)#             time.sleep(10)@retry(stop_max_delay=30000, wait_fixed=5000)def connent(self):logger.info("into mq connet")user_pwd = pika.PlainCredentials(self.username, self.password)logger.info("create mq ...")logger.info("%s,%s,%s,%s,%s" % (self.virtual_host, self.serverip, self.port, self.password, self.username))# 创建 mq连接s_conn = pika.BlockingConnection(pika.ConnectionParameters(virtual_host=self.virtual_host, host=self.serverip, port=self.port,credentials=user_pwd))  logger.info('create channel...')self.channel = s_conn.channel()logger.info('connect successful')def productMessage(self,queuename,message):self.channel.queue_declare(queue=queuename, durable=True)self.channel.basic_publish(exchange='',routing_key=queuename,#写明将消息发送给队列queuenamebody=message,    #要发送的消息properties=pika.BasicProperties(delivery_mode=2,)#设置消息持久化,将要发送的消息的属性标记为2,表示该消息要持久化)def expense(self,queuename,func):""":param queuename: 消息队列名称:param func: 要回调的方法名"""self.channel.basic_qos(prefetch_count=1)self.channel.basic_consume(func,queue=queuename,)self.channel.start_consuming()def callback(ch, method, properties, body):print(" [消费者] Received %r" % body)time.sleep(1)print(" [消费者] Done")ch.basic_ack(delivery_tag=method.delivery_tag)#  接收到消息后会给rabbitmq发送一个确认if __name__ != '__main__':   # 测试服务是否能启动时使用from django.conf import settings# username = settings.RABBITMQCONFIG.get("username")# password = settings.RABBITMQCONFIG.get("password")# severip = settings.RABBITMQCONFIG.get("severip")# port = settings.RABBITMQCONFIG.get("port")username,password,severip,port,virtual_host = settings.PONEDITOR_RMQ_USER,settings.PONEDITOR_RMQ_PASSWD,settings.PONEDITOR_RMQ_IP,\settings.PONEDITOR_RMQ_PORT,settings.PONEDITOR_RMQ_VIRHOSTRabbitmqClient = RabbitmqServer(username,password,severip,port,virtual_host)if __name__ == '__main__':import jsonRabbitmqClient = RabbitmqServer("root", "ssb@2019",'172.31.0.54',5673,"YuXinIBTool")RabbitmqClient.connent()data = {"code":3}RabbitmqClient.productMessage("test3",json.dumps(data))RabbitmqClient.expense("test3",callback)

二,django中多进程开启Rabbitmq声明队列、发送消息、阻塞监听

此模块中将使用多进程,为方便结束开启的多个进程,这里使用kill -15 方法删掉进程。
kill -9 和kill -15的区别:https://www.cnblogs.com/domestique/p/8241219.html

from django.core.management.base import BaseCommand
from utils.echo_display import zip_unzip
import os
import json
import pymysql
import pandas as pd
import time
from django.conf import settings
# from utils.structure_modify import modify
from utils.annual_entrusted_modify import modify_annual
import signal
import multiprocessing
# import threadingclass Command(BaseCommand):def handle(self, *args, **options):import logginglogger = logging.getLogger('mydjango')from utils.Rabbitmqserver import RabbitmqClientfrom django.conf import settingsimport jsondef parse_result_func(ch, method, properties, body):### 逻辑程序logger.info("%s start to Analytical data..." %(queue_name))logger.info(" [接收到的请求头] Received %r [接收到的请求体] Received %r" % (properties.headers, body))try:req_res = json.loads(body)req_head = dict(properties.headers)project_id = str(req_res["projectId"])logger.info("Analytical data successful")except Exception as e:logger.error("there is a failed cause : rabbitmq parameter not correct %s"%(e) )logger.error("failed info -- properties : %s  body : %s"%(properties,body))returntry:logger.info("start logical")finlall_docx_name = modify_annual.main(body)logger.info("logical successful")except Exception as e:logger.error(str(body))logger.info("send a reject to rabbitmq queue : %s" % (queue_name))# 接收到消息后会给rabbitmq发送一个拒绝ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)  logger.info("reject process is end")else:logger.info("send a ack to rabbitmq queue : %s" % (queue_name))# 接收到消息后会给rabbitmq发送一个确认ch.basic_ack(delivery_tag=method.delivery_tag)  def term(sig_num, addtion):logger.info("stop the process current pid is %s ,group id is %s" % (os.getpid(), os.getpgrp()))os.killpg(os.getpgid(os.getpid()), signal.SIGKILL)def func(args):arguments = {'x-dead-letter-exchange': "exchange.e50.oneditor",  # 延迟结束后指向交换机(死信收容交换机)'x-dead-letter-routing-key': "rkey.oneditor5.dlx",  # 延迟结束后指向队列(死信收容队列),可直接设置queue name也可以设置routing-key}logger.info("into %s sub-process pid is %s ,group id is %s"%(args,os.getpid(), os.getpgrp()))global queue_namequeue_name = argslogger.info("start to connect the RabbitmqClient")# 连接mq,建立connention和channelRabbitmqClient.connent()# 声明队列RabbitmqClient.channel.queue_declare(queue=args, durable=True,arguments=arguments)logger.info("connect the RabbitmqClient successful")# 调用回调函数并祖泽监听队列RabbitmqClient.expense(args, parse_result_func)logger.info("into listening logical --queue_listener--")# 接受kill -15 消息后,调用term函数signal.signal(signal.SIGTERM, term)logger.info("current pid is %s ,group id is %s" % (os.getpid(), os.getpgrp()))# 开启多进程processes_list = []listenque_list = ["queue.p"+str(i)+".oneditor.docx" for i in range(10)]# listenque_list = ["queue.p0.oneditor.docx"]for listenque in listenque_list:t = multiprocessing.Process(target=func, args=(listenque,))t.daemon = Truet.start()processes_list.append(t)for p in processes_list:p.join()############################################################ print("current pid is % s" % os.getpid())# processes_list = []## listenque_list = ["queue.p1.oneditor.docx","queue.p2.oneditor.docx"]# # listenque_list = ["queue.p"+str(i)+".oneditor.docx" for i in range(2)]# print(listenque_list)# for listenque in listenque_list:#     t = multiprocessing.Process(target=func, args=(listenque,))#     t.daemon = True#     t.start()#     processes_list.append(t)# time.sleep(2)## try:#     for p in processes_list:#         p.join()# except Exception as e:#     print(str(e))
########################################################### t1 = threading.Thread(target=func, args=("queue.p1.oneditor.docx",))# t2 = threading.Thread(target=func, args=("queue.p2.oneditor.docx",))## t1.start()# t2.start()########################################################## class Mythreadsend1(threading.Thread):#     def run(self):#         logger.info("start to listening the info...")#         RabbitmqClient.expense("queue.p1.oneditor.docx", parse_result_func)## class Mythreadsend2(threading.Thread):#     def run(self):#         logger.info("start to listening the info...")#         RabbitmqClient.expense("queue.p2.oneditor.docx", parse_result_func)## t1 = Mythreadsend1()# t2 = Mythreadsend2()# t1.start()# t2.start()

三,django中使用manage.py开启独立进程

参考如下:https://blog.csdn.net/luslin1711/article/details/87885145

1、开发时会使用django环境进行一些初始化操作,这些程序一般只执行几次,但是需要django中的环境变量。
2、使用django运行阻塞监听的程序,比如Rabbitmq监听,放在主程序中就阻塞住了,需要另外开命令执行

在创建的app下创建文件夹management,在management文件夹下创建文件夹commands,将要执行的文件放到文件将爱下,记得把__init__.py文件一并创建了,init.py是声明这个文件夹是一个包。然后在主目录(就是manage.py文件所在目录)执行 python manage.py 文件名即可

# 终端前台开启
python manage.py queue_listener# 终端后台开启
nohup python manage.py queue_listener&# 终端后台开启,并将打印log放入“黑洞”中
nohup python manage.py queue_listener > /dev/null 2>&1&# 查看进程pid
ps -aux | grep python

django多任务开启rabbitmq,并进行声明队列、发送、阻塞监听消息相关推荐

  1. IBMMQ监听消息队列

    ** IBMMQ发送和接收消息示例: pom.xml下载jar包: <dependency><groupId>com.ibm.mq</groupId><art ...

  2. java监控队列_java-Spring Rabbit监听输出队列或接收

    我正在开发具有Rabbitmq支持的应用程序.因此,我有一个消费者和一个生产者.我需要在两种方法之间做出决定,如何在两者之间建立通信. 第一种方式 public void send(){ //send ...

  3. java onmessage监听消息队列_消息队列(MQ)功能场景

    来自公众号:京东技术 消息队列(MQ)是一种不同应用程序之间(跨进程)的通信方法.应用程序通过写入和检索出入列队的数据(消息)来通信,而无需通过专用链接来连接它们.消息传递指的是程序之间通过在消息中发 ...

  4. RabbitMQ介绍与延时队列

    RabbitMQ特性 消息可靠性:典型的生产者-消费者模型,发送端有消息发送确认机制,服务端有消息持久化方案,消费端有消息Ack机制 灵活的消息路由分发:多种多样的交换机 多语言客户端开发AMQP:只 ...

  5. RabbitMQ快速入门--简单队列模型

    入门案例 简单队列模式的模型图: 官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色: publisher:消息发布者,将消息发送到队列queue queue:消息队列,负责 ...

  6. RabbitMQ详解(三)------RabbitMQ的五种队列

    目录 1.简单队列 2.work 模式 3.发布/订阅模式 4.路由模式 5.主题模式 6.四种交换器 7.总结 上一篇博客我们介绍了RabbitMQ消息通信中的一些基本概念,这篇博客我们介绍 Rab ...

  7. Rabbitmq消费失败死信队列

    为什么80%的码农都做不了架构师?>>>    Rabbitmq 重消费处理 一 处理流程图:    业务交换机:正常接收发送者,发送过来的消息,交换机类型topic AE交换机: ...

  8. RabbitMQ 可靠性、重复消费、顺序性、消息积压解决方案

    前言 为什么引入消息队列?引入 MQ 给我们解决了一些问题,但同时又引入了一些复杂的问题,这些问题是大型项目中必须解决的重点,更重要的是,面试也经常问.实际上消息队列可以说是没法百分之百保证可靠性的! ...

  9. 聊聊RabbitMq动态监听这点事

    很长时间没有分享过学习心得了,看了下发布记录,最后一篇文章的时间都在2020-12-10年了,今天抽时间整理下一个很早就想整理的技术分享.顺便说句题外话,因为我一直没时间整理,再加上开发的小伙伴对Mq ...

最新文章

  1. 如何掌握并在实践中自如运用设计模式
  2. 7.4.6 核PCA
  3. ARM(IMX6U)裸机主频和时钟
  4. [蓝桥杯][算法提高VIP]数组替换-模拟
  5. java 时间戳 星期几_java自定义获取星期几、几点、几分。
  6. sql去重复查询distinct_SQL的简单查询
  7. Stateflow_报错记录
  8. p10可以适配鸿蒙吗,鸿蒙系统支持旧机型吗
  9. maven中scope属性
  10. mongodb的数据怎么导入到hdfs上_如何成为一名合格的数据架构师?
  11. 图论及其应用 2018年期末考试 答案总结
  12. 湖南大学计算机与通信学院李燕,基于SVM的面部表情分析
  13. vue在filters中使用sort()无限循环踩坑实践
  14. LSVGlobal Mapper应用----影像裁剪
  15. 浅析NDI 5(一)基于NDI 5如何打造全球NDI演播室?
  16. Android系统快速编译方式ninja
  17. 自制随机小姐姐摄影api
  18. cx oracle 连接编码,python用cx_Oracle连接oracle编码问题解决办法
  19. 敬业福!2023官方集福攻略
  20. 影流之主——stm32OLED显示一张图片方法基于战舰

热门文章

  1. 基于仿360小说网站(校园网)的源码设计实现(升级版)
  2. Win11终于兼容安卓App!微软推送安卓子系统
  3. 微信小程序 SOTER 生物认证DEMO,指纹识别
  4. 2021-4-13大学化学无机原理(7)酸碱质子理论,弱酸碱及两性物质溶液计算
  5. 13个风格独特的关于(About)页面设计
  6. 关于在Opengl中先平移后旋转和先旋转后平移的效果不一样的原因
  7. MVP模式请求网络数据
  8. c语言的字符数组strlen的详细使用
  9. 《解读基金我的投资观与实践》读后感
  10. 软件测试中 Bug 书写规范