java中延迟任务的处理方式
1、利用延迟队列
延时队列,第一他是个队列,所以具有对列功能第二就是延时,这就是延时对列,功能也就是将任务放在该延时对列中,只有到了延时时刻才能从该延时对列中获取任务否则获取不到……
应用场景比较多,比如延时1分钟发短信,延时1分钟再次执行等,下面先看看延时队列demo之后再看延时队列在项目中的使用:
简单的延时队列要有三部分:第一实现了Delayed接口的消息体、第二消费消息的消费者、第三存放消息的延时队列,那下面就来看看延时队列demo。
一、消息体
package com.delqueue; import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit; /** * 消息体定义 实现Delayed接口就是实现两个方法即compareTo 和 getDelay最重要的就是getDelay方法,这个方法用来判断是否到期…… */
public class Message implements Delayed { private int id; private String body; // 消息内容 private long excuteTime;// 延迟时长,这个是必须的属性因为要按照这个判断延时时长。 public int getId() { return id; } public String getBody() { return body; } public long getExcuteTime() { return excuteTime; } public Message(int id, String body, long delayTime) { this.id = id; this.body = body; this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime(); } // 自定义实现比较方法返回 1 0 -1三个参数 @Override public int compareTo(Delayed delayed) { Message msg = (Message) delayed; return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1 : (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0); } // 延迟任务是否到时就是按照这个方法判断如果返回的是负数则说明到期否则还没到期 @Override public long getDelay(TimeUnit unit) { return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS); }
}
二、消息消费者
package com.delqueue; import java.util.concurrent.DelayQueue; public class Consumer implements Runnable { // 延时队列 ,消费者从其中获取消息进行消费 private DelayQueue<Message> queue; public Consumer(DelayQueue<Message> queue) { this.queue = queue; } @Override public void run() { while (true) { try { Message take = queue.take(); System.out.println("消费消息id:" + take.getId() + " 消息体:" + take.getBody()); } catch (InterruptedException e) { e.printStackTrace(); } } }
}
三、延时队列
package com.delqueue; import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class DelayQueueTest { public static void main(String[] args) { // 创建延时队列 DelayQueue<Message> queue = new DelayQueue<Message>(); // 添加延时消息,m1 延时3s Message m1 = new Message(1, "world", 3000); // 添加延时消息,m2 延时10s Message m2 = new Message(2, "hello", 10000); //将延时消息放到延时队列中 queue.offer(m2); queue.offer(m1); // 启动消费线程 消费添加到延时队列中的消息,前提是任务到了延期时间 ExecutorService exec = Executors.newFixedThreadPool(1); exec.execute(new Consumer(queue)); exec.shutdown(); }
}
将消息体放入延迟队列中,在启动消费者线程去消费延迟队列中的消息,如果延迟队列中的消息到了延迟时间则可以从中取出消息否则无法取出消息也就无法消费。
这就是延迟队列demo,下面我们来说说在真实环境下的使用。
使用场景描述:
在打车软件中对订单进行派单的流程,当有订单的时候给该订单筛选司机,然后给当订单绑定司机,但是有时运气没那么好,订单进来后第一次没有筛选到合适的司机,但我们也不能就此结束派单,而是将该订单的信息放到延时队列中过个2秒钟在进行一次,其实这个2秒钟就是一个延迟,所以这里我们就可以使用延时队列来实现……
下面看看简单的流程图:
下面来看看具体代码实现:
在项目中有如下几个类:第一 、任务类 第二、按照任务类组装的消息体类 第三、延迟队列管理类
任务类即执行筛选司机、绑单、push消息的任务类
package com.test.delayqueue;
/** * 具体执行相关业务的业务类 * @author whd * @date 2017年9月25日 上午12:49:32 */
public class DelayOrderWorker implements Runnable { @Override public void run() { // TODO Auto-generated method stub //相关业务逻辑处理 System.out.println(Thread.currentThread().getName()+" do something ……"); }
}
消息体类,在延时队列中这个实现了Delayed接口的消息类是比不可少的,实现接口时有一个getDelay(TimeUnit unit)方法,这个方法就是判断是否到期的
这里定义的是一个泛型类,所以可以将我们上面的任务类作为其中的task,这样就将任务类分装成了一个消息体
package com.test.delayqueue; import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit; /** * 延时队列中的消息体将任务封装为消息体 * * @author whd * @date 2017年9月25日 上午12:48:30 * @param <T> */
public class DelayOrderTask<T extends Runnable> implements Delayed { private final long time; private final T task; // 任务类,也就是之前定义的任务类 /** * @param timeout * 超时时间(秒) * @param task * 任务 */ public DelayOrderTask(long timeout, T task) { this.time = System.nanoTime() + timeout; this.task = task; } @Override public int compareTo(Delayed o) { // TODO Auto-generated method stub DelayOrderTask other = (DelayOrderTask) o; long diff = time - other.time; if (diff > 0) { return 1; } else if (diff < 0) { return -1; } else { return 0; } } @Override public long getDelay(TimeUnit unit) { // TODO Auto-generated method stub return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS); } @Override public int hashCode() { return task.hashCode(); } public T getTask() { return task; }
}
延时队列管理类,这个类主要就是将任务类封装成消息并并添加到延时队列中,以及轮询延时队列从中取出到时的消息体,在获取任务类放到线程池中执行任务
package com.test.delayqueue; import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; /** * 延时队列管理类,用来添加任务、执行任务 * * @author whd * @date 2017年9月25日 上午12:44:59 */
public class DelayOrderQueueManager { private final static int DEFAULT_THREAD_NUM = 5; private static int thread_num = DEFAULT_THREAD_NUM; // 固定大小线程池 private ExecutorService executor; // 守护线程 private Thread daemonThread; // 延时队列 private DelayQueue<DelayOrderTask<?>> delayQueue; private static final AtomicLong atomic = new AtomicLong(0); private final long n = 1; private static DelayOrderQueueManager instance = new DelayOrderQueueManager(); private DelayOrderQueueManager() { executor = Executors.newFixedThreadPool(thread_num); delayQueue = new DelayQueue<>(); init(); } public static DelayOrderQueueManager getInstance() { return instance; } /** * 初始化 */ public void init() { daemonThread = new Thread(() -> { execute(); }); daemonThread.setName("DelayQueueMonitor"); daemonThread.start(); } private void execute() { while (true) { Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces(); System.out.println("当前存活线程数量:" + map.size()); int taskNum = delayQueue.size(); System.out.println("当前延时任务数量:" + taskNum); try { // 从延时队列中获取任务 DelayOrderTask<?> delayOrderTask = delayQueue.take(); if (delayOrderTask != null) { Runnable task = delayOrderTask.getTask(); if (null == task) { continue; } // 提交到线程池执行task executor.execute(task); } } catch (Exception e) { e.printStackTrace(); } } } /** * 添加任务 * * @param task * @param time * 延时时间 * @param unit * 时间单位 */ public void put(Runnable task, long time, TimeUnit unit) { // 获取延时时间 long timeout = TimeUnit.NANOSECONDS.convert(time, unit); // 将任务封装成实现Delayed接口的消息体 DelayOrderTask<?> delayOrder = new DelayOrderTask<>(timeout, task); // 将消息体放到延时队列中 delayQueue.put(delayOrder); } /** * 删除任务 * * @param task * @return */ public boolean removeTask(DelayOrderTask task) { return delayQueue.remove(task); }
}
测试类
package com.delqueue; import java.util.concurrent.TimeUnit; import com.test.delayqueue.DelayOrderQueueManager;
import com.test.delayqueue.DelayOrderWorker; public class Test { public static void main(String[] args) { DelayOrderWorker work1 = new DelayOrderWorker();// 任务1 DelayOrderWorker work2 = new DelayOrderWorker();// 任务2 DelayOrderWorker work3 = new DelayOrderWorker();// 任务3 // 延迟队列管理类,将任务转化消息体并将消息体放入延迟对列中等待执行 DelayOrderQueueManager manager = DelayOrderQueueManager.getInstance(); manager.put(work1, 3000, TimeUnit.MILLISECONDS); manager.put(work2, 6000, TimeUnit.MILLISECONDS); manager.put(work3, 9000, TimeUnit.MILLISECONDS); } }
OK 这就是项目中的具体使用情况,当然具体内容被忽略,整体框架就是这样,还有这里使用java的延时队列但是这种方式是有问题的如果如果down机则会出现任务丢失,所以也可以考虑使用mq、redis来实现
2、mq实现延迟消息
在rabbitmq 3.5.7及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时插件依赖Erlang/OPT 18.0及以上。
插件源码地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
插件下载地址:
https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange
安装:
进入插件安装目录
{rabbitmq-server}/plugins/(可以查看一下当前已存在的插件)
下载插件
rabbitmq_delayed_message_exchangewget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez
(如果下载的文件名称不规则就手动重命名一下如:
rabbitmq_delayed_message_exchange-0.0.1.ez)
启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
关闭插件
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
插件使用
通过声明一个x-delayed-message类型的exchange来使用delayed-messaging特性
x-delayed-message是插件提供的类型,并不是rabbitmq本身的,发送消息的时候通过在header添加”x-delay”参数来控制消息的延时时间
直接在maven工程的pom.xml文件中加入
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Spring Boot的版本我使用的是 2.0.1.RELEASE .
接下来在 application.properties 文件中加入redis配置:
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
定义ConnectionFactory和RabbitTemplate
也很简单,代码如下:
package com.mq.rabbitmq;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {private String host;private int port;private String userName;private String password;@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host,port);cachingConnectionFactory.setUsername(userName);cachingConnectionFactory.setPassword(password);cachingConnectionFactory.setVirtualHost("/");cachingConnectionFactory.setPublisherConfirms(true);return cachingConnectionFactory;}@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());return rabbitTemplate;}public String getHost() {return host;}public void setHost(String host) {this.host = host;}public int getPort() {return port;}public void setPort(int port) {this.port = port;}public String getUserName() {return userName;}public void setUserName(String userName) {this.userName = userName;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}
}
Exchange和Queue配置
package com.mq.rabbitmq;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class QueueConfig {@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange("test_exchange", "x-delayed-message",true, false,args);}@Beanpublic Queue queue() {Queue queue = new Queue("test_queue_1", true);return queue;}@Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(delayExchange()).with("test_queue_1").noargs();}
}
这里要特别注意的是,使用的是 CustomExchange ,不是 DirectExchange ,另外 CustomExchange 的类型必须是 x-delayed-message 。
实现消息发送
package com.mq.rabbitmq;import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.text.SimpleDateFormat;
import java.util.Date;@Service
public class MessageServiceImpl {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(String queueName,String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("消息发送时间:"+sdf.format(new Date()));rabbitTemplate.convertAndSend("test_exchange", queueName, msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setHeader("x-delay",3000);return message;}});}
}
注意在发送的时候,必须加上一个header
x-delay
在这里我设置的延迟时间是3秒。
消息消费者
package com.mq.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class MessageReceiver {@RabbitListener(queues = "test_queue_1")public void receive(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("消息接收时间:"+sdf.format(new Date()));System.out.println("接收到的消息:"+msg);}
}
运行Spring Boot程序和发送消息
直接在main方法里运行Spring Boot程序,Spring Boot会自动解析 MessageReceiver 类的。
接下来只需要用Junit运行一下发送消息的接口即可。
package com.mq.rabbitmq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {@Autowiredprivate MessageServiceImpl messageService;@Testpublic void send() {messageService.sendMsg("test_queue_1","hello i am delay msg");}
}
运行完后,可以看到如下信息:
消息发送时间:2018-05-03 12:44:53
3秒钟后,Spring Boot控制台会输出:
消息接收时间:2018-05-03 12:44:56
接收到的消息:hello i am delay msg
java中延迟任务的处理方式相关推荐
- opengl中的Floatbuffer和IntBuffer与java中数据的存储方式不同的解决方法,编辑一个自己的BufferUtil工具类
opengl中的Floatbuffer和IntBuffer与java中数据的存储方式不同的解决方法,编辑一个自己的BufferUtil工具类 参考文章: (1)opengl中的Floatbuffer和 ...
- JS 和 Java 中URL特殊字符编码方式
前几天遇到url特殊字符编码的问题,在这里整理一下: JavaScript 1. 编码 escape(String) 其中某些字符被替换成了十六进制的转义序列. 解码 unescape(String ...
- java中的异常处理代码,java_深入剖析Java中的各种异常处理方式,1. 调试追踪代码:public s - phpStudy...
深入剖析Java中的各种异常处理方式 1. 调试追踪代码: public static void enterTryMethod() { System.out.println("enter a ...
- java数组初始化的方式,java中初始化数组的方式有几种
java中初始化数组的方式有几种 发布时间:2020-06-01 16:12:45 来源:亿速云 阅读:153 作者:鸽子 三种初始化方式: 1.静态初始化:创建+赋值 2.动态初始化:先创建再赋值 ...
- java中常见跳出循环的方式总结
java中常见跳出循环的方式一般有两种,一种是常用的break,continue,return方式:另一种是循环标记的方式. 方式一:break,continue,return方式 案例: break ...
- 关于Java中的Map遍历方式比较
最近在看阿里巴巴最新版的Java开发手册,里面的内容还是很值得去阅读学习,下面是我对Java中Map的遍历方式的比较和总结: 第一种:使用entrySet()的形式来遍历,也是效率高,推荐使用的一种遍 ...
- java中的Map遍历方式总结
java中的Map遍历方式总结 import java.util.HashMap; import java.util.Iterator; import java.util.Map;public cla ...
- java 中线程的创建方式
如果说在java中创建线程的有几种方式的话,归根结底我认为就两种方式 1.继承Thread类,重写run方法 继承Thread类,如下图重写了run()方法 通过start()方法来启动线程 最后的输 ...
- java中的排序方法,Java中的排序比较方式:自然排序和比较器排序
这里所说到的Java中的排序并不是指插入排序.希尔排序.归并排序等具体的排序算法.而是指执行这些排序算法时,比较两个对象"大小"的比较操作.我们很容易理解整型的 i>j 这样 ...
最新文章
- linux上安装mysql,tomcat,jdk
- 零点起飞学mysql视频_零点起飞学MySQL
- Spring boot使用Spring Security和OAuth2保护REST接口
- 【Android 插件化】Hook 插件化框架 ( Hook Activity 启动流程 | Hook 点分析 )
- 疫情期间在公共场所要全程佩戴口罩,不要抱有侥幸心理
- 权限柜作用_超市条码寄存柜使用要点
- 【转】jquery ui中文说明(使用方法)
- 存储知识学习之--IP网络存储iSCSI的概念与工作原理
- 学生档案信息管理案例
- hive sql 正则表达式
- HTML5+CSS3 从入门到精通(2)
- java解压服务器文件夹,java解压7z文件
- 计算机 标量,标量关系
- TwinCAT 3 使用XML-server
- [APIO2014]序列分割
- OSPF认证、虚链路、过滤、track
- Ubuntu虚拟机安装
- 搭建GTK+开发环境
- 二、VB.NET实现给图片添加文字水印
- 暨南大学计算机技术复试名单复试,关于2020年暨南大学硕士生复试资格线及复试名单的通知来...
热门文章
- html网页钩子,HTML5中的meta标签 和 IE浏览器能识别的钩子
- php模块下载,douphp下载中心模块修改版
- spring框架文档学习(包会)
- springBoot后台发送内容至邮箱
- chrome session丢失_一文带你彻底读懂Cookie、Session、Token到底是什么
- Linux 利用yum源安装subversion(svn)客户端
- Linux 分割、合并文件
- 设计模式(三)--适配器模式
- java闪屏怎么制作,Java Swing创建自定义闪屏:在闪屏下画进度条(一)
- Windows环境下MySQL 8.0 的安装、配置与卸载