假设现在有一个对账系统,需要优化一下。它的基本业务是用户通过在线商城下单,然后生成电子订单,保存在订单库;之后物流会生成派送单给用户发货,派送单保存在派送单库。为了防止漏派送或重复派送,对账系统每天会校验是否存在异常订单。

目前的对账系统的逻辑处理是先查询订单,然后查询派送单,之后对比订单和派送单,将差异写入差异库。流程如图所示:

            

对账系统的代码抽象之后的核心代码如下,就是在一个单线程里面循环查询订单、派送单,然后执行对账,最后将差异写入差异库。

while(存在未对账订单){// 查询未对账订单pos = getPOrders();// 查询派送单dos = getDOrders();// 执行对账操作diff = check(pos,dos);// 差异写入差异库save(diff);
}

利用并行优化对账系统

要想优化上面所说的对账系统的性能,首先要找到它的瓶颈所在。

目前的对账系统由于订单量和派送单量很大,所说义查询未对账订单 getPOrders() 和查询派送单 getDOrders() 相对较慢,如何优化下呢?现在它是一个单线程的(如图),优化的话我们首先可以想到 是否可以利用多线程并行处理
              
从图中可以看出该系统的瓶颈:查询未对账订单 getPOrders() 和查询派送单 getDOrders() 是否可以平行处理?可以。因为这两个操作没有先后顺序依赖,那么我们让它俩并行执行后,如图所示。可以看出,相比于单线程的执行过程,同等时间里,并行执行的吞吐量几乎是单线程的2倍。

              
现在,优化思路我们已经想好了,如何用代码实现呢?

我们先创建两个线程 T1 和 T2,并行执行查询未对账订单 getPOrders() 和查询派送单 getDOrders() 这两个操作。在主线程中执行对账操作 check() 和差异写入 save() 两个操作。不过需要注意:主线程需要等待线程 T1 和 T2 执行完才能执行 check() 和 save() 操作,所以我们可以通过调用 T1.join() 和 T2.join() 来实现等待,当 T1 和T2 线程退出时,调用 T1.join() 和 T2.join() 的主线程就会从阻塞态被唤醒,从而执行之后的 check() 和 save() 操作。

while(){// 查询未对账订单Thread T1 = new Thread(() -> {pos = getPOrders();});T1.start();// 查询派送单Thread T2 = new Thread(() ->{dos = getDOrders();});T2.start();// 等待T1,T2结束T1.join();T2.join();// 执行对账操作diff = check(pos,dos);// 差异写入差异库save(diff);
}

用 CountDownLatch 实现线程等待

上面的优化过程中,while循环里每次都会创建新的线程,这也是一个耗时操作,所以还可以继续优化:要是创建的线程可以循环利用就好了。这时可使用线程池来解决它。

康康代码吧。首先创建一个固定大小为2的线程池,之后在 while 循环里重复利用。不过这样看似完美,但是依然有个问题:主线程如何知道 getPOrders() 和 getDOrders() 这两个操作什么时候执行完?前面主线程通过调用 T1 和 T2 的 join() 方法来等待线程 T1 和 T2 退出,但是在线程池的方案里,线程不会退出,所以 join() 方法就失效了。

// 创建2个线程的线程池
Executor executor = Executor.newFixedThreadPool(2);while(存在未对账订单){// 查询未对账订单executor.execute(() -> {pos = getPOrders();});// 查询派送单executor.execute(() -> {dos = getDOrders();});// 如何实现等待???// 执行对账操作diff = check(pos,dos);// 差异写入差异库save(diff);
}

如何解决这个问题呢?聪明的你可能已经想到了计数器。没错,弄一个计数器,初始值设置为2,执行完 pos = getPOrders(); 后,计数器减1,执行完 dos = getDOrders(); 后,计数器再减1,在主线程里,等待计数器等于0;当计数器等于0时,说明两个查询操作执行完了。

不过,在Java并发包里已经提供了实现类似功能的工具列:CountDownLatch,我们可以直接使用。所以更建议下面这种方式。

下面的代码示例中,在 while 循环里,首先创建了一个 CountDownLatch,计数器初始值等于2,之后在 pos = getPOrders(); 和dos = getDOrders(); 两条语句后面对计数器执行减1操作,这个减1操作是通过调用 latch.countDown(); 来实现的。在主线程中,通过调用 larch.await() 来实现对计数器等于 0 的等待。

// 创建2个线程的线程池
Executor executor = Executor.newFixedThreadPool(2);while(存在未对账订单){// 计数器初始化为2CountDownLatch latch = new CountDownLatch(2);// 查询未对账订单executor.execute(() -> {pos = getPOrders();latch.countDown();});// 查询派送单executor.execute(() -> {dos = getDOrders();latch.countDown();});// 等待两个查询操作结束latch.await();// 执行对账操作diff = check(pos,dos);// 差异写入差异库save(diff);
}

进一步优化性能

前面我们用 CountDownLatch 实现线程等待,而且 getPOrders() 和 getDOrders() 这两个查询操作并行执行,但是他俩和对账操作 check() 和 save() 之间还是串行的。他们之间能也并行么?也就是再执行队长操作的时候,同时执行下一轮查询操作。我们用形象的方式理解一下:
              
根据需求我们可以了解到:队长操作依赖于查询操作的结果,就像 生产者-消费者 模式,两次查询操作是生产者,对账操作是消费者。既然是生产者-消费者 模式就需要有个队列,来保存生产者生产的数据,而消费者聪这个队列中取出数据进行消费。

如图是针对这个对账项目设计的队列。两个队列的元素之间有对应关系。订单查询操作将订单查询结果插入订单队列,派送单查询操作将派送但插入派送单队列,这两个队列的元素之间有一一对应的关系。两个队列的好处是,对账操作可以每次聪订单队列出一个元素,聪派送但队列出一个元素,然后对这两个元素执行对账操作,是不是井井有条的?
                 
下面再来看看如何用双队列实现完全的并行。一个线程T1 执行订单查询工作,一个线程T2 执行派送单查询工作,当线程T1 和T2都各自生产完1条数据时,通知线程T3执行对账操作。

下图就描述了上面的过程:线程T1和T2只有都生产完1条数据时,才能一起向下执行,也就是说,线程T1 和线程T2的工作步调要一致,相互等待。同时它们都生产完一条数据,还得能通知线程T3执行对账操作。
                  

用 CyclicBarrier 实现线程同步

上面的过程有两个问题:

  • 线程T1 和T2 如何做到步调一致?
  • 怎样做到能够通知线程T3?

Java并发包里提供了相关的工具类:CyclicBarrier

下面的代码中先创建了一个计数器初始值为2的 CyclicBarrier,需要注意的是创建 CyclicBarrier 的时候,传入了一个回调函数,当计数器减到0的时候,会调用这个函数。

线程T1负责查询订单,当查出一条时,调用 barrier.await() 来将计数器减1,同时等待计数器变为0;线程T2负责查询派送单,当查出一条时,调用 barrier.await() 将计数器减1,同时等待计数器变为0;当T1 和T2 都调用 barrier.await() 的时候,计数器会减到0,此时T1和T2就可以执行下一条语句了,同时会调用barrier的回调函数来执行对账操作。

CyclicBarrier 的计数器有自动重置功能,当减到0的时候,会自动重置你设置的初始值

// 订单队列
Vector<P> pos;
// 派送单队列
Vector<D> dos;
// 执行回调的线程池
Executor executor = Executor.newFixedThreadPool(1);
final CyclicBarrier barrier = new CyclicBarriere(2,()->{executor.execute(()->check());
});void check(){P p = pos.remove(0);D d = dos.remove(0);// 执行对账操作diff = check(p,d);// 差异写入差异库save(diff);
}void checkAll(){// 循环查询订单库Thread T1 = new Thread(()-> {while(存在未对账订单){// 查询订单库pos.add(getPOrders());// 等待barrier.await();}});T1.start();// 循环查询运单库Thread T2 = new Thread(()->{while(存在未对账订单){// 查询运单库dos.add(getDOrders());// 等待barrier.await();}});T2.start();
}

总结

CountDownLatch 和 CyclicBarrier 是 Java 并发包提供的两个非常易用的线程同步工具类,它们的区别是:CountDownLatch 主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等所有游客到齐才能去下一个景点;同时它的计数器不能循环利用,一旦计数器减到0,再有线程调用 await() ,该线程会直接通过。而 CyclicBarrier 是一组线程之间相互等待CyclicBarrier的计数器是可以循环利用的,而且可以自动重置,同时它还有一个回调函数。

【多线程】CountDownLatch 和 CyclicBarrier:如何让多线程步调一致?相关推荐

  1. JAVA所有选手就位后比赛开始_Java多线程-CountDownLatch、CyclicBarrier、Semaphore

    上次简单了解了多线程中锁的类型,今天要简单了解下多线程并发控制的一些工具类了. 1. 概念说明: CountDownLatch:相当于一个待执行线程计数器,当计数减为零时表示所有待执行线程都已执行完毕 ...

  2. JUC 中的多线程协作工具类:CountDownLatch 和 CyclicBarrier

    点击上方 好好学java ,选择 星标 公众号 重磅资讯.干货,第一时间送达 今日推荐:干掉 Navicat:这个 IDEA 的兄弟真香!个人原创100W+访问量博客:点击前往,查看更多 最近在学习 ...

  3. Java多线程(八)之Semaphore、CountDownLatch、CyclicBarrier、Exchanger

    一.引言 Semaphore               :一个计数信号量 CountDownLatch          :一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线 ...

  4. java多线程下LongAdder、CountDownLatch、CyclicBarrier、Phaser 的用法

    前言 一文读懂java多线程下常用常考的阻塞方法LongAdder.CountDownLatch.CyclicBarrier.Phaser 包含演示代码 高并发模拟,性能比较实例代码 前言 LongA ...

  5. JUC多线程:CountDownLatch、CyclicBarrier、Semaphore同步器原理总结

    一.CountDownLatch: 1.什么是 CountDownLatch: CountDownLatch,闭锁,就是一个基于 AQS 共享模式的同步计数器,它内部的方法都是围绕 AQS 实现的.主 ...

  6. 多线程CountDownLatch

    import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.CountDownLatch ...

  7. Java的CountDownLatch和CyclicBarrier的理解和区别

    CountDownLatch和CyclicBarrier的功能看起来很相似,不易区分,有一种谜之的神秘.本文将通过通俗的例子并结合代码讲解两者的使用方法和区别. CountDownLatch和Cycl ...

  8. 秒懂 CountDownLatch 与 CyclicBarrier 使用场景

    作者 | pony-zi 来源 | https://blog.csdn.net/zzg1229059735/article/details/61191679 相信每个想深入了解多线程开发的Java开发 ...

  9. 并发工具类:CountDownLatch、CyclicBarrier、Semaphore

    在多线程的场景下,有些并发流程需要人为来控制,在JDK的并发包里提供了几个并发工具类:CountDownLatch.CyclicBarrier.Semaphore. 一.CountDownLatch ...

最新文章

  1. 测试报告-1.1组成和要点
  2. 手摸手入门前端--01.webpack4
  3. 虹影图片下载器(Preview)
  4. 转载《全国研究生考试专业课资料大全(部分资料)》
  5. Technical User Stories – What, When, and How?
  6. vue调用methods里的方法
  7. leetcode 697. 数组的度(hashmap)
  8. REVERSE-PRACTICE-BUUCTF-6
  9. 使用Reloader实现更新configmap后自动重启pod
  10. 【2019上海网络赛:K】Peekaboo(勾股数知c求a和b--数论)
  11. python的扩展库numpy如何安装_python如何安装numpy库
  12. Java中的sql语句代码拼接问题
  13. DELPHI获取硬盘、CPU、网卡序列号
  14. 【转载】SNMPv3 配置及snmpwalk命令信息获取
  15. 访谈,智能座舱开发中的人机交互与人机工程布置
  16. 用Python做一个连连看游戏辅助脚本,完整编程思路分享
  17. DPDK 20.11 meson build
  18. 搜索引擎网站提交入口
  19. Python - poetry(4)管理环境
  20. 如何设置苹果手机铃声

热门文章

  1. 移动端touch事件的处理
  2. MySQL实现类似Oracle的序列
  3. drupal ajax json异步调用
  4. 《Algorithms》Comparable 实现冒泡排序
  5. struts内的action方法自动提交
  6. Linux系统发行版本介绍(一)——CentOS介绍
  7. 从企业管理看报表软件的应用
  8. 从传统企业谈大数据的战略意义
  9. python--二叉树库函数
  10. jQuery里如何使用ajax发送请求