java Exchanger原理
一、简介
Exchanger是自jdk1.5起开始提供的工具套件,一般用于两个工作线程之间交换数据。在本文中我将采取由浅入深的方式来介绍分析这个工具类。首先我们来看看官方的api文档中的叙述:
A synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner's object on return. An Exchanger may be viewed as a bidirectional form of a SynchronousQueue. Exchangers may be useful in applications such as genetic algorithms and pipeline designs.
在以上的描述中,有几个要点:
- 此类提供对外的操作是同步的;
- 用于成对出现的线程之间交换数据;
- 可以视作双向的同步队列;
- 可应用于基因算法、流水线设计等场景。
接着看api文档,这个类提供对外的接口非常简洁,一个无参构造函数,两个重载的范型exchange方法:
public V exchange(V x) throws InterruptedException
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
从官方的javadoc可以知道,当一个线程到达exchange调用点时,如果它的伙伴线程此前已经调用了此方法,那么它的伙伴会被调度唤醒并与之进行对象交换,然后各自返回。如果它的伙伴还没到达交换点,那么当前线程将会被挂起,直至伙伴线程到达——完成交换正常返回;或者当前线程被中断——抛出中断异常;又或者是等候超时——抛出超时异常。
二、原理及算法分析
换句话说Exchanger提供的是一个交换服务,允许原子性的交换两个(多个)对象,但同时只有一对才会成功。先看一个简单的实例模型。
在上面的模型中,我们假定一个空的栈(Stack),栈顶(Top)当然是没有元素的。同时我们假定一个数据结构Node,包含一个要交换的元素E和一个要填充的“洞”Node。这时线程T1携带节点node1进入栈(cas_push),当然这是CAS操作,这样栈顶就不为空了。线程T2携带节点node2进入栈,发现栈里面已经有元素了node1,同时发现node1的hold(Node)为空,于是将自己(node2)填充到node1的hold中(cas_fill)。然后将元素node1从栈中弹出(cas_take)。这样线程T1就得到了node1.hold.item也就是node2的元素e2,线程T2就得到了node1.item也就是e1,从而达到了交换的目的。
算法描述就是下图展示的内容。
JDK 5就是采用类似的思想实现的Exchanger。JDK 6以后为了支持多线程多对象同时Exchanger了就进行了改造(为了支持更好的并发),采用ConcurrentHashMap的思想,将Stack分割成很多的片段(或者说插槽Slot),线程Id(Thread.getId())hash相同的落在同一个Slot上,这样在默认32个Slot上就有很好的吞吐量。当然会根据机器CPU内核的数量有一定的优化,有兴趣的可以去了解下Exchanger的源码。
三、例子
至于Exchanger的使用,在JDK文档上有个例子,讲述的是两个线程交换数据缓冲区的例子(实际上仍然可以认为是生产者/消费者模型)。
class FillAndEmpty {Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();DataBuffer initialEmptyBuffer = a made-up typeDataBuffer initialFullBuffer = class FillingLoop implements Runnable {public void run() {DataBuffer currentBuffer = initialEmptyBuffer;try {while (currentBuffer != null) {addToBuffer(currentBuffer);if (currentBuffer.isFull())currentBuffer = exchanger.exchange(currentBuffer);}} catch (InterruptedException ex) { handle }}}class EmptyingLoop implements Runnable {public void run() {DataBuffer currentBuffer = initialFullBuffer;try {while (currentBuffer != null) {takeFromBuffer(currentBuffer);if (currentBuffer.isEmpty())currentBuffer = exchanger.exchange(currentBuffer);}} catch (InterruptedException ex) { handle }}}void start() {new Thread(new FillingLoop()).start();new Thread(new EmptyingLoop()).start();}}
四、一个简单的例子
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger; /** * @Title: ExchangerTest * @Description: Test class for Exchanger * @Company: CSAIR * @Author: lixuanbin * @Creation: 2014年12月14日 * @Version:1.0 */
public class ExchangerTest { protected static final Logger log = Logger.getLogger(ExchangerTest.class); private static volatile boolean isDone = false; static class ExchangerProducer implements Runnable { private Exchanger<Integer> exchanger; private static int data = 1; ExchangerProducer(Exchanger<Integer> exchanger) { this.exchanger = exchanger; } @Override public void run() { while (!Thread.interrupted() && !isDone) { for (int i = 1; i <= 3; i++) { try { TimeUnit.SECONDS.sleep(1); data = i; System.out.println("producer before: " + data); data = exchanger.exchange(data); System.out.println("producer after: " + data); } catch (InterruptedException e) { log.error(e, e); } } isDone = true; } } } static class ExchangerConsumer implements Runnable { private Exchanger<Integer> exchanger; private static int data = 0; ExchangerConsumer(Exchanger<Integer> exchanger) { this.exchanger = exchanger; } @Override public void run() { while (!Thread.interrupted() && !isDone) { data = 0; System.out.println("consumer before : " + data); try { TimeUnit.SECONDS.sleep(1); data = exchanger.exchange(data); } catch (InterruptedException e) { log.error(e, e); } System.out.println("consumer after : " + data); } } } /** * @param args */ public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); Exchanger<Integer> exchanger = new Exchanger<Integer>(); ExchangerProducer producer = new ExchangerProducer(exchanger); ExchangerConsumer consumer = new ExchangerConsumer(exchanger); exec.execute(producer); exec.execute(consumer); exec.shutdown(); try { exec.awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) { log.error(e, e); } }
}
这大致可以看作是一个简易的生产者消费者模型,有两个任务类,一个递增地产生整数,一个产生整数0,然后双方进行交易。每次交易前的生产者和每次交易后的消费者都会sleep 1秒来模拟数据处理的消耗,并在交易前后把整数值打印到控制台以便检测结果。在这个例子里交易循环只执行三次,采用一个volatile boolean来控制交易双方线程的退出。
我们来看看程序的输出:
consumer before : 0
producer before: 1
consumer after : 1
producer after: 0
consumer before : 0
producer before: 2
producer after: 0
consumer after : 2
consumer before : 0
producer before: 3
producer after: 0
consumer after : 3
输出结果验证了以下两件事情:
- exchange方法真的帮一对线程交换了数据;
- exchange方法真的会阻塞调用方线程直至另一方线程参与交易。
那么在中断和超时两种情况下程序的运行表现会是怎样呢?作为一个小练习,有兴趣的观众可以设想并编写测试用例覆盖验证之。接下来谈谈最近我在生产场景中对Exchanger的应用。
五、Exchanger的应用场景
Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出2个交配结果。
Exchanger也可以用于校对工作。比如我们需要将纸制银流通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对这两个Excel数据进行校对,看看是否录入的一致。代码如下:
public class ExchangerTest {private static final Exchanger<String> exgr = new Exchanger<String>();private static ExecutorService threadPool = Executors.newFixedThreadPool(2);public static void main(String[] args) {threadPool.execute(new Runnable() {@Overridepublic void run() {try {String A = "银行流水A";// A录入银行流水数据exgr.exchange(A);} catch (InterruptedException e) {}}});threadPool.execute(new Runnable() {@Overridepublic void run() {try {String B = "银行流水B";// B录入银行流水数据String A = exgr.exchange("B");System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:"+ A + ",B录入是:" + B);} catch (InterruptedException e) {}}});threadPool.shutdown();}
}
转载自:
http://lixuanbin.iteye.com/blog/2166772
http://www.blogjava.net/xylz/archive/2010/11/22/338733.html
http://ifeve.com/concurrency-exchanger/
源码参考:
http://coderbee.net/index.php/concurrent/20140424/897
http://blog.csdn.net/luoyuyou/article/details/30257073
java Exchanger原理相关推荐
- java exchanger 原理_Exchanger 原理
Exchanger(交换者)是一个用于线程间协作的工具类.Exchanger用于进行线程间的数据交换.它提供一个同步点,在这个同步点两个线程可以交换彼此的数据.这两个线程通过exchange方法交换数 ...
- 《Java虚拟机原理图解》5. JVM类加载器机制与类加载过程
参考网址:http://blog.csdn.net/luanlouis/article/details/50529868 0.前言 读完本文,你将了解到: 一.为什么说Jabalpur语言是跨平台的 ...
- 【Java 虚拟机原理】Class 字节码二进制文件分析 七 ( 局部变量表分析 )
文章目录 前言 一.编译生成带局部变量表的字节码文件 二.局部变量表 前言 上一篇博客 [Java 虚拟机原理]Class 字节码二进制文件分析 二 ( 常量池位置 | 常量池结构 | tag | i ...
- 【Java 虚拟机原理】Class 字节码二进制文件分析 六 ( 属性类型 | Code 属性 | 属性名称索引 | 属性长度 | 操作数栈最大深度 | 局部变量存储空间 | 字节码长度 )
文章目录 前言 一.属性类型 二.Code 属性表数据结构 三.属性名称索引 四.属性长度 五.操作数栈最大深度 六.局部变量存储空间 七.字节码长度 八.存储字节码指令的一系列字节流 前言 上一篇博 ...
- 【Java 虚拟机原理】Class 字节码二进制文件分析 五 ( 方法计数器 | 方法表 | 访问标志 | 方法名称索引 | 方法返回值类型 | 方法属性数量 | 方法属性表 )
文章目录 前言 一.方法表结构 二.方法计数器 三.方法表数据解析 ( init 构造方法 ) 1.方法访问标志 2.方法名称索引 3.方法返回类型 4.方法属性数量 前言 上一篇博客 [Java 虚 ...
- 【Java 虚拟机原理】Class 字节码二进制文件分析 四 ( 字段表数据结构 | 字段表详细分析 | 访问标志 | 字段名称 | 字段描述符 | 属性项目 )
文章目录 前言 一.字段表总数据结构 二.访问标志 三.字段名称 四.字段描述符 五.属性项目数 前言 上一篇博客 [Java 虚拟机原理]Class 字节码二进制文件分析 三 ( 访问和修饰标志 | ...
- 【Java 虚拟机原理】Class 字节码二进制文件分析 三 ( 访问和修饰标志 | 类索引 | 父类索引 | 接口计数器 | 接口表 | 字段计数器 | 字段表 )
文章目录 前言 一.访问和修饰标志 二.类索引 三.父类索引 四.接口计数器 五.接口表 六.字段计数器 七.字段表 前言 上一篇博客 [Java 虚拟机原理]Class 字节码二进制文件分析 二 ( ...
- 【Java 虚拟机原理】Class 字节码二进制文件分析 二 ( 常量池位置 | 常量池结构 | tag | info[] | 完整分析字节码文件中的常量池二进制数据 )
文章目录 前言 一.常量池结构分析 1.常量池位置 2.常量池结构 3.常量池单个常量 4.常量池单个常量 tag 标签 二.常量池字节码文件分析 0.常量池附加信息 1.常量池 #1 常量分析 2. ...
- Java NIO原理 图文分析及代码实现
最近在分析hadoop的RPC(Remote Procedure Call Protocol ,远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议.可以参考: ...
最新文章
- 2021全国高校计算机能力挑战赛(初赛)Java试题三
- Sql Server实用操作-SQL语句导入导出大全
- Android开发-屏幕常亮的方法
- 第一章计算机基础知识作业答案,第一章 计算机基础知识.doc第一次作业
- 阿里云 快照恢复的操作过程
- (转)淘淘商城系列——使用JsonView来格式化json字符串
- 每天学一点Scala之Try
- CentOS6.0升级内核为6.2
- matlab 读取mdf文件路径,从 MDF 文件中读取数据
- 微信发朋友圈和朋友圈点赞测试用例
- HTML5实现动态视频背景
- 硬件工程师七夕鹊桥设计锦集
- Unity3D 射击游戏练习实例
- 安装部署WSUS服务器
- 中国式家长计算机科学家攻略,中国式家长特长图鉴一览表 Q版图表讲解各特长发展路线...
- CSDN第11期周赛
- script标签放在页面头部和尾部的区别
- hiredis中lua脚本调用
- TPM分析笔记(四)TPM-TSS协议栈
- 百度智能云 × 狮桥物流 | 主动安全驾驶技术加码,狮桥物流干线运输安全有保障...