前言

Java 8给大家带来了一个非常便捷的多线程工具:并行流,一改往日Java多线程繁琐的编程规范,只需要一行代码,就可以让一个多线程跑起来,似乎让很多人忘记了被多线程支配的恐惧,这篇文章给大家分享一个真实的生产故障,由于在消费消息的处理器中使用了Java 8的并行流,导致集群消费消息的能力急速下降,造成线上消息堆积,引发故障。可能有朋友会好奇,到底是什么场景让并行流起了反作用?

并行流执行速度一定比串行快吗?

答案是:不一定

我把产生线上问题的代码抽象成下面的示例代码:

void testParallelStream() throws InterruptedException {ExecutorService threadPool = new ThreadPoolExecutor(50, 200, 20, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1000),new ThreadFactoryBuilder().setNameFormat("test-parallel-thread").build(), new ThreadPoolExecutor.CallerRunsPolicy());Long time1 = System.currentTimeMillis();// 1. 多线程+foreach执行时长for (int i = 0; i < ARRAY_LENGTH; i++) {CommonExecutor commonExecutor = new CommonExecutor();commonExecutor.array = arrays[i];threadPool.submit(commonExecutor);}commonCountDownLatch.await();Long time2 = System.currentTimeMillis();System.out.println("for循环耗时: " + (time2 - time1));threadPool = new ThreadPoolExecutor(50, 200, 20, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1000),new ThreadFactoryBuilder().setNameFormat("test-parallel-thread").build(), new ThreadPoolExecutor.CallerRunsPolicy());// 2. 多线程+并行流执行时长for (int i = 0; i < ARRAY_LENGTH; i++) {ParallelStreamExecutor parallelStreamExecutor = new ParallelStreamExecutor();parallelStreamExecutor.array = arrays[i];threadPool.submit(parallelStreamExecutor);}parallelCountDownLatch.await();Long time3 = System.currentTimeMillis();System.out.println("并行流耗时: " + (time3 - time2));
}

其中两次提交给线程池的执行器如下所示:

@Data
private static class CommonExecutor implements Runnable {private long[] array;@Overridepublic void run() {// 选择排序法进行排序for (int i = 0; i < array.length; i++) {array[i] = i * i;}commonCountDownLatch.countDown();}
}
@Data
private static class ParallelStreamExecutor implements Runnable {private long[] array;@Overridepublic void run() {// 选择排序法进行排序IntStream.range(0, array.length).parallel().forEach(i -> array[i] = i * i);parallelCountDownLatch.countDown();}}

这段代码的思路非常简单,就是对一个二维数组arrays的每一行,计算其列下标的平方数,并且回填到数组中,只不过这个过程是通过线程池去完成的,提交给线程池的执行器有两种,一种是普通的for循环,通过游标遍历每一个元素的下标,并计算平方数。另一种使用了并行流去完成同样的事情。简单起见,我们把这段代码循环执行10次,并统计了每次两种实现方式的耗时(单位是毫秒),大家可以猜猜看,到底哪种实现方式更快。

下面是真实的耗时记录:

执行次数 1 2 3 4 5 6 7 8 9 10 平均耗时
for循环耗时 18 17 13 18 17 13 13 16 20 16 16.1
并行流耗时 32 41 38 59 51 34 53 57 49 47 46.1

执行的结果竟然是并行流的执行速度明显慢于for循环,到底是哪里出现问题了呢?

并行流的实现原理

其实问题就出现在并行流的实现上,同一个进程中提交给并行流的Action都会被同一个公共的线程池处理。也就是说上文构造的代码无论线程池threadPool的线程数开到多大,最终实际处理Action的线程数都由并行流的公共线程池大小决定,这一点我们可以从并行流的源码上看个大概:

@Override
@SuppressWarnings("unchecked")
public final S parallel() {sourceStage.parallel = true;return (S) this;
}

调用parallel只会将java.util.stream.AbstractPipeline中的sourceStage.parallel置为true,到调用foreach的时候,会调用到下面这个方法

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {assert getOutputShape() == terminalOp.inputShape();if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;return isParallel()? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())): terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

这里的isParallel()就会判断上面设置的sourceStage.parallel字段,从而使程序的执行流程走到terminalOp.evaluateParallel这个分支,再往后跟的话会发现最终任务会提交到ForEachTask

@Override
public <S> Void evaluateParallel(PipelineHelper<T> helper,Spliterator<S> spliterator) {if (ordered)new ForEachOrderedTask<>(helper, spliterator, this).invoke();elsenew ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();return null;
}

ForEachTask这里稍微提一嘴,是java 1.7引入的一个轻量级多线程任务,逻辑还是比较多的,后面有机会我们再看下它的实现原理,通过断点跟进去,发现最后提交的任务都会调用到ForEachTaskcompute方法

public void compute() {// 以我们初始提交的任务为例spliterator的类型是一个RangeIntSpliterator,其中from = 0, upTo = 10000, last = 0Spliterator<S> rightSplit = spliterator, leftSplit;// estimateSize = upTo - from + lastlong sizeEstimate = rightSplit.estimateSize(), sizeThreshold;if ((sizeThreshold = targetSize) == 0L)// 目标大小会根据上文提到的公共线程池计算,值等于 sizeEstimate/线程池大小 * 4targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());boolean forkRight = false;Sink<S> taskSink = sink;ForEachTask<S, T> task = this;// 【核心逻辑】进入任务切分逻辑,while (!isShortCircuit || !taskSink.cancellationRequested()) {// 切分直至子任务的大小小于阈值if (sizeEstimate <= sizeThreshold ||// trySplit()会将rightSplit等比例切分,并返回切分的第一个子任务,切分比例跟待切分的任务总数相关// 如果待切分的子任务大小小于等于1,则返回null,停止切分(leftSplit = rightSplit.trySplit()) == null) {task.helper.copyInto(taskSink, rightSplit);break;}ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);task.addToPendingCount(1);ForEachTask<S, T> taskToFork;// 这里通过forkRight控制本线程切割任务的顺序是// 左->右->左->右->左->右直至子任务大小满足阈值,这样可以让整个任务执行更离散// 关于这样做的好处也欢迎大家在评论区讨论if (forkRight) {forkRight = false;rightSplit = leftSplit;taskToFork = task;task = leftTask;}else {forkRight = true;taskToFork = leftTask;}// 通过fork将将切分好的子任务提交到线程池taskToFork.fork();sizeEstimate = rightSplit.estimateSize();}task.spliterator = null;task.propagateCompletion();
}

这段代码是整个并行流实现的核心逻辑,其本质就是将刚开始提交的串行大任务切分成更小的任务提交到线程池,并行流的秘密就藏在这段代码中:

public final ForkJoinTask<V> fork() {Thread t;if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)((ForkJoinWorkerThread)t).workQueue.push(this);elseForkJoinPool.common.externalPush(this);return this;
}

这里涉及到ForkJoinPool的一个设计,为了避免常规线程池中各个线程访问任务队列产生竞争,ForkJoinPool除了有一个公共的任务队列之外,每个线程自身还持有一个任务队列,外部线程需要提交任务到公共队列,线程池线程切分的更小的任务则直接提交到自身的工作队列中,因此就有了上面看到的这段逻辑。整个ForkJoinPool的逻辑如下图所示:

  • 其中提交给共享队列的线程会被内部工作线程偷取

  • 私有工作队列中的任务通过fork切分成小任务后会将子任务push回私有队列

  • 如果工作线程有空闲,他还可以去偷取其他工作队列的任务


至于ForkJoinPool.common,就是我们上文一直提及的公共线程池,其初始化方法在ForkJoinPool的静态调用块中调用

private static ForkJoinPool makeCommonPool() {final ForkJoinWorkerThreadFactory commonPoolForkJoinWorkerThreadFactory =new CommonPoolForkJoinWorkerThreadFactory();int parallelism = -1;ForkJoinWorkerThreadFactory factory = null;UncaughtExceptionHandler handler = null;try {  // ignore exceptions in accessing/parsing propertiesString pp = System.getProperty// 可以通过设置这个值来改变公共线程池的大小("java.util.concurrent.ForkJoinPool.common.parallelism");String fp = System.getProperty("java.util.concurrent.ForkJoinPool.common.threadFactory");String hp = System.getProperty("java.util.concurrent.ForkJoinPool.common.exceptionHandler");if (pp != null)parallelism = Integer.parseInt(pp);if (fp != null)factory = ((ForkJoinWorkerThreadFactory)ClassLoader.getSystemClassLoader().loadClass(fp).newInstance());if (hp != null)handler = ((UncaughtExceptionHandler)ClassLoader.getSystemClassLoader().loadClass(hp).newInstance());} catch (Exception ignore) {}if (factory == null) {if (System.getSecurityManager() == null)factory = commonPoolForkJoinWorkerThreadFactory;else // use security-managed defaultfactory = new InnocuousForkJoinWorkerThreadFactory();}if (parallelism < 0 && // default 1 less than #cores// 获取线程池线程数量,其值等于当前可用处理器减一(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)parallelism = 1;if (parallelism > MAX_CAP)parallelism = MAX_CAP;return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,"ForkJoinPool.commonPool-worker-");}

这段代码比较简单,就是返回了一个固定的线程池,线程池大小默认等于可用处理器减一,这是因为在ForkJoinPool的设计中外部线程也是可以参与到执行子任务的,这个看似巧妙的设计其实很容易误用,尤其是遇到跟线程状态相关的全局变量时。

并行流比串行更慢的原因

在了解了并行流的实现原理后我们也就能理解为什么在文章开头,针对同一段逻辑,并行流的执行反而比串行慢了。

当在多线程场景下使用并行流时,由于并行流使用的是一个公共的线程池,所以无论外部有多少个线程,这些线程都会把任务提交给同一个线程池,所以你会发现,无论你怎么调整外面线程池的大小,都不能使任务加速。回到文章刚开始的例子,采用并行流的实现中真实的线程数为7,而采用串行的实现中真实的线程数为100,由于线程数差别巨大,因此造成了最终的耗时也有很明显的差距。

总结

并行流的设计是比较讨巧的,其中有三个地方容易踩坑

  • 同一个进程提交给并行流的任务都会被同一个公共线程池处理,因此,如果在多线程的环境中使用了并行流,反而会降低并发,使得处理变慢

  • 并行流的公共线程池大小为可用处理器减一,并且并行流会使用外部线程去处理内部子任务,搭配ThreadLocal使用的时候务必要慎重,在一些与ThreadLocal强耦合的场景,可能会导致ThreadLocal误清理,其他线程相关的全局变量同理

  • 并行流的设计是为了应对计算密集型的场景的,如果有较多的IO场景,比如常见的RPC调用,在高并发的场景下会导致外部线程阻塞,引起外部线程数增多,且这类问题在测试的时候不容易发现,极易引起生产故障。

END

推荐好文

>>【练手项目】基于SpringBoot的ERP系统,自带进销存+财务+生产功能>>分享一套基于SpringBoot和Vue的企业级中后台开源项目,代码很规范!
>>能挣钱的,开源 SpringBoot 商城系统,功能超全,超漂亮!

啥?用了并行流还更慢了相关推荐

  1. 拥抱 Java 8 并行流吧,让执行速度飞起!

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 作者:后青春期的Keats cnblogs.com/kea ...

  2. 拥抱并行流,提高程序执行速度

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 作者:后青春期的Keats cnblogs.com/keatsCo ...

  3. parallelstream启动的线程数_谈谈并行流parallelStream

    一.parallelStream是什么 Java8中提供了能够更方便处理集合数据的Stream类,其中parallelStream()方法能够充分利用多核CPU的优势,使用多线程加快对集合数据的处理速 ...

  4. Java 8 - 正确高效的使用并行流

    文章目录 Pre 正确使用并行流,避免共享可变状态 高效使用并行流 流的数据源和可分解性 Pre Java 8 - 并行流计算入门 正确使用并行流,避免共享可变状态 错用并行流而产生错误的首要原因,就 ...

  5. 拥抱 Java 8 并行流吧,速度飞起!

    前言 在 Java7 之前,如果想要并行处理一个集合,我们需要以下几步: 手动分成几部分 为每部分创建线程 在适当的时候合并.并且还需要关注多个线程之间共享变量的修改问题. 而 Java8 为我们提供 ...

  6. java并行流 阻塞主线程_多线程入门案例与java8的并行流

    java8 实例请移步https://www.cnblogs.com/ngLee/p/14021859.html 进程与线程 进程是所有线程的集合,每一个线程是进程中的一条执行路径. 多线程的创建方式 ...

  7. fork join框架_Fork / Join框架vs.并行流vs.ExecutorService:最终的Fork / Join基准

    fork join框架 Fork / Join框架在不同配置下如何工作? 就像即将到来的<星球大战>(Star Wars)一样,围绕Java 8并行性的批评也充满了兴奋. 并行流的语法糖带 ...

  8. Fork / Join框架vs并行流vs.ExecutorService:最终的Fork / Join基准

    Fork / Join框架在不同配置下如何工作? 就像即将上映的<星球大战>一样,围绕Java 8并行性的批评也充满了兴奋. 并行流的语法糖带来了一些炒作,就像我们在预告片中看到的新型光剑 ...

  9. java foreach多线程_详解多线程入门案例与java8的并行流

    进程与线程 进程是所有线程的集合,每一个线程是进程中的一条执行路径. 多线程的创建方式,继承Thread实现Runable /*** 第一种创建线程的方式,继承Thread*/ public clas ...

最新文章

  1. gcc 与 glibc 的关系 glibc版本查看
  2. h2 不能访问localhost_个人学习系列 - Spring Boot 整合 H2
  3. 收藏一些效果炫酷的可视化网站
  4. 实现 VUE 中 MVVM - step10 - Computed
  5. c语言栈指针移动原理,C指针原理(4)-ATamp;T汇编
  6. Android NDK开发篇(四):Java与原生代码通信(原生方法声明与定义与数据类型)
  7. 终极解决maya渲染层丢材质,变线框等问题
  8. PDP上下文和PDP地址
  9. Bugku—凯撒部长的奖励
  10. Excel精选28个实用技巧实例学习
  11. java printout_word中printout函数的相关参数介绍 | 学步园
  12. 是非人生 — 一个菜鸟程序员的5年职场路 第24节
  13. RIDE 访问数据库
  14. TOJ 1335 优先队列
  15. apache php gzip压缩输出的实现方法
  16. 微软业务生产力平台基础架构优化(BPIO)之企业项目管理(EPM)解决方案
  17. 【TUM公开数据集RGBD-Benchmark工具evaluate_rpe.py参数用法原理解读】
  18. 虚拟DOM中的key
  19. GD32汽车诊断协议 ISO-9141测试
  20. 某博搜索话题采集分析Python爬虫

热门文章

  1. 一加7T Pro最新渲染图曝光:背部有小改动
  2. 佩奇,是你吗?曝新款AirPods外观酷似吹风机
  3. 第七代微软小冰发布:全双工语言交互技术已经通过车载设备完成测试
  4. 华为EMUI 10系统内测截图曝光:基于Android 10.0
  5. 苹果发布iOS 12.4首个测试版 苹果信用卡即将来袭
  6. 华为下一代机皇曝光:全新麒麟985+55W超级快充
  7. 苹果iPhone XI奋起直追?直接升级四摄镜头
  8. 小米8大幅降价促销 性价比十足!
  9. 马化腾:5G技术可以影响甚至重构整个互联网服务形态
  10. 晨哥真有料丨对她越好,越难脱单!