kafka 消费者API操作入门
首先下载消费者的配置文件到idea的resource目录下,并更改
主要配置下kafka的服务地址以及反序列化的相关类
bootstrap.servers=mypc01:9092,mypc02:9092,mypc03:9092# consumer group id
group.id=test-consumer-group# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
代码
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}object ConsumerDemo extends App {private val properties = new Properties()//加载读取配置文件
properties.load(ConsumerDemo.getClass.getClassLoader.getResourceAsStream("consumer.properties"))
//新建一个消费者,传入配置参数private val consumer = new KafkaConsumer[Integer, String](properties)
//订阅topic列表,这里以订阅pet topic为例private val unit: Unit = consumer.subscribe(util.Arrays.asList("pet"))while (true) {//轮询拉取指定分区上的数据val records: ConsumerRecords[Integer, String] = consumer.poll(1000)import scala.collection.JavaConversions._//遍历拉取的结果,解析每一条记录for (record <- records) {val key: Integer = record.key()val value: String = record.value()val partition: Int = record.partition()val offset: Long = record.offset()val topic: String = record.topic()println(s"topic: $topic partition: $partition offset: $offset key: $key value: $value")}}
}
执行结果
topic: pet partition: 0 offset: 0 key: null value:
topic: pet partition: 0 offset: 1 key: null value: gaoyu
topic: pet partition: 0 offset: 2 key: null value: 66
指定分区进行消费
消费 当然要有选择的权利,比如只想消费cat
主题的0
分区
def ConsumerTest(): Unit = {val properties = new Properties()val stream: InputStream = ConsumerDemo2.getClass.getClassLoader.getResourceAsStream("consumer.properties")properties.load(stream)val consumer = new KafkaConsumer[String, String](properties)//指定需要消费的分区val partition = new TopicPartition("cat", 0)//给消费者分配要消费的主题和分区,保证成java的集合类型//因为可以同时消费多个主题和分区consumer.assign(Arrays.asList(partition))import scala.collection.JavaConversions._while (true) {val records: ConsumerRecords[String, String] = consumer.poll(1000)for (x <- records) {println(x)}}}
一条结果示例
ConsumerRecord(topic = cat, partition = 0, offset = 1341, CreateTime = 1609485529409, serialized key size = 10, serialized value size = 761, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1609485529, value = eyJjdGltZ)
同时消费多个主题和分区
//同时消费多个主题和分区def ConsumerTest2(): Unit = {val properties = new Properties()val stream: InputStream = ConsumerDemo2.getClass.getClassLoader.getResourceAsStream("consumer.properties")properties.load(stream)val consumer = new KafkaConsumer[String, String](properties)//指定需要消费的分区val partition1 = new TopicPartition("cat", 0)val partition2 = new TopicPartition("pet", 0)//给消费者分配多个主题和分区consumer.assign(Arrays.asList(partition1, partition2))import scala.collection.JavaConversions._while (true) {val records: ConsumerRecords[String, String] = consumer.poll(1000)for (x <- records) {println(x)}}}
指定分区和offset进行消费
def ConsumerTest2(): Unit = {val properties = new Properties()val stream: InputStream = ConsumerDemo2.getClass.getClassLoader.getResourceAsStream("consumer.properties")properties.load(stream)val consumer = new KafkaConsumer[String, String](properties)//指定需要消费的分区val partition1 = new TopicPartition("cat", 0)consumer.assign(Arrays.asList(partition1))import scala.collection.JavaConversions._//在调用seek之前需要先调用一次poll,否则会报错consumer.poll(1000)//seek可以指定offset,就是从你指定的offset开始读取数据consumer.seek(partition1, 1500)//因为poll每次拉取的数据并不是所有,所以需要放在循环里面while (true) {val records: ConsumerRecords[String, String] = consumer.poll(1000)for (x <- records) {println(x)}}}
方法解析
poll方法
public org.apache.kafka.clients.consumer.ConsumerRecords<K, V> poll(long timeout)
参数为超时时间–如果缓冲区中没有数据,则等待轮询的时间(以毫秒为单位)。 如果为0,则立即返回缓冲区中当前可用的任何记录,否则返回空。 不能为负。
返回值为自上次获取主题和分区的已订阅列表以来的主题到记录的映射
subscribe方法
参数是一个java的集合,元素为topic. 无返回值
public void subscribe(java.util.Collection<String> topics)
报错
No current assignment for partition cat-0
在调用seek之前先要调用一次poll.
总结
subscribe
与assign
方法对kafka
而言是互斥的,二者只能指定其一,前者指定消费模式为指定topic,由kafka指定partition分配策略,后者指定消费模式为指定partition,由用户自定义partition分配策略。- 指定分区进行消费需要使用
assign
方法 - 指定分区和offset进行消费需要结合使用
assign
和seek
方法
kafka 消费者API操作入门相关推荐
- Kafka消费者APi
Kafka客户端从集群中消费消息,并透明地处理kafka集群中出现故障服务器,透明地调节适应集群中变化的数据分区.也和服务器交互,平衡均衡消费者. public class KafkaConsumer ...
- Python之kafka消息队列操作入门
1 kafka简介 1.1 什么是kafka kafka是一个分布式.高吞吐量.高扩展性的消息队列系统.kafka最初是由Linkedin公司开发的,后来在2010年贡献给了Apache基金会,成为了 ...
- kafka生产者API操作
example 创建producer发送信息给消费者 object kafakaTest extends App {//设定配置相关private val prop = new Properties( ...
- kafka的topic操作入门
创建topic kafka-topics.sh \ --zookeeper mypc01:2181,mypc02:2181,mypc03:2181/kafka \ --create \ --topic ...
- java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明
本次的记录内容包括: 1.Java调用生产者APi流程 2.Kafka生产者Api的使用及说明 3.Kafka消费者Api的使用及说明 4.Kafka消费者自动提交Offset和手动提交Offset ...
- Kafka系列三 java API操作
使用java API操作kafka 1.pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xs ...
- Kafka 0.9 新消费者API
kafka诞生之初,它自带一个基于scala的生产者和消费者客户端.但是慢慢的我们认识到这些API有很多限制.比如,消费者有一个"高级"API支持分组和异常控制,但是不支持很多更复 ...
- kafka偏移量保存到mysql里_Kafka 新版消费者 API(二):提交偏移量
1. 自动提交 最简单的提交方式是让消费者自动提交偏移量.如果 enable.auto.commit 被设为 true,那么每过 5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去. ...
- kafka详解(JAVA API操作kafka、kafka原理、kafka监控)-step2
1.JAVA API操作kafka 修改Windows的Host文件: 目录:C:\Windows\System32\drivers\etc (win10) 内容: 192.168.40.150 k ...
最新文章
- [原]LVM管理问题解决
- 深入浅出Websocket(二)分布式Websocket集群
- python从入门到实践django看不懂_Python编程:从入门到实践踩坑记 Django
- HBase性能优化总结
- P2634 [国家集训队]聪聪可可(树上启发式合并)
- 优启通怎么重装系统win10_重装系统失败?小编教你安全给神舟战神GX9 Pro重装win10系统方法...
- 973. 最接近原点的 K 个点
- 如何检查数字是否为2的幂
- python教程视频-私藏已久的7个Python视频教程
- 《高翔视觉slam十四讲》学习笔记 第七讲 视觉里程计
- 2020年 Java开发者进阶手册.pdf(吐血整理)
- dism++封装系统使用教程_【原创】最新WIN10系统封装教程2019系列(一)——定制母盘...
- 绿盟科技网络安全威胁周报2017.02 请关注Microsoft Edge远程权限提升漏洞 CVE-2017-0002...
- 百度Python面试题
- Pycharm新建项目,new environment 和 existing interpreter的区别
- (28)部署强命名程序集到GAC
- MRAM学习笔记——3.SOT-MTJ SPICE model解析
- Matlab 校验方法
- html5文本框里插图片文字,word应用教程:在文本框内插入图片
- 巧用千寻位置GNSS软件|逐点放样应用技巧
热门文章
- mysql insert 性能_MySQL 提高Insert性能
- java实现红包要多少钱_Java实现抢红包功能
- X5045的C语言源码,X5045看门狗的单片机源程序和Proteus仿真原理图
- Linux下修改mysql密码以及忘记密码重置
- linux内核中union,Linux上的Union mount
- 高斯正反算 java_高斯投影正反算的代码
- keras 升级_如何入门Keras?
- 计算机图形学算法详解,计算机图形学裁剪算法详解
- 基因突变PHP6,基因突变中那些“披着狼皮的羊” 很多“致命性”基因突变正在被证实无害...
- 在linux中关于组的命令,linux 用户和组管理相关的命令