http://janeky.iteye.com/category/124727

java多线程学习-java.util.concurrent详解(一) Latch/Barrier

  • 博客分类:
  • java多线程
多线程Java编程threadJDK
Java1.5提供了一个非常高效实用的多线程包:java.util.concurrent, 提供了大量高级工具,可以帮助开发者编写高效、易维护、结构清晰的Java多线程程序。从这篇blog起,我将跟大家一起共同学习这些新的Java多线程构件

1. CountDownLatch
    我们先来学习一下JDK1.5 API中关于这个类的详细介绍:
“一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。 用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。”

这就是说,CountDownLatch可以用来管理一组相关的线程执行,只需在主线程中调用CountDownLatch 的await方法(一直阻塞),让各个线程调用countDown方法。当所有的线程都只需完countDown了,await也顺利返回,不再阻塞了。在这样情况下尤其适用:将一个任务分成若干线程执行,等到所有线程执行完,再进行汇总处理。

下面我举一个非常简单的例子。假设我们要打印1-100,最后再输出“Ok“。1-100的打印顺序不要求统一,只需保证“Ok“是在最后出现即可。

解决方案:我们定义一个CountDownLatch,然后开10个线程分别打印(n-1)*10+1至(n-1)*10+10。主线程中调用await方法等待所有线程的执行完毕,每个线程执行完毕后都调用countDown方法。最后再await返回后打印“Ok”。

具体代码如下(本代码参考了JDK示例代码):

Java代码  
  1. import java.util.concurrent.CountDownLatch;
  2. /**
  3. * 示例:CountDownLatch的使用举例
  4. * Mail: ken@iamcoding.com
  5. * @author janeky
  6. */
  7. public class TestCountDownLatch {
  8. private static final int N = 10;
  9. public static void main(String[] args) throws InterruptedException {
  10. CountDownLatch doneSignal = new CountDownLatch(N);
  11. CountDownLatch startSignal = new CountDownLatch(1);//开始执行信号
  12. for (int i = 1; i <= N; i++) {
  13. new Thread(new Worker(i, doneSignal, startSignal)).start();//线程启动了
  14. }
  15. System.out.println("begin------------");
  16. startSignal.countDown();//开始执行啦
  17. doneSignal.await();//等待所有的线程执行完毕
  18. System.out.println("Ok");
  19. }
  20. static class Worker implements Runnable {
  21. private final CountDownLatch doneSignal;
  22. private final CountDownLatch startSignal;
  23. private int beginIndex;
  24. Worker(int beginIndex, CountDownLatch doneSignal,
  25. CountDownLatch startSignal) {
  26. this.startSignal = startSignal;
  27. this.beginIndex = beginIndex;
  28. this.doneSignal = doneSignal;
  29. }
  30. public void run() {
  31. try {
  32. startSignal.await(); //等待开始执行信号的发布
  33. beginIndex = (beginIndex - 1) * 10 + 1;
  34. for (int i = beginIndex; i <= beginIndex + 10; i++) {
  35. System.out.println(i);
  36. }
  37. } catch (InterruptedException e) {
  38. e.printStackTrace();
  39. } finally {
  40. doneSignal.countDown();
  41. }
  42. }
  43. }
  44. }
import java.util.concurrent.CountDownLatch;
/**
* 示例:CountDownLatch的使用举例
* Mail: ken@iamcoding.com
* @author janeky
*/
public class TestCountDownLatch {
private static final int N = 10;
public static void main(String[] args) throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
CountDownLatch startSignal = new CountDownLatch(1);//开始执行信号
for (int i = 1; i <= N; i++) {
new Thread(new Worker(i, doneSignal, startSignal)).start();//线程启动了
}
System.out.println("begin------------");
startSignal.countDown();//开始执行啦
doneSignal.await();//等待所有的线程执行完毕
System.out.println("Ok");
}
static class Worker implements Runnable {
private final CountDownLatch doneSignal;
private final CountDownLatch startSignal;
private int beginIndex;
Worker(int beginIndex, CountDownLatch doneSignal,
CountDownLatch startSignal) {
this.startSignal = startSignal;
this.beginIndex = beginIndex;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await(); //等待开始执行信号的发布
beginIndex = (beginIndex - 1) * 10 + 1;
for (int i = beginIndex; i <= beginIndex + 10; i++) {
System.out.println(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
doneSignal.countDown();
}
}
}
}

总结:CounDownLatch对于管理一组相关线程非常有用。上述示例代码中就形象地描述了两种使用情况。第一种是计算器为1,代表了两种状态,开关。第二种是计数器为N,代表等待N个操作完成。今后我们在编写多线程程序时,可以使用这个构件来管理一组独立线程的执行。

2. CyclicBarrier
    我们先来学习一下JDK1.5 API中关于这个类的详细介绍:
    “一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
    CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。

我们在学习CountDownLatch的时候就提到了CyclicBarrier。两者究竟有什么联系呢?引用[JCIP]中的描述“The key difference is that with a barrier, all the threads must come together at a barrier point at the same time in order to proceed. Latches are for waiting for events; barriers are for waiting for other threads。CyclicBarrier等待所有的线程一起完成后再执行某个动作。这个功能CountDownLatch也同样可以实现。但是CountDownLatch更多时候是在等待某个事件的发生。在CyclicBarrier中,所有的线程调用await方法,等待其他线程都执行完。

举一个很简单的例子,今天晚上我们哥们4个去Happy。就互相通知了一下:晚上八点准时到xx酒吧门前集合,不见不散!。有个哥们住的近,早早就到了。有的事务繁忙,刚好踩点到了。无论怎样,先来的都不能独自行动,只能等待所有人

代码如下(参考了网上给的一些教程)

Java代码  
  1. import java.util.Random;
  2. import java.util.concurrent.BrokenBarrierException;
  3. import java.util.concurrent.CyclicBarrier;
  4. import java.util.concurrent.ExecutorService;
  5. import java.util.concurrent.Executors;
  6. public class TestCyclicBarrier {
  7. public static void main(String[] args) {
  8. ExecutorService exec = Executors.newCachedThreadPool();
  9. final Random random=new Random();
  10. final CyclicBarrier barrier=new CyclicBarrier(4,new Runnable(){
  11. @Override
  12. public void run() {
  13. System.out.println("大家都到齐了,开始happy去");
  14. }});
  15. for(int i=0;i<4;i++){
  16. exec.execute(new Runnable(){
  17. @Override
  18. public void run() {
  19. try {
  20. Thread.sleep(random.nextInt(1000));
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. System.out.println(Thread.currentThread().getName()+"到了,其他哥们呢");
  25. try {
  26. barrier.await();//等待其他哥们
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. } catch (BrokenBarrierException e) {
  30. e.printStackTrace();
  31. }
  32. }});
  33. }
  34. exec.shutdown();
  35. }
  36. }
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestCyclicBarrier {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
final Random random=new Random();
final CyclicBarrier barrier=new CyclicBarrier(4,new Runnable(){
@Override
public void run() {
System.out.println("大家都到齐了,开始happy去");
}});
for(int i=0;i<4;i++){
exec.execute(new Runnable(){
@Override
public void run() {
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"到了,其他哥们呢");
try {
barrier.await();//等待其他哥们
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}});
}
exec.shutdown();
}
}

关于await方法要特别注意一下,它有可能在阻塞的过程中由于某些原因被中断

总结:CyclicBarrier就是一个栅栏,等待所有线程到达后再执行相关的操作。barrier 在释放等待线程后可以重用。

更多的Java编程资料,欢迎访问我的blog:http://janeky.iteye.com,希望能够与你有更多的交流

未完待续

  • src.rar (1.3 KB)
  • 下载次数: 24

java多线程学习-java.util.concurrent详解(二)Semaphore/FutureTask/Exchanger

  • 博客分类:
  • java多线程
Java多线程Exchangethread算法
前一篇文章 http://janeky.iteye.com/category/124727
我们学习了java.util.concurrent的CountDownLatch和CyclicBarrier
今天我们继续共同来探讨其他的多线程组件
-----------------------------------------------------------------------------
3. Semaphore
    我们先来学习一下JDK1.5 API中关于这个类的详细介绍:
“一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。”

我们一般用它来控制某个对象的线程访问对象

例如,对于某个容器,我们规定,最多只能容纳n个线程同时操作
使用信号量来模拟实现

具体代码如下(参考 [JCIP])

Java代码  
  1. import java.util.Collections;
  2. import java.util.HashSet;
  3. import java.util.Set;
  4. import java.util.concurrent.ExecutorService;
  5. import java.util.concurrent.Executors;
  6. import java.util.concurrent.Semaphore;
  7. public class TestSemaphore {
  8. public static void main(String[] args) {
  9. ExecutorService exec = Executors.newCachedThreadPool();
  10. TestSemaphore t = new TestSemaphore();
  11. final BoundedHashSet<String> set = t.getSet();
  12. for (int i = 0; i < 3; i++) {//三个线程同时操作add
  13. exec.execute(new Runnable() {
  14. public void run() {
  15. try {
  16. set.add(Thread.currentThread().getName());
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. });
  22. }
  23. for (int j = 0; j < 3; j++) {//三个线程同时操作remove
  24. exec.execute(new Runnable() {
  25. public void run() {
  26. set.remove(Thread.currentThread().getName());
  27. }
  28. });
  29. }
  30. exec.shutdown();
  31. }
  32. public BoundedHashSet<String> getSet() {
  33. return new BoundedHashSet<String>(2);//定义一个边界约束为2的线程
  34. }
  35. class BoundedHashSet<T> {
  36. private final Set<T> set;
  37. private final Semaphore semaphore;
  38. public BoundedHashSet(int bound) {
  39. this.set = Collections.synchronizedSet(new HashSet<T>());
  40. this.semaphore = new Semaphore(bound, true);
  41. }
  42. public void add(T o) throws InterruptedException {
  43. semaphore.acquire();//信号量控制可访问的线程数目
  44. set.add(o);
  45. System.out.printf("add:%s%n",o);
  46. }
  47. public void remove(T o) {
  48. if (set.remove(o))
  49. semaphore.release();//释放掉信号量
  50. System.out.printf("remove:%s%n",o);
  51. }
  52. }
  53. }
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class TestSemaphore {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
TestSemaphore t = new TestSemaphore();
final BoundedHashSet<String> set = t.getSet();
for (int i = 0; i < 3; i++) {//三个线程同时操作add
exec.execute(new Runnable() {
public void run() {
try {
set.add(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
for (int j = 0; j < 3; j++) {//三个线程同时操作remove
exec.execute(new Runnable() {
public void run() {
set.remove(Thread.currentThread().getName());
}
});
}
exec.shutdown();
}
public BoundedHashSet<String> getSet() {
return new BoundedHashSet<String>(2);//定义一个边界约束为2的线程
}
class BoundedHashSet<T> {
private final Set<T> set;
private final Semaphore semaphore;
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
this.semaphore = new Semaphore(bound, true);
}
public void add(T o) throws InterruptedException {
semaphore.acquire();//信号量控制可访问的线程数目
set.add(o);
System.out.printf("add:%s%n",o);
}
public void remove(T o) {
if (set.remove(o))
semaphore.release();//释放掉信号量
System.out.printf("remove:%s%n",o);
}
}
}

总结:Semaphore通常用于对象池的控制

4.FutureTask
    我们先来学习一下JDK1.5 API中关于这个类的详细介绍:

“取消的异步计算。利用开始和取消计算的方法、查询计算是否完成的方法和获取计算结果的方法,此类提供了对 Future 的基本实现。仅在计算完成时才能获取结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。
可使用 FutureTask 包装 Callable 或 Runnable 对象。因为 FutureTask 实现了 Runnable,所以可将 FutureTask 提交给 Executor 执行。
除了作为一个独立的类外,此类还提供了 protected 功能,这在创建自定义任务类时可能很有用。 “

应用举例:我们的算法中有一个很耗时的操作,在编程的是,我们希望将它独立成一个模块,调用的时候当做它是立刻返回的,并且可以随时取消的

具体代码如下(参考 [JCIP])

Java代码  
  1. import java.util.concurrent.Callable;
  2. import java.util.concurrent.ExecutionException;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.Executors;
  5. import java.util.concurrent.FutureTask;
  6. public class TestFutureTask {
  7. public static void main(String[] args) {
  8. ExecutorService exec=Executors.newCachedThreadPool();
  9. FutureTask<String> task=new FutureTask<String>(new Callable<String>(){//FutrueTask的构造参数是一个Callable接口
  10. @Override
  11. public String call() throws Exception {
  12. return Thread.currentThread().getName();//这里可以是一个异步操作
  13. }});
  14. try {
  15. exec.execute(task);//FutureTask实际上也是一个线程
  16. String result=task.get();//取得异步计算的结果,如果没有返回,就会一直阻塞等待
  17. System.out.printf("get:%s%n",result);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. } catch (ExecutionException e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. }
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
public class TestFutureTask {
public static void main(String[] args) {
ExecutorService exec=Executors.newCachedThreadPool();
FutureTask<String> task=new FutureTask<String>(new Callable<String>(){//FutrueTask的构造参数是一个Callable接口
@Override
public String call() throws Exception {
return Thread.currentThread().getName();//这里可以是一个异步操作
}});
try {
exec.execute(task);//FutureTask实际上也是一个线程
String result=task.get();//取得异步计算的结果,如果没有返回,就会一直阻塞等待
System.out.printf("get:%s%n",result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}

总结:FutureTask其实就是新建了一个线程单独执行,使得线程有一个返回值,方便程序的编写

5. Exchanger
    我们先来学习一下JDK1.5 API中关于这个类的详细介绍:
    “可以在pair中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger 可能被视为 SynchronousQueue 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。 “

应用举例:有两个缓存区,两个线程分别向两个缓存区fill和take,当且仅当一个满了,两个缓存区交换

代码如下(参考了网上给的示例   http://hi.baidu.com/webidea/blog/item/2995e731e53ad5a55fdf0e7d.html)

Java代码  
  1. import java.util.ArrayList;
  2. import java.util.concurrent.Exchanger;
  3. public class TestExchanger {
  4. public static void main(String[] args) {
  5. final Exchanger<ArrayList<Integer>> exchanger = new Exchanger<ArrayList<Integer>>();
  6. final ArrayList<Integer> buff1 = new ArrayList<Integer>(10);
  7. final ArrayList<Integer> buff2 = new ArrayList<Integer>(10);
  8. new Thread(new Runnable() {
  9. @Override
  10. public void run() {
  11. ArrayList<Integer> buff = buff1;
  12. try {
  13. while (true) {
  14. if (buff.size() >= 10) {
  15. buff = exchanger.exchange(buff);//开始跟另外一个线程交互数据
  16. System.out.println("exchange buff1");
  17. buff.clear();
  18. }
  19. buff.add((int)(Math.random()*100));
  20. Thread.sleep((long)(Math.random()*1000));
  21. }
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. }).start();
  27. new Thread(new Runnable(){
  28. @Override
  29. public void run() {
  30. ArrayList<Integer> buff=buff2;
  31. while(true){
  32. try {
  33. for(Integer i:buff){
  34. System.out.println(i);
  35. }
  36. Thread.sleep(1000);
  37. buff=exchanger.exchange(buff);//开始跟另外一个线程交换数据
  38. System.out.println("exchange buff2");
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. }
  42. }
  43. }}).start();
  44. }
  45. }
import java.util.ArrayList;
import java.util.concurrent.Exchanger;
public class TestExchanger {
public static void main(String[] args) {
final Exchanger<ArrayList<Integer>> exchanger = new Exchanger<ArrayList<Integer>>();
final ArrayList<Integer> buff1 = new ArrayList<Integer>(10);
final ArrayList<Integer> buff2 = new ArrayList<Integer>(10);
new Thread(new Runnable() {
@Override
public void run() {
ArrayList<Integer> buff = buff1;
try {
while (true) {
if (buff.size() >= 10) {
buff = exchanger.exchange(buff);//开始跟另外一个线程交互数据
System.out.println("exchange buff1");
buff.clear();
}
buff.add((int)(Math.random()*100));
Thread.sleep((long)(Math.random()*1000));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable(){
@Override
public void run() {
ArrayList<Integer> buff=buff2;
while(true){
try {
for(Integer i:buff){
System.out.println(i);
}
Thread.sleep(1000);
buff=exchanger.exchange(buff);//开始跟另外一个线程交换数据
System.out.println("exchange buff2");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}}).start();
}
}

总结:Exchanger在特定的使用场景比较有用(两个伙伴线程之间的数据交互)
----------------------------------------------------------------------------------
更多的java多线程资料,欢迎访问 http://janeky.iteye.com/category/124727

java多线程学习-java.util.concurrent详解(三)ScheduledThreadPoolExecutor

  • 博客分类:
  • java多线程
Java多线程Blog
前一篇blog http://janeky.iteye.com/category/124727我们学习了java多线程的信号量/FutureTask
----------------------------------------------------------------------------------

6. ScheduledThreadPoolExecutor
    我们先来学习一下JDK1.5 API中关于这个类的详细介绍:

"可另行安排在给定的延迟后运行命令,或者定期执行命令。需要多个辅助线程时,或者要求 ThreadPoolExecutor 具有额外的灵活性或功能时,此类要优于 Timer。
    一旦启用已延迟的任务就执行它,但是有关何时启用,启用后何时执行则没有任何实时保证。按照提交的先进先出 (FIFO) 顺序来启用那些被安排在同一执行时间的任务。

虽然此类继承自 ThreadPoolExecutor,但是几个继承的调整方法对此类并无作用。特别是,因为它作为一个使用 corePoolSize 线程和一个无界队列的固定大小的池,所以调整 maximumPoolSize 没有什么效果。"

在JDK1.5之前,我们关于定时/周期操作都是通过Timer来实现的。但是Timer有以下几种危险[JCIP]

a. Timer是基于绝对时间的。容易受系统时钟的影响。
b. Timer只新建了一个线程来执行所有的TimeTask。所有TimeTask可能会相关影响
c. Timer不会捕获TimerTask的异常,只是简单地停止。这样势必会影响其他TimeTask的执行。

如果你是使用JDK1.5以上版本,建议用ScheduledThreadPoolExecutor代替Timer。它基本上解决了上述问题。它采用相对时间,用线程池来执行TimerTask,会出来TimerTask异常。

下面通过一个简单的实例来阐述ScheduledThreadPoolExecutor的使用。
  
    我们定期让定时器抛异常
    我们定期从控制台打印系统时间

代码如下(参考了网上的一些代码,在此表示感谢)

Java代码  
  1. import java.util.concurrent.ScheduledThreadPoolExecutor;
  2. import java.util.concurrent.TimeUnit;
  3. public class TestScheduledThreadPoolExecutor {
  4. public static void main(String[] args) {
  5. ScheduledThreadPoolExecutor exec=new ScheduledThreadPoolExecutor(1);
  6. exec.scheduleAtFixedRate(new Runnable(){//每隔一段时间就触发异常
  7. @Override
  8. public void run() {
  9. throw new RuntimeException();
  10. }}, 1000, 5000, TimeUnit.MILLISECONDS);
  11. exec.scheduleAtFixedRate(new Runnable(){//每隔一段时间打印系统时间,证明两者是互不影响的
  12. @Override
  13. public void run() {
  14. System.out.println(System.nanoTime());
  15. }}, 1000, 2000, TimeUnit.MILLISECONDS);
  16. }
  17. }
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestScheduledThreadPoolExecutor {
public static void main(String[] args) {
ScheduledThreadPoolExecutor exec=new ScheduledThreadPoolExecutor(1);
exec.scheduleAtFixedRate(new Runnable(){//每隔一段时间就触发异常
@Override
public void run() {
throw new RuntimeException();
}}, 1000, 5000, TimeUnit.MILLISECONDS);
exec.scheduleAtFixedRate(new Runnable(){//每隔一段时间打印系统时间,证明两者是互不影响的
@Override
public void run() {
System.out.println(System.nanoTime());
}}, 1000, 2000, TimeUnit.MILLISECONDS);
}
}

总结:是时候把你的定时器换成 ScheduledThreadPoolExecutor了

--------------------------------------------------------------------
更多的java多线程资料,欢迎访问 http://janeky.iteye.com/category/124727

java多线程学习-java.util.concurrent详解(四) BlockingQueue

  • 博客分类:
  • java多线程
Java多线程thread生活数据结构
前面一篇blog http://janeky.iteye.com/category/124727
我们主要探讨了ScheduledThreadPoolExecutor定时器的使用
---------------------------------------------------------------------------------
7.BlockingQueue
    “支持两个附加操作的 Queue,这两个操作是:获取元素时等待队列变为非空,以及存储元素时等待空间变得可用。“

这里我们主要讨论BlockingQueue的最典型实现:LinkedBlockingQueue 和ArrayBlockingQueue。两者的不同是底层的数据结构不够,一个是链表,另外一个是数组。
   
    后面将要单独解释其他类型的BlockingQueue和SynchronousQueue

BlockingQueue的经典用途是 生产者-消费者模式

代码如下:

Java代码  
  1. import java.util.Random;
  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.LinkedBlockingQueue;
  4. public class TestBlockingQueue {
  5. public static void main(String[] args) {
  6. final BlockingQueue<Integer> queue=new LinkedBlockingQueue<Integer>(3);
  7. final Random random=new Random();
  8. class Producer implements Runnable{
  9. @Override
  10. public void run() {
  11. while(true){
  12. try {
  13. int i=random.nextInt(100);
  14. queue.put(i);//当队列达到容量时候,会自动阻塞的
  15. if(queue.size()==3)
  16. {
  17. System.out.println("full");
  18. }
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. }
  24. }
  25. class Consumer implements Runnable{
  26. @Override
  27. public void run() {
  28. while(true){
  29. try {
  30. queue.take();//当队列为空时,也会自动阻塞
  31. Thread.sleep(1000);
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. }
  35. }
  36. }
  37. }
  38. new Thread(new Producer()).start();
  39. new Thread(new Consumer()).start();
  40. }
  41. }
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class TestBlockingQueue {
public static void main(String[] args) {
final BlockingQueue<Integer> queue=new LinkedBlockingQueue<Integer>(3);
final Random random=new Random();
class Producer implements Runnable{
@Override
public void run() {
while(true){
try {
int i=random.nextInt(100);
queue.put(i);//当队列达到容量时候,会自动阻塞的
if(queue.size()==3)
{
System.out.println("full");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
@Override
public void run() {
while(true){
try {
queue.take();//当队列为空时,也会自动阻塞
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
}
}

总结:BlockingQueue使用时候特别注意take 和 put

8. DelayQueue

我们先来学习一下JDK1.5 API中关于这个类的详细介绍:
    “它是包含Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即使无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size 方法同时返回到期和未到期元素的计数。此队列不允许使用 null 元素。”

在现实生活中,很多DelayQueue的例子。就拿上海的SB会来说明,很多国家地区的开馆时间不同。你很早就来到园区,然后急急忙忙地跑到一些心仪的馆区,发现有些还没开,你吃了闭门羹。

仔细研究DelayQueue,你会发现它其实就是一个PriorityQueue的封装(按照delay时间排序),里面的元素都实现了Delayed接口,相关操作需要判断延时时间是否到了。

在实际应用中,有人拿它来管理跟实际相关的缓存、session等

下面我就通过 “上海SB会的例子来阐述DelayQueue的用法”

代码如下:

Java代码  
  1. import java.util.Random;
  2. import java.util.concurrent.DelayQueue;
  3. import java.util.concurrent.Delayed;
  4. import java.util.concurrent.TimeUnit;
  5. public class TestDelayQueue {
  6. private class Stadium implements Delayed
  7. {
  8. long trigger;
  9. public Stadium(long i){
  10. trigger=System.currentTimeMillis()+i;
  11. }
  12. @Override
  13. public long getDelay(TimeUnit arg0) {
  14. long n=trigger-System.currentTimeMillis();
  15. return n;
  16. }
  17. @Override
  18. public int compareTo(Delayed arg0) {
  19. return (int)(this.getDelay(TimeUnit.MILLISECONDS)-arg0.getDelay(TimeUnit.MILLISECONDS));
  20. }
  21. public long getTriggerTime(){
  22. return trigger;
  23. }
  24. }
  25. public static void main(String[] args)throws Exception {
  26. Random random=new Random();
  27. DelayQueue<Stadium> queue=new DelayQueue<Stadium>();
  28. TestDelayQueue t=new TestDelayQueue();
  29. for(int i=0;i<5;i++){
  30. queue.add(t.new Stadium(random.nextInt(30000)));
  31. }
  32. Thread.sleep(2000);
  33. while(true){
  34. Stadium s=queue.take();//延时时间未到就一直等待
  35. if(s!=null){
  36. System.out.println(System.currentTimeMillis()-s.getTriggerTime());//基本上是等于0
  37. }
  38. if(queue.size()==0)
  39. break;
  40. }
  41. }
  42. }
import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class TestDelayQueue {
private class Stadium implements Delayed
{
long trigger;
public Stadium(long i){
trigger=System.currentTimeMillis()+i;
}
@Override
public long getDelay(TimeUnit arg0) {
long n=trigger-System.currentTimeMillis();
return n;
}
@Override
public int compareTo(Delayed arg0) {
return (int)(this.getDelay(TimeUnit.MILLISECONDS)-arg0.getDelay(TimeUnit.MILLISECONDS));
}
public long getTriggerTime(){
return trigger;
}
}
public static void main(String[] args)throws Exception {
Random random=new Random();
DelayQueue<Stadium> queue=new DelayQueue<Stadium>();
TestDelayQueue t=new TestDelayQueue();
for(int i=0;i<5;i++){
queue.add(t.new Stadium(random.nextInt(30000)));
}
Thread.sleep(2000);
while(true){
Stadium s=queue.take();//延时时间未到就一直等待
if(s!=null){
System.out.println(System.currentTimeMillis()-s.getTriggerTime());//基本上是等于0
}
if(queue.size()==0)
break;
}
}
}

总结:适用于需要延时操作的队列管理

9. SynchronousQueue
    我们先来学习一下JDK1.5 API中关于这个类的详细介绍:

“一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行 peek,因为仅在试图要移除元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的头 是尝试添加到队列中的首个已排队插入线程的元素;如果没有这样的已排队线程,则没有可用于移除的元素并且 poll() 将会返回 null。对于其他 Collection 方法(例如 contains),SynchronousQueue 作为一个空 collection。此队列不允许 null 元素。
    同步队列类似于 CSP 和 Ada 中使用的 rendezvous 信道。它非常适合于传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。 “

看起来很有意思吧。队列竟然是没有内部容量的。这个队列其实是BlockingQueue的一种实现。每个插入操作必须等待另一个线程的对应移除操作,反之亦然。它给我们提供了在线程之间交换单一元素的极轻量级方法

应用举例:我们要在多个线程中传递一个变量。

代码如下(其实就是生产者消费者模式)

Java代码  
  1. import java.util.Arrays;
  2. import java.util.List;
  3. import java.util.concurrent.BlockingQueue;
  4. import java.util.concurrent.SynchronousQueue;
  5. public class TestSynchronousQueue {
  6. class Producer implements Runnable {
  7. private BlockingQueue<String> queue;
  8. List<String> objects = Arrays.asList("one", "two", "three");
  9. public Producer(BlockingQueue<String> q) {
  10. this.queue = q;
  11. }
  12. @Override
  13. public void run() {
  14. try {
  15. for (String s : objects) {
  16. queue.put(s);// 产生数据放入队列中
  17. System.out.printf("put:%s%n",s);
  18. }
  19. queue.put("Done");// 已完成的标志
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. }
  25. class Consumer implements Runnable {
  26. private BlockingQueue<String> queue;
  27. public Consumer(BlockingQueue<String> q) {
  28. this.queue = q;
  29. }
  30. @Override
  31. public void run() {
  32. String obj = null;
  33. try {
  34. while (!((obj = queue.take()).equals("Done"))) {
  35. System.out.println(obj);//从队列中读取对象
  36. Thread.sleep(3000);     //故意sleep,证明Producer是put不进去的
  37. }
  38. } catch (InterruptedException e) {
  39. e.printStackTrace();
  40. }
  41. }
  42. }
  43. public static void main(String[] args) {
  44. BlockingQueue<String> q=new SynchronousQueue<String>();
  45. TestSynchronousQueue t=new TestSynchronousQueue();
  46. new Thread(t.new Producer(q)).start();
  47. new Thread(t.new Consumer(q)).start();
  48. }
  49. }
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
public class TestSynchronousQueue {
class Producer implements Runnable {
private BlockingQueue<String> queue;
List<String> objects = Arrays.asList("one", "two", "three");
public Producer(BlockingQueue<String> q) {
this.queue = q;
}
@Override
public void run() {
try {
for (String s : objects) {
queue.put(s);// 产生数据放入队列中
System.out.printf("put:%s%n",s);
}
queue.put("Done");// 已完成的标志
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
private BlockingQueue<String> queue;
public Consumer(BlockingQueue<String> q) {
this.queue = q;
}
@Override
public void run() {
String obj = null;
try {
while (!((obj = queue.take()).equals("Done"))) {
System.out.println(obj);//从队列中读取对象
Thread.sleep(3000);     //故意sleep,证明Producer是put不进去的
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
BlockingQueue<String> q=new SynchronousQueue<String>();
TestSynchronousQueue t=new TestSynchronousQueue();
new Thread(t.new Producer(q)).start();
new Thread(t.new Consumer(q)).start();
}
}

总结:SynchronousQueue主要用于单个元素在多线程之间的传递

------------------------------------------------------------
更多的Java多线程资料,欢迎访问 http://janeky.iteye.com/category/124727

java多线程学习-java.util.concurrent详解相关推荐

  1. Java中大数据数组,Java基础学习笔记之数组详解

    摘要:这篇Java开发技术栏目下的"Java基础学习笔记之数组详解",介绍的技术点是"java基础学习笔记.基础学习笔记.Java基础.数组详解.学习笔记.Java&qu ...

  2. java多线程中的join方法详解

    java多线程中的join方法详解 方法Join是干啥用的? 简单回答,同步,如何同步? 怎么实现的? 下面将逐个回答. 自从接触Java多线程,一直对Join理解不了.JDK是这样说的:join p ...

  3. Java多线程系列(六):深入详解Synchronized同步锁的底层实现

    谈到多线程就不得不谈到Synchronized,很多同学只会使用,缺不是很明白整个Synchronized的底层实现原理,这也是面试经常被问到的环节,比如: synchronized的底层实现原理 s ...

  4. java多线程学习-java.util.concurrent详解(一) Latch/Barrier

    2019独角兽企业重金招聘Python工程师标准>>> Java1.5提供了一个非常高效实用的多线程包:java.util.concurrent, 提供了大量高级工具,可以帮助开发者 ...

  5. java多线程详解 六_java多线程学习-java.util.concurrent详解(六) Exchanger

    转载于:http://janeky.iteye.com/blog/769965 我们先来学习一下JDK1.5 API中关于这个类的详细介绍: "可以在pair中对元素进行配对和交换的线程的同 ...

  6. java多线程学习-java.util.concurrent详解(五) ScheduledThreadPoolExecutor

    转载于:http://janeky.iteye.com/blog/769965 我们先来学习一下JDK1.5 API中关于这个类的详细介绍: "可另行安排在给定的延迟后运行命令,或者定期执行 ...

  7. Java并发包-java.util.concurrent详解

    转载自https://blog.csdn.net/axi295309066/article/details/65665090 一.阻塞队列BlockingQueue BlockingQueue通常用于 ...

  8. Java多线程系列之“JUC集合“详解

    Java集合包 在"Java 集合系列01之 总体框架"中,介绍java集合的架构.主体内容包括Collection集合和Map类:而Collection集合又可以划分为List( ...

  9. java多线程之——生产者和消费者(详解及提高)

    目录 前情引入 简单介绍 预备知识 代码及详解 简单代码 基本解释 生产者线程类 消费者线程类 测试类 执行流程 控制台输出 自我提高 问题一 问题二 升级代码 总结 前情引入 做一些简单的认识和告知 ...

最新文章

  1. Linux下查看进程和线程
  2. 微众WeCross 跨链平台(9)MIG多边跨域治理
  3. Loadrunner中socket协议中的三个关联函数
  4. python调用mysql数据库sql语句过长有问题吗_python连接MYSQL数据库,调用update语句后无法更新数据,解决...
  5. Python 迭代器,错误、异常处理
  6. Win7系统浏览器的兼容模式如何设置
  7. 常用开发术语及工具001---NGF_TPS_墨刀手机原型页面设计工具
  8. 码云克隆项目到IntelliJ IDEA中
  9. 微信小程序生成海报分享:canvas绘制文字溢出如何换行
  10. TensorFlow入门:线性回归
  11. war压缩命令_war包解压命令详解,java项目如何打包成war包?
  12. wifip2p重连 android,Android Wifi实现分析
  13. Jmeter_基本操作-取样器
  14. 互联网金融爬虫怎么写-第一课 p2p网贷爬虫(XPath入门)
  15. 嵌入式系统 ---> 程序存储器和数据存储器
  16. html调用短信接口发送消息的实例,HTTP电脑发送短信接口调用示例
  17. RTL8188CUS 无线网卡使用说明
  18. LS1043A 查看 MAC、BMI、QMI寄存器, 查看网口发包丢包情况
  19. power-virus
  20. 在计算机网络俗称网上邻居上能看到自己,为什么在“网上邻居”中可以看到自己,却看不到其他联网电脑?...

热门文章

  1. 2 0 2 0 年 第 十 一 届 蓝 桥 杯 - 省赛 - Python大学组 - A. 门牌制作
  2. ubuntu的Unity功能安装
  3. 【机器视觉】 dev_set_window_extents算子
  4. 【机器视觉】 executable_expression算子
  5. 【Qt】modbus之TCP模式写操作
  6. 【ARM】MRS MSR指令
  7. 【Linux系统编程】POSIX有名信号量
  8. 分页携带请求参数_一个值得深思的小问题 请求中的参数值为空要不要携带该参数?...
  9. 2020-09-11
  10. spring无法用三级缓存解决循环依赖的问题分析