实战系列-Java中线程安全集合类(二)
导语
上次分享中讲到了比较常用的几个Map相关的集合。这篇分享主要来记录一下,剩下的关于队列的一些信息以及其他补充的信息。
JCTools 非阻塞Queue
JCTools 所使用的队列是基于Lamport的无等待SPSC算法,然后稍微做了改进。Lamport算法是在顺序一致性内存模型下可以实现单生产者/单消费者的循环缓冲区。在做了调整之后,在总存储顺序和其他较弱的一致性模型下也是正确的。使用CAS的方式完成实现,相较于使用锁的方式可以减少很多性能的开支,抛弃了锁的申请与切换,可以带来很大的性能提升。
SPSC-单一生产者单一消费者(有界和无界)
只有同步,没有互斥。只有一个生产者,不存在同时有两个生产者使用缓冲区资源造成数据不一致的状态。只需要控制好在缓冲区满的时候不再继续添加元素。有界与无界的区别是一个有缓冲区上限,一个缓冲区可以不断向后延伸。
有界PscArrayQueue
初始化
public SpscArrayQueue(int capacity) {super(Math.max(capacity, 4));
}
传入值进行初始化,初始化时最小容量大小为4
Offer方法
public boolean offer(E e) {if (null == e) {throw new NullPointerException();} else {E[] buffer = this.buffer;long mask = this.mask;long producerIndex = this.producerIndex;if (producerIndex >= this.producerLimit && !this.offerSlowPath(buffer, mask, producerIndex)) {return false;} else {long offset = calcElementOffset(producerIndex, mask);UnsafeRefArrayAccess.soElement(buffer, offset, e);this.soProducerIndex(producerIndex + 1L);return true;}}
}
添加元素,添加的元素不可为空,添加的生产者的索引小于缓冲区的长度限制,CAS执行添加元素。只有当生产者索引大于缓冲区长度限制并且生产者索引的位置处有值时添加失败。
OfferSlowPath方法
private boolean offerSlowPath(E[] buffer, long mask, long producerIndex) {int lookAheadStep = this.lookAheadStep;if (null == UnsafeRefArrayAccess.lvElement(buffer, calcElementOffset(producerIndex + (long)lookAheadStep, mask))) {this.producerLimit = producerIndex + (long)lookAheadStep;} else {long offset = calcElementOffset(producerIndex, mask);if (null != UnsafeRefArrayAccess.lvElement(buffer, offset)) {return false;}}return true;
}
计算生产者索引处的偏移,获取偏移处缓冲区的值,如果为空就调整生产者的最大索引,如果不为空,则说明已经有值了,返回false。
Poll方法取值并弹出
public E poll() {long consumerIndex = this.consumerIndex;long offset = this.calcElementOffset(consumerIndex);E[] buffer = this.buffer;E e = UnsafeRefArrayAccess.lvElement(buffer, offset);if (null == e) {return null;} else {UnsafeRefArrayAccess.soElement(buffer, offset, (Object)null);this.soConsumerIndex(consumerIndex + 1L);return e;}
}
获取消费者索引,计算消费者在缓冲区中的下标,获取到下标处的值,如果不为空,更新该处的值为null,调整消费者索引+1 ,返回元素。
Peek方法是获取元素不弹出
public E peek() {return UnsafeRefArrayAccess.lvElement(this.buffer, this.calcElementOffset(this.consumerIndex));
}
Drain方法
public int drain(Consumer<E> c, int limit) {E[] buffer = this.buffer;long mask = this.mask;long consumerIndex = this.consumerIndex;for(int i = 0; i < limit; ++i) {long index = consumerIndex + (long)i;long offset = calcElementOffset(index, mask);E e = UnsafeRefArrayAccess.lvElement(buffer, offset);if (null == e) {return i;}UnsafeRefArrayAccess.soElement(buffer, offset, (Object)null);this.soConsumerIndex(index + 1L);c.accept(e);}return limit;
}
给定一个值或者成为范围,清除缓冲区中这个范围的值
Fill方法
public int fill(Supplier<E> s, int limit) {E[] buffer = this.buffer;long mask = this.mask;int lookAheadStep = this.lookAheadStep;long producerIndex = this.producerIndex;for(int i = 0; i < limit; ++i) {long index = producerIndex + (long)i;long lookAheadElementOffset = calcElementOffset(index + (long)lookAheadStep, mask);if (null != UnsafeRefArrayAccess.lvElement(buffer, lookAheadElementOffset)) {long offset = calcElementOffset(index, mask);if (null != UnsafeRefArrayAccess.lvElement(buffer, offset)) {return i;}UnsafeRefArrayAccess.soElement(buffer, offset, s.get());this.soProducerIndex(index + 1L);} else {int lookAheadLimit = Math.min(lookAheadStep, limit - i);for(int j = 0; j < lookAheadLimit; ++j) {long offset = calcElementOffset(index + (long)j, mask);UnsafeRefArrayAccess.soElement(buffer, offset, s.get());this.soProducerIndex(index + (long)j + 1L);}i += lookAheadLimit - 1;}}
给一个数组和范围,fill方法会将给的缓冲区范围内的值填充到有界缓冲区中,添加了等待策略的drain和fill,drain在循环等待4096次后放弃,fill在循环等待lookAheadStep次后放弃。
无界SpscUnBoundedArrayQueue
父类是BaseSpscLinkedArrayQueue,采用链表式的队列,方便无界的扩展。初始化时给了很大的值,而且是2的倍数。
&emps;其中放值取值等都是使用父类BaseSpscLinkedArrayQueue的方法,只有一个初始化和offerColdPath两个私有方法,offerColdPath方法是在offer添加元素缓冲区已满的情况下调用,方法中判断空间是否充足,缓冲区是否还有空间,都不满足即空间充足,缓冲区没有空间然后申请新的空间,并把元素添加进去。
MPSC-多生产者单一消费者(有界和无界)
多生产者需要考虑到生产者的互斥问题,不仅要控制好不能在空间已满的情况下添加并且还要考虑到是否有其他生产者在同时修改缓冲值,如果有修改的需要循环的方式重新尝试。
有界
构造方法使用的是父类的构造方法,环形的数组队列,初始容量取比传入数值略大的2的次方值
Offer方法
public boolean offer(E e) {if (null == e) {传入的元素不能为空throw new NullPointerException();} else {long mask = this.mask;
生产者的最大索引,初始时为传入的容量long producerLimit = this.lvProducerLimit();long pIndex;long offset;do {生产者索引pIndex = this.lvProducerIndex();
如果生产者索引达到最大值,要调整消费者的大小if (pIndex >= producerLimit) {获取消费者索引offset = this.lvConsumerIndex();
调整生产者上限为消费者索引+容器大小producerLimit = offset + mask + 1L;
调整完依然还是达到上限,返回falseif (pIndex >= producerLimit) {return false;}
更新生产者上限this.soProducerLimit(producerLimit);}
Cas更新成功就跳出循环,失败就再次尝试} while(!this.casProducerIndex(pIndex, pIndex + 1L));
计算索引,插入到相应的位置offset = calcElementOffset(pIndex, mask);UnsafeRefArrayAccess.soElement(this.buffer, offset, e);return true;}
}
Poll方法
public E poll() {获取消费者索引long cIndex = this.lpConsumerIndex();
计算在数组中的位置long offset = this.calcElementOffset(cIndex);E[] buffer = this.buffer;
获取缓冲区此位置的元素值E e = UnsafeRefArrayAccess.lvElement(buffer, offset);if (null == e) {判断数组是否为空if (cIndex == this.lvProducerIndex()) {return null;}do {e = UnsafeRefArrayAccess.lvElement(buffer, offset);} while(e == null);}UnsafeRefArrayAccess.spElement(buffer, offset, (Object)null);this.soConsumerIndex(cIndex + 1L);return e;
}
无界MpscChunkedArrayQueue
初始化通过父类的有参构造方法
public MpscUnboundedArrayQueue(int chunkSize) {super(chunkSize);
}
BaseMpscLinkedArrayQueue.Class
public BaseMpscLinkedArrayQueue(int initialCapacity) {验证初始容量值,必须大于等于2RangeUtil.checkGreaterThanOrEqual(initialCapacity, 2, "initialCapacity");
传入初始参数计算得出最接近initialCapacity的2的N次方值int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);
通过p2capacity计算mask值,用作扩容大小long mask = (long)(p2capacity - 1 << 1);
默认分配p2capacity+1大小的缓冲区E[] buffer = CircularArrayOffsetCalculator.allocate(p2capacity + 1);this.producerBuffer = buffer;this.producerMask = mask;this.consumerBuffer = buffer;this.consumerMask = mask;
用mask作为初始化队列的limit值,producerIndex大于limit就扩容this.soProducerLimit(mask);
}
RangeUtil.class
public static int checkGreaterThanOrEqual(int n, int expected, String name) {if (n < expected) {throw new IllegalArgumentException(name + ": " + n + " (expected: >= " + expected + ')');} else {return n;}
}
要求队列的值不小于expected的大小,但是这里expected就为2
Offer方法
public boolean offer(E e) {if (null == e) { 添加的元素e不能为空throw new NullPointerException();} else {while(true) {while(true) {队列的上限long offset = this.lvProducerLimit();
生产者指针long pIndex = this.lvProducerIndex();if ((pIndex & 1L) != 1L) {long mask = this.producerMask;E[] buffer = this.producerBuffer;
如果上限小于生产者指针时,去扩容if (offset <= pIndex) {通过offerSlowPath返回状态值来查看怎么处理这个待添加的元素int result = this.offerSlowPath(mask, pIndex, offset);switch(result) {case 0:default:break;case 1:continue;case 2:
队列已满,返回falsereturn false;case 3:
扩容this.resize(mask, buffer, pIndex, e);return true;}}
生产者指针没有超过上限的时候,cas方式对生产者指针+2if (this.casProducerIndex(pIndex, pIndex + 2L)) {增加成功之后,获取添加元素的位置,将新元素进行添加offset = LinkedArrayQueueUtil.modifiedCalcElementOffset(pIndex, mask);UnsafeRefArrayAccess.soElement(buffer, offset, e);return true;}}}}}
}
Resize方法
private void resize(long oldMask, E[] oldBuffer, long pIndex, E e) {获取oldBuffer长度值int newBufferLength = this.getNextBufferSize(oldBuffer);
将oldBuffer长度值设置给新的缓冲区E[] newBuffer = CircularArrayOffsetCalculator.allocate(newBufferLength);
新建的缓冲区赋值给生产者缓冲区this.producerBuffer = newBuffer;int newMask = newBufferLength - 2 << 1;this.producerMask = (long)newMask;
根据oldmask计算偏移位置long offsetInOld = LinkedArrayQueueUtil.modifiedCalcElementOffset(pIndex, oldMask);
根据newmask计算偏移位置long offsetInNew = LinkedArrayQueueUtil.modifiedCalcElementOffset(pIndex, (long)newMask);
将新元素e放置在新缓冲区offsetInNew的位置UnsafeRefArrayAccess.soElement(newBuffer, offsetInNew, e);
将新的缓冲区放到旧的缓冲区nextArrayOffset(oldmask)处
将oldBuffer中最后一个元素指向新的缓冲区newBuffer,构成单向链表的样子UnsafeRefArrayAccess.soElement(oldBuffer, this.nextArrayOffset(oldMask), newBuffer);
获取消费者索引long cIndex = this.lvConsumerIndex();
获取可用容量long availableInQueue = this.availableInQueue(pIndex, cIndex);RangeUtil.checkPositive(availableInQueue, "availableInQueue");
扩容生产者上限,扩容大小为新增的缓冲区的大小this.soProducerLimit(pIndex + Math.min((long)newMask, availableInQueue));
生产者指针加2this.soProducerIndex(pIndex + 2L);
当在一个缓冲区中遇到jump,就应该考虑取下一个缓冲区取值,衔接新老缓冲区的标志。UnsafeRefArrayAccess.soElement(oldBuffer, offsetInOld, JUMP);
}
Poll方法
public E poll() {获取消费者缓冲区E[] buffer = this.consumerBuffer;
获取消费者指针long index = this.consumerIndex;long mask = this.consumerMask;
根据消费者指针和掩码计算从哪里开始取值long offset = LinkedArrayQueueUtil.modifiedCalcElementOffset(index, mask);
取值
Object e = UnsafeRefArrayAccess.lvElement(buffer, offset);if (e == null) {取得值为空,先判断消费者指针和生产者指针是不是相同,相同说明队列为空if (index == this.lvProducerIndex()) {return null;}do {e = UnsafeRefArrayAccess.lvElement(buffer, offset);} while(e == null);}if (e == JUMP) {如果元素为jump,需要取下一个缓冲区的值E[] nextBuffer = this.getNextBuffer(buffer, mask);return this.newBufferPoll(nextBuffer, index);} else {一般情况下,设置offset位置处值为null,消费者指针加2UnsafeRefArrayAccess.soElement(buffer, offset, (Object)null);this.soConsumerIndex(index + 2L);return e;}
}
Drain方法和pscArrayQueue的drain方法相同
SPMC-单生产者多消费者(有界)
初始化调用父类的构造器
public ConcurrentCircularArrayQueue(int capacity) {int actualCapacity = Pow2.roundToPowerOfTwo(capacity);this.mask = (long)(actualCapacity - 1);this.buffer = CircularArrayOffsetCalculator.allocate(actualCapacity);
}
计算得出略大于传入值的2次方值作为队列的大小
Offer方法
public boolean offer(final E e)
{if (null == e){throw new NullPointerException();}final E[] buffer = this.buffer;final long mask = this.mask;final long currProducerIndex = lvProducerIndex();final long offset = calcElementOffset(currProducerIndex, mask);if (null != lvElement(buffer, offset)){long size = currProducerIndex - lvConsumerIndex();if (size > mask){return false;}else{// spin wait for slot to clear, buggers wait freedomwhile (null != lvElement(buffer, offset)){;}}}spElement(buffer, offset, e);// single producer, so store ordered is valid. It is also required to correctly publish the element// and for the consumers to pick up the tail value.soProducerIndex(currProducerIndex + 1);return true;
}
当前获取到的生产者指针指向的位置不为空时,循环直到该位置为空,然后将值插入到该位置。
Poll方法
public E poll()
{long currentConsumerIndex;
获取共享缓存中的生产者指针long currProducerIndexCache = lvProducerIndexCache();do{获取消费者指针currentConsumerIndex = lvConsumerIndex();
如果消费者指针大于当前共享缓存的生产者指针if (currentConsumerIndex >= currProducerIndexCache){long currProducerIndex = lvProducerIndex();
如果当前生产者指针确实小于等于消费者指针,说明缓冲区中没有元素if (currentConsumerIndex >= currProducerIndex){return null;}else{当前生产者指针被更新,或者说刚刚添加了新值,同步共享缓存中的生产者指针为当前生产者指针,并保存currProducerIndexCache = currProducerIndex;svProducerIndexCache(currProducerIndex);}}}
循环,当前消费者指针被其他消费者更改时进行循环尝试while (!casConsumerIndex(currentConsumerIndex, currentConsumerIndex + 1));// consumers are gated on latest visible tail, and so can't see a null value in the queue or overtake// and wrap to hit same location.
正常执行下来会把消费者指针位置的值删除掉return removeElement(buffer, currentConsumerIndex, mask);
}
Peek方法,大概套路是获取消费者指针,判断和共享缓存中的生产者指针的距离,如果相等或大于,就获取当前生产者指针的位置,再进行判断,如果还是一样的结果,就说明没有新值的添加,队列确实为空,有新值加入了,生产者指针就会更改,保存到共享缓存中,此时如果消费者指针被其他消费者更改,循环尝试。没有问题后返回元素值。
MPMC-多生产者多消费者(有界)
Offer方法
public boolean offer(final E e)
{if (null == e){throw new NullPointerException();}final long mask = this.mask;final long capacity = mask + 1;final long[] sBuffer = sequenceBuffer;long pIndex;long seqOffset;long seq;long cIndex = Long.MIN_VALUE;// start with bogus value, hope we don't need itdo{pIndex = lvProducerIndex();seqOffset = calcSequenceOffset(pIndex, mask);
获取到生产者指针在队列中的位置值seq = lvSequence(sBuffer, seqOffset);// consumer has not moved this seq forward, it's as last producer left
计算出的位置小于指针的位置,说明队列中的位置需要移动了if (seq < pIndex){// Extra check required to ensure [Queue.offer == false iff queue is full]
说明队列满了if (pIndex - capacity >= cIndex && // test against cached cIndexpIndex - capacity >= (cIndex = lvConsumerIndex())){ // test against latest cIndexreturn false;}else{队列未满,队列中的位置+1seq = pIndex + 1; // (+) hack to make it go around again without CAS}}}
其他生产者调整过队列中位置或者生产者指针添加失败都会进行循环重试while (seq > pIndex || // another producer has moved the sequence(or +)!casProducerIndex(pIndex, pIndex + 1)); // failed to incrementsoElement(buffer, calcElementOffset(pIndex, mask), e);soSequence(sBuffer, seqOffset, pIndex + 1); // seq++;return true;
}
Poll方法类似offer方法,做了对生产者指针位置的CAS校验,做了对消费者指针位置的CAS校验
多生产者就对多生产者可能出现并发现问题的地方执行CAS检测,多消费者就对多消费这可能出现并发问题的地方进行CAS检测。单生产者或单消费者就需要做好同步控制。
实战系列-Java中线程安全集合类(二)相关推荐
- Tableau实战系列Tableau基础概念全解析 (二)-万字长文解析数据类型及数据集
前言 以下是我为大家准备的几个精品专栏,喜欢的小伙伴可自行订阅,你的支持就是我不断更新的动力哟! MATLAB-30天带你从入门到精通 MATLAB深入理解高级教程(附源码) tableau可视化数据 ...
- Tableau实战系列Tableau基础概念全解析 (三)-维度和度量
前言 连接到新数据源时,Tableau 会将该数据源中的每个字段分配为 "数据"窗格的维度或度量,具体情况视字段包含的数据类型而定.你使用这些字段来构建数据的视图. 以下是我为大家 ...
- Tableau可视化分析实战系列Tableau基础概念全解析 (一)-数据结构及字段
前言 什么是维度和度量?为何有一些字段维度和其他度量? 为何一些字段的背景颜色是蓝色,而另外一些字段的背景颜色是绿色? 添加筛选器会对我的可视化项产生怎样的影响? 以下是我为大家准备的几个精品专栏,喜 ...
- 【直播回顾】云栖社区特邀专家徐雷Java Spring Boot开发实战系列课程(第19讲):Java Spring Cloud微服务架构模式与开发实战...
主讲人:徐雷(云栖社区特邀Java专家) 徐雷,花名:徐雷frank:资深架构师,MongoDB中文社区联席主席,吉林大学计算机学士,上海交通大学硕士.从事了 10年+开发工作,专注于分布式架构,Ja ...
- java search 不能使用方法_ElasticSearch实战系列三: ElasticSearch的JAVA API使用教程
前言 在上一篇中介绍了ElasticSearch实战系列二: ElasticSearch的DSL语句使用教程---图文详解,本篇文章就来讲解下 ElasticSearch 6.x官方Java API的 ...
- MongoDB实战系列之二:MongoDB的常用操作
#以服务方式启动mongodb,要求验证 /elain/apps/mongodb/bin/mongod --fork --port 27001 --auth --dbpath /elain/data/ ...
- [VulnStack] ATTCK实战系列—红队实战(二)
文章目录 0x00 环境搭建 0x01 信息收集 端口探测 目录扫描 0x02 漏洞利用 0x03 内网收集 主机信息搜集 域信息收集 内网漏洞扫描 0x04 横向渗透 MS17-010 PsExec ...
- Java秒杀系统实战系列~JMeter压力测试重现秒杀场景中超卖等问题
摘要: 本篇博文是"Java秒杀系统实战系列文章"的第十二篇,本篇博文我们将借助压力测试工具Jmeter重现秒杀场景(高并发场景)下出现的各种典型的问题,其中最为经典的当属&quo ...
- 抖音短视频数据抓取实战系列(二)——Fiddler安装配置以及模拟器监测环境配置
抖音短视频数据抓取实战系列(二)--Fiddler安装配置以及模拟器监测环境配置 项目目录 1.抖音短视频数据抓取实战系列(〇)--前言 2.抖音短视频数据抓取实战系列(一)--模拟器的选择与设置 3 ...
最新文章
- MySQL之查看数据库编码
- [ACM] hdu 1671 Phone List (字典树)
- 关于ContinuationFilter的使用
- get请求的乱码解决方式
- git回退到之前版本和合并分支查看当前分支切换分支
- lombok有参构造注解_Java高效开发工具: Lombok
- C语言 嵌入式 面试小知识点(一)
- python如何画出多个独立的图片_python实现在一个画布上画多个子图
- ajax 发http请求吗,使用 Ajax 发送 http 请求 (getpost 请求)
- 卖计算机英语对话,英语购买电脑情景对话.doc
- 25 | 业务安全体系:对比基础安全,业务安全有哪些不同?
- How to set the Default Page in ASP.NET?
- Spark基础编程实践
- UG导出CAD图纸的方法
- Ubuntu安装VMware tools工具
- dnf游戏多开虚拟机,过检测过制裁,dnf同步
- 我的大学——实习生涯
- win11死机怎么办?教你解决电脑死机的方法
- 二分图——洛谷P1155 双栈排序
- KDZD耐电压高压击穿强度测试仪
热门文章
- mysql exp 注入_使用exp进行SQL报错注入
- 泉州中考分数如何计算机,2019年泉州中考总分多少分,泉州中考考试科目设置
- 联合国再请马云出任要职
- MyBatis学习总结(13)——Mybatis查询之resultMap和resultType区别
- ionic 旅途-- 一起来填坑
- redis+lua现实游戏中的一些常用功能
- 数值的整数次方(剑指offer面试题11)
- RssTookit使用小结
- Crontab作业时间设置
- ORA-00600:[kclchkinteg_2]及[kjmsm_epc]内部错误一例