在JDK7引入了Fork/Join框架,所谓Fork/Join框架,个人理解,有一种分治的策略在里边:Fork分解任务成独立的子任务,用多线程去执行这些子任务,Join合并子任务的结果。这样就能使用多线程的方式来执行一个任务。

JDK7引入的Fork/Join有三个核心类:

ForkJoinPool,执行任务的线程池ForkJoinWorkerThread,执行任务的工作线程ForkJoinTask,一个用于ForkJoinPool的任务抽象类。

我们已经很清楚Fork/Join框架的需求了,那么我们可以思考一下,如果让我们来设计一个Fork/Join框架,该如何设计?这个思考有助于你理解Fork/Join框架的设计。

第一步分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。

第二步执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

Fork/Join使用两个类来完成以上两件事情:

ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:

    RecursiveAction:用于没有返回结果的任务。RecursiveTask :用于有返回结果的任务。

ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

    public class Calculator extends RecursiveTask<Integer> {  private static final int THRESHOLD = 100;  private int start;  private int end;  public Calculator(int start, int end) {  this.start = start;  this.end = end;  }  @Override  protected Integer compute() {  int sum = 0;  if((start - end) < THRESHOLD){  for(int i = start; i< end;i++){  sum += i;  }  }else{  int middle = (start + end) /2;  Calculator left = new Calculator(start, middle);  Calculator right = new Calculator(middle + 1, end);  left.fork();  right.fork();  sum = left.join() + right.join();  }  return sum;  }  }  

而执行该自定义任务的调用的则是ForkJoinPool的execute方法,因此首先来看的就是ForkJoinPool的execute方法,看看和普通线程池执行任务有什么不同:

  public void execute(ForkJoinTask<?> task) {if (task == null)throw new NullPointerException();forkOrSubmit(task);}

因此forkOrSubmit是真正执行ForkJoinTask的方法:

   private <T> void forkOrSubmit(ForkJoinTask<T> task) {ForkJoinWorkerThread w;Thread t = Thread.currentThread();if (shutdown)throw new RejectedExecutionException();if ((t instanceof ForkJoinWorkerThread) &&(w = (ForkJoinWorkerThread)t).pool == this)w.pushTask(task);else// 正常执行的时候是主线程调用的,因此关注addSubmissionaddSubmission(task);}

那么我们首先要关注的是addSubmission方法,发觉所做的事情和普通线程池很类似,就是把任务加入到队列中,不同的是直接使用Unsafe操作内存来添加任务对象

   private void addSubmission(ForkJoinTask<?> t) {final ReentrantLock lock = this.submissionLock;lock.lock();try {// 队列只是普通的数组而不是普通线程池的BlockingQueue,// 唤醒worker线程的工作由下面的signalWork来完成// 使用Unsafe进行内存操作,把任务放置在数组中ForkJoinTask<?>[] q; int s, m;if ((q = submissionQueue) != null) {long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;UNSAFE.putOrderedObject(q, u, t);queueTop = s + 1;if (s - queueBase == m)// 数组已满,为数组扩容growSubmissionQueue();}} finally {lock.unlock();}// 通知有新任务来了:两种操作,有空闲线程则唤醒该线程// 否则如果可以新建worker线程则为这个任务新建worker线程// 如果不可以就返回了,等到有空闲线程来执行这个任务signalWork();}

接下来要弄清楚就是在compute中fork时,按道理来说这个动作是和主任务在同一个线程中执行,fork是如果把子任务变成多线程执行的:

    public final ForkJoinTask<V> fork() {((ForkJoinWorkerThread) Thread.currentThread()).pushTask(this);return this;}

在上面分析forkOrSubmit的时候同样见到了ForkJoinWorkerThread的pushTask方法调用,那么来看这个方法:

   final void pushTask(ForkJoinTask<?> t) {// 代码的基本逻辑和ForkJoinPool的addSubmission方法基本一致// 都是把任务加入了任务队列中,这里是加入到ForkJoinWorkerThread// 内置的任务队列中ForkJoinTask<?>[] q; int s, m;if ((q = queue) != null) {    // ignore if queue removedlong u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;UNSAFE.putOrderedObject(q, u, t);queueTop = s + 1;         // or use putOrderedInt// 这里不太明白if ((s -= queueBase) <= 2)pool.signalWork();else if (s == m)growQueue();}}

ForkJoinWorkerThread的run方法:

   public void run() {Throwable exception = null;try {// 初始化任务队列onStart();// 线程运行pool.work(this);} catch (Throwable ex) {exception = ex;} finally {// 结束后的工作onTermination(exception);}}

因此我们需要再次回到ForkJoinPool,看看work方法:

 final void work(ForkJoinWorkerThread w) {boolean swept = false;                // 下面scan方法没有扫描到任务返回truelong c;// ctl是一个64位长的数据,它的格式如下:// 48-63:AC,正在运行的worker线程数减去系统的并发数(减去系统的并发得出的实际是在某一瞬间等待并发资源的线程数量)// 32-47:TC,所有的worker线程数减去系统的并发数// 31:   ST,1表示线程池正在关闭// 16-30:EC,第一个等待线程的等待数// 0- 15:ID,Treiber栈(存储等待线程)顶的worker线程在线程池的线程队列中的索引// (int)(c = ctl) >= 0表示ST位为0,即线程池不是正在关闭的状态while (!w.terminate && (int)(c = ctl) >= 0) {int a; // 正在运行的worker线程数,ctl中的AC部分// swept为false可能有三种:// 1. scan返回false// 2. 首次循环// 3. tryAwaitWork成功if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)swept = scan(w, a);else if (tryAwaitWork(w, c))swept = false;}}
  private boolean scan(ForkJoinWorkerThread w, int a) {int g = scanGuard; // mask 0 avoids useless scans if only one activeint m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;ForkJoinWorkerThread[] ws = workers;if (ws == null || ws.length <= m)         // staleness checkreturn false;// 代码看起来晕啊,看来当前的ForkJoinWorkerThread不一定是运行自己的// Task,可以运行其他ForkJoinWorkerThread的Task。// 似乎有点明白了,这样可以实现Fork出来的任务被多线程执行// 看起来这是一个较为复杂的算法for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;ForkJoinWorkerThread v = ws[k & m];if (v != null && (b = v.queueBase) != v.queueTop &&(q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {long u = (i << ASHIFT) + ABASE;if ((t = q[i]) != null && v.queueBase == b &&UNSAFE.compareAndSwapObject(q, u, t, null)) {int d = (v.queueBase = b + 1) - v.queueTop;v.stealHint = w.poolIndex;if (d != 0)signalWork();             // propagate if nonemptyw.execTask(t);}r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);return false;                     // store next seed}else if (j < 0) {                     // xorshiftr ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;}else++k;}if (scanGuard != g)                       // staleness checkreturn false;else {                                    // try to take submissionForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;if ((b = queueBase) != queueTop &&(q = submissionQueue) != null &&(i = (q.length - 1) & b) >= 0) {long u = (i << ASHIFT) + ABASE;if ((t = q[i]) != null && queueBase == b &&UNSAFE.compareAndSwapObject(q, u, t, null)) {queueBase = b + 1;w.execTask(t);}return false;}return true;                         // all queues empty}}

Work-Stealing的部分,下面列出该算法的要点:

  1. 每个Worker线程都维护一个任务队列,即ForkJoinWorkerThread中的任务队列。

  2. 任务队列是双向队列,这样可以同时实现LIFO和FIFO。

  3. 子任务会被加入到原先任务所在Worker线程的任务队列。

  4. Worker线程用LIFO的方法取出任务,也就后进队列的任务先取出来(子任务总是后加入队列,但是需要先执行)。

  5. Worker线程的任务队列为空,会随机从其他的线程的任务队列中拿走一个任务执行(所谓偷任务:steal work,FIFO的方式)。

  6. 如果一个Worker线程遇到了join操作,而这时候正在处理其他任务,会等到这个任务结束。否则直接返回。

  7. 如果一个Worker线程偷任务失败,它会用yield或者sleep之类的方法休息一会儿,再尝试偷任务(如果所有线程都是空闲状态,即没有任务运行,那么该线程也会进入阻塞状态等待新任务的到来)。

那么重新回到ForkJoinPool的scan方法

private boolean scan(ForkJoinWorkerThread w, int a) {// scanGuard是32位的整数,用于worker线程数组的索引// 第16位称为SG_UNIT,为1表示锁住// 0到15位是maskint g = scanGuard;// parallelism表示并发数,一般指CPU可以同时运行的线程数// 默认值是Runtime类的availableProcessors方法返回值,表示// 处理器的数量// a是活跃的Worker线程的数量,parallelism是大于0的,因此// 条件parallelism == 1 - a满足意味着parallelism为1而a为0// 而加上blockedCount为0(意味着没有线程因为join被阻塞),// 两个条件同时满足也就意味既没有任何线程在运行,那么也就// 意味着没有任务存在于worker线程,所以m=0也就是没法偷任务// SMASK=0xffff,g & SMASK返回的值scanGuard的0到15位的数值int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;ForkJoinWorkerThread[] ws = workers;if (ws == null || ws.length <= m) return false;// for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;// 从线程队列中随机获取一个worker线程ForkJoinWorkerThread v = ws[k & m];// 判断Worker线程是否存在以及该线程的任务队列是否有任务if (v != null && (b = v.queueBase) != v.queueTop &&(q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {// 从队列中偷走一个任务long u = (i << ASHIFT) + ABASE;if ((t = q[i]) != null && v.queueBase == b &&UNSAFE.compareAndSwapObject(q, u, t, null)) {int d = (v.queueBase = b + 1) - v.queueTop;v.stealHint = w.poolIndex;// d是偷走一个任务后任务队列的长度if (d != 0)signalWork();w.execTask(t);}r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);// false表示扫描到了任务return false;}else if (j < 0) {                     // 异或移位,更新kr ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;}else++k;}// 如果扫描不到任务,但是scanGuard被更新了,说明有任务的变化if (scanGuard != g)return false;else {// 从线程池的任务队列中取出任务来执行ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;if ((b = queueBase) != queueTop &&(q = submissionQueue) != null &&(i = (q.length - 1) & b) >= 0) {long u = (i << ASHIFT) + ABASE;if ((t = q[i]) != null && queueBase == b &&UNSAFE.compareAndSwapObject(q, u, t, null)) {queueBase = b + 1;w.execTask(t);}return false;}return true;}}

ForkJoinTask的join方法都发生了什么:
复制代码

public final V join() {// doJoin方法返回该任务的状态,状态值有三种:// NORMAL, CANCELLED和EXCEPTIONAL// join的等待过程在doJoin方法中进行if (doJoin() != NORMAL)// reportResult方法针对任务的三种状态有三种处理方式:// NORMAL: 直接返回getRawResult()方法的返回值// CANCELLED: 抛出CancellationException// EXCEPTIONAL: 如果任务执行过程抛出了异常,则抛出该异常,否则返回getRawResult()return reportResult();else// getRawResult是抽象方法,由子类来实现return getRawResult();
}

复制代码

RecursiveAction和RecursiveTask实现了getRawResult方法。

RecursiveAction用于没有返回值的场合,因此getRawResult方法返回null。

RecursiveTask用于有返回值的场合,因此返回的是抽象方法compute方法的返回值。

接下来继续看join的核心方法doJoin方法:

 private int doJoin() {Thread t; ForkJoinWorkerThread w; int s; boolean completed;// 针对ForkJoinWorkerThread调用join的情况if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {// status值的初始化值是0,在任务没有完成以前一直是非负值// 因此一旦status的值变成负数,表示任务已经完成,直接返回if ((s = status) < 0)return s;// 检查当前worker线程的任务栈(因为采用LIFO方式,所有这里称为栈)// 的栈顶的任务是不是当前任务,如果是,从栈中取走该任务并执行// 然后返回执行之后任务的状态if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {try {completed = exec();} catch (Throwable rex) {return setExceptionalCompletion(rex);}if (completed)return setCompletion(NORMAL);}// 如果不是栈顶任务的情况return w.joinTask(this);}else// 外部线程等待任务结束的情况return externalAwaitDone();}

前面文章中曾经举了几个例子来演示如何实现RecursiveTask的子类。在compute方法中会看到了join方法的调用,也就是ForkJoinWorkerThread调用join的情况。

因此首先来看ForkJoinWorkerThread的joinTask方法的实现:

 final int joinTask(ForkJoinTask<?> joinMe) {ForkJoinTask<?> prevJoin = currentJoin;currentJoin = joinMe;for (int s, retries = MAX_HELP;;) {// 当前任务已经完成,返回到前面一个join的任务if ((s = joinMe.status) < 0) {currentJoin = prevJoin;return s;}// 剩余的尝试次数大于0(MAX_HELP值为16)的情况,继续做尝试if (retries > 0) {if (queueTop != queueBase) {// 检查当前线程的任务栈,如果任务栈不为空,当前任务处在栈顶位置则// 执行该任务返回true,否则返回false,直接认为尝试失败if (!localHelpJoinTask(joinMe))retries = 0;}// 尝试了最大允许次数的一半else if (retries == MAX_HELP >>> 1) {--retries;// 检查当前任务是否在某个worker线程的任务队列的队首位置// 如果是的话,偷走这个任务并且执行掉该任务。tryDeqAndExec// 返回任务的status值,因此大于等于0意味着任务还没有执行结束,// 当前线程让出控制权以便其他线程执行任务if (tryDeqAndExec(joinMe) >= 0)Thread.yield();}else// helpJoinTask方法检查当前任务是不是被某个Worker线程偷走了,// 并且是这个线程最新偷走的任务(currentSteal),如果是的话,// 当前线程帮助执行这个任务,这个过程成功则返回trueretries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;}else {// 尝试了最大允许次数还没有成功,重置以便再次尝试retries = MAX_HELP;// 一轮尝试失败,进入进程池等待任务pool.tryAwaitJoin(joinMe);}}}

来看一轮尝试失败之后,调用线程池的tryAwaitJoin方法会发生一些什么:

    final void tryAwaitJoin(ForkJoinTask<?> joinMe) {int s;// 检查任务是否结束之前先清除当前线程的中断状态// 因为tryAwaitDone会调用wait可能产生中断异常Thread.interrupted();// 任务还在执行的情况,否则执行完成就直接返回if (joinMe.status >= 0) {// blockedCount加1,把当前线程标记为阻塞// 成功则返回true,否则返回falseif (tryPreBlock()) {// 调用wait方法等待任务完成joinMe.tryAwaitDone(0L);// blockedCount减1,把当前线程标记为活跃状态postBlock();}// 线程处于关闭状态的情况,取消该任务else if ((ctl & STOP_BIT) != 0L)joinMe.cancelIgnoringExceptions();}}

最后又回归到了原点,来看task的tryAwaitDone方法:

   final void tryAwaitDone(long millis) {int s;try {// status为0,设为1。成功了然后才会用wait等待if (((s = status) > 0 ||(s == 0 &&UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL))) &&status > 0) {synchronized (this) {if (status > 0)wait(millis);}}} catch (InterruptedException ie) {// 因为wait被中断了,不能保证任务被正确执行结束,因此调用该方法时要注意// 检查任务是否已经执行结束了}

走完了Worker线程内的join的流程,最后来看其他线程join等待发生了什么,来看externalAwaitDone方法:

  private int externalAwaitDone() {int s;if ((s = status) >= 0) {boolean interrupted = false;synchronized (this) {// 循环等待直到任务执行结束while ((s = status) >= 0) {if (s == 0)UNSAFE.compareAndSwapInt(this, statusOffset,0, SIGNAL);else {try {wait();} catch (InterruptedException ie) {interrupted = true;}}}}// 清除中断状态if (interrupted)Thread.currentThread().interrupt();}return s;}

externalAwaitDone逻辑较为简单,采用循环的方式,使用wait方法等待直到任务执行结束。

既然使用wait方法等待,那么必然在任务执行结束后需要调用notify或者notifyAll的方法,在setCompletion方法找到了:

 private int setCompletion(int completion) {for (int s;;) {if ((s = status) < 0)return s;if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {if (s != 0)synchronized (this) { notifyAll(); }return completion;}}}

java.util.concurrent 包源码分析之Fork/Join框架相关推荐

  1. java.util.concurrent.FutureTask 源码

    2019独角兽企业重金招聘Python工程师标准>>> 线程池相关 源码: package java.util.concurrent;import java.util.concurr ...

  2. java.util.concurrent.ExecutorCompletionService 源码

    2019独角兽企业重金招聘Python工程师标准>>> 线程池相关 源码: package java.util.concurrent;public class ExecutorCom ...

  3. Java多线程 -- JUC包源码分析2 -- Copy On Write/CopyOnWriteArrayList/CopyOnWriteArraySet

    本人新书出版,对技术感兴趣的朋友请关注: https://mp.weixin.qq.com/s/uq2cw2Lgf-s4nPHJ4WH4aw 上1篇讲述了Java并发编程的第1个基本思想–CAS/乐观 ...

  4. Java并发之AQS源码分析ReentranLock、ReentrantReadWriteLock、Condition

    基于AQS的独享锁和共享锁的源码分析 基本概念说明 锁的基本原理思考 测试环境 实现方案1 实现方案2 独占锁:ReentrantLock源码分析 类依赖和类成员变量说明 加锁过程,入口方法:lock ...

  5. Java高并发编程学习(三)java.util.concurrent包

    简介 我们已经学习了形成Java并发程序设计基础的底层构建块,但对于实际编程来说,应该尽可能远离底层结构.使用由并发处理的专业人士实现的较高层次的结构要方便得多.要安全得多.例如,对于许多线程问题,可 ...

  6. Java并发编程 LockSupport源码分析

    这个类比较简单,是一个静态类,不需要实例化直接使用,底层是通过java未开源的Unsafe直接调用底层操作系统来完成对线程的阻塞. 1 package java.util.concurrent.loc ...

  7. java.util.concurrent包

    本文是我们学院课程中名为Java Concurrency Essentials的一部分 . 在本课程中,您将深入探讨并发的魔力. 将向您介绍并发和并发代码的基础知识,并学习诸如原子性,同步和线程安全之 ...

  8. 死磕Java集合之BitSet源码分析(JDK18)

    死磕Java集合之BitSet源码分析(JDK18) 文章目录 死磕Java集合之BitSet源码分析(JDK18) 简介 继承体系 存储结构 源码解析 属性 构造方法 set(int bitInde ...

  9. java.util.concurrent包API学习笔记

    newFixedThreadPool 创建一个固定大小的线程池. shutdown():用于关闭启动线程,如果不调用该语句,jvm不会关闭. awaitTermination():用于等待子线程结束, ...

最新文章

  1. 后深度学习时代的一大研究热点?论因果关系及其构建思路
  2. matlab入门学习2
  3. 网络营销激烈竞争下,网站被黑了怎么办?
  4. 一个例子学懂搜索引擎(lucene)
  5. mysql 用户授权_mysql添加、删除用户和授权用户
  6. Gradle笔记——Gradle的简介与安装
  7. 第七章信息系统安全工程考试要点及真题分布
  8. linux运维架构师职业规划
  9. Analysis-ik 中文分词安装
  10. Python - 列表解析式/生成器表达式
  11. 解决java使用Runtime.exec执行linux复杂命令不成功问题
  12. 十行代码实现高仿Promise
  13. 计算机控制中的时序,时序控制
  14. 激光雷达还是摄影测量?两者数据融合如何提高点云质量
  15. 必备工具:使用Pentaho进行数据迁移
  16. 《强化学习》 基本概念和交叉熵方法
  17. python获取cpu温度_Python如何读取CPU和GPU的温度?
  18. Anaconda3、TensorFlow和keras简单安装方法(较详细)
  19. 程序员埋逻辑炸弹,被判 6 个月
  20. 如何写出和阿里大佬一样高效优雅的打码

热门文章

  1. STM32F103 TFTLCD显示实验(一)
  2. 今晚8点直播 | 详解百度基于模板的 OCR 结果结构化处理技术
  3. 千里之行始于足下 开始篇
  4. 面向服务的体系架构(SOA)—入门篇
  5. openGL学习之glut库的使用
  6. 音乐制作宿主软件-Bitwig Studio 3 v3.3.3 x64 MacOSX
  7. NUMECA系列: FINE / Open 10.1 最新版64位CFD流体集成环境
  8. 后台框架--HUI 的学习跟使用1
  9. 视频转GIF制作工具哪个好用
  10. 初级学会响应式网页设计-周红川-专题视频课程