一、创建一个线程池对象

使用Executors创建一个线程池常用的方法如下:

1、newFixedThreadPool()

说明:初始化一个指定线程数的线程池,其中 corePoolSize == maxiPoolSize,使用 LinkedBlockingQuene 作为阻塞队列

特点:即使当线程池没有可执行任务时,也不会释放线程。

2、newCachedThreadPool()

说明:初始化一个可以缓存线程的线程池,默认缓存60s,线程池的线程数可达到 Integer.MAX_VALUE,即 2147483647,内部使用 SynchronousQueue 作为阻塞队列;

特点:在没有任务执行时,当线程的空闲时间超过 keepAliveTime,会自动释放线程资源;当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销; 因此,使用时要注意控制并发的任务数,防止因创建大量的线程导致而降低性能。

3、newSingleThreadExecutor()

说明:初始化只有一个线程的线程池,内部使用 LinkedBlockingQueue 作为阻塞队列。

特点:如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行

4、newScheduledThreadPool()

特点:初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据。

线程池的构造方法如下:

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();// 核心池大小this.corePoolSize = corePoolSize;// 最大池大小this.maximumPoolSize = maximumPoolSize;// 线程池的等待队列this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);// 线程工厂对象this.threadFactory = threadFactory;// 拒绝策略的句柄this.handler = handler;}

默认的线程工厂,Executors类的DefaultThreadFactory方法:

    /*** Thread factory capturing access control context and class loader*/static class PrivilegedThreadFactory extends DefaultThreadFactory {private final AccessControlContext acc;private final ClassLoader ccl;PrivilegedThreadFactory() {super();SecurityManager sm = System.getSecurityManager();if (sm != null) {// Calls to getContextClassLoader from this class// never trigger a security check, but we check// whether our callers have this permission anyways.sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);// Fail fastsm.checkPermission(new RuntimePermission("setContextClassLoader"));}this.acc = AccessController.getContext();this.ccl = Thread.currentThread().getContextClassLoader();}public Thread newThread(final Runnable r) {return super.newThread(new Runnable() {public void run() {AccessController.doPrivileged(new PrivilegedAction<Void>() {public Void run() {Thread.currentThread().setContextClassLoader(ccl);r.run();return null;}}, acc);}});}}

默认的拒绝策略

    /*** The default rejected execution handler*/private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();/*** A handler for rejected tasks that throws a* {@code RejectedExecutionException}.*/public static class AbortPolicy implements RejectedExecutionHandler {/*** Creates an {@code AbortPolicy}.*/public AbortPolicy() { }/*** Always throws RejectedExecutionException.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task* @throws RejectedExecutionException always.*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}

二、添加任务到线程池execute()

    /*** Executes the given task sometime in the future.  The task* may execute in a new thread or in an existing pooled thread.** If the task cannot be submitted for execution, either because this* executor has been shutdown or because its capacity has been reached,* the task is handled by the current {@code RejectedExecutionHandler}.** @param command the task to execute* @throws RejectedExecutionException at discretion of*         {@code RejectedExecutionHandler}, if the task*         cannot be accepted for execution* @throws NullPointerException if {@code command} is null*/public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/// 获取ctl对应的int值。该int值保存了"线程池中任务的数量"和"线程池状态"信息int c = ctl.get();// 当线程池中的任务数量 < "核心池大小"时,即线程池中少于corePoolSize个任务。// 则通过addWorker(command,true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 当线程池中的任务数量 >= "核心池大小"时,// 而且,"线程池处于允许状态"时,则尝试将任务添加到阻塞队列中。if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 再次确认“线程池状态”,若线程池异常终止了,则删除任务;然后通过reject()执行相应的拒绝策略的内容。if (! isRunning(recheck) && remove(command))reject(command);// 否则,如果"线程池中任务数量"为0,则通过addWorker(null,false)尝试新建一个线程,新建线程对应的任务为null。else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 通过addWorker(command,false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。// 如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内else if (!addWorker(command, false))reject(command);}

可以看到execute方法流程如下:(可参照英文注释)

  1. 线程池中的任务数量小于核心池大小时,新建线程;
  2. 线程池中的任务数量大于等于核心池大小时,添加到任务队列中;
  3. 若不能将任务添加到队列中,则尝试新建一个线程,失败的话执行拒绝策略

下面来看addWorker方法:

    private boolean addWorker(Runnable firstTask, boolean core) {retry:// 这部分是对状态进行检查,更新ctl的值for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}// 添加任务到线程池,并启动任务所在的线程boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {final ReentrantLock mainLock = this.mainLock;// 新建Worker,并且指定firstTask为Worker的第一个任务w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();int rs = runStateOf(c);if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// 添加到线程池集合中workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// 启动线程t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

三、关闭线程池

调用线程池的shutdown方法可以关闭线程池:

    /*** Initiates an orderly shutdown in which previously submitted* tasks are executed, but no new tasks will be accepted.* Invocation has no additional effect if already shut down.** <p>This method does not wait for previously submitted tasks to* complete execution.  Use {@link #awaitTermination awaitTermination}* to do that.** @throws SecurityException {@inheritDoc}*/public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 检查终止线程池的“线程”是否有权限checkShutdownAccess();// 设置线程池的状态为关闭状态advanceRunState(SHUTDOWN);// 中断线程池中空闲的线程interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();}

四、线程池状态

线程池使用了原子类的Integer的对象来标识线程池的状态:

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bitsprivate static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;

可以看到线程池公有以下五种状态:

  1. RUNNING:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。线程池的初始化状态是RUNNING。

  2. SHUTDOWN:调用线程池的shutdown()接口时,线程池由RUNNING转到SHUTDOWN。线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。

  3. STOP:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。

  4. TIDYING:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。:

  5. TERMINATED:线程池处在TIDYING状态时,执行完terminated()之后,就会由TIDYING->TERMINATED。线程池彻底终止,就变成TERMINATED状态。

五、线程池拒绝策略

线程池共包括4种拒绝策略:

  1. AbortPolicy:当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。
  2. CallerRunsPolicy:当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务。
  3. DiscardOldestPolicy:当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。
  4. DiscardPolicy:当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。

线程池原理(ThreadPoolExecutor)相关推荐

  1. Java线程池ThreadPoolExecutor使用和分析(三) - 终止线程池原理

    相关文章目录: Java线程池ThreadPoolExecutor使用和分析(一) Java线程池ThreadPoolExecutor使用和分析(二) - execute()原理 Java线程池Thr ...

  2. ThreadPoolExecutor线程池原理

    ThreadPoolExecutor线程池原理 线程池原理 1. 线程池的简单介绍 1.1 线程池是什么 1.2 线程池解决的核心问题是什么 2. 线程池的实现原理 2.1 线程池的执行流程 2.2 ...

  3. Java 并发编程——Executor框架和线程池原理

    Java 并发编程系列文章 Java 并发基础--线程安全性 Java 并发编程--Callable+Future+FutureTask java 并发编程--Thread 源码重新学习 java并发 ...

  4. JAVA线程池原理以及几种线程池类型介绍

    在什么情况下使用线程池? 1.单个任务处理的时间比较短      2.将需处理的任务的数量大 使用线程池的好处: 1.减少在创建和销毁线程上所花的时间以及系统资源的开销      2.如不使用线程池, ...

  5. java线程池_Java多线程并发:线程基本方法+线程池原理+阻塞队列原理技术分享...

    线程基本方法有哪些? 线程相关的基本方法有 wait,notify,notifyAll,sleep,join,yield 等. 线程等待(wait) 调用该方法的线程进入 WAITING 状态,只有等 ...

  6. 高并发中,那些不得不说的线程池与ThreadPoolExecutor类

    摘要:从整体上认识下线程池中最核心的类之一--ThreadPoolExecutor,关于ThreadPoolExecutor的底层原理和源码实现,以及线程池中的其他技术细节的底层原理和源码实现. 本文 ...

  7. python线程池原理_Django异步任务线程池实现原理

    这篇文章主要介绍了Django异步任务线程池实现原理,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 当数据库数据量很大时(百万级),许多批量数据修改 ...

  8. Java多线程系列--【JUC线程池 02】- 线程池原理(一)

    参考:http://www.cnblogs.com/skywang12345/p/java_threads_category.html 概要 在前面一章"Java多线程系列--"J ...

  9. 并发编程--线程池原理

    阻塞队列和非阻塞队列 ConcurrentLinkedQueue类 适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于Bloc ...

  10. java并发包线程池原理分析锁的深度化

    java并发包&线程池原理分析&锁的深度化 并发包 同步容器类 Vector与ArrayList区别 1.ArrayList是最常用的List实现类,内部是通过数组实现的,它允许对元素 ...

最新文章

  1. 2022-2028年中国塑料绳的制造行业市场现状调查及投资商机预测报告
  2. centos 7 php mysql apache_CentOS 7 搭建 Apache+MySQL+PHP
  3. git merge 和 git merge --no-ff
  4. solidity编码规范
  5. DNS扫盲系列之五:域名配置ZONE文件
  6. (转载)java语言对时间的处理
  7. VC控件自绘制三步曲
  8. mysql useing查询,MySQL数据库之多表查询using优化与案例
  9. 利用WCF的双工通讯实现一个简单的心跳监控系统
  10. 认识和选购极致画质的显示器
  11. GitLab 安装配置指南
  12. 【AI视野·今日CV 计算机视觉论文速览 第232期】Thu, 8 Jul 2021
  13. 面试准备每日五题:C++(九)——vector、list、deque、priority_queue、mapset
  14. GDI+绘制的一个Report Designer原型
  15. Web前端开发工程师到底是干什么的?
  16. 各种排序算法稳定性的探讨
  17. Chapter 4 Invitations——10
  18. LOJ2257 SNOI2017 遗失的答案 容斥、高维前缀和
  19. Java 程序员都该懂的 volatile 关键字
  20. 算法:剑指 Offer 06. 从尾到头打印链表

热门文章

  1. 打开软件提示丢失vcruntime140.dll下载安装详细教程
  2. caffeine本地缓存的使用和详解
  3. 《Java SE实战指南》13-07:final修饰符
  4. MATLAB中的bsxfun函数
  5. IDEA插件系列(77):Spec Math symbols插件——数学符号
  6. 【ELT.ZIP】OpenHarmony啃论文俱乐部——多维探秘通用无损压缩
  7. 如何使用Windows File Recovery工具在 Windows 10 上恢复丢失的文件
  8. 论文阅读-WARP: Word-level Adversarial ReProgramming
  9. [转载]菲尔兹奖历届得主
  10. Java中append方法和add方法的区别