beanstalkd 是一个轻量级消息中间件,其主要特性:
  • 基于管道  (tube) 和任务 (job) 的工作队列 (work-queue):d

    管道(tube),tube类似于消息主题(topic),在一个beanstalkd中可以支持多个tube,每个tube都有自己的producer和consumer;
    任务(job),beanstalkd用job代替了message的概念,与消息不同,job有一系列状态: 
  • 内部实现采用了 libevent, 服务器-客户端之间用类似 memcached 的轻量级通讯协议,具有有很高的性能。
  • 尽管是内存队列,beanstalkd 提供了 binlog 机制,当重启 beanstalkd 时,当前任务状态能够从纪录的本地 binlog 中恢复。
  • 优先级(priority):job可以有0~2^32个优先级,0代表最高优先级,beanstalkd使用最大最小堆处理job的优先级排序,因此reserve命令的时间复杂度是O(logN);
  • 延时(delay),有两种方式可以执行延时任务:producer发布任务时指定延时;或者当任务处理完毕后, consumer再次将任务放入队列延时执行 (RELEASE with <delay>);
  • 超时重发(time-to-run),Beanstalkd 把job返回给consumer以后:consumer必须在预设的 TTR (time-to-run) 时间内发送 delete / release/ bury 改变任务状态;否则 Beanstalkd 会认为消息处理失败,然后把job交给另外的消费者节点执行。如果consumer预计在 TTR (time-to-run) 时间内无法完成任务, 也可以发送 touch 命令, 它的作用是让 Beanstalkd 从系统时间重新计算 TTR ;
  • 任务预留(buried),如果job因为某些原因无法执行, consumer可以把任务置为 buried 状态让 Beanstalkd 保留这些任务。管理员可以通过 peek buried 命令查询被保留的任务,并且进行人工干预。简单的, kick <n> 能够一次性把 n 条被保留的任务踢回队列。

job的状态

  • READY,需要立即处理的任务,当延时 (DELAYED) 任务到期后会自动成为当前任务;
  • DELAYED,延迟执行的任务, 当消费者处理任务后,可以用将消息再次放回 DELAYED 队列延迟执行;
  • RESERVED,已经被消费者获取, 正在执行的任务,Beanstalkd 负责检查任务是否在 TTR(time-to-run) 内完成;
  • BURIED,保留的任务: 任务不会被执行,也不会消失,除非有人把它 "踢" 回队列;
  • DELETED,消息被彻底删除。Beanstalkd 不再维持这些消息。
如下,是一个典型任务的生命周期:
producer执行put命令将job放入队列,consumer执行reserve命令从队列取出job,执行完毕后发送delete命令告诉beanstalkd删除该job。
如果没有执行delete命令,beanstalkd将在一个TTR周期(默认120s)后重新将该job加入队列;
 put            reserve               delete-----> [READY] ---------> [RESERVED] --------> *poof*

下面是一个使用beanstalkc(python客户端)操作beanstalkd的例子:
#!/usr/bin/env python
import beanstalkcbeanstalk=beanstalkc.Connection(host="127.0.0.1",port=11300)# pruducer
beanstalk.put('hello')
beanstalk.put('world')# consumer
job1=beanstalk.reserve()
print "job1: " + job1.body
job1.delete()job2=beanstalk.reserve(timeout=1)
print "job2: " + job2.body
job2.delete()job3=beanstalk.reserve(timeout=1)       # Error,队列中已经没有job了
print "job3: " + job3.body
job3.delete()

tube的管理

beanstalkd通过tube维护多个队列,每个tube都是一个独立的queue,可以使用use命令切换tube,如果切换的tube不存在,会自动创建一个:
print beanstalk.tubes()     # default
print beanstalk.using()     # default 

beanstalk.use('queue1')
print beanstalk.using()     # queue1

beanstalk.use('queue2')     # queue2
print beanstalk.using()print beanstalk.tubes()     # default, queue2

如上面的例子,在最后的tubes()命令打印所有tube的时候,并没有看到queue1,这是因为没有任何client 在using或者watching的tube会自动消失。

可以使用watch命令让client同时处理多个tube,而不用担心tube会被销毁:

print beanstalk.tubes()     # default
print beanstalk.using()     # default

beanstalk.use('queue1')
beanstalk.watch('queue1')
print beanstalk.using()     # queue1

beanstalk.use('queue2')     # queue2
print beanstalk.using()print beanstalk.tubes()     # default, queue1, queue2
print beanstalk.watching()  # default, queue1

watch的tube如果不存在,会被自动创建,可以用ignore命令取消关注tube:

beanstalk.watch('queue3')
print beanstalk.watching()  # default, queue3
beanstalk.ignore('queue3')
print beanstalk.watching()  # default

注意,watch和use是两个独立的动作,use一个tube不代表watching它了,反之watch一个tube也不代表using它;

beanstalkc命令

如下,是一个job更完整的状态变迁和生命周期:
put with delay               release with delay----------------> [DELAYED] <------------.|                   || (time passes)     ||                   |put                  v     reserve       |       delete-----------------> [READY] ---------> [RESERVED] --------> *poof*^  ^                |  ||   \  release      |  ||    `-------------'   ||                      || kick                 ||                      ||       bury           |[BURIED] <---------------'||  delete`--------> *poof*

一些例子:
put的时候加上delay参数,可以延迟发布job:
# pruducer
beanstalk.put('hello', delay=10)
beanstalk.put('world')# consumer
job1=beanstalk.reserve()
print "job1: " + job1.body  # World
job1.delete()job2=beanstalk.reserve(timeout=0)
print "job2: " + job2.body  # Error,

put命令也支持优先级参数:

# pruducer
beanstalk.put('hello', priority=10)
beanstalk.put('world', priority=9)# consumer
job1=beanstalk.reserve()
print "job1: " + job1.body  # world
job1.delete()job2=beanstalk.reserve(timeout=0)
print "job2: " + job2.body  # hello
job2.delete()

release命令可以释放job回队列:

# pruducer
beanstalk.put('hello')
beanstalk.put('world')# consumer
job1=beanstalk.reserve()
print "job1: " + job1.body  # hello
job1.release()job2=beanstalk.reserve(timeout=0)
print "job2: " + job2.body  # hello

bury命令将job放到一个特殊的FIFO队列中,之后不能被reserve命令获取,但可以用kick命令扔回工作队列中,之后就能被消费了:

# pruducer
beanstalk.put('hello')
beanstalk.put('world')# consumer
job1=beanstalk.reserve(timeout=0)
job1.bury()
print job1.stats()['state'] # buried

job2=beanstalk.reserve(timeout=0)
print "job2: " + job2.body  # world
job2.delete()beanstalk.kick()job3=beanstalk.reserve(timeout=0)
print "job3: " + job3.body  # hello
job3.delete()

peek命令允许查看一个job,但不会reserve它;

# pruducer
beanstalk.put('hello')
beanstalk.put('world')#print(beanstalk.stats())  # consumer
job1=beanstalk.reserve(timeout=0)
job1_id=job1.stats()['id']
print job1_id
job1_r=beanstalk.peek(job1_id)
print "job1 " + job1_r.body  # hello

job2=beanstalk.reserve(timeout=0)
print "job2: " + job2.body  # hello
job2.delete()job3=beanstalk.reserve(timeout=0)
print "job3: " + job3.body  # world
job3.delete()

一种消息和任务队列——beanstalkd相关推荐

  1. RabbitMQ之五种消息模型

    首先什么是MQ MQ全称是Message Queue,即消息对列!消息队列是典型的:生产者.消费者模型.生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息.因为消息的生产和消费都是异步的,而 ...

  2. day72 JavaWeb框架阶段——RabbitMQ消息队列【了解常见的MQ产品,了解RabbitMQ的5种消息模型,会使用Spring AMQP】

    文章目录 0.学习目标 1.RabbitMQ 1.1.搜索与商品服务的问题 1.2.消息队列(MQ) 1.2.1.什么是消息队列 1.2.2.AMQP和JMS 1.2.3.常见MQ产品 1.2.4.R ...

  3. 消息队列之----Beanstalkd

    消息队列之----Beanstalkd Beanstalkd 是什么 Beanstalkd,一个高性能.轻量级的分布式内存队列系统 Beanstalkd 特性 优先级(priority) 注:优先级就 ...

  4. Rabbitmq超级详细的笔记,包括安装,基本命令,rabbitmq的七种消息模式,以及死信队列,延迟队列,优先级队列和惰性队列的介绍

    RabbitMQ 文章目录 RabbitMQ 1 RabbitMQ介绍 1.1 基本介绍 1.2 RabbitMQ的安装 1.2.1 ubuntu20.04 安装rabbitmq 1.2.2 cent ...

  5. 简单易用的任务队列-beanstalkd

    更多技术文章,请关注我的个人博客 www.immaxfang.com 和小公众号 Max的学习札记. 概述 beanstalkd 是一个简单快速的分布式工作队列系统,协议基于 ASCII 编码运行在 ...

  6. 下面是DHCP协议工作的4种消息,正确的顺序应该是(40)。【答案】B

    下面是DHCP协议工作的4种消息,正确的顺序应该是(40). ①DHCP Discovery ②DHCP Offer ③DHCP Request ④DHCP Ack (40)A.①③②④ B.①②③④ ...

  7. RabbitMQ(八):SpringBoot 整合 RabbitMQ(三种消息确认机制以及消费端限流)

    说明 本文 SpringBoot 与 RabbitMQ 进行整合的时候,包含了三种消息的确认模式,如果查询详细的确认模式设置,请阅读:RabbitMQ的三种消息确认模式 同时消费端也采取了限流的措施, ...

  8. Redis的两种消息模式

    Redis的两种消息模式 队列模式 发布订阅模式 队列模式 队列模式下每个消费者可以同时从多个服务器读取消息,但是每个消息只能被一个消费者读取. 在队列模式下其实每次插入的数据都是载入在最前面的,而先 ...

  9. SpringBoot整合RabbitMQ 实现五种消息模型

    目录 SpringBoot中使用RabbitMQ 搭建初始环境 引入依赖 配置配置文件 测试类 注入 rabbitTemplate 消息队列RabbitMQ之五种消息模型 第一种直连模型使用 开发生产 ...

最新文章

  1. 软件质量保证与测试笔记——江湖救急版
  2. 【网络爬虫】BeautfulSoup下载美图(真の能看懂~!)
  3. 基于c#的相关性分析_开源Math.NET基础数学类库使用(11)C#计算相关系数
  4. keil uVision4 创建项目
  5. AX 2009 父窗体参数记录传递
  6. 计算机工程专业毕业,新加坡国立大学计算机工程专业毕业生亲临介绍
  7. Android 存储学习之保存系统短信到SD卡
  8. swift4 label显示html,Swift:在标签或textView中显示HTML数据
  9. 【2019杭电多校第七场1010=HDU6655】Just Repeat(思维)
  10. 畅视影院APP源码,电影网站app源码
  11. 腾科张老师教你如何在cisco路由器上部署和使用FTP/TFTP
  12. html基础dw,HTML基础DW使用教程
  13. c#webservice接口調用_Windows 桌面应用开发之 C# 调用 WebService 接口
  14. UNRAID挂载exFat格式的USB磁盘
  15. pythongui做计算器_Python——GUI编程 利息计算器 作业9(python programming)
  16. [Wpf] . [Theme] 重构/Themes/Generic.xaml 创建一个Custom Control的典型做法
  17. 云适配CEO陈本峰:移动办公真正产生商业价值要落到实际场景中
  18. Linux-Journal
  19. 【转】MBBMS CA方案
  20. [转载] 中华典故故事(孙刚)——40 不见棺材不落泪,不到黄河不死心

热门文章

  1. 精度 vs 效率:模型越小,精度就一定越低吗?
  2. 破解自动机器学习的黑匣子
  3. 风之语.人在职场也需要'备胎'
  4. 图神经网络(GNN)系列
  5. Python 之 matplotlib (十四)图中图
  6. 完美解决latex警告信息:Citation `Gusfield:97‘ on page 1 undefined.
  7. TypeError: __init__() takes 1 positional argument but 4 were given
  8. 玩电子游戏的神经网络,告诉我们大脑是如何决策的
  9. Nature:实验室培育的大脑可以拥有意识吗?
  10. 后香农时代,华为提出10大数学挑战问题