基于线程池/执行器的实现

比原始线程版本更好的方法是基于线程池的线程池,其中基于运行任务的系统定义了适当的线程池大小– CPU数量/(任务的1-Blocking Coefficient)。 Venkat Subramaniams书中有更多详细信息:

首先,在给定“报告部件请求”的情况下,我定义了一个自定义任务来生成“报告部件”,将其实现为Callable :

public class ReportPartRequestCallable implements Callable<ReportPart> {private final ReportRequestPart reportRequestPart;private final ReportPartGenerator reportPartGenerator;public ReportPartRequestCallable(ReportRequestPart reportRequestPart, ReportPartGenerator reportPartGenerator) {this.reportRequestPart = reportRequestPart;this.reportPartGenerator = reportPartGenerator;}@Overridepublic ReportPart call() {return this.reportPartGenerator.generateReportPart(reportRequestPart);}
}
public class ExecutorsBasedReportGenerator implements ReportGenerator {private static final Logger logger = LoggerFactory.getLogger(ExecutorsBasedReportGenerator.class);private ReportPartGenerator reportPartGenerator;private ExecutorService executors = Executors.newFixedThreadPool(10);@Overridepublic Report generateReport(ReportRequest reportRequest) {List<Callable<ReportPart>> tasks = new ArrayList<Callable<ReportPart>>();List<ReportRequestPart> reportRequestParts = reportRequest.getRequestParts();for (ReportRequestPart reportRequestPart : reportRequestParts) {tasks.add(new ReportPartRequestCallable(reportRequestPart, reportPartGenerator));}List<Future<ReportPart>> responseForReportPartList;List<ReportPart> reportParts = new ArrayList<ReportPart>();try {responseForReportPartList = executors.invokeAll(tasks);for (Future<ReportPart> reportPartFuture : responseForReportPartList) {reportParts.add(reportPartFuture.get());}} catch (Exception e) {logger.error(e.getMessage(), e);throw new RuntimeException(e);}return new Report(reportParts);}......
}

在这里,使用Executors.newFixedThreadPool(10)调用创建线程池,线程池的大小为10,为每个报告请求部分生成一个可调用任务,并使用ExecutorService抽象将其移交给线程池

responseForReportPartList = executors.invokeAll(tasks);

此调用返回一个期货列表,该列表支持get()方法,这是对响应可用的阻塞调用。

与原始线程版本相比,这显然是一个更好的实现,线程数量在负载下被限制为可管理的数量。

基于Spring集成的实现

我个人最喜欢的方法是使用Spring Integration ,原因是使用Spring Integration时 ,我专注于完成不同任务的组件,并留给Spring Integration使用基于xml或基于注释的配置将流程连接在一起。 在这里,我将使用基于XML的配置:

在我的案例中,这些组件是:
1.给出报告部分的请求,生成报告部分的组件,我之前已经显示过 。
2.用于将报告请求拆分为报告请求部分的组件:

public class DefaultReportRequestSplitter implements ReportRequestSplitter{@Overridepublic List<ReportRequestPart> split(ReportRequest reportRequest) {return reportRequest.getRequestParts();}
}

3.用于将报告部分组装/汇总为整个报告的组件:

public class DefaultReportAggregator implements ReportAggregator{@Overridepublic Report aggregate(List<ReportPart> reportParts) {return new Report(reportParts);}}

这就是Spring Integration所需的所有Java代码,其余的都是接线–在这里,我使用了Spring Integration配置文件:

<?xml version='1.0' encoding='UTF-8'?>
<beans ....<int:channel id='report.partsChannel'/><int:channel id='report.reportChannel'/><int:channel id='report.partReportChannel'><int:queue capacity='50'/></int:channel>  <int:channel id='report.joinPartsChannel'/><int:splitter id='splitter' ref='reportsPartSplitter' method='split' input-channel='report.partsChannel' output-channel='report.partReportChannel'/><task:executor id='reportPartGeneratorExecutor' pool-size='10' queue-capacity='50' /><int:service-activator id='reportsPartServiceActivator'  ref='reportPartReportGenerator' method='generateReportPart' input-channel='report.partReportChannel' output-channel='report.joinPartsChannel'><int:poller task-executor='reportPartGeneratorExecutor' fixed-delay='500'></int:poller></int:service-activator><int:aggregator ref='reportAggregator' method='aggregate' input-channel='report.joinPartsChannel' output-channel='report.reportChannel' ></int:aggregator> <int:gateway id='reportGeneratorGateway' service-interface='org.bk.sisample.springintegration.ReportGeneratorGateway' default-request-channel='report.partsChannel' default-reply-channel='report.reportChannel'/><bean name='reportsPartSplitter' class='org.bk.sisample.springintegration.processors.DefaultReportRequestSplitter'></bean><bean name='reportPartReportGenerator' class='org.bk.sisample.processors.DummyReportPartGenerator'/><bean name='reportAggregator' class='org.bk.sisample.springintegration.processors.DefaultReportAggregator'/><bean name='reportGenerator' class='org.bk.sisample.springintegration.SpringIntegrationBasedReportGenerator'/></beans>

Spring Source Tool Suite提供了一种可视化此文件的好方法:

这完全符合我对用户流的原始看法:

在代码的Spring Integration版本中,我定义了不同的组件来处理流程的不同部分:
1.将报告请求转换为报告请求部分的拆分器:

<int:splitter id='splitter' ref='reportsPartSplitter' method='split' input-channel='report.partsChannel' output-channel='report.partReportChannel'/>

2.服务激活器组件,用于根据报告部件请求生成报告部件:

<int:service-activator id='reportsPartServiceActivator'  ref='reportPartReportGenerator' method='generateReportPart' input-channel='report.partReportChannel' output-channel='report.joinPartsChannel'><int:poller task-executor='reportPartGeneratorExecutor' fixed-delay='500'></int:poller></int:service-activator>

3.聚合器,用于将报表部分重新加入报表,并且足够智能,可以适当地关联原始拆分报表请求,而无需任何显式编码:

<int:aggregator ref='reportAggregator' method='aggregate' input-channel='report.joinPartsChannel' output-channel='report.reportChannel' ></int:aggregator>

这段代码有趣的是,就像在基于执行者的示例中一样,使用xml文件,通过使用适当的通道将不同的组件连接在一起以及通过使用任务执行器 ,可以完全配置服务于每个组件的线程数。设置为执行程序属性的线程池大小。

在这段代码中,我定义了一个队列通道,其中报告请求部分进入其中:

<int:channel id='report.partReportChannel'><int:queue capacity='50'/></int:channel>

并由服务激活器组件使用任务执行器提供服务,该任务执行器的线程池大小为10,容量为50:

<task:executor id='reportPartGeneratorExecutor' pool-size='10' queue-capacity='50' /><int:service-activator id='reportsPartServiceActivator'  ref='reportPartReportGenerator' method='generateReportPart' input-channel='report.partReportChannel' output-channel='report.joinPartsChannel'><int:poller task-executor='reportPartGeneratorExecutor' fixed-delay='500'></int:poller></int:service-activator>

所有这些都通过配置!

该示例的完整代码库可在以下github位置获得: https : //github.com/bijukunjummen/si-sample

参考: 并发–来自JCG合作伙伴 Biju Kunjummen的“ 执行程序和Spring集成” ,位于all和其他博客上。

翻译自: https://www.javacodegeeks.com/2012/06/concurrency-executors-and-spring.html

并发–执行程序和Spring集成相关推荐

  1. 从零开始学 Java - Spring 集成 Memcached 缓存配置(二)

    Memcached 客户端选择 上一篇文章 从零开始学 Java - Spring 集成 Memcached 缓存配置(一)中我们讲到这篇要谈客户端的选择,在 Java 中一般常用的有三个: Memc ...

  2. Spring集成Shiro框架实战

    文章目录 一:什么是Shiro框架 二:Shiro框架简介 1.Shiro基础功能点介绍 2.Shiro的工作原理 3.Shiro的内部工作结构 4.Shiro的身份认证流程 三:Spring集成Sh ...

  3. Spring集成基础知识

    本文是我们名为" EAI的Spring集成 "的学院课程的一部分. 在本课程中,向您介绍了企业应用程序集成模式以及Spring Integration如何解决它们. 接下来,您将深 ...

  4. Activiti配置实例以及Spring集成配置

    public class TestDB {public static void main(String[] args) {//1. 创建Activiti配置对象的实例ProcessEngineConf ...

  5. Spring集成Mybatis多数据源配置

    既然在整理Mybatis那就把经常用的这个多数据源的笔记也整一下吧. Spring集成Mybatis在之前就已经提到了.Spring集成Mybatis 集成Mybatis多数据源有两种方式: 1.创建 ...

  6. Spring集成Memcached三种方式(一)

    Spring集成Memcached三种方式(一) 转载:http://blog.csdn.net/u013725455/article/details/52102170 Memcached Clien ...

  7. (转)为Spring集成的Hibernate配置二级缓存

    http://blog.csdn.net/yerenyuan_pku/article/details/52896195 前面我们已经集成了Spring4.2.5+Hibernate4.3.11+Str ...

  8. spring集成kafka,以及常见错误解决

    spring集成kafka,以及常见错误解决 一.配置kafka 1.引入jar包 <!--Kafka和spring集成的支持类库,spring和kafka通信监听--><!-- h ...

  9. 深入浅出MyBatis:MyBatis与Spring集成及实用场景

    为什么80%的码农都做不了架构师?>>>    本系列是「深入浅出MyBatis:技术原理与实践」书籍的总结笔记. 本篇是「深入浅出MyBatis」系列的最后一篇,主要介绍与Spri ...

最新文章

  1. asp.net 2.0 技巧2
  2. PAT乙级 1038 统计同成绩学生 C++)
  3. python字符串长度_如何使用python获取字符串长度?哪些方法?
  4. LeetCode 1859. 将句子排序
  5. Linux数据库1366错误,ERROR 1366 (HY000): Incorrect string value:’XXX’ for column 'XXX at row 1解决...
  6. R语言聚类算法之k均值聚类(K-means)
  7. php程序设计经典300例,第16-20例
  8. react的单文件编写方式
  9. c++两个文档匹配数据_MongoDB 数据库的命名、设计规范
  10. 拓端tecdat|R语言中实现广义相加模型GAM和普通最小二乘(OLS)回归
  11. ssas连接mysql_BI-SSAS简介篇
  12. Zephyr学习(一)Zephyr介绍
  13. Fiddler中文版汉化插件 0.1
  14. 2020年会必备,Excel轻松制作抽奖小游戏
  15. C++函数参数中的省略号
  16. 蚂蚁金服缘何自研Service Mesh?
  17. 新购买的阿里云虚拟机部署项目
  18. 各种友(e)善(xin)数论总集(未完待续),从入门到绝望
  19. Android build.prop参数详解
  20. 三菱M80操作介绍_三菱PLC操作

热门文章

  1. 转自: SparkConf 配置的概念和用法
  2. JSON转换工具---jackson
  3. 单列集合List的实现类
  4. aws dynamodb_使用适用于Java 2的AWS开发工具包的AWS DynamoDB版本字段
  5. vue中生产模式和调试模式_为什么在生产中进行调试是如此诱人?
  6. java登录界面命令_Java命令行界面(第26部分):CmdOption
  7. java8并行流_Java 8:CompletableFuture与并行流
  8. java8升级java12_为什么现在是升级到Java 8的最佳时机
  9. 集成spring mvc_向Spring MVC Web应用程序添加社交登录:集成测试
  10. gradle文件不识别_识别Gradle约定