文章很长,而且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录 博客园版 为您奉上珍贵的学习资源 :

免费赠送 :《尼恩Java面试宝典》 持续更新+ 史上最全 + 面试必备 2000页+ 面试必备 + 大厂必备 +涨薪必备
免费赠送 经典图书:《Java高并发核心编程(卷1)加强版》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领
免费赠送 经典图书:《Java高并发核心编程(卷2)加强版》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领
免费赠送 经典图书:《Java高并发核心编程(卷3)加强版》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领
免费赠送 经典图书:《尼恩Java面试宝典 最新版》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领
免费赠送 资源宝库: Java 必备 百度网盘资源大合集 价值>10000元 加尼恩领取


高性能 BoundedBuffer 条带环形队列

Caffeine 源码中,用到几个高性能数据结构要讲

  • 一个是 条带环状 队列 (超高性能、无锁队列
  • 一个是mpsc队列 (超高性能、无锁队列
  • 一个是 多级时间轮

这里给大家 介绍 环形队列、 条带环形队列 Striped-RingBuffer 。

剩下的两个结构, 稍后一点 ,使用专门的 博文介绍。

CAS 的优势与核心问题

由于JVM重量级锁使用了Linux内核态下的互斥锁(Mutex),这是重量级锁开销很大的原因。

抢占与释放的过程中,涉及到 进程的 用户态和 内核态, 进程的 用户空间 和内核空间之间的切换, 性能非常低。

而CAS进行自旋抢锁,这些CAS操作都处于用户态下,进程不存在用户态和内核态之间的运行切换,因此JVM轻量级锁开销较小。这是 CAS 的优势。

但是, 任何事情,都有两面性。

CAS 的核心问题是什么呢?

在争用激烈的场景下,会导致大量的CAS空自旋。

比如,在大量的线程同时并发修改一个AtomicInteger时,可能有很多线程会不停地自旋,甚至有的线程会进入一个无限重复的循环中。

大量的CAS空自旋会浪费大量的CPU资源,大大降低了程序的性能。

除了存在CAS空自旋之外,在SMP架构的CPU平台上,大量的CAS操作还可能导致“总线风暴”,具体可参见《Java高并发核心编程 卷2 加强版》第5章的内容。

在高并发场景下如何提升CAS操作性能/ 解决CAS恶性空自旋 问题呢?

较为常见的方案有两种:

  • 分散操作热点、
  • 使用队列削峰。

比如,在自增的场景中, 可以使用LongAdder替代AtomicInteger。

这是一种 分散操作热点 ,空间换时间 方案,

也是 分而治之的思想。

以空间换时间:LongAdder 以及 Striped64

Java 8提供一个新的类LongAdder,以空间换时间的方式提升高并发场景下CAS操作性能。

LongAdder核心思想就是热点分离,与ConcurrentHashMap的设计思想类似:将value值分离成一个数组,当多线程访问时,通过Hash算法将线程映射到数组的一个元素进行操作;而获取最终的value结果时,则将数组的元素求和。

最终,通过LongAdder将内部操作对象从单个value值“演变”成一系列的数组元素,从而减小了内部竞争的粒度。LongAdder的演变如图3-10所示。

图3-10 LongAdder的操作对象由单个value值“演变”成了数组

LongAdder的分治思想和架构

LongAdder的操作对象由单个value值“演变”成了数组

LongAdder 继承了 Striped64,核心源码在 Striped64中。

条带累加Striped64的结构和源码

/*** A package-local class holding common representation and mechanics* for classes supporting dynamic striping on 64bit values. The class* extends Number so that concrete subclasses must publicly do so.*/
@SuppressWarnings("serial")
abstract class Striped64 extends Number {/*** Padded variant of AtomicLong supporting only raw accesses plus CAS.** JVM intrinsics note: It would be possible to use a release-only* form of CAS here, if it were provided.*/@sun.misc.Contended static final class Cell {volatile long value;Cell(long x) { value = x; }final boolean cas(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long valueOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> ak = Cell.class;valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));} catch (Exception e) {throw new Error(e);}}}/** Number of CPUS, to place bound on table size */static final int NCPU = Runtime.getRuntime().availableProcessors();/*** Table of cells. When non-null, size is a power of 2.*/transient volatile Cell[] cells;/*** Base value, used mainly when there is no contention, but also as* a fallback during table initialization races. Updated via CAS.*/transient volatile long base;/*** Spinlock (locked via CAS) used when resizing and/or creating Cells.*/transient volatile int cellsBusy;/*** Package-private default constructor*/Striped64() {}

以上源码的特别复杂,请参见 《Java高并发核心编程 卷2 加强版》

BoundedBuffer 的核心源码

/*** A striped, non-blocking, bounded buffer.** @author ben.manes@gmail.com (Ben Manes)* @param <E> the type of elements maintained by this buffer*/
final class BoundedBuffer<E> extends StripedBuffer<E>

它是一个 striped、非阻塞、有界限的 buffer,继承于StripedBuffer类。

下面看看StripedBuffer的实现:

/*** A base class providing the mechanics for supporting dynamic striping of bounded buffers. This* implementation is an adaption of the numeric 64-bit {@link java.util.concurrent.atomic.Striped64}* class, which is used by atomic counters. The approach was modified to lazily grow an array of* buffers in order to minimize memory usage for caches that are not heavily contended on.** @author dl@cs.oswego.edu (Doug Lea)* @author ben.manes@gmail.com (Ben Manes)*/abstract class StripedBuffer<E> implements Buffer<E>

StripedBuffer (条带缓冲)的架构

解决CAS恶性空自旋的有效方式之一是以空间换时间,较为常见的方案有两种:

  • 分散操作热点、
  • 使用队列削峰。

这个StripedBuffer设计的思想是跟Striped64类似的,通过扩展结构把分散操作热点(/竞争热点分离)

具体实现是这样的,StripedBuffer维护一个Buffer[]数组,叫做table,每个元素就是一个RingBuffer,

每个线程用自己id属性作为 hash 值的种子产生hash值,这样就相当于每个线程都有自己“专属”的RingBuffer,

在hash分散很均衡的场景下,就不会尽量的降低竞争,避免空自旋,

看看StripedBuffer的属性

/** Table of buffers. When non-null, size is a power of 2. */
//RingBuffer数组
transient volatile Buffer<E> @Nullable[] table;//当进行resize时,需要整个table锁住。tableBusy作为CAS的标记。
static final long TABLE_BUSY = UnsafeAccess.objectFieldOffset(StripedBuffer.class, "tableBusy");
static final long PROBE = UnsafeAccess.objectFieldOffset(Thread.class, "threadLocalRandomProbe");/** Number of CPUS. */
static final int NCPU = Runtime.getRuntime().availableProcessors();/** The bound on the table size. */
//table最大size
static final int MAXIMUM_TABLE_SIZE = 4 * ceilingNextPowerOfTwo(NCPU);/** The maximum number of attempts when trying to expand the table. */
//如果发生竞争时(CAS失败)的尝试次数
static final int ATTEMPTS = 3;/** Table of buffers. When non-null, size is a power of 2. */
//核心数据结构
transient volatile Buffer<E> @Nullable[] table;/** Spinlock (locked via CAS) used when resizing and/or creating Buffers. */
transient volatile int tableBusy;/** CASes the tableBusy field from 0 to 1 to acquire lock. */
final boolean casTableBusy() {return UnsafeAccess.UNSAFE.compareAndSwapInt(this, TABLE_BUSY, 0, 1);
}/*** Returns the probe value for the current thread. Duplicated from ThreadLocalRandom because of* packaging restrictions.*/
static final int getProbe() {return UnsafeAccess.UNSAFE.getInt(Thread.currentThread(), PROBE);
}

offer方法,当没初始化或存在竞争时,则扩容为 2 倍。最大为不小于 CPU核数的 2幂值。

/*** The bound on the table size.*/static final int MAXIMUM_TABLE_SIZE = 4 * ceilingPowerOfTwo(NCPU);

实际是调用RingBuffer的 offer 方法,把数据追加到RingBuffer后面。

@Override
public int offer(E e) {int mask;int result = 0;Buffer<E> buffer;//是否不存在竞争boolean uncontended = true;Buffer<E>[] buffers = table//是否已经初始化if ((buffers == null)|| (mask = buffers.length - 1) < 0//用thread的随机值作为hash值,得到对应位置的RingBuffer|| (buffer = buffers[getProbe() & mask]) == null//检查追加到RingBuffer是否成功|| !(uncontended = ((result = buffer.offer(e)) != Buffer.FAILED))) {//其中一个符合条件则进行扩容expandOrRetry(e, uncontended);}return result;
}/*** Handles cases of updates involving initialization, resizing, creating new Buffers, and/or* contention. See above for explanation. This method suffers the usual non-modularity problems of* optimistic retry code, relying on rechecked sets of reads.** @param e the element to add* @param wasUncontended false if CAS failed before call*///这个方法比较长,但思路还是相对清晰的。
@SuppressWarnings("PMD.ConfusingTernary")
final void expandOrRetry(E e, boolean wasUncontended) {int h;if ((h = getProbe()) == 0) {ThreadLocalRandom.current(); // force initializationh = getProbe();wasUncontended = true;}boolean collide = false; // True if last slot nonemptyfor (int attempt = 0; attempt < ATTEMPTS; attempt++) {Buffer<E>[] buffers;Buffer<E> buffer;int n;if (((buffers = table) != null) && ((n = buffers.length) > 0)) {if ((buffer = buffers[(n - 1) & h]) == null) {if ((tableBusy == 0) && casTableBusy()) { // Try to attach new Bufferboolean created = false;try { // Recheck under lockBuffer<E>[] rs;int mask, j;if (((rs = table) != null) && ((mask = rs.length) > 0)&& (rs[j = (mask - 1) & h] == null)) {rs[j] = create(e);created = true;}} finally {tableBusy = 0;}if (created) {break;}continue; // Slot is now non-empty}collide = false;} else if (!wasUncontended) { // CAS already known to failwasUncontended = true;      // Continue after rehash} else if (buffer.offer(e) != Buffer.FAILED) {break;} else if (n >= MAXIMUM_TABLE_SIZE || table != buffers) {collide = false; // At max size or stale} else if (!collide) {collide = true;} else if (tableBusy == 0 && casTableBusy()) {try {if (table == buffers) { // Expand table unless staletable = Arrays.copyOf(buffers, n << 1);}} finally {tableBusy = 0;}collide = false;continue; // Retry with expanded table}h = advanceProbe(h);} else if ((tableBusy == 0) && (table == buffers) && casTableBusy()) {boolean init = false;try { // Initialize tableif (table == buffers) {@SuppressWarnings({"unchecked", "rawtypes"})Buffer<E>[] rs = new Buffer[1];rs[0] = create(e);table = rs;init = true;}} finally {tableBusy = 0;}if (init) {break;}}}
}

环形队列

我们知道,队列伴随着生产和消费,而队列一般也是由数组或链表来实现的,

队列是一个先进先出的结构,那么随着游标在数组上向后移动,

前面已经消费了的数据已没有意义,但是他们依然占据着内存空间,浪费越来越大,

所以:环形队列就很好的解决了这个问题。

环形队列是在实际编程极为有用的数据结构,它采用数组的线性空间,数据组织简单,能很快知道队列是否满或空,能以很快速度的来存取数据。

从顺时针看,环形队列 有队头 head 和队尾 tail。

生产的流程是:

生产者顺时针向队尾 tail 插入元素,这会导致 head 位置不变,tail 位置在后移;

消费的流程是:

消费者则从队头 head 开始消费,这会导致 head 向后移动,而tail 位置不变,如果队列满了就不能写入。

环形队列的特点:

队头 head 和队尾 tail 的位置是不定的,位置一直在循环流动,空间就被重复利用起来了。

因为有简单高效的原因,甚至在硬件都实现了环形队列.。

环形队列广泛用于网络数据收发,和不同程序间数据交换(比如内核与应用程序大量交换数据,从硬件接收大量数据)均使用了环形队列。

环形队列的参考实现

下面的环形队列, 参考了 缓存之王 Caffeine 源码中的 命名

package com.crazymakercircle.queue;public class SimpleRingBufferDemo {public static void main(String[] args) {//创建一个环形队列SimpleRingBuffer queue = new SimpleRingBuffer(4);queue.offer(11);queue.offer(12);queue.offer(13);System.out.println("queue = " + queue);int temp = queue.poll();System.out.println("temp = " + temp);System.out.println("queue = " + queue);temp = queue.poll();System.out.println("temp = " + temp);System.out.println("queue = " + queue);temp = queue.poll();System.out.println("temp = " + temp);System.out.println("queue = " + queue);}}class SimpleRingBuffer {private int maxSize;//表示数组的最大容量private int head;  // 模拟 缓存之王 Caffeine 源码命名//head就指向队列的第一个元素,也就是arr[head]就是队列的第一个元素//head的初始值=0private int tail; // 模拟 缓存之王 Caffeine 源码命名//tail指向队列的最后一个元素的后一个位置,因为希望空出一个空间做为约定//tail的初始化值=0private int[] buffer;//该数据用于存放数据public SimpleRingBuffer(int arrMaxSize) {maxSize = arrMaxSize;buffer = new int[maxSize];}//判断队列是否满public boolean isFull() {return (tail + 1) % maxSize == head;}//判断队列是否为空public boolean isEmpty() {return tail == head;}//添加数据到队列public void offer(int n) {//判断队列是否满if (isFull()) {System.out.println("队列满,不能加入数据");return;}//直接将数据加入buffer[tail] = n;//将tail后移,这里必须考虑取模tail = (tail + 1) % maxSize;}//获取队列的数据,出队列public int poll() {//判断队列是否空if (isEmpty()) {//通过抛出异常throw new RuntimeException("队列空,不能取数据");}//这里需要分析出head是指向队列的第一个元素//1.先把head对应的值保留到一个临时变量//2.将head后移,考虑取模//3.将临时保存的变量返回int value = buffer[head];head = (head + 1) % maxSize;return value;}//求出当前队列有效数据的个数public int size() {return (tail + maxSize - head) % maxSize;}@Overridepublic String toString() {return   String.format("head=%d , tail =%d\n",head,tail);}
}

测试的结果

环形核心的结构和流程说明

  1. 约定head指向队列的第一个元素

    也就是说data[head]就是队头数据,head初始值为0。

  2. 约定tail指向队列的最后一个元素的后一个位置

    也就是说data[tail-1]就是队尾数据,tail初始值为0。

  3. 队列满的条件是:

    ( tail+1 )% maxSize == head

  4. 队列空的条件是:

    tail == head

  5. 队列中的元素个数为:

    ( tail + maxsize - head) % maxSize

  6. 有效数据只有maxSize-1个

    因为tail指向队尾的后面一个位置,这个位置就不能存数据,因此有效数据只有maxSize-1个

环形队列核心操作:判满

写入的时候,当前位置的下一位置是(tail+1)% maxSize

由图可知:

当head刚好指向tail的下一个位置时队列满,而tail的下一个位置是 (tail+1)% maxSize

所以当( tail + 1 )% maxSize == head 时,队列就满了。

环形队列核心操作:判空

队列为空的情况如下图所示,当队头队尾都指向一个位置,即 head == tail 时,队列为空。

当head == tail时,队列为空

因为tail指向队尾的后面一个位置,这个位置就不能存数据,

因此, 环形队列的有效数据只有maxSize-1个

RingBuffer 源码

caffeine源码中, 注意RingBuffer是BoundedBuffer的内部类。

/** The maximum number of elements per buffer. */
static final int BUFFER_SIZE = 16;// Assume 4-byte references and 64-byte cache line (16 elements per line)
//256长度,但是是以16为单位,所以最多存放16个元素
static final int SPACED_SIZE = BUFFER_SIZE << 4;
static final int SPACED_MASK = SPACED_SIZE - 1;
static final int OFFSET = 16;
//RingBuffer数组
final AtomicReferenceArray<E> buffer;//插入方法@Overridepublic int offer(E e) {long head = readCounter;long tail = relaxedWriteCounter();//用head和tail来限制个数long size = (tail - head);if (size >= SPACED_SIZE) {return Buffer.FULL;}//tail追加16if (casWriteCounter(tail, tail + OFFSET)) {//用tail“取余”得到下标int index = (int) (tail & SPACED_MASK);//用unsafe.putOrderedObject设值buffer.lazySet(index, e);return Buffer.SUCCESS;}//如果CAS失败则返回失败return Buffer.FAILED;}//用consumer来处理buffer的数据@Overridepublic void drainTo(Consumer<E> consumer) {long head = readCounter;long tail = relaxedWriteCounter();//判断数据多少long size = (tail - head);if (size == 0) {return;}do {int index = (int) (head & SPACED_MASK);E e = buffer.get(index);if (e == null) {// not published yetbreak;}buffer.lazySet(index, null);consumer.accept(e);//head也跟tail一样,每次递增16head += OFFSET;} while (head != tail);lazySetReadCounter(head);}

注意,ring buffer 的 size(固定是 16 个)是不变的,变的是 head 和 tail 而已。

Striped-RingBuffer 有如下特点:

总的来说 Striped-RingBuffer 有如下特点:

  • 使用 Striped-RingBuffer来提升对 buffer 的读写
  • 用 thread 的 hash 来避开热点 key 的竞争
  • 允许写入的丢失

推荐阅读:

  • 《尼恩Java面试宝典》

  • 《Springcloud gateway 底层原理、核心实战 (史上最全)》

  • 《sentinel (史上最全)》

  • 《分库分表 Sharding-JDBC 底层原理、核心实战(史上最全)》

  • 《分布式事务 (秒懂)》

  • 《缓存之王:Caffeine 源码、架构、原理(史上最全,10W字 超级长文)》

  • 《缓存之王:Caffeine 的使用(史上最全)》

  • 《Java Agent 探针、字节码增强 ByteBuddy(史上最全)》

  • 《Docker原理(图解+秒懂+史上最全)》

  • 《Redis分布式锁(图解 - 秒懂 - 史上最全)》

  • 《Zookeeper 分布式锁 - 图解 - 秒懂》

  • 《Zookeeper Curator 事件监听 - 10分钟看懂》

  • 《Netty 粘包 拆包 | 史上最全解读》

  • 《Netty 100万级高并发服务器配置》

  • 《Springcloud 高并发 配置 (一文全懂)》

参考文献

  1. 疯狂创客圈 JAVA 高并发 总目录

    ThreadLocal(史上最全)
    https://www.cnblogs.com/crazymakercircle/p/14491965.html

  2. 3000页《尼恩 Java 面试宝典 》的 35个面试专题 :
    https://www.cnblogs.com/crazymakercircle/p/13917138.html

  3. 价值10W的架构师知识图谱
    https://www.processon.com/view/link/60fb9421637689719d246739

4、尼恩 架构师哲学
https://www.processon.com/view/link/616f801963768961e9d9aec8

5、尼恩 3高架构知识宇宙
https://www.processon.com/view/link/635097d2e0b34d40be778ab4

Guava Cache主页:https://github.com/google/guava/wiki/CachesExplained

Caffeine的官网:https://github.com/ben-manes/caffeine/wiki/Benchmarks

https://gitee.com/jd-platform-opensource/hotkey

https://developer.aliyun.com/article/788271?utm_content=m_1000291945

https://b.alipay.com/page/account-manage-oc/approval/setList

Caffeine: https://github.com/ben-manes/caffeine

这里: https://albenw.github.io/posts/df42dc84/

Benchmarks: https://github.com/ben-manes/caffeine/wiki/Benchmarks

环形队列、 条带环形队列 Striped-RingBuffer (史上最全)相关推荐

  1. TCP半连接队列和全连接队列(史上最全)

    TCP半连接队列和全连接队列 文章很长,建议收藏起来慢慢读! 总目录 博客园版 为您奉上珍贵的学习资源 : 免费赠送 :<尼恩Java面试宝典>持续更新+ 史上最全 + 面试必备 2000 ...

  2. 高可用 Canal集群( 秒懂 + 史上最全)

    文章很长,而且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录 博客园版 为您奉上珍贵的学习资源 : 免费赠送 :<尼恩Java面试宝典> 持续更新+ 史上最全 + 面试必备 2000页+ ...

  3. 时间轮 (史上最全)

    缓存之王 Caffeine 中,涉及到100w级.1000W级.甚至亿级元素的过期问题,如何进行高性能的定时调度,是一个难题. 注: 本文从 对 海量调度任务场景中, 高性能的时间轮算法, 做了一个 ...

  4. Nginx面试题(史上最全 + 持续更新)

    尼恩面试宝典专题39:Nginx面试题(史上最全.持续更新) 本文版本说明:V27 <尼恩面试宝典>升级规划为: 后续基本上,每一个月,都会发布一次,最新版本,可以联系构师尼恩获取, 发送 ...

  5. 史上最全 Java 多线程面试题及答案

    这篇文章主要是对多线程的问题进行总结的,因此罗列了40个多线程的问题. 这些多线程的问题,有些来源于各大网站.有些来源于自己的思考.可能有些问题网上有.可能有些问题对应的答案也有.也可能有些各位网友也 ...

  6. 史上最全Java多线程面试题

    转载自 史上最全Java多线程面试题及答案 多线程并发编程是Java编程中重要的一块内容,也是面试重点覆盖区域.所以,学好多线程并发编程对Java程序员来来说极其重要的. 下面小编整理了60道最常见的 ...

  7. 史上最全java架构师技能图谱(下)

    "java架构史上最全技能图谱分为上下两篇,这是java架构史上最全图谱下篇,包含: 大数据以及性能.设计模式.UML.中间件.分布式集群.负载均衡.通讯协议.架构设计等技术图谱等章节. 如 ...

  8. 史上最全Redis面试49题(含答案):哨兵+复制+事务+集群+持久化等

    最全面试题答案系列 史上最强多线程面试44题和答案:线程锁+线程池+线程同步等 最全MySQL面试60题和答案 史上最全memcached面试26题和答案 史上最全Spring面试71题与答案 今天主 ...

  9. 史上最全java架构师技能图谱(上)

    java架构师最全技能图谱上篇,包含:数结构算法.java进阶.web开发.框架与工具四大技能图谱. 下篇将包含大数据以及性能.设计模式.UML.中间件.分布式集群.负载均衡.通讯协议.架构设计等技术 ...

最新文章

  1. Android Binder 学习笔记
  2. 11-17的学习总结(DOMfirstday)
  3. 南明区将引进和培养大数据高端人才逾千名
  4. ABAP table buffer test
  5. 非线性动力学_非线性科学中的现代数学方法:综述
  6. WebService学习总结(二)——WebService相关概念介绍
  7. python自动化六--操作mysql,redis,发送邮件,EXCEL,MD5加密
  8. 一篇文章帮你梳理清楚API设计时需要考虑的几个关键点
  9. mysql 地理空间数据库_地理空间数据库
  10. 机器人的弊议论文_关于练字的作文800字高中(写字机器人的利弊议论文)
  11. ora-00959(表空间不存在) 的另一种可能性
  12. mysql复制表结构创建新表
  13. Word中如何修改脚注的编号方式
  14. Jmeter压力测试,个人使用总结
  15. Google 真的抄百度了吗?
  16. 职校高一计算机课高一,职高高一数学课件
  17. 【imessage苹果群发苹果推】软件安装应用程序/框架/ gcdwebservers
  18. MongoDB增删改查基础操作
  19. 自定义Linxu启动logo(从其他分区加载logo)
  20. 测试难题:测试数据准备之如何准备测试数据

热门文章

  1. 完全平方数(C语言,调用函数)
  2. Android开发之用户头像上传
  3. Kali之Crunch:自定义字典
  4. 企业网配置必备技术NAT,3张图理清
  5. Solidworks快速装配——带配合装配与阵列装配(带例子解释)
  6. i5 12400f性能怎么样 i5 12400f相当于什么水平酷睿i5 12400f有核显吗
  7. 需要系数 计算机房,计算电流及需要系数表.xls
  8. [Leetcode/Python3] 第204场周赛题解
  9. 展锐平台 Android 10.0 OTA升级开机Logo
  10. Eclipse如何安装lombok插件