java多线程执行任务(工具升级版)

昨天写的java多线程执行任务(工具)但是不能符合顺序执行计划的场景,下面升级一下原工具

java多线程执行任务(工具)

更新java多线程执行任务(工具再升级版)

之前只支持两种模式,新工具支持四种模式

执行模式:

  • 1:所有任务信息都执行
  • 2:先执行部分任务,执行完后再执行其他任务
  • 3:所有任务信息都执行,但是顺序执行每个任务中的计划
  • 4:顺序先执行执行任务中的计划,执行完后再顺序执行其他任务

模式3,4在模式1,2上顺序执行每个任务中的计划

实现原理如图:

接着上代码

Scheder

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;/*** @Author: dinghao* @Date: 2022/3/7 15:29*/
@Slf4j
public class Scheder {/*** 任务信息数组*/private TaskInfo[] taskInfos;/*** 执行计划队列数组*/private LinkedBlockingQueue<Plan>[] planQueueArray;/*** 队列的数量*/int queueNum;/*** 队列的容量*/int queueSize;/*** 允许并行执行的线程池*/private ExecutorService loopExecutor;/*** 允许并行执行的线程数*/private int nThrends;/*** 执行模式:* 1:所有任务信息都执行* 2:先执行部分任务,执行完后再执行其他任务* 3:顺序执行任务中的计划* 4:顺序先执行执行任务中的计划,执行完后再顺序执行其他任务*/private int model;/*** 每批执行的任务数量*/private int modelSize;/*** model = 2,4 时有效*/private ArrayList<Integer> indexList = new ArrayList<>();/*** 协助判断,是否线程池的任务全部结束*/private AtomicInteger count;/*** 调度器的执行状态*/private volatile boolean status = false;/*** 构造器** @param taskInfos 任务数组* @param nThrends  同时执行任务中的计划线程数* @param queueSize 计划执行队列* @param model     执行模式 1:所有任务信息都执行 2:先执行部分任务,执行完后再执行其他任务* @param modelSize 每批执行任务的数量*/public Scheder(TaskInfo[] taskInfos, int nThrends, int queueNum, int queueSize, int model, Integer modelSize) {this.taskInfos = taskInfos;this.nThrends = nThrends;this.queueNum = queueNum;this.queueSize = queueSize;this.loopExecutor = Executors.newFixedThreadPool(this.nThrends);this.model = model;if (this.model < 3) {this.planQueueArray = new LinkedBlockingQueue[1];this.planQueueArray[0] = new LinkedBlockingQueue<>(this.queueSize);} else {// 初始化队列数组this.planQueueArray = new LinkedBlockingQueue[this.queueNum];IntStream.range(0, this.queueNum).forEach(i -> this.planQueueArray[i] = new LinkedBlockingQueue<>(this.queueSize));}// modelSize只有在等于2,4有效if (this.model == 2 || this.model == 4) {this.modelSize = modelSize > taskInfos.length ? taskInfos.length : modelSize;}count = countPlan();}/*** 计算一共有多少执行计划** @return /*/private AtomicInteger countPlan() {int sum = 0;for (int i = 0; i < this.taskInfos.length; i++) {sum += this.taskInfos[i].getPlanQueue().size();}return new AtomicInteger(sum);}public Scheder(TaskInfo[] taskInfos, int nThrends, int model, Integer modelSize) {this(taskInfos, nThrends, nThrends, 100, model, modelSize);}public Scheder(TaskInfo[] taskInfos, int nThrends) {this(taskInfos, nThrends, nThrends, 100, 1, null);}public Scheder(TaskInfo[] taskInfos) {this(taskInfos, 10, 10, 100, 1, null);}public void setModel(int model) {this.model = model;}public int getModel() {return model;}public void setModelSize(int modelSize) {this.modelSize = modelSize;}public int getModelSize() {return modelSize;}public ArrayList<Integer> getIndexList() {return indexList;}public AtomicInteger getCount() {return count;}public boolean isStatus() {return status;}/*** 执行方法*/public void run() {if (this.status) {log.warn("任务处于启动状态");return;}this.status = true;// 开启向队列中添加执行计划线程init();// 循环执行执行计划while (this.status) {// 所有执行计划执行完后,退出if (this.taskInfos.length <= 0) {if (this.model < 3) {if (this.planQueueArray[0].size() == 0) {this.status = false;break;}} else {ArrayList<Integer> notEmptyIndex = getNotEmptyIndex(this.planQueueArray);if (CollUtil.isEmpty(notEmptyIndex)) {this.status = false;break;}}}// 执行计划execute();}int size;// 所有线程执行完毕出循环for (; ; ) {size = this.count.get();if (size == 0) {break;}}//停止线程池this.loopExecutor.shutdownNow();for (; ; ) {//只有当线程池中所有线程完成任务时才会返回true,并且需要先调用线程池的shutdown方法或者shutdownNow方法。if (this.loopExecutor.isTerminated()) {System.out.println("执行结束!");break;}}}private void execute() {if (this.model < 3) {try {// 获取一个执行计划Plan plan = this.planQueueArray[0].take();// 执行计划this.loopExecutor.execute(() -> plan.run0(this.count));} catch (InterruptedException e) {log.error("任务执行中发生异常", e);}} else {this.loopExecutor.execute(() -> {try {// 获取一个执行计划Plan plan = null;// 获取线程idString name = Thread.currentThread().getName();int lastIndexOf = name.lastIndexOf("-");int id = Integer.parseInt(name.substring(lastIndexOf + 1));ArrayList<Integer> notEmptyIndex2 = getNotEmptyIndex(this.planQueueArray);Integer index = notEmptyIndex2.stream().filter(item -> item % this.nThrends == (id - 1)).findAny().orElse(null);if (index == null) {return;}LinkedBlockingQueue<Plan> plans = this.planQueueArray[index];if (plans.size() > 0) {plan = plans.take();plan.run0(this.count);}} catch (InterruptedException e) {log.error("任务执行中发生异常", e);}});}}private ArrayList<Integer> getNotEmptyIndex(LinkedBlockingQueue<Plan>[] planQueueArray) {ArrayList<Integer> indexArray = new ArrayList<>();for (int i = 0; i < planQueueArray.length; i++) {if (!CollUtil.isEmpty(planQueueArray[i])) {indexArray.add(i);}}return indexArray;}/*** 开启一个线程,持续向执行计划队列添加执行计划,直到所有的计划任务添加完*/private void init() {new Thread(() -> {while (this.status) {// 任务信息数组数量int length = this.taskInfos.length;// 执行完结束线程if (length <= 0) {break;}// 获取添加执行计划的的任务索引值int index = getIndexOfModel(this.model, length);TaskInfo taskInfo = null;try {taskInfo = this.taskInfos[index];} catch (Exception e) {e.printStackTrace();}LinkedList<Plan> plans = taskInfo.getPlanQueue();if (plans.size() > 0) {try {if (this.model >= 3) {int index2 = taskInfo.getId() % this.planQueueArray.length;this.planQueueArray[index2].put(plans.removeFirst());} else {this.planQueueArray[0].put(plans.removeFirst());}} catch (InterruptedException e) {log.error("向执行计划队列放入计划异常", e);}} else {this.taskInfos = reBuildTaskInfos(this.taskInfos, index);}}}).start();}/*** 根据执行模式获取添加执行计划的的任务信息索引值** @param model  执行模式* @param length 任务信息数组数量* @return 任务信息索引值*/private int getIndexOfModel(int model, int length) {if (model == 1 || model == 3) {return RandomUtil.randomInt(0, length) % length;} else {this.indexList.removeIf(item -> item >= length);if (this.indexList.size() < this.modelSize) {int index = RandomUtil.randomInt(0, length) % length;this.indexList.add(index);return index;} else {return this.indexList.get(RandomUtil.randomInt(0, length) % this.indexList.size());}}}/*** 重新构建任务信息数组** @param taskInfos 原来任务信息数组* @param index     需要移除的任务信息* @return 新的任务信息数组*/private TaskInfo[] reBuildTaskInfos(TaskInfo[] taskInfos, int index) {TaskInfo[] newTaskINfo = new TaskInfo[taskInfos.length - 1];for (int j = 0, i = 0; i < taskInfos.length; i++) {if (i != index) {newTaskINfo[j] = taskInfos[i];j++;}}return newTaskINfo;}}

TaskInfo

import lombok.Data;import java.util.LinkedList;/*** @Author: dinghao* @Date: 2022/3/7 15:31*/
@Data
public class TaskInfo {/*** 唯一标识*/private int id;/*** 任务名称*/private String name;/*** 执行计划队列*/private LinkedList<Plan> planQueue;public TaskInfo(int id, String name, LinkedList<Plan> planQueue) {this.id = id;this.name = name;this.planQueue = planQueue;}
}

这里加了id属性

Plan


import java.util.concurrent.atomic.AtomicInteger;/*** @Author: dinghao* @Date: 2022/3/7 15:37*/
public interface Plan {/*** 线程池执行前*/default void before(){}void run();/*** 线程池执行后*/default void after(){}default void run0(AtomicInteger atomicInteger) {try{before();run();}finally {after();atomicInteger.decrementAndGet();}}}

修改完成

实现自己的计划

MyPlan

import lombok.Data;/*** @Author: dinghao* @Date: 2022/3/9 10:33*/
@Data
public class MyPlan implements Plan {private String name;@Overridepublic void run() {//        if( name.startsWith("用户99")){System.out.println(Thread.currentThread().getName() + ":" + name);
//        }}
}

Test


import java.util.LinkedList;
import java.util.stream.IntStream;/*** @Author: dinghao* @Date: 2022/3/9 14:52*/
public class Test {public static void main(String[] args) {int userSize = 100;int jobSize = 1000;TaskInfo[] taskInfos = new TaskInfo[userSize];IntStream.range(0, userSize).parallel().forEach(i -> {LinkedList<Plan> plans = new LinkedList<>();for (int j = 0; j < jobSize; j++) {MyPlan myPlan = new MyPlan();myPlan.setName("用户" + i + ",执行计划" + j);plans.add(myPlan);}taskInfos[i] = new TaskInfo(i, "用户" + i, plans);});Scheder scheder = new Scheder(taskInfos, 3, 10, 100, 3, 2);scheder.run();}
}

测试结果:

java多线程执行任务(工具升级版)相关推荐

  1. JAVA多线程执行,等待返回结果,再执行

    JAVA多线程执行,等待返回结果,再执行 1.实现callable接口 1)配置线程池 package com.neusoft.demo.server.config;import org.spring ...

  2. java new 多线程_Java多线程:Java多线程执行框架

    为什么需要执行框架呢? 使用一般的new方法来创建线程有什么问题呢?一般的new线程的方式一般要给出一个实现了Runnable接口的执行类,在其中重写run()方法,然后再在将这个执行类的对象传给线程 ...

  3. 使用java多线程分批处理数据工具类

    最近由于业务需要,数据量比较大,需要使用多线程来分批处理,提高处理效率和能力,于是就写了一个通用的多线程处理工具,只需要实现自己的业务逻辑就可以正常使用,现在记录一下 主要是针对大数据量list,将l ...

  4. JAVA多线程-常用JUC工具类及阻塞队列

    工具类 CountDownLactch 简述 允许一个或多个线程等待,直到在其他线程中执行的一组操作完成,同步辅助. CountDownLatch 类用给定的计数初始化. await 方法阻塞,直到由 ...

  5. java多线程aqs实现工具类_Java并发多线程 - 并发工具类JUC

    (adsbygoogle = window.adsbygoogle || []).push({}); 安全共享对象策略 1.线程限制 : 一个被线程限制的对象,由线程独占,并且只能被占有它的线程修改 ...

  6. java多线程执行任务

    前言:在循环执行一些耗时任务时,都是在同步发生,效率比较底下,所以可以采用创建多线程来执行任务,增加执行效率 // 任务集合List<Object> list = new ArrayLis ...

  7. 使用Java多线程实现任务分发

    多线程下载由来已久,如 FlashGet.NetAnts 等工具,它们都是依懒于 HTTP 协议的支持(Range 字段指定请求内容范围),首先能读取出请求内容 (即欲下载的文件) 的大小,划分出若干 ...

  8. Java 多线程与并发编程专题

    Java 线程基础 Java 多线程开发 线程安全与同步 并发控制 非阻塞套接字(NIO) Java 5 中的并发 JDK 7 中的 Fork/Join 模式 相关书评 Java 平台提供了一套广泛而 ...

  9. Java自带的多线程监控分析工具(VisualVM)

    转自: http://www.udpwork.com/item/1105.html 在java多线程程序运行时,多数情况下我们不知道到底发生了什么,只有出了错误的日志的时候,我们才知道原来代码中有死锁 ...

最新文章

  1. R语言grafify包简单、快速绘制19个漂亮的统计图实战
  2. python清空字典保留变量方法_python学习day06--02字典增删差改以及字符串的一些方法...
  3. Socket 异步通信编程
  4. 开启HDR视觉盛宴:究竟什么视频算得上HDR?
  5. taylor+swift纽约公寓_国际巨星Taylor Swift有多爱豪宅?才30岁就有8套豪宅
  6. app传输数据到php,安卓app客户端和使用php的服务器端数据交互
  7. go语言 gosched
  8. ios上的pvr与png
  9. MySQL二十八规范数据库设计
  10. linux 设置更新源为cd,技术|如何修复 apt-get update 无法添加新的 CD-ROM 的错误
  11. hash和encrypt区别及应用_转
  12. Linux完全卸载mysql的方式
  13. VMware Cloud Director 10.3 发布(下载) - 云计算调配和管理平台
  14. 学3D建模需要多久?
  15. IDM与其他下载器加速器优缺点介绍
  16. android sdk引入 微信分享_android 调用本地微信自定义多图分享朋友圈,可放在share sdk中一起使用...
  17. 我的第一个油猴脚本--微博超话自动签到
  18. 极客爱情前传:程序员应该送什么礼物给女朋友
  19. 0 在c语言中有什么作用,\0在c语言中代表什么?
  20. 未来,大数据行业工资会断崖式下滑吗?

热门文章

  1. 软件测试周刊(第16期):戴着镣铐起舞
  2. Nginx配置数据库服务器反向代理
  3. YOLOv5火焰烟雾检测
  4. 很有爱的输入法BrailleType:让盲人也能打字
  5. 外贸网站的SEO优化该怎么做?
  6. W6 BinarySearch
  7. c语言 计算订单总价
  8. 数组的常用方法(函数)
  9. 鬼谷子的毒鸡汤书,你还在看吗?
  10. C语言如何进行补码运算并举例说明