并发编程——线程协作

​ 前面学习了线程,那么并发编程中,如何协调多个线程来开发呢?


Semaphore

​ 信号量跟前面将的同步互斥解决方案——信号量是一个东西,这是JDK的信号量实现。

源码分析

​ 先看下JDK的Semaphore类注释,看下Semaphore是做什么的:

A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each acquire blocks if necessary until a permit is available, and then takes it. Each release adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly.
Semaphores are often used to restrict the number of threads than can access some (physical or logical) resource

​ 简单的翻译:

​ 就是一组许可证,可以用来限制对共享资源的访问数量。

​ 也就是说,当count>1时,信号量是允许多个线程同时访问临界区的,当count=1,就是串行化的效果。

接下来看下它的构造函数:

  public Semaphore(int permits) {sync = new NonfairSync(permits);}public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}

​ 也就是说信号量是排队的时候公平/非公平的,那我们分别看下它的两个内部类:NonfairSync与FairSync,都只有一个方法:tryAcquireShared,尝试获取许可证,来看下它们的实现:

​ 公平FairSync:

protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}

非公平NonfairSync:

final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}

​ 可以看到,公平锁的做法是先去等待队列里看看,如果等待队列里还有人排队,就直接返回-1,其余的操作一样,都是根据当前可用许可证-申请许可证,查看是否足够返回。

接下来是它的重要的几个方法:

public void acquire(int permits) throws InterruptedException //获取permits数量的许可证,如果获取不到,线程就一直阻塞,但是线程可以被中断
public void acquireUninterruptibly()                     //获取许可证,如果获取不到就一直阻塞
public void release()                                   //归还许可证
public boolean tryAcquire()                             //尝试获取许可证
public boolean tryAcquire(int permits)                   //尝试获取permits数量的许可证
public boolean tryAcquire(long timeout, TimeUnit unit)     //尝试在timeout时间内获取许可证

​ 从源码上我们可以看出信号量几个特性:

  • 支持公平与非公平
  • 支持每次获取不同数量的许可证,也就是说我们每次获取的时候可以设置权重(也要注意归还相同数量的许可证)
  • 并不要求归还许可证的线程=获取许可证的线程

使用举例

​ 用Semaphore演示串行的场景


public class SemaphoreDemo1 {static Semaphore semaphore = new Semaphore(1);public static void main(String[] args) {Thread thread1 = new Thread(new SemaphoreRunnable());Thread thread2 = new Thread(new SemaphoreRunnable());thread1.start();thread2.start();}static class SemaphoreRunnable implements Runnable {@Overridepublic void run() {try {semaphore.acquire();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " ,我拿到了许可证~");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + "释放了许可证");semaphore.release();}}
}


CountDownLatch

​ 这个词呢百度翻译出来就是倒计时门闩

源码分析

​ 照例看下源码注释对这个类的描述

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon – the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier.

​ 是种同步辅助机制,允许一个或者多个线程等待直到其他线程的业务逻辑执行完成。CountDownLatch 在初始化的时候有个计数器,调用一次就会倒计时,直到计数器为0就会释放所有的线程,并且,CountDownLatch 无法被重置,如果需要重复使用,用CyclicBarrier。

​ 这个类的初始化方法只有一个,就是传入计数器数量:

public CountDownLatch(int count)

​ 重要方法:

public void await() throws InterruptedException          //在计数器没到0之前一直等待,除非线程被中断
public boolean await(long timeout, TimeUnit unit)  throws InterruptedException  //在计数器没到0之前一直等待,除非线程被中断或者到了超时时间
public void countDown()                              //计数器-1
public long getCount()                               //获取当前计数器数量

使用举例

​ 这里演示个跑步比赛的场景:

  • 一共4名运动员,大家都准备好了等待裁判开枪

  • 4名运动员都到达重点后,终点守候的裁判发枪表明比赛结束

    public class CountDownLatchDemo {public static void main(String[] args) throws InterruptedException {CountDownLatch begin = new CountDownLatch(1);CountDownLatch end = new CountDownLatch(4);ExecutorService executorService = Executors.newFixedThreadPool(4);for (int i=0;i<4;i++){Runnable runnable = new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + " 准备好了,等待裁判发枪");try {begin.await();System.out.println(Thread.currentThread().getName() + " 开始跑步");Thread.sleep((long) (Math.random() * 10000));System.out.println(Thread.currentThread().getName() + " 到达终点");} catch (InterruptedException e) {e.printStackTrace();}finally {end.countDown();}}};executorService.execute(runnable);}Thread.sleep(500);System.out.println("发令枪响,比赛开始!");begin.countDown();end.await();System.out.println("4名运动员都到终点,比赛结束");executorService.shutdown();}
    }
    


CyclicBarrier

​ 循环屏障,从名字就可以看出它作用跟CountDownLatch差不多,但是可以循环计数。

源码分析

​ 注释描述:

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.
A CyclicBarrier supports an optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released. This barrier action is useful for updating shared-state before any of the parties continue.

​ 允许一组线程等待对方到达同一个障碍点

​ 可以在释放等待线程后重新使用

​ 支持可选参数Runnable,在最后一个线程到达之后但是在所有线程释放之前,所有的线程都运行一遍

​ 先看下CyclicBarrier有哪些成员变量:

 //可重入锁private final ReentrantLock lock = new ReentrantLock();/** 需要等待的条件Condition */private final Condition trip = lock.newCondition();/** 计数器数量,因为是可循环的,所以这里需要保留一份最初的值 */private final int parties;/* 最后一个到达的线程要执行的任务 */private final Runnable barrierCommand;/** 当前的generation */private Generation generation = new Generation();/*** 本次循环中的计数器值,为0就唤醒全部线程*/private int count;

​ 内部类generation,其实就是一个标识,标识本次循环状态,如果有线程被中断或者超时,又或者因为异常,broken就是true

    private static class Generation {boolean broken = false;}

两个构造函数:

public CyclicBarrier(int parties, Runnable barrierAction)    //计数器与最后一个到达的线程要执行的任务
public CyclicBarrier(int parties)

几个重要方法:

 public int await() throws InterruptedException, BrokenBarrierException      public int await(long timeout, TimeUnit unit)                   //有超时的等待throws InterruptedException,BrokenBarrierException,TimeoutException public void reset()                                         //重置屏障,会调用breakBarrier与nextGeneration方法private void breakBarrier()                               //打破屏障,将broken标记为true,重置计数器,唤醒所有线程private void nextGeneration()                                //重置计数器,唤醒所有线程,new一个新的Generation

如果await方法的线程不是最后一个线程,它将进入休眠状态,直到这四种情况之一被唤醒:

  • 最后一个线程到达

  • 其他线程中断当前线程

  • 其他线程中断了其余在等待的线程

  • 其他线程等待超时

  • 其他线程调用了reset方法

    CyclicBarrier的计数器递减其实是在隐藏在了await方法,await支持超时或者不超时,内部调用了doawait方法,我们具体看下实现逻辑:

​ 首先获取锁,然后计数器–;如果计数器–后为0,那么就会去执行我们传入的runnable,然后执行nextGeneration方法开启下一轮循环;如果在执行runnable方法中出错,就会去执行breakBarrier方法;如果当计数器不为0,就会继续等待(会根据是否允许超时判断处理逻辑);

使用举例

​ 一共8名小伙伴,都到了下班的时间了,大家都准备坐车回家。

​ 班车可以坐4个人,最后一个上车的人发现人满了可以发车后要跟司机喊一嗓子:车上人满啦,司机你快来开车送我们回家!

​ 等8名小伙伴都到家之后,再发出一个大家都平安到家的通知

public class CyclicBarrierDemo {public static void main(String[] args) throws InterruptedException {CountDownLatch end = new CountDownLatch(8);CyclicBarrier mid = new CyclicBarrier(4, new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + " " +" 统计了下人数,当前班车人满了出发喽");}});for (int i = 0; i < 8; i++) {new Thread(new Task(mid,end)).start();}end.await();System.out.println("8位小伙伴都到家啦!");}static class Task implements Runnable {private CyclicBarrier cyclicBarrier;CountDownLatch end;public Task(CyclicBarrier cyclicBarrier, CountDownLatch end) {this.cyclicBarrier = cyclicBarrier;this.end = end;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + " 去做班车啦");try {Thread.sleep((long) (Math.random() * 10000));System.out.println("线程" + Thread.currentThread().getName() +"到了班车乘车点,等待其他线程");try {cyclicBarrier.await();} catch (BrokenBarrierException e) {e.printStackTrace();}System.out.println("线程" + Thread.currentThread().getName() +"   班车出发回家啦");} catch (InterruptedException e) {e.printStackTrace();}finally {end.countDown();}}}
}



Condition

​ Lock+Condition在之前的章节实现过管程模型

源码分析

​ Condition是个接口,是由抽象类AQS实现的,主要就包括了三个方法:await、signal和signalAll。

​ await:释放掉锁;同时线程处于wait状态直到被其他的线程唤醒或中断

​ signal:如果有多个线程都在这个条件这里等待,随机挑去一个唤醒

​ signalAll:唤醒等待这个条件的全部线程

使用举例

栗子一

​ 证明await的线程处于wait状态

示例代码
public class ConditionDemo1 {private ReentrantLock lock = new ReentrantLock();private Condition condition = lock.newCondition();public static void main(String[] args) throws InterruptedException {ConditionDemo1 conditionDemo1 = new ConditionDemo1();new Thread(new Runnable() {@Overridepublic void run() {try {Thread.sleep(1000);conditionDemo1.method2();} catch (InterruptedException e) {e.printStackTrace();}}}).start();conditionDemo1.method1();}void method1() throws InterruptedException {lock.lock();try {System.out.println("条件不满足,开始await");condition.await();System.out.println("条件满足了,开始执行后续的任务");} finally {lock.unlock();}}void method2() {lock.lock();try {System.out.println("准备工作完成,唤醒其他的线程");condition.signal();} finally {lock.unlock();}}
}

​ 在 condition.signal();处打断点,用Thread模式debug可以看到这时候的主线程状态是wait:

栗子二

​ 使用condition实现阻塞队列,阻塞队列有两个要求:

​ 队列为空不允许出队

​ 队列满不允许入队

​ 下面用Condition+ReetrantLock实现下由数组实现的有界阻塞队列

/*** @Description 用Condition实现阻塞队列* @Author Mirana* @Date 2021/11/22 15:32* @Version 1.0**/
public class ConditionBlockQueue<T> {final Lock lock = new ReentrantLock();//队列不空final Condition notEmpty = lock.newCondition();//队列不满final Condition notFull = lock.newCondition();//队列容量int capacity;//用数组实现的队列Object[] arr;//数组现在的容量int size;public ConditionBlockQueue(int capacity) {this.capacity = capacity;this.arr = new Object[capacity];this.size = 0;}public static void main(String[] args) throws InterruptedException {ConditionBlockQueue<Integer> conditionBlockQueue =new ConditionBlockQueue<>(1);new Thread(()->{System.out.println("你好,我是入队子线程:");try {conditionBlockQueue.offer(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一个元素成功入队");try {conditionBlockQueue.offer(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二个元素成功入队");}).start();try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}new Thread(() -> {System.out.println("你好,我是出队子线程:");System.out.println(Thread.currentThread().getName() + "开始出队啦~");try {conditionBlockQueue.poll();System.out.println(Thread.currentThread().getName() + "出队完毕~");} catch (InterruptedException e) {e.printStackTrace();}}).start();}//入队void offer(T t) throws InterruptedException {lock.lock();try {while (size == capacity) {notFull.await();}//入队arr[size] = t;size++;//通知可以出队了notEmpty.signalAll();} finally {lock.unlock();}}//出队void poll() throws InterruptedException {lock.lock();try {while (size == 0) {notEmpty.await();}//出队size--;//通知可以入队了notFull.signalAll();} finally {lock.unlock();}}
}/*** @Description 用Condition实现阻塞队列* @Author Mirana* @Date 2021/11/22 15:32* @Version 1.0**/
public class ConditionBlockQueue<T> {final Lock lock = new ReentrantLock();//队列不空final Condition notEmpty = lock.newCondition();//队列不满final Condition notFull = lock.newCondition();//队列容量int capacity;//用数组实现的队列Object[] arr = new Object[capacity];//数组现在的容量int size;public ConditionBlockQueue(int capacity) {this.capacity = capacity;}//入队void offer(T t) throws InterruptedException {lock.lock();try {while (size == capacity) {notFull.await();}//入队size++;arr[size] = t;//通知可以出队了notEmpty.signalAll();} finally {lock.unlock();}}//出队void poll() throws InterruptedException {lock.lock();try {while (size == 0) {notEmpty.await();}//出队size--;//通知可以入队了notFull.signalAll();} finally {lock.unlock();}}
}

栗子三

​ 用Condition+PriorityQueue实现的生产者消费者模型,为什么是PriorityQueue呢,因为要选一个没有阻塞功能的队列

public class ConditionProducerConsumer {private int queueSize = 2;private PriorityQueue<Integer> queue =new PriorityQueue<>(queueSize);private Lock lock = new ReentrantLock();private Condition notFull = lock.newCondition();private Condition notEmpty = lock.newCondition();public static void main(String[] args) {ConditionProducerConsumer conditionDemo2 = new ConditionProducerConsumer();Producer producer = conditionDemo2.new Producer();Consumer consumer = conditionDemo2.new Consumer();producer.start();consumer.start();}class Producer extends Thread {@Overridepublic void run() {produce();}private void produce() {while (true) {lock.lock();try {while (queue.size() == queueSize) {System.out.println("队列满了,等待消费数据");try {notFull.await();} catch (InterruptedException e) {e.printStackTrace();}}queue.offer(queue.size());notEmpty.signalAll();System.out.println("生产者生产了数据,队列剩余空间:" + (queueSize - queue.size()));} finally {lock.unlock();}}}}class Consumer extends Thread {@Overridepublic void run() {consume();}private void consume() {while (true) {lock.lock();try {while (queue.size() == 0) {System.out.println("队列为空,等待生产者生产数据");try {notEmpty.await();} catch (InterruptedException e) {e.printStackTrace();}}queue.poll();notFull.signalAll();System.out.println("消费者消费掉一笔数据,队列剩余:" + queue.size());} finally {lock.unlock();}}}}
}

​ 没有设置需要生产消费几轮,所以就会一直生产消费。只要队列空,生产者就生产数据,消费者就消费数据

栗子四

​ Dubbo的同步转异步

​ 在TCP协议层面,发送完RPC请求后,线程数不会等待RPC的响应结果的,但是我们平常使用的RPC调用大多是同步的。那一定有人帮你做了同步转异步的事情。比如RPC框架——Dubbo:

​ 例如下面的例子,我们执行个sayHello方法,线程会停下来等待获取结果:

DemoService service = 初始化部分省略
String message = service.sayHello("dubbo");
System.out.println(message);

​ 如果此時dump綫程,会发现调用线程阻塞住了,状态是TIMED_WAITING,阻塞在了DefaultFuture.get方法

​ 那我们继续查看DefaultFuture.get方法做了什么事情:

// 创建锁与条件变量
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();// 调用方通过该方法等待结果
Object get(int timeout){long start = System.nanoTime();lock.lock();try {while (!isDone()) {done.await(timeout);long cur=System.nanoTime();if (isDone() || cur-start > timeout){break;}}} finally {lock.unlock();}if (!isDone()) {throw new TimeoutException();}return returnFromResponse();
}
// RPC结果是否已经返回
boolean isDone() {return response != null;
}
// RPC结果返回时调用该方法
private void doReceived(Response res) {lock.lock();try {response = res;if (done != null) {done.signal();}} finally {lock.unlock();}
}

​ 当RPC结果返回时,会调用doReceived方法,在这个方法里:首先lock获取锁,然后在finally里释放锁。获取锁后,通过signal方法通知调用线程,结果已经返回,你可以继续执行了。


总结

​ Lock&Condition相比于synchronized+wait+notify,可以更灵活的实现管程;同时Dubbo中同步转异步也是通过Condition实现的;

​ Semaphore也是互斥方案之一,可以用信号量实现限流器——Hytrix的信号量隔离也是这么做的;

​ CountDownLatch与CyclicBarrier则是Java并发包提供的线程同步工具类,二者相似之处在于它们内部都维护了一个计数器,当计数器=0,就唤醒等待的线程;它们区别在于:

​ CountDownLatch适用于一个线程等待多个线程;CyclicBarrier适用于一组线程之间互相等待

​ CyclicBarrier的计数器是可以循环利用的,CountDownLatch不可以,当计数器为0了,下个线程会直接通过

​ CountDownLatch内部的计数器递减实际上是CAS实现的,需要我们手动调用CountDownLatch.countDown;CyclicBarrier则是通过可重入锁ReentrantLock,并且它在await方法中递减计数器:如果当前线程是最后一个线程,如果我们传入了回调函数,当前线程就会去执行回调函数,然后唤醒所有的线程

并发编程——线程协作相关推荐

  1. 判断线程是否执行完毕_Java并发编程 | 线程核心机制,基础概念扩展

    源码地址:GitHub || GitEE 一.线程基本机制 1.概念描述 并发编程的特点是:可以将程序划分为多个分离且独立运行的任务,通过线程来驱动这些独立的任务执行,从而提升整体的效率.下面提供一个 ...

  2. 高并发编程-线程通信_使用wait和notify进行线程间的通信2_多生产者多消费者导致程序假死原因分析

    文章目录 概述 jstack或者可视化工具检测是否死锁(没有) 原因分析 概述 高并发编程-线程通信_使用wait和notify进行线程间的通信 - 遗留问题 我们看到了 应用卡住了 .... 怀疑是 ...

  3. python 线程同步_Python并发编程-线程同步(线程安全)

    Python并发编程-线程同步(线程安全) 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 线程同步,线程间协调,通过某种技术,让一个线程访问某些数据时,其它线程不能访问这些数据,直 ...

  4. Java 并发编程 -- 线程池源码实战

    一.概述 小编在网上看了好多的关于线程池原理.源码分析相关的文章,但是说实话,没有一篇让我觉得读完之后豁然开朗,完完全全的明白线程池,要么写的太简单,只写了一点皮毛,要么就是是晦涩难懂,看完之后几乎都 ...

  5. Java并发编程—线程间协作方式wait()、notify()、notifyAll()和Condition

    原文作者:Matrix海 子 原文地址:Java并发编程:线程间协作的两种方式:wait.notify.notifyAll和Condition 目录 一.wait().notify()和notifyA ...

  6. 并发编程线程通信之管道流

    前言 在并发编程中,需要处理两个问题:线程之间如何通信及线程之间如何同步.通知是指线程之间以何种机制来交换信息.在命令式编程中,线程之间的通信机制有两种:共享内存和消息传递. 在共享内存的并发模型里, ...

  7. C++并发编程线程间共享数据std::future和sd::promise

    线程间共享数据 使用互斥锁实现线程间共享数据 为了避免死锁可以考虑std::lock()或者boost::shared_mutex 要尽量保护更少的数据 同步并发操作 C++标准库提供了一些工具 可以 ...

  8. 并发编程——线程——锁

    并发编程中避免不了在同一时间对同一数据的更改,因此,对锁的使用变得尤为重要,什么时间.什么场景该用什么类型的锁都是有讲究的,接下来介绍几种常见的锁. 死锁现象 问题产生需求,在学新的锁之前先来看看我们 ...

  9. 【从入门到放弃-Java】并发编程-线程安全

    概述 并发编程,即多条线程在同一时间段内"同时"运行. 在多处理器系统已经普及的今天,多线程能发挥出其优势,如:一个8核cpu的服务器,如果只使用单线程的话,将有7个处理器被闲置, ...

最新文章

  1. CUDA运行时 Runtime(一)
  2. 第三章GIT使用入门
  3. Windows10下Python3做OpenGL的编程
  4. 关于Keil 的快速注释功能,并为其添加快捷键
  5. 3.1.3 覆盖与交换
  6. 学习OpenResty编程
  7. arthas的安装(在线/离线)和卸载
  8. Okhttp 插入缓存拦截器 解析
  9. 用matlab画出TFT,基于Matlab的TFT-LCD解码电路的仿真设计(含程序)
  10. UBUNTU安装EMQ
  11. docker视频教程 假装听听 应该还行
  12. linux经典学习网站及博客
  13. 我的无线路由器是红色的——N倍速的快感,初探Openwrt系统无线路由器
  14. Ipone桌面计算机没了,苹果笔记本桌面图标不见了怎么办
  15. 腾讯云为什么做不过阿里云?说腾讯云败了合适吗?
  16. 武汉大学计算机转专业2021,通知|关于做好2021年普通本科生转专业工作的通知...
  17. 关于pingpp(招行一网通)-混淆
  18. google android win10 ios,Flutter入门安装 ,win10 Android studio 教程
  19. 今日头条阅读量怎么刷_今日头条提升头条号阅读量的几大方法
  20. 在线考试系统软件测试总结,在线考试系统软件测试用例报告.doc

热门文章

  1. android 音乐文件删除,如何从华为手机恢复已删除的音频或音乐文件
  2. OpenCV特征检测出现Unhandled exception at……Access violation reading location 0x00000000.
  3. 币小秘:这些年,见过的带单老师们,这里有没有你踩过的坑?
  4. Spring IOC和DI 的学习资料(附带大师英文文章)
  5. 微信引流推广:美拍视频简单的引流方法分享
  6. DevOps 测试实践
  7. 怎样提高英语思维能力?
  8. java 过滤字符串_java 过滤字符串方法实现
  9. mybatis中显示更新数据成功 ,控制台显示成功,数据库数据却没有修改
  10. 微信小程序登录服务器失败,微信小程序后台登录一直失败