第九章 - 线程安全集合类

线程安全集合类概述

线程安全集合类可以分为三大类:

  • 遗留的线程安全集合如 Hashtable、Vector。线程安全的实现无非直接加synchronized
  • 使用 Collections 装饰的线程安全集合,如:
    • 使用装饰器模式,把所装饰的类的所有方法,套了一个synchronized
    • Collections.synchronizedCollection
    • Collections.synchronizedList
    • Collections.synchronizedMap
    • Collections.synchronizedSet
    • Collections.synchronizedNavigableMap
    • Collections.synchronizedNavigableSet
    • Collections.synchronizedSortedMap
    • Collections.synchronizedSortedSet
  • java.util.concurrent.*

重点介绍 java.util.concurrent.* 下的线程安全集合类,可以发现它们有规律,里面包含三类关键词:Blocking、CopyOnWrite、Concurrent

  • Blocking 大部分实现基于锁,并提供用来阻塞的方法
  • CopyOnWrite 之类容器修改开销相对较重
  • Concurrent 类型的容器
    • 内部很多操作使用 CAS 优化,一般可以提供较高吞吐量
    • 弱一致性
      • 遍历时弱一致性,例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历,这时内容是旧的
      • 求大小弱一致性,size 操作未必是 100% 准确
      • 读取弱一致性

遍历时如果发生了修改,对于非安全容器来讲,使用 fail-fast
 机制也就是让遍历立刻失败,抛出ConcurrentModifificationException,不再继续遍历

JDK1.8 ConcurrentHashMap

练习:单词计数

模版代码中封装了多线程读取文件的代码

private static <V> void demo(Supplier<Map<String, V>> supplier, BiConsumer<Map<String, V>, List<String>> consumer) {Map<String, V> counterMap = supplier.get();// key value// a   200// b   200List<Thread> ts = new ArrayList<>();for (int i = 1; i <= 26; i++) {int idx = i;Thread thread = new Thread(() -> {List<String> words = readFromFile(idx);consumer.accept(counterMap, words);});ts.add(thread);}ts.forEach(t -> t.start());ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(counterMap);
}public static List<String> readFromFile(int i) {ArrayList<String> words = new ArrayList<>();try (BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream("tmp/" + i + ".txt")))) {while (true) {String word = in.readLine();if (word == null) {break;}words.add(word);}return words;} catch (IOException e) {throw new RuntimeException(e);}
}
  • 你要做的是实现两个参数:

    • 一是提供一个 map 集合,用来存放每个单词的计数结果,key 为单词,value 为计数
    • 二是提供一组操作,保证计数的安全性,会传递 map 集合以及 单词 List
  • 正确结果输出应该是每个单词出现 200 次

具体实现

使用普通的HashMap实现

public static void main(String[] args) {demo(()->new HashMap<String, Integer>(),(map, words) -> {for (String word : words) {// 检查 key 有没有Integer counter = map.get(word);int newValue = counter == null ? 1 : counter + 1;// 没有 则 putmap.put(word, newValue);}});}

使用 ConcurrentHashMap 实现

public static void main(String[] args) {demo(() -> new ConcurrentHashMap<String, LongAdder>(), (map, words) -> {for (String word : words) {// 注意不能使用 putIfAbsent,此方法返回的是上一次的 value,首次调用返回 null// 即使这里设置key-value和value自增不是原子性操作,LongAdder的CAS会获取到最新的值再进行加一的,这一点可以放心LongAdder value = map.computeIfAbsent(word, (key) -> new LongAdder());// 执行累加value.increment();}});}

并发集合

集合对比

三种集合:

  • HashMap 是线程不安全的,性能好
  • Hashtable 线程安全基于 synchronized,综合性能差,已经被淘汰
  • ConcurrentHashMap 保证了线程安全,综合性能较好,不止线程安全,而且效率高,性能好

集合对比:

  1. Hashtable 继承 Dictionary 类,HashMap、ConcurrentHashMap 继承 AbstractMap,均实现 Map 接口
  2. Hashtable 底层是数组 + 链表,JDK8 以后 HashMap 和 ConcurrentHashMap 底层是数组 + 链表 + 红黑树
  3. HashMap 线程非安全,Hashtable 线程安全,Hashtable 的方法都加了 synchronized 锁来确保线程同步
  4. ConcurrentHashMap、Hashtable 不允许 null 值,HashMap 允许 null 值
  5. ConcurrentHashMap、HashMap 的初始容量为 16,Hashtable 初始容量为11,填充因子默认都是 0.75,两种 Map 扩容是当前容量翻倍:capacity * 2,Hashtable 扩容时是容量翻倍 + 1:capacity*2 + 1

工作步骤:

  1. 初始化,使用 来保证并发安全,懒惰初始化 table

  2. 树化,当 table.length < 64 时,先尝试扩容,超过 64 时,并且 bin.length > 8 时,会将链表树化,树化过程会用 synchronized 锁住链表头

    说明:锁住某个槽位的对象头,是一种很好的细粒度的加锁方式,类似 MySQL 中的行锁

  3. put时,如果该 bin 尚未创建,只需要使用 cas 创建 bin;如果已经有了,锁住链表头进行后续 put 操作,元素添加至 bin 的尾部

  4. get时,无锁操作仅需要保证可见性,扩容过程中如果 get 操作拿到的是 ForwardingNode, 会让 get 操作在新 table 中进行搜索

  5. 扩容,扩容时以 bin 为单位进行,需要对 bin 进行 synchronized,但这时其它竞争线程也不是无事可做,它们会帮助把其它 bin 进行扩容

  6. size,元素个数保存在 baseCount 中,并发时的个数变动保存在 CounterCell[ ] 当中,最后统计数量时累加

//需求:多个线程同时往 HashMap 容器中存入数据会出现安全问题
public class ConcurrentHashMapDemo {public static Map<String, String> map = new ConcurrentHashMap();public static void main(String[] args) {new AddMapDataThread().start();new AddMapDataThread().start();sleep(5); // 休息5秒,确保两个线程执行完毕System.out.println("Map大小:" + map.size()); // Map大小:1000000}}class AddMapDataThread extends Thread {@Overridepublic void run() {for (int i = 0; i < 1000000; i++) {ConcurrentHashMapDemo.map.put("键:" + i, "值" + i);}}}

并发死链

注意要在 JDK 7 下运行,否则扩容机制和 hash 的计算方法都变了

public class TestDeadLink {public static void main(String[] args) {// 测试 java 7 中哪些数字的 hash 结果相等System.out.println("长度为16时,桶下标为1的key");for (int i = 0; i < 64; i++) {if (hash(i) % 16 == 1) {System.out.println(i);}}System.out.println("长度为32时,桶下标为1的key");for (int i = 0; i < 64; i++) {if (hash(i) % 32 == 1) {System.out.println(i);}}// 1, 35, 16, 50 当大小为16时,它们在一个桶内final HashMap<Integer, Integer> map = new HashMap<Integer, Integer>();// 放 12 个元素map.put(2, null);map.put(3, null);map.put(4, null);map.put(5, null);map.put(6, null);map.put(7, null);map.put(8, null);map.put(9, null);map.put(10, null);map.put(16, null);map.put(35, null);map.put(1, null);System.out.println("扩容前大小[main]:" + map.size());// 两个线程都去放置第13个元素new Thread() {@Overridepublic void run() {// 放第 13 个元素, 发生扩容map.put(50, null);System.out.println("扩容后大小[Thread-0]:" + map.size());}}.start();new Thread() {@Overridepublic void run() {// 放第 13 个元素, 发生扩容map.put(50, null);System.out.println("扩容后大小[Thread-1]:" + map.size());}}.start();}final static int hash(Object k) {int h = 0;if (0 != h && k instanceof String) {return sun.misc.Hashing.stringHash32((String) k);}h ^= k.hashCode();h ^= (h >>> 20) ^ (h >>> 12);return h ^ (h >>> 7) ^ (h >>> 4);}}

死链复现

  • 调试工具使用 idea
  • 在 HashMap 源码 590 行加断点
int newCapacity = newTable.length;

  • 断点的条件如下,目的是让 HashMap 在扩容为 32 时,并且线程为 Thread-0 或 Thread-1 时停下来(因为程序运行起来,其他线程也可能会扩容,这里我们只关注我们的测试代码)
newTable.length==32 &&(Thread.currentThread().getName().equals("Thread-0")||Thread.currentThread().getName().equals("Thread-1"))

  • 断点暂停方式选择 Thread,否则在调试 Thread-0 时,Thread-1 无法恢复运行
  • 运行代码,程序在预料的断点位置停了下来,输出

长度为16时,桶下标为1的key
1
16
35
50
长度为32时,桶下标为1的key
1
35
扩容前大小[main]:12
  • 接下来进入扩容流程调试
  • 在 HashMap 源码 594 行加断点
Entry<K,V> next = e.next; // 593
if (rehash) // 594
// ...

  • 这是为了观察 e 节点和 next 节点的状态,Thread-0 单步执行到 594 行,在 594 处再添加一个断点(条件Thread.currentThread().getName().equals(“Thread-0”)),让Thread-0 停住,让 Thread-1 先执行进行扩容。

  • 这时可以在 Variables 面板观察到 e 和 next 变量,使用 view as -> Object 查看节点状态
e (1)->(35)->(16)->null
next (35)->(16)->null
  • 在 Threads 面板选中 Thread-1 恢复运行,可以看到控制台输出新的内容如下,Thread-1 扩容已完成

  • 扩容后元素个数是13,容量是32
  • 扩容后的结果:

// 从旧的table迁移过来的,旧的table的表头是(1) -> (35),新的newtable就是(1),然后(35)再迁移过来
// 同样采用头插法,新的newtable就是 (35) -> (1) -> null
// (16) 因为扩容后桶下标变了,被放到其他地方去了
newTable[1] (35)->(1)->null
扩容后大小:13
  • 这时 Thread-0 还停在 594 处, Variables 面板变量的状态已经变化为

// Thread-1 已经修改了新的table了,所以 Thread-0 看到的最新结果是 (35) -> (1) -> null
e (1)->null
next (35)->(1)->null
  • 为什么呢,因为 Thread-1 扩容时链表也是后加入的元素放在链表头,因此链表就倒过来了,但 Thread-1 虽然结果正确,但它结束后 Thread-0 还要继续运行
  • 接下来就可以单步调试(F8)观察死链的产生了 (1) → (35) → (1)
  • 下一轮循环到 594,将 e 搬迁到 newTable 链表头
newTable[1] (1)->null
e (35)->(1)->null
next (1)->null
  • 下一轮循环到 594,将 e 搬迁到 newTable 链表头
newTable[1] (35)->(1)->null
e (1)->null
next null
  • 再看看源码
e.next = newTable[1];
// 这时 e (1,35)
// 而 newTable[1] (35,1)->(1,35) 因为是同一个对象newTable[1] = e;
// 再尝试将 e 作为链表头, 死链已成e = next;
// 虽然 next 是 null, 会进入下一个链表的复制, 但死链已经形成了

源码分析

  • HashMap 的并发死链发生在扩容时
// 将 table 迁移至 newTable
void transfer(Entry[] newTable, boolean rehash) {int newCapacity = newTable.length;for (Entry<K,V> e : table) {while(null != e) {Entry<K,V> next = e.next;// 1 处if (rehash) {e.hash = null == e.key ? 0 : hash(e.key);}int i = indexFor(e.hash, newCapacity);// 2 处// 将新元素加入 newTable[i], 原 newTable[i] 作为新元素的 nexte.next = newTable[i];newTable[i] = e; e = next;}}}
  • 假设 map 中初始元素是
原始链表,格式:[下标] (key,next)
[1] (1,35)->(35,16)->(16,null)线程 a 执行到 1 处 ,此时局部变量 e 为 (1,35),而局部变量 next 为 (35,16) 线程 a 挂起线程 b 开始执行
第一次循环
[1] (1,null)第二次循环
[1] (35,1)->(1,null)第三次循环
[1] (35,1)->(1,null) [17] (16,null)切换回线程 a,此时局部变量 e 和 next 被恢复,引用没变但内容变了:e 的内容被改为 (1,null),而 next 的内容被改为 (35,1) 并链向 (1,null)
第一次循环
[1] (1,null)第二次循环,注意这时 e 是 (35,1) 并链向 (1,null) 所以 next 又是 (1,null)
[1] (35,1)->(1,null)第三次循环,e 是 (1,null),而 next 是 null,但 e 被放入链表头,这样 e.next 变成了 35 (2 处)
[1] (1,35)->(35,1)->(1,35)已经是死链了

小结

  • 究其原因,是因为在多线程环境下使用了非线程安全的 map 集合
  • JDK 8 虽然将扩容算法做了调整,不再将元素加入链表头(而是保持与扩容前一样的顺序),但仍不意味着能够在多线程环境下能够安全扩容,还会出现其它问题(如扩容丢数据)

B站视频解析:https://www.bilibili.com/video/BV1n541177Ea

  • JDK1.7 的 HashMap 采用的头插法(拉链法)进行节点的添加,HashMap 的扩容长度为原来的 2 倍
  • resize( ) 中节点(Entry)转移的源代码:
// 将所有Entry从当前表转移到newTable
void transfer(Entry[] newTable, boolean rehash) {int newCapacity = newTable.length;//得到新数组的长度// 遍历整个数组对应下标下的链表,e代表一个节点for (Entry<K,V> e : table) {   // 当e == null时,则该链表遍历完了,继续遍历下一数组下标的链表while(null != e) { // 先把e节点的下一节点存起来Entry<K,V> next = e.next; if (rehash) {              //得到新的hash值e.hash = null == e.key ? 0 : hash(e.key);  }// 在新数组下得到新的数组下标int i = indexFor(e.hash, newCapacity);  // 将e的next指针指向新数组下标的位置e.next = newTable[i];   // 将该数组下标的节点变为e节点newTable[i] = e; // 遍历链表的下一节点e = next;                                   }}
}
  • JDK 8 虽然将扩容算法做了调整,改用了尾插法,但仍不意味着能够在多线程环境下能够安全扩容,还会出现其它问题(如扩容丢数据)

成员属性

变量

存储数组:

transient volatile Node<K,V>[] table;

散列表的长度:

private static final int MAXIMUM_CAPACITY = 1 << 30;  // 最大长度
private static final int DEFAULT_CAPACITY = 16;            // 默认长度

并发级别,JDK7 遗留下来,JDK8 中不代表并发级别:

private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

负载因子,JDK1.8 的 ConcurrentHashMap 中是固定值:

private static final float LOAD_FACTOR = 0.75f;

阈值:

static final int TREEIFY_THRESHOLD = 8;     // 链表树化的阈值
static final int UNTREEIFY_THRESHOLD = 6;  // 红黑树转化为链表的阈值
static final int MIN_TREEIFY_CAPACITY = 64;    // 当数组长度达到64且某个桶位中的链表长度超过8,才会真正树化

扩容相关:

private static final int MIN_TRANSFER_STRIDE = 16;  // 线程迁移数据【最小步长】,控制线程迁移任务的最小区间
private static int RESIZE_STAMP_BITS = 16;         // 用来计算扩容时生成的【标识戳】
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; // 65535-1并发扩容最多线程数
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;      // 扩容时使用

节点哈希值:

static final int MOVED     = -1;            // 表示当前节点是 FWD 节点
static final int TREEBIN   = -2;           // 表示当前节点已经树化,且当前节点为 TreeBin 对象
static final int RESERVED  = -3;           // 表示节点时临时节点
static final int HASH_BITS = 0x7fffffff;   // 正常节点的哈希值的可用的位数

扩容过程:volatile 修饰保证多线程的可见性

// 扩容过程中,会将扩容中的新 table 赋值给 nextTable 保持引用,扩容结束之后,这里会被设置为 null
private transient volatile Node<K,V>[] nextTable;
// 记录扩容进度,所有线程都要从 0 - transferIndex 中分配区间任务,简单说就是老表转移到哪了,索引从高到低转移
private transient volatile int transferIndex;

累加统计:

// LongAdder 中的 baseCount 未发生竞争时或者当前LongAdder处于加锁状态时,增量累加到 baseCount 中
private transient volatile long baseCount;
// LongAdder 中的 cellsBuzy,0 表示当前 LongAdder 对象无锁状态,1 表示当前 LongAdder 对象加锁状态
private transient volatile int cellsBusy;
// LongAdder 中的 cells 数组,
private transient volatile CounterCell[] counterCells;

控制变量:

  • sizeCtl < 0:

    • 1 表示当前 table 正在初始化(有线程在创建 table 数组),当前线程需要自旋等待
    • 其他负数表示当前 map 的 table 数组正在进行扩容,高 16 位表示扩容的标识戳;低 16 位表示 (1 + nThread) 当前参与并发扩容的线程数量 + 1
  • sizeCtl = 0,表示创建 table 数组时使用 DEFAULT_CAPACITY 为数组大小
  • sizeCtl > 0:
    • 如果 table 未初始化,表示初始化大小
    • 如果 table 已经初始化,表示下次扩容时的触发条件(阈值,元素个数,不是数组的长度)
private transient volatile int sizeCtl;      // volatile 保持可见性

内部类

Node 节点:

static class Node<K,V> implements Entry<K,V> {// 节点哈希值final int hash;final K key;volatile V val;// 单向链表volatile Node<K,V> next;
}

TreeBin 节点:

static final class TreeBin<K,V> extends Node<K,V> {// 红黑树根节点TreeNode<K,V> root;// 链表的头节点volatile TreeNode<K,V> first;// 等待者线程volatile Thread waiter;volatile int lockState;// 写锁状态 写锁是独占状态,以散列表来看,真正进入到 TreeBin 中的写线程同一时刻只有一个线程static final int WRITER = 1;// 等待者状态(写线程在等待),当 TreeBin 中有读线程目前正在读取数据时,写线程无法修改数据static final int WAITER = 2;// 读锁状态是共享,同一时刻可以有多个线程 同时进入到 TreeBi 对象中获取数据,每一个线程都给 lockState + 4static final int READER = 4;}

TreeNode 节点:

static final class TreeNode<K,V> extends Node<K,V> {TreeNode<K,V> parent;  // red-black tree linksTreeNode<K,V> left;TreeNode<K,V> right;TreeNode<K,V> prev;   //双向链表boolean red;
}

ForwardingNode 节点:转移节点

static final class ForwardingNode<K,V> extends Node<K,V> {// 持有扩容后新的哈希表的引用final Node<K,V>[] nextTable;ForwardingNode(Node<K,V>[] tab) {// ForwardingNode 节点的 hash 值设为 -1super(MOVED, null, null, null);this.nextTable = tab;}}

代码块

变量:

// 表示sizeCtl属性在 ConcurrentHashMap 中内存偏移地址
private static final long SIZECTL;
// 表示transferIndex属性在 ConcurrentHashMap 中内存偏移地址
private static final long TRANSFERINDEX;
// 表示baseCount属性在 ConcurrentHashMap 中内存偏移地址
private static final long BASECOUNT;
// 表示cellsBusy属性在 ConcurrentHashMap 中内存偏移地址
private static final long CELLSBUSY;
// 表示cellValue属性在 CounterCell 中内存偏移地址
private static final long CELLVALUE;
// 表示数组第一个元素的偏移地址
private static final long ABASE;
// 用位移运算替代乘法
private static final int ASHIFT;

赋值方法:

// 表示数组单元所占用空间大小,scale 表示 Node[] 数组中每一个单元所占用空间大小,int 是 4 字节
int scale = U.arrayIndexScale(ak);
// 判断一个数是不是 2 的 n 次幂,比如 8:1000 & 0111 = 0000
if ((scale & (scale - 1)) != 0)throw new Error("data type scale not a power of two");// numberOfLeadingZeros(n):返回当前数值转换为二进制后,从高位到低位开始统计,看有多少个0连续在一起
// 8 → 1000 numberOfLeadingZeros(8) = 28
// 4 → 100 numberOfLeadingZeros(4) = 29   int 值就是占4个字节
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);// ASHIFT = 31 - 29 = 2 ,int 的大小就是 2 的 2 次方,获取次方数
// ABASE + (5 << ASHIFT) 用位移运算替代了乘法,获取 arr[5] 的值

构造方法

无参构造, 散列表结构延迟初始化,默认的数组大小是 16:

public ConcurrentHashMap() {}

有参构造:

public ConcurrentHashMap(int initialCapacity) {// 指定容量初始化if (initialCapacity < 0) throw new IllegalArgumentException();int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?MAXIMUM_CAPACITY :// 假如传入的参数是 16,16 + 8 + 1 ,最后得到 32// 传入 12, 12 + 6 + 1 = 19,最后得到 32,尽可能的大,与 HashMap不一样tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));// sizeCtl > 0,当目前 table 未初始化时,sizeCtl 表示初始化容量this.sizeCtl = cap;
}
private static final int tableSizeFor(int c) {int n = c - 1;n |= n >>> 1;n |= n >>> 2;n |= n >>> 4;n |= n >>> 8;n |= n >>> 16;return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
  • HashMap 部分详解了该函数,核心思想就是把最高位是 1 的位以及右边的位全部置 1
    ,结果加 1 后就是 2 的 n 次幂

多个参数构造方法:

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();// 初始容量小于并发级别if (initialCapacity < concurrencyLevel)  // 把并发级别赋值给初始容量initialCapacity = concurrencyLevel; // loadFactor 默认是 0.75long size = (long)(1.0 + (long)initialCapacity / loadFactor);int cap = (size >= (long)MAXIMUM_CAPACITY) ?MAXIMUM_CAPACITY : tableSizeFor((int)size);// sizeCtl > 0,当目前 table 未初始化时,sizeCtl 表示初始化容量this.sizeCtl = cap;
}

集合构造方法:

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {this.sizeCtl = DEFAULT_CAPACITY;   // 默认16putAll(m);
}
public void putAll(Map<? extends K, ? extends V> m) {// 尝试触发扩容tryPresize(m.size());for (Entry<? extends K, ? extends V> e : m.entrySet())putVal(e.getKey(), e.getValue(), false);
}
private final void tryPresize(int size) {// 扩容为大于 2 倍的最小的 2 的 n 次幂int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :tableSizeFor(size + (size >>> 1) + 1);int sc;while ((sc = sizeCtl) >= 0) {Node<K,V>[] tab = table; int n;// 数组还未初始化,【一般是调用集合构造方法才会成立,put 后调用该方法都是不成立的】if (tab == null || (n = tab.length) == 0) {n = (sc > c) ? sc : c;if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if (table == tab) {Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = nt;sc = n - (n >>> 2);// 扩容阈值:n - 1/4 n}} finally {sizeCtl = sc; // 扩容阈值赋值给sizeCtl}}}// 未达到扩容阈值或者数组长度已经大于最大长度else if (c <= sc || n >= MAXIMUM_CAPACITY)break;// 与 addCount 逻辑相同else if (tab == table) {int rs = resizeStamp(n);if (sc < 0) {Node<K,V>[] nt;if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);}}
}

成员方法

数据访存

tabAt( ):获取数组某个槽位的头节点,类似于数组中的直接寻址 arr[i]

// i 是数组索引
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {// (i << ASHIFT) + ABASE == ABASE + i * 4 (一个 int 占 4 个字节),这就相当于寻址,替代了乘法return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

casTabAt( ):指定数组索引位置修改原值为指定的值

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

setTabAt( ):指定数组索引位置设置值

static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

添加方法

public V put(K key, V value) {// 第三个参数 onlyIfAbsent 为 false 表示哈希表中存在相同的 key 时【用当前数据覆盖旧数据】return putVal(key, value, false);
}

putVal( )

final V putVal(K key, V value, boolean onlyIfAbsent) {// 【ConcurrentHashMap 不能存放 null 值】if (key == null || value == null) throw new NullPointerException();// 扰动运算,高低位都参与寻址运算int hash = spread(key.hashCode());// 表示当前 k-v 封装成 node 后插入到指定桶位后,在桶位中的所属链表的下标位置int binCount = 0;// tab 引用当前 map 的数组 table,开始自旋for (Node<K,V>[] tab = table;;) {// f 表示桶位的头节点,n 表示哈希表数组的长度// i 表示 key 通过寻址计算后得到的桶位下标,fh 表示桶位头结点的 hash 值Node<K,V> f; int n, i, fh;// 【CASE1】:表示当前 map 中的 table 尚未初始化if (tab == null || (n = tab.length) == 0)//【延迟初始化】tab = initTable();// 【CASE2】:i 表示 key 使用【寻址算法】得到 key 对应数组的下标位置,tabAt 获取指定桶位的头结点felse if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 对应的数组为 null 说明没有哈希冲突,直接新建节点添加到表中if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))break;}// 【CASE3】:逻辑说明数组已经被初始化,并且当前 key 对应的位置不为 null// 条件成立表示当前桶位的头结点为 FWD 结点,表示目前 map 正处于扩容过程中else if ((fh = f.hash) == MOVED)// 当前线程【需要去帮助哈希表完成扩容】tab = helpTransfer(tab, f);// 【CASE4】:哈希表没有在扩容,当前桶位可能是链表也可能是红黑树else {// 当插入 key 存在时,会将旧值赋值给 oldVal 返回V oldVal = null;// 【锁住当前 key 寻址的桶位的头节点】synchronized (f) {// 这里重新获取一下桶的头节点有没有被修改,因为可能被其他线程修改过,这里是线程安全的获取if (tabAt(tab, i) == f) {// 【头节点的哈希值大于 0 说明当前桶位是普通的链表节点】if (fh >= 0) {// 当前的插入操作没出现重复的 key,追加到链表的末尾,binCount表示链表长度 -1// 插入的key与链表中的某个元素的 key 一致,变成替换操作,binCount 表示第几个节点冲突binCount = 1;// 迭代循环当前桶位的链表,e 是每次循环处理节点,e 初始是头节点for (Node<K,V> e = f;; ++binCount) {// 当前循环节点 keyK ek;// key 的哈希值与当前节点的哈希一致,并且 key 的值也相同if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {// 把当前节点的 value 赋值给 oldValoldVal = e.val;// 允许覆盖if (!onlyIfAbsent)// 新数据覆盖旧数据e.val = value;// 跳出循环break;}Node<K,V> pred = e;// 如果下一个节点为空,把数据封装成节点插入链表尾部,【binCount 代表长度 - 1】if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}// 当前桶位头节点是红黑树else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}// 条件成立说明当前是链表或者红黑树if (binCount != 0) {// 如果 binCount >= 8 表示处理的桶位一定是链表,说明长度是 9if (binCount >= TREEIFY_THRESHOLD)// 树化treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}// 统计当前 table 一共有多少数据,判断是否达到扩容阈值标准,触发扩容// binCount = 0 表示当前桶位为 null,node 可以直接放入,2 表示当前桶位已经是红黑树addCount(1L, binCount);return null;
}

spread( ):扰动函数

将 hashCode 无符号右移 16 位,高 16bit 和低 16bit 做异或,最后与 HASH_BITS 相与变成正数,与树化节点和转移节点区分,把高低位都利用起来减少哈希冲突,保证散列的均匀性

static final int spread(int h) {return (h ^ (h >>> 16)) & HASH_BITS; // 0111 1111 1111 1111 1111 1111 1111 1111
}

initTable( ):初始化数组,延迟初始化

private final Node<K,V>[] initTable() {// tab 引用 map.table,sc 引用 sizeCtlNode<K,V>[] tab; int sc;// table 尚未初始化,开始自旋while ((tab = table) == null || tab.length == 0) {// sc < 0 说明 table 正在初始化或者正在扩容,当前线程可以释放 CPU 资源if ((sc = sizeCtl) < 0)Thread.yield();// sizeCtl 设置为 -1,相当于加锁,【设置的是 SIZECTL 位置的数据】,// 因为是 sizeCtl 是基本类型,不是引用类型,所以 sc 保存的是数据的副本else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {// 线程安全的逻辑,再进行一次判断if ((tab = table) == null || tab.length == 0) {// sc > 0 创建 table 时使用 sc 为指定大小,否则使用 16 默认值int n = (sc > 0) ? sc : DEFAULT_CAPACITY;// 创建哈希表数组Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;// 扩容阈值,n >>> 2  => 等于 1/4 n ,n - (1/4)n = 3/4 n => 0.75 * nsc = n - (n >>> 2);}} finally {// 解锁,把下一次扩容的阈值赋值给 sizeCtlsizeCtl = sc;}break;}}return tab;
}

treeifyBin( ):树化方法

private final void treeifyBin(Node<K,V>[] tab, int index) {Node<K,V> b; int n, sc;if (tab != null) {// 条件成立:【说明当前 table 数组长度未达到 64,此时不进行树化操作,进行扩容操作】if ((n = tab.length) < MIN_TREEIFY_CAPACITY)// 当前容量的 2 倍tryPresize(n << 1);// 条件成立:说明当前桶位有数据,且是普通 node 数据。else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {// 【树化加锁】synchronized (b) {// 条件成立:表示加锁没问题。if (tabAt(tab, index) == b) {TreeNode<K,V> hd = null, tl = null;for (Node<K,V> e = b; e != null; e = e.next) {TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val,null, null);if ((p.prev = tl) == null)hd = p;elsetl.next = p;tl = p;}setTabAt(tab, index, new TreeBin<K,V>(hd));}}}}
}

addCount( ):添加计数,代表哈希表中的数据总量

private final void addCount(long x, int check) {// 【上面这部分的逻辑就是 LongAdder 的累加逻辑】CounterCell[] as; long b, s;// 判断累加数组 cells 是否初始化,没有就去累加 base 域,累加失败进入条件内逻辑if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;// true 未竞争,false 发生竞争boolean uncontended = true;// 判断 cells 是否被其他线程初始化if (as == null || (m = as.length - 1) < 0 ||// 前面的条件为 fasle 说明 cells 被其他线程初始化,通过 hash 寻址对应的槽位(a = as[ThreadLocalRandom.getProbe() & m]) == null ||// 尝试去对应的槽位累加,累加失败进入 fullAddCount 进行重试或者扩容!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {// 与 Striped64#longAccumulate 方法相同fullAddCount(x, uncontended);return;}// 表示当前桶位是 null,或者一个链表节点if (check <= 1)  return;// 【获取当前散列表元素个数】,这是一个期望值s = sumCount();}// 表示一定 【是一个 put 操作调用的 addCount】if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;// 条件一:true 说明当前 sizeCtl 可能为一个负数表示正在扩容中,或者 sizeCtl 是一个正数,表示扩容阈值//        false 表示哈希表的数据的数量没达到扩容条件// 然后判断当前 table 数组是否初始化了,当前 table 长度是否小于最大值限制,就可以进行扩容while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {// 16 -> 32 扩容 标识为:1000 0000 0001 1011,【负数,扩容批次唯一标识戳】int rs = resizeStamp(n);// 表示当前 table,【正在扩容】,sc 高 16 位是扩容标识戳,低 16 位是线程数 + 1if (sc < 0) {// 条件一:判断扩容标识戳是否一样,fasle 代表一样// 勘误两个条件:// 条件二是:sc == (rs << 16 ) + 1,true 代表扩容完成,因为低16位是1代表没有线程扩容了// 条件三是:sc == (rs << 16) + MAX_RESIZERS,判断是否已经超过最大允许的并发扩容线程数// 条件四:判断新表的引用是否是 null,代表扩容完成// 条件五:【扩容是从高位到低位转移】,transferIndex < 0 说明没有区间需要扩容了if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;// 设置当前线程参与到扩容任务中,将 sc 低 16 位值加 1,表示多一个线程参与扩容// 设置失败其他线程或者 transfer 内部修改了 sizeCtl 值if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))//【协助扩容线程】,持有nextTable参数transfer(tab, nt);}// 逻辑到这说明当前线程是触发扩容的第一个线程,线程数量 + 2// 1000 0000 0001 1011 0000 0000 0000 0000 +2 => 1000 0000 0001 1011 0000 0000 0000 0010else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))//【触发扩容条件的线程】,不持有 nextTable,初始线程会新建 nextTabletransfer(tab, null);s = sumCount();}}
}

resizeStamp( ):扩容标识符,每次扩容都会产生一个,不是每个线程都产生,16 扩容到 32 产生一个,32 扩容到 64 产生一个

/*** 扩容的标识符* 16 -> 32 从16扩容到32* numberOfLeadingZeros(16) => 1 0000 => 32 - 5 = 27 => 0000 0000 0001 1011* (1 << (RESIZE_STAMP_BITS - 1)) => 1000 0000 0000 0000 => 32768* ---------------------------------------------------------------* 0000 0000 0001 1011* 1000 0000 0000 0000* 1000 0000 0001 1011* 永远是负数*/
static final int resizeStamp(int n) {// 或运算return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); // (16 -1 = 15)
}

扩容方法

扩容机制:

  • 当链表中元素个数超过 8 个,数组的大小还未超过 64 时,此时进行数组的扩容,如果超过则将链表转化成红黑树
  • put 数据后调用 addCount( ) 方法,判断当前哈希表的容量超过阈值 sizeCtl,超过进行扩容
  • 增删改线程发现其他线程正在扩容,帮其扩容

transfer( ):数据转移到新表中,完成扩容

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {// n 表示扩容之前 table 数组的长度int n = tab.length, stride;// stride 表示分配给线程任务的步长,默认就是 16 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE;// 如果当前线程为触发本次扩容的线程,需要做一些扩容准备工作,【协助线程不做这一步】if (nextTab == null) {try {// 创建一个容量是之前【二倍的 table 数组】Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) {sizeCtl = Integer.MAX_VALUE;return;}// 把新表赋值给对象属性 nextTable,方便其他线程获取新表nextTable = nextTab;// 记录迁移数据整体位置的一个标记,transferIndex 计数从1开始不是 0,所以这里是长度,不是长度-1transferIndex = n;}// 新数组的长度int nextn = nextTab.length;// 当某个桶位数据处理完毕后,将此桶位设置为 fwd 节点,其它写线程或读线程看到后,可以从中获取到新表ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);// 推进标记boolean advance = true;// 完成标记boolean finishing = false;// i 表示分配给当前线程任务,执行到的桶位// bound 表示分配给当前线程任务的下界限制,因为是倒序迁移,16 迁移完 迁移 15,15完成去迁移14for (int i = 0, bound = 0;;) {Node<K,V> f; int fh;// 给当前线程【分配任务区间】while (advance) {// 分配任务的开始下标,分配任务的结束下标int nextIndex, nextBound;// --i 让当前线程处理下一个索引,true说明当前的迁移任务尚未完成,false说明线程已经完成或者还未分配if (--i >= bound || finishing)advance = false;// 迁移的开始下标,小于0说明没有区间需要迁移了,设置当前线程的 i 变量为 -1 跳出循环else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}// 逻辑到这说明还有区间需要分配,然后给当前线程分配任务,else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,// 判断区间是否还够一个步长,不够就全部分配nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {// 当前线程的结束下标bound = nextBound;// 当前线程的开始下标,上一个线程结束的下标的下一个索引就是这个线程开始的下标i = nextIndex - 1;// 任务分配结束,跳出循环执行迁移操作advance = false;}}// 【分配完成,开始数据迁移操作】// 【CASE1】:i < 0 成立表示当前线程未分配到任务,或者任务执行完了if (i < 0 || i >= n || i + n >= nextn) {int sc;// 如果迁移完成if (finishing) {nextTable = null;   // help GCtable = nextTab; // 新表赋值给当前对象sizeCtl = (n << 1) - (n >>> 1);// 扩容阈值为 2n - n/2 = 3n/2 = 0.75*(2n)return;}// 当前线程完成了分配的任务区间,可以退出,先把 sizeCtl 赋值给 sc 保留if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// 判断当前线程是不是最后一个线程,不是的话直接 return,if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;// 所以最后一个线程退出的时候,sizeCtl 的低 16 位为 1finishing = advance = true;// 【这里表示最后一个线程需要重新检查一遍是否有漏掉的区间】i = n;}}// 【CASE2】:当前桶位未存放数据,只需要将此处设置为 fwd 节点即可。else if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd);// 【CASE3】:说明当前桶位已经迁移过了,当前线程不用再处理了,直接处理下一个桶位即可else if ((fh = f.hash) == MOVED)advance = true; // 【CASE4】:当前桶位有数据,而且 node 节点不是 fwd 节点,说明这些数据需要迁移else {// 【锁住头节点】synchronized (f) {// 二次检查,防止头节点已经被修改了,因为这里才是线程安全的访问if (tabAt(tab, i) == f) {// 【迁移数据的逻辑,和 HashMap 相似】// ln 表示低位链表引用// hn 表示高位链表引用Node<K,V> ln, hn;// 哈希 > 0 表示当前桶位是链表桶位if (fh >= 0) {// 和 HashMap 的处理方式一致,与老数组长度相与,16 是 10000// 判断对应的 1 的位置上是 0 或 1 分成高低位链表int runBit = fh & n;Node<K,V> lastRun = f;// 遍历链表,寻找【逆序看】最长的对应位相同的链表,看下面的图更好的理解for (Node<K,V> p = f.next; p != null; p = p.next) {// 将当前节点的哈希 与 nint b = p.hash & n;// 如果当前值与前面节点的值 对应位 不同,则修改 runBit,把 lastRun 指向当前节点if (b != runBit) {runBit = b;lastRun = p;}}// 判断筛选出的链表是低位的还是高位的if (runBit == 0) {ln = lastRun;  // ln 指向该链表hn = null;      // hn 为 null}// 说明 lastRun 引用的链表为高位链表,就让 hn 指向高位链表头节点else {hn = lastRun;ln = null;}// 从头开始遍历所有的链表节点,迭代到 p == lastRun 节点跳出循环for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)// 【头插法】,从右往左看,首先 ln 指向的是上一个节点,// 所以这次新建的节点的 next 指向上一个节点,然后更新 ln 的引用ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}// 高低位链设置到新表中的指定位置setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);// 老表中的该桶位设置为 fwd 节点setTabAt(tab, i, fwd);advance = true;}// 条件成立:表示当前桶位是 红黑树结点else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;// 迭代 TreeBin 中的双向链表,从头结点至尾节点for (Node<K,V> e = t.first; e != null; e = e.next) {// 迭代的当前元素的 hashint h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);// 条件成立表示当前循环节点属于低位链节点if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;else//【尾插法】loTail.next = p;// loTail 指向尾节点loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}// 拆成的高位低位两个链,【判断是否需要需要转化为链表】,反之保持树化ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}}}}}
}

链表处理的 LastRun 机制,可以减少节点的创建

helpTransfer( ):帮助扩容机制

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {Node<K,V>[] nextTab; int sc;// 数组不为空,节点是转发节点,获取转发节点指向的新表开始协助主线程扩容if (tab != null && (f instanceof ForwardingNode) &&(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {// 扩容标识戳int rs = resizeStamp(tab.length);// 判断数据迁移是否完成,迁移完成会把 新表赋值给 nextTable 属性while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || transferIndex <= 0)break;// 设置扩容线程数量 + 1if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {// 协助扩容transfer(tab, nextTab);break;}}return nextTab;}return table;
}

获取方法

ConcurrentHashMap 使用 get( ) 方法获取指定 key 的数据

get( ):获取指定数据的方法

public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;// 扰动运算,获取 key 的哈希值int h = spread(key.hashCode());// 判断当前哈希表的数组是否初始化if ((tab = table) != null && (n = tab.length) > 0 &&// 如果 table 已经初始化,进行【哈希寻址】,映射到数组对应索引处,获取该索引处的头节点(e = tabAt(tab, (n - 1) & h)) != null) {// 对比头结点 hash 与查询 key 的 hash 是否一致if ((eh = e.hash) == h) {// 进行值的判断,如果成功就说明当前节点就是要查询的节点,直接返回if ((ek = e.key) == key || (ek != null && key.equals(ek)))return e.val;}// 当前槽位的【哈希值小于0】说明是红黑树节点或者是正在扩容的 fwd 节点else if (eh < 0)return (p = e.find(h, key)) != null ? p.val : null;// 当前桶位是【链表】,循环遍历查找while ((e = e.next) != null) {if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null;
}

ForwardingNode#find:转移节点的查找方法

Node<K,V> find(int h, Object k) {// 获取新表的引用outer: for (Node<K,V>[] tab = nextTable;;)  {// e 表示在扩容而创建新表使用寻址算法得到的桶位头结点,n 表示为扩容而创建的新表的长度Node<K,V> e; int n;if (k == null || tab == null || (n = tab.length) == 0 ||// 在新表中重新定位 hash 对应的头结点,表示在 oldTable 中对应的桶位在迁移之前就是 null(e = tabAt(tab, (n - 1) & h)) == null)return null;for (;;) {int eh; K ek;// 【哈希相同值也相同】,表示新表当前命中桶位中的数据,即为查询想要数据if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek))))return e;// eh < 0 说明当前新表中该索引的头节点是 TreeBin 类型,或者是 FWD 类型if (eh < 0) {// 在并发很大的情况下新扩容的表还没完成可能【再次扩容】,在此方法处再次拿到 FWD 类型if (e instanceof ForwardingNode) {// 继续获取新的 fwd 指向的新数组的地址,递归了tab = ((ForwardingNode<K,V>)e).nextTable;continue outer;}else// 说明此桶位为 TreeBin 节点,使用TreeBin.find 查找红黑树中相应节点。return e.find(h, k);}// 逻辑到这说明当前桶位是链表,将当前元素指向链表的下一个元素,判断当前元素的下一个位置是否为空if ((e = e.next) == null)// 条件成立说明迭代到链表末尾,【未找到对应的数据,返回 null】return null;}}
}

删除方法

remove( ):删除指定元素

public V remove(Object key) {return replaceNode(key, null, null);
}

replaceNode( ):替代指定的元素,会协助扩容,增删改(写)都会协助扩容,只有查询(读)操作不会,因为读操作不涉及加锁

final V replaceNode(Object key, V value, Object cv) {// 计算 key 扰动运算后的 hashint hash = spread(key.hashCode());// 开始自旋for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;// 【CASE1】:table 还未初始化或者哈希寻址的数组索引处为 null,直接结束自旋,返回 nullif (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null)break;// 【CASE2】:条件成立说明当前 table 正在扩容,【当前是个写操作,所以当前线程需要协助 table 完成扩容】else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);// 【CASE3】:当前桶位可能是 链表 也可能是 红黑树 else {// 保留替换之前数据引用V oldVal = null;// 校验标记boolean validated = false;// 【加锁当前桶位头结点】,加锁成功之后会进入代码块synchronized (f) {// 双重检查if (tabAt(tab, i) == f) {// 说明当前节点是链表节点if (fh >= 0) {validated = true;//遍历所有的节点for (Node<K,V> e = f, pred = null;;) {K ek;// hash 和值都相同,定位到了具体的节点if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {// 当前节点的valueV ev = e.val;if (cv == null || cv == ev ||(ev != null && cv.equals(ev))) {// 将当前节点的值 赋值给 oldVal 后续返回会用到oldVal = ev;if (value != null)       // 条件成立说明是替换操作e.val = value;   else if (pred != null) // 非头节点删除操作,断开链表pred.next = e.next; else// 说明当前节点即为头结点,将桶位头节点设置为以前头节点的下一个节点setTabAt(tab, i, e.next);}break;}pred = e;if ((e = e.next) == null)break;}}// 说明是红黑树节点else if (f instanceof TreeBin) {validated = true;TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> r, p;if ((r = t.root) != null &&(p = r.findTreeNode(hash, key, null)) != null) {V pv = p.val;if (cv == null || cv == pv ||(pv != null && cv.equals(pv))) {oldVal = pv;// 条件成立说明替换操作if (value != null)p.val = value;// 删除操作else if (t.removeTreeNode(p))setTabAt(tab, i, untreeify(t.first));}}}}}// 其他线程修改过桶位头结点时,当前线程 sync 头结点锁错对象,validated 为 false,会进入下次 for 自旋if (validated) {if (oldVal != null) {// 替换的值为 null,【说明当前是一次删除操作,更新当前元素个数计数器】if (value == null)addCount(-1L, -1);return oldVal;}break;}}}return null;
}

JDK1.7 ConcurrentHashMap

ConcurrentHashMap 对锁粒度进行了优化,分段锁技术,将整张表分成了多个数组(Segment),每个数组又是一个类似 HashMap 数组的结构。允许多个修改操作并发进行,Segment 是一种可重入锁,继承 ReentrantLock,并发时锁住的是每个 Segment,其他 Segment 还是可以操作的,这样不同 Segment 之间就可以实现并发,大大提高效率。

底层结构: Segment 数组 + HashEntry 数组 + 链表(数组 + 链表是 HashMap 的结构)

  • 优点:如果多个线程访问不同的 segment,实际是没有冲突的,这与 JDK8 中是类似的
  • 缺点:Segments 数组默认大小为16,这个容量初始化指定后就不能改变了,并且不是懒惰初始化

LinkedBlockingQueue 原理

基本的入队出队

public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {static class Node<E> {E item;/*** 下列三种情况之一* - 真正的后继节点* - 自己, 发生在出队时* - null, 表示是没有后继节点, 是最后了*/Node<E> next;Node(E x) { item = x; }}
}

入队操作

  • 初始化链表 last = head = new Node(null); Dummy 节点用来占位,item 为 null

  • 当一个节点入队 last = last.next = node;

  • 再来一个节点入队 last = last.next = node;

出队操作

Node<E> h = head;
Node<E> first = h.next; h.next = h; // help GC
head = first; E x = first.item;
first.item = null;
return x;
  • h = head

  • first = h.next

  • h.next = h,帮助进行垃圾回收

  • head = first

E x = first.item;
first.item = null;
return x;

加锁分析

高明之处在于用了两把锁dummy 节点

  • 用一把锁,同一时刻,最多只允许有一个线程(生产者或消费者,二选一)执行
  • 用两把锁,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行
    • 消费者与消费者线程仍然串行
    • 生产者与生产者线程仍然串行

线程安全分析

  • 当节点总数大于 2 时(包括 dummy 节点),putLock 保证的是 last 节点的线程安全takeLock 保证的是 head 节点的线程安全。两把锁保证了入队和出队没有竞争
  • 当节点总数等于 2 时(即一个 dummy 节点,一个正常节点)这时候,仍然是两把锁锁两个对象,不会竞争
  • 当节点总数等于 1 时(就一个 dummy 节点)这时 take 线程会被 notEmpty 条件阻塞,有竞争,会阻塞
// 用于 put(阻塞) offer(非阻塞)
private final ReentrantLock putLock = new ReentrantLock();// 用户 take(阻塞) poll(非阻塞)
private final ReentrantLock takeLock = new ReentrantLock();

put 操作

public void put(E e) throws InterruptedException {// 不允许有空元素if (e == null) throw new NullPointerException();// c 用于检查空位的,初始值是-1int c = -1;// 创建node节点Node<E> node = new Node<E>(e);// put锁final ReentrantLock putLock = this.putLock;// count 用来维护元素计数final AtomicInteger count = this.count;// 上锁putLock.lockInterruptibly();try {// 满了等待while (count.get() == capacity) {// 倒过来读就好: 等待 notFullnotFull.await();}// 有空位, 入队且计数加一enqueue(node);// 返回加1前的值c = count.getAndIncrement();// 除了自己 put 以外, 如果队列还有空位, 由自己(当前put线程)叫醒其他 put 线程if (c + 1 < capacity)notFull.signal();} finally {// 解锁putLock.unlock();}// 如果队列中只有一个元素, 叫醒 take 线程if (c == 0)// 这里调用的是 notEmpty.signal() 而不是 notEmpty.signalAll() 是为了减少竞争signalNotEmpty();}

take 操作

public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;// take 锁final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await();}// 出队且计数减1x = dequeue();// 返回减1前的值c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {// 解锁takeLock.unlock();}// 如果队列中只有一个空位时, 叫醒 put 线程// 如果有多个线程进行出队, 第一个线程满足 c == capacity, 但后续线程 c < capacityif (c == capacity)// 这里调用的是 notFull.signal() 而不是 notFull.signalAll() 是为了减少竞争signalNotFull();return x;
}

注意:由 put 唤醒 put 是为了避免信号不足

性能比较

主要列举 LinkedBlockingQueue 与 ArrayBlockingQueue 的性能比较

  • Linked 支持有界,Array 强制有界
  • Linked 实现是链表,Array 实现是数组
  • Linked 是懒惰的,而 Array 需要提前初始化 Node 数组
  • Linked 每次入队会生成新 Node,而 Array 的 Node 是提前创建好的
  • Linked 两把锁,Array 一把锁

ConcurrentLinkedQueue

ConcurrentLinkedQueue 的设计与 LinkedBlockingQueue 非常像,也是

  • 两把【锁】,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行
  • dummy 节点的引入让两把【锁】将来锁住的是不同对象,避免竞争
  • 只是这【锁】是使用了 cas 来实现

事实上,ConcurrentLinkedQueue 应用还是非常广泛的

  • 例如之前讲的 Tomcat 的 Connector 结构时,Acceptor 作为生产者向 Poller 消费者传递事件信息时,正是采用了ConcurrentLinkedQueue 将 SocketChannel 封装成一个对象 给 Poller 使用

CopyOnWriteArrayList

原理分析

  • CopyOnWriteArraySet 是它的马甲
  • CopyOnWriteArrayList 采用了**写入时拷贝的思想,增删改操作会将底层数组拷贝一份,在新数组上执行操作,不影响其它线程的并发读,读写分离**
  • CopyOnWriteArraySet 底层对 CopyOnWriteArrayList 进行了包装,装饰器模式
public CopyOnWriteArraySet() {al = new CopyOnWriteArrayList<E>();
}

存储结构:

// volatile 保证了读写线程之间的可见性
private transient volatile Object[] array;

全局锁:保证线程的执行安全

final transient ReentrantLock lock = new ReentrantLock();

新增数据:需要加锁,创建新的数组操作

public boolean add(E e) {final ReentrantLock lock = this.lock;// 加锁,保证线程安全lock.lock();try {// 获取旧的数组Object[] elements = getArray();int len = elements.length;// 【拷贝新的数组(这里是比较耗时的操作,但不影响其它读线程)】Object[] newElements = Arrays.copyOf(elements, len + 1);// 添加新元素newElements[len] = e;// 替换旧的数组,【这个操作以后,其他线程获取数组就是获取的新数组了】setArray(newElements);return true;} finally {lock.unlock();}
}

读操作:不加锁,在原数组上操作

  • 适合读多写少的应用场景
public E get(int index) {return get(getArray(), index);
}private E get(Object[] a, int index) {return (E) a[index];
}

迭代器:CopyOnWriteArrayList 在返回迭代器时,创建一个该内部数组当前的快照(引用),即使其他线程替换了原始数组,迭代器遍历的快照依然引用的是创建快照时的数组,所以这种实现方式也存在一定的数据延迟性,对其他线程并行添加的数据不可见

public Iterator<E> iterator() {// 获取到数组引用,整个遍历的过程该数组都不会变,一直引用的都是老数组,return new COWIterator<E>(getArray(), 0);
}// 迭代器会创建一个底层array的快照,故主类的修改不影响该快照
static final class COWIterator<E> implements ListIterator<E> {// 内部数组快照private final Object[] snapshot;private COWIterator(Object[] elements, int initialCursor) {cursor = initialCursor;// 数组的引用在迭代过程不会改变snapshot = elements;}// 【不支持写操作】,因为是在快照上操作,无法同步回去public void remove() {throw new UnsupportedOperationException();}
}

get 弱一致性

数据一致性就是读到最新更新的数据:

  • 强一致性:当更新操作完成之后,任何多个后续进程或者线程的访问都会返回最新的更新过的值
  • 弱一致性:系统并不保证进程或者线程的访问都会返回最新的更新过的值,也不会承诺多久之后可以读到

时间点 操作
1 Thread-0 getArray( )
2 Thread-1 getArray( )
3 Thread-1 setArray(arrayCopy)
4 Thread-0 array[index]
  • Thread-0 读到了脏数据

不一定弱一致性就不好

  • 数据库的事务隔离级别就是弱一致性的表现
  • 并发高和一致性是矛盾的,需要权衡

安全失败

在 java.util 包的集合类就都是快速失败的,而 java.util.concurrent 包下的类都是安全失败

  • 快速失败:在 A 线程使用迭代器对集合进行遍历的过程中,此时 B 线程对集合进行修改(增删改),或者 A 线程在遍历过程中对集合进行修改,都会导致 A 线程抛出 ConcurrentModificationException 异常

    • AbstractList 类中的成员变量 modCount,用来记录 List 结构发生变化的次数,结构发生变化是指添加或者删除至少一个元素的操作,或者是调整内部数组的大小,仅仅设置元素的值不算结构发生变化
    • 在进行序列化或者迭代等操作时,需要比较操作前后 modCount 是否改变,如果改变了抛出 CME 异常
  • 安全失败:采用安全失败机制的集合容器,在迭代器遍历时直接在原集合数组内容上访问,但其他线程的增删改都会新建数组进行修改,就算修改了集合底层的数组容器,迭代器依然引用着以前的数组(快照思想),所以不会出现异常
  • ConcurrentHashMap 不会出现并发时的迭代异常,因为在迭代过程中 CHM 的迭代器并没有判断结构的变化,迭代器还可以根据迭代的节点状态去寻找并发扩容时的新表进行迭代
ConcurrentHashMap map = new ConcurrentHashMap();
// KeyIterator
Iterator iterator = map.keySet().iterator();
Traverser(Node<K,V>[] tab, int size, int index, int limit) {// 引用还是原来集合的 Node 数组,所以其他线程对数据的修改是可见的this.tab = tab;this.baseSize = size;this.baseIndex = this.index = index;this.baseLimit = limit;this.next = null;}
public final boolean hasNext() { return next != null; }
public final K next() {Node<K,V> p;if ((p = next) == null)throw new NoSuchElementException();K k = p.key;lastReturned = p;// 在方法中进行下一个节点的获取,会进行槽位头节点的状态判断advance();return k;
}

第九章 - 线程安全集合类相关推荐

  1. Windows核心编程 第九章 线程与内核对象的同步(上)

    第9章 线程与内核对象的同步 上一章介绍了如何使用允许线程保留在用户方式中的机制来实现线程同步的方法.用户方式同步的优点是它的同步速度非常快.如果强调线程的运行速度,那么首先应该确定用户方式的线程同步 ...

  2. Windows核心编程 第九章 线程与内核对象的同步(下)

    9.4 等待定时器内核对象 等待定时器是在某个时间或按规定的间隔时间发出自己的信号通知的内核对象.它们通常用来在某个时间执行某个操作. 若要创建等待定时器,只需要调用C r e a t e Wa i ...

  3. Windows PE第九章 线程局部存储

    线程局部存储(TLS) 这个东西并不陌生了,之前写过了一个关于这个的应用,利用静态TLS姿势实现代码段静态加密免杀或者所谓的加壳思路.地址在这:http://blog.csdn.net/u013761 ...

  4. 第九章 线程与内核对象的同步(6)

    六.其他的线程同步函数 1.异步设备I/O 异步设备I/O使得线程能够启动一个读操作或写操作,但是不必等待读操作或写操作完成.设备对象是可以同步的内核对象,可以调用WaitForSingleObjec ...

  5. java2第九章的总结_java并发的艺术-读书笔记-第九章线程池

    使用线程池的好处: 1.降低资源消耗:减少了线程创建和销毁的资源消耗 2.提高响应速度,当任务到达时,线程可以不尽兴创建直接处理 3.提高线程的可管理性.使用线程池可以对线程进行统一的管理,监控,使用 ...

  6. 第九章 线程与内核对象的同步(4)

    四.信标内核对象 信标内核对象用于资源进行计数.包含:引用计数.最大资源数量(用于标识信标能够控制的资源的最大数量).当期资源数量(用于标识当前可以使用的资源的数量). 信标的使用规则:当前资源数量大 ...

  7. CoreJava 笔记总结-第九章 集合

    第九章 集合 文章目录 第九章 集合 `Java`集合框架 集合接口与实现分离 `Collection`接口 迭代器 泛型实用方法 集合框架中的接口 具体集合 链表 数组列表 散列集 树集 优先队列 ...

  8. 深入理解Java虚拟机(第二版) 第九章:类加载及执行子系统的案例与实战

    第九章 类加载及执行子系统的案例与实战 9.1 概述 9.2 Tomcat: 正统的类加载器架构 9.3 OSGi:灵活的类加载器架构 9.4 字节码生成技术与动态代理的实现 9.5 Retrotra ...

  9. PE学习(九)第九章:TLS 动态TLS与静态TLS

    第九章:线程局部存储 PEB,在NT中,该结构可以从进程空间的FS:[0x30]处找到,PEB描述的信息主要包括:进程状态.进程堆.PE映像信息等,其中Ldr记录了进程加载进内存的所有模块的基地址. ...

最新文章

  1. 建立于因果推理与机器学习共识的稳定学习
  2. R使用交叉验证(cross validation)进行机器学习模型性能评估
  3. 原创 | 浅议数据资产市场
  4. 缓存方式之cookie的使用
  5. SIFT-FCACO算法的图像配准
  6. gdb加载python_gdb加载python脚本的方法
  7. lnmp解析php,LNMP之 php解析
  8. 交换字典的key和value
  9. 我的账号 小米云服务器地址,小米云服务登录
  10. 程序员的编辑器 notepad++ || XML编辑器
  11. 《网络协议分析与设计》实验报告书 实验一
  12. Check Point R80.10 SmartConsole汉化生成中文报表
  13. OpenBot开源小车
  14. Proteus仿真——用74LS194设计一个8个灯的流水灯电路
  15. 偏最小二乘,主成分分析,主成分回归,奇异值之间的关系
  16. Editplus各个版本(最新版本是5.3)注册码下载
  17. 利用XSL和ASP在线编辑XML文档
  18. 【数学建模】6 近十年江西省研究生建模赛题及近三年全国建模赛题目录
  19. JavaScript实现网站首页动态效果
  20. “识时务者为俊杰”------hao123峰会给渠道商们的启示

热门文章

  1. 用php处理wps文档,wps怎么解除限制编辑
  2. netty银行账目管理系统_基于JSP技术银行账目管理系统设计
  3. Linux capstone 反汇编引擎使用方法
  4. 算法小抄9-快慢指针
  5. 2017北大信科夏令营机试A:判决素数个数
  6. GO学习项目---家庭收支记账软件
  7. hue集成hive详解
  8. 论文的发表流程、会议的组织流程
  9. 离线环境使用leanote(蚂蚁笔记)桌面版
  10. 多地楼市库存续升凸显调控效果 下半年需关注钱流