多线程相关-ThreadPoolExecutor

应用层面:

  ThreadPoolExecutor:

  

  创建多线程池执行器:new ThreadPoolExecutor(),创建方法最终都是走的以下这个构造方法:

    /*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.** @param corePoolSize the number of threads to keep in the pool, even*        if they are idle, unless {@code allowCoreThreadTimeOut} is set* @param maximumPoolSize the maximum number of threads to allow in the*        pool* @param keepAliveTime when the number of threads is greater than*        the core, this is the maximum time that excess idle threads*        will wait for new tasks before terminating.* @param unit the time unit for the {@code keepAliveTime} argument* @param workQueue the queue to use for holding tasks before they are*        executed.  This queue will hold only the {@code Runnable}*        tasks submitted by the {@code execute} method.* @param threadFactory the factory to use when the executor*        creates a new thread* @param handler the handler to use when execution is blocked*        because the thread bounds and queue capacities are reached* @throws IllegalArgumentException if one of the following holds:<br>*         {@code corePoolSize < 0}<br>*         {@code keepAliveTime < 0}<br>*         {@code maximumPoolSize <= 0}<br>*         {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}*         or {@code threadFactory} or {@code handler} is null*/public ThreadPoolExecutor(int corePoolSize,//核心线程数int maximumPoolSize,//核心线程最大数量long keepAliveTime,//超出核心线程数的其他空闲线程保留时间TimeUnit unit,//空闲时间单位BlockingQueue<Runnable> workQueue,//对列,当线程数量大于等于核心线程数时,将任务works保存进对列ThreadFactory threadFactory,//创建线程的工厂RejectedExecutionHandler handler) {//超出最大核心线程数的拒绝策略if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

创建线程池的其他方式:(返回的实际对象仍然是ThreadPoolExecutor,只不过是对构造函数的参数进行的特殊规定)

  1、Executors.newFixedThreadPool(int nThreads)

    public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}

  Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory)//自动以创建线程的工厂

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);}

  2、Executors.newSingleThreadExecutor() 

    public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}

  3、Executor.newCachedThreadPool()

    public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);}

源码:

ThreadPoolExecutor

构造方法:

ThreadPoolExecutor(int corePoolSize,//核心线程数int maximumPoolSize,//核心线程最大数量long keepAliveTime,//超出核心线程数的其他空闲线程保留时间TimeUnit unit,//空闲时间单位BlockingQueue<Runnable> workQueue,//对列,当线程数量大于等于核心线程数时,将任务works保存进对列ThreadFactory threadFactory,//创建线程的工厂RejectedExecutionHandler handler) {//超出最大核心线程数的拒绝策略

corePoolSize:线程池的核心线程数,当线程池中的工作线程数小于核心线程数的时候,只要向线程池指派任务,线程池就会创建工作线程。

maximumPoolSize:线程池最大工作线程数,当线程池中的工作线程达到最大数的时候,即使再向线程池指派任务,线程池不会创建工作线程,回执行对应的拒绝策略。
    keepAliveTime:当线程池的工作线程数大于核心线程数的时候,多余的核心线程数的部分线程(空闲的)可以保持keepAliveTime的空闲时间,当keepAliveTime时间内还没有获取到任务,这些线程后就会被回收。
    unit:保持空闲时间的时间单位。
    workQueue:任务队列,当线程池里面核心线程都在工作的时候,再向线程池指派任务,线程池会将任务放入任务队列里,工作线程在执行完任务后会再向任务队列里取出任务来执行。
    threadFactory:创建执行任务的工作线程的线程工厂。
    handler:拒绝任务加入线程池的策越,当线程池里的线程已经达到最大数后,再向线程池里加派任务时,线程池会决绝执行这些任务,handler就是具体执行拒绝的对象。

线程池的大体工作思路

1.当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。 
2.当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行 
3.当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务 
4.当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理 
5.当线程池中超过corePoolSize数的线程,空闲时间达到keepAliveTime时,关闭空闲线程

6.当设置allowCoreThreadTimeOut(true)时,线程池中核心线程空闲时间达到keepAliveTime也将关闭

 /*** The main pool control state, ctl, is an atomic integer packing* two conceptual fields*   workerCount, indicating the effective number of threads*   runState,    indicating whether running, shutting down etc** In order to pack them into one int, we limit workerCount to* (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2* billion) otherwise representable. If this is ever an issue in* the future, the variable can be changed to be an AtomicLong,* and the shift/mask constants below adjusted. But until the need* arises, this code is a bit faster and simpler using an int.** The workerCount is the number of workers that have been* permitted to start and not permitted to stop.  The value may be* transiently different from the actual number of live threads,* for example when a ThreadFactory fails to create a thread when* asked, and when exiting threads are still performing* bookkeeping before terminating. The user-visible pool size is* reported as the current size of the workers set.** The runState provides the main lifecycle control, taking on values:**   RUNNING:  Accept new tasks and process queued tasksrunning状态是可以接受和处理任务*   SHUTDOWN: Don't accept new tasks, but process queued tasksshutdown状态时不能接受新的任务,但是仍可以处理对列中的任务*   STOP:     Don't accept new tasks, don't process queued tasks,stop状态,不接受新任务,也不执行对列中的任务,同事中断正在执行的任务*             and interrupt in-progress tasks*   TIDYING:  All tasks have terminated, workerCount is zero,*             the thread transitioning to state TIDYING*             will run the terminated() hook methodtidying状态,所有的工作线程全部停止,并工作线程数量为0,将调用terminated方法,进入到terninated状态*   TERMINATED: terminated() has completed终止状态** The numerical order among these values matters, to allow* ordered comparisons. The runState monotonically increases over* time, but need not hit each state. The transitions are:*各种状态的转换-----* RUNNING -> SHUTDOWN*    On invocation of shutdown(), perhaps implicitly in finalize()* (RUNNING or SHUTDOWN) -> STOP*    On invocation of shutdownNow()* SHUTDOWN -> TIDYING*    When both queue and pool are empty* STOP -> TIDYING*    When pool is empty* TIDYING -> TERMINATED*    When the terminated() hook method has completed** Threads waiting in awaitTermination() will return when the* state reaches TERMINATED.** Detecting the transition from SHUTDOWN to TIDYING is less* straightforward than you'd like because the queue may become* empty after non-empty and vice versa during SHUTDOWN state, but* we can only terminate if, after seeing that it is empty, we see* that workerCount is 0 (which sometimes entails a recheck -- see* below).*/private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//默认的容量2^29 -1// runState is stored in the high-order bitsprivate static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;// Packing and unpacking ctlprivate static int runStateOf(int c)     { return c & ~CAPACITY; }private static int workerCountOf(int c)  { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }//rs:状态   ws:数量

转:
为什么线程池的状态简单的定义为 -1,0,1,2,3不就得了,为什么还要用移位操作呢?
原来这样的,ThreadPool
ctl的这个变量的设计哲学是用int的高3位 + 29个0代表状态,,用高位000+低29位来表示线程池中工作线程的数量,太佩服了。
首先CAPACITY的值为workCount的最大容量,该值为 000 11111 11111111 11111111 11111111,29个1(默认的出事容量536870911)
我们来看一下
private static int runStateOf(int c)     { return c & ~CAPACITY; }
用ctl里面的值与容量取反的方式获取状态值。由于CAPACITY的值为000 11111 11111111 11111111 11111111,
那取反后为111 00000 00000000 00000000 00000000, 用 c 与 该值进行与运算,这样就直接保留了c的高三位,
然后将c的低29位设置为0,这不就是线程池状态的存放规则吗,绝。
根据此方法,不难得出计算workCount的方法。
private static int ctlOf(int rs, int wc) { return rs | wc; }
该方法,主要是用来更新运行状态的。确保工作线程数量不丢失。

--------->

理解:ctl初始化:1110 0000 0000 0000 0000 0000 0000 0000   (该值也就是running状态值)-536870912capacity: 0001 1111 1111  1111  1111  1111  1111  1111     536870911当addworker()添加任务是,ctl中的value(也就是通过ctl.get()取到的值)就会加1,即:       1110 0000 0000 0000 0000 0000 0000 0001该值  &  初始容量capacity,即workerCountOf(c)方法:结果就是0000 0000 0000 0000 0000 0000 0000 0001(1),也就是线程数量为1个,同理getTask()的时候回进行-1操作

线程池设计原理:
1)线程池的工作线程为ThreadPoolExecutors的Worker线程,无论是submit还是executor方法中传入的Callable task,Runable参数,只是实现了Runnable接口,在线程池的调用过程,不会调用其start方法,只会调用Worker线程的start方法,然后在Worker线程的run方法中会调用入参的run方法。
2)线程的生命周期在run方法运行结束后(包括异常退出)就结束。要想重复利用线程,就要确保工作线程Worker的run方法运行在一个无限循环中,然后从任务队列中一个一个获取对象,如果任务队列为空,则阻塞,当然需要提供一些控制,结束无限循环,来销毁线程。在源码 runWorker方法与getTask来实现。 
大概的实现思路是 如果getTask返回null,则该worker线程将被销毁。
那getTask在什么情况下会返回false呢?
1、如果线程池的状态为SHUTDOWN并且队列不为空
2、如果线程池的状态大于STOP
3、如果当前运行的线程数大于核心线程数,会返回null,已销毁该worker线程
对keepAliveTime的理解,如果allowCoreThreadTimeOut为真,那么keepAliveTime其实就是从任务队列获取任务等待的超时时间,也就是workerQueue.poll(keepALiveTime, TimeUnit.NANOSECONDS)
    /*** Executes the given task sometime in the future.  The task* may execute in a new thread or in an existing pooled thread.** If the task cannot be submitted for execution, either because this* executor has been shutdown or because its capacity has been reached,* the task is handled by the current {@code RejectedExecutionHandler}.** @param command the task to execute* @throws RejectedExecutionException at discretion of*         {@code RejectedExecutionHandler}, if the task*         cannot be accepted for execution* @throws NullPointerException if {@code command} is null*/public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();//从ctl中取值,该值包含状态和数量if (workerCountOf(c) < corePoolSize) {//调用workCountOf方法得到当前的线程数量,和核心线程数比较if (addWorker(command, true))//符合,则调用addworker直接创建线程来执行(这里就是表示,当小于核心线程数时,不管有无空闲线程,都会创建新的线程)return;//创建成功直接returnc = ctl.get();}//没有创建成功则会进行拒绝策略方面的方法判断if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}

addWorder():

    /*** Checks if a new worker can be added with respect to current* pool state and the given bound (either core or maximum). If so,* the worker count is adjusted accordingly, and, if possible, a* new worker is created and started, running firstTask as its* first task. This method returns false if the pool is stopped or* eligible to shut down. It also returns false if the thread* factory fails to create a thread when asked.  If the thread* creation fails, either due to the thread factory returning* null, or due to an exception (typically OutOfMemoryError in* Thread.start()), we roll back cleanly.** @param firstTask the task the new thread should run first (or* null if none). Workers are created with an initial first task* (in method execute()) to bypass queuing when there are fewer* than corePoolSize threads (in which case we always start one),* or when the queue is full (in which case we must bypass queue).* Initially idle threads are usually created via* prestartCoreThread or to replace other dying workers.** @param core if true use corePoolSize as bound, else* maximumPoolSize. (A boolean indicator is used here rather than a* value to ensure reads of fresh values after checking other pool* state).* @return true if successful*/private boolean addWorker(Runnable firstTask, boolean core) {retry://重复执行的标记,下边代码有break retry(结束)和continue retry(返回周之前标记为重新执行)for (;;) {int c = ctl.get();//取码int rs = runStateOf(c);//状态码// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))//进行ctl.value加1操作,成功则结束retrybreak retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);//new worker的时候,内部类中会调用工厂来新建一个线程final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;//重复锁mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);//workers,set集合,保存着所有的workerint s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();//workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

getTask():

    /*** Performs blocking or timed wait for a task, depending on* current configuration settings, or returns null if this worker* must exit because of any of:* 1. There are more than maximumPoolSize workers (due to*    a call to setMaximumPoolSize).* 2. The pool is stopped.* 3. The pool is shutdown and the queue is empty.* 4. This worker timed out waiting for a task, and timed-out*    workers are subject to termination (that is,*    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})*    both before and after the timed wait, and if the queue is*    non-empty, this worker is not the last thread in the pool.** @return task, or null if the worker must exit, in which case*         workerCount is decremented*/private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();//处于stop、tidying、terminate状态时,循环减线程数量,回去返回对象return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//下边这一块代码控制着线程超时时间Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

ThreadPoolExecutor的执行:

  当第一次submit或者execute添加任务的时候,如果添加成功会调Thread.start()方法,想线程得到CPU的使用位置的时候,就会走Worker的

run()方法,该run方法会走ThreadPoolExecutor中的runWorker()方法,在这个方法中会走Runnable的run()方法。

关于多线程的blog

http://ifeve.com/java-threadpool/

https://blog.csdn.net/hounanjsj/article/details/73822998

https://blog.csdn.net/wangbiao007/article/details/78196413

https://blog.csdn.net/prestigeding/article/details/53929713

https://blog.csdn.net/wangbiao007/article/details/78196413

posted @ 2018-06-26 20:31 犇犇丶 阅读(...) 评论(...) 编辑 收藏

多线程相关-ThreadPoolExecutor相关推荐

  1. Java多线程相关的几十个问题

    转载来源:http://www.cnblogs.com/HadesFX/p/5333810.html , https://www.cnblogs.com/HadesFX/p/5333820.html ...

  2. 并发与多线程相关知识点梳理

    文章目录 并发和并行的概念 如何保证线程安全 1. 数据单线程内可见 2. 只读对象 3. 线程安全类 4. 同步与锁机制 什么是锁 线程同步 引用类型 ThreadLocal LeetCode 相关 ...

  3. JAVA并发与多线程相关面试题总结

    JAVA并发与多线程相关面试题总结 1.什么是进程.线程.协程,它们之间的关系是怎样的? 进程: 本质上是一个独立执行的程序,是计算机中的程序关于数据集合上的一次运行活动,进程是操作系统进行资源分配和 ...

  4. 进程、线程、多线程相关总结

    进程.线程.多线程相关总结 一.说说概念 1.进程(process) 狭义定义:进程就是一段程序的执行过程. 广义定义:进程是一个程序关于某个数据集合的一次运行.它是操作系统动态执行的基本单元,在传统 ...

  5. 多线程相关知识点总结

    多线程相关知识点总结 1. 线程的概念: 在早期的操作系统中并没有线程的概念,进程是拥有资源和独立运行的最小单位,也是程序执行的最小单位.任务调度采用的是时间片轮转的抢占式调度方式,而进程是任务调度的 ...

  6. Java多线程相关知识【17】--设计模式--上下文模式(Context)

    文章目录 Java多线程相关知识[17]--设计模式--上下文模式(Context) 1. 问题的引入 2. 解决方法 1. 解决理论 2. 实操代码 上下文数据保存 上文 下文 调度者 测试上下文 ...

  7. 【多线程】ThreadPoolExecutor类源码解析----续(二进制相关运算)

    前言 在之前阅读 ThreadPoolExecutor 源码的时候,发现代码里用到了一些二进制相关的位运算之类的代码,看起来有些费劲了,所以现在大概总结了一些笔记,二进制这东西吧,不难,就跟数学一样, ...

  8. 【多线程】ThreadPoolExecutor类万字源码解析(注解超级详细)

    线程池 线程池初始化时是没有创建线程的,线程池里的线程的初始化与其他线程一样,但是在完成任务以后,该线程不会自行销毁,而是以挂起的状态返回到线程池.直到应用程序再次向线程池发出请求时,线程池里挂起的线 ...

  9. 多线程相关的一些知识点

    1.Lock(this)         Lock(this)只能锁住当前对象,对于同一类型的其他对象实例无能为力,可以通过锁定类中的静态字段来解决这个问题. 一些相关链接: C#中的多线程 C#与N ...

最新文章

  1. 字体--Ubuntu手记之系统配置
  2. 符号说明表怎么做_电气新手搞不定电气识图怎么办?别慌!8套电气识图教程,秒上手...
  3. 如何在 Simulink 中使用 PID Tuner 进行 PID 调参?
  4. Chrome 跨域调试
  5. 【竞赛篇-国创(大创)线上报告撰写(常用套话总结)】季度报告、中期报告、结题报告怎么写,用什么格式,附件传什么比较好
  6. css3实现缺角四边形_CSS3实现缺角矩形,折角矩形以及缺角边框
  7. css 实现导航菜单
  8. 虚拟机下的SYN Flood测试
  9. 巨杉数据库 java,巨杉Tech|SequoiaDB 巨杉数据库高可用容灾测试
  10. 支持英特尔9242的服务器,宝德2U双子星服务器PR2725TP2
  11. 近段时间参加的CTF竞赛部分题目复现(ISCC2020 、GKCTF、网鼎杯)
  12. 这是一款功能强大的开源 Python 绘图库
  13. python namedtuple用法_详解Python中namedtuple的使用
  14. DTW学习(dynamic time warping)——思想、代码实现
  15. kafka 命令重新启动_命令行基础知识:关闭和重新启动
  16. 【开发教程1】开源蓝牙心率防水运动手环-套件检测教程
  17. xftp、使用pure-ftpd搭建FTP服务
  18. 程序设计基石与实践系列之类型提升、内存分配,数组转指针、打桩和矢量变换
  19. PageHelper详解
  20. BIOS调整服务器性能模式,如何修改BIOS的设置,让显卡发挥最佳性能?

热门文章

  1. PHP底层运行原理初探
  2. Apache与Tomcat整合
  3. mysql5.7设置SQL Mode
  4. 猴子选王c语言链表程序代码,C语言程序设计-猴子选大王[链表应用]
  5. vue中的props对象
  6. linux实验五 信号应用,实验五 进程间通信(中)
  7. java的json解析工具_json文件解析工具类(java)
  8. python 模糊匹配文件名 glob_Python: glob匹配文件
  9. 如何设置mysql的权限_mysql 权限控制
  10. java反射的编译过程_Java反射机制小结和实际操作