文章目录

  • Pre
  • 延时任务 VS 定时任务
  • Solutions
    • DB 轮询
      • 核心思想
      • Demo Code
      • 优缺点
    • JDK的Delay Queue
      • 核心思想
      • Demo Code
      • 优缺点
    • 时间轮算法
      • 核心思想
      • Demo Code
      • 优缺点
    • Redis缓存(zset)
      • 核心思想
      • Demo Code
    • Redis缓存(Keyspace Notifications)
      • 核心思想
    • Redisson 延时队列
    • 使用消息队列


Pre

每日一博 - 使用环形队列实现高效的延时消息


延时任务 VS 定时任务

举个例子,开发中常见的延时任务场景:

  • 半小时未支付,取消订单

延时任务和定时任务的几个小区别,梳理下:

  • 定时任务有明确的触发时间,延时任务没有
  • 定时任务有执行周期,而延时任务在某事件触发后一段时间内执行,没有执行周期
  • 定时任务一般执行的是批处理操作是多个任务,而延时任务一般是单个任务

Solutions

DB 轮询

核心思想

通过定时任务扫描,执行业务逻辑。


Demo Code

参考实现如下:

  <dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.2.2</version></dependency>
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;public class MyJob implements Job {public void execute(JobExecutionContext context)throws JobExecutionException {System.out.println("模拟扫描任务。。。。。");}public static void main(String[] args) throws Exception {// 创建任务JobDetail jobDetail = JobBuilder.newJob(MyJob.class).withIdentity("job1", "group1").build();// 创建触发器 每3秒钟执行一次Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group3").withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3).repeatForever()).build();Scheduler scheduler = new StdSchedulerFactory().getScheduler();// 将任务及其触发器放入调度器scheduler.scheduleJob(jobDetail, trigger);// 调度器开始调度任务scheduler.start();}
}

优缺点

优点: 简单 (好像也没有其他的优点了 哈哈哈 )

缺点:

  • (1)占用资源,对服务器内存消耗大

  • (2)存在延迟,比如你每隔n分钟扫描一次,那最坏的延迟时间就是n分钟

  • (3)如果表的数据量较大,每隔几分钟这样扫描一次,性能堪忧,DB压力较大


JDK的Delay Queue

核心思想

利用JDK自带的DelayQueue来实现, 无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入DelayQueue中的对象,必须实现Delayed接口。

  • poll():获取并移除队列的超时元素,没有则返回空
  • take():获取并移除队列的超时元素,如果没有则wait当前线程,直到有元素满足超时条件,返回结果。

Demo Code

 import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/9/2 22:50* @mark: show me the code , change the world*/
public class TicketDelay implements Delayed {private String ticketId;private long timeout;public TicketDelay(String ticketId, long timeout) {this.ticketId= ticketId;this.timeout = timeout + System.nanoTime();}@Overridepublic int compareTo(Delayed other) {if (other == this) {return 0;}TicketDelay t = (TicketDelay) other;long d = (getDelay(TimeUnit.NANOSECONDS) - t.getDelay(TimeUnit.NANOSECONDS));return (d == 0) ? 0 : ((d < 0) ? -1 : 1);}/*** 返回距离自定义的超时时间还有多少* @param unit* @return*/@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(timeout - System.nanoTime(),TimeUnit.NANOSECONDS);}void doSomething() {System.out.println(ticketId+" is deleted");}
}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/9/2 22:51* @mark: show me the code , change the world*/
public class DelayQueueDemo {public static void main(String[] args) {// 模拟数据List<String> list = new ArrayList<>();list.add("Ticket-1");list.add("Ticket-2");list.add("Ticket-3");list.add("Ticket-4");list.add("Ticket-5");// 延时队列DelayQueue<TicketDelay> queue = new DelayQueue<>();for (int i = 0; i < 5; i++) {long start = System.currentTimeMillis();//延迟2秒取出queue.put(new TicketDelay(list.get(i), TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS)));System.out.println("biubiubiu ~ " + (System.currentTimeMillis() - start) + " MilliSeconds ");try {queue.take().doSomething();System.out.println("biubiubiu  " + (System.currentTimeMillis() - start) + " MilliSeconds 取到了数据,开始后执行业务操作");} catch (InterruptedException e) {e.printStackTrace();}System.out.println("===========================\n" );}}
}

优缺点

优点:

  • 效率高,任务触发时间延迟低

缺点:

  • 服务器重启后,数据全部消失,怕宕机

  • 集群扩展相当麻烦

  • 因为内存条件限制的原因,如数据太多,那么很容易就出现OOM异常

  • 代码复杂度较高


时间轮算法

每日一博 - 使用环形队列实现高效的延时消息

延时消息之时间轮


核心思想

其实本质上它就是一个环形的数组,如图所示,假设我们创建了一个长度为 8 的时间轮。

task0 = 当我们需要新建一个 5s 延时消息,则只需要将它放到下标为 5 的那个槽中。

task1 = 而如果是一个 10s 的延时消息,则需要将它放到下标为 2 的槽中,但同时需要记录它所对应的圈数,不然就和 2 秒的延时消息重复了。

task2= 当创建一个 21s 的延时消息时,它所在的位置就和 task0 相同了,都在下标为 5 的槽中,所以为了区别需要为他加上圈数为 2。

当我们需要取出延时消息时,只需要每秒往下移动这个指针,然后取出该位置的所有任务即可。

当然取出任务之前还得判断圈数是否为 0 ,不为 0 时说明该任务还得再轮几圈,同时需要将圈数 -1 。

这样就可避免轮询所有的任务,不过如果时间轮的槽比较少,导致某一个槽上的任务非常多那效率也比较低,这就和 HashMap 的 hash 冲突是一样的。


时间轮算法可以类比于时钟, (指针)按某一个方向按固定频率轮动,每一次跳动称为一个 tick。这样可以看出定时轮由个3个重要的属性参数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)以及 timeUnit(时间单位)。

例如当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和现实中的始终的秒针走动完全类似了。

如果当前指针指在1上面,我有一个任务需要4秒以后执行,那么这个执行的线程回调或者消息将会被放在5上。那如果需要在20秒之后执行怎么办,由于这个环形结构槽数只到8,如果要20秒,指针需要多转2圈。位置是在2圈之后的5上面(20 % 8 + 1)


Demo Code

我们用Netty的HashedWheelTimer来实现

 <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.24.Final</version></dependency>

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/9/2 23:30* @mark: show me the code , change the world*/
public class HashedWheelTimerTest {static class MyTimerTask implements TimerTask {boolean flag;public MyTimerTask(boolean flag) {this.flag = flag;}@Overridepublic void run(Timeout timeout) throws Exception {System.out.println("要去执行业务啦....");this.flag = false;}}public static void main(String[] argv) {MyTimerTask timerTask = new MyTimerTask(true);Timer timer = new HashedWheelTimer();timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);int i = 1;while (timerTask.flag) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(i + "秒过去了");i++;}}
}

优缺点

优点

  • 效率高,任务触发时间延迟时间比delayQueue低,代码复杂度比delayQueue低。

缺点:

  • 服务器重启后,数据全部消失,怕宕机

  • 集群扩展相当麻烦

  • 因为内存条件限制的原因,比如数据太多,那么很容易就出现OOM异常


Redis缓存(zset)

核心思想

利用redis的zset,zset是一个有序集合,每一个元素(member)都关联了一个score,通过score排序来取集合中的值

# 添加单个元素redis> ZADD page_rank 10 google.com
(integer) 1# 添加多个元素redis> ZADD page_rank 9 baidu.com 8 bing.com
(integer) 2redis> ZRANGE page_rank 0 -1 WITHSCORES
1) "bing.com"
2) "8"
3) "baidu.com"
4) "9"
5) "google.com"
6) "10"# 查询元素的score值
redis> ZSCORE page_rank bing.com
"8"# 移除单个元素redis> ZREM page_rank google.com
(integer) 1redis> ZRANGE page_rank 0 -1 WITHSCORES
1) "bing.com"
2) "8"
3) "baidu.com"
4) "9"

那么如何实现呢?我们将订单超时时间戳与订单号分别设置为score和member,系统扫描第一个元素判断是否超时

Demo Code


import java.util.Calendar;
import java.util.Set;import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Tuple;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/9/4 21:04* @mark: show me the code , change the world*/
public class RedisDelayQueue {private static final String ADDR = "127.0.0.1";private static final int PORT = 6379;private static JedisPool jedisPool = new JedisPool(ADDR, PORT);public static Jedis getJedis() {return jedisPool.getResource();}/*** 生产者,生成5个订单放进去*/public void productionDelayMessage() {for (int i = 0; i < 5; i++) {//延迟3秒Calendar cal1 = Calendar.getInstance();cal1.add(Calendar.SECOND, 3);int second3later = (int) (cal1.getTimeInMillis() / 1000);RedisDelayQueue.getJedis().zadd("OrderId", second3later, "ARTISAN_ID_" + i);System.out.println(System.currentTimeMillis() + "ms:redis生成了订单:订单ID为" + "ARTISAN_ID_" + i);}}/*** 消费者,取订单*/public void consumerDelayMessage() {Jedis jedis = RedisDelayQueue.getJedis();while (true) {Set<Tuple> items = jedis.zrangeWithScores("OrderId", 0, 1);if (items == null || items.isEmpty()) {System.out.println("当前没有等待的任务");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}continue;}int score = (int) ((Tuple) items.toArray()[0]).getScore();Calendar cal = Calendar.getInstance();int nowSecond = (int) (cal.getTimeInMillis() / 1000);if (nowSecond >= score) {String orderId = ((Tuple) items.toArray()[0]).getElement();jedis.zrem("OrderId", orderId);System.out.println(System.currentTimeMillis() + "ms:redis消费了一个任务:消费的订单Id为" + orderId);}}}public static void main(String[] args) {RedisDelayQueue appTest = new RedisDelayQueue();appTest.productionDelayMessage();appTest.consumerDelayMessage();}
}

上面的代码有个硬伤:在高并发条件下,多消费者会取到同一个订单号。


import java.util.concurrent.CountDownLatch;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/9/4 21:21* @mark: show me the code , change the world*/
public class MTest {private static final int threadNum = 10;private static CountDownLatch cdl = new CountDownLatch(threadNum);static class DelayMessage implements Runnable {@Overridepublic void run() {try {cdl.await();} catch (InterruptedException e) {e.printStackTrace();}RedisDelayQueue appTest = new RedisDelayQueue();appTest.consumerDelayMessage();}}public static void main(String[] args) {RedisDelayQueue appTest = new RedisDelayQueue();appTest.productionDelayMessage();for (int i = 0; i < threadNum; i++) {new Thread(new DelayMessage()).start();cdl.countDown();}}
}

显然,出现了多个线程消费同一个资源的情况

解决方案

  • (1)用分布式锁,但是用分布式锁,性能下降了,不推荐

  • (2)对ZREM的返回值进行判断,只有大于0的时候,才消费数据,于是将consumerDelayMessage()方法里的

 if(nowSecond >= score){String orderId = ((Tuple)items.toArray()[0]).getElement();jedis.zrem("OrderId", orderId);System.out.println(System.currentTimeMillis()+"ms:redis消费了一个任务:消费的订单OrderId为"+orderId);}

修改为

 if(nowSecond >= score){String orderId = ((Tuple)items.toArray()[0]).getElement();Long num = jedis.zrem("OrderId", orderId);if( num != null && num>0){System.out.println(System.currentTimeMillis()+"ms:redis消费了一个任务:消费的订单OrderId为"+orderId);}}

Redis缓存(Keyspace Notifications)

核心思想

该方案使用redis的Keyspace Notifications,中文翻译就是键空间机制,就是利用该机制可以在key失效之后,提供一个回调,实际上是redis会给客户端发送一个消息。 redis版本2.8以上。

在redis.conf中,加入一条配置

notify-keyspace-events Ex

import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/9/4 21:38* @mark: show me the code , change the world*/
public class RedisTest {private static final String ADDR = "127.0.0.1";private static final int PORT = 6379;private static JedisPool jedis = new JedisPool(ADDR, PORT);private static RedisSub sub = new RedisSub();public static void init() {new Thread(() -> jedis.getResource().subscribe(sub, "__keyevent@0__:expired")).start();}public static void main(String[] args) throws InterruptedException {init();for (int i = 0; i < 10; i++) {String orderId = "OID000000" + i;jedis.getResource().setex(orderId, 3, orderId);System.out.println(System.currentTimeMillis() + "ms:" + orderId + "订单生成");}}static class RedisSub extends JedisPubSub {@Overridepublic void onMessage(String channel, String message) {System.out.println(System.currentTimeMillis() + "ms:" + message + "订单取消");}}
}

Redis的发布/订阅目前是即发即弃(fire and forget)模式的,因此无法实现事件的可靠通知。也就是说,如果发布/订阅的客户端断链之后又重连,则在客户端断链期间的所有事件都丢失了。
因此,Keyspace Notifications不是太推荐。当然,如果你对可靠性要求不高,可以使用。


Redisson 延时队列

使用消息队列

每日一博 - 延时任务的多种实现方式解读相关推荐

  1. 每日一博 - DelayQueue阻塞队列源码解读

    文章目录 Pre DelayQueue特征 Leader/Followers模式 DelayQueue源码分析 类继承关系 核心方法 成员变量 构造函数 入队方法 offer(E e) 出队方法 po ...

  2. 每日一博 - Review线程池_02

    文章目录 Pre 使用场景 场景1:响应速度优先 场景2:吞吐量优先 线程池设置不合理发生的那些故障 线程池的参数如何评估和配置??? 不用线程池? 万能公式? 线程池参数动态化? 线程池的监控 Pr ...

  3. Agile - 埃杰团队每日例会博客目录

    Agile - 埃杰团队每日例会博客目录 项目 这个作业属于哪个课程 2023北航敏捷软件工程 这个作业的要求在哪里 团队项目-每日例会报告 我们在这个课程的目标是 学习并实践软件工程开发的方法论.在 ...

  4. 《OSChina每日一博》2018年05月整理合集

    <OSChina每日一博>2018年05月整理合集 简介 收录开源中国每日推荐的优秀博客文章,开源中国每日会推荐一篇比较优秀的博客文章,称之为每日一bo,文章实属精品,收藏于此,供自己慢慢 ...

  5. mysql表的设计几种方式_支持多种登录方式的数据表设计 | 六阿哥博客

    一个带有用户系统的应用最基本登录方式是站内账号登录,但这种方式往往不能满足我们的需求.现在的应用基本都有站内账号.email.手机和一堆第三方登录,那么如果需要支持这么多种登录方式,或者还有银行卡登录 ...

  6. C# 高性能 TCP 服务的多种实现方式

    哎~~ 想想大部分园友应该对 "高性能" 字样更感兴趣,为了吸引眼球所以标题中一定要突出,其实我更喜欢的标题是<猴赛雷,C# 编写 TCP 服务的花样姿势!>. 本篇文 ...

  7. 你有没有遇到要实现多种登录方式的场景丫 一起来看看咯 Spring Security 实现多种登录方式,如常规方式外的邮件、手机验证码登录

    你好丫,我是博主宁在春,一起加油吧!!! 不知道, 你在用Spring Security的时候,有没有想过,用它实现多种登录方式勒,这次我的小伙伴就给我提了一些登录方面的需求,需要在原有账号密码登录的 ...

  8. C#高性能TCP服务的多种实现方式

    ☆ 哎~~ 想想大部分园友应该对 "高性能" 字样更感兴趣,为了吸引眼球所以标题中一定要突出,其实我更喜欢的标题是<猴赛雷,C#编写TCP服务的花样姿势!>. 本篇文章 ...

  9. python进阶11并发之七多种并发方式的效率测试

    原创博客地址:python进阶11并发之七多种并发方式的效率测试 测试map,apply_async,gevent协程爬虫 测试代码:网页爬虫 函数代码 1 2 3 4 5 6 7 8 9 10 11 ...

最新文章

  1. 使用git更新github上的开源项目
  2. 求助:我有一辆机器人小车,怎么让它跑起来,还会避障、目标跟踪、路径规划?...
  3. java中break,continue,標籤實現goto效果(編程思想)
  4. 捕捉Entity framework 6的详细异常提示
  5. 重新修复安装.netframework2.0
  6. 收获,不止SQL优化——抓住SQL的本质--第三章
  7. 如何将Eclipse 的JavaWeb工程部署到Tomcat的webapps目录下
  8. Python爬取网易云热歌榜所有音乐及其热评
  9. python代码缩进中是否支持tab键和空格混用_python自测——编码规范
  10. 【浙大第19届校赛:G】Postman(贪心)
  11. Eclipse调试:改变颜色, 背景与字体大小 和xml字体调整
  12. 百元性价比高的蓝牙耳机推荐:学生党适合使用的蓝牙耳机
  13. (转)人工智能公司Kensho是如何改变华尔街的?
  14. VSCode远程连接服务器报错:Could not establish connection to “xxx”,Faild to write install script to path!【已解决】
  15. python pgm 转 bmp
  16. Centos安装firefox
  17. Spring Boot+Vue(一)node.js环境搭建
  18. HTTP缓存机制和原理
  19. java 蓝牙_通过Java代码连接到蓝牙设备
  20. 我是主考官7:他为什么没有被录取

热门文章

  1. 部署必备之Docker
  2. python elasticsearch
  3. 递归神经网络预测股票好文章
  4. html 书架样式css,CSS3 响应式书架布局
  5. java自定义类怎么比大小_实战:Java 扑克牌比较游戏
  6. 在有序旋转数组中找到最小值
  7. Pytorch离线安装的纯净版
  8. 用pytorch实现简易RNN
  9. 多面性的打赏功能,由直播行业引发的一点思考
  10. Hadoop在Ubuntu下的安装配置(配置成功)