文章系转载,便于整理和归纳,源文地址:https://cloud.tencent.com/developer/article/1805272

事件是达到解藕目的的手段之一。

最近项目出了一个线上故障,就是因事件引起的,有必要进行一次对事件知识点的梳理

现在公司都是使用的spring全家桶,所以技术面都会在spring boot,这方面我也是新手了,学习与总结并进,新旧知识连贯

此文包含几个知识点

  1. 事件实现原理
  2. 事件使用的几种方式事件,达到解藕目的的手段之一

最近项目出了一个线上故障,就是因事件引起的,有必要进行一次对事件知识点的梳理

事件是引子,背后还是线程池的知识点

说来也搞笑,在面试时,几轮面试官都问了线程池问题,说明此公司肯定有过线程池的事故,要么大家统一对线程池有了高度认知,却不然遇到的第一个故障就是线程池引起的

可见面试与实现差距有多大

现在公司都是使用的spring全家桶,所以技术面都会在spring boot,这方面我也是新手了,学习与总结并进,新旧知识连贯

此文包含几个知识点

  1. 事件实现原理
  2. 事件使用的几种方式
  3. 异步事件
  4. 故障始末

原理

事件实现原理其实就是个观察者模式,所以也没什么好说的

实现方式

此处实现都以spring boot为基础,约定大于配置,所以在spring boot项目中,配置文件大大减少,相对以前写大量代码、配置文件还真是不习惯,感觉苦日子过惯了,还真是不习惯,感觉心里不踏实

达到一个事件,需要三件东西:event、publisher、listener

而在spring boot中,使用几个注解就可以

注解方式

最常见最普通的事件

事件

一个简单的类

@Data
public class DemoEvent  {public DemoEvent(String data){this.eventData = data;}private String eventData;
}

监听者

一个简单的bean,再加上一个@EventListener,还有所在方法的参数是所需要监听的事件

@Component
@Slf4j
public class DemoEventListener {@EventListenerpublic void demoEventListener(DemoEvent demoEvent){log.info("demoevent listener,eventValue:{},order=2",demoEvent.getEventData());}
}

发布者

spring已经有了现成的ApplicationEventPublisher,而ApplicationContext就是实现者,因此只要是spring容器,都会有个context

实现监听接口

发布者不变了,事件与监听者换一种方式

监听者

实现ApplicationListener接口

@Component
@Slf4j
public class DemoEventApplictionListener implements ApplicationListener<AppEvent> {@Override//@Async@Order(3)public void onApplicationEvent(AppEvent demoEvent) {log.info("listener value:{},order=3",demoEvent.getEventData());}}

事件

因为使用了ApplicationListener,所以事件需要是ApplicationEvent的子类

@Data
public class AppEvent extends org.springframework.context.ApplicationEvent {private String eventData;public AppEvent(String source) {super(source);this.eventData = source;}
}

有顺序监听者

一个事件,可以有很多监听者,这些监听者需要按执行顺序执行

此时,监听者实现SmartApplicationListener,此接口中有getOrder()

@Component
@Slf4j
public class DemoEventSmartApplicationListener implements SmartApplicationListener {@Overridepublic boolean supportsEventType(Class<? extends ApplicationEvent> aClass) {return aClass == AppEvent.class;}@Overridepublic boolean supportsSourceType(Class<?> aClass) {return true;}@Overridepublic int getOrder() {return 1;}@Overridepublic void onApplicationEvent(ApplicationEvent applicationEvent) {AppEvent event = (AppEvent)applicationEvent;log.info("smartapplicationListener,value:{},order=1",event.getEventData());}
}

其实还有@Order注解,也是一样的,并不一定要实现此接口

异步事件

在此前的实现方式都是同步的,虽然解藕,但对性能没有多大提升

异步事件就不一样了,有时使用就是为了不拖累主流程

要达到异步,使用两个注解

一在入口使用@EnableAsync,二是在监听方法上加上@Async

异步线程

使用异步,那必然是需要线程池的。到此没有配置线程池,因此需要关注一下默认的线程池是什么样的

对于spring boot的异步实现原理在AsyncExecutionAspectSupport类中,实现方法:

获取Executor

protected AsyncTaskExecutor determineAsyncExecutor(Method method) {AsyncTaskExecutor executor = this.executors.get(method);if (executor == null) {Executor targetExecutor;String qualifier = getExecutorQualifier(method);if (StringUtils.hasLength(qualifier)) {targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);}else {targetExecutor = this.defaultExecutor.get();}if (targetExecutor == null) {return null;}executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));this.executors.put(method, executor);}return executor;}

没有配置qualifier,取默认线程池

protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {if (beanFactory != null) {try {// Search for TaskExecutor bean... not plain Executor since that would// match with ScheduledExecutorService as well, which is unusable for// our purposes here. TaskExecutor is more clearly designed for it.return beanFactory.getBean(TaskExecutor.class);}catch (NoUniqueBeanDefinitionException ex) {logger.debug("Could not find unique TaskExecutor bean", ex);try {return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);}catch (NoSuchBeanDefinitionException ex2) {if (logger.isInfoEnabled()) {logger.info("More than one TaskExecutor bean found within the context, and none is named " +"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());}}}catch (NoSuchBeanDefinitionException ex) {logger.debug("Could not find default TaskExecutor bean", ex);try {return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);}catch (NoSuchBeanDefinitionException ex2) {logger.info("No task executor bean found for async processing: " +"no bean of type TaskExecutor and no bean named 'taskExecutor' either");}// Giving up -> either using local default executor or none at all...}}return null;
}

在TaskExecutionAutoConfiguration中有默认的线程池配置了

@Lazy@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })@ConditionalOnMissingBean(Executor.class)public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {return builder.build();}

springboot1

在springboot1版本中,其实没有默认线程池的,在springboot2之后才有,可以看release-notes

Task Execution Spring Boot now provides auto-configuration for ThreadPoolTaskExecutor. If you are using @EnableAsync, your custom TaskExecutor can be removed in favor of customizations available from the spring.task.execution namespace. Custom ThreadPoolTaskExecutor can be easily created using TaskExecutorBuilder.

那么在springboot1中默认使用了哪个线程池呢?

在AsyncExecutionInterceptor中

@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {Executor defaultExecutor = super.getDefaultExecutor(beanFactory);return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

可以看出如果没有配置线程池,就是SimpleAsyncTaskExecutor

这个类注释讲得很清楚

{@link TaskExecutor} implementation that fires up a new Thread for each task,executing it asynchronously.

protected void doExecute(Runnable task) {Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));thread.start();
}

有任务时,就是新建线程


线程池

事件本身没有太多的知识点,但对于线程池,需要再温习一下,在后面的故障分析中,关键知识点还是线程池

对于线程池的理解,从习得第一印象,认为线程池其实是对于线程与任务两者合并的消费生产者模式

但如果面试中,面试官询问如何理解线程池的?我也不会这样回答,这涉及到两人对于知识的共振范围,最好谈一些共识最大的点,利在于不踩坑,弊在于平凡不起眼

自jdk5开始,JDK引入了concurrent包,也就是大名鼎鼎的JUC,其中最常见ThreadPoolExecutor

常规知识点,线程池的几个重要参数

  1. 核心线程数:corePoolSize
  2. 最大线程数: maxPoolSize
  3. 线程空间时长: keepAliveTime
  4. 核心线程超时: allowCoreThreadTimeout,默认值false,若为true,空闲时,线程池中会没有线程
  5. 任务队列: workQueue
  6. 拒绝处理器: rejectedExecutionHandler

自带拒绝策略

  • AbortPolicy:默认策略,丢弃任务,抛RejectedExecutionException异常
  • DiscardPolicy:丢弃任务,但不抛异常
  • DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务
  • CallerRunsPolicy:由调用线程处理任务

线程池的工作原理,网上有很多资料,提交任务给线程池,大致流程:

  1. 当线程池中线程数量小于 corePoolSize 则创建线程,并处理请求
  2. 当线程池中线程数量大于等于 corePoolSize 时,则把请求放入 workQueue中,随着线程池中的核心线程们不断执行任务,只要线程池中有空闲的核心线程,线程池就从 workQueue 中取任务并处理
  3. 当 taskQueue 已存满,放不下新任务时则新建非核心线程入池,并处理请求直到线程数目达到 maximumPoolSize(最大线程数量设置值)
  4. 如果线程池中线程数大于 maximumPoolSize 则使用 RejectedExecutionHandler 来进行任务拒绝处理

参数设定

在日常使用过程中,需要关注的是各参数数值设定,那么如何设定呢?

一、线程数越多越好吗?

这个明显不是,比如单核CPU,设置上万个线程没有意义;而且线程过多,上下文切换,反而降低性能

这儿引出一个问题,单核能多线程吗?

进程是资源的基本单位,线程是cpu调度的基本单位

如果一个线程一直占用CPU,那肯定多线程无用,但总有等待时候,如IO,此时,多线程就有了用武之地

线程数小了,显示又达不到最大化CPU性能

二、队列容量越大越好吗?

显然也不是,常人都关注线程数的设定,但队列大小鲜有人问,如果队列过大,积压相当多的任务,必然导致响应时间过长

如果队列过小,甚至没有,那任务没有缓冲,可能造成线程快速扩张

线程数与队列容量得配合使用,怎么才是合理的参数呢?

最直接的方式,模拟线上请求进行压测,随着请求量增加,QPS上升,当到了一定的阀值之后,请求数量增加QPS并不会增加,或者增加不明显,同时请求的响应时间却大幅增加。这个阀值我们认为是最佳线程数

对于线程数量多少,有很多的设定理论依据

任务类型

以任务类型设定,这是常用来粗略估值的

  • IO密集型
  • CPU密集型

如果是IO密集型,一般是CPU数*2;IO密集型CPU使用率不高,可以让CPU等待IO的时候处理别的任务,充分利用CPU

如果是CPU密集型 ,一般是CPU数+1;CPU使用率高,若开过多线程,增加线程上下文切换次数,带来额外开销

公式

前人早就有了计算公式

一、《Java Concurrency in Practice》

这本书中作者给出了定义

比如平均每个线程CPU运行时间为0.5s,而线程等待时间(非CPU运行时间,比如IO)为1.5s,CPU核心数为8,那么根据上面这个公式估算得到:8 * (1+(1.5/0.5)=32

二、《Programming Concurrency on the JVM Mastering》

线程数=Ncpu/(1-阻塞系数):其中计算密集型阻塞系数为0,IO密集型阻塞系数接近1:

如果以这个示例使用第一个公式计算:2 * (1+(0.9/0.1)) =20,结果也是20

可以这么思考:两个公式表达不同,但实质一致

即Ncpu/(1-阻塞系数)=Ncpu*(1+w/c)

阻塞系数=w/(w+c),阻塞系数=阻塞时间/(阻塞时间+计算时间)

以第二个公式计算第一公式中的示例:8 / (1-(1.5/(1.5+0.5))) = 32

这样两个公式计算出的结果就是一样的

以公式计算法推算任务类型分类方式:

  • IO型:w/c≈1,公式算出2*cpu
  • CPU型:w/c≈0,公式算出1*cpu

公式的方法有了,如何确定公式中的变量值呢?

最原始的办法打印日志,等待CPU时间无非就是db,io,网络调用,在发包前,发包后打上日志,算得时间差,求出平均值

三、其它公式

设置的线程数 = 目标QPS/(1/任务实际处理时间) = QPS*每个任务处理时间

(核心线程就以平均qps定;最大线程就以最高qps定)

举例说明,假设目标QPS=100,任务实际处理时间0.2s,100 * 0.2 = 20个线程,这里的20个线程必须对应物理的20个CPU核心,否则将不能达到预估的QPS指标

队列大小 = 线程数 * (最大响应时间/任务实际处理时间)

假设目标最大响应时间为0.4s,计算阻塞队列的长度为20 * (0.4 / 0.2) = 40

这个公式有点难以理解,最初的公式应该是

线程数/任务实际处理时间 * 响应时间 = 队列大小

类似 速度*时间=长度


这么多的公式,死记硬背是不行的,简单理解一下,qps是每秒处理任务数,如果一个线程处理是1s,那么多少qpS,就需要多少线程了,若是处理0.2s,那1s可以处理5个任务了,也就是1/0.2,那线程数只需要qps/5了,也就是公式qps/(1/任务处理时间)

故障描述

有了上面的知识储备,现在回顾一下整个故障过程

起因

项目中使用的是springboot1,鉴于上面的线程实现,同事认为每次异步事件都是新创建线程,是很不合理的,这个不用讲,是的确很不合适,放着线程池不用,却使用最原始的新建线程。所以需要优化,使用线程池

过程

private Integer corePoolSize = 8;
private Integer maxPoolSize = 40;
private Integer queueCapacity = 200;

设置了线程池参数,因为机器是4核,所以就cpu*2,分析认为请求不多,所以看这个配置也没有大过

表象

发了版本之后,发现了问题,发现线上业务数据都不对,感觉所有的请求都失败了。

通过日志发现,请求正常进来了,但异步事件业务都没有正常执行执行到

分析

由结果来复盘整个事件,事件之中处理出了很多的问题,罗列一些,以鉴未来

1、全局性配置不可轻动

现在的系统是个遗留系统,业务复杂,代码交错。就是个正常健壮系统都不能随意变更,更何况现在不稳定,也没有理清根节,万不可动

动也得不仅有理论基础,还得实操配合,压力测试不可缺少

也得必须留有后手,如开关,如此次线程池没有配置拒绝策略

2、处理故障第一要务

发布后出现问题,再所难免,但一旦出现故障不是去分析故障,而是马上恢复业务提供能力:回滚

现在都是基于容器,完全就是留一台保留事故现场,留做后期分期

而不是放任故障蔓延,不停地分析争论原因

当然,回滚动作比较大,可以设置开关,这次更大问题是,开关有了,结果没有测试完备,没有生效

3、基础知识扎实

此次在处理事故中,开始不清楚springboot1的默认线程池,以为是默认的CachedThreadPool,在开关失效时,想通过修改线程池配置来修复问题,同事一直强调CPU就4核,设置大了,系统跟不上,也让我烦躁了,结果错乱地指挥把队列设置了无限大,更加加大了任务的延迟

线程的数量的确不宜过大,但也没说只能最大2*CPU,为CPU核数的2倍,为CPU核数的8倍,为CPU核数的32倍,法无定法

springboot event线程池总结相关推荐

  1. SpringBoot使用线程池

    SpringBoot使用线程池 软件环境 名称 版本号 jdk 1.8 springboot 2.1.6 maven 3.3.9 1.Java中创建线程池 只会介绍java中线程池的核心类Thread ...

  2. SpringBoot—自定义线程池及并发定时任务模板

    关注微信公众号:CodingTechWork,一起学习进步. 介绍   在项目开发中,经常遇到定时任务,今天通过自定义多线程池总结一下SpringBoot默认实现的定时任务机制. 定时任务模板 pom ...

  3. 【ThreadPoolTaskExecutor】 SpringBoot 的线程池的使用

    一.配置 ThreadPoolTaskExecutor 创建一个文件夹 config ,新建一个类 ThreadPoolConfig.java import org.springframework.c ...

  4. SpringBoot 引入线程池+Queue缓冲队列实现高并发下单业务

    点击关注公众号,利用碎片时间学习 主要是自己在项目中(中小型项目) 有支付下单业务(只是办理VIP,没有涉及到商品库存),目前用户量还没有上来,目前没有出现问题,但是想到如果用户量变大,下单并发量变大 ...

  5. springboot 初始化线程池_springboot项目中线程池的使用

    在application.properties添加线程池配置项 spring.task.scheduling.thread-name-prefix=SysAsyncExecutor_ spring.t ...

  6. SpringBoot 自定义线程池

    文章目录 一.自定义线程池 1. yml配置 2. 线程池配置属性类 3. 开启异步线程支持 4. 创建自定义线程池配置类 5. service逻辑层 6. controller控制层 7. 效果图 ...

  7. java springboot 监控线程池的状态

    @Autowiredprivate Executor personInfoTaskExecutor;/*** 监控线程池状态* @return*/@GetMapping("asyncExce ...

  8. Springboot线程池的使用和扩展

    我们常用ThreadPoolExecutor提供的线程池服务,springboot框架提供了@Async注解,帮助我们更方便的将业务逻辑提交到线程池中异步执行,今天我们就来实战体验这个线程池服务: 实 ...

  9. 【二十四】springboot使用EasyExcel和线程池实现多线程导入Excel数据

      springboot篇章整体栏目:  [一]springboot整合swagger(超详细 [二]springboot整合swagger(自定义)(超详细) [三]springboot整合toke ...

  10. 如何优雅的使用和理解线程池

    前言 平时接触过多线程开发的童鞋应该都或多或少了解过线程池,之前发布的<阿里巴巴 Java 手册>里也有一条: 可见线程池的重要性. 简单来说使用线程池有以下几个目的: 线程是稀缺资源,不 ...

最新文章

  1. Rocksdb DeleteRange实现原理
  2. 【渝粤题库】陕西师范大学180103市场营销学Ⅰ作业(高起专)
  3. 动态加载js文件以支持跨域脚本
  4. excel去重_数据处理之EXCEL的高效技巧分享
  5. CentOS下如何配置LAMP环境
  6. android preferenceActivity的用法
  7. C# 改变图片大小的功能代码片段 (wince5)
  8. 计算机表格制作ppt,计算机基础使用excel2003制作表格.ppt
  9. SMT贴片相关知识梳理
  10. Kubernetes系列教程(三)---纯三层网络方案
  11. 共射放大电路以及三极管三种接法的判断--共基?共集?共射?
  12. 【Android - 技术期刊】第002期
  13. 华为od一面面试算法
  14. UE4 获取目录下所有的图片转换成Texture2D并通过UMG显示出来
  15. ASP.NET中使用动态令牌进行安全认证
  16. 浅谈table布局和div布局的区别
  17. docker私有仓库registry(v2)的搭建
  18. ultraiso使用说明
  19. Hyperledger Fabric无排序组织以Raft共识算法启动多个Orderer服务、多组织共同运行维护Orderer服务
  20. 所有神经网络的关系和分类-附思维导图

热门文章

  1. JSP — request 内置对象
  2. git提交注释内容分行处理
  3. Python3 pygal 与 pygal_maps_world 绘制世界地图
  4. 《Systems Performance》阅读笔记及收获
  5. AngularJS 的常用特性(四)
  6. Hadoop2.x集群动态添加删除数据节点
  7. oracle中插入一个blob数据
  8. web安全day30:人人都要懂的LAMP--apache服务安装和配置
  9. IS-IS详解(十五)——IS-IS 认证
  10. IS-IS详解(九)——IS-IS 骨干区域与非骨干区域访问基础