java多线程执行任务(工具升级版)
java多线程执行任务(工具升级版)
昨天写的java多线程执行任务(工具)但是不能符合顺序执行计划的场景,下面升级一下原工具
java多线程执行任务(工具)
更新java多线程执行任务(工具再升级版)
之前只支持两种模式,新工具支持四种模式
执行模式:
- 1:所有任务信息都执行
- 2:先执行部分任务,执行完后再执行其他任务
- 3:所有任务信息都执行,但是顺序执行每个任务中的计划
- 4:顺序先执行执行任务中的计划,执行完后再顺序执行其他任务
模式3,4在模式1,2上顺序执行每个任务中的计划
实现原理如图:
接着上代码
Scheder
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;/*** @Author: dinghao* @Date: 2022/3/7 15:29*/
@Slf4j
public class Scheder {/*** 任务信息数组*/private TaskInfo[] taskInfos;/*** 执行计划队列数组*/private LinkedBlockingQueue<Plan>[] planQueueArray;/*** 队列的数量*/int queueNum;/*** 队列的容量*/int queueSize;/*** 允许并行执行的线程池*/private ExecutorService loopExecutor;/*** 允许并行执行的线程数*/private int nThrends;/*** 执行模式:* 1:所有任务信息都执行* 2:先执行部分任务,执行完后再执行其他任务* 3:顺序执行任务中的计划* 4:顺序先执行执行任务中的计划,执行完后再顺序执行其他任务*/private int model;/*** 每批执行的任务数量*/private int modelSize;/*** model = 2,4 时有效*/private ArrayList<Integer> indexList = new ArrayList<>();/*** 协助判断,是否线程池的任务全部结束*/private AtomicInteger count;/*** 调度器的执行状态*/private volatile boolean status = false;/*** 构造器** @param taskInfos 任务数组* @param nThrends 同时执行任务中的计划线程数* @param queueSize 计划执行队列* @param model 执行模式 1:所有任务信息都执行 2:先执行部分任务,执行完后再执行其他任务* @param modelSize 每批执行任务的数量*/public Scheder(TaskInfo[] taskInfos, int nThrends, int queueNum, int queueSize, int model, Integer modelSize) {this.taskInfos = taskInfos;this.nThrends = nThrends;this.queueNum = queueNum;this.queueSize = queueSize;this.loopExecutor = Executors.newFixedThreadPool(this.nThrends);this.model = model;if (this.model < 3) {this.planQueueArray = new LinkedBlockingQueue[1];this.planQueueArray[0] = new LinkedBlockingQueue<>(this.queueSize);} else {// 初始化队列数组this.planQueueArray = new LinkedBlockingQueue[this.queueNum];IntStream.range(0, this.queueNum).forEach(i -> this.planQueueArray[i] = new LinkedBlockingQueue<>(this.queueSize));}// modelSize只有在等于2,4有效if (this.model == 2 || this.model == 4) {this.modelSize = modelSize > taskInfos.length ? taskInfos.length : modelSize;}count = countPlan();}/*** 计算一共有多少执行计划** @return /*/private AtomicInteger countPlan() {int sum = 0;for (int i = 0; i < this.taskInfos.length; i++) {sum += this.taskInfos[i].getPlanQueue().size();}return new AtomicInteger(sum);}public Scheder(TaskInfo[] taskInfos, int nThrends, int model, Integer modelSize) {this(taskInfos, nThrends, nThrends, 100, model, modelSize);}public Scheder(TaskInfo[] taskInfos, int nThrends) {this(taskInfos, nThrends, nThrends, 100, 1, null);}public Scheder(TaskInfo[] taskInfos) {this(taskInfos, 10, 10, 100, 1, null);}public void setModel(int model) {this.model = model;}public int getModel() {return model;}public void setModelSize(int modelSize) {this.modelSize = modelSize;}public int getModelSize() {return modelSize;}public ArrayList<Integer> getIndexList() {return indexList;}public AtomicInteger getCount() {return count;}public boolean isStatus() {return status;}/*** 执行方法*/public void run() {if (this.status) {log.warn("任务处于启动状态");return;}this.status = true;// 开启向队列中添加执行计划线程init();// 循环执行执行计划while (this.status) {// 所有执行计划执行完后,退出if (this.taskInfos.length <= 0) {if (this.model < 3) {if (this.planQueueArray[0].size() == 0) {this.status = false;break;}} else {ArrayList<Integer> notEmptyIndex = getNotEmptyIndex(this.planQueueArray);if (CollUtil.isEmpty(notEmptyIndex)) {this.status = false;break;}}}// 执行计划execute();}int size;// 所有线程执行完毕出循环for (; ; ) {size = this.count.get();if (size == 0) {break;}}//停止线程池this.loopExecutor.shutdownNow();for (; ; ) {//只有当线程池中所有线程完成任务时才会返回true,并且需要先调用线程池的shutdown方法或者shutdownNow方法。if (this.loopExecutor.isTerminated()) {System.out.println("执行结束!");break;}}}private void execute() {if (this.model < 3) {try {// 获取一个执行计划Plan plan = this.planQueueArray[0].take();// 执行计划this.loopExecutor.execute(() -> plan.run0(this.count));} catch (InterruptedException e) {log.error("任务执行中发生异常", e);}} else {this.loopExecutor.execute(() -> {try {// 获取一个执行计划Plan plan = null;// 获取线程idString name = Thread.currentThread().getName();int lastIndexOf = name.lastIndexOf("-");int id = Integer.parseInt(name.substring(lastIndexOf + 1));ArrayList<Integer> notEmptyIndex2 = getNotEmptyIndex(this.planQueueArray);Integer index = notEmptyIndex2.stream().filter(item -> item % this.nThrends == (id - 1)).findAny().orElse(null);if (index == null) {return;}LinkedBlockingQueue<Plan> plans = this.planQueueArray[index];if (plans.size() > 0) {plan = plans.take();plan.run0(this.count);}} catch (InterruptedException e) {log.error("任务执行中发生异常", e);}});}}private ArrayList<Integer> getNotEmptyIndex(LinkedBlockingQueue<Plan>[] planQueueArray) {ArrayList<Integer> indexArray = new ArrayList<>();for (int i = 0; i < planQueueArray.length; i++) {if (!CollUtil.isEmpty(planQueueArray[i])) {indexArray.add(i);}}return indexArray;}/*** 开启一个线程,持续向执行计划队列添加执行计划,直到所有的计划任务添加完*/private void init() {new Thread(() -> {while (this.status) {// 任务信息数组数量int length = this.taskInfos.length;// 执行完结束线程if (length <= 0) {break;}// 获取添加执行计划的的任务索引值int index = getIndexOfModel(this.model, length);TaskInfo taskInfo = null;try {taskInfo = this.taskInfos[index];} catch (Exception e) {e.printStackTrace();}LinkedList<Plan> plans = taskInfo.getPlanQueue();if (plans.size() > 0) {try {if (this.model >= 3) {int index2 = taskInfo.getId() % this.planQueueArray.length;this.planQueueArray[index2].put(plans.removeFirst());} else {this.planQueueArray[0].put(plans.removeFirst());}} catch (InterruptedException e) {log.error("向执行计划队列放入计划异常", e);}} else {this.taskInfos = reBuildTaskInfos(this.taskInfos, index);}}}).start();}/*** 根据执行模式获取添加执行计划的的任务信息索引值** @param model 执行模式* @param length 任务信息数组数量* @return 任务信息索引值*/private int getIndexOfModel(int model, int length) {if (model == 1 || model == 3) {return RandomUtil.randomInt(0, length) % length;} else {this.indexList.removeIf(item -> item >= length);if (this.indexList.size() < this.modelSize) {int index = RandomUtil.randomInt(0, length) % length;this.indexList.add(index);return index;} else {return this.indexList.get(RandomUtil.randomInt(0, length) % this.indexList.size());}}}/*** 重新构建任务信息数组** @param taskInfos 原来任务信息数组* @param index 需要移除的任务信息* @return 新的任务信息数组*/private TaskInfo[] reBuildTaskInfos(TaskInfo[] taskInfos, int index) {TaskInfo[] newTaskINfo = new TaskInfo[taskInfos.length - 1];for (int j = 0, i = 0; i < taskInfos.length; i++) {if (i != index) {newTaskINfo[j] = taskInfos[i];j++;}}return newTaskINfo;}}
TaskInfo
import lombok.Data;import java.util.LinkedList;/*** @Author: dinghao* @Date: 2022/3/7 15:31*/
@Data
public class TaskInfo {/*** 唯一标识*/private int id;/*** 任务名称*/private String name;/*** 执行计划队列*/private LinkedList<Plan> planQueue;public TaskInfo(int id, String name, LinkedList<Plan> planQueue) {this.id = id;this.name = name;this.planQueue = planQueue;}
}
这里加了id属性
Plan
import java.util.concurrent.atomic.AtomicInteger;/*** @Author: dinghao* @Date: 2022/3/7 15:37*/
public interface Plan {/*** 线程池执行前*/default void before(){}void run();/*** 线程池执行后*/default void after(){}default void run0(AtomicInteger atomicInteger) {try{before();run();}finally {after();atomicInteger.decrementAndGet();}}}
修改完成
实现自己的计划
MyPlan
import lombok.Data;/*** @Author: dinghao* @Date: 2022/3/9 10:33*/
@Data
public class MyPlan implements Plan {private String name;@Overridepublic void run() {// if( name.startsWith("用户99")){System.out.println(Thread.currentThread().getName() + ":" + name);
// }}
}
Test
import java.util.LinkedList;
import java.util.stream.IntStream;/*** @Author: dinghao* @Date: 2022/3/9 14:52*/
public class Test {public static void main(String[] args) {int userSize = 100;int jobSize = 1000;TaskInfo[] taskInfos = new TaskInfo[userSize];IntStream.range(0, userSize).parallel().forEach(i -> {LinkedList<Plan> plans = new LinkedList<>();for (int j = 0; j < jobSize; j++) {MyPlan myPlan = new MyPlan();myPlan.setName("用户" + i + ",执行计划" + j);plans.add(myPlan);}taskInfos[i] = new TaskInfo(i, "用户" + i, plans);});Scheder scheder = new Scheder(taskInfos, 3, 10, 100, 3, 2);scheder.run();}
}
测试结果:
java多线程执行任务(工具升级版)相关推荐
- JAVA多线程执行,等待返回结果,再执行
JAVA多线程执行,等待返回结果,再执行 1.实现callable接口 1)配置线程池 package com.neusoft.demo.server.config;import org.spring ...
- java new 多线程_Java多线程:Java多线程执行框架
为什么需要执行框架呢? 使用一般的new方法来创建线程有什么问题呢?一般的new线程的方式一般要给出一个实现了Runnable接口的执行类,在其中重写run()方法,然后再在将这个执行类的对象传给线程 ...
- 使用java多线程分批处理数据工具类
最近由于业务需要,数据量比较大,需要使用多线程来分批处理,提高处理效率和能力,于是就写了一个通用的多线程处理工具,只需要实现自己的业务逻辑就可以正常使用,现在记录一下 主要是针对大数据量list,将l ...
- JAVA多线程-常用JUC工具类及阻塞队列
工具类 CountDownLactch 简述 允许一个或多个线程等待,直到在其他线程中执行的一组操作完成,同步辅助. CountDownLatch 类用给定的计数初始化. await 方法阻塞,直到由 ...
- java多线程aqs实现工具类_Java并发多线程 - 并发工具类JUC
(adsbygoogle = window.adsbygoogle || []).push({}); 安全共享对象策略 1.线程限制 : 一个被线程限制的对象,由线程独占,并且只能被占有它的线程修改 ...
- java多线程执行任务
前言:在循环执行一些耗时任务时,都是在同步发生,效率比较底下,所以可以采用创建多线程来执行任务,增加执行效率 // 任务集合List<Object> list = new ArrayLis ...
- 使用Java多线程实现任务分发
多线程下载由来已久,如 FlashGet.NetAnts 等工具,它们都是依懒于 HTTP 协议的支持(Range 字段指定请求内容范围),首先能读取出请求内容 (即欲下载的文件) 的大小,划分出若干 ...
- Java 多线程与并发编程专题
Java 线程基础 Java 多线程开发 线程安全与同步 并发控制 非阻塞套接字(NIO) Java 5 中的并发 JDK 7 中的 Fork/Join 模式 相关书评 Java 平台提供了一套广泛而 ...
- Java自带的多线程监控分析工具(VisualVM)
转自: http://www.udpwork.com/item/1105.html 在java多线程程序运行时,多数情况下我们不知道到底发生了什么,只有出了错误的日志的时候,我们才知道原来代码中有死锁 ...
最新文章
- R语言grafify包简单、快速绘制19个漂亮的统计图实战
- python清空字典保留变量方法_python学习day06--02字典增删差改以及字符串的一些方法...
- Socket 异步通信编程
- 开启HDR视觉盛宴:究竟什么视频算得上HDR?
- taylor+swift纽约公寓_国际巨星Taylor Swift有多爱豪宅?才30岁就有8套豪宅
- app传输数据到php,安卓app客户端和使用php的服务器端数据交互
- go语言 gosched
- ios上的pvr与png
- MySQL二十八规范数据库设计
- linux 设置更新源为cd,技术|如何修复 apt-get update 无法添加新的 CD-ROM 的错误
- hash和encrypt区别及应用_转
- Linux完全卸载mysql的方式
- VMware Cloud Director 10.3 发布(下载) - 云计算调配和管理平台
- 学3D建模需要多久?
- IDM与其他下载器加速器优缺点介绍
- android sdk引入 微信分享_android 调用本地微信自定义多图分享朋友圈,可放在share sdk中一起使用...
- 我的第一个油猴脚本--微博超话自动签到
- 极客爱情前传:程序员应该送什么礼物给女朋友
- 0 在c语言中有什么作用,\0在c语言中代表什么?
- 未来,大数据行业工资会断崖式下滑吗?