Java之BlockingQueue
Java之BlockingQueue
再看BlockingQueue之前,首先得了解什么是Queue,以解了解Queue的逻辑结构还有特性,了解ReentrantLock可重入锁是什么。
BlockingQueue是阻塞队列,多应用于多线程开发,线程池。
这里讲解基本的操作,如添加、删除、返回队首元素、清除队列
方式 | 抛出异常 | 有返回值,不抛出异常 | 阻塞等待 | 超时等待 |
---|---|---|---|---|
添加 | add() | offer() | pull() | offer() |
移除 | remove() | poll() | take() | poll() |
检测队首元素 | element() | peak() |
第一种add() remove() element() 继承自AbstractQueue.java
/*** 抛出异常 add remove*/public static void test1(){//队列的大小ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3);//3代表容量System.out.println(blockingQueue.add("a"));System.out.println(blockingQueue.add("b"));System.out.println(blockingQueue.add("c"));//IllegalStateException: Queue full 抛出异常!System.out.println(blockingQueue.add("d"));System.out.println(blockingQueue.element());//查看队首元素是谁System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());//NoSuchElementException 抛出异常!}
首先看一下ArrayBlockingQueue()的源码,会发现ArrayBlockingQueue()源码在ArrayBlockingQueue.java下
//ArrayBlockingQueue.javapublic ArrayBlockingQueue(int capacity) {this(capacity, false);}
继续点开this
//ArrayBlockingQueue.javapublic ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition();}
这就是底层的代码,应该都看得懂吧,多看看源码,还是非常有帮助的
add()方法
点开源码我们发现add()方法来自于ArrayBlockingQueue.java
//ArrayBlockingQueue.javapublic boolean add(E e) {return super.add(e);}
继续点super后面的add(e),会发现ArrayBlockingQueue.java里面的add()方法继承于AbstractQueue.java
//AbstractQueue.javapublic boolean add(E e) {if (offer(e))return true;elsethrow new IllegalStateException("Queue full");}
看清楚,这里注意,当队列满的时候,会抛出IllegalStateException(“Queue full”)异常
还有源码中,add()方法的实现基于offer()方法。
remove()方法
同理看一下源码
//AbstractQueue.javapublic E remove() {E x = poll();if (x != null)return x;elsethrow new NoSuchElementException();}
当队列为空时,继续移除会抛出NoSuchElementException()异常,remove()方法实现基于poll()方法
element()方法
//AbstractQueue.javapublic E element() {E x = peek();if (x != null)return x;elsethrow new NoSuchElementException();}
有关于offer()、poll()、peek()方法下面会讲解到
第二种offer()、poll()、peek()方法
/*** 有返回值,没有异常 offer poll*/public static void test2(){ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(2);System.out.println(blockingQueue.offer("a"));System.out.println(blockingQueue.offer("b"));//System.out.println(blockingQueue.offer("c"));System.out.println(blockingQueue.peek());System.out.println(blockingQueue.poll());//false 不抛出异常!System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());//System.out.println(blockingQueue.poll());//null 返回null}
直接看方法的源码吧,方便理解
offer()方法
//ArrayBlockingQueue.javapublic boolean offer(E e) {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lock();try {if (count == items.length)return false;else {enqueue(e);return true;}} finally {lock.unlock();}}
可以看到这里加入了可重入锁,保证了线程安全
如果对队列如何存不了解的话,可以点开enqueue(e)
//ArrayBlockingQueue.javaprivate void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal();}
自己理解一下
poll()方法
//ArrayBlockingQueue.javapublic E poll() {final ReentrantLock lock = this.lock;lock.lock();try {return (count == 0) ? null : dequeue();} finally {lock.unlock();}}
dequeue()队列取源码
//ArrayBlockingQueue.javaprivate E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();notFull.signal();return x;}
peek()方法
//ArrayBlockingQueue.javapublic E peek() {final ReentrantLock lock = this.lock;lock.lock();try {return itemAt(takeIndex); // null when queue is empty} finally {lock.unlock();}}
第三种offer(E e, long timeout, TimeUnit unit),poll(long timeout, TimeUnit unit)
/*** 等待,阻塞(等待超时)*/public static void test3() throws InterruptedException{ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3);blockingQueue.offer("a");blockingQueue.offer("b");blockingQueue.offer("c");blockingQueue.offer("d",2, TimeUnit.SECONDS);System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll(3, TimeUnit.SECONDS));}
这里直接上源码,就是多了参数
//ArrayBlockingQueue.javapublic boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {checkNotNull(e);long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length) {if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}enqueue(e);return true;} finally {lock.unlock();}}
//ArrayBlockingQueue.javapublic E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}return dequeue();} finally {lock.unlock();}}
这里可能对TimeUnit不了解,以及toNanos方法,这里讲一下
//TimeUnit.java部分代码
public enum TimeUnit {/*** Time unit representing one thousandth of a microsecond*/NANOSECONDS {public long toNanos(long d) { return d; }public long toMicros(long d) { return d/(C1/C0); }public long toMillis(long d) { return d/(C2/C0); }public long toSeconds(long d) { return d/(C3/C0); }public long toMinutes(long d) { return d/(C4/C0); }public long toHours(long d) { return d/(C5/C0); }public long toDays(long d) { return d/(C6/C0); }public long convert(long d, TimeUnit u) { return u.toNanos(d); }int excessNanos(long d, long m) { return (int)(d - (m*C2)); }},
这里可以看到toNanos方法,可能还是不太了解但是翻到TimeUnit.java的最后会看到这么一个方法
//TimeUnit.javapublic void sleep(long timeout) throws InterruptedException {if (timeout > 0) {long ms = toMillis(timeout);int ns = excessNanos(timeout, ms);Thread.sleep(ms, ns);}}
是不是很熟悉,如果知道Thread.sleep(),方法的就会发现,这里怎么也有sleep()方法,其实是Thread.sleep()方法的包装,这里将月、日、时、分、秒封装成方法,提供了更强的可读性,建议使用TimeUnit
第四种pu()t take()
/*** 等待,阻塞(一直阻塞) put take*/public static void test4() throws InterruptedException {ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3);blockingQueue.put("a");blockingQueue.put("b");blockingQueue.put("c");//blockingQueue.put("d");System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());}
直接上源码,对照offer()和poll()看
//ArrayBlockingQueue.javapublic void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}}
//ArrayBlockingQueue.javapublic E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}
请大家注意对比一下两个的返回值
有什么问题的话,请大家赐教
如果大家看不懂的话,可以点开这个链接狂神说Java
–JDK1.8 API
链接:https://pan.baidu.com/s/11r2vCXO6Y2nm9wPTyijqmA
提取码:abcd
推荐大家多看看API,这一个系列主要还是讲JUC下的
java.util.concurrent
java.util.concurrent.atomic
java.util.concurrent.locks 三个包
Java之BlockingQueue相关推荐
- blockingqueue java_记录 Java 的 BlockingQueue 中的一些坑
最近学习了 BlockingQueue,发现 java 的 BlockingQueue 并不是每一个实现都按照 BlockingQueue 的语意来的,其中有不少坑. 直接上代码吧: 1.关于Prio ...
- 谈谈java的BlockingQueue
最近在维护一个java工程,在群里面也就聊起来java的优劣!无奈一些Java的终极粉丝,总是号称性能已经不必C++差,并且很多标准类库都是大师级的人写的,如何如何稳定等等.索性就认真研究一番,他们给 ...
- 使用Java的BlockingQueue实现生产者-消费者
BlockingQueue也是java.util.concurrent下的主要用来控制线程同步的工具. BlockingQueue有四个具体的实现类,根据不同需求,选择不同的实现类 1.ArrayBl ...
- Java线程--BlockingQueue使用
https://www.cnblogs.com/fanerwei222/p/11871704.html
- Java面试中,一些常见的有关多线程问题!
面试作为入职的第一道门槛,其重要性不言而喻.对于从事IT的很多工作人员而言,对面试往往信心不足,毕竟在真实面试中,会遇到很多技术问题,万一哪块技术点不熟,就会与心仪的offer失之交臂.接下来,小千以 ...
- Java基础教程:多线程基础(3)——阻塞队列
Java基础教程:多线程基础(3)--阻塞队列 快速开始 引入问题 生产者消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一存储空间,生产者向空间里生产数据,而消费者取走数据. 模 ...
- java面试题题目与解析(自己网上找的):java208
1.JDK和JRE 有什么区别? jdk 是提供给开发人员的编译工具,里面包含: jvm(java virtual machine java虚拟机//就是可识别java命令代码,把他转成操作系统可识别 ...
- Java并发与多线程
1.多线程优点 资源利用率更好:文件读写操作 程序设计在某些情况下更简单: 程序响应更快:端口监听操作 2.多线程的代价 设计更复杂:多线程共享数据时尤其需要注意 上下文切换的开销: CPU 会在一个 ...
- Java中的线程池如何实现,一文彻底搞懂
前言 为什么要用线程池一键获取线程相关资料,还可获取最新java面试真题库 在 HotSpot VM 的线程模型中,Java 线程被一对一映射为内核线程. Java 在使用线程执行程序时,需要调用操作 ...
最新文章
- oracle full outer join,oracle 内连接(inner join)、外连接(outer join)、全连接(full join)...
- python3 gzip 压缩/解压
- Golang 判断key是否在map中
- python中的try...except...finally函数的用法
- 一个鸡蛋”改变TA的世界——让贫困地区的孩子每天都能吃上一个鸡蛋
- Wannafly挑战赛22 B	字符路径 ( 拓扑排序+dp )
- 【转】基于DCMTK的DICOM相关程序编写攻略
- Linux查找命令find、loacte、whereis、which、type梳理
- ASP.NET Aries JSAPI 文档说明:AR.DataGrid
- 39-java 输入输出总结
- nyoj1086是否被整除(数学小技巧)
- SVN忽略提交文件设置
- NFT抢购合集工具(免费)
- 洛谷 P1359 租用游艇(简单dp/Dijkstra)
- 论文笔记1 MOEFL Multi-objective Evolutionary Federated Learning
- line 1: syntax error: unexpected (
- beamer笔记——幻灯片比例改为16:9
- UVM学习笔记(一)工厂、phase机制、config
- 软件行业排名前100名的企业大全
- 安卓apk解析包失败,重新签名