java实现消息队列以及延迟消息(队列DelayQueue)
1.java实现延迟消息(队列DelayQueue)
DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
缓存系统的设计:这里使用DelayQueue保存缓存元素的有效期,一个线程(生产者)设置失效实现循环添加消息,使用一个线程(消费者)循环查询
DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了
应用场景:
- 消息生产和消费有时间窗口要求,例如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。
- 通过消息触发一些定时任务,例如在某一固定时间点向用户发送提醒消息。
前提条件:放置在DelayQueue的元素需要实现Delayed接口,Delayed接口使对象成为延迟对象,它使存放在DelayQueue类中的对象具有了激活日期
CompareTo(Delayed o):Delayed接口继承了Comparable接口,因此有了这个方法。
getDelay(TimeUnit unit):这个方法返回到激活日期的剩余时间,时间单位由单位参数指定。
2.实现Delayed接口
package com.violet.Queue;import java.time.format.DateTimeFormatter;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;public class MessageData implements Delayed {private static final AtomicLong atomic = new AtomicLong(0);// 数据的失效时间点private final long time;// 序号private final long seqno;/*** @param deadline 数据失效时间点*/public MessageData(long deadline) {this.time = deadline;//序号自增类似于i++ ,这里使用AtomicLong实现原子操作this.seqno = atomic.getAndIncrement();}/*** 返回剩余有效时间** @param unit 时间单位*/@Overridepublic long getDelay(TimeUnit unit) {//查看是不是当这个时间到期时,消息被消费System.out.println(unit.convert(this.time - System.currentTimeMillis(), TimeUnit.NANOSECONDS));return unit.convert(this.time - System.currentTimeMillis(), TimeUnit.NANOSECONDS);}/*** 比较两个Delayed对象的大小, 比较顺序如下:* 1. 如果是对象本身, 返回0;* 2. 比较失效时间点, 先失效的返回-1,后失效的返回1;* 3. 比较元素序号, 序号小的返回-1, 否则返回1.* 4. 非Data类型元素, 比较剩余有效时间, 剩余有效时间小的返回-1,大的返回1,相同返回0*/@Overridepublic int compareTo(Delayed other) {if (other == this) // compare zero if same objectreturn 0;if (other instanceof MessageData) {MessageData x = (MessageData) other;// 优先比较失效时间long diff = this.time - x.time;if (diff < 0)return -1;else if (diff > 0)return 1;else if (this.seqno < x.seqno) // 剩余时间相同则比较序号return -1;elsereturn 1;}// 一般不会执行到此处,除非元素不是MessageData类型long diff = this.getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}@Overridepublic String toString() {return "Data{" +"time=" + time +", seqno=" + seqno +"}, isValid=" + isValid();}private boolean isValid() {return this.getDelay(TimeUnit.NANOSECONDS) > 0;}}
3.生产者
package com.violet.Queue;import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ThreadLocalRandom;public class Producer implements Runnable {private final DelayQueue<MessageData> queue;public Producer(DelayQueue<MessageData> queue) {this.queue = queue;}@Overridepublic void run() {while (true) {//选取系统当前时间+随机生成的时间 来设置消息失效时间long currentTime = System.currentTimeMillis();long validTime = ThreadLocalRandom.current().nextLong(1000L, 7000L);MessageData data = new MessageData(currentTime + validTime);queue.put(data);System.out.println(Thread.currentThread().getName() + ": put " + data);try {//为了效果显著这里将线程停的时间长一点Thread.sleep(1000000000);} catch (InterruptedException e) {e.printStackTrace();}}}
}
4.消费者
package com.violet.Queue;import java.util.concurrent.DelayQueue;public class Consumer implements Runnable {private final DelayQueue<MessageData> queue;public Consumer(DelayQueue<MessageData> queue) {this.queue = queue;}@Overridepublic void run() {while (true) {try {MessageData data = queue.take();System.out.println(Thread.currentThread().getName() + ": take " + data);Thread.yield();} catch (InterruptedException e) {e.printStackTrace();}}}
}
5.main方法
package com.violet.Queue;import java.util.concurrent.DelayQueue;public class Main {public static void main(String[] args) {DelayQueue<MessageData> queue = new DelayQueue<>();Thread c1 = new Thread(new Consumer(queue), "consumer");Thread p1 = new Thread(new Producer(queue), "producer");c1.start();p1.start();}
}
6.效果
java实现消息队列以及延迟消息(队列DelayQueue)相关推荐
- MQ延迟队列实现延迟消息
在开发中经常会遇到延时任务的需求,例如在12306购买车票,若生成订单30分钟未支付则自动取消:还有在线商城完成订单后48小时不评价 ,自动5星好评.像这类在某事件触发后一段时间内执行的需求任务我们称 ...
- 消息队列和延迟消息队列
应用场景 1.商品秒杀 短时间内出现爆发式的用户请求,如果不采取相关的措施,会导致服务器忙不过来,响应超时的问题,轻则会导致服务假死,重则会让服务器直接宕机. 这个时候加上了消息队列,服务器接收到用户 ...
- 发送延迟消息_微信延迟消息的发送方式
Part one. 如何使用微信来发送延迟消息呢? Part two. 之前一次偶然的机会,想要定时发送一个消息给朋友,于是产生了微信能否发送定时消息这个念头.去百度了一下,找到了方法.微信是可以定时 ...
- kafka消息消费有延迟_消息中间件选型分析---从Kafka与RabbitMQ的对比来看全局
有很多网友留言:公司要做消息中间件选型,该如何选?你觉得哪个比较好?消息选型的确是一个大论题,实则说来话长的事情又如何长话短说.对此笔者专门撰稿一篇内功心法:如何看待消息中间件的选型,不过这篇只表其意 ...
- RocketMQ延迟消息的代码实战及原理分析
RocketMQ简介 RocketMQ是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的.高可靠.万亿级容量.灵活可伸缩的消息发布与订阅服务. 它前身是MetaQ,是阿里基于Kafka ...
- 深入理解RocketMQ延迟消息
延迟消息是实际开发中一个非常有用的功能,本文第一部分从整体上介绍秒级精度延迟消息的实现思路,在第二部分结合RocketMQ的延迟消息实现,进行细致的讲解,点出关键部分的源码.第三步介绍延迟消息与消息重 ...
- RocketMQ源码分析之延迟消息
文章目录 前言 一.延迟消息 1.特点 2.使用场景 3.demo 二.发送延迟消息 三.broker端存储延迟消息 四.总结 1.延迟消息工作原理 2.延迟消息在消费者消费重试中的应用 前言 本篇文 ...
- 延迟消息的五种实现方案
生产者把消息发送到消息队列中以后,并不期望被立即消费,而是等待指定时间后才可以被消费者消费,这类消息通常被称为延迟消息.延迟消息的应用场景其实是非常的广泛,比如以下的场景: 网上直播授课时,在课程开始 ...
- RabbitMQ延迟消息场景分析以及实现两种方式(SpringBoot)
使用场景,不限于下面 用户下订单结束,如果用户未及时支付,后续需要取消订单,可以怎么做?定时任务可以做,但是不能接近实时或者消耗数据库性能太大. [数据库定时任务方案]:定时任务可以做到订单的状态的改 ...
最新文章
- UA OPTI512R 傅立叶光学导论15 2-D Fourier变换与Hankel变换
- JVM Class字节码之三-使用BCEL改变类属性
- Maven引入外部jar的几种方法
- WatchOS系统开发大全(3)-创建第一个WatchApp工程
- boost::iostreams::detail::path用法的测试程序
- C++实现successive approximation渐进法(附完整源码)
- Swift学习字符串、数组、字典
- 清零 css,css样式清零及常用类
- 关于计算机的英语作文八百字,小学英语作文800字(精选10篇)
- 辐流式重力浓缩池计算_注册考试重点!平流式、竖流式、辐流式、斜板式4大沉淀池构型...
- 怎么自动缩进_Python 的缩进是不是反人类的设计?
- “日薄西山”的摩托罗拉推出VR头显,这回靠谱吗?
- [PsTools]psexec.exe使用范例-运行远程电脑程序(exe、bat等)
- php万能表单制作教程,万能表单系统
- win7绕过开机密码
- 斑马旅游在千帆竞发的出境游市场能否找到属于自己的道路?
- Pytorch显存分配机制与显存占用分析方法
- 阿里、美团、Oracle等大厂的Java虚拟机面试题集锦
- Unity实现AR扫描图片
- html文字居中对齐显示