声明:本文是《 Java 7 Concurrency Cookbook 》的第三章, 作者: Javier Fernández González 译者:郑玉婷

在并发任务间交换数据

Java 并发 API 提供了一种允许2个并发任务间相互交换数据的同步应用。更具体的说,Exchanger 类允许在2个线程间定义同步点,当2个线程到达这个点,他们相互交换数据类型,使用第一个线程的数据类型变成第二个的,然后第二个线程的数据类型变成第一个的。

这个类在遇到类似生产者和消费者问题时,是非常有用的。来一个非常经典的并发问题:你有相同的数据buffer,一个或多个数据生产者,和一个或多个数据消费者。只是Exchange类只能同步2个线程,所以你只能在你的生产者和消费者问题中只有一个生产者和一个消费者时使用这个类。

在这个指南,你将学习如何使用 Exchanger 类来解决只有一个生产者和一个消费者的生产者和消费者问题。

准备

这个指南的例子使用Eclipse IDE实现。如果你使用Eclipse或其他IDE,如NetBeans,打开它并创建一个新的Java项目。

怎么做呢

按照这些步骤来实现下面的例子:

01 package tool;
02 import java.util.List;
03 import java.util.concurrent.Exchanger;
04  
05 //1. 首先,从实现producer开始吧。创建一个类名为Producer并一定实现 Runnable 接口。
06 public class Producer implements Runnable {
07  
08 // 2. 声明 List<String>对象,名为 buffer。这是等等要被相互交换的数据类型。
09 private List<String> buffer;
10  
11 // 3. 声明 Exchanger<List<String>>; 对象,名为exchanger。这个 exchanger 对象是用来同步producer和consumer的。
12 private final Exchanger<List<String>> exchanger;
13  
14 // 4. 实现类的构造函数,初始化这2个属性。
15 public Producer(List<String> buffer, Exchanger<List<String>> exchanger) {
16 this.buffer = buffer;
17 this.exchanger = exchanger;
18 }
19  
20 // 5. 实现 run() 方法. 在方法内,实现10次交换。
21 @Override
22 public void run() {
23 int cycle = 1;
24 for (int i = 0; i < 10; i++) {           System.out.printf("Producer: Cycle %d\n", cycle);
25  
26 // 6. 在每次循环中,加10个字符串到buffer。
27 for (int j = 0; j <10; j++) {
28 String message = "Event " + ((i * 10) + j);
29 System.out.printf("Producer: %s\n", message);
30 buffer.add(message);
31 }
32  
33 // 7. 调用 exchange() 方法来与consumer交换数据。此方法可能会抛出InterruptedException 异常, 加上处理代码。
34 try {
35 buffer = exchanger.exchange(buffer);
36 } catch (InterruptedException e) {
37 e.printStackTrace();
38 }
39 System.out.println("Producer: " + buffer.size());
40 cycle++;
41 }
42 }
43 }

01 //8. 现在, 来实现consumer。创建一个类名为Consumer并一定实现 Runnable 接口。
02 package tool;
03 import java.util.List;
04 import java.util.concurrent.Exchanger;
05 public class Consumer implements Runnable {
06  
07 // 9. 声明名为buffer的 List<String>对象。这个对象类型是用来相互交换的。
08 private List<String> buffer;
09  
10 // 10. 声明一个名为exchanger的 Exchanger<List<String>> 对象。用来同步 producer和consumer。
11 private final Exchanger<List<String>> exchanger;
12  
13 // 11. 实现类的构造函数,并初始化2个属性。
14 public Consumer(List<String>buffer, Exchanger<List<String>> exchanger) {
15 this.buffer = buffer;
16 this.exchanger = exchanger;
17 }
18  
19 // 12. 实现 run() 方法。在方法内,实现10次交换。
20 @Override
21 public void run() {
22 int cycle = 1;
23 for (int i = 0; i < 10; i++) {
24 System.out.printf("Consumer: Cycle %d\n", cycle);
25  
26 // 13. 在每次循环,首先调用exchange()方法来与producer同步。Consumer需要消耗数据。此方法可能会抛出InterruptedException异常, 加上处理代码。
27 try {
28 buffer = exchanger.exchange(buffer);
29 } catch (InterruptedException e) {              e.printStackTrace();
30 }
31  
32 // 14. 把producer发来的在buffer里的10字符串写到操控台并从buffer内删除,留空。System.out.println("Consumer: " + buffer.size());
33 for (int j = 0; j <10; j++) {
34 String message = buffer.get(0);
35 System.out.println("Consumer: " + message);
36 buffer.remove(0);
37 }
38 cycle++;
39 }

01 //15.现在,实现例子的主类通过创建一个类,名为Core并加入 main() 方法。
02 package tool;
03 import java.util.ArrayList;
04 mport java.util.List;
05 import java.util.concurrent.Exchanger;
06  
07 public class Core {
08 public static void main(String[] args) {
09  
10 // 16. 创建2个buffers。分别给producer和consumer使用.
11 List<String> buffer1 = new ArrayList<String>();
12 List<String> buffer2 = new ArrayList<String>();
13  
14 // 17. 创建Exchanger对象,用来同步producer和consumer。
15 Exchanger<List<String>> exchanger = new Exchanger<List<String>>();
16  
17 // 18. 创建Producer对象和Consumer对象。
18 Producer producer = new Producer(buffer1, exchanger);
19 Consumer consumer = new Consumer(buffer2, exchanger);
20  
21 // 19. 创建线程来执行producer和consumer并开始线程。
22 Thread threadProducer = new Thread(producer);
23 Thread threadConsumer = new Thread(consumer); threadProducer.start();
24 threadConsumer.start();
25 }

它是怎么工作的…

消费者开始时是空白的buffer,然后调用Exchanger来与生产者同步。因为它需要数据来消耗。生产者也是从空白的buffer开始,然后创建10个字符串,保存到buffer,并使用exchanger与消费者同步。

在这儿,2个线程(生产者和消费者线程)都是在Exchanger里并交换了数据类型,所以当消费者从exchange() 方法返回时,它有10个字符串在buffer内。当生产者从 exchange() 方法返回时,它有空白的buffer来重新写入。这样的操作会重复10遍。

如你执行例子,你会发现生产者和消费者是如何并发的执行任务和在每个步骤它们是如何交换buffers的。与其他同步工具一样会发生这种情况,第一个调用 exchange()方法会进入休眠直到其他线程的达到。

更多…

Exchanger 类有另外一个版本的exchange方法:

  • exchange(V data, long time, TimeUnit unit):V是声明Phaser的参数种类(例子里是 List)。 此线程会休眠直到另一个线程到达并中断它,或者特定的时间过去了。TimeUnit类有多种常量:DAYS, HOURS, MICROSECONDS, MILLISECONDS, MINUTES, NANOSECONDS, 和 SECONDS。

线程同步工具(七)在并发任务间交换数据相关推荐

  1. Java并发工具Exchanger线程间交换数据

    本文目录 Exchanger简介 Exchanger使用介绍 Exchanger简介 Exchanger是一个用于线程间协作的工具类.Exchanger用于进行线程间的数据交 换.它提供一个同步点,在 ...

  2. (三)线程同步工具集_1---控制线程并发访问一个资源

    2019独角兽企业重金招聘Python工程师标准>>> 线程同步工具集 在前面了解了线程的同步机制,临界区等,了解了线程的两种基本的同步机制: synchronized关键字: Lo ...

  3. 【并发技术16】线程同步工具Exchanger的使用

    如果两个线程在运行过程中需要交换彼此的信息,比如一个数据或者使用的空间,就需要用到 Exchanger 这个类,Exchanger 为线程交换信息提供了非常方便的途径,它可以作为两个线程交换对象的同步 ...

  4. java线程并发库之--线程同步工具Exchanger的使用

    Exchanger可以在两个线程之间交换数据,只能是2个线程,他不支持更多的线程之间互换数据.今天我们就通过实例来学习一下Exchanger的用法. Exchanger的简单实例 Exchanger是 ...

  5. java线程同步的实现_Java并发编程(三) - 实战:线程同步的实现

    synchronized关键字 首先,来看一个多线程竞争临界资源导致的同步不安全问题. package com.example.weishj.mytester.concurrency.sync; /* ...

  6. Java并发编程--6.Exchanger线程间交换数据

    在两个线程之间定义同步点,当两个线程都到达同步点时,他们交换数据结构,因此第一个线程的数据结构进入到第二个线程中,第二个线程的数据结构进入到第一个线程中 在生产者-消费者情境模式中它包含了一个数缓冲区 ...

  7. php阿里的同步工具canal,基于阿里的Canal实现数据同步

    一.开启同步数据库的binlog功能 (1)开启同步数据端的数据库服务(比如我的将一号虚拟机上的mysql数据库作为同步操作数据库) systemctl start mysql.service mys ...

  8. (三)线程同步工具集_2---控制并发访问资源的多个副本

    2019独角兽企业重金招聘Python工程师标准>>> 控制并发访问资源的多个副本 在前面的记录中使用binary semaphore实现了一个例子,这个例子是用来保护一个共享资源: ...

  9. java线程并发库之--线程同步工具CountDownLatch用法

    CountDownLatch,一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待. 主要方法 public CountDownLatch(int count); pu ...

最新文章

  1. python windows ui自动化_appium+python+windows UI自动化 四.简单使用Appium客户端
  2. linux命令大全 笔试,Linux基础及常用命令(笔试面试必备)
  3. Java Stack栈类详解
  4. aix oracle监听配置_Oracel:ORA-12518:监听程序无法分发客户机连接
  5. bzoj2285 [SDOI2011]保密 分数规划spfa+最小割
  6. common java socket,JAVA I/O(四)网络Socket和ServerSocket
  7. 个人认为,载人登陆火星技术上无法实现
  8. MS CRM 2011汇总更新5发布
  9. Teemo Attacking 提莫攻击
  10. 怎么查看无线路由器连接的设备连接服务器,路由器怎么看几个人连接
  11. ECCV2022细粒度图像检索SEMICON学习记录
  12. 中国省数据字典表sql
  13. 你都有哪些面试时被虐的经历?
  14. 学而优则仕:中国古代政治原生态(转自 百度 读书吧)
  15. 精简《JavaScript高级程序设计》五、引用类型(上)
  16. android数字滚动动画,数字滚动效果 RollingText
  17. python中的corr()方法
  18. 微软笔记本怎么装linux,微软正在为XO笔记本装Win/Linux双系统
  19. PT_随机变量离散型随机变量及其常见分布(二项分布/Possion分布)
  20. 21北京交通大学\北交软件专硕复试经验分享

热门文章

  1. SQL 表之间的更新
  2. 中级.NET开发人员
  3. 用 Java 实现断点续传 (HTTP)
  4. 神经网络与机器学习 笔记—小规模和大规模学习问题
  5. C语言经典例79-字符串排序
  6. 【错误记录】Flutter 混合开发报错 ( Android 端与 Flutter 端 EventChannel 初始化顺序错误导致无法通信 | EventChannel 通信流程 )
  7. 【Flutter】Image 组件 ( Image 组件简介 | Image 构造函数 | Image.network 构造函数 | Image.asset 构造函数 )
  8. 【Android 内存优化】垃圾回收算法 ( 内存优化总结 | 常见的内存泄漏场景 | GC 算法 | 标记清除算法 | 复制算法 | 标记压缩算法 )
  9. 同步、异步、阻塞、非阻塞
  10. 通过 python-xmp-toolkit 读取图片xmlp信息