多线程相关-ThreadPoolExecutor
多线程相关-ThreadPoolExecutor
应用层面:
ThreadPoolExecutor:
创建多线程池执行器:new ThreadPoolExecutor(),创建方法最终都是走的以下这个构造方法:
/*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.** @param corePoolSize the number of threads to keep in the pool, even* if they are idle, unless {@code allowCoreThreadTimeOut} is set* @param maximumPoolSize the maximum number of threads to allow in the* pool* @param keepAliveTime when the number of threads is greater than* the core, this is the maximum time that excess idle threads* will wait for new tasks before terminating.* @param unit the time unit for the {@code keepAliveTime} argument* @param workQueue the queue to use for holding tasks before they are* executed. This queue will hold only the {@code Runnable}* tasks submitted by the {@code execute} method.* @param threadFactory the factory to use when the executor* creates a new thread* @param handler the handler to use when execution is blocked* because the thread bounds and queue capacities are reached* @throws IllegalArgumentException if one of the following holds:<br>* {@code corePoolSize < 0}<br>* {@code keepAliveTime < 0}<br>* {@code maximumPoolSize <= 0}<br>* {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}* or {@code threadFactory} or {@code handler} is null*/public ThreadPoolExecutor(int corePoolSize,//核心线程数int maximumPoolSize,//核心线程最大数量long keepAliveTime,//超出核心线程数的其他空闲线程保留时间TimeUnit unit,//空闲时间单位BlockingQueue<Runnable> workQueue,//对列,当线程数量大于等于核心线程数时,将任务works保存进对列ThreadFactory threadFactory,//创建线程的工厂RejectedExecutionHandler handler) {//超出最大核心线程数的拒绝策略if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
创建线程池的其他方式:(返回的实际对象仍然是ThreadPoolExecutor,只不过是对构造函数的参数进行的特殊规定)
1、Executors.newFixedThreadPool(int nThreads)
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}
Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory)//自动以创建线程的工厂
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);}
2、Executors.newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}
3、Executor.newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);}
源码:
ThreadPoolExecutor
构造方法:
ThreadPoolExecutor(int corePoolSize,//核心线程数int maximumPoolSize,//核心线程最大数量long keepAliveTime,//超出核心线程数的其他空闲线程保留时间TimeUnit unit,//空闲时间单位BlockingQueue<Runnable> workQueue,//对列,当线程数量大于等于核心线程数时,将任务works保存进对列ThreadFactory threadFactory,//创建线程的工厂RejectedExecutionHandler handler) {//超出最大核心线程数的拒绝策略
corePoolSize:线程池的核心线程数,当线程池中的工作线程数小于核心线程数的时候,只要向线程池指派任务,线程池就会创建工作线程。
maximumPoolSize:线程池最大工作线程数,当线程池中的工作线程达到最大数的时候,即使再向线程池指派任务,线程池不会创建工作线程,回执行对应的拒绝策略。
keepAliveTime:当线程池的工作线程数大于核心线程数的时候,多余的核心线程数的部分线程(空闲的)可以保持keepAliveTime的空闲时间,当keepAliveTime时间内还没有获取到任务,这些线程后就会被回收。
unit:保持空闲时间的时间单位。
workQueue:任务队列,当线程池里面核心线程都在工作的时候,再向线程池指派任务,线程池会将任务放入任务队列里,工作线程在执行完任务后会再向任务队列里取出任务来执行。
threadFactory:创建执行任务的工作线程的线程工厂。
handler:拒绝任务加入线程池的策越,当线程池里的线程已经达到最大数后,再向线程池里加派任务时,线程池会决绝执行这些任务,handler就是具体执行拒绝的对象。
线程池的大体工作思路
1.当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
2.当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
3.当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
4.当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理
5.当线程池中超过corePoolSize数的线程,空闲时间达到keepAliveTime时,关闭空闲线程
6.当设置allowCoreThreadTimeOut(true)时,线程池中核心线程空闲时间达到keepAliveTime也将关闭
/*** The main pool control state, ctl, is an atomic integer packing* two conceptual fields* workerCount, indicating the effective number of threads* runState, indicating whether running, shutting down etc** In order to pack them into one int, we limit workerCount to* (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2* billion) otherwise representable. If this is ever an issue in* the future, the variable can be changed to be an AtomicLong,* and the shift/mask constants below adjusted. But until the need* arises, this code is a bit faster and simpler using an int.** The workerCount is the number of workers that have been* permitted to start and not permitted to stop. The value may be* transiently different from the actual number of live threads,* for example when a ThreadFactory fails to create a thread when* asked, and when exiting threads are still performing* bookkeeping before terminating. The user-visible pool size is* reported as the current size of the workers set.** The runState provides the main lifecycle control, taking on values:** RUNNING: Accept new tasks and process queued tasksrunning状态是可以接受和处理任务* SHUTDOWN: Don't accept new tasks, but process queued tasksshutdown状态时不能接受新的任务,但是仍可以处理对列中的任务* STOP: Don't accept new tasks, don't process queued tasks,stop状态,不接受新任务,也不执行对列中的任务,同事中断正在执行的任务* and interrupt in-progress tasks* TIDYING: All tasks have terminated, workerCount is zero,* the thread transitioning to state TIDYING* will run the terminated() hook methodtidying状态,所有的工作线程全部停止,并工作线程数量为0,将调用terminated方法,进入到terninated状态* TERMINATED: terminated() has completed终止状态** The numerical order among these values matters, to allow* ordered comparisons. The runState monotonically increases over* time, but need not hit each state. The transitions are:*各种状态的转换-----* RUNNING -> SHUTDOWN* On invocation of shutdown(), perhaps implicitly in finalize()* (RUNNING or SHUTDOWN) -> STOP* On invocation of shutdownNow()* SHUTDOWN -> TIDYING* When both queue and pool are empty* STOP -> TIDYING* When pool is empty* TIDYING -> TERMINATED* When the terminated() hook method has completed** Threads waiting in awaitTermination() will return when the* state reaches TERMINATED.** Detecting the transition from SHUTDOWN to TIDYING is less* straightforward than you'd like because the queue may become* empty after non-empty and vice versa during SHUTDOWN state, but* we can only terminate if, after seeing that it is empty, we see* that workerCount is 0 (which sometimes entails a recheck -- see* below).*/private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY = (1 << COUNT_BITS) - 1;//默认的容量2^29 -1// runState is stored in the high-order bitsprivate static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;// Packing and unpacking ctlprivate static int runStateOf(int c) { return c & ~CAPACITY; }private static int workerCountOf(int c) { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }//rs:状态 ws:数量
转:
为什么线程池的状态简单的定义为 -1,0,1,2,3不就得了,为什么还要用移位操作呢?
原来这样的,ThreadPool
ctl的这个变量的设计哲学是用int的高3位 + 29个0代表状态,,用高位000+低29位来表示线程池中工作线程的数量,太佩服了。
首先CAPACITY的值为workCount的最大容量,该值为 000 11111 11111111 11111111 11111111,29个1(默认的出事容量536870911)
我们来看一下
private static int runStateOf(int c) { return c & ~CAPACITY; }
用ctl里面的值与容量取反的方式获取状态值。由于CAPACITY的值为000 11111 11111111 11111111 11111111,
那取反后为111 00000 00000000 00000000 00000000, 用 c 与 该值进行与运算,这样就直接保留了c的高三位,
然后将c的低29位设置为0,这不就是线程池状态的存放规则吗,绝。
根据此方法,不难得出计算workCount的方法。
private static int ctlOf(int rs, int wc) { return rs | wc; }
该方法,主要是用来更新运行状态的。确保工作线程数量不丢失。
--------->
理解:ctl初始化:1110 0000 0000 0000 0000 0000 0000 0000 (该值也就是running状态值)-536870912capacity: 0001 1111 1111 1111 1111 1111 1111 1111 536870911当addworker()添加任务是,ctl中的value(也就是通过ctl.get()取到的值)就会加1,即: 1110 0000 0000 0000 0000 0000 0000 0001该值 & 初始容量capacity,即workerCountOf(c)方法:结果就是0000 0000 0000 0000 0000 0000 0000 0001(1),也就是线程数量为1个,同理getTask()的时候回进行-1操作
/*** Executes the given task sometime in the future. The task* may execute in a new thread or in an existing pooled thread.** If the task cannot be submitted for execution, either because this* executor has been shutdown or because its capacity has been reached,* the task is handled by the current {@code RejectedExecutionHandler}.** @param command the task to execute* @throws RejectedExecutionException at discretion of* {@code RejectedExecutionHandler}, if the task* cannot be accepted for execution* @throws NullPointerException if {@code command} is null*/public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();//从ctl中取值,该值包含状态和数量if (workerCountOf(c) < corePoolSize) {//调用workCountOf方法得到当前的线程数量,和核心线程数比较if (addWorker(command, true))//符合,则调用addworker直接创建线程来执行(这里就是表示,当小于核心线程数时,不管有无空闲线程,都会创建新的线程)return;//创建成功直接returnc = ctl.get();}//没有创建成功则会进行拒绝策略方面的方法判断if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}
addWorder():
/*** Checks if a new worker can be added with respect to current* pool state and the given bound (either core or maximum). If so,* the worker count is adjusted accordingly, and, if possible, a* new worker is created and started, running firstTask as its* first task. This method returns false if the pool is stopped or* eligible to shut down. It also returns false if the thread* factory fails to create a thread when asked. If the thread* creation fails, either due to the thread factory returning* null, or due to an exception (typically OutOfMemoryError in* Thread.start()), we roll back cleanly.** @param firstTask the task the new thread should run first (or* null if none). Workers are created with an initial first task* (in method execute()) to bypass queuing when there are fewer* than corePoolSize threads (in which case we always start one),* or when the queue is full (in which case we must bypass queue).* Initially idle threads are usually created via* prestartCoreThread or to replace other dying workers.** @param core if true use corePoolSize as bound, else* maximumPoolSize. (A boolean indicator is used here rather than a* value to ensure reads of fresh values after checking other pool* state).* @return true if successful*/private boolean addWorker(Runnable firstTask, boolean core) {retry://重复执行的标记,下边代码有break retry(结束)和continue retry(返回周之前标记为重新执行)for (;;) {int c = ctl.get();//取码int rs = runStateOf(c);//状态码// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))//进行ctl.value加1操作,成功则结束retrybreak retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);//new worker的时候,内部类中会调用工厂来新建一个线程final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;//重复锁mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);//workers,set集合,保存着所有的workerint s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();//workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
getTask():
/*** Performs blocking or timed wait for a task, depending on* current configuration settings, or returns null if this worker* must exit because of any of:* 1. There are more than maximumPoolSize workers (due to* a call to setMaximumPoolSize).* 2. The pool is stopped.* 3. The pool is shutdown and the queue is empty.* 4. This worker timed out waiting for a task, and timed-out* workers are subject to termination (that is,* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})* both before and after the timed wait, and if the queue is* non-empty, this worker is not the last thread in the pool.** @return task, or null if the worker must exit, in which case* workerCount is decremented*/private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();//处于stop、tidying、terminate状态时,循环减线程数量,回去返回对象return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//下边这一块代码控制着线程超时时间Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
ThreadPoolExecutor的执行:
当第一次submit或者execute添加任务的时候,如果添加成功会调Thread.start()方法,想线程得到CPU的使用位置的时候,就会走Worker的
run()方法,该run方法会走ThreadPoolExecutor中的runWorker()方法,在这个方法中会走Runnable的run()方法。
关于多线程的blog
http://ifeve.com/java-threadpool/
https://blog.csdn.net/hounanjsj/article/details/73822998
https://blog.csdn.net/wangbiao007/article/details/78196413
https://blog.csdn.net/prestigeding/article/details/53929713
https://blog.csdn.net/wangbiao007/article/details/78196413
多线程相关-ThreadPoolExecutor相关推荐
- Java多线程相关的几十个问题
转载来源:http://www.cnblogs.com/HadesFX/p/5333810.html , https://www.cnblogs.com/HadesFX/p/5333820.html ...
- 并发与多线程相关知识点梳理
文章目录 并发和并行的概念 如何保证线程安全 1. 数据单线程内可见 2. 只读对象 3. 线程安全类 4. 同步与锁机制 什么是锁 线程同步 引用类型 ThreadLocal LeetCode 相关 ...
- JAVA并发与多线程相关面试题总结
JAVA并发与多线程相关面试题总结 1.什么是进程.线程.协程,它们之间的关系是怎样的? 进程: 本质上是一个独立执行的程序,是计算机中的程序关于数据集合上的一次运行活动,进程是操作系统进行资源分配和 ...
- 进程、线程、多线程相关总结
进程.线程.多线程相关总结 一.说说概念 1.进程(process) 狭义定义:进程就是一段程序的执行过程. 广义定义:进程是一个程序关于某个数据集合的一次运行.它是操作系统动态执行的基本单元,在传统 ...
- 多线程相关知识点总结
多线程相关知识点总结 1. 线程的概念: 在早期的操作系统中并没有线程的概念,进程是拥有资源和独立运行的最小单位,也是程序执行的最小单位.任务调度采用的是时间片轮转的抢占式调度方式,而进程是任务调度的 ...
- Java多线程相关知识【17】--设计模式--上下文模式(Context)
文章目录 Java多线程相关知识[17]--设计模式--上下文模式(Context) 1. 问题的引入 2. 解决方法 1. 解决理论 2. 实操代码 上下文数据保存 上文 下文 调度者 测试上下文 ...
- 【多线程】ThreadPoolExecutor类源码解析----续(二进制相关运算)
前言 在之前阅读 ThreadPoolExecutor 源码的时候,发现代码里用到了一些二进制相关的位运算之类的代码,看起来有些费劲了,所以现在大概总结了一些笔记,二进制这东西吧,不难,就跟数学一样, ...
- 【多线程】ThreadPoolExecutor类万字源码解析(注解超级详细)
线程池 线程池初始化时是没有创建线程的,线程池里的线程的初始化与其他线程一样,但是在完成任务以后,该线程不会自行销毁,而是以挂起的状态返回到线程池.直到应用程序再次向线程池发出请求时,线程池里挂起的线 ...
- 多线程相关的一些知识点
1.Lock(this) Lock(this)只能锁住当前对象,对于同一类型的其他对象实例无能为力,可以通过锁定类中的静态字段来解决这个问题. 一些相关链接: C#中的多线程 C#与N ...
最新文章
- 字体--Ubuntu手记之系统配置
- 符号说明表怎么做_电气新手搞不定电气识图怎么办?别慌!8套电气识图教程,秒上手...
- 如何在 Simulink 中使用 PID Tuner 进行 PID 调参?
- Chrome 跨域调试
- 【竞赛篇-国创(大创)线上报告撰写(常用套话总结)】季度报告、中期报告、结题报告怎么写,用什么格式,附件传什么比较好
- css3实现缺角四边形_CSS3实现缺角矩形,折角矩形以及缺角边框
- css 实现导航菜单
- 虚拟机下的SYN Flood测试
- 巨杉数据库 java,巨杉Tech|SequoiaDB 巨杉数据库高可用容灾测试
- 支持英特尔9242的服务器,宝德2U双子星服务器PR2725TP2
- 近段时间参加的CTF竞赛部分题目复现(ISCC2020 、GKCTF、网鼎杯)
- 这是一款功能强大的开源 Python 绘图库
- python namedtuple用法_详解Python中namedtuple的使用
- DTW学习(dynamic time warping)——思想、代码实现
- kafka 命令重新启动_命令行基础知识:关闭和重新启动
- 【开发教程1】开源蓝牙心率防水运动手环-套件检测教程
- xftp、使用pure-ftpd搭建FTP服务
- 程序设计基石与实践系列之类型提升、内存分配,数组转指针、打桩和矢量变换
- PageHelper详解
- BIOS调整服务器性能模式,如何修改BIOS的设置,让显卡发挥最佳性能?
热门文章
- PHP底层运行原理初探
- Apache与Tomcat整合
- mysql5.7设置SQL Mode
- 猴子选王c语言链表程序代码,C语言程序设计-猴子选大王[链表应用]
- vue中的props对象
- linux实验五 信号应用,实验五 进程间通信(中)
- java的json解析工具_json文件解析工具类(java)
- python 模糊匹配文件名 glob_Python: glob匹配文件
- 如何设置mysql的权限_mysql 权限控制
- java反射的编译过程_Java反射机制小结和实际操作