点击上方 好好学java ,选择 星标 公众号

重磅资讯、干货,第一时间送达
今日推荐:干掉 Navicat:这个 IDEA 的兄弟真香!个人原创100W+访问量博客:点击前往,查看更多

最近在学习 RocketMQ ,在发送异步消息的时候遇到了一些问题,我直接复现场景,异步消息生产者代码如下:

public class AsySendProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");// 设置nameserver 地址producer.setNamesrvAddr("xxxxxxx:9876");//启动生产者producer.start();System.out.println("-------------启动服务--------------");// 设置异步发送失败重试次数,默认为2producer.setRetryTimesWhenSendAsyncFailed(0);for (int i = 0; i < 10; i++) {//final int index = i;// ID110:业务数据的ID,比如用户ID、订单编号等等Message msg = new Message("TopicTest","TagA","ID110",("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);producer.send(msg, new SendCallback() {/*** 发送成功的回调函数* 但会结果有多种状态,在SendStatus枚举中定义*/@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("%-10d OK %s%n", index,sendResult.getMsgId());}/*** 发送失败的回调函数*/@Overridepublic void onException(Throwable e) {System.out.printf("%-10d Exception %s %n", index, e);}});}//关闭生产者连接producer.shutdown();System.out.println("--------------关闭服务器-----------");}
}

如果您没有学过 MQ ,看不懂这段代码,没关系,我简单的讲一下,就是启动一个生产者,异步(多线程发送,不需要等待结果,直接返回,结果会回调我们的方法)发送 10 条消息,然后关闭生产者。运行 main 方法。结果如下:


结果并不是我们想要的,从结果中,我们可以获取到一些信息,生产者启动之后,紧接着生产者就被关闭了,导致后续发送消息时,生产者不可用,连接 RocketMQ 服务器失败。

很明显我们需要解决生产者关闭时机的问题,「需要保证生产者在消息都发送完成之后再关闭,那么我们可以如何保证?」

有一个常用的思路,简单直接,就是定义一个计数器,每发送完一条消息,计数器减一,当计数器为 0 时,说明消息全部发送完毕,这时候再把生产者关闭。这确实是一个不错的思路,但是我们并不需要自己实现一个计数器,因为在 JDK 中已经提供了。在 JUC 下有一个 CountDownLatch 类提供了类似的功能。

「CountDownLatch 是什么?」 CountDownLatch 提供的功能是「让一个或者多个线程一直等待,直到一组在其他线程中执行的操作完成」。这不就是我们需要的吗?我们需要让生产者等消息都发送完成后再关闭。

CountDownLatch 类实现了  AbstractQueuedSynchronizer 同步器,利用 AQS 中的 state 字段来判断其他线程是否完成。从源码入手了解 CountDownLatch 吧。

    /*** Synchronization control For CountDownLatch.* Uses AQS state to represent count.*/private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;Sync(int count) {setState(count);}int getCount() {return getState();}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}}

CountDownLatch 中实现 AQS 的内部类,实现的方法没有 Semaphore 类中的多,提供的功能也比 Semaphore 中的少。在 CountDownLatch 类中,主要提供了以下几个方法:

// 构造函数,count 为计数器
public CountDownLatch(int count);// 等待方法,会一直等下去
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}// 带超时的等待方法,超时之后,count还没有到 0 ,则自动唤醒等待线下
public boolean await(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}// 计数器减一,重要方法
public void countDown() { sync.releaseShared(1);}

CountDownLatch 类比较简单,一共就 300 行代码,有兴趣的同学可以详细研究研究。我们将 CountDownLatch 应用到 RockerMQ 的异步消息发送中,改造起来也比较简单,无论消息发送成功 OR 失败,在回调方法中 调用CountDownLatch 的 countDown() 方法,在生产者关闭前调用 CountDownLatch 的 await() 方法,使生产者关闭线程处于阻塞状态。具体代码如下:

public class AsySendProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");// 设置nameserver 地址producer.setNamesrvAddr("xxxxxx:9876");//启动生产者producer.start();System.out.println("-------------启动服务--------------");// 设置异步发送失败重试次数,默认为2producer.setRetryTimesWhenSendAsyncFailed(0);// 实例化一个 countDownLatch 实例,我们需要发送 10条消息,所以计数器为 10.CountDownLatch countDownLatch = new CountDownLatch(10);for (int i = 0; i < 10; i++) {//final int index = i;// ID110:业务数据的ID,比如用户ID、订单编号等等Message msg = new Message("TopicTest","TagA","ID110",("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// 不管消息发送成功还是失败,CountDownLatch 中的计数器都需要减一producer.send(msg, new SendCallback() {/*** 发送成功的回调函数* 但会结果有多种状态,在SendStatus枚举中定义* @param sendResult*/@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf(Thread.currentThread().getName() + " %-10d OK %s%n", index, sendResult.getMsgId());// 计数器减一countDownLatch.countDown();}/*** 发送失败的回调函数* @param e*/@Overridepublic void onException(Throwable e) {System.out.printf(Thread.currentThread().getName() + " %-10d Exception %s %n", index, e);// 计数器减一countDownLatch.countDown();}});}// 等待 countDownLatch 计数器为0,如果指定时间内计数器没有为0,则不等待了,直接唤醒等待线程。countDownLatch.await(5, TimeUnit.SECONDS);//关闭生产者连接producer.shutdown();System.out.println("--------------关闭服务器-----------");}
}

运行改造后的 producer 方法,结果如下:

引入 CountDownLatch 之后,我们的消息就发送正常了。

提到 CountDownLatch,那么肯定会联想到 「CyclicBarrier 类」

CyclicBarrier 类称作循环屏障,跟 CountDownLatch 的功能很相似,但是又不太一样,主要有以下几种区别:

  • 工作原理不一样,CountDownLatch 是使一批线程等待另一批线程执行完之后再执行,而 CyclicBarrier 是当等待的线程达到一定数目后,就开始继续工作。

  • CyclicBarrier 是可以循环使用的,而 CountDownLatch 类不一样。

  • 服务对象不一样,CountDownLatch 是等待后,其他线程继续执行,「而 CyclicBarrier 是等待线程数达到一定屏障后,自己继续执行」。就好比吃饭,等人到齐后,才开始吃,等的人是你,吃饭的人还是你。

  • 实现不一样,CountDownLatch 是基于 AQS 实现的,CyclicBarrier 是基于 ReentrantLock 实现的。

CyclicBarrier 类的源代码也不是太多,就不贴了,有兴趣的可以研究研究。直接实战一把。「在原有的异步消息中,添加一个功能,当所有的消息发送完成后,每个消息上报自己消息发送的情况」。代码如下:

public class AsySendProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");// 设置nameserver 地址producer.setNamesrvAddr("xxxxx:9876");//启动生产者producer.start();System.out.println("-------------启动服务--------------");// 设置异步发送失败重试次数,默认为2producer.setRetryTimesWhenSendAsyncFailed(0);// 实例化一个 countDownLatch 实例,我们需要发送 10条消息,所以计数器为 10.CountDownLatch countDownLatch = new CountDownLatch(5);// 实例化 CyclicBarrierCyclicBarrier cyclicBarrier = new CyclicBarrier(5);for (int i = 0; i < 5; i++) {//final int index = i;// ID110:业务数据的ID,比如用户ID、订单编号等等Message msg = new Message("TopicTest","TagA","ID110",("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// 不管消息发送成功还是失败,CountDownLatch 中的计数器都需要减一producer.send(msg, new SendCallback() {/*** 发送成功的回调函数* 但会结果有多种状态,在SendStatus枚举中定义* @param sendResult*/@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf(Thread.currentThread().getName() + " %-10d OK %s%n", index, sendResult.getMsgId());countDownLatch.countDown();try {// 等待所有线程执行完cyclicBarrier.await();System.out.println(Thread.currentThread().getName()+":我执行成功!");} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}/*** 发送失败的回调函数* @param e*/@Overridepublic void onException(Throwable e) {System.out.printf(Thread.currentThread().getName() + " %-10d Exception %s %n", index, e);countDownLatch.countDown();try {// 等待所有线程执行完cyclicBarrier.await();System.out.println(Thread.currentThread().getName()+":我执行失败!");} catch (InterruptedException ex) {ex.printStackTrace();} catch (BrokenBarrierException ex) {ex.printStackTrace();}}});}//关闭生产者连接countDownLatch.await();producer.shutdown();System.out.println("--------------关闭服务器-----------");}
}

执行 main 方法,结果如下:


可以看出消息发送情况是在所有消息发送完成后才执行的。不信你可以试着将 CyclicBarrier 相关的代码注释,会得到不一样的结果。

当然这只是 CyclicBarrier 类的简单使用,您还可以试试 CyclicBarrier 的其他方法,CyclicBarrier 的个数设置的跟线程数不一致,看看会出现什么情况。「编程嘛,多动手试,什么都知道了」

CountDownLatch 和 CyclicBarrier 的介绍到这里就结束了,感谢您的阅读,希望这篇文章对您的学习或者工作有一点帮助。有收获的话,也可以帮忙推荐给其他的小伙伴,让更多的人受益,万分感谢。

最后,再附上我历时三个月总结的 Java 面试 + Java 后端技术学习指南,笔者这几年及春招的总结,github 1.1k star,拿去不谢!下载方式1. 首先扫描下方二维码
2. 后台回复「Java面试」即可获取

JUC 中的多线程协作工具类:CountDownLatch 和 CyclicBarrier相关推荐

  1. JAVA并发:并发工具类CountDownLatch、CyclicBarrier、Semaphore使用及源码分析

    在 JUC 下包含了一些常用的同步工具类,今天就来详细介绍一下,CountDownLatch,CyclicBarrier,Semaphore 的使用方法以及它们之间的区别. 1 CountDownLa ...

  2. 多线程十 JUC包下的常用工具类

    JUC包下的常用工具类 1. CountDownLatch-闭锁 2. CyclicBarrier-循环栅栏 3. Semaphore-信号量 4. Exchanger-线程数据交换器 这篇文章主要是 ...

  3. java多线程aqs实现工具类_Java并发多线程 - 并发工具类JUC

    (adsbygoogle = window.adsbygoogle || []).push({}); 安全共享对象策略 1.线程限制 : 一个被线程限制的对象,由线程独占,并且只能被占有它的线程修改 ...

  4. JUC 常用 4 大并发工具类

    欢迎关注方志朋的博客,回复"666"获面试宝典 什么是JUC? JUC就是java.util.concurrent包,这个包俗称JUC,里面都是解决并发问题的一些东西 该包的位置位 ...

  5. 线程工具类 - CountDownLatch(倒计时器)

    CountDownLatch官方文档 一.原理 CountDownLatch是一个非常实用的多线程控制工具类.Count Down在英文中意为倒计时,Latch意为门闩,可以简单的将CountDown ...

  6. Java之多线程下载工具类

    1.多线程下载工具类 import java.net.URL; import java.io.InputStream; import java.io.RandomAccessFile; import ...

  7. 14、详解java同步工具类CountDownLatch

    这篇文章主要讲解java中一个比较常用的同步工具类CountDownLatch,不管是在工作还是面试中都比较常见.我们将通过案例来进行讲解分析. 一.定义 CountDownLatch的作用很简单,就 ...

  8. java.lang包有哪些类_Java中Lang包的工具类有哪些

    Java中Lang包的工具类有哪些 发布时间:2020-12-08 16:15:36 来源:亿速云 阅读:76 作者:Leah 今天就跟大家聊聊有关Java中Lang包的工具类有哪些,可能很多人都不太 ...

  9. java运行python脚本_java中执行python脚本工具类详解

    java中执行python脚本工具类,需要jython.jar import java.io.FileInputStream; import java.io.IOException; import j ...

最新文章

  1. web服务器与网页表单通信,前端与后端通信的几种方式
  2. 表驱动设计的一点见解
  3. python def return 文件_python基础-文件处理与函数
  4. 从数据类型 nvarchar 转换为 numeric 时出错_Python数据分析类库系列Numpy之ndarray的数据类型...
  5. CoreCLR源码探索(五) GC内存收集器的内部实现 调试篇
  6. java treemap_Java TreeMap size()方法与示例
  7. 用计算机算出陈赫手机号码,陈赫手机号码遭《快本》曝光,并被网友打到关机!还有人搜到了他的支付宝账户......
  8. 极度偷懒 - 实现算命程序中tabcontrol的“美化”
  9. 2017北京国庆刷题Day2 afternoon
  10. c语言文件名错误的是,C语言程序错误,不能正常读写文件,求解啊
  11. Python批量下载MOOC课件
  12. 服务器口令怎么修改,畅捷通不能连接到服务器怎么修改口令
  13. 读取金税盘数据库_金税盘无法连接数据库是怎么回事
  14. 010 Editor 8.0.1 之 逆向分析及注册机编写
  15. ajax一般格式,ajax格式是什么样的?ajax教程
  16. java软件安装教程_r软件安装教程
  17. Android手机简易计时器(Chronometer实现)
  18. Overleaf使用技巧 (latex公式,latex表格,latex图片排版)
  19. qtableview 鼠标划过单元格弹出标签显示单元格内容
  20. 消费金融加速内卷,地推要求硕士起步…

热门文章

  1. .GRIDVIEW奇偶行变色
  2. find = in a string
  3. JavaScript与Asp.net传值
  4. linux 块设备驱动(二)——块设备数据结构
  5. 波卡链Substrate (7)Grandpa协议三“2阶段同步”
  6. cmake (2)其他指令
  7. 区块链BaaS云服务(24)秘猿科技CITA
  8. [architecture]-Cortex-A53的configuration signals
  9. [reference]-ARM/TEE/security等论文中的缩写和参考文献
  10. 类的实例方法静态方法类方法属性方法属性