Kafka消息积压案例分析
案例
一个微服务同一个分组消费同一个topic的kafka消息,不通业务通过key值区分,由于其中一个业务消息量大,偶尔会出现消费滞后的情况,导致当前微服务消费组出现大量消息积压情况,影响业务。
简单的说,即一个单线程消费,一个消息处理完毕,才会处理下一个。
分析
存在消息积压的场景:消息消费
@Override@Consumer(topic = Const.KAFKA_TOPIC, key = Const.TOPIC_KEY)public boolean handle(String msg) throws Exception {OriginInfo msg = JSON.parseObject(msg, OriginInfo.class);return service.process(msg);}
其中, service.process(msg)方法流程比较复杂,存在多个查询和入库、更新操作,有时候由于数据库性能,这个过程会变慢,导致整体消息消费跟不上。
建议
持久化方案:消息先入库,再消费,去掉中间处理流程。
消息入库:
@Override@Consumer(topic = Const.KAFKA_TOPIC, key = Const.TOPIC_KEY)public boolean handle(String msg) throws Exception {OriginInfo msg = JSON.parseObject(msg, OriginInfo.class);// 消息入库,后续通过定时调度消费消息service.insert(msg);return true;}
消息处理:可以构建一个单线程池,定时创建一个新线程丢到线程池,已确保消息顺序消费处理。
/*** kafka消息异步处理* @author loongshawn*/
public class MsgService implements Runnable {private static final Logger logger = LoggerFactory.getLogger(MsgService.class);private AService aService;public MsgService() {this.aService = (AService) SpringContextUtil.getBean("aService");}/*** 消息处理入口*/void process() {List<OriginInfo> list = new ArrayList<>();try {// 依据入库顺序读出消息记录,顺序消费list = aService.get();if (CollectionUtils.isEmpty(list)) {return;}} catch (Exception e) {logger.error("exception {}", e.toString());}for (OriginInfo item : list) {try {// 顺序消费aService.process(item);// 消费完毕,更新消息标记aService.update(item);} catch (Exception e) {logger.error("exception {}", e.toString());}}}@Overridepublic void run() {try {this.process();} catch (Exception e) {logger.error("exception {}", e.toString());}}
}
优化
由于此样例中呈现的是消息写入库,比如MySQL、Oracle等等,消息量几十万还能应对,但如果应对大规模比如数千万消息要写入库,上述方案就不行了。需要考虑引入缓存方式,降低数据库IO成本。
三步走方案:
- kafka消息---->写入redis缓存---->消费结束
- 读redis缓存---->持久化至数据库
- 读数据库---->处理消息
Kafka消息积压案例分析相关推荐
- kafka消息积压解决
消息积压的解决方法 加强监控报警以及完善重新拉起任务机制,这里就不赘述了. 1.实时/消费任务挂掉导致的消费积压的解决方法 在积压数据不多和影响较小的情况下,重新启动消费任务,排查宕机原因. 如果消费 ...
- 微服务 消息中间件kafka消息丢失问题
微服务 消息中间件kafka消息丢失问题 1. kafka消息丢失概述 1.1 kafka概述 1.2 kafka架构 1.3 kafka问题 2. kafka消息传递语义 3. kafka消息丢失问 ...
- RabbitMq——消息积压分析和解决思路
文章目录 前言 消息积压产生的原因 消息积压问题解决 前言 专栏中之前进行了一系列各种模式的配置.使用和测试操作.但是都只是应用于使用阶段,暂未面向问题解决分析方向. 最近看了一篇资料,有大佬说到了消 ...
- 大型网站技术架构:核心原理与案例分析 mobi_大数据技术经典学习路线
如果你看完有信心能坚持学习的话,那就当下开始行动吧! 点击链接加入群聊[大数据学习交流群]:想要在大数据这个领域汲取养分,让自己壮大成长.分享方向,行动以前先分享下一个大数据交流分享资源,欢迎想学习, ...
- java微服务案例分析_《Java深入微服务原理改造房产销售平台》知识点梳理与问答总结...
一图胜千言 一图胜千言 既然已经在session有了,直接session取就行了,为什么要再放threadlocal,之后再从threadlocal清掉,一直从session拿不就行了? 答: 放在T ...
- Hadoop大数据平台开发与案例分析
关于举办"Hadoop大数据平台开发与案例分析 "高级工程师 一.课程介绍 1. 需求理解 Hadoop 设计之初的目标就定位于高可靠性.高可拓展性.高容错性和高效性,正是这些设计 ...
- 19 kafka消息队列
文章目录 19 kafka消息队列 一.kafka介绍 1.消息队列基本介绍 2.常用的消息队列介绍 3.消息队列的应用场景 4.消息队列的两种模式 5.kafka的基本介绍 6.kafka的架构介绍 ...
- 架构师的 36 项修炼第10讲:架构实战案例分析
本课时的主题是架构案例分享,通过案例分析来加深对前面所学内容的理解.下面将分析三种不同的系统架构案例. 分析初创互联网公司的架构演化案例,看一个小的系统架构是如何演化成一个较为成熟的.能够承受百万级订 ...
- 解决kafka 消息堆积问题的排查及调优
一.背景说明 深夜接到客户紧急电话,反馈腾讯云 kafka 中有大量消息堆积未及时消费.每分钟堆积近 100w 条数据.但是查看 ES 监控,各项指标都远还没到性能瓶颈.后天公司就要搞电商促销活动,到 ...
最新文章
- 2022-2028年中国XPS挤塑板行业市场全景评估及产业前景规划报告
- js模块化编程之CommonJS和AMD/CMD
- 五、唱歌不如跳舞(下)
- 虚拟机生命周期八招巧管理
- iptables复习记忆
- 随想录(B+树的实现)
- 5-Scala对象(Class)和类(Object)
- 17.1.1.3 Creating a User for Replication 创建一个用于用于复制:
- python深度学习库keras——各类网络层
- 正则表达式案例分析 (二)
- jracdrive变频器说明书580_ABB变频器ACS580说明书.pdf
- 企业信息化战略规划方法
- RuntimeError: cuda runtime error (999)
- 电脑IE图标删不掉怎么办
- table 点击文字按钮预览图片
- Excel 多条件筛选 与 数据透视表 实现
- luogu 题解 P1217 【[USACO1.5]回文质数 Prime Palindromes】
- error:command ‘gcc‘ failed with exit status 1 记录
- 苹果搜索广告ASA“保姆级”开户教程来袭!拿来吧你!
- python 等比例裁剪图片