Java中线程安全的容器主要包括两类:

  • VectorHashtable,以及封装器类Collections.synchronizedListCollections.synchronizedMap
  • Java 5.0引入的java.util.concurrent包,其中包含并发队列、并发HashMap以及写入时复制容器。

依笔者看,早期使用的同步容器主要有两方面的问题:1)通过对方法添加synchronized关键字实现同步,这种粗粒度的加锁操作在synchronized关键字本身未充分优化之前,效率偏低;2)同步容器虽然是线程安全的,但在某些外部复合操作(例:若没有则添加)时,依然需要客户端加锁保证数据安全。因此,从Java 5.0以后,并发编程偏向于使用java.util.concurrent包(作者:Doug Lea)中的容器类,本文也将着重介绍该包中的容器类,主要包括:

  1. 阻塞队列
  2. ConcurrentHashMap
  3. 写入时复制容器

一、阻塞队列

在并发环境下,阻塞队列是常用的数据结构,它能确保数据高效安全的传输,为快速搭建高质量的多线程应用带来极大的便利,比如MQ的原理就是基于阻塞队列的。java.util.concurrent中包含丰富的队列实现,它们之间的关系如下图所示:

  • BlockingQueue、Deque(双向队列)继承自Queue接口;
  • BlockingDeque同时继承自BlockingQueue、Deque接口,提供阻塞的双向队列属性;
  • LinkedBlockingQueue和LinkedBlockingDeque分别实现了BlockingQueue和BlockingDeque接口;
  • DelayQueue实现了BlockingQueue接口,提供任务延迟功能;
  • TransferQueue是Java 7引入的,用于替代BlockingQueue,LinkedTransferQueue是其实现类。

下面对这些队列进行详细的介绍:

1.1 BlockingQueue与BlockingDeque

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:

  • 在队列为空时,获取元素的线程会等待队列变为非空。
  • 当队列满时,存储元素的线程会等待队列可用。

阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

阻塞队列提供了四种处理方法:

方法 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用
  • 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
  • 返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
  • 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

BlockingDeque在BlockingQueue的基础上,增加了支持双向队列的属性。如下图所示,相比于BlockingQueue的插入和移除方法,变为XxxFirstXxxLast方法,分别对应队列的两端,既可以在头部添加或移除,也可以在尾部添加或移除。

1.2 LinkedBlockingQueue与LinkedBlockingDeque

LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE,按照先进先出的原则对元素进行排序。

首先看下LinkedBlockingQueue中核心的域:

static class Node<E> {E item;Node<E> next;Node(E x) { item = x; }
}private final int capacity;
private final AtomicInteger count = new AtomicInteger();transient Node<E> head;
private transient Node<E> last;private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
  • LinkedBlockingQueueLinkedList类似,通过静态内部类Node<E>进行元素的存储;
  • capacity表示阻塞队列所能存储的最大容量,在创建时可以手动指定最大容量,默认的最大容量为Integer.MAX_VALUE
  • count表示当前队列中的元素数量,LinkedBlockingQueue的入队列和出队列使用了两个不同的lock对象,因此无论是在入队列还是出队列,都会涉及对元素数量的并发修改,因此这里使用了一个原子操作类来解决对同一个变量进行并发修改的线程安全问题。
  • headlast分别表示链表的头部和尾部;
  • takeLock表示元素出队列时线程所获取的锁,当执行takepoll等操作时线程获取;notEmpty当队列为空时,通过该Condition让获取元素的线程处于等待状态;
  • putLock表示元素入队列时线程所获取的锁,当执行putoffer等操作时获取;notFull当队列容量达到capacity时,通过该Condition让加入元素的线程处于等待状态。

其次,LinkedBlockingQueue有三个构造方法,分别如下:

public LinkedBlockingQueue() {this(Integer.MAX_VALUE);
}public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);
}public LinkedBlockingQueue(Collection<? extends E> c) {this(Integer.MAX_VALUE);final ReentrantLock putLock = this.putLock;putLock.lock(); // Never contended, but necessary for visibilitytry {int n = 0;for (E e : c) {if (e == null)throw new NullPointerException();if (n == capacity)throw new IllegalStateException("Queue full");enqueue(new Node<E>(e));++n;}count.set(n);} finally {putLock.unlock();}
}

默认构造函数直接调用LinkedBlockingQueue(int capacity)LinkedBlockingQueue(int capacity)会初始化首尾节点,并置位null。LinkedBlockingQueue(Collection<? extends E> c)在初始化队列的同时,将一个集合的全部元素加入队列。

最后分析下puttake的过程,这里重点关注:LinkedBlockingQueue如何实现添加/移除并行的?

public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == capacity) {notFull.await();}enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();
}
public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;
}

之所以把puttake放在一起,是因为它们是一对互逆的过程:

  • put在插入元素前首先获得putLock和当前队列的元素数量,take在去除元素前首先获得takeLock和当前队列的元素数量;
  • put时需要判断当前队列是否已满,已满时当前线程进行等待,take时需要判断队列是否已空,队列为空时当前线程进行等待;
  • put调用enqueue在队尾插入元素,并修改尾指针,take调用dequeuehead指向原来first的位置,并将first的数据域置位null,实现删除原first指针,并产生新的head,同时,切断原head节点的引用,便于垃圾回收。
private void enqueue(Node<E> node) {last = last.next = node;
}
private E dequeue() {Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null;return x;
}
  • 最后,put根据count决定是否触发队列未满和队列空;take根据count决定是否触发队列未空和队列满。

回到刚才的问题:LinkedBlockingQueue如何实现添加/移除并行的?
LinkedBlockingQueue在入队列和出队列时使用的是不同的Lock,这也意味着它们之间的操作不会存在互斥。在多个CPU的情况下,可以做到在同一时刻既消费、又生产,做到并行处理

同样的,LinkedBlockingDequeLinkedBlockingQueue的基础上,增加了双向操作的属性。继续以puttake为例,LinkedBlockingDeque增加了putFirst/putLasttakeFirst/takeLast方法,分别用于在队列头、尾进行添加和删除。与LinkedBlockingQueue不同的是,LinkedBlockingDeque的入队列和出队列不再使用不同的Lock。

final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();

其中,lock表示读写的主锁,notEmpty和notFull依然表示相应的控制线程状态条件量。以putFirsttakeFirst为例:

public void putFirst(E e) throws InterruptedException {if (e == null) throw new NullPointerException();Node<E> node = new Node<E>(e);final ReentrantLock lock = this.lock;lock.lock();try {while (!linkFirst(node))notFull.await();} finally {lock.unlock();}
}

putFirst不支持插入null元素,首先新建一个Node对象,然后调用ReentrantLocklock方法获取锁,插入操作通过boolean linkFirst(Node<E> node)实现,如果当前队列头已满,那么该线程等待(linkFirst方法在写入元素成功后会释放该锁信号),最后,在finally块中释放锁(ReentrantLock的使用)。

public E takeFirst() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lock();try {E x;while ( (x = unlinkFirst()) == null)notEmpty.await();return x;} finally {lock.unlock();}
}

putFirst类似,takeFirst首先获取锁,然后在try中解除尾元素对象的引用,如果unlinkFirst为空,表示队列为空,没有元素可删,那么该线程等待。同样,最后在finally块中释放锁。

那么问题来了,LinkedBlockingDeque为什么不使用LinkedBlockingQueue读写锁分离的方式呢?LinkedBlockingDequeLinkedBlockingQueue的使用场景有什么区别呢?

1.3 DelayQueue

DelayQueue主要用于实现延时任务,比如:等待一段时间之后关闭连接,缓存对象过期删除,任务超时处理等等,这些任务的共同特点是等待一段时间之后执行(类似于TimerTask)。DelayQueue的实现包括三个核心特征:

  • 延时任务:DelayQueue的泛型类需要继承自Delayed接口,而Delayed接口继承自Comparable<Delayed>,用于队列中优先排序的比较;
  • 优先队列:DelayQueue的实现采用了优先队列PriorityQueue,即延迟时间越短的任务越优先(回忆下优先队列中二叉堆的实现)。
  • 阻塞队列:支持并发读写,采用ReentrantLock来实现读写的锁操作。

因此,DelayQueue = Delayed + PriorityQueue + BlockingQueue

public interface Delayed extends Comparable<Delayed> {long getDelay(TimeUnit unit);
}
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();private Thread leader = null;private final Condition available = lock.newCondition();
}

接下来看下DelayQueue的读写操作如何实现延时任务的?

public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;available.signal();}return true;} finally {lock.unlock();}
}

首先执行加锁操作,然后往优先队列中插入元素e,优先队列会调用泛型E的compareTo方法进行比较(具体关于二叉堆的操作,这里不再赘述,请参考数据结构部分相关分析),将延迟时间最短的任务添加到队头。最后检查下元素是否为队头,如果是队头的话,设置leader为空,唤醒所有等待的队列,释放锁。

public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null)available.await();else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0)return q.poll();first = null; // don't retain ref while waitingif (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && q.peek() != null)available.signal();lock.unlock();}
}
  • 首先执行加锁操作,然后取出优先队列的队头,如果对头为空,则该线程阻塞;
  • 获得对头元素的延迟时间,如果延迟时间小于等于0,说明该元素已经到了可以使用的时间,调用poll方法弹出该元素;
  • 在延迟时间大于0时,首先释放元素first的引用(避免内存泄露),其次判断如果leader线程不为空,则该线程阻塞(表示已有线程在等待)。否则,把当前线程赋值给leader元素,然后阻塞delay的时间,即等待队头到达延迟时间,在finally块中释放leader元素的引用。循环后,取出对头元素,退出for循环。
  • 最后,如果leader为空并且优先级队列不为空的情况下(判断还有没有其他后续节点),调用signal通知其他的线程,并执行解锁操作。

1.4 TransferQueue与LinkedTransferQueue

TransferQueue是一个继承了BlockingQueue的接口,并且增加若干新的方法。LinkedTransferQueueTransferQueue接口的实现类,其定义为一个无界的队列,具有先进先出(FIFO)的特性。

TransferQueue接口主要包含以下方法:

public interface TransferQueue<E> extends BlockingQueue<E> {boolean tryTransfer(E e);void transfer(E e) throws InterruptedException;boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;boolean hasWaitingConsumer();int getWaitingConsumerCount();
}
  • transfer(E e):若当前存在一个正在等待获取的消费者线程,即立刻移交之;否则,会插入当前元素e到队列尾部,并且等待进入阻塞状态,到有消费者线程取走该元素。
  • tryTransfer(E e):若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),使用该方法会即刻转移/传输对象元素e;若不存在,则返回false,并且不进入队列。这是一个不阻塞的操作。
  • tryTransfer(E e, long timeout, TimeUnit unit):若当前存在一个正在等待获取的消费者线程,会立即传输给它;否则将插入元素e到队列尾部,并且等待被消费者线程获取消费掉;若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素被移除。
  • hasWaitingConsumer():判断是否存在消费者线程。
  • getWaitingConsumerCount():获取所有等待获取元素的消费线程数量。

LinkedTransferQueue实现了上述方法,较之于LinkedBlockingQueue在队列满时,入队操作会被阻塞的特性,LinkedTransferQueue在队列不满时也可以阻塞,只要没有消费者使用元素。下面来看下LinkedTransferQueue的入队和和出队操作:transfertake方法。

public void transfer(E e) throws InterruptedException {if (xfer(e, true, SYNC, 0) != null) {Thread.interrupted(); // failure possible only due to interruptthrow new InterruptedException();}
}
public E take() throws InterruptedException {E e = xfer(null, false, SYNC, 0);if (e != null)return e;Thread.interrupted();throw new InterruptedException();
}

LinkedTransferQueue入队和和出队都使用了一个关键方法:

private E xfer(E e, boolean haveData, int how, long nanos) {}

其中,E表示被操作的元素,haveDatatrue表示添加数据,false表示移除数据;how有四种取值:NOW, ASYNC, SYNC, 或者TIMED,分别表示执行的时机;nanos表示howTIMED时的时间限制。
xfer方法具体流程较为复杂,这里不再展开。另外,LinkedTransferQueue采用了CAS非阻塞同步机制,后面会具体讲到)

转载于:https://www.cnblogs.com/younghao/p/8457961.html

Java并发(一)——线程安全的容器(上)相关推荐

  1. Java 并发编程 -- 线程池源码实战

    一.概述 小编在网上看了好多的关于线程池原理.源码分析相关的文章,但是说实话,没有一篇让我觉得读完之后豁然开朗,完完全全的明白线程池,要么写的太简单,只写了一点皮毛,要么就是是晦涩难懂,看完之后几乎都 ...

  2. Java并发,volatile+不可变容器对象能保证线程安全么?!

    <Java并发编程实战>第3章原文 <Java并发编程实战>中3.4.2 示例:使用Volatile类型来发布不可变对象 在前面的UnsafeCachingFactorizer ...

  3. Java并发编程实战(进阶篇 - 上)

    通过上一篇的学习我们就可以运用这些方法来构建线程安全的类,那么本篇我们会着重学习Java中的同步容器类.并发容器和框架.并发工具类.Executor 框架,看看这些并发类库或者框架是怎样实现的. 一. ...

  4. java并发编程笔记3-同步容器并发容器闭锁栅栏信号量

    一.同步容器: 1.Vector容器实现了List接口,Vector实际上就是一个数组,和ArrayList类似,但是Vector中的方法都是synchronized方法,即进行了同步措施.保证了线程 ...

  5. Java并发编程——线程池的使用

    在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统 ...

  6. java workerdone_【架构】Java并发编程——线程池的使用

    前言 如果我们要使用线程的时候就去创建一个,这样虽然非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为 ...

  7. Java并发教程–线程池

    Java 1.5中提供的最通用的并发增强功能之一是引入了可自定义的线程池. 这些线程池使您可以对诸如线程数,线程重用,调度和线程构造之类的东西进行大量控制. 让我们回顾一下. 首先,线程池. 让我们直 ...

  8. Java并发编程-线程安全基础

    线程安全基础 1.线程安全问题 2.账户取款案例 3.同步代码块synchronized synchronized的理解 java中有三大变量的线程安全问题 在实例方法上使用synchronized ...

  9. 面试官:Java如何绑定线程到指定CPU上执行?

    不知道你是啥感觉,但是我第一次看到这个问题的时候,我是懵逼的. 而且它还是一个面试题. 我懵逼倒不是因为我不知道答案,而是恰好我之前在非常机缘巧合的情况下知道了答案. 我感觉非常的冷门,作为一个考察候 ...

  10. Java并发编程—线程间协作方式wait()、notify()、notifyAll()和Condition

    原文作者:Matrix海 子 原文地址:Java并发编程:线程间协作的两种方式:wait.notify.notifyAll和Condition 目录 一.wait().notify()和notifyA ...

最新文章

  1. Java项目:宿舍寝室维修上报管理系统(java+SpringBoot+FreeMarker+Mysql)
  2. [To be translated] Nova:libvirt image 的生命周期
  3. android studio adb
  4. python谁的课比较好-【年度系列】2018年学习Python最好的5门课程
  5. CentOS6.5下安装Apache2.4+PHP7
  6. GitHub如何删除一个repository(仓库)
  7. build vue 静态化_页面静态化
  8. Java黑皮书课后题第3章:3.10(游戏:加法测试)程序清单3-3随机产生一个减法问题。修改这个程序,随机产生一个计算两个小于100的整数的加法问题
  9. html5 jquery paint plugin,制作高质量的JQuery Plugin 插件的方法
  10. 一个mapper接口有多个mapper.xml 文件_爱了!分享一个基于Spring Boot的API、RESTful API项目种子(骨架)!...
  11. 找出性能消耗是第一步,如何解决问题才是关键
  12. DtCms.ActionLabel.Article.cs
  13. 创建一个java项目
  14. PHP安全新闻早8点_1127
  15. 人工智能全球 2000 位最具影响力学者榜单
  16. 计算机网络读书笔记DAY4(3)
  17. 定义一个基类BaseClass,从它派生出类DerivedClass。BaseClass有成员函数fn1(),fn2()
  18. element-ui el-descriptions取消冒号
  19. Android热更新
  20. MER:综述高通量测序应用于病原体和害虫诊断

热门文章

  1. 什么是php 的精华,PHP精华
  2. Android Permission访问权限许可
  3. Mysql练习_MySQL练习(一)
  4. 关于power shell
  5. 通过字节流来代替链接来下载小文件
  6. android 讲程序设为默认主屏幕_轻松搞定 PC 副屏,双屏幕更方便!
  7. 单片机加减法计算器_单片机简易加法计算器程序
  8. 如何鉴别项目经理/软件设计师的水平
  9. JavaWeb——IOC
  10. Android Studio 设置HTTP代理无法取消的问题