消费者服务消费延时分析
消费者服务背景
网络订单中有很多业务使用了mq,主要是为了流量高峰期业务的异步、削峰处理,提高业务的吞吐量。
消息生产消费处理机制
consumer server包含每个业务线的消息监听者。定时任务每隔1min扫描一次。线上问题产具体体现
本次线上生产问题http://wk.mweer.com/pages/viewpage.action?pageId=9332230
具体体现
订单号 1081801250请求回调接口
秒付服务接收到数据打印日志信息,写入消息队列
消息写入队列--消息消费中间差了2分多钟。
生产问题原因分析
结论:消费者服务中由于integer 传null 给int导致代码问题导致消息无限重投,导致消费者线程增多并且都堆积到阻塞队列LinkBlockQueue中,当任务的执行速度小于任务的创建速度,则会出现延时的情况。
过程:
1、消费者在spring中的配置<bean id="threadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><property name="corePoolSize" value="50"/><property name="maxPoolSize" value="300"/><property name="queueCapacity" value="1000"/><property name="threadNamePrefix" value="mf-listener-"/><property name="allowCoreThreadTimeOut" value="true"></property> </bean><bean id="takeawayReceiverListener" class="cn.mwee.order.listener.order.TakeawayReceiverListener"></bean> <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer"><property name="connectionFactory" ref="jmsFactory"/><property name="messageListener" ref="takeawayReceiverListener"/><property name="concurrentConsumers" value="15"/><property name="destinationName" value="QUEUE.TAKEAWAY.TO.APPORDER"/><property name="taskExecutor" ref="threadPoolTaskExecutor"/> </bean>
所有的消费者使用的是threadPoolTaskExecutor线程池,
Set the Spring TaskExecutor to use for executing the listener once* a message has been received by the provider.
@Override protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);ThreadPoolExecutor executor;if (this.taskDecorator != null) {executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,queue, threadFactory, rejectedExecutionHandler) {@Overridepublic void execute(Runnable command) {super.execute(taskDecorator.decorate(command));}};}else {executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,queue, threadFactory, rejectedExecutionHandler);}if (this.allowCoreThreadTimeOut) {executor.allowCoreThreadTimeOut(true);}this.threadPoolExecutor = executor;return executor; }/*** Create the BlockingQueue to use for the ThreadPoolExecutor.* <p>A LinkedBlockingQueue instance will be created for a positive* capacity value; a SynchronousQueue else.* @param queueCapacity the specified queue capacity* @return the BlockingQueue instance* @see java.util.concurrent.LinkedBlockingQueue* @see java.util.concurrent.SynchronousQueue*/ protected BlockingQueue<Runnable> createQueue(int queueCapacity) {if (queueCapacity > 0) {return new LinkedBlockingQueue<Runnable>(queueCapacity);}else {return new SynchronousQueue<Runnable>();} }
下面是ThreadPoolExecutor最核心的构造方法
构造方法参数讲解
参数名 作用 corePoolSize 核心线程池大小 maximumPoolSize 最大线程池大小 keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间 TimeUnit keepAliveTime时间单位 workQueue 阻塞任务队列 threadFactory 新建线程工厂 RejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理
1.当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
2.当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭结合代码以及grafana分析:
由于消息消费机制遇到异常会10秒重投,18号业务高峰期,从12:25-12:40开始消费者服务器的线程数从732上升到822新增了90个线程,期间就有客户反馈清台延时,而我们的消费者是共用一个线程池,核心线程为50,最大300,当线程数大于50,并且阻塞队列未满,以后会把新的消费者线程入队列等待,当任务的创建速度和处理速度差异很大,LinkedBlockingQueue会快速增涨,消费者执行也会有相应的延时。
优化方案
增加告警平台,针对消费者异常log的告警监控。
异常消息重投机制需要优化。
总结
Java中integer,int转换需要注意null类型。
消息重投需要考虑异常重试机制。
使用线程池的地方,当服务出现异常时,重点关注线程数量变化。
转载于:https://www.cnblogs.com/youngerliu/p/10445083.html
消费者服务消费延时分析相关推荐
- 我是如何将一个老系统的kafka消费者服务的性能提升近百倍的
大家好,又见面了~ kafka作为一种高吞吐量的分布式发布订阅消息系统,在业务系统中被广泛的使用. 如果问你,如何提高kafka队列中的消息消费速度呢? 答案很简单,topic多分几个分片,然后使用消 ...
- 源码分析Dubbo服务消费端启动流程
通过前面文章详解,我们知道Dubbo服务消费者标签dubbo:reference最终会在Spring容器中创建一个对应的ReferenceBean实例,而ReferenceBean实现了Spring生 ...
- 从源码角度分析RabbitMQ重启后,消费者停止消费怎么解决
前段时间的RabbitMQ broker服务端由于某个队列一直积压消息,运维在凌晨对mq服务端机器pod进行了扩容,重启了RabbitMQ,然后早上发现自己的服务在mq重启之后一直报异常,停止消费了, ...
- Consumer消息拉取和消费流程分析
1. 前言 MQConsumer是RocketMQ提供的消费者接口,从接口定义上可以看到,它主要的功能是订阅感兴趣的Topic.注册消息监听器.启动生产者开始消费消息. 消费者获取消息的模式有两种 ...
- 2022-2028年中国餐饮服务行业发展现状分析及市场前景预测报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国餐饮服务行业市场行业相关概述.中国餐饮服 ...
- 疫情启示 | 服务消费按下“暂停”键,生鲜消费重返“线上”
疫情启示 | 服务消费按下"暂停"键,生鲜消费重返"线上" 2020年的春节是如此不同寻常.伴随新冠肺炎疫情的爆发,全国范围内的线上.线下消费"冰火两 ...
- Spring Cloud Alibaba基础教程:几种服务消费方式(RestTemplate、WebClient、Feign)
热门:Spring Cloud Greenwich.RELEASE 正式发布!一个非常有看头的版本! 通过<Spring Cloud Alibaba基础教程:使用Nacos实现服务注册与发现&g ...
- Spring Cloud Alibaba基础教程:支持的几种服务消费方式(RestTemplate、WebClient、Feign)
通过<Spring Cloud Alibaba基础教程:使用Nacos实现服务注册与发现>一文的学习,我们已经学会如何使用Nacos来实现服务的注册与发现,同时也介绍如何通过LoadBal ...
- spring整合dubbo服务消费和发现入门示例
文章目录 准备 公共接口部分 服务提供 服务消费 准备 1.启动zookeeper 目录 其中interface包下是公共接口 测试预期结果: 订单服务web模块在A服务器,用户服务模块在B服务器,A ...
最新文章
- 浪潮的加班标语炸了,这是顶风作案?网传:1月加班87小时还要扣工资?
- Sword STL迭代器prev,next相关函数
- 举例说明信息熵、互信息的计算过程
- json中的转义字符和数字
- 周期获取Linux系统内存
- tomcat给android发图片,一步一步学会http获取tomcat服务端的图片,在android客户端显示...
- QT5开发及实例学习之十三Qt5文本编辑功能
- 编写Python脚本来获取Google搜索结果的示例
- 安装软件,竟然把UOS装崩溃了
- 基于itext的pdf拼接
- 爬虫练习案例:交通路况
- 高德离线地图瓦片坐标偏移纠偏
- yolo+ocr集装箱字符识别(pytorch版本)
- 推荐 2 个 Spring Boot 工作流项目,轻松搞定工作流!
- AliMe Chat: A Sequence to Sequence and Rerank based Chatbot Engine论文笔记
- java用数组显示周期性波形,常见的周期性变化波形有正弦波、三角波和矩形波。...
- string 拆分字符串
- 后序遍历链式二叉树(递归和非递归)
- 【英语】2月英语学习
- 【190222】VC局域网视频监控系统服务器源代码
热门文章
- Linux开启文件共享服务
- Nginx Parsing HTTP Package、header/post/files/args Sourcecode Analysis
- NHibernate之映射文件配置说明
- C++中static与const成员
- MySql 应该选择普通索引 还是唯一 索引???
- Project Life Cycle
- Django--工程搭建
- LeetCode14最长公共前缀
- (小技巧)Sql server查看sql语句的执行时间(转)
- (转)如何检查系统是否支持Zend Optimizer