点击关注公众号,回复“2T”获取2TB学习资源!

互联网架构师后台回复 2T 有特别礼包

上一篇:互联网后端技术栈大全,建议收藏!

前言

本文针对解决Kafka不同Topic之间存在一定的数据关联时的顺序消费问题。

如存在Topic-insert和Topic-update分别是对数据的插入和更新,当insert和update操作为同一数据时,应保证先insert再update。

1、问题引入

kafka的顺序消费一直是一个难以解决的问题,kafka的消费策略是对于同Topic同Partition的消息可保证顺序消费,其余无法保证。

如果一个Topic只有一个Partition,那么这个Topic对应consumer的消费必然是有序的。不同的Topic的任何情况下都无法保证consumer的消费顺序和producer的发送顺序一致。

如果不同Topic之间存在数据关联且对消费顺序有要求,该如何处理?本文主要解决此问题。

另外,Kafka 系列面试题和答案全部整理好了,微信搜索互联网架构师,在后台发送:面试,可以在线阅读。

2、解决思路

现有Topic-insert和Topic-update,数据唯一标识为id,对于id=1的数据而言,要保证Topic-insert消费在前,Topic-update消费在后。想成为架构师,这份架构师图谱建议看看,少走弯路。

两个Topic的消费为不同线程处理,所以为了保证在同一时间内的同一数据标识的消息仅有一个业务逻辑在处理,需要对业务添加锁操作。

使用synchronized进行加锁的话,会影响无关联的insert和update的数据消费能力,如id=1的insert和id=2的update,在synchronized的情况下,无法并发处理,这是没有必要的,我们需要的是对于id=1的insert和id=1的update在同一时间只有一个在处理,所以使用细粒度锁来完成加锁的操作。

细粒度锁实现:https://blog.csdn.net/qq_38245668/article/details/105891161

PS:如果为分布式系统,细粒度锁需要使用分布式锁的对应实现。

在对insert和update加锁之后,其实还是没有解决消费顺序的问题,只是确保了同一时间只有一个业务在处理。 对于消费顺序异常的问题,也就是先消费了update再消费insert的情况。

处理方式:消费到update数据,校验库中是否存在当前数据(也就是是否执行insert),如果没有,就将当前update数据存入缓存,key为数据标识id,在insert消费时检查是否存在id对应的update缓存,如果有,就证明当前数据的消费顺序异常,需执行update操作,再将缓存数据移除。

3、实现方案

消息发送:

kafkaTemplate.send("TOPIC_INSERT", "1");
kafkaTemplate.send("TOPIC_UPDATE", "1");

监听代码示例:

KafkaListenerDemo.java

@Component
@Slf4j
public class KafkaListenerDemo {// 消费到的数据缓存private Map<String, String> UPDATE_DATA_MAP = new ConcurrentHashMap<>();// 数据存储private Map<String, String> DATA_MAP = new ConcurrentHashMap<>();private WeakRefHashLock weakRefHashLock;public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {this.weakRefHashLock = weakRefHashLock;}@KafkaListener(topics = "TOPIC_INSERT")public void insert(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{// 模拟顺序异常,也就是insert后消费,这里线程sleepThread.sleep(1000);String id = record.value();log.info("接收到insert :: {}", id);Lock lock = weakRefHashLock.lock(id);lock.lock();try {log.info("开始处理 {} 的insert", id);// 模拟 insert 业务处理Thread.sleep(1000);// 从缓存中获取 是否存在有update数据if (UPDATE_DATA_MAP.containsKey(id)){// 缓存数据存在,执行updatedoUpdate(id);}log.info("处理 {} 的insert 结束", id);}finally {lock.unlock();}acknowledgment.acknowledge();}@KafkaListener(topics = "TOPIC_UPDATE")public void update(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{String id = record.value();log.info("接收到update :: {}", id);Lock lock = weakRefHashLock.lock(id);lock.lock();try {// 测试使用,不做数据库的校验if (!DATA_MAP.containsKey(id)){// 未找到对应数据,证明消费顺序异常,将当前数据加入缓存log.info("消费顺序异常,将update数据 {} 加入缓存", id);UPDATE_DATA_MAP.put(id, id);}else {doUpdate(id);}}finally {lock.unlock();}acknowledgment.acknowledge();}void doUpdate(String id) throws InterruptedException{// 模拟 updatelog.info("开始处理update::{}", id);Thread.sleep(1000);log.info("处理update::{} 结束", id);}}

日志(代码中已模拟必现消费顺序异常的场景):

接收到update ::1
消费顺序异常,将update数据 1 加入缓存
接收到insert ::1
开始处理 1 的insert
开始处理update::1
处理update::1 结束
处理 1 的insert 结束

观察日志,此方案可正常处理不同Topic再存在数据关联的消费顺序问题。

版权声明:本文为CSDN博主「方片龙」的原创文章,原文链接:https://blog.csdn.net/qq_38245668/article/details/105900011

-End-

最后,关注公众号互联网架构师,在后台回复:2T,可以获取我整理的 Java 系列面试题和答案,非常齐全。

正文结束

推荐阅读 ↓↓↓

1.心态崩了!税前2万4,到手1万4,年终奖扣税方式1月1日起施行~

2.深圳一普通中学老师工资单曝光,秒杀程序员,网友:敢问是哪个学校毕业的?

3.从零开始搭建创业公司后台技术栈

4.程序员一般可以从什么平台接私活?

5.清华大学:2021 元宇宙研究报告!

6.为什么国内 996 干不过国外的 955呢?

7.这封“领导痛批95后下属”的邮件,句句扎心!

8.15张图看懂瞎忙和高效的区别!

面试必备!Kafka 怎么顺序消费?相关推荐

  1. 一看就会的kafka多线程顺序消费【内附Demo哦】

    Hello,这里是爱 Coding,爱 Hiphop,爱喝点小酒的 AKA 柏炎. Kafka是一个分布式的,支持多分区.多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于 ...

  2. Kafka 顺序消费方案

    欢迎关注方志朋的博客,回复"666"获面试宝典 来源:blog.csdn.net/qq_38245668/article/ details/105900011 前言 本文针对解决K ...

  3. videojs如何获取请求消息_中通消息平台 Kafka 顺序消费线程模型的实践与优化

    各类消息中间件对顺序消息实现的做法是将具有顺序性的一类消息发往相同的主题分区中,只需要将这类消息设置相同的 Key 即可,而 Kafka 会在任意时刻保证一个消费组同时只能有一个消费者监听消费,因此可 ...

  4. Apache Kafka-消费端_顺序消费的实现

    文章目录 概述 Code POM依赖 配置文件 生产者 消费者 单元测试 测试结果 源码地址 概述 一个partition同一个时刻在一个consumer group中只能有一个consumer in ...

  5. 重复订单号校验_吊打面试官系列重复消费、顺序消费、分布式事务

    你知道的越多,你不知道的越多 前言 消息队列在互联网技术存储方面使用如此广泛,几乎所有的后端技术面试官都要在消息队列的使用和原理方面对小伙伴们进行360°的刁难. 作为一个在互联网公司面一次拿一次Of ...

  6. Kafka丢数据、重复消费、顺序消费的问题

    面试官:今天我想问下,你觉得Kafka会丢数据吗? 候选者:嗯,使用Kafka时,有可能会有以下场景会丢消息 候选者:比如说,我们用Producer发消息至Broker的时候,就有可能会丢消息 候选者 ...

  7. 答读者问:Kafka顺序消费吞吐量下降该如何优化?

    大家好,我是威哥,<RocketMQ技术内幕>一书作者,荣获RocketMQ官方社区优秀布道师.CSDN2020博客执之星Top2等荣誉称号.目前担任中通快递技术平台部资深架构师,主要负责 ...

  8. Kafka 顺序消费 详解

    问题引入 Kafka 顺序消费一直是一个难以解决的问题,Kafka的消费策略是对于同Topic同Partition的消息可保证顺序消费,其余无法保证.如果一个Topic只有一个Partition,那么 ...

  9. 2020年 Java 最常见200+ 面试题全解析:面试必备

    Java 最常见200+ 面试题全解析:面试必备 如想了解更多更全面的Java必备内容可以阅读:所有JAVA必备知识点面试题文章目录: JAVA必备知识点面试题 序 言 在本篇文章开始之前,我想先来回 ...

  10. 「面试必备」常见Java面试题大综合 马云见了都点赞

    一.Java基础 1.Arrays.sort实现原理和Collections.sort实现原理 答:Collections.sort方法底层会调用Arrays.sort方法,底层实现都是TimeSor ...

最新文章

  1. 核方法---径向基函数网络
  2. 使用Keras进行深度学习:(二)CNN讲解及实践
  3. centos 安装java1.7_centOs安装jdk1.7
  4. 《JAVA与模式》之建造模式
  5. Python IDE:PyCharm中的那些实用功能
  6. JVM堆GC回收次数
  7. 30+个必知的《人工智能》会议清单
  8. 百余名欧洲议会议员发函 呼吁英国留在欧盟
  9. Teams的manifest文件开始支持多语言
  10. java property_property在Java中的用法
  11. windows本地script脚本恶意代码分析(带注释)
  12. BDS和GPS、电离层相关SSR数据解码
  13. 物联网(IOT)之常见物联网通信技术概览-有线篇
  14. GAN(生成对抗神经网络 )的一点思考
  15. cmd命令行乱码 oracle_Oracle查询中文乱码问题
  16. wacom数位板怎么调压感_怎么设置PS的画笔利用到数位板压感?
  17. -------已搬运------SQL注入的 过滤 思路 payload 万能密码
  18. 第一个小项目——坦克大战
  19. 【ESD专题】ESD和EOS有什么差异?
  20. linux系统添加网卡驱动,linux系统怎么安装网卡驱动

热门文章

  1. javascript对象的浅拷贝、深拷贝和Object.assign方法浅析
  2. Windows 2008-TS测试-TS GateWay
  3. 全球首个企业云计算平台初探
  4. Sublime Text 3 破解版 + 注册机 + 汉化包 + 教程
  5. 关于‘挖矿’minerd
  6. 《programming in scala》2ed chap9学习笔记
  7. 如何在WEBLOGIC中设置日志输入
  8. HP-UX 11i v2安装使用python 2.5.2
  9. 不少同学想要放弃秋招了........
  10. UNIX环境高级编程之第7章:进程环境