线程池原理(ThreadPoolExecutor)
一、创建一个线程池对象
使用Executors创建一个线程池常用的方法如下:
1、newFixedThreadPool()
说明:初始化一个指定线程数的线程池,其中 corePoolSize == maxiPoolSize,使用 LinkedBlockingQuene 作为阻塞队列
特点:即使当线程池没有可执行任务时,也不会释放线程。
2、newCachedThreadPool()
说明:初始化一个可以缓存线程的线程池,默认缓存60s,线程池的线程数可达到 Integer.MAX_VALUE,即 2147483647,内部使用 SynchronousQueue 作为阻塞队列;
特点:在没有任务执行时,当线程的空闲时间超过 keepAliveTime,会自动释放线程资源;当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销; 因此,使用时要注意控制并发的任务数,防止因创建大量的线程导致而降低性能。
3、newSingleThreadExecutor()
说明:初始化只有一个线程的线程池,内部使用 LinkedBlockingQueue 作为阻塞队列。
特点:如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行
4、newScheduledThreadPool()
特点:初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据。
线程池的构造方法如下:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();// 核心池大小this.corePoolSize = corePoolSize;// 最大池大小this.maximumPoolSize = maximumPoolSize;// 线程池的等待队列this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);// 线程工厂对象this.threadFactory = threadFactory;// 拒绝策略的句柄this.handler = handler;}
默认的线程工厂,Executors类的DefaultThreadFactory方法:
/*** Thread factory capturing access control context and class loader*/static class PrivilegedThreadFactory extends DefaultThreadFactory {private final AccessControlContext acc;private final ClassLoader ccl;PrivilegedThreadFactory() {super();SecurityManager sm = System.getSecurityManager();if (sm != null) {// Calls to getContextClassLoader from this class// never trigger a security check, but we check// whether our callers have this permission anyways.sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);// Fail fastsm.checkPermission(new RuntimePermission("setContextClassLoader"));}this.acc = AccessController.getContext();this.ccl = Thread.currentThread().getContextClassLoader();}public Thread newThread(final Runnable r) {return super.newThread(new Runnable() {public void run() {AccessController.doPrivileged(new PrivilegedAction<Void>() {public Void run() {Thread.currentThread().setContextClassLoader(ccl);r.run();return null;}}, acc);}});}}
默认的拒绝策略
/*** The default rejected execution handler*/private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();/*** A handler for rejected tasks that throws a* {@code RejectedExecutionException}.*/public static class AbortPolicy implements RejectedExecutionHandler {/*** Creates an {@code AbortPolicy}.*/public AbortPolicy() { }/*** Always throws RejectedExecutionException.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task* @throws RejectedExecutionException always.*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}
二、添加任务到线程池execute()
/*** Executes the given task sometime in the future. The task* may execute in a new thread or in an existing pooled thread.** If the task cannot be submitted for execution, either because this* executor has been shutdown or because its capacity has been reached,* the task is handled by the current {@code RejectedExecutionHandler}.** @param command the task to execute* @throws RejectedExecutionException at discretion of* {@code RejectedExecutionHandler}, if the task* cannot be accepted for execution* @throws NullPointerException if {@code command} is null*/public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/// 获取ctl对应的int值。该int值保存了"线程池中任务的数量"和"线程池状态"信息int c = ctl.get();// 当线程池中的任务数量 < "核心池大小"时,即线程池中少于corePoolSize个任务。// 则通过addWorker(command,true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 当线程池中的任务数量 >= "核心池大小"时,// 而且,"线程池处于允许状态"时,则尝试将任务添加到阻塞队列中。if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 再次确认“线程池状态”,若线程池异常终止了,则删除任务;然后通过reject()执行相应的拒绝策略的内容。if (! isRunning(recheck) && remove(command))reject(command);// 否则,如果"线程池中任务数量"为0,则通过addWorker(null,false)尝试新建一个线程,新建线程对应的任务为null。else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 通过addWorker(command,false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。// 如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内else if (!addWorker(command, false))reject(command);}
可以看到execute方法流程如下:(可参照英文注释)
- 线程池中的任务数量小于核心池大小时,新建线程;
- 线程池中的任务数量大于等于核心池大小时,添加到任务队列中;
- 若不能将任务添加到队列中,则尝试新建一个线程,失败的话执行拒绝策略
下面来看addWorker方法:
private boolean addWorker(Runnable firstTask, boolean core) {retry:// 这部分是对状态进行检查,更新ctl的值for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}// 添加任务到线程池,并启动任务所在的线程boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {final ReentrantLock mainLock = this.mainLock;// 新建Worker,并且指定firstTask为Worker的第一个任务w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();int rs = runStateOf(c);if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// 添加到线程池集合中workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// 启动线程t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
三、关闭线程池
调用线程池的shutdown方法可以关闭线程池:
/*** Initiates an orderly shutdown in which previously submitted* tasks are executed, but no new tasks will be accepted.* Invocation has no additional effect if already shut down.** <p>This method does not wait for previously submitted tasks to* complete execution. Use {@link #awaitTermination awaitTermination}* to do that.** @throws SecurityException {@inheritDoc}*/public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 检查终止线程池的“线程”是否有权限checkShutdownAccess();// 设置线程池的状态为关闭状态advanceRunState(SHUTDOWN);// 中断线程池中空闲的线程interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();}
四、线程池状态
线程池使用了原子类的Integer的对象来标识线程池的状态:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bitsprivate static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
可以看到线程池公有以下五种状态:
RUNNING:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。线程池的初始化状态是RUNNING。
SHUTDOWN:调用线程池的shutdown()接口时,线程池由RUNNING转到SHUTDOWN。线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
STOP:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
TIDYING:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。:
TERMINATED:线程池处在TIDYING状态时,执行完terminated()之后,就会由TIDYING->TERMINATED。线程池彻底终止,就变成TERMINATED状态。
五、线程池拒绝策略
线程池共包括4种拒绝策略:
- AbortPolicy:当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。
- CallerRunsPolicy:当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务。
- DiscardOldestPolicy:当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。
- DiscardPolicy:当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。
线程池原理(ThreadPoolExecutor)相关推荐
- Java线程池ThreadPoolExecutor使用和分析(三) - 终止线程池原理
相关文章目录: Java线程池ThreadPoolExecutor使用和分析(一) Java线程池ThreadPoolExecutor使用和分析(二) - execute()原理 Java线程池Thr ...
- ThreadPoolExecutor线程池原理
ThreadPoolExecutor线程池原理 线程池原理 1. 线程池的简单介绍 1.1 线程池是什么 1.2 线程池解决的核心问题是什么 2. 线程池的实现原理 2.1 线程池的执行流程 2.2 ...
- Java 并发编程——Executor框架和线程池原理
Java 并发编程系列文章 Java 并发基础--线程安全性 Java 并发编程--Callable+Future+FutureTask java 并发编程--Thread 源码重新学习 java并发 ...
- JAVA线程池原理以及几种线程池类型介绍
在什么情况下使用线程池? 1.单个任务处理的时间比较短 2.将需处理的任务的数量大 使用线程池的好处: 1.减少在创建和销毁线程上所花的时间以及系统资源的开销 2.如不使用线程池, ...
- java线程池_Java多线程并发:线程基本方法+线程池原理+阻塞队列原理技术分享...
线程基本方法有哪些? 线程相关的基本方法有 wait,notify,notifyAll,sleep,join,yield 等. 线程等待(wait) 调用该方法的线程进入 WAITING 状态,只有等 ...
- 高并发中,那些不得不说的线程池与ThreadPoolExecutor类
摘要:从整体上认识下线程池中最核心的类之一--ThreadPoolExecutor,关于ThreadPoolExecutor的底层原理和源码实现,以及线程池中的其他技术细节的底层原理和源码实现. 本文 ...
- python线程池原理_Django异步任务线程池实现原理
这篇文章主要介绍了Django异步任务线程池实现原理,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 当数据库数据量很大时(百万级),许多批量数据修改 ...
- Java多线程系列--【JUC线程池 02】- 线程池原理(一)
参考:http://www.cnblogs.com/skywang12345/p/java_threads_category.html 概要 在前面一章"Java多线程系列--"J ...
- 并发编程--线程池原理
阻塞队列和非阻塞队列 ConcurrentLinkedQueue类 适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于Bloc ...
- java并发包线程池原理分析锁的深度化
java并发包&线程池原理分析&锁的深度化 并发包 同步容器类 Vector与ArrayList区别 1.ArrayList是最常用的List实现类,内部是通过数组实现的,它允许对元素 ...
最新文章
- 2022-2028年中国塑料绳的制造行业市场现状调查及投资商机预测报告
- centos 7 php mysql apache_CentOS 7 搭建 Apache+MySQL+PHP
- git merge 和 git merge --no-ff
- solidity编码规范
- DNS扫盲系列之五:域名配置ZONE文件
- (转载)java语言对时间的处理
- VC控件自绘制三步曲
- mysql useing查询,MySQL数据库之多表查询using优化与案例
- 利用WCF的双工通讯实现一个简单的心跳监控系统
- 认识和选购极致画质的显示器
- GitLab 安装配置指南
- 【AI视野·今日CV 计算机视觉论文速览 第232期】Thu, 8 Jul 2021
- 面试准备每日五题:C++(九)——vector、list、deque、priority_queue、mapset
- GDI+绘制的一个Report Designer原型
- Web前端开发工程师到底是干什么的?
- 各种排序算法稳定性的探讨
- Chapter 4 Invitations——10
- LOJ2257 SNOI2017 遗失的答案 容斥、高维前缀和
- Java 程序员都该懂的 volatile 关键字
- 算法:剑指 Offer 06. 从尾到头打印链表
热门文章
- 打开软件提示丢失vcruntime140.dll下载安装详细教程
- caffeine本地缓存的使用和详解
- 《Java SE实战指南》13-07:final修饰符
- MATLAB中的bsxfun函数
- IDEA插件系列(77):Spec Math symbols插件——数学符号
- 【ELT.ZIP】OpenHarmony啃论文俱乐部——多维探秘通用无损压缩
- 如何使用Windows File Recovery工具在 Windows 10 上恢复丢失的文件
- 论文阅读-WARP: Word-level Adversarial ReProgramming
- [转载]菲尔兹奖历届得主
- Java中append方法和add方法的区别