ConcurrentHashMap在jdk7的使用的是分段锁(ReentrantLock),而jdk8则改为使用synchronized。同时jdk8的ConcurrentHashMap和HashMap一样的引入红黑树(解决hash冲撞时的操作效率),并且在扩容过程中像ForkJoinPool一样可以自动多线程协作(提高扩容效率,并且解决HashMap的扩容时并发问题……PS:请慎用jdk8的ParallelStream,因为它底层默认调用的是公共的ForkJoinPool)。整个代码的逻辑和HashMap十分相似,可以对照着看方便理解。

下文和HashMap一样,针对ConcurrentHashMap的初始化、put和扩容源码进行分析。

初始化

    /*** Creates a new, empty map with an initial table size* accommodating the specified number of elements without the need* to dynamically resize.** @param initialCapacity The implementation performs internal* sizing to accommodate this many elements.* @throws IllegalArgumentException if the initial capacity of* elements is negative*/public ConcurrentHashMap(int initialCapacity) {if (initialCapacity < 0)throw new IllegalArgumentException();int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?MAXIMUM_CAPACITY :tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));this.sizeCtl = cap;}/*** Returns a power of two table size for the given desired capacity.* See Hackers Delight, sec 3.2*/private static final int tableSizeFor(int c) {int n = c - 1;n |= n >>> 1;n |= n >>> 2;n |= n >>> 4;n |= n >>> 8;n |= n >>> 16;return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;}

其中的tableSizeFor(int c)和HashMap一致,并且都是将真正存放数据的内部Node数组延迟初始化。特殊的是最后一行this.sizeCtl = cap;

sizeCtl使用了Unsafe进行操纵,一样使用Unsafe的还有以下这些变量。通过这些变量和相关代码,ConcurrentHashMap实现高并发时线程安全的效果。

    /*** Base counter value, used mainly when there is no contention,* but also as a fallback during table initialization* races. Updated via CAS.*/private transient volatile long baseCount;/*** Table initialization and resizing control.  When negative, the* table is being initialized or resized: -1 for initialization,* else -(1 + the number of active resizing threads).  Otherwise,* when table is null, holds the initial table size to use upon* creation, or 0 for default. After initialization, holds the* next element count value upon which to resize the table.*/private transient volatile int sizeCtl;/*** The next table index (plus one) to split while resizing.*/private transient volatile int transferIndex;/*** Spinlock (locked via CAS) used when resizing and/or creating CounterCells.*/private transient volatile int cellsBusy;// Unsafe mechanicsprivate static final sun.misc.Unsafe U;private static final long SIZECTL;private static final long TRANSFERINDEX;private static final long BASECOUNT;private static final long CELLSBUSY;private static final long CELLVALUE;private static final long ABASE;private static final int ASHIFT;static {try {U = sun.misc.Unsafe.getUnsafe();Class<?> k = ConcurrentHashMap.class;SIZECTL = U.objectFieldOffset(k.getDeclaredField("sizeCtl"));TRANSFERINDEX = U.objectFieldOffset(k.getDeclaredField("transferIndex"));BASECOUNT = U.objectFieldOffset(k.getDeclaredField("baseCount"));CELLSBUSY = U.objectFieldOffset(k.getDeclaredField("cellsBusy"));Class<?> ck = CounterCell.class;CELLVALUE = U.objectFieldOffset(ck.getDeclaredField("value"));Class<?> ak = Node[].class;ABASE = U.arrayBaseOffset(ak);int scale = U.arrayIndexScale(ak);if ((scale & (scale - 1)) != 0)throw new Error("data type scale not a power of two");ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);} catch (Exception e) {throw new Error(e);}}

put

    static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hashstatic final int spread(int h) {// 高低位异或,并且强制第一位为0return (h ^ (h >>> 16)) & HASH_BITS;}public V put(K key, V value) {return putVal(key, value, false);}/** Implementation for put and putIfAbsent */final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();// 使用高低位异或得到内部使用的hash值int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;if (tab == null || (n = tab.length) == 0)tab = initTable(); // 首次使用或者延迟实例化else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// Node数组中hash对应位置f没值,使用cas新建节点if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break;                   // no lock when adding to empty bin}else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f); // Node数组中hash标记为迁移中,协助迁移else {// hash对应位置已有节点fV oldVal = null;// 单独对节点f加同步锁synchronized (f) {// 再检查一次if (tabAt(tab, i) == f) {if (fh >= 0) {// 链表,最后binCount为链表长度binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;// hash值相同且key相同,准备赋值valueif (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}// 到达链表末尾,添加新nodeNode<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}else if (f instanceof TreeBin) {// 判断节点是红黑树,插入红黑树Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {// 链表长度达到红黑树阈值,转换为红黑树if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}// 计数器+1,并且检查是否需要扩容addCount(1L, binCount);return null;}/*** Replaces all linked nodes in bin at given index unless table is* too small, in which case resizes instead.*/private final void treeifyBin(Node<K,V>[] tab, int index) {Node<K,V> b; int n, sc;if (tab != null) {// node数组长度小于64时,和HashMap一样进行扩容代替转换红黑树操作if ((n = tab.length) < MIN_TREEIFY_CAPACITY)tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {// 节点b存在且状态不是在扩容中,加同步锁synchronized (b) {if (tabAt(tab, index) == b) {TreeNode<K,V> hd = null, tl = null;// 将链表的每个元素按序转换为红黑树的一个节点for (Node<K,V> e = b; e != null; e = e.next) {TreeNode<K,V> p =new TreeNode<K,V>(e.hash, e.key, e.val,null, null);if ((p.prev = tl) == null)hd = p;elsetl.next = p;tl = p;}// 将树直接设置到node数组中setTabAt(tab, index, new TreeBin<K,V>(hd));}}}}}

扩容

有三个方法触发扩容,分别是addCount、helpTransfer和tryPresize,对应外层的各种添加元素操作,下面主要分析transfer这个扩容的核心方法。

transfer

    /** Number of CPUS, to place bounds on some sizings */static final int NCPU = Runtime.getRuntime().availableProcessors();/*** Minimum number of rebinnings per transfer step. Ranges are* subdivided to allow multiple resizer threads.  This value* serves as a lower bound to avoid resizers encountering* excessive memory contention.  The value should be at least* DEFAULT_CAPACITY.*/private static final int MIN_TRANSFER_STRIDE = 16;/*** Moves and/or copies the nodes in each bin to new table. See* above for explanation.*/private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;// 通过CPU核数进行扩容任务分割,最少分割为16个子任务if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide rangeif (nextTab == null) {            // initiating// 初始化扩容用的新node数组,扩容1倍try {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) {      // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}nextTable = nextTab;transferIndex = n;}int nextn = nextTab.length;ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);boolean advance = true;boolean finishing = false; // to ensure sweep before committing nextTabfor (int i = 0, bound = 0;;) {Node<K,V> f; int fh;while (advance) {int nextIndex, nextBound;if (--i >= bound || finishing)advance = false;else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {// 从最后一个节点开始进行倒序迁移,子任务完成后,剩余的节点都由本线程独立进行迁移bound = nextBound;i = nextIndex - 1;advance = false;}}if (i < 0 || i >= n || i + n >= nextn) {int sc;if (finishing) {// 扩容完成,替换正式node数组nextTable = null;table = nextTab;// sizeCtl设置为当前数组大小的0.75(1-1/4=0.75)sizeCtl = (n << 1) - (n >>> 1);return;}if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;finishing = advance = true;i = n; // recheck before commit}}else if ((f = tabAt(tab, i)) == null)// 原f节点为null,直接标记节点已迁移advance = casTabAt(tab, i, null, fwd);else if ((fh = f.hash) == MOVED)// f节点已迁移,执行下一个节点的迁移advance = true; // already processedelse {// 锁定f节点,进行迁移synchronized (f) {// 再校验if (tabAt(tab, i) == f) {Node<K,V> ln, hn;if (fh >= 0) {// 链节点int runBit = fh & n;Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}// 同HashMap扩容的链表处理,将数组中f节点的元素按原顺序分散到新Node数组上if (runBit == 0) {ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln); // 掩码位为0,保留在低位链表elsehn = new Node<K,V>(ph, pk, pv, hn); // 掩码位为1,添加到高位链表}// 将高低位链表分别放到新node数组中setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);// 原node数组f节点标记为已迁移setTabAt(tab, i, fwd);advance = true;}else if (f instanceof TreeBin) {// 树节点TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;// 类似链节点的处理,将数组中f节点的元素按原顺序分散到新Node数组上for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {// 掩码位为0,保留在低位树if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {// 掩码位为1,添加到高位树if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}// 分散后得到的高低位树长度小于等于6的,转换为链表ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;// 将高低位树分别放到新node数组中setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);// 原node数组f节点标记为已迁移setTabAt(tab, i, fwd);advance = true;}}}}}}/*** A node inserted at head of bins during transfer operations.*/static final class ForwardingNode<K,V> extends Node<K,V> {final Node<K,V>[] nextTable;ForwardingNode(Node<K,V>[] tab) {// 设置hash为MOVED,用于判断该节点是否已经完成迁移super(MOVED, null, null, null);this.nextTable = tab;}……}

可见和jdk8的HashMap扩容区别不大,使用了同步锁、CAS、倒序迁移、划分子任务等技术实现了多线程安全和高效。

helpTransfer

    /*** Helps transfer if a resize is in progress.*/final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {Node<K,V>[] nextTab; int sc;if (tab != null && (f instanceof ForwardingNode) &&(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {int rs = resizeStamp(tab.length);while (nextTab == nextTable && table == tab &&(sc = sizeCtl) < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || transferIndex <= 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {// 扩容中但未扩容完成的,协助扩容transfer(tab, nextTab);break;}}return nextTab;}return table;}

jdk8的ConcurrentHashMap实现相关推荐

  1. 不止JDK7的HashMap,JDK8的ConcurrentHashMap也会造成CPU 100%

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试资料 作者:朱小厮 公众号:朱小厮的博客(ID:hiddenkafka) ...

  2. 不止 JDK7 的 HashMap ,JDK8 的 ConcurrentHashMap 也会造成 CPU 100%?原因与解决~

    现象 大家可能都听过JDK7中的HashMap在多线程环境下可能造成CPU 100%的现象,这个由于在扩容的时候put时产生了死链,由此会在get时造成了CPU 100%.这个问题在JDK8中的Has ...

  3. JDK8之ConcurrentHashMap源码解读

    本文默认读者阅读过JDK8的HashMap源码,不再对源码中的红黑树操作进行分析. 本文主要对put().transfer().addCount()进行分析(replaceNode()源码近似于put ...

  4. JDK8中ConcurrentHashMap源码解析

    在介绍ConcurrentHashMap源码之前,首先需要了解以下几个知识 1.JDK1.8中ConcurrentHashMap的基本结构 2.并发编程的三个概念:可见性,原子性,有序性 3.CAS( ...

  5. 解读Java8中ConcurrentHashMap是如何保证线程安全的

    HashMap是工作中使用频度非常高的一个K-V存储容器.在多线程环境下,使用HashMap是不安全的,可能产生各种非期望的结果. 关于HashMap线程安全问题,可参考笔者的另一篇文章: 深入解读H ...

  6. 【Java自顶向下】试手小公司,面试官问我ConcurrentHashMap,我微微一笑……

    文章目录 ConcurrentHashMap 一.ConcurrentHashMap初始化的剖析 1.1 ConcurrentHashMap初始化 1.2 理解sizeCtl 二.JDK8的添加安全 ...

  7. JDK之ConcurrentHashMap

    2019独角兽企业重金招聘Python工程师标准>>> 注:分析JDK8的ConcurrentHashMap,JDK6/7上的实现和JDK8上的不一样. 1.构造方法中的参数含义 构 ...

  8. 问到ConcurrentHashMap不要再提Segment了

    前面已经介绍了hashMap.但hashMap并没有考虑多线程并发的情况,因此是线程不安全的.我们可以使用HashTable,它使用synchronized来锁住整张Hash表让一个线程独占来实现线程 ...

  9. currenthashmap扩容原理_高并发编程系列:深入探讨ConcurrentHashMap的实现原理(JDK1.7和JDK1.8)...

    HashMap.CurrentHashMap 的实现原理基本都是BAT面试必考内容,阿里P8架构师谈:深入探讨HashMap的底层结构.原理.扩容机制深入谈过hashmap的实现原理以及在JDK 1. ...

最新文章

  1. SQL Server不能启动
  2. ?通配符 以及扩展通配符在范型中的应用。。。。。。。。。。。。。。。。。。...
  3. 编程式事务与声明式事务
  4. 当罗密欧遇到朱丽叶... ...当指针遇到数组
  5. silverlight下多线程处理
  6. SylixOS 内存管理源代码分析--pageTable.c
  7. windows常用端口对应表
  8. C语言课设贪吃蛇说明书,c语言课设贪吃蛇.doc
  9. 假装内卷,才是互联网人的骚操作
  10. vue 使用echarts 进行简单封装统一使用
  11. 淘宝/天猫获得淘宝app商品详情原数据 API
  12. 从永远到永远-Navicat将MySQL数据库复制到另一个Mysql数据库
  13. mysql的UNIX_TIMESTAMP用法
  14. 针对英夫利昔单抗或阿达木单抗的抗体存在与否决定转用依那西普的疗效
  15. java 防火墙_java如何穿越防火墙
  16. 项目管理(PMP)项目风险管理
  17. 某站卖的第八区分发源码/APP分发系统平台源码
  18. 芯昇 CM32M101A 固件库 W25Q128JWSIQ 驱动
  19. 未配置appkey或配置错误,uniapp原生安卓插件开发
  20. 【CS】客户端更新(一)——更新程序文件方式

热门文章

  1. MySQL原理 - 字符集与排序规则
  2. Abbkine 彩色预染蛋白质Marker (10-180 kDa)方案
  3. 生僻字_tte_linux_ttf_提取字体_打印生僻字_uni
  4. MSI和MSI-X对比(五)
  5. 这两省软考电子版证书下载已开通
  6. 幽门杆菌来源_肉毒杆菌毒素和设计移情的艺术
  7. Indexes: RDBMS vs Coherence vs Lucene
  8. 微信小程序利用腾讯云IM发送语音 + 图片
  9. 华为od机试真题 C++ 实现【导师请吃火锅】【2023 B卷】
  10. android点亮屏幕软件,插亮屏幕Lite