最近由于项目的需要原因,需要做一个延时队列,比如用户登录X秒后需要发送一些系统消息。或者要做一个小游戏,需要有操作超时检测,如果超时,则自动跳到下一个玩家操作。这些,都用到了定时检测,而又想到了redis有过期回调功能,所以打算使用redis的过期回调来实现这些功能。由于对于redis的过期回调不熟悉,导致踩了一些坑。

先大致介绍一下延时队列的实现方案:

  1. 定期轮询数据库
  2. DelayQueue
  3. 基于redis的过期回调
  4. 基于redis的zset + 定时实现
  5. 基于Rabbitmq死信队列

这里主要介绍基于redis的实现。

基于redis的过期回调实现

主要的问题有:

  1. redis的惰性删除以及过期策略导致的过期不回调问题
  2. 多实例监听导致的重复消费问题
  3. 由于redis的key已经过期,所以无法收到value的值,需要把业务需要的字段加到key中

先贴实现的配置和代码。

yml配置

spring:redis:database: 1host: 127.0.0.1port: 6380password:jedis:pool:max-active: 32max-wait: 2000msmax-idle: 8min-idle: 0

监听类:


@Service
public class RichmanExpireCallBackListener implements MessageListener {private static final Logger logger = LoggerFactory.getLogger(RichmanExpireCallBackListener.class);@Autowiredprivate RedisTemplate<String, String> stringTemplate;@Overridepublic void onMessage(Message message, byte[] bytes) {String str = (String) stringTemplate.getValueSerializer().deserialize(message.getBody());logger.info("=========richmanExpireCallBack onMessage  str:{}", str);//实现过期回调业务}
}

配置类:


@Configuration
public class RedisConfig {@Autowiredprivate RedisConnectionFactory redisConnectionFactory;@Autowiredprivate RichmanExpireCallBackListener redisMessageListener;@Bean("stringTemplate")public RedisTemplate<String, String> stringTemplate(RedisConnectionFactory redisConnectionFactory, StringRedisSerializer stringRedisSerializer) {RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();redisTemplate.setKeySerializer(stringRedisSerializer);redisTemplate.setValueSerializer(stringRedisSerializer);redisTemplate.setHashKeySerializer(stringRedisSerializer);redisTemplate.setHashValueSerializer(stringRedisSerializer);redisTemplate.setConnectionFactory(redisConnectionFactory);return redisTemplate;}@Beanpublic StringRedisSerializer stringRedisSerializer() {return new StringRedisSerializer();}@Beanpublic ChannelTopic expiredTopic() {return new ChannelTopic("__keyevent@1__:expired");  // 选择1号数据库}@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer() {RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);redisMessageListenerContainer.addMessageListener(redisMessageListener, expiredTopic());return redisMessageListenerContainer;}
}

单元测试:

@Autowired
private RedisTemplate<String, String> stringTemplate;@Test
public void testRedis() {stringTemplate.opsForValue().set("test","1",5, TimeUnit.SECONDS);System.out.println("=================");
}

这里配置的redis是取的是1号数据库,需要过期回调的key都set进1号数据库里。在配置类里,监听的也是1号数据库的过期数据。当然监听类也可以同过继承KeyExpirationEventMessageListener类,重写onMessage(Message message, byte[] pattern)来实现监听redis的过期回调。但是KeyExpirationEventMessageListener类的源码里监听所有 db 的过期事件 keyevent@*:expired

,而我这边把需要监听的过期的key都存在了1号数据库,所以只需要监听1号数据库的即可,而不需要因为监听了所有的过期的key,自己还要进行一次匹配过滤。

以下粘贴一个介绍redis过期回调的参数配置的博客,来自https://blog.csdn.net/zhu_tianwei/article/details/80169900

redis自2.8.0之后允许客户订阅Pub / Sub频道, Redis 目前的订阅与发布功能采取的是发送即忘(fire and forget)策略, 所以如果你的程序需要可靠事件通知, 那么目前的键空间通知可能并不适合:当订阅事件的客户端断线时, 它会丢失所有在断线期间分发给它的事件,并不能确保消息送达。

事件类型

对于每个修改数据库的操作,键空间通知都会发送两种不同类型的事件消息:keyspace 和 keyevent。以 keyspace 为前缀的频道被称为键空间通知(key-space notification), 而以 keyevent 为前缀的频道则被称为键事件通知(key-event notification)。

事件是用  __keyspace@DB__:KeyPattern 或者  __keyevent@DB__:OpsType 的格式来发布消息的。
DB表示在第几个库;KeyPattern则是表示需要监控的键模式(可以用通配符,如:__key*__:*);OpsType则表示操作类型。因此,如果想要订阅特殊的Key上的事件,应该是订阅keyspace。
比如说,对 0 号数据库的键 mykey 执行 DEL 命令时, 系统将分发两条消息, 相当于执行以下两个 PUBLISH 命令:
PUBLISH __keyspace@0__:sampleKey del
PUBLISH __keyevent@0__:del sampleKey
订阅第一个频道 __keyspace@0__:mykey 可以接收 0 号数据库中所有修改键 mykey 的事件, 而订阅第二个频道 __keyevent@0__:del 则可以接收 0 号数据库中所有执行 del 命令的键。

开启配置

键空间通知功能耗费CPU,默认关闭。所以在使用该特性之前,请确认一定是要用这个特性的,然后修改配置文件,或使用config配置。相关配置项如下:

字符 发送通知
K 键空间通知,所有通知以 keyspace@ 为前缀,针对Key
E 键事件通知,所有通知以 keyevent@ 为前缀,针对event
g DEL 、 EXPIRE 、 RENAME 等类型无关的通用命令的通知
$ 字符串命令的通知
l 列表命令的通知
s 集合命令的通知
h 哈希命令的通知
z 有序集合命令的通知
x 过期事件:每当有过期键被删除时发送
e 驱逐(evict)事件:每当有键因为 maxmemory 政策而被删除时发送
A 参数 g$lshzxe 的别名,相当于是All

输入的参数中至少要有一个 K 或者 E , 否则的话, 不管其余的参数是什么, 都不会有任何通知被分发。上表中斜体的部分为通用的操作或者事件,而黑体则表示特定数据类型的操作。配置文件中修改 notify-keyspace-events “Kx”,注意:这个双引号是一定要的,否则配置不成功,启动也不报错。例如,“Kx”表示想监控某个Key的失效事件。也可以通过config配置:CONFIG set notify-keyspace-events Ex (但非持久化)

重复消费问题

配置好了之后,在测试环境跑起来,会发现能完美运行,于是开开心心的上了线。但是,上了正式环境之后,就会发现其实这个redis回调并没有那么完美。首先的第一个问题就是重复消费问题。在实现用户登录X秒后收到系统的消息的功能中,发布到生产后,发现有很多用户收到了多条重复的系统通知。一开始以为是dubbo的重试功能导致了多次调用导致的,因为之前就有过由于dubbo的重试机制导致了获取token时报获取重复失败的情况。所以这次一开始以为是dubbo重试机制导致的重复发送,但经过了排查,和禁止dubbo重试之后,还是有这样的情况。后来才发现是生产环境多实例部署导致的问题。

由于生产环境,是多实例部署,而测试环境是单实例部署,所以测试环境并没有出现重复消费问题。redis的过期回调,是所有的监听的实例,都会受到redis的过期回调通知,所以在多实例部署的情况下,会出现一个key过期后,多次执行了回调代码,导致了重复消费。在实现用户登录X秒后收到系统的消息的功能中,就发现用户收到多条重复的系统消息,就是由于重复消费导致的。所以,如果如果回调代码要求不能重复消费的,则可以通过把消费的key放到一个set里,执行回调时,先检测是否有别的实例已经消费了。或者通过分布式锁,来避免重复消费。

部分数据无回调

当你解决了重复消费问题之后,以为事情就结束了。但是没过多久,你就会发现怎么有部分的过期的key为什么没有执行回调。这里就涉及到了redis的过期回调机制和淘汰机制的问题。由于redis是采用了惰性删除机制的,当key过期的时候,并没有立刻被删除的。而redis的回调,是需要key被删除了才会产生回调的。所以key到了过期时间后,并不能保证一定会产生回调。

过期键的删除策略

如果Redis的一个键是过期的,那它到了过期时间之后并不会马上就从内存中被删除,而是会采用相应的删除策略。主要有三种删除策略:

  1. 立即删除
  2. 惰性删除
  3. 定时删除

立即删除是指,在设置键的过期时间时,创建一个回调事件,当过期时间达到时,由时间处理器自动执行键的删除操作。立即删除能保证内存数据的新鲜度,过期的键值会被马上删除,所占用的内存也会随之释放。但是这个策略会消耗cpu,在过期的key比较多的情况下,会占用比较多的cpu资源,将cpu的资源耗费在删除一些无关key的事情上。

惰性删除是指,redis的key过期时,不会立刻被删除,而是等待下一次查询,或者被使用时,才会检测到过期了,才会被删除。所以这个策略的缺点是很浪费内存。

定时删除是指,每隔一段时间程序就会根据内部算法,对Redis数据进行一次检查,删除里面的过期key。定时删除策略介于立即删除和惰性删除之间,比立即删除少耗费cpu资源,比惰性删除节省内存空间。清理算法会通过限制清理的频率和时长来减少对cpu的影响。定时删除会依次遍历所有db,从db里随机取出20个key,判断是否过期,如果过期则清除。若有5个以上key过期,则重复上面步骤,否则遍历下一个db。在清理过程中,如果超过了限定的时间,25%的cpu时间,就会退出清理过程。

Redis配置项hz定义了serverCron任务的执行周期,默认为10,即CPU空闲时每秒执行10次,每隔100ms执行一次。每次过期key清理的时间不超过CPU时间的25%,即若hz=1,则一次清理时间最大为250ms,若hz=10,则一次清理时间最大为25ms

这是一个基于概率的简单算法,基本的假设是抽出的样本能够代表整个key空间,redis持续清理过期的数据直至将要过期的key的百分比降到了25%以下。由于算法采用的随机取key判断是否过期的方式,故几乎不可能清理完所有的过期Key。
调高hz参数可以提升清理的频率,过期key可以更及时的被删除,但hz太高会增加CPU时间的消耗。

数据逐出策略

上面的是过期的数据的删除策略,redis中还有6种数据淘汰策略。redis中过期key的删除策略默认为定时删除+惰性删除,而定时删除也会有部分key没有被删除,长期以往,就会大量的过期的key堆积在内存,导致内存耗尽。所以当redis中使用的内存快达到了设置的限定内存时,就会使用相应的数据淘汰策略,来进行数据淘汰,腾出空间。

  1. noeviction:禁止驱逐数据,当内存不足时,写入操作会报错。一般都不会设置为这个。
  2. volatile-lru:移除最近最少使用的key,只从设置了过期时间的key里移除。
  3. volatile-random:在设置了过期时间的key里,随机移除某个key
  4. volatile-ttl:在设置了过期时间的key里,挑选将要过期的key进行淘汰
  5. allkeys-lru:从数据集中挑选最近最少使用的数据淘汰
  6. allkeys-random:从数据集中任意选择数据淘汰

基于redis回调实现的延时队列的缺点

从上面的介绍来看,可以很明显的看到,基于redis回调实现的延时队列,并不能保证key过期时,一定会触发回调,存在一定的时间差。在一些场景中,比如玩游戏时,操作超时检测这种场景,这个方案就不适合。即key过期就一定执行回调业务的,对时效性有比较高的要求的场景,则不适用该方案。

基于zset + 定时任务的实现

延时队列还可以基于redis的zset结构+定时任务实现。将过期时间设为score,然后通过定时任务来扫描小于当前时间的数据,即过期的数据,然后批量进行业务处理。这个方案也是存在着一些问题的:

  1. 多实例部署时,就会有多个定时任务在跑,那么就会出现重复消费的情况。

  2. 重复消费可以利用分布式锁来进行互斥,使得只有一个实例在跑定时任务。那这样就容易出现单机跑任务,如果任务比较密集,会有一定的延迟。可以考虑利用多线程提升性能,但是还是避免不了单机的问题。

  3. 分布式锁不锁实例,只锁任务。即把锁的粒度调低,但是每个任务都需要获取锁,并且多实例还是会有任务的锁竞争,这些都是耗损性能的地方

  4. 利用zrem方法避免锁竞争。多实例执行定时任务获取过期数据,然后zrem成功的实例进行处理数据。这里有个明显的缺点,就是如果zrem成功后,但是程序运行失败了,则这个数据的处理就丢失了,也就没办法保证可用性。

  5. 定时任务需要多实例之间时钟同步。如果是实时性要求比较高的,机器之间出现了时钟的误差,那么就容易导致时间不精确。

基于RabbitMQ死信队列的实现

RabbitMQ的简单介绍

RabbitMQ是基于AMQP的一款消息管理系统,RabbitMq有以下几种常用的消息模型。

1.基本消息模型。该模型中生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。一个生产者对应一个消费者,而实际生产过程中,往往消息生产会发送很多条消息,如果消费者只有一个的话效率就会很低,因此rabbitmq有另外一种工作队列消息模型

2.作队列消息模型。这种模型下,一个生产发送消息到队列,允许有多个消费者接收消息,但是一条消息只会被一个消费者获取。

3.订阅模型。实现一条消息被多个消费者获取。在这种模型下,消息传递过程中比之前多了一个exchange交换机,生产者不是直接发送消息到队列,而是先发送给交换机,经由交换机分配到不同的队列,而每个消费者都有自己的队列。

交换机的类型有以下几种:

1.Fanout:广播,交换机将消息发送到所有与之绑定的队列中去。

2.Direct:定向,交换机按照指定的Routing Key发送到匹配的队列中去。生产者发送的消息时需要携带一个Routing Key,消费者也需要设置Routing Key,并消费相应的Routing Key的消息。

3.Topics:通配符,与Direct大致相同,不同在于Routing Key可以根据通配符进行匹配。

死信队列

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange。

消息变成死信的几种情况:

  • 消息被拒绝(basic.reject/basic.nack)并且requeue=false
  • 消息TTL过期
  • 队列达到最大长度

死信队列设置

死信队列消息的流转为:生产者发消息到死信队列,设置好TTL,然后消息过期后,会根据设置好exchange和routing key发送到相应的死信队列中。

首先先创建一个exchange

然后创建一个保存设置有过期时间的队列,并设置好过期后,路由到哪个exchange和routing key里。之后消息过期后,会发送到设置好的队列里。

最后,创建好接收死信队列消息的队列,并绑定好exchange和routing key即可。

测试

 @Autowiredprivate RabbitTemplate rabbitTemplate;@Autowired@Qualifier("DLXMessagePostProcessor")private MessagePostProcessor messagePostProcessor;@Testpublic void test() {try {for(int i=0;i<10;i++) {rabbitTemplate.convertAndSend(RabbitQueueName.DLX_EXCHANGE, RabbitQueueName.DLX_ROUTING_KEY, "test1111",messagePostProcessor);Thread.sleep(2000);}} catch (Exception e) {log.error("[cmd=MsgQueueService send err ... ]", e);}}
@Bean(name="DLXMessagePostProcessor")MessagePostProcessor getMessagePostProcessor() {return new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("5000");return message;}};}

运行后会发现消息会先发到dlx.expire.queue,然后5秒超时后会转发到dlx.deal.queue。最后能看到dlx.deal.queue有10条记录。配置的时候,需要注意的是死信队列的绑定关系。保存过期消息的队列A需要设置好消息过期后,需要发送到的exchange和routing key。死信队列绑定的exchange和routing key需要和队列A设置的一致即可。这里由于个人懒得配置第二个exchange,所以把过期消息又发回给本exchange,但是routing key不一样,所以路由到别的队列中。

基于死信队列的实现总结

基于TTL模式的延时队列会涉及到2个交换机、2个路由键、2个队列,配置的时候会比较麻烦。基于死信队列的延时队列存在一个问题,就是同一个队列,即预保存过期消息的那个队列A里的消息延时时间最好一致。因为如果不一致,假设消息1过期时间为1小时,然后消息2过期时间为5秒。那么队列A需要先等消息1过期,即等1小时后,第二条数据才能过期。即消息2,虽然过期时间是5秒,但是需要等1小时了,才能被路由到死信队列。假设中间有条不设置过期时间的消息进入,而且没有被消费掉的话,会导致整个队列阻塞。解决过期时间不一致问题,可以设置多个队列。

关于延时队列的一些思考相关推荐

  1. Reids延时队列使用的思考

    我们平时习惯于使用 Rabbitmq 和 Kafka 作为消息队列中间件,来给应用程序之间增加异步消息传递功能.这两个中间件都是专业的消息队列中间件,特性之多超出了大多数人的理解能力. 对于那些只有一 ...

  2. RabbitMQ延时队列原理讲解

    RabbitMQ延时消息队列 延时队列介绍 延时队列即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费. 那么,为什么需要延迟消费呢?我们来看以下的场景 网上商城下订单后30分钟 ...

  3. 4.2.4 Kafka高级特性解析(物理存储、稳定性:事物,控制器,可靠性,一致性,_consumer_offsets、延时队列、自定义重试队列)

    Kafka高级特性解析 文章目录 Kafka高级特性解析 2.5 物理存储 2.5.1 日志存储概述 2.5.2 日志存储 2.5.2.1 索引 2.5.2.1.1 偏移量 2.5.2.1.2 时间戳 ...

  4. 【Redis核心原理和应用实践】应用 2:缓兵之计 —— 延时队列

    我们平时习惯于使用 Rabbitmq 和 Kafka 作为消息队列中间件,来给应用程序之间增加异步消息传递功能.这两个中间件都是专业的消息队列中间件,特性之多超出了大多数人的理解能力.  使用过 Ra ...

  5. 兔老大的系统设计(二)定时系统(延时队列)

    之前文章: 兔老大的系统设计(一)健康度系统 一.背景 延迟队列的应用场景非常广泛,如客户主动操作: 股票定投 顾客预约场景 会员定时续费/缴费 CSDN定时发布 或系统内部操作: 订单成功后,在30 ...

  6. 【SpringBoot】43、SpringBoot中整合RabbitMQ实现延时队列(延时插件篇)

    死信队列实现篇,参考文章:[SpringBoot]60.SpringBoot中整合RabbitMQ实现延时队列(死信队列篇) 一.介绍 1.什么是延时队列? 延时队列即就是放置在该队列里面的消息是不需 ...

  7. 延时队列(Delayed)实现(支持失败重试机制自定义重试时间)

    最初的业务场景就是: 需要需要使用restTemplate调用个接口并且调用失败后需要延时重复调用(最多3次),第一次5秒,第二次10秒,第三次15秒. 1. 主要功能 最起初的话思考如果只是简单这样 ...

  8. SpringBoot 整合:Redis延时队列的简单实现(基于有赞的设计)

    点击关注公众号,Java干货及时送达 来源:blog.csdn.net/qq330983778/article/details/99341671 设计 之前学习Redis的时候发现有赞团队之前分享过一 ...

  9. 怎么设计一个合适的延时队列?

    [文章来源]https://sourl.cn/pcgvTp 延时队列技术调研 项目背景 延迟队列,它是一种带有延迟功能的消息队列,目前工作中有几处需延时处理的应用场景. 可选技术参考 kafka 考虑 ...

最新文章

  1. 三个大数据处理框架:Storm,Spark和Samza 介绍比较
  2. 交换机知识--集群管理
  3. SQL Server 2012 Express LocalDB
  4. int x = 0x13 c语言,2004年7月全国高等教育自学考试微型计算机原理与接口技术试题...
  5. 怎样与用户有效的沟通以获取用户的真实需求
  6. .NET环境下水晶报表使用总
  7. linux下的C语言开发(进程创建)
  8. 一张图学会python3语法-一张图片在Python操作下的4种玩法(附源码)
  9. Android中GridView实现互相添加和删除
  10. 4.Mongodb之js脚本
  11. 电脑的ppt打不开计算机二级,ppt打不开怎么办?详细教您详细解决方法
  12. 18个谷歌搜索技巧,让你受用终身
  13. 最简洁详细内网穿透教程实现远程桌面连接
  14. 计算机英语原文件夹,常用文件夹英文解译
  15. 中荷金生有约养老年金险怎么样?好不好?
  16. nexus2.5版本升级到nexus2.14(同时升级到nexus3.24版本)版本
  17. 一枚程序媛的java人生—2018年年终总结
  18. 互联网日报 | 4月27日 星期二 | 美团回应被立案调查;滴滴开通老年人打车400热线;百度App月活跃用户数达5.58亿
  19. 手机的IMEI、MEID、ICCID、UDID、IMSI
  20. Karl Guttag:Niantic户外AR参考设计或采用Lumus光波导

热门文章

  1. Altium Designer常用快捷键
  2. 9ms 静默活体检测,小视开源工业级品质算法
  3. wordpress标签大全
  4. 清空input file中的值
  5. java构造器是什么?
  6. Java Class 加密工具 ClassFinal
  7. 实用色轮图(赞、实用)
  8. 哪些 Java 知识不需要再学了
  9. 支付宝当面付扫码支付支付后不回调_支付宝餐饮蓝海计划2020年推出最新版
  10. Web自动化测试(一)—— Web自动化入门