1、利用延迟队列

延时队列,第一他是个队列,所以具有对列功能第二就是延时,这就是延时对列,功能也就是将任务放在该延时对列中,只有到了延时时刻才能从该延时对列中获取任务否则获取不到……

应用场景比较多,比如延时1分钟发短信,延时1分钟再次执行等,下面先看看延时队列demo之后再看延时队列在项目中的使用:

简单的延时队列要有三部分:第一实现了Delayed接口的消息体、第二消费消息的消费者、第三存放消息的延时队列,那下面就来看看延时队列demo。

一、消息体

packagecom.delqueue;importjava.util.concurrent.Delayed;importjava.util.concurrent.TimeUnit;/*** 消息体定义 实现Delayed接口就是实现两个方法即compareTo 和 getDelay最重要的就是getDelay方法,这个方法用来判断是否到期……*/

public class Message implementsDelayed {private intid;private String body; //消息内容

private long excuteTime;//延迟时长,这个是必须的属性因为要按照这个判断延时时长。

public intgetId() {returnid;

}publicString getBody() {returnbody;

}public longgetExcuteTime() {returnexcuteTime;

}public Message(int id, String body, longdelayTime) {this.id =id;this.body =body;this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) +System.nanoTime();

}//自定义实现比较方法返回 1 0 -1三个参数

@Overridepublic intcompareTo(Delayed delayed) {

Message msg=(Message) delayed;return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1: (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0);

}//延迟任务是否到时就是按照这个方法判断如果返回的是负数则说明到期否则还没到期

@Overridepublic longgetDelay(TimeUnit unit) {return unit.convert(this.excuteTime -System.nanoTime(), TimeUnit.NANOSECONDS);

}

}

二、消息消费者

packagecom.delqueue;importjava.util.concurrent.DelayQueue;public class Consumer implementsRunnable {//延时队列 ,消费者从其中获取消息进行消费

private DelayQueuequeue;public Consumer(DelayQueuequeue) {this.queue =queue;

}

@Overridepublic voidrun() {while (true) {try{

Message take=queue.take();

System.out.println("消费消息id:" + take.getId() + " 消息体:" +take.getBody());

}catch(InterruptedException e) {

e.printStackTrace();

}

}

}

}

三、延时队列

packagecom.delqueue;importjava.util.concurrent.DelayQueue;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;public classDelayQueueTest {public static voidmain(String[] args) {//创建延时队列

DelayQueue queue = new DelayQueue();//添加延时消息,m1 延时3s

Message m1 = new Message(1, "world", 3000);//添加延时消息,m2 延时10s

Message m2 = new Message(2, "hello", 10000);//将延时消息放到延时队列中

queue.offer(m2);

queue.offer(m1);//启动消费线程 消费添加到延时队列中的消息,前提是任务到了延期时间

ExecutorService exec = Executors.newFixedThreadPool(1);

exec.execute(newConsumer(queue));

exec.shutdown();

}

}

将消息体放入延迟队列中,在启动消费者线程去消费延迟队列中的消息,如果延迟队列中的消息到了延迟时间则可以从中取出消息否则无法取出消息也就无法消费。

这就是延迟队列demo,下面我们来说说在真实环境下的使用。

使用场景描述:

在打车软件中对订单进行派单的流程,当有订单的时候给该订单筛选司机,然后给当订单绑定司机,但是有时运气没那么好,订单进来后第一次没有筛选到合适的司机,但我们也不能就此结束派单,而是将该订单的信息放到延时队列中过个2秒钟在进行一次,其实这个2秒钟就是一个延迟,所以这里我们就可以使用延时队列来实现……

下面看看简单的流程图:

下面来看看具体代码实现:

在项目中有如下几个类:第一 、任务类   第二、按照任务类组装的消息体类  第三、延迟队列管理类

任务类即执行筛选司机、绑单、push消息的任务类

packagecom.test.delayqueue;/*** 具体执行相关业务的业务类

*@authorwhd

* @date 2017年9月25日 上午12:49:32*/

public class DelayOrderWorker implementsRunnable {

@Overridepublic voidrun() {//TODO Auto-generated method stub//相关业务逻辑处理

System.out.println(Thread.currentThread().getName()+" do something ……");

}

}

消息体类,在延时队列中这个实现了Delayed接口的消息类是比不可少的,实现接口时有一个getDelay(TimeUnit unit)方法,这个方法就是判断是否到期的

这里定义的是一个泛型类,所以可以将我们上面的任务类作为其中的task,这样就将任务类分装成了一个消息体

packagecom.test.delayqueue;importjava.util.concurrent.Delayed;importjava.util.concurrent.TimeUnit;/*** 延时队列中的消息体将任务封装为消息体

*

*@authorwhd

* @date 2017年9月25日 上午12:48:30

*@param*/

public class DelayOrderTask implementsDelayed {private final longtime;private final T task; //任务类,也就是之前定义的任务类

/***@paramtimeout

* 超时时间(秒)

*@paramtask

* 任务*/

public DelayOrderTask(longtimeout, T task) {this.time = System.nanoTime() +timeout;this.task =task;

}

@Overridepublic intcompareTo(Delayed o) {//TODO Auto-generated method stub

DelayOrderTask other =(DelayOrderTask) o;long diff = time -other.time;if (diff > 0) {return 1;

}else if (diff < 0) {return -1;

}else{return 0;

}

}

@Overridepublic longgetDelay(TimeUnit unit) {//TODO Auto-generated method stub

return unit.convert(this.time -System.nanoTime(), TimeUnit.NANOSECONDS);

}

@Overridepublic inthashCode() {returntask.hashCode();

}publicT getTask() {returntask;

}

}

延时队列管理类,这个类主要就是将任务类封装成消息并并添加到延时队列中,以及轮询延时队列从中取出到时的消息体,在获取任务类放到线程池中执行任务

packagecom.test.delayqueue;importjava.util.Map;importjava.util.concurrent.DelayQueue;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importjava.util.concurrent.TimeUnit;importjava.util.concurrent.atomic.AtomicLong;/*** 延时队列管理类,用来添加任务、执行任务

*

*@authorwhd

* @date 2017年9月25日 上午12:44:59*/

public classDelayOrderQueueManager {private final static int DEFAULT_THREAD_NUM = 5;private static int thread_num =DEFAULT_THREAD_NUM;//固定大小线程池

privateExecutorService executor;//守护线程

privateThread daemonThread;//延时队列

private DelayQueue>delayQueue;private static final AtomicLong atomic = new AtomicLong(0);private final long n = 1;private static DelayOrderQueueManager instance = newDelayOrderQueueManager();privateDelayOrderQueueManager() {

executor=Executors.newFixedThreadPool(thread_num);

delayQueue= new DelayQueue<>();

init();

}public staticDelayOrderQueueManager getInstance() {returninstance;

}/*** 初始化*/

public voidinit() {

daemonThread= new Thread(() ->{

execute();

});

daemonThread.setName("DelayQueueMonitor");

daemonThread.start();

}private voidexecute() {while (true) {

Map map =Thread.getAllStackTraces();

System.out.println("当前存活线程数量:" +map.size());int taskNum =delayQueue.size();

System.out.println("当前延时任务数量:" +taskNum);try{//从延时队列中获取任务

DelayOrderTask> delayOrderTask =delayQueue.take();if (delayOrderTask != null) {

Runnable task=delayOrderTask.getTask();if (null ==task) {continue;

}//提交到线程池执行task

executor.execute(task);

}

}catch(Exception e) {

e.printStackTrace();

}

}

}/*** 添加任务

*

*@paramtask

*@paramtime

* 延时时间

*@paramunit

* 时间单位*/

public void put(Runnable task, longtime, TimeUnit unit) {//获取延时时间

long timeout =TimeUnit.NANOSECONDS.convert(time, unit);//将任务封装成实现Delayed接口的消息体

DelayOrderTask> delayOrder = new DelayOrderTask<>(timeout, task);//将消息体放到延时队列中

delayQueue.put(delayOrder);

}/*** 删除任务

*

*@paramtask

*@return

*/

public booleanremoveTask(DelayOrderTask task) {returndelayQueue.remove(task);

}

}

测试类

packagecom.delqueue;importjava.util.concurrent.TimeUnit;importcom.test.delayqueue.DelayOrderQueueManager;importcom.test.delayqueue.DelayOrderWorker;public classTest {public static voidmain(String[] args) {

DelayOrderWorker work1= new DelayOrderWorker();//任务1

DelayOrderWorker work2 = new DelayOrderWorker();//任务2

DelayOrderWorker work3 = new DelayOrderWorker();//任务3//延迟队列管理类,将任务转化消息体并将消息体放入延迟对列中等待执行

DelayOrderQueueManager manager =DelayOrderQueueManager.getInstance();

manager.put(work1,3000, TimeUnit.MILLISECONDS);

manager.put(work2,6000, TimeUnit.MILLISECONDS);

manager.put(work3,9000, TimeUnit.MILLISECONDS);

}

}

OK 这就是项目中的具体使用情况,当然具体内容被忽略,整体框架就是这样,还有这里使用java的延时队列但是这种方式是有问题的如果如果down机则会出现任务丢失,所以也可以考虑使用mq、redis来实现

2、mq实现延迟消息

在rabbitmq 3.5.7及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时插件依赖Erlang/OPT 18.0及以上。

插件源码地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

安装:

进入插件安装目录

{rabbitmq-server}/plugins/(可以查看一下当前已存在的插件)

下载插件

rabbitmq_delayed_message_exchange

wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

(如果下载的文件名称不规则就手动重命名一下如:

rabbitmq_delayed_message_exchange-0.0.1.ez)

启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

关闭插件

rabbitmq-plugins disable rabbitmq_delayed_message_exchange

插件使用

通过声明一个x-delayed-message类型的exchange来使用delayed-messaging特性

x-delayed-message是插件提供的类型,并不是rabbitmq本身的,发送消息的时候通过在header添加”x-delay”参数来控制消息的延时时间

直接在maven工程的pom.xml文件中加入

org.springframework.boot

spring-boot-starter-amqp

Spring Boot的版本我使用的是 2.0.1.RELEASE .

接下来在 application.properties 文件中加入redis配置:

spring.rabbitmq.host=127.0.0.1

spring.rabbitmq.port=5672

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

定义ConnectionFactory和RabbitTemplate

也很简单,代码如下:

packagecom.mq.rabbitmq;importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.connection.ConnectionFactory;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.boot.context.properties.ConfigurationProperties;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;

@Configuration

@ConfigurationProperties(prefix= "spring.rabbitmq")public classRabbitMqConfig {privateString host;private intport;privateString userName;privateString password;

@BeanpublicConnectionFactory connectionFactory() {

CachingConnectionFactory cachingConnectionFactory= newCachingConnectionFactory(host,port);

cachingConnectionFactory.setUsername(userName);

cachingConnectionFactory.setPassword(password);

cachingConnectionFactory.setVirtualHost("/");

cachingConnectionFactory.setPublisherConfirms(true);returncachingConnectionFactory;

}

@BeanpublicRabbitTemplate rabbitTemplate() {

RabbitTemplate rabbitTemplate= newRabbitTemplate(connectionFactory());returnrabbitTemplate;

}publicString getHost() {returnhost;

}public voidsetHost(String host) {this.host =host;

}public intgetPort() {returnport;

}public void setPort(intport) {this.port =port;

}publicString getUserName() {returnuserName;

}public voidsetUserName(String userName) {this.userName =userName;

}publicString getPassword() {returnpassword;

}public voidsetPassword(String password) {this.password =password;

}

}

Exchange和Queue配置

packagecom.mq.rabbitmq;import org.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;

@Configurationpublic classQueueConfig {

@BeanpublicCustomExchange delayExchange() {

Map args = new HashMap<>();

args.put("x-delayed-type", "direct");return new CustomExchange("test_exchange", "x-delayed-message",true, false,args);

}

@BeanpublicQueue queue() {

Queue queue= new Queue("test_queue_1", true);returnqueue;

}

@BeanpublicBinding binding() {return BindingBuilder.bind(queue()).to(delayExchange()).with("test_queue_1").noargs();

}

}

这里要特别注意的是,使用的是 CustomExchange ,不是 DirectExchange ,另外 CustomExchange 的类型必须是 x-delayed-message 。

实现消息发送

packagecom.mq.rabbitmq;importorg.springframework.amqp.AmqpException;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessagePostProcessor;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importjava.text.SimpleDateFormat;importjava.util.Date;

@Servicepublic classMessageServiceImpl {

@AutowiredprivateRabbitTemplate rabbitTemplate;public voidsendMsg(String queueName,String msg) {

SimpleDateFormat sdf= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

System.out.println("消息发送时间:"+sdf.format(newDate()));

rabbitTemplate.convertAndSend("test_exchange", queueName, msg, newMessagePostProcessor() {

@Overridepublic Message postProcessMessage(Message message) throwsAmqpException {

message.getMessageProperties().setHeader("x-delay",3000);returnmessage;

}

});

}

}

注意在发送的时候,必须加上一个header

x-delay

在这里我设置的延迟时间是3秒。

消息消费者

packagecom.mq.rabbitmq;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.text.SimpleDateFormat;importjava.util.Date;

@Componentpublic classMessageReceiver {

@RabbitListener(queues= "test_queue_1")public voidreceive(String msg) {

SimpleDateFormat sdf= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

System.out.println("消息接收时间:"+sdf.format(newDate()));

System.out.println("接收到的消息:"+msg);

}

}

运行Spring Boot程序和发送消息

直接在main方法里运行Spring Boot程序,Spring Boot会自动解析 MessageReceiver 类的。

接下来只需要用Junit运行一下发送消息的接口即可。

packagecom.mq.rabbitmq;importorg.junit.Test;importorg.junit.runner.RunWith;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)

@SpringBootTestpublic classRabbitmqApplicationTests {

@AutowiredprivateMessageServiceImpl messageService;

@Testpublic voidsend() {

messageService.sendMsg("test_queue_1","hello i am delay msg");

}

}

运行完后,可以看到如下信息:

消息发送时间:2018-05-03 12:44:533秒钟后,Spring Boot控制台会输出:

消息接收时间:2018-05-03 12:44:56接收到的消息:hello i am delay msg

Java如何解决mysql读写延迟_java中延迟任务的处理方式相关推荐

  1. java窗口向mysql加信息_Java中如何实现向DBC方式向表中添加数据

    原标题:Java中如何实现向DBC方式向表中添加数据 Java中如何实现向DBC方式向表中添加数据 Java程序JDBC方式向数据库的表添加记录的步骤: 1.新建工程: Java Project 2. ...

  2. mysql查询字段纯字母_解决MySQL之中一个字段中无法精准的查询多语言语言字母的问题...

    解决MySQL之中一个字段中无法精准的查询多语言语言字母的问题 解决MySQL之中一个字段中无法精准的查询多语言语言字母的问题 目录 1.使用场景 2.实现过程及展示结果 2.1.修改字段排序规则为u ...

  3. 解决mysql的utf8编码中生僻字写入时Incorrect string value错误

    http://hi.baidu.com/cnkarl/item/e18214e8ba7ce3c6bbf37ddd ------------------------------------------- ...

  4. 解决Mysql读写分离数据延迟

    MySQL的主从同步机制非常方便的解决了高并发读的应用需求,给Web方面开发带来了极大的便利.但这种方式有个比较大的缺陷在于MySQL的同步机制是依赖Slave主动向Master发请求来获取数据的,而 ...

  5. java读文件几种方式_java中读取文件的方式有哪几种

    java中读取文件的方式有哪几种 发布时间:2020-06-19 13:36:48 来源:亿速云 阅读:135 作者:鸽子 读取文件有多种方式,基于传统的输入流方式或基于nio的Buffer缓冲对象和 ...

  6. JAVA中初始化线程的两种方法_java中最简单的方式新起一个线程

    启动一个线程 在一个方法中启动一个线程,有两种方法 第一种是让类实现Runable接口,这样的话编译器就会提示你实现里面的未实现的方法(就是run方法) 第二种是,现在方法中new一个线程,然后直接调 ...

  7. java中文件读取方式的顺序_java中读取文件的方式

    java中读取文件的方式 经常遇到java中读取文件的方式,有时候需要指定编码,有时候不需要指定编码,被搞的挺晕的,抽时间整理了一下java读取文件的方式,主要是对字符型的处理,二进制的暂时不考虑. ...

  8. Java生成随机数原理_Java中随机数的产生方式与原理详解

    Java中随机数的产生方式与原理 查阅随机数相关资料,特做整理 首先说一下java中产生随机数的几种方式 在j2se中我们可以使用Math.random()方法来产生一个随机数,这个产生的随机数是0- ...

  9. java什么是栈和堆_JAVA中的栈和堆

    JAVA在程序运行时,在内存中划分5片空间进行数据的存储.分别是:1:寄存器.2:本地方法区.3:方法区.4:栈.5:堆. 基本,栈stack和堆heap这两个概念很重要,不了解清楚,后面就不用学了. ...

最新文章

  1. 领结婚证了,新的人生开始了!
  2. UA PHYS515A 电磁理论V 电磁波与辐射1 电磁波的方程
  3. You must install pydot and graphviz for plotmodel to work报错如何处理
  4. 写通俗易懂代码-用卫语句替代嵌套条件表达式
  5. java邮件程序实例_java 发送邮件简单实例
  6. 基于阿里云服务器+wordpress构建自己的网站(全过程系列,无需任何编程知识)
  7. vi单文件操作常用命令
  8. 智能(个性化)推荐系统全流程落地实施方案
  9. 费马定理、罗尔中值定理、零点存在定理、拉格朗日中值定理、
  10. 京东颜色html,京东m.jd站点静态页实现(首页)H5
  11. php 查询每个一号,SPOT系列卫星参数一览表 - 高分一号、高分二号卫星查询遥感数据购买 - 新闻资讯 - 遥感卫星影像数据查询中心-北京揽宇方圆-购买高分卫星影像...
  12. win10红警遇到的各种问题
  13. 苹果收购公司,为什么总是低调而高效---转自百度新闻|DTCHAT
  14. U3D AudioSource 完整音效截取部分
  15. ubuntu好用的输入法googlepinyin
  16. 计算机网速单位是什么,计算机存储单位和网络网速单位
  17. Java强、软、弱、虚四大引用(附代码示例)
  18. 在J2EE项目中集成快钱支付接口
  19. wordpress仿站笔记
  20. DOS 用ren命令批量修改文件后缀名

热门文章

  1. java sqlserver 2000_谁能救救我啊,关于JAVA连接SQLserver2000
  2. Python类的自定义属性访问及动态属性设置
  3. Python:给定一个不超过5位的正整数,判断有几位
  4. android 长按缩放拖动_十年Android之路面试2000人,面试准备+内部泄露核心题(中高级)...
  5. opencv cv2.LUT()(使用查找表中的值填充输出数组)
  6. 失落城堡 各种颜色药水、道具效果
  7. TCP释放连接后实现端口的立即复用
  8. tensorflow 测试 cuda 是否安装成功,测试代码环境
  9. 随e行安全层在与远程计算机初始化,g3随e行怎么用_g3随e行怎么安装_随e行wlan无法登陆...
  10. SpringBoot @Valid各种注解使用说明