说明

一个实现消息队列简单管理的对象

内容

代码

import redis
# 基于Redis Stream的消息队列
class LittleRQ:def __init__(self, host, port, decode_responses=True,password=None):self.conn  = redis.Redis(host=host, port=int(port), decode_responses=decode_responses,password=password) # 向队列存入一条消息def add_msg(self,stream_name, msg_dict, maxlen=10, approximate=False):self.conn.xadd(stream_name, msg_dict,maxlen=maxlen, approximate=approximate)return True# 队列的长度def len_of_queue(self, stream_name):try:recs = self.conn.xlen(stream_name)except:recs = 0return recs# 确保组的存在def ensure_group(self, stream_name, group_name ='group1', order='fifo'):try:res = self.conn.xinfo_groups(stream_name)if len(res) == 0:if order.lower() == 'fifo':self.conn.xgroup_create(stream_name, group_name, id=0)else:self.conn.xgroup_create(stream_name, group_name, id='$')                except:if order.lower() == 'fifo':self.conn.xgroup_create(stream_name, group_name, id=0)else:self.conn.xgroup_create(stream_name, group_name, id='$')return True# 检查一个stream的状态def get_a_stream_state(self, stream_name):try:return self.conn.xinfo_stream(stream_name)except:return False# 检查一个group的状态def get_a_stream_groups(self, stream_name):try:return self.conn.xinfo_groups(stream_name)except:return False# 删掉一个队列def del_a_stream(self, stream_name):if self.conn.exists(stream_name):self.conn.delete(stream_name)return True# 获取消息def get_msg(self, stream_name, group_name ='group1', consumer_name = 'consumer1' , count=10000, block=None, msg_id = None):# 历史消息consumer_id='0-0'recs1 = self.conn.xreadgroup(group_name, consumer_name, {stream_name: consumer_id}, block=block, count=count) # 指定的新消息consumer_id =msg_id or '>'recs2 = self.conn.xreadgroup(group_name, consumer_name, {stream_name: consumer_id}, block=block, count=count) recs = recs1 + recs2# tuple_listif len(recs):return recs[0][1]else:return False# 获取被耽误的消息def get_pending_info(self, stream_name, group_name='group1'):pending_info = self.conn.xpending(stream_name, group_name)return pending_info# 获取被耽误的消息def get_pending_msg(self, stream_name, group_name='group1', consumer_name = 'consumer1' ,count = 100):pending_info = self.get_pending_info(stream_name, group_name = group_name)pending_num = pending_info.get('pending') print('>>>', pending_num)if pending_num>0:recs =  self.get_msg(stream_name,group_name=group_name,consumer_name=consumer_name,count=count , msg_id =pending_info.get('min')  )else:recs = []return recs# 确认消息def ack_msg(self, stream_name,mid ,group_name='group1'):return self.conn.xack(stream_name, group_name, mid)# 删除消息(不然统计队列内消息的数量不会减少)def del_msg(self, stream_name,mid ):return self.conn.xdel(stream_name, mid)

使用

# 1 建立实例
lq = LittleRQ('172.17.0.1',6379, password='YOURPASS' )
# 2 数据
some_data = {'name': 'Ashley', 'age': '16'}
# 3 队列名
stream_name = 'test1'
# 4 存消息
for i in range(int(10)):lq.add_msg(stream_name, some_data, maxlen =int(10))
# 5 查看队列长度
lq.len_of_queue(stream_name)
# 6 查看pending的消息
pending_msg = lq.get_pending_msg(stream_name)
# 7 将pending消息确认掉
for k in pending_msg:lq.ack_msg(stream_name ,k[0])
# 8 为某个队列增加组
# 不在当前列表集合中
lq.ensure_group(stream_name)
# 9 获取消息列表/没有消息时返回False
rec_tuple_list = lq.get_msg(stream_name)

Python一些可能用的到的函数系列81 基于Redis Stream的简单消息队列对象相关推荐

  1. python使用redis的消息队列_Redis实现简单消息队列

    任务异步化 打开浏览器,输入地址,按下回车,打开了页面.于是一个HTTP请求(request)就由客户端发送到服务器,服务器处理请求,返回响应(response)内容. 我们每天都在浏览网页,发送大大 ...

  2. Python一些可能用的到的函数系列37flipflop 对象

    说明 对于调用成功一个函数,立即调用另一个函数的应用.例如从数据库获取了一批数据,将某些结果回写的过程. 内容 假设有两个函数,写成了不定参格式,这样比较通用. hello def hello(*ar ...

  3. Python一些可能用的到的函数系列11 本地哈希和服务器哈希比对

    说明 摘要可以作为文件的唯一识别和信息验证,在进行大量存储时是很有比较的. 内容 以前不太清楚哪里出过错,感觉服务器上计算hash和本地不一致. 由于python的hashlib是对二进制文本进行计算 ...

  4. Python一些可能用的到的函数系列76 最大回撤率

    说明 这个问题其实有点绕,写在这里备忘吧. 内容 最大回撤:从任何一个峰值向后的最大落差. 两个要点: 1 找峰值 2 从峰值往后看,找到最大落差 最后取所有峰值的最大值就是最大回撤. 如下一个数列, ...

  5. Python一些可能用的到的函数系列102 推断df的变量类型

    说明 这个主要是为了在sql里快速建表 内容 对每列的变量进行遍历,去掉空之后进行简单的判定.数值(double.float和date)的数据长度是固定的,变化比较大的是字符型.我觉得可以采用最大最小 ...

  6. Python一些可能用的到的函数系列28 超大文本文件分割读取

    说明 有时候会有一些特别大的文本文件需要拆分处理 内容 1 曾经用过的笨办法 1.1 读取大文件的末尾n行 # 采用偏置方法读取末尾三百行文本 def read_tail_n(fname, n_lin ...

  7. python 系列 03 - 基于scrapy框架的简单爬虫

    文章目录 1. scrapy介绍 2 新建爬虫项目 3 新建蜘蛛文件 4 运行爬虫 5 爬取内容 5.1分析网页结构 5.2 关于Xpath解析 5.3 接着解析电影数据 5.4 下载缩略图 5.5 ...

  8. python银行排队系统_python-我需要基于Web的系统的消息/排队解决...

    我正在寻找在Ubuntu上运行的基于Web的系统的消息/队列解决方案. 该系统基于以下技术构建: Javascript (Extjs framework) – Frontend PHP Python ...

  9. exit函数_Linux进程间通信详解(三) 消息队列及函数

    消息队列的概念 消息队列就是一个消息的链表,每个消息队列都有一个队列头,用结构struct msg_queue来描述.队列头中包含了该队列的大量信息,包括消息队列的键值.用户ID.组ID.消息数目.读 ...

最新文章

  1. java 程序编译和运行的过程
  2. c++ 截取\r\n问题
  3. python递归迭代_Python入门基础知识点(python迭代器和递归)
  4. vscode 快速调到定义处_vim技巧:在程序代码中快速跳转,在文件内跳转到变量定义处...
  5. 来一份全面的面试宝典练练手,面试真题解析
  6. java dos窗口小工具下载,maxdos 9 3-maxdos工具箱 v9.3 官方版
  7. 深信服SCSA安全工程师题库(方便大家复习备考)
  8. MIT6.828 Part B: Copy-on-Write Fork
  9. 出国计算机相关专业,加拿大计算机相关专业解析
  10. Le le's picture when aged 5 monthes old_拔剑-浆糊的传说_新浪博客
  11. 多部民族电影在移动电影院App首映发布成功
  12. Pr 入门教程 如何使用超级键效果?
  13. MD5加密(纯代码)
  14. PHP快速入门(一)
  15. VMware下Ubuntu如何与主机进行文件共享(留作自用)
  16. vue3 获取当前路由参数
  17. python-字典附加题3- 股票查询
  18. HUST软测1504班第6周小组作业成绩
  19. Macromedia Breeze 快速安装
  20. win7浏览器主页修改不过来_Windows7 IE主页不能修改怎么办?

热门文章

  1. Windows下的工作组与域环境以及域的搭建管理流程
  2. csust 1097
  3. java 颜色 16进制转换_Java中Color和16进制字符串互相转换的方法
  4. frpc启动失败总结
  5. 西门子Siemens PLc自动配料称重系统,托尼多称,modbus通讯,变频器控制,温度模拟量处理,Pid控制,配料重量处理
  6. 一个超简单的android任务列队(排队)3
  7. 基于springboot框架开发的办公自动化OA系统
  8. ubuntu 更换pip源
  9. 由QQ聊天文件开始的渗透录像
  10. 【闭包】什么是闭包?闭包用途