1.java实现延迟消息(队列DelayQueue)

DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

缓存系统的设计:这里使用DelayQueue保存缓存元素的有效期,一个线程(生产者)设置失效实现循环添加消息,使用一个线程(消费者)循环查询
DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了

应用场景:

  • 消息生产和消费有时间窗口要求,例如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。
  • 通过消息触发一些定时任务,例如在某一固定时间点向用户发送提醒消息。

前提条件:放置在DelayQueue的元素需要实现Delayed接口,Delayed接口使对象成为延迟对象,它使存放在DelayQueue类中的对象具有了激活日期

CompareTo(Delayed o):Delayed接口继承了Comparable接口,因此有了这个方法。
getDelay(TimeUnit unit):这个方法返回到激活日期的剩余时间,时间单位由单位参数指定。

2.实现Delayed接口

package com.violet.Queue;import java.time.format.DateTimeFormatter;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;public class MessageData implements Delayed {private static final AtomicLong atomic = new AtomicLong(0);// 数据的失效时间点private final long time;// 序号private final long seqno;/*** @param deadline 数据失效时间点*/public MessageData(long deadline) {this.time = deadline;//序号自增类似于i++ ,这里使用AtomicLong实现原子操作this.seqno = atomic.getAndIncrement();}/*** 返回剩余有效时间** @param unit 时间单位*/@Overridepublic long getDelay(TimeUnit unit) {//查看是不是当这个时间到期时,消息被消费System.out.println(unit.convert(this.time - System.currentTimeMillis(), TimeUnit.NANOSECONDS));return unit.convert(this.time - System.currentTimeMillis(), TimeUnit.NANOSECONDS);}/*** 比较两个Delayed对象的大小, 比较顺序如下:* 1. 如果是对象本身, 返回0;* 2. 比较失效时间点, 先失效的返回-1,后失效的返回1;* 3. 比较元素序号, 序号小的返回-1, 否则返回1.* 4. 非Data类型元素, 比较剩余有效时间, 剩余有效时间小的返回-1,大的返回1,相同返回0*/@Overridepublic int compareTo(Delayed other) {if (other == this)  // compare zero if same objectreturn 0;if (other instanceof MessageData) {MessageData x = (MessageData) other;// 优先比较失效时间long diff = this.time - x.time;if (diff < 0)return -1;else if (diff > 0)return 1;else if (this.seqno < x.seqno)    // 剩余时间相同则比较序号return -1;elsereturn 1;}// 一般不会执行到此处,除非元素不是MessageData类型long diff = this.getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}@Overridepublic String toString() {return "Data{" +"time=" + time +", seqno=" + seqno +"}, isValid=" + isValid();}private boolean isValid() {return this.getDelay(TimeUnit.NANOSECONDS) > 0;}}

3.生产者

package com.violet.Queue;import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ThreadLocalRandom;public class Producer implements Runnable {private final DelayQueue<MessageData> queue;public Producer(DelayQueue<MessageData> queue) {this.queue = queue;}@Overridepublic void run() {while (true) {//选取系统当前时间+随机生成的时间 来设置消息失效时间long currentTime = System.currentTimeMillis();long validTime = ThreadLocalRandom.current().nextLong(1000L, 7000L);MessageData data = new MessageData(currentTime + validTime);queue.put(data);System.out.println(Thread.currentThread().getName() + ": put " + data);try {//为了效果显著这里将线程停的时间长一点Thread.sleep(1000000000);} catch (InterruptedException e) {e.printStackTrace();}}}
}

4.消费者

package com.violet.Queue;import java.util.concurrent.DelayQueue;public class Consumer implements Runnable {private final DelayQueue<MessageData> queue;public Consumer(DelayQueue<MessageData> queue) {this.queue = queue;}@Overridepublic void run() {while (true) {try {MessageData data = queue.take();System.out.println(Thread.currentThread().getName() + ": take " + data);Thread.yield();} catch (InterruptedException e) {e.printStackTrace();}}}
}

5.main方法

package com.violet.Queue;import java.util.concurrent.DelayQueue;public class Main {public static void main(String[] args) {DelayQueue<MessageData> queue = new DelayQueue<>();Thread c1 = new Thread(new Consumer(queue), "consumer");Thread p1 = new Thread(new Producer(queue), "producer");c1.start();p1.start();}
}

6.效果

java实现消息队列以及延迟消息(队列DelayQueue)相关推荐

  1. MQ延迟队列实现延迟消息

    在开发中经常会遇到延时任务的需求,例如在12306购买车票,若生成订单30分钟未支付则自动取消:还有在线商城完成订单后48小时不评价 ,自动5星好评.像这类在某事件触发后一段时间内执行的需求任务我们称 ...

  2. 消息队列和延迟消息队列

    应用场景 1.商品秒杀 短时间内出现爆发式的用户请求,如果不采取相关的措施,会导致服务器忙不过来,响应超时的问题,轻则会导致服务假死,重则会让服务器直接宕机. 这个时候加上了消息队列,服务器接收到用户 ...

  3. 发送延迟消息_微信延迟消息的发送方式

    Part one. 如何使用微信来发送延迟消息呢? Part two. 之前一次偶然的机会,想要定时发送一个消息给朋友,于是产生了微信能否发送定时消息这个念头.去百度了一下,找到了方法.微信是可以定时 ...

  4. kafka消息消费有延迟_消息中间件选型分析---从Kafka与RabbitMQ的对比来看全局

    有很多网友留言:公司要做消息中间件选型,该如何选?你觉得哪个比较好?消息选型的确是一个大论题,实则说来话长的事情又如何长话短说.对此笔者专门撰稿一篇内功心法:如何看待消息中间件的选型,不过这篇只表其意 ...

  5. RocketMQ延迟消息的代码实战及原理分析

    RocketMQ简介 RocketMQ是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的.高可靠.万亿级容量.灵活可伸缩的消息发布与订阅服务. 它前身是MetaQ,是阿里基于Kafka ...

  6. 深入理解RocketMQ延迟消息

    延迟消息是实际开发中一个非常有用的功能,本文第一部分从整体上介绍秒级精度延迟消息的实现思路,在第二部分结合RocketMQ的延迟消息实现,进行细致的讲解,点出关键部分的源码.第三步介绍延迟消息与消息重 ...

  7. RocketMQ源码分析之延迟消息

    文章目录 前言 一.延迟消息 1.特点 2.使用场景 3.demo 二.发送延迟消息 三.broker端存储延迟消息 四.总结 1.延迟消息工作原理 2.延迟消息在消费者消费重试中的应用 前言 本篇文 ...

  8. 延迟消息的五种实现方案

    生产者把消息发送到消息队列中以后,并不期望被立即消费,而是等待指定时间后才可以被消费者消费,这类消息通常被称为延迟消息.延迟消息的应用场景其实是非常的广泛,比如以下的场景: 网上直播授课时,在课程开始 ...

  9. RabbitMQ延迟消息场景分析以及实现两种方式(SpringBoot)

    使用场景,不限于下面 用户下订单结束,如果用户未及时支付,后续需要取消订单,可以怎么做?定时任务可以做,但是不能接近实时或者消耗数据库性能太大. [数据库定时任务方案]:定时任务可以做到订单的状态的改 ...

最新文章

  1. UA OPTI512R 傅立叶光学导论15 2-D Fourier变换与Hankel变换
  2. JVM Class字节码之三-使用BCEL改变类属性
  3. Maven引入外部jar的几种方法
  4. WatchOS系统开发大全(3)-创建第一个WatchApp工程
  5. boost::iostreams::detail::path用法的测试程序
  6. C++实现successive approximation渐进法(附完整源码)
  7. Swift学习字符串、数组、字典
  8. 清零 css,css样式清零及常用类
  9. 关于计算机的英语作文八百字,小学英语作文800字(精选10篇)
  10. 辐流式重力浓缩池计算_注册考试重点!平流式、竖流式、辐流式、斜板式4大沉淀池构型...
  11. 怎么自动缩进_Python 的缩进是不是反人类的设计?
  12. “日薄西山”的摩托罗拉推出VR头显,这回靠谱吗?
  13. [PsTools]psexec.exe使用范例-运行远程电脑程序(exe、bat等)
  14. php万能表单制作教程,万能表单系统
  15. win7绕过开机密码
  16. 斑马旅游在千帆竞发的出境游市场能否找到属于自己的道路?
  17. Pytorch显存分配机制与显存占用分析方法
  18. 阿里、美团、Oracle等大厂的Java虚拟机面试题集锦
  19. Unity实现AR扫描图片
  20. html文字居中对齐显示

热门文章

  1. Leetcode 中等:89.格雷编码
  2. 一句话解决汉诺塔(C语言递归)每日一练
  3. java培训第二十二天总结 线程
  4. ps -ef和ps -aux的区别
  5. centos无法识别NTFS格式的U盘解决办法
  6. requests模块
  7. 我把3个镜头手机拍的照片发微信群,哥们说,现在手机摄像头越来越多,我有一个扫码就够了...
  8. 禅道管理员admin密码登录失败,更改密码
  9. 图片转为pdf怎么弄?发送图片安全高效的格式
  10. Html5和CSS3开发指南学习