1. 消费者服务背景

    网络订单中有很多业务使用了mq,主要是为了流量高峰期业务的异步、削峰处理,提高业务的吞吐量。
     

  2. 消息生产消费处理机制


    consumer server包含每个业务线的消息监听者。定时任务每隔1min扫描一次。

  3. 线上问题产具体体现

    本次线上生产问题http://wk.mweer.com/pages/viewpage.action?pageId=9332230
    具体体现
    订单号 1081801250请求回调接口

    秒付服务接收到数据打印日志信息,写入消息队列

    消息写入队列--消息消费中间差了2分多钟。

  4. 生产问题原因分析

    结论:消费者服务中由于integer 传null 给int导致代码问题导致消息无限重投,导致消费者线程增多并且都堆积到阻塞队列LinkBlockQueue中,当任务的执行速度小于任务的创建速度,则会出现延时的情况。

    过程:
          1、消费者在spring中的配置

    <bean id="threadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><property name="corePoolSize" value="50"/><property name="maxPoolSize" value="300"/><property name="queueCapacity" value="1000"/><property name="threadNamePrefix" value="mf-listener-"/><property name="allowCoreThreadTimeOut" value="true"></property>
    </bean><bean id="takeawayReceiverListener" class="cn.mwee.order.listener.order.TakeawayReceiverListener"></bean>
    <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer"><property name="connectionFactory" ref="jmsFactory"/><property name="messageListener" ref="takeawayReceiverListener"/><property name="concurrentConsumers" value="15"/><property name="destinationName" value="QUEUE.TAKEAWAY.TO.APPORDER"/><property name="taskExecutor" ref="threadPoolTaskExecutor"/>
    </bean>

    所有的消费者使用的是threadPoolTaskExecutor线程池,

    Set the Spring TaskExecutor to use for executing the listener once* a message has been received by the provider.
    @Override
    protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);ThreadPoolExecutor executor;if (this.taskDecorator != null) {executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,queue, threadFactory, rejectedExecutionHandler) {@Overridepublic void execute(Runnable command) {super.execute(taskDecorator.decorate(command));}};}else {executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,queue, threadFactory, rejectedExecutionHandler);}if (this.allowCoreThreadTimeOut) {executor.allowCoreThreadTimeOut(true);}this.threadPoolExecutor = executor;return executor;
    }/*** Create the BlockingQueue to use for the ThreadPoolExecutor.* <p>A LinkedBlockingQueue instance will be created for a positive* capacity value; a SynchronousQueue else.* @param queueCapacity the specified queue capacity* @return the BlockingQueue instance* @see java.util.concurrent.LinkedBlockingQueue* @see java.util.concurrent.SynchronousQueue*/
    protected BlockingQueue<Runnable> createQueue(int queueCapacity) {if (queueCapacity > 0) {return new LinkedBlockingQueue<Runnable>(queueCapacity);}else {return new SynchronousQueue<Runnable>();}
    }

    下面是ThreadPoolExecutor最核心的构造方法

    构造方法参数讲解 

    参数名 作用
    corePoolSize 核心线程池大小
    maximumPoolSize 最大线程池大小
    keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间
    TimeUnit keepAliveTime时间单位
    workQueue 阻塞任务队列
    threadFactory 新建线程工厂
    RejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理



    1.当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程 
    2.当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭 

    结合代码以及grafana分析:

    由于消息消费机制遇到异常会10秒重投,18号业务高峰期,从12:25-12:40开始消费者服务器的线程数从732上升到822新增了90个线程,期间就有客户反馈清台延时,而我们的消费者是共用一个线程池,核心线程为50,最大300,当线程数大于50,并且阻塞队列未满,以后会把新的消费者线程入队列等待,当任务的创建速度和处理速度差异很大,LinkedBlockingQueue会快速增涨,消费者执行也会有相应的延时。

  5. 优化方案

    增加告警平台,针对消费者异常log的告警监控。
    异常消息重投机制需要优化。

  6. 总结

    Java中integer,int转换需要注意null类型。
    消息重投需要考虑异常重试机制。
    使用线程池的地方,当服务出现异常时,重点关注线程数量变化。

转载于:https://www.cnblogs.com/youngerliu/p/10445083.html

消费者服务消费延时分析相关推荐

  1. 我是如何将一个老系统的kafka消费者服务的性能提升近百倍的

    大家好,又见面了~ kafka作为一种高吞吐量的分布式发布订阅消息系统,在业务系统中被广泛的使用. 如果问你,如何提高kafka队列中的消息消费速度呢? 答案很简单,topic多分几个分片,然后使用消 ...

  2. 源码分析Dubbo服务消费端启动流程

    通过前面文章详解,我们知道Dubbo服务消费者标签dubbo:reference最终会在Spring容器中创建一个对应的ReferenceBean实例,而ReferenceBean实现了Spring生 ...

  3. 从源码角度分析RabbitMQ重启后,消费者停止消费怎么解决

    前段时间的RabbitMQ broker服务端由于某个队列一直积压消息,运维在凌晨对mq服务端机器pod进行了扩容,重启了RabbitMQ,然后早上发现自己的服务在mq重启之后一直报异常,停止消费了, ...

  4. Consumer消息拉取和消费流程分析

    1. 前言 MQConsumer是RocketMQ提供的消费者接口,从接口定义上可以看到,它主要的功能是订阅感兴趣的Topic.注册消息监听器.启动生产者开始消费消息. ​ 消费者获取消息的模式有两种 ...

  5. 2022-2028年中国餐饮服务行业发展现状分析及市场前景预测报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国餐饮服务行业市场行业相关概述.中国餐饮服 ...

  6. 疫情启示 | 服务消费按下“暂停”键,生鲜消费重返“线上”

    疫情启示 | 服务消费按下"暂停"键,生鲜消费重返"线上" 2020年的春节是如此不同寻常.伴随新冠肺炎疫情的爆发,全国范围内的线上.线下消费"冰火两 ...

  7. Spring Cloud Alibaba基础教程:几种服务消费方式(RestTemplate、WebClient、Feign)

    热门:Spring Cloud Greenwich.RELEASE 正式发布!一个非常有看头的版本! 通过<Spring Cloud Alibaba基础教程:使用Nacos实现服务注册与发现&g ...

  8. Spring Cloud Alibaba基础教程:支持的几种服务消费方式(RestTemplate、WebClient、Feign)

    通过<Spring Cloud Alibaba基础教程:使用Nacos实现服务注册与发现>一文的学习,我们已经学会如何使用Nacos来实现服务的注册与发现,同时也介绍如何通过LoadBal ...

  9. spring整合dubbo服务消费和发现入门示例

    文章目录 准备 公共接口部分 服务提供 服务消费 准备 1.启动zookeeper 目录 其中interface包下是公共接口 测试预期结果: 订单服务web模块在A服务器,用户服务模块在B服务器,A ...

最新文章

  1. 浪潮的加班标语炸了,这是顶风作案?网传:1月加班87小时还要扣工资?
  2. Sword STL迭代器prev,next相关函数
  3. 举例说明信息熵、互信息的计算过程
  4. json中的转义字符和数字
  5. 周期获取Linux系统内存
  6. tomcat给android发图片,一步一步学会http获取tomcat服务端的图片,在android客户端显示...
  7. QT5开发及实例学习之十三Qt5文本编辑功能
  8. 编写Python脚本来获取Google搜索结果的示例
  9. 安装软件,竟然把UOS装崩溃了
  10. 基于itext的pdf拼接
  11. 爬虫练习案例:交通路况
  12. 高德离线地图瓦片坐标偏移纠偏
  13. yolo+ocr集装箱字符识别(pytorch版本)
  14. 推荐 2 个 Spring Boot 工作流项目,轻松搞定工作流!
  15. AliMe Chat: A Sequence to Sequence and Rerank based Chatbot Engine论文笔记
  16. java用数组显示周期性波形,常见的周期性变化波形有正弦波、三角波和矩形波。...
  17. string 拆分字符串
  18. 后序遍历链式二叉树(递归和非递归)
  19. 【英语】2月英语学习
  20. 【190222】VC局域网视频监控系统服务器源代码

热门文章

  1. Linux开启文件共享服务
  2. Nginx Parsing HTTP Package、header/post/files/args Sourcecode Analysis
  3. NHibernate之映射文件配置说明
  4. C++中static与const成员
  5. MySql 应该选择普通索引 还是唯一 索引???
  6. Project Life Cycle
  7. Django--工程搭建
  8. LeetCode14最长公共前缀
  9. (小技巧)Sql server查看sql语句的执行时间(转)
  10. (转)如何检查系统是否支持Zend Optimizer