文章目录

  • 1.传统集合框架并发编程中Map存在的问题?
  • 2.早期改进策略
  • 3.ConcurrentHashMap采取了哪些方法来提高并发表现(jdk1.8)?
  • 4.ConcurrentHashMap实现分析
    • table:
    • nextTable
    • sizeCtl (不同场景有不同意义,肥肠重要!!!)
    • sun.misc.Unsafe U
    • tabAt
    • casTabAt
    • setTabAt
    • put
      • 详细分析put的扩容操作
        • CounterCells 解释
        • transfer 扩容阶段
        • 数据迁移:
    • get

1.传统集合框架并发编程中Map存在的问题?

  • HashMap死循环,造成CPU100%负载
    HashMap进行存储时,如果size超过(当前最大容量*负载因子)时候会发生resize,而resize中又调用了又调用了transfer()方法,而这个方法实现的机制就是将每个链表转化到新链表,并且链表中的位置发生反转,而这在多线程情况下是很容易造成链表回路,从而发生死循环;
  • 元素丢失问题,多线程put操作,hash碰撞时候两个线程得到同样的bucketIndex可能会导致覆盖的情况,有一个元素会丢失;
  • 还有其他的,路过的评论区补充一下……

2.早期改进策略

  1. HashTable
    HashTable相比HashMap是线程安全的,因为HashTable所有的方法都是加了synchronized的,锁的是整个hashMap,也就是我们说的锁的粒度比较大,由于最基本的put,set操作都加了互斥锁,造成的结果就是同一时间点只能由一个线程put或只能get,并发操作时所有的put,get操作都必须等一个线程完了之后再操作,线程安全得到了保证,但大大降低了并发效率,在非高度的并发的场景可取,高度并发时往往不可取, 。
  2. jdk1.8以前的ConcurrentHashMap
    ConcurrentHashMap在jdk1.7及以前采用的是锁分段机制来保证HashMap的线程安全,锁分段也就是将HashMap内部分段,每段是一个segment, 对每个segment加锁( 可以理解为ConcurrentHashMap是一个segment数组 ),每个段里面包含多个HashEntry,和原HashMap类似,hash相同的entry也是以链表形式存放,这样锁的粒度相比HashTable就小了很多,值得注意的是,1.7的ConcurrentHashMap是通过继承ReentrantLock 来进行加锁的,不同于之前HashTable使用synchronize的加锁形式;通过锁住每个segment来保证每个segment内的操作的线程安全性,也就避免了HashTable的整体同步,一定程度上提升了性能;
    另外在构造的时候, Segment的数量由所谓的concurrentcyLevel决定, 默认是16; 和HashMap的初始容量一致, 也可以在相应构造函数直接指定。 同样是2的幂数值, 如果输入是类似15这种非幂值, 会被自动调整到16之类2的幂数值。所以,默认情况下此时的ConcurrentHashMap支持16个线程并发操作
  3. 除了以上两种方法意外,Collections本身也提供了一种安全机制,就是通过Map<K,V> synchronizedMap(Map<K,V> m)方法将其包装为一个线程安全的map,我们看一下它的put源码实现就清除了:
public V put(K key, V value) {synchronized (mutex) {return m.put(key, value);}
}

以上简单的说了早期如何保证HashMap的线程安全,下面详细分析一下jdk1.8如何保证线程安全

3.ConcurrentHashMap采取了哪些方法来提高并发表现(jdk1.8)?

相比1.7做了两个改进:
1.取消了锁分段的设计,直接使用Node 数组来保存数据,并且用Node数组来保存数据,并且采用Node数组元素作为锁来实现对每一行数据加锁来进一步减少并发冲突的概率。
2.引入了红黑树的设计,在原来的数组+链表的基础上新增了红黑树的设计,当链表的长度超过8的时候就将链表转为红黑树,此时查询的复杂度也降低到了O(logN), 提升了查询的性能。
3.这一点不知道算不算是改进,但是和1.7确实是不一样的,为了解决线程安全问题,这一版的ConcurrentHashMap采用了synchronzied和CAS的方式,至于为什么选用了synchronzied我猜是因为1.8的synchronzied也做了很多的优化,包括偏向锁到轻量级所到重量级锁膨胀,因此改进后的synchronzied相较于ReentrantLock的性能在某些情况下并不差或许会更优,所以这里才选择了synchronzied来加锁,cas无锁操作的特性我就不多说了,比较容易理解。
稍后我们分析put源码的时候会看到这部分变化的具体实现。
另外,关于1.8版本的synchronzied优化可以查看本系列中博客中的:
【Java并发】-- synchronized原理 (偏向锁,轻量级锁,重量级锁膨胀过程)
结构图:
这个和jdk1.8的hashmap结构一致,但增加了线程安全的实现,所以结构简单,但实现会复杂一些;

4.ConcurrentHashMap实现分析

4.1 ConcurrentHashMap中关键的属性

table:

 //装载Node的数组,作为ConcurrentHashMap的数据容器,采用懒加载的方式,//直到第一次插入数据的时候才会进行初始化操作,数组的大小总是为2的幂次方。volatile Node<K,V>[] table:

nextTable

//扩容时使用,平时为null,只有在扩容的时候才为非null,
volatile Node<K,V>[] nextTable;

sizeCtl (不同场景有不同意义,肥肠重要!!!)

// 该属性用来控制table数组的大小,根据是否初始化和是否正在扩容有几种情况:
-------------------------
// 当值为负数时,-1这时表示数组有一个线程正在初始化,-n表示有n-1个线程正在进行扩容操作
// 注意:(扩容时可以多线程协作,但初始化只能有一个线程来完成)
-------------------------
// 当值为正数时:表示当前数组的临界值,也就是数组程度*负载因子得到的临界值,到达这个值就会进行扩容操作
// 当值为0时,是数组的默认初始值,此时还未被初始化。
volatile int sizeCtl;

sun.misc.Unsafe U

在ConcurrentHashMap的实现中也可以看到大量的cas操作,也就是U.compareAndSwapXXX类型的方法,调用这些方法去修改ConcurrentHashMap属性的时候就是利用了cas无锁算法来保证线程安全性,这是乐观锁的完美运用,cas是通过sun.misc.Unsafe类实现的,点到这个类之后我们发现所有的方法基本都是native的,也就是非java实现的接口; Unsafe类提供的方法是可以直接操作内存和线程的底层操作,该成员变量的获取是在静态代码块中:

 static {try {U = sun.misc.Unsafe.getUnsafe();.......} catch (Exception e) {throw new Error(e);}
}

4.2 ConcurrentHashMap中关键的CAS操作

tabAt

该方法获取对象中offset偏移地址对应的对象field的值, 简单来说也就是获取该方法用来获取table数组中索引为i的Node元素,但大家思考一下为什么不直接通过table[i]获取到第i个元素,而非要通过底层Unsafe类来进行table的操作呢?
因为我们虽然在table数组上加了volatile关键字来保证可见性,但是被volatile修饰的数组只针对数组的引用具有可先性,而不针对数组的元素,所以如果有其他个线程对这个数组的某个元素进行写操作的时候,不一定能保证可见性,当前线程也就不一定读到最新的值了。所以这里调用了Unsafe的getObjectVolatile方法保证每个元素都读到最新的值,同时也保证了性能。下面的casTabAt和setTabAt也是同理。

// 该方法用来获取table数组中索引为i的Node元素static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);}

casTabAt

// 利用CAS操作设置table数组中索引为i的元素static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,Node<K,V> c, Node<K,V> v) {return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);}

setTabAt

// 该方法用来设置table数组中索引为i的元素static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);}

4.3 ConcurrentHashMap核心方法
从整体来说为了解决线程安全的问题,ConcurrentHashMap使用了synchronzied和CAS的方式

put

put方法调用的是putVal来进行put操作,我们来分析一下putVal大致做了哪些事情来保证线程安全,下面是核心逻辑,一定要理解!!

  1. 首先用spread方法进行了一次重hash从而减小hash冲突的可能性;
  2. 调用initTable方法初始化table,已经初始化之后会跳过这一步;
  3. 判断是否可以直接将新值插入到table数组中,为什么需要先判断呢?这块其实分了三种情况;首先插入和更新两种,插入的又分为直接插入table和接入链表;如果待插入的位置table[i]刚好为null就可以直接插入。如果hash取模之后发现i已经有元素了,需要对比hash值是否相等,若相等则覆盖原有元素,若不相等则以链表的形式将当前节点next属性更改为新的节点,把他们连起来。这里多线程操作的情况下根据happenbefore规则 线程 A 的 casTabAt 操作,一定对线程 B 的 tabAt 操作可见;
  4. 判断是否正在扩容,如果正在扩容可以协助扩容(但有协助线程数量有限制,跟cpu的核数有关)
  5. 当table[i]为链表的头结点,在链表中插入新值,我们可以看这部分代码用synchronized 同步代码块包了起来,加了互斥锁来保证线程安全。
  6. 当table[i]为红黑树的根节点,在红黑树中插入新值
  7. 根据节点个数调整红黑树
  8. 对容量大小进行检查,超过了临界值需要扩容;
** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();//1. 计算key的hash值int hash = spread(key.hashCode());int binCount = 0; // 用来记录链表的长度for (Node<K,V>[] tab = table;;) {// 自旋,当出现线程竞争时不断自旋Node<K,V> f; int n, i, fh;//2. 如果当前table还没有初始化先调用initTable方法将tab进行初始化if (tab == null || (n = tab.length) == 0)tab = initTable(); // 初始化数组方法//3. tab中索引为i的位置的元素为null,则直接使用CAS将值插入即可// 通过hash值对应的数组下标得到第一个节点;以volatile读的方式来读取table数组中的元素,// 保证每次拿到的数据都是最新的else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 如果该下标返回的节点为空,则直接cas插入,cas失败则存在竞争,进入下一次循环if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break;                   // no lock when adding to empty bin}//4. 当前正在扩容else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);else {V oldVal = null;synchronized (f) {if (tabAt(tab, i) == f) {//5. 当前为链表,在链表中插入新的键值对if (fh >= 0) {binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}// 6.当前为红黑树,将新的键值对插入到红黑树中else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}// 7.插入完键值对后再根据实际大小看是否需要转换成红黑树if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}//8.对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容 addCount(1L, binCount);return null;
}

详细分析put的扩容操作

扩容部分有两个经典的设计:
1.高并发下的扩容
2.如何保证addCount的数据安全性以及性能

 // 调用传参addCount(1L, binCount);// 把当前ConcurrentHashMap的元素个数+1// 这个方法一共做了两件事,更新baseCount的值,检测是否进行扩容
private final void addCount(long x, int check) {CounterCell[] as; long b, s;//利用CAS方法更新baseCount的值 /* 判断 counterCells 是否为空,
1. 如果为空,就通过 cas 操作尝试修改 baseCount 变量,对这个变量进行原子累加操
作(做这个操作的意义是:如果在没有竞争的情况下,仍然采用 baseCount 来记录元素个
数)
2. 如果 cas 失败说明存在竞争,这个时候不能再采用 baseCount 来累加,而是通过
CounterCell 来记录
*/if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;boolean uncontended = true; // 是否冲突标识,默认为没有冲突if (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {fullAddCount(x, uncontended);return;}if (check <= 1)return;s = sumCount();}//如果check值大于等于0 则需要检验是否需要进行扩容操作if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n);//if (sc < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;//如果已经有其他线程在执行扩容操作if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}//当前线程是唯一的或是第一个发起扩容的线程  此时nextTable=nullelse if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);s = sumCount();}}
}
CounterCells 解释

更新map的size值这里借用了分布式的思想,起到关键作用的是这里的CounterCell 数组,这个数组里面每个元素都存着一个value值,而最终map的size就是数组中所有value值相加得来的,详细可以查看sumCount的源码;
为什么如此设计呢?
一般的集和在进行put操作的时候,size的大小只要随着put操作i++即可,但是在多线程情况下i++的不安全结果也一定不准确,为了保证这个size共享变量的安全性势必会增加锁的设计,通过自旋,cas或synchronize锁等实现,但在竞争非常激烈的情况下如此这般设计一定会占据资源影响性能,所以这里采用了引入了CounterCells ,采用分布式的思想进行分片化处理,其实看到这里我是非常激动的,必须对Doug Lea大师真的致以最崇高的respect!具体如何实现呢?
注意这里:as[ThreadLocalRandom.getProbe() & m]
as是CounterCells 数组,ThreadLocalRandom是保证在多线程情况下Random生成随机数的线程安全;
实现逻辑:

  1. 计数表(CounterCells 数组)为空则直接调用 fullAddCount ;
  2. 从计数表中随机取出一个数组的位置为空,直接调用 fullAddCount
  3. 通过 CAS 修改 CounterCell 随机位置的值,如果修改失败说明出现并发情况,继续cas即可;
    举个简单的例子:
    比如说现在有三个线程ThreadA/B/C在并发进行put操作,ThreadLocalRandom.getProbe()会为他们生成三个随机数,范围是(0,m),m是CounterCell.length-1; 比如说是初始化的长度为2,此时假设为这三个线程生成了三个随机数0,1,0
    ThreadA拿到了0,ThreadB拿到了1,ThreadC拿到了0,此时他们会针对CounterCell数组对应下表的value进行+1的cas操作,ThreadA会找到CounterCell[0]对0下标处的value元素+1,默认为0,此时通过cas+1后变成1;因为ThreadC也拿到了0下标所以也会对CounterCell[0]进行cas+1操作,cas是无锁操作,ThreadC会一直cas重试,直到ThreadA操作完毕释放锁,于是CounterCell[0]中的value会经历两次+1的cas操作变成2;
    同理ThreadB会将CounterCell[1]处的value值cas更新为1,然后再调用sumCount将CounterCell数组中的所有value元素累加得到真正的size值;
    这样设计带来的好处?
    利用分片思维提高了负载能力,CounterCell的数组长度为多少,就可以支持多少个线程并发的去对size计数,相应的负载能力就会多少倍;CounterCell的默认初始值为2,也就是至少可以提升2倍的负载能力,CounterCell后期同样可以扩容,但扩容的契机我还有待研究,暂不多说。
transfer 扩容阶段

扩容的基本思想是跟hashMap是很像的,另外注意这里的并发扩容是是没有加锁的,所以这里支持并发扩容,效率是很高的,但是实现起来要复杂的多,所以这里也是ConcurrentHashMap 的精华之一;
首先判断是否需要扩容,也就是当更新后的键值对总数 baseCount >= 阈值 sizeCtl 时,进行
rehash,这里面会有两个逻辑。

  1. 如果当前正在处于扩容阶段,则当前线程会加入并且协助扩容
  2. 如果当前没有在扩容,则直接触发扩容操作
    resizeStamp
    这里先提一下resizeStamp这个扩容戳,是扩容时的重要标记;
static final int resizeStamp(int n) {return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

Integer.numberOfLeadingZeros 这个方法是返回无符号整数 n 最高位非 0 位前面的 0 的个数
比如 10 的二进制是 0000 0000 0000 0000 0000 0000 0000 1010
那么这个方法返回的值就是 28
根据 resizeStamp 的运算逻辑,我们来推演一下,假如 n=16,那么 resizeStamp(16)=32796
转化为二进制是
[0000 0000 0000 0000 1000 0000 0001 1100]
接着再来看,当第一个线程尝试进行扩容的时候,会执行下面这段代码

    U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)

rs 左移 16 位,相当于原本的二进制低位变成了高位 1000 0000 0001 1100 0000 0000 0000 0000
然后再+2
=1000 0000 0001 1100 0000 0000 0000 0000 +10
=1000 0000 0001 1100 0000 0000 0000 0010

这样存储带来的好处??

  • 首先在 CHM 中是支持并发扩容的,也就是说如果当前的数组需要进行扩容操作,可以由多个线程来共同负责;
    第一个扩容的线程,执行 transfer 方法之前,
    会设置 sizeCtl =(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)
    后续帮其扩容的线程,执行 transfer 方法之前,会设置 sizeCtl = sizeCtl+1
    每一个退出 transfer 的方法的线程,退出之前,会设置 sizeCtl = sizeCtl-1
    那么最后一个线程退出时:必然有
    sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2)
    == resizeStamp(n) << RESIZE_STAMP_SHIFT
    如果 sc - 2 不等于标识符左移 16 位。如果他们相等了,说明没有线程在帮助他们扩容了。也就是说,扩容结束了。
  • 可以保证每次扩容都生成唯一的生成戳, 每次新的扩容,都有一个不同的 n(n是map的size),这个生成戳就是根据 n 来计算出来的一个数字, n 不同,这个数字也不同

第一个线程尝试扩容的时候,为什么是+2 ??
因为 1 表示初始化,2 表示一个线程在执行扩容,而且对 sizeCtl 的操作都是基于位运算的,
所以不会关心它本身的数值是多少,只关心它在二进制上的数值,而 sc + 1 会在
低 16 位上加 1。

多线程扩容要注意的问题?
在扩容的时候其他线程也可能正在添加元素,这时又触发了扩容怎么办? 可能大家想到的第
一个解决方案是加互斥锁,把转移过程锁住,虽然是可行的解决方案,但是会带来较大的性
能开销。因为互斥锁会导致所有访问临界区的线程陷入到阻塞状态,持有锁的线程耗时越长,
其他竞争线程就会一直被阻塞,导致吞吐量较低。而且还可能导致死锁

而 ConcurrentHashMap 并没有直接加锁,而是采用 CAS 实现无锁的并发同步策略,最精华
的部分是它可以利用多线程来进行协同扩容
简单来说,它把 Node 数组当作多个线程之间共享的任务队列,然后通过维护一个指针来划
分每个线程锁负责的区间,每个线程通过区间逆向遍历来实现扩容,一个已经迁移完的
bucket 会被替换为一个 ForwardingNode 节点,标记当前 bucket 已经被其他线程迁移完了。

transfer的源码分析

  private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;/*将 (n>>>3 相当于 n/8) 然后除以 CPU 核心数。如果得到的结果小于 16,那么就使用 16这里的目的是让每个 CPU 处理的桶一样多,避免出现转移任务不均匀的现象,如果桶较少的话,默认一个 CPU(一个线程)处理 16 个桶,也就是长度为 16 的时候,扩容的时候只会有一个线程来扩容*/if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) <MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE;  //nextTab 未初始化, nextTab 是用来扩容的 node 数组if (nextTab == null) { // initiatingtry {@SuppressWarnings("unchecked")//新建一个 n<<1 原始 table 大小的 nextTab,也就是 32Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;//赋值给 nextTab} catch (Throwable ex) { // try to cope with OOMEsizeCtl = Integer.MAX_VALUE; //扩容失败, sizeCtl 使用 int 的最大值return;}nextTable = nextTab; //更新成员变量transferIndex = n;//更新转移下标, 表示转移时的下标}int nextn = nextTab.length;//新的 tab 的长度
/* 创建一个 fwd 节点, 表示一个正在被迁移的 Node,并且它的 hash 值为-1(MOVED),也就是前面我们在讲 putval 方法的时候,会有一个判断 MOVED 的逻辑。它的作用是用来占位,表示原数组中位置 i 处的节点完成迁移以后,就会在 i 位置设置一个 fwd 来告诉其他线程这个位置已经处理过了,具体后续还会在讲*/ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);/* 首次推进为 true,如果等于 true,说明需要再次推进一个下标(i--),反之,如果是false,那么就不能推进下标,需要将当前的下标处理完毕才能继续推进boolean advance = true;判断是否已经扩容完成,完成就 return,退出循环*/boolean finishing = false; // to ensure sweep before committing nextTab/*通过 for 自循环处理每个槽位中的链表元素,默认 advace 为真,通过 CAS 设置transferIndex 属性值,并初始化 i 和 bound 值, i 指当前处理的槽位序号, bound 指需要处理的槽位边界,先处理槽位 15 的节点;*/for (int i = 0, bound = 0;;) {// 这个循环使用 CAS 不断尝试为当前线程分配任务// 直到分配成功或任务队列已经被全部分配完毕// 如果当前线程已经被分配过 bucket 区域// 那么会通过--i 指向下一个待处理 bucket 然后退出该循环Node<K,V> f; int fh;while (advance) {int nextIndex, nextBound;//--i 表示下一个待处理的 bucket,如果它>=bound,表示当前线程已经分配过bucket 区域if (--i >= bound || finishing)advance = false;else if ((nextIndex = transferIndex) <= 0) {//表示所有 bucket 已经被分配完毕i = -1;advance = false;}/*通过 cas 来修改 TRANSFERINDEX,为当前线程分配任务,处理的节点区间为(nextBound,nextIndex)->(0,15)*/
else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {bound = nextBound;//0i = nextIndex - 1;//15advance = false;咕泡学院-做技术人的指路明灯, 职场生涯的精神导师}}
// i<0 说明已经遍历完旧的数组,也就是当前线程已经处理完所有负责的 bucketif (i < 0 || i >= n || i + n >= nextn) {int sc;if (finishing) {//如果完成了扩容nextTable = null;//删除成员变量table = nextTab;//更新 table 数组sizeCtl = (n << 1) - (n >>> 1);//更新阈值(32*0.75=24)return;}
// sizeCtl 在迁移前会设置为 (rs << RESIZE_STAMP_SHIFT) + 2
// 然后, 每增加一个线程参与迁移就会将 sizeCtl 加 1,
// 这里使用 CAS 操作对 sizeCtl 的低 16 位进行减 1,代表做完了属于自己的任务if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {/*  第一个扩容的线程,执行 transfer 方法之前,会设置 sizeCtl =(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)后续帮其扩容的线程,执行 transfer 方法之前,会设置 sizeCtl = sizeCtl+1每一个退出 transfer 的方法的线程,退出之前,会设置 sizeCtl = sizeCtl-1那么最后一个线程退出时:必然有sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2)== resizeStamp(n) << RESIZE_STAMP_SHIFT// 如果 sc - 2 不等于标识符左移 16 位。如果他们相等了,说明没有线程在帮助他们扩容了。也就是说,扩容结束了。*/if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;// 如果相等,扩容结束了,更新 finising 变量finishing = advance = true;// 再次循环检查一下整张表i = n; // recheck before commit咕泡学院-做技术人的指路明灯, 职场生涯的精神导师}}
// 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode ”空节点“else if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd);
//表示该位置已经完成了迁移,也就是如果线程 A 已经处理过这个节点,
// 那么线程 B 处理这个节点时, hash 值一定为 MOVED
else if ((fh = f.hash) == MOVED)advance = true; // already processed}}
数据迁移:

扩容之后的数据迁移是借助高低位来实现的,有两个问题我们需要注意:
1.高低位如何划分
通过 fn&n 可以把这个链表中的元素分为两类, A 类是 hash 值的第 X 位为 0, B 类是 hash 值的第 x 位为不等于 0(至于为什么要这么区分,稍后分析),并且通过 lastRun 记录最后要处理的节点。最终要达到的目的是, A 类的链表保持位置不动, B 类的链表为 14+16(扩容增加的长度)=30
图解一下过程:
迁移前:

扩容迁移后:

扩容之后关于红黑树节点的调整今天暂不做分析了;

get

put如若看得理解了,get就非常容易了;

代码的逻辑请看注释,首先先看当前的hash桶数组节点即table[i]是否为查找的节点,若是则直接返回;若不是,则继续再看当前是不是树节点?通过看节点的hash值是否为小于0,如果小于0则为树节点。如果是树节点在红黑树中查找节点;如果不是树节点,那就只剩下为链表的形式的一种可能性了,就向后遍历查找节点,若查找到则返回节点的value即可,若没有找到就返回null。

看一下get的源码:

public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;// 1. 重hashint h = spread(key.hashCode());if ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {// 2. table[i]桶节点的key与查找的key相同,则直接返回if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))return e.val;}// 3. 当前节点hash小于0说明为树节点,在红黑树中查找即可else if (eh < 0)return (p = e.find(h, key)) != null ? p.val : null;while ((e = e.next) != null) {//4. 从链表中查找,查找到则返回该节点的value,否则就返回null即可if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null;
}

一篇博客写了一天半,分析源码真心好累啊,烧脑…… 烧脑…… 烧脑…… 烧脑…… 智商不够真捉急……


一篇博客写了一天半,分析源码真心好累啊,烧脑…… 烧脑…… 烧脑…… 烧脑…… 智商不够真捉急……


一篇博客写了一天半,分析源码真心好累啊,烧脑…… 烧脑…… 烧脑…… 烧脑…… 智商不够真捉急……


一篇博客写了一天半,分析源码真心好累啊,烧脑…… 烧脑…… 烧脑…… 烧脑…… 智商不够真捉急……


一篇博客写了一天半,分析源码真心好累啊,烧脑…… 烧脑…… 烧脑…… 烧脑…… 智商不够真捉急……

【Java并发】-- ConcurrentHashMap如何实现高效地线程安全(jdk1.8)相关推荐

  1. Java并发(二十一):线程池实现原理

    一.总览 线程池类ThreadPoolExecutor的相关类需要先了解: (图片来自:https://javadoop.com/post/java-thread-pool#%E6%80%BB%E8% ...

  2. 【Java并发编程】之二:线程中断

    [Java并发编程]之二:线程中断 使用interrupt()中断线程 ​ 当一个线程运行时,另一个线程可以调用对应的Thread对象的interrupt()方法来中断它,该方法只是在目标线程中设置一 ...

  3. Java 并发---ConcurrentHashMap

    concurrent包下的并发容器 JDK5中添加了新的concurrent包,相对同步容器而言,并发容器通过一些机制改进了并发性能.因为同步容器将所有对容器状态的访问都串行化了,这样保证了线程的安全 ...

  4. 如何保证集合是线程安全的? ConcurrentHashMap如何实现高效地线程安全?(转)

    了解 Java 语言提供了并发包(java.util.concurrent),为高度并发需求提供了更加全面的工具支持. Java 提供了不同层面的线程安全支持.在传统集合框架内部,除了 Hashtab ...

  5. java并发编程基础—生命周期与线程控制

    一.线程生命周期 线程被创建启动以后,他既不是一启动就进入执行状态,也不是一直处于执行状态,在线程的生命周期中,它要经过新建(New).就绪(Runnable).运行(Running).阻塞(Bloc ...

  6. 【Java 并发编程】【05】线程安全问题与线程同步

    5. 线程安全问题与线程同步 多线程编程是有趣且复杂的事情,它常常容易突然出现"错误情况",这是由于系统的线程调度具有一定的随机性.即使程序在运行过程中偶尔会出现问题,那也是由于我 ...

  7. 【死磕Java并发】-----J.U.C之线程池:线程池的基础架构

    原文出处:https://www.cmsblogs.com/category/1391296887813967872 『chenssy』 经历了Java内存模型.JUC基础之AQS.CAS.Lock. ...

  8. 【Java并发编程】面试必备之线程池

    什么是线程池 是一种基于池化思想管理线程的工具.池化技术:池化技术简单点来说,就是提前保存大量的资源,以备不时之需.比如我们的对象池,数据库连接池等. 线程池好处 我们为什么要使用线程池,直接new ...

  9. Java并发编程(十)设计线程安全的类

    待续... 线程安全的类 之前学了很多线程安全的知识,现在导致了我每次用一个类或者做一个操作我就会去想是不是线程安全的.如果每次都这样的考虑的话就很蛋疼了,这里的思路是,将现有的线程安全组件组合为更大 ...

最新文章

  1. ACM第一天研究懂的AC代码——BFS问题解答——习题zoj2165
  2. jquery getJSON 中对超时Timeout的处理
  3. 【BZOJ 2323】 2323: [ZJOI2011]细胞 (DP+矩阵乘法+快速幂*)
  4. 二进制信号在信噪比为127:1的4kHz信道上传输,最大数据传输速率可以达到( )
  5. 基于GCCAVR的TLC2543读写程序----模拟SPI方式实现
  6. 取某个单元格的值_vba中如何进行单元格复制,Copy方法使用介绍,一定要学
  7. Laravel 的安装使用
  8. ios 点击出现另外一套tabbar_iOS 点击UITabBar触发刷新
  9. java简单计算器实现
  10. 51单片机基本工作引脚
  11. 概率分布分位点_概率统计计量经济学_假设检验中的重要概念_分位点/p值
  12. 奇迹网站系统IGC奇迹mu S18网站可视化装备模板
  13. 人工智能时代特征初步显现,主要体现在哪几个方面?
  14. 第四章 证券投资基金的监管
  15. Python入门(2)
  16. HI3559A系统卡死问题-修复
  17. Oracle安装时物理内存检查失败的解决方案:
  18. 因向欺诈者出售数据,Epsilon向美国司法部支付1.5亿美元罚款
  19. FRAM铁电存储器FM25W256编程实现存取数据
  20. 2019年旅韩华侨华人新春招待会举行

热门文章

  1. 如何在房屋的未知位置找到设置的路由器?
  2. 让人脑壳疼的STP是如何做到防止环路?-理论
  3. 2019校招硬件岗笔试题(乐鑫科技+比特大陆)
  4. [java] 分布式id生成方案
  5. 信息安全技术——(一)绪论
  6. 2014多校联合-第七场
  7. 出租车计价 (15分)
  8. C++ STL源码剖析 笔记
  9. 电信计算机知识考试,2019中国电信计算机专业知识模拟试卷 --- 答案解析(九)...
  10. 高并发、高可用、高可靠微服务架构7大顶级设计思维模型