一、大致介绍
1、了解过netty原理的童鞋,其实应该知道工作线程组的每个子线程都维护了一个任务队列;
2、细心的童鞋会发现netty的队列是重写了队列的实现方法,覆盖了父类中的LinkedBlockingQueue队列,但是如今却换成了JCTools的一些并发队列,因为JCTools是一款对jdk并发数据结构进行增强的并发工具;
3、那么问题就来了,现在的netty要用新的队列呢?难道是新的队列确实很高效么?
4、那么本章节就来和大家分享分析一下Netty新采用的队列之一MpscUnboundedArrayQueue,分析Netty的源码版本为:netty-netty-4.1.22.Final;

二、预备知识
2.1 构造队列
1、源码:
// NioEventLoop.java
@Override
protected Queue newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
// 由于默认是没有配置io.netty.eventLoop.maxPendingTasks属性值的,所以maxPendingTasks默认值为Integer.MAX_VALUE;
// 那么最后配备的任务队列的大小也就自然使用无参构造队列方法
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.newMpscQueue()
: PlatformDependent.newMpscQueue(maxPendingTasks);
}

// PlatformDependent.java
/*** Create a new {@link Queue} which is safe to use for multiple producers (different threads) and a single* consumer (one thread!).* @return A MPSC queue which may be unbounded.*/
public static <T> Queue<T> newMpscQueue() {return Mpsc.newMpscQueue();
}// Mpsc.java
static <T> Queue<T> newMpscQueue() {return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscUnboundedArrayQueue<T>(MPSC_CHUNK_SIZE): new MpscUnboundedAtomicArrayQueue<T>(MPSC_CHUNK_SIZE);
}

2、通过源码回顾,想必大家已经隐约回忆起之前分析过这段代码,我们在构建工作线程管理组的时候,还需要实例化子线程数组children[],所以自然就会碰到这段代码;

3、这段代码其实就是为了实现一个无锁方式的线程安全队列,总之一句话,效率相当相当的高;

2.2 何为JCTools?
1、JCTools是服务虚拟机并发开发的工具,提供一些JDK没有的并发数据结构辅助开发。

2、是一个聚合四种 SPSC/MPSC/SPMC/MPMC 数据变量的并发队列:
• SPSC:单个生产者对单个消费者(无等待、有界和无界都有实现)
• MPSC:多个生产者对单个消费者(无锁、有界和无界都有实现)
• SPMC:单生产者对多个消费者(无锁 有界)
• MPMC:多生产者对多个消费者(无锁、有界)

3、SPSC/MPSC 提供了一个在性能,分配和分配规则之间的平衡的关联数组队列;

2.3 常用重要的成员属性及方法
1、private volatile long producerLimit;
// 数据链表所分配或者扩展后的容量值

2、protected long producerIndex;
// 生产者指针,每添加一个数据,指针加2

3、protected long consumerIndex;
// 消费者指针,每移除一个数据,指针加2

4、private static final int RETRY = 1; // 重新尝试,有可能是因为并发原因,CAS操作指针失败,所以需要重新尝试添加动作
private static final int QUEUE_FULL = 2; // 队列已满,直接返回false操作
private static final int QUEUE_RESIZE = 3; // 需要扩容处理,扩容的后的容量值producerLimit一般都是mask的N倍
// 添加数据时,根据offerSlowPath返回的状态值来做各种处理

5、protected E[] producerBuffer;
// 数据缓冲区,需要添加的数据放在此

6、protected long producerMask;
// 生产者扩充容量值,一般producerMask与consumerMask是一致的,而且需要扩容的数值一般和此值一样

7、public boolean offer(final E e)
// 添加元素

8、public E poll()
// 移除元素

2.4 数据结构
1、如果chunkSize初始化大小为4,则最后显示的数据结构如下:
E1,E2,。。。,EN:表示具体的元素;
NBP:表示下一个缓冲区的指针,我采用的是英文的缩写( Next Buffer Pointer);

而且你看着我是拆分开写的,其实每一个NBP指向的就是下面一组缓冲区;
Buffer1中的NBP其实就是Buffer2的指针引用;
Buffer2中的NBP其实就是Buffer3的指针引用;
以此类推。。。
±-----±-----±-----±-----±-----+
| | | | | |
| E1 | E2 | E3 | JUMP | NBP | Buffer1
| | | | | |
±-----±-----±-----±-----±-----+

±-----±-----±-----±-----±-----+
| | | | | |
| E5 | E6 | JUMP | E4 | NBP | Buffer2
| | | | | |
±-----±-----±-----±-----±-----+

±-----±-----±-----±-----±-----+
| | | | | |
| E9 | JUMP | E7 | E8 | NBP | Buffer3
| | | | | |
±-----±-----±-----±-----±-----+

±-----±-----±-----±-----±-----+
| | | | | |
| JUMP | E10 | E11 | E12 | NBP | Buffer4
| | | | | |
±-----±-----±-----±-----±-----+

±-----±-----±-----±-----±-----+
| | | | | |
| E13 | E14 | E15 | JUMP | NBP | Buffer5
| | | | | |
±-----±-----±-----±-----±-----+

2、这个数据结构和我们通常所认知的链表是不是有点异样,其实大体还是雷同的,这种数据结构其实也是指针的单项指引罢了;

三、源码分析MpscUnboundedArrayQueue
3.1、MpscUnboundedArrayQueue(int)
1、源码:
// MpscUnboundedArrayQueue.java
public MpscUnboundedArrayQueue(int chunkSize)
{
super(chunkSize); // 调用父类的含参构造方法
}

// BaseMpscLinkedArrayQueue.java
/*** @param initialCapacity the queue initial capacity. If chunk size is fixed this will be the chunk size.*                        Must be 2 or more.*/
public BaseMpscLinkedArrayQueue(final int initialCapacity)
{// 校验队列容量值,大小必须不小于2RangeUtil.checkGreaterThanOrEqual(initialCapacity, 2, "initialCapacity");// 通过传入的参数通过Pow2算法获取大于initialCapacity最近的一个2的n次方的值int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);// leave lower bit of mask clearlong mask = (p2capacity - 1) << 1; // 通过p2capacity计算获得mask值,该值后续将用作扩容的值// need extra element to point at next arrayE[] buffer = allocate(p2capacity + 1); // 默认分配一个 p2capacity + 1 大小的数据缓冲区producerBuffer = buffer;producerMask = mask;consumerBuffer = buffer;consumerMask = mask;// 同时用mask作为初始化队列的Limit值,当生产者指针producerIndex超过该Limit值时就需要做扩容处理soProducerLimit(mask); // we know it's all empty to start with
}// RangeUtil.java
public static int checkGreaterThanOrEqual(int n, int expected, String name)
{// 要求队列的容量值必须不小于 expected 值,这个 expected 值由上层决定,但是对 MpscUnboundedArrayQueue 而言,expected 为 2;// 那么就是说 MpscUnboundedArrayQueue 的值必须不小于 2;if (n < expected){throw new IllegalArgumentException(name + ": " + n + " (expected: >= " + expected + ')');}return n;
}

2、通过调用父类的构造方法,分配了一个数据缓冲区,初始化容量大小,并且容量值不小于2,差不多就这样队列的实例化操作已经完成了;

3.2、offer(E)
1、源码:
// BaseMpscLinkedArrayQueue.java
@Override
public boolean offer(final E e)
{
if (null == e) // 待添加的元素e不允许为空,否则抛空指针异常
{
throw new NullPointerException();
}

    long mask;E[] buffer;long pIndex;while (true){long producerLimit = lvProducerLimit(); // 获取当前数据Limit的阈值pIndex = lvProducerIndex(); // 获取当前生产者指针位置// lower bit is indicative of resize, if we see it we spin until it's clearedif ((pIndex & 1) == 1){continue;}// pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1)// mask/buffer may get changed by resizing -> only use for array access after successful CAS.mask = this.producerMask;buffer = this.producerBuffer;// a successful CAS ties the ordering, lv(pIndex) - [mask/buffer] -> cas(pIndex)// assumption behind this optimization is that queue is almost always empty or near emptyif (producerLimit <= pIndex) // 当阈值小于等于生产者指针位置时,则需要扩容,否则直接通过CAS操作对pIndex做加2处理{// 通过offerSlowPath返回状态值,来查看怎么来处理这个待添加的元素int result = offerSlowPath(mask, pIndex, producerLimit);switch (result){case CONTINUE_TO_P_INDEX_CAS:break;case RETRY: // 可能由于并发原因导致CAS失败,那么则再次重新尝试添加元素continue;case QUEUE_FULL: // 队列已满,直接返回false操作return false;case QUEUE_RESIZE: // 队列需要扩容操作resize(mask, buffer, pIndex, e); // 对队列进行直接扩容操作return true;}}// 能走到这里,则说明当前的生产者指针位置还没有超过阈值,因此直接通过CAS操作做加2处理if (casProducerIndex(pIndex, pIndex + 2)) {break;}}// INDEX visible before ELEMENT// 获取计算需要添加元素的位置final long offset = modifiedCalcElementOffset(pIndex, mask);// 在buffer的offset位置添加e元素soElement(buffer, offset, e); // release element ereturn true;
}// BaseMpscLinkedArrayQueueProducerFields.java
@Override
public final long lvProducerIndex()
{// 通过Unsafe对象调用native方法,获取生产者指针位置return UNSAFE.getLongVolatile(this, P_INDEX_OFFSET);
}   // UnsafeRefArrayAccess.java
/*** An ordered store(store + StoreStore barrier) of an element to a given offset** @param buffer this.buffer* @param offset computed via {@link UnsafeRefArrayAccess#calcElementOffset}* @param e      an orderly kitty*/
public static <E> void soElement(E[] buffer, long offset, E e)
{// 通过Unsafe对象调用native方法,将元素e设置到buffer缓冲区的offset位置UNSAFE.putOrderedObject(buffer, offset, e);
}

2、此方法为添加新的元素对象,当pIndex指针超过阈值producerLimit时则扩容处理,否则直接通过CAS操作添加记录pIndex位置;

3.3、offerSlowPath(long, long, long)
1、源码:
// BaseMpscLinkedArrayQueue.java
/**
* We do not inline resize into this method because we do not resize on fill.
*/
private int offerSlowPath(long mask, long pIndex, long producerLimit)
{
// 获取消费者指针
final long cIndex = lvConsumerIndex();
// 获取当前缓冲区的容量值,getCurrentBufferCapacity方法由子类MpscUnboundedArrayQueue实现,默认返回mask值
long bufferCapacity = getCurrentBufferCapacity(mask);

    // 如果消费指针加上容量值如果超过了生产指针,那么则会尝试进行扩容处理if (cIndex + bufferCapacity > pIndex){if (!casProducerLimit(producerLimit, cIndex + bufferCapacity)){// retry from topreturn RETRY;}else{// continue to pIndex CASreturn CONTINUE_TO_P_INDEX_CAS;}}// full and cannot grow 子类MpscUnboundedArrayQueue默认返回Integer.MAX_VALUE值,所以不会进入此分支else if (availableInQueue(pIndex, cIndex) <= 0){// offer should return false;return QUEUE_FULL;}// grab index for resize -> set lower bit 尝试扩容队列else if (casProducerIndex(pIndex, pIndex + 1)){// trigger a resizereturn QUEUE_RESIZE;}else{// failed resize attempt, retry from topreturn RETRY;}
}// MpscUnboundedArrayQueue.java
@Override
protected long getCurrentBufferCapacity(long mask)
{// 获取当前缓冲区的容量值return mask;
}   // BaseMpscLinkedArrayQueue.java
final boolean casProducerLimit(long expect, long newValue)
{// 通过CAS尝试对阈值进行修改扩容处理return UNSAFE.compareAndSwapLong(this, P_LIMIT_OFFSET, expect, newValue);
}// MpscUnboundedArrayQueue.java
@Override
protected long availableInQueue(long pIndex, long cIndex)
{// 获取可用容量值return Integer.MAX_VALUE;
}   // BaseMpscLinkedArrayQueueProducerFields.java
final boolean casProducerIndex(long expect, long newValue)
{// 通过CAS操作更新生产者指针return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue);
}

2、该方法主要通过一系列的if…else判断,并结合子类MpscUnboundedArrayQueue的一些重写方法来判断针对该新添加的元素要做何种状态处理;

3.4、resize(long, E[], long, E)
1、源码:
// BaseMpscLinkedArrayQueue.java
private void resize(long oldMask, E[] oldBuffer, long pIndex, E e)
{
// 获取oldBuffer的长度值
int newBufferLength = getNextBufferSize(oldBuffer);
// 重新创建新的缓冲区
final E[] newBuffer = allocate(newBufferLength);

    producerBuffer = newBuffer; // 将新创建的缓冲区赋值到生产者缓冲区对象上final int newMask = (newBufferLength - 2) << 1;producerMask = newMask;// 根据oldMask获取偏移位置值final long offsetInOld = modifiedCalcElementOffset(pIndex, oldMask);// 根据newMask获取偏移位置值final long offsetInNew = modifiedCalcElementOffset(pIndex, newMask);// 将元素e设置到新的缓冲区newBuffer的offsetInNew位置处soElement(newBuffer, offsetInNew, e);// element in new array// 通过nextArrayOffset(oldMask)计算新的缓冲区将要放置旧的缓冲区的哪个位置// 将新的缓冲区newBuffer设置到旧的缓冲区oldBuffer的nextArrayOffset(oldMask)位置处// 主要是将oldBuffer中最后一个元素的位置指向新的缓冲区newBuffer// 这样就构成了一个单向链表指向的关系soElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// buffer linked// ASSERT codefinal long cIndex = lvConsumerIndex();final long availableInQueue = availableInQueue(pIndex, cIndex);RangeUtil.checkPositive(availableInQueue, "availableInQueue");// Invalidate racing CASs// We never set the limit beyond the bounds of a buffer// 重新扩容阈值,因为availableInQueue反正都是Integer.MAX_VALUE值,所以自然就取mask值啦// 因此针对MpscUnboundedArrayQueue来说,扩容的值其实就是mask的值的大小soProducerLimit(pIndex + Math.min(newMask, availableInQueue));// make resize visible to the other producers// 设置生产者指针加2处理soProducerIndex(pIndex + 2);// INDEX visible before ELEMENT, consistent with consumer expectation// make resize visible to consumer// 用一个空对象来衔接新老缓冲区,凡是在缓冲区中碰到JUMP对象的话,那么就得琢磨着准备着获取下一个缓冲区的数据元素了soElement(oldBuffer, offsetInOld, JUMP);
}// MpscUnboundedArrayQueue.java
@Override
protected int getNextBufferSize(E[] buffer)
{// 获取buffer缓冲区的长度return length(buffer);
}// LinkedArrayQueueUtil.java
static int length(Object[] buf)
{// 直接通过length属性来获取数组的长度return buf.length;
}   // CircularArrayOffsetCalculator.java
@SuppressWarnings("unchecked")
public static <E> E[] allocate(int capacity)
{// 根据容量值创建数组return (E[]) new Object[capacity];
}

2、该方法主要完成新的元素的放置,同时也完成了扩容操作,采用单向链表指针关系,将原缓冲区和新创建的缓冲区衔接起来;

3.5、poll()
1、源码:
// BaseMpscLinkedArrayQueue.java
/**
* {@inheritDoc}
*

* This implementation is correct for single consumer thread use only.
*/
@SuppressWarnings(“unchecked”)
@Override
public E poll()
{
final E[] buffer = consumerBuffer; // 获取缓冲区的数据
final long index = consumerIndex;
final long mask = consumerMask;

    // 根据消费指针与mask来获取当前需要从哪个位置开始来移除元素final long offset = modifiedCalcElementOffset(index, mask);// 从buffer缓冲区的offset位置获取元素内容Object e = lvElement(buffer, offset);// LoadLoadif (e == null) // 如果元素为null的话{// 则再探讨看看消费指针是不是和生产指针是不是相同if (index != lvProducerIndex()){// poll() == null iff queue is empty, null element is not strong enough indicator, so we must// check the producer index. If the queue is indeed not empty we spin until element is// visible.// 若不相同的话,则先尝试从buffer缓冲区的offset位置获取元素先,若获取元素为null则结束while处理do{e = lvElement(buffer, offset);}while (e == null);}// 说明消费指针是不是和生产指针是相等的,那么则缓冲区的数据已经被消费完了,直接返回null即可else{return null;}}// 如果元素为JUMP空对象的话,那么意味着我们就得获取下一缓冲区进行读取数据了if (e == JUMP){// final E[] nextBuffer = getNextBuffer(buffer, mask);// return newBufferPoll(nextBuffer, index);}// 能执行到这里,说明需要移除的元素既不是空的,也不是JUMP空对象,那么则就按照正常处理置空即可// 移除元素时,则将buffer缓冲区的offset位置的元素置为空即可soElement(buffer, offset, null); // release element null// 同时也通过CAS操作增加消费指针的关系,加2操作soConsumerIndex(index + 2); // release cIndexreturn (E) e;
}// BaseMpscLinkedArrayQueueProducerFields.java
@Override
public final long lvProducerIndex()
{// 通过Unsafe对象调用native方法,获取当前生产者指针值return UNSAFE.getLongVolatile(this, P_INDEX_OFFSET);
}   // UnsafeRefArrayAccess.java
/*** A volatile load (load + LoadLoad barrier) of an element from a given offset.** @param buffer this.buffer* @param offset computed via {@link UnsafeRefArrayAccess#calcElementOffset(long)}* @return the element at the offset*/
@SuppressWarnings("unchecked")
public static <E> E lvElement(E[] buffer, long offset)
{// 通过Unsafe对象调用native方法,获取buffer缓冲区offset位置的元素return (E) UNSAFE.getObjectVolatile(buffer, offset);
}// BaseMpscLinkedArrayQueue.java
@SuppressWarnings("unchecked")
private E[] getNextBuffer(final E[] buffer, final long mask)
{// 获取下一个缓冲区的偏移位置值final long offset = nextArrayOffset(mask);// 从buffer缓冲区的offset位置获取下一个缓冲区数组final E[] nextBuffer = (E[]) lvElement(buffer, offset);// 获取出来后,同时将buffer缓冲区的offset位置置为空,代表指针已经被取出,原来位置没用了,清空即可soElement(buffer, offset, null);return nextBuffer;
}// BaseMpscLinkedArrayQueue.java
private E newBufferPoll(E[] nextBuffer, long index)
{// 从下一个新的缓冲区中找到需要移除数据的指针位置final long offset = newBufferAndOffset(nextBuffer, index);// 从newBuffer新的缓冲区中offset位置取出元素final E n = lvElement(nextBuffer, offset);// LoadLoadif (n == null) // 若取出的元素为空,则直接抛出异常{throw new IllegalStateException("new buffer must have at least one element");}// 如果取出的元素不为空,那么先将这个元素原先的位置内容先清空掉soElement(nextBuffer, offset, null);// StoreStore// 然后通过Unsafe对象调用native方法,修改消费指针的数值偏移加2处理soConsumerIndex(index + 2);return n;
}

2、该方法主要阐述了该队列是如何的移除数据的;取出的数据如果为JUMP空对象的话,那么则准备从下一个缓冲区获取数据元素,否则还是从当前的缓冲区对象中移除元素,并且更新消费指针;

3.6、size()
1、源码:
// BaseMpscLinkedArrayQueue.java
@Override
public final int size()
{
// NOTE: because indices are on even numbers we cannot use the size util.

    /** It is possible for a thread to be interrupted or reschedule between the read of the producer and* consumer indices, therefore protection is required to ensure size is within valid range. In the* event of concurrent polls/offers to this method the size is OVER estimated as we read consumer* index BEFORE the producer index.*/long after = lvConsumerIndex(); // 获取消费指针long size;while (true) // 为了防止在获取大小的时候指针发生变化,那么则死循环自旋方式获取大小数值{final long before = after;final long currentProducerIndex = lvProducerIndex(); // 获取生产者指针after = lvConsumerIndex(); // 获取消费指针// 如果后获取的消费指针after和之前获取的消费指针before相等的话,那么说明此刻还没有指针变化if (before == after){// 那么则直接通过生产指针直接减去消费指针,然后向偏移一位,即除以2,得出最后size大小size = ((currentProducerIndex - after) >> 1);// 计算完了之后则直接break中断处理break;}// 若消费指针前后不一致,那么可以说是由于并发原因导致了指针发生了变化;// 那么则进行下一次循环继续获取最新的指针值再次进行判断}// Long overflow is impossible, so size is always positive. Integer overflow is possible for the unbounded// indexed queues.if (size > Integer.MAX_VALUE){return Integer.MAX_VALUE;}else{return (int) size;}
}

2、获取缓冲区数据大小其实很简单,就是拿着生产指针减去消费指针,但是为了防止并发操作计算错,才用了死循环的方式计算zise值;

3.7、isEmpty()
1、源码:
// BaseMpscLinkedArrayQueue.java
@Override
public final boolean isEmpty()
{
// Order matters!
// Loading consumer before producer allows for producer increments after consumer index is read.
// This ensures this method is conservative in it’s estimate. Note that as this is an MPMC there is
// nothing we can do to make this an exact method.
// 这个就简单了,直接判断消费指针和生产指针是不是相等就知道了
return (this.lvConsumerIndex() == this.lvProducerIndex());
}

2、通过前面我们已经知道了,添加数据的话生产指针在不停的累加操作,而做移除数据的时候消费指针也在不停的累加操作;

3、那么这种指针总会有一天会碰面的吧,碰面的那个时候则是数据已经空空如也的时刻;

四、性能测试
1、测试Demo:
/**

  • 比较队列的消耗情况。

  • @author hmilyylimh

  • _

  • @version 0.0.1

  • _

  • @date 2018/3/30
    */
    public class CompareQueueCosts {

    /** 生产者数量 */
    private static int COUNT_OF_PRODUCER = 2;

    /** 消费者数量 */
    private static final int COUNT_OF_CONSUMER = 1;

    /** 准备添加的任务数量值 */
    private static final int COUNT_OF_TASK = 1 << 20;

    /** 线程池对象 */
    private static ExecutorService executorService;

    public static void main(String[] args) throws Exception {

     for (int j = 1; j < 7; j++) {COUNT_OF_PRODUCER = (int) Math.pow(2, j);executorService = Executors.newFixedThreadPool(COUNT_OF_PRODUCER * 2);int basePow = 8;int capacity = 0;for (int i = 1; i <= 3; i++) {capacity = 1 << (basePow + i);System.out.print("Producers: " + COUNT_OF_PRODUCER + "\t\t");System.out.print("Consumers: " + COUNT_OF_CONSUMER + "\t\t");System.out.print("Capacity: " + capacity + "\t\t");System.out.print("LinkedBlockingQueue: " + testQueue(new LinkedBlockingQueue<Integer>(capacity), COUNT_OF_TASK) + "s" + "\t\t");// System.out.print("ArrayList: " + testQueue(new ArrayList<Integer>(capacity), COUNT_OF_TASK) + "s" + "\t\t");// System.out.print("LinkedList: " + testQueue(new LinkedList<Integer>(), COUNT_OF_TASK) + "s" + "\t\t");System.out.print("MpscUnboundedArrayQueue: " + testQueue(new MpscUnboundedArrayQueue<Integer>(capacity), COUNT_OF_TASK) + "s" + "\t\t");System.out.print("MpscChunkedArrayQueue: " + testQueue(new MpscChunkedArrayQueue<Integer>(capacity), COUNT_OF_TASK) + "s" + "\t\t");System.out.println();}System.out.println();executorService.shutdown();}
    

    }

    private static Double testQueue(final Collection queue, final int taskCount) throws Exception {
    CompletionService completionService = new ExecutorCompletionService(executorService);

     long start = System.currentTimeMillis();for (int i = 0; i < COUNT_OF_PRODUCER; i++) {executorService.submit(new Producer(queue, taskCount / COUNT_OF_PRODUCER));}for (int i = 0; i < COUNT_OF_CONSUMER; i++) {completionService.submit((new Consumer(queue, taskCount / COUNT_OF_CONSUMER)));}for (int i = 0; i < COUNT_OF_CONSUMER; i++) {completionService.take().get();}long end = System.currentTimeMillis();return Double.parseDouble("" + (end - start)) / 1000;
    

    }

    private static class Producer implements Runnable {
    private Collection queue;
    private int taskCount;

     public Producer(Collection<Integer> queue, int taskCount) {this.queue = queue;this.taskCount = taskCount;}@Overridepublic void run() {// Queue队列if (this.queue instanceof Queue) {Queue<Integer> tempQueue = (Queue<Integer>) this.queue;while (this.taskCount > 0) {if (tempQueue.offer(this.taskCount)) {this.taskCount--;} else {// System.out.println("Producer offer failed.");}}}// List列表else if (this.queue instanceof List) {List<Integer> tempList = (List<Integer>) this.queue;while (this.taskCount > 0) {if (tempList.add(this.taskCount)) {this.taskCount--;} else {// System.out.println("Producer offer failed.");}}}}
    

    }

    private static class Consumer implements Callable {
    private Collection queue;
    private int taskCount;

     public Consumer(Collection<Integer> queue, int taskCount) {this.queue = queue;this.taskCount = taskCount;}@Overridepublic Long call() {// Queue队列if (this.queue instanceof Queue) {Queue<Integer> tempQueue = (Queue<Integer>) this.queue;while (this.taskCount > 0) {if ((tempQueue.poll()) != null) {this.taskCount--;}}}// List列表else if (this.queue instanceof List) {List<Integer> tempList = (List<Integer>) this.queue;while (this.taskCount > 0) {if (!tempList.isEmpty() && (tempList.remove(0)) != null) {this.taskCount--;}}}return 0L;}
    

    }
    }

2、指标结果:
Producers: 2 Consumers: 1 Capacity: 512 LinkedBlockingQueue: 1.399s MpscUnboundedArrayQueue: 0.109s MpscChunkedArrayQueue: 0.09s
Producers: 2 Consumers: 1 Capacity: 1024 LinkedBlockingQueue: 1.462s MpscUnboundedArrayQueue: 0.041s MpscChunkedArrayQueue: 0.048s
Producers: 2 Consumers: 1 Capacity: 2048 LinkedBlockingQueue: 0.281s MpscUnboundedArrayQueue: 0.037s MpscChunkedArrayQueue: 0.082s

Producers: 4 Consumers: 1 Capacity: 512 LinkedBlockingQueue: 0.681s MpscUnboundedArrayQueue: 0.085s MpscChunkedArrayQueue: 0.133s
Producers: 4 Consumers: 1 Capacity: 1024 LinkedBlockingQueue: 0.405s MpscUnboundedArrayQueue: 0.094s MpscChunkedArrayQueue: 0.172s
Producers: 4 Consumers: 1 Capacity: 2048 LinkedBlockingQueue: 0.248s MpscUnboundedArrayQueue: 0.107s MpscChunkedArrayQueue: 0.153s

Producers: 8 Consumers: 1 Capacity: 512 LinkedBlockingQueue: 1.523s MpscUnboundedArrayQueue: 0.093s MpscChunkedArrayQueue: 0.172s
Producers: 8 Consumers: 1 Capacity: 1024 LinkedBlockingQueue: 0.668s MpscUnboundedArrayQueue: 0.094s MpscChunkedArrayQueue: 0.281s
Producers: 8 Consumers: 1 Capacity: 2048 LinkedBlockingQueue: 0.555s MpscUnboundedArrayQueue: 0.078s MpscChunkedArrayQueue: 0.455s

Producers: 16 Consumers: 1 Capacity: 512 LinkedBlockingQueue: 2.676s MpscUnboundedArrayQueue: 0.093s MpscChunkedArrayQueue: 0.753s
Producers: 16 Consumers: 1 Capacity: 1024 LinkedBlockingQueue: 2.135s MpscUnboundedArrayQueue: 0.093s MpscChunkedArrayQueue: 0.792s
Producers: 16 Consumers: 1 Capacity: 2048 LinkedBlockingQueue: 0.944s MpscUnboundedArrayQueue: 0.098s MpscChunkedArrayQueue: 0.64s

Producers: 32 Consumers: 1 Capacity: 512 LinkedBlockingQueue: 6.647s MpscUnboundedArrayQueue: 0.078s MpscChunkedArrayQueue: 2.109s
Producers: 32 Consumers: 1 Capacity: 1024 LinkedBlockingQueue: 3.893s MpscUnboundedArrayQueue: 0.095s MpscChunkedArrayQueue: 1.797s
Producers: 32 Consumers: 1 Capacity: 2048 LinkedBlockingQueue: 2.019s MpscUnboundedArrayQueue: 0.109s MpscChunkedArrayQueue: 2.427s

Producers: 64 Consumers: 1 Capacity: 512 LinkedBlockingQueue: 26.59s MpscUnboundedArrayQueue: 0.078s MpscChunkedArrayQueue: 3.627s
Producers: 64 Consumers: 1 Capacity: 1024 LinkedBlockingQueue: 22.566s MpscUnboundedArrayQueue: 0.093s MpscChunkedArrayQueue: 3.047s
Producers: 64 Consumers: 1 Capacity: 2048 LinkedBlockingQueue: 1.719s MpscUnboundedArrayQueue: 0.093s MpscChunkedArrayQueue: 2.549s

3、结果分析(一):
通过结果打印耗时可以明显看到MpscUnboundedArrayQueue耗时几乎大多数都是不超过0.1s的,这添加、删除的操作效率不是一般的高,这也难怪人家netty要舍弃自己写的队列框架了;

4、结果分析(二):
CompareQueueCosts代码里面我将ArrayList、LinkedList注释掉了,那是因为队列数量太大,List的操作太慢,效率低下,所以在大量并发的场景下,大家还是能避免则尽量避免,否则就遭殃了;

五、总结
1、通过底层无锁的Unsafe操作方式实现了多生产者同时访问队列的线程安全模型;

2、由于使用锁会造成的线程切换,特别消耗资源,因此使用无锁而是采用CAS的操作方式,虽然会在一定程度上造成CPU使用率过高,但是整体上将效率还是听可观的;

3、队列的数据结构是一种单向链表式的结构,通过生产、消费指针来标识添加、移除元素的指针位置,缓冲区与缓冲区之间通过指针指向,避免的数组的复制,较少了大量内存的占用情况;

六、推荐springcloud学习下载地址
https://gitee.com/ylimhhmily/SpringCloudTutorial.git

SpringCloudTutorial交流QQ群: 235322432

SpringCloudTutorial交流微信群: 微信沟通群二维码图片链接

欢迎关注,您的肯定是对我最大的支持!!!

原文:https://blog.csdn.net/ylimh_hmily/article/details/79765018

原理剖析-Netty之无锁队列相关推荐

  1. 原理剖析(第 012 篇)Netty之无锁队列MpscUnboundedArrayQueue原理分析

    原理剖析(第 012 篇)Netty之无锁队列MpscUnboundedArrayQueue原理分析 - 一.大致介绍 1.了解过netty原理的童鞋,其实应该知道工作线程组的每个子线程都维护了一个任 ...

  2. 无锁队列原理及实现(一)

    背景 在进行实际生产多线程开发的时候通常不会直接使用使用锁机制来操作线程间传递的数据,特别是对效率要求很高的场景中.最典型的就是音视频项目或者网络项目.这里先拿网络传输场景举例, 从这篇开始就开始详细 ...

  3. ZMQ无锁队列的原理与实现

    ZMQ无锁队列的原理与实现 前言 1. 为什么需要⽆锁队列 2. 无锁队列的实现(参考zmq,只支持一写一读的场景) 2.1 无锁队列前言 2.2 原⼦操作函数介绍 2.3 yqueue_t的chun ...

  4. 无锁队列原理及实现(二)

    环形队列 Ring Buffer 上一篇降到了环形队列的一些性质.现在来说实现. 从需求开看环形队列需要进行两个操作 写入 读取 而作为一个大小有限内存区域,就会有临界状态来限制置否能写入成功,或者是 ...

  5. 你应该知道的高性能无锁队列Disruptor

    1.何为队列 听到队列相信大家对其并不陌生,在我们现实生活中队列随处可见,去超市结账,你会看见大家都会一排排的站得好好的,等待结账,为什么要站得一排排的,你想象一下大家都没有素质,一窝蜂的上去结账,不 ...

  6. 无锁队列以及ABA问题

    队列是我们非常常用的数据结构,用来提供数据的写入和读取功能,而且通常在不同线程之间作为数据通信的桥梁.不过在将无锁队列的算法之前,需要先了解一下CAS(compare and swap)的原理.由于多 ...

  7. 【高并发】多线程之无锁队列|性能优化

    队列操作模型 (1)单生产者--单消费者 (2)多生产者--单消费者 (3)单生产者--多消费者 (4)多生产者--多消费者 3.队列数据定长与变长 (1)队列数据定长 (2)队列数据变长 并发无锁处 ...

  8. CAS无锁队列的实现

    文章目录 1. 基本原理 2. 代码实现 2.1 使用链表实现无锁队列 2.2 使用数组实现环形无锁队列 3. ABA 问题及解决 4. 参考资料 1. 基本原理 源于1994年10月发表在国际并行与 ...

  9. 无锁CAS及无锁队列实现

    CAS ⽐较并交换(compare and swap, CAS),是原⼦操作的⼀种,可⽤于在多线程编程中实现不被打断的数据 交换操作,从⽽避免多线程同时改写某⼀数据时由于执⾏顺序不确定性以及中断的不可 ...

最新文章

  1. at24c16如何划分出多个读写区_如何1分钟遍历100T数据?
  2. api key for Alpha Vantage
  3. window wamp中配置安装xhprof步骤(windows)
  4. 3520a SDL_tff库做bmp 也就是osd
  5. 【华为云技术分享】基于自动机器学习的心脏病预测模型(1)
  6. springboot集成Spring Security oauth2(八)
  7. Spring 集成mybatis 3.几之后,打印sql语句到控制台
  8. 远程控制工具ToDesk手机端测评,移动办公增强,pad变电脑
  9. 【nvidia】1.命令行方式安装nvidia显卡驱动
  10. 基于SSM+VUE的交通事故案例库系统(前后端分离)
  11. php汉字存储容量大小,存储400个24*24点阵汉字字形所需的存储容量是多少
  12. 理解Mybatis一级缓存,以及如何真正使用到一级缓存
  13. CSS z-index与JQ fadeOut()缓动效果无效问题
  14. css3 transition属性实现三角形
  15. 用科傻软件,求平面网和高程网的平差
  16. trie树之敏感词过滤算法
  17. mysql主键自增策略_MySQL 自增主键机制
  18. Qt设计师使用和原理
  19. 一文让你彻底了解市面蓝牙架构,无忧蓝牙产品选型
  20. 2014CSU-ACM暑假集训训练赛--七夕专场

热门文章

  1. jfinal连接oracle_连接jfinal
  2. resultMap标签中里的association标签
  3. 2023年大学英语B统考题库网考大学英语B试题(完整版)
  4. 电子设备必须通过电磁兼容试验
  5. python miio 连接小米网关_时隔五年小米门窗传感器重磅升级开合光线检测二合一 仅49元...
  6. 面试中人力资源部常问的问题
  7. 【AI人工智能】人工智能简介——AI 的发展是否会导致人类失去工作?
  8. 视频播放插件(video.js)
  9. 辐射3游戏登录是提示计算机丢失xlive.dll文件,win10系统玩辐射3丢失xlive.dll怎么解决...
  10. 微信公众号网页调用微信扫一扫功能