一、简介
   Exchanger是自jdk1.5起开始提供的工具套件,一般用于两个工作线程之间交换数据。在本文中我将采取由浅入深的方式来介绍分析这个工具类。首先我们来看看官方的api文档中的叙述:

A synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner's object on return. An Exchanger may be viewed as a bidirectional form of a SynchronousQueue. Exchangers may be useful in applications such as genetic algorithms and pipeline designs.

在以上的描述中,有几个要点:

  • 此类提供对外的操作是同步的;
  • 用于成对出现的线程之间交换数据;
  • 可以视作双向的同步队列;
  • 可应用于基因算法、流水线设计等场景。

接着看api文档,这个类提供对外的接口非常简洁,一个无参构造函数,两个重载的范型exchange方法:
public V exchange(V x) throws InterruptedException
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
   从官方的javadoc可以知道,当一个线程到达exchange调用点时,如果它的伙伴线程此前已经调用了此方法,那么它的伙伴会被调度唤醒并与之进行对象交换,然后各自返回。如果它的伙伴还没到达交换点,那么当前线程将会被挂起,直至伙伴线程到达——完成交换正常返回;或者当前线程被中断——抛出中断异常;又或者是等候超时——抛出超时异常。

二、一个简单的例子
按照某大师的观点,行为知之先,在知道了Exchanger的大致用途并参阅了使用说明后,我们马上动手写个例子来跑一跑:

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;/*** @Title: ExchangerTest* @Description: Test class for Exchanger* @Company: CSAIR* @Author: lixuanbin* @Creation: 2014年12月14日* @Version:1.0*/
public class ExchangerTest {protected static final Logger log = Logger.getLogger(ExchangerTest.class);private static volatile boolean isDone = false;static class ExchangerProducer implements Runnable {private Exchanger<Integer> exchanger;private static int data = 1;ExchangerProducer(Exchanger<Integer> exchanger) {this.exchanger = exchanger;}@Overridepublic void run() {while (!Thread.interrupted() && !isDone) {for (int i = 1; i <= 3; i++) {try {TimeUnit.SECONDS.sleep(1);data = i;System.out.println("producer before: " + data);data = exchanger.exchange(data);System.out.println("producer after: " + data);} catch (InterruptedException e) {log.error(e, e);}}isDone = true;}}}static class ExchangerConsumer implements Runnable {private Exchanger<Integer> exchanger;private static int data = 0;ExchangerConsumer(Exchanger<Integer> exchanger) {this.exchanger = exchanger;}@Overridepublic void run() {while (!Thread.interrupted() && !isDone) {data = 0;System.out.println("consumer before : " + data);try {TimeUnit.SECONDS.sleep(1);data = exchanger.exchange(data);} catch (InterruptedException e) {log.error(e, e);}System.out.println("consumer after : " + data);}}}/*** @param args*/public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();Exchanger<Integer> exchanger = new Exchanger<Integer>();ExchangerProducer producer = new ExchangerProducer(exchanger);ExchangerConsumer consumer = new ExchangerConsumer(exchanger);exec.execute(producer);exec.execute(consumer);exec.shutdown();try {exec.awaitTermination(30, TimeUnit.SECONDS);} catch (InterruptedException e) {log.error(e, e);}}
}

这大致可以看作是一个简易的生产者消费者模型,有两个任务类,一个递增地产生整数,一个产生整数0,然后双方进行交易。每次交易前的生产者和每次交易后的消费者都会sleep 1秒来模拟数据处理的消耗,并在交易前后把整数值打印到控制台以便检测结果。在这个例子里交易循环只执行三次,采用一个volatile boolean来控制交易双方线程的退出。
   我们来看看程序的输出:

consumer before : 0
producer before: 1
consumer after : 1
producer after: 0
consumer before : 0
producer before: 2
producer after: 0
consumer after : 2
consumer before : 0
producer before: 3
producer after: 0
consumer after : 3

输出结果验证了以下两件事情:

  • exchange方法真的帮一对线程交换了数据;
  • exchange方法真的会阻塞调用方线程直至另一方线程参与交易。

那么在中断和超时两种情况下程序的运行表现会是怎样呢?作为一个小练习,有兴趣的观众可以设想并编写测试用例覆盖验证之。接下来谈谈最近我在生产场景中对Exchanger的应用。

三、实战场景
1.问题描述
   最近接到外部项目组向我组提出的接口需求,需要查询我们业务办理量的统计情况。我们系统目前的情况是,有一个日增长十多万、总数据量为千万级别的业务办理明细表(xxx_info),每人次的业务办理结果会实时写入其中。以往对外提供的业务统计接口是在每次被调用时候在明细表中执行SQL查询(select、count、where、group by等),响应时间很长,对原生产业务的使用也有很大的影响。于是我决定趁着这次新增接口的上线机会对系统进行优化。
2.优化思路
   首先是在明细表之外再建立一个数据统计(xxx_statistics)表,考虑到目前数据库的压力以及公司内部质管流控等因素,暂没有分库存放,仍旧与原明细表放在同一个库。再设置一个定时任务于每日凌晨对明细表进行查询、过滤、统计、排序等操作,把统计结果插入到统计表中。然后对外暴露统计接口查询统计报表。现在的设计与原来的实现相比,虽然牺牲了统计表所占用的少量额外的存储空间(每日新增的十来万条业务办理明细记录经过处理最终会变成几百条统计表的记录),但是却能把select、count这样耗时的数据统计操作放到凌晨时段执行以避开白天的业务办理高峰,分表处理能够大幅降低对生产业务明细表的性能影响,而对外提供的统计接口的查询速度也将得到几个数量级的提升。当然,还有一个缺点是,不能实时提供当天的统计数据,不过这也是双方可以接受的。
3.设计实现
   设计一个定时任务,每日凌晨执行。在定时任务中启动两个线程,一个线程负责对业务明细表(xxx_info)进行查询统计,把统计的结果放置在内存缓冲区,另一个线程负责读取缓冲区中的统计结果并插入到业务统计表(xxx_statistics)中。
   亲,这样的场景是不是听起来很有感觉?没错!两个线程在内存中批量交换数据,这个事情我们可以使用Exchanger去做!我们马上来看看代码如何实现。

生产者线程:

class ExchangerProducer implements Runnable {private Exchanger<Set<XXXStatistics>> exchanger;private Set<XXXStatistics> holder;private Date fltDate;private int threshold;ExchangerProducer(Exchanger<Set<XXXStatistics>> exchanger,Set<XXXStatistics> holder, Date fltDate, int threshold) {this.exchanger = exchanger;this.holder = holder;this.fltDate = fltDate;this.threshold = threshold;}@Overridepublic void run() {try {while (!Thread.interrupted() && !isDone) {List<XXXStatistics> temp1 = null;List<XXXStatistics> temp11 = null;for (int i = 0; i < allCities.size(); i++) {try {temp1 = xxxDao.findStatistics1(fltDate, allCities.get(i));temp11 = xxxDao.findStatistics2(fltDate, allCities.get(i),internationalList);if (temp1 != null && !temp1.isEmpty()) {calculationCounter.addAndGet(temp1.size());if (temp11 != null && !temp11.isEmpty()) {// merge two lists into temp1
                                mergeLists(temp1, temp11);temp11.clear();temp11 = null;}// merge temp1 into holder set
                            mergeListToSet(holder, temp1);temp1.clear();temp1 = null;}} catch (Exception e) {log.error(e, e);}// Insert every ${threshold} or the last into database.if (holder.size() >= threshold|| i == (allCities.size() - 1)) {log.info("data collected: \n" + holder);holder = exchanger.exchange(holder);log.info("data submitted");}}// all cities are calculatedisDone = true;}log.info("calculation job done, calculated: "+ calculationCounter.get());} catch (InterruptedException e) {log.error(e, e);}exchanger = null;holder.clear();holder = null;fltDate = null;}
}

代码说明:

  • threshold:缓冲区的容量阀值;
  • allCities:城市列表,迭代这个列表作为入参来执行查询统计;
  • XXXStatistics:统计数据封装实体类,实现了Serializable和Comparable接口,覆写equals和compareTo方法,以利用TreeSet提供的去重和排序处理;
  • isDone:volatile boolean,标识统计任务是否完成;
  • holder:TreeSet<XXXStatistics>,存放统计结果的内存缓冲区,容量达到阀值后提交给Exchanger执行exchange操作;
  • dao.findStatistics1,dao.findStatistics2:简化的数据库查询统计操作,此处仅供示意;
  • calculationCounter:AtomicInteger,标记生产端所提交的记录总数;
  • mergeLists,mergeListToSet:内部私有工具方法,把dao查询返回的列表合并到holder中;

消费者线程:

class ExchangerConsumer implements Runnable {private Exchanger<Set<XXXStatistics>> exchanger;private Set<XXXStatistics> holder;ExchangerConsumer(Exchanger<Set<XXXStatistics>> exchanger,Set<XXXStatistics> holder) {this.exchanger = exchanger;this.holder = holder;}@Overridepublic void run() {try {List<XXXStatistics> tempList;while (!Thread.interrupted() && !isDone) {holder = exchanger.exchange(holder);log.info("got data: \n" + holder);if (holder != null && !holder.isEmpty()) {try {// insert data into databasetempList = convertSetToList(holder);insertionCounter.addAndGet(xxxDao.batchInsertXXXStatistics(tempList));tempList.clear();tempList = null;} catch (Exception e) {log.error(e, e);}// clear the set
                    holder.clear();} else {log.info("wtf, got an empty list");}log.info("data processed");}log.info("insert job done, inserted: " + insertionCounter.get());} catch (InterruptedException e) {log.error(e, e);}exchanger = null;holder.clear();holder = null;}
}

代码说明:

  • convertSetToList:由于dao接口的限制,需把交换得到的Set转换为List;
  • batchInsertXXXStatistics:使用jdbc4的batch update而实现的批量插入dao接口;
  • insertionCounter:AtomicInteger,标记消费端插入成功的记录总数;

调度器代码:

public boolean calculateStatistics(Date fltDate) {// initializationcalculationCounter.set(0);insertionCounter.set(0);isDone = false;exec = Executors.newCachedThreadPool();Set<XXXStatistics> producerSet = new TreeSet<XXXStatistics>();Set<XXXStatistics> consumerSet = new TreeSet<XXXStatistics>();Exchanger<Set<XXXStatistics>> xc = new Exchanger<Set<XXXStatistics>>();ExchangerProducer producer = new ExchangerProducer(xc, producerSet,fltDate, threshold);ExchangerConsumer consumer = new ExchangerConsumer(xc, consumerSet);// execution
    exec.execute(producer);exec.execute(consumer);exec.shutdown();boolean isJobDone = false;try {// wait for terminationisJobDone = exec.awaitTermination(calculationTimeoutMinutes,TimeUnit.MINUTES);} catch (InterruptedException e) {log.error(e, e);}if (!isJobDone) {// force shutdown
        exec.shutdownNow();log.error("time elapsed for "+ calculationTimeoutMinutes+ " minutes, but still not finished yet, shut it down anyway.");}// clean upexec = null;producerSet.clear();producerSet = null;consumerSet.clear();consumerSet = null;xc = null;producer = null;consumer = null;System.gc();// return the resultif (isJobDone && calculationCounter.get() > 0&& calculationCounter.get() == insertionCounter.get()) {return true;}return false;
}

代码说明:
   调度器的代码就四个步骤:初始化、提交任务并等候处理结果、清理、返回。初始化阶段使用了jdk提供的线程池提交生产者和消费者任务,设置了最长等候时间calculationTimeoutMinutes,如果调度器线程被中断或者任务执行超时,awaitTermination会返回false,此时就强行关闭线程池并记录到日志。统计操作每日凌晨执行一次,所以在任务退出前的清理阶段建议jvm执行gc以尽早释放计算时所产生的垃圾对象。在结果返回阶段,如果查询统计出来的记录条数和插入成功的条数相等则返回true,否则返回false。

4.小结
   在这个案例中,使用Exchanger进行批量的双向数据交换可谓恰如其分:生产者在执行新的查询统计任务填入数据到缓冲区的同时,消费者正在批量插入生产者换入的上一次产生的数据,系统的吞吐量得到平滑的提升;计算复杂度、内存消耗、系统性能也能通过相关的参数设置而得到有效的控制(在消费端也可以对holder进行再次分割以控制每次批插入的大小,建议参阅数据库厂商以及数据库驱动包的说明文档以确定jdbc的最优batch update size);代码的实现也很简洁易懂。这些优点,是采用有界阻塞队列所难以达到的。
   程序的输出结果与业务紧密相关,就不打印出来了。可以肯定的是,经过了一段时间的摸索调优,内存消耗、执行速度和处理结果还是比较满意的。

原文地址:http://lixuanbin.iteye.com/blog/2166772

转载于:https://www.cnblogs.com/davidwang456/p/4179488.html

java.util.concurrent.Exchanger应用范例与原理浅析--转载相关推荐

  1. java.util.concurrent.Exchanger应用范例与原理浅析

    http://www.cnblogs.com/davidwang456/p/4179488.html Exchanger是自jdk1.5起开始提供的工具套件,一般用于两个工作线程之间交换数据.在本文中 ...

  2. 聊聊高并发(三十一)解析java.util.concurrent各个组件(十三) 理解Exchanger交换器

    这篇讲讲Exchanger交互器,它是一种比较特殊的两方(Two-Party)栅栏,可以理解成Exchanger是一个栅栏,两边一方是生产者,一方是消费者, 1. 生产者和消费者各自维护了一个容器,生 ...

  3. java多线程详解 六_java多线程学习-java.util.concurrent详解(六) Exchanger

    转载于:http://janeky.iteye.com/blog/769965 我们先来学习一下JDK1.5 API中关于这个类的详细介绍: "可以在pair中对元素进行配对和交换的线程的同 ...

  4. Java并发编程-并发工具包(java.util.concurrent)使用指南(全)

    1. java.util.concurrent - Java 并发工具包 Java 5 添加了一个新的包到 Java 平台,java.util.concurrent 包.这个包包含有一系列能够让 Ja ...

  5. Java并发编程-并发工具包java.util.concurrent使用指南

    译序 本指南根据 Jakob Jenkov 最新博客翻译,请随时关注博客更新 本指南已做成中英文对照阅读版的 pdf 文档,有兴趣的朋友可以去 Java并发工具包java.util.concurren ...

  6. java.util.concurrent 包下面的所有类

    java.util.concurrent 包下面的所有类 原子操作数类: java.util.concurrent.atomic.AtomicBoolean.class java.util.concu ...

  7. java多线程学习-java.util.concurrent详解

    http://janeky.iteye.com/category/124727 java多线程学习-java.util.concurrent详解(一) Latch/Barrier 博客分类: java ...

  8. [转载] 多线程详解java.util.concurrent

    参考链接: java.lang.Object的灵活性 一.多线程 1.操作系统有两个容易混淆的概念,进程和线程. 进程:一个计算机程序的运行实例,包含了需要执行的指令:有自己的独立地址空间,包含程序内 ...

  9. Java并发包-java.util.concurrent详解

    转载自https://blog.csdn.net/axi295309066/article/details/65665090 一.阻塞队列BlockingQueue BlockingQueue通常用于 ...

最新文章

  1. Java Scoket之java.io.EOFException解决方案
  2. UEBA 学术界研究现状——用户行为异常检测思路:序列挖掘prefixspan,HMM,LSTM/CNN,SVM异常检测,聚类CURE算法...
  3. linux下初次安装mysql使用指南
  4. html返回滚动按钮,如何通过滚动显示按钮返回TOP
  5. 华为BGP动态路由协议理论
  6. mysql 中文乱码 或 问号
  7. 使用Oracle验证外部数据
  8. 【养成好习惯】使用pipreqs导出本项目使用的环境
  9. Python灰度图像彩色化
  10. 搜索引擎html和css,CSS样式对搜索引擎排名的影响
  11. 配置邮件服务器sendman,java邮件收发功能实现代码.pdf
  12. 计算机桌面文件删除不掉是怎么了,如何解决电脑桌面文件无法删除问题
  13. Assembler - 数据段与代码段
  14. 开发人员必备的四象限壁纸
  15. Unity XLua Hotfix热更新配置笔记
  16. 04735数据库系统原理(笔记)(更新中)
  17. MySQL中按天、自然周、月、季度、年份统计
  18. Arduino学习笔记:基本直流电机驱动
  19. 小胖机器人宣传语_智能机器人推广宣传语
  20. matlab函数 bsxfun(高效代码)

热门文章

  1. linux c++ 程序运行时间,总结UNIX/LINUX下C++程序计时的方法
  2. Qt中的QMap和QHash
  3. oracle瘦连接,java-无法使用jdbc瘦驱动程序连接到oracle数据...
  4. binlog2mysql,MySQL 数据恢复工具之binlog2sql
  5. 网和aoe网的区别_欧哲门窗的金刚网和其他品牌的有什么区别?
  6. 图分区技术基本概念【1】
  7. 把mysql部署在局域网的服务器上,远程连接mysql时报错误代码1130 Host ‘***.***.***.***’is not allowed to connect to this MySQL
  8. yolov3-tf2 数据格式压缩
  9. oracle中app文件夹下,Oracle Form开发之folder(文件夹)功能开发(一)
  10. 重要性采样原理及实现