一、简介

  Exchanger是自jdk1.5起开始提供的工具套件,一般用于两个工作线程之间交换数据。在本文中我将采取由浅入深的方式来介绍分析这个工具类。首先我们来看看官方的api文档中的叙述:

A synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner's object on return. An Exchanger may be viewed as a bidirectional form of a SynchronousQueue. Exchangers may be useful in applications such as genetic algorithms and pipeline designs.

  在以上的描述中,有几个要点:

  • 此类提供对外的操作是同步的;
  • 用于成对出现的线程之间交换数据;
  • 可以视作双向的同步队列;
  • 可应用于基因算法、流水线设计等场景。

  接着看api文档,这个类提供对外的接口非常简洁,一个无参构造函数,两个重载的范型exchange方法:

public V exchange(V x) throws InterruptedException
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

  从官方的javadoc可以知道,当一个线程到达exchange调用点时,如果它的伙伴线程此前已经调用了此方法,那么它的伙伴会被调度唤醒并与之进行对象交换,然后各自返回。如果它的伙伴还没到达交换点,那么当前线程将会被挂起,直至伙伴线程到达——完成交换正常返回;或者当前线程被中断——抛出中断异常;又或者是等候超时——抛出超时异常。

二、原理及算法分析

  换句话说Exchanger提供的是一个交换服务,允许原子性的交换两个(多个)对象,但同时只有一对才会成功。先看一个简单的实例模型。

  在上面的模型中,我们假定一个空的栈(Stack),栈顶(Top)当然是没有元素的。同时我们假定一个数据结构Node,包含一个要交换的元素E和一个要填充的“洞”Node。这时线程T1携带节点node1进入栈(cas_push),当然这是CAS操作,这样栈顶就不为空了。线程T2携带节点node2进入栈,发现栈里面已经有元素了node1,同时发现node1的hold(Node)为空,于是将自己(node2)填充到node1的hold中(cas_fill)。然后将元素node1从栈中弹出(cas_take)。这样线程T1就得到了node1.hold.item也就是node2的元素e2,线程T2就得到了node1.item也就是e1,从而达到了交换的目的。

  算法描述就是下图展示的内容。

  JDK 5就是采用类似的思想实现的Exchanger。JDK 6以后为了支持多线程多对象同时Exchanger了就进行了改造(为了支持更好的并发),采用ConcurrentHashMap的思想,将Stack分割成很多的片段(或者说插槽Slot),线程Id(Thread.getId())hash相同的落在同一个Slot上,这样在默认32个Slot上就有很好的吞吐量。当然会根据机器CPU内核的数量有一定的优化,有兴趣的可以去了解下Exchanger的源码。

三、例子

  至于Exchanger的使用,在JDK文档上有个例子,讲述的是两个线程交换数据缓冲区的例子(实际上仍然可以认为是生产者/消费者模型)。

class FillAndEmpty {Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();DataBuffer initialEmptyBuffer =  a made-up typeDataBuffer initialFullBuffer = class FillingLoop implements Runnable {public void run() {DataBuffer currentBuffer = initialEmptyBuffer;try {while (currentBuffer != null) {addToBuffer(currentBuffer);if (currentBuffer.isFull())currentBuffer = exchanger.exchange(currentBuffer);}} catch (InterruptedException ex) {  handle  }}}class EmptyingLoop implements Runnable {public void run() {DataBuffer currentBuffer = initialFullBuffer;try {while (currentBuffer != null) {takeFromBuffer(currentBuffer);if (currentBuffer.isEmpty())currentBuffer = exchanger.exchange(currentBuffer);}} catch (InterruptedException ex) {  handle }}}void start() {new Thread(new FillingLoop()).start();new Thread(new EmptyingLoop()).start();}}

四、一个简单的例子

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;  /** * @Title: ExchangerTest * @Description: Test class for Exchanger * @Company: CSAIR * @Author: lixuanbin * @Creation: 2014年12月14日 * @Version:1.0 */
public class ExchangerTest {  protected static final Logger log = Logger.getLogger(ExchangerTest.class);  private static volatile boolean isDone = false;  static class ExchangerProducer implements Runnable {  private Exchanger<Integer> exchanger;  private static int data = 1;  ExchangerProducer(Exchanger<Integer> exchanger) {  this.exchanger = exchanger;  }  @Override  public void run() {  while (!Thread.interrupted() && !isDone) {  for (int i = 1; i <= 3; i++) {  try {  TimeUnit.SECONDS.sleep(1);  data = i;  System.out.println("producer before: " + data);  data = exchanger.exchange(data);  System.out.println("producer after: " + data);  } catch (InterruptedException e) {  log.error(e, e);  }  }  isDone = true;  }  }  }  static class ExchangerConsumer implements Runnable {  private Exchanger<Integer> exchanger;  private static int data = 0;  ExchangerConsumer(Exchanger<Integer> exchanger) {  this.exchanger = exchanger;  }  @Override  public void run() {  while (!Thread.interrupted() && !isDone) {  data = 0;  System.out.println("consumer before : " + data);  try {  TimeUnit.SECONDS.sleep(1);  data = exchanger.exchange(data);  } catch (InterruptedException e) {  log.error(e, e);  }  System.out.println("consumer after : " + data);  }  }  }  /** * @param args */  public static void main(String[] args) {  ExecutorService exec = Executors.newCachedThreadPool();  Exchanger<Integer> exchanger = new Exchanger<Integer>();  ExchangerProducer producer = new ExchangerProducer(exchanger);  ExchangerConsumer consumer = new ExchangerConsumer(exchanger);  exec.execute(producer);  exec.execute(consumer);  exec.shutdown();  try {  exec.awaitTermination(30, TimeUnit.SECONDS);  } catch (InterruptedException e) {  log.error(e, e);  }  }
} 

  这大致可以看作是一个简易的生产者消费者模型,有两个任务类,一个递增地产生整数,一个产生整数0,然后双方进行交易。每次交易前的生产者和每次交易后的消费者都会sleep 1秒来模拟数据处理的消耗,并在交易前后把整数值打印到控制台以便检测结果。在这个例子里交易循环只执行三次,采用一个volatile boolean来控制交易双方线程的退出。
  我们来看看程序的输出:

consumer before : 0
producer before: 1
consumer after : 1
producer after: 0
consumer before : 0
producer before: 2
producer after: 0
consumer after : 2
consumer before : 0
producer before: 3
producer after: 0
consumer after : 3

  输出结果验证了以下两件事情:

  • exchange方法真的帮一对线程交换了数据;
  • exchange方法真的会阻塞调用方线程直至另一方线程参与交易。

  那么在中断和超时两种情况下程序的运行表现会是怎样呢?作为一个小练习,有兴趣的观众可以设想并编写测试用例覆盖验证之。接下来谈谈最近我在生产场景中对Exchanger的应用。

五、Exchanger的应用场景

  Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出2个交配结果。
  Exchanger也可以用于校对工作。比如我们需要将纸制银流通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对这两个Excel数据进行校对,看看是否录入的一致。代码如下:

public class ExchangerTest {private static final Exchanger<String> exgr = new Exchanger<String>();private static ExecutorService threadPool = Executors.newFixedThreadPool(2);public static void main(String[] args) {threadPool.execute(new Runnable() {@Overridepublic void run() {try {String A = "银行流水A";// A录入银行流水数据exgr.exchange(A);} catch (InterruptedException e) {}}});threadPool.execute(new Runnable() {@Overridepublic void run() {try {String B = "银行流水B";// B录入银行流水数据String A = exgr.exchange("B");System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:"+ A + ",B录入是:" + B);} catch (InterruptedException e) {}}});threadPool.shutdown();}
}

转载自:
http://lixuanbin.iteye.com/blog/2166772
http://www.blogjava.net/xylz/archive/2010/11/22/338733.html
http://ifeve.com/concurrency-exchanger/

源码参考:
http://coderbee.net/index.php/concurrent/20140424/897
http://blog.csdn.net/luoyuyou/article/details/30257073

java Exchanger原理相关推荐

  1. java exchanger 原理_Exchanger 原理

    Exchanger(交换者)是一个用于线程间协作的工具类.Exchanger用于进行线程间的数据交换.它提供一个同步点,在这个同步点两个线程可以交换彼此的数据.这两个线程通过exchange方法交换数 ...

  2. 《Java虚拟机原理图解》5. JVM类加载器机制与类加载过程

    参考网址:http://blog.csdn.net/luanlouis/article/details/50529868 0.前言 读完本文,你将了解到: 一.为什么说Jabalpur语言是跨平台的 ...

  3. 【Java 虚拟机原理】Class 字节码二进制文件分析 七 ( 局部变量表分析 )

    文章目录 前言 一.编译生成带局部变量表的字节码文件 二.局部变量表 前言 上一篇博客 [Java 虚拟机原理]Class 字节码二进制文件分析 二 ( 常量池位置 | 常量池结构 | tag | i ...

  4. 【Java 虚拟机原理】Class 字节码二进制文件分析 六 ( 属性类型 | Code 属性 | 属性名称索引 | 属性长度 | 操作数栈最大深度 | 局部变量存储空间 | 字节码长度 )

    文章目录 前言 一.属性类型 二.Code 属性表数据结构 三.属性名称索引 四.属性长度 五.操作数栈最大深度 六.局部变量存储空间 七.字节码长度 八.存储字节码指令的一系列字节流 前言 上一篇博 ...

  5. 【Java 虚拟机原理】Class 字节码二进制文件分析 五 ( 方法计数器 | 方法表 | 访问标志 | 方法名称索引 | 方法返回值类型 | 方法属性数量 | 方法属性表 )

    文章目录 前言 一.方法表结构 二.方法计数器 三.方法表数据解析 ( init 构造方法 ) 1.方法访问标志 2.方法名称索引 3.方法返回类型 4.方法属性数量 前言 上一篇博客 [Java 虚 ...

  6. 【Java 虚拟机原理】Class 字节码二进制文件分析 四 ( 字段表数据结构 | 字段表详细分析 | 访问标志 | 字段名称 | 字段描述符 | 属性项目 )

    文章目录 前言 一.字段表总数据结构 二.访问标志 三.字段名称 四.字段描述符 五.属性项目数 前言 上一篇博客 [Java 虚拟机原理]Class 字节码二进制文件分析 三 ( 访问和修饰标志 | ...

  7. 【Java 虚拟机原理】Class 字节码二进制文件分析 三 ( 访问和修饰标志 | 类索引 | 父类索引 | 接口计数器 | 接口表 | 字段计数器 | 字段表 )

    文章目录 前言 一.访问和修饰标志 二.类索引 三.父类索引 四.接口计数器 五.接口表 六.字段计数器 七.字段表 前言 上一篇博客 [Java 虚拟机原理]Class 字节码二进制文件分析 二 ( ...

  8. 【Java 虚拟机原理】Class 字节码二进制文件分析 二 ( 常量池位置 | 常量池结构 | tag | info[] | 完整分析字节码文件中的常量池二进制数据 )

    文章目录 前言 一.常量池结构分析 1.常量池位置 2.常量池结构 3.常量池单个常量 4.常量池单个常量 tag 标签 二.常量池字节码文件分析 0.常量池附加信息 1.常量池 #1 常量分析 2. ...

  9. Java NIO原理 图文分析及代码实现

    最近在分析hadoop的RPC(Remote Procedure Call Protocol ,远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议.可以参考: ...

最新文章

  1. 2021全国高校计算机能力挑战赛(初赛)Java试题三
  2. Sql Server实用操作-SQL语句导入导出大全
  3. Android开发-屏幕常亮的方法
  4. 第一章计算机基础知识作业答案,第一章 计算机基础知识.doc第一次作业
  5. 阿里云 快照恢复的操作过程
  6. (转)淘淘商城系列——使用JsonView来格式化json字符串
  7. 每天学一点Scala之Try
  8. CentOS6.0升级内核为6.2
  9. matlab 读取mdf文件路径,从 MDF 文件中读取数据
  10. 微信发朋友圈和朋友圈点赞测试用例
  11. HTML5实现动态视频背景
  12. 硬件工程师七夕鹊桥设计锦集
  13. Unity3D 射击游戏练习实例
  14. 安装部署WSUS服务器
  15. 中国式家长计算机科学家攻略,中国式家长特长图鉴一览表 Q版图表讲解各特长发展路线...
  16. CSDN第11期周赛
  17. script标签放在页面头部和尾部的区别
  18. hiredis中lua脚本调用
  19. TPM分析笔记(四)TPM-TSS协议栈
  20. 百度智能云 × 狮桥物流 | 主动安全驾驶技术加码,狮桥物流干线运输安全有保障...

热门文章

  1. AudioTrack 分析
  2. Java1Java2
  3. Retrofit 使用详解
  4. js中call()方法的用法
  5. 计算机视觉博士去向,为什么现在不看好 CV 方向了呢?
  6. 论文推荐 | 综述:自动驾驶背景下的交通流模型研究
  7. winhex快捷键使用
  8. jq基础文档手册3.0
  9. 基于Java的网络编程实践
  10. Ubuntu 添加root用户