开发中经常会遇到各种池(如:连接池,线程池),它们的作用就是为了提高性能及减少开销,在JDK1.5以后的java.util.concurrent包中内置了很多不同使用场景的线程池,为了更好的理解它们,自己手写一个线程池,加深印象。

<!-- more -->

概述

1.什么是池

它的基本思想是一种对象池,程序初始化的时候开辟一块内存空间,里面存放若干个线程对象,池中线程执行调度由池管理器来处理。当有线程任务时,从池中取一个,执行完成后线程对象归池,这样可以避免反复创建线程对象所带来的性能开销,节省系统的资源。

2.使用线程池的好处

合理的使用线程池可以重复利用已创建的线程,这样就可以减少在创建线程和销毁线程上花费的时间和资源。并且,线程池在某些情况下还能动态调整工作线程的数量,以平衡资源消耗和工作效率。同时线程池还提供了对池中工作线程进行统一的管理的相关方法。这样就相当于我们一次创建,就可以多次使用,大量的节省了系统频繁的创建和销毁线程所需要的资源。

简易版实现

包含功能:

1.创建线程池,销毁线程池,添加新任务

2.没有任务进入等待,有任务则处理掉

3.动态伸缩,扩容

4.拒绝策略

介绍了线程池的原理以及主要组件之后,就让我们来手动实现一个自己的线程池,以加深理解和深入学习。因为自己实现的简易版本所以不建议生产中使用,生产中使用java.util.concurrent会更加健壮和优雅(后续文章会介绍)

代码

以下线程池相关代码均在SimpleThreadPoolExecutor.java中,由于为了便于解读因此以代码块的形式呈现

维护一个内部枚举类,用来标记当前任务线程状态,在Thread中其实也有.

private enum TaskState {FREE, RUNNABLE, BLOCKED, TERMINATED;
}

定义拒绝策略接口,以及默认实现

static class DiscardException extends RuntimeException {private static final long serialVersionUID = 8827362380544575914L;DiscardException(String message) {super(message);}
}interface DiscardPolicy {//拒绝策略接口void discard() throws DiscardException;
}

任务线程具体实现

1.继承Thread,重写run方法。

2.this.taskState == TaskState.FREE && TASK_QUEUE.isEmpty() 如果当前线程处于空闲状态且没有任何任务了就将它wait住,让出CPU执行权

3.如果有任务就去执行FIFO(先进先出)策略

4.定义close方法,关闭线程,当然这里不能暴力关闭,所以这里有需要借助interrupt

public static class WorkerTask extends Thread {// 线程状态private TaskState taskState;// 线程编号private static int threadInitNumber;/*** 生成线程名,参考Thread.nextThreadNum();** @return*/private static synchronized String nextThreadName() {return THREAD_NAME_PREFIX + (++threadInitNumber);}WorkerTask() {super(THREAD_GROUP, nextThreadName());}@Overridepublic void run() {Runnable target;//说明该线程处于空闲状态OUTER:while (this.taskState != TaskState.TERMINATED) {synchronized (TASK_QUEUE) {while (this.taskState == TaskState.FREE && TASK_QUEUE.isEmpty()) {try {this.taskState = TaskState.BLOCKED;//此处标记//没有任务就wait住,让出CPU执行权TASK_QUEUE.wait();//如果被打断说明当前线程执行了 shutdown() 方法  线程状态为 TERMINATED 直接跳到 while 便于退出} catch (InterruptedException e) {break OUTER;}}target = TASK_QUEUE.removeFirst();//遵循FIFO策略}if (target != null) {this.taskState = TaskState.RUNNABLE;target.run();//开始任务了this.taskState = TaskState.FREE;}}}void close() {//优雅关闭线程this.taskState = TaskState.TERMINATED;this.interrupt();}
}

简易版线程池,主要就是维护了一个任务队列线程集,为了动态扩容,自己也继承了Thread去做监听操作,对外提供submit()提交执行任务shutdown()等待所有任务工作完毕,关闭线程池

public class SimpleThreadPoolExecutor extends Thread {// 线程池大小private int threadPoolSize;// 最大接收任务private int queueSize;// 拒绝策略private DiscardPolicy discardPolicy;// 是否被销毁private volatile boolean destroy = false;private final static int DEFAULT_MIN_THREAD_SIZE = 2;// 默认最小线程数private final static int DEFAULT_ACTIVE_THREAD_SIZE = 5;// 活跃线程private final static int DEFAULT_MAX_THREAD_SIZE = 10;// 最大线程private final static int DEFAULT_WORKER_QUEUE_SIZE = 100;// 最多执行多少任务private final static String THREAD_NAME_PREFIX = "MY-THREAD-NAME-";//线程名前缀private final static String THREAD_POOL_NAME = "SIMPLE-POOL";//线程组的名称private final static ThreadGroup THREAD_GROUP = new ThreadGroup(THREAD_POOL_NAME);//线程组private final static List<WorkerTask> WORKER_TASKS = new ArrayList<>();// 线程容器// 任务队列容器,也可以用Queue<Runnable> 遵循 FIFO 规则private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>();// 拒绝策略private final static DiscardPolicy DEFAULT_DISCARD_POLICY = () -> {throw new DiscardException("[拒绝执行] - [任务队列溢出...]");};private int minSize;//最小线程private int maxSize;//最大线程private int activeSize;//活跃线程SimpleThreadPoolExecutor() {this(DEFAULT_MIN_THREAD_SIZE, DEFAULT_ACTIVE_THREAD_SIZE, DEFAULT_MAX_THREAD_SIZE, DEFAULT_WORKER_QUEUE_SIZE, DEFAULT_DISCARD_POLICY);}SimpleThreadPoolExecutor(int minSize, int activeSize, int maxSize, int queueSize, DiscardPolicy discardPolicy){this.minSize = minSize;this.activeSize = activeSize;this.maxSize = maxSize;this.queueSize = queueSize;this.discardPolicy = discardPolicy;initPool();}void submit(Runnable runnable) {if (destroy) {throw new IllegalStateException("线程池已销毁...");}synchronized (TASK_QUEUE) {if (TASK_QUEUE.size() > queueSize) {//如果当前任务队超出队列限制,后续任务拒绝执行discardPolicy.discard();}// 1.将任务添加到队列TASK_QUEUE.addLast(runnable);// 2.唤醒等待的线程去执行任务TASK_QUEUE.notifyAll();}}void shutdown() throws InterruptedException {int activeCount = THREAD_GROUP.activeCount();while (!TASK_QUEUE.isEmpty() && activeCount > 0) {// 如果还有任务,那就休息一会Thread.sleep(100);}int intVal = WORKER_TASKS.size();//如果线程池中没有线程,那就不用关了while (intVal > 0) {for (WorkerTask task : WORKER_TASKS) {//当任务队列为空的时候,线程状态才会为 BLOCKED ,所以可以打断掉,相反等任务执行完在关闭if (task.taskState == TaskState.BLOCKED) {task.close();intVal--;} else {Thread.sleep(50);}}}this.destroy = true;//资源回收TASK_QUEUE.clear();WORKER_TASKS.clear();this.interrupt();System.out.println("线程关闭");}private void createWorkerTask() {WorkerTask task = new WorkerTask();//刚创建出来的线程应该是未使用的task.taskState = TaskState.FREE;WORKER_TASKS.add(task);task.start();}/*** 初始化操作*/private void initPool() {for (int i = 0; i < this.minSize; i++) {this.createWorkerTask();}this.threadPoolSize = minSize;this.start();//自己启动自己}@Overridepublic void run() {while (!destroy) {try {Thread.sleep(5_000L);if (TASK_QUEUE.size() > activeSize && threadPoolSize < activeSize) { // 第一次扩容到 activeSize 大小for (int i = threadPoolSize; i < activeSize; i++) {createWorkerTask();}this.threadPoolSize = activeSize;System.out.println("[初次扩充] - [" + toString() + "]");} else if (TASK_QUEUE.size() > maxSize && threadPoolSize < maxSize) {// 第二次扩容到最大线程System.out.println();for (int i = threadPoolSize; i < maxSize; i++) {createWorkerTask();}this.threadPoolSize = maxSize;System.out.println("[再次扩充] - [" + toString() + "]");} else {//防止线程在submit的时候,其他线程获取到锁干坏事synchronized (WORKER_TASKS) {int releaseSize = threadPoolSize - activeSize;Iterator<WorkerTask> iterator = WORKER_TASKS.iterator();// List不允许在for中删除集合元素,所以这里需要使用迭代器while (iterator.hasNext()) {if (releaseSize <= 0) {break;}WorkerTask task = iterator.next();//不能回收正在运行的线程,只回收空闲线程if (task.taskState == TaskState.FREE) {task.close();iterator.remove();releaseSize--;}}System.out.println("[资源回收] - [" + toString() + "]");}threadPoolSize = activeSize;}} catch (InterruptedException e) {System.out.println("资源释放");}}}@Overridepublic String toString() {return "SimpleThreadPoolExecutor{" +"threadPoolSize=" + threadPoolSize +", taskQueueSize=" + TASK_QUEUE.size() +", minSize=" + minSize +", maxSize=" + maxSize +", activeSize=" + activeSize +'}';}
}

测试一把

创建一个测试类

public class SimpleExecutorTest {public static void main(String[] args) throws InterruptedException {SimpleThreadPoolExecutor executor = new SimpleThreadPoolExecutor();IntStream.range(0, 30).forEach(i ->executor.submit(() -> {System.out.printf("[线程] - [%s] 开始工作...\n", Thread.currentThread().getName());try {Thread.sleep(2_000L);} catch (InterruptedException e) {e.printStackTrace();}System.out.printf("[线程] - [%s] 工作完毕...\n", Thread.currentThread().getName());}));//executor.shutdown();如果放开注释即会执行完所有任务关闭线程池}
}

日志分析: 从日志中可以看到,初始化的时候是2个线程在工作,执行速度较为缓慢,当经过第一次扩容后,会观察到线程池里线程个数增加了,执行任务的速度就越来越快了,本文一共扩容了2次,第一次是扩容到activeSize的大小,第二次是扩容到maxSize,在执行任务的过程中,当线程数过多的时候就会触发回收机制...

[线程] - [MY-THREAD-NAME-1] 开始工作...
[线程] - [MY-THREAD-NAME-2] 开始工作...
[线程] - [MY-THREAD-NAME-1] 工作完毕...
[线程] - [MY-THREAD-NAME-1] 开始工作...
[线程] - [MY-THREAD-NAME-2] 工作完毕...
[线程] - [MY-THREAD-NAME-2] 开始工作...
[线程] - [MY-THREAD-NAME-1] 工作完毕...
[线程] - [MY-THREAD-NAME-1] 开始工作...
[线程] - [MY-THREAD-NAME-2] 工作完毕...
[线程] - [MY-THREAD-NAME-2] 开始工作...
[初次扩充] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=44, minSize=2, maxSize=10, activeSize=5}]
[线程] - [MY-THREAD-NAME-3] 开始工作...
...
[线程] - [MY-THREAD-NAME-6] 开始工作...
[线程] - [MY-THREAD-NAME-7] 开始工作...
[再次扩充] - [SimpleThreadPoolExecutor{threadPoolSize=10, taskQueueSize=30, minSize=2, maxSize=10, activeSize=5}]
[线程] - [MY-THREAD-NAME-10] 开始工作...
...
[线程] - [MY-THREAD-NAME-5] 开始工作...
[资源回收] - [SimpleThreadPoolExecutor{threadPoolSize=10, taskQueueSize=4, minSize=2, maxSize=10, activeSize=5}]
[线程] - [MY-THREAD-NAME-4] 工作完毕...
...
[线程] - [MY-THREAD-NAME-7] 工作完毕...
[资源回收] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=0, minSize=2, maxSize=10, activeSize=5}]
[资源回收] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=0, minSize=2, maxSize=10, activeSize=5}]

总结

通过本文,大致可以了解线程池的工作原理和实现方式,学习的过程中,就是要知其然知其所以然。这样才能更好地驾驭它,更好地去理解和使用,也能更好地帮助我们触类旁通,后面的文章中会详细介绍java.util.concurrent中的线程池

- 说点什么

全文代码:https://gitee.com/battcn/battcn-concurent/tree/master/Chapter1-1/battcn-thread/src/main/java/com/battcn/chapter12

  • 个人QQ:1837307557
  • battcn开源群(适合新手):391619659

微信公众号:battcn(欢迎调戏)

一起学并发编程 - 简易线程池实现相关推荐

  1. 【Java 并发编程】线程池机制 ( ThreadPoolExecutor 线程池构造参数分析 | 核心线程数 | 最大线程数 | 非核心线程存活时间 | 任务阻塞队列 )

    文章目录 前言 一.ThreadPoolExecutor 构造参数 二.newCachedThreadPool 参数分析 三.newFixedThreadPool 参数分析 四.newSingleTh ...

  2. 【Java 并发编程】线程池机制 ( 线程池示例 | newCachedThreadPool | newFixedThreadPool | newSingleThreadExecutor )

    文章目录 前言 一.线程池示例 二.newCachedThreadPool 线程池示例 三.newFixedThreadPool 线程池示例 三.newSingleThreadExecutor 线程池 ...

  3. 《转载》Python并发编程之线程池/进程池--concurrent.futures模块

    本文转载自 Python并发编程之线程池/进程池--concurrent.futures模块 一.关于concurrent.futures模块 Python标准库为我们提供了threading和mul ...

  4. (转)Java并发编程:线程池的使用

    背景:线程池在面试时候经常遇到,反复出现的问题就是理解不深入,不能做到游刃有余.所以这篇博客是要深入总结线程池的使用. ThreadPoolExecutor的继承关系 线程池的原理 1.线程池状态(4 ...

  5. [转]Java并发编程:线程池的使用

    Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了, ...

  6. Java并发编程一线程池简介

    推荐:Java并发编程汇总 Java并发编程一线程池简介 为什么我们需要使用线程池? 我们知道线程是一种比较昂贵的资源,我们通过程序每创建一个线程去执行,其实操作系统都会对应地创建一个线程去执行我们的 ...

  7. Java并发编程一线程池的五种状态

    推荐:Java并发编程汇总 Java并发编程一线程池的五种状态 原文地址 Java多线程线程池(4)–线程池的五种状态 正文 线程池的5种状态:Running.ShutDown.Stop.Tidyin ...

  8. Java并发编程:线程池的使用

    在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统 ...

  9. Java并发编程之线程池及示例

    1.Executor 线程池顶级接口.定义方法,void execute(Runnable).方法是用于处理任务的一个服务方法.调用者提供Runnable 接口的实现,线程池通过线程执行这个 Runn ...

最新文章

  1. 统一沟通-技巧-10-Lync-公网证书-Go Daddy
  2. Java与C#事件处理详细对比
  3. C语言程序设计0004,C语言程序设计0004.doc
  4. URAL - 1099 Work Scheduling(一般图最大匹配-带花树模板)
  5. 安卓代码迁移:ActionBarActivity: cannot be resolved to a type
  6. (转) java 复制文件,不使用输出流复制,高效率,文件通道的方式复制文件
  7. c语言指针地址交换程序,C语言-基础教程-指针的地址分配
  8. 51nod1255【贪心-栈的应用】
  9. Java知多少(64)线程死锁
  10. matlab 中文注释乱码问题解决
  11. 关于IE6 双倍间距的真正原因
  12. 疯狂的程序员 兼职(下)
  13. 发现一个好用的MySQL数据库管理工具
  14. 【Vue学习总结】22.使用Mint UI的infinite-scroll实现上拉分页加载
  15. 对区块链技术的一些新思考
  16. 与 BGI 绘图库的兼容情况
  17. 性能优化-Tomcat调优
  18. 关于tp-link wr740 v4的刷机救砖的办法(非线刷解决)恢复原版的
  19. 【DNA计算】DNA编码----笔记1
  20. 仿制金山毒霸专杀工具界面实现源码

热门文章

  1. bs4爬取的时候有两个标签相同_python爬虫初体验,爬取中国最好大学网大学名次...
  2. sybase数据库导出mysql_sybase导出数据库的表结构命令
  3. w7计算机无法管理员权限设置,操作权限不够?教你开启Win7管理员帐户
  4. Java实用教程 课堂测试(2021C++补考题目)
  5. Window10设置护眼色
  6. 【 MATLAB 】向量化编程实践(一)
  7. 再谈符号间干扰(一)
  8. BZOJ1011 莫比乌斯反演(基础题
  9. MongoDB sharding迁移那些事(一)
  10. json 是个什么东西?