为什么80%的码农都做不了架构师?>>>   

根据前文描述的Doug Lea的理论基础,在JDK1.7中已经给出了Fork Join的实现。在Java SE 7的API中,多了ForkJoinTask、ForkJoinPool、ForkJoinWorkerThread、RecursiveAction、RecursiveTask这样5个类。本文就对JDK1.7中增加这5个工具类实现做简要分析。

0. JDK中ForkJoin实现概述

在JavaSE7的API和JDK1.7中,分别集成了支持ForkJoin的五个类:

  • ForkJoinPool 实现ForkJoin的线程池
  • ForkJoinWorkerThread  实现ForkJoin的线程
  • ForkJoinTask<V> 一个描述ForkJoin的抽象类
  • RecursiveAction 无返回结果的ForkJoinTask实现
  • RecursiveTask<V> 有返回结果的ForkJoinTask实现

ForkJoinPool维护了多个线程构成的数组,维护了任务提交队列,给出了多个线程之间工作窃取的实现。给出了任务类型适配,和提交任务逻辑的实现。需要和线程紧密配合。

而ForkJoinWorkerThread则继承了java.lang.Thread类,维护了线程自己的队列,同一个任务fork()操作原则上会添加到同一个线程队列中。而这个线程类需要和ForkJoinPool紧密合作,有指向对应ForkJoinPool对象的引用。

ForkJoinTask则实现了Future接口,除了对接口的实现外,主要是fork()和join()操作。注意,貌似fork()只有ForkJoinWorkerThread 中才能执行。

两个子类RecursiveAction和RecursiveTask则实现比较简单,区别就在于返回值的处理不同。

1. ForkJoinPool

ForkJoinPool是实现了 Fork Join 的线程池。看JDK源码我们知道ForkJoinPool是extends AbstractExecutorService的,也就是说间接地实现了Executor和ExecutorService接口。实际上也就意味着ForkJoinPool是继ThreadPoolExecutor后的又一个Executor(Service)的具体实现。

1.1. 构建初始化

我们先看ForkJoinPool的构造方法,一共有3个重载的实现。有一个单参数的默认实现,通常我们使用这个就足够了,这最终会以默认的参数调用3参数的构造方法。我们再来看3个参数的构造方法实现。其中:

  • int parallelism 第一个参数是并行度,这个参数简介影响着(会额外做一些运算)这个ForkJoinPool的ForkJoinWorkerThread 线程数。默认情况下,这个参数是任务运行环境的处理器个数,比如系统提供的处理器数目为4,初始化线程池会开启16个线程。
  • ForkJoinWorkerThreadFactory factory 这个是ForkJoinPool构建新线程ForkJoinWorkerThread 对象的工厂,类似于ThreadPoolExecutor中用到的ThreadFactory。
  • Thread.UncaughtExceptionHandler handler 这个前面并发的文章页提到过,是线程异常处理器,这里不多说了。

1.2. 任务提交

前面已经提到,ForkJoinPool也是Executor(Service)的实现,那么execute()和submit()这样向ThreadPoolExecutor提交任务的方法对于ForkJoinPool来说也是一样有效的。

需要说明的是,除了增加支持ForkJoinTask对象参数的重载实现外,还在Runnable和Callable参数的方法中对原始的Runnable和Callable对象做了到ForkJoinTask的适配,使用的分别是ForkJoinTask的静态内部类AdaptedRunnable和AdaptedCallable的对象。而这两个类型参数对应的方法最终都会调用ForkJoinTask参数的方法:

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {if (task == null)throw new NullPointerException();forkOrSubmit(task);return task;}

我们接下来再看下任务提交中被调用到的forkOrSubmit()方法:

private <T> void forkOrSubmit(ForkJoinTask<T> task) {ForkJoinWorkerThread w;Thread t = Thread.currentThread();if (shutdown)throw new RejectedExecutionException();if ((t instanceof ForkJoinWorkerThread) &&(w = (ForkJoinWorkerThread)t).pool == this)w.pushTask(task);elseaddSubmission(task);
}

逻辑很容易理解,先判断ForkJoinPool的状态,若已停止,则抛异常返回。之后如果当前线程是ForkJoinWorkerThread类型的,则将任务追加到ForkJoinWorkerThread对象中维护的队列上,否则将新的任务放入ForkJoinPool的提交队列中,并通知线程工作。

1.3. 线程的启动和工作

前面已经强调过,ForkJoinPool和ForkJoinWorkerThread是紧密相关,耦合在一起的。Thread的start()会调用run(),而ForkJoinWorkerThread类重写了run()方法,会调用对应的线程池ForkJoinPool对象的work()方法。

我们来看一下work()方法的实现。

final void work(ForkJoinWorkerThread w) {boolean swept = false;                // true on empty scanslong c;while (!w.terminate && (int)(c = ctl) >= 0) {int a;                            // active countif (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)swept = scan(w, a);else if (tryAwaitWork(w, c))swept = false;}
}

里面主要是一个while循环体,只要当前的线程和线程池不是处于终止状态,则这个循环一直执行。执行的内容则是这样的,如果能够根据scan()方法得到任务,并执行,否则进入阻塞状态。

我们来看一下scan()方法的实现。

private boolean scan(ForkJoinWorkerThread w, int a) {int g = scanGuard; // mask 0 avoids useless scans if only one activeint m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;ForkJoinWorkerThread[] ws = workers;if (ws == null || ws.length <= m)         // staleness checkreturn false;for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;ForkJoinWorkerThread v = ws[k & m];if (v != null && (b = v.queueBase) != v.queueTop &&(q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {long u = (i << ASHIFT) + ABASE;if ((t = q[i]) != null && v.queueBase == b &&UNSAFE.compareAndSwapObject(q, u, t, null)) {int d = (v.queueBase = b + 1) - v.queueTop;v.stealHint = w.poolIndex;if (d != 0)signalWork();             // propagate if nonemptyw.execTask(t);}r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);return false;                     // store next seed}else if (j < 0) {                     // xorshiftr ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;}else++k;}if (scanGuard != g)                       // staleness checkreturn false;else {                                    // try to take submissionForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;if ((b = queueBase) != queueTop &&(q = submissionQueue) != null &&(i = (q.length - 1) & b) >= 0) {long u = (i << ASHIFT) + ABASE;if ((t = q[i]) != null && queueBase == b &&UNSAFE.compareAndSwapObject(q, u, t, null)) {queueBase = b + 1;w.execTask(t);}return false;}return true;                         // all queues empty}
}

看起来很复杂,实际的原理则很简单,就是先尝试做任务窃取( Work Stealing ),如果不满足条件则到提交队列中获取任务。而ForkJoinWorkerThread线程本身也维护了线程内fork和join任务操作得到的队列,结合起来,总体执行任务的顺序就是:

  • 线程会先执行ForkJoinWorkerThread对象内维护的任务队列中的任务,即ForkJoinWorkerThread的execTask()方法中的循环实现。通常是LIFO,即去最新的任务。也有特殊情况,这个根据变量locallyFifo的值来判断。
  • 之后会尝试做任务窃取,尝试从其他线程中获取任务
  • 任务窃取条件不满足时,到提交队列中获取提交的任务

1.4. ForkJoinPool的其它属性

除了上述提到的操作,ForkJoin中还维护了

  • 线程数组和提交任务的队列,这是最基本的
  • 操作相关的锁和条件对象
  • volatile long ctl; 等线程池ForkJoinPool状态的属性
  • static final Random workerSeedGenerator; 等和任务窃取策略相关的一系列属性
  • private volatile long stealCount; 等数据统计相关属性

等数据属性。

2. ForkJoinWorkerThread

ForkJoinWorkerThread扩展于Thread类,但提供了很多支持ForkJoin的特性。

上文在介绍ForkJoinPool的时候已经对这个类做了很多描述,也强调过线程类ForkJoinWorkerThread和ForkJoinPool相互依赖,放在一起才有意义。实际上,还要提到描述Fork Join任务的类ForkJoinTask。

除了上面提到的以外,对于ForkJoinWorkerThread这个类,再稍微提一下这样几个点:

  • ForkJoinTask<?>[] queue; 这是维护和ForkJoin相关的(子)任务队列,还有queueTop和queueBase属性,分别标记队列的尾部和头部
  • final ForkJoinPool pool; 指向线程池的引用,需要注意的是,这个属性被final修饰
  • 和ForkJoinTask的fork()和join()方法相关的方法——pushTask()和unpushTask(),分别负责在当前ForkJoinWorkerThread对象维护的队列中新增和取回任务
  • 其它与状态和统计相关的属性

3. ForkJoinTask及两个抽象子类

ForkJoinTask是ForkJoin框架中的主体,是ForkJoin中任务的体现。这个类实现了Future和Serializable接口。除了Futrue接口要满足的方法外,我想有这样3个方法是有必要知道的,分别是fork()、join()和exec()。

对于fork(),这个也许大家都很熟悉了,在这里也就是分解出子任务的执行。这个在实现上很简单那,就是在当前线程ForkJoinWorkerThread对象维护的队列中加入新的子任务。实现如下:

public final ForkJoinTask fork() {((ForkJoinWorkerThread) Thread.currentThread()).pushTask(this);return this;
}

需要注意的是fork()方法的调用是在当前线程对象为ForkJoinWorkerThread的条件下。

我们再来看看对应的join()实现:

public final V join() {if (doJoin() != NORMAL)return reportResult();elsereturn getRawResult();
}

显然,它有调用了doJoin()方法,我们再来深入了解下。

private int doJoin() {Thread t; ForkJoinWorkerThread w; int s; boolean completed;if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {if ((s = status) < 0)return s;if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {try {completed = exec();} catch (Throwable rex) {return setExceptionalCompletion(rex);}if (completed)return setCompletion(NORMAL);}return w.joinTask(this);}elsereturn externalAwaitDone();
}

大概的逻辑是这样的,在当前线程对象为ForkJoinWorkerThread的条件下,从队列中取回当前任务ForkJoinTask对象,并尝试在调用线程对其直接执行,否则当前线程调用wait()阻塞等待。更深入的理解可续继续查阅源码。

最后,我们再来看看exec()方法,这个是在ForkJoinTask中是没有给出实现的。

在JDK中,有ForkJoinTask的两个抽象子类RecursiveAction和RecursiveTask,他们分别给出了exec()的实现,这也是这两个子类主要做的事情,实际上是调用了各自的compute()方法,而在RecursiveAction和RecursiveTask中compute()又是未给出实现的。

实际上,compute()方法就是Fork Join要执行的内容,是Fork Join任务的实质,需要开发者给出。

而RecursiveAction和RecursiveTask就是方便开发者使用Fork Join的,RecursiveAction和RecursiveTask这两个类的区别仅仅是返回结果的情况不同。而这个compute()方法就是留给开发者继承扩展使用的。这个会在下篇文章详细讲述。

转载于:https://my.oschina.net/3715cc/blog/678794

Java7中的ForkJoin并发框架初探(中)——JDK中实现简要分析相关推荐

  1. [转]Java7中的ForkJoin并发框架初探(上)——需求背景和设计原理

    详见: http://blog.yemou.net/article/query/info/tytfjhfascvhzxcytp83 这篇我们来简要了解一下JavaSE7中提供的一个新特性 -- For ...

  2. Java7中的ForkJoin并发框架初探(上)——需求背景和设计原理

    原文:发表于 2013 年 8 月 26 日 由 三石 0. 处理器发展和需求背景 回想一下并发开发的初衷,其实可以说是有两点,或者说可以从两个方面看. 对于单核的处理器来说,在进行IO操作等比较费时 ...

  3. #Python3中tornado高并发框架

    Python3中tornado高并发框架 简介: Tornado是一种 Web 服务器软件的开源版本.Tornado 和现在的主流 Web 服务器框架(包括大多数 Python 的框架)有着明显的区别 ...

  4. 在java中 以下关于集合框架_在Java中LinkedList类和ArrayList类同属于集合框架类,下列...

    [单选题]PAL 制标准视频像素长宽比是: [单选题]在节目编辑过程中可以任意编辑镜头顺序的编辑方式是: [单选题]气管上皮内无: [单选题]将溶液定量转移至容量瓶后,加水至容量瓶的( )容积时,开始 ...

  5. java disruptor压测_Java并发框架Disruptor实现原理与源码分析(二) 缓存行填充与CAS操作...

    ##缓存行填充 关于缓存行填充在我个人的印象里面第一次看到是在Java的java.util.concurrent包中,因为当时很好奇其用法背后的逻辑,所以查了很多资料才明白到底是怎么回事*(也许事实上 ...

  6. 【Android 逆向】使用 Python 解析 ELF 文件 ( Capstone 反汇编框架 | PyCharm 中导入 Capstone 反汇编框架 )

    文章目录 一.Capstone 反汇编框架 二.PyCharm 中导入 Capstone 反汇编框架 一.Capstone 反汇编框架 Android 的 APK 安装文件中 , 可能存在若干 so ...

  7. sql判断基数_SQL Server中的基数估计框架版本控制

    sql判断基数 This is a small post about how you may control the cardinality estimator version and determi ...

  8. html网页制作浮动框架,浮动框架在网页制作中使用技巧.doc

    浮动框架在网页制作中使用技巧 浮动框架在网页制作中使用技巧摘要:首先介绍了浮动框架的应用特点,然后结合实际的使用技巧介绍了在浮动框架中:可以制作与其它页面之间的链接,设置浮动框架的滚动条及区域的颜色, ...

  9. java中fork函数_java中的forkjoin框架的使用

    fork join框架是java 7中引入框架,这个框架的引入主要是为了提升并行计算的能力. fork join主要有两个步骤,第一就是fork,将一个大任务分成很多个小任务,第二就是join,将第一 ...

最新文章

  1. 超级实用的 MySQL 常用优化指南!
  2. 第一节 并发基础概念及实现、进程、线程基本概念
  3. Armijo-Goldstein和wolfe-power的matlab代码实现(转)
  4. python selenium 等待元素出现_Python Selenium等待加载几个元素
  5. php定义常量、判断有没有被定义、预定义常量、显示所有常量
  6. centos 卸载软件_一篇看懂!详解-Linux系统中安装软件的三种方法
  7. 机构研究显示iPhone 12 Pro Max是美国最受欢迎5G手机
  8. VoiceDial首款语音识别软件使用教程
  9. JavaWeb — session+实战项目
  10. 《单细胞生物》教学反思
  11. php扩展ui,jQuery UI 扩展小部件
  12. python保存文件后缀_python文件的后缀名是什么
  13. 硬件工程师岗位应聘为什么都要求精通CC++呢,这其中有什么说法吗
  14. 2023西安电子科技大学计算机考研信息汇总
  15. Bezout's Lemma 学习笔记
  16. python实现单机斗地主手机版下载_单机斗地主(单机版)无需网络下载
  17. PROCAST-重力铸造分析流程
  18. 在B站,没有什么是不能搞CP的
  19. 2个动作,让研发效率提升120%,代码减少50%
  20. 打印出所有的水仙花数,所谓水仙花数是指一个三位数,其各位数字立方和等于该数本身。

热门文章

  1. 【C/C++】与const有关的指针类型赋值
  2. linux之xargs使用技巧
  3. 别把机器学习和人工智能搞混了! 1
  4. mysqldump 导出中文乱码
  5. 学号:201621123032 《Java程序设计》第7周学习总结
  6. Appium框架中Android下EditText内容清除
  7. IOS开发常用插件(二)
  8. Cobbler安装指南
  9. 财务一体化项目,进度与计划31
  10. 8-1 数据库分库分表的几种方式