【Kafka】从kafka中读取最新数据

  • 一、死循环无限拉取kafka数据
    • 1.1 整体框架剖析
    • 1.2 测试
  • 二、@KafkaListener注解 实现监听kafka数据
  • 三、参考资料

前情提要:我这里只是读取kafka里面的数据,生产者已经配置好且会自动监控数据库的变化来推入kafka中,所以这里不对生产者做过多的解释。

一、死循环无限拉取kafka数据

1.1 整体框架剖析

1、要想从Kafka中读取数据,就要先对消费者信息进行配置

//1、创建消费者配置信息Properties properties = new Properties();//2、给配置信息赋值//2.1 kafka集群信息properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092");//2.2 开启自动提交offset 提交以后每次offset都在消费的最新位置properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");//2.3 自动提交offset延时 1秒钟提交一次properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");//2.4 key value的反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");//2.5 消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG,"base_db_app_210325");

2、消费者基本配置信息完成以后,创建消费者、订阅主题、为了后面的消费

 //创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);//订阅主题consumer.subscribe(Collections.singletonList("ticdc-paperfree-monitor"));

3、订阅主题,就相当于已经订阅了kafka中的消息,下一步就是消费。而kafka消费消息的方式是poll拉取,我们这里对kafka中的数据进行消费,上面我们选择了自动提交offset,那么每次offset就是在上一次消费完成以后的最新位置,所以我们接下来的每次消费得到的都是最新未消费的数据!

while (true) {//获取数据ConsumerRecords<String, String> poll = consumer.poll(100);//解析并打印for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {System.out.println("stringStringConsumerRecord.key() = " + stringStringConsumerRecord.key() + "------" + stringStringConsumerRecord.value() + "----" + stringStringConsumerRecord.topic());}}

1.2 测试

方法一:

1、创建MyConsumer1类,根据上面整体结构的剖析,添加如下代码,并进行测试。

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Collections;
import java.util.Properties;/*** @author potential*/
public class MyConsumer1 {public static void main(String[] args) {//1、创建消费者配置信息Properties properties = new Properties();//2、给配置信息赋值//2.1 kafka集群信息properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092");//2.2 开启自动提交offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");//2.3 自动提交offset延时 1秒钟提交一次properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");//2.4 key value的反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");//2.5 消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG,"base_db_app_210325");//创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);//订阅主题consumer.subscribe(Collections.singletonList("ticdc-paperfree-monitor"));while (true) {//获取数据ConsumerRecords<String, String> poll = consumer.poll(100);//解析并打印for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {System.out.println("stringStringConsumerRecord.key() = " + stringStringConsumerRecord.key() + "------" + stringStringConsumerRecord.value() + "----" + stringStringConsumerRecord.topic());}}
//        //关闭连接
//        consumer.close();}
}

方法二:
2、创建MyConsumer2类,根据上面整体结构的剖析,添加如下代码,并进行测试。

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;/*** @author potential*/
public class MyConsumer2 {public static void main(String[] args) {//配置必要的参数//准备一个map集合放置参数Map<String, Object> config = new HashMap<String, Object>();//bootserversconconfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092");//开启自动提交offsetconfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");//自动提交offset延时 1秒钟提交一次config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");//valuedeserilizerconfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , StringDeserializer.class);//groupidconfig.put(ConsumerConfig.GROUP_ID_CONFIG, "base_db_app_210325");//如果找不到偏移量,设置earliest,则从最新消费开始,也就是消费者一开始最新消费的时候//一定要注意顺序,读取时候的顺序会影响config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//        //此处是把消费者的偏移量重置到生产者最顶端
//        Map<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<TopicPartition, OffsetAndMetadata>();
//        hashMaps.put(new TopicPartition("ticdc-paperfree-monitor", 0), new OffsetAndMetadata(129));//消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(config);
//        //放置刚刚设置的偏移量
//        consumer.commitSync(hashMaps);//先订阅后消费consumer.subscribe(Arrays.asList("ticdc-paperfree-monitor"));//        // 批量从主题的分区拉取消息
//        //final ConsumerRecords<Integer, String> consumerRecords = consumer.poll(3_000);
//        ConsumerRecords<String, String> consumerRecords = consumer.poll(3000);while (true) {//获取数据ConsumerRecords<String, String> poll = consumer.poll(100);//解析并打印for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {System.out.println("stringStringConsumerRecord.key() = " + stringStringConsumerRecord.key() + "------" + stringStringConsumerRecord.value() + "----" + stringStringConsumerRecord.topic());}}//
//        //遍历本次从主题的分区拉取的批量消息 这里是将整个分区中的全部数据都拉出来了
//        consumerRecords.forEach(new java.util.function.Consumer<ConsumerRecord<Integer, String>>() {//            @Override
//            public void accept(ConsumerRecord<Integer, String> consumerRecord) {//                System.out.println(
//                        consumerRecord.topic() +"\t"
//                                +consumerRecord.offset() + "\t"
//                                +consumerRecord.key() +"\t"
//                                +consumerRecord.value()+"\t"
//                );
//            }
//        });
//        consumer.close();}
}

注意:
方式一、方式二只是写法上的不同,整体架构都是一样的,任选其一来写即可。
至此,从kafka中读取最新数据的流程就全部结束了。

二、@KafkaListener注解 实现监听kafka数据

1、导入依赖

【我这里SpringBoot版本是2.2.13】

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.3.1</version></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.3.7.RELEASE</version></dependency>

注意:
1、springboot +2、kafka-clients +3、spring-kafka(在下图中体现为Sprig for Apache Kafka Version) 这三个 要注意版本对应。具体对应情况如下图所示:
2、配置文件
application.yml文件中添加如下内容:

spring:kafka:consumer:bootstrap-servers: 192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092 #集群信息producer: #生产者retries: 0 #设置大于0的值,则客户端将发送失败的记录重新发送batch-size: 16384 #批量大小buffer-momory: 33554432 #生产端缓冲区大小acks: 1 #应答级别#指定消息key和消息体的解编码方式  序列化与反序列化key- key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializconsumer:group-id: base_db_app_210325enable-auto-comnit: true #是否自动提交offsetauto-offset-reset: latest #重置为分区中最新的offset(消费者分区中新产生的数据)key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializertopic-name: ticdc-paperfree-monitor #主题listener:ack-mode: manual_immediate

3、创建MyConsumer类,添加如下内容:

 @KafkaListener(id = "test1", topics = "ticdc-paperfree-monitor")//这里id是随意起的,我这里叫test1,我这里主题直接写死,取ticdc-paperfree-monitor这个主题下的数据,也可以${},动态获取主题名称,group_idpublic void listen(ConsumerRecord<String, String> record) {//从Kafka中读取到的数据System.out.println("topic:" + record.topic());System.out.println("value:" + record.value());}

4、测试
运行主启动类,会自动进行监听且在程序运行的过程中将数据输出。

三、参考资料

https://blog.csdn.net/m0_67391270/article/details/126505944
https://blog.csdn.net/weixin_46271129/article/details/119800649

【Kafka】从kafka中读取最新数据相关推荐

  1. Structured Streaming从Kafka 0.8中读取数据的问题

    众所周知,Structured Streaming默认支持Kafka 0.10,没有提供针对Kafka 0.8的Connector,但这对高手来说不是事儿,于是有个Hortonworks的邵大牛(前段 ...

  2. Flink使用KafkaSource从Kafka消息队列中读取数据

    Flink使用KafkaSource从Kafka消息队列中读取数据 使用KafkaSource从Kafka消息队列中读取数据 1.KafkaSource创建的DataStream是一个并行的DataS ...

  3. 从MySQL中读取股票数据——从零到实盘10

    前文介绍了把股票数据写入MySQL的过程,本文记录从MySQL中读取股票数据的过程. 到目前为止,我们在访问股票代码列表时,每次需要通过BaoStock重新下载.本文将把下载的股票代码保存到MySQL ...

  4. 随机从mysql中读取_如何实现MySQL表数据随机读取?从mysql表中读取随机数据

    文章转自 http://blog.efbase.org/2006/10/16/244/ 如何实现MySQL表数据随机读取?从mysql表中读取随机数据?以前在群里讨论过这个问题,比较的有意思.mysq ...

  5. matlab 十六进制数组,【MATLAB】MATLAB中读取二进制数据文件并加入到矩阵中

    MATLAB中读取二进制数据文件并加入到矩阵中的应用如下: 如果对c语言十分熟悉的话,应该对fopen,fclose,ftell,fseek,fread,fwrite,feof 这些函数非常熟悉了,在 ...

  6. python读取图像数据流_浅谈TensorFlow中读取图像数据的三种方式

    本文面对三种常常遇到的情况,总结三种读取数据的方式,分别用于处理单张图片.大量图片,和TFRecorder读取方式.并且还补充了功能相近的tf函数. 1.处理单张图片 我们训练完模型之后,常常要用图片 ...

  7. C# 从Excel中读取时间数据

    之前写到从Excel中读取时间数据 //读取Excel数据Excel.Application xapp = new Excel.Application();string filepath = txt_ ...

  8. matlab处理记事本数据库,如何从Matlab的记事本文件中读取大块数据?

    如何从Matlab的记事本文件中读取大块数据? 我的数据采用以下格式: TABLE NUMBER 1 FILE: name_1 name_2 TIME name_3 day name_4 -0.01 ...

  9. R语言中读取excel数据的常用方式有哪些?

    R语言中读取excel数据的常用方式有哪些? 目录 R语言中读取excel数据的常用方式有哪些? R语言是解决什么问题的? R语言中读取excel数据的常用方式有哪些? R语言是解决什么问题的? R ...

最新文章

  1. 如何构建虚拟护士应用程序?
  2. python变量类型怎么决定的_python里测试变量类型用什么
  3. 数据库存在即更新的高并发处理 - 转
  4. 024 Android 自定义样式对话框(AlertDialog)
  5. (转)java类初始化顺序 - jackyrong的世界 - 博客园
  6. 如何让eclipse恢复默认布局
  7. c++中stringstream_文史哲与艺术中的数学_智慧树章节答案
  8. 面向对象-多态,反射
  9. 定义一个圆的类,输入半径,计算周长和面积并输出
  10. 《信息可视化:交互设计(原书第2版)》——1.1节可视化
  11. Python技术知识清单(数据分析)
  12. linux源码安装mysql
  13. 基于ATmega16单片机 程控滤波器程序源代码
  14. 计算机专业实训是干什么,计算机系实习目的是什么
  15. CentOS 7 : 一 、安装WeKan
  16. 国仁猫哥:视频号超详细运营攻略教程;教你如何打造一个优质的视频号【建议收藏】
  17. 二维码解码程序的两大难点问题
  18. PICASSO,一个高效的搜推广稀疏训练解决方案
  19. [转载]芋道 Soul 极简入门(国产微服务网关)
  20. Kepware与 smart200建立连接的方法

热门文章

  1. JavaScript-关闭图片
  2. linux cpio 备份系统,linux备份命令-cpio
  3. 奥运英语[2] 你好! 早上好!Hi! Good Morning.
  4. 中文分词原理及分词工具介绍
  5. 百度CTO王海峰做客《中国经济大讲堂》:文心一言,读书破万亿
  6. 以pfile或者spfile启动时show parameter pfile的不同结果
  7. python实现卷积运算
  8. 三级页面爬取苏宁图书
  9. 心理学、文学和数学——学习资料汇总
  10. Gradle project sync failed. Basic functionality (e.g. editing, debugging) will not work properly