导读:Task是web开发中一个经典场景,我们时常需要延时任务,或者定时任务,通常都需要任务队列。常见的任务队列如celery,lmstfy是美图开源的任务队列。本文作者详细剖析了lmstfy的架构实现,干货满满,适合技术人员阅读。

lmstfy(Let Me Schedule Task For You) 是美图架构基础服务团队在 2018 年初基于 Redis 实现的简单任务队列(Task Queue)服务,目前在美图多个线上产品使用接近两年的时间。主要提供以下特性:

  • 任务具备延时、自动重试、优先级以及过期等功能

  • 通过 HTTP restful API 提供服务

  • 具备横向扩展能力

  • 丰富的业务和性能指标

Github 项目地址: https://github.com/meitu/lmstfy

使用场景

任务队列跟消息队列在使用场景上最大的区别是: 任务之间是没有顺序约束而消息要求顺序(FIFO),且可能会对任务的状态更新而消息一般只会消费不会更新。 类似 Kafka 利用消息 FIFO 和不需要更新(不需要对消息做索引)的特性来设计消息存储,将消息读写变成磁盘的顺序读写来实现比较好的性能。而任务队列需要能够任务状态进行更新则需要对每个消息进行索引,如果把两者放到一起实现则很难实现在功能和性能上兼得。在美图内部选型上,如果是异步消息模型一般会选择消息队列,比如类似日志上报,抢购等。而对于需要延时/定时下发或者修改状态任务则是使用任务队列。

比如在以下几种场景会使用任务队列:

  1. 定时任务,如每天早上 8 点开始推送消息,定期删除过期数据等

  2. 任务流,如自动创建 Redis 流程由资源创建,资源配置,DNS 修改等部分组成,使用任务队列可以简化整体的设计和重试流程

  3. 重试任务,典型场景如离线图片处理

目标与调研

在自研任务队列之前,我们基于以下几个要求作为约束调研了现有一些开源方案:

  • 任务支持延时/优先级任务和自动重试

  • 高可用,服务不能有单点以及保证数据不丢失

  • 可扩展,主要是容量和性能需要可扩展

第一种方案是 Redis 作者开源的分布式内存队列 disque(https://github.com/antirez/disque)。disque 采用和 Redis Cluster 类似无中心设计,所有节点都可以写入并复制到其他节点。不管是从功能上、设计还是可靠性都是比较好的选择。我们在 2017 年也引入 disque 在部分业务使用过一段时间,后面遇到 bug 在内部修复后想反馈到社区,发现 Redis 作者决定不再维护这个项目(要把 disque 功能作为 redis module 来维护,应该是会伴随 Redis 6 发布)。最终我们也放弃了 disque 方案,将数据迁移到我们自研任务队列服务。

第二种方案是 2007 年就开源的 beanstalkd(https://github.com/beanstalkd/beanstalkd),现在仍然还是在维护状态。beanstalkd 是类 memcached 协议全内存任务队列,断电或者重启时通过 WAL 文件来恢复数据。但 benstalkd 不支持复制功能,服务存在单点问题且数据可靠性也无法满足。当时也有考虑基于 beanstalkd 去做二次开发,但看完代码之后觉得需要改造的点不只是复制,还有类似内存控制等等,所以没有选择 beanstalkd 二次开发的方案。

也考虑过类似基于 kafka/rocketmq 等消息队列作为存储的方案,最后从存储设计模型和团队技术栈等原因决定选择基于 redis 作为存储来实现任务队列的功能。举个例子,假设以 Kafka 这种消息队列存储来实现延时功能,每个队列的时间都需要创建一个单独的 topic(如: Q1-1s, Q1-2s..)。这种设计在延时时间比较固定的场景下问题不太大,但如果是延时时间变化比较大会导致 topic 数目过多,会把磁盘从顺序读写会变成随机读写从导致性能衰减,同时也会带来其他类似重启或者恢复时间过长的问题。

设计和实现

整体设计

lmstfy 是 HTTP 协议的无状态服务,可以通过 4/L7 的 LB 来接入。内部主要由四个模块组成:

  1. Pump Thread: 每秒轮询 Redis 将到期的任务迁移到就绪队列(ready queue)

  2. Metric Collector, 定时收集队列相关统计数据到实例再通过 prometheus exporter 暴露给监控系统

  3. Token Manager,用来管理 namespace 和 token 的模块,namespace 是用来做业务隔离的单位

  4. Producer/Consumer,用来处理用户的任务和消费请求

Default Pool 除了用来存储业务数据,namespace/token 这类元数据也会默认存储到 Default 这个 Redis 池子里面

基础概念

  • namespace - 用来隔离业务,每个业务是独立的 namespace

  • queue - 队列名称,用区分同一业务不同消息类型

  • job - 业务定义的业务,主要包含以下几个属性:

    • id: 任务 ID,全局唯一

    • delay: 任务延时下发时间, 单位是秒

    • tries: 任务最大重试次数,tries = N 表示任务会最多下发 N 次

    • ttl(time to live): 任务最长有效期,超过之后任务自动消失

    • ttr(time to run): 任务预期执行时间,超过 ttr 则认为任务消费失败,触发任务自动重试

数据存储

lmstfy 的 redis 存储由四部分组成:

  1. timer(sorted set) - 用来实现延迟任务的排序,再由后台线程定期将到期的任务写入到 Ready Queue 里面

  2. ready queue (list) - 无延时或者已到期任务的队列

  3. deadletter (list) - 消费失败(重试次数到达上限)的任务,可以手动重新放回队列

  4. job pool(string) - 存储消息内容的池子

支持延迟的任务队列本质上是两个数据结构的结合: FIFO 和 sorted set。sorted set 用来实现延时的部分,将任务按照到期时间戳升序存储,然后定期将到期的任务迁移至 FIFO(ready queue)。任务的具体内容只会存储一份在 job pool 里面,其他的像 ready queue,timer,deadletter 只是存储 job id,这样可以节省一些内存空间。

以下是整体设计:

任务写入

任务在写入时会先产生一个 job id,目前 job id (16bytes) 包含写入时间戳、 随机数和延迟秒数, 然后写入 key 为 j:{namespace}/{queue}/{ID} 的任务到任务池 (pool) 里面。之后根据延时时间来决定这个 job id 应该到 ready queue 还是 timer 里面:

  • delay = 0,表示不需要延时则直接写到 ready queue(list)

  • delay = n(n > 0),表示需要延时,将延时加上当前系统时间作为绝对时间戳写到 timer(sorted set)

timer 的实现是利用 zset 根据绝对时间戳进行排序,再由旁路线程定期轮询将到期的任务通过 redis lua script 来将数据原子地转移到 ready queue 里面。

任务消费

之前提到任务在消费失败之后预期能够重试,所以必须知道什么时候可认为任务消费失败?业务在消费时需要携带 ttr(time to run) 参数,用来表示业务预期任务最长执行时间,如果在 ttr 时间内没有收到业务主动回复 ACK 消息则会认为任务失败(类似 tcp 的重传 timer)。

消费时从 ready queue 中 (B)RPOP 出任务的 job id,然后根据 job id 从 pool 中将任务内容发送给消费者。同时对 tries 减1,根据消费的 ttr(time to run) 参数, 将任务放入 timer 中。如果 tries 为零, 在 ttr 时间到期后该 job id 会被放入 dead letter 队列中(表示任务执行失败)。

同步任务模型

lmstfy 除了可以用来实现异步和延时任务模型之外,因为 namespace 下面的队列是动态创建且 job id 全局唯一,还可以用来实现同步任务模型 (producer 等到任务执行成功之后返回)。大概如下:

  1. producer 写入任务之后拿到 job id, 然后监听(consume)以 job id 为名的队列

  2. consumer 消费任务成功后,写回复消息到同样以 job id 为名的队列中

  3. producer 如果规定时间内能读到回复消息则认为消费成功,等待超时则认为任务失败

如何实现横向扩展

lmstfy 本身是无状态的服务可以很简单的实现横向扩展,这里的横向扩展主要是存储(目前只支持 Redis)的横向扩展。设计也比较简单,主要通过通过 namespace 对应的 token 路由来实现, 比如我们当前配置两组 Redis 资源: default 和 meipai:

[Pool][Pool.default]Addr = "1.1.1.1:6379"[Pool.meipai]Addr = "2.2.2.2:6389"

在创建 namespace 时可以指定资源池,token 里面会携带资源池名字作为前缀。比指定美拍资源池,那么 token 类似: meipai:01DT8EZ1N6XT ,后续在处理请求时就可以根据 token 里面携带的资源池名称来进行路由数据。不过这种设计实现队列级别的扩展,如果单队列存储消息量超过 Redis 内存上限则需要其他手段来解决(后面会支持磁盘类型存储)。

如何使用

# 创建 namespace 和 token, 注意这里使用管理端口$ ./scripts/token-cli -c -n test_ns -p default -D "test ns apply by @hulk" 127.0.0.1:7778{ "token": "01DT9323JACNBQ9JESV80G0000"}# 写入内容为 value 的任务$ curl -XPUT -d "value" -i "http://127.0.0.1:7777/api/test_ns/q1?tries=3&delay=1&token=01DT931XGSPKNB7E2XFKPY3ZPB"{"job_id":"01DT9323JACNBQ9JESV80G0000

redis延迟队列 如何确保成功消费_千万级延时任务队列如何实现,看美图开源的-LMSTFY...相关推荐

  1. redis延迟队列 实现_灵感来袭,基于Redis的分布式延迟队列

    一.延迟队列 延迟队列,也就是一定时间之后将消息体放入队列,然后消费者才能正常消费.比如1分钟之后发送短信,发送邮件,检测数据状态等. 二.Redisson Delayed Queue 如果你项目中使 ...

  2. Memcache/Redis集群管理探索与实现:美图开源PaaS平台资源网关

    https://mp.weixin.qq.com/s/HSuzeS2BBf-pLexZOr8c8A 美图开源内部改造的 Twemproxy(Redis/Memcached 缓存代理) https:// ...

  3. redis延迟队列 实现_灵感来袭,基于Redis的分布式延迟队列(续)

    背景 上一篇(灵感来袭,基于Redis的分布式延迟队列)讲述了基于Java DelayQueue和Redis实现了分布式延迟队列,这种方案实现比较简单,应用于延迟小,消息量不大的场景是没问题的,毕竟J ...

  4. java redis延迟队列_基于redis实现的延迟消息队列

    delay-queue redis实现延迟消息队列 需求背景 最近在做一个排队取号的系统 在用户预约时间到达前XX分钟发短信通知 在用户预约时间结束时要判断用户是否去取号了,不然就记录为爽约 在用户取 ...

  5. redis延迟队列 实现_php使用redis的有序集合zset实现延迟队列

    延迟队列就是个带延迟功能的消息队列,相对于普通队列,它可以在指定时间消费掉消息. 延迟队列的应用场景: 1.新用户注册,10分钟后发送邮件或站内信. 2.用户下单后,30分钟未支付,订单自动作废. 我 ...

  6. 电商项目订单取消(Redis 延迟队列)--1

    现功能时的选择很重要,如果你的系统所处理的数据量不是很大,我觉得队列和缓存很适合你,这样你可以对消息的传递更加了解,但你使用MQ,kafka的中间件时,你会发现使用起来更加轻松,但对于数据量大的系统来 ...

  7. 如何估算代码量_千万级用户的大型网站,应该如何设计其高并发架构?(彩蛋)...

    目录 (1)单块架构 (2)初步的高可用架构 (3)千万级用户量的压力预估 (4)服务器压力预估 (5)业务垂直拆分 (6)用分布式缓存抗下读请求 (7)基于数据库主从架构做读写分离 (8)总结 本文 ...

  8. 幽灵交易策略_千万级 交易商 独家 策略 开盘区间 突破 PZ Stretch 指标

    千万级 交易商 独家 策略 开盘区间 突破 PZ Stretch 指标 PZ Stretch 指标 是根据成功交易者[Toby Crabel]的独特交易策略,并由著名开发公司 Point Zero 研 ...

  9. mysql like 多个条件_千万级MySQL数据库这样建索引可以让你的数据库飞起来.........

    创建索引常用规则 1.表的主键.外键必须有索引: 2.数据量超过300的表应该有索引: 3.经常与其他表进行连接的表,在连接字段上应该建立索引: 4.经常出现在Where子句中的字段,特别是大表的字段 ...

最新文章

  1. 平时用电脑的一些技巧
  2. ARP缓存表的构成ARP协议全面实战协议详解、攻击与防御
  3. python接口自动化(二十四)--unittest断言——中(详解)
  4. 【Python刷题】_3
  5. 数据结构学习笔记(四):重识数组(Array)
  6. 关系型数据库(RDBMS)实质
  7. 【大会】网络性能、安全与成本之困
  8. cass块参照怎么改颜色,【干货】新版本CASS符号颜色自定义详解
  9. element table表格里的多选按钮,根据条件判断是否可以被选中
  10. zabbix触发器表达式详解
  11. 【译】使用 ndb 调试 node 应用
  12. python构建网站flask_某课Python Flask实现构建视频网站
  13. win10 没有计算机策略,Win10家庭版没有组策略怎么办
  14. logistic回归列线图(nomogram)的多种绘制方法
  15. Excel vba编程实现ftp下载并打开自动运行
  16. 【javaweb】【服务器】javaweb部署到服务器显示jdbc驱动加载失败
  17. 免费 whois 查询 API
  18. html设置一句话的样式,页面上的div标签,其HTML代码为divid=boxstyle=color:red文字/div,为其设置CSS样式如下:...
  19. Tomcat过时了?别多想,很多公司还是在用的。这份Tomcat架构详解,真的颠覆你的认知
  20. gzip and deflate

热门文章

  1. 长时间整理的xhtml网页设计规范
  2. ASP.NET中生成缩略图的代码
  3. Delphi中DLL封装业务逻辑的实现
  4. 23种设计模式C++源码与UML实现--建造者模式
  5. pandas中的函数—— .map()
  6. 【路由交换实验】OSPF
  7. eclipse创建了java web项目后怎么连接mysql
  8. 16进制数怎么判断正负
  9. 汇编语言串指令经典题目
  10. UNIX再学习 -- 线程同步