ScheduledThreadPoolExecutor 线程池

链接: ScheduledThreadPoolExecutor.


import java.util.concurrent.Future;public class Entity {/*** 订单到期时间*/private String orderExpirationTime;/*** 定时器Future*/private Future future;/*** @param orderExpirationTime 订单到期时间* @param future 定时器*/public Entity(String orderExpirationTime, Future future) {this.orderExpirationTime = orderExpirationTime;this.future = future;}/*** 获取值*/public String getOrderExpirationTime() {return orderExpirationTime;}/*** 获取Future对象*/public Future getFuture() {return future;}
}
import cn.hutool.core.date.DateField;
import cn.hutool.core.date.DateUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;import java.util.Date;
import java.util.concurrent.*;/*** 取消订单定时器缓存*/
public class CancelOrderTimer {/*** key为orderNumber*/private final static ConcurrentHashMap<String, Entity> map = new ConcurrentHashMap<>();/*** 线程池大小*/private static final int POOL_SIZE = 5;/*** 过期时间:一分钟*/public static final int EXPIRE = 1;private final static ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(POOL_SIZE,new BasicThreadFactory.Builder().namingPattern("cancelOrder-schedule-pool-%d").daemon(true).build());public static ScheduledExecutorService getExecutor() {return executor;}/*** 读取缓存*/public static String get(String key) {Entity entity = map.get(key);return entity == null ? null : entity.getOrderExpirationTime();}/*** 放入缓存*/public static void put(String orderNumber,Future future) {Date newDate = DateUtil.offset(DateUtil.date(), DateField.MINUTE, EXPIRE);String orderExpirationTime = DateUtil.formatDateTime(newDate);map.put(orderNumber, new Entity(orderExpirationTime, future));}/*** 清除缓存并取消定时任务* @param orderNumber 订单号* @param mayInterruptIfRunning 是否中断正在执行的任务*/public static void remove(String orderNumber, boolean mayInterruptIfRunning) {Entity entity = map.remove(orderNumber);if (entity == null) {return;}Future future = entity.getFuture();if (future != null) {// 注意:传入true会中断正在执行的任务future.cancel(mayInterruptIfRunning);}}/*** 获取订单剩余取消时间** @param orderNumber 订单号*/public static String getOrderExpirationTime(String orderNumber) {long remainMinute = 0;long remainSecond = 0;String orderExpirationTime = get(orderNumber);if (StringUtils.isNotBlank(orderExpirationTime)) {long s = DateUtil.parse(orderExpirationTime).getTime();long between = (s - System.currentTimeMillis()) / 1000;remainMinute = between / 60 % 60;remainSecond = between % 60;}return "订单过期还剩:" + remainMinute + "分钟" + remainSecond + "秒";}}
import cn.hutool.core.lang.Console;
import cn.hutool.core.thread.ConcurrencyTester;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.IdUtil;
import com.zm.demo.util.CancelOrderTimer;
import com.zm.demo.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;@RestController
@Validated
public class DelayTestController {public static Logger log = LoggerFactory.getLogger(CancelOrderTimer.class);@Autowiredprivate RedisUtil RedisUtil;@GetMapping("/put_timer")public String hello1(@RequestParam String orderNumber){ConcurrencyTester tester = ThreadUtil.concurrencyTest(100, () -> {// 测试的逻辑内容putTimer(IdUtil.simpleUUID());});// 获取总的执行时间,单位毫秒Console.log(tester.getInterval());return "ok";}@GetMapping("/expiration_time")public String hello2(@RequestParam String orderNumber){return CancelOrderTimer.getOrderExpirationTime(orderNumber);}public void putTimer(String orderNumber) {CancelOrderTimer.remove(orderNumber,false);ScheduledExecutorService executor = CancelOrderTimer.getExecutor();Future future = executor.schedule(() -> {try {synchronized (CancelOrderTimer.class) {// 这里用 redis计录 任务执行成功的次数RedisUtil.incrBy("one",1);log.info(orderNumber + "订单号成功执行取消订单");}} catch (Exception e) {e.printStackTrace();log.error(orderNumber + "订单号执行取消订单失败");}}, CancelOrderTimer.EXPIRE, TimeUnit.MINUTES);CancelOrderTimer.put(orderNumber,future);log.info( orderNumber + "订单号启动取消订单定时器");}
}

redis

链接: 超简单使用redisson延迟队列做定时任务.

<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.15.3</version>
</dependency>
spring:redis:host: xxxxxxxport: 6379timeout: 5000password: xxxxxx
import java.io.Serializable;public class TaskBodyDTO implements Serializable {private String orderNumber;public String getOrderNumber() {return orderNumber;}public void setOrderNumber(String orderNumber) {this.orderNumber = orderNumber;}
}
/*** 队列事件监听接口,需要实现这个方法** @param <T>*/
public interface RedisDelayedQueueListener<T> {/*** 执行方法** @param t*/void invoke(T t);
}
import com.zm.demo.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class TestListener1 implements RedisDelayedQueueListener<TaskBodyDTO>{public static Logger log = LoggerFactory.getLogger(TestListener1.class);@Autowiredprivate RedisUtil RedisUtil;@Overridepublic void invoke(TaskBodyDTO taskBodyDTO) {RedisUtil.incrBy("one",1);log.info( taskBodyDTO.getOrderNumber() + "订单号成功执行取消订单");}}
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;import java.util.Map;
import java.util.concurrent.*;/*** 初始化队列监听*/
@Component
public class RedisDelayedQueueInit implements ApplicationContextAware {private static Logger logger = LoggerFactory.getLogger(RedisDelayedQueueInit.class);@AutowiredRedissonClient redissonClient;/*** corePoolSize:核心线程数** maximumPoolSize:最大线程数** keepAliveTime + unit:线程回收时间** workQueue:任务较多时暂存到队列** threadFactory:执行程序创建新线程时使用的工厂** handler:超出线程池容量以及队列长度后拒绝任务的策略*/private final static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("judge-pool-%d").setUncaughtExceptionHandler((thread, throwable)->logger.error("ThreadPool {} got exception", thread,throwable)).build();// 创建线程池,使⽤有界阻塞队列防⽌内存溢出private final static ExecutorService statsThreadPool = new ThreadPoolExecutor(5, 10,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(100), namedThreadFactory);/*** 获取应用上下文并获取相应的接口实现类并启动对应的监听线程* @param applicationContext* @throws BeansException*/@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) {String listenerName = taskEventListenerEntry.getValue().getClass().getName();startThread(listenerName, taskEventListenerEntry.getValue());}}/*** 启动线程获取队列*** @param queueName                 queueName* @param redisDelayedQueueListener 任务回调监听* @param <T>                       泛型* @return*/private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);//由于此线程需要常驻,可以新建线程,不用交给线程池管理Thread thread = new Thread(() -> {logger.info("启动监听队列线程" + queueName);while (true) {try {T t = blockingFairQueue.take();statsThreadPool.execute(() -> {redisDelayedQueueListener.invoke(t);});} catch (Exception e) {logger.info("监听队列线程错误,", e);try {Thread.sleep(10000);} catch (InterruptedException ex) {}}}});thread.setName(queueName);thread.start();}}
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;@Component
public class RedisDelayedQueue {@AutowiredRedissonClient redissonClient;private static Logger logger = LoggerFactory.getLogger(RedisDelayedQueue.class);/*** 添加队列** @param t        DTO传输类* @param delay    时间数量* @param timeUnit 时间单位* @param <T>      泛型*/public <T> void addQueue(T t, long delay, TimeUnit timeUnit, String queueName) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.offer(t, delay, timeUnit);}}
@AutowiredRedisDelayedQueue redisDelayedQueue;@GetMapping("/redis/put_timer")public String hello(){ConcurrencyTester tester = ThreadUtil.concurrencyTest(100, () -> {TaskBodyDTO taskBody = new TaskBodyDTO();taskBody.setOrderNumber(IdUtil.simpleUUID());redisDelayedQueue.addQueue(taskBody, CancelOrderTimer.EXPIRE, TimeUnit.MINUTES, TestListener1.class.getName());});Console.log("总的执行时间:"+tester.getInterval());return "ok";}

消息中间件

RabbitMQ 实现延迟队列

导入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

绑定交换机和队列的关系

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {public static final String exchange_name = "fanout_order_exchange";public static final String dead_exchange_name = "dead_order_exchange";public static final String dead_rout_key = "dead_order";/*** 配置交换机*/@Beanpublic FanoutExchange fanoutOrderExchange() {return new FanoutExchange(exchange_name, true, false);}/*** 配置ttl队列 存放订单 设置1分钟投入 死信队列*/@Beanpublic Queue ttlQueue() {// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//一般设置一下队列的持久化就好,其余两个就是默认falseMap<String,Object> args = new HashMap<>();// 1000/秒args.put("x-message-ttl",60000);args.put("x-dead-letter-exchange",dead_exchange_name);args.put("x-dead-letter-routing-key",dead_rout_key);return new Queue("cancel.fanout.queue", true, false ,false, args);}/*** 将队列和交换机绑定*/@Beanpublic Binding bindingFanout() {return BindingBuilder.bind(ttlQueue()).to(fanoutOrderExchange());}/*** 配置死信交换机*/@Beanpublic DirectExchange deadExchange() {return new DirectExchange(dead_exchange_name, true, false);}/*** 配置死信队列*/@Beanpublic Queue cancelQueue() {// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue("cancel.direct.queue", true);}/*** 将队列和交换机绑定* @return*/@Beanpublic Binding bindingDirect() {return BindingBuilder.bind(cancelQueue()).to(deadExchange()).with(dead_rout_key);}}

import cn.hutool.core.util.IdUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class OrderService {public static Logger log = LoggerFactory.getLogger(OrderService .class);@Autowiredprivate RabbitTemplate rabbitTemplate;public void makeOrder() {String orderNumber = IdUtil.simpleUUID();// convertSendAndReceive(…):可以同步消费者。使用此方法,当确认了所有的消费者都接收成功之后,才触发另一个convertSendAndReceive(…),也就是才会接收下一条消息。RPC调用方式。// convertAndSend(…):使用此方法,交换机会马上把所有的信息都交给所有的消费者,消费者再自行处理,不会因为消费者处理慢而阻塞线程。rabbitTemplate.convertAndSend(RabbitConfig.exchange_name, "", orderNumber);log.info(orderNumber + "订单号启动取消订单定时器");}}

import com.zm.demo.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class CancelOrderService {public static Logger log = LoggerFactory.getLogger(CancelOrderService .class);@Autowiredprivate RedisUtil RedisUtil;/*** @RabbitListener 监听队列* @RabbitHandler 代表此方法是一个消息接收的方法;该不要有返回值*/@RabbitListener(queues = "cancel.direct.queue")@RabbitHandlerpublic void invoke(String message){RedisUtil.incrBy("one",1);log.info(message + "订单号成功执行取消订单");}}
    @AutowiredOrderService OrderService;@GetMapping("/mq/put_timer")public String mq(){ConcurrencyTester tester = ThreadUtil.concurrencyTest(1000, () -> {OrderService.makeOrder();});Console.log("总的执行时间:"+tester.getInterval());return "ok";}

使用死信队列实现延时消息的缺点:

1) 如果统一用队列来设置消息的TTL,当梯度非常多的情况下,比如1分钟,2分钟,5分钟,10分钟,20分钟,30分钟……需要创建很多交换机和队列来路由消息。

2) 如果单独设置消息的TTL,则可能会造成队列中的消息阻塞——前一条消息没有出队(没有被消费),后面的消息无法投递。

3) 可能存在一定的时间误差。

并发测试对比:服务器性能 1核2G

定时器线程池 100并发
添加任务:87ms
执行任务:8秒

redis 100并发
添加任务:698ms
执行任务:8秒

RabbitMQ 100并发
添加任务:6秒
执行任务:8秒

定时器线程池 1000并发
添加任务:202ms
执行任务:1分钟30秒

redis 1000并发
添加任务:3秒
执行任务:1分钟30秒

RabbitMQ 1000并发
添加任务:1分钟多
执行任务:2分钟多

RabbitMQ 为什么这么慢 代码有问题还是什么原因 有待研究~

具体选择看具体需求及场景

定时器线程池

优点:
使用简单
支持停止任务
执行时间较为准时

缺点:
任务数量大时,占用大量内存
一旦宕机或者执行任务失败,无法重新执行任务,需要写补偿机制
存在较小时间误差

redis 延迟队列

优点:
redis 执行效率快
支持分布式
消息持久化

缺点:
编码实现稍微复杂
没有确认消息可靠消费机制,需要写补偿机制
无法取消任务
存在较小时间误差

RabbitMQ 延迟队列

优点:
解耦
通过生产者可靠消息投递和消费者可靠消息确认机制能确保任务稳定执行
消息持久化
支持分布式
能接收大量的消息

缺点:
编码实现复杂
无法取消任务
存在时间误差

java业务场景-实现订单超时关闭等延时队列操作的几种方式相关推荐

  1. Java操作Excel三种方式POI、Hutool、EasyExcel

    Java操作Excel三种方式POI.Hutool.EasyExcel 1. Java操作Excel概述 1.1 Excel需求概述 1.2 Excel操作三种方式对比 2. ApachePOIExc ...

  2. java制作oracle程序,Java程序操作Oracle两种方式之简单实现

    Java程序操作Oracle两种方式之简单实现 1.通过JDBC-ODBC桥连接Oracle数据库 (1)创建odbc源,在控制面板->管理工具->数据源(odbc)中添加DSN,比如取名 ...

  3. ribbonmq超时配置_使用RabbitMQ实现订单超时取消(延迟队列)

    使用RabbitMQ实现订单超时取消,大致流程: 生产者生产一条设置了TTL的延迟取消订单消息=>延迟队列交换机(通过绑定路由键)=>消息投递至延迟队列=>消息延迟队列时间到期=&g ...

  4. java websocket注解_【websocket】spring boot 集成 websocket 的四种方式

    集成 websocket 的四种方案 1. 原生注解 pom.xml org.springframework.boot spring-boot-starter-websocket WebSocketC ...

  5. springboot Java实现多文件的zip压缩操作 + 通过浏览器下载文件的两种方式

    注只适配utf-8的场景,待完善! 压缩为zip文件 通过java程序输出文件 /*** 功能:压缩多个文件成一个zip文件* @param srcfile:源文件列表* @param zipfile ...

  6. java将a对象转换为b对象_Java 对象的深复制五种方式

    1. 概述 在实际编程过程中,我们常常要遇到这种情况:有一个对象A,在某一时刻A中已经包含了一些有效值,此时可能 会需要一个和A完全相同新对象B,并且此后对B任何改动都不会影响到A中的值,也就是说,A ...

  7. java在linux创建文件_Java中创建并写文件的5种方式

    导读 在java中有很多的方法可以创建文件写文件,你是否真的认真的总结过?下面笔者就帮大家总结一下java中创建文件的五种方法. 在java中有很多的方法可以创建文件写文件,你是否真的认真的总结过?下 ...

  8. java并发编程基础系列(五): 创建线程的四种方式

    线程的创建一共有四种方式: 继承于Thread类,重写run()方法: 实现Runable接口,实现里面的run()方法: 使用 FutureTask 实现有返回结果的线程 使用ExecutorSer ...

  9. java oom dump_JVM 在遇到OOM(OutOfMemoryError)时生成Dump文件的三种方式

    JVM 在遇到OOM(OutOfMemoryError)时生成Dump文件的三种方式,以及如何使用Eclips Memory Analyzer(MAT)插件进行堆内存分析. 方法一: jmap -du ...

  10. java applet怎么运行_Java如何运行Applet?运行Applet的两种方式

    详细内容 applet是一个小型的动态Java程序,那么如何运行Applet?本篇文章就给大家介绍Applet,让大家了解Applet是什么,运行Applet的方法,希望对你们有所帮助. Java a ...

最新文章

  1. 一场稳定、高清、流畅的大型活动直播是怎么炼成的?
  2. 如何看exe文件源代码_杀进程、删文件...看新型勒索软件RobbinHood如何干掉杀毒软件...
  3. “AI+”农业向农民致敬-丰收节交易会:谋定工业反哺农业
  4. 【机器学习】降维代码练习
  5. 泛型的作用是什么?——Java系列学习笔记
  6. 2020年终回顾:时间会回答成长,成长会回答梦想
  7. 链表之打印两个有序链表的公共部分
  8. 郭凯天:中国公益慈善行业数字化观察与思考
  9. 【链表】链表中环的入口结点
  10. zynq 和fpga区别_FPGA复位的正确打开方式
  11. 浅谈Android onTouchEvent 与 onInterceptTouchEvent的区别详解
  12. happy number(快乐数)
  13. 肝了一天一夜 吐血整理的超级实用的Web前端面试题总结
  14. office2010常见问题集锦
  15. DS18B20数字温度传感器
  16. ffprobe获取视频帧信息中的pkt_pts、pkt_pts_time
  17. 大数据学习笔记(十)-Hive中的Storage format
  18. 摩托车无钥匙启动解决方案设计
  19. 物联网系统开发的11个步骤【看评论区里领取项目资料】
  20. 跨域解决方案PHP,跨域解决方案

热门文章

  1. JavaScript中的面向对象编程
  2. JavaScript 读写剪贴板之方式汇总
  3. 学习报告:基于原型网络的小样本学习《Prototypical Networks for Few-shot Learning》
  4. 十大重要IT公司排名 -2009
  5. 浪潮云服务器安装win7系统,WIN7旗舰版操作系统中浪潮ERP_GS5.2安装说明.doc
  6. html- 颜色代码
  7. 中国近代史纲要 期末复习
  8. 谷歌地图 经纬加密_Google开始加密搜索
  9. 多智能体系统的分布式协同控制——采样控制、脉冲控制、弹性控制
  10. (转)三维GIS软件十九重唱