Kafka从上手到实践 - 实践真知:Kafka Java Consumer | 凌云时刻
凌云时刻 · 技术
导读:这一节来看看如何使用Java编写Kafka Consumer。
作者 | 计缘
来源 | 凌云时刻(微信号:linuxpk)
Java Consumer
首先创建Consumer需要的配置信息,最基本的有五个信息:
Kafka集群的地址。
发送的Message中Key的序列化方式。
发送的Message中Value的序列化方式。
指定Consumer Group。
指定拉取Message范围的策略。
Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:Port"); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer_group_1"); properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // earliest, none
然后传入上面实例化好的配置信息,实例化Consumer:
|
然后通过Consumer的subscribe(Collection<String> topics)
方法订阅Topic:
|
最后获取Topic里的Message,将Message信息输出到日志中:
|
Consumer的poll(Duration timeout)
方法可以设置获取数据的时间间隔,同时回忆一下在之前Consumer章节的Consumer Poll Options小节中,说过关于Consumer获取Message的四个配置项,都可以在Properties里进行设置。
启动Java Consumer后,在控制台可以看到如下信息:
|
在上面的信息中,可以看到Setting newly assigned partitions [first_topic-0, first_topic-1, first_topic-2]
这句话,说明当前这个Consumer会获取first_topic
这个Topic中全部Partition中的Message。
如果我们再启动一个Consumer,这个Consumer和第一个在同一个组里,看看会有什么输出信息:
|
可以看到新启动的Consumer会输出Setting newly assigned partitions [first_topic-2]
这句话,说明新的这个Consumer只会获取first_topic
这个Topic的一个Partition中的Message。
再回去看看第一个Consumer的控制台:
|
第一个Consumer新输出在控制台中的信息很关键,首先看到Attempt to heartbeat failed since group is rebalancing
这句话,说明Kafka会自动重新给Consumer Group里的Consumer分配Topic的Partition。
再看Setting newly assigned partitions [first_topic-0, first_topic-1]
这句,说明第一个Consumer不会再获取first_topic-2
这个Partition里的Message了。这也印证了在Consumer章节的Consumer Group小节里讲过的概念。
Java Consumer
Java Consumer with Assign and Seek
如果我们有一个临时的Consumer,不想加入任何一个Consumer Group,而且需要指定Topic的Partition,以及指定从哪个Message Offset开始获取数据,怎么办?所幸,Kafka提供了这样的API。
首先我们在实例化配置信息时,就不需要指定Consumer Group了:
|
然后实例化TopicPartition
,指定Topic和Partition序号。使用Consumer的assign(Collection<TopicPartition> partitions)
方法,分配给该Consumer:
|
再然后指定Message Offset:
|
运行该Consumer,可以看到如下输出信息:
|
如果我们使用Consumer Group CLI查看,会发现这种操作其实也是临时创建了一个Consumer Group:
|
小结
这一章节带大家实践如何使用Kafka提供的API编写Java Consumer。上一节和这一节主要介绍了Kafka Java Client(Producer和Consumer)的使用方式,相比Kafka CLI,Java Client在实际的开发中可能使用的更加频繁,希望能给使用Java语言的小伙伴们带来帮助。
END
往期精彩文章回顾
Kafka Java Producer
Kafka CLI:Reseting Offset & Config CLI
Kafka CLI:Consumer CLI & Producer CLI
Kafka CLI:Topic CLI & Producer CLI
Kafka从上手到实践 - 实践真知:搭建单机Kafka
Kafka从上手到实践 - 庖丁解牛:Consumer
Kafka从上手到实践 - 庖丁解牛:Producer
Kafka从上手到实践 - 庖丁解牛:Partition
Kafka从上手到实践 - 庖丁解牛:Topic & Broker
Kafka从上手到实践 - 初步认知:MQ系统
长按扫描二维码关注凌云时刻
每日收获前沿技术与科技洞见
Kafka从上手到实践 - 实践真知:Kafka Java Consumer | 凌云时刻相关推荐
- Kafka Java consumer动态修改topic订阅
前段时间在Kafka QQ群中有人问及此事--关于Java consumer如何动态修改topic订阅的问题.仔细一想才发现这的确是个好问题,因为如果简单地在另一个线程中直接持有consumer实例然 ...
- 新书《深入理解Kafka:核心设计与实践原理》上架,感谢支持~
新书上架 初识 Kafka 时,笔者接触的还是 0.8.1 版本,Kafka 发展到目前的 2.x 版本,笔者也见证了Kafka的蜕变,比如旧版客户端的淘汰.新版客户端的设计.Kafka 控制器的迭代 ...
- kafka 在 360 商业化的实践
精选30+云产品,助力企业轻松上云!>>> 本文参考闫锁鹏老师在2019DAMS上海站关于Kafka在360的商业化实践分享. 关于作者:近10年基础架构与大数据开发经验,2013年 ...
- 【kafka系列】kafka之生产者发送消息实践
目录 一.准备工作 二.终端命令 生产者命令 消费者命令 三.Java实践 搭建项目 异步发送-无回调 异步发送-有回调 同步发送 一.准备工作 进入实战之前先熟悉一下topic的相关命令,使用终端命 ...
- 超详细!一文详解 SparkStreaming 如何整合 Kafka !附代码可实践
来源 | Alice菌 责编 | Carol 封图 | CSDN 下载于视觉中国 出品 | CSDN(ID:CSDNnews) 相信很多小伙伴已经接触过 SparkStreaming 了,理论就不讲 ...
- kafka partition java,kafka中partition数量与消费者对应关系以及Java实践
kafka中partition数量与消费者对应关系以及Java实践 kafka中partition数量与消费者对应关系以及Java实践 kafka是由Apache软件基金会开发的一个开源流处理平台.k ...
- Kafka的原理介绍及实践
一.官方定义 根据官网的介绍,kafka是一个提供统一的.高吞吐.低延迟的,用来处理实时数据的流式平台,它具备以下三特性: 流式记录的发布和订阅:类似于消息系统. 存储:在一个分布式.容错的集群中安全 ...
- 超详细!一文告诉你 SparkStreaming 如何整合 Kafka !附代码可实践
来源 | Alice菌 责编 | Carol 封图 | CSDN 下载于视觉中国 相信很多小伙伴已经接触过 SparkStreaming 了,理论就不讲太多了,今天的内容主要是为大家带来的是 Spa ...
- 深入理解 Kafka :核心设计与实践 读书笔记
第1章 初识 Kafka Kafka 架构有什么组件? 一个典型的 Kafka 体系架构包括若干 Producer.若干 Broker .若干 Consumer,以及一个 ZooKeeper 集群. ...
- kafka Confluent Schema Registry 简单实践
解释及目的: 使用传统的Avro API自定义序列化类和反序列化类或者使用Twitter的Bijection类库实现Avro的序列化与反序列化,这两种方法都有一个缺点:在每条Kafka记录里都嵌入了s ...
最新文章
- rhel6用centos163 yum源
- IPC介绍——10个ipcs例子
- ppt flash倒计时器_PPT三大神器之iSlide插件
- Simulink之负载换流式无源逆变电路
- 数据结构(动态树):[国家集训队2012]tree(伍一鸣)
- Unity渲染管线-百人计划笔记
- realm android,Realm for Android快速入门教程
- 《图解机器学习-杉山将著》读书笔记---CH5
- mysql中12e10等于多少_一篇文章看懂mysql中varchar能存多少汉字、数字,以及varchar(100)和varchar(10)的区别...
- leetcode 之Rotate List(18)
- String类的两种赋值
- HAProxy + Keepalived实现MySQL的高可用负载均衡
- Louvain算法在反作弊上的应用
- linux系统查看内核版本是多少,在linux下查看内核版本、gcc版本、操作系统多少位等参数...
- 《生与死》- 瓦特·兰德
- 解析智能推荐系统开发中十大关键要素
- k8s出现问题导致cpu使用率过高
- 2012总结之pcode.Class
- 计算机里删除的文件可以在哪里进行恢复,电脑上删除的文件怎么恢复?方法在这里...
- Google map地图限制显示区域、拖拽范围