2019独角兽企业重金招聘Python工程师标准>>>

在之前的例子中,我们使用执行器框架都是在主类中提交任务,等待任务执行完毕后再去处理任务执行的结果。接下来我们打算将任务的提交和结果的处理都放置到线程中去执行。在每个任务内部提交自己到执行器,然后通过一个统一的结果处理线程来处理所有任务执行的结果。

为了解决这个问题,执行器框架为我们提供了一个CompletionService类,任务执行线程和结果处理线程能够共享这个类,结果处理线程便可以在这里渠道已经执行完毕的任务的结果。CompletionService类的内部也是通过一个ExecutorService来提交任务的。

首先,创建任务线程,实现Callable接口。模拟报表生成过程。

/*** 模拟生成报告** Created by hadoop on 2016/11/3.*/
public class ReportGenerator implements Callable<String> {private String sender;private String title;public ReportGenerator(String sender, String title) {this.sender = sender;this.title = title;}@Overridepublic String call() throws Exception {long duration = (long)(Math.random() * 10);System.out.printf("ReportGenerator: Generator report %s_%s duration %d seconds.\n", sender, title, duration);TimeUnit.SECONDS.sleep(duration);return sender + "_" + title;}
}

然后我们创建任务提交线程,这个线程的构造方法接受两个参数,分别是报表名称和CompletionService对象。将报表生成任务提交到CompletionService去执行。

import java.util.concurrent.CompletionService;/*** 模拟请求获取报告** Created by hadoop on 2016/11/3.*/
public class ReportRequest implements Runnable {private String name;private CompletionService<String> service;public ReportRequest(String name, CompletionService<String> service) {this.name = name;this.service = service;}@Overridepublic void run() {ReportGenerator generator = new ReportGenerator(name, "Report");service.submit(generator);}
}

下面我们创建任务结果处理类,来打印生成的报表。这个类同样会拿到CompletionService的引用,然后循环调用CompletionService.poll()方法来从任务结果队列中获取执行的结果,这个方法接受一个时间参数,如果当前结果队列为空,那么则等待这个时间,超时返回null。不带参数的poll()方法,如果对别为空则直接返回null。

/*** 处理报表结果** Created by hadoop on 2016/11/3.*/
public class ReportProcessor implements Runnable {private boolean end;private CompletionService<String> service;public ReportProcessor(boolean end, CompletionService<String> service) {this.end = end;this.service = service;}@Overridepublic void run() {while (!end) {try {Future<String> future = service.poll(20, TimeUnit.SECONDS);if (future != null) {System.out.printf("ReportReceiver: received %s\n", future.get());}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}}public void setEnd(boolean end) {this.end = end;}
}

最后我们创建主方法类。在这里我们创建ExecutorServer并把它赋值给ExecutorCompletionService。之后创建两个报表请求任务和一个报表处理任务,同时持有ExecutorCompletionService的引用。

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;/*** 在执行器中分离任务执行和结果处理** 我们如何处理在一个对象里发送任务给执行器,在另一个对象里处理任务执行结果。* 对于这种情况Java API提供了CompletionService类** CompletionService使用Executor对象类执行任务。*   优势在于:可以共享CompletionService。*   缺点在于:CompletionService获取的Future对象只能是已经执行完毕的任务,他没有办法控制任务状态,只能处理任务结果。** 我们创建了一个ExecutorService,然后使用这个ExecutorService来初始化一个ExecutorCompletionService<String>(executor)。** 首先创建了两个ReportRequest任务,然后在任务内部使用service.submit(generator)方法调用报表生成任务。** 然后再ReportProcessor中不断调用service.poll(20, TimeUnit.SECONDS);方法获取已经执行完的结果,如果当前没有结果那么等待20秒。** CompletionService还提供了两个人方法:*   poll():如果没有任何Future直接返回null。*   take():如若任务队列中没有Future那么阻塞知道有可用的Future。** Created by hadoop on 2016/11/3.*/
public class Main {public static void main(String[] args) {ExecutorService executor = Executors.newCachedThreadPool();CompletionService<String> service = new ExecutorCompletionService<String>(executor);ReportRequest request1 = new ReportRequest("Face", service);ReportRequest request2 = new ReportRequest("Online", service);ReportProcessor processor = new ReportProcessor(false, service);Thread thread1 = new Thread(request1);Thread thread2 = new Thread(request2);Thread thread3 = new Thread(processor);thread1.start();thread2.start();thread3.start();try {thread1.join();thread2.join();} catch (InterruptedException e) {e.printStackTrace();}executor.shutdown();try {executor.awaitTermination(1, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}processor.setEnd(true);}
}

控制台中,我们可以看到两个任务的提交信息和结果处理信息。

ReportGenerator: Generator report Online_Report duration 4 seconds.
ReportGenerator: Generator report Face_Report duration 1 seconds.
ReportReceiver: received Face_Report
ReportReceiver: received Online_Report

转载于:https://my.oschina.net/nenusoul/blog/849165

Java并发编程高级篇(十):分离任务的执行和结果的处理相关推荐

  1. Java并发编程高级篇(八):在执行器中取消任务

    2019独角兽企业重金招聘Python工程师标准>>> 前面我们已经学习如何把任务发送给执行器去执行,但是当我们想要取消一个已经发送给执行器的任务该怎么办呢.可以使用Future对象 ...

  2. Java并发编程|第二篇:线程生命周期

    文章目录 系列文章 1.线程的状态 2.线程生命周期 3.状态测试代码 4.线程终止 4.1 线程执行完成 4.2 interrupt 5.线程复位 5.1interrupted 5.2抛出异常 6. ...

  3. Java 并发编程之美:并发编程高级篇之一-chat

    借用 Java 并发编程实践中的话:编写正确的程序并不容易,而编写正常的并发程序就更难了.相比于顺序执行的情况,多线程的线程安全问题是微妙而且出乎意料的,因为在没有进行适当同步的情况下多线程中各个操作 ...

  4. Java 并发编程之美:并发编程高级篇之一

    借用 Java 并发编程实践中的话:编写正确的程序并不容易,而编写正常的并发程序就更难了.相比于顺序执行的情况,多线程的线程安全问题是微妙而且出乎意料的,因为在没有进行适当同步的情况下多线程中各个操作 ...

  5. java并发编程入门_探讨一下!Java并发编程基础篇一

    Java并发编程想必大家都不陌生,它是实现高并发/高流量的基础,今天我们就来一起学习这方面的内容. 什么是线程?什么是进程?他们之间有什么联系? 简单来说,进程就是程序的一次执行过程,它是系统进行资源 ...

  6. 【Java并发编程】之十六:深入Java内存模型——happen-before规则及其对DCL的分析(含代码)...

    Java并发编程系列 版权声明:本文为博主原创文章,未经博主允许不得转载. https://blog.csdn.net/mmc_maodun/article/details/17348313 转载请注 ...

  7. 转:【Java并发编程】之十六:深入Java内存模型——happen-before规则及其对DCL的分析(含代码)...

    转载请注明出处:http://blog.csdn.net/ns_code/article/details/17348313 happen-before规则介绍 Java语言中有一个"先行发生 ...

  8. 《JUC并发编程 - 高级篇》01 - 进程与线程概述 | 02 - Java线程(创建线程、查看线程、线程常见方法、线程状态)

  9. Java并发编程中级篇(一):使用Semaphore信号量进行并发控制

    2019独角兽企业重金招聘Python工程师标准>>> Semaphore是一个二进制信号量,只有0和1两个值.如果线程想要访问一个共享资源,它必须先获得信号量.如果信号量的内部计数 ...

最新文章

  1. matlab中阈值计算方法,三种阈值计算方法在MatLab6.5中的实现
  2. [转]ArcGIS.Server.9.3和ArcGIS API for Flex实现Toolbar功能(四)
  3. 工作277:v-model实战
  4. linux+有趣的指令,6个有趣的Linux命令(乐趣终端) - 第二部分
  5. 【java】java 并发编程 BlockingQueue 和 BlockingDeque
  6. element ui下拉框实现
  7. C# VS如何整个项目中查找字符串
  8. numpy教程:排序、搜索和计数
  9. kancloud mysql内核_锁 · Mysql · 看云
  10. 考研408 完整知识点篇2.0版
  11. cad解除块的快捷命令_CAD撤销上一步和恢復下一步的快捷键是什么?
  12. (openCV 十二)图像增强(对数变换/伽马变换/分段线性变换)
  13. Sketch for mac|矢量绘图设计
  14. hardhat 教程及 hardhat-deploy 插件使用
  15. java计算机毕业设计健身房管理系统演示录像2021MyBatis+系统+LW文档+源码+调试部署
  16. 分布式理论(五)—— 一致性算法 Paxos
  17. 修改mp3图片和信息——BesMp3Editor
  18. 北航计算机组成实验课,北航计算机组成实验Project4
  19. 世纪互联私有化历时一年后搁浅 清华系上位雷军系白忙
  20. 俄警告将在东欧部署导弹报复美国 惹怒波兰

热门文章

  1. mongodb mysql 写_MongoDB与MySQL关于写确认的异同
  2. mysql event执行记录_mysql event建立模板(可记录执行履历)
  3. linux php版本升级_玩转Linux,介绍一个强大的Linux服务器管理面板,比宝塔更强...
  4. C语言树形文件结构的创建,C语言二叉树
  5. java webrtc ns降噪_单独编译和使用webrtc音频降噪模块(附完整源码+测试音频文件)...
  6. mysql 获取子分类_MySQL 自定义函数获取一个分类的无限级子分类
  7. liferay + struts2 + spring + ibatis整合开发案例
  8. formdata怎么传数组_如何使用formData上传file数组
  9. 《剑指offer》把数组排成最小的数
  10. TensorFlow学习笔记(二十) tensorflow实现简单三层网络