Java JUC学习 - ConcurrentLinkedDeque 详解

0x00 前言

  • 如何实现并发程序,对于Java以及其他高级语言来说都是一件并不容易的事情。在大一上学期的时候,我们学习了链表。可以说链表是一种很常用的数据结构了,不管是实现链式的队列和栈还是实现树都需要这种数据结构。但是在最初的时候我们并没有考虑过链表的并发问题,比如下面是一个经典的链表插入过程:
//C Source Code
//查找函数
LinkNode *SearchNode(LinkList *list, int i);
//插入函数
void LinkListInsert(LinkList *list, LinkNode *node, int i) {
//  首先找到插入点之前的结点LinkNode *previous = SearchNode(list, i-1);
//    插入操作node->next = previous->next;previous->next = node;
}
  • 如果只有一个线程操作链表的话那一定是没有问题的,但是因为链表的插入操作实际上是非原子(nonatomic)访问,所以多线程操作同一个链表的话可能会出现问题。解决这种问题的一种方法是使用线程同步,在访问的时候加锁。不过这样一来编程就会变的很繁琐,也会出现很多不可预料的问题。在Java中为我们提供了JUC类库,里面有很多可以用于多线程访问的类型。今天介绍的就是其中的ConcurrentLinkedDeque类型。

0x01 ConcurrentLinkedDeque概述

  • ConcurrentLinkedDeque是从Java SE 7/JDK 7开始提供的一种Non-blocking Thread-safe List(非阻塞式线程安全链表),隶属于java.util.concurrent包。其类原型为:
public class ConcurrentLinkedDeque<E>
extends AbstractCollection<E>
implements Deque<E>, Serializable
  • 在API简介中,对ConcurrentLinkedDeque的表述是:

An unbounded concurrent deque based on linked nodes. Concurrent insertion, removal, and access operations execute safely across multiple threads. A ConcurrentLinkedDeque is an appropriate choice when many threads will share access to a common collection. Like most other concurrent collection implementations, this class does not permit the use of null elements.

  • 大概意思是,ConcurrentLinkedDeque是一种基于链接节点的无限并发链表。可以安全地并发执行插入、删除和访问操作。当许多线程同时访问一个公共集合时,ConcurrentLinkedDeque是一个合适的选择。和大多数其他并发的集合类型一样,这个类不允许使用空元素。
  • 听起来很美好,但是在使用过程中还要注意一个元素数量的问题,在API文档中这样表述:

Beware that, unlike in most collections, the size method is NOT a constant-time operation. Because of the asynchronous nature of these deques, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal.

  • 大概意思是:与大多数集合类型不同,其size方法不是一个常量操作。因为链表的异步性质,确定当前元素的数量需要遍历所有的元素,所以如果在遍历期间有其他线程修改了这个集合,size方法就可能会报告不准确的结果。
  • 同时,对链表的批量操作也不是一个原子操作,在使用的时候要注意,在API文档中这样表述:

Bulk operations that add, remove, or examine multiple elements, such as addAll(java.util.Collection<? extends E>), removeIf(java.util.function.Predicate<? super E>) or forEach(java.util.function.Consumer<? super E>), are not guaranteed to be performed atomically.

  • 大概意思是,批量的操作:包括添加、删除或检查多个元素,比如addAll(java.util.Collection<? extends E>)removeIf(java.util.function.Predicate<? super E>)或者removeIf(java.util.function.Predicate<? super E>)forEach(java.util.function.Consumer<? super E>)方法,这个类型并不保证以原子方式执行。由此可见如果想保证原子访问,不得使用批量操作的方法。
  • 浏览API的介绍,我们对这个类型有了大体上的了解。实际上ConcurrentLinkedDeque本质上也是一个链表,对这个类型的基本操作也和基本的链表一样,基本上就是“创建、插入、删除、遍历、清空”等,下面直接借助具体的例子来表述。

0x02 ConcurrentLinkedDeque源代码解析

  • 在本节我将从普通链表的实现的角度解析这个非阻塞的线程安全链表的实现。首先对于链表而言一定是由结点构成的,定义如下:
static final class Node<E> {
//        结点的前继结点,Java没有指针,但是普通对象就相当于指针了。volatile Node<E> prev;
//        存储的泛型元素。volatile E item;
//        结点的后继结点volatile Node<E> next;}
  • 其次是该类的一些公有方法的实现,这些方法大多是实现的java.util.Deque<E>中的接口方法。
  • 检查链表中是否包含给定的元素
/*** Returns {@code true} if this deque contains the specified element.* More formally, returns {@code true} if and only if this deque contains* at least one element {@code e} such that {@code o.equals(e)}.** @param o element whose presence in this deque is to be tested* @return {@code true} if this deque contains the specified element*/public boolean contains(Object o) {//如果是空对象直接返回,因为不允许存储空对象if (o != null) {//比较经典的for循环结构了,从首个结点开始依次寻找。for (Node<E> p = first(); p != null; p = succ(p)) {final E item;//如果结点元素不为空并且还是我们要的元素,就返回trueif ((item = p.item) != null && o.equals(item))return true;}}return false;}
  • 检查链表是否为空
/*** Returns {@code true} if this collection contains no elements.** @return {@code true} if this collection contains no elements*/public boolean isEmpty() {//直接检查能否取到首元素即可,下面会讲解peekFirst()方法的实现return peekFirst() == null;}
  • 返回链表中包含的元素个数
/*** Returns the number of elements in this deque.  If this deque* contains more than {@code Integer.MAX_VALUE} elements, it* returns {@code Integer.MAX_VALUE}.** <p>Beware that, unlike in most collections, this method is* <em>NOT</em> a constant-time operation. Because of the* asynchronous nature of these deques, determining the current* number of elements requires traversing them all to count them.* Additionally, it is possible for the size to change during* execution of this method, in which case the returned result* will be inaccurate. Thus, this method is typically not very* useful in concurrent applications.** @return the number of elements in this deque*/public int size() {//这里应该是Java中的标签(label)语句的用法,我们平时编程比较少用到restartFromHead: for (;;) {//count计数器int count = 0;//从首结点for (Node<E> p = first(); p != null;) {//如果不是空结点if (p.item != null)//如果count加1之后达到了最大整数值就退出if (++count == Integer.MAX_VALUE)break;  // @see Collection.size()//如果p的下一个结点仍然为p,则从头开始if (p == (p = p.next))continue restartFromHead;}return count;}}
  • 获得链表首结点
public E peekFirst() {//从第一个结点开始循环,找到第一个非空的结点for (Node<E> p = first(); p != null; p = succ(p)) {final E item;if ((item = p.item) != null)return item;}return null;}
  • 获得链表尾结点
public E peekLast() {//从最后一个结点开始,找到第一个非空的结点for (Node<E> p = last(); p != null; p = pred(p)) {final E item;if ((item = p.item) != null)return item;}return null;}
  • 下面介绍一些protected方法和private方法。这里仅讲解插入到首结点和插入到尾结点的方法,其他方法思路大致相同,这里就不再介绍了。
    /*** Links e as first element.*/private void linkFirst(E e) {// 新建一个结点,并且要求结点不为空。final Node<E> newNode = newNode(Objects.requireNonNull(e));restartFromHead:for (;;)//从头结点开始遍历链表for (Node<E> h = head, p = h, q;;) {//如果p不是第一个结点则应该退回第一个结点if ((q = p.prev) != null &&(q = (p = q).prev) != null)// Check for head updates every other hop.// If p == q, we are sure to follow head instead.p = (h != (h = head)) ? h : q;else if (p.next == p) // PREV_TERMINATORcontinue restartFromHead;else {// p is first node//将新结点的下个结点设为pNEXT.set(newNode, p); // CAS piggyback//设置新结点的上个结点if (PREV.compareAndSet(p, null, newNode)) {// Successful CAS is the linearization point// for e to become an element of this deque,// and for newNode to become "live".if (p != h) // hop two nodes at a time; failure is OK//调整头结点HEAD.weakCompareAndSet(this, h, newNode);return;}// Lost CAS race to another thread; re-read prev}}}
/*** Links e as last element.*/private void linkLast(E e) {// 新建一个结点,并且要求结点不为空。final Node<E> newNode = newNode(Objects.requireNonNull(e));restartFromTail:for (;;)for (Node<E> t = tail, p = t, q;;) {//如果p不是最后一个结点则应该退回最后一个结点if ((q = p.next) != null &&(q = (p = q).next) != null)// Check for tail updates every other hop.// If p == q, we are sure to follow tail instead.p = (t != (t = tail)) ? t : q;else if (p.prev == p) // NEXT_TERMINATORcontinue restartFromTail;else {// p is last node//将新结点的上个结点设为pPREV.set(newNode, p); // CAS piggybackif (NEXT.compareAndSet(p, null, newNode)) {// Successful CAS is the linearization point// for e to become an element of this deque,// and for newNode to become "live".if (p != t) // hop two nodes at a time; failure is OK//调整尾结点TAIL.weakCompareAndSet(this, t, newNode);return;}// Lost CAS race to another thread; re-read next}}}
  • 果然Java库中的代码的实现还是比较复杂的,这次只是粗略的了解。有些地方还是不太理解,所以还需要进一步的学习。

0x03 ConcurrentLinkedDeque在实际中的应用

  • 不多做解释了,直接上应用。
package cn.xiaolus.cld;import java.util.concurrent.ConcurrentLinkedDeque;public class CLDMain {private static ConcurrentLinkedDeque<String> cld = new ConcurrentLinkedDeque<>();public static void main(String[] args) {int numThread = Runtime.getRuntime().availableProcessors();Thread[] threads = new Thread[numThread];for (int i = 0; i < threads.length; i++) {(threads[i] = new Thread(addTask(), "Thread "+i)).start();}}public static Runnable addTask() {return new Runnable() {@Overridepublic void run() {int num = Runtime.getRuntime().availableProcessors();for (int i = 0; i < num; i++) {StringBuilder item = new StringBuilder("Item ").append(i);cld.addFirst(item.toString());callbackAdd(Thread.currentThread().getName(), item);}callbackFinish(Thread.currentThread().getName());}};}public static void callbackAdd(String threadName, StringBuilder item) {StringBuilder builder = new StringBuilder(threadName).append(" added :").append(item);System.out.println(builder);}public static void callbackFinish(String threadName) {StringBuilder builder = new StringBuilder(threadName).append(" has Finished");System.out.println(builder);System.out.println(new StringBuilder("CurrentSize ").append(cld.size()));}
}
  • 程序的一次输出为:
Thread 0 added :Item 0
Thread 0 added :Item 1
Thread 0 added :Item 2
Thread 0 added :Item 3
Thread 0 has Finished
CurrentSize 6
Thread 1 added :Item 0
Thread 2 added :Item 0
Thread 2 added :Item 1
Thread 2 added :Item 2
Thread 2 added :Item 3
Thread 1 added :Item 1
Thread 1 added :Item 2
Thread 2 has Finished
Thread 1 added :Item 3
Thread 1 has Finished
CurrentSize 13
CurrentSize 13
Thread 3 added :Item 0
Thread 3 added :Item 1
Thread 3 added :Item 2
Thread 3 added :Item 3
Thread 3 has Finished
CurrentSize 16
  • 该程序实现了多线程并发添加大量元素到一个公共的链表,刚好是ConcurrentLinkedDeque的典型使用场景。同时也验证了上面的说法,即size()方法需要遍历链表,可能返回错误的结果。
  • 源代码链接:Github - xiaolulwr (路伟饶) / JUC-Demo。

Java JUC学习 - ConcurrentLinkedDeque 详解相关推荐

  1. Java JUC并发编程详解

    Java JUC并发编程详解 1. JUC概述 1.1 JUC简介 1.2 进程与线程 1.2 并发与并行 1.3 用户线程和守护线程 2. Lock接口 2.1 Synchronized 2.2 什 ...

  2. Java基础学习总结(24)——Java单元测试之JUnit4详解

    Java单元测试之JUnit4详解 与JUnit3不同,JUnit4通过注解的方式来识别测试方法.目前支持的主要注解有: @BeforeClass 全局只会执行一次,而且是第一个运行 @Before  ...

  3. Java编程配置思路详解

    Java编程配置思路详解 SpringBoot虽然提供了很多优秀的starter帮助我们快速开发,可实际生产环境的特殊性,我们依然需要对默认整合配置做自定义操作,提高程序的可控性,虽然你配的不一定比官 ...

  4. 你真的弄明白了吗?Java并发之AQS详解

    你真的弄明白了吗?Java并发之AQS详解 带着问题阅读 1.什么是AQS,它有什么作用,核心思想是什么 2.AQS中的独占锁和共享锁原理是什么,AQS提供的锁机制是公平锁还是非公平锁 3.AQS在J ...

  5. java定时任务框架elasticjob详解

    这篇文章主要介绍了java定时任务框架elasticjob详解,Elastic-Job是ddframe中dd-job的作业模块中分离出来的分布式弹性作业框架.该项目基于成熟的开源产品Quartz和Zo ...

  6. Java RMI远程方法调用详解

    Java RMI远程方法调用详解     [尊重原创,转载请注明出处]http://blog.csdn.net/guyuealian/article/details/51992182 一.Java R ...

  7. Java高并发编程详解系列-Java线程入门

    根据自己学的知识加上从各个网站上收集的资料分享一下关于java高并发编程的知识点.对于代码示例会以Maven工程的形式分享到个人的GitHub上面.   首先介绍一下这个系列的东西是什么,这个系列自己 ...

  8. 【java】java ReentrantLock 源码详解

    文章目录 1.概述 2.问题 3.ReentrantLock源码分析 3.1 类的继承关系 3.2 类的内部类 3.2.1 Sync类 3.2.2 NonfairSync类 3.2.3 FairSyn ...

  9. java反射机制深入详解_Java反射机制深入详解

    原标题:Java反射机制深入详解 一.概念 反射就是把Java的各种成分映射成相应的Java类. Class类的构造方法是private,由JVM创建. 反射是java语言的一个特性,它允程序在运行时 ...

最新文章

  1. 计算机视觉相关干货文章-20190807
  2. struts国际化java_java框架篇---Struts2 本地化/国际化(i18n)
  3. Django实现一个简单的中间件,不熟悉中间件的爬坑之路
  4. oracle 论坛 千万级表,Oracle千万级记录操作总结
  5. Nginx负载均衡和反向代理设置
  6. C++ template<typename> 模板怎么用
  7. matlab箭头梯度方向场,局部路径规划算法——人工势场法
  8. 基于python、百度ocr、multiprocessing多进程、selenium网页自动化 、pyqt5界面弹出,实现发票的识别与对学校财务网站的脚本自动化上传操作的项目总结
  9. 蒸烤一体机哪个品牌好性价比高,盘点国内消费者呼声最高的品牌推荐
  10. 系统错误null是什么意思_为什么NULL是错误的?
  11. ui设计培训机构内课程包括哪些板块|优漫动游
  12. 计算机主机无反应,电脑突然开不了机、主机没反应、不显示,几个方法轻松解决...
  13. simon手册翻译_part2
  14. python代码练习,微信登入并生成头像大图
  15. 今天再次认真整理了浏览器收藏夹
  16. 三七互娱php笔试题,三七互娱笔试
  17. 神奇的输入法——小狼毫——个性化设置
  18. EXCEL直接生成扫描枪可识别条码
  19. C++11之std::future对象使用说明
  20. 苹果OFFICE 2011MAC打开WORD字体乱码解决方法

热门文章

  1. Web Broadcast Channel
  2. 7-130 古风排版 (20 分)
  3. numpy 中shape的用法
  4. JAVA 一个或多个空格分割字符串
  5. 数据库(5)SQL约束
  6. Spring AOP自动创建代理 和 ProxyFactoryBean创建代理
  7. 数字逻辑对偶式_数字电子技术实验——组合逻辑电路的设计
  8. ssas脚本组织程序_微服务架构:从事务脚本到领域模型
  9. 如何把本地yum源给其他机器使用_配置本地yum源以及第3方软件仓库的搭建
  10. python设计模式之猴子补丁模式