这篇讲讲Exchanger交互器,它是一种比较特殊的两方(Two-Party)栅栏,可以理解成Exchanger是一个栅栏,两边一方是生产者,一方是消费者,

1. 生产者和消费者各自维护了一个容器,生产者往容器里生产东西,消费者从容器里消费东西。

2. 当生产者的容器是满的时候,它需要通过Exchanger向消费者交换,把满的容器交换给消费者,从消费者手里拿到空的容器继续生产。

3. 当消费者的容器是空的时候,它需要通过Exchanger向生产者交换,把空的容器交换给生产者,从生产者手里拿到满的容器继续消费。

所以我们看到这个过程中至少有5个组件

1. Exchanger栅栏

2. 生产者

3. 消费者

4. 生产者的容器

5. 消费者的容器

更复杂的情况是生产者有多个人在生产,消费者有多个人在消费,每个人都有自己的容器。这里有一个隐含的意思是生产者和消费者不挑容器,只要是空的或者满的都能用。Exchanger的匹配是根据Hash来的,所以可能出现不同的人生产者或消费者对应到同一个Hash值。

Exchanger使用了Slot槽来表示一个位置,生产者和消费者都可以被Hash到一个槽中。

 
  1. private static final class Slot extends AtomicReference<Object> {

  2. // Improve likelihood of isolation on <= 64 byte cache lines

  3. long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;

  4. }

  5. /**

  6. * Slot array. Elements are lazily initialized when needed.

  7. * Declared volatile to enable double-checked lazy construction.

  8. */

  9. private volatile Slot[] arena = new Slot[CAPACITY];

创建了一个内部类Node来封装要交互者的线程和要交换的容器

 
  1. private static final class Node extends AtomicReference<Object> {

  2. /** The element offered by the Thread creating this node. */

  3. public final Object item;

  4. /** The Thread waiting to be signalled; null until waiting. */

  5. public volatile Thread waiter;

  6. /**

  7. * Creates node with given item and empty hole.

  8. * @param item the item

  9. */

  10. public Node(Object item) {

  11. this.item = item;

  12. }

  13. }

算法的主要部分就是交换的过程,下面简单说说交互的逻辑

1. 先根据当前线程的id计算出一个Hash值作为索引index

2. 然后轮询,如果index对应的Slot槽是null就生成一个,表示还没有人使用这个槽位

3. 如果对应的Slot已经有线程了,并且CAS设置它为null也成功了,表示生产者和消费者匹配上了,再通过CAS把自己的item设置给对方Node引用,然后把之前等待的一方唤醒,把对方Node里面的item返回给自己。这样相当于后来者拿到了之前等待者的item,并把后来者自己的item设置成了之前等待者的Node引用

当先来者被从自旋状态唤醒后,会从自己的Node引用中获取item,如果非空并且不是CANCEL,就证明有人跟它交换了,也拿到了对方的item返回了,否则就是超时取消了

4. 如果对应的Slot没有线程,说明它是先来的那个,如果是0号位置的Slot,就进行阻塞,如果是非0的Slot,就自旋,直到超时或取消

5. 如果一个进入在它自己选择的槽上CAS失败,它选择一个供替代的槽。如果一个线程成功CAS到一个槽但没有其他线程到达,它尝试其他,前往 0 号槽

 
  1. private Object doExchange(Object item, boolean timed, long nanos) {

  2. Node me = new Node(item); // Create in case occupying

  3. int index = hashIndex(); // Index of current slot

  4. int fails = 0; // Number of CAS failures

  5. for (;;) {

  6. Object y; // Contents of current slot

  7. Slot slot = arena[index];

  8. if (slot == null) // Lazily initialize slots

  9. createSlot(index); // Continue loop to reread

  10. else if ((y = slot.get()) != null && // Try to fulfill

  11. slot.compareAndSet(y, null)) {

  12. Node you = (Node)y; // Transfer item

  13. if (you.compareAndSet(null, item)) {

  14. LockSupport.unpark(you.waiter);

  15. return you.item;

  16. } // Else cancelled; continue

  17. }

  18. else if (y == null && // Try to occupy

  19. slot.compareAndSet(null, me)) {

  20. if (index == 0) // Blocking wait for slot 0

  21. return timed ?

  22. awaitNanos(me, slot, nanos) :

  23. await(me, slot);

  24. Object v = spinWait(me, slot); // Spin wait for non-0

  25. if (v != CANCEL)

  26. return v;

  27. me = new Node(item); // Throw away cancelled node

  28. int m = max.get();

  29. if (m > (index >>>= 1)) // Decrease index

  30. max.compareAndSet(m, m - 1); // Maybe shrink table

  31. }

  32. else if (++fails > 1) { // Allow 2 fails on 1st slot

  33. int m = max.get();

  34. if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))

  35. index = m + 1; // Grow on 3rd failed slot

  36. else if (--index < 0)

  37. index = m; // Circularly traverse

  38. }

  39. }

  40. }

更多Exchanger算法的细节请参考这篇 http://coderbee.net/index.php/concurrent/20140424/897

下面用一个测试用例来测试Exchanger的功能。最简单的一个Exchanger的使用场景有5个组件

1个Exchanger, 1个生产者,1个生产者容器,1个消费者,1个消费者容器

当生产者把自己的容器生产满了,就在Exchanger栅栏处等待消费者拿空的容器和它交换

当消费者把自己的容器消费空了,就在Exchanger栅栏处等待生产者拿满的容器和它交换

 
  1. package com.lock.test;

  2. import java.util.concurrent.Exchanger;

  3. public class ExchangerUsecase {

  4. private static Exchanger<Buffer<Integer>> exchanger = new Exchanger<Buffer<Integer>>();

  5. private static Buffer<Integer> emptyBuffer = new Buffer<Integer>();

  6. private static Buffer<Integer> fullBuffer = new Buffer<Integer>();

  7. private static class Buffer<T>{

  8. private T[] cache = (T[])(new Object[2]);

  9. private int index = 0;

  10. public void add(T item){

  11. cache[index++] = item;

  12. }

  13. public T take(){

  14. return cache[--index];

  15. }

  16. public boolean isEmpty(){

  17. return index == 0;

  18. }

  19. public boolean isFull(){

  20. return index == cache.length;

  21. }

  22. }

  23. public static void main(String[] args){

  24. Runnable provider = new Runnable(){

  25. Buffer<Integer> currentBuffer = emptyBuffer;

  26. private int exchangeCount = 0;

  27. @Override

  28. public void run() {

  29. while(currentBuffer != null && exchangeCount <= 1){

  30. if(!currentBuffer.isFull()){

  31. System.out.println("Provider added one item");

  32. currentBuffer.add(1);

  33. }else{

  34. try {

  35. currentBuffer = exchanger.exchange(currentBuffer);

  36. exchangeCount ++;

  37. Thread.sleep(2000);

  38. } catch (InterruptedException e) {

  39. e.printStackTrace();

  40. }

  41. }

  42. }

  43. }

  44. };

  45. Runnable consumer = new Runnable(){

  46. Buffer<Integer> currentBuffer = fullBuffer;

  47. private int exchangeCount = 0;

  48. @Override

  49. public void run() {

  50. while(currentBuffer != null && exchangeCount <= 2){

  51. if(!currentBuffer.isEmpty()){

  52. System.out.println("Consumer took one item");

  53. currentBuffer.take();

  54. }else{

  55. try {

  56. currentBuffer = exchanger.exchange(currentBuffer);

  57. exchangeCount ++;

  58. } catch (InterruptedException e) {

  59. e.printStackTrace();

  60. }

  61. }

  62. }

  63. }

  64. };

  65. new Thread(provider).start();

  66. new Thread(consumer).start();

  67. }

  68. }

  69. private static Object spinWait(Node node, Slot slot) {

  70.         int spins = SPINS;

  71.         for (;;) {

  72.             Object v = node.get();

  73.             if (v != null)

  74.                 return v;

  75.             else if (spins > 0)

  76.                 --spins;

  77.             else

  78.                 tryCancel(node, slot);

  79.         }

  80.     } 

测试结果显示生产者先生成了两个,然后满了,就等待消费者和它交换。交换后消费者消费了两个,再次等待交换。生产者又生成满了一次,再次交换。如果不设置退出机制,双方会一直生产和消费下去,所以在测试用例中限制了交换两次

 
  1. Provider added one item

  2. Provider added one item

  3. Consumer took one item

  4. Consumer took one item

  5. Provider added one item

  6. Provider added one item

  7. Consumer took one item

  8. Consumer took one item

聊聊高并发(三十一)解析java.util.concurrent各个组件(十三) 理解Exchanger交换器相关推荐

  1. 聊聊高并发(二十一)解析java.util.concurrent各个组件(三) 深入理解AQS(一)

    AQS是AbstractQueuedSynchronizer的缩写,AQS是Java并包里大部分同步器的基础构件,利用AQS可以很方便的创建锁和同步器.它封装了一个状态,提供了一系列的获取和释放操作, ...

  2. 聊聊高并发(二十九)解析java.util.concurrent各个组件(十一) 再看看ReentrantReadWriteLock可重入读-写锁

    上一篇聊聊高并发(二十八)解析java.util.concurrent各个组件(十) 理解ReentrantReadWriteLock可重入读-写锁 讲了可重入读写锁的基本情况和主要的方法,显示了如何 ...

  3. 聊聊高并发(二十九)解析java.util.concurrent各个组件(十一) 再看看ReentrantReadWriteLock可重入读-写锁...

    上一篇聊聊高并发(二十八)解析java.util.concurrent各个组件(十) 理解ReentrantReadWriteLock可重入读-写锁 讲了可重入读写锁的基本情况和基本的方法,显示了怎样 ...

  4. 聊聊高并发(二十二)解析java.util.concurrent各个组件(四) 深入理解AQS(二)

    上一篇介绍了AQS的基本设计思路以及两个内部类Node和ConditionObject的实现 聊聊高并发(二十一)解析java.util.concurrent各个组件(三) 深入理解AQS(一) 这篇 ...

  5. 聊聊高并发(二十五)解析java.util.concurrent各个组件(七) 理解Semaphore

    前几篇分析了一下AQS的原理和实现,这篇拿Semaphore信号量做例子看看AQS实际是如何使用的. Semaphore表示了一种可以同时有多个线程进入临界区的同步器,它维护了一个状态表示可用的票据, ...

  6. 聊聊高并发(三十)解析java.util.concurrent各个组件(十二) 理解CyclicBarrier栅栏

    这篇讲讲CyclicBarrier栅栏,从它的名字可以看出,它是可循环使用的.它的功能和CountDownLatch类似,也是让一组线程等待,然后一起开始往下执行.但是两者还是有几个区别 1. 等待的 ...

  7. 聊聊高并发(二十)解析java.util.concurrent各个组件(二) 12个原子变量相关类

    这篇说说java.util.concurrent.atomic包里的类,总共12个,网上有很多文章解析这几个类,这里挑些重点说说. 这12个类可以分为三组: 1. 普通类型的原子变量 2. 数组类型的 ...

  8. 聊聊高并发(十七)解析java.util.concurrent各个组件(一) 了解sun.misc.Unsafe类

    了解了并发编程中锁的基本原理之后,接下来看看Java是如何利用这些原理来实现各种锁,原子变量,同步组件的.在开始分析java.util.concurrent的源代码直接,首先要了解的就是sun.mis ...

  9. 聊聊高并发(二十八)解析java.util.concurrent各个组件(十) 理解ReentrantReadWriteLock可重入读-写锁

    这篇讲讲ReentrantReadWriteLock可重入读写锁,它不仅是读写锁的实现,并且支持可重入性. 聊聊高并发(十五)实现一个简单的读-写锁(共享-排他锁) 这篇讲了如何模拟一个读写锁. 可重 ...

最新文章

  1. 对比MySQL表数据内容方式汇总
  2. 一系列图论问题[转]
  3. Python爬虫开发:贴吧案例
  4. 假定Csomething是一个类,执行下面这些语句后,内存里创建了几个Csomething对象
  5. 调用第三方接口的几种请求方式
  6. php golang 加密 对接,把php的加密算法转为go语言
  7. 一键安装zabbix percona mysql插件监控mysql
  8. php环境模拟stphp_用php模拟做服务端侦听端口
  9. faster-rcnn tensorflow windows demo运行
  10. ElasticSearch全文搜索引擎之查询API操作详解
  11. Catfishcms v4.8.54环境搭建
  12. python实现整数反转_python算法 整数反转
  13. android日历订阅,Android日历.
  14. 一清机、二清机、跳码,你知道这些POS机猫腻的原理吗?
  15. sql2000 sp3、sql2000 sp4升级补丁下载和安装须知
  16. pdf文件删除空白页技巧介绍
  17. Web字体应用修炼之道(上)
  18. 【Python语言基础】——Python 数字
  19. C++数据结构 交通咨询系统设计(一)
  20. 定点 浮点 神经网络 量化_神经网络模型量化论文小结

热门文章

  1. ie浏览器跳转谷歌浏览器_微软IE浏览器的命运:加速死亡
  2. 计算机网络王小茹,计算机网络(王小茹)3.pdf
  3. native react 折线图_react native中使用echarts
  4. Windows Server 笔记之远程桌面
  5. windows文件中的中文在ubuntu下乱码(小弟参考了许多都不行,这个绝对行啊) .
  6. mysql80连接不上本地服务器_Windows Server 2016 远程桌面本地连接不上
  7. 无线对讲调度服务器,无线对讲系统解决方案
  8. java mapreduce 标准差_MapReduce设计模式之概要设计模式
  9. range函数python3_Python3如何使用range函数替代xrange函数
  10. nginx虚拟目录支持PHP,nginx“虚拟目录”不支持php的解决方法