ForkJoin简介及使用

ForkJoin框架是CompletableFuture和java8 stream使用到的框架。主要用于分片处理的场景。
可以通过自定义分片粒度来实现任务分解。并行处理数据。

CompletableFuture能够实现响应式编程。但未用到ForkJoin的分片。所以对于CompletableFuture来说,ForkJoin仅是一个公用的线程池而已

stream能让java处理数据更加优雅,并行流stream使用了ForkJoin的分片处理(参考java.util.stream.ForEachTask等类)

ForkJoin源码分析

从CompletableFuture的使用进入

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println("hello world");return 0;
});CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() ->System.out.println("hello world")
);completableFuture1 = CompletableFuture.runAsync(() ->System.out.println("hello world"), Executors.newSingleThreadExecutor());completableFuture.get();

CompletableFuture提供了提交Runnable和Callable任务的接口。分别对应runAsync(Runnable r)和supplyAsync(Callable c)。runAsync(Runnable r)有一个重载方法runAsync(Runnable r, Executor e) 支持自定义线程池。如果调用runAsync(Runnable r)方法,则代表使用ForkJoinPool作为线程池。

ForkJoinPool源码剖析

public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);
}

asyncPool参数解析

// ForkJoinPool.parallelism默认为可用CPU的数量
// parallelism可以自行设置
private static final boolean useCommonPool =(ForkJoinPool.getCommonPoolParallelism() > 1);// 当parallelism并行度等于1(不并行处理)时使用ThreadPerTaskExecutor线程池
private static final Executor asyncPool = useCommonPool ?ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();// ForkJoinPool每个任务可能有多个线程进行处理
// ThreadPerTaskExecutor每个任务创建一个线程来进行处理
static final class ThreadPerTaskExecutor implements Executor {public void execute(Runnable r) { new Thread(r).start(); }
}

ForkJoinPool的类图

CompletableFuture#asyncRunStage方法内容

static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {if (f == null) throw new NullPointerException();CompletableFuture<Void> d = new CompletableFuture<Void>();// 这里调用的是ForkJoinPool#execute(Runnable task)方法e.execute(new AsyncRun(d, f));return d;
}

AsyncRun继承了ForkJoinTask类,实现了Runnable接口

static final class AsyncRun extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask

ForkJoinPool#execute(Runnable task)方法

public void execute(Runnable task) {if (task == null)throw new NullPointerException();ForkJoinTask<?> job;if (task instanceof ForkJoinTask<?>) // avoid re-wrap// 从CompletableFuture提交的任务会走到这job = (ForkJoinTask<?>) task;elsejob = new ForkJoinTask.RunnableExecuteAction(task);// 将任务放入工作队列externalPush(job);
}

ForkJoinPool#externalPush(ForkJoinTask<?> task)方法,将任务放入工作队列

final void externalPush(ForkJoinTask<?> task) {WorkQueue[] ws; WorkQueue q; int m;int r = ThreadLocalRandom.getProbe();int rs = runState;// 如果工作队列不为空(ws = workQueues) != null && (m = (ws.length - 1)) >= 0// q = ws[m & r & SQMASK],q为ws中随机的偶数索引。// SQMASK为0x007e 126二进制位为11111110所以q的结果永远为偶数// U.compareAndSwapInt(q, QLOCK, 0, 1)加锁,QLOCK为第q个WorkQueue对象的锁的引用if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&U.compareAndSwapInt(q, QLOCK, 0, 1)) {ForkJoinTask<?>[] a; int am, n, s;if ((a = q.array) != null &&(am = a.length - 1) > (n = (s = q.top) - q.base)) {int j = ((am & s) << ASHIFT) + ABASE;// 把任务放入WorkQueue[q].array的队尾U.putOrderedObject(a, j, task);// 队列的top++;U.putOrderedInt(q, QTOP, s + 1);// 解锁U.putIntVolatile(q, QLOCK, 0);// 处于active的工作线程不够则新建work来if (n <= 1)signalWork(ws, q);return;}// 解锁U.compareAndSwapInt(q, QLOCK, 1, 0);}// 如果工作队列为空,或者加锁失败,则初始化工作队列后提交任务externalSubmit(task);
}
private void externalSubmit(ForkJoinTask<?> task) {int r;                                    // initialize caller's probeif ((r = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit();r = ThreadLocalRandom.getProbe();}// 循环退出条件// 抛出RejectedExecutionException异常(runState为关闭状态)// 任务提交成功for (;;) {WorkQueue[] ws; WorkQueue q; int rs, m, k;boolean move = false;if ((rs = runState) < 0) {tryTerminate(false, false);     // help terminatethrow new RejectedExecutionException();}// 没初始化,就初始化WorkQueueelse if ((rs & STARTED) == 0 ||     // initialize((ws = workQueues) == null || (m = ws.length - 1) < 0)) {int ns = 0;// 加锁,锁标志位低位置为1, rs |= 1;rs = lockRunState();try {// 没初始化,则新建一个WorkQueueif ((rs & STARTED) == 0) {U.compareAndSwapObject(this, STEALCOUNTER, null,new AtomicLong());// 创建一个size为2的n次方的WorkQueue数组int p = config & SMASK; // ensure at least 2 slotsint n = (p > 1) ? p - 1 : 1;n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;workQueues = new WorkQueue[n];// 将置为已初始化状态ns = STARTED;}} finally {// 解锁并设置runState为已初始化状态unlockRunState(rs, (rs & ~RSLOCK) | ns);}}// 判断该任务所在WorkQueue是否为空else if ((q = ws[k = r & m & SQMASK]) != null) {if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {ForkJoinTask<?>[] a = q.array;int s = q.top;boolean submitted = false; // initial submission or resizingtry {                      // locked version of pushif ((a != null && a.length > s + 1 - q.base) ||(a = q.growArray()) != null) {int j = (((a.length - 1) & s) << ASHIFT) + ABASE;U.putOrderedObject(a, j, task);U.putOrderedInt(q, QTOP, s + 1);submitted = true;}} finally {U.compareAndSwapInt(q, QLOCK, 1, 0);}// 任务提交成功if (submitted) {// 如果worker太少,则创建worker,没有空闲的worker则啥也不干。signalWork(ws, q);return;}}// 未提交成功则继续此循环move = true;                   // move on failure}else if (((rs = runState) & RSLOCK) == 0) { // create new queueq = new WorkQueue(this, null);q.hint = r;q.config = k | SHARED_QUEUE;q.scanState = INACTIVE;rs = lockRunState();           // publish indexif (rs > 0 &&  (ws = workQueues) != null &&k < ws.length && ws[k] == null)ws[k] = q;                 // else terminatedunlockRunState(rs, rs & ~RSLOCK);}elsemove = true;                   // move if busyif (move)r = ThreadLocalRandom.advanceProbe(r);}
}

ForkJoin框架源码分析(详细)相关推荐

  1. Spark RPC框架源码分析(二)RPC运行时序

    前情提要: Spark RPC框架源码分析(一)简述 一. Spark RPC概述 上一篇我们已经说明了Spark RPC框架的一个简单例子,Spark RPC相关的两个编程模型,Actor模型和Re ...

  2. php+yii框架,yii框架源码分析(一)

    yii框架源码分析(一) 本文将对yii中的mvc,路由器,filter,组件机制等最主要的部分进行自己的一点浅析,力求说明自己做一个php mvc不是那么的遥不可及,其实是很简单的. 源码基于yii ...

  3. S3C24XX DMA框架源码分析

    基于S3C2440 的DMA 框架源码分析 基于S3C2440 的DMA 框架源码分析 二寻根溯源 1 设备类的注册 2 s3c2410_dma_init 3 s3c24xx_dma_order_se ...

  4. golang gin框架源码分析(二)---- 渐入佳境 摸索Engine ServeHTTP访问前缀树真正原理

    文章目录 全系列总结博客链接 前引 golang gin框架源码分析(二)---- 渐入佳境 摸索Engine ServeHTTP访问前缀树真正远原理 1.再列示例代码 从示例代码入手 2.r.Run ...

  5. Linux驱动修炼之道-SPI驱动框架源码分析(上)

    Linux驱动修炼之道-SPI驱动框架源码分析(上)   SPI协议是一种同步的串行数据连接标准,由摩托罗拉公司命名,可工作于全双工模式.相关通讯设备可工作于m/s模式.主设备发起数据帧,允许多个从设 ...

  6. Android框架源码分析——从设计模式角度看 Retrofit 核心源码

    Android框架源码分析--从设计模式角度看 Retrofit 核心源码 Retrofit中用到了许多常见的设计模式:代理模式.外观模式.构建者模式等.我们将从这三种设计模式入手,分析 Retrof ...

  7. SPI驱动框架源码分析

     SPI驱动框架源码分析 2013-04-12 16:13:08 分类: LINUX SPI驱动框架源码分析 SPI协议是一种同步的串行数据连接标准,由摩托罗拉公司命名,可工作于全双工模式.相关通讯设 ...

  8. Java集合类框架源码分析 之 LinkedList源码解析 【4】

    上一篇介绍了ArrayList的源码分析[点击看文章],既然ArrayList都已经做了介绍,那么作为他同胞兄弟的LinkedList,当然必须也配拥有姓名! Talk is cheap,show m ...

  9. leaf框架源码分析

    leaf框架源码分析 近来阅读leaf框架的代码,有些感悟,特来记录一番.既是一个总结,又是对后来阅读者的一个启发. 个人看代码的一个习惯,喜欢有上下文,因此会各个代码文件之间相互乱窜.但多看几次也就 ...

最新文章

  1. sublime配置攻略
  2. 李雷和韩梅梅的一次转账事务–事务系统概述
  3. python抢票_50 个加速包都抢不到车票,还不如这个 Python 抢票神器!
  4. GUI(概述和布局)
  5. Jasypt 加密-引言
  6. python二分法排序_python实现快速排序的示例(二分法思想)
  7. 东汉才女班昭:中国最早的女数学家
  8. Android相关资源
  9. 前后端整合---js对象方法---异步组件
  10. mysql 分页_MySQL如何实现分页查询
  11. 苹果笔记本摄像头Linux驱动,苹果发布Macbook摄像头驱动更新 更适配window10
  12. python版使用tinypng压缩图片大小
  13. JavaScript动态创建DOM(七)
  14. 协议栈之packet_type
  15. EF Core 日志跟踪sql语句
  16. jmeter学习:如何使用jmeter自动发帖
  17. 照着这本“书”,3年量产自动驾驶卡车
  18. 2021爱智先行者—精灵1号的体验分享
  19. 鉴智机器人:以视觉3D理解为核心的下一代自动驾驶系统
  20. PCB ODB++(Gerber)图形绘制实现方法

热门文章

  1. 新手如何快速入门Python(菜鸟必看篇)
  2. 云演CTF刷题 lfi
  3. 拿下18Koffer,黑马老学长分享了4点学习建议!
  4. 查找图形图斑中的空洞
  5. html足球球面插件,三维效果的黄金足球球面揭示开场片头AE模板
  6. RAD0.1 RB.1/.2
  7. BZOJ 4589 Hard Nim
  8. 增加打字音效让码字变成一种享受
  9. win11,google chrome没有声音怎么办
  10. Math.Round函数