线程池的使用(线程池重点解析)
我们有两种常见的创建线程的方法,一种是继承Thread类,一种是实现Runnable的接口,Thread类其实也是实现了Runnable接口。但是我们创建这两种线程在运行结束后都会被虚拟机销毁,如果线程数量多的话,频繁的创建和销毁线程会大大浪费时间和效率,更重要的是浪费内存,因为正常来说线程执行完毕后死亡,线程对象变成垃圾!那么有没有一种方法能让线程运行完后不立即销毁,而是让线程重复使用,继续执行其他的任务哪?我们使用线程池就能很好地解决这个问题。
我们接下来重点说明线程池类家族有哪些,线程池创建线程完成任务的实现原理是什么以及线程池的一些特性来进行分析。此文参考了https://www.cnblogs.com/dolphin0520/p/3932921.html
一.线程池家族
线程池的最上层接口是Executor,这个接口定义了一个核心方法execute(Runnabel command),这个方法最后被ThreadPoolExecutor类实现,这个方法是用来传入任务的。而且ThreadPoolExecutor是线程池的核心类,此类的构造方法如下:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
构造方法的参数及意义:
corePoolSize:核心线程池的大小,如果核心线程池有空闲位置,这是新的任务就会被核心线程池新建一个线程执行,执行完毕后不会销毁线程,线程会进入缓存队列等待再次被运行。
maximunPoolSize:线程池能创建最大的线程数量。如果核心线程池和缓存队列都已经满了,新的任务进来就会创建新的线程来执行。但是数量不能超过maximunPoolSize,否侧会采取拒绝接受任务策略,我们下面会具体分析。
keepAliveTime:非核心线程能够空闲的最长时间,超过时间,线程终止。这个参数默认只有在线程数量超过核心线程池大小时才会起作用。只要线程数量不超过核心线程大小,就不会起作用。
unit:时间单位,和keepAliveTime配合使用。
workQueue:缓存队列,用来存放等待被执行的任务。
threadFactory:线程工厂,用来创建线程,一般有三种选择策略。
ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue;
handler:拒绝处理策略,线程数量大于最大线程数就会采用拒绝处理策略,四种策略为
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
Executor接口有一个子接口ExecutorService,ExecutorService的实现类为AbstracExecutorService,而ThreadPoolExcutor正是AbstrcExecutorService的子类。
ThreadPoolExecutor还有两个常用的方法shutdown和submit,两者都用来关闭线程池,但是后者有一个结果返回。
二.线程池实现原理
线程池图:
1.线程池状态
线程池和线程一样拥有自己的状态,在ThreadPoolExecutor类中定义了一个volatile变量runState来表示线程池的状态,线程池有四种状态,分别为RUNNING、SHURDOWN、STOP、TERMINATED。
线程池创建后处于RUNNING状态。
调用shutdown后处于SHUTDOWN状态,线程池不能接受新的任务,会等待缓冲队列的任务完成。
调用shutdownNow后处于STOP状态,线程池不能接受新的任务,并尝试终止正在执行的任务。
当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
2.线程池任务的执行
当执行execute(Runnable command)方法后,传入了一个任务,我们看一下execute方法的实现原理。
public void execute(Runnable command) {if (command == null)throw new NullPointerException();if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {if (runState == RUNNING && workQueue.offer(command)) {if (runState != RUNNING || poolSize == 0)ensureQueuedTaskHandled(command);}else if (!addIfUnderMaximumPoolSize(command))reject(command); } }
整个方法的执行过程是这样的,首先判断任务是否为空,空抛空指针异常,否则执行下一个判断,如果目前线程的数量小于核心线程池大小,就执行addIfUnderCorePollSize(command)方法,在核心线程池创建新的线程,并且执行这个任务。
我们看这个方法的具体实现:
private boolean addIfUnderCorePoolSize(Runnable firstTask) {Thread t = null;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (poolSize < corePoolSize && runState == RUNNING)t = addThread(firstTask); //创建线程去执行firstTask任务 } finally {mainLock.unlock();}if (t == null)return false;t.start();return true; }
这个方法首先是获取线程池的锁,参考别人的博客,说是和线程池状态有关,没有搞懂......,后面又进行一次判断,判断线程池线程数量和核心线程池的比较,前面execute()已经判断过,这里为什么还要进行判断哪?因为我们执行完Execute()中的判断后,可能有新的任务进来了,并且为这个任务在核心线程池创建了新的线程去执行,如果刚好这是核心线程池满了,那么就不能再加入新的县城到核心线程池了。这种可能性是存在的,因为你不知道cpu什么时间分配给谁,所以我们要加一个判断,至于线程池状态为什么也要判断,也是因为可能有其他线程执行了shutdown或者shutdownNow方法,导致线程池状态不是RUNNING,那么线程池就停止接收新的任务,也就不会创建新的线程去执行这个任务了。
t=addThread(firstTask);这句代码至关重要,我们看方法的实现代码:
private Thread addThread(Runnable firstTask) {Worker w = new Worker(firstTask);Thread t = threadFactory.newThread(w); //创建一个线程,执行任务 if (t != null) {w.thread = t; //将创建的线程的引用赋值为w的成员变量 workers.add(w); //将当前任务添加到任务集int nt = ++poolSize; //当前线程数加1 if (nt > largestPoolSize)largestPoolSize = nt;}return t; }
这个方法返回类型是Thread,所以我们可以新建一个线程并执行任务,之后将线程对象返回给外面的线程对象,然后执行t.start(),我们看到有一个Worker对象接收了任务,我们看Worker类的实现:
private final class Worker implements Runnable {private final ReentrantLock runLock = new ReentrantLock();private Runnable firstTask;volatile long completedTasks;Thread thread;Worker(Runnable firstTask) {this.firstTask = firstTask;}boolean isActive() {return runLock.isLocked();}void interruptIfIdle() {final ReentrantLock runLock = this.runLock;if (runLock.tryLock()) {try {if (thread != Thread.currentThread())thread.interrupt();} finally {runLock.unlock();}}}void interruptNow() {thread.interrupt();}private void runTask(Runnable task) {final ReentrantLock runLock = this.runLock;runLock.lock();try {if (runState < STOP &&Thread.interrupted() &&runState >= STOP)boolean ran = false;beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor类的一个方法,没有具体实现,用户可以根据//自己需要重载这个方法和后面的afterExecute方法来进行一些统计信息,比如某个任务的执行时间等 try {task.run();ran = true;afterExecute(task, null);++completedTasks;} catch (RuntimeException ex) {if (!ran)afterExecute(task, ex);throw ex;}} finally {runLock.unlock();}}public void run() {try {Runnable task = firstTask;firstTask = null;while (task != null || (task = getTask()) != null) {runTask(task);task = null;}} finally {workerDone(this); //当任务队列中没有任务时,进行清理工作 }} }
这个类实现了Runnable接口,所以会有run()方法,我们看到run中执行的还是传入的任务,其实相当于调用传入任务对象的run方法,我们之所以费力气将任务对象加到Worker类中去执行,是因为这个线程执行之后会进入阻塞队列等待被执行,这个线程的生命并没有结束,这也正是我们使用线程池的最大原因。我们用一个Set集合存储Worker,这样不会有重复的任务被存储,firstTask被执行完后进入缓存队列,而这个新创建的线程就一直从缓存队列中拿到任务去执行。这个方法为getTask(),所以我们来看看线程如何从缓存队列拿到任务。
Runnable getTask() {for (;;) {try {int state = runState;if (state > SHUTDOWN)return null;Runnable r;if (state == SHUTDOWN) // Help drain queuer = workQueue.poll();else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果线程数大于核心池大小或者允许为核心池线程设置空闲时间,//则通过poll取任务,若等待一定的时间取不到任务,则返回nullr = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);elser = workQueue.take();if (r != null)return r;if (workerCanExit()) { //如果没取到任务,即r为null,则判断当前的worker是否可以退出if (runState >= SHUTDOWN) // Wake up othersinterruptIdleWorkers(); //中断处于空闲状态的workerreturn null;}// Else retry} catch (InterruptedException ie) {// On interruption, re-check runState}} }
我们看到如果核心线程池中创建的线程想要拿到缓存队列中的任务,先要判断线程池的状态,如果STOP或者TERMINATED,返回NULL,如果是RUNNING或者SHUTDOWN,则从缓存队列中拿到任务去执行。
这就是核心线程池执行任务的原理。
那么如果线程数量超过核心线程池大小哪?我们回到executor()方法,如果发生这种情况,处理方式是
if (runState == RUNNING && workQueue.offer(command))
这段代码意思是,如果线程数量超过核心线程池大小,先进行线程池状态的判断,如果是RUNNING,则将新进来的线程加入缓存队列。如果失败,往往是因为缓存队列满了或者线程池状态不是RUNNING,就直接创建新的线程去执行任务,调用addIfUnderMaximumPoolSize(command),就会新创建线程,但是这个县城不是核心线程池中的,是临时扩展的,要保证线程数最大不超过线程池大小 maximumPoolSize,如果超过执行
reject(command);操作,拒绝接受新的任务。
还有如果任务已经加入缓存队列成功还要继续进行判断
if (runState != RUNNING || poolSize == 0)
这是为了防止在将任务加入缓存队列的同时其他线程调用shutdown或者shutdownNow方法,所以采取了保护措施
ensureQueuedTaskHandled(command)
我们看addIfUnderMaximumPoolSize的实现方法:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {Thread t = null;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (poolSize < maximumPoolSize && runState == RUNNING)t = addThread(firstTask);} finally {mainLock.unlock();}if (t == null)return false;t.start();return true; }
这个方法和addIfUnderCorePoolSize基本一样,只是方法中判断条件改变了,这个方法是在缓冲队列满了并且线程池状态是在RUNNING状态下才会执行,里面的判断条件是线程池数量小于线程池最大容量,并且线程池状态是RUNNING。
我们进行总结:
- 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
- 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
- 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
- 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
三.线程池使用示例
package cn.yqg.java;public class Task implements Runnable{private int num;public Task(int num) {this.num=num;}@Overridepublic void run() {System.out.println("正在执行任务 "+num);try {Thread.currentThread().sleep(4000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("线程"+num+"执行完毕");} }
package cn.yqg.java;import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;public class Test4 {public static void main(String[] args) {ThreadPoolExecutor pool=new ThreadPoolExecutor(5,10,200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));for(int i=0;i<15;i++) {Task task=new Task(i);pool.execute(task);System.out.println("线程池中线程数目:"+pool.getPoolSize()+",队列中等待执行的任务数目:"+pool.getQueue().size()+",已执行玩别的任务数目:"+pool.getCompletedTaskCount());}pool.shutdown();} }
一种可能情况:
正在执行任务 0 线程池中线程数目:1,队列中等待执行的任务数目:0,已执行玩别的任务数目:0 线程池中线程数目:2,队列中等待执行的任务数目:0,已执行玩别的任务数目:0 线程池中线程数目:3,队列中等待执行的任务数目:0,已执行玩别的任务数目:0 正在执行任务 1 正在执行任务 2 线程池中线程数目:4,队列中等待执行的任务数目:0,已执行玩别的任务数目:0 正在执行任务 3 线程池中线程数目:5,队列中等待执行的任务数目:0,已执行玩别的任务数目:0 线程池中线程数目:5,队列中等待执行的任务数目:1,已执行玩别的任务数目:0 正在执行任务 4 线程池中线程数目:5,队列中等待执行的任务数目:2,已执行玩别的任务数目:0 线程池中线程数目:5,队列中等待执行的任务数目:3,已执行玩别的任务数目:0 线程池中线程数目:5,队列中等待执行的任务数目:4,已执行玩别的任务数目:0 线程池中线程数目:5,队列中等待执行的任务数目:5,已执行玩别的任务数目:0 线程池中线程数目:6,队列中等待执行的任务数目:5,已执行玩别的任务数目:0 正在执行任务 10 线程池中线程数目:7,队列中等待执行的任务数目:5,已执行玩别的任务数目:0 正在执行任务 11 线程池中线程数目:8,队列中等待执行的任务数目:5,已执行玩别的任务数目:0 正在执行任务 12 线程池中线程数目:9,队列中等待执行的任务数目:5,已执行玩别的任务数目:0 正在执行任务 13 线程池中线程数目:10,队列中等待执行的任务数目:5,已执行玩别的任务数目:0 正在执行任务 14 线程2执行完毕 线程1执行完毕 线程4执行完毕 线程3执行完毕 正在执行任务 8 线程0执行完毕 正在执行任务 9 正在执行任务 7 正在执行任务 6 正在执行任务 5 线程12执行完毕 线程13执行完毕 线程11执行完毕 线程10执行完毕 线程14执行完毕 线程7执行完毕 线程9执行完毕 线程8执行完毕 线程5执行完毕 线程6执行完毕
从执行结果可以看出,当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。如果上面程序中,将for循环中改成执行超过15个任务,就会抛出任务拒绝异常了。
不过并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:
Executors.newCachedThreadPool(); //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE Executors.newSingleThreadExecutor(); //创建容量为1的缓冲池 Executors.newFixedThreadPool(int); //创建固定容量大小的缓冲池
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); }
从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。
newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;
newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。
实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。
另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。
from: https://www.cnblogs.com/zzuli/p/9386463.html
线程池的使用(线程池重点解析)相关推荐
- Java线程池架构(一)原理和源码解析
在前面介绍JUC的文章中,提到了关于线程池Execotors的创建介绍,在文章:<java之JUC系列-外部Tools>中第一部分有详细的说明,请参阅: 文章中其实说明了外部的使用方式,但 ...
- 【Android 异步操作】线程池 ( 线程池作用 | 线程池种类 | 线程池工作机制 | 线程池任务调度源码解析 )
文章目录 一.线程池作用 二.线程池种类 三.线程池工作机制 四.线程池任务调度源码解析 一.线程池作用 线程池作用 : ① 避免创建线程 : 避免每次使用线程时 , 都需要 创建线程对象 ; ② 统 ...
- c++ socket线程池原理_ThreadPoolExecutor线程池实现原理+源码解析
推荐学习 被微服务轰炸?莫怕!耗时35天整出的「微服务学习教程」送你 死磕「并发编程」100天,全靠阿里大牛的这份最全「高并发套餐」 闭关28天,奉上[Java一线大厂高岗面试题解析合集],备战金九银 ...
- java多线程——Executors线程池的四种用法简单解析
1.Executors.newFixedThreadPool(5) 是创建一个线程池,池子里面有5个线程,任务数多余5个时,超出的任务队列中排队等候执行 2.Executors.newCache ...
- Java线程池的核心线程数和最大线程数
Java的线程池就像是一个花瓶容器. 而把任务提交给线程池就像是把小球塞进花瓶. 整个过程就像下面这个有趣的动画: 下面我们先来了解一下Java线程池的参数. 希望看完这篇文章后, 再提起线程池的时候 ...
- mongodb线程池_常用高并发网络线程模型设计及MongoDB线程模型优化实践
服务端通常需要支持高并发业务访问,如何设计优秀的服务端网络IO工作线程/进程模型对业务的高并发访问需求起着至关重要的核心作用. 本文总结了了不同场景下的多种网络IO线程/进程模型,并给出了各种模型的优 ...
- 【Android 异步操作】线程池 ( Worker 简介 | 线程池中的工作流程 runWorker | 从线程池任务队列中获取任务 getTask )
文章目录 一.线程池中的 Worker ( 工作者 ) 二.线程池中的工作流程 runWorker 三.线程池任务队列中获取任务 getTask 在博客 [Android 异步操作]线程池 ( 线程池 ...
- 【Android 异步操作】线程池 ( 线程池简介 | 线程池初始化方法 | 线程池种类 | AsyncTask 使用线程池示例 )
文章目录 一.线程池简介 二.线程池初始化方法简介 三.线程池使用示例 一.线程池简介 线程池一般是实现了 ExecutorService 接口的类 , 一般使用 ThreadPoolExecutor ...
- Java 线程池中的线程复用是如何实现的?
前几天,技术群里有个群友问了一个关于线程池的问题,内容如图所示: 关于线程池相关知识可以先看下这篇:为什么阿里巴巴Java开发手册中强制要求线程池不允许使用Executors创建? 那么就来和大家探讨 ...
最新文章
- 专转本计算机组成原理,江苏专转本 计算机 第二章计算机组成原理2012冲刺班讲稿...
- python在实际中的作用_Python面向对象中__init__的实际作用是什么?
- 将可执行程序的内存空间扩展到3GB(windows)
- Qt:opencv编译
- python后端开发工程师面试题
- Leecode01. 两数之和——Leecode大厂热题100道系列
- 学python多大年龄可以学车_多大年龄可以学驾照?
- vscode 导入python库_vscode 如何导入python库
- 解决linux下QtCreator无法输入中文的情况
- mdb 查询过于复杂_【律联云知产课堂】南京商标查询主要从哪些方面判断一个商标是否适合注册?...
- 如何获取不重复的随机数
- 经典网页设计:20个与众不同的国外 HTML5 网站
- 【原理】 进程调度算法
- 测试基础系列之测试方法 第5讲
- NOIP2018 复赛提高组一等奖获奖名单
- 用c语言编写的源文件 若没有产生编译错误,MSE C语言第一章习题
- FlashPaper组件——api
- Android 面试中高级上
- java安装 2203_高手分析win7系统无法安装java程序提示“内部错误2203”的处理
- 软件工程(1) CSDN花神生涯
热门文章
- FreeMarker template error: The following has evaluated to null or missing
- 金融风控实战——模型融合
- 把自己当成打工的,一辈子都是打工的!:周鸿祎
- 任正非:进军高端市场的同时,华为要防范未来竞争者从低端崛起
- 房地产还有最后十年机会 抓紧时间转型
- Docker Review - Docker 部署 Spring Boot 项目
- 小工匠聊架构-Redis 缓存一致性设计
- 小工匠聊架构 - 缓存与数据库【双写不一致】【读写并发不一致】解决方案一览
- JVM-07垃圾收集Garbage Collection【GC日志分析】
- Oracle-锁解读