Java提供了两种新的容器类型:Queue和BlockingQueue。

  Queue用于保存一组等待处理的元素。它提供了几种实现,包括:ConcurrentLinkedQueue,这是一个先进先出的并发对列,以及PriorityQueue,这是一个非并发的优先队列。Queue上的操作不会阻塞,如果队列为空,获取元素的操作将返回空值。虽然可以用List来模拟一个Queue的行为----事实上正是通过LinkedList来实现Queue的行为的,但还需要一个Queue的类,因为它能去掉List的随机访问需求,从而实现更高效的并发。

  BlockingQueue扩展了Queue,增加了可阻塞的插入和获取操作。如果队列为空那么获取元素的操作将会一直阻塞,直到队列出现一个可以可用的元素。如果队列已满(对于有界队列来说),那么插入元素的操作将一直阻塞,直到队列中出现可用的元素。在"生产者-消费者"设计模式中,阻塞队还是非常有用的。

1.Queue的使用

  Queue接口与List、Set同一级别,都是继承了Collection接口。
  Queue使用时要尽量避免Collection的add()和remove()方法,而是要使用offer()来加入元素,使用poll()来获取并移出元素。它们的优点是通过返回值可以判断成功与否,add()和remove()方法在失败的时候会抛出异常。 如果要使用前端而不移出该元素,使用element()或者peek()方法。
  值得注意的是LinkedList类实现了Queue接口,因此我们可以把LinkedList当成Queue来用。

  Queue的方法也非常简单,就是三组(一个会抛出异常,一个返回特殊值):

方法 抛出异常 不会抛出异常
插入 boolean add(E e); boolean offer(E e);
移除(返回且移除头元素)  E remove(); E poll();
检查(返回头元素但不删除) E element(); E peek();

例如:(poll()返回了null,remove()抛出异常了)

package cn.qlq.thread.thirteen;import java.util.LinkedList;
import java.util.Queue;public class Demo1 {public static void main(String[] args) {Queue<String> queue = new LinkedList<String>();String poll = queue.poll();System.out.println(poll);String remove = queue.remove();System.out.println(remove);}
}

结果:

null
Exception in thread "main" java.util.NoSuchElementException
  at java.util.LinkedList.removeFirst(LinkedList.java:268)
  at java.util.LinkedList.remove(LinkedList.java:683)
  at cn.qlq.thread.thirteen.Demo1.main(Demo1.java:11)

2.BlockingQueue的使用

  BlockingQueue继承Queue接口,位于并发包下,对Queue接口进行了扩展。

package java.util.concurrent;import java.util.Collection;
import java.util.Queue;public interface BlockingQueue<E> extends Queue<E> {boolean add(E e);boolean offer(E e);void put(E e) throws InterruptedException;boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException;E take() throws InterruptedException;E poll(long timeout, TimeUnit unit)throws InterruptedException;int remainingCapacity();boolean remove(Object o);public boolean contains(Object o);int drainTo(Collection<? super E> c);int drainTo(Collection<? super E> c, int maxElements);
}

  阻塞队列提供了可阻塞的put和take方法,以及支持定时的offer和poll方法。如果队列已经满了,那么put方法将阻塞到有空间可用;如果队列为空,那么take方法将会阻塞到有元素可用。队列可以是有界的,也可以是无界的,无界队列永远不会充满,因此无界队列的put方法永远也不会阻塞。(offer方法如果数据项不能添加到队列中,就会返回一个失败状态。这样就能够创建更多灵活的策略来处理负荷过载的情况,例如减轻负载,将多余的工作项序列化并写入磁盘,减少生产者线程的数量,或者通过某种方式来抑制生产者线程)

  BlockingQueue简化了生产者-消费者模式的设计过程,消费者不需要知道生产者是谁,生产者也不需要知道生产者是谁;而且支持任意数量的生产者与消费者。一种最常见的生产者-消费者设计模式就是线程池与工作队列的组合,在Executor任务执行框架中就体现了这种模式。

  

  一个经典的例子:以洗盘子为例子,一个人洗完盘子把盘子放在盘架上,另一个人负责从盘架上取出盘子并把他们烘干。在这个例子中,盘架就相当于一个阻塞队列。如果盘架上没有盘子,消费者会一直等待,如果盘架满了,生产者会一直等待。我们可以将这种类比扩展为多个生产者与多个消费者,每个工人只需要与盘架打交道。人们不需要知道谁是生产者谁是消费者。

  生产者和消费者的角色是相对的,某种环境下的生产者在另一种不同的环境中可能会变为消费者。比如烘干盘子的人将"消费"洗干净的湿盘子,而产生烘干的盘子。第三个人把洗干净的盘子整理好,在这种情况下,烘干盘子的人是生产者也是消费者,从而就有了两个共享的队列(每个对垒对列可能阻塞烘干工作的运行)。

  

  JDK中有多个BlockingQueue的实现,其中LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,二者分别于LinkedList和ArrayList类似,但比同步List拥有更好的同步性能。PriorityBlockingQueue队列是一个按优先级排列的队列,这个队列可以根据元素的自然顺序来比较元素(如果他们实现了Comparable方法),也可以使用Comparator来比较。

  还有一个是SynchronousQueue,实际上它不是一个真正的队列,因为它不会维护队列中元素的存储空间,与其他队列不同的是,它维护一组线程,这些线程在等待把元素加入或移除队列。如果以洗盘子为例,那么久相当于没有盘架而是直接将洗好的盘子放入下一个空闲的烘干机中。这种方式看似很奇怪,由于可以直接交付工作降低了将数据从生产者移到消费者的延迟。因为SynchronousQueue没有存储功能,因此put和take会一直阻塞,直到有另一个线程准备好参与到交付过程,仅当有足够多的消费者,并且总是有一个消费者准备获取交付工作时,才适合使用同步队列。

BlockingQueue中的方法:

BlockingQueue既然是Queue的子接口,必然有Queue中的方法,上面已经列了。看一下BlockingQueue中特有的方法:

(1)void put(E e) throws InterruptedException

  把e添加进BlockingQueue中,如果BlockingQueue中没有空间,则调用线程被阻塞,进入等待状态,直到BlockingQueue中有空间再继续

(2)void take() throws InterruptedException

  取走BlockingQueue里面排在首位的对象,如果BlockingQueue为空,则调用线程被阻塞,进入等待状态,直到BlockingQueue有新的数据被加入

(3)int drainTo(Collection<? super E> c, int maxElements)

  一次性取走BlockingQueue中的数据到c中,可以指定取的个数。通过该方法可以提升获取数据效率,不需要多次分批加锁或释放锁

2.1   ArrayBlockingQueue的简单使用

  基于数组的阻塞队列,必须指定队列大小。比较简单。ArrayBlockingQueue中只有一个ReentrantLock对象,这意味着生产者和消费者无法并行运行。创建ArrayBlockingQueue可以指定锁的公平性,默认是非公平锁,如下源码:

    final ReentrantLock lock;private final Condition notEmpty;private final Condition notFull;public ArrayBlockingQueue(int capacity) {this(capacity, false);}public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull =  lock.newCondition();}

例如:基于ArrayBlockingQueue的单生产单消费模式:

package cn.qlq.thread.thirteen;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class Demo2 {private static int num ;private static final Logger LOGGER = LoggerFactory.getLogger(Demo2.class);public static void main(String[] args) throws InterruptedException {final BlockingQueue<String> strings = new ArrayBlockingQueue<>(1);//必须指定容量(指定容器最多为1)Thread producer = new Thread(new Runnable() {@Overridepublic void run() {try {for  (int i=0;i<5;i++) {String ele = "ele"+(++num);strings.put(ele);LOGGER.info("ThreadName ->{} put ele->{}",Thread.currentThread().getName(),ele);}} catch (InterruptedException e) {e.printStackTrace();}}},"producer");producer.start();Thread consumer = new Thread(new Runnable() {@Overridepublic void run() {try {for (int i=0;i<5;i++) {Thread.sleep(1*1000);String take = strings.take();LOGGER.info("ThreadName ->{} take ele->{}",Thread.currentThread().getName(),take);}} catch (InterruptedException e) {e.printStackTrace();}}},"consumer");consumer.start();}
}

结果:(可以看到生产者放进元素之后会等元素被拿走之后才会继续生成元素)

11:00:04 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->producer put ele->ele1
11:00:05 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->consumer take ele->ele1
11:00:05 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->producer put ele->ele2
11:00:06 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->consumer take ele->ele2
11:00:06 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->producer put ele->ele3
11:00:07 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->producer put ele->ele4
11:00:07 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->consumer take ele->ele3
11:00:08 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->producer put ele->ele5
11:00:08 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->consumer take ele->ele4
11:00:09 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->consumer take ele->ele5

2.2  LinkedBlockingQueue 简单使用

  类似于LinkedList,基于链表的阻塞队列。此队列如果不指定容量大小,默认采用Integer.MAX_VALUE(可以理解为无限队列)。此外LinkedBlockingList有两个锁,意味着生产者和消费者都有自己的锁。如下源码:

    private transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}public LinkedBlockingQueue(Collection<? extends E> c) {this(Integer.MAX_VALUE);final ReentrantLock putLock = this.putLock;putLock.lock(); // Never contended, but necessary for visibilitytry {int n = 0;for (E e : c) {if (e == null)throw new NullPointerException();if (n == capacity)throw new IllegalStateException("Queue full");enqueue(new Node<E>(e));++n;}count.set(n);} finally {putLock.unlock();}}

例如:基于LinkedBlockingQueue的多生产多消费模式:

package cn.qlq.thread.thirteen;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class Demo3 {private static int num ;private static final Logger LOGGER = LoggerFactory.getLogger(Demo3.class);public static void main(String[] args) throws InterruptedException {final BlockingQueue<String> strings = new LinkedBlockingQueue<>(3);Runnable producerRun = new Runnable() {@Overridepublic synchronized void  run() {//加同步避免出现线程非安全try {for  (int i=0;i<5;i++) {Thread.sleep(1000);String ele = "ele"+(++num);strings.put(ele);LOGGER.info("ThreadName ->{} put ele->{}",Thread.currentThread().getName(),ele);}} catch (InterruptedException e) {e.printStackTrace();}}};Thread producer = new Thread(producerRun,"producer");producer.start();Thread producer2 = new Thread(producerRun,"producer2");producer2.start();Runnable consumerRun = new Runnable() {@Overridepublic void run() {try {for (int i=0;i<5;i++) {Thread.sleep(3000);String take = strings.take();LOGGER.info("ThreadName ->{} take ele->{}",Thread.currentThread().getName(),take);}} catch (InterruptedException e) {e.printStackTrace();}}};Thread consumer = new Thread(consumerRun,"consumer");Thread consumer1 = new Thread(consumerRun,"consumer1");consumer.start();consumer1.start();}
}

结果:

11:46:47 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer put ele->ele1
11:46:48 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer put ele->ele2
11:46:49 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer take ele->ele2
11:46:49 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer1 take ele->ele1
11:46:49 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer put ele->ele3
11:46:50 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer put ele->ele4
11:46:51 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer put ele->ele5
11:46:52 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer take ele->ele3
11:46:52 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer1 take ele->ele4
11:46:52 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer2 put ele->ele6
11:46:53 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer2 put ele->ele7
11:46:55 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer2 put ele->ele8
11:46:55 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer take ele->ele6
11:46:55 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer1 take ele->ele5
11:46:56 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer2 put ele->ele9
11:46:58 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer take ele->ele7
11:46:58 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer1 take ele->ele8
11:46:58 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer2 put ele->ele10
11:47:01 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer take ele->ele10
11:47:01 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer1 take ele->ele9

2.3  PriorityBlockingQueue简单使用

  PriorityBlockingQueue 是一个按优先级排列的阻塞队列,类似于TreeSet,看到tree,可以按顺序进行排列,就要想到两个接口。Comparable(集合中元素实现这个接口,元素自身具备可比性),Comparator(比较器,传入容器构造方法中,容器具备可比性)。

  其内部只有一个Lock,所以生产消费者不能同时作业,而且默认的容量是11,其构造方法也可以传入一个比较器,如下源码:

   /*** Default array capacity.*/private static final int DEFAULT_INITIAL_CAPACITY = 11;public PriorityBlockingQueue() {this(DEFAULT_INITIAL_CAPACITY, null);}public PriorityBlockingQueue(int initialCapacity) {this(initialCapacity, null);}public PriorityBlockingQueue(int initialCapacity,Comparator<? super E> comparator) {if (initialCapacity < 1)throw new IllegalArgumentException();this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();this.comparator = comparator;this.queue = new Object[initialCapacity];}public PriorityBlockingQueue(Collection<? extends E> c) {this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();boolean heapify = true; // true if not known to be in heap orderboolean screen = true;  // true if must screen for nullsif (c instanceof SortedSet<?>) {SortedSet<? extends E> ss = (SortedSet<? extends E>) c;this.comparator = (Comparator<? super E>) ss.comparator();heapify = false;}else if (c instanceof PriorityBlockingQueue<?>) {PriorityBlockingQueue<? extends E> pq =(PriorityBlockingQueue<? extends E>) c;this.comparator = (Comparator<? super E>) pq.comparator();screen = false;if (pq.getClass() == PriorityBlockingQueue.class) // exact matchheapify = false;}Object[] a = c.toArray();int n = a.length;// If c.toArray incorrectly doesn't return Object[], copy it.if (a.getClass() != Object[].class)a = Arrays.copyOf(a, n, Object[].class);if (screen && (n == 1 || this.comparator != null)) {for (int i = 0; i < n; ++i)if (a[i] == null)throw new NullPointerException();}this.queue = a;this.size = n;if (heapify)heapify();}

测试按年龄逆序排列:

package cn.qlq.thread.thirteen;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;public class Demo4 {public static void main(String[] args) throws InterruptedException {BlockingQueue<Person> persons = new PriorityBlockingQueue<Person>(3);persons.put(new Person(20,"張三"));persons.put(new Person(22,"李四"));persons.put(new Person(21,"王五"));persons.put(new Person(18,"八卦"));System.out.println(persons.take());System.out.println(persons.take());System.out.println(persons.take());System.out.println(persons.take());}
}class Person implements Comparable<Person>{private int age;private String name;public int getAge() {return age;}public void setAge(int age) {this.age = age;}@Overridepublic String toString() {return "Person [age=" + age + ", name=" + name + "]";}public String getName() {return name;}public void setName(String name) {this.name = name;}public Person(int age, String name) {super();this.age = age;this.name = name;}@Overridepublic int compareTo(Person o) {//返回-1表示排在他前面,返回1表示排在他后面if(o.getAge() > this.getAge()  ){return 1;}else if(o.getAge() < this.getAge()){return -1;}return 0;}
}

结果:

Person [age=22, name=李四]
Person [age=21, name=王五]
Person [age=20, name=張三]
Person [age=18, name=八卦]

2.4 SynchronousQueue简单使用

   前面已经介绍了,SynchronousQueue实际上它不是一个真正的队列,因为它不会维护队列中元素的存储空间,与其他队列不同的是,它维护一组线程,这些线程在等待把元素加入或移除队列。适用于生产者少消费者多的情况。

例如:

ArrayBlockingQueue有一个数组存储队列元素:

    /** The queued items */final Object[] items;

LinedBlockingQueue有一个内部Node类存储元素:

    /*** Linked list node class*/static class Node<E> {E item;Node<E> next;Node(E x) { item = x; }}

PriorityBlockingQueue有一个数组用于存储元素

private transient Object[] queue;

  可以这么理解,SynchronousQueue是生产者直接把数据给消费者(消费者直接从生产者这里拿数据)。换句话说,每一个插入操作必须等待一个线程对应的移除操作。SynchronousQueue又有两种模式:

1、公平模式

  采用公平锁,并配合一个FIFO队列(Queue)来管理多余的生产者和消费者

2、非公平模式

  采用非公平锁,并配合一个LIFO栈(Stack)来管理多余的生产者和消费者,这也是SynchronousQueue默认的模式

如下源码:

    public SynchronousQueue() {this(false);}public SynchronousQueue(boolean fair) {transferer = fair ? new TransferQueue() : new TransferStack();}

transferer 是一个内部类用于在生产者和消费者之间传递数据
    abstract static class Transferer {/*** Performs a put or take.**/abstract Object transfer(Object e, boolean timed, long nanos);}

例如:直接put元素会阻塞

package cn.qlq.thread.thirteen;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class Demo5 {private static final Logger LOGGER = LoggerFactory.getLogger(Demo5.class);public static void main(String[] args) throws InterruptedException {BlockingQueue<String> persons = new SynchronousQueue<String>();persons.put("1");LOGGER.info("放入元素 1");LOGGER.info("獲取元素 "+persons.take());}
}

结果:(线程会一直处于阻塞状态,由于没有消费者线程消费元素所以一直处于阻塞,所以不会执行LOGGER.info()的代码)

解决办法:生产元素之前,先开启消费者线程:(也就是必须确保生产的元素有消费者在take(),否则会阻塞)

package cn.qlq.thread.thirteen;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class Demo5 {private static final Logger LOGGER = LoggerFactory.getLogger(Demo5.class);public static void main(String[] args) throws InterruptedException {final BlockingQueue<String> strings = new SynchronousQueue<String>();Thread consumer = new Thread(new Runnable() {@Overridepublic void run() {try {String take = strings.take();LOGGER.info("ThreadName ->{} take ele->{}",Thread.currentThread().getName(),take);} catch (InterruptedException e) {e.printStackTrace();}}},"consumer");consumer.start();strings.put("1");LOGGER.info("放入元素 1");}
}

结果:(正常打印信息,并且进程也结束)

2.5  还有一个延迟队列DelayQueue---此队列可以实现有序与延迟的效果

  DelayQueue是一个无界阻塞队列,只有在延迟期满时才能从中提取元素。(获取元素的时候获取的是头部元素,而且头部元素只有在延迟期小于0才可以取出来)

  为了具有调用行为,存放到DelayDeque的元素必须继承Delayed接口。Delayed接口使对象成为延迟对象,它使存放在DelayQueue类中的对象具有了激活日期。该接口继承Comparable接口,如下:

public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}

  CompareTo(Delayed o):Delayed接口继承了Comparable接口,因此有了这个方法。
  getDelay(TimeUnit unit):这个方法返回到激活日期的剩余时间,时间单位由单位参数指定。(返回值为负数的时候才可以take()出来)

  此类也是只有一把锁,而且内部维护一个PriorityQueue用于存放有序队列(实现有序),查看源码:

    private transient final ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {this.addAll(c);}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null)available.await();else {long delay = first.getDelay(TimeUnit.NANOSECONDS);if (delay <= 0)return q.poll();else if (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && q.peek() != null)available.signal();lock.unlock();}}

例如:测试队列中放入5s以后的元素才可以取出来:

package cn.qlq.thread.thirteen;import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class Demo6 {private static final Logger LOGGER = LoggerFactory.getLogger(Demo6.class);public static void main(String[] args) throws InterruptedException {final BlockingQueue<DelayObj> delayObjs = new DelayQueue<DelayObj>();DelayObj delayObj = new DelayObj("1");delayObjs.put(delayObj);LOGGER.info("放入元素->{}", delayObj);Thread.sleep(1 * 1000);DelayObj delayObj2 = new DelayObj("3");delayObjs.put(delayObj2);LOGGER.info("放入元素->{}", delayObj2);LOGGER.info("{}", delayObjs.take());LOGGER.info("{}", delayObjs.take());}
}class DelayObj implements Delayed {private Date createTime;private String name;public Date getCreateTime() {return createTime;}public void setCreateTime(Date createTime) {this.createTime = createTime;}public DelayObj(String name) {this.createTime = new Date();this.name = name;}public String getName() {return name;}public void setName(String name) {this.name = name;}@Overridepublic int compareTo(Delayed o) { // 返回负数表示在前面,返回正数表示在后面if (this.getDelay(TimeUnit.NANOSECONDS) > o.getDelay(TimeUnit.NANOSECONDS)) {// NANOSECONDS是十亿分之秒return -1;} else if (this.getDelay(TimeUnit.NANOSECONDS) < o.getDelay(TimeUnit.NANOSECONDS)) {return 1;}return 0;}@Overridepublic String toString() {return "DelayObj [createTime=" + createTime + ", name=" + name + "]";}@Overridepublic long getDelay(TimeUnit unit) {Date now = new Date();long diff = createTime.getTime() + 5 * 1000 - now.getTime();System.out.println(diff);return unit.convert(diff, TimeUnit.MILLISECONDS);}
}

结果: (可以看到先获取的是最后创建的元素,而且只有在延迟期为0才可以获取到---实现了有序加延迟)

23:10:44 [cn.qlq.thread.thirteen.Demo6]-[INFO] 放入元素->DelayObj [createTime=Wed Dec 26 23:10:44 CST 2018, name=1]
4997
3976
23:10:45 [cn.qlq.thread.thirteen.Demo6]-[INFO] 放入元素->DelayObj [createTime=Wed Dec 26 23:10:45 CST 2018, name=3]
4991
-3
23:10:50 [cn.qlq.thread.thirteen.Demo6]-[INFO] DelayObj [createTime=Wed Dec 26 23:10:45 CST 2018, name=3]
-1023
23:10:50 [cn.qlq.thread.thirteen.Demo6]-[INFO] DelayObj [createTime=Wed Dec 26 23:10:44 CST 2018, name=1]

Queue和BlockingQueue的使用以及使用BlockingQueue实现生产者-消费者相关推荐

  1. 生产者-消费者中的缓冲区:BlockingQueue接口

    BlockingQueue接口使用场景 相信大家对生产者-消费者模式不陌生,这个经典的多线程协作模式,最简单的描述就是生产者线程往内存缓冲区中提交任务,消费者线程从内存缓冲区里获取任务执行.在生产者- ...

  2. Java阻塞队列(BlockingQueue)实现 生产者/消费者 示例

    Java阻塞队列(BlockingQueue)实现 生产者/消费者 示例 本文由 TonySpark 翻译自 Javarevisited.转载请参见文章末尾的要求. Java.util.concurr ...

  3. blockingqueue java_记录 Java 的 BlockingQueue 中的一些坑

    最近学习了 BlockingQueue,发现 java 的 BlockingQueue 并不是每一个实现都按照 BlockingQueue 的语意来的,其中有不少坑. 直接上代码吧: 1.关于Prio ...

  4. 使用Java的BlockingQueue实现生产者-消费者

    BlockingQueue也是java.util.concurrent下的主要用来控制线程同步的工具. BlockingQueue有四个具体的实现类,根据不同需求,选择不同的实现类 1.ArrayBl ...

  5. Java多线程(十):BlockingQueue实现生产者消费者模型

    BlockingQueue BlockingQueue.解决了多线程中,如何高效安全"传输"数据的问题.程序员无需关心什么时候阻塞线程,什么时候唤醒线程,该唤醒哪个线程. 方法介绍 ...

  6. python queue 生产者 消费者_【python】-- 队列(Queue)、生产者消费者模型

    队列(Queue) 在多个线程之间安全的交换数据信息,队列在多线程编程中特别有用 队列的好处: 提高双方的效率,你只需要把数据放到队列中,中间去干别的事情. 完成了程序的解耦性,两者关系依赖性没有不大 ...

  7. python queue 生产者 消费者_Queue: 应用于生产者-消费者模式的Python队列

    图片来源于网络 版权声明 © 著作权归作者所有 允许自由转载,但请保持署名和原文链接. 不允许商业用途.盈利行为及衍生盈利行为. 什么是Queue? Queue是Python标准库中的线程安全的队列( ...

  8. java queue通信_Java -- 使用阻塞队列(BlockingQueue)控制线程通信

    BlockingQueeu接口是Queue的子接口,但是它的主要作用并不是作为容器,而是作为线程同步的工具. 特征: 当生产者线程试图向BlockingQueue中放入元素时,如果该队列已满,则该线程 ...

  9. 生产者-消费者 BlockingQueue 运用示例

    简单说明: 1.生产者负责将字符串转换成int 数字放入BlockingQueue,失败就停止生产消费线程. 2.消费者从BlockingQueue获得数字,取平方根值,并累积值.如果有负数,失败!停 ...

  10. 生产者消费者ReentrantLock实现以及BlockingQueue实现

    1.ReentrantLock实现 /*** 描述: 一个初始值为0的变量,两个线程对其交替操作,一个加一个减** @author xinjiao.yu@marketin.cn* @create 20 ...

最新文章

  1. leetcode 46 全排列
  2. 面试必备:LinkedHashMap源码解析(JDK8)
  3. 反射获取构造方法并使用【应用】
  4. nginx php post限制,nginx + php 跨域问题,GET可以跨域成功,POST失败
  5. 操作系统---进程篇
  6. 记一次微信数据库解密过程
  7. springboot日志管理_最近Springboot有点火,只是因为面试问的频率高吗?
  8. 2020低压电工作业考试题库及低压电工模拟考试系统
  9. 网络编程在线英英词典之历史查询模块(六)
  10. 腾讯火力全开“吃鸡”:下一个游戏行业风口怎能错过?
  11. 成为一名嵌入式Linux开发工程师需要学习哪些知识?
  12. 图像处理之Texture Synthesis for Mobile Data Communications论文精读
  13. JAVA一维数组求和
  14. python随机生成20个数字_你如何在Python中生成20个随机数字
  15. 25个能够激发灵感的暗色调CSS Web设计欣赏
  16. 券商Robinhood大量客户被最低价格强平-交易成本拉大500倍,游戏驿站GME只能平仓不能开仓-看看行政总裁Vlad Tenev是如何回复这些问题的?
  17. eis电子防抖好还是光学防抖好_EIS和OIS有啥差别?一文搞懂手机防抖的那些事儿...
  18. elementUi中的el-select/el-input去掉border边框
  19. 揭示世界本质的「机器科学家」,比深度神经网络还强?
  20. 图像增强系列之图像自动去暗角算法。

热门文章

  1. David Cutler,VMS和Windows NT的首席设计师 (zz.is2120)
  2. 正则表达式 转义字符
  3. 主板故障的分析和诊断
  4. 《测绘程序开发实习》导线网平差 C++上机实验报告 CSU
  5. 《剑指offer》面试题31——连续子数组的最大和
  6. sklearn——model_selection——knn手写识别系统+iris分类
  7. mysql中in的问题
  8. Java 单向链表翻转
  9. ServletContextListener的用法
  10. 如何追求高质量的代码?