案例

一个微服务同一个分组消费同一个topic的kafka消息,不通业务通过key值区分,由于其中一个业务消息量大,偶尔会出现消费滞后的情况,导致当前微服务消费组出现大量消息积压情况,影响业务。

简单的说,即一个单线程消费,一个消息处理完毕,才会处理下一个。

Created with Raphaël 2.3.0 开始 消费者组 消息处理 消费成功? yes

分析

存在消息积压的场景:消息消费

    @Override@Consumer(topic = Const.KAFKA_TOPIC, key = Const.TOPIC_KEY)public boolean handle(String msg) throws Exception {OriginInfo msg = JSON.parseObject(msg, OriginInfo.class);return service.process(msg);}

其中, service.process(msg)方法流程比较复杂,存在多个查询和入库、更新操作,有时候由于数据库性能,这个过程会变慢,导致整体消息消费跟不上。

建议

持久化方案:消息先入库,再消费,去掉中间处理流程。

Created with Raphaël 2.3.0 开始 消费者组 消息入库 消费成功? yes

消息入库:

    @Override@Consumer(topic = Const.KAFKA_TOPIC, key = Const.TOPIC_KEY)public boolean handle(String msg) throws Exception {OriginInfo msg = JSON.parseObject(msg, OriginInfo.class);// 消息入库,后续通过定时调度消费消息service.insert(msg);return true;}

消息处理:可以构建一个单线程池,定时创建一个新线程丢到线程池,已确保消息顺序消费处理。

/*** kafka消息异步处理* @author loongshawn*/
public class MsgService implements Runnable {private static final Logger logger = LoggerFactory.getLogger(MsgService.class);private AService aService;public MsgService() {this.aService = (AService) SpringContextUtil.getBean("aService");}/*** 消息处理入口*/void process() {List<OriginInfo> list = new ArrayList<>();try {// 依据入库顺序读出消息记录,顺序消费list = aService.get();if (CollectionUtils.isEmpty(list)) {return;}} catch (Exception e) {logger.error("exception {}", e.toString());}for (OriginInfo item : list) {try {// 顺序消费aService.process(item);// 消费完毕,更新消息标记aService.update(item);} catch (Exception e) {logger.error("exception {}", e.toString());}}}@Overridepublic void run() {try {this.process();} catch (Exception e) {logger.error("exception {}", e.toString());}}
}

优化

由于此样例中呈现的是消息写入库,比如MySQL、Oracle等等,消息量几十万还能应对,但如果应对大规模比如数千万消息要写入库,上述方案就不行了。需要考虑引入缓存方式,降低数据库IO成本。

Created with Raphaël 2.3.0 开始 消费者组 消息缓存 消费成功? yes

三步走方案:

  • kafka消息---->写入redis缓存---->消费结束
  • 读redis缓存---->持久化至数据库
  • 读数据库---->处理消息

Kafka消息积压案例分析相关推荐

  1. kafka消息积压解决

    消息积压的解决方法 加强监控报警以及完善重新拉起任务机制,这里就不赘述了. 1.实时/消费任务挂掉导致的消费积压的解决方法 在积压数据不多和影响较小的情况下,重新启动消费任务,排查宕机原因. 如果消费 ...

  2. 微服务 消息中间件kafka消息丢失问题

    微服务 消息中间件kafka消息丢失问题 1. kafka消息丢失概述 1.1 kafka概述 1.2 kafka架构 1.3 kafka问题 2. kafka消息传递语义 3. kafka消息丢失问 ...

  3. RabbitMq——消息积压分析和解决思路

    文章目录 前言 消息积压产生的原因 消息积压问题解决 前言 专栏中之前进行了一系列各种模式的配置.使用和测试操作.但是都只是应用于使用阶段,暂未面向问题解决分析方向. 最近看了一篇资料,有大佬说到了消 ...

  4. 大型网站技术架构:核心原理与案例分析 mobi_大数据技术经典学习路线

    如果你看完有信心能坚持学习的话,那就当下开始行动吧! 点击链接加入群聊[大数据学习交流群]:想要在大数据这个领域汲取养分,让自己壮大成长.分享方向,行动以前先分享下一个大数据交流分享资源,欢迎想学习, ...

  5. java微服务案例分析_《Java深入微服务原理改造房产销售平台》知识点梳理与问答总结...

    一图胜千言 一图胜千言 既然已经在session有了,直接session取就行了,为什么要再放threadlocal,之后再从threadlocal清掉,一直从session拿不就行了? 答: 放在T ...

  6. Hadoop大数据平台开发与案例分析

    关于举办"Hadoop大数据平台开发与案例分析 "高级工程师 一.课程介绍 1. 需求理解 Hadoop 设计之初的目标就定位于高可靠性.高可拓展性.高容错性和高效性,正是这些设计 ...

  7. 19 kafka消息队列

    文章目录 19 kafka消息队列 一.kafka介绍 1.消息队列基本介绍 2.常用的消息队列介绍 3.消息队列的应用场景 4.消息队列的两种模式 5.kafka的基本介绍 6.kafka的架构介绍 ...

  8. 架构师的 36 项修炼第10讲:架构实战案例分析

    本课时的主题是架构案例分享,通过案例分析来加深对前面所学内容的理解.下面将分析三种不同的系统架构案例. 分析初创互联网公司的架构演化案例,看一个小的系统架构是如何演化成一个较为成熟的.能够承受百万级订 ...

  9. 解决kafka 消息堆积问题的排查及调优

    一.背景说明 深夜接到客户紧急电话,反馈腾讯云 kafka 中有大量消息堆积未及时消费.每分钟堆积近 100w 条数据.但是查看 ES 监控,各项指标都远还没到性能瓶颈.后天公司就要搞电商促销活动,到 ...

最新文章

  1. 2022-2028年中国XPS挤塑板行业市场全景评估及产业前景规划报告
  2. js模块化编程之CommonJS和AMD/CMD
  3. 五、唱歌不如跳舞(下)
  4. 虚拟机生命周期八招巧管理
  5. iptables复习记忆
  6. 随想录(B+树的实现)
  7. 5-Scala对象(Class)和类(Object)
  8. 17.1.1.3 Creating a User for Replication 创建一个用于用于复制:
  9. python深度学习库keras——各类网络层
  10. 正则表达式案例分析 (二)
  11. jracdrive变频器说明书580_ABB变频器ACS580说明书.pdf
  12. 企业信息化战略规划方法
  13. RuntimeError: cuda runtime error (999)
  14. 电脑IE图标删不掉怎么办
  15. table 点击文字按钮预览图片
  16. Excel 多条件筛选 与 数据透视表 实现
  17. luogu 题解 P1217 【[USACO1.5]回文质数 Prime Palindromes】
  18. error:command ‘gcc‘ failed with exit status 1 记录
  19. 苹果搜索广告ASA“保姆级”开户教程来袭!拿来吧你!
  20. python 等比例裁剪图片

热门文章

  1. 【Java】Java的各个版本和各个版本的历史版本号的关系与解读
  2. 2JS-操作BOM对象
  3. Android:动态使用权限(一)
  4. 六、Quartz-配置详解
  5. SECS连接模式中active与passive
  6. 合成大西瓜游戏|微信合成大西瓜游戏技巧及资源
  7. Python 绘制五角星 【初识Python】
  8. Windows SWIG 安装与部署
  9. SQLMap使用|命令大全(干货)
  10. Python毕业设计必备案例:【学生信息管理系统】