一种消息和任务队列——beanstalkd
- 基于管道 (tube) 和任务 (job) 的工作队列 (work-queue):d
管道(tube),tube类似于消息主题(topic),在一个beanstalkd中可以支持多个tube,每个tube都有自己的producer和consumer;任务(job),beanstalkd用job代替了message的概念,与消息不同,job有一系列状态:
- 内部实现采用了 libevent, 服务器-客户端之间用类似 memcached 的轻量级通讯协议,具有有很高的性能。
- 尽管是内存队列,beanstalkd 提供了 binlog 机制,当重启 beanstalkd 时,当前任务状态能够从纪录的本地 binlog 中恢复。
- 优先级(priority):job可以有0~2^32个优先级,0代表最高优先级,beanstalkd使用最大最小堆处理job的优先级排序,因此reserve命令的时间复杂度是O(logN);
- 延时(delay),有两种方式可以执行延时任务:producer发布任务时指定延时;或者当任务处理完毕后, consumer再次将任务放入队列延时执行 (RELEASE with <delay>);
- 超时重发(time-to-run),Beanstalkd 把job返回给consumer以后:consumer必须在预设的 TTR (time-to-run) 时间内发送 delete / release/ bury 改变任务状态;否则 Beanstalkd 会认为消息处理失败,然后把job交给另外的消费者节点执行。如果consumer预计在 TTR (time-to-run) 时间内无法完成任务, 也可以发送 touch 命令, 它的作用是让 Beanstalkd 从系统时间重新计算 TTR ;
- 任务预留(buried),如果job因为某些原因无法执行, consumer可以把任务置为 buried 状态让 Beanstalkd 保留这些任务。管理员可以通过 peek buried 命令查询被保留的任务,并且进行人工干预。简单的, kick <n> 能够一次性把 n 条被保留的任务踢回队列。
job的状态
- READY,需要立即处理的任务,当延时 (DELAYED) 任务到期后会自动成为当前任务;
- DELAYED,延迟执行的任务, 当消费者处理任务后,可以用将消息再次放回 DELAYED 队列延迟执行;
- RESERVED,已经被消费者获取, 正在执行的任务,Beanstalkd 负责检查任务是否在 TTR(time-to-run) 内完成;
- BURIED,保留的任务: 任务不会被执行,也不会消失,除非有人把它 "踢" 回队列;
- DELETED,消息被彻底删除。Beanstalkd 不再维持这些消息。
put reserve delete-----> [READY] ---------> [RESERVED] --------> *poof*
#!/usr/bin/env python import beanstalkcbeanstalk=beanstalkc.Connection(host="127.0.0.1",port=11300)# pruducer beanstalk.put('hello') beanstalk.put('world')# consumer job1=beanstalk.reserve() print "job1: " + job1.body job1.delete()job2=beanstalk.reserve(timeout=1) print "job2: " + job2.body job2.delete()job3=beanstalk.reserve(timeout=1) # Error,队列中已经没有job了 print "job3: " + job3.body job3.delete()
tube的管理
print beanstalk.tubes() # default print beanstalk.using() # default beanstalk.use('queue1') print beanstalk.using() # queue1 beanstalk.use('queue2') # queue2 print beanstalk.using()print beanstalk.tubes() # default, queue2
如上面的例子,在最后的tubes()命令打印所有tube的时候,并没有看到queue1,这是因为没有任何client 在using或者watching的tube会自动消失。
可以使用watch命令让client同时处理多个tube,而不用担心tube会被销毁:
print beanstalk.tubes() # default print beanstalk.using() # default beanstalk.use('queue1') beanstalk.watch('queue1') print beanstalk.using() # queue1 beanstalk.use('queue2') # queue2 print beanstalk.using()print beanstalk.tubes() # default, queue1, queue2 print beanstalk.watching() # default, queue1
watch的tube如果不存在,会被自动创建,可以用ignore命令取消关注tube:
beanstalk.watch('queue3') print beanstalk.watching() # default, queue3 beanstalk.ignore('queue3') print beanstalk.watching() # default
beanstalkc命令
put with delay release with delay----------------> [DELAYED] <------------.| || (time passes) || |put v reserve | delete-----------------> [READY] ---------> [RESERVED] --------> *poof*^ ^ | || \ release | || `-------------' || || kick || || bury |[BURIED] <---------------'|| delete`--------> *poof*
# pruducer beanstalk.put('hello', delay=10) beanstalk.put('world')# consumer job1=beanstalk.reserve() print "job1: " + job1.body # World job1.delete()job2=beanstalk.reserve(timeout=0) print "job2: " + job2.body # Error,
put命令也支持优先级参数:
# pruducer beanstalk.put('hello', priority=10) beanstalk.put('world', priority=9)# consumer job1=beanstalk.reserve() print "job1: " + job1.body # world job1.delete()job2=beanstalk.reserve(timeout=0) print "job2: " + job2.body # hello job2.delete()
release命令可以释放job回队列:
# pruducer beanstalk.put('hello') beanstalk.put('world')# consumer job1=beanstalk.reserve() print "job1: " + job1.body # hello job1.release()job2=beanstalk.reserve(timeout=0) print "job2: " + job2.body # hello
bury命令将job放到一个特殊的FIFO队列中,之后不能被reserve命令获取,但可以用kick命令扔回工作队列中,之后就能被消费了:
# pruducer beanstalk.put('hello') beanstalk.put('world')# consumer job1=beanstalk.reserve(timeout=0) job1.bury() print job1.stats()['state'] # buried job2=beanstalk.reserve(timeout=0) print "job2: " + job2.body # world job2.delete()beanstalk.kick()job3=beanstalk.reserve(timeout=0) print "job3: " + job3.body # hello job3.delete()
peek命令允许查看一个job,但不会reserve它;
# pruducer beanstalk.put('hello') beanstalk.put('world')#print(beanstalk.stats()) # consumer job1=beanstalk.reserve(timeout=0) job1_id=job1.stats()['id'] print job1_id job1_r=beanstalk.peek(job1_id) print "job1 " + job1_r.body # hello job2=beanstalk.reserve(timeout=0) print "job2: " + job2.body # hello job2.delete()job3=beanstalk.reserve(timeout=0) print "job3: " + job3.body # world job3.delete()
一种消息和任务队列——beanstalkd相关推荐
- RabbitMQ之五种消息模型
首先什么是MQ MQ全称是Message Queue,即消息对列!消息队列是典型的:生产者.消费者模型.生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息.因为消息的生产和消费都是异步的,而 ...
- day72 JavaWeb框架阶段——RabbitMQ消息队列【了解常见的MQ产品,了解RabbitMQ的5种消息模型,会使用Spring AMQP】
文章目录 0.学习目标 1.RabbitMQ 1.1.搜索与商品服务的问题 1.2.消息队列(MQ) 1.2.1.什么是消息队列 1.2.2.AMQP和JMS 1.2.3.常见MQ产品 1.2.4.R ...
- 消息队列之----Beanstalkd
消息队列之----Beanstalkd Beanstalkd 是什么 Beanstalkd,一个高性能.轻量级的分布式内存队列系统 Beanstalkd 特性 优先级(priority) 注:优先级就 ...
- Rabbitmq超级详细的笔记,包括安装,基本命令,rabbitmq的七种消息模式,以及死信队列,延迟队列,优先级队列和惰性队列的介绍
RabbitMQ 文章目录 RabbitMQ 1 RabbitMQ介绍 1.1 基本介绍 1.2 RabbitMQ的安装 1.2.1 ubuntu20.04 安装rabbitmq 1.2.2 cent ...
- 简单易用的任务队列-beanstalkd
更多技术文章,请关注我的个人博客 www.immaxfang.com 和小公众号 Max的学习札记. 概述 beanstalkd 是一个简单快速的分布式工作队列系统,协议基于 ASCII 编码运行在 ...
- 下面是DHCP协议工作的4种消息,正确的顺序应该是(40)。【答案】B
下面是DHCP协议工作的4种消息,正确的顺序应该是(40). ①DHCP Discovery ②DHCP Offer ③DHCP Request ④DHCP Ack (40)A.①③②④ B.①②③④ ...
- RabbitMQ(八):SpringBoot 整合 RabbitMQ(三种消息确认机制以及消费端限流)
说明 本文 SpringBoot 与 RabbitMQ 进行整合的时候,包含了三种消息的确认模式,如果查询详细的确认模式设置,请阅读:RabbitMQ的三种消息确认模式 同时消费端也采取了限流的措施, ...
- Redis的两种消息模式
Redis的两种消息模式 队列模式 发布订阅模式 队列模式 队列模式下每个消费者可以同时从多个服务器读取消息,但是每个消息只能被一个消费者读取. 在队列模式下其实每次插入的数据都是载入在最前面的,而先 ...
- SpringBoot整合RabbitMQ 实现五种消息模型
目录 SpringBoot中使用RabbitMQ 搭建初始环境 引入依赖 配置配置文件 测试类 注入 rabbitTemplate 消息队列RabbitMQ之五种消息模型 第一种直连模型使用 开发生产 ...
最新文章
- 软件质量保证与测试笔记——江湖救急版
- 【网络爬虫】BeautfulSoup下载美图(真の能看懂~!)
- 基于c#的相关性分析_开源Math.NET基础数学类库使用(11)C#计算相关系数
- keil uVision4 创建项目
- AX 2009 父窗体参数记录传递
- 计算机工程专业毕业,新加坡国立大学计算机工程专业毕业生亲临介绍
- Android 存储学习之保存系统短信到SD卡
- swift4 label显示html,Swift:在标签或textView中显示HTML数据
- 【2019杭电多校第七场1010=HDU6655】Just Repeat(思维)
- 畅视影院APP源码,电影网站app源码
- 腾科张老师教你如何在cisco路由器上部署和使用FTP/TFTP
- html基础dw,HTML基础DW使用教程
- c#webservice接口調用_Windows 桌面应用开发之 C# 调用 WebService 接口
- UNRAID挂载exFat格式的USB磁盘
- pythongui做计算器_Python——GUI编程 利息计算器 作业9(python programming)
- [Wpf] . [Theme] 重构/Themes/Generic.xaml 创建一个Custom Control的典型做法
- 云适配CEO陈本峰:移动办公真正产生商业价值要落到实际场景中
- Linux-Journal
- 【转】MBBMS CA方案
- [转载] 中华典故故事(孙刚)——40 不见棺材不落泪,不到黄河不死心
热门文章
- 精度 vs 效率:模型越小,精度就一定越低吗?
- 破解自动机器学习的黑匣子
- 风之语.人在职场也需要'备胎'
- 图神经网络(GNN)系列
- Python 之 matplotlib (十四)图中图
- 完美解决latex警告信息:Citation `Gusfield:97‘ on page 1 undefined.
- TypeError: __init__() takes 1 positional argument but 4 were given
- 玩电子游戏的神经网络,告诉我们大脑是如何决策的
- Nature:实验室培育的大脑可以拥有意识吗?
- 后香农时代,华为提出10大数学挑战问题