欢迎关注方志朋的博客,回复”666“获面试宝典

来源:blog.csdn.net/qq_38245668/article/

details/105900011

前言

本文针对解决Kafka不同Topic之间存在一定的数据关联时的顺序消费问题。如存在Topic-insert和Topic-update分别是对数据的插入和更新,当insert和update操作为同一数据时,应保证先insert再update。

1、问题引入

kafka的顺序消费一直是一个难以解决的问题,kafka的消费策略是对于同Topic同Partition的消息可保证顺序消费,其余无法保证。如果一个Topic只有一个Partition,那么这个Topic对应consumer的消费必然是有序的。不同的Topic的任何情况下都无法保证consumer的消费顺序和producer的发送顺序一致。

如果不同Topic之间存在数据关联且对消费顺序有要求,该如何处理?本文主要解决此问题。

2、解决思路

现有Topic-insert和Topic-update,数据唯一标识为id,对于id=1的数据而言,要保证Topic-insert消费在前,Topic-update消费在后。

两个Topic的消费为不同线程处理,所以为了保证在同一时间内的同一数据标识的消息仅有一个业务逻辑在处理,需要对业务添加锁操作。 使用synchronized进行加锁的话,会影响无关联的insert和update的数据消费能力,如id=1的insert和id=2的update,在synchronized的情况下,无法并发处理,这是没有必要的,我们需要的是对于id=1的insert和id=1的update在同一时间只有一个在处理,所以使用细粒度锁来完成加锁的操作。

细粒度锁实现:https://blog.csdn.net/qq_38245668/article/details/105891161

PS:如果为分布式系统,细粒度锁需要使用分布式锁的对应实现。

在对insert和update加锁之后,其实还是没有解决消费顺序的问题,只是确保了同一时间只有一个业务在处理。 对于消费顺序异常的问题,也就是先消费了update再消费insert的情况。

处理方式:消费到update数据,校验库中是否存在当前数据(也就是是否执行insert),如果没有,就将当前update数据存入缓存,key为数据标识id,在insert消费时检查是否存在id对应的update缓存,如果有,就证明当前数据的消费顺序异常,需执行update操作,再将缓存数据移除。

3、实现方案

消息发送:

kafkaTemplate.send("TOPIC_INSERT", "1");
kafkaTemplate.send("TOPIC_UPDATE", "1");

监听代码示例:

KafkaListenerDemo.java

@Component
@Slf4j
public class KafkaListenerDemo {// 消费到的数据缓存private Map<String, String> UPDATE_DATA_MAP = new ConcurrentHashMap<>();// 数据存储private Map<String, String> DATA_MAP = new ConcurrentHashMap<>();private WeakRefHashLock weakRefHashLock;public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {this.weakRefHashLock = weakRefHashLock;}@KafkaListener(topics = "TOPIC_INSERT")public void insert(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{// 模拟顺序异常,也就是insert后消费,这里线程sleepThread.sleep(1000);String id = record.value();log.info("接收到insert :: {}", id);Lock lock = weakRefHashLock.lock(id);lock.lock();try {log.info("开始处理 {} 的insert", id);// 模拟 insert 业务处理Thread.sleep(1000);// 从缓存中获取 是否存在有update数据if (UPDATE_DATA_MAP.containsKey(id)){// 缓存数据存在,执行updatedoUpdate(id);}log.info("处理 {} 的insert 结束", id);}finally {lock.unlock();}acknowledgment.acknowledge();}@KafkaListener(topics = "TOPIC_UPDATE")public void update(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{String id = record.value();log.info("接收到update :: {}", id);Lock lock = weakRefHashLock.lock(id);lock.lock();try {// 测试使用,不做数据库的校验if (!DATA_MAP.containsKey(id)){// 未找到对应数据,证明消费顺序异常,将当前数据加入缓存log.info("消费顺序异常,将update数据 {} 加入缓存", id);UPDATE_DATA_MAP.put(id, id);}else {doUpdate(id);}}finally {lock.unlock();}acknowledgment.acknowledge();}void doUpdate(String id) throws InterruptedException{// 模拟 updatelog.info("开始处理update::{}", id);Thread.sleep(1000);log.info("处理update::{} 结束", id);}}

日志(代码中已模拟必现消费顺序异常的场景):

接收到update ::1
消费顺序异常,将update数据 1 加入缓存
接收到insert ::1
开始处理 1 的insert
开始处理update::1
处理update::1 结束
处理 1 的insert 结束

观察日志,此方案可正常处理不同Topic再存在数据关联的消费顺序问题。

热门内容:
  • 面试官:private修饰的方法可以通过反射访问,那么private的意义是什么?

  • 最新 955 不加班的公司名单(2022版)

  • SpringCloud 微服务架构,适合接私活(附源码)

  • 一款基于 Spring Boot 的现代化社区

  • Spring Boot 3.0 M1 发布,正式弃用 Java 8,最低要求 Java 17。。。

最近面试BAT,整理一份面试资料《Java面试BAT通关手册》,覆盖了Java核心技术、JVM、Java并发、SSM、微服务、数据库、数据结构等等。
获取方式:点“在看”,关注公众号并回复 666 领取,更多内容陆续奉上。

明天见(。・ω・。)ノ♡

Kafka 顺序消费方案相关推荐

  1. videojs如何获取请求消息_中通消息平台 Kafka 顺序消费线程模型的实践与优化

    各类消息中间件对顺序消息实现的做法是将具有顺序性的一类消息发往相同的主题分区中,只需要将这类消息设置相同的 Key 即可,而 Kafka 会在任意时刻保证一个消费组同时只能有一个消费者监听消费,因此可 ...

  2. Kafka 顺序消费 详解

    问题引入 Kafka 顺序消费一直是一个难以解决的问题,Kafka的消费策略是对于同Topic同Partition的消息可保证顺序消费,其余无法保证.如果一个Topic只有一个Partition,那么 ...

  3. 答读者问:Kafka顺序消费吞吐量下降该如何优化?

    大家好,我是威哥,<RocketMQ技术内幕>一书作者,荣获RocketMQ官方社区优秀布道师.CSDN2020博客执之星Top2等荣誉称号.目前担任中通快递技术平台部资深架构师,主要负责 ...

  4. 一看就会的kafka多线程顺序消费【内附Demo哦】

    Hello,这里是爱 Coding,爱 Hiphop,爱喝点小酒的 AKA 柏炎. Kafka是一个分布式的,支持多分区.多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于 ...

  5. kafka的消费顺序_Kafka原理和实践云平台技术栈13

    导读:之前发布了云平台技术栈(ps:点击可查看),本文主要说一下其中的Kafka! 作者:阿龙 cnblogs.com/along21/p/10278100.htm 1.认识 Kafka 1.1 Ka ...

  6. Kafka丢数据、重复消费、顺序消费的问题

    面试官:今天我想问下,你觉得Kafka会丢数据吗? 候选者:嗯,使用Kafka时,有可能会有以下场景会丢消息 候选者:比如说,我们用Producer发消息至Broker的时候,就有可能会丢消息 候选者 ...

  7. kafka生产消费原理笔记

    一.什么是kafka Kafka是最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性 ...

  8. 重复订单号校验_吊打面试官系列重复消费、顺序消费、分布式事务

    你知道的越多,你不知道的越多 前言 消息队列在互联网技术存储方面使用如此广泛,几乎所有的后端技术面试官都要在消息队列的使用和原理方面对小伙伴们进行360°的刁难. 作为一个在互联网公司面一次拿一次Of ...

  9. 字节跳动面试官这样问消息队列:高可用、不重复消费、可靠传输、顺序消费、消息堆积,我整理了下

    写在前面 又到了年底跳槽高峰季,很多小伙伴出去面试时,不少面试官都会问到消息队列的问题,不少小伙伴回答的不是很完美,有些小伙伴是心里知道答案,嘴上却没有很好的表达出来,究其根本原因,还是对相关的知识点 ...

最新文章

  1. 一种定位内存泄露的方法(Linux)
  2. Java程序员必看!2021Java大厂面试知识分享
  3. 【计算机网络】传输层 : TCP 连接管理 ( TCP 连接建立 | 三次握手 | TCP 连接释放 | 四次挥手 )
  4. 【内网安全】域横向PTHPTKPTT哈希票据传递
  5. C++ scanf()函数安全性问题
  6. 访问数,每次访问页数,平均停留时间,跳出率
  7. A饭福利,AMD Mantle API获众多游戏开发商青睐!
  8. PHP之MVC项目实战(三)
  9. UILabel(富文本)
  10. 125w短波通信距离_125W军用自主选频短波电台
  11. ueditor插入自定义内容和样式
  12. 算法设计与分析-动态规划
  13. c语言编程实现基2-fft,fft算法研究及基2fft算法的c语言实现.doc
  14. AD7705模数转换芯片工作原理
  15. RLC串联谐振那些事
  16. 【高通SDM660平台】(2) --- Camera Kernel 驱动层代码逻辑分析
  17. C语言:甲乙丙三人放鞭炮,求鞭炮响声问题
  18. Window如何进行日志分析
  19. 手机密码大全及国产贴牌与OEM型号对照表
  20. hbuildx打包 vue3项目成apk

热门文章

  1. Ubuntu安装Beyond-Compare 4
  2. 检验是否相关-------假设检验
  3. C. Edgy Trees Codeforces Round #548 (Div. 2) 【连通块】
  4. Synchronized的两个用法
  5. FastCGI与php-fpm
  6. Leetcode 764. Largest Plus Sign
  7. Unix Linux大学教程(三):过滤器、正则表达式、vi
  8. Understanding SOAP
  9. 【怎样写代码】工厂三兄弟之工厂方法模式(五):工厂方法模式扩展
  10. Matlab数据的可视化 -- 线性图函数plot