BlockingQueue
网上看了好多文章将线程池的但是似乎都没的多少人会详细讲解里面的任务队列,所以只有自己动手学习其中的任务队列
BlockingQueue
要学习其中的任务队列就需要先学习BlockingQueue,Blocking是一个接口,其中主要的方法为
// 尝试往队尾添加元素,添加成功返回true,添加失败返回falseboolean add(E e);// 尝试往队尾添加元素,添加成功返回true,添加失败返回falseboolean offer(E e);// 尝试往队尾添加元素,如果队列满了,则阻塞当前线程,直到其能够添加成功为止void put(E e) throws InterruptedException;// 尝试往队尾添加元素,如果队列满了,则阻塞当前线程,直到超时boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;// 从队头取出元素,如果队列为空则一直等待E take() throws InterruptedException;// 从队头取出元素,如果队列为空则等待一段时间E poll(long timeout, TimeUnit unit) throws InterruptedException;int remainingCapacity();//从队列中移除指定对象boolean remove(Object o);//判断队列是否存在指定对象public boolean contains(Object o);//将队列中元素转移到指定集合int drainTo(Collection<? super E> c);//将最多MAX个元素转移到指定集合int drainTo(Collection<? super E> c, int maxElements);
ArrayBlockingQueue
ArrayBlockingQueue的底层是基于数组实现,当指定容量后数组就确定了不会发生扩容
参数
// 元素final Object[] items;//可以被取到的元素下标int takeIndex;//可以放入元素的下标int putIndex;//元素个数int count;//锁final ReentrantLock lock;//等待条件,用于队列为空的时候阻塞当前线程获取private final Condition notEmpty;//等待条件,用于队列满的时候阻塞当前线程加入元素private final Condition notFull;transient Itrs itrs = null;
通过上述数据结构可以看出,ArrayBlockingQueue是通过一个循环数组的方式来实现存储元素的,这里takeIndex记录当前可以取元素的索引位置,而putIndex则记录了下一个元素可以放入的位置,如果队列满了则是takeIndex == putIndex,这里可以通过判断count字段来判断当前是处于满状态还是空置状态,通过一个全局锁lock来实现控制
对于其中的方法比较重要的是出队与入队方法,enqueue与dequeue
重要方法
enqueue与dequeue
其中入队与出队就是将对应位置的putIndex与takeIndex放入其中位置即可,然后加一,但是加一要判断是否超过了当前数组最大位置,如果是则设置为0,同时需要唤醒对应条件的等待队列
private void enqueue(E x) {final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal();}private E dequeue() {final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();notFull.signal();return x;}
这是其中内层调用的方法,而外部方法我们提供方法为
put与take
put与take实现了其阻塞队列满足条件的方法
public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)notFull.await(); //通过while循环以防止当前线程被意外唤醒,如果当前循环被打破则代表没有满了enqueue(e); // 放入元素} finally {lock.unlock();}}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await(); //与上面类似return dequeue();} finally {lock.unlock();}}
从这里可以看出ArrayBlockingQueue实现的是先进先出
LinkedBlockingQueue
LinkedBlockingQueue,其底层是通过一个单项链表实现的,由于单项链表需要有一个指向下一个节点的指针,因而其必须使用一个对象这里是Node来存储当前元素的值和下一个节点索引
Node节点
static class Node<E> {//当前元素的值E item;//下一个元素Node<E> next;Node(E x) { item = x; }}
参数
//容量private final int capacity;//当前队列已经存储个数private final AtomicInteger count = new AtomicInteger();//头指针transient Node<E> head;//尾指针private transient Node<E> last;//从队列取出元素的锁private final ReentrantLock takeLock = new ReentrantLock();//等待如果队列为空private final Condition notEmpty = takeLock.newCondition();//放入元素的锁private final ReentrantLock putLock = new ReentrantLock();private final Condition notFull = putLock.newCondition();
这里与ArrayBBlockingQueue存在着一些差异,其中head与last与takeIndex与putIndex都是类似的,但是LinkedBlockingQueue使用了两把锁,而上面只使用了一把锁
重要方法
enqueue与dequeue
private void enqueue(Node<E> node) {//将队列尾部节点的下一个节点指向新的节点,并更新尾部节点为最新的节点last = last.next = node;}//返回头节点的下一个节点并更新头节点//因为头节点存储不是第一个元素private E dequeue() {Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null;return x;}
可以看到对于链表的入队与出队操作是非常简单的,所以我们需要看其中的take与put方法
take与put
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == capacity) {notFull.await(); // 如果满了则进入等待}enqueue(node); //放入元素c = count.getAndIncrement(); //元素个数++if (c + 1 < capacity) //如果添加元素过后还是未满那么则继续唤醒下一个notFull.signal();} finally {putLock.unlock();}if (c == 0) //将等待取出的线程唤醒,而唤醒的时候也必须获取take锁才能唤醒signalNotEmpty();}
public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await(); //同理}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal(); //继续获取} finally {takeLock.unlock();}if (c == capacity)signalNotFull(); //同理return x;}
ArrayBlockingQueue与LinkedBlockingQueue区别
1、两种底层数据结构不同,一个是基于循环数组一个是基于单向链表
2、两种阻塞方式不同,ArrayBlockingQueue使用了一个全局锁来处理所有操作,也就是无论插入还是获取都只能一个线程执行,而LinkedBlockingQueue则是使用两个锁,使得获取与放入无干扰
3、两着初始化不同,ArrayBlockingQueue必须指定一个大小初始化而LinkedBlockingQueue则可以不指定,不指定则为Integer.MAX_VALUE
SynchronousQueue
这个阻塞队列就比上面两种麻烦多了,那就需要一步一步理解
SynchronousQueue也是一个队列来的,但他的特别之处在于它内部没有容器,一个生产线程,当它生产产品(即put时候),如果当前没有人想要消费产品此生产线程必须阻塞等待一个消费者调用take操作,take操作将唤醒该生产线程,同时消费线程会获取生产线程的产品(即数据传递),这样的一个过程称一次配对过程
构造器
其构造器可传入是公平还是非公平的,默认是非公平的
如果是公平的则采用TransferQueue如果是非公平的则采用TransferStack
public SynchronousQueue(boolean fair) {transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();}
从源码上课其中的pull、take等方法都素调用transfer方法
transfer中有三个参数:
e:要存放的元素
timed:是否超时等待
nanos:超时等待时间
TransferQueue
TransferQueue内部有一个内部类:QNode,TransferQueue是由QNode节点构成的链表结构
QNode
//下一个节点volatile QNode next;//存入元素 volatile Object item;//等待线程 volatile Thread waiter; //是否是数据final boolean isData;
TransferQueue初始化
TransferQueue创建时会初始化一个QNode节点,head,tail都会指向这个空节点,在TransferQueue中会以根据传入的参数:e是否为null来将节点分为两类,从TransferQueue队列中获取元素的线程是同一类节点,比如:调用take,poll的线程就是同一类节点;从TransferQueue队列中添加元素的线程是一类节点
TransferQueue队列特殊的地方就在于这个队列中只会存在一种节点:要么是获取元素的线程节点,要么是添加元素的线程节点
在初始化TransferQueue对象时,会初始化生产一个节点队列的头,尾:head,tail都会指向这个init节点
举个例子:假设当前队列中都是put线程,此时有一个take线程,那么这个take线程就会唤醒队列中的一个put线程
在唤醒线程时,同时会修改该线程所在节点的item值,在后面分析源码时候会看到,如果只是唤醒线程是没有用的,还需要将item的值修改才能真正唤醒该线程
Transfer
下面就来分析Transfer方法
E transfer(E e, boolean timed, long nanos) {QNode s = null; boolean isData = (e != null); // 判断当前是什么类型线程for (;;) {QNode t = tail;QNode h = head;if (t == null || h == null) continue; if (h == t || t.isData == isData) { // 如果队列为空 || 新类型线程与队列中线程类型一致QNode tn = t.next; if (t != tail) //队列尾节点已经被更新 continue;if (tn != null) { //有新节点加入到队列 advanceTail(t, tn); //更新尾节点continue;}if (timed && nanos <= 0) return null;if (s == null)s = new QNode(e, isData); //将线程包装成QNode节点if (!t.casNext(null, s)) //将新节点添加到队列末尾continue;advanceTail(t, s); //添加成功后更新tail Object x = awaitFulfill(s, e, timed, nanos); //等待被唤醒if (x == s) { //中断标记,带阻塞时间的线程等待了规定时间恢复运行 clean(t, s); //节点从队列中删除return null;}if (!s.isOffList()) { advanceHead(t, s); if (x != null) s.item = s;s.waiter = null;}return (x != null) ? (E)x : e;} else { //唤醒队列节点 // 取出当前节点 QNode m = h.next; if (t != tail || m == null || h != head)continue; Object x = m.item;if (isData == (x != null) || x == m || !m.casItem(x, e)) { //将被唤醒线程的值修改为当前线程的值 advanceHead(h, m); continue;}advanceHead(h, m); LockSupport.unpark(m.waiter); //唤醒线程return (x != null) ? (E)x : e;}}}
awaitFulfill
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {/* Same idea as TransferStack.awaitFulfill */final long deadline = timed ? System.nanoTime() + nanos : 0L;Thread w = Thread.currentThread();int spins = ((head.next == s) ?(timed ? maxTimedSpins : maxUntimedSpins) : 0);for (;;) {if (w.isInterrupted())s.tryCancel(e);Object x = s.item;if (x != e)return x;if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {s.tryCancel(e);continue;}}if (spins > 0) --spins;else if (s.waiter == null)s.waiter = w;else if (!timed)LockSupport.park(this);else if (nanos > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanos);}}
特别说明一下变量spins,所有进入阻塞队列的线程都不着急立即阻塞,而是会先自旋一段时间,然后再阻塞,因为阻塞线程再唤醒线程的代价就比让线程自选的大
TransferStack
里面存在一个内部类:SNode,TransferStack是由Snode单链表构建成的堆栈结构,只有一个head指针指向链表的表头;每次添加元素都是在表头处添加,新节点称为新的表头head,唤醒的线程的时候也是唤醒head节点,因此就形成了先进后出的堆栈结构,TransferStack中根据e也就线程分为两类,一类是获取元素:REQUEST,一类的添加元素:DATA,其中也只有一种节点只有被唤醒时候才会短暂出现2种节点
SNode
//下一个节点volatile SNode next; volatile SNode match;//当前线程 volatile Thread waiter;//值 Object item;//模式 int mode;
在TransferStack的堆栈中,如果新加入的线程类型与堆栈中的节点类型不同,那么会先将新线程包装成Snode节点加入堆栈中,成为新的header节点并将旧的节点唤醒。然后更新head节点返回DATA类型节点的元素值
在有不同类型的节点进入堆栈中的时候,新节点添加到堆栈顶端并更新为新的head节点;这个节点的mode = REQUEST | FULFILLING ;FULFILLING 是用来标记,表示这个head节点正在唤醒堆栈中的一个节点线程;最后在新节点唤醒旧的head节点( oldHead节点)之后,更新堆栈的head节点;
TransferStack部分的源码就再不分析了,入队阻塞部分的源码几乎与TransferQ ueue一样;TransferStack唤醒节点的方式与TransferQueue有点差别,TransferStack是将新节点先包装成节点添加到堆栈中,再唤醒节点线程,最后重新设置堆栈的head指针并将这2个节点清除出堆栈。
SynchronousQueue 这位大佬写的SynchronousQueue感觉很好,画图也很好只有自己理解但是想不出这些理解的话,感谢这位大佬我只是资源的整合者
BlockingQueue相关推荐
- Java并发编程之——BlockingQueue(队列)
一.什么是BlockingQueue BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞.被阻塞的情况主要有如下两种: 1. 当队列满了的时候进行入 ...
- Java中Queue和BlockingQueue的区别
1.BlockingQueue:支持两个附加操作的 Queue,这两个操作是:检索元素时等待队列变为非空,以及存储元素时等待空间变得可用. 2.BlockingQueue 不接受 null 元素. 3 ...
- 阻塞队列BlockingQueue 学习
import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Time ...
- 并发编程(九)—— Java 并发队列 BlockingQueue 实现之 LinkedBlockingQueue 源码分析...
LinkedBlockingQueue 在看源码之前,通过查询API发现对LinkedBlockingQueue特点的简单介绍: 1.LinkedBlockingQueue是一个由链表实现的有界队列阻 ...
- JAVA线程池ThreadPoolExecutor与阻塞队列BlockingQueue .
2019独角兽企业重金招聘Python工程师标准>>> 从Java5开始,Java提供了自己的线程池.每次只执行指定数量的线程,java.util.concurrent.Thread ...
- 建房子之前先挖地基 - Java BlockingQueue理解
最近一直在看<Think In Java>里关于并发部分的章节,读到第二十一章有一个有趣的比喻:必须先挖房子的地基,但是接下来可以并行的铺设钢结构和构建水泥部件,而这两项任务必须在混凝土浇 ...
- java多线程-阻塞队列BlockingQueue
大纲 BlockingQueue接口 ArrayBlockingQueue 一.BlockingQueue接口 public interface BlockingQueue<E> exte ...
- 生产者与消费者(三)---BlockingQueue
前面阐述了实现生产者与消费者问题的两种方式:wait() / notify()方法 和 await() / signal()方法,本文继续阐述多线程的经典问题---生产者与消费者的第三种方式:Bloc ...
- java.util.concurrent BlockingQueue详解
什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用.阻塞 ...
- java BlockingQueue 用法
在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全"传输"数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序 ...
最新文章
- 拍下首张黑洞照片的团队获300万美元奖金:2020年科学突破奖揭晓
- 基于RSSI利用KNN位置指纹法的室内定位(卡尔曼滤波)及代码
- 字节对齐和C/C++函数调用方式学习总结(多篇节选)
- 【转】.NET 的 WebSocket 开发包比较
- jieba分词提取小说人名
- 在ASP.NET中实现AJAX
- java 适配器模式记载学习
- 汽车编程都是用matlab,MATLAB编程与汽车仿真应用
- 沾化区php学校,推进校地合作 助力产教融合:滨州市技术学院与沾化经济开发区举行签约仪式...
- PBRT的scene.pbrt使用方法
- rd9700 linux网卡驱动,rd9700 usb网卡驱动
- Cisco(PacketTracer) - 三层交换机
- saver.save和saver.restore
- 八、python爬虫伪装 [免费伪装ip伪装请求头]
- IPO并不遥远,飞哥IPERi模型助你打开互联网创业创新成功密码
- 【Deep Learning】Transformers Assemble(PART I)
- swift monkeyking 社交分享
- 什么是数据驱动业务?
- 平面解析几何----过抛物线外一点和焦点的连线平分切点弦的两切点和焦点组成的角
- 大宗交易数据挖掘(一)