详解(一)-ThreadPollExecutor-并发编程(Java)
文章目录
- 1 前言
- 2 TheadPoolExcutor 基础
- 2.1 状态
- 2.2 状态转换
- 2 构造方法
- 2.1 ThreadPoolExcutor构造方法
- 2.2 corePoolSize和maximumPoolSize
- 2.3 keepAliveTime和unit
- 1.3 阻塞队列 BlockingQueue
- 1.4 线程工厂ThreadFactory
- 1.5 拒绝策略 RejectedExecutionHandler
- 2.2 Excutors
- 3 线程池执行任务流程
- 3.1 execute()
- 3.1.1 execute执行流程
- 3.1.2 addWorker()分析
- 5 关闭线程池
- 6 后记
1 前言
前面通过实现自定义线程池-并发编程(Java)及详解-ThreadPollExecutor-并发编程(Java),我们对线程池有了初步了解。下面,我们深入JDK代码底层,更深入的学习
2 TheadPoolExcutor 基础
ThreadPoolExcutor就是JDK给我们提供的线程池实现类。继承关系如下图1-1所示:[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1nqJF2BS-1668310655374)(L:\study\java\concurrent\thread-pool\20221111-threadpool-inheritance.png)]
2.1 状态
先看下源码关于状态的定义:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
- COUNT_BITS:值为int位数32减去3即29
- ctl:这个原子整数使用int的高3位来表示线程状态;低29位表示线程数,即线程最大容量229−12^{29}-1229−1;
- 对于有对ctl的合成 与取状态线程数不明白的可以复习下按位运算。
线程的5种状态如下表2.1-1所示:
状态名 | 高3位 | 接收新任务 | 处理阻塞队列任务 | 说明 |
---|---|---|---|---|
RUNNING | -1 | Y | Y | 接收新任务,处理阻塞队列任务 |
SHUTDOWN | 0 | N | Y | 不会接收新任务,会处理阻塞队列任务 |
STOP | 1 | N | N | 不会结束新任务,抛弃阻塞队列任务,同时会中断正在执行的任务。 |
TIDYING | 2 | - | - | 所有任务终结,线程数为0,会调用terminated()方法;这是一个过渡状态。 |
TERMINATED | 3 | - | - | 终结状态,生命周期结束。 |
- 注:那么问题来了,为什么它要用一个原子整数存储2个值:状态和线程数,难道用2个变量不是更好操作吗?
我们这个是线程池,主要任务自然用多线程处理多个任务,不可避免涉及访问控制。使用一个原子整数,不用对2个变量进行原子操作;且ctl的合成和取状态和线程数不复杂。
2.2 状态转换
- 线程池状态转换如下表2.1-2所示
状态转换 | 触发 |
---|---|
RUNNING -> SHUTDOWN | 执行shutdown()方法也可能是finalize() |
(RUNNING or SHUTDOWN) -> STOP | shotdownNow() |
SHUTDOWN -> TIDYING | 队列和池为空的时候 |
STOP -> TIDYING | 池为空的时候 |
TIDYING -> TERMINATED | terminated()方法执行完成 |
2 构造方法
2.1 ThreadPoolExcutor构造方法
构造方法有重载,我们下面展示一个最全的构造方法:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
2.2 corePoolSize和maximumPoolSize
- corePoolSize:核心线程数
- maximumPoolSize:最大线程数
核心线程数和最大线程数作用,当有新任务时,如果当前线程数小于核心线程数,创建新的工作线程;大于等于核心线程数时,新任务放入阻塞队列;如果阻塞队列满了但是不超过最大线程数,创建非核心线程;如果超过最大线程数,执行拒绝策略。
非核心线程有的地方叫做救急线程,具体的代码和详细解析在下面线程池执行流程处讲解。
2.3 keepAliveTime和unit
- keepAliveTime:存活时间,指非核心线程或者设置了允许核心线程超时的核心线程
- unit:时间单位
看下面代码2.3-1:
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}
private void decrementWorkerCount() {do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
上述代码为工作线程从阻塞队列获取任务执行的方法,执行流程图如下图2.3-1所示:
其中获取阻塞队列任务的时候,用到这两个变量。下面我们来分析下满足那些条件,会执行到这里。
- 第一个主要分支判断:rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty()),我们需要的结果是false,满足false的条件为一下之一即条件关系是或的关系
- 线程池状态是RUNNING
- SHUTDOWN<=线程池状态<STOP且阻塞队列不能为空
- 第二个主要分支判断:(wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty()),我们需要的结果仍然是false,满足false的条件也是下面列出的条件之一
- wc <= maximumPoolSize && (!timed || !timedOut) :继续拆解,条件关系为且
- wc <= maximumPoolSize:工作现场数小于等于最大的工作现场数
- !timed || !timedOut:拆解,条件关系为或
- !timed:timed = allowCoreThreadTimeOut || wc > corePoolSize,允许核心线程设置超时否且工作现场数线程数小于等于核心线程数
- !timedOut:没有超时
- wc <= maximumPoolSize && (!timed || !timedOut) :继续拆解,条件关系为且
- 第三个分支判断:timed
- true:允许核心线程超时设置为真或者线程数大于核心线程数且小于等于最大工作现场数,此时执行从阻塞队列有时限的等待获取任务
1.3 阻塞队列 BlockingQueue
BlockingQueue阻塞队列主要功能就是当队列为空时,从队列取元素会阻塞;当队列满时,存入队列会阻塞。具体的实现,这里不详述,等以后有用到在讲解。
1.4 线程工厂ThreadFactory
线程工厂顾名思义就是使用工厂模式实现生成线程的,比起我们自己手动创建线程,它很好的封装了实现的细节。线程池代码中生成线程的地方,如下代码片段1.4-1:
Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);
}
即在工作线程构造方法中,具体的实现类同上不详述。
1.5 拒绝策略 RejectedExecutionHandler
拒绝策略在把任务放入阻塞队列失败或者添加工作线程失败时触发,定义了4种拒绝策略实现:
AbortPolicy:抛异常,线程池默认拒绝策略
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString()); }
DiscardPolicy:丢弃任务,什么也不做
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
DiscardOldestPolicy:丢弃最早放入阻塞队列的任务,执行当前任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}
CallerRunsPolicy:任务自己执行
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}
2.2 Excutors
我们常用的有3种类型的线程池,具体介绍可以查看这篇博客详解-ThreadPollExecutor-并发编程(Java)或者自行查阅相关文档。一般建议不直接new而是使用Excutors的静态方法来创建相应的线程池。
3 线程池执行任务流程
3.1 execute()
3.1.1 execute执行流程
讲解之前,先看下源码如下3.1-1所示:
public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);
}
代码中也有很清晰的解释,不过是英文
详解(一)-ThreadPollExecutor-并发编程(Java)相关推荐
- 详解JUC高并发编程
JUC并发编程 并发编程的本质:充分利用CPU的资源 问题:JAVA可以开启线程吗? 不能:最底层是本地方法,底层是C++,Java无法直接操作硬件 1.线程里有几个状态 NEW, 新生 RUNN ...
- 【牛客网】-【并发详解】-【并发编程基础】-【原子类】
目录 并发编程基础 原子类 参考书目: 并发编程基础 在操作系统中,并发是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行,但任一个时刻点上只有一个程序在处 ...
- 相关性质和条件变量-ReentrantLock详解(2)-AQS-并发编程(Java)
文章目录 1 可重入 2 可打断 3 公平锁 4 条件变量 4.1 await() 4.1.1 主方法 4.1.2 addConditionWaiter() 4.1.3 isOnSyncQueue() ...
- 详解 Visual C# 数据库编程
详解 Visual C# 数据库编程 ****** 2007-11-05 14:34 关于数据库编程,微软提供了一个统一的数据对象访问模型,在Visual Studio6.0中称为ADO,在.NET中 ...
- Java 泛型(generics)详解及代码示例、Java 类型通配符详解及代码示例
Java 泛型(generics)详解及代码示例.Java 类型通配符详解及代码示例 - 概念 Java 泛型(generics)是 JDK 5 中引入的一个新特性, 泛型提供了编译时类型安全检测机制 ...
- 聚合中返回source_Java 8 中的 Streams API 详解—— Streams 的背景以及 Java 8 中的使用详解...
为什么需要 Stream Stream 作为 Java 8 的一大亮点,它与 java.io 包里的 InputStream 和 OutputStream 是完全不同的概念.它也不同于 StAX 对 ...
- 《Java和Android开发实战详解》——1.2节Java基础知识
本节书摘来自异步社区<Java和Android开发实战详解>一书中的第1章,第1.2节Java基础知识,作者 陈会安,更多章节内容可以访问云栖社区"异步社区"公众号查看 ...
- FutureTask-详解(二)-ThreadPollExecutor-并发编程(Java)
文章目录 1 FutureTask 1.1 简介 1.2 成员变量 1.3 构造方法 1.4 主要执行流程分析 1.4.1 run任务执行 1.4.1.1 run方法 1.4.1.2 set(resu ...
- 并发编程-java内存模型
1. 基本概念 程序:静态,用于完成某些功能的代码. 进程:动态,运行中的程序 线程:进程中的实际运作单位,一个进程可以包含一个或多个线程. 2. JVM内存区域 堆:线程共享,存放实例对象 (OOM ...
最新文章
- Java如何清除日期_Java中关于日期的处理方法
- 向 Web 开发人员推荐35款 JavaScript 图形图表库
- 大众点评字体_点评里的神笔马良!她的美食笔记会让你惊掉下巴!
- 活动推荐|20位大咖齐聚,“中国首届沉浸产业发展论坛”10月底将于南京召开...
- 软件测试用python一般用来做什么-想要成为一个优秀的软件测试人员,应该学些什么?...
- Activity启动模式 launchMode
- kie-api_KIE-WB / JBPM控制台Ng –配置
- OJ1000: A+B Problem
- Take a Photo and Upload it on Mobile Phones with HTML5
- 从R-CNN到Faster R-CNN漫谈
- zabbix agent启动不了
- 简单工厂模式-Simple Factory Pattern
- 摩托罗拉发布RhoElements HTML5框架
- sig值怎么计算_T检验、sig.值
- 清华大学计算机竞赛自主招生,清华大学自主招生竞赛有哪些要求
- 3种方式教你怎样显示手机wifi密码,不再愁密码忘记了
- 苏世民的54条人生成功经验
- 将一个D触发器转换成JK触发器
- 编程乐趣:获取12306的所有车站电报码
- Java 中 switch 的用法
热门文章
- linux 显卡扩展坞,Ubuntu18.04上外接显卡扩展坞安装Nvidia驱动和CUDA10.0及cuDNN
- QPSK和16QAM基带信号解调误比特率理论限和仿真对比
- 如何使用adb工具进行查看Android中创建的数据库文件
- 如何成为一名数仓工程师?
- 【复盘7】距离考研还有88天【补充作息时间表】
- 谷歌恐龙游戏HTML,谷歌浏览器自带的恐龙跑酷小游戏
- mysql 1168_解决fatal error LNK1168的终极方法
- 儿童护眼灯哪个品牌好?精选最好的儿童护眼灯
- win10 获取超级管理员权限
- 《阴符经》【中篇】【下篇】