JDK-阻塞队列、非阻塞队列原理
前言
在jdk提供的队列中,有阻塞队列和非阻塞队列
阻塞队列是指:在添加元素的时候,如果队列已经满了,此时会阻塞当前入队的线程,将入队的线程放入到AQS的同步条件队列中,在队列元素出队之后,会尝试唤醒阻塞的线程,阻塞的线程将从条件队列进入到同步等待队列中进行排队;
在出队的时候,也是同样的道理,如果队列中元素为空,就会阻塞出队元素,进入到同步条件队列中,在队列中入队了元素之后,会唤醒阻塞的线程,阻塞的出队线程,从条件队列进入到等待队列中。进行排队,等待获取执行权限
非阻塞队列:在juc中,提供的非阻塞队列,是通过CAS来实现的,也就是说,如果在入队的时候,如果入队失败,会进行CAS自旋。而不是阻塞,进入到条件队列
除了阻塞和非阻塞队列之外,jdk还提供了优先级队里、延迟队列
阻塞队列
阻塞队列中,我比较熟悉的是ArrayBlockingQueue和LinkedBlockingQueue,所以,以这两个为例,来做一个学习
ArrayBlockingQueue
ArrayBlockingQueue,从名字就可以看出来,内部是数组结构,是有界的,那阻塞是怎么实现的呢?
在ArrayBlockingQueue中,维护了一个ReentrantLock和两个condition
/** Main lock guarding all access */
final ReentrantLock lock;/** Condition for waiting takes */
private final Condition notEmpty;/** Condition for waiting puts */
private final Condition notFull;
入队的源码
/**
* 这是入队的操作
* 1.加锁
* 2.判断当前数组中的元素个数等于数组长度,就返回false,表示此时队列已经满了
* 3.入队,入队成功的话,返回true
* 4.解锁
*/
public boolean offer(E e) {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lock();try {if (count == items.length)return false;else {enqueue(e);return true;}} finally {lock.unlock();}
}
出队的源码
/**
* 这是出队的逻辑
* 1.首先加锁
* 2.如果当前队列中为空,就阻塞,调用condition的await()方法,将当前线程放入到同步条件队列中进行休眠
* 3.如果不为空,正常出队,所谓的出队,就是从数组头部出队,返回对应的元素
* 4.然后释放锁
* @return
* @throws InterruptedException
*/
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}
}
可以看到,对于ArrayBlockingQueue来说,出队和入队加的lock是同一把锁lock,所以,在同一时刻,对于ArrayBlockingQueue来说,只会有一个线程去入队或者出队
这样的话,效率就是大大的降低了,所以,linkedBlockingQueue就不一样了
LinkedBlockingQueue
linkedBlockingQueue从名字也可以看到,内部采用的是linked链表结构,假如在初始化的时候,不指定链表的长度,那就是无界的,也就是说,如果我们不指定链表长度,那就是无界的,在put的时候,永远会也不会阻塞;所以,这里所说的阻塞,是指我们在指定链表长度的前提下,才会阻塞
但是,对于get()操作,如果链表为空,都会阻塞
这是两个构造函数,可以看到,在不指定阈值的情况下,capacity的值就是Integer.MAX_VALUE
public LinkedBlockingQueue() {this(Integer.MAX_VALUE);
}/*** Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.** @param capacity the capacity of this queue* @throws IllegalArgumentException if {@code capacity} is not greater* than zero*/
public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);
}
在LinkedBlockingQueue中,有一个和ArrayBlockingQueue,区别最大的地方是:LinkedBlockingQueue内部维护了两个lock和两个condition,所以,对于linkedBlockingQueue来说,入队和出队用的不是同一个lock,在性能上,会有一定的提升
可以看到,在类中,维护了两个lock和两个condition
/** 出队锁 Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();/** 入队锁 Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
来看下linkedBlockingQueue的出队和入队的源码
需要注意的是:对于linkedBlockingQueue来说,offer入队操作,不会阻塞线程,如果当前链表中元素个数超过了阈值,就会返回false,put方法在入队的时候,如果队列已经满了,就会调用condition的await()方法进入到条件队列中,就不贴源码了,基本思想都是一样的
/*** @throws NullPointerException if the specified element is null* 这是入队的源码* 1.如果入队元素为null,抛出异常* 2.加锁,加putLock* 3.如果当前元素个数小于链表capacity阈值,就入队* 4.*/
public boolean offer(E e) {if (e == null) throw new NullPointerException();final AtomicInteger count = this.count;if (count.get() == capacity)return false;int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;putLock.lock();try {if (count.get() < capacity) {/*** 这里的enqueue方法,就是将当前node节点添加到链表的尾部*/enqueue(node);c = count.getAndIncrement();/*** 如果 + 1之后依旧没有超过阈值,就唤起生产的线程,继续入队*/if (c + 1 < capacity)notFull.signal();}} finally {putLock.unlock();}/*** c == 0这个条件满足,说明之前是空的队列,现在插入了一个,因为初始化的时候,c为 -1* 如果插入了一个元素,就唤起消费的线程去出队*/if (c == 0)signalNotEmpty();return c >= 0;
}
/**
* 这是出队的方法
* 1.加takeLock锁
* 2.如果当前链表为空,就await
* 3.否则,就出队,然后将链表数量 - 1
* 4.如果数量 > 1,就唤醒出队线程,继续出队
* 5.解锁
* 6.最后会判断,如果当前链表长度 = capacity阈值。就唤醒入队线程继续入队
* @return
* @throws InterruptedException
*/
public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;
}
其实对于阻塞队列的阻塞,我之前是一直有疑问的,把源码整个读下来之后,我认为,所谓的阻塞是指:
1.在尝试加锁的时候,线程有可能加锁失败,此时会进入到同步等待队列中排队,等待获取执行权限
2.在加锁成功之后,如果队列为空,出队线程会阻塞;如果队列已经满了,入队线程会阻塞;此时的阻塞,是指线程会进入到同步条件队列中,在队列不为空,或者队列不满的时候,会唤醒该线程,去继续进行队列操作
非阻塞队列
非阻塞队列我是以ConcurrentLinkedQueue为切入点来学习的,所谓的非阻塞,简单来说,就是通过cas来进行入队和出队操作,这样的话,如果有多线程同时入队,会通过cas保证,只有一个线程可以cas,这样的话,其他线程会不停的cas去重试,这里有一个疑问点:
对于这种不停的进行自旋的操作,是否有阻塞的效率高?我觉得如果长时间cas,是否会对CPU造成影响?
我们先来看下入队的源码
public boolean offer(E e) {checkNotNull(e);final Node<E> newNode = new Node<E>(e);/*** 这里的t = p = tail* q = tail.next* 这里的for循环,如果插入失败,会一直重试,直到成功为止*/for (Node<E> t = tail, p = t;;) {Node<E> q = p.next;/*** 如果p.next = null 表示当前p节点就是尾结点* 直接cas设置当前newNode节点到p.next*/if (q == null) {// p is last nodeif (p.casNext(null, newNode)) {// Successful CAS is the linearization point// for e to become an element of this queue,// and for newNode to become "live".if (p != t) // hop two nodes at a timecasTail(t, newNode); // Failure is OK.return true;}// Lost CAS race to another thread; re-read next}/*** 如果p == p.next?*/else if (p == q)// We have fallen off list. If tail is unchanged, it// will also be off-list, in which case we need to// jump to head, from which all live nodes are always// reachable. Else the new tail is a better bet.p = (t != (t = tail)) ? t : head;else// Check for tail updates after two hops.p = (p != t && t != (t = tail)) ? t : q;}
}
可以看到,在入队的时候,并没有像阻塞队列那样,加锁、await等操作
并且,在concurrentLinkedQueue中,维护的node节点,都是volatile修饰的
private transient volatile Node<E> head;private transient volatile Node<E> tail;
并且node节点中的
volatile E item;
volatile Node<E> next;
也是volatile修饰的
JDK-阻塞队列、非阻塞队列原理相关推荐
- Java多线程学习二十五:阻塞和非阻塞队列的并发安全原理||如何选择适合自己的阻塞队列?
阻塞和非阻塞队列的并发安全原理. 之前我们探究了常见的阻塞队列的特点,以 ArrayBlockingQueue 为例, 首先分析 BlockingQueue 即阻塞队列的线程安全原理,然后再看看它的兄 ...
- 阻塞和非阻塞队列下两种生产者消费者实现
队列可分为两种,一种是阻塞队列,一种是非阻塞队列. 阻塞队列和非阻塞队列的区别:阻塞队列可以阻塞,非阻塞队列不能阻塞,只能使用队列wait(),notify()进行队列消息传送.而阻塞队列当队列里面没 ...
- python epoll 并发_Python语言之python并发原理(阻塞、非阻塞、epoll)
本文主要向大家介绍了Python语言之python并发原理(阻塞.非阻塞.epoll),通过具体的内容向大家展示,希望对大家学习Python语言有所帮助. 在Linux系统中 01 阻塞服务端 特征: ...
- 那些年让你迷惑的阻塞、非阻塞、异步、同步
点击上方"方志朋",选择"置顶或者星标" 你的关注意义重大! 在IT圈混饭吃,不管你用什么编程语言.从事前端还是后端,阻塞.非阻塞.异步.同步这些概念,都需要清 ...
- python gevent模块 下载_Python协程阻塞IO非阻塞IO同步IO异步IO
Python-协程-阻塞IO-非阻塞IO-同步IO-异步IO 一.协程 协程又称为微线程 CPU 是无法识别协程的,只能识别是线程,协程是由开发人员自己控制的.协程可以在单线程下实现并发的效果(实际计 ...
- 系统间通信1:阻塞与非阻塞式通信B
版权声明:本文引用https://yinwj.blog.csdn.net/article/details/48274255 接上篇:系统间通信1:阻塞与非阻塞式通信A 4.3 NIO通信框架 目前流行 ...
- linux非阻塞输入函数,Linux fcntl函数设置阻塞与非阻塞
转自http://www.cnblogs.com/xuyh/p/3273082.html 用命令F_GETFL和F_SETFL设置文件标志,比如阻塞与非阻塞 F_SETFL 设置给arg描述符 ...
- Linux Socket网络编程UDP、TCP 阻塞与非阻塞 断线重连机制
三种非阻塞模式的方法: (1) fcntl函数 int Mode = fcntl(sockfd, F_GETFL, 0); //获取文件的Mode值 fcntl(sockfd, F ...
- linux设备驱动中的阻塞与非阻塞(一)
这两天在搞linux驱动的阻塞和非阻塞,困扰了两天,看了不少博客,有了点自己的想法,也不知是否对错,但还是写写吧,让各位大神给我指点指点. 首先说说什么是阻塞和非阻塞的概念:阻塞操作就是指 ...
- linux驱动系列学习之阻塞与非阻塞IO(六)
一. 阻塞与非阻塞IO概念 阻塞操作是指在执行设备操作时,若不能获取资源,则挂起进程进入休眠状态,等待可满足条件后进行操作.被挂起的进程从调度器队列移动到挂起队列(睡眠状态).当操作驱动程序r ...
最新文章
- 一文读懂神经网络初始化!吴恩达Deeplearning.ai最新干货
- SerialPort实现对串口COM的操作(有些纠结)
- 企业网络推广——企业网络推广专员浅析网站关键词优化要注意哪些问题
- 图解Oracle同义词
- Spring中自动装配的方式有哪些?
- java8 函数式编程_使用Javaslang进行Java 8中的函数式编程
- 转:Cocoa Runtime系统知识整理
- Java 1.1.5 空串与 Null 串
- CloudStack 中关于注册ISO模版的问题解决
- 强化学习实战(二)ubuntu16.04安装Anaconda、Gym和 Universe
- 强大的Mockito测试框架
- 手写instanceof (详解原型链) 和 实现绑定解绑和派发的事件类
- 关于AOP的几个问题
- Matlab程序中调用其他程序
- API获取订单接口文档使用说明
- Supervisor守护进程
- #define 用法解析
- python接口自动化测试框架实战从设计到开发_【B0753】[java视频教程]Python接口自动化测试框架设计到开发完整版视频教程 it教程...
- 【地铁网络售票系统】之 后知后觉
- 最大流之Dinic 算法
热门文章
- 算法:24.两两交换链表中的节点
- 获取以及自定义User-Agent在URLSession, NSURLConnection, WKWebView iOS
- Mac查看占用端口进程
- 多层感知机从零开始实现
- android公交车代码,android实现查询公交车还有几站的功能
- 2021-09-09二叉树的最大深度, 深度优先搜索
- 代码整洁之道读书笔记----第一章---综述--第二节-整洁代码和我们的孩子
- 机器学习,斯坦福公开课
- 【2019南京ICPC网络赛 D】Robots【DAG上的随机游走】
- Raki的读paper小记:Model Zoo: A Growing “Brain” That Learns Continually