Java并发编程之 Excutor
文章目录
- Executor框架的结构
- Executor框架的执行流程
- Executor框架的成员
- 线程池ThreadPoolExecutor
- 三大方法七大参数四种拒绝策略
- 线程池的处理流程
- `execute()`和`submit()`的区别
- `shutdown()`和`shitdownNow()`的区别
- 线程池的监控
- FixedThreadPool
- SingleThreadExecutor
- CachedThreadPool
- 为什么不要用Executors来创建线程
- ScheduledThreadPoolExecutor
- Future
- FutureTask
- FutureTask的三种状态
在Java中,使用线程来异步执行任务。Java线程的创建与销毁需要一定的开销,如果我们为每一个任务创建一个新线程来执行,这些线程的创建与销毁将消耗大量的计算资源。同时,为每一个任务创建一个新线程来执行,这种策略可能会使处于高负荷状态的应用最终崩溃。
Java的线程既是工作单元,也是执行机制。从JDK 5开始,把工作单元与执行机制分离开来。工作单元包括Runnable
和Callable
,而执行机制由Executor
框架提供。
在HotSpot VM的线程模型中,Java线程(java.lang.Thread)被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程也会被回收。
在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上:
Executor框架的结构
任务: 包括被执行任务需要实现的接口:
Runnable
接口或Callable
接口。
任务的执行: 包括任务执行机制的核心接口
Executor
,以及继承自Executor
的ExecutorService
接口。ExecutorService
接口有两个关键的实现类:ThreadPoolExecutor
和ScheduledThreadPoolExecutor
。
异步计算的结果: 包括接口
Future
和实现Future接口的FutureTask
类。
Executor
是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。ThreadPoolExecutor
是线程池的核心实现类,用来执行被提交的任务。ScheduledThreadPoolExecutor
是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor
比Timer
更灵活,功能更强大。Future
接口和实现Future接口的FutureTask
类,代表异步计算的结果。Runnable
接口和Callable
接口的实现类,都可以被ThreadPoolExecutor
或ScheduledThreadPoolExecutor
执行。
Executor框架的执行流程
- 主线程首先要创建实现
Runnable
或者Callable
接口的任务对象。工具类Executors
可以把一个Runnable
对象封装为一个Callable
对象(Executors.callable(Runnable task)
或Executors.callable(Runnable task, Object resule)
)。 - 然后可以把
Runnable
对象直接交给ExecutorService
执行(ExecutorService.execute(Runnable command)
),或者也可以把Runnable
对象或Callable
对象提交给ExecutorService
执行(ExecutorService.submit(Runnable task)
或ExecutorService.submit(Callable<T>task)
)。 - 如果执行的是
ExecutorService
的submit()
方法,ExecutorService
将返回一个实现Future
接口的对象(FutureTask
对象)。由于FutureTask
实现了Runnable
,程序员也可以创建FutureTask
,然后直接交给ExecutorService
执行。 - 最后,主线程可以执行
FutureTask.get()
方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)
来取消此任务的执行。
Executor框架的成员
线程池ThreadPoolExecutor
三大方法七大参数四种拒绝策略
Executor
框架最核心的类是ThreadPoolExecutor
,它是线程池的实现类。
三大方法
ThreadPoolExecutor
通常使用工厂类Executors
来创建。Executors
可以创建3种类型的ThreadPoolExecutor
:SingleThreadExecutor
、FixedThreadPool
和CachedThreadPool
。
- Executors.newSingleThreadExecutor()
- Executors.newFixedThreadPool()
- Executors.newCachedThreadPool()
七大参数
- int corePoolSize:核心线程数。(要保留在池中的线程数,即使它们处于空闲状态,除非设置了
allowCoreThreadTimeOut
) - int maximumPoolSize:最大线程数。(如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务;如果使用了无界的任务队列这个参数是无效的。)
- 对于CPU密集型任务,可以将该参数设置为
CPU核数 + 1
- 对于IO密集型任务,可以将该参数设置为
CPU核数 * 2
- 对于CPU密集型任务,可以将该参数设置为
- long keepAliveTime:超时释放时间。(当线程数大于核心数时,这是多余空闲线程在终止前等待新任务的最长时间)
- TimeUnit unit:超时释放时间的单位。(枚举类
TimeUnit
的常量) - BlockingQueue<Runnable> workQueue :阻塞队列。(用于在执行任务之前保存任务的队列, 这个队列将只保存
execute
方法提交的Runnable任务)- ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
- LinkedBlockingQueue:一个基于链表结构的阻塞队列,是一个容量为
Integer.MAX_VALUE
的队列(无界队列)。此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。 - SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool()使用了这个队列。
- PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
- ThreadFactory threadFactory:线程工厂。(执行程序创建新线程时使用的工厂)
- RejectedExecutionHandler handler:拒绝策略。(当提交任务数超过
maxmumPoolSize
+workQueue
之和时,任务会交给handler来处理)
四种拒绝策略
- AbortPolicy:抛出
RejectedExecutionException
异常 - DiscardPolicy:丢掉任务,不抛异常
- DiscardOldestPolicy:不抛异常,尝试去和最早的去竞争,竞争失败再丢掉任务
- CallerRunsPolicy:哪来的回哪里(将被拒绝的任务任务返回给
execute()
方法的调用线程中运行)
线程池的处理流程
execute()
和submit()
的区别
public interface Executor {void execute(Runnable command);
}public interface ExecutorService extends Executor {<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);
}
execute()
方法是Executor
接口的唯一方法,submit()
方法是ExecutorService
的,一共有三个重载。execute()
方法只能接收Runnable
接口的实现类作为参数,submit()
方法可以接收Runnable
和Callable
接口的实现类作为参数。execute()
方法没有返回值,submit()
方法用于提交需要返回值的任务。线程池会返回一个Future类型的对象,通过这个对象可以判断任务是否执行成功,并且可以通过get()
方法来获取返回值,get()
方法会阻塞当前线程直到任务完成,而使用get(long timeout, TimeUnit unit)
方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。execute()
方法无法处理异常,只会抛出,submit()
方法可以通过返回结果的Future对象的get()
方法对异常进行处理。
shutdown()
和shitdownNow()
的区别
可以通过调用线程池的shutdown()
或shutdownNow()
方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt()
方法来中断线程,所以无法响应中断的任务可能永远无法终止。
shutdown()
只是将线程池的状态设置为SHUTWDOWN
状态,正在执行的任务会继续执行下去,没有被执行的则中断。而shutdownNow()
则是将线程池的状态设置为STOP
,正在执行的任务则被停止,没被执行任务的则返回。(shutdown()
不着急,shutdownNow()
很着急)
线程池的监控
// 返回正在积极执行任务的线程的大致数量
executor.getActiveCount();
// 返回已安排执行的大致任务总数
// 由于任务和线程的状态在计算过程中可能会动态变化,因此返回值只是一个近似值
executor.getTaskCount();
// 返回已完成执行的大致任务总数
// 由于任务和线程的状态在计算过程中可能会动态变化,因此返回值只是一个近似值,但在连续调用中永远不会减少
executor.getCompletedTaskCount();
// 返回曾经同时进入池中的最大线程数
// 通过这个数据可以知道线程池是否曾经满过:如该数值等于线程池的最大大小,则表示线程池曾经满过。
executor.getLargestPoolSize();
// 返回当前池中的线程数
executor.getPoolSize();
可以通过继承线程池来自定义线程池,重写线程池的beforeExecute()
、afterExecute()
和terminated()
方法,也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控。
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
FixedThreadPool
FixedThreadPool
被称为可重用固定线程数的线程池。
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool
的核心线程数corePoolSize
和最大线程数maximumPoolSize
都被设置为创建FixedThreadPool
时指定的参数nThreads。
当线程池中的线程数大于核心线程数corePoolSize
时,keepAliveTime
为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。这里把keepAliveTime
设置为0L,意味着多余的空闲线程会被立即终止。
- 如果当前运行的线程数少于
corePoolSize
,则创建新线程来执行任务 - 在线程池完成预热之后(当前运行的线程数等于
corePoolSize
),将任务加入LinkedBlockingQueue
- 线程执行完任务后,会在循环中反复从
LinkedBlockingQueue
获取任务来执行
SingleThreadExecutor
SingleThreadExecutor
是使用单个worker线程的Executor(corePoolSize
和maximumPoolSize
被设置为1)。
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
- 如果当前运行的线程数少于
corePoolSize
(即线程池中无运行的线程),则创建一个新线程来执行任务 - 如果当前线程池中有一个运行的线程,则将任务加入
LinkedBlockingQueue
- 核心线程执行完任务后,会在一个无限循环中反复从
LinkedBlockingQueue
获取任务来执行
CachedThreadPool
CachedThreadPool
是一个会根据需要创建新线程的线程池。
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
CachedThreadPool
的corePoolSize
被设置为0,即corePool为空;maximumPoolSize
被设置为Integer.MAX_VALUE
,即 maximumPool
是无界的。这里把keepAliveTime
设置为60L,意味着CachedThreadPool
中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。
SynchronousQueue
是一个没有容量的阻塞队列。每个插入操作必须等待另一个线程的对应移除操作,反之亦然。CachedThreadPool
使用SynchronousQueue
,把主线程提交的任务传递给空闲线程执行:
为什么不要用Executors来创建线程
FixedThreadPool
和SignalThreadExecutor
FixedThreadPool
和SignalThreadExecutor
的阻塞队列都为LinkedBlockingQueue
,LinkedBlockingQueue
的容量为Integer.MAX_VALUE
(无界队列),由于无界队列的存在,会导致maximumPoolSize
和keepAliveTime
是无效的,也就是说新的任务会一直添加到无界队列中去,虽然这个“无界队列”其实是有界的,但是还没等这个队列被填满,就OOM了。同时,由于无界队列不可能被正常填满,因此拒绝策略handler
也是形同虚设。CachedThreadPool
CachedThreadPool
的maximumPoolSize
被设置为Integer.MAX_VALUE
,即maximumPool
是无界的;同时CachedThreadPool
的阻塞队列为没有容量的SynchronousQueue
。这意味着,如果主线程提交任务的速度高于maximumPool
中线程处理任务的速度时,CachedThreadPool
会不断创建新线程。极端情况下,CachedThreadPool
会因为创建过多线程而耗尽CPU和内存资源(OOM)。
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor
继承自ThreadPoolExecutor
。它主要用来在给定的延迟之后运行任务,或者定期执行任务。
它有三种调度任务的方式:
schedule():延迟多长时间之后只执行一次;
ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(10); System.out.println(LocalDateTime.now()); scheduled.schedule(new Runnable() {@Overridepublic void run() {System.out.println(LocalDateTime.now());} }, 4, TimeUnit.SECONDS);
scheduledAtFixedRate():延迟指定时间后执行一次,之后按照固定的时长周期执行;
ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(10); scheduled.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {System.out.println(LocalDateTime.now());} }, 0, 4, TimeUnit.SECONDS);
scheduleWithFixedDelay():延迟指定时间后执行一次,之后按照:上一次任务执行时长 + 周期的时长 的时间去周期执行;
ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(10); scheduled.scheduleWithFixedDelay(new Runnable() {@SneakyThrows@Overridepublic void run() {Thread.sleep(1000);System.out.println(LocalDateTime.now());} }, 0,4, TimeUnit.SECONDS);
ScheduledThreadPoolExecutor
有四种构造器,用来指定核心线程数、线程工厂、拒绝策略:
因为ScheduledThreadPoolExecutor
继承了ThreadPoolExecutor
,所以它的构造器都是通过super
调用的父类的构造器:
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);
DelayedWorkQueue
是一个无界队列,因此这里把maximumPoolSize
也设为了算是无穷大吧(没什么意义)…
ScheduledThreadPoolExecutor
会把待调度的任务封装为一个ScheduledFutureTask
(ScheduledThreadPoolExecutor
的私有内部类)对象放到优先队列DelayedWorkQueue
(ScheduledThreadPoolExecutor
的静态内部类)中。
ScheduledFutureTask
主要包含3个成员变量:
time
:表示这个任务将要被执行的具体时间sequenceNumber
:表示这个任务被添加到ScheduledThreadPoolExecutor中的序号period
:表示任务执行的间隔周期
DelayedWorkQueue
会对队列中的ScheduledFutureTask
进行排序。排序时,time
小的排在前面(时间早的任务将被先执行)。如果两个ScheduledFutureTask
的time
相同,就比较sequenceNumber
,sequenceNumber
小的排在前面(也就是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)。
执行流程
- 调用
DelayedWorkQueue
的take()
方法获取一个已到期(ScheduledFutureTask
的time
大于等于当前时间)的ScheduledFutureTask
对象 - 调用
ScheduledFutureTask
对象的run()
方法执行ScheduledFutureTask
对象 - 调用
ScheduledFutureTask
对象的setNextRunTime()
方法修改ScheduledFutureTask
的time
变量为下次将要被执行的时间 - 调用
ScheduledFutureTask
对象的reExecutePeriodic()
方法将修改time
之后的ScheduledFutureTask
放回DelayedWorkQueue
中
Future
Future
接口主要提供了异步返回任务执行结果,取消任务执行,获取任务执行状态的功能,接口定义如下:
public interface Future<V> {// 取消任务执行// mayInterruptIfRunning用于控制如果任务正在执行,是否中断对应的执行线程来取消该任务// 成功cancel,则isCancelled和isDoned都返回true。boolean cancel(boolean mayInterruptIfRunning);// 任务是否已取消boolean isCancelled();// 正常执行,被取消,异常退出都返回trueboolean isDone();// 阻塞等待执行结果// CancellationException:任务被取消// ExecutionException:任务执行异常// InterruptedException:该等待结果线程被中断V get() throws InterruptedException, ExecutionException;// 阻塞等待执行结果指定时间,除了以上异常,// TimeoutException:等待超时V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}
FutureTask
Future
接口和实现Future接口的FutureTask
类,代表异步计算的结果。
FutureTask
除了实现Future
接口外,还实现了Runnable
接口。因此,FutureTask
可以交给Executor
执行,也可以由调用线程直接执行(FutureTask.run()
)。
可以把FutureTask
交给Executor
执行;也可以通过ExecutorService.submit()
方法返回一个FutureTask
,然后执行FutureTask.get()
方法或FutureTask.cancel()
方法。除此以外,还可以单独使用FutureTask
。
FutureTask的三种状态
根据FutureTask.run()
方法被执行的时机,FutureTask
可以处于下面3种状态:
- 未启动:当创建一个
FutureTask
且没有执行FutureTask.run()
方法之前,这个FutureTask
处于未启动状态。 - 已启动:
FutureTask.run()
方法被执行的过程中,FutureTask
处于已启动状态。 - 已完成:
FutureTask.run()
方法执行完后正常结束,或被取消(FutureTask.cancel()
),或执行FutureTask.run()
方法时抛出异常而异常结束,FutureTask
处于已完成状态
当FutureTask
处于未启动或已启动状态时,执行FutureTask.get()
方法将导致调用线程阻塞;当FutureTask
处于已完成状态时,执行FutureTask.get()
方法将导致调用线程立即返回结果或抛出异常。
当FutureTask
处于未启动状态时,执行FutureTask.cancel()
方法将导致此任务永远不会被执行;当FutureTask
处于已启动状态时,执行FutureTask.cancel(
true
)
方法将以中断执行此任务线程的方式来试图停止任务;当FutureTask
处于已启动状态时,执行FutureTask.cancel(
false
)
方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成);当FutureTask
处于已完成状态时,执行FutureTask.cancel()
方法将返回false。
Java并发编程之 Excutor相关推荐
- zbb20180929 thread java并发编程之Condition
java并发编程之Condition 引言 在java中,对于任意一个java对象,它都拥有一组定义在java.lang.Object上监视器方法,包括wait(),wait(long timeout ...
- java并发编程之4——Java锁分解锁分段技术
转载自 java并发编程之4--Java锁分解锁分段技术 并发编程的所有问题,最后都转换成了,"有状态bean"的状态的同步与互斥修改问题.而最后提出的解决"有状态bea ...
- Java 并发编程之美:并发编程高级篇之一-chat
借用 Java 并发编程实践中的话:编写正确的程序并不容易,而编写正常的并发程序就更难了.相比于顺序执行的情况,多线程的线程安全问题是微妙而且出乎意料的,因为在没有进行适当同步的情况下多线程中各个操作 ...
- Java 并发编程之美:并发编程高级篇之一
借用 Java 并发编程实践中的话:编写正确的程序并不容易,而编写正常的并发程序就更难了.相比于顺序执行的情况,多线程的线程安全问题是微妙而且出乎意料的,因为在没有进行适当同步的情况下多线程中各个操作 ...
- Java并发编程之CAS第三篇-CAS的缺点
Java并发编程之CAS第三篇-CAS的缺点 通过前两篇的文章介绍,我们知道了CAS是什么以及查看源码了解CAS原理.那么在多线程并发环境中,的缺点是什么呢?这篇文章我们就来讨论讨论 本篇是<凯 ...
- Java并发编程之CyclicBarrier详解
简介 栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生.栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行.闭锁用于等待事件,而栅栏用于等待其他线程. CyclicBarrier ...
- java并发编程之AbstractQueuedSynchronizer
引言 AbstractQueuedSynchronizer,队列同步器,简称AQS,它是java并发用来构建锁或者其他同步组件的基础框架. 一般使用AQS的主要方式是继承,子类通过实现它提供的抽象方法 ...
- Java并发编程之synchronized关键字解析
前言 公司加班太狠了,都没啥时间充电,这周终于结束了.这次整理了Java并发编程里面的synchronized关键字,又称为隐式锁,与JUC包中的Lock显示锁相对应:这个关键字从Java诞生开始就有 ...
- Java并发编程之AbstractQueuedSynchronizer(AQS)源码解析
自己一个人随便看看源码学习的心得,分享一下啦,不过我觉得还是建议去买本Java并发编程的书来看会比较好点,毕竟个人的理解有限嘛. 独占锁和共享锁 首先先引入这两个锁的概念: 独占锁即同一时刻只有一个线 ...
最新文章
- case的执行顺序 嵌套使用
- 5分钟在超能云(SuperVessel)上免费创建属于自己的大数据环境
- 003 Preconditons
- 十天冲刺---Day8
- 文件系统管理 之 文件和目录访问权限设置
- 入门实践,Python数据分析
- [Aaronyang] 写给自己的WPF4.5 笔记[2依赖属性]
- 计算机在车联网的应用,刘小洋, 伍民友. 车联网: 物联网在城市交通网络中的应用[J]. 计算机应用, 2012, 32(4): 900-904....
- python函数参数的部分求值方法
- excel多元线性拟合_[求助]excel里面的linest函数中多元回归怎么用啊?
- Java开发基于控制台的购书系统
- 【深度学习+组合优化】深度学习和强化学习在组合优化方面有哪些应用?
- elasticsearch2.2之index映射参数的not_analyzed属性
- python画圣诞帽_用Python就可以给你的头像戴上圣诞帽,别@微信团队了!
- 不可思议有氧机器人_不思议迷宫奇怪的机器人怎么得?不思议迷宫奇怪的机器人获取一览...
- 爪哇国新游记之十七----肺腑之言
- 红海厮杀的超融合 泽塔云竟用GPU云开辟一片蓝海
- GFlags调试堆中野指针
- “云”中智控 IT管理新境界
- vue-simple-uploader上传组件