凌云时刻 · 技术

导读:这一节来看看如何使用Java编写Kafka Consumer。

作者 | 计缘

来源 | 凌云时刻(微信号:linuxpk)

Java Consumer

首先创建Consumer需要的配置信息,最基本的有五个信息

  • Kafka集群的地址。

  • 发送的Message中Key的序列化方式。

  • 发送的Message中Value的序列化方式。

  • 指定Consumer Group。

  • 指定拉取Message范围的策略。

    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:Port");
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer_group_1");
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // earliest, none
    

然后传入上面实例化好的配置信息,实例化Consumer:

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

然后通过Consumer的subscribe(Collection<String> topics)方法订阅Topic:

consumer.subscribe(Arrays.asList("first_topic"));

最后获取Topic里的Message,将Message信息输出到日志中:

while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for(ConsumerRecord<String, String> record : records) {logger.info("Key: " + record.key() + ", Value: " + record.value());logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());}
}

Consumer的poll(Duration timeout)方法可以设置获取数据的时间间隔,同时回忆一下在之前Consumer章节的Consumer Poll Options小节中,说过关于Consumer获取Message的四个配置项,都可以在Properties里进行设置。

启动Java Consumer后,在控制台可以看到如下信息:

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
[main] INFO org.apache.kafka.clients.Metadata - Cluster ID: 4nh_0r5iQ_KsR_Fzf1HTGg
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Discovered group coordinator IP:9092 (id: 2147483647 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Revoking previously assigned partitions []
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Successfully joined group with generation 1
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Setting newly assigned partitions [first_topic-0, first_topic-1, first_topic-2]
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer_group_1] Resetting offset for partition first_topic-0 to offset 23.
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer_group_1] Resetting offset for partition first_topic-1 to offset 24.
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer_group_1] Resetting offset for partition first_topic-2 to offset 21.

在上面的信息中,可以看到Setting newly assigned partitions [first_topic-0, first_topic-1, first_topic-2]这句话,说明当前这个Consumer会获取first_topic这个Topic中全部Partition中的Message。

如果我们再启动一个Consumer,这个Consumer和第一个在同一个组里,看看会有什么输出信息:

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
[main] INFO org.apache.kafka.clients.Metadata - Cluster ID: 4nh_0r5iQ_KsR_Fzf1HTGg
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Discovered group coordinator IP:9092 (id: 2147483647 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Revoking previously assigned partitions []
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Successfully joined group with generation 2
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Setting newly assigned partitions [first_topic-2]

可以看到新启动的Consumer会输出Setting newly assigned partitions [first_topic-2]这句话,说明新的这个Consumer只会获取first_topic这个Topic的一个Partition中的Message。

再回去看看第一个Consumer的控制台:

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Attempt to heartbeat failed since group is rebalancing
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Revoking previously assigned partitions [first_topic-0, first_topic-1, first_topic-2]
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Successfully joined group with generation 2
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=consumer_group_1] Setting newly assigned partitions [first_topic-0, first_topic-1]

第一个Consumer新输出在控制台中的信息很关键,首先看到Attempt to heartbeat failed since group is rebalancing这句话,说明Kafka会自动重新给Consumer Group里的Consumer分配Topic的Partition。

再看Setting newly assigned partitions [first_topic-0, first_topic-1]这句,说明第一个Consumer不会再获取first_topic-2这个Partition里的Message了。这也印证了在Consumer章节的Consumer Group小节里讲过的概念。

Java Consumer

Java Consumer with Assign and Seek

如果我们有一个临时的Consumer,不想加入任何一个Consumer Group,而且需要指定Topic的Partition,以及指定从哪个Message Offset开始获取数据,怎么办?所幸,Kafka提供了这样的API。

首先我们在实例化配置信息时,就不需要指定Consumer Group了:

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstant.BOOTSTRAP_SERVER);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest, none

然后实例化TopicPartition,指定Topic和Partition序号。使用Consumer的assign(Collection<TopicPartition> partitions)方法,分配给该Consumer:

TopicPartition topicPartition = new TopicPartition("first_topic", 0);
consumer.assign(Arrays.asList(topicPartition));

再然后指定Message Offset:

long offset = 21L;
consumer.seek(topicPartition, offset);

运行该Consumer,可以看到如下输出信息:

[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=] Fetch offset 21 is out of range for partition first_topic-0, resetting offset
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=] Resetting offset for partition first_topic-0 to offset 22.
[main] INFO com.devtalking.jacefu.kafka.tutorial.ConsumerDemoAssignSeek - Key: null, Value: hello world!
[main] INFO com.devtalking.jacefu.kafka.tutorial.ConsumerDemoAssignSeek - Partition: 0, Offset: 22

如果我们使用Consumer Group CLI查看,会发现这种操作其实也是临时创建了一个Consumer Group:

root@iZ2ze2booskait1cxxyrljZ:~# kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --listconsumer_group_1
KMOffsetCache-iZ2ze2booskait1cxxyrljZ

小结

这一章节带大家实践如何使用Kafka提供的API编写Java Consumer。上一节和这一节主要介绍了Kafka Java Client(Producer和Consumer)的使用方式,相比Kafka CLI,Java Client在实际的开发中可能使用的更加频繁,希望能给使用Java语言的小伙伴们带来帮助。

END

往期精彩文章回顾

Kafka Java Producer

Kafka CLI:Reseting Offset & Config CLI

Kafka CLI:Consumer CLI & Producer CLI

Kafka CLI:Topic CLI & Producer CLI

Kafka从上手到实践 - 实践真知:搭建单机Kafka

Kafka从上手到实践 - 庖丁解牛:Consumer

Kafka从上手到实践 - 庖丁解牛:Producer

Kafka从上手到实践 - 庖丁解牛:Partition

Kafka从上手到实践 - 庖丁解牛:Topic & Broker

Kafka从上手到实践 - 初步认知:MQ系统

长按扫描二维码关注凌云时刻

每日收获前沿技术与科技洞见

Kafka从上手到实践 - 实践真知:Kafka Java Consumer | 凌云时刻相关推荐

  1. Kafka Java consumer动态修改topic订阅

    前段时间在Kafka QQ群中有人问及此事--关于Java consumer如何动态修改topic订阅的问题.仔细一想才发现这的确是个好问题,因为如果简单地在另一个线程中直接持有consumer实例然 ...

  2. 新书《深入理解Kafka:核心设计与实践原理》上架,感谢支持~

    新书上架 初识 Kafka 时,笔者接触的还是 0.8.1 版本,Kafka 发展到目前的 2.x 版本,笔者也见证了Kafka的蜕变,比如旧版客户端的淘汰.新版客户端的设计.Kafka 控制器的迭代 ...

  3. kafka 在 360 商业化的实践

    精选30+云产品,助力企业轻松上云!>>> 本文参考闫锁鹏老师在2019DAMS上海站关于Kafka在360的商业化实践分享. 关于作者:近10年基础架构与大数据开发经验,2013年 ...

  4. 【kafka系列】kafka之生产者发送消息实践

    目录 一.准备工作 二.终端命令 生产者命令 消费者命令 三.Java实践 搭建项目 异步发送-无回调 异步发送-有回调 同步发送 一.准备工作 进入实战之前先熟悉一下topic的相关命令,使用终端命 ...

  5. 超详细!一文详解 SparkStreaming 如何整合 Kafka !附代码可实践

    来源 | Alice菌 责编 | Carol 封图 |  CSDN 下载于视觉中国 出品 | CSDN(ID:CSDNnews) 相信很多小伙伴已经接触过 SparkStreaming 了,理论就不讲 ...

  6. kafka partition java,kafka中partition数量与消费者对应关系以及Java实践

    kafka中partition数量与消费者对应关系以及Java实践 kafka中partition数量与消费者对应关系以及Java实践 kafka是由Apache软件基金会开发的一个开源流处理平台.k ...

  7. Kafka的原理介绍及实践

    一.官方定义 根据官网的介绍,kafka是一个提供统一的.高吞吐.低延迟的,用来处理实时数据的流式平台,它具备以下三特性: 流式记录的发布和订阅:类似于消息系统. 存储:在一个分布式.容错的集群中安全 ...

  8. 超详细!一文告诉你 SparkStreaming 如何整合 Kafka !附代码可实践

    来源 | Alice菌 责编 | Carol 封图 |  CSDN 下载于视觉中国 相信很多小伙伴已经接触过 SparkStreaming 了,理论就不讲太多了,今天的内容主要是为大家带来的是 Spa ...

  9. 深入理解 Kafka :核心设计与实践 读书笔记

    第1章 初识 Kafka Kafka 架构有什么组件? 一个典型的 Kafka 体系架构包括若干 Producer.若干 Broker .若干 Consumer,以及一个 ZooKeeper 集群. ...

  10. kafka Confluent Schema Registry 简单实践

    解释及目的: 使用传统的Avro API自定义序列化类和反序列化类或者使用Twitter的Bijection类库实现Avro的序列化与反序列化,这两种方法都有一个缺点:在每条Kafka记录里都嵌入了s ...

最新文章

  1. rhel6用centos163 yum源
  2. IPC介绍——10个ipcs例子
  3. ppt flash倒计时器_PPT三大神器之iSlide插件
  4. Simulink之负载换流式无源逆变电路
  5. 数据结构(动态树):[国家集训队2012]tree(伍一鸣)
  6. Unity渲染管线-百人计划笔记
  7. realm android,Realm for Android快速入门教程
  8. 《图解机器学习-杉山将著》读书笔记---CH5
  9. mysql中12e10等于多少_一篇文章看懂mysql中varchar能存多少汉字、数字,以及varchar(100)和varchar(10)的区别...
  10. leetcode 之Rotate List(18)
  11. String类的两种赋值
  12. HAProxy + Keepalived实现MySQL的高可用负载均衡
  13. Louvain算法在反作弊上的应用
  14. linux系统查看内核版本是多少,在linux下查看内核版本、gcc版本、操作系统多少位等参数...
  15. 《生与死》- 瓦特·兰德
  16. 解析智能推荐系统开发中十大关键要素
  17. k8s出现问题导致cpu使用率过高
  18. 2012总结之pcode.Class
  19. 计算机里删除的文件可以在哪里进行恢复,电脑上删除的文件怎么恢复?方法在这里...
  20. Google map地图限制显示区域、拖拽范围

热门文章

  1. SSIS包生成注意事项
  2. 家庭上网用路由器和ADSL的连接
  3. redis安装----非基于lnmp安装
  4. 导出excel用ajax不行,提交form表单可以
  5. Uva 1588.Kickdown
  6. 190729每日一句
  7. 190705每日一句; 寻找内心的勇气, 一切从零开始
  8. latex减少图片和图片解释文字之间的距离
  9. 190408每日一句
  10. Atitit 大数据体系树 艾提拉著 数据采集 gui自动化 爬虫 Nui自动化  Ocr技术 Tts语音处理 文档处理(office zip等) html文档处理解析 转换与处理