Fork / Join框架是使用并发分治法解决问题的框架。 引入它们是为了补充现有的并发API。 在介绍它们之前,现有的ExecutorService实现是运行异步任务的流行选择,但是当任务同质且独立时,它们会发挥最佳作用。 运行依赖的任务并使用这些实现来组合其结果并不容易。 随着Fork / Join框架的引入,人们试图解决这一缺陷。 在本文中,我们将简要介绍API,并解决几个简单的问题以了解其工作原理。

解决非阻塞任务

让我们直接跳入代码。 让我们创建一个任务,该任务将返回List的所有元素的总和。 以下步骤以伪代码表示我们的算法:

01.查找列表的中间索引

02.在中间划分列表

03.递归创建一个新任务,该任务将计算剩余部分的总和

04.递归创建一个新任务,该任务将计算正确部分的总和

05.将左总和,中间元素和右总和的结果相加

这是代码–

@Slf4j
public class ListSummer extends RecursiveTask<Integer> {private final List<Integer> listToSum;ListSummer(List<Integer> listToSum) {this.listToSum = listToSum;}@Overrideprotected Integer compute() {if (listToSum.isEmpty()) {log.info("Found empty list, sum is 0");return 0;}int middleIndex = listToSum.size() / 2;log.info("List {}, middle Index: {}", listToSum, middleIndex);List<Integer> leftSublist = listToSum.subList(0, middleIndex);List<Integer> rightSublist = listToSum.subList(middleIndex + 1, listToSum.size());ListSummer leftSummer = new ListSummer(leftSublist);ListSummer rightSummer = new ListSummer(rightSublist);leftSummer.fork();rightSummer.fork();Integer leftSum = leftSummer.join();Integer rightSum = rightSummer.join();int total = leftSum + listToSum.get(middleIndex) + rightSum;log.info("Left sum is {}, right sum is {}, total is {}", leftSum, rightSum, total);return total;}
}

首先,我们扩展了ForkJoinTask的RecursiveTask子类型。 这是我们期望并发任务返回结果时的扩展类型。 当任务不返回结果而仅执行效果时,我们扩展RecursiveAction子类型。 对于我们解决的大多数实际任务,这两个子类型就足够了。

其次,RecursiveTask和RecursiveAction都定义了一种抽象计算方法。 这是我们进行计算的地方。

第三,在我们的计算方法内部,我们检查通过构造函数传递的列表的大小。 如果为空,则我们已经知道总和的结果为零,然后我们立即返回。 否则,我们将列表分为两个子列表,并创建ListSummer类型的两个实例。 然后,我们在这两个实例上调用fork()方法(在ForkJoinTask中定义)–

leftSummer.fork();
rightSummer.fork();

导致将这些任务安排为异步执行的原因,稍后将在本文中解释用于此目的的确切机制。

之后,我们调用join()方法(也在ForkJoinTask中定义)以等待这两部分的结果

Integer leftSum = leftSummer.join();
Integer rightSum = rightSummer.join();

然后将其与列表的中间元素相加以获得最终结果。

添加了许多日志消息,以使示例更易于理解。 但是,当我们处理包含数千个条目的列表时,拥有详细的日志记录(尤其是记录整个列表)可能不是一个好主意。

就是这样。 现在为测试运行创建一个测试类–

public class ListSummerTest {@Testpublic void shouldSumEmptyList() {ListSummer summer = new ListSummer(List.of());ForkJoinPool forkJoinPool = new ForkJoinPool();forkJoinPool.submit(summer);int result = summer.join();assertThat(result).isZero();}@Testpublic void shouldSumListWithOneElement() {ListSummer summer = new ListSummer(List.of(5));ForkJoinPool forkJoinPool = new ForkJoinPool();forkJoinPool.submit(summer);int result = summer.join();assertThat(result).isEqualTo(5);}@Testpublic void shouldSumListWithMultipleElements() {ListSummer summer = new ListSummer(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9));ForkJoinPool forkJoinPool = new ForkJoinPool();forkJoinPool.submit(summer);int result = summer.join();assertThat(result).isEqualTo(45);}
}

在测试中,我们创建一个ForkJoinPool的实例。 ForkJoinPool是用于运行ForkJoinTasks的唯一ExecutorService实现。 它采用一种称为工作窃取算法的特殊算法。 与其他ExecutorService实现相反,在该实现中,只有一个队列包含要执行的所有任务,在工作窃取实现中,每个工作线程都获得其工作队列。 每个线程都从其队列开始执行任务。

当我们检测到ForkJoinTask可以分解为多个较小的子任务时,便将它们分解为较小的任务,然后在这些任务上调用fork()方法。 该调用导致子任务被推入执行线程的队列中。 在执行期间,当一个线程用尽队列/没有要执行的任务时,它可以从其他线程的队列中“窃取”任务(因此称为“工作窃取”)。 与使用任何其他ExecutorService实现相比,这种窃取行为可以带来更高的吞吐量。

之前,当我们在leftSummer和rightSummer任务实例上调用fork()时,它们被推入执行线程的工作队列中,之后它们被池中的其他活动线程“偷”(依此类推),因为它们确实那时没有其他事情要做。

很酷吧?

解决阻止任务

我们刚才解决的问题本质上是非阻塞的。 如果我们想解决一个阻塞操作的问题,那么为了获得更好的吞吐量,我们将需要改变策略。

让我们用另一个例子来研究一下。 假设我们要创建一个非常简单的网络搜寻器。 该搜寻器将接收HTTP链接列表,执行GET请求以获取响应主体,然后计算响应长度。 这是代码–

@Slf4j
public class ResponseLengthCalculator extends RecursiveTask<Map<String, Integer>> {private final List<String> links;ResponseLengthCalculator(List<String> links) {this.links = links;}@Overrideprotected Map<String, Integer> compute() {if (links.isEmpty()) {log.info("No more links to fetch");return Collections.emptyMap();}int middle = links.size() / 2;log.info("Middle index: {}", links, middle);ResponseLengthCalculator leftPartition = new ResponseLengthCalculator(links.subList(0, middle));ResponseLengthCalculator rightPartition = new ResponseLengthCalculator(links.subList(middle + 1, links.size()));log.info("Forking left partition");leftPartition.fork();log.info("Left partition forked, now forking right partition");rightPartition.fork();log.info("Right partition forked");String middleLink = links.get(middle);HttpRequester httpRequester = new HttpRequester(middleLink);String response;try {log.info("Calling managedBlock for {}", middleLink);ForkJoinPool.managedBlock(httpRequester);response = httpRequester.response;} catch (InterruptedException ex) {log.error("Error occurred while trying to implement blocking link fetcher", ex);response = "";}Map<String, Integer> responseMap = new HashMap<>(links.size());Map<String, Integer> leftLinks = leftPartition.join();responseMap.putAll(leftLinks);responseMap.put(middleLink, response.length());Map<String, Integer> rightLinks = rightPartition.join();responseMap.putAll(rightLinks);log.info("Left map {}, middle length {}, right map {}", leftLinks, response.length(), rightLinks);return responseMap;}private static class HttpRequester implements ForkJoinPool.ManagedBlocker {private final String link;private String response;private HttpRequester(String link) {this.link = link;}@Overridepublic boolean block() {HttpGet headRequest = new HttpGet(link);CloseableHttpClient client = HttpClientBuilder.create().disableRedirectHandling().build();try {log.info("Executing blocking request for {}", link);CloseableHttpResponse response = client.execute(headRequest);log.info("HTTP request for link {} has been executed", link);this.response = EntityUtils.toString(response.getEntity());} catch (IOException e) {log.error("Error while trying to fetch response from link {}: {}", link, e.getMessage());this.response = "";}return true;}@Overridepublic boolean isReleasable() {return false;}}
}

我们创建ForkJoinPool.ManagedBlocker的实现,在其中放置阻塞的HTTP调用。 该接口定义了两个方法– block()和isReleasable() 。 block()方法是我们进行阻塞调用的地方。 在完成阻塞操作之后,我们返回true,指示不再需要进一步的阻塞。 我们从isReleasable()实现中返回false,以向fork-join工作线程指示block()方法实现本质上可能在阻塞。 isReleasable()实现将在调用block()方法之前先由fork-join工作线程调用。 最后,我们通过调用ForkJoinPool.managedBlock()静态方法将HttpRequester实例提交到池中。 之后,我们的阻止任务将开始执行。 当它阻塞HTTP请求时,如果有必要,ForkJoinPool.managedBlock()方法还将安排激活备用线程,以确保足够的并行性。

那么,让我们将此实现用于测试驱动! 这是代码–

public class ResponseLengthCalculatorTest {@Testpublic void shouldReturnEmptyMapForEmptyList() {ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(Collections.emptyList());ForkJoinPool pool = new ForkJoinPool();pool.submit(responseLengthCalculator);Map<String, Integer> result = responseLengthCalculator.join();assertThat(result).isEmpty();}@Testpublic void shouldHandle200Ok() {ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(List.of("http://httpstat.us/200"));ForkJoinPool pool = new ForkJoinPool();pool.submit(responseLengthCalculator);Map<String, Integer> result = responseLengthCalculator.join();assertThat(result).hasSize(1).containsKeys("http://httpstat.us/200").containsValue(0);}@Testpublic void shouldFetchResponseForDifferentResponseStatus() {ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(List.of("http://httpstat.us/200","http://httpstat.us/302","http://httpstat.us/404","http://httpstat.us/502"));ForkJoinPool pool = new ForkJoinPool();pool.submit(responseLengthCalculator);Map<String, Integer> result = responseLengthCalculator.join();assertThat(result).hasSize(4);}
}

今天就这样,伙计们! 与往常一样,任何反馈/改进建议/评论都将受到高度赞赏!

此处讨论的所有示例都可以在Github上找到( 特定提交 )。

大呼大叫的http://httpstat.us服务,对于开发简单的测试非常有帮助。

翻译自: https://www.javacodegeeks.com/2019/01/brief-overview-fork-join-framework-java.html

Java中的Fork / Join框架的简要概述相关推荐

  1. fork join框架_Java中的Fork / Join框架的简要概述

    fork join框架 Fork / Join框架是使用并发分治法解决问题的框架. 引入它们是为了补充现有的并发API. 在介绍它们之前,现有的ExecutorService实现是运行异步任务的流行选 ...

  2. java 中的fork join框架

    文章目录 ForkJoinPool ForkJoinWorkerThread ForkJoinTask 在ForkJoinPool中提交Task java 中的fork join框架 fork joi ...

  3. 【Java】java中的Fork/Join

    1.概述 视频:java中的Fork/Join Fork/Join是什么? Fork/Join框架是Java7提供的并行执行任务框架,思想是将大任务分解成小任务,然后小任务又可以继续分解,然后每个小任 ...

  4. java中fork什么意思_java 中的fork join框架

    fork join框架是java 7中引入框架,这个框架的引入主要是为了提升并行计算的能力. fork join主要有两个步骤,第一就是fork,将一个大任务分成很多个小任务,第二就是join,将第一 ...

  5. Java:使用Fork / Join框架的Mergesort

    此项的目的是显示一个Fork / Join RecursiveAction的简单示例,而不是过多地研究合并合并的可能优化方法,或者比使用Exkutor / Join Pool优于现有的基于Java 6 ...

  6. java forkjoinpool_Java并发——Fork/Join框架与ForkJoinPool

    为了防止无良网站的爬虫抓取文章,特此标识,转载请注明文章出处.LaplaceDemon/ShiJiaqi. http://www.cnblogs.com/shijiaqi1066/p/4631466. ...

  7. Java 并发 (13) -- Fork/Join 框架

    文章目录 1. 简介 2. 精讲 1. 什么是 Fork/Join 框架 2. 工作窃取算法 3. Fork/Join 框架的设计 4. 使用 Fork/Join 框架 5. Fork/Join 框架 ...

  8. Java 7:Fork / Join框架示例

    Java 7中的Fork / Join Framework专为可分解为较小任务的工作而设计,并将这些任务的结果组合起来以产生最终结果. 通常,使用Fork / Join Framework的类遵循以下 ...

  9. Levenshtein Distance编辑距离应用实践——拼写检查(Java fork/join框架实现)

    文章目录 1. 实现莱文斯坦距离算法 1.1 算法原理分析 1.2 代码实现 2. 使用fork/join进行匹配 2.1 单线程匹配 2.2 fork/join多线程匹配 2.3 传统多线程版本 2 ...

最新文章

  1. 《Java工程师修炼之道》内容概览
  2. H5 新标签用法及解释
  3. LED驱动设计及实现
  4. 微信小程序(5)wx:if 条件判断
  5. ajax中加上AntiForgeryToken防止CSRF攻击
  6. python显示文件夹图片_如何显示文件夹中的随机图片(Python)
  7. 【转】逆变与协变详解
  8. spring boot配置logback日志
  9. Tomcat 异常关闭排查
  10. 变频器调试工具:ABB Drive Composer
  11. oracle 取awr报告,Oracle生成awr报告
  12. 51单片机课设 计算器
  13. Go dep init失败
  14. RAR及ZIP压缩文件解压提示文件损坏或无法解压原因及修复办法全解析
  15. 荣耀笔记本linux版硬盘分区,荣耀MagicBook硬盘分区详细教程
  16. 计算机的端口以及tcp/ip中的端口
  17. SVG公众号排版 | 快速解决视频号美化出现“点赞信息”
  18. 关于Boost电感的问题
  19. SAP中采购订单历史分类标识与实际业务描述
  20. 电脑投屏到电视怎么操作_无线投屏器应用

热门文章

  1. 【二分】最大均值(ybtoj 二分-1-3)
  2. Codeforces 1176F
  3. 2017西安交大ACM小学期数据结构 [分块,区间修改,单点查询]
  4. A Boring Game
  5. SpringBoot2.1.9 分布式锁ShedLock
  6. 再有人问你volatile是什么,就把这篇文章发给他
  7. Oracle入门(十四.5)之识别数据类型
  8. 服务降级的概念及应用手段
  9. javafx之TableView的TableColumn
  10. 《朝花夕拾》金句摘抄(六)