摘要

在高并发场景下,常见的设计模式可能存在线程安全问题,比如传统的单例模式就是一个典型。另外,为了充分发挥多核优势,高并发程序常常将大的任务分割成一些规模较小的任务,以便各个击破、分而治之,这就出现了一些高并发场景下特有的设计模式,比如ForkJoin模式等。博文将详细介绍在高并发场景常用的几种模式∶线程安全的单例模式、ForkJoin模式、生产者-消费者模式、Master-Worker模式和Future模式。

一、线程安全的单例模式

单例模式是常见的一种设计模式,一般用于全局对象管理,比如XML读写实例、系统配置实例、任务调度实例、数据库连接池实例等。

1.1 饿汉式单例设计模式

1.2 懒汉式单例设计模式

1.3 Double check单例设计模式

二、Master-Worker设计模式

Master-Worker模式是一种常见的高并发模式,它的核心思想是任务的调度和执行分离,调度任务的角色为Master,执行任务的角色为Worker,Master负责接收、分配任务和合并(Merge)任务结果,Worker负责执行任务。Master-Worker模式是一种归并类型的模式。

举一个例子,在TCP服务端的请求处理过程中,大量的客户端连接相当于大量的任务,Master需要将这些任务存储在一个任务队列中,然后分发给各个Worker,每个Worker是一个工作线程,负责完成连接的传输处理。Master-Worker模式的整体结构如图8-1所示。

2.1 Netty中的Master-work中模型

Master-Worker模式的核心思想为分而治之,Master角色负责接收和分配任务,Worker角色负责执行任务和结果回填,具体如图8-2所示。

实际上,高性能传输模式Reactor模式就是Master-Worker模式在传输领域的一种应用。基于Java的NIO技术,Netty设计了一套优秀的、高性能Reactor(反应器)模式的具体实现。在Netty中,EventLoop反应器内部有一个线程负责Java NIO选择器的事件轮询,然后进行对应的事件分发。事件分发的目标就是Netty的Handler处理程序(含用户定义的业务处理程序)。Netty服务器程序中需要设置两个EventLoopGroup轮询组,一个组负责新连接的监听和接收,另一个组负责IO传输事件的轮询与分发,两个轮询组的职责具体如下:

  • 负责新连接的监听和接收的EventLoopGroup轮询组中的反应器完成查询通道的新连接IO事件查询,这些反应器有点像负责招工的包工头,因此该轮询组可以形象地称为“包工头”(Boss)轮询组。
  • 另一个轮询组中的反应器完成查询所有子通道的IO事件,并且执行对应的Handler处理程序完成IO处理,例如数据的输入和输出(有点像搬砖),这个轮询组可以形象地称为“工人”(Worker)轮询组。

2.2 Nginx中的Master-work中模型

Nginx服务器是Master-Worker模式(更准确地说是Reactor模式)在高性能服务器领域的一种应用。Nginx是一个高性能的HTTP和反向代理Web服务器,Nginx因其高稳定性、丰富的功能集、内存消耗少、并发能力强而闻名全球,目前得到非常广泛的使用,比如百度、京东、新浪、网易、腾讯、淘宝等都是它的用户。

Nginx在启动后会以daemon方式在后台运行,它的后台进程有两类∶―类称为Master进程(相当于管理进程),另一类称为Worker进程(工作进程)。Nginx的进程结构图如图8-4所示。

Nginx的Master进程主要负责调度Worker进程,比如加载配置、启动工作进程、接收来自外界的信号、向各Worker进程发送信号、监控Worker进程的运行状态等。Master进程负责创建监听套接口,交由Worker进程进行连接监听。Worker进程主要用来处理网络事件,当一个Worker进程在接收一条连接通道之后,就开始读取请求、解析请求、处理请求,处理完成产生的数据后,再返回给客户端,最后断开连接通道。

三、Fork-join设计模式

3.1 ForkJoin的原理

“分而治之”是一种思想,所谓“分而治之”,就是把一个复杂的算法问题按一定的“分解”方法分为规模较小的若干部分,然后逐个解决,分别找出各部分的解,最后把各部分的解组成整个问题的解。

“分而治之”思想在软件体系结构设计、模块化设计、基础算法中得到了非常广泛的应用。许多基础算法都运用了“分而治之”的思想,比如二分查找、快速排序等。Master-Worker模式是“分而治之”思想的一种应用,本节所介绍的ForkJoin模式则是“分而治之”思想的另一种应用。与Master-Worker模式不同,ForkJoin模式没有Master角色,其所有的角色都是Worker,ForkJoin模式中的Worker将大的任务分割成小的任务,一直到任务的规模足够小,可以使用很简单、直接的方式来完成。

ForkJoin模式先把一个大任务分解成许多个独立的子任务,然后开启多个线程并行去处理这些子任务。有可能子任务还是很大而需要进一步分解,最终得到足够小的任务。

3.2 ForkJoin有哪些组件

JUC包提供了一套ForkJoin框架的实现,具体以ForkJoinPool线程池的形式提供,并且该线程池在Java 8的Lambda并行流框架中充当着底层框架的角色。JUC包的ForkJoin框架包含如下组件:

  1. ForkJoinPool:执行任务的线程池,继承了AbstractExecutorService类。
  2. ForkJoinWorkerThread:执行任务的工作线程(ForkJoinPool线程池中的线程)。每个线程都维护着一个内部队列,用于存放“内部任务”该类继承了Thread类。
  3. ForkJoinTask:用于ForkJoinPool的任务抽象类,实现了Future接口。
  4. RecursiveTask:带返回结果的递归执行任务,是ForkJoinTask的子类,在子任务带返回结果时使用。
  5. RecursiveAction∶不返回结果的递归执行任务,是ForkJoinTask的子类,在子任务不带返回结果时使用。

ForkJoinPool的构造

ForkJoinPool里三个重要的角色:

  1. ForkJoinWorkerThread(下文简称worker):包装Thread;
  2. WorkQueue:任务队列,双向;
  3. ForkJoinTask:worker执行的对象,实现了Future。两种类型,一种叫submission,另一种就叫task。

ForkJoinPool和ThreadPoolExecutor都是继承自AbstractExecutorService抽象类,所以它和ThreadPoolExecutor的使用几乎没有多少区别,除了任务变成了ForkJoinTask以外。

这里又运用到了一种很重要的设计原则——开闭原则——对修改关闭,对扩展开放。

可见整个线程池体系一开始的接口设计就很好,新增一个线程池类,不会对原有的代码造成干扰,还能利用原有的特性。

ForkJoinPool使用数组保存所有WorkQueue,每个worker有属于自己的WorkQueue,但不是每个WorkQueue都有对应的worker。没有worker的WorkQueue:保存的是submission,来自外部提交,在WorkQueue[]的下标是偶数;属于worker的WorkQueue:保存的是task,在WorkQueue[]的下标是奇数。

    // Instance fieldsvolatile long ctl;                   // main pool controlvolatile int runState;               // lockable statusfinal int config;                    // parallelism, modeint indexSeed;                       // to generate worker indexvolatile WorkQueue[] workQueues;     // main registryfinal ForkJoinWorkerThreadFactory factory;final UncaughtExceptionHandler ueh;  // per-worker UEHfinal String workerNamePrefix;       // to create worker name stringvolatile AtomicLong stealCounter;    // also used as sync monitor

ForkJoinPool有两种获取方法,通过commonPool方法或者通过构造方法创建一个对象。前者是内部的一个静态变量,也就是说在一个进程中共享该ForkJoinPool。下面是构造方法:

public ForkJoinPool() {this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,boolean asyncMode) {this(checkParallelism(parallelism),checkFactory(factory),handler,asyncMode ? FIFO_QUEUE : LIFO_QUEUE,"ForkJoinPool-" + nextPoolId() + "-worker-");checkPermission();
}
private ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,int mode,String workerNamePrefix) {this.workerNamePrefix = workerNamePrefix;this.factory = factory;this.ueh = handler;this.config = (parallelism & SMASK) | mode;long np = (long)(-parallelism); // offset ctl countsthis.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

可以看到这里有三个构造方法的重载,parallelism默认是cpu核心数,factory是线程工厂,ctl是ForkJoinPool中最重要的控制字段,类型是long - 说明有64位,每个部分都有不同的作用。64位分为了四部分,每部分16位代表的值表示一个含义。从高到底以1,2,3,4表示。

  1. 表示活动的线程数,常用AC代替
  2. 表示线程总量,常用TC代替T
  3. 表示工作队列的状态,active的还是inactive,其余十五位表示版本号,防止ABA问题。
  4. 标识idle worker的WorkQueue 在WorkQueue[]数组中的index。这里需要说明的是,ctl的后32位其实只能表示一个idle workers,那么我们如果有很多个idle worker要怎么办呢?老爷子使用的是stack的概念来保存这些信息。后32位标识的是top的那个,我们能从top中的变量stackPred追踪到下一个idle worker。PS:workqueue中有一个workThread字段,用来和worker是关联的。

AC和TC初始化时取的是parallelism负数,后续代码可以直接判断正负,为负代表还没有达到目标数量。另外ctl低32位有个技巧可以直接用sp=(int)ctl取得,为负代表存在空闲worker。config保存不变的参数,包括了parallelism和mode,线程池记录字段是runState。

任务队列 WorkQueue类剖析

用于存放任务的队列,ForkJoin中用双端队列来实现。所谓双端,就是说队列中的元素(ForkJoinTask任务及其子任务)可以从一端入队出队,还可以从另一端入队出队。这个双端队列将用于支持ForkJoinPool的异步模型(asyncMode):后进先出(LIFO_QUEUE)和先进先出(FIFO_QUEUE)。

 @sun.misc.Contended
static final class WorkQueue {//初始化容量static final int INITIAL_QUEUE_CAPACITY = 1 << 13;//最大容量static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M// 队列的状态 // 偶数表示RUNNING  // 奇数表示SCANNING // 如果WorkQueue没有属于自己的owner(下标为偶数的都没有),该值为 inactive 也就是一个负数volatile int scanState;    int stackPred;             // pool stack (ctl) predecessorint nsteals;               // 窃取的个数int hint;                  // int config;                // pool index and mode// 锁标识,在多线程往队列中添加数据会有竞争。 1: locked, < 0: terminate; else 0volatile int qlock;         // 下一个出队元素的索引位(主要是为线程窃取准备的索引位置)volatile int base;         // 为下一个入队元素准备的索引位int top;                   // 队列中使用数组存储任务ForkJoinTask<?>[] array;   // 队列所属的ForkJoinPool(可能为空)final ForkJoinPool pool;  // 这个队列所属的归并计算工作线程。注意:工作队列也可能不属于任何工作线程final ForkJoinWorkerThread owner; volatile Thread parker;    // == owner during call to park; else null// 记录当前正在进行join等待的其它任务volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin// 当前正在偷取的任务volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
}
//array的初始化是在任务线程的初始化时完成的,因此这里array不会为空。
final void push(ForkJoinTask<?> task) {ForkJoinTask<?>[] a; ForkJoinPool p;int b = base, s = top, n;if ((a = array) != null) {    // ignore if queue removedint m = a.length - 1;     // fenced write for task visibilityU.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);U.putOrderedInt(this, QTOP, s + 1);if ((n = s - b) <= 1) {if ((p = pool) != null)p.signalWork(p.workQueues, this);}else if (n >= m)growArray();}
}

U是Unsafe的实例,在这里用来操作数组。putOrderedObject方法在指定的对象a中,指定的内存偏移量的位置,赋予一个新的元素。这里是在队列的尾部添加一个元素。

putOrderedInt方法对当前指定的对象中的指定字段,进行赋值操作。这里的代码意义是将workQueue对象本身中的top标示的位置 + 1,s在当前活动的工作线程过少的情况下,通过调用signalWork创建新的工作线程。如果当队列的容量和数组的长度相等时,进行扩容。

ForkJoinPool中主要的工作线程,采用ForkJoinWorkerThread定义,其中有两个主要属性pool和workQueue:

public class ForkJoinWorkerThread extends Thread {final ForkJoinPool pool;                // the pool this thread works infinal ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
}

pool属性表示这个进行归并计算的线程所属的ForkJoinPool实例,workQueue属性是java.util.concurrent.ForkJoinPool.WorkQueue这个类的实例,它表示这个线程所使用的子任务待执行队列,而且可以被其它工作线程偷取任务。

任务的提交

ForkJoinPool提供execute和invoke、submit方法來提交任務:

  • submit:提交任务并返回任务
  • execute:只提交任务
  • invoke:提交并返回任务结果(return task.join())
public <T> T invoke(ForkJoinTask<T> task) {externalPush(task);return task.join();
}

private void externalSubmit(ForkJoinTask<?> task) {int r;                                   if ((r = ThreadLocalRandom.getProbe()) == 0) {// 取得一个随机探查数ThreadLocalRandom.localInit();r = ThreadLocalRandom.getProbe();}for (;;) {WorkQueue[] ws; WorkQueue q; int rs, m, k;boolean move = false;if ((rs = runState) < 0) { // 如果已经结束tryTerminate(false, false);    throw new RejectedExecutionException();}// 如果条件成立,就说明当前ForkJoinPool类中,还没有任何队列,所以要进行队列初始化else if ((rs & STARTED) == 0 || ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {//【A】int ns = 0;rs = lockRunState();try {if ((rs & STARTED) == 0) {// 通过原子操作,完成“任务窃取次数”这个计数器的初始化U.compareAndSwapObject(this, STEALCOUNTER, null, new AtomicLong());// 创建workQueue 数组int p = config & SMASK; // ensure at least 2 slotsint n = (p > 1) ? p - 1 : 1;n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;workQueues = new WorkQueue[n];ns = STARTED;}} finally {unlockRunState(rs, (rs & ~RSLOCK) | ns);//释放锁}}else if ((q = ws[k = r & m & SQMASK]) != null) {// 【C】if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {ForkJoinTask<?>[] a = q.array;int s = q.top;boolean submitted = false; // initial submission or resizingtry {                      // locked version of pushif ((a != null && a.length > s + 1 - q.base) ||(a = q.growArray()) != null) {int j = (((a.length - 1) & s) << ASHIFT) + ABASE;U.putOrderedObject(a, j, task);U.putOrderedInt(q, QTOP, s + 1);submitted = true;}} finally {U.compareAndSwapInt(q, QLOCK, 1, 0);}if (submitted) {signalWork(ws, q);return;}}move = true;                   // move on failure} else if (((rs = runState) & RSLOCK) == 0) { // 创建workQueue数组中的元素,workQueue即对象 //【B】q = new WorkQueue(this, null);q.hint = r;q.config = k | SHARED_QUEUE;q.scanState = INACTIVE;rs = lockRunState();           // publish indexif (rs > 0 &&  (ws = workQueues) != null && k < ws.length && ws[k] == null)ws[k] = q;                 // else terminatedunlockRunState(rs, rs & ~RSLOCK);}elsemove = true;                   // move if busyif (move)r = ThreadLocalRandom.advanceProbe(r);}
}

首先检查运行状态是否已经进入SHUTDOWN,抛出拒收的异常。

  1. 第一次循环中,运行状态还没有STARTED,执行A】分支。进行初始化操作,设置原子对象stealCounter,按2的幂设置WorkQueue[]的长度,然后运行状态进入STARTED
  2. 第二次循环中,workQueues不为空,继续执行,先执行【C】分支的判断语句。随机取一个小于workQueues长度的偶数(不一定是0),赋值给k,然后判断workQueues[k]是否为空,显然第二次循环中为空,因此最终跳入【B】分支,创建第一个WorkQueue。
  3. 第三次循环中,执行【C】分支,会找到刚才创建的WorkQueue,从队列的top端加入任务,signalWork激活或者创建worker。

工作线程

work线程的管理包括对线程的创建、唤醒、注册、撤销。当创建第一个WorkQueue并加入第一个任务,调用了signalWork,入参是WorkQueue[]和当前操作的WorkQueue。

final void signalWork(WorkQueue[] ws, WorkQueue q) {long c; int sp, i; WorkQueue v; Thread p;while ((c = ctl) < 0L) {                       // too few activeif ((sp = (int)c) == 0) {                  // no idle workersif ((c & ADD_WORKER) != 0L)            // too few workerstryAddWorker(c);break;}if (ws == null)                            // unstarted/terminatedbreak;if (ws.length <= (i = sp & SMASK))         // terminatedbreak;if ((v = ws[i]) == null)                   // terminatingbreak;int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanStateint d = sp - v.scanState;                  // screen CASlong nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {v.scanState = vs;                      // activate vif ((p = v.parker) != null)U.unpark(p);break;}if (q != null && q.base == q.top)          // no more workbreak;}
}

回忆下ctl的四个部分的含义,初始ctl为负数时(AC是parallelism的负数),线程还没有达到目标数量,因此进入循环。接着取ctl的低32位,构造函数中只是初始化了高32位,低32位为0,继续往下走。ADD_WORKER是什么呢?这个常量其实就是用来辅助判断线程是否已经达到阈值。

private void tryAddWorker(long c) {boolean add = false;do {long nc = ((AC_MASK & (c + AC_UNIT)) |(TC_MASK & (c + TC_UNIT)));if (ctl == c) {int rs, stop;                 // check if terminatingif ((stop = (rs = lockRunState()) & STOP) == 0)add = U.compareAndSwapLong(this, CTL, c, nc);unlockRunState(rs, rs & ~RSLOCK);if (stop != 0)break;if (add) {createWorker();break;}}} while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}

tryAddWork中讲AC和TC数量加1,如果操作成功,即add为true,然后才真正的创建工作线程。
这里也是比较直观的,通过工厂类来创建工作线程,如果创建成功,则启动线程。并返回。如果失败,则撤销,撤销的逻辑也就是删除工作线程的任务队列,将AC和TC减1.

private boolean createWorker() {ForkJoinWorkerThreadFactory fac = factory;Throwable ex = null;ForkJoinWorkerThread wt = null;try {if (fac != null && (wt = fac.newThread(this)) != null) {wt.start();return true;}} catch (Throwable rex) {ex = rex;}deregisterWorker(wt, ex);return false;
}

我们着重分下工作线程的创建过程。也就是fac.newThread(this),newThread方法内部调用了 ForkJoinWorkerThread的构造方法。

protected ForkJoinWorkerThread(ForkJoinPool pool) {// Use a placeholder until a useful name can be set in registerWorkersuper("aForkJoinWorkerThread");this.pool = pool;this.workQueue = pool.registerWorker(this);
}

在ForkJoinWorkerThread的构造方法中进行了注册操作,那么到底注册什么呢?我们知道每一个线程都会有一个workQueue,并且workQueue在数组中下表为奇数。因此在这里是创建一个workQueue和该线程关联。

执行任务

当工作线程启动了之后会运行ForkJoinWorkerThread重写的run方法

 public void run() {onStart(); //空实现pool.runWorker(workQueue);onTermination(exception);
}
final void runWorker(WorkQueue w) {w.growArray();                   // allocate queueint seed = w.hint;               // initially holds randomization hintint r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShiftfor (ForkJoinTask<?> t;;) {if ((t = scan(w, r)) != null)w.runTask(t);else if (!awaitWork(w, r))break;r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift}
}

这里有三个重点,scan:尝试获取一个任务,runTask:执行取得的任务,awaitWork:没有任务进入等待。如果awaitWork返回false,等不到任务,跳出runWorker的循环,回到run中执行finally,最后调用deregisterWorker撤销注册。

ForkJoinPool 最适合的是计算密集型的任务,如果存在 I/O,线程间同步,sleep() 等会造成线程长时间阻塞的情况时,最好配合使用 ManagedBlocker。Fork/Join又和线程池(ExecutorService)有些类似,它可以看做是线程池的补充,只适用于特殊的场景。

ForkJoinPool内部使用的是“工作窃取”算法实现

  • 每个工作线程都有自己的工作队列WorkQueue;
  • 这是一个双端队列,它是线程私有的;
  • ForkJoinTask中fork的子任务,将放入运行该任务的工作线程的队头,工作线程将以LIFO的顺序来处理工作队列中的任务;
  • 为了最大化地利用CPU,空闲的线程将从其它线程的队列中“窃取”任务来执行;
  • 从工作队列的尾部窃取任务,以减少竞争;
  • 双端队列的操作:push()/pop()仅在其所有者工作线程中调用,poll()是由其它线程窃取任务时调用的;
  • 当只剩下最后一个任务时,还是会存在竞争,是通过CAS来实现的;

ForkJoin的应用案例分析

在后端系统的业务开发中,可用做权限校验,批量定时任务状态刷新等各种功能场景:

如上图,假设数据的主键id分段如下,数据场景可能是数据源的连接信息,或者产品有效期类似业务,都可以基于线程池任务处理:

权限校验:基于数据源的连接信息,判断数据源是否可用,例如:判断连接是否可用,用户是否有库表的读写权限,在数据源多的情况下,基于线程池快速校验。

状态刷新:在定时任务中,经常见到状态类的刷新操作,例如判断产品是否在有效期范围内,在有效期范围之外,把数据置为失效状态,都可以利用线程池快速处理。

Fork/Join的陷阱与注意事项

避免不必要的fork():划分成两个子任务后,不要同时调用两个子任务的fork()方法。表面上看上去两个子任务都fork(),然后join()两次似乎更自然。但事实证明,直接调用compute()效率更高。因为直接调用子任务的compute()方法实际上就是在当前的工作线程进行了计算(线程重用),这比“将子任务提交到工作队列,线程又从工作队列中拿任务”快得多。

当一个大任务被划分成两个以上的子任务时,尽可能使用前面说到的三个衍生的invokeAll方法,因为使用它们能避免不必要的fork()。

注意fork()、compute()、join()的顺序

为了两个任务并行,三个方法的调用顺序需要万分注意。

right.fork(); // 计算右边的任务
long leftAns = left.compute(); // 计算左边的任务(同时右边任务也在计算)
long rightAns = right.join(); // 等待右边的结果
return leftAns + rightAns;

如果我们写成:

left.fork(); // 计算完左边的任务
long leftAns = left.join(); // 等待左边的计算结果
long rightAns = right.compute(); // 再计算右边的任务
return leftAns + rightAns;
long rightAns = right.compute(); // 计算完右边的任务
left.fork(); // 再计算左边的任务
long leftAns = left.join(); // 等待左边的计算结果
return leftAns + rightAns;

选择合适的子任务粒度

选择划分子任务的粒度(顺序执行的阈值)很重要,因为使用Fork/Join框架并不一定比顺序执行任务的效率高:如果任务太大,则无法提高并行的吞吐量;如果任务太小,子任务的调度开销可能会大于并行计算的性能提升,我们还要考虑创建子任务、fork()子任务、线程调度以及合并子任务处理结果的耗时以及相应的内存消耗。
官方给出的粗略经验是:任务应该执行100~10000个基本的计算步骤。决定子任务的粒度的最好办法是实践,通过实际测试结果来确定这个阈值才是“上上策”。和其他Java代码一样,Fork/Join框架测试时需要“预热”或者说执行几遍才会被JIT优化,所以测试性能之前跑几遍程序很重要。

避免重量级任务划分与结果合并

Fork/Join的很多使用场景都用到数组或者List等数据结构,子任务在某个分区中运行,最典型的例子如并行排序和并行查找。拆分子任务以及合并处理结果的时候,应该尽量避免System.arraycopy这样耗时耗空间的操作,从而最小化任务的处理开销。

异常处理

Java的受检异常机制一直饱受诟病,所以在ForkJoinTask的invoke()join()方法及其衍生方法中都没有像get()方法那样抛出个ExecutionException的受检异常。所以你可以在ForkJoinTask中看到内部把受检异常转换成了运行时异常。

static void rethrow(Throwable ex) {if (ex != null)ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
}@SuppressWarnings("unchecked")
static <T extends Throwable> void uncheckedThrow(Throwable t) throws T {throw (T)t; // rely on vacuous cast
}

但不可否认的是invoke、join()仍可能会抛出运行时异常,所以ForkJoinTask还提供了两个不提取结果和异常的方法quietlyInvoke()、quietlyJoin(),这两个方法允许你在所有任务完成后对结果和异常进行处理。使用quitelyInvoke()和quietlyJoin()时可以配合isCompletedAbnormally()和isCompletedNormally()方法使用。

  • ForkJoinPool特别适合于“分而治之”算法的实现;
  • ForkJoinPool和ThreadPoolExecutor是互补的,不是谁替代谁的关系,二者适用的场景不同;
  • ForkJoinTask有两个核心方法——fork()和join(),有三个重要子类——RecursiveAction、RecursiveTask和CountedCompleter;
  • ForkjoinPool内部基于“工作窃取”算法实现;
  • 每个线程有自己的工作队列,它是一个双端队列,自己从队列头存取任务,其它线程从尾部窃取任务;
  • ForkJoinPool最适合于计算密集型任务,但也可以使用ManagedBlocker以便用于阻塞型任务;
  • RecursiveTask内部可以少调用一次fork(),利用当前线程处理,这是一种技巧;

ThreadPoolExecutor有什么优缺点?

ThreadPoolExecutor 对于任务会先交给核心线程去执行,如果核心线程数不够则会放入队列,线程池中的线程统一去任务队列中获取任务并执行,我们可以从这样的设计中很明显的看出来:ThreadPoolExecutor执行时间不确定的任务类型,比如说:网络IO操作、定时任务等

ForkJoinPool 则是一个线程对应一个任务队列,每个任务正常来说只需要执行自己的任务;如果线程对应的任务列表为空的时候,会在随机窃取其他线程任务列表中的任务(为减少获取任务竞争,会从任务另一端获取任务),当然上图没有表示出来。这样既减少了任务竞争,又能充分利用CPU资源。我们可以从这样的设计中很明显的看出来:ForkJoinPool非常适合执行任务比较多、执行时间比较短的程序,比如过滤集合中的元素(JDK1.8 stream底层就是ForkJoinPool哟)

最后补充: ForkJoinPool 的出现不是为了替换ThreadPoolExecutor这一类的线程池,而是为了做功能上的补充,两者各有使用场景,根据不同的场景使用不同的线程池即可

什么时候使用ForkJoinPool

  • 引用 1.2 中ForkJoinPool 分析和补充部分:ForkJoinPool非常适合执行任务比较多、执行事件比较短的程序,比如过滤集合中的元素(JDK1.8 stream底层就是ForkJoinPool哟);
  • ForkJoinPool 的出现不是为了替换ThreadPoolExecutor这一类的线程池,而是为了做功能上的补充,两者各有使用场景,根据不同的场景使用不同的线程池即可。
// 测试方法
public class ForkJoinPoolLearn {public final static String CONTENT = "哇,好帅哟!哇,是啊,我好喜欢呢!哇,可否给个签名呢?";public static final int THRESHHOLD = 5;public static List<String> BLACK_WORDS = new ArrayList<>();static {BLACK_WORDS.add("哇");}public static void main(String[] args) {//使用ForkJoinPool来执行任务// 有返回值对象System.out.println("即将测试有返回值对象。。。");ForkJoinPool forkJoinPool = new ForkJoinPool();MyRecursiveTask myRecursiveTask = new MyRecursiveTask(0, ForkJoinPoolLearn.CONTENT.length(), Arrays.asList(ForkJoinPoolLearn.CONTENT.split("")));Integer value = forkJoinPool.invoke(myRecursiveTask);System.out.println(String.format("字符串:%s 中包含违禁词数量:%s,违禁词:%s", CONTENT, value, StringUtils.join(BLACK_WORDS, ",")));}
}// 提交任务类
public class MyRecursiveTask extends RecursiveTask<Integer> {private int startIndex;private int endIndex;private List<String> words;public MyRecursiveTask(int startIndex, int endIndex, List<String> words) {this.startIndex = startIndex;this.endIndex = endIndex;this.words = words;}@Overrideprotected Integer compute() {int sum = 0;if ((endIndex - startIndex) <= ForkJoinPoolLearn.THRESHHOLD) {// 如果长度不可再分割,则开始做过滤for (int i = startIndex; i < words.size() && i < endIndex; i++) {String word = words.get(i);if (ForkJoinPoolLearn.BLACK_WORDS.contains(word)) {sum += 1;}}} else {// 如果长度过长,fork为两个任务来处理int middle = (startIndex + endIndex) / 2;MyRecursiveTask left = new MyRecursiveTask(startIndex, middle, words);MyRecursiveTask right = new MyRecursiveTask(middle, endIndex, words);left.fork();right.fork();Integer leftValue = left.join();Integer rightValue = right.join();sum = leftValue + rightValue;}return sum;// 返回计算后的值}
}

ForkJoinPool的整体执行流程,下文的源码分析会结合该图详细介绍,介绍完毕之后,最后补充具体的方法级别调用图。

工作窃取算法

Fork 就是把一个大任务切分为若干个子任务并行地执行,Join 就是合并这些子任务的执行结果,最后得到这个大任务的结果。Fork/Join 框架使用的是工作窃取算法。

工作窃取算法是指某个线程从其他队列里窃取任务来执行。对于一个比较大的任务,可以把它分割为若干个互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。但是,有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务需要处理,于是它就去其他线程的队列里窃取一个任务来执行。由于此时它们访问同一个队列,为了减小竞争,通常会使用双端队列。被窃取任务的线程永远从双端队列的头部获取任务,窃取任务的线程永远从双端队列的尾部获取任务。

  • 优点:充分利用线程进行并行计算,减少了线程间的竞争。
  • 缺点:双端队列只存在一个任务时会导致竞争,会消耗更多的系统资源,因为需要创建多个线程和多个双端队列。

使用 ForkJoinPool 进行分叉和合并

ForkJoinPool 在 Java 7 中被引入。它和 ExecutorService 很相似,除了一点不同。ForkJoinPool 让我们可以很方便地把任务分裂成几个更小的任务,这些分裂出来的任务也将会提交给 ForkJoinPool。任务可以继续分割成更小的子任务,只要它还能分割。可能听起来有些抽象,因此本节中我们将会解释 ForkJoinPool 是如何工作的,还有任务分割是如何进行的。

一个使用了分叉和合并原理的任务可以将自己分叉(分割)为更小的子任务,这些子任务可以被并发执行。如下图所示:

  • 通过把自己分割成多个子任务,每个子任务可以由不同的 CPU 并行执行,或者被同一个 CPU 上的不同线程执行。
  • 只有当给的任务过大,把它分割成几个子任务才有意义。把任务分割成子任务有一定开销,因此对于小型任务,这个分割的消耗可能比每个子任务并发执行的消耗还要大。
  • 什么时候把一个任务分割成子任务是有意义的,这个界限也称作一个阀值。这要看每个任务对有意义阀值的决定。很大程度上取决于它要做的工作的种类。

合并

当一个任务将自己分割成若干子任务之后,该任务将进入等待所有子任务的结束之中。一旦子任务执行结束,该任务可以把所有结果合并到同一个结果。图示如下:

当然,并非所有类型的任务都会返回一个结果。如果这个任务并不返回一个结果,它只需等待所有子任务执行完毕。也就不需要结果的合并啦。

四、生产者-消费者模式

五、Future设计模式

博文参考

并发编程——Forkjoin设计模式原理相关推荐

  1. 冰河最新出版的《深入理解高并发编程:核心原理与案例实战》到底讲了些啥?(视频为证)

    大家好,我是冰河~~ 最近有很多小伙伴问我:<深入理解高并发编程:核心原理与案例实战>这本书有没有目录.我:安排!这不,我连夜录制了这本书的整体内容,希望能够为小伙伴们带来实质性的帮助,直 ...

  2. 最强阿里面试126题:数据结构+并发编程+Redis+设计模式+微服务

    BAT技术面试范围 数据结构与算法:最常见的各种排序,最好能手写 Java高级:JVM内存结构.垃圾回收器.回收算法.GC.并发编程相关(多线程.线程池等).NIO/BIO.各种集合类的比较优劣势(底 ...

  3. Java知识全面总结:并发编程+JVM+设计模式+常用框架+....

    本文整理的Java知识体系主要包括基础知识,工具,并发编程,数据结构与算法,数据库,JVM,架构设计,应用框架,中间件,微服务架构,分布式架构等内容.同时也有作为程序员的一些思考,包含了作为一个Jav ...

  4. Java 知识全面总结:并发编程+JVM+设计模式+常用框架+....

    本文整理的Java知识体系主要包括基础知识,工具,并发编程,数据结构与算法,数据库,JVM,架构设计,应用框架,中间件,微服务架构,分布式架构等内容.同时也有作为程序员的一些思考,包含了作为一个Jav ...

  5. Java并发编程——ForkJoin详解

    概念 Fork/Join 框架是 Java7 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架.类似于Java 8中的paralle ...

  6. Java并发编程—ThreadLocal底层原理

    作者:Java3y 链接:https://www.zhihu.com/question/341005993/answer/793627819 来源:知乎 著作权归作者所有.商业转载请联系作者获得授权, ...

  7. 并发编程--线程池原理

    阻塞队列和非阻塞队列 ConcurrentLinkedQueue类 适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于Bloc ...

  8. Java并发编程-synchronized底层原理

    synchronized底层原理与Monitor密切相关 1.Java对象头 以 32 位虚拟机为例 普通对象 对象的类型,如Student类型,Teacher类型等是由KlassWord来表示的,它 ...

  9. 【java并发编程】底层原理——用户态和内核态的区别

    一.背景--线程状态切换的代价 java的线程是映射到操作系统原生线程之上的,如果要阻塞或唤醒一个线程就需要操作系统介入,需要在户态与核心态之间切换,这种切换会消耗大量的系统资源,因为用户态与内核态都 ...

  10. Java架构技术文档:并发编程+设计模式+常用框架+JVM+精选视频

    本篇文章是我们整理的一份架构师的成长路线,包括了并发编程.设计模式.常用框架.中间件.微服务与分布式.常用工具.JVM.MySQL.数据结构与算法,还有架构师精选视频.架构师成长路线高清大图. 又是新 ...

最新文章

  1. Mac OS X10.11(OS X EI Capitan)安装程序下载
  2. HTML 5常用的交互元素————内容交互元素(2)
  3. 人月神话阅读笔记 03
  4. ORACLE AUDIT 审计
  5. 找出js里面改变cookies的代码
  6. GitHub配置SSH key
  7. linux玩安卓游戏下载,在Deepin 20.2系统下可用Xdroid on Linux来玩王者荣耀游戏
  8. Js同步加载图片资源
  9. TS+vue3 页面红色波浪线(和声明类型有关)
  10. 数字图像处理(1)-数字图像处理的基本步骤
  11. PV、UV、IP理解
  12. NVIDIA 为微软 Xbox One 游戏机发布 PhysX 支持
  13. mysql备份恢复与集群部署
  14. ssm餐厅线上点菜系统、点餐系统的设计与实现
  15. arp断网攻击解决办法
  16. ubuntu能连接wifi或手机USB共享热点,不能上网
  17. java 雷霆战机游戏 飞机大战 全过程教学+免费素材(附全部源代码)
  18. 什么叫磁场强度、磁通势、磁阻、导磁率、电磁力、涡流?
  19. 【webpack】前端工程化与webpack
  20. logisim文件的使用

热门文章

  1. [统计学理论基础] 置信区间
  2. 并发编程之AQS中的CLH队列
  3. 惠普台式计算机耳机插口,惠普耳机插在台式电脑上没声音,为什么
  4. Ubuntu gbd调试
  5. JS实现星星评分系统
  6. Sails框架知识点
  7. 计算机毕业设计(34)java毕设作品之医院预约挂号系统
  8. 【原创】基于JavaWeb的医院预约挂号系统(医院挂号管理系统毕业设计)
  9. 因子分析——流程与实现stata
  10. 应用StarRocks实现存储引擎的收敛,保障高查询并发及低延迟要求