2019独角兽企业重金招聘Python工程师标准>>>

一个countDown在多线程调度下使用不当的分享

1. 诡异的数据抖动

在一个需求开发过程中,由于有多角色需要获取每个角色下的菜单;结果出现了单角色下拉去菜单没问题,多角色情况下只有一个角色的菜单正常返回的问题。这个问题很忧伤,没有菜单如何进功能页面?

2. 怀疑是缓存

因为多角色下菜单采用了Redis缓存,故而怀疑是其中一个角色下的菜单是缓存失效,但是关闭掉缓存依然不起作用,排除缓存影响。

3. debug发现问题

通过增加日志输出,在关闭掉缓存的实时模式下,依然存在菜单时而有,时而没有的情况。证明应该是代码有问题。

在一个多线程调度调度服务类中,发现一个问题,即在debug到如下代码,会出现后续代码未执行完全,接口结果即被返回的情况。

//经过查询相关API,不会出现时序问题,可放心使用final CountDownLatch latch = new CountDownLatch(callableList.size());for(Callable callable :callableList){ListenableFuture<T> listenableFuture = threadPoolTaskExecutor.submitListenable(callable);listenableFuture.addCallback(new ListenableFutureCallback<T>() {@Overridepublic void onFailure(Throwable throwable) {//过早调用countDown BUGlatch.countDown();LogHelper.EXCEPTION.error("执行任务异常",throwable);if(futureCallback!=null){futureCallback.onFailure(throwable);}}@Overridepublic void onSuccess(T t) {//过早调用countDown BUGlatch.countDown();if(futureCallback!=null){futureCallback.onSuccess(t);}}});}

4. countDown调用时机不对

在调用远程接口返回后,立即执行 lacth.countDown(); 会立刻造成主线程阻塞释放,立即响应结果,丢失部分数据。如下图所示,在第二个任务获取数据处理完成后, 就立即调用latch.countDown(), 致使后续的回调还未执行。主线程在收到 latch的释放阻塞后,返回了不完整的数据结果。

改造后,将countDown放在回调执行完成之后,并放置在 try {} finally { }代码块之中,保证一定得到执行,防止抛异常后,主线程阻塞。 如下所示:

        final CountDownLatch latch = new CountDownLatch(callableList.size());for(Callable callable :callableList){ListenableFuture<T> listenableFuture = threadPoolTaskExecutor.submitListenable(callable);listenableFuture.addCallback(new ListenableFutureCallback<T>() {@Overridepublic void onFailure(Throwable throwable) {try{LogHelper.DEFAULT.info("多线程回调,latchCount="+latch.getCount());LogHelper.EXCEPTION.error("执行任务异常",throwable);if(futureCallback!=null){futureCallback.onFailure(throwable);}}finally {latch.countDown();}}@Overridepublic void onSuccess(T t) {try{LogHelper.DEFAULT.info("多线程回调成功,latchCount="+latch.getCount());if(futureCallback!=null){futureCallback.onSuccess(t);}}finally {latch.countDown();}}});}

5. 本次使用的多线程调度说明

使用Future阻塞模式,不会出现以上问题,使用future.get()的阻塞式获取,不需要CountDownLatch工具类配合使用。而且本次其实也可以使用Future来实现同样的功能。但是Future没有提供 onFailure, onSucess 这样的回调接口,考虑到易用性,采用了ListenableFuture;

本次采用的 spring的 ListenableFuture 方式回调来实现回调式聚合。在对CountDownLatch使用不当的情况下,出现了该问题。解决该问题后,功能运行正常。

本次提供的服务类 ThreadPoolExecutorService, 其主要的方法如下:


/*** 线程池服务类** @author David* @since 2018/5/15*/
public interface ThreadPoolExecutorService {/*** 执行多线程任务处理,并通过 futureCallback对结果进行回调处理* @param callableList  异步线程可执行的任务 List* @param futureCallback  异步回调* @param timeout 超时时间,毫秒* @param <T>*/<T> void execute(List<Callable<T>> callableList, ListenableFutureCallback<T> futureCallback, long timeout);/*** 执行多线程任务处理,并将结果聚合成一个List 进行返回** @param callableList   异步线程可执行的任务 List* @param failureCallback  失败的回调方法* @param skipNull        是否忽略Null 结果,如果忽略 null 不会添加到List 之中* @param timeout 超时时间,毫秒* @param <T>*/<T> List<T> executeAndMerge(List<Callable<T>> callableList, FailureCallback failureCallback, boolean skipNull, long timeout);/*** 执行多线程任务处理,并将结果List,聚合成一个List 进行返回** @param callableList 异步线程可执行的任务 List* @param failureCallback 失败的回调方法* @param timeout 超时时间,毫秒* @param <T>*/<T> List<T> executeAndMergeList(List<Callable<List<T>>> callableList, FailureCallback failureCallback, long timeout);

6. Future 和 ListenableFuture的区别

spring 或者 guava 的 ListenableFutrue其使用方式和机理应该是类似的,这里的说明是通用的。本次工具类使用的是spring自带的ListenableFutrue。

6.1 区别说明

ListenableFuture顾名思义就是可以监听的Future,它是对java原生Future的扩展增强。我们知道Future表示一个异步计算任务,当任务完成时可以得到计算结果。如果我们希望一旦计算完成就拿到结果展示给用户或者做另外的计算,就必须使用另一个线程不断的查询计算状态。这样做,代码复杂,而且效率低下。使用ListenableFuture帮我们检测Future是否完成了,如果完成就自动调用回调函数,这样可以让主线程不必阻塞,减少并发程序的复杂度。

6.4 spring ListenableFuture使用示例

一个简单的示例,如果要获得 ListenableFuture,则需要一个对Java线程池进行修饰过的线程池执行器。如下所示:

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><property name="corePoolSize" value="${threadpool.corePoolSize}" /><property name="keepAliveSeconds" value="${threadpool.keepAliveSeconds}" /><property name="maxPoolSize" value="${threadpool.maxPoolSize}" /><property name="queueCapacity" value="${threadpool.queueCapacity}" /><property name="rejectedExecutionHandler"><bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /></property></bean>

guava的请参考以下方式进行修饰,这里不进行详细说明:

   //Java线程池的类型是可选的【根据场景自行构建】   ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

得到修饰过的线程池执行器后,即可提交可Callable任务,得到 ListenableFuture 进行处理。

以下为计算1~5的3次方的结果并输出,不需要聚合结果,每个线程计算完毕后,立刻输出结果:

for(int i=1; i<=5; i++){final int num = i;ListenableFuture<Integer> listenableFuture = threadPoolTaskExecutor.submitListenable(new Callable<Integer>() {@Overridepublic Integer call(){return (int)Math.pow(num,3);}});listenableFuture.addCallback(new ListenableFutureCallback<Integer>() {@Overridepublic void onFailure(Throwable throwable) {LogHelper.EXCEPTION.error("处理失败", throwable);}@Overridepublic void onSuccess(Integer result) {System.out.println(num + "的3次方为:"+ result);}});}

6.2 是否可以添加多个callback

答案是肯定的,因为ListenableFuture 的addCallback是添加到ListenableFutureCallbackRegistry 一个注册中心。而注册中心底层是支持多个回调的。在6.3会具体介绍回调方法注册中心的处理逻辑。

 public void addCallback(ListenableFutureCallback<? super T> callback) {this.callbacks.addCallback(callback);}

6.3 addCallback是否需要考虑时序

guava 和 spring的 ListenableFuture 均做了时序兼容,在listenableFuture执行的任意时刻调用 addCallback 均可准确的执行回调。这里就不得不说在调用addCallback的时候,其实将回到方法注册到ListenableFutureCallbackRegistry(回调注册中心)。

6.3.1 ListenableFutureCallbackRegistry的特性有:

a. 记录了Callable的3种调度状态:
NEW(新建,还未执行),SUCCESS(返回成功),FAILURE(抛异常,失败)

b. 有两个处理队列,分别是:
successCallbacks:存储执行成功的回调方法;
failureCallbacks: 存储执行失败的回调方法(抛异常)。

c. 存储响应结果
将Callable<T> 返回的结果,也放在回调中心里面;

d. mutex 对象保证线程安全
有一个成员变量:private final Object mutex; 用来保证在添加回调任务,或者设置结果集的时候,注册中心是线程安全的。

其在添加回调任务的时候,处理流程为:

 synchronized(this.mutex) {...}

如上图所示,在添加回调任务的时候,会通过synchronized先获取 mutex 排它锁,保证处理的线程安全。继而判断任务的状态:

如果为NEW,标识任务还未执行完毕,这时候需要将任务先放入队列,待任务执行完毕再根据状态调用回调方法;

如果为SUCCESS,标识任务已经执行成功,不需要再放入队列,而是在当前线程中,直接调用 onSuccess方法;

如果为FAILURE, 标识任务已经执行失败,不需要再放入队列,而是在当前线程中,直接调用 onFailure方法;

此外,还有一个流程,即在Callable任务调度完成,返回结果后,如果未抛异常:对successCallbacks队列中的方法进行逐个回调;如果抛出异常,对failureCallbacks队列中的方法进行逐个回调。因流程较简单,这里只是简单说明。

6.4 说明

本文只分析到 ListenableFuture 的使用方式, CountDownLatch的调用时机和ListenableFuture 的特性。

因时间和篇幅有限,具体spring 或 guava在submitListenable 任务之后,内部处理逻辑并没有阐述。感兴趣的同学可以具体到源码查看其内部实行逻辑。

转载于:https://my.oschina.net/davidzhang/blog/1839206

一个countDown在多线程调度下使用不当的分享相关推荐

  1. 错过一个订单后,吐槽下自己(顺便分享下书单),剧终版

    事先啰嗦几句 1. 纯叙述分享,分享一点创业过程的经历.也当挖坟自我重新认识(大概就是把以前的经历又翻出来的意思). 2. 前面叙述有"摇尾乞怜"之嫌,所以采用分段叙述的方式,只想 ...

  2. 错过一个订单后,吐槽下自己(顺便分享下书单),欢迎交流

    2015年4月28日 12:05:26   一个人于办公室,错过一个不小订单后的感慨(130W,不算小了吧),想想该总结下自己了. 关于自己的两个阶段: 一.程序猿阶段. 不算好学的人在不算出名的高中 ...

  3. Ubuntu 系统下的phe.Pailliar同态加密,速度慢,无法多线程调度的问题

    Ubuntu 系统下的phe.Pailliar同态加密,速度慢,无法多线程调度的问题 问题描述: 在跑同态加密实验时发现一个有趣的现象! i9 12900KF的Ubuntu主机在进行Pailliar同 ...

  4. [一个经典的多线程同步问题]解决方案一:关键段CS

    前面提出了一个经典的多线程同步互斥问题,本篇将用关键段CRITICAL_SECTION来尝试解决这个问题. 本文先介绍如何使用关键段,然后再深层次的分析下关键段的实现机制和原理. 关键段CRITICA ...

  5. HttpClient在多线程环境下踩坑总结

    HttpClient在多线程环境下踩坑总结 问题现场 在多线程环境下使用HttpClient组件对某个HTTP服务发起请求,运行一段时间之后发现客户端主机CPU利用率呈现出下降趋势,而不是一个稳定的状 ...

  6. Win10与Win7 64位系统的CPU多线程调度差异

    最近在做兼容性测试的时候发现一个特别有趣的问题,在Win10上可以完美运行的C++程序在Win7上一运行就挂掉了,代码一模一样!在经过了两天没日没夜的调试后发现,Win10系统与Win7系统的CPU多 ...

  7. 多线程环境下的线程不安全问题(1)

    在不考虑多线程的情况下,很多类代码都是完全正确的,但是如果放在多线程环境下,这些代码就很容易出错,我们称这些类为 线程不安全类 .多线程环境下使用线程安全类 才是安全的. 下面是一个线程不安全类的例子 ...

  8. Java多线程之单例模式在多线程环境下的安全问题

    Java多线程之单例模式在多线程环境下的安全问题 目录: 单例模式基本概念 单线程下的单例模式 多线程下的单例模式 单例模式volatile分析 1. 单例模式基本概念 基本概念转载自:单例模式|菜鸟 ...

  9. 无锁编程[0]__多线程条件下的计数器__原子的加/减/与/或/异或操作__sync_fetch_and_add,__sync_add_and_fetch等

    多线程条件下的计数器是服务器开发的常用操作,比如异步请求sessionid的活动,通常我们会用: 1.加锁取sessionid 2.分段取sessionid (在初始化阶段完成多线程分段取sessio ...

最新文章

  1. [SDK文档]SDK简介
  2. 中盐总公司:盐业公司24小时配送保供应
  3. 通过随机数生成兑换码和概率生成随机数
  4. C语言函数不能返回局部变量的地址
  5. 掌握这四点核心思想,统计学才算入门
  6. java第二章复习_JAVA第二章知识点
  7. 同等学力申硕计算机科学与技术参考书,2017同等学力申硕计算机科学与技术综合备考规划...
  8. mysql主从同步错误记录。
  9. 分类问题的评估指标一览
  10. c软件查表获得电量代码_energy.c 源代码在线查看 - 基于单片机的多费率电能表源程序 资源下载 虫虫电子下载站...
  11. 年度光电领域盛会——CIOE中国光博会开幕在即!小枣君将全程在线直播!
  12. IAR调试stm8的优化设置
  13. 通过阿里P9代考这件事,聊聊职级
  14. html图片在表格平铺,CSS----层级、背景图片,表格
  15. 人工智能生成 logo 神器
  16. 谷歌三大核心技术:from--http://blog.csdn.net/together_cz/article/details/66969003
  17. mybatis报错:argument type mismatch
  18. 开发、测试、测试开发
  19. 学会自我管理有哪些好处?自我管理包括哪些内容?
  20. 计算机技术指标主频是指微机的时钟频率,计算机中央处理器(CPU)的主要性能指标...

热门文章

  1. linux 数据转换
  2. Java(ArrayList和LinkedList)、(HashTable与HashMap)、(HashMap、Hashtable、LinkedHashMap和TreeMap比较)
  3. 效果图底图 线框图_5分钟的线框图教程
  4. 一致性设计,而不是一致性
  5. 线框图用什么软件_为什么要在线框中着色?
  6. cannot find -lunwind-x86_64
  7. Android两个注意事项.深入了解Intent和IntentFilter(两)
  8. 全球五大顶级域名一周统计:7月第三周新增超9万个
  9. User Experience Kit
  10. RHCE课程-初级部分6、编辑工具VIM,网络配置,进程优先,日志文件简介。