文章目录

  • Executor框架的结构
  • Executor框架的执行流程
  • Executor框架的成员
    • 线程池ThreadPoolExecutor
      • 三大方法七大参数四种拒绝策略
      • 线程池的处理流程
      • `execute()`和`submit()`的区别
      • `shutdown()`和`shitdownNow()`的区别
      • 线程池的监控
      • FixedThreadPool
      • SingleThreadExecutor
      • CachedThreadPool
      • 为什么不要用Executors来创建线程
    • ScheduledThreadPoolExecutor
    • Future
    • FutureTask
      • FutureTask的三种状态

在Java中,使用线程来异步执行任务。Java线程的创建与销毁需要一定的开销,如果我们为每一个任务创建一个新线程来执行,这些线程的创建与销毁将消耗大量的计算资源。同时,为每一个任务创建一个新线程来执行,这种策略可能会使处于高负荷状态的应用最终崩溃。

Java的线程既是工作单元,也是执行机制。从JDK 5开始,把工作单元与执行机制分离开来。工作单元包括RunnableCallable,而执行机制由Executor框架提供

在HotSpot VM的线程模型中,Java线程(java.lang.Thread)被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程也会被回收。

在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上:

Executor框架的结构

任务: 包括被执行任务需要实现的接口:Runnable接口或Callable接口。

任务的执行: 包括任务执行机制的核心接口Executor,以及继承自ExecutorExecutorService接口。ExecutorService接口有两个关键的实现类:ThreadPoolExecutorScheduledThreadPoolExecutor

异步计算的结果: 包括接口Future和实现Future接口的FutureTask类。

  • Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。
  • ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
  • ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutorTimer更灵活,功能更强大。
  • Future接口和实现Future接口的FutureTask类,代表异步计算的结果。
  • Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutorScheduledThreadPoolExecutor执行。

Executor框架的执行流程

  1. 主线程首先要创建实现Runnable或者Callable接口的任务对象。工具类Executors可以把一个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task)Executors.callable(Runnable task, Object resule))。
  2. 然后可以把Runnable对象直接交给ExecutorService执行(ExecutorService.execute(Runnable command)),或者也可以把Runnable对象或Callable对象提交给ExecutorService执行(ExecutorService.submit(Runnable task)ExecutorService.submit(Callable<T>task))。
  3. 如果执行的是ExecutorServicesubmit()方法,ExecutorService将返回一个实现Future接口的对象(FutureTask对象)。由于FutureTask实现了Runnable,程序员也可以创建FutureTask,然后直接交给ExecutorService执行。
  4. 最后,主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。

Executor框架的成员

线程池ThreadPoolExecutor

三大方法七大参数四种拒绝策略

Executor框架最核心的类是ThreadPoolExecutor,它是线程池的实现类。

三大方法
ThreadPoolExecutor通常使用工厂类Executors来创建。Executors可以创建3种类型的ThreadPoolExecutorSingleThreadExecutorFixedThreadPoolCachedThreadPool

  • Executors.newSingleThreadExecutor()
  • Executors.newFixedThreadPool()
  • Executors.newCachedThreadPool()

七大参数

  • int corePoolSize核心线程数。(要保留在池中的线​​程数,即使它们处于空闲状态,除非设置了allowCoreThreadTimeOut
  • int maximumPoolSize最大线程数。(如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务;如果使用了无界的任务队列这个参数是无效的。)
    • 对于CPU密集型任务,可以将该参数设置为 CPU核数 + 1
    • 对于IO密集型任务,可以将该参数设置为 CPU核数 * 2
  • long keepAliveTime超时释放时间。(当线程数大于核心数时,这是多余空闲线程在终止前等待新任务的最长时间)
  • TimeUnit unit超时释放时间的单位。(枚举类TimeUnit的常量)
  • BlockingQueue<Runnable> workQueue阻塞队列。(用于在执行任务之前保存任务的队列, 这个队列将只保存execute方法提交的Runnable任务)
    • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
    • LinkedBlockingQueue:一个基于链表结构的阻塞队列,是一个容量为Integer.MAX_VALUE的队列(无界队列)。此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool()使用了这个队列。
    • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
  • ThreadFactory threadFactory线程工厂。(执行程序创建新线程时使用的工厂)
  • RejectedExecutionHandler handler拒绝策略。(当提交任务数超过maxmumPoolSize + workQueue 之和时,任务会交给handler来处理)

四种拒绝策略

  • AbortPolicy:抛出RejectedExecutionException异常
  • DiscardPolicy:丢掉任务,不抛异常
  • DiscardOldestPolicy:不抛异常,尝试去和最早的去竞争,竞争失败再丢掉任务
  • CallerRunsPolicy:哪来的回哪里(将被拒绝的任务任务返回给execute()方法的调用线程中运行)

线程池的处理流程


execute()submit()的区别

public interface Executor {void execute(Runnable command);
}public interface ExecutorService extends Executor {<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);
}
  1. execute()方法是Executor接口的唯一方法,submit()方法是ExecutorService的,一共有三个重载。
  2. execute()方法只能接收Runnable接口的实现类作为参数,submit()方法可以接收RunnableCallable接口的实现类作为参数。
  3. execute()方法没有返回值,submit()方法用于提交需要返回值的任务。线程池会返回一个Future类型的对象,通过这个对象可以判断任务是否执行成功,并且可以通过get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
  4. execute()方法无法处理异常,只会抛出,submit()方法可以通过返回结果的Future对象的get()方法对异常进行处理。

shutdown()shitdownNow()的区别

可以通过调用线程池的shutdown()shutdownNow()方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt()方法来中断线程,所以无法响应中断的任务可能永远无法终止。

shutdown()只是将线程池的状态设置为SHUTWDOWN状态,正在执行的任务会继续执行下去,没有被执行的则中断。而shutdownNow()则是将线程池的状态设置为STOP,正在执行的任务则被停止,没被执行任务的则返回。(shutdown()不着急,shutdownNow()很着急

线程池的监控

// 返回正在积极执行任务的线程的大致数量
executor.getActiveCount();
// 返回已安排执行的大致任务总数
// 由于任务和线程的状态在计算过程中可能会动态变化,因此返回值只是一个近似值
executor.getTaskCount();
// 返回已完成执行的大致任务总数
// 由于任务和线程的状态在计算过程中可能会动态变化,因此返回值只是一个近似值,但在连续调用中永远不会减少
executor.getCompletedTaskCount();
// 返回曾经同时进入池中的最大线程数
// 通过这个数据可以知道线程池是否曾经满过:如该数值等于线程池的最大大小,则表示线程池曾经满过。
executor.getLargestPoolSize();
// 返回当前池中的线程数
executor.getPoolSize();

可以通过继承线程池来自定义线程池,重写线程池的beforeExecute()afterExecute()terminated()方法,也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控。

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }

FixedThreadPool

FixedThreadPool被称为可重用固定线程数的线程池

public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

FixedThreadPool的核心线程数corePoolSize和最大线程数maximumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads。

当线程池中的线程数大于核心线程数corePoolSizekeepAliveTime为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。这里把keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止

  • 如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务
  • 在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue
  • 线程执行完任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行

SingleThreadExecutor

SingleThreadExecutor是使用单个worker线程的Executor(corePoolSizemaximumPoolSize被设置为1)。

public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}

  • 如果当前运行的线程数少于corePoolSize(即线程池中无运行的线程),则创建一个新线程来执行任务
  • 如果当前线程池中有一个运行的线程,则将任务加入LinkedBlockingQueue
  • 核心线程执行完任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行

CachedThreadPool

CachedThreadPool是一个会根据需要创建新线程的线程池。

public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}

CachedThreadPoolcorePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE,即 maximumPool是无界的。这里把keepAliveTime设置为60L,意味着CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。

SynchronousQueue是一个没有容量的阻塞队列。每个插入操作必须等待另一个线程的对应移除操作,反之亦然。CachedThreadPool使用SynchronousQueue,把主线程提交的任务传递给空闲线程执行:

为什么不要用Executors来创建线程

  • FixedThreadPoolSignalThreadExecutor
    FixedThreadPoolSignalThreadExecutor的阻塞队列都为LinkedBlockingQueueLinkedBlockingQueue的容量为Integer.MAX_VALUE无界队列),由于无界队列的存在,会导致maximumPoolSizekeepAliveTime是无效的,也就是说新的任务会一直添加到无界队列中去,虽然这个“无界队列”其实是有界的,但是还没等这个队列被填满,就OOM了。同时,由于无界队列不可能被正常填满,因此拒绝策略handler也是形同虚设。
  • CachedThreadPool
    CachedThreadPoolmaximumPoolSize被设置为Integer.MAX_VALUE,即 maximumPool是无界的;同时CachedThreadPool的阻塞队列为没有容量的SynchronousQueue。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源(OOM)。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运行任务,或者定期执行任务。

它有三种调度任务的方式:

  • schedule():延迟多长时间之后只执行一次;

    ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(10);
    System.out.println(LocalDateTime.now());
    scheduled.schedule(new Runnable() {@Overridepublic void run() {System.out.println(LocalDateTime.now());}
    }, 4, TimeUnit.SECONDS);
    

  • scheduledAtFixedRate():延迟指定时间后执行一次,之后按照固定的时长周期执行;

    ScheduledThreadPoolExecutor  scheduled = new ScheduledThreadPoolExecutor(10);
    scheduled.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {System.out.println(LocalDateTime.now());}
    }, 0, 4, TimeUnit.SECONDS);
    

  • scheduleWithFixedDelay():延迟指定时间后执行一次,之后按照:上一次任务执行时长 + 周期的时长 的时间去周期执行;

    ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(10);
    scheduled.scheduleWithFixedDelay(new Runnable() {@SneakyThrows@Overridepublic void run() {Thread.sleep(1000);System.out.println(LocalDateTime.now());}
    }, 0,4, TimeUnit.SECONDS);
    

ScheduledThreadPoolExecutor有四种构造器,用来指定核心线程数、线程工厂、拒绝策略:

因为ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,所以它的构造器都是通过super调用的父类的构造器:

super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);

DelayedWorkQueue是一个无界队列,因此这里把maximumPoolSize也设为了算是无穷大吧(没什么意义)…

ScheduledThreadPoolExecutor会把待调度的任务封装为一个ScheduledFutureTaskScheduledThreadPoolExecutor的私有内部类)对象放到优先队列DelayedWorkQueueScheduledThreadPoolExecutor的静态内部类)中。

ScheduledFutureTask主要包含3个成员变量:

  • time:表示这个任务将要被执行的具体时间
  • sequenceNumber:表示这个任务被添加到ScheduledThreadPoolExecutor中的序号
  • period:表示任务执行的间隔周期

DelayedWorkQueue会对队列中的ScheduledFutureTask进行排序。排序时,time小的排在前面(时间早的任务将被先执行)。如果两个ScheduledFutureTasktime相同,就比较sequenceNumbersequenceNumber小的排在前面(也就是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)。

执行流程

  1. 调用DelayedWorkQueuetake()方法获取一个已到期(ScheduledFutureTasktime大于等于当前时间)的ScheduledFutureTask对象
  2. 调用ScheduledFutureTask对象的run()方法执行ScheduledFutureTask对象
  3. 调用ScheduledFutureTask对象的setNextRunTime()方法修改ScheduledFutureTasktime变量为下次将要被执行的时间
  4. 调用ScheduledFutureTask对象的reExecutePeriodic()方法将修改time之后的ScheduledFutureTask放回DelayedWorkQueue

Future

Future接口主要提供了异步返回任务执行结果,取消任务执行,获取任务执行状态的功能,接口定义如下:

public interface Future<V> {// 取消任务执行// mayInterruptIfRunning用于控制如果任务正在执行,是否中断对应的执行线程来取消该任务// 成功cancel,则isCancelled和isDoned都返回true。boolean cancel(boolean mayInterruptIfRunning);// 任务是否已取消boolean isCancelled();// 正常执行,被取消,异常退出都返回trueboolean isDone();// 阻塞等待执行结果// CancellationException:任务被取消// ExecutionException:任务执行异常// InterruptedException:该等待结果线程被中断V get() throws InterruptedException, ExecutionException;// 阻塞等待执行结果指定时间,除了以上异常,// TimeoutException:等待超时V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}

FutureTask

Future接口和实现Future接口的FutureTask类,代表异步计算的结果

FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。

可以把FutureTask交给Executor执行;也可以通过ExecutorService.submit()方法返回一个FutureTask,然后执行FutureTask.get()方法或FutureTask.cancel()方法。除此以外,还可以单独使用FutureTask

FutureTask的三种状态

根据FutureTask.run()方法被执行的时机,FutureTask可以处于下面3种状态:

  • 未启动:当创建一个FutureTask且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。
  • 已启动FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。
  • 已完成FutureTask.run()方法执行完后正常结束,或被取消(FutureTask.cancel()),或执行FutureTask.run()方法时抛出异常而异常结束,FutureTask处于已完成状态


FutureTask处于未启动已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞;当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛出异常

FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会被执行;当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程的方式来试图停止任务;当FutureTask处于已启动状态时,执行FutureTask.cancel(false)方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成);当FutureTask处于已完成状态时,执行FutureTask.cancel()方法将返回false

Java并发编程之 Excutor相关推荐

  1. zbb20180929 thread java并发编程之Condition

    java并发编程之Condition 引言 在java中,对于任意一个java对象,它都拥有一组定义在java.lang.Object上监视器方法,包括wait(),wait(long timeout ...

  2. java并发编程之4——Java锁分解锁分段技术

    转载自 java并发编程之4--Java锁分解锁分段技术 并发编程的所有问题,最后都转换成了,"有状态bean"的状态的同步与互斥修改问题.而最后提出的解决"有状态bea ...

  3. Java 并发编程之美:并发编程高级篇之一-chat

    借用 Java 并发编程实践中的话:编写正确的程序并不容易,而编写正常的并发程序就更难了.相比于顺序执行的情况,多线程的线程安全问题是微妙而且出乎意料的,因为在没有进行适当同步的情况下多线程中各个操作 ...

  4. Java 并发编程之美:并发编程高级篇之一

    借用 Java 并发编程实践中的话:编写正确的程序并不容易,而编写正常的并发程序就更难了.相比于顺序执行的情况,多线程的线程安全问题是微妙而且出乎意料的,因为在没有进行适当同步的情况下多线程中各个操作 ...

  5. Java并发编程之CAS第三篇-CAS的缺点

    Java并发编程之CAS第三篇-CAS的缺点 通过前两篇的文章介绍,我们知道了CAS是什么以及查看源码了解CAS原理.那么在多线程并发环境中,的缺点是什么呢?这篇文章我们就来讨论讨论 本篇是<凯 ...

  6. Java并发编程之CyclicBarrier详解

    简介 栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生.栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行.闭锁用于等待事件,而栅栏用于等待其他线程. CyclicBarrier ...

  7. java并发编程之AbstractQueuedSynchronizer

    引言 AbstractQueuedSynchronizer,队列同步器,简称AQS,它是java并发用来构建锁或者其他同步组件的基础框架. 一般使用AQS的主要方式是继承,子类通过实现它提供的抽象方法 ...

  8. Java并发编程之synchronized关键字解析

    前言 公司加班太狠了,都没啥时间充电,这周终于结束了.这次整理了Java并发编程里面的synchronized关键字,又称为隐式锁,与JUC包中的Lock显示锁相对应:这个关键字从Java诞生开始就有 ...

  9. Java并发编程之AbstractQueuedSynchronizer(AQS)源码解析

    自己一个人随便看看源码学习的心得,分享一下啦,不过我觉得还是建议去买本Java并发编程的书来看会比较好点,毕竟个人的理解有限嘛. 独占锁和共享锁 首先先引入这两个锁的概念: 独占锁即同一时刻只有一个线 ...

最新文章

  1. case的执行顺序 嵌套使用
  2. 5分钟在超能云(SuperVessel)上免费创建属于自己的大数据环境
  3. 003 Preconditons
  4. 十天冲刺---Day8
  5. 文件系统管理 之 文件和目录访问权限设置
  6. 入门实践,Python数据分析
  7. [Aaronyang] 写给自己的WPF4.5 笔记[2依赖属性]
  8. 计算机在车联网的应用,刘小洋, 伍民友. 车联网: 物联网在城市交通网络中的应用[J]. 计算机应用, 2012, 32(4): 900-904....
  9. python函数参数的部分求值方法
  10. excel多元线性拟合_[求助]excel里面的linest函数中多元回归怎么用啊?
  11. Java开发基于控制台的购书系统
  12. 【深度学习+组合优化】深度学习和强化学习在组合优化方面有哪些应用?
  13. elasticsearch2.2之index映射参数的not_analyzed属性
  14. python画圣诞帽_用Python就可以给你的头像戴上圣诞帽,别@微信团队了!
  15. 不可思议有氧机器人_不思议迷宫奇怪的机器人怎么得?不思议迷宫奇怪的机器人获取一览...
  16. 爪哇国新游记之十七----肺腑之言
  17. 红海厮杀的超融合 泽塔云竟用GPU云开辟一片蓝海
  18. GFlags调试堆中野指针
  19. “云”中智控 IT管理新境界
  20. vue-simple-uploader上传组件

热门文章

  1. JavaScript获得字符串实际长度
  2. php separator,PHP常量DIRECTORY_SEPARATOR原理及用法解析
  3. Jvectormap中文帮助文档(API)
  4. 微信小程序滚动Tab选项卡:左右滑动切换、触底加载分页
  5. 【软件安全:软件安全技术课后习题及答案】
  6. Java中接口如何继承接口呢?
  7. windows 程序员装机必备软件
  8. 学习STM32单片机,从菜鸟到牛人就是这么简单
  9. lisp绘制法兰_lisp语言画键槽_用LISP语言自定义AutoCAD命令
  10. Python Scrapy 爬虫 - 爬取多级别的页面