Redisson延迟队列
场景:
需求:
支付的二维码,超过两个小时以后,如果还未支付,则自动转为取消支付,或者支付超时的状态
需求分析:
1,动态定时任务:
每个支付的二维码创建的时候,创建一个动态的定时任务,两个小时候自动执行,更新支付状态,可以解决这个问题。
(1)持久化:
如果服务重启了,动态定时任务会丢失,导致部分数据没办法更新状态。
(2)分布式:
如果当服务重启时,自动扫描数据,重新计算时间,再次创建动态定时任务。可以解决(1)的问题,但是当分布式,多个节点的时候,都会重新加载所有的任务,这样性能上不是最优解,只能在数据源上加上节点名称,不同的服务节点,加载属于自己的定时任务,可以解决这个问题。总的想想,太麻烦了,还是算了。
2,Redisson延迟队列
(1)持久化:队列信息放在Redis上,服务重启不影响。
(2)分布式:多节点去Redis拿去数据,谁抢到算谁的,不会存在同一个任务,多个节点支持。唯一不足就是过度依赖Redis,万一Redis崩了,那就凉凉了(那就是要把Redis配置高可用,当前业务就不用管了)。总体来说还是比较好用的。
实现
1,创建延迟队列的监听任务【RedisDelayedQueueListener】,消费延迟队列
2,创建新增延迟队列的类,用于创建延迟队列
3,整体初始化,把监听任务与spring绑定,扫描各个监听延迟队列的实现类,并开启单独线程,监听任务。
4,创建延迟任务(开始测试使用)
连接Redis
不贴代码了,自己在网上搜
监听延迟队列
接口:
/*** 队列事件监听接口,需要实现这个方法** @module* @author frank* @date 2021/8/19 10:50*/
public interface RedisDelayedQueueListener<T> {/*** 执行方法** @param t*/void invoke(T t);
}
实现:
import com.sxmaps.netschool.common.redisson.RedisDelayedQueueListener;
import com.sxmaps.netschool.service.vo.school.SchoolAccountPayStateReqVO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** 支付二维码监听器** @module* @author frank* @date 2021/8/19 10:49*/
@Component
public class PayQCordListener implements RedisDelayedQueueListener<SchoolAccountPayStateReqVO> {private final Logger logger = LoggerFactory.getLogger(PayQCordListener.class);@Autowiredprivate SchoolAccountService schoolAccountService;@Overridepublic void invoke(SchoolAccountPayStateReqVO payStateReqVO) {logger.info("支付二维码-延迟失效,内容:{}", payStateReqVO);//处理业务,更新二维码状态logger.info("支付二维码-延迟失效,内容:{},处理结果:{}", payStateReqVO,respDTO);}
}
增加延迟队列
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;/*** 增加延迟信息** @author frank* @module* @date 2021/8/19 10:49*/
@Component
public class RedisDelayedQueue {private final Logger logger = LoggerFactory.getLogger(RedisDelayedQueue.class);@AutowiredRedissonClient redissonClient;/*** 添加队列** @param t DTO传输类* @param delay 时间数量* @param timeUnit 时间单位* @param <T> 泛型*/private <T> void addQueue(T t, long delay, TimeUnit timeUnit, String queueName) {logger.info("添加延迟队列,监听名称:{},时间:{},时间单位:{},内容:{}" , queueName, delay, timeUnit,t);RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.offer(t, delay, timeUnit);}/*** 添加队列-秒** @param t DTO传输类* @param delay 时间数量* @param <T> 泛型*/public <T> void addQueueSeconds(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {addQueue(t, delay, TimeUnit.SECONDS, clazz.getName());}/*** 添加队列-分** @param t DTO传输类* @param delay 时间数量* @param <T> 泛型*/public <T> void addQueueMinutes(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {addQueue(t, delay, TimeUnit.MINUTES, clazz.getName());}/*** 添加队列-时** @param t DTO传输类* @param delay 时间数量* @param <T> 泛型*/public <T> void addQueueHours(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {addQueue(t, delay, TimeUnit.HOURS, clazz.getName());}/*** 添加队列-天** @param t DTO传输类* @param delay 时间数量* @param <T> 泛型*/public <T> void addQueueDays(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {addQueue(t, delay, TimeUnit.DAYS, clazz.getName());}
}
整体初始化
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;import java.util.Map;/*** 初始化队列监听** @module* @author frank* @date 2021/8/19 10:49*/
@Component
public class RedisDelayedQueueInit implements ApplicationContextAware {private final Logger logger = LoggerFactory.getLogger(RedisDelayedQueueInit.class);@AutowiredRedissonClient redissonClient;/*** 获取应用上下文并获取相应的接口实现类** @param applicationContext* @throws BeansException*/@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) {String listenerName = taskEventListenerEntry.getValue().getClass().getName();startThread(listenerName, taskEventListenerEntry.getValue());}}/*** 启动线程获取队列*** @param queueName queueName* @param redisDelayedQueueListener 任务回调监听* @param <T> 泛型* @return*/private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);//服务重启后,无offer,take不到信息。redissonClient.getDelayedQueue(blockingFairQueue);//由于此线程需要常驻,可以新建线程,不用交给线程池管理Thread thread = new Thread(() -> {logger.info("启动监听队列线程" + queueName);while (true) {try {T t = blockingFairQueue.take();logger.info("监听队列线程,监听名称:{},内容:{}", queueName, t);redisDelayedQueueListener.invoke(t);} catch (Exception e) {logger.info("监听队列线程错误,", e);}}});thread.setName(queueName);thread.start();}}
创建延迟任务
@Autowired
RedisDelayedQueue queue;
.................queue.addQueueHours(new SchoolAccountPayStateReqVO(dto.getPayNo()),2, PayQCordListener.class);
本文参考了:https://my.oschina.net/wangnian/blog/3167316【王念博客】
Redisson延迟队列相关推荐
- redisson究极爽文-手把手带你实现redisson的发布订阅,消息队列,延迟队列(死信队列),(模仿)分布式线程池
参考资料 :分布式中间件实战:java版 (书籍), 多线程视频教程(视频)- 项目启动环境 导入依赖 <parent><groupId>org.springframework ...
- 电商项目订单取消(Redis 延迟队列)--1
现功能时的选择很重要,如果你的系统所处理的数据量不是很大,我觉得队列和缓存很适合你,这样你可以对消息的传递更加了解,但你使用MQ,kafka的中间件时,你会发现使用起来更加轻松,但对于数据量大的系统来 ...
- Redis实现排行榜、延迟队列、LRU、消息已读未读(Redisson客户端实现)
目录 序言 Redis客户端选型 Redis配置 Redis实现排行榜 Redis实现延迟队列 Redis LRU(Least Recently Used)使用 Redis实现消息已读未读 总结 序言 ...
- SpringBoot整合Redisson实现延迟队列
SpringBoot整合Redisson实现延迟队列 技术选型 引入 Redisson 依赖 配置项 编写工具类 延迟队列执行器 业务消费类枚举 加载消费队列 消费者类 测试类 测试结果 技术选型 关 ...
- redis延迟队列 实现_灵感来袭,基于Redis的分布式延迟队列
一.延迟队列 延迟队列,也就是一定时间之后将消息体放入队列,然后消费者才能正常消费.比如1分钟之后发送短信,发送邮件,检测数据状态等. 二.Redisson Delayed Queue 如果你项目中使 ...
- Redisson 延时队列 原理 详解
花了一天研究了下Redisson 的延时队列,RBlockingQueue ,RDelayedQueue . 网上没一个说清楚的,而且都是说轮询redis的zset,都是错误的! 让我来纠正,如果我有 ...
- 消息延迟队列处理拼团时间到期
1.RabbitMqConfig /*** * 延时队列交换机* * 注意这里的交换机类型:CustomExchange* ** * @return* */ @Bean public CustomEx ...
- 面试官:RabbitMQ本身不支持延迟队列,那你给我实现一个?
以下文章来源方志朋的博客,回复"666"获面试宝典 RabbitMQ本身没有延迟队列的支持,但是基于其本身的一些特性,可以做到类似延迟队列的效果:基于死信交换器+TTL. 以下介绍 ...
- RabbitMQ 延迟队列实现定时任务的正确姿势,你学会了么?
以下文章来源方志朋的博客,回复"666"获面试宝典 场景 开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期.订单定时关闭.微信支付2小时未支付关闭订单等 ...
- RabbitMQ 延迟队列,太实用了!
点击关注公众号,Java干货及时送达 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付 ...
最新文章
- 【工具类】时间相关的方法
- 成功解决pandas\core\indexing.py:179: SettingWithCopyWarning: A value is trying to be set on a copy of a
- C++实现欧拉的totient 函数(Euler’s totient function)(附完整源码)
- 抠图为什么要用绿布_《暮白首》为什么如此火爆?五个原因带你深度剖析
- bpl开发模式_BPL的完整形式是什么? 什么是电力线宽带
- 企业即时通讯市场增长500%
- uri uri_什么是URI? 了解许可证术语以确保合规
- 最炫python表白代码_python炫酷烟花表白源代码
- html中改变图标颜色,使用CSS更改图标的颜色
- eclipse如何设置中文
- SCU2016-05 I题 trie图 + 大数dp
- electron 自定义标题栏_如何在Electron Framework中创建自定义标题栏(灵感来自Visual Studio Code标题栏)...
- c字打头的语言英语单词,C字开头的励志的英文单词要C字开头的~例如Champion,Confidence,...-c英语开头名词-英语-柯拿拷同学...
- 面向对象和面向过程编程
- 内置函数2 递归函数
- Linux设置自动重启脚本
- 七段显示器显示整数C语言答案,C语言程序设计试卷(含答案)(7页)-原创力文档...
- 找懂的大佬做一个闲鱼监控软件,大概要求如下。
- 基于springboot写的毕业设计星嘉购物系统设计与实现(附资源下载)
- 区块链赋能不动产—易居EBaaS在不动产领域应用