文章目录

  • 面试官:能给我讲讲线程池的实现原理?
  • 线程池类继承关系
  • ThreadPoolExecutor
    • 核心数据结构
    • 面试官:给我讲讲线程池的有哪些参数?
    • 面试官:如何优雅的关闭线程?
      • 线程的生命周期
    • 面试官:线程池哪五种状态?
    • 面试官:线程池哪4种拒绝策略?并分别说一下作用和实现原理?
      • DiscardOldestPolicy
      • AbortPolicy
      • DiscardPolicy
      • CallerRunsPolicy
    • 面试官:线程池常用的阻塞队列有哪些?能说下各自的区别?
      • SynchronousQueue应用
      • PriorityBlockedQueue应用
      • DelayQueue应用
    • 面试官:如何结合业务合理的配置线程池参数?CPU密集型和IO密集型如何配置?线程设置过多会造成什么影响?
      • CPU 密集型任务
      • IO密集型任务
    • 面试官:给我讲讲什么是线程复用?
    • 面试官:为什么《阿里巴巴开发手册》不推荐使用Executor创建线程?
  • ScheduledThreadPoolExecutor
    • 延时执行
    • 周期执行
    • 面试题:你知道延迟执行、周期性执行任务实现原理?
    • 面试题:为什么不使用Timer而使用ScheduledThreadPoolExecutor?
  • CompletableFuture异步编程工具
    • 基本使用
    • 四种任务原型
    • 面试题:你知道CompletableFuture内部原理?
      • CompletableFuture的构造:ForkJoinPool
      • 任务类型的适配
      • 任务的链式执行过程分析
  • 什么是 Java8 的 ForkJoinPool?
    • 应用
    • 核心数据结构

面试官:能给我讲讲线程池的实现原理?

声:回答该问题需要了解线程池有哪些方法并讲解每个方法的作用,以及各个类的继承关系,线程池的运行原理,线程池的状态转换、生命周期,线程池的构造参数,线程池Runnable->Worker->Thread执行任务->线程复用机制等

线程池类继承关系

ThreadPoolExecutor

核心数据结构

public class ThreadPoolExecutor extends AbstractExecutorService {//存储线程池的状态和线程数量private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 存放任务的阻塞队列private final BlockingQueue<Runnable> workQueue;// 对线程池内部各种变量进行互斥访问控制private final ReentrantLock mainLock = new ReentrantLock();// 线程集合private final HashSet<Worker> workers = new HashSet<Worker>();

每一个线程是一个Worker对象,Worker是ThreadPoolExecutor内部类,核心数据结构如下:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {final Thread thread; // Worker封装的线程Runnable firstTask; // Worker接收到的第1个任务volatile long completedTasks; // Worker执行完毕的任务个数
}

由定义会发现,Worker继承于AQS,也就是说Worker本身就是一把锁。这把锁有什么用处呢?用于线程池的关闭、线程执行任务的过程中。

面试官:给我讲讲线程池的有哪些参数?

ThreadPoolExecutor在其构造方法中提供了几个核心配置参数,来配置不同策略的线程池。

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
  1. corePoolSize:核心线程数-线程池中始终维护的线程
  2. MaxPoolSize:最大线程数-达到核心线程数并且阻塞队列慢的时候会扩充到最大线程数
  3. KeepAliveTime、TimeUnit:空闲超过该时间后线程会被销毁
  4. WorkQueue:任务阻塞队列-当核心线程满的时候会放入阻塞队列中
  5. ThreadFactory:线程工厂-可以根据业务自定义创建线程,修改线程名称
  6. Handler:拒绝策略-最大线程满并且阻塞队列慢了之后新的任务进来会触发拒绝策略

面试官:如何优雅的关闭线程?

线程池的关闭比线程的关闭更加复杂,因为线程池的关闭涉及到很多场景,如果有线程正在执行任务?如果任务队列不为空?还有当前线程进来如何处理,因此,关闭过程不可能是瞬时的,而是需要一个平滑的过渡,这就涉及线程池的完整生命周期管理。

线程的生命周期

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

在JDK 7中,把线程数量(workerCount)和线程池状态(runState)这两个变量打包存储在一个字
段里面,即ctl变量。如下图所示,最高的3位存储线程池状态,其余29位存储线程个数。而在JDK 6中,
这两个变量是分开存储的。


关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示

private static int runStateOf(int c)     { return c & ~CAPACITY; } //计算当前运行状态
private static int workerCountOf(int c)  { return c & CAPACITY; }  //计算当前线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; }   //通过状态和线程数生成ctl

面试官:ctl为什么这样设计?这样做的好处?

用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。

线程状态转换过程:

状态解释:

切记:线程状态-1、0、1、2、3转化只能从小到大,而不能逆向转换。
除 terminated()之外,线程池还提供了其他几个钩子方法,这些方法的实现都是空的。如果想实现
自己的线程池,可以重写这几个方法:

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }

面试官:线程池哪五种状态?

    // runState is stored in the high-order bitsprivate static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;

面试官:线程池哪4种拒绝策略?并分别说一下作用和实现原理?

接口类:

public interface RejectedExecutionHandler {void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

实现类:

DiscardOldestPolicy

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {public DiscardOldestPolicy() { }/*** 从任务队列中调用poll()方法删除最先入队列的(最老的)任务* 拓展:队列是先进先出,由此调用poll()方法是取出的是先入队列的数据*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}}

AbortPolicy

    public static class AbortPolicy implements RejectedExecutionHandler {/*** Creates an {@code AbortPolicy}.*/public AbortPolicy() { }/*** Always throws RejectedExecutionException.* 丢弃准备添加的任务并抛出异常* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task* @throws RejectedExecutionException always*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}

DiscardPolicy

    public static class DiscardPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardPolicy}.*/public DiscardPolicy() { }/*** Does nothing, which has the effect of discarding task r.* 不做任何处理,丢弃准备添加的任务* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}

CallerRunsPolicy

    public static class CallerRunsPolicy implements RejectedExecutionHandler {/*** Creates a {@code CallerRunsPolicy}.*/public CallerRunsPolicy() { }/*** Executes task r in the caller's thread, unless the executor* has been shut down, in which case the task is discarded.* 准备添加的任务,直接调用run()方法交给提交任务的线程执行* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}}

面试官:线程池常用的阻塞队列有哪些?能说下各自的区别?

队列 说明
ArrayBlockedQueue 数组实现有界队列,FIFO先入先出,支持公平锁、非公平锁
LinkedBlockedQueue 单链表实现的有界队列,如果不指定容量默认为Integer.MAX_VALUE
SynchronousQueue 不存储元素的队列,每个put()操作时必须有线程正在调用take(),该元素才存在,Executors.newCacheThreadPool()就使用该队列,每来一个任务如果没有空闲线程(线程复用)则创建新线程执行任务
PriorityBlockedQueue 无界的优先队列,默认按自然排序,自定义实现compareTo()定制自己优先级,不同保证同优先级顺序
DelayQueue 无界延迟队列,利用PriorityBlockedQueue实现,在创建元素时可以指定多久能够获取到该元素,只有满足延迟时间才能获取到数据,ScheduledThreadPoolExecutor定时任务就是利用自己实现的延时队列(思想一致)

SynchronousQueue应用

    @Testpublic void SynchronousQueue() throws InterruptedException {SynchronousQueue<Integer> queue = new SynchronousQueue<>();Random random = new Random();AtomicInteger ait = new AtomicInteger(0);new Thread(() -> {try {for (int i = 0; i < 3; i++) {Integer integer = queue.take();if (integer != null){int count = ait.incrementAndGet();System.out.println(count + "-" + integer);}}} catch (InterruptedException e) {e.printStackTrace();}}).start();TimeUnit.SECONDS.sleep(3);new Thread(() -> {for (int i = 0; i < 3; i++) {queue.offer(random.nextInt());}}).start();TimeUnit.SECONDS.sleep(5);}

PriorityBlockedQueue应用

和PriorityQueue使用一样,无非就是加了锁阻塞生产、消费者线程

    @Testpublic void priorityQueue(){PriorityQueue<Integer> queue = new PriorityQueue<>(new Comparator<Integer>() {@Overridepublic int compare(Integer o1, Integer o2) {return Integer.compare(o1, o2);}});queue.add(2);queue.add(1);queue.add(3);while (!queue.isEmpty()){System.out.println(queue.poll());}PriorityQueue<CustomRank> queue2 = new PriorityQueue<>();queue2.add(new CustomRank(2));queue2.add(new CustomRank(1));queue2.add(new CustomRank(3));while (!queue2.isEmpty()){System.out.println(queue2.poll().v);}}public class CustomRank implements Comparable<CustomRank>{Integer v;public CustomRank(Integer v) {this.v = v;}@Overridepublic int compareTo(CustomRank o) {return Integer.compare(this.v, o.v);}}

DelayQueue应用

    @Testpublic void delayQueue() throws InterruptedException {DelayQueue<CustomTimeTask> queue = new DelayQueue<>();queue.add(new CustomTimeTask("我是第一个任务", 4, TimeUnit.SECONDS));queue.add(new CustomTimeTask("我是第二个任务", 8, TimeUnit.SECONDS));queue.add(new CustomTimeTask("我是第三个任务", 16, TimeUnit.SECONDS));while (!queue.isEmpty()){CustomTimeTask task = queue.take();System.out.format("name: {%s}, time: {%s} \n", task.name, new Date());}}class CustomTimeTask implements Delayed{//触发时间long time;//任务名称String name;public CustomTimeTask(String name,long time, TimeUnit timeUnit) {this.time = System.currentTimeMillis() + timeUnit.toMillis(time);this.name = name;}@Overridepublic long getDelay(TimeUnit unit) {return time - System.currentTimeMillis();}/*** 利用优先队列将任务按照触发时间从小到大排序* @param o* @return*/@Overridepublic int compareTo(Delayed o) {CustomTimeTask other = (CustomTimeTask) o;return Long.compare(this.time, other.time);}@Overridepublic String toString() {return "CustomTimeTask{" +"time=" + time +", name='" + name + '\'' +'}';}}

面试官:如何结合业务合理的配置线程池参数?CPU密集型和IO密集型如何配置?线程设置过多会造成什么影响?

答案:其实没有完整的公式去计算,我在使用的时候一般是根据业务场景,动态的去改变线程池参数选择最优配置方案

CPU 密集型任务

IO密集型任务

面试官:给我讲讲什么是线程复用?

什么是线程复用?
通过同一个线程去执行不同的任务,这就是线程复用。

java.util.concurrent.ThreadPoolExecutor#execute

 public void execute(Runnable command) {// 如果传入的Runnable的空,就抛出异常if (command == null)throw new NullPointerException();int c = ctl.get();// 线程池中的线程比核心线程数少 if (workerCountOf(c) < corePoolSize) {// 新建一个核心线程执行任务if (addWorker(command, true))return;c = ctl.get();}// 核心线程已满,但是任务队列未满,添加到队列中if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 任务成功添加到队列以后,再次检查是否需要添加新的线程,因为已存在的线程可能被销毁了if (! isRunning(recheck) && remove(command))// 如果线程池处于非运行状态,并且把当前的任务从任务队列中移除成功,则拒绝该任务reject(command);else if (workerCountOf(recheck) == 0)// 如果之前的线程已经被销毁完,新建一个非核心线程addWorker(null, false);}// 核心线程池已满,队列已满,尝试创建一个非核心新的线程else if (!addWorker(command, false))// 如果创建新线程失败,说明线程池关闭或者线程池满了,拒绝任务reject(command);}

线程复用源码分析:java.util.concurrent.ThreadPoolExecutor#runWorker

    final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // 释放锁 设置work的state=0 允许中断boolean completedAbruptly = true;try {//一直执行 如果task不为空 或者 从队列中获取的task不为空while (task != null || (task = getTask()) != null) {task.run();//执行task中的run方法}}completedAbruptly = false;} finally {//1.将 worker 从数组 workers 里删除掉//2.根据布尔值 allowCoreThreadTimeOut 来决定是否补充新的 Worker 进数组 workersprocessWorkerExit(w, completedAbruptly);}}

面试官:为什么《阿里巴巴开发手册》不推荐使用Executor创建线程?

ScheduledThreadPoolExecutor

延时执行

    ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "schedule-thread");}});/*** 延迟执行* @throws InterruptedException*/@Testvoid testSchedule() throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);System.out.println(new Date());threadPool.schedule(new TimeTask(), 3, TimeUnit.SECONDS);countDownLatch.await();}class TimeTask implements Runnable{@Overridepublic void run() {try {TimeUnit.SECONDS.sleep(4);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + new Date() + " 任务执行完成");}}

周期执行

1.scheduleAtFixedRate方法

按固定频率执行,与任务本身执行时间无关。但有个前提条件,任务执行时间必须小于间隔时间,例如间隔时间是5s,每5s执行一次任务,任务的执行时间必须小于5s。

    @Testvoid testScheduleAtFixedRate() throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);threadPool.scheduleAtFixedRate(new TimeTask(), 2, 3, TimeUnit.SECONDS);countDownLatch.await();}

2.scheduleWithFixedDelay方法

按固定间隔执行,与任务本身执行时间有关。例如,任务本身执行时间是10s,间隔2s,则下一次开始执行的时间就是12s。

    @Testvoid testScheduleWithFixedDelay() throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);threadPool.scheduleWithFixedDelay(new TimeTask(), 2, 3, TimeUnit.SECONDS);countDownLatch.await();}

面试题:你知道延迟执行、周期性执行任务实现原理?

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,这意味着其内部的数据结构和ThreadPoolExecutor是基本一样的。

延迟执行任务依靠的是DelayQueue。DelayQueue是 BlockingQueue的一种,其实现原理是二叉堆

而周期性执行任务是执行完一个任务之后,再把该任务扔回到任务队列中,如此就可以对一个任务反复执行。

不过这里并没有使用DelayQueue,而是在ScheduledThreadPoolExecutor内部又实现了一个特定的DelayQueue

    static class DelayedWorkQueue extends AbstractQueue<Runnable>implements BlockingQueue<Runnable> {...}

其原理和DelayQueue一样,但针对任务的取消进行了优化。下面主要讲延迟执行和周期性执行的实现过程。

延迟执行设计原理:

传进去的是一个Runnable,外加延迟时间delay。在内部通过decorateTask(…)方法把Runnable包装成一个ScheduleFutureTask对象,而DelayedWorkQueue中存放的正是这种类型的对象,这种类型的对象一定实现了Delayed接口。



从上面的代码中可以看出,schedule()方法本身很简单,就是把提交的Runnable任务加上delay时间,转换成ScheduledFutureTask对象,放入DelayedWorkerQueue中。任务的执行过程还是复用的ThreadPoolExecutor,延迟的控制是在DelayedWorkerQueue内部完成的。

周期性执行设计原理:


和schedule(…)方法的框架基本一样,也是包装一个ScheduledFutureTask对象,只是在延迟时间参数之外多了一个周期参数,然后放入DelayedWorkerQueue就结束了。

两个方法的区别在于一个传入的周期是一个负数,另一个传入的周期是一个正数,为什么要这样做呢?

用于生成任务序列号的sequencer,创建ScheduledFutureTask的时候使用:

    private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> {/** Sequence number to break ties FIFO */private final long sequenceNumber;/** 延时时间 */private long time;private final long period;/** The actual task to be re-enqueued by reExecutePeriodic */RunnableScheduledFuture<V> outerTask = this;/*** Index into delay queue, to support faster cancellation.*/int heapIndex;/*** Creates a one-shot action with given nanoTime-based trigger time.*/ScheduledFutureTask(Runnable r, V result, long ns) {super(r, result);this.time = ns;this.period = 0;this.sequenceNumber = sequencer.getAndIncrement();}/*** Creates a periodic action with given nano time and period.*/ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time = ns;this.period = period;this.sequenceNumber = sequencer.getAndIncrement();}public long getDelay(TimeUnit unit) {return unit.convert(time - now(), NANOSECONDS);}public int compareTo(Delayed other) {if (other == this) // compare zero if same objectreturn 0;if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;long diff = time - x.time;if (diff < 0)return -1;else if (diff > 0)return 1;else if (sequenceNumber < x.sequenceNumber)return -1;elsereturn 1;}long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}public boolean isPeriodic() {return period != 0;}/*** 设置下一个执行时间*/private void setNextRunTime() {long p = period;if (p > 0)time += p;elsetime = triggerTime(-p);}public boolean cancel(boolean mayInterruptIfRunning) {boolean cancelled = super.cancel(mayInterruptIfRunning);if (cancelled && removeOnCancel && heapIndex >= 0)remove(this);return cancelled;}/***实现Runnable*/public void run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);// 如果不是周期执行,则执行一次else if (!periodic)ScheduledFutureTask.super.run();// 如果是周期执行,则重新设置下一次运行的时间,重新入队列else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}}//下一次触发时间long triggerTime(long delay) {return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));}//放到队列中,等待下一次执行void reExecutePeriodic(RunnableScheduledFuture<?> task{if (canRunInCurrentRunState(true)) {super.getQueue().add(task);if (!canRunInCurrentRunState(true) && remove(task))task.cancel(false);elseensurePrestart();}}}

withFixedDelay和atFixedRate的区别就体现在setNextRunTime里面。

如果是atFixedRate,period>0,下一次开始执行时间等于上一次开始执行时间+period;

如果是withFixedDelay,period < 0,下一次开始执行时间等于triggerTime(-p),为now+(-period),now即上一次执行的结束时间。

面试题:为什么不使用Timer而使用ScheduledThreadPoolExecutor?

  1. Timer使用的是绝对时间,系统时间的改变会对Timer产生一定的影响;而ScheduledThreadPoolExecutor使用的是相对时间,所以不会有这个问题。
  2. Timer使用单线程来处理任务,长时间运行的任务会导致其他任务的延时处理,而ScheduledThreadPoolExecutor可以自定义线程数量。
  3. Timer没有对运行时异常进行处理,一旦某个任务触发运行时异常,会导致整个Timer崩溃,而ScheduledThreadPoolExecutor对运行时异常做了捕获(可以在afterExecute()回调方法中进行处理),所以更加安全。

CompletableFuture异步编程工具

基本使用

package net.dreamzuora.thread;import org.testng.annotations.Test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;/*** 异步编程工具*/
public class CompletableFutureDemo {/*** CompletableFuture实现了Future接口,所以它也具有Future的特性:调用get()方法会阻塞在那,* 直到结果返回。* 另外1个线程调用complete方法完成该Future,则所有阻塞在get()方法的线程都将获得返回结果。* @throws ExecutionException* @throws InterruptedException*/@Testvoid complete() throws ExecutionException, InterruptedException {CompletableFuture<String> completeFuture = new CompletableFuture<>();new Thread(() -> {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}completeFuture.complete("gome");}).start();System.out.println(completeFuture.get());}/*** 阻塞等待任务执行完成*/@Testvoid runAsyncTest() throws ExecutionException, InterruptedException {CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(3);System.out.println("hello word!");} catch (InterruptedException e) {e.printStackTrace();}});//阻塞等待任务完成completableFuture.get();System.out.println("succ");}/*** 带返回值的任务执行* @throws ExecutionException* @throws InterruptedException*/@Testvoid supplyAsync() throws ExecutionException, InterruptedException {CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return "hello";}});String result = stringCompletableFuture.get();System.out.println(result);}/*** thenRun():上个任务结束再执行(不带上一个返回值结果)下一个任务* thenAccept后面跟的是一个有参数、无返回值的方法,称为Consumer,返回值也是* CompletableFuture<Void>类型。顾名思义,只进不出,所以称为Consumer;前面的* Supplier,是无参数,有返回值,只出不进,和Consumer刚好相反。* @throws ExecutionException* @throws InterruptedException*/@Testvoid thenRun() throws ExecutionException, InterruptedException {CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {System.out.println("第一次执行");}).thenRun(new Runnable() {@Overridepublic void run() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二次执行");}});completableFuture.get();}/*** thenAccept():上个任务结束再执行(前面任务的结果作为下一个任务的入参)下一个任务* @throws ExecutionException* @throws InterruptedException*/@Testvoid thenAccept() throws ExecutionException, InterruptedException {CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {return "hello";}}).thenAccept(new Consumer<String>() {@Overridepublic void accept(String param) {System.out.println(param + " word!");}});completableFuture.get();}/*** thenApply 后面跟的是一个有参数、有返回值的方法,称为Function。返回值是* CompletableFuture<String>类型。* @throws ExecutionException* @throws InterruptedException*/@Testvoid thenApply() throws ExecutionException, InterruptedException {CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return "第一个任务执行完成!";}}).thenApply(new Function<String, String>() {@Overridepublic String apply(String firstTaskResult) {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return firstTaskResult + " 第二个任务执行完成!";}});String result = stringCompletableFuture.get();System.out.println(result);}/*** 第1个参数是一个CompletableFuture类型,第2个参数是一个方法,并且是一个BiFunction,也就* 是该方法有2个输入参数,1个返回值。* 从该接口的定义可以大致推测,它是要在2个 CompletableFuture 完成之后,把2个* CompletableFuture的返回值传进去,再额外做一些事情。* @throws ExecutionException* @throws InterruptedException*/@Testvoid thenCompose() throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync((Supplier<String>) () -> "第一个任务执行完成!").thenCompose(new Function<String, CompletionStage<String>>() {@Overridepublic CompletionStage<String> apply(String firstTask) {return CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {return firstTask + " 第二个任务执行完成!";}});}});String s = future.get();System.out.println(s);}/*** 如果希望返回值是一个非嵌套的CompletableFuture,可以使用thenCompose:* @throws ExecutionException* @throws InterruptedException*/@Testvoid thenCombine() throws ExecutionException, InterruptedException {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {return "第一个任务执行完成! ";}}).thenCombine(CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {return "第二个任务执行完成! ";}}), new BiFunction<String, String, Integer>() {@Overridepublic Integer apply(String s1, String s2) {return s1.length() + s2.length();}});System.out.println(future.get());}/*** 等待所有的CompletableFuture执行完成,无返回值* @throws ExecutionException* @throws InterruptedException*/@Testvoid allOf() throws ExecutionException, InterruptedException {AtomicInteger atc = new AtomicInteger(0);CompletableFuture[] completableFutures = new CompletableFuture[10];for (int i = 0; i < 10; i++){CompletableFuture supplyAsync = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return atc.incrementAndGet();}});completableFutures[i] = supplyAsync;}CompletableFuture<Void> completableFuture = CompletableFuture.allOf(completableFutures);completableFuture.get();System.out.println(atc);}/*** anyOf:只要有任意一个CompletableFuture结束,就可以做接下来的事情,而无须像* AllOf那样,等待所有的CompletableFuture结束。* 但由于每个CompletableFuture的返回值类型都可能不同,任意一个,意味着无法判断是什么类* 型,所以anyOf的返回值是CompletableFuture<Object>类型*/@Testvoid anyOf() throws ExecutionException, InterruptedException {AtomicInteger atc = new AtomicInteger(0);CompletableFuture[] completableFutures = new CompletableFuture[10];for (int i = 0; i < 10; i++){CompletableFuture supplyAsync = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return atc.incrementAndGet();}});completableFutures[i] = supplyAsync;}Integer result = (Integer) CompletableFuture.anyOf(completableFutures).get();System.out.println(result);}
}

四种任务原型

通过上面的例子可以总结出,提交给CompletableFuture执行的任务有四种类型:Runnable、Consumer、Supplier、Function。下面是这四种任务原型的对比。

runAsync 与 supplierAsync 是 CompletableFuture 的静态方法;而 thenAccept、thenAsync、thenApply是CompletableFutre的成员方法。

因为初始的时候没有CompletableFuture对象,也没有参数可传,所以提交的只能是Runnable或者Supplier,只能是静态方法;

通过静态方法生成CompletableFuture对象之后,便可以链式地提交其他任务了,这个时候就可以提交Runnable、Consumer、Function,且都是成员方法。

面试题:你知道CompletableFuture内部原理?

CompletableFuture的构造:ForkJoinPool

    private static final Executor asyncPool = useCommonPool ?ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

任务执行

    public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);}static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {if (f == null) throw new NullPointerException();CompletableFuture<Void> d = new CompletableFuture<Void>();e.execute(new AsyncRun(d, f));return d;}


通过上面的代码可以看到,asyncPool是一个static类型,supplierAsync、asyncSupplyStage也都是static方法。

Static方法会返回一个CompletableFuture类型对象,之后就可以链式调用CompletionStage里面的各个方法。

任务类型的适配

我们向CompletableFuture提交的任务是Runnable/Supplier/Consumer/Function 。因此,肯定需要一个适配机制,把这四种类型的任务转换成ForkJoinTask,然后提交给ForkJoinPool,如下图所示:

supplyAsync()->Supplier->AsyncSupply

在 supplyAsync(…)方法内部,会把一个 Supplier 转换成一个 AsyncSupply,然后提交给ForkJoinPool执行;

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);}static <U> CompletableFuture<U> asyncSupplyStage(Executor e,Supplier<U> f) {if (f == null) throw new NullPointerException();CompletableFuture<U> d = new CompletableFuture<U>();e.execute(new AsyncSupply<U>(d, f));return d;}static final class AsyncSupply<T> extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {CompletableFuture<T> dep; Supplier<T> fn;AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {this.dep = dep; this.fn = fn;}...}

runAsync()->Runnable->AsyncRun
在runAsync(…)方法内部,会把一个Runnable转换成一个AsyncRun,然后提交给ForkJoinPool执行;

    public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);}static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {if (f == null) throw new NullPointerException();CompletableFuture<Void> d = new CompletableFuture<Void>();e.execute(new AsyncRun(d, f));return d;}static final class AsyncRun extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {CompletableFuture<Void> dep; Runnable fn;AsyncRun(CompletableFuture<Void> dep, Runnable fn) {this.dep = dep; this.fn = fn;}...}

thenAccept()->Consumer->UniAccept
在 thenRun/thenAccept/thenApply 内部,会分别把Runnable/Consumer/Function 转换成UniRun/UniAccept/UniApply对象,然后提交给ForkJoinPool执行;

除此之外,还有两种 CompletableFuture 组合的情况,分为“与”和“或”,所以有对应的Bi和Or类型
的Completion类型

    public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {return uniAcceptStage(null, action);}private CompletableFuture<Void> uniAcceptStage(Executor e,Consumer<? super T> f) {if (f == null) throw new NullPointerException();CompletableFuture<Void> d = new CompletableFuture<Void>();if (e != null || !d.uniAccept(this, f, null)) {UniAccept<T> c = new UniAccept<T>(e, d, this, f);push(c);c.tryFire(SYNC);}return d;}

任务的链式执行过程分析

下面以CompletableFuture.supplyAsync(…).thenApply(…).thenRun(…)链式代码为例,分析整个执行过程。

    static final class AsyncSupply<T> extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {...}

什么是 Java8 的 ForkJoinPool?

ForkJoinPool就是JDK7提供的一种“分治算法”的多线程并行计算框架。Fork意为分叉,Join意为合并,一分一合,相互配合,形成分治算法。此外,也可以将ForkJoinPool看作一个单机版的
Map/Reduce,多个线程并行计算。

相比于ThreadPoolExecutor,ForkJoinPool可以更好地实现计算的负载均衡,提高资源利用率。

假设有5个任务,在ThreadPoolExecutor中有5个线程并行执行,其中一个任务的计算量很大,其余4个任务的计算量很小,这会导致1个线程很忙,其他4个线程则处于空闲状态。

利用ForkJoinPool,可以把大的任务拆分成很多小任务,然后这些小任务被所有的线程执行,从而
实现任务计算的负载均衡。

应用

1.斐波那契数列

    @Testvoid testForkJoin() throws ExecutionException, InterruptedException {ForkJoinPool forkJoinPool = new ForkJoinPool();ForkJoinTask<Integer> task = forkJoinPool.submit(new FibonacciTask(5));System.out.println(task.get());}// 1 1 2 3 5 8 ...class FibonacciTask extends RecursiveTask<Integer> {int n;public FibonacciTask(int n) {this.n = n;}@Overrideprotected Integer compute() {if (n <= 1){return 1;}FibonacciTask task1 = new FibonacciTask(n - 1);task1.fork();FibonacciTask task2 = new FibonacciTask(n - 2);task2.fork();return task1.join() + task2.join();}}

核心数据结构

与ThreadPoolExector不同的是,除一个全局的任务队列之外,每个线程还有一个自己的局部队列。

本课程内容参考:
1.《并发编程78讲》-徐隆曦 滴滴出行高级工程师
2.美团技术博客-Java线程池实现原理及其在美团业务中的实践
3.《java并发编程实战》
4.CSDN博客-面试官:你知道什么是线程池的线程复用原理吗?

深入剖析线程池基本原理以及常见面试题详解相关推荐

  1. Java开发常见面试题详解(LockSupport,AQS,Spring循环依赖,Redis)_3

    Java开发常见面试题详解(LockSupport,AQS,Spring循环依赖,Redis)_3 总览 问题 详解 String.intern()的作用 link LeetCode的Two Sum题 ...

  2. Java开发常见面试题详解(JVM)_2

    Java开发常见面试题详解(JVM)_2 JVM 问题 详解 JVM垃圾回收的时候如何确定垃圾?是否知道什么是GC Roots link 你说你做过JVM调优和参数配置,请问如何盘点查看JVM系统默认 ...

  3. JSP, Servlet常见面试题详解

    JSP, Servlet常见面试题 1,J2EE是什么? J2EE本身是一个标准,一个为企业分布式应用的开发提供的标准平台. J2EE也是一个框架,包括JDBC.JNDI.RMI.JMS.EJB.JT ...

  4. Java开发常见面试题详解(LockSupport,AQS,Spring循环依赖,Redis)

    总览 问题 详解 String.intern()的作用 link LeetCode的Two Sum题 link 什么是可重入锁? link 谈谈LockSupport link 谈谈AQS link ...

  5. Java开发常见面试题详解(并发,JVM)

    预览 并发 问题 详解 请谈谈你对volatile的理解 link CAS你知道吗? link 原子类Atomiclnteger的ABA问题谈谈?原子更新引用知道吗? link 我们知道ArrayLi ...

  6. Redis常见面试题详解

    文章目录 1. Redis 1.1 Redis可以用来做什么? 1.2 Redis和传统的关系型数据库有什么不同? 1.3 Redis有哪些数据类型? 1.4 Redis是单线程的,为什么还能这么快? ...

  7. jQuery的概念、用法、常见面试题详解

    什么是jQuery? jQuery 是一个高效.精简并且功能丰富的 JavaScript 工具库.它提供的 API 易于使用且兼容众多浏览器,这让诸如 HTML 文档遍历和操作.事件处理.动画和 Aj ...

  8. 线程池框架_Java并发——Executor框架详解(Executor框架结构与框架成员)

    一.什么是Executor框架? 我们知道线程池就是线程的集合,线程池集中管理线程,以实现线程的重用,降低资源消耗,提高响应速度等.线程用于执行异步任务,单个的线程既是工作单元也是执行机制,从JDK1 ...

  9. 线程池的创建及参数设置详解

    一. 常见线程池 线程池的创建方法主要有两类,第一是通过Executors 创建线程池,第二是通过 ThreadPoolExecutor 创建线程池. 首先我们来看通过Executors 创建的线程池 ...

最新文章

  1. 零基础入门学习python(24)-字典(2):字典的内置方法
  2. 习题:codevs 2822 爱在心中 解题报告
  3. html 自定义标签使用实现方法
  4. UVa 10375 Choose and divide
  5. 三张图看遍Linux 性能监控、测试、优化工具
  6. 强化学习笔记:Q_learning (Q-table)示例举例
  7. 质数,约数(数论) AcWing算法课
  8. PDF转换器安装教程
  9. 5.jQueryAjax
  10. SQL Server 2008 R2数据库镜像部署
  11. ubuntu 下可以尝试还不错的屏幕截图工具: flameshot
  12. java 裁剪 pdf_java – 使用iTextPDF修剪页面的空白
  13. 设计模式-模板方法模式(15)
  14. 2013年最新黑马程序员全套视频-.net视频40G免费下
  15. html画表盘 随时间转动,Html5画钟表盘/指针实时跳动
  16. 决策树案例学习(Python实现)
  17. 5月市场平稳,期货成交量环比下降
  18. 学生成绩排名预测(DC)
  19. 梦之光芒ctf小游戏闯关过程
  20. xp开机加载个人设置很慢的解决方法

热门文章

  1. 现代软件工程 第十七章 【人、绩效和职业道德】 练习与讨论
  2. ln -s命令 linux,Linux下 ln -s 软链接用法
  3. python最常用的编程方式是什么_python常用模块和对象编程
  4. Linux如何查看所有用户和用户组信息(cat groups whoami)
  5. 计算机管理中添加用户属性,如何在计算机右键菜单栏中添加属性选项
  6. android opencv 水印,关于opencv对图片添加水印
  7. redis数据丢失_有效避免数据丢失!Redis持久化方案选择详解
  8. python哪个文字转语音好用_【python3】Python十行代码搞定文字转语音
  9. oracle归档模式备份恢复,oracle归档模式备份恢复
  10. 一行代码蒸发64亿人民币!黑客盯上区块链漏洞!Python真的变态!