个人学习源码的思路:

  1. 使用ctrl+单机进入源码,并阅读源码的官方文档–>大致的了解一下此类的特点和功能
  2. 使用ALIT+7查看类中所有方法–>大致的看一下此类的属性和方法
  3. 找到重要方法并阅读源码–> 如HashMap的put,resize,get
  • 粗看–>源码的实现思路
  • 细看–>背后原理和代码风格
  1. 有余力可以再阅读其他方法源码

ConcurrentHashMap源码分析(59700字)

  • 一、JDK1.7的ConcurrentHashMap
    • [1] 数据结构
    • [2] 初始化
    • [3] put方法
      • 1. 第一层put方法
      • 2. 第二层put方法
      • 3. ensureSegment()
      • 4. scanAndLockForPut()
    • [4] rehash 流程
    • [5] get方法
    • [5] 要点总结
    • [6] size 计算流程
  • 二、JDK1.8的ConcurrentHashMap
    • [1] 官方文档
    • [2] 数据结构
    • [3] 重要属性和方法
    • [4] put方法
    • [5] 深入分析扩容实现
      • 1.tryPresize()
      • 2.transfer()
      • 3.helpTransfer()
    • [6] 深入分析get方法与无锁并发
    • [7] size源码
    • [8] 小结
    • [9] 要点总结

参考文章:

ConcurrentHashMap源码分析(1.8):https://www.cnblogs.com/zerotomax/p/8687425.html

《吊打面试官》系列:https://zhuanlan.zhihu.com/p/97902016

ConcurrentHashMap实现原理及源码分析:https://www.cnblogs.com/chengxiao/p/6842045.html

Java并发编程:https://www.bilibili.com/video/BV1jE411j7uX?p=291

并发编程——ConcurrentHashMap#helpTransfer() 分析:https://www.jianshu.com/p/39b747c99d32

ConcurrentHashMap源码分析:https://www.jianshu.com/p/f9b3e76951c2

Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析:https://www.javadoop.com/post/hashmap

源码分析ConcurrentHashMap(jdk1.7 和jdk1.8):https://blog.csdn.net/shenxinmou1661/article/details/97313596?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-1

一、JDK1.7的ConcurrentHashMap

[1] 数据结构

ConcurrentHashMap 底层在jdk1.7中是采用**Segment(Segment继承了ReentrantLock) + HashEntry(类似于hashmap,内部结构为数组加链表) **的方式进行实现的。

每个 segment 对应一把锁,Segment继承了ReentrantLock,所以它就是一种可重入锁(ReentrantLock)。在ConcurrentHashMap,一个Segment就是一个子哈希表,Segment里维护了一个HashEntry数组,并发环境下,对于不同Segment的数据进行操作是不用考虑锁竞争的(就按默认的ConcurrentLeve为16来讲,理论上就允许16个线程并发执行,有木有很酷)。所以,对于同一个Segment的操作才需考虑线程同步,不同的Segment则无需考虑。

优点:如果多个线程访问不同的 segment,实际是没有冲突的,这与 jdk8 中是类似的

缺点:Segments 数组默认大小为16,这个容量初始化指定后就不能改变了,并且不是懒惰初始化。

主要的组成如下:

Segment和HashEntry结构:

static final class Segment<K,V> extends ReentrantLock implements Serializable {private static final long serialVersionUID = 2249069246763182397L;// 和 HashMap 中的 HashEntry 作用一样,真正存放数据的桶transient volatile HashEntry<K,V>[] table;transient int count;// 记得快速失败(fail—fast)么?transient int modCount;// 大小transient int threshold;// 负载因子final float loadFactor;
}
static final class HashEntry<K,V> {final int hash;final K key;volatile V value;volatile HashEntry<K,V> next;//其他省略
}

ConcurrentHashMap初始化方法有三个参数,如果用户不指定则会使用默认值,initialCapacity为16,loadFactor为0.75(负载因子,扩容时需要参考),concurrentLevel为16。

从下面的代码可以看出来,Segment数组的大小ssize是由concurrentLevel来决定的,但是却不一定等于concurrentLevel,ssize一定是大于或等于concurrentLevel的最小的2的次幂。

比如:默认情况下concurrentLevel是16,则ssize为16;若concurrentLevel为14,ssize为16;若concurrentLevel为17,则ssize为32。为什么Segment的数组大小一定是2的次幂?其实主要是便于通过按位与的散列算法来定位Segment的index。至于更详细的原因,有兴趣的话可以参考另一篇文章《HashMap实现原理及源码分析》,其中对于数组长度为什么一定要是2的次幂有较为详细的分析。

[2] 初始化

initialCapacity:初始容量,这个值指的是整个 ConcurrentHashMap 的初始容量,实际操作的时候需要平均分给每个 Segment。

loadFactor:负载因子,之前我们说了,Segment 数组不可以扩容,所以这个负载因子是给每个 Segment 内部使用的。

public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();//最大分段锁if (concurrencyLevel > MAX_SEGMENTS)concurrencyLevel = MAX_SEGMENTS;// Find power-of-two sizes best matching arguments//找到两个大小的幂最佳匹配参数int sshift = 0;int ssize = 1;// 计算并行级别 ssize,因为要保持并行级别是 2 的 n 次方while (ssize < concurrencyLevel) {++sshift;ssize <<= 1;}// 我们这里先不要那么烧脑,用默认值,concurrencyLevel 为 16,sshift 为 4// 那么计算出 segmentShift 为 28,segmentMask 为 15,后面会用到这两个值this.segmentShift = 32 - sshift;this.segmentMask = ssize - 1;//最大容量if (initialCapacity > MAXIMUM_CAPACITY)initialCapacity = MAXIMUM_CAPACITY;// initialCapacity 是设置整个 map 初始的大小,// 这里根据 initialCapacity 计算 Segment 数组中每个位置可以分到的大小// 如 initialCapacity 为 64,那么每个 Segment 或称之为"槽"可以分到 4 个int c = initialCapacity / ssize;if (c * ssize < initialCapacity)++c;// 默认 MIN_SEGMENT_TABLE_CAPACITY 是 2,这个值也是有讲究的,因为这样的话,对于具体的槽上,// 插入一个元素不至于扩容,插入第二个的时候才会扩容int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c)cap <<= 1;// 创建 Segment 数组,// 并创建数组的第一个元素 segment[0]Segment<K,V> s0 =new Segment<K,V>(loadFactor, (int)(cap * loadFactor),(HashEntry<K,V>[])new HashEntry[cap]);Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];// 往数组写入 segment[0]UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]this.segments = ss;
}

初始化完成,我们得到了一个 Segment 数组。

我们就当是用 new ConcurrentHashMap() 无参构造函数进行初始化的,那么初始化完成后:

  • Segment 数组长度为 16,不可以扩容
  • Segment[i] 的默认大小为 2,负载因子是 0.75,得出初始阈值为 1.5,也就是以后插入第一个元素不会触发扩容,插入第二个会进行第一次扩容
  • 这里初始化了 segment[0],其他位置还是 null,至于为什么要初始化 segment[0],后面的代码会介绍
  • 当前 segmentShift 的值为 32 - 4 = 28,segmentMask 为 16 - 1 = 15,姑且把它们简单翻译为移位数掩码,这两个值马上就会用到
[3] put方法
1. 第一层put方法

这里说的是外层的put方法,但是内部仍然有put方法。从源码看出,put的主要逻辑也就两步:1.定位segment并确保定位的Segment已初始化 2.调用Segment的put方法。

 public V put(K key, V value) {Segment<K,V> s;//1. 首先校验value不能为空if (value == null)throw new NullPointerException();//2. 计算key的hash值int hash = hash(key);//3. 根据hash值找到Segment数组中的位置j,  j为hash的高四位int j = (hash >>> segmentShift) & segmentMask;//4. 如果ensureSegment(j)为空,则对 segment[j] 进行初始化if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck(segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegments = ensureSegment(j);//初始化segment[j]//5. 将新值插入到槽s[j]中return s.put(key, hash, value, false);//进入segment的put方法}

如何定位segment?

segmentShift和segmentMask这两个全局变量的主要作用是用来定位Segment, 算出key的hash值,然后右移segmentShift(segmentShift=32-sshift,2的sshift次方等于ssize)位,然后与segmentMask(segments数组长度-1)进行与操作

  • int j =(hash >>> segmentShift) & segmentMask 类似int j=hashcode&(n-1)

  • segmentShift:2的sshift次方等于ssize,segmentShift=32-sshift。若segments长度为16,segmentShift=32-4=28;若segments长度为32,segmentShift=32-5=27。而计算得出的hash值最大为32位,无符号右移segmentShift,则意味着只保留高几位(其余位是没用的),然后与段掩码segmentMask位运算来定位Segment。

  • segmentMask:段掩码,假如segments数组长度为16,则段掩码为16-1=15;segments长度为32,段掩码为32-1=31。这样得到的所有bit位都为1,可以更好地保证散列的均匀性

  • &意思

    1. 逻辑上表示and (和)的意思。A & B表示A、B两种元素du缺一不可。

    2. &和&&都可以用作逻辑与的运算符,表示逻辑与(and),当运算符两边的表达式的结果都为true时,整个运算结果才为true,否则,只要有一方为false,则结果为false。

    3. 取模就是求余数的运算,例如10除以4的余数是2,于是取模的结果就是2。通常取模运算也叫取余运算,它们返回结果都是余数 .remmod 唯一的区别在于:当 x 和 y 的正负号一样的时候,两个函数结果是等同的;当 x 和 y 的符号不同时,rem 函数结果的符号和 x 的一样,而 mod 和 y 一样。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oJfpmjvL-1596600177691)(X:\Users\xu\AppData\Roaming\Typora\typora-user-images\image-20200718112425451.png)]

2. 第二层put方法

​ 他先定位到Segment,然后再进行put操作。首先第一步的时候会尝试获取锁,如果获取失败肯定就有其他线程存在竞争,则利用 scanAndLockForPut() 自旋获取锁,如果重试的次数达到了 MAX_SCAN_RETRIES (64次)则改为阻塞锁获取,保证能获取成功。

get方法无需加锁,由于其中涉及到的共享变量都使用volatile修饰,volatile可以保证内存可见性,所以不会读取到过期数据。

 /*** segment的put方法* 方法第一步:获取独占锁*/final V put(K key, int hash, V value, boolean onlyIfAbsent) {//在往segment中写入前要先获得该segment的独占锁HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);V oldValue;try {//segment内部数组HashEntry<K,V>[] tab = table;//再利用hash值求内部数组对应存放位置下标int index = (tab.length - 1) & hash;//first为数组该位置处的链表表头HashEntry<K,V> first = entryAt(tab, index);for (HashEntry<K,V> e = first;;) {if (e != null) {K k;if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) {oldValue = e.value;if (!onlyIfAbsent) {//替换旧值e.value = value;++modCount;}break;}e = e.next;}else {if (node != null)node.setNext(first);else//没有找到相同的key,则将新结点插入到链表表头node = new HashEntry<K,V>(hash, key, value, first);int c = count + 1;//如果超过了segment阈值,则需要扩容if (c > threshold && tab.length < MAXIMUM_CAPACITY)rehash(node);elsesetEntryAt(tab, index, node);++modCount;count = c;oldValue = null;break;}}} finally {//解锁unlock();}return oldValue;}
3. ensureSegment()

ConcurrentHashMap 初始化的时候会初始化第一个槽 segment[0],对于其他槽来说,在插入第一个值的时候进行初始化。

这里需要考虑并发,因为很可能会有多个线程同时进来初始化同一个槽 segment[k],不过只要有一个成功了就可以。

 /*** 作用:初始化segment[k]* 过程:循环使用CAS操作进行并发控制*/@SuppressWarnings("unchecked")private Segment<K,V> ensureSegment(int k) {final Segment<K,V>[] ss = this.segments;long u = (k << SSHIFT) + SBASE; // raw offsetSegment<K,V> seg;if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {//使用ss[0]作为原型Segment<K,V> proto = ss[0]; // use segment 0 as prototypeint cap = proto.table.length;float lf = proto.loadFactor;int threshold = (int)(cap * lf);//初始化segment[k]内部数组HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];//再次检查该槽是否为空if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) { // recheckSegment<K,V> s = new Segment<K,V>(lf, threshold, tab);//使用循环CAS 初始化while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) {if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))break;}}}return seg;}

总的来说,ensureSegment(int k) 比较简单,对于并发操作使用 CAS 进行控制。

我没搞懂这里为什么要搞一个 while 循环,CAS 失败不就代表有其他线程成功了吗,为什么要再进行判断?

感谢评论区的李子木,如果当前线程 CAS 失败,这里的 while 循环是为了将 seg 赋值返回。

4. scanAndLockForPut()

获取写入锁

前面我们看到,在往某个 segment 中 put 的时候,首先会调用 node = tryLock() ? null : scanAndLockForPut(key, hash, value),也就是说先进行一次 tryLock() 快速获取该 segment 的独占锁,如果失败,那么进入到 scanAndLockForPut 这个方法来获取锁。

下面我们来具体分析这个方法中是怎么控制加锁的。

     /*** tryLock获取锁失败,则通过该方法获得锁。* 该方法有两个出口:1. tryLock方法成功,循环结束   2. 重试次数大于MAX_SCAN_RETRIES,进入lock方法,此方法会阻塞等待,知道成功拿到独占锁,然后break中断循环* 这个方法就是看似复杂,但是其实就是做了一件事,那就是获取该 segment 的独占锁,如果需要的话顺便实例化了一下 node。*/private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {HashEntry<K,V> first = entryForHash(this, hash);HashEntry<K,V> e = first;HashEntry<K,V> node = null;int retries = -1; // negative while locating nodewhile (!tryLock()) {HashEntry<K,V> f; // to recheck first belowif (retries < 0) {if (e == null) {if (node == null) // speculatively create node//进入这里,说明数组该位置的链表为空,没有任何元素node = new HashEntry<K,V>(hash, key, value, null);retries = 0;}else if (key.equals(e.key))retries = 0;elsee = e.next;}//重试次数大于MAX_SCAN_RETRIES:单核1,多核64,则不抢了,进入阻塞队列等待//lock为阻塞方法,直到获取锁后返回else if (++retries > MAX_SCAN_RETRIES) {lock();break;}else if ((retries & 1) == 0 &&//当有新的元素进来成为表头,就需要重新走一遍scanAndLockForPut方法(f = entryForHash(this, hash)) != first) {e = first = f; // re-traverse if entry changedretries = -1;}}return node;}

这个方法有两个出口,一个是 tryLock() 成功了,循环终止,另一个就是重试次数超过了 MAX_SCAN_RETRIES,进到 lock() 方法,此方法会阻塞等待,直到成功拿到独占锁。

这个方法就是看似复杂,但是其实就是做了一件事,那就是获取该 segment 的独占锁,如果需要的话顺便实例化了一下 node。

[4] rehash 流程

segment 数组不能扩容,扩容是 segment 数组某个位置内部的数组 HashEntry<K,V>[] 进行扩容,扩容后,容量为原来的 2 倍。我们要回顾一下触发扩容的地方,put 的时候,如果判断该值的插入会导致该 segment 的元素个数超过阈值,那么先进行扩容,再插值,读者这个时候可以回去 put 方法看一眼。

该方法不需要考虑并发,因为到这里的时候,是持有该 segment 的独占锁的。

    /**这里的扩容比之前的 HashMap 要复杂一些,代码难懂一点。有两个挨着的 for 循环,第一个 for 有什么用呢?仔细一看发现,如果没有第一个 for 循环,也是可以工作的,但是,这个 for 循环下来,如果 lastRun 的后面还有比较多的节点,那么这次就是值得的。因为我们只需要克隆 lastRun 前面的节点,后面的一串节点跟着 lastRun 走就是了,不需要做任何操作。我觉得 Doug Lea 的这个想法也是挺有意思的,不过比较坏的情况就是每次 lastRun 都是链表的最后一个元素或者很靠后的元素,那么这次遍历就有点浪费了。不过 Doug Lea 也说了,根据统计,如果使用默认的阈值,大约只有 1/6 的节点需要克隆。*/
// 方法参数上的 node 是这次扩容后,需要添加到新的数组中的数据。
private void rehash(HashEntry<K,V> node) {HashEntry<K,V>[] oldTable = table;int oldCapacity = oldTable.length;// 2 倍int newCapacity = oldCapacity << 1;threshold = (int)(newCapacity * loadFactor);// 创建新数组HashEntry<K,V>[] newTable =(HashEntry<K,V>[]) new HashEntry[newCapacity];// 新的掩码,如从 16 扩容到 32,那么 sizeMask 为 31,对应二进制 ‘000...00011111’int sizeMask = newCapacity - 1;// 遍历原数组,老套路,将原数组位置 i 处的链表拆分到 新数组位置 i 和 i+oldCap 两个位置for (int i = 0; i < oldCapacity ; i++) {// e 是链表的第一个元素HashEntry<K,V> e = oldTable[i];if (e != null) {HashEntry<K,V> next = e.next;// 计算应该放置在新数组中的位置,// 假设原数组长度为 16,e 在 oldTable[3] 处,那么 idx 只可能是 3 或者是 3 + 16 = 19int idx = e.hash & sizeMask;if (next == null)   // 该位置处只有一个元素,那比较好办newTable[idx] = e;else { // Reuse consecutive sequence at same slot// e 是链表表头HashEntry<K,V> lastRun = e;// idx 是当前链表的头结点 e 的新位置int lastIdx = idx;// 下面这个 for 循环会找到一个 lastRun 节点,这个节点之后的所有元素是将要放到一起的for (HashEntry<K,V> last = next;last != null;last = last.next) {int k = last.hash & sizeMask;if (k != lastIdx) {lastIdx = k;lastRun = last;}}// 将 lastRun 及其之后的所有节点组成的这个链表放到 lastIdx 这个位置newTable[lastIdx] = lastRun;// 下面的操作是处理 lastRun 之前的节点,//    这些节点可能分配在另一个链表中,也可能分配到上面的那个链表中for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {V v = p.value;int h = p.hash;int k = h & sizeMask;HashEntry<K,V> n = newTable[k];newTable[k] = new HashEntry<K,V>(h, p.key, v, n);}}}}// 将新来的 node 放到新数组中刚刚的 两个链表之一 的 头部int nodeIndex = node.hash & sizeMask; // add the new nodenode.setNext(newTable[nodeIndex]);newTable[nodeIndex] = node;table = newTable;
}
[5] get方法

相对于 put 来说,get 真的不要太简单。

  1. 计算 hash 值,找到 segment 数组中的具体位置,或我们前面用的“槽”
  2. 槽中也是一个数组,根据 hash 找到数组中具体的位置
  3. 到这里是链表了,顺着链表进行查找即可
   public V get(Object key) {Segment<K, V> s; // manually integrate access methods to reduce overheadHashEntry<K, V>[] tab;//1. 获得hash值int h = hash(key);long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;//2. 根据hash值找到对应的segmentif ((s = (Segment<K, V>) UNSAFE.getObjectVolatile(segments, u)) != null &&(tab = s.table) != null) {//3. 找到segment 内部数组相应位置的链表,遍历for (HashEntry<K, V> e = (HashEntry<K, V>) UNSAFE.getObjectVolatile(tab, ((long) (((tab.length - 1) & h)) << TSHIFT) + TBASE);e != null; e = e.next) {K k;if ((k = e.key) == key || (e.hash == h && key.equals(k)))return e.value;}}return null;}
//remove过程
public V remove(Object key) {int hash = hash(key);Segment<K, V> s = segmentForHash(hash);return s == null ? null : s.remove(key, hash, null);}/*** segment的remove方法*/final V remove(Object key, int hash, Object value) {//1. 上锁if (!tryLock())scanAndLock(key, hash);V oldValue = null;try {HashEntry<K, V>[] tab = table;//2. 由hash值获得具体的数组下标indexint index = (tab.length - 1) & hash;//3.获得对应链表的头部结点HashEntry<K, V> e = entryAt(tab, index);//4. 使用前驱结点指针HashEntry<K, V> pred = null;while (e != null) {K k;HashEntry<K, V> next = e.next;if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) {V v = e.value;if (value == null || value == v || value.equals(v)) {//要删除的结点是链表头结点,则将该结点的后继结点设为新的头结点if (pred == null)setEntryAt(tab, index, next);//要删除的结点不是链表头结点,则将该结点的前驱结点指向后继结点elsepred.setNext(next);++modCount;--count;oldValue = v;}break;}pred = e;e = next;}} finally {//解锁unlock();}return oldValue;}

size 计算流程

  • 计算元素个数前,先不加锁计算两次,如果前后两次结果如一样,认为个数正确返回

  • 如果不一样,进行重试,重试次数超过 3,将所有 segment 锁住,重新计算个数返回

public int size() {// Try a few times to get accurate count. On failure due to// continuous async changes in table, resort to locking.final Segment<K,V>[] segments = this.segments;int size;boolean overflow; // true if size overflows 32 bitslong sum; // sum of modCountslong last = 0L; // previous sumint retries = -1; // first iteration isn't retrytry {for (;;) {if (retries++ == RETRIES_BEFORE_LOCK) {// 超过重试次数, 需要创建所有 segment 并加锁for (int j = 0; j < segments.length; ++j)ensureSegment(j).lock(); // force creation}sum = 0L;size = 0;overflow = false;for (int j = 0; j < segments.length; ++j) {Segment<K,V> seg = segmentAt(segments, j);if (seg != null) {sum += seg.modCount;int c = seg.count;if (c < 0 || (size += c) < 0)overflow = true;}}if (sum == last)break;last = sum;}} finally {if (retries > RETRIES_BEFORE_LOCK) {for (int j = 0; j < segments.length; ++j)segmentAt(segments, j).unlock();}}return overflow ? Integer.MAX_VALUE : size; }
[5] 要点总结

1. JDK 1.7 ConcurrentHashMap 数据结构是怎么样的?

ConcurrentHashMap 底层在jdk1.7中是采用Segment数组 + HashEntry的方式进行实现的。

  • **Segment数组 **:Segment数组继承了ReentrantLock,每个Segment里维护了一个HashEntry数组。ConcurrentHashMap 默认有 16 个 Segments,并且一但指定则不可改变。理论上,这个时候,最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的 Segment 上。
  • HashEntry:与hashmap非常相似,但是内部数组用vlilote关键字修饰。

2. 说JDK 1.7 ConcurrentHashMap的初始化操作?

首先根据计算concurrencyLevel并行级别 ssize,并根据创建 Segment 数组,然后平均分配初始容量到segment内,并创建数组的第一个元素 segment[0]

initialCapacity:初始容量,这个值指的是整个 ConcurrentHashMap 的初始容量,实际操作的时候需要平均分给每个 Segment。

loadFactor:负载因子,之前我们说了,Segment 数组不可以扩容,所以这个负载因子是给每个 Segment 内部使用的。

concurrencyLevel:并发等级,也就是segment数量,你可以自己设置。但是他会根据并行级别 ssize自动调整到2的n次方的数组,

3. 说JDK 1.7 ConcurrentHashMap的put操作?

添加节点的操作 put 和删除节点的操作 remove 都是要加 segment 上的独占锁的,所以它们之间自然不会有问题,我们需要考虑的问题就是 get 的时候在同一个 segment 中发生了 put 或 remove 操作。

  1. put 操作的线程安全性。

    1. 初始化槽,这个我们之前就说过了,使用了 CAS 来初始化 Segment 中的数组。
    2. 添加节点到链表的操作是插入到表头的,所以,如果这个时候 get 操作在链表遍历的过程已经到了中间,是不会影响的。当然,另一个并发问题就是 get 操作在 put 之后,需要保证刚刚插入表头的节点被读取,这个依赖于 setEntryAt 方法中使用的 UNSAFE.putOrderedObject。
    3. 扩容。扩容是新创建了数组,然后进行迁移数据,最后面将 newTable 设置给属性 table。所以,如果 get 操作此时也在进行,那么也没关系,如果 get 先行,那么就是在旧的 table 上做查询操作;而 put 先行,那么 put 操作的可见性保证就是 table 使用了 volatile 关键字。
  • 首先说一下put 的主流程:在传入key,value时,首先计算 key 的 hash 值,然后根据 hash 值找到 Segment 数组中的位置

    如何根据 hash 值找到 Segment 数组中的位置?

    int j =(hash >>> segmentShift) & segmentMask ,他的含义是什么?就是让最高位与segmentMask重合,然后进行一个与运算!那为什么是最高位呢?,直接进行与运算不行吗,

    解释下segmentShift和segmentMask?为什么这么算?

    • segmentShift移位数,2的sshift次方等于ssize,segmentShift=32-sshift。若segments长度为16,segmentShift=32-4=28;若segments长度为32,segmentShift=32-5=27。而计算得出的hash值最大为32位,无符号右移segmentShift,则意味着只保留高几位(其余位是没用的),然后与段掩码segmentMask位运算来定位Segment。
    • segmentMask段掩码,假如segments数组长度为16,则段掩码为16-1=15;segments长度为32,段掩码为32-1=31。这样得到的所有bit位都为1,可以更好地保证散列的均匀性
    • 类似于int j=hashcode&(n-1)

    什么是掩码?

    入位

    https://baike.baidu.com/item/%E6%8E%A9%E7%A0%81/86301?fr=aladdin

    并发度如何设置?

    ConcurrentHashMap默认的并发度为16。当用户设置并发度时,ConcurrentHashMap会使用大于等于该值的最小2幂指数作为实际并发度(假如用户设置并发度为17,实际并发度则为32),原因与HashMap设置容量大小为2的n次方一样,能够更高效的使用“与”操作来定位分段Segment。如果并发度设置的过小,会带来严重的锁竞争问题;如果并发度设置的过大,原本位于同一个Segment内的访问会扩散到不同的Segment中,CPU cache命中率会下降,从而引起程序性能下降。

    说JDK 1.7 ConcurrentHashMap的内层put操作?–>获取整个数组的独占锁

    首先第一步的时候会尝试获取锁,如果获取失败肯定就有其他线程存在竞争,则利用 scanAndLockForPut() 自旋获取锁,如果重试的次数达到了 MAX_SCAN_RETRIES (64次)则改为阻塞锁获取(进入阻塞队列了),保证能获取成功。

    **ensureSegment:**ConcurrentHashMap 初始化的时候会初始化第一个槽 segment[0],对于其他槽来说,在插入第一个值的时候进行初始化。这里需要考虑并发,因为很可能会有多个线程同时进来初始化同一个槽 segment[k],不过只要有一个成功了就可以。

    scanAndLockForPut:前面我们看到,在往某个 segment 中 put 的时候,首先会调用 node = tryLock() ? null : scanAndLockForPut(key, hash, value),也就是说先进行一次 tryLock() 快速获取该 segment 的独占锁,如果失败,那么进入到 scanAndLockForPut 这个方法来获取锁。

    else if (++retries > MAX_SCAN_RETRIES) {
    lock();
    break;
    }
    else if ((retries & 1) == 0 &&
    // 这个时候是有大问题了,那就是有新的元素进到了链表,成为了新的表头
    // 所以这边的策略是,相当于重新走一遍这个 scanAndLockForPut 方法
    (f = entryForHash(this, hash)) != first) {
    e = first = f; // re-traverse if entry changed
    retries = -1;
    }

    4. 扩容: rehash

    重复一下,segment 数组不能扩容,扩容是 segment 数组某个位置内部的数组 HashEntry<K,V>[] 进行扩容,扩容后,容量为原来的 2 倍。

    首先,我们要回顾一下触发扩容的地方,put 的时候,如果判断该值的插入会导致该 segment 的元素个数超过阈值,那么先进行扩容,再插值,读者这个时候可以回去 put 方法看一眼。

    该方法不需要考虑并发,因为到这里的时候,是持有该 segment 的独占锁的。

    这里的扩容比之前的 HashMap 要复杂一些,代码难懂一点。上面有两个挨着的 for 循环,第一个 for 有什么用呢?

    仔细一看发现,如果没有第一个 for 循环,也是可以工作的,但是,这个 for 循环下来,如果 lastRun 的后面还有比较多的节点,那么这次就是值得的。因为我们只需要克隆 lastRun 前面的节点,后面的一串节点跟着 lastRun 走就是了,不需要做任何操作。

    我觉得 Doug Lea 的这个想法也是挺有意思的,不过比较坏的情况就是每次 lastRun 都是链表的最后一个元素或者很靠后的元素,那么这次遍历就有点浪费了。不过 Doug Lea 也说了,根据统计,如果使用默认的阈值,大约只有 1/6 的节点需要克隆

    [6] size 计算流程
    • 计算元素个数前,先不加锁计算两次,如果前后两次结果如一样,认为个数正确返回
    • 如果不一样,进行重试,重试次数超过 3,将所有 segment 锁住,重新计算个数返回

    6. remove 操作的线程安全性

    remove 操作我们没有分析源码,所以这里说的读者感兴趣的话还是需要到源码中去求实一下的。

    get 操作需要遍历链表,但是 remove 操作会"破坏"链表。

    如果 remove 破坏的节点 get 操作已经过去了,那么这里不存在任何问题。

    如果 remove 先破坏了一个节点,分两种情况考虑。 1、如果此节点是头结点,那么需要将头结点的 next 设置为数组该位置的元素,table 虽然使用了 volatile 修饰,但是 volatile 并不能提供数组内部操作的可见性保证,所以源码中使用了 UNSAFE 来操作数组,请看方法 setEntryAt。2、如果要删除的节点不是头结点,它会将要删除节点的后继节点接到前驱节点中,这里的并发保证就是 next 属性是 volatile 的。

二、JDK1.8的ConcurrentHashMap

[1] 官方文档

此为翻译版本,但并非逐字翻译,使用ctrl+单机进入源码,可阅读源码的英文文档,简单总结如下:

  • ConcurrentHashMap是一个支持并发的哈希表。功能类似于Hashtable,但实现不同;
  • 检索(put)操作并不加锁,反映了最近完成的更新操作的结果;
  • Hashtable类不允许 null用作键或值,HashMap 可以;
  • ConcurrentHashMaps支持一组顺序和并行批量操作;
  • 所有任务方法的所有参数都必须为非空值
  ConcurrentHashMap是一个支持检索的完全并发性和更新的高预期并发性的哈希表。这个类服从类似于Hashtable的功能规范,并且包括对应于Hashtable的每个方法。不过,尽管所有操作都是线程安全的,检索(put)操作并不意味着锁定,并没有任何支持为防止所有访问的方式锁定整个表。这个类可以实现的功能类似于Hashtable,但同步细节不同。检索操作(包括get)通常不阻止,因此可能与更新操作重叠(包括put和remove )。检索反映了最近完成的更新操作的结果。(更正式地,对于给定密钥的更新操作熊之前发生与任何(非空关系)检索该键报告经更新的值。)对于聚合操作,比如putAll和clear,并发检索可能反映插入或移除只有一些条目。类似地,迭代器,分割器和枚举返回在反映迭代器/枚举创建过程中或之后反映哈希表状态的元素。他们不抛出ConcurrentModificationException 。然而,迭代器被设计为一次只能由一个线程使用。包括size ,isEmpty和containsValue通常是有用的,只有当一个map没有发生在其他线程并发更新。否则,这些方法的结果反映了可能足以用于监视或估计目的的瞬态状态,但不适用于程序控制。当存在太多的冲突(即,具有不同的哈希码但是以表的大小为模数落入相同的时隙的密钥)时,该表被动态扩展,并且每个映射保持大致两个bin的预期平均效果(对应于0.75负载因素阈值调整大小)。由于映射被添加和删除,这个平均值可能会有很大差异,但是总的来说,这为哈希表保留了普遍接受的时间/空间权衡。然而,调整这个或任何其他类型的散列表可能是相对较慢的操作 如果可能,最好提供一个尺寸估计作为可选的initialCapacity构造函数参数。 附加的可选的loadFactor构造函数参数提供了另外的手段,通过指定在计算给定数量的元素时要分配的空间量时使用的表密度来定制初始表容量。 此外,为了与此类的先前版本兼容,构造函数可以可选地指定预期的concurrencyLevel作为内部大小调整的附加提示。 请注意,使用完全相同的许多键hashCode()是降低任何哈希表的hashCode()的一种可靠的方法。 为了改善影响,当按键为Comparable时,该类可以使用键之间的比较顺序来帮助打破关系。当Set投影一个的ConcurrentHashMap可以(使用被创建newKeySet()或newKeySet(int) ),或观察(使用keySet(Object)时仅键是感兴趣的,并且被映射的值是(可能瞬时)不使用或全部取相同的映射值。 ConcurrentHashMap可以通过使用LongAdder值并通过computeIfAbsent进行初始化,将其用作可缩放的频率映射(直方图或多集的形式)。 例如,要向ConcurrentHashMap<String,LongAdder> freqs添加计数,可以使用freqs.computeIfAbsent(k -> new LongAdder()).increment(); 此类及其视图和迭代器实现所有的可选方法Map个Iterator接口。像Hashtable但不像HashMap ,这个类不允许 null用作键或值。ConcurrentHashMaps支持一组顺序和并行批量操作,与大多数Stream方法不同,它们被设计为安全并且经常明智地应用,即使是由其他线程同时更新的映射; 例如,当计算共享注册表中的值的快照摘要时。 有三种操作,每种具有四种形式,接受键,值,条目和(键,值)参数和/或返回值的函数。 由于ConcurrentHashMap的元素不以任何特定的方式排序,并且可能会在不同的并行执行中以不同的顺序进行处理,因此提供的函数的正确性不应取决于任何排序,也不应依赖于可能瞬时变化的任何其他对象或值计算进行中; 除了每一个行动,理想情况下都是无副作用的。 对Map.Entry对象的批量操作不支持方法setValue 。 forEach:对每个元素执行给定的操作。 变量形式在执行操作之前对每个元素应用给定的变换。search:返回在每个元素上应用给定函数的第一个可用非空结果; 当找到结果时跳过进一步的搜索。reduce:累积每个元素。 提供的减少功能不能依赖于排序(更正式地,它应该是关联和交换)。 有五种变体:
平原减少 (由于没有相应的返回类型,因此(key,value)函数参数没有这种方法的形式)
映射的减少积累了应用于每个元素的给定函数的结果。使用给定的基础值减少到标量双,长和int。 这些批量操作接受一个parallelismThreshold参数。 如果估计当前地图大小小于给定阈值,则方法依次进行。 使用Long.MAX_VALUE的值Long.MAX_VALUE抑制所有的并行性。 使用1的值可以通过划分为足够的子任务来完全利用用于所有并行计算的ForkJoinPool.commonPool()来实现最大并行度。 通常,您最初将选择其中一个极值,然后测量使用中间值之间的性能,从而降低开销与吞吐量之间的关系。批量操作的并发属性遵循ConcurrentHashMap的并发属性:从get(key)返回的任何非空结果和相关的访问方法与相关的插入或更新都有一个发生之前的关系。 任何批量操作的结果都反映了这些每个元素关系的组成(但是除非以某种方式已知是静态的,它们并不一定是相对于整个地图的原子)。 相反,因为映射中的键和值从不为空,所以null作为目前缺乏任何结果的可靠原子指标。 为了保持此属性,null用作所有非标量缩减操作的隐含基础。 对于double,long和int版本,基础应该是当与任何其他值组合时返回其他值(更正式地,它应该是减少的标识元素)。 大多数常见的减少具有这些属性; 例如,使用基数0或最小值与基准MAX_VALUE计算和。作为参数提供的搜索和转换函数应该类似地返回null以指示缺少任何结果(在这种情况下不被使用)。 在映射缩减的情况下,这也使得转换可以用作过滤器,如果不应该组合元素,返回null(或者在原始专业化的情况下,身份基础)。 在使用它们进行搜索或减少操作之前,您可以通过在“null意味着现在没有任何内容”规则下自行构建复合转换和过滤。接受和/或返回Entry参数的方法维护键值关联。 例如,当找到最大价值的钥匙时,它们可能是有用的。 请注意,可以使用new AbstractMap.SimpleEntry(k,v)提供“plain”Entry new AbstractMap.SimpleEntry(k,v) 。
批量操作可能突然完成,抛出在应用程序中遇到的异常。 在处理这样的异常时,请注意,其他并发执行的函数也可能引发异常,或者如果没有发生第一个异常,则会这样做。 与顺序形式相比,加速比是常见的,但不能保证。 如果并行计算的基础工作比计算本身更昂贵,则涉及小地图上的简短功能的并行操作可能比顺序形式执行得更慢。 类似地,如果所有处理器正忙于执行不相关的任务,并行化可能不会导致太多的实际并行。 所有任务方法的所有参数都必须为非空值。
    /** Overview:** The primary design goal of this hash table is to maintain* concurrent readability (typically method get(), but also* iterators and related methods) while minimizing update* contention. Secondary goals are to keep space consumption about* the same or better than java.util.HashMap, and to support high* initial insertion rates on an empty table by many threads.** This map usually acts as a binned (bucketed) hash table.  Each* key-value mapping is held in a Node.  Most nodes are instances* of the basic Node class with hash, key, value, and next* fields. However, various subclasses exist: TreeNodes are* arranged in balanced trees, not lists.  TreeBins hold the roots* of sets of TreeNodes. ForwardingNodes are placed at the heads* of bins during resizing. ReservationNodes are used as* placeholders while establishing values in computeIfAbsent and* related methods.  The types TreeBin, ForwardingNode, and* ReservationNode do not hold normal user keys, values, or* hashes, and are readily distinguishable during search etc* because they have negative hash fields and null key and value* fields. (These special nodes are either uncommon or transient,* so the impact of carrying around some unused fields is* insignificant.)** The table is lazily initialized to a power-of-two size upon the* first insertion.  Each bin in the table normally contains a* list of Nodes (most often, the list has only zero or one Node).* Table accesses require volatile/atomic reads, writes, and* CASes.  Because there is no other way to arrange this without* adding further indirections, we use intrinsics* (sun.misc.Unsafe) operations.** We use the top (sign) bit of Node hash fields for control* purposes -- it is available anyway because of addressing* constraints.  Nodes with negative hash fields are specially* handled or ignored in map methods.** Insertion (via put or its variants) of the first node in an* empty bin is performed by just CASing it to the bin.  This is* by far the most common case for put operations under most* key/hash distributions.  Other update operations (insert,* delete, and replace) require locks.  We do not want to waste* the space required to associate a distinct lock object with* each bin, so instead use the first node of a bin list itself as* a lock. Locking support for these locks relies on builtin* "synchronized" monitors.** Using the first node of a list as a lock does not by itself* suffice though: When a node is locked, any update must first* validate that it is still the first node after locking it, and* retry if not. Because new nodes are always appended to lists,* once a node is first in a bin, it remains first until deleted* or the bin becomes invalidated (upon resizing).** The main disadvantage of per-bin locks is that other update* operations on other nodes in a bin list protected by the same* lock can stall, for example when user equals() or mapping* functions take a long time.  However, statistically, under* random hash codes, this is not a common problem.  Ideally, the* frequency of nodes in bins follows a Poisson distribution* (http://en.wikipedia.org/wiki/Poisson_distribution) with a* parameter of about 0.5 on average, given the resizing threshold* of 0.75, although with a large variance because of resizing* granularity. Ignoring variance, the expected occurrences of* list size k are (exp(-0.5) * pow(0.5, k) / factorial(k)). The* first values are:** 0:    0.60653066* 1:    0.30326533* 2:    0.07581633* 3:    0.01263606* 4:    0.00157952* 5:    0.00015795* 6:    0.00001316* 7:    0.00000094* 8:    0.00000006* more: less than 1 in ten million** Lock contention probability for two threads accessing distinct* elements is roughly 1 / (8 * #elements) under random hashes.** Actual hash code distributions encountered in practice* sometimes deviate significantly from uniform randomness.  This* includes the case when N > (1<<30), so some keys MUST collide.* Similarly for dumb or hostile usages in which multiple keys are* designed to have identical hash codes or ones that differs only* in masked-out high bits. So we use a secondary strategy that* applies when the number of nodes in a bin exceeds a* threshold. These TreeBins use a balanced tree to hold nodes (a* specialized form of red-black trees), bounding search time to* O(log N).  Each search step in a TreeBin is at least twice as* slow as in a regular list, but given that N cannot exceed* (1<<64) (before running out of addresses) this bounds search* steps, lock hold times, etc, to reasonable constants (roughly* 100 nodes inspected per operation worst case) so long as keys* are Comparable (which is very common -- String, Long, etc).* TreeBin nodes (TreeNodes) also maintain the same "next"* traversal pointers as regular nodes, so can be traversed in* iterators in the same way.** The table is resized when occupancy exceeds a percentage* threshold (nominally, 0.75, but see below).  Any thread* noticing an overfull bin may assist in resizing after the* initiating thread allocates and sets up the replacement array.* However, rather than stalling, these other threads may proceed* with insertions etc.  The use of TreeBins shields us from the* worst case effects of overfilling while resizes are in* progress.  Resizing proceeds by transferring bins, one by one,* from the table to the next table. However, threads claim small* blocks of indices to transfer (via field transferIndex) before* doing so, reducing contention.  A generation stamp in field* sizeCtl ensures that resizings do not overlap. Because we are* using power-of-two expansion, the elements from each bin must* either stay at same index, or move with a power of two* offset. We eliminate unnecessary node creation by catching* cases where old nodes can be reused because their next fields* won't change.  On average, only about one-sixth of them need* cloning when a table doubles. The nodes they replace will be* garbage collectable as soon as they are no longer referenced by* any reader thread that may be in the midst of concurrently* traversing table.  Upon transfer, the old table bin contains* only a special forwarding node (with hash field "MOVED") that* contains the next table as its key. On encountering a* forwarding node, access and update operations restart, using* the new table.** Each bin transfer requires its bin lock, which can stall* waiting for locks while resizing. However, because other* threads can join in and help resize rather than contend for* locks, average aggregate waits become shorter as resizing* progresses.  The transfer operation must also ensure that all* accessible bins in both the old and new table are usable by any* traversal.  This is arranged in part by proceeding from the* last bin (table.length - 1) up towards the first.  Upon seeing* a forwarding node, traversals (see class Traverser) arrange to* move to the new table without revisiting nodes.  To ensure that* no intervening nodes are skipped even when moved out of order,* a stack (see class TableStack) is created on first encounter of* a forwarding node during a traversal, to maintain its place if* later processing the current table. The need for these* save/restore mechanics is relatively rare, but when one* forwarding node is encountered, typically many more will be.* So Traversers use a simple caching scheme to avoid creating so* many new TableStack nodes. (Thanks to Peter Levart for* suggesting use of a stack here.)** The traversal scheme also applies to partial traversals of* ranges of bins (via an alternate Traverser constructor)* to support partitioned aggregate operations.  Also, read-only* operations give up if ever forwarded to a null table, which* provides support for shutdown-style clearing, which is also not* currently implemented.** Lazy table initialization minimizes footprint until first use,* and also avoids resizings when the first operation is from a* putAll, constructor with map argument, or deserialization.* These cases attempt to override the initial capacity settings,* but harmlessly fail to take effect in cases of races.** The element count is maintained using a specialization of* LongAdder. We need to incorporate a specialization rather than* just use a LongAdder in order to access implicit* contention-sensing that leads to creation of multiple* CounterCells.  The counter mechanics avoid contention on* updates but can encounter cache thrashing if read too* frequently during concurrent access. To avoid reading so often,* resizing under contention is attempted only upon adding to a* bin already holding two or more nodes. Under uniform hash* distributions, the probability of this occurring at threshold* is around 13%, meaning that only about 1 in 8 puts check* threshold (and after resizing, many fewer do so).** TreeBins use a special form of comparison for search and* related operations (which is the main reason we cannot use* existing collections such as TreeMaps). TreeBins contain* Comparable elements, but may contain others, as well as* elements that are Comparable but not necessarily Comparable for* the same T, so we cannot invoke compareTo among them. To handle* this, the tree is ordered primarily by hash value, then by* Comparable.compareTo order if applicable.  On lookup at a node,* if elements are not comparable or compare as 0 then both left* and right children may need to be searched in the case of tied* hash values. (This corresponds to the full list search that* would be necessary if all elements were non-Comparable and had* tied hashes.) On insertion, to keep a total ordering (or as* close as is required here) across rebalancings, we compare* classes and identityHashCodes as tie-breakers. The red-black* balancing code is updated from pre-jdk-collections* (http://gee.cs.oswego.edu/dl/classes/collections/RBCell.java)* based in turn on Cormen, Leiserson, and Rivest "Introduction to* Algorithms" (CLR).** TreeBins also require an additional locking mechanism.  While* list traversal is always possible by readers even during* updates, tree traversal is not, mainly because of tree-rotations* that may change the root node and/or its linkages.  TreeBins* include a simple read-write lock mechanism parasitic on the* main bin-synchronization strategy: Structural adjustments* associated with an insertion or removal are already bin-locked* (and so cannot conflict with other writers) but must wait for* ongoing readers to finish. Since there can be only one such* waiter, we use a simple scheme using a single "waiter" field to* block writers.  However, readers need never block.  If the root* lock is held, they proceed along the slow traversal path (via* next-pointers) until the lock becomes available or the list is* exhausted, whichever comes first. These cases are not fast, but* maximize aggregate expected throughput.** Maintaining API and serialization compatibility with previous* versions of this class introduces several oddities. Mainly: We* leave untouched but unused constructor arguments refering to* concurrencyLevel. We accept a loadFactor constructor argument,* but apply it only to initial table capacity (which is the only* time that we can guarantee to honor it.) We also declare an* unused "Segment" class that is instantiated in minimal form* only when serializing.** Also, solely for compatibility with previous versions of this* class, it extends AbstractMap, even though all of its methods* are overridden, so it is just useless baggage.** This file is organized to make things a little easier to follow* while reading than they might otherwise: First the main static* declarations and utilities, then fields, then main public* methods (with a few factorings of multiple public methods into* internal ones), then sizing methods, trees, traversers, and* bulk operations.*//* ---------------- Constants -------------- */
[2] 数据结构

抛弃了原有的 Segment 分段锁,而采用了 CAS + synchronized 来保证并发安全性。

跟HashMap很像,也把之前的HashEntry改成了Node,但是作用不变,把值和next采用了volatile去修饰,保证了可见性,并且也引入了红黑树,在链表大于一定值的时候会转换(默认是8)。

[3] 重要属性和方法

重要属性

// 默认为 0
// 当初始化时, 为 -1
// 当扩容时, 为 -(1 + 扩容线程数)
// 当初始化或扩容完成后,为 下一次的扩容的阈值大小
private transient volatile int sizeCtl;
// 整个 ConcurrentHashMap 就是一个 Node[]
static class Node<K,V> implements Map.Entry<K,V> {}
// hash 表
transient volatile Node<K,V>[] table;
// 扩容时的 新 hash 表
private transient volatile Node<K,V>[] nextTable;
// 扩容时如果某个 bin 迁移完毕, 用 ForwardingNode 作为旧 table bin 的头结点
static final class ForwardingNode<K,V> extends Node<K,V> {}
// 用在 compute 以及 computeIfAbsent 时, 用来占位, 计算完成后替换为普通 Node
static final class ReservationNode<K,V> extends Node<K,V> {}
// 作为 treebin 的头节点, 存储 root 和 first
static final class TreeBin<K,V> extends Node<K,V> {}
// 作为 treebin 的节点, 存储 parent, left, right
static final class TreeNode<K,V> extends Node<K,V> {}

重要方法

// 获取 Node[] 中第 i 个 Node
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i)// cas 修改 Node[] 中第 i 个 Node 的值, c 为旧值, v 为新值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v)// 直接修改 Node[] 中第 i 个 Node 的值, v 为新值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v)

构造器分析

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (initialCapacity < concurrencyLevel) // Use at least as many binsinitialCapacity = concurrencyLevel; // as estimated threadslong size = (long)(1.0 + (long)initialCapacity / loadFactor);// tableSizeFor 仍然是保证计算的大小是 2^n, 即 16,32,64 ... int cap = (size >= (long)MAXIMUM_CAPACITY) ?MAXIMUM_CAPACITY : tableSizeFor((int)size);this.sizeCtl = cap; }
[4] put方法

ConcurrentHashMap在进行put操作的还是比较复杂的,大致可以分为以下步骤:

  1. 当添加一对键值对的时候,首先会去判断K,V是否为空,如果为空,否则的话抛出空指针异常;
  2. 取得key的hash值, spread()保证key是一个正整数
  3. 进入一个循环,直到正确添加:
  • 第一次put的时候如果table有没有初始化,则initTable()(使用CAS)初始化table;

  • 求出结点应该插入hash表的位置,如果位置为空,则创建头结点并安全(使用CAS)插入;

  • 如果位置上有节点,则通过节点的hash值来判断当前结点是否在扩容,如果MOVED(-1)则表示有其他线程正在对这个数组进行扩容,则当前线程也 利用helpTransfer()(使用CAS)去帮助扩容。

  • 最后一种情况就是,如果这个节点,不为空,也不在扩容,说明发生hash冲突了。则加锁(synchronized)这个链表的头结点进行添加操作(如果其他key put进来的时候也对应这个tab则堵塞在这里),然后判断当前取出的节点位置存放的是链表还是树:

    • 如果是链表的话(hash大于0),就对这个链表的所有元素进行遍历如果找到了key和key的hash值都一样的节点,则把它的值替换;如果没找到的话,则添加在链表的最后面。
    • 否则,是树的话,则调用putTreeVal方法添加到树中去,在添加完之后,会对该节点上关联的的数目进行判断,如果在8个以上的话,则会调用treeifyBin方法,来尝试转化为树,或者是扩容。

总结:

  1. 当发生hash冲突才使用synchronized锁住链表头,在其他情况下都是用CAS
  2. 有其他线程正在对这个数组进行扩容,会使用helpTransfer()(使用了CAS)去帮忙扩容。
    /*
单纯的额调用putVal方法,并且putVal的第三个参数设置为false,当设置为false的时候表示这个key值相同时,新value会覆盖旧值,true的时候,则不会覆盖旧值。*/public V put(K key, V value) {return putVal(key, value, false);}final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();//K,V都不能为空,否则的话抛出异常int hash = spread(key.hashCode());    //取得key的hash值int binCount = 0;    //用来计算在这个节点总共有多少个元素,用来控制扩容或者转移为树for (Node<K,V>[] tab = table;;) {    //无限循环Node<K,V> f; int n, i, fh;if (tab == null || (n = tab.length) == 0)    tab = initTable();    //第一次put的时候table没有初始化,则使用cas初始化tableelse if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {   //通过哈希计算出一个表中的位置因为n是数组的长度,所以(n-1)&hash肯定不会出现数组越界if (casTabAt(tab, i, null,        //如果这个位置没有元素的话,则通过cas的方式尝试添加,注意这个时候是没有加锁的new Node<K,V>(hash, key, value, null)))        //创建一个Node添加到数组中区,null表示的是下一个节点为空break;        // no lock when adding to empty bin}/** 如果检测到某个节点的hash值是MOVED,则表示正在进行数组扩张的数据复制阶段,则当前线程也会参与去复制,通过允许多线程复制的功能,一次来减少数组的复制所带来的性能损失*/else if ((fh = f.hash) == MOVED)    tab = helpTransfer(tab, f);else {/** 如果在这个位置有元素的话,就采用synchronized的方式加锁,* 如果是链表的话(hash大于0),就对这个链表的所有元素进行遍历,*         如果找到了key和key的hash值都一样的节点,则把它的值替换到*         如果没找到的话,则添加在链表的最后面*  否则,是树的话,则调用putTreeVal方法添加到树中去*  在添加完之后,会对该节点上关联的的数目进行判断,*  如果在8个以上的话,则会调用treeifyBin方法,来尝试转化为树,或者是扩容*/V oldVal = null;synchronized (f) {if (tabAt(tab, i) == f) {        //再次取出要存储的位置的元素,跟前面取出来的比较 if (fh >= 0) {               //取出来的元素的hash值大于0,当转换为树之后,hash值为-2binCount = 1;            for (Node<K,V> e = f;; ++binCount) {    //遍历这个链表K ek;if (e.hash == hash &&        //要存的元素的hash,key跟要存储的位置的节点的相同的时候,替换掉该节点的value即可                 ((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)        //当使用putIfAbsent的时候,只有在这个key没有设置值得时候才设置                                 e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {    //如果不是同样的hash,同样的key的时候,则判断该节点的下一个节点是否为空,                       pred.next = new Node<K,V>(hash, key,       //为空的话把这个要加入的节点设置为当前节点的下一个节点                                                                                                  value, null);break;}}}else if (f instanceof TreeBin) {    //表示已经转化成红黑树类型了Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,    //调用putTreeVal方法,将该元素添加到树中去                                                              value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)    //当在同一个节点的数目达到8个的时候,则扩张数组或将给节点的数据转为tree                   treeifyBin(tab, i);    if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount);    //计数return null;}
[5] 深入分析扩容实现

https://www.jianshu.com/p/f6730d5784ad

ConcurrentHashMap相关的文章写了不少,有个遗留问题一直没有分析,也被好多人请教过,被搁置在一旁,即如何在并发的情况下实现数组的扩容。

什么情况会触发扩容

当往hashMap中成功插入一个key/value节点时,有可能触发扩容动作:
1、如果新增节点之后,所在链表的元素个数达到了阈值 8,则会调用treeifyBin方法把链表转换成红黑树,不过在结构转换之前,会对数组长度进行判断,实现如下:

如果数组长度n小于阈值MIN_TREEIFY_CAPACITY,默认是64,则会调用tryPresize方法把数组长度扩大到原来的两倍,并触发transfer方法,重新调整节点的位置。

2、新增节点之后,会调用addCount方法记录元素个数,并检查是否需要进行扩容,当数组元素个数达到阈值时,会触发transfer方法,重新调整节点的位置。

transfer实现

transfer方法实现了在并发的情况下,高效的从原始组数往新数组中移动元素,假设扩容之前节点的分布如下,这里区分蓝色节点和红色节点,是为了后续更好的分析:

在上图中,第14个槽位插入新节点之后,链表元素个数已经达到了8,且数组长度为16,优先通过扩容来缓解链表过长的问题,实现如下:
1、根据当前数组长度n,新建一个两倍长度的数组nextTable

2、初始化ForwardingNode节点,其中保存了新数组nextTable的引用,在处理完每个槽位的节点之后当做占位节点,表示该槽位已经处理过了;

3、通过for自循环处理每个槽位中的链表元素,默认advace为真,通过CAS设置transferIndex属性值,并初始化ibound值,i指当前处理的槽位序号,bound指需要处理的槽位边界,先处理槽位15的节点;

4、在当前假设条件下,槽位15中没有节点,则通过CAS插入在第二步中初始化的ForwardingNode节点,用于告诉其它线程该槽位已经处理过了;

5、如果槽位15已经被线程A处理了,那么线程B处理到这个节点时,取到该节点的hash值应该为MOVED,值为-1,则直接跳过,继续处理下一个槽位14的节点;

6、处理槽位14的节点,是一个链表结构,先定义两个变量节点lnhn,按我的理解应该是lowNodehighNode,分别保存hash值的第X位为0和1的节点,具体实现如下:

使用fn&n可以快速把链表中的元素区分成两类,A类是hash值的第X位为0,B类是hash值的第X位为1,并通过lastRun记录最后需要处理的节点,A类和B类节点可以分散到新数组的槽位14和30中,在原数组的槽位14中,蓝色节点第X为0,红色节点第X为1,把链表拉平显示如下:

1、通过遍历链表,记录runBitlastRun,分别为1和节点6,所以设置hn为节点6,ln为null;
2、重新遍历链表,以lastRun节点为终止条件,根据第X位的值分别构造ln链表和hn链表:

ln链:和原来链表相比,顺序已经不一样了

hn链:

通过CAS把ln链表设置到新数组的i位置,hn链表设置到i+n的位置;

7、如果该槽位是红黑树结构,则构造树节点lohi,遍历红黑树中的节点,同样根据hash&n算法,把节点分为两类,分别插入到lohi为头的链表中,根据lohi链表中的元素个数分别生成lnhn节点,其中ln节点的生成逻辑如下:
(1)如果lo链表的元素个数小于等于UNTREEIFY_THRESHOLD,默认为6,则通过untreeify方法把树节点链表转化成普通节点链表;
(2)否则判断hi链表中的元素个数是否等于0:如果等于0,表示lo链表中包含了所有原始节点,则设置原始红黑树给ln,否则根据lo链表重新构造红黑树。

最后,同样的通过CAS把ln设置到新数组的i位置,hn设置到i+n位置。
链接:https://www.jianshu.com/p/f6730d5784ad

1.tryPresize()

扩容操作tryPresize中涉及到了数据迁移transfer方法,这两个方法也是 Java8 ConcurrentHashMap中比较复杂的方法

这个方法的核心在于 sizeCtl 值的操作,首先将其设置为一个负数,然后执行 transfer(tab, null),再下一个循环将 sizeCtl 加 1,并执行 transfer(tab, nt),之后可能是继续 sizeCtl 加 1,并执行 transfer(tab, nt)。

所以,可能的操作就是执行 1 次 transfer(tab, null) + 多次 transfer(tab, nt),这里怎么结束循环的需要看完 transfer 源码才清楚。

// 首先要说明的是,方法参数 size 传进来的时候就已经翻了倍了
private final void tryPresize(int size) {// c:size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :tableSizeFor(size + (size >>> 1) + 1);int sc;while ((sc = sizeCtl) >= 0) {Node<K,V>[] tab = table; int n;// 这个 if 分支和之前说的初始化数组的代码基本上是一样的,在这里,我们可以不用管这块代码if (tab == null || (n = tab.length) == 0) {n = (sc > c) ? sc : c;if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if (table == tab) {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = nt;sc = n - (n >>> 2); // 0.75 * n}} finally {sizeCtl = sc;}}}else if (c <= sc || n >= MAXIMUM_CAPACITY)break;else if (tab == table) {// 我没看懂 rs 的真正含义是什么,不过也关系不大int rs = resizeStamp(n);if (sc < 0) {Node<K,V>[] nt;if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;// 2. 用 CAS 将 sizeCtl 加 1,然后执行 transfer 方法//    此时 nextTab 不为 nullif (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}// 1. 将 sizeCtl 设置为 (rs << RESIZE_STAMP_SHIFT) + 2)//     我是没看懂这个值真正的意义是什么?不过可以计算出来的是,结果是一个比较大的负数//  调用 transfer 方法,此时 nextTab 参数为 nullelse if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);}}
}
2.transfer()

下面这个方法有点长,将原来的 tab 数组的元素迁移到新的 nextTab 数组中。虽然我们之前说的 tryPresize 方法中多次调用 transfer 不涉及多线程,但是这个 transfer 方法可以在其他地方被调用,典型地,我们之前在说 put 方法的时候就说过了,请往上看 put 方法,是不是有个地方调用了 helpTransfer 方法,helpTransfer 方法会调用 transfer 方法的。此方法支持多线程执行,外围调用此方法的时候,会保证第一个发起数据迁移的线程,nextTab 参数为 null,之后再调用此方法的时候,nextTab 不会为 null。

阅读源码之前,先要理解并发操作的机制。原数组长度为 n,所以我们有 n 个迁移任务,让每个线程每次负责一个小任务是最简单的,每做完一个任务再检测是否有其他没做完的任务,帮助迁移就可以了,而 Doug Lea 使用了一个 stride,简单理解就是步长,每个线程每次负责迁移其中的一部分,如每次迁移 16 个小任务。所以,我们就需要一个全局的调度者来安排哪个线程执行哪几个任务,这个就是属性 transferIndex 的作用。

第一个发起数据迁移的线程会将 transferIndex 指向原数组最后的位置,然后从后往前的 stride 个任务属于第一个线程,然后将 transferIndex 指向新的位置,再往前的 stride 个任务属于第二个线程,依此类推。当然,这里说的第二个线程不是真的一定指代了第二个线程,也可以是同一个线程,这个读者应该能理解吧。其实就是将一个大的迁移任务分为了一个个任务包。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;// stride 在单核下直接等于 n,多核模式下为 (n>>>3)/NCPU,最小值是 16// stride 可以理解为”步长“,有 n 个位置是需要进行迁移的,//   将这 n 个任务分为多个任务包,每个任务包有 stride 个任务if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide range// 如果 nextTab 为 null,先进行一次初始化//    前面我们说了,外围会保证第一个发起迁移的线程调用此方法时,参数 nextTab 为 null//       之后参与迁移的线程调用此方法时,nextTab 不会为 nullif (nextTab == null) {try {// 容量翻倍Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) {      // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}// nextTable 是 ConcurrentHashMap 中的属性nextTable = nextTab;// transferIndex 也是 ConcurrentHashMap 的属性,用于控制迁移的位置transferIndex = n;}int nextn = nextTab.length;// ForwardingNode 翻译过来就是正在被迁移的 Node// 这个构造方法会生成一个Node,key、value 和 next 都为 null,关键是 hash 为 MOVED// 后面我们会看到,原数组中位置 i 处的节点完成迁移工作后,//    就会将位置 i 处设置为这个 ForwardingNode,用来告诉其他线程该位置已经处理过了//    所以它其实相当于是一个标志。ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);// advance 指的是做完了一个位置的迁移工作,可以准备做下一个位置的了boolean advance = true;boolean finishing = false; // to ensure sweep before committing nextTab/** 下面这个 for 循环,最难理解的在前面,而要看懂它们,应该先看懂后面的,然后再倒回来看* */// i 是位置索引,bound 是边界,注意是从后往前for (int i = 0, bound = 0;;) {Node<K,V> f; int fh;// 下面这个 while 真的是不好理解// advance 为 true 表示可以进行下一个位置的迁移了//   简单理解结局:i 指向了 transferIndex,bound 指向了 transferIndex-stridewhile (advance) {int nextIndex, nextBound;if (--i >= bound || finishing)advance = false;// 将 transferIndex 值赋给 nextIndex// 这里 transferIndex 一旦小于等于 0,说明原数组的所有位置都有相应的线程去处理了else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {// 看括号中的代码,nextBound 是这次迁移任务的边界,注意,是从后往前bound = nextBound;i = nextIndex - 1;advance = false;}}if (i < 0 || i >= n || i + n >= nextn) {int sc;if (finishing) {// 所有的迁移操作已经完成nextTable = null;// 将新的 nextTab 赋值给 table 属性,完成迁移table = nextTab;// 重新计算 sizeCtl:n 是原数组长度,所以 sizeCtl 得出的值将是新数组长度的 0.75 倍sizeCtl = (n << 1) - (n >>> 1);return;}// 之前我们说过,sizeCtl 在迁移前会设置为 (rs << RESIZE_STAMP_SHIFT) + 2// 然后,每有一个线程参与迁移就会将 sizeCtl 加 1,// 这里使用 CAS 操作对 sizeCtl 进行减 1,代表做完了属于自己的任务if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// 任务结束,方法退出if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;// 到这里,说明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,// 也就是说,所有的迁移任务都做完了,也就会进入到上面的 if(finishing){} 分支了finishing = advance = true;i = n; // recheck before commit}}// 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode ”空节点“else if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd);// 该位置处是一个 ForwardingNode,代表该位置已经迁移过了else if ((fh = f.hash) == MOVED)advance = true; // already processedelse {// 对数组该位置处的结点加锁,开始处理数组该位置处的迁移工作synchronized (f) {if (tabAt(tab, i) == f) {Node<K,V> ln, hn;// 头结点的 hash 大于 0,说明是链表的 Node 节点if (fh >= 0) {// 下面这一块和 Java7 中的 ConcurrentHashMap 迁移是差不多的,// 需要将链表一分为二,//   找到原链表中的 lastRun,然后 lastRun 及其之后的节点是一起进行迁移的//   lastRun 之前的节点需要进行克隆,然后分到两个链表中int runBit = fh & n;Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}if (runBit == 0) {ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}// 其中的一个链表放在新数组的位置 isetTabAt(nextTab, i, ln);// 另一个链表放在新数组的位置 i+nsetTabAt(nextTab, i + n, hn);// 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,//    其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了setTabAt(tab, i, fwd);// advance 设置为 true,代表该位置已经迁移完毕advance = true;}else if (f instanceof TreeBin) {// 红黑树的迁移TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}// 如果一分为二后,节点数少于 8,那么将红黑树转换回链表ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;// 将 ln 放置在新数组的位置 isetTabAt(nextTab, i, ln);// 将 hn 放置在新数组的位置 i+nsetTabAt(nextTab, i + n, hn);// 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,//    其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了setTabAt(tab, i, fwd);// advance 设置为 true,代表该位置已经迁移完毕advance = true;}}}}}
}

代码逻辑请看注释,整个扩容操作分为两个部分

第一部分是构建一个nextTable,它的容量是原来的两倍,这个操作是单线程完成的。新建table数组的代码为:Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1],在原容量大小的基础上右移一位。

第二个部分就是将原来table中的元素复制到nextTable中,主要是遍历复制的过程。

根据运算得到当前遍历的数组的位置i,然后利用tabAt方法获得i位置的元素再进行判断:

  1. 如果这个位置为空,就在原table中的i位置放入forwardNode节点,这个也是触发并发扩容的关键点;
  2. 如果这个位置是Node节点(fh>=0),如果它是一个链表的头节点,就构造一个反序链表,把他们分别放在nextTable的i和i+n的位置上
  3. 如果这个位置是TreeBin节点(fh<0),也做一个反序处理,并且判断是否需要untreefi,把处理的结果分别放在nextTable的i和i+n的位置上
  4. 遍历过所有的节点以后就完成了复制工作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍 ,完成扩容。设置为新容量的0.75倍代码为 sizeCtl = (n << 1) - (n >>> 1),仔细体会下是不是很巧妙,n<<1相当于n右移一位表示n的两倍即2n,n>>>1左右一位相当于n除以2即0.5n,然后两者相减为2n-0.5n=1.5n,是不是刚好等于新容量的0.75倍即2n*0.75=1.5n。
3.helpTransfer()
/*** Helps transfer if a resize is in progress.*/
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {Node<K,V>[] nextTab; int sc;// 如果 table 不是空 且 node 节点是转移类型,数据检验// 且 node 节点的 nextTable(新 table) 不是空,同样也是数据校验// 尝试帮助扩容if (tab != null && (f instanceof ForwardingNode) &&(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {// 根据 length 得到一个标识符号int rs = resizeStamp(tab.length);// 如果 nextTab 没有被并发修改 且 tab 也没有被并发修改// 且 sizeCtl  < 0 (说明还在扩容)while (nextTab == nextTable && table == tab &&(sc = sizeCtl) < 0) {// 如果 sizeCtl 无符号右移  16 不等于 rs ( sc前 16 位如果不等于标识符,则标识符变化了)// 或者 sizeCtl == rs + 1  (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)// 或者 sizeCtl == rs + 65535  (如果达到最大帮助线程的数量,即 65535)// 或者转移下标正在调整 (扩容结束)// 结束循环,返回 tableif ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || transferIndex <= 0)break;// 如果以上都不是, 将 sizeCtl + 1, (表示增加了一个线程帮助其扩容)if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {// 进行转移transfer(tab, nextTab);// 结束循环break;}}return nextTab;}return table;
}
[6] 深入分析get方法与无锁并发

get 逻辑比较简单,只需要将 Key 通过 Hash 之后定位到具体的 Segment ,再通过一次 Hash 定位到具体的元素上。ConcurrentHashMap 的 get 方法是非常高效的,因为整个过程都不需要加锁。为什么呢?简单来说:由于 HashEntry 中的 value 属性是用 volatile 关键词修饰的,保证了内存可见性,所以每次获取时都是最新值。

分析对比:

在jdk1.7中是采用Segment + HashEntry + ReentrantLock的方式进行实现的,而1.8中放弃了Segment臃肿的设计,取而代之的是采用Node + CAS + Synchronized来保证并发安全进行实现。

  • JDK1.8的实现降低锁的粒度,JDK1.7版本锁的粒度是基于Segment的,包含多个HashEntry,而JDK1.8锁的粒度就是HashEntry(首节点)

  • JDK1.8版本的数据结构变得更加简单,使得操作也更加清晰流畅,因为已经使用synchronized来进行同步,所以不需要分段锁的概念,也就不需要Segment这种数据结构了,由于粒度的降低,实现的复杂度也增加了

  • JDK1.8使用红黑树来优化链表,基于长度很长的链表的遍历是一个很漫长的过程,而红黑树的遍历效率是很快的,代替一定阈值的链表,这样形成一个最佳拍档

  • JDK1.8使用红黑树来优化链表,基于长度很长的链表的遍历是一个很漫长的过程,而红黑树的遍历效率是很快的,代替一定阈值的链表,这样形成一个最佳拍档

get操作源码

  1. 首先计算hash值,定位到该table索引位置,如果是首节点符合就返回
  2. 如果遇到扩容的时候,会调用标志正在扩容节点ForwardingNode的find方法,查找该节点,匹配就返回
  3. 以上都不符合的话,就往下遍历节点,匹配就返回,否则最后就返回null
//会发现源码中没有一处加了锁,CAS也没有
public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;int h = spread(key.hashCode()); //计算hashif ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {//读取首节点的Node元素if ((eh = e.hash) == h) { //如果该节点就是首节点就返回if ((ek = e.key) == key || (ek != null && key.equals(ek)))return e.val;}
//hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到nextTable来
//eh=-1,说明该节点是一个ForwardingNode,正在迁移,此时调用ForwardingNode的find方法去nextTable里找。
//eh=-2,说明该节点是一个TreeBin,此时调用TreeBin的find方法遍历红黑树,由于红黑树有可能正在旋转变色,所以find里会有读写锁。
//eh>=0,说明该节点下挂的是一个链表,直接遍历该链表即可。else if (eh < 0)return (p = e.find(h, key)) != null ? p.val : null;while ((e = e.next) != null) {//既不是首节点也不是ForwardingNode,那就往下遍历if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null;
}

get没有加锁的话,ConcurrentHashMap是如何保证读到的数据不是脏数据的呢?volatile登场

对于可见性,Java提供了volatile关键字来保证可见性有序性但不保证原子性
普通的共享变量不能保证可见性,因为普通共享变量被修改之后,什么时候被写入主存是不确定的,当其他线程去读取时,此时内存中可能还是原来的旧值,因此无法保证可见性。

  • volatile关键字对于基本类型的修改可以在随后对多个线程的读保持一致,但是对于引用类型如数组,实体bean,仅仅保证引用的可见性,但并不保证引用内容的可见性。。
  • 禁止进行指令重排序。

背景:为了提高处理速度,处理器不直接和内存进行通信,而是先将系统内存的数据读到内部缓存(L1,L2或其他)后再进行操作,但操作完不知道何时会写到内存。

  • 如果对声明了volatile的变量进行写操作,JVM就会向处理器发送一条指令,将这个变量所在缓存行的数据写回到系统内存。但是,就算写回到内存,如果其他处理器缓存的值还是旧的,再执行计算操作就会有问题。
  • 在多处理器下,为了保证各个处理器的缓存是一致的,就会实现缓存一致性协议,当某个CPU在写数据时,如果发现操作的变量是共享变量,则会通知其他CPU告知该变量的缓存行是无效的,因此其他CPU在读取该变量时,发现其无效会重新从主存中加载数据。

    总结下来
  • 第一:使用volatile关键字会强制将修改的值立即写入主存;
  • 第二:使用volatile关键字的话,当线程2进行修改时,会导致线程1的工作内存中缓存变量的缓存行无效(反映到硬件层的话,就是CPU的L1或者L2缓存中对应的缓存行无效);
  • 第三:由于线程1的工作内存中缓存变量的缓存行无效,所以线程1再次读取变量的值时会去主存读取。

是加在数组上的volatile吗?

    /*** The array of bins. Lazily initialized upon first insertion.* Size is always a power of two. Accessed directly by iterators.*/transient volatile Node<K,V>[] table;

我们知道volatile可以修饰数组的,只是意思和它表面上看起来的样子不同。举个栗子,volatile int array[10]是指array的地址是volatile的而不是数组元素的值是volatile的.

用volatile修饰的Node:get操作可以无锁是由于Node的元素val和指针next是用volatile修饰的,在多线程环境下线程A修改结点的val或者新增节点的时候是对线程B可见的

static class Node<K,V> implements Map.Entry<K,V> {final int hash;final K key;//可以看到这些都用了volatile修饰volatile V val;volatile Node<K,V> next;Node(int hash, K key, V val, Node<K,V> next) {this.hash = hash;this.key = key;this.val = val;this.next = next;}public final K getKey()       { return key; }public final V getValue()     { return val; }public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }public final String toString(){ return key + "=" + val; }public final V setValue(V value) {throw new UnsupportedOperationException();}public final boolean equals(Object o) {Object k, v, u; Map.Entry<?,?> e;return ((o instanceof Map.Entry) &&(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&(v = e.getValue()) != null &&(k == key || k.equals(key)) &&(v == (u = val) || v.equals(u)));}/*** Virtualized support for map.get(); overridden in subclasses.*/Node<K,V> find(int h, Object k) {Node<K,V> e = this;if (k != null) {do {K ek;if (e.hash == h &&((ek = e.key) == k || (ek != null && k.equals(ek))))return e;} while ((e = e.next) != null);}return null;}
}

既然volatile修饰数组对get操作没有效果那加在数组上的volatile的目的是什么呢?

其实就是为了使得Node数组在扩容的时候对其他线程具有可见性而加的volatile

总结

  • 在1.8中ConcurrentHashMap的get操作全程不需要加锁,这也是它比其他并发集合比如hashtable、用Collections.synchronizedMap()包装的hashmap;安全效率高的原因之一。
  • get操作全程不需要加锁是因为Node的成员val是用volatile修饰的和数组用volatile修饰没有关系。
  • 数组用volatile修饰主要是保证在数组扩容的时候保证可见性。
[7] size源码

size 计算实际发生在 put,remove 改变集合元素的操作之中

  • 没有竞争发生,向 baseCount 累加计数

  • 有竞争发生,新建 counterCells,向其中的一个 cell 累加计数

    • counterCells 初始有两个 cell
    • 如果计数竞争比较激烈,会创建新的 cell 来累加计数
/*** Replaces all linked nodes in bin at given index unless table is* too small, in which case resizes instead.* 当数组长度小于64的时候,扩张数组长度一倍,否则的话把链表转为树*/private final void treeifyBin(Node<K,V>[] tab, int index) {Node<K,V> b; int n, sc;if (tab != null) {System.out.println("treeifyBin方\t==>数组长:"+tab.length);if ((n = tab.length) < MIN_TREEIFY_CAPACITY)    //MIN_TREEIFY_CAPACITY 64tryPresize(n << 1);        // 数组扩容else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {synchronized (b) {    //使用synchronized同步器,将该节点出的链表转为树if (tabAt(tab, index) == b) {TreeNode<K,V> hd = null, tl = null;    //hd:树的头(head)for (Node<K,V> e = b; e != null; e = e.next) {TreeNode<K,V> p =new TreeNode<K,V>(e.hash, e.key, e.val,null, null);if ((p.prev = tl) == null)        //把Node组成的链表,转化为TreeNode的链表,头结点任然放在相同的位置hd = p;    //设置headelsetl.next = p;tl = p;}setTabAt(tab, index, new TreeBin<K,V>(hd));//把TreeNode的链表放入容器TreeBin中}}}}}

可以看到当需要扩容的时候,调用的时候tryPresize方法,看看trePresize的源码

/*** 扩容表为指可以容纳指定个数的大小(总是2的N次方)* 假设原来的数组长度为16,则在调用tryPresize的时候,size参数的值为16<<1(32),此时sizeCtl的值为12* 计算出来c的值为64,则要扩容到sizeCtl≥为止*  第一次扩容之后 数组长:32 sizeCtl:24*  第二次扩容之后 数组长:64 sizeCtl:48*  第二次扩容之后 数组长:128 sizeCtl:94 --> 这个时候才会退出扩容*/private final void tryPresize(int size) {/** MAXIMUM_CAPACITY = 1 << 30* 如果给定的大小大于等于数组容量的一半,则直接使用最大容量,* 否则使用tableSizeFor算出来* 后面table一直要扩容到这个值小于等于sizeCtrl(数组长度的3/4)才退出扩容*/int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :tableSizeFor(size + (size >>> 1) + 1);int sc;while ((sc = sizeCtl) >= 0) {Node<K,V>[] tab = table; int n;
//            printTable(tab);    调试用的/** 如果数组table还没有被初始化,则初始化一个大小为sizeCtrl和刚刚算出来的c中较大的一个大小的数组* 初始化的时候,设置sizeCtrl为-1,初始化完成之后把sizeCtrl设置为数组长度的3/4* 为什么要在扩张的地方来初始化数组呢?这是因为如果第一次put的时候不是put单个元素,* 而是调用putAll方法直接put一个map的话,在putALl方法中没有调用initTable方法去初始化table,* 而是直接调用了tryPresize方法,所以这里需要做一个是不是需要初始化table的判断*/if (tab == null || (n = tab.length) == 0) {n = (sc > c) ? sc : c;if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {    //初始化tab的时候,把sizeCtl设为-1try {if (table == tab) {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = nt;sc = n - (n >>> 2);}} finally {sizeCtl = sc;}}}/** 一直扩容到的c小于等于sizeCtl或者数组长度大于最大长度的时候,则退出* 所以在一次扩容之后,不是原来长度的两倍,而是2的n次方倍*/else if (c <= sc || n >= MAXIMUM_CAPACITY) {break;    //退出扩张}else if (tab == table) {int rs = resizeStamp(n);/** 如果正在扩容Table的话,则帮助扩容* 否则的话,开始新的扩容* 在transfer操作,将第一个参数的table中的元素,移动到第二个元素的table中去,* 虽然此时第二个参数设置的是null,但是,在transfer方法中,当第二个参数为null的时候,* 会创建一个两倍大小的table*/if (sc < 0) {Node<K,V>[] nt;if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;/** transfer的线程数加一,该线程将进行transfer的帮忙* 在transfer的时候,sc表示在transfer工作的线程数*/if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}/** 没有在初始化或扩容,则开始扩容*/else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2)) {transfer(tab, null);}}}}

在tryPresize方法中,并没有加锁,允许多个线程进入,如果数组正在扩张,则当前线程也去帮助扩容。

数组扩容的主要方法就是transfer方法

/*** Moves and/or copies the nodes in each bin to new table. See* above for explanation.* 把数组中的节点复制到新的数组的相同位置,或者移动到扩张部分的相同位置* 在这里首先会计算一个步长,表示一个线程处理的数组长度,用来控制对CPU的使用,* 每个CPU最少处理16个长度的数组元素,也就是说,如果一个数组的长度只有16,那只有一个线程会对其进行扩容的复制移动操作* 扩容的时候会一直遍历,知道复制完所有节点,没处理一个节点的时候会在链表的头部设置一个fwd节点,这样其他线程就会跳过他,* 复制后在新数组中的链表不是绝对的反序的*/private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)    //MIN_TRANSFER_STRIDE 用来控制不要占用太多CPUstride = MIN_TRANSFER_STRIDE; // subdivide range    //MIN_TRANSFER_STRIDE=16/** 如果复制的目标nextTab为null的话,则初始化一个table两倍长的nextTab* 此时nextTable被设置值了(在初始情况下是为null的)* 因为如果有一个线程开始了表的扩张的时候,其他线程也会进来帮忙扩张,* 而只是第一个开始扩张的线程需要初始化下目标数组*/if (nextTab == null) {            // initiatingtry {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) {      // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}nextTable = nextTab;transferIndex = n;}int nextn = nextTab.length;/** 创建一个fwd节点,这个是用来控制并发的,当一个节点为空或已经被转移之后,就设置为fwd节点* 这是一个空的标志节点*/ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);boolean advance = true;    //是否继续向前查找的标志位boolean finishing = false; // to ensure sweep(清扫) before committing nextTab,在完成之前重新在扫描一遍数组,看看有没完成的没for (int i = 0, bound = 0;;) {Node<K,V> f; int fh;while (advance) {int nextIndex, nextBound;if (--i >= bound || finishing) {advance = false;}else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {bound = nextBound;i = nextIndex - 1;advance = false;}}if (i < 0 || i >= n || i + n >= nextn) {int sc;if (finishing) {        //已经完成转移nextTable = null;table = nextTab;sizeCtl = (n << 1) - (n >>> 1);    //设置sizeCtl为扩容后的0.75return;}if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) {return;}finishing = advance = true;i = n; // recheck before commit}}else if ((f = tabAt(tab, i)) == null)            //数组中把null的元素设置为ForwardingNode节点(hash值为MOVED[-1])advance = casTabAt(tab, i, null, fwd);else if ((fh = f.hash) == MOVED)advance = true; // already processedelse {synchronized (f) {                //加锁操作if (tabAt(tab, i) == f) {Node<K,V> ln, hn;if (fh >= 0) {        //该节点的hash值大于等于0,说明是一个Node节点/** 因为n的值为数组的长度,且是power(2,x)的,所以,在&操作的结果只可能是0或者n* 根据这个规则*         0-->  放在新表的相同位置*         n-->  放在新表的(n+原来位置)*/int runBit = fh & n; Node<K,V> lastRun = f;
/*
* lastRun 表示的是需要复制的最后一个节点
* 每当新节点的hash&n -> b 发生变化的时候,就把runBit设置为这个结果b
* 这样for循环之后,runBit的值就是最后不变的hash&n的值
* 而lastRun的值就是最后一次导致hash&n 发生变化的节点(假设为p节点)
* 为什么要这么做呢?因为p节点后面的节点的hash&n 值跟p节点是一样的,
* 所以在复制到新的table的时候,它肯定还是跟p节点在同一个位置
* 在复制完p节点之后,p节点的next节点还是指向它原来的节点,就不需要进行
* 复制了,自己就被带过去了
* 这也就导致了一个问题就是复制后的链表的顺序并不一定是原来的倒序
*/for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;    //n的值为扩张前的数组的长度if (b != runBit) {runBit = b;lastRun = p;}}if (runBit == 0) {ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}/** 构造两个链表,顺序大部分和原来是反的* 分别放到原来的位置和新增加的长度的相同位置(i/n+i)*/for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)/** 假设runBit的值为0,* 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同为0的节点)设置到旧的table的第一个hash计算后为0的节点下一个节点* 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点*/ln = new Node<K,V>(ph, pk, pv, ln);else/** 假设runBit的值不为0,* 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同不为0的节点)设置到旧的table的第一个hash计算后不为0的节点下一个节点* 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点*/hn = new Node<K,V>(ph, pk, pv, hn);    }setTabAt(nextTab, i, ln);    setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}else if (f instanceof TreeBin) {    //否则的话是一个树节点TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}/** 在复制完树节点之后,判断该节点处构成的树还有几个节点,* 如果≤6个的话,就转回为一个链表*/ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}}}}}}

到这里,ConcurrentHashMap的put操作和扩容都介绍的差不多了,下面的两点一定要注意:

  • 复制之后的新链表不是旧链表的绝对倒序。

  • 在扩容的时候每个线程都有处理的步长,最少为16,在这个步长范围内的数组节点只有自己一个线程来处理

[8] 小结

1.8 在 1.7 的数据结构上做了大的改动,采用红黑树之后可以保证查询效率(O(logn)),甚至取消了 ReentrantLock 改为了 synchronized,这样可以看出在新版的 JDK 中对 synchronized 优化是很到位的。

ConcurrentHashMap锁进一步细化,去除segment。采用数组(Node) +( 链表 Node | 红黑树 TreeNode ) 以下数组简称(table),链表简称(bin)

  • 初始化,使用 cas 来保证并发安全,懒惰初始化 table

  • 树化,当 table.length < 64 时,先尝试扩容,超过 64 时,并且 bin.length > 8 时,会将链表树化,树化过程会用 synchronized 锁住链表头

  • put,如果该 bin 尚未创建,只需要使用 cas 创建 bin;如果已经有了,锁住链表头进行后续 put 操作,元素

添加至 bin 的尾部

  • get无锁操作仅需要保证可见性,扩容过程中 get 操作拿到的是 ForwardingNode 它会让 get 操作在新

table 进行搜索

  • 扩容,扩容时以 bin 为单位进行,需要对 bin 进行 synchronized,但这时妙的是其它竞争线程也不是无事可

做,它们会帮助把其它 bin 进行扩容,扩容时平均只有 1/6 的节点会把复制到新 table 中

  • size,元素个数保存在 baseCount 中,并发时的个数变动保存在 CounterCell[] 当中。最后统计数量时累加

即可。

[9] 要点总结
  1. 数据结构

抛弃了原有的 Segment 分段锁,而采用了 CAS + synchronized 来保证并发安全性。

跟HashMap很像,也把之前的HashEntry改成了Node,但是作用不变,把值和next采用了volatile去修饰,保证了可见性,并且也引入了红黑树,在链表大于一定值的时候会转换(默认是8)。

  1. put源码

ConcurrentHashMap在进行put操作的还是比较复杂的,大致可以分为以下步骤:

  1. 当添加一对键值对的时候,首先会去判断K,V是否为空,如果为空,否则的话抛出空指针异常;
  2. 取得key的hash值, spread()保证key是一个正整数
  3. 进入一个循环,直到正确添加:
  • 第一次put的时候如果table有没有初始化,则initTable()(使用CAS)初始化table;

  • 求出结点应该插入hash表的位置,如果位置为空,则创建头结点并安全(使用CAS)插入;

  • 如果位置上有节点,则通过节点的hash值来判断当前结点是否在扩容,如果MOVED(-1)则表示有其他线程正在对这个数组进行扩容,则当前线程也 利用helpTransfer()(使用CAS)去帮助扩容。

  • 最后一种情况就是,如果这个节点,不为空,也不在扩容,说明发生hash冲突了。则加锁(synchronized)这个链表的头结点进行添加操作,然后判断当前取出的节点位置存放的是链表还是树:

    • 如果是链表的话(hash大于0),就对这个链表的所有元素进行遍历如果找到了key和key的hash值都一样的节点,则把它的值替换;如果没找到的话,则添加在链表的最后面。
    • 否则,是树的话,则调用putTreeVal方法添加到树中去,在添加完之后,会对该节点上关联的的数目进行判断,如果在8个以上的话,则会调用treeifyBin方法,来尝试转化为树,或者是扩容。

总结:

  1. 当发生hash冲突才使用synchronized锁住链表头,在其他情况下都是用CAS
  2. 有其他线程正在对这个数组进行扩容,会使用helpTransfer()(使用了CAS)去帮忙扩容。
  1. size 源码

    计算实际发生在 put,remove 改变集合元素的操作之中

  • 没有竞争发生,向 baseCount 累加计数

  • 有竞争发生,新建 counterCells,向其中的一个 cell 累加计数

    • counterCells 初始有两个 cell
    • 如果计数竞争比较激烈,会创建新的 cell 来累加计数

【阅读源码系列】ConcurrentHashMap源码分析(JDK1.7和1.8)相关推荐

  1. Java集合Collection源码系列-ArrayList源码分析

    Java集合系列-ArrayList源码分析 文章目录 Java集合系列-ArrayList源码分析 前言 一.为什么想去分析ArrayList源码? 二.源码分析 1.宏观上分析List 2.方法汇 ...

  2. Java集合系列---ConcurrentHashMap源码解析

    ConcurrentHashMap是Java并发容器的一员,jdk1.8以后的基本的数据结构和HashMap相似,也是选用了数组+链表/红黑树的结构,在jdk1,.7以前则是采用了分段锁的技术.Con ...

  3. JUC源码系列-CountDownLatch源码研读

    前言 CountDownLatch是一个很有用的工具,latch是门闩的意思,该工具是为了解决某些操作只能在一组操作全部执行完成后才能执行的情景.例如,小组早上开会,只有等所有人到齐了才能开:再如,游 ...

  4. 高效阅读嵌入式源码系列一:静态分析神器understand软件基本操作

    系列文章目录 高效阅读嵌入式源码系列一:静态分析神器understand软件基本操作 高效阅读嵌入式源码系列二:understand阅读linux.uboot等源码 高效阅读嵌入式源码系列三:unde ...

  5. ConcurrentHashMap源码解析_01 成员属性、内部类、构造方法分析

    文章参考:小刘源码 ConcurrentHashMap源码解析_01 成员属性.内部类.构造方法分析 1.简介 ConcurrentHashMap是HashMap的线程安全版本,内部也是使用(数组 + ...

  6. ConcurrentHashMap源码分析(2)——JDK1.8的实现

    ConcurrentHashMap源码分析(1)--JDK1.7的实现 前言 在JDK1.7版本上,ConcurrentHashMap还是通过分段锁来实现的,Segment的数量制约着并发量.在JDK ...

  7. ConcurrentHashMap源码分析(1)——JDK1.7的实现

    ConcurrentHashMap源码分析 ConcurrentHashMap源码分析(2)--JDK1.8的实现 前言 ConcurrentHashMap是线程安全且高效的HashMap的实现,在并 ...

  8. c++ map 获取key列表_好未来Golang源码系列一:Map实现原理分析

    分享老师:学而思网校 郭雨田 一.map的结构与设计原理 golang中map是一个kv对集合.底层使用hash table,用链表来解决冲突 ,出现冲突时,不是每一个key都申请一个结构通过链表串起 ...

  9. JDK1.8 中 ConcurrentHashMap源码分析(一)容器初始化

    上一篇文章中说到如何使用IDEA搭建JDK1.8阅读学习环境,JDK1.8源码下载及获取.导入IDEA阅读.配置JDK源码.这篇文章将学习ConcurrentHashMap源码

最新文章

  1. cxgrid 保存数据_什么是大数据
  2. php+html 实现加减乘除
  3. VTK:Math之MatrixTranspose
  4. 实施cisco catalyst 交换机的管理和数据平面安全特性
  5. 二叉排序树查找的c语言程序,C语言二叉排序(搜索)树实例
  6. [收藏]上班族的真实写照
  7. 运筹优化(三)--线性规划之单纯形法
  8. ORA-600 各个参数含义说明
  9. AfxMessageBox详细使用说明
  10. 【error】 in ./api/axios.js Module parse failed: Unexpected token
  11. Pano2VR生成的HTML文件打开为黑屏
  12. 【Windows】windows生成rsa密钥对
  13. Java源码阅读绘图规范手册--[捷特版]
  14. 单号查询方法,怎么查快递物流到哪里了
  15. Epic宣布免费开放虚幻4引擎
  16. Notepad++ 设置tab为N个空格
  17. ctrl键频繁失灵,但不是键盘本身的问题,换个键盘同样失灵
  18. 钟南山团队携手腾讯研发新冠重症AI预测 成果登上Nature子刊
  19. 写一程序,用scanf函数输入x,输出y值。
  20. 信息系统项目管理师EV、PV、AC、BAC、CV、SV、EAC、ETC、CPI、SPI概念说明

热门文章

  1. undo系列学习之undo入门基础知识介绍
  2. 售价五万,4.6升/百公里,它们是国内油耗最低的车
  3. css——指定某个区域可垂直或水平滑动
  4. 【单片机】C52单片机之4X4矩阵键盘和数码管联动
  5. Error creating bean with name 'sqlSessionFactory' defined in URL
  6. Mediapipe 基于KNIFT如何输出识别数据
  7. 甜菜碱改善肥胖和代谢,肠道菌群是关键
  8. PLC实验五(LED数码管显示控制)
  9. 大旺中学2021年高考成绩查询,2021年肇庆中考分数线什么时候出来,查询入口公布时间规定...
  10. 大学四年要做的四件事