背景:ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。这种思想值得学习。

介绍

Java7 提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。

ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。

使用方法:创建了ForkJoinPool实例之后,就可以调用ForkJoinPool的submit(ForkJoinTask<T> task) 或invoke(ForkJoinTask<T> task)方法来执行指定任务了。

其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它还有两个抽象子类:RecusiveAction和RecusiveTask。其中RecusiveTask代表有返回值的任务,而RecusiveAction代表没有返回值的任务。

下面的UML类图显示了ForkJoinPool、ForkJoinTask之间的关系:

案列一:通过多线程分多个小任务进行打印数据  无返回值的

/*** */
package forkJoinPool;import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;/*** @author liuchaojun* @date 2018-9-10 下午02:57:29* * RecursiveAction  无返回值的* */
public class PrintTask extends RecursiveAction {private static final long serialVersionUID = 1L;private static final int INDEX = 50;private int start;private int end;/*** */public PrintTask(int start, int end) {// TODO Auto-generated constructor stubthis.start = start;this.end = end;}/** (non-Javadoc)* * @see java.util.concurrent.RecursiveAction#compute()*/@Overrideprotected void compute() {if (end - start < INDEX) {for (int i = start; i < end; i++) {System.out.println(Thread.currentThread().getName() + "----"+ i);}} else {int middle = (end + start) / 2;PrintTask taskLeft = new PrintTask(start, middle);PrintTask taskRight = new PrintTask(middle, end);//taskLeft.invoke();执行给定的任务,在完成后返回其结果。//并行执行两个“小任务”
/*          taskLeft.fork();taskRight.fork();*/invokeAll(taskLeft, taskRight);//执行给定的任务}}public static void main(String[] args) throws InterruptedException {PrintTask task = new PrintTask(0, 300);ForkJoinPool pool = new ForkJoinPool();pool.submit(task);pool.awaitTermination(2, TimeUnit.SECONDS);//阻塞2秒pool.shutdown();}
}

运行结果

本机电脑cpu4核,通过上面观察线程名称,可以看出4个cpu都在进行

案列二:通过多线程分多个小任务进行数据累加  返回结果集

package forkJoinPool;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;/*** @author liuchaojun* @date 2018-9-10 下午03:50:24*/
public class PrintTask2 {public static void main(String[] args) throws Exception {int[] arr = new int[200];Random r = new Random();int tempSum = 0;// 普通总数for (int i = 0; i < arr.length; i++) {tempSum += (arr[i] = r.nextInt(10));}System.out.println("普通总数结果为:" + tempSum);ForkJoinPool pool = new ForkJoinPool();MyTask task = new MyTask(0, arr.length, arr);Future<Integer> sum = pool.submit(task);System.out.println("多线程的执行结果:" + sum.get());// get 如果需要,等待计算完成,然后检索其结果。pool.awaitTermination(2, TimeUnit.SECONDS);pool.shutdown(); // 关闭线程池}
}class MyTask extends RecursiveTask<Integer> {private static final long serialVersionUID = 1L;private static final int INDEX = 50;// 每个小任务执行50个private int start;private int end;private int arr[];/*** @param start* @param end* @param arr*/public MyTask(int start, int end, int[] arr) {this.start = start;this.end = end;this.arr = arr;}/** (non-Javadoc)* * @see java.util.concurrent.RecursiveTask#compute()*/@Overrideprotected Integer compute() {int sum = 0;if (end - start < INDEX) {for (int i = start; i < end; i++) {sum += arr[i];}return sum;} else {int middle = (end + start) / 2;MyTask taskLeft2 = new MyTask(start, middle, arr);MyTask taskRight2 = new MyTask(middle, end, arr);/*invokeAll(taskLeft2, taskRight2);*/taskLeft2.fork();taskRight2.fork();int leftValue = taskLeft2.join();// 当计算完成时返回计算结果。int rightValue = taskRight2.join();return leftValue + rightValue;}}
}

运行结果:

提供一些API 的方法,可以参考下

ForkJoinPool类

    • 构造方法摘要

      Constructors 
      构造方法与描述
      ForkJoinPool()

      Creates a ForkJoinPool with parallelism equal to Runtime.availableProcessors(), using the default thread factory, no UncaughtExceptionHandler, and non-async LIFO processing mode.

      ForkJoinPool(int parallelism)

      Creates a ForkJoinPool with the indicated parallelism level, the default thread factory, no UncaughtExceptionHandler, and non-async LIFO processing mode.

      ForkJoinPool(int parallelism, ForkJoinPool.ForkJoinWorkerThreadFactory factory, Thread.UncaughtExceptionHandler handler, boolean asyncMode)

      Creates a ForkJoinPool with the given parameters.

    • 方法摘要

      Methods 
      修饰符与类型 方法与描述
      boolean awaitTermination(long timeout, TimeUnit unit)

      Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.

      protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c)

      Removes all available unexecuted submitted and forked tasks from scheduling queues and adds them to the given collection, without altering their execution status.

      void execute(ForkJoinTask<?> task)

      Arranges for (asynchronous) execution of the given task.

      void execute(Runnable task)

      Executes the given command at some time in the future.

      int getActiveThreadCount()

      Returns an estimate of the number of threads that are currently stealing or executing tasks.

      boolean getAsyncMode()

      Returns true if this pool uses local first-in-first-out scheduling mode for forked tasks that are never joined.

      ForkJoinPool.ForkJoinWorkerThreadFactory getFactory()

      Returns the factory used for constructing new workers.

      int getParallelism()

      Returns the targeted parallelism level of this pool.

      int getPoolSize()

      Returns the number of worker threads that have started but not yet terminated.

      int getQueuedSubmissionCount()

      Returns an estimate of the number of tasks submitted to this pool that have not yet begun executing.

      long getQueuedTaskCount()

      Returns an estimate of the total number of tasks currently held in queues by worker threads (but not including tasks submitted to the pool that have not begun executing).

      int getRunningThreadCount()

      Returns an estimate of the number of worker threads that are not blocked waiting to join tasks or for other managed synchronization.

      long getStealCount()

      Returns an estimate of the total number of tasks stolen from one thread's work queue by another.

      Thread.UncaughtExceptionHandler getUncaughtExceptionHandler()

      Returns the handler for internal worker threads that terminate due to unrecoverable errors encountered while executing tasks.

      boolean hasQueuedSubmissions()

      Returns true if there are any tasks submitted to this pool that have not yet begun executing.

      <T> T invoke(ForkJoinTask<T> task)

      Performs the given task, returning its result upon completion.

      <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

      Executes the given tasks, returning a list of Futures holding their status and results when all complete.

      boolean isQuiescent()

      Returns true if all worker threads are currently idle.

      boolean isShutdown()

      Returns true if this pool has been shut down.

      boolean isTerminated()

      Returns true if all tasks have completed following shut down.

      boolean isTerminating()

      Returns true if the process of termination has commenced but not yet completed.

      static void managedBlock(ForkJoinPool.ManagedBlocker blocker)

      Blocks in accord with the given blocker.

      protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)

      Returns a RunnableFuture for the given callable task.

      protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)

      Returns a RunnableFuture for the given runnable and default value.

      protected ForkJoinTask<?> pollSubmission()

      Removes and returns the next unexecuted submission if one is available.

      void shutdown()

      Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.

      List<Runnable> shutdownNow()

      Attempts to cancel and/or stop all tasks, and reject all subsequently submitted tasks.

      <T> ForkJoinTask<T> submit(Callable<T> task)

      Submits a value-returning task for execution and returns a Future representing the pending results of the task.

      <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)

      Submits a ForkJoinTask for execution.

      ForkJoinTask<?> submit(Runnable task)

      Submits a Runnable task for execution and returns a Future representing that task.

      <T> ForkJoinTask<T> submit(Runnable task, T result)

      Submits a Runnable task for execution and returns a Future representing that task.

      String toString()

      Returns a string identifying this pool, as well as its state, including indications of run state, parallelism level, and worker and task counts.

  • ExecutorService接口

    • 方法摘要

      Methods 
      修饰符与类型 方法与描述
      boolean awaitTermination(long timeout, TimeUnit unit)

      Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.

      <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

      Executes the given tasks, returning a list of Futures holding their status and results when all complete.

      <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)

      Executes the given tasks, returning a list of Futures holding their status and results when all complete or the timeout expires, whichever happens first.

      <T> T invokeAny(Collection<? extends Callable<T>> tasks)

      Executes the given tasks, returning the result of one that has completed successfully (i.e., without throwing an exception), if any do.

      <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)

      Executes the given tasks, returning the result of one that has completed successfully (i.e., without throwing an exception), if any do before the given timeout elapses.

      boolean isShutdown()

      Returns true if this executor has been shut down.

      boolean isTerminated()

      Returns true if all tasks have completed following shut down.

      void shutdown()

      Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.

      List<Runnable> shutdownNow()

      Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.

      <T> Future<T> submit(Callable<T> task)

      Submits a value-returning task for execution and returns a Future representing the pending results of the task.

      Future<?> submit(Runnable task)

      Submits a Runnable task for execution and returns a Future representing that task.

      <T> Future<T> submit(Runnable task, T result)

      Submits a Runnable task for execution and returns a Future representing that task.

  • 总结

1,invokeAll(task)方法,主动执行其它的ForkJoinTask,并等待Task完成。(同步的)

2,fork方法,让一个task执行(异步的)

3,join方法,让一个task执行(同步的,它和fork不同点是同步或者异步的区别)

4,可以使用join来取得ForkJoinTask的返回值。由于RecursiveTask类实现了Future接口,所以也可以使用get()取得返回值。 
get()和join()有两个主要的区别: 
join()方法不能被中断。如果你中断调用join()方法的线程,这个方法将抛出InterruptedException异常。 
如果任务抛出任何未受检异常,get()方法将返回一个ExecutionException异常,而join()方法将返回一个RuntimeException异常。

5,ForkJoinTask在不显示使用ForkJoinPool.execute/invoke/submit()方法进行执行的情况下,也可以使用自己的fork/invoke方法进行执行。 
使用fork/invoke方法执行时,其实原理也是在ForkJoinPool里执行,只不过使用的是一个“在ForkJoinPool内部生成的静态的”ForkJoinPool。

6,ForkJoinTask有两个子类,RecursiveAction和RecursiveTask。他们之间的区别是,RecursiveAction没有返回值,RecursiveTask有返回值。

7,看看ForkjoinTask的Complete方法的使用场景 
这个方法好要是用来使一个任务结束。这个方法被用在结束异步任务上,或者为那些能不正常结束的任务,提供一个选择。

8,Task的completeExceptionally方法是怎么回事。 
这个方法被用来,在异步的Task中产生一个exception,或者强制结束那些“不会结束”的任务 
这个方法是在Task想要“自己结束自己”时,可以被使用。而cancel方法,被设计成被其它TASK调用。 
当你在一个任务中抛出一个未检查异常时,它也影响到它的父任务(把它提交到ForkJoinPool类的任务)和父任务的父任务,以此类推。

9,可以使用ForkJoinPool.execute(异步,不返回结果)、invoke(同步,返回结果)、submit(异步,返回结果)方法,来执行ForkJoinTask。

10,ForkJoinPool有一个方法commonPool(),这个方法返回一个ForkJoinPool内部声明的静态ForkJoinPool实例。 在jdk1.8里面才有。文档上说,这个方法适用于大多数的应用。这个静态实例的初始线程数,为“CPU核数-1 ”(Runtime.getRuntime().availableProcessors() - 1)。 
ForkJoinTask自己启动时,使用的就是这个静态实例。

多线程 ForkJoinPool相关推荐

  1. java多线程 异常处理_Java8多线程ForkJoinPool:处理异常

    java多线程 异常处理 引入Java8 lambda背后的主要动机之一是能够尽可能轻松地使用多核的能力(请参阅精通Lambdas:多核世界中的Java编程 ). 只需将代码从collection.s ...

  2. Java8多线程ForkJoinPool:处理异常

    引入Java8 lambda背后的主要动机之一是能够尽可能轻松地使用多核的能力(请参阅精通Lambdas:多核世界中的Java编程 ). 只需将代码从collection.stream()...更改为 ...

  3. java 运行main_使用maven运行Java Main的三种方法解析

    maven使用exec插件运行java main方法,以下是3种不同的操作方式. 一.从命令行运行 1.运行前先编译代码,exec:java不会自动编译代码,你需要手动执行mvn compile来完成 ...

  4. java当中有关循环的代码_有关Java循环的内容,编程中还是比较常用的,下面分享给大家几个循环的示例代码,练习一下。1、循环输出1到100之间所有能被3或能被4整除的数。pack...

    有关Java循环的内容,编程中还是比较常用的,下面分享给大家几个循环的示例代码,练习一下. 1.循环输出1到100之间所有能被3或能被4整除的数. package com.hz.loop02; /** ...

  5. java main 运行_使用maven运行Java Main的三种方法解析

    导读热词 maven使用exec插件运行java main方法,以下是3种不同的操作方式. 一.从命令行运行 1.运行前先编译代码,exec:java不会自动编译代码,你需要手动执行mvn compi ...

  6. java多线程工具类_Java多线程系列之:线程的并发工具类

    一,Fork-Join 1,定义: Fork-Join框架:就是在必要的情况下,将一个大任务,进行拆分(fork)成若干个小任务(拆到不能再拆时),再将一个个的小任务运算的结果进行join汇总. 2, ...

  7. command对象提供的3个execute方法是_前阿里P9的Java面试重点3:多线程

    1. 并行和并发有什么区别? 并行:多个处理器或多核处理器同时处理多个任务. 并发:多个任务在同一个 CPU 核上,按细分的时间片轮流(交替)执行,从逻辑上来看那些任务是同时执行. 如下图: 并发 = ...

  8. android串口补位,Rust多线程中的消息传递机制

    代码说话. use std::thread; use std::sync::mpsc; use std::time::Duration; fn main() { let (tx, rx) = mpsc ...

  9. 多线程面试题_100多线程和Java并发面试问答–最终清单(PDF下载)

    多线程面试题 在这篇文章中,我们将提供有关多线程和Java并发面试问答的综合文章. 编者注:并发始终是开发人员的挑战,编写并发程序可能非常困难. 引入并发时,有很多事情可能会崩溃,并且系统的复杂性会大 ...

最新文章

  1. 机器学习实战 k-近邻算法 使用matplotlib创建散点图
  2. springboot springcloud 热部署
  3. 学好python能干嘛-普通人学Python有用吗?学完能做什么?
  4. HDU1040-As Easy As A+B
  5. springboot中下面哪一个作为jpa默认实现_35个超高频SpringBoot知识点(附解析),别怪我没给你机会收藏...
  6. easyui树拖拽排序java_easyui tree 拖拽功能并将数据返回后台保存至数据库
  7. 问题解决:Sublime 乱码显示GBK编码文件解决
  8. 话里话外:谁才是流程的主人
  9. Unity中获取鼠标相对于UI组件的位置
  10. 优秀架构师是怎么炼成的?
  11. python基础教程解压密码_python学习手册视频教程压缩包解压密码?
  12. 微信小游戏 H5 排行榜源码
  13. 51单片机外围模块——DS1302时钟模块
  14. shopnc mysql_(转) shopnc数据库操作
  15. sonarqube+scanner代码质量检查
  16. JAVA -- 正则表达式高级学习技巧
  17. Debian 11 修改 DNS 服务器
  18. 惠州市有哪些学计算机的学校,惠州有哪些好学校?
  19. Java excel poi 读取已有文件 ,动态插入一列数据
  20. 计算机网络实验2--简单企业网络搭建

热门文章

  1. HTML文本域添加滑杆,Objective-C 自定义UISlider滑杆 分段样式
  2. java 取磁盘阵列容量_硬盘阵列 Raid 的区别及容量计算方式
  3. LaTeX 使用 \begin{aligned} 出现错误代码 Environment aligned undefined. \begin{aligned}
  4. 关于Fatal NI connect error 12170
  5. oracle学习笔记(四)-- 数学函数
  6. 【EF】对象名 'dbo.EdmMetadata' 无效。
  7. SAP 标准成本、计划成本、目标成本、实际成本解析
  8. 基于G6-Editor的流程图编辑器
  9. 「技巧」如何快速安装 Sketch 插件
  10. Python中路径的写法