点击关注公众号,Java干货及时送达

来源:blog.csdn.net/qq330983778/article/details/99341671

设计

之前学习Redis的时候发现有赞团队之前分享过一篇关于延时队列的设计:有赞延时队列 现在就尝试实现一下!

业务流程

首先我们分析下这个流程

  1. 用户提交任务。首先将任务推送至延迟队列中。

  2. 延迟队列接收到任务后,首先将任务推送至job pool中,然后计算其执行时间。

  3. 然后生成延迟任务(仅仅包含任务id)放入某个桶中

  4. 时间组件时刻轮询各个桶,当时间到达的时候从job pool中获得任务元信息。

  5. 监测任务的合法性如果已经删除则pass。继续轮询。如果任务合法则再次计算时间

  6. 如果合法则计算时间,如果时间合法:根据topic将任务放入对应的ready queue,然后从bucket中移除。如果时间不合法,则重新计算时间再次放入bucket,并移除之前的bucket中的内容

  7. 消费端轮询对应topic的ready queue。获取job后做自己的业务逻辑。与此同时,服务端将已经被消费端获取的job按照其设定的TTR,重新计算执行时间,并将其放入bucket。

  8. 完成消费后,发送finish消息,服务端根据job id删除对应信息。

对象

我们现在可以了解到中间存在的几个组件

  • 延迟队列,为Redis延迟队列。实现消息传递

  • Job pool 任务池保存job元信息。根据文章描述使用K/V的数据结构,key为ID,value为job

  • Delay Bucket 用来保存业务的延迟任务。文章中描述使用轮询方式放入某一个Bucket可以知道其并没有使用topic来区分,个人这里默认使用顺序插入

  • Timer 时间组件,负责扫描各个Bucket。根据文章描述存在多个Timer,但是同一个Timer同一时间只能扫描一个Bucket

  • Ready Queue 负责存放需要被完成的任务,但是根据描述根据Topic的不同存在多个Ready Queue。

其中Timer负责轮询,Job pool、Delay Bucket、Ready Queue都是不同职责的集合。

任务状态

  • ready:可执行状态,

  • delay:不可执行状态,等待时钟周期。

  • reserved:已被消费者读取,但没有完成消费。

  • deleted:已被消费完成或者已被删除。

对外提供的接口

额外的内容

  1. 首先根据状态状态描述,finish和delete操作都是将任务设置成deleted状态。

  2. 根据文章描述的操作,在执行finish或者delete的操作的时候任务已经从元数据中移除,此时deleted状态可能只存在极短时间,所以实际实现中就直接删除了。

  3. 文章中并没有说明响应超时后如何处理,所以个人现在将其重新投入了待处理队列。

  4. 文章中因为使用了集群,所以使用redis的setnx锁来保证多个时间循环处理多个桶的时候不会出现重复循环。这里因为是简单的实现,所以就很简单的每个桶设置一个时间队列处理。也是为了方便简单处理。关于分布式锁可以看我之前的文章里面有描述。

实现

现在我们根据设计内容完成设计。这一块设计我们分四步完成

任务及相关对象

目前需要两个对象,一个是任务对象(job)一个负责保存任务引用的对象(delay job)

任务对象

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Job implements Serializable {/*** 延迟任务的唯一标识,用于检索任务*/@JsonSerialize(using = ToStringSerializer.class)private Long id;/*** 任务类型(具体业务类型)*/private String topic;/*** 任务的延迟时间*/private long delayTime;/*** 任务的执行超时时间*/private long ttrTime;/*** 任务具体的消息内容,用于处理具体业务逻辑用*/private String message;/*** 重试次数*/private int retryCount;/*** 任务状态*/private JobStatus status;
}

任务引用对象

@Data
@AllArgsConstructor
public class DelayJob implements Serializable {/*** 延迟任务的唯一标识*/private long jodId;/*** 任务的执行时间*/private long delayDate;/*** 任务类型(具体业务类型)*/private String topic;public DelayJob(Job job) {this.jodId = job.getId();this.delayDate = System.currentTimeMillis() + job.getDelayTime();this.topic = job.getTopic();}public DelayJob(Object value, Double score) {this.jodId = Long.parseLong(String.valueOf(value));this.delayDate = System.currentTimeMillis() + score.longValue();}
}

容器

目前我们需要完成三个容器的创建,Job任务池、延迟任务容器、待完成任务容器

job任务池,为普通的K/V结构,提供基础的操作

@Component
@Slf4j
public class JobPool {@Autowiredprivate RedisTemplate redisTemplate;private String NAME = "job.pool";private BoundHashOperations getPool () {BoundHashOperations ops = redisTemplate.boundHashOps(NAME);return ops;}/*** 添加任务* @param job*/public void addJob (Job job) {log.info("任务池添加任务:{}", JSON.toJSONString(job));getPool().put(job.getId(),job);return ;}/*** 获得任务* @param jobId* @return*/public Job getJob(Long jobId) {Object o = getPool().get(jobId);if (o instanceof Job) {return (Job) o;}return null;}/*** 移除任务* @param jobId*/public void removeDelayJob (Long jobId) {log.info("任务池移除任务:{}",jobId);// 移除任务getPool().delete(jobId);}
}

延迟任务,使用可排序的ZSet保存数据,提供取出最小值等操作

@Slf4j
@Component
public class DelayBucket {@Autowiredprivate RedisTemplate redisTemplate;private static AtomicInteger index = new AtomicInteger(0);@Value("${thread.size}")private int bucketsSize;private List <String> bucketNames = new ArrayList <>();@Beanpublic List <String> createBuckets() {for (int i = 0; i < bucketsSize; i++) {bucketNames.add("bucket" + i);}return bucketNames;}/*** 获得桶的名称* @return*/private String getThisBucketName() {int thisIndex = index.addAndGet(1);int i1 = thisIndex % bucketsSize;return bucketNames.get(i1);}/*** 获得桶集合* @param bucketName* @return*/private BoundZSetOperations getBucket(String bucketName) {return redisTemplate.boundZSetOps(bucketName);}/*** 放入延时任务* @param job*/public void addDelayJob(DelayJob job) {log.info("添加延迟任务:{}", JSON.toJSONString(job));String thisBucketName = getThisBucketName();BoundZSetOperations bucket = getBucket(thisBucketName);bucket.add(job,job.getDelayDate());}/*** 获得最新的延期任务* @return*/public DelayJob getFirstDelayTime(Integer index) {String name = bucketNames.get(index);BoundZSetOperations bucket = getBucket(name);Set<ZSetOperations.TypedTuple> set = bucket.rangeWithScores(0, 1);if (CollectionUtils.isEmpty(set)) {return null;}ZSetOperations.TypedTuple typedTuple = (ZSetOperations.TypedTuple) set.toArray()[0];Object value = typedTuple.getValue();if (value instanceof DelayJob) {return (DelayJob) value;}return null;}/*** 移除延时任务* @param index* @param delayJob*/public void removeDelayTime(Integer index,DelayJob delayJob) {String name = bucketNames.get(index);BoundZSetOperations bucket = getBucket(name);bucket.remove(delayJob);}}

待完成任务,内部使用topic进行细分,每个topic对应一个list集合

@Component
@Slf4j
public class ReadyQueue {@Autowiredprivate RedisTemplate redisTemplate;private String NAME = "process.queue";private String getKey(String topic) {return NAME + topic;}/*** 获得队列* @param topic* @return*/private BoundListOperations getQueue (String topic) {BoundListOperations ops = redisTemplate.boundListOps(getKey(topic));return ops;}/*** 设置任务* @param delayJob*/public void pushJob(DelayJob delayJob) {log.info("执行队列添加任务:{}",delayJob);BoundListOperations listOperations = getQueue(delayJob.getTopic());listOperations.leftPush(delayJob);}/*** 移除并获得任务* @param topic* @return*/public DelayJob popJob(String topic) {BoundListOperations listOperations = getQueue(topic);Object o = listOperations.leftPop();if (o instanceof DelayJob) {log.info("执行队列取出任务:{}", JSON.toJSONString((DelayJob) o));return (DelayJob) o;}return null;}}

轮询处理

设置了线程池为每个bucket设置一个轮询操作

@Component
public class DelayTimer implements ApplicationListener <ContextRefreshedEvent> {@Autowiredprivate DelayBucket delayBucket;@Autowiredprivate JobPool     jobPool;@Autowiredprivate ReadyQueue  readyQueue;@Value("${thread.size}")private int length;@Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {ExecutorService executorService = new ThreadPoolExecutor(length, length,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue <Runnable>());for (int i = 0; i < length; i++) {executorService.execute(new DelayJobHandler(delayBucket,jobPool,readyQueue,i));}}
}

测试请求

/*** 测试用请求* @author daify**/
@RestController
@RequestMapping("delay")
public class DelayController {@Autowiredprivate JobService jobService;/*** 添加* @param request* @return*/@RequestMapping(value = "add",method = RequestMethod.POST)public String addDefJob(Job request) {DelayJob delayJob = jobService.addDefJob(request);return JSON.toJSONString(delayJob);}/*** 获取* @return*/@RequestMapping(value = "pop",method = RequestMethod.GET)public String getProcessJob(String topic) {Job process = jobService.getProcessJob(topic);return JSON.toJSONString(process);}/*** 完成一个执行的任务* @param jobId* @return*/@RequestMapping(value = "finish",method = RequestMethod.DELETE)public String finishJob(Long jobId) {jobService.finishJob(jobId);return "success";}@RequestMapping(value = "delete",method = RequestMethod.DELETE)public String deleteJob(Long jobId) {jobService.deleteJob(jobId);return "success";}}

测试

添加延迟任务

通过postman请求:localhost:8000/delay/add

此时这条延时任务被添加进了线程池中

2019-08-12 21:21:36.589  INFO 21444 --- [nio-8000-exec-6] d.samples.redis.delay.container.JobPool  : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"DELAY","topic":"test","ttrTime":10000}
2019-08-12 21:21:36.609  INFO 21444 --- [nio-8000-exec-6] d.s.redis.delay.container.DelayBucket    : 添加延迟任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"}

根据设置10秒钟之后任务会被添加至ReadyQueue中

2019-08-12 21:21:46.744  INFO 21444 --- [pool-1-thread-4] d.s.redis.delay.container.ReadyQueue     : 执行队列添加任务:DelayJob(jodId=3, delayDate=1565616106609, topic=test)

获得任务

这时候我们请求localhost:8000/delay/pop

这个时候任务被响应,修改状态的同时设置其超时时间,然后放置在DelayBucket中

2019-08-09 19:36:02.342  INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.ReadyQueue     : 执行队列取出任务:{"delayDate":1565321728704,"jodId":1,"topic":"测试"}
2019-08-09 19:36:02.364  INFO 58456 --- [nio-8000-exec-3] d.samples.redis.delay.container.JobPool  : 任务池添加任务:{"delayTime":10000,"id":1,"message":"延迟10秒,超时30秒","retryCount":0,"status":"RESERVED","topic":"测试","ttrTime":30000}
2019-08-09 19:36:02.384  INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.DelayBucket    : 添加延迟任务:{"delayDate":1565321792364,"jodId":1,"topic":"测试"}

按照设计在30秒后,任务假如没有被消费将会重新放置在ReadyQueue中

2019-08-12 21:21:48.239  INFO 21444 --- [nio-8000-exec-7] d.s.redis.delay.container.ReadyQueue     : 执行队列取出任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"}
2019-08-12 21:21:48.261  INFO 21444 --- [nio-8000-exec-7] d.samples.redis.delay.container.JobPool  : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"RESERVED","topic":"test","ttrTime":10000}

任务的删除/消费

现在我们请求:localhost:8000/delay/delete

此时在Job pool中此任务将会被移除,此时元数据已经不存在,但任务还在DelayBucket中循环,然而在循环中当检测到元数据已经不存的话此延时任务会被移除。

2019-08-12 21:21:54.880  INFO 21444 --- [nio-8000-exec-8] d.samples.redis.delay.container.JobPool  : 任务池移除任务:3
2019-08-12 21:21:59.104  INFO 21444 --- [pool-1-thread-5] d.s.redis.delay.handler.DelayJobHandler  : 移除不存在任务:{"delayDate":1565616118261,"jodId":3,"topic":"test"}

本篇文章涉及的源码下载地址:

https://gitee.com/daifyutils/springboot-samples

热门内容:服务端如何防止订单重复支付!
拜托!不要用“ ! = null " 做判空了
道友自诉:入职中软一个月(外包华为)就离职了!
23 种设计模式的通俗解释,看完秒懂
token多平台身份认证架构设计思路
最近面试BAT,整理一份面试资料《Java面试BAT通关手册》,覆盖了Java核心技术、JVM、Java并发、SSM、微服务、数据库、数据结构等等。获取方式:点“在看”,关注公众号并回复 666 领取,更多内容陆续奉上。
明天见(。・ω・。)ノ♡

SpringBoot 整合:Redis延时队列的简单实现(基于有赞的设计)相关推荐

  1. springboot整合redis消息队列

    前言 消息队列作为一种常用的异步通信解决方案,而redis是一款高性能的nosql产品,今天就给大家介绍一下,如何使用redis实现消息队列,并整合到springboot. 两个消息模型 1. 队列模 ...

  2. springboot整合redis做缓存

    之前的项目中,用到过redis,主要是使用redis做缓存,redis在web开发中使用的场景很多,其中缓存是其中一个很重要的使用场景,之所以用作缓存,得益于redis的读写数据,尤其是在读取数据的时 ...

  3. springboot:整合redis消息队列

    整合redis消息队列 项目依赖 <!-- RedisTemplate --><dependency><groupId>org.springframework.bo ...

  4. SpringBoot整合Redis缓存

    SpringBoot整合Redis缓存 一.缓存概念知识 1.是什么缓存 2.缓存的优缺点 3.为什么使用缓存 二.Redis概念知识 1.Redis简介 2.为什么用Redis作为缓存 3.Redi ...

  5. Redis学习(含 Springboot 整合 Redis)

    Redis NoSQL (not only sql) 在现代的计算系统上每天网络上都会产生庞大的数据量. 这些数据有很大一部分是由关系数据库管理系统(RDBMS)来处理. 1970年 E.F.Codd ...

  6. 8分钟带你学会SpringBoot整合Redis来实现缓存技术

    1.概述 随着互联网技术的发展,对技术要求也越来越高,所以在当期情况下项目的开发中对数据访问的效率也有了很高的要求,所以在项目开发中缓存技术使用的也越来越多,因为它可以极大的提高系统的访问速度,关于缓 ...

  7. Springboot整合redis(lettuce)

    springboot 整合redis(lettuce) 首先确保电脑上装了redis.最好能用redisDesktop查看一下数据情况 redis是一款非常流行的Nosql数据库.redis的功能非常 ...

  8. SpringBoot系列十:SpringBoot整合Redis

    From: https://www.cnblogs.com/leeSmall/p/8728231.html 声明:本文来源于MLDN培训视频的课堂笔记,写在这里只是为了方便查阅. 1.概念:Sprin ...

  9. springboot整合redis,推荐整合和使用案例(2021版)

    背景:手下新人在初次使用springboot整合redis,大部分人习惯从网上检索到一份配置,然后不知其所以然的复制粘贴到项目中,网上搜索到的配置良莠不齐但又万变不离其宗.由于springboot最大 ...

最新文章

  1. 云计算的认识和看法_【云计算】如何理解云计算才是正确的?
  2. 外包网络推广公司浅析想保持稳定的SEO排名和流量,需要做什么呢?
  3. Spring注解编程基石(三)
  4. windows7 docker mysql_DOCKER windows 7 详细安装教程
  5. [转]Android动态加载jar/dex
  6. stack操作 and deque操作
  7. 2021-09-01
  8. 路人实拍Waymo无人车:行为诡异,谨慎到让人怀疑人生
  9. Android Studio3.5 JAVA调用C++源码方法总结
  10. html+css的响应式个人简历
  11. matlab平差实习报告,《测量平差》课程设计实习报告 五星文库
  12. php聊天室发送表情,聊天室技术(六)-- 表情和动作_PHP
  13. 数字信号处理原理及实现一书的思维导图
  14. 淘宝/天猫买家信息 API
  15. 效率脚本:删除已经合并的git分支
  16. Ubuntu 下重启网络的方法
  17. android-处理日期时间 - 随心
  18. “FlipFlo“.它从1计数到100,遇到3的倍数就替换为单词 Flip”,5的倍数就替换为单词“Flop”,既为3的倍数又为5的倍数则替换为单词 ―FlipFlop”其余情况下输出当前数字.
  19. VCC、VDD、VSS
  20. 程旭媛技术面试代码题集锦

热门文章

  1. LaxTex-----参考文献中同名作者被默认缺省的问题
  2. webpack笔记(6)调试模式
  3. 【CV知识学习】early stop、regularation、fine-tuning and some other trick to be known
  4. C#如何根据DataTable生成泛型List或者动态类型list
  5. Asp.net中GridView使用详解(引)【转】
  6. HTTP请求报文和HTTP响应报文(转)
  7. MapXtreme 2005 学习心得 在地图上创建点/线并显示标注(五)
  8. 说说.net事件和委托。
  9. 清华学长带你从宏观角度看递归
  10. 【怎样写代码】工厂三兄弟之工厂方法模式(三):解决方案 II