前言

谈到并行,我们可能最先想到的是线程,多个线程一起运行,来提高我们系统的整体处理速度;为什么使用多个线程就能提高处理速度,因为现在计算机普遍都是多核处理器,我们需要充分利用cpu资源;如果站的更高一点来看,我们每台机器都可以是一个处理节点,多台机器并行处理;并行的处理方式可以说无处不在,本文主要来谈谈Java在并行处理方面的努力。

无处不在的并行

Java的垃圾回收器,我们可以看到每一代版本的更新,伴随着GC更短的延迟,从serial到cms再到现在的G1,一直在摘掉Java慢的帽子;消息队列从早期的ActiveMQ到现在的kafka和RocketMQ,引入的分区的概念,提高了消息的并行性;数据库单表数据到一定量级之后,访问速度会很慢,我们会对表进行分表处理,引入数据库中间件;Redis你可能觉得本身处理是单线程的,但是Redis的集群方案中引入了slot(槽)的概念;更普遍的就是我们很多的业务系统,通常会部署多台,通过负载均衡器来进行分发;好了还有其他的一些例子,此处不在一一例举。

如何并行

我觉得并行的核心在于"拆分",把大任务变成小任务,然后利用多核CPU也好,还是多节点也好,同时并行的处理,Java历代版本的更新,都在为我们开发者提供更方便的并行处理,从开始的Thread,到线程池,再到fork/join框架,最后到流处理,下面使用简单的求和例子来看看各种方式是如何并行处理的;

单线程处理

首先看一下最简单的单线程处理方式,直接使用主线程进行求和操作;

public class SingleThread {

public static long[] numbers;

public static void main(String[] args) {

numbers = LongStream.rangeClosed(1, 10_000_000).toArray();

long sum = 0;

for (int i = 0; i < numbers.length; i++) {

sum += numbers[i];

}

System.out.println("sum = " + sum);

}

}

求和本身是一个计算密集型任务,但是现在已经是多核时代,只用单线程,相当于只使用了其中一个cpu,其他cpu被闲置,资源的浪费;

Thread方式

我们把任务拆分成多个小任务,然后每个小任务分别启动一个线程,如下所示:

public class ThreadTest {

public static final int THRESHOLD = 10_000;

public static long[] numbers;

private static long allSum;

public static void main(String[] args) throws Exception {

numbers = LongStream.rangeClosed(1, 10_000_000).toArray();

int taskSize = (int) (numbers.length / THRESHOLD);

for (int i = 1; i <= taskSize; i++) {

final int key = i;

new Thread(new Runnable() {

public void run() {

sumAll(sum((key - 1) * THRESHOLD, key * THRESHOLD));

}

}).start();

}

Thread.sleep(100);

System.out.println("allSum = " + getAllSum());

}

private static synchronized long sumAll(long threadSum) {

return allSum += threadSum;

}

public static synchronized long getAllSum() {

return allSum;

}

private static long sum(int start, int end) {

long sum = 0;

for (int i = start; i < end; i++) {

sum += numbers[i];

}

return sum;

}

}

以上指定了一个拆分阀值,计算拆分多少个认为,同时启动多少线程;这种处理就是启动的线程数过多,而CPU数有限,更重要的是求和是一个计算密集型任务,启动过多的线程只会带来更多的线程上下文切换;同时线程处理完一个任务就终止了,也是对资源的浪费;另外可以看到主线程不知道何时子任务已经处理完了,需要做额外的处理;所有Java后续引入了线程池。

线程池方式

jdk1.5引入了并发包,其中包括了ThreadPoolExecutor,相关代码如下:

public class ExecutorServiceTest {

public static final int THRESHOLD = 10_000;

public static long[] numbers;

public static void main(String[] args) throws Exception {

numbers = LongStream.rangeClosed(1, 10_000_000).toArray();

ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);

CompletionService completionService = new ExecutorCompletionService(executor);

int taskSize = (int) (numbers.length / THRESHOLD);

for (int i = 1; i <= taskSize; i++) {

final int key = i;

completionService.submit(new Callable() {

@Override

public Long call() throws Exception {

return sum((key - 1) * THRESHOLD, key * THRESHOLD);

}

});

}

long sumValue = 0;

for (int i = 0; i < taskSize; i++) {

sumValue += completionService.take().get();

}

// 所有任务已经完成,关闭线程池

System.out.println("sumValue = " + sumValue);

executor.shutdown();

}

private static long sum(int start, int end) {

long sum = 0;

for (int i = start; i < end; i++) {

sum += numbers[i];

}

return sum;

}

}

上面已经分析了计算密集型并不是线程越多越好,这里创建了JDK默认的线程数:CPU数+1,这是一个经过大量测试以后给出的一个结果;线程池顾名思义,可以重复利用现有的线程;同时利用CompletionService来对子任务进行汇总;合理的使用线程池已经可以充分的并行处理任务,只是在写法上有点繁琐,此时JDK1.7中引入了fork/join框架;

fork/join框架

分支/合并框架的目的是以递归的方式将可以并行的认为拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果;相关代码如下:

public class ForkJoinTest extends java.util.concurrent.RecursiveTask {

private static final long serialVersionUID = 1L;

private final long[] numbers;

private final int start;

private final int end;

public static final long THRESHOLD = 10_000;

public ForkJoinTest(long[] numbers) {

this(numbers, 0, numbers.length);

}

private ForkJoinTest(long[] numbers, int start, int end) {

this.numbers = numbers;

this.start = start;

this.end = end;

}

@Override

protected Long compute() {

int length = end - start;

if (length <= THRESHOLD) {

return computeSequentially();

}

ForkJoinTest leftTask = new ForkJoinTest(numbers, start, start + length / 2);

leftTask.fork();

ForkJoinTest rightTask = new ForkJoinTest(numbers, start + length / 2, end);

Long rightResult = rightTask.compute();

// 注:join方法会阻塞,因此有必要在两个子任务的计算都开始之后才执行join方法

Long leftResult = leftTask.join();

return leftResult + rightResult;

}

private long computeSequentially() {

long sum = 0;

for (int i = start; i < end; i++) {

sum += numbers[i];

}

return sum;

}

public static void main(String[] args) {

System.out.println(forkJoinSum(10_000_000));

}

public static long forkJoinSum(long n) {

long[] numbers = LongStream.rangeClosed(1, n).toArray();

ForkJoinTask task = new ForkJoinTest(numbers);

return new ForkJoinPool().invoke(task);

}

}

ForkJoinPool是ExecutorService接口的一个实现,子认为分配给线程池中的工作线程;同时需要把任务提交到此线程池中,需要创建RecursiveTask的一个子类;大体逻辑就是通过fork进行拆分,然后通过join进行结果的合并,JDK为我们提供了一个框架,我们只需要在里面填充即可,更加方便;有没有更简单的方式,连拆分都省了,自动拆分合并,jdk在1.8中引入了流的概念;

流方式

Java8引入了stream的概念,可以让我们更好的利用并行,使用流代码如下:

public class StreamTest {

public static void main(String[] args) {

System.out.println("sum = " + parallelRangedSum(10_000_000));

}

public static long parallelRangedSum(long n) {

return LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum);

}

}

以上代码是不是非常简单,对于开发者来说完全不需要手动拆分,使用同步机制等方式,就可以让任务并行处理,只需要对流使用parallel()方法,系统自动会对任务进行拆分,当然前提是没有共享可变状态;其实并行流内部使用的也是fork/join框架;

总结

本文使用一个求和的实例,来介绍了jdk为开发者提供并行处理的各种方式,可以看到Java一直在为提供更方便的并行处理而努力。

参考

<>

java多核并行计算_谈谈Java任务的并行处理相关推荐

  1. java stream 求和_谈谈Java任务的并行处理

    作者:ksfzhaohui 前言 谈到并行,我们可能最先想到的是线程,多个线程一起运行,来提高我们系统的整体处理速度:为什么使用多个线程就能提高处理速度,因为现在计算机普遍都是多核处理器,我们需要充分 ...

  2. c 与java 反射性能_谈谈Java 反射的快慢

    [相关学习推荐:java基础教程] 反射到底是好是坏 说到Java 中的反射,初学者在刚刚接触到反射的各种高级特性时,往往表示十分兴奋,甚至会在一些不需要使用反射的场景中强行使用反射来「炫技」.而经验 ...

  3. check在java意思吗_谈谈Java:Checked Exception与 unCheckException Runtime Exception 的区别...

    Java里有个很重要的特色是Exception ,也就是说允许程序产生例外状况.而在学Java 的时候,我们也只知道Exception 的写法,却未必真能了解不同种类的Exception 的区别. 首 ...

  4. jvm对于java的意义_谈谈对JVM的理解

    JVM可谓是学习JAVA基础中的基础了,但仍有不少同学对JVM概念还是比较模糊,甚至没有听说过,对java的理解也只是在基础语法 层面,本文就将对JVM进行初步介绍,因篇幅所限,只能介绍JVM基础,如 ...

  5. java异常处理方式推荐做法_谈谈Java异常处理这件事儿

    此文已由作者谢蕾授权网易云社区发布. 欢迎访问网易云社区,了解更多网易技术产品运营经验. 前言 我们对于"异常处理"这个词并不陌生,众多框架和库在异常处理方面都提供了便利,但是对于 ...

  6. java 节假日_谈谈JAVA实现节假日验证

    原标题:谈谈JAVA实现节假日验证 我们需要两个类,第一个类: 我们叫它验证类. 第二个类: 它是对法定节假日的抽象. 第一步开始: 当验证类被初始化的时候,会加载本年的所有法定节假日到一个list里 ...

  7. java面向对象_谈谈Java的面向对象

    类的拷贝和构造 C++是默认具有拷贝语义的,对于没有拷贝运算符和拷贝构造函数的类,可以直接进行二进制拷贝,但是Java并不天生支持深拷贝,它的拷贝只是拷贝在堆上的地址,不同的变量引用的是堆上的同一个对 ...

  8. java正则表达式 匹配()_学习Java正则表达式(匹配、替换、查找)

    import java.util.ArrayList; import java.util.regex.Matcher; import java.util.regex.Pattern; public c ...

  9. 纯java分布式内存数据库_最新Java岗面试清单:分布式+Dubbo+线程+Redis+数据库+JVM+并发...

    最近可能有点闲的慌,没事就去找面试面经,整理了一波面试题.我大概是分成了Java基础.中级.高级,分布式,Spring架构,多线程,网络,MySQL,Redis缓存,JVM相关,调优,设计模式,算法与 ...

最新文章

  1. antlr-2.7.6.jar的作用
  2. Git命令比较两个分支commit 差异
  3. 开发日记-20190430 关键词 apt,aspectj,javassist
  4. AtCoder AGC004F Namori (图论)
  5. 存储过程,触发器,事务和锁
  6. CG CTF WEB 综合题2
  7. 关于无线的Idle Timeout和Session Timeout
  8. 阿里云物联网MQTT三元数生成
  9. Java 1.1字符串
  10. Vmware虚机机挂起后无法远程连接
  11. 分享几个Python小技巧函数里的4个小花招 1
  12. java里if为真_Excel:如果所有条件都为真,则嵌套的IF语句返回所有真值
  13. ecall 方法必须打包到系统模块中_基于SpringBoot+Vue+Mysql开发的进销存管理系统
  14. 腾讯QQ会员技术团队:人人都可以做深度学习应用:入门篇(下)
  15. ps去水印教程_PS去水印教程:运用色彩原理去除半透明水印
  16. Thinkphp内核开发盲盒商城源码v2.0 对接易支付/阿里云短信/七牛云存储
  17. TensorFlow机器翻译之moses切词(附:ActivePerl安装)
  18. java 时间格式检查
  19. 哪些产品需要做3C认证,费用是多少
  20. 自定义launcher预置widget无法跳入app,显示无法添加微件问题

热门文章

  1. hikari数据源配置类_Spring中的“多数据源”之详解
  2. python怎么加图片_怎么在图片旁边加文字 python如何在图片上添加文字 - 励志 - 52资讯网...
  3. Setup Factory打包时实现第三方DLL文件的注册
  4. php 如何设计索引_Mysql学习浅谈mysql的索引设计原则以及常见索引的区别
  5. 浅谈 DDoS 攻击与防御
  6. SQL Server 将一个表中字段的值复制到另一个表的字段中
  7. Queue(队列)-Swift实现与广度优先搜索应用
  8. 网络安全-安全散列函数,信息摘要SHA-1,MD5原理
  9. js 运算符 语句
  10. JBoss5 启动报错java.lang.IllegalArgumentException: ...