Striped64

JDK 8 的 java.util.concurrent.atomic 下有一个包本地的类 Striped64 ,它持有常见表示和机制用于类支持动态 striping 到 64bit 值上。

设计思路

这个类维护一个延迟初始的、原子地更新值的表,加上额外的 “base” 字段。表的大小是 2 的幂。索引使用每线程的哈希码来masked。这个的几乎所有声明都是包私有的,通过子类直接访问。

表的条目是 Cell 类,一个填充过(通过 sun.misc.Contended )的 AtomicLong 的变体,用于减少缓存竞争。填充对于多数 Atomics 是过度杀伤的,因为它们一般不规则地分布在内存里,因此彼此间不会有太多冲突。但存在于数组的原子对象将倾向于彼此相邻地放置,因此将通常共享缓存行(对 性能有巨大的副作用),在没有这个防备下。

部分地,因为Cell相对比较大,我们避免创建它们直到需要时。当没有竞争时,所有的更新都作用到 base 字段。根据第一次竞争(更新 base 的 CAS 失败),表被初始化为大小 2。表的大小根据更多的竞争加倍,直到大于或等于CPU数量的最小的 2 的幂。表的槽在它们需要之前保持空。

一个单独的自旋锁(“cellsBusy”)用于初始化和resize表,还有用新的Cell填充槽。不需要阻塞锁,当锁不可得,线程尝试其他槽(或 base)。在这些重试中,会增加竞争和减少本地性,这仍然好于其他选择。

通过 ThreadLocalRandom 维护线程探针字段,作为每线程的哈希码。我们让它们为 0 来保持未初始化直到它们在槽 0 竞争。然后初始化它们为通常不会互相冲突的值。当执行更新操作时,竞争和/或表冲突通过失败了的 CAS 来指示。根据冲突,如果表的大小小于容量,它的大小加倍,除非有些线程持有了锁。如果一个哈希后的槽是空的,且锁可得,创建新的Cell。否则,如果槽存 在,重试CAS。重试通过 “重散列,double hashing” 来继续,使用一个次要的哈希算法(Marsaglia XorShift)来尝试找到一个自由槽位。

表的大小是有上限的,因为,当线程数多于CPU数时,假如每个线程绑定到一个CPU上,存在一个完美的哈希函数映射线程到槽上,消除了冲突。当我们 到达容量,我们随机改变碰撞线程的哈希码搜索这个映射。因为搜索是随机的,冲突只能通过CAS失败来知道,收敛convergence 是慢的,因为线程通常不会一直绑定到CPU上,可能根本不会发生。然而,尽管有这些限制,在这些案例下观察到的竞争频率显著地低。

当哈希到特定 Cell 的线程终止后,Cell 可能变为空闲的,表加倍后导致没有线程哈希到扩展的 Cell 也会出现这种情况。我们不尝试去检测或移除这些 Cell,在实例长期运行的假设下,观察到的竞争水平将重现,所以 Cell 将最终被再次需要。对于短期存活的实例,这没关系。

设计思路小结

  • striping和缓存行填充:通过把类数据 striping 为 64bit 的片段,使数据成为缓存行友好的,减少CAS竞争。
  • 分解表示:对于一个数字 5,可以分解为一序列数的和:2 + 3,这个数字加 1 也等价于它的分解序列中的任一 数字加 1:5 + 1 = 2 + (3 + 1)
  • 通过把分解序列存放在表里面,表的条目都是填充后的 Cell;限制表的大小为 2 的幂,则可以用掩码来实现索引;同时把表的大小限制为大于等于CPU数量的最小的 2 的幂。
  • 当表的条目上出现竞争时,在到达容量前表扩容一倍,通过增加条目来减少竞争。

Cell 类

Cell 类是 Striped64 的静态内部类。通过注解 @sun.misc.Contended 来自动实现缓存行填充,让Java编译器和JRE运行时来决定如何填充。本质上是一个填充了的、提供了CAS更新的volatile变量。

@sun.misc.Contended static final class Cell {volatile long value;Cell(long x) { value = x; }final boolean cas(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long valueOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> ak = Cell.class;valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));} catch (Exception e) {throw new Error(e);}}
}

Striped64

Striped64 通过一个 Cell 数组维持了一序列分解数的表示,通过 base 字段维持数的初始值,通过 cellsBusy 字段来控制 resing 和/或 创建Cell。它还提供了对数进行累加的机制。

abstract class Striped64 extends Number {static final int NCPU = Runtime.getRuntime().availableProcessors();// 存放 Cell 的表。当不为空时大小是 2 的幂。transient volatile Cell[] cells;// base 值,在没有竞争时使用,也作为表初始化竞争时的一个后备。transient volatile long base;// 自旋锁,在 resizing 和/或 创建Cell时使用。transient volatile int cellsBusy;
}

累加机制 longAccumulate

设计思路里针对机制的实现,核心逻辑。该方法处理涉及初始化、resing、创建新cell、和/或竞争的更新。

逻辑如下:

  • if 表已初始化

    • if 映射到的槽是空的,加锁后再次判断,如果仍然是空的,初始化cell并关联到槽。
    • else if (槽不为空)在槽上之前的CAS已经失败,重试。
    • else if (槽不为空、且之前的CAS没失败,)在此槽的cell上尝试更新
    • else if 表已达到容量上限或被扩容了,重试。
    • else if 如果不存在冲突,则设置为存在冲突,重试。
    • else if 如果成功获取到锁,则扩容。
    • else 重散列,尝试其他槽。
  • else if 锁空闲且获取锁成功,初始化表
  • else if 回退 base 上更新且成功则退出
  • else 继续
final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {int h;if ((h = getProbe()) == 0) {// 未初始化的ThreadLocalRandom.current(); // 强制初始化h = getProbe();wasUncontended = true;}// 最后的槽不为空则 true,也用于控制扩容,false重试。boolean collide = false;for (;;) {Cell[] as; Cell a; int n; long v;if ((as = cells) != null && (n = as.length) > 0) {// 表已经初始化if ((a = as[(n - 1) & h]) == null) {// 线程所映射到的槽是空的。if (cellsBusy == 0) {       // 尝试关联新的Cell// 锁未被使用,乐观地创建并初始化cell。Cell r = new Cell(x);if (cellsBusy == 0 && casCellsBusy()) {// 锁仍然是空闲的、且成功获取到锁boolean created = false;try {               // 在持有锁时再次检查槽是否空闲。Cell[] rs; int m, j;if ((rs = cells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {// 所映射的槽仍为空rs[j] = r;          // 关联 cell 到槽created = true;}} finally {cellsBusy = 0;     // 释放锁}if (created)break;               // 成功创建cell并关联到槽,退出continue;           // 槽现在不为空了}}// 锁被占用了,重试collide = false;}// 槽被占用了else if (!wasUncontended)       // 已经知道 CAS 失败wasUncontended = true;      // 在重散列后继续// 在当前槽的cell上尝试更新else if (a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break;// 表大小达到上限或扩容了;// 表达到上限后就不会再尝试下面if的扩容了,只会重散列,尝试其他槽else if (n >= NCPU || cells != as)collide = false;            // At max size or stale//  如果不存在冲突,则设置为存在冲突else if (!collide)collide = true;// 有竞争,需要扩容else if (cellsBusy == 0 && casCellsBusy()) {// 锁空闲且成功获取到锁try {if (cells == as) {      // 距上一次检查后表没有改变,扩容:加倍Cell[] rs = new Cell[n << 1];for (int i = 0; i < n; ++i)rs[i] = as[i];cells = rs;}} finally {cellsBusy = 0;     // 释放锁}collide = false;continue;                   // 在扩容后的表上重试}// 没法获取锁,重散列,尝试其他槽h = advanceProbe(h);}else if (cellsBusy == 0 && cells == as && casCellsBusy()) {// 加锁的情况下初始化表boolean init = false;try {                           // Initialize tableif (cells == as) {Cell[] rs = new Cell[2];rs[h & 1] = new Cell(x);cells = rs;init = true;}} finally {cellsBusy = 0;     // 释放锁}if (init)break;     // 成功初始化,已更新,跳出循环}else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))// 表未被初始化,可能正在初始化,回退使用 base。break;                          // 回退到使用 base}
}

LongAdder

LongAdder和AtomicLong类似,提供long类型数据原子操作。但是LongAdder在AtomicLong的基础上进行了热点分离,热点分离类似于有锁操作中的减小锁粒度,将一个锁分离成若干个锁来提高性能。在无锁中,也可以用类似的方式来增加CAS的成功率,从而提高性能。

LongAdder 继承自 Striped64,它的方法只针对简单的情况:cell存在且更新无竞争,其余情况都通过 Striped64 的longAccumulate方法来完成。

public void add(long x) {Cell[] as; long b, v; int m; Cell a;if ((as = cells) != null || !casBase(b = base, b + x)) {// cells 不为空 或在 base 上cas失败。也即出现了竞争。boolean uncontended = true;//if (as == null || (m = as.length - 1) < 0 ||(a = as[getProbe() & m]) == null ||!(uncontended = a.cas(v = a.value, v + x)))// 如果所映射的槽不为空,且成功更新则返回,否则进入复杂处理流程。longAccumulate(x, null, uncontended);}
}// 获取当前的和。base值加上每个cell的值。
public long sum() {Cell[] as = cells; Cell a;long sum = base;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;
}

java8新特性回顾(四)---并发增强之Striped64和longAdder相关推荐

  1. java8新特性(四)_Stream详解

    之前写过一篇用stream处理map的文章,但是对stream没有一个整体的认识,这次结合并发编程网和ibm中介绍stream的文章进行一个总结,我会着重写对list的处理,毕竟实际工作中大家每天进行 ...

  2. 【java8新特性】——方法引用(四)

    一.简介 方法引用是java8的新特性之一, 可以直接引用已有Java类或对象的方法或构造器.方法引用与lambda表达式结合使用,可以进一步简化代码. 来看一段简单代码: public static ...

  3. 【java8新特性】——Stream API详解(二)

    一.简介 java8新添加了一个特性:流Stream.Stream让开发者能够以一种声明的方式处理数据源(集合.数组等),它专注于对数据源进行各种高效的聚合操作(aggregate operation ...

  4. 【小家java】java8新特性之---全新的日期、时间API(JSR 310规范),附SpringMVC、Mybatis中使用JSR310的正确姿势

    [小家java]java5新特性(简述十大新特性) 重要一跃 [小家java]java6新特性(简述十大新特性) 鸡肋升级 [小家java]java7新特性(简述八大新特性) 不温不火 [小家java ...

  5. 【JAVA拾遗】Java8新特性合辑

    [JAVA拾遗]Java8新特性合辑 文章目录 [JAVA拾遗]Java8新特性合辑 0. 逼逼 [--/--]126 Lambda Expressions & Virtual Extensi ...

  6. Java8新特性学习笔记

    Java8新特性学习笔记 文章目录 Java8新特性学习笔记 一.接口和日期处理 1.接口增强 1.1.JDK8以前 VS JDK8 1)接口定义: 1.2.默认方法(default) 1)默认方法格 ...

  7. java8新特性之Steam

    java8新特性之Steam 一.什么是Stream?传统集合遍历循环存在哪些弊端? 二.获取流的方式 2.1.根据Collection获取流 2.2.根据Map获取流 2.3.根据数组获取流 三.S ...

  8. Java基础学习——第十六章 Java8新特性

    Java基础学习--第十六章 Java8 新特性 Java8(JDK8.0)较 JDK7.0 有很多变化或者说是优化,比如 interface 里可以有静态方法和默认方法,并且可以有方法体,这一点就颠 ...

  9. Java8 新特性之流式数据处理(转)

    转自:https://www.cnblogs.com/shenlanzhizun/p/6027042.html 一. 流式处理简介 在我接触到java8流式处理的时候,我的第一感觉是流式处理让集合操作 ...

最新文章

  1. RegistryCallback routine(CmRegisterCallback 注册表操作监控介绍)
  2. Python程序设计之迭代器和生成器示例
  3. 面对需求总是被Boss怒改,产品经理该怎么办?
  4. 对接第三方支付接口-类似文件锁的编程小技巧
  5. 刚开始学ASP+ACCESS时,该注意的事项
  6. 三大运营商5G基站大单纷纷落地:华为、中兴、爱立信、大唐移动收获大
  7. python3导入ssl报错_python3中pip3安装出错,找不到SSL的解决方式
  8. vue 组件 not defined_Vue、Spring Boot开发小而完整的Web前后端分离项目实战10
  9. 从零开始学androidNotification通知.四十四.
  10. VSDX Annotator for mac(Visio 绘图工具)
  11. Sprint周期项目开发总结
  12. 电影推荐——基于关联分析Apriori算法
  13. WEB应用log4j1.x升级到log4j2.17.1
  14. python画海贼王_用python自动爬取海贼王漫画推送kindle
  15. Linux学习-man和Info
  16. 酒浓码浓 - HTML5微数据/itemscope/itemtype/itemprop
  17. anaconda、labelme标注软件安装和使用
  18. 苹果怎样用小米云服务器,苹果换华为/小米,怎么同步数据?教程来了!
  19. 计算机学院刘岗,中科院金属所刘岗研究员访问城市环境研究所
  20. 电信中兴光猫ZXHN F650超管密码获取工具

热门文章

  1. 数据中心机房效果图鉴赏
  2. 迅时MX100G-S数字中继网关荣获《电信设备进网许可证》
  3. Vanishing gradient and activiation funcation(ReLU、Maxout)
  4. 计算机人民币货币符号是什么,人民币货币符号-人民币符号究竟是什么?yen;;还是¥? 爱问知识人...
  5. 龙讯|LT9721高性能HDMI/MIPItoTypeC TO DP转换器
  6. ffmpeg音频滤镜
  7. 并查集解决重复员工问题
  8. Java 报错Attempted read from closed stream
  9. 电子发票税费计算问题
  10. linux centos7 iptables配置