ConcurrentHashMap源码分析

ConcurrentHashMap源码分析(2)——JDK1.8的实现

前言

ConcurrentHashMap是线程安全且高效的HashMap的实现,在并发编程的情景下使用的非常广泛。它是怎么来的呢?

先抛个HashMap!

通过对HashMap的分析,我们知道当插入的元素超过临界值就会触发HashMap的扩容机制(也就是rehash,重新计算元素位置,将其都扔进新容器中)。在多线程的场景下,可能会造成闭环链表,进而导致get操作会出现死循环。因此,HashMap可以说是线程不安全的。

换别的?HashTable?

HashTable和HashMap的原理差不多,唯一的区别在于:1.HashTable不允许K和V为null;2.HashTable是线程安全的。但是,它在多线程下所有的get/put的操作都加上了synchronized关键字来锁住整个table。也就是说,我给整个哈希表加上了一把大锁。所有的线程都在竞争一把锁,只要有一个线程访问到或者操作一个对象,那其他线程就只能阻塞。将操作串行化,它是安全的,但是简单粗暴的手段带来的弊端就是HashTable的效率低下。

没救了么?

其实吧,也并不是没药了。HashTable也不是没有优化的空间了,它性能差主要就是因为所有的操作都得去竞争一把锁。这一把锁锁住整个table简直不要太粗暴,完全可以温柔一点。比方说,以前我只用一把锁,现在我多加几把锁,每一把锁负责锁一段数据。这样一来,在多线程的情景下,我对不同的数据集的操作就不用竞争一把大锁了。毕竟他们拥有不同的hash值,不会因为rehash造成线程不安全,彼此之间保持距离、互不影响,从而使得并发效率也可以得到极大的提升!上述这一过程,体现的就是ConcurrentHashMap的“分段锁”思想(也叫锁分离),引入多把锁分别对应控制多个小table,借此来降低锁的粒度(当然这只是jdk1.7版本的核心思想,jdk1.8在降低锁粒度方面更加的“过分”)。

一言不合就上图!

虽然是盗来的图,但是这张图确实画的好。放眼望去,N把锁,每个锁由一个Segment数组和多个HashEntry组成,主干还是Segment数组。

 final Segment<K,V>[] segments;
static final class HashEntry<K,V> {final int hash;final K key;volatile V value;volatile HashEntry<K,V> next;//额外说一下,这个volatile作用很大!!!......
}

通过分段锁的思想,已经将整个table切分加锁。和HashMap的数据存储结构类似,ConcurrentHashMap的每一个Segment元素存的都是HashEntry数组+链表,而HashEntry是它最小的逻辑处理单元了。
通过上面这张图,我们可以清晰得知,在ConcurrentHashMap中,一个Segment可以理解为就是一个子哈希表,Segment里维护了一个HashEntry数组。并且Segment继承了ReentrantLock,是一种可重入锁(ReentrantLock)。所以,并发环境下,对于同一个Segment的操作才需考虑线程同步,不同的Segment则无需考虑(默认16个线程并发执行,why are you so diao?)。

看下Segment身上HashMap的影子

都说Segment可以理解成是自哈希表了,那么它跟HashMap肯定就有一些神似之处。

Segment(float lf, int threshold, HashEntry<K,V>[] tab) {this.loadFactor = lf;//加载因子this.threshold = threshold;//阈值this.table = tab;//主干数组即HashEntry数组}

好好观察下Segment的构造方法,loadFactor、threshold是不是看着很熟悉?

ConcurrentHashMap构造方法

public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();//MAX_SEGMENTS 为1<<16=65536,也就是最大并发数为65536if (concurrencyLevel > MAX_SEGMENTS)concurrencyLevel = MAX_SEGMENTS;//2的sshif次方等于ssize,例:ssize=16,sshift=4;ssize=32,sshif=5int sshift = 0;//ssize 为segments数组长度,根据concurrentLevel计算得出int ssize = 1;while (ssize < concurrencyLevel) {++sshift;ssize <<= 1;}//segmentShift和segmentMask这两个变量在定位segment时会用到,后面会详细讲this.segmentShift = 32 - sshift;this.segmentMask = ssize - 1;if (initialCapacity > MAXIMUM_CAPACITY)initialCapacity = MAXIMUM_CAPACITY;//计算cap的大小,即Segment中HashEntry的数组长度,cap也一定为2的n次方.int c = initialCapacity / ssize;if (c * ssize < initialCapacity)++c;int cap = MIN_SEGMENT_TABLE_CAPACITY;while (cap < c)cap <<= 1;//创建segments数组并初始化第一个Segment,其余的Segment延迟初始化Segment<K,V> s0 =new Segment<K,V>(loadFactor, (int)(cap * loadFactor),(HashEntry<K,V>[])new HashEntry[cap]);Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];UNSAFE.putOrderedObject(ss, SBASE, s0); this.segments = ss;}

构造方法有3个参数,分别为initialCapacity(数组大小)、loadFactor(加载因子,扩容相关)和concurrencyLevel(并发度,默认16个线程并发执行)。其中,segments数组长度ssize和concurrentLevel有关,ssize总是>=concurrentLevel的最小2次幂。
如果觉得这么描述不好理解,那么举个栗子就清楚了。比如concurrentLevel=17,2的几次幂刚好>=17呢?是32对吧!所以ssize=32。
更重要的是,Segment的数组大小之所以一定是2的次幂,就是为了方便通过按位与的散列算法来定位Segment的index位置。

ConcurrentHashMap常规操作

put方法

put操作的步骤:
1.对value判空
2.hash(key)对key的hashcode重新散列
3.对Segment定位并确保已初始化
4.Segment为空,调用ensureSegment()方法;否则,直接调用查询到的Segment的put方法插入值,

public V put(K key, V value) {Segment<K,V> s;//concurrentHashMap不允许key/value为空if (value == null)throw new NullPointerException();//hash函数对key的hashCode重新散列,避免差劲的不合理的hashcode,保证散列均匀int hash = hash(key);//返回的hash值无符号右移segmentShift位与段掩码进行位运算,定位segmentint j = (hash >>> segmentShift) & segmentMask;if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck(segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment//如果获取到的Segment为空,就调用ensureSegment(j)s = ensureSegment(j);return s.put(key, hash, value, false);}

关于对Segment定位那块,hash值无符号右移segmentShift位与段掩码进行位运算,最终得到Segment位置。其中,SegmentMask(段掩码)是为了确保散列的均匀性(计算方式为segments数组长度-1);segmentShift计算方式为segmentShift=32-sshift,其中2的sshift次方等于ssize。比如segments数组长度16,2的n次方=16,segmentShift=32-n。
另外,值得注意的是,put方法首先也会通过hash算法定位到对应的Segment,此时,如果获取到的Segment为空,则调用ensureSegment()方法;否则,直接调用查询到的Segment的put方法插入值,注意此处并没有用getObjectVolatile()方法读,而是在ensureSegment()中再用volatile读操作,这样可以在查询segments不为空的时候避免使用volatile读,提高效率。在ensureSegment()方法中,首先使用getObjectVolatile()读取对应Segment,如果还是为空,则以segments[0]为原型创建一个Segment对象,并将这个对象设置为对应的Segment值并返回。

Segment的put方法

final V put(K key, int hash, V value, boolean onlyIfAbsent) {HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);//tryLock不成功时会遍历定位到的HashEnry位置的链表(遍历主要是为了使CPU缓存链表),若找不到,则创建HashEntry。tryLock一定次数后(MAX_SCAN_RETRIES变量决定),则lock。若遍历过程中,由于其他线程的操作导致链表头结点变化,则需要重新遍历。V oldValue;try {HashEntry<K,V>[] tab = table;int index = (tab.length - 1) & hash;//定位HashEntry,可以看到,这个hash值在定位Segment时和在Segment中定位HashEntry都会用到,只不过定位Segment时只用到高几位。HashEntry<K,V> first = entryAt(tab, index);for (HashEntry<K,V> e = first;;) {if (e != null) {K k;if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) {oldValue = e.value;if (!onlyIfAbsent) {e.value = value;++modCount;}break;}e = e.next;}else {if (node != null)node.setNext(first);elsenode = new HashEntry<K,V>(hash, key, value, first);int c = count + 1;//若c超出阈值threshold,需要扩容并rehash。扩容后的容量是当前容量的2倍。这样可以最大程度避免之前散列好的entry重新散列。扩容并rehash的这个过程是比较消耗资源的。if (c > threshold && tab.length < MAXIMUM_CAPACITY)rehash(node);elsesetEntryAt(tab, index, node);++modCount;count = c;oldValue = null;break;}}} finally {unlock();}return oldValue;}

在Segment的put方法中,首先需要调用tryLock()方法获取锁,然后通过hash算法定位到对应的HashEntry,然后遍历整个链表,如果查到key值,则直接插入元素即可;而如果没有查询到对应的key,则需要调用rehash()方法对Segment中保存的table进行扩容,扩容为原来的2倍,并在扩容之后插入对应的元素。插入一个key/value对后,需要将统计Segment中元素个数的count属性加1。最后,插入成功之后,需要使用unLock()释放锁。

get方法

get操作的步骤:
1.hash(key)对key的hashcode重新散列(既然已经存进来了,k、v肯定不为null,所以不用再判断了)
2.定位Segment
3.定位HashEntry
4.通过getObjectVolatile()方法获取指定偏移量上的HashEntry
5.通过循环遍历链表获取对应值

public V get(Object key) {Segment<K,V> s; HashEntry<K,V>[] tab;//hash(key)对key的hashcode重新散列int h = hash(key);//先定位Segmentlong u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;//再定位HashEntry-->(((tab.length - 1) & h)) << TSHIFT) + TBASE;-->getObjectVolatile()方法获取指定偏移量上的HashEntry-->循环遍历链表获取对应值if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&(tab = s.table) != null) {for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);e != null; e = e.next) {K k;if ((k = e.key) == key || (e.hash == h && key.equals(k)))return e.value;}}return null;}

在jdk1.7中ConcurrentHashMap的get操作并没有加锁,因为在每个Segment中定义的HashEntry数组和在每个HashEntry中定义的value和next HashEntry节点都是volatile类型的,volatile类型的变量可以保证其在多线程之间的可见性,因此可以被多个线程同时读。

size

try {for (;;) {if (retries++ == RETRIES_BEFORE_LOCK) {for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation}sum = 0L;size = 0;overflow = false;for (int j = 0; j < segments.length; ++j) {Segment<K,V> seg = segmentAt(segments, j);if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0)overflow = true;} }if (sum == last) break;last = sum; } }
finally {if (retries > RETRIES_BEFORE_LOCK) {for (int j = 0; j < segments.length; ++j)segmentAt(segments, j).unlock();}
}

针对并发插入数据时计算ConcurrentHashMap的大小,有两种方式:
方式一:不加锁而多次计算size,若前后两次结果一致则表示准确(最多3次)
方式二:如果两次结果不同,就对所有的Segment加锁来计算size

ConcurrentHashMap源码分析(1)——JDK1.7的实现相关推荐

  1. ConcurrentHashMap源码分析(2)——JDK1.8的实现

    ConcurrentHashMap源码分析(1)--JDK1.7的实现 前言 在JDK1.7版本上,ConcurrentHashMap还是通过分段锁来实现的,Segment的数量制约着并发量.在JDK ...

  2. 【阅读源码系列】ConcurrentHashMap源码分析(JDK1.7和1.8)

    个人学习源码的思路: 使用ctrl+单机进入源码,并阅读源码的官方文档–>大致的了解一下此类的特点和功能 使用ALIT+7查看类中所有方法–>大致的看一下此类的属性和方法 找到重要方法并阅 ...

  3. ConcurrentHashMap源码分析,轻取面试Offer(一)

    ConcurrentHashMap 这里主要分析的 jdk1.8中的ConcurrentHashMap,他是java之父Doug Lea之作,很多优秀的开源框架如tomcat.spring.中都大量用 ...

  4. ConcurrentHashMap源码分析,轻取面试Offer(二)

    上篇ConcurrentHashMap源码分析,轻取面试Offer(一)中降到了看源码的方法,下面接上篇继续分析源码 先来上篇注释过的代码段和遗留的问题. final V putVal(K key, ...

  5. ConcurrentHashMap源码解析——基于JDK1.8

    ConcurrentHashMap源码解析--基于JDK1.8 前言 这篇博客不知道写了多久,总之就是很久,头都炸了.最开始阅读源码时确实是一脸茫然,找不到下手的地方,真是太难了.下面的都是我自己阅读 ...

  6. JDK1.8 中 ConcurrentHashMap源码分析(一)容器初始化

    上一篇文章中说到如何使用IDEA搭建JDK1.8阅读学习环境,JDK1.8源码下载及获取.导入IDEA阅读.配置JDK源码.这篇文章将学习ConcurrentHashMap源码

  7. Java源码详解六:ConcurrentHashMap源码分析--openjdk java 11源码

    文章目录 注释 类的继承与实现 数据的存储 构造函数 哈希 put get 扩容 本系列是Java详解,专栏地址:Java源码分析 ConcurrentHashMap 官方文档:ConcurrentH ...

  8. [JUC-5]ConcurrentHashMap源码分析JDK8

    在学习之前,最好先了解下如下知识: 1.ReentrantLock的实现和原理. 2.Synchronized的实现和原理. 3.硬件对并发支持的CAS操作及JVM中Unsafe对CAS的实现. 4. ...

  9. 多线程高并发编程(10) -- ConcurrentHashMap源码分析

    一.背景 前文讲了HashMap的源码分析,从中可以看到下面的问题: HashMap的put/remove方法不是线程安全的,如果在多线程并发环境下,使用synchronized进行加锁,会导致效率低 ...

最新文章

  1. 设置WebStorm像VSCode一样每行代码结尾自动格式化加入“;”分号(JavaScript、TypeScript格式化)
  2. 技术大牛养成指南,一篇不鸡汤的成功学实践
  3. java12章_【有书共读】java核心技术卷1--第12章
  4. 计算缺失的元素 java_计算包含缺失值的相关系数
  5. ABAP-在SMARTFORMS中取消使用WORD作为编辑器
  6. java mock void_如何使用Mockito模拟void方法 - How to mock void methods with Mockito
  7. while循环 for循环的理解
  8. 梅花传播业大展:Focussend将精准营销融入个性化邮件
  9. (转)spring boot整合redis
  10. java面试题汇总(1)
  11. Hadoop Snappy安装终极教程
  12. net 去掉第一位和最后一位_2020最后三个月港剧有咩睇?熟女强人首播!
  13. [转] 2018年最新桌面CPU性能排行天梯图(含至强处理器)
  14. 支持HEVC格式的浏览器推荐(windows 10)
  15. Altium Designer生成Gerber文件
  16. 淘宝618超级喵运会怎么玩 如何获取喵币?
  17. 教你用快捷键 以管理员身份运行cmd
  18. 火热报名|Apache Pulsar x KubeSphere 在线 Meetup 来袭
  19. php生成数字订单号,php生成订单号函数
  20. PHP根据生日计算年龄(周岁)

热门文章

  1. java手机游戏模拟器下载_Java手机游戏模拟器
  2. python协程池操作mysql_在python中使用aiomysql异步操作mysql
  3. 单链表的插入和删除_从0开始的编程之梦——数据结构之单链表的基本运算
  4. 超暖心!美国消防员钻冰窟窿救狗狗
  5. +7白盒测试与黑盒测试的定义与区别
  6. conda 基本操作
  7. 在chrome Sources 页 显示 Console(drawer) 页
  8. bzoj[1835][ZJOI2010]base 基地选址
  9. 分支语句 (if) 练习 Java代码
  10. CentOS Wifi Connection