线程池存在的意义

平常使用线程即new Thread()然后调用start()方法去启动这个线程,但是在频繁的业务情况下如果在生产环境大量的创建Thread对象是则会浪费资源,不仅增加GC回收压力,并且还浪费了时间,创建线程是需要花时间的;

线程池的存在就是降低频繁的创建线程,降低资源的消耗以及创建时间的浪费,并且可以同一管理。

ThreadPoolExecutor

在JDK中所有的线程池的父类就是ThreadPoolExecutor,以下是它的构造方法

    /*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.** @param corePoolSize the number of threads to keep in the pool, even*        if they are idle, unless {@code allowCoreThreadTimeOut} is set* @param maximumPoolSize the maximum number of threads to allow in the*        pool* @param keepAliveTime when the number of threads is greater than*        the core, this is the maximum time that excess idle threads*        will wait for new tasks before terminating.* @param unit the time unit for the {@code keepAliveTime} argument* @param workQueue the queue to use for holding tasks before they are*        executed.  This queue will hold only the {@code Runnable}*        tasks submitted by the {@code execute} method.* @param threadFactory the factory to use when the executor*        creates a new thread* @param handler the handler to use when execution is blocked*        because the thread bounds and queue capacities are reached* @throws IllegalArgumentException if one of the following holds:<br>*         {@code corePoolSize < 0}<br>*         {@code keepAliveTime < 0}<br>*         {@code maximumPoolSize <= 0}<br>*         {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}*         or {@code threadFactory} or {@code handler} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

int corePoolSize :线程池中核心线程数,小于corePoolSize ,就会创建新线程,等于corePoolSize ,这个任务就会保存到BlockingQueue,如果调用prestartAllCoreThreads()方法就会一次性的启动corePoolSize个数的线程。

int maximumPoolSize: 允许的最大线程数,BlockingQueue也满了,小于maximumPoolSize时候就会再次创建新的线程

long keepAliveTime:线程空闲下来后,存活的时间,这个参数只在大于corePoolSize才有用

TimeUnit unit:存活时间的单位值

BlockingQueue<Runnable> workQueue:保存任务的阻塞队列

ThreadFactory threadFactory:创建线程的工厂,给新建的线程赋予名字

RejectedExecutionHandler handler:饱和策略

          AbortPolicy :直接抛出异常,默认;

         CallerRunsPolicy:用调用者所在的线程来执行任务

         DiscardOldestPolicy:丢弃阻塞队列里最老的任务,队列里最靠前的任务

         DiscardPolicy :当前任务直接丢弃

也可以实现自己的饱和策略,实现RejectedExecutionHandler接口即可

实现基本原理

主要是依赖BlockingQueue<Runnable>队列和HashSet<Worker>实现的,Worker继承了Runnable以及AQS的一个内部类,所以这个类具体等待并且开启线程的功能

在提交Runnable可执行的线程时,

当前线程数小于corePoolSize  的时候,仅仅是将Runnable添加到HashSet<Worker>当中,并且执行start()方法,调用的是runWorker()方法

当前线程数大于或等于corePoolSize  的时候,会将Runnable添加到workerQueue队列中等待并且会添加一个null的Runnable到addWorker()方法当中。如果队列满了offer失败就会执相应的reject拒绝策略。

    public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

在addWorker()方法当中,如果Runnable为空的话,会直接返回false,否则将创建一个Worker对象并且启动它,在runWorker中,首先执行完后传输过来的Runnable对象中的run(),然后循环去workerQueue队列使用take方法拿等待队列中的Runnable对象,并且执行相应的run()方法。

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}

关闭线程池的方法:

shutdownNow():设置线程池的状态,还会尝试停止正在运行或者暂停任务的线程

shutdown()设置线程池的状态,只会中断所有没有执行任务的线程

工作机制

合理配置线程池

根据任务的性质来:计算密集型(CPU),IO密集型,混合型

计算密集型:加密,大数分解,正则……., 线程数适当小一点,最大推荐:机器的Cpu核心数+1,为什么+1,防止页缺失,(机器的Cpu核心=Runtime.getRuntime().availableProcessors();)

IO密集型:读取文件,数据库连接,网络通讯, 线程数适当大一点,机器的Cpu核心数*2,

混合型:尽量拆分,IO密集型>>计算密集型,拆分意义不大,IO密集型~计算密集型

队列的选择上,应该使用有界,无界队列可能会导致内存溢出

Executors预定义的线程池

FixedThreadPool:创建固定线程数量的,适用于负载较重的服务器,使用了无界队列

SingleThreadPoolExecutor:创建单个线程,需要顺序保证执行任务,不会有多个线程活动,使用了无界队列

CachedThreadPool:会根据需要来创建新线程的,执行很多短期异步任务的程序,使用了SynchronousQueue
WorkStealingPool(JDK7以后): 基于ForkJoinPool实现

Executor框架

还有一个是定时器,待会儿再说吧

Java并发编程之线程池ThreadPoolExecutor解析相关推荐

  1. 【Java 并发编程】线程池机制 ( ThreadPoolExecutor 线程池构造参数分析 | 核心线程数 | 最大线程数 | 非核心线程存活时间 | 任务阻塞队列 )

    文章目录 前言 一.ThreadPoolExecutor 构造参数 二.newCachedThreadPool 参数分析 三.newFixedThreadPool 参数分析 四.newSingleTh ...

  2. [转]Java并发编程:线程池的使用

    Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了, ...

  3. Java并发编程:线程池的使用

    在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统 ...

  4. 【Java 并发编程】线程池机制 ( 线程池示例 | newCachedThreadPool | newFixedThreadPool | newSingleThreadExecutor )

    文章目录 前言 一.线程池示例 二.newCachedThreadPool 线程池示例 三.newFixedThreadPool 线程池示例 三.newSingleThreadExecutor 线程池 ...

  5. (转)Java并发编程:线程池的使用

    背景:线程池在面试时候经常遇到,反复出现的问题就是理解不深入,不能做到游刃有余.所以这篇博客是要深入总结线程池的使用. ThreadPoolExecutor的继承关系 线程池的原理 1.线程池状态(4 ...

  6. Java并发编程一线程池的五种状态

    推荐:Java并发编程汇总 Java并发编程一线程池的五种状态 原文地址 Java多线程线程池(4)–线程池的五种状态 正文 线程池的5种状态:Running.ShutDown.Stop.Tidyin ...

  7. Java并发编程一线程池简介

    推荐:Java并发编程汇总 Java并发编程一线程池简介 为什么我们需要使用线程池? 我们知道线程是一种比较昂贵的资源,我们通过程序每创建一个线程去执行,其实操作系统都会对应地创建一个线程去执行我们的 ...

  8. c++ 线程池_JAVA并发编程:线程池ThreadPoolExecutor源码分析

    前面的文章已经详细分析了线程池的工作原理及其基本应用,接下来本文将从底层源码分析一下线程池的执行过程.在看源码的时候,首先带着以下两个问题去仔细阅读.一是线程池如何保证核心线程数不会被销毁,空闲线程数 ...

  9. Java并发编程之线程池及示例

    1.Executor 线程池顶级接口.定义方法,void execute(Runnable).方法是用于处理任务的一个服务方法.调用者提供Runnable 接口的实现,线程池通过线程执行这个 Runn ...

最新文章

  1. 常用批处理命令总结3之Find和FindStr
  2. 翟树卿:如何让数据挖掘助力精准化营销
  3. 008_效果和动画的Callback函数
  4. Redis进阶-核心数据结构进阶实战
  5. wxWidgets:wxListCtrl 示例
  6. boost::all_degree_centralities用法的测试程序
  7. jQuery 表格自动增加
  8. Spring Cloud 配置中心中的native配置
  9. 【2021杭电多校赛】2021“MINIEYE杯”中国大学生算法设计超级联赛(2)签到题5题
  10. NVIDIA Nsight Compute,Nsight Systems, Nsight Graphics,Nsight Deep Learning Designer简介-草稿
  11. 职场泥潭 | 这样的IT公司绝对不宜久留
  12. NPN PNP 的区别
  13. 苹果safari浏览器video视频无法播放
  14. Redux or Mobx --前端应用状态管理方案的探索与思考
  15. 网卡驱动收包代码分析之 page reuse
  16. 如何显示密件抄送人员地址_什么是密件抄送,以及为什么不使用它会成为一个可怕的人...
  17. 情态动词+have+done用法整理
  18. php 1为false,false是1还是0
  19. 《雷神的微软平台安全宝典》简介
  20. SQL Server报错:选择列表中的列无效,因为该列没有包含在聚合函数或 GROUP BY 子句中

热门文章

  1. linux中tags文件能删除吗,Git 详细介绍查看、删除、重命名远程分支和tag
  2. nedc和epa续航里程什么意思_400公里已成续航新起点,纯电动车的实用性到底怎样...
  3. linux 定时执行搅拌,Linux上定时shell脚本
  4. n个结点,不同形态的二叉树(数目+生成)
  5. 关闭浏览器前提示_win7系统ie总弹出查看和跟踪下载的关闭方法
  6. 怎么在html显示已登录状态,jQuery Ajax 实现在html页面实时显示用户登录状态
  7. android获取图片方向并旋转,Android 判断imageview角度并旋转
  8. python手机编译器可以干什么_世界上最好的Python编辑器是什么?
  9. vue中多行文本标签_vue控制多行文字展开收起的实现示例
  10. Problem C: 括号匹配(栈和队列)