点击上方“Java精选”,选择“设为星标”

别问别人为什么,多问自己凭什么!

下方有惊喜,留言必回,有问必答!

每天 08:15 更新文章,每天进步一点点...

一、背景

在业务发展过程中,会出现一些需要延时处理的场景,比如:

  1. 订单下单之后超过30分钟用户未支付,需要取消订单

  2. 订单一些评论,如果48h用户未对商家评论,系统会自动产生一条默认评论

  3. 点我达订单下单后,超过一定时间订单未派出,需要超时取消订单等。。

处理这类需求,比较直接简单的方式就是定时任务轮训扫表。这种处理方式在数据量不大的场景下是完全没问题,但是当数据量大的时候高频的轮训数据库就会比较的耗资源,导致数据库的慢查或者查询超时。所以在处理这类需求时候,采用了延时队列来完成。

二、几种延时队列

延时队列就是一种带有延迟功能的消息队列。下面会介绍几种目前已有的延时队列:

推荐下自己几个月熬夜整理的各个大厂面试资料:

https://gitee.com/yoodb/eboo‍ks

1.Java中java.util.concurrent.DelayQueue

优点:JDK自身实现,使用方便,量小适用

缺点:队列消息处于jvm内存,不支持分布式运行和消息持久化

2.Rocketmq延时队列

优点:消息持久化,分布式

缺点:不支持任意时间精度,只支持特定level的延时消息

3.Rabbitmq延时队列(TTL+DLX实现)

优点:消息持久化,分布式

缺点:延时相同的消息必须扔在同一个队列

根据自身业务和公司情况,如果实现一个自己的延时队列服务需要考虑一下几点:

* 消息存储
* 过期延时消息实时获取
* 高可用性

三、 基于Redis实现

1.0版本

  • 功能特性

* 消息可靠性,消息持久化,消息至少被消费一次

* 实时性:存在一定的时间误差(定时任务间隔)

* 支持指定消息remove

* 高可用性

  • 整体结构

- Messages Pool所有的延时消息存放,结构为KV结构,key为消息ID,value为一个具体的message(这里选择Redis Hash结构主要是因为hash结构能存储较大的数据量,数据较多时候会进行渐进式rehash扩容,并且对于HSET和HGET命令来说时间复杂度都是O(1))

- Delayed Queue是16个有序队列(队列支持水平扩展),结构为ZSET,value为messages pool中消息ID,score为过期时间(分为多个队列是为了提高扫描的速度)

推荐下自己做的 Spring boot 的实战项目:
https://gitee.com/yoodb/jing-xuan

- Timed Task定时任务,负责扫描处理每个队列过期消息

  • 消息结构

每个延时消息必须包括以下参数:

* tags:消息过期之后发送mq的tags

* keys:消息过期之后发送mq的keys

* body:消息过期之后发送mq的body,提供给消费这做具体的消息处理

* delayTime:延时发送时间(默认,delayTime、expectDate有一个即可)

* expectDate:期望发送时间

* 另外,推荐公众号Java精选,回复java面试,获取最新redis面试题资料,支持在线随时随地刷题。

  • 流程

注:上图1、2、3或者2、3是一个事务操作

取出过期消息过程是通过一个外部定时任务每隔1min分钟去查询队列中过期的消息,然后发送mq && remove

2.0版本

1.0上有一个可改进的地方就是队列中过期的消息是通过定时任务触发查询。所有有了2.0

2.0版本在1.0上做了一个优化,废弃掉了1min定时任务触发过期消息发送,采用了java Lock await/singlal方式实现过期消息的实时发送低延时

  • 多节点部署结构:

- pull job:这里分别为每一个队列创建了一个pull job thread,功能很简单,就是负责去队列中拉取过期的消息数据(这里保证一个队列有且只有一个pull job)

- worker:pull job拉取到的过期消息会交给一个worker thread去处理,这样的好处是处理过期的消息实时性更高(pull job不必等去除过期消息全部处理完成在继续去拉取新的过期数据)。另外,关注公众号Java精选,回复渗透工具四个汉字,可以获取十种渗透工具。

- zookeeper coordinate:通过zk的操作来完成对队列的重新分配工作,daemon thread监听zk节点的创建和删除。

  • 主要流程:

服务启动会注册zk,获取分配处理的queues,启动后台线程监听zk 。
为每个分配queue创建一个pull job 。

pull job首先会去queue中查询是否有过期消息:

Y:将取出消息交给worker处理

N:查询queue中最后一个成员(zset结构默认按score递增排序),如果为空,则await;不为空则await(成员score-System.currentTimeMillis())

由于过期消息发送成功才会从队列中remove,所以pull job会记录上一次查询队列的一个offset,每次获取到过期消息会将offset向前偏移,过期消息交给worker处理,当worker由于某些异常原因处理失败会重置pull job中offset,这样可以避免消息发送一次失败之后没办法在继续处理(除了新节点add || remove时候)。

当部署服务有新增,延时队列服务会重新计算得到当前处理队列,并将之前创建pull job cancel,为新处理队列重新创建pull job。删除同理。

作者:程序人生ly

https://www.cnblogs.com/lylife/p/7881950.html

公众号“Java精选”所发表内容注明来源的,版权归原出处所有(无法查证版权的或者未注明出处的均来自网络,系转载,转载的目的在于传递更多信息,版权属于原作者。如有侵权,请联系,笔者会第一时间删除处理!

------ THE END ------

精品资料,超赞福利!

>Java精选面试题<
3000+ 道面试题在线刷,最新、最全 Java 面试题!

期往精选  点击标题可跳转

【232期】面试官:如何保护 Spring Boot 配置文件敏感信息?

【233期】Java8 stream 处理 List 集合的相同部分(交集)、去重!

【234期】新来的同事问我 where 1=1 是什么意思?

【235期】不同并发场景下 LongAdder 与 AtomicLong 如何选择?

【236期】ElasticSearch 进阶:一文全览各种 ES 查询在 Java 中的实现

【237期】Java 8 判空新写法

【238期】Java 8 中 Lambda 实现原理及源码剖析!

【239期】面试官问:你觉得 ThreadLocalRandom 这玩意安全吗?

技术交流群!

最近有很多人问,有没有读者&异性交流群,你懂的!想知道如何加入。加入方式很简单,有兴趣的同学,只需要点击下方卡片,回复“加群”,即可免费加入交流群!

文章有帮助的话,在看,转发吧!

【240期】面试官问:说说基于 Redis 实现延时队列服务?相关推荐

  1. 面试官问我:Redis 内存满了怎么办

    转载自 想不到!面试官问我:Redis 内存满了怎么办 Redis占用内存大小 Redis的内存淘汰 LRU算法 LRU在Redis中的实现 LFU算法 问题 Redis占用内存大小 我们知道Redi ...

  2. 面试官问我,Redis分布式锁如何续期?懵了。

    作者:肥朝,来自:肥朝(ID:feichao_java) 前言 上一篇[面试官问我,使用Dubbo有没有遇到一些坑?我笑了.]之后,又有一位粉丝和我说在面试过程中被虐了.鉴于这位粉丝是之前肥朝的老粉丝 ...

  3. 面试官问我为什么Redis这么快,我.......

    开课开课~ 面试官:为什么项目中用Redis? 我:当然是因为Redis好啊 面试官:emmm.....那Redis哪里好? 我:因为Redis快啊. 面试官:(这小伙子有点彪啊...)那为什么Red ...

  4. 基于Redis实现延时队列的优化方案

    一.延时队列的应用 近期在开发部门的新项目,其中有个关键功能就是智能推送,即根据用户行为在特定的时间点向用户推送相应的提醒消息,比如以下业务场景: 在用户点击充值项后,半小时内未充值,向用户推送充值未 ...

  5. 基于Redis实现延时队列

    文章目录 理论介绍 代码实现 理论介绍 首先贴图来说明redis是如何实现延时队列的 当用户发送一个消息请求给服务器后台的时候,服务器会检测这条消息是否需要进行延时处理,如果需要就放入到延时队列中,由 ...

  6. 想不到!面试官问我:Redis 内存满了怎么办?

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试资料 来源:http://rrd.me/et29e Redis占用内存大 ...

  7. 如果面试官问你:Redis 内存满了怎么办?

    来自:掘金(作者:千山qianshan) 原文链接: https://juejin.im/post/5d674ac2e51d4557ca7fdd70 Redis占用内存大小 Redis的内存淘汰 LR ...

  8. 高德面试官问我:JVM内存溢出后服务还能运行吗,我一顿操作行云流水

    文章开篇问一个问题吧,一个java程序,如果其中一个线程发生了OOM,那进程中的其他线程还能运行吗? 接下来做实验,看看JVM的六种OOM之后程序还能不能访问. 在这里我用的是一个springboot ...

  9. redis怎么修改_面试官问我Redis事务,还问我有哪些实现方式

    ❝ 「第12期」 距离大叔的80期小目标还有68期,今天大叔要跟大家分享的内容是 -- Reids中的事务.同样,这也是redis中重要指数为四颗星的必备基础知识点.下面一起来了解一下吧. ❞ 相信大 ...

最新文章

  1. 谈谈机器学习模型的部署
  2. mysql集群之MMM简单搭建
  3. 超经典动态规划题:最大子序和
  4. 2020-12-09 深度学习 卷积核/过滤器、特征图(featue map)、卷积层
  5. Maven在Eclipse中的实用小技巧
  6. gaia引擎分析(一)资源管理
  7. QT + MinGW 中文显示乱码解决方案
  8. vue页面乱码_项目部署到weblogic后页面乱码问题
  9. VS2015 编译问题记录(更新)
  10. ZOJ 2859 二维RMQ(模板)
  11. 男孩应该懂的,女孩应该懂的
  12. php编码怎么变西欧了403,你知道一个简单的PHP脚本在ip检查后抛出403吗?
  13. 主成分分析与因子分析及SPSS实现
  14. 瑞昱Realtek交换机芯片概要
  15. 洛谷 P4099 SAO —— 树形dp
  16. 阿里云无影云桌面分配用户是什么?
  17. 为了讨好程序员,阿里云和 Costa 开了家码农咖啡馆
  18. win7删除计算机 网络打印机驱动程序,Windows 7、8 系统下手动删除打印机驱动程序的方法...
  19. HDU2037:今年暑假不AC
  20. 程序员如何轻松实现数据可视化?

热门文章

  1. 浅谈 Spring IOC和AOP
  2. INRIA数据集转换成VOC格式
  3. js定时器以及验证码倒计时案例
  4. Halcon评价图像清晰度的算子
  5. 上海交大计算机网络基础,上海交大计算机网络基础教程
  6. 实体店如何挑选会员系统
  7. 不得不说的SD2.0
  8. 盐城计算机工程学院,​盐城市计算机学会召开2020年会
  9. 学校计算机室应该配备哪种灭火器,学校教学楼应配备的灭火器型号是什么呢
  10. 介绍一款密码分析软件cap4