

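Java 7 introduced ForkJoinPool, which supports splitting one task into several "small tasks" that are computed in parallel, then merging the results of those small tasks into the overall result.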


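Usage: after creating a ForkJoinPool instance, call its submit(ForkJoinTask<T> task) or invoke(ForkJoinTask<T> task) method to run the given task. submit returns a ForkJoinTask right away, while invoke blocks until the task completes and returns its result.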
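To make the difference between the two calls concrete, here is a minimal sketch. The class InvokeVsSubmit and the task RangeSum are hypothetical names invented for this illustration; only ForkJoinPool, ForkJoinTask and RecursiveTask come from the JDK.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class InvokeVsSubmit {

    /** Hypothetical task for illustration: sums the integers in [from, to). */
    static class RangeSum extends RecursiveTask<Long> {
        private static final long serialVersionUID = 1L;
        private final int from;
        private final int to;

        RangeSum(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += i;
            }
            return sum;
        }
    }

    /** invoke() blocks the caller and hands back the task's result directly. */
    static long viaInvoke(ForkJoinPool pool) {
        return pool.invoke(new RangeSum(0, 100));
    }

    /** submit() returns a ForkJoinTask at once; join() waits for its result. */
    static long viaSubmit(ForkJoinPool pool) {
        ForkJoinTask<Long> pending = pool.submit(new RangeSum(0, 100));
        return pending.join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            System.out.println("invoke: " + viaInvoke(pool));
            System.out.println("submit: " + viaSubmit(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

The sketch uses join() rather than get() only because join() declares no checked exceptions; get() works as well, as the second example in this post shows.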



Example 1: printing data by splitting the work into several small tasks across multiple threads (no return value)

package forkJoinPool;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

/**
 * @author liuchaojun
 * @date 2018-9-10 02:57:29 PM
 *
 * RecursiveAction: a task with no return value
 */
public class PrintTask extends RecursiveAction {
    private static final long serialVersionUID = 1L;
    // Ranges shorter than this are printed directly instead of being split.
    private static final int INDEX = 50;
    private int start;
    private int end;

    public PrintTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    /*
     * (non-Javadoc)
     *
     * @see java.util.concurrent.RecursiveAction#compute()
     */
    @Override
    protected void compute() {
        if (end - start < INDEX) {
            for (int i = start; i < end; i++) {
                System.out.println(Thread.currentThread().getName() + "----" + i);
            }
        } else {
            int middle = (end + start) / 2;
            PrintTask taskLeft = new PrintTask(start, middle);
            PrintTask taskRight = new PrintTask(middle, end);
            // taskLeft.invoke() would run the given task and return its result on completion.
            // Run the two "small tasks" in parallel.
            // Alternative: taskLeft.fork(); taskRight.fork();
            invokeAll(taskLeft, taskRight); // run the given tasks
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PrintTask task = new PrintTask(0, 300);
        ForkJoinPool pool = new ForkJoinPool();
        pool.submit(task);
        // Shut down first: awaitTermination only returns early once a shutdown
        // has been requested. Already submitted tasks still run to completion.
        pool.shutdown();
        pool.awaitTermination(2, TimeUnit.SECONDS); // wait up to 2 seconds
    }
}



Example 2: summing data by splitting the work into several small tasks across multiple threads (returns a result)

package forkJoinPool;

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

/**
 * @author liuchaojun
 * @date 2018-9-10 03:50:24 PM
 */
public class PrintTask2 {
    public static void main(String[] args) throws Exception {
        int[] arr = new int[200];
        Random r = new Random();
        int tempSum = 0; // sum computed sequentially, for comparison
        for (int i = 0; i < arr.length; i++) {
            tempSum += (arr[i] = r.nextInt(10));
        }
        System.out.println("Sequential sum: " + tempSum);
        ForkJoinPool pool = new ForkJoinPool();
        MyTask task = new MyTask(0, arr.length, arr);
        Future<Integer> sum = pool.submit(task);
        // get() waits for the computation to complete if necessary, then retrieves its result.
        System.out.println("Multithreaded result: " + sum.get());
        pool.shutdown(); // shut down the thread pool
        pool.awaitTermination(2, TimeUnit.SECONDS);
    }
}

class MyTask extends RecursiveTask<Integer> {
    private static final long serialVersionUID = 1L;
    private static final int INDEX = 50; // each small task handles fewer than 50 elements
    private int start;
    private int end;
    private int arr[];

    /**
     * @param start
     * @param end
     * @param arr
     */
    public MyTask(int start, int end, int[] arr) {
        this.start = start;
        this.end = end;
        this.arr = arr;
    }

    /*
     * (non-Javadoc)
     *
     * @see java.util.concurrent.RecursiveTask#compute()
     */
    @Override
    protected Integer compute() {
        int sum = 0;
        if (end - start < INDEX) {
            for (int i = start; i < end; i++) {
                sum += arr[i];
            }
            return sum;
        } else {
            int middle = (end + start) / 2;
            MyTask taskLeft2 = new MyTask(start, middle, arr);
            MyTask taskRight2 = new MyTask(middle, end, arr);
            /* invokeAll(taskLeft2, taskRight2); */
            taskLeft2.fork();
            taskRight2.fork();
            int leftValue = taskLeft2.join(); // returns the result once the computation is done
            int rightValue = taskRight2.join();
            return leftValue + rightValue;
        }
    }
}
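Example 2 forks both halves and then joins them. A commonly used variant forks only one half and computes the other half in the current thread, so the calling worker does useful work instead of just waiting. The sketch below shows that variant; the names SumTask, THRESHOLD and sum are made up for this illustration and are not part of the original example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumTask extends RecursiveTask<Integer> {
    private static final long serialVersionUID = 1L;
    // Assumed cut-off, mirroring INDEX = 50 in Example 2.
    private static final int THRESHOLD = 50;

    private final int[] arr;
    private final int start;
    private final int end;

    SumTask(int[] arr, int start, int end) {
        this.arr = arr;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (end - start < THRESHOLD) {
            // Small enough: add the elements up directly.
            int sum = 0;
            for (int i = start; i < end; i++) {
                sum += arr[i];
            }
            return sum;
        }
        int middle = (start + end) / 2;
        SumTask left = new SumTask(arr, start, middle);
        SumTask right = new SumTask(arr, middle, end);
        left.fork();                        // schedule the left half asynchronously
        int rightValue = right.compute();   // compute the right half in this thread
        int leftValue = left.join();        // then wait for the left half
        return leftValue + rightValue;
    }

    /** Hypothetical convenience wrapper: sums arr on a fresh pool. */
    static int sum(int[] arr) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(arr, 0, arr.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = new int[200];
        for (int i = 0; i < data.length; i++) {
            data[i] = 1;
        }
        System.out.println("sum = " + sum(data));
    }
}
```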


Some of the API methods are listed below for reference.


    • Constructor summary

      ForkJoinPool()

      Creates a ForkJoinPool with parallelism equal to Runtime.availableProcessors(), using the default thread factory, no UncaughtExceptionHandler, and non-async LIFO processing mode.

      ForkJoinPool(int parallelism)

      Creates a ForkJoinPool with the indicated parallelism level, the default thread factory, no UncaughtExceptionHandler, and non-async LIFO processing mode.

      ForkJoinPool(int parallelism, ForkJoinPool.ForkJoinWorkerThreadFactory factory, Thread.UncaughtExceptionHandler handler, boolean asyncMode)

      Creates a ForkJoinPool with the given parameters.
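As a rough illustration of the three constructors, the following sketch creates one pool with each. PoolConstruction and its helper methods are hypothetical names; the constructors, the defaultForkJoinWorkerThreadFactory field, getParallelism() and getAsyncMode() are JDK API.

```java
import java.util.concurrent.ForkJoinPool;

class PoolConstruction {

    /** Parallelism defaults to the number of available processors. */
    static ForkJoinPool defaultPool() {
        return new ForkJoinPool();
    }

    /** Explicit parallelism, everything else left at its default. */
    static ForkJoinPool fixedPool(int parallelism) {
        return new ForkJoinPool(parallelism);
    }

    /**
     * Full constructor: default thread factory, no uncaught-exception
     * handler (null is allowed), and asyncMode = true, i.e. FIFO
     * scheduling for forked tasks that are never joined.
     */
    static ForkJoinPool asyncPool(int parallelism) {
        return new ForkJoinPool(
                parallelism,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null,
                true);
    }

    public static void main(String[] args) {
        ForkJoinPool[] pools = { defaultPool(), fixedPool(3), asyncPool(4) };
        for (ForkJoinPool pool : pools) {
            System.out.println("parallelism=" + pool.getParallelism()
                    + ", asyncMode=" + pool.getAsyncMode());
            pool.shutdown();
        }
    }
}
```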

    • Method summary

      Modifier and type, method and description
      boolean awaitTermination(long timeout, TimeUnit unit)

      Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.

      protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c)

      Removes all available unexecuted submitted and forked tasks from scheduling queues and adds them to the given collection, without altering their execution status.

      void execute(ForkJoinTask<?> task)

      Arranges for (asynchronous) execution of the given task.

      void execute(Runnable task)

      Executes the given command at some time in the future.

      int getActiveThreadCount()

      Returns an estimate of the number of threads that are currently stealing or executing tasks.

      boolean getAsyncMode()

      Returns true if this pool uses local first-in-first-out scheduling mode for forked tasks that are never joined.

      ForkJoinPool.ForkJoinWorkerThreadFactory getFactory()

      Returns the factory used for constructing new workers.

      int getParallelism()

      Returns the targeted parallelism level of this pool.

      int getPoolSize()

      Returns the number of worker threads that have started but not yet terminated.

      int getQueuedSubmissionCount()

      Returns an estimate of the number of tasks submitted to this pool that have not yet begun executing.

      long getQueuedTaskCount()

      Returns an estimate of the total number of tasks currently held in queues by worker threads (but not including tasks submitted to the pool that have not begun executing).

      int getRunningThreadCount()

      Returns an estimate of the number of worker threads that are not blocked waiting to join tasks or for other managed synchronization.

      long getStealCount()

      Returns an estimate of the total number of tasks stolen from one thread's work queue by another.

      Thread.UncaughtExceptionHandler getUncaughtExceptionHandler()

      Returns the handler for internal worker threads that terminate due to unrecoverable errors encountered while executing tasks.

      boolean hasQueuedSubmissions()

      Returns true if there are any tasks submitted to this pool that have not yet begun executing.

      <T> T invoke(ForkJoinTask<T> task)

      Performs the given task, returning its result upon completion.

      <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

      Executes the given tasks, returning a list of Futures holding their status and results when all complete.

      boolean isQuiescent()

      Returns true if all worker threads are currently idle.

      boolean isShutdown()

      Returns true if this pool has been shut down.

      boolean isTerminated()

      Returns true if all tasks have completed following shut down.

      boolean isTerminating()

      Returns true if the process of termination has commenced but not yet completed.

      static void managedBlock(ForkJoinPool.ManagedBlocker blocker)

      Blocks in accord with the given blocker.

      protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)

      Returns a RunnableFuture for the given callable task.

      protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)

      Returns a RunnableFuture for the given runnable and default value.

      protected ForkJoinTask<?> pollSubmission()

      Removes and returns the next unexecuted submission if one is available.

      void shutdown()

      Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.

      List<Runnable> shutdownNow()

      Attempts to cancel and/or stop all tasks, and reject all subsequently submitted tasks.

      <T> ForkJoinTask<T> submit(Callable<T> task)

      Submits a value-returning task for execution and returns a Future representing the pending results of the task.

      <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)

      Submits a ForkJoinTask for execution.

      ForkJoinTask<?> submit(Runnable task)

      Submits a Runnable task for execution and returns a Future representing that task.

      <T> ForkJoinTask<T> submit(Runnable task, T result)

      Submits a Runnable task for execution and returns a Future representing that task.

      String toString()

      Returns a string identifying this pool, as well as its state, including indications of run state, parallelism level, and worker and task counts.
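Of the methods above, shutdown() and awaitTermination() are the pair most often used together, and their order matters: awaitTermination() waits for termination "after a shutdown request", so calling it on a pool that was never shut down simply blocks until the timeout. A minimal sketch follows, with ShutdownOrder and runTasks as hypothetical names and a 5-second timeout chosen arbitrarily.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownOrder {

    /**
     * Hypothetical helper: runs taskCount trivial tasks and returns how many
     * of them had finished once awaitTermination() returned.
     */
    static int runTasks(int taskCount) {
        AtomicInteger completed = new AtomicInteger();
        ForkJoinPool pool = new ForkJoinPool();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(completed::incrementAndGet);
        }
        // 1. Request shutdown: no new tasks are accepted, but the tasks
        //    submitted above are still executed.
        pool.shutdown();
        try {
            // 2. Only now wait (up to 5 seconds) for the pool to terminate.
            boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
            System.out.println("isShutdown=" + pool.isShutdown()
                    + ", terminated=" + terminated);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runTasks(20));
    }
}
```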

  • ExecutorService interface

    • Method summary

      Modifier and type, method and description
      boolean awaitTermination(long timeout, TimeUnit unit)

      Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.

      <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

      Executes the given tasks, returning a list of Futures holding their status and results when all complete.

      <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)

      Executes the given tasks, returning a list of Futures holding their status and results when all complete or the timeout expires, whichever happens first.

      <T> T invokeAny(Collection<? extends Callable<T>> tasks)

      Executes the given tasks, returning the result of one that has completed successfully (i.e., without throwing an exception), if any do.

      <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)

      Executes the given tasks, returning the result of one that has completed successfully (i.e., without throwing an exception), if any do before the given timeout elapses.

      boolean isShutdown()

      Returns true if this executor has been shut down.

      boolean isTerminated()

      Returns true if all tasks have completed following shut down.

      void shutdown()

      Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.

      List<Runnable> shutdownNow()

      Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.

      <T> Future<T> submit(Callable<T> task)

      Submits a value-returning task for execution and returns a Future representing the pending results of the task.

      Future<?> submit(Runnable task)

      Submits a Runnable task for execution and returns a Future representing that task.

      <T> Future<T> submit(Runnable task, T result)

      Submits a Runnable task for execution and returns a Future representing that task.
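Because ForkJoinPool implements ExecutorService, it can also be used through that interface with ordinary Callable tasks. The sketch below does this with invokeAll(); ExecutorServiceView and sumOfSquares are names invented for the example, and wrapping the checked exceptions in IllegalStateException is just one possible choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

class ExecutorServiceView {

    /** Hypothetical helper: computes 1*1 + 2*2 + ... + n*n, one Callable per term. */
    static int sumOfSquares(int n) {
        // Program against the interface; the implementation is a ForkJoinPool.
        ExecutorService executor = new ForkJoinPool();
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int value = i;
                tasks.add(() -> value * value);
            }
            int total = 0;
            // invokeAll returns only after every task has completed.
            for (Future<Integer> result : executor.invokeAll(tasks)) {
                total += result.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sum of squares up to 4: " + sumOfSquares(4));
    }
}
```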

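  • Summary

10. ForkJoinPool has a method commonPool() that returns a static ForkJoinPool instance declared inside ForkJoinPool. It is available only from JDK 1.8 onward. According to the documentation, this pool is appropriate for most applications. The default parallelism (target number of worker threads) of this static instance is "number of CPU cores - 1" (Runtime.getRuntime().availableProcessors() - 1).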
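A short sketch of using the common pool is given here. CommonPoolDemo and its two helper methods are hypothetical names; commonPool() and getCommonPoolParallelism() are the JDK 1.8 API. The reported parallelism depends on the machine and can be overridden with the java.util.concurrent.ForkJoinPool.common.parallelism system property, so the sketch prints it rather than assuming a value.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;

class CommonPoolDemo {

    /** Target parallelism of the shared common pool. */
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    /** Runs a trivial Callable on the common pool and waits for its result. */
    static int answerOnCommonPool() {
        Callable<Integer> job = () -> 21 * 2;
        return ForkJoinPool.commonPool().submit(job).join();
    }

    public static void main(String[] args) {
        System.out.println("available processors: "
                + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + commonPoolParallelism());
        System.out.println("result: " + answerOnCommonPool());
        // No shutdown() here: the common pool is shared by the whole JVM,
        // and shutdown requests on it have no effect.
    }
}
```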

