django多任务开启rabbitmq,并进行声明队列、发送、阻塞监听消息
一,编写rabbitmq基础模块类
1,安装pika模块
pip install pika
这里需要注意的是: pika官网明确说明 pika==0.11.0版本只支持python2.6以前的版本。
重点: 在下载时可以进入官网确定你的版本所需要的pika版本号。
pika官网地址:https://pypi.org/project/pika/
2,实现rabbitmq基础模块类的编写
这里实现了Rabbitmq对象初始化、连接mq、发送mq消息、阻塞监听消息并回调。
本模块代码作为rabbitmq基础模块类,为业务模块调用提供方法。
对于代码的详解已经写道注释中了。
这里的一个connection就是一个tcp连接。为了提升tcp连接复用性,在每个连接基础上可以建立多个channel信道,每个信道都会被指派一个唯一的 ID。同时 RabbitMQ 可以确保每个线程的私密性,就像拥有独立的连接一样。但考虑到如果数据量过大,会导致连接阻塞,最终这里选择一个connect连接只对应了一个channel信道。
关于RabbitMQ 中 Connection 和 Channel 详解:https://www.cnblogs.com/eleven24/p/10326718.html
类似于下图:
模块代码如下:
import pika
import time
import logging
logger = logging.getLogger('mydjango')
# import sys
from django.conf import settings
from retrying import retryclass RabbitmqServer(object):def __init__(self,username,password,serverip,port,virtual_host):self.username =usernameself.password = passwordself.serverip = serveripself.port = portself.virtual_host = virtual_host# def connent(self):# for i in range(3):# try:# logger.info("into mq connet")# user_pwd = pika.PlainCredentials(self.username,self.password)# logger.info("create mq ...")# logger.info("%s,%s,%s,%s,%s"%(self.virtual_host,self.serverip,self.port,self.password,self.username))# s_conn = pika.BlockingConnection(pika.ConnectionParameters(virtual_host=self.virtual_host,host= self.serverip,port=self.port, credentials=user_pwd)) # 创建连接# logger.info('create channel...')# self.channel = s_conn.channel()# logger.info('connect successful')# break# except Exception as e:# logger.info("连接mq失败,沉睡10s再试,共沉睡三次,失败原因:%s",e)# time.sleep(10)@retry(stop_max_delay=30000, wait_fixed=5000)def connent(self):logger.info("into mq connet")user_pwd = pika.PlainCredentials(self.username, self.password)logger.info("create mq ...")logger.info("%s,%s,%s,%s,%s" % (self.virtual_host, self.serverip, self.port, self.password, self.username))# 创建 mq连接s_conn = pika.BlockingConnection(pika.ConnectionParameters(virtual_host=self.virtual_host, host=self.serverip, port=self.port,credentials=user_pwd)) logger.info('create channel...')self.channel = s_conn.channel()logger.info('connect successful')def productMessage(self,queuename,message):self.channel.queue_declare(queue=queuename, durable=True)self.channel.basic_publish(exchange='',routing_key=queuename,#写明将消息发送给队列queuenamebody=message, #要发送的消息properties=pika.BasicProperties(delivery_mode=2,)#设置消息持久化,将要发送的消息的属性标记为2,表示该消息要持久化)def expense(self,queuename,func):""":param queuename: 消息队列名称:param func: 要回调的方法名"""self.channel.basic_qos(prefetch_count=1)self.channel.basic_consume(func,queue=queuename,)self.channel.start_consuming()def callback(ch, method, properties, body):print(" [消费者] Received %r" % body)time.sleep(1)print(" [消费者] Done")ch.basic_ack(delivery_tag=method.delivery_tag)# 接收到消息后会给rabbitmq发送一个确认if __name__ != '__main__': # 测试服务是否能启动时使用from django.conf import settings# username = settings.RABBITMQCONFIG.get("username")# password = settings.RABBITMQCONFIG.get("password")# severip = settings.RABBITMQCONFIG.get("severip")# port = settings.RABBITMQCONFIG.get("port")username,password,severip,port,virtual_host = settings.PONEDITOR_RMQ_USER,settings.PONEDITOR_RMQ_PASSWD,settings.PONEDITOR_RMQ_IP,\settings.PONEDITOR_RMQ_PORT,settings.PONEDITOR_RMQ_VIRHOSTRabbitmqClient = RabbitmqServer(username,password,severip,port,virtual_host)if __name__ == '__main__':import jsonRabbitmqClient = RabbitmqServer("root", "ssb@2019",'172.31.0.54',5673,"YuXinIBTool")RabbitmqClient.connent()data = {"code":3}RabbitmqClient.productMessage("test3",json.dumps(data))RabbitmqClient.expense("test3",callback)
二,django中多进程开启Rabbitmq声明队列、发送消息、阻塞监听
此模块中将使用多进程,为方便结束开启的多个进程,这里使用kill -15 方法删掉进程。
kill -9 和kill -15的区别:https://www.cnblogs.com/domestique/p/8241219.html
from django.core.management.base import BaseCommand
from utils.echo_display import zip_unzip
import os
import json
import pymysql
import pandas as pd
import time
from django.conf import settings
# from utils.structure_modify import modify
from utils.annual_entrusted_modify import modify_annual
import signal
import multiprocessing
# import threadingclass Command(BaseCommand):def handle(self, *args, **options):import logginglogger = logging.getLogger('mydjango')from utils.Rabbitmqserver import RabbitmqClientfrom django.conf import settingsimport jsondef parse_result_func(ch, method, properties, body):### 逻辑程序logger.info("%s start to Analytical data..." %(queue_name))logger.info(" [接收到的请求头] Received %r [接收到的请求体] Received %r" % (properties.headers, body))try:req_res = json.loads(body)req_head = dict(properties.headers)project_id = str(req_res["projectId"])logger.info("Analytical data successful")except Exception as e:logger.error("there is a failed cause : rabbitmq parameter not correct %s"%(e) )logger.error("failed info -- properties : %s body : %s"%(properties,body))returntry:logger.info("start logical")finlall_docx_name = modify_annual.main(body)logger.info("logical successful")except Exception as e:logger.error(str(body))logger.info("send a reject to rabbitmq queue : %s" % (queue_name))# 接收到消息后会给rabbitmq发送一个拒绝ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False) logger.info("reject process is end")else:logger.info("send a ack to rabbitmq queue : %s" % (queue_name))# 接收到消息后会给rabbitmq发送一个确认ch.basic_ack(delivery_tag=method.delivery_tag) def term(sig_num, addtion):logger.info("stop the process current pid is %s ,group id is %s" % (os.getpid(), os.getpgrp()))os.killpg(os.getpgid(os.getpid()), signal.SIGKILL)def func(args):arguments = {'x-dead-letter-exchange': "exchange.e50.oneditor", # 延迟结束后指向交换机(死信收容交换机)'x-dead-letter-routing-key': "rkey.oneditor5.dlx", # 延迟结束后指向队列(死信收容队列),可直接设置queue name也可以设置routing-key}logger.info("into %s sub-process pid is %s ,group id is %s"%(args,os.getpid(), os.getpgrp()))global queue_namequeue_name = argslogger.info("start to connect the RabbitmqClient")# 连接mq,建立connention和channelRabbitmqClient.connent()# 声明队列RabbitmqClient.channel.queue_declare(queue=args, durable=True,arguments=arguments)logger.info("connect the RabbitmqClient successful")# 调用回调函数并祖泽监听队列RabbitmqClient.expense(args, parse_result_func)logger.info("into listening logical --queue_listener--")# 接受kill -15 消息后,调用term函数signal.signal(signal.SIGTERM, term)logger.info("current pid is %s ,group id is %s" % (os.getpid(), os.getpgrp()))# 开启多进程processes_list = []listenque_list = ["queue.p"+str(i)+".oneditor.docx" for i in range(10)]# listenque_list = ["queue.p0.oneditor.docx"]for listenque in listenque_list:t = multiprocessing.Process(target=func, args=(listenque,))t.daemon = Truet.start()processes_list.append(t)for p in processes_list:p.join()############################################################ print("current pid is % s" % os.getpid())# processes_list = []## listenque_list = ["queue.p1.oneditor.docx","queue.p2.oneditor.docx"]# # listenque_list = ["queue.p"+str(i)+".oneditor.docx" for i in range(2)]# print(listenque_list)# for listenque in listenque_list:# t = multiprocessing.Process(target=func, args=(listenque,))# t.daemon = True# t.start()# processes_list.append(t)# time.sleep(2)## try:# for p in processes_list:# p.join()# except Exception as e:# print(str(e))
########################################################### t1 = threading.Thread(target=func, args=("queue.p1.oneditor.docx",))# t2 = threading.Thread(target=func, args=("queue.p2.oneditor.docx",))## t1.start()# t2.start()########################################################## class Mythreadsend1(threading.Thread):# def run(self):# logger.info("start to listening the info...")# RabbitmqClient.expense("queue.p1.oneditor.docx", parse_result_func)## class Mythreadsend2(threading.Thread):# def run(self):# logger.info("start to listening the info...")# RabbitmqClient.expense("queue.p2.oneditor.docx", parse_result_func)## t1 = Mythreadsend1()# t2 = Mythreadsend2()# t1.start()# t2.start()
三,django中使用manage.py开启独立进程
参考如下:https://blog.csdn.net/luslin1711/article/details/87885145
1、开发时会使用django环境进行一些初始化操作,这些程序一般只执行几次,但是需要django中的环境变量。
2、使用django运行阻塞监听的程序,比如Rabbitmq监听,放在主程序中就阻塞住了,需要另外开命令执行
在创建的app下创建文件夹management,在management文件夹下创建文件夹commands,将要执行的文件放到文件将爱下,记得把__init__.py文件一并创建了,init.py是声明这个文件夹是一个包。然后在主目录(就是manage.py文件所在目录)执行 python manage.py 文件名即可
# 终端前台开启
python manage.py queue_listener# 终端后台开启
nohup python manage.py queue_listener&# 终端后台开启,并将打印log放入“黑洞”中
nohup python manage.py queue_listener > /dev/null 2>&1&# 查看进程pid
ps -aux | grep python
django多任务开启rabbitmq,并进行声明队列、发送、阻塞监听消息相关推荐
- IBMMQ监听消息队列
** IBMMQ发送和接收消息示例: pom.xml下载jar包: <dependency><groupId>com.ibm.mq</groupId><art ...
- java监控队列_java-Spring Rabbit监听输出队列或接收
我正在开发具有Rabbitmq支持的应用程序.因此,我有一个消费者和一个生产者.我需要在两种方法之间做出决定,如何在两者之间建立通信. 第一种方式 public void send(){ //send ...
- java onmessage监听消息队列_消息队列(MQ)功能场景
来自公众号:京东技术 消息队列(MQ)是一种不同应用程序之间(跨进程)的通信方法.应用程序通过写入和检索出入列队的数据(消息)来通信,而无需通过专用链接来连接它们.消息传递指的是程序之间通过在消息中发 ...
- RabbitMQ介绍与延时队列
RabbitMQ特性 消息可靠性:典型的生产者-消费者模型,发送端有消息发送确认机制,服务端有消息持久化方案,消费端有消息Ack机制 灵活的消息路由分发:多种多样的交换机 多语言客户端开发AMQP:只 ...
- RabbitMQ快速入门--简单队列模型
入门案例 简单队列模式的模型图: 官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色: publisher:消息发布者,将消息发送到队列queue queue:消息队列,负责 ...
- RabbitMQ详解(三)------RabbitMQ的五种队列
目录 1.简单队列 2.work 模式 3.发布/订阅模式 4.路由模式 5.主题模式 6.四种交换器 7.总结 上一篇博客我们介绍了RabbitMQ消息通信中的一些基本概念,这篇博客我们介绍 Rab ...
- Rabbitmq消费失败死信队列
为什么80%的码农都做不了架构师?>>> Rabbitmq 重消费处理 一 处理流程图: 业务交换机:正常接收发送者,发送过来的消息,交换机类型topic AE交换机: ...
- RabbitMQ 可靠性、重复消费、顺序性、消息积压解决方案
前言 为什么引入消息队列?引入 MQ 给我们解决了一些问题,但同时又引入了一些复杂的问题,这些问题是大型项目中必须解决的重点,更重要的是,面试也经常问.实际上消息队列可以说是没法百分之百保证可靠性的! ...
- 聊聊RabbitMq动态监听这点事
很长时间没有分享过学习心得了,看了下发布记录,最后一篇文章的时间都在2020-12-10年了,今天抽时间整理下一个很早就想整理的技术分享.顺便说句题外话,因为我一直没时间整理,再加上开发的小伙伴对Mq ...
最新文章
- 如何掌握并在实践中自如运用设计模式
- 7.4.6 核PCA
- ARM(IMX6U)裸机主频和时钟
- [蓝桥杯][算法提高VIP]数组替换-模拟
- java 时间戳 星期几_java自定义获取星期几、几点、几分。
- sql去重复查询distinct_SQL的简单查询
- Stateflow_报错记录
- p10可以适配鸿蒙吗,鸿蒙系统支持旧机型吗
- maven中scope属性
- mongodb的数据怎么导入到hdfs上_如何成为一名合格的数据架构师?
- 图论及其应用 2018年期末考试 答案总结
- 湖南大学计算机与通信学院李燕,基于SVM的面部表情分析
- vue在filters中使用sort()无限循环踩坑实践
- LSVGlobal Mapper应用----影像裁剪
- 浅析NDI 5(一)基于NDI 5如何打造全球NDI演播室?
- Android系统快速编译方式ninja
- 自制随机小姐姐摄影api
- cx oracle 连接编码,python用cx_Oracle连接oracle编码问题解决办法
- 敬业福!2023官方集福攻略
- 影流之主——stm32OLED显示一张图片方法基于战舰