Kafka Consumer 详解
文章目录
- 消费者和消费者组
- 分区再均衡
- 创建消费者
- 自动提交偏移量
- 自动提交 offset
- 手动提交偏移量
- 同步提交
- 异步提交
- 消费者其他属性
消费者和消费者组
- 在 Kafka 中,消费者通常是消费者群组的一部分,多个消费者群组共同读取同一个 Topic 时,彼此之间互不影响
- Kafka 之所以要引入消费者群组这个概念是因为 Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或 HDFS ,或者进行耗时的计算,在这些情况下,单个消费者无法跟上数据生成的速度
- 此时可以增加更多的消费者,让它们分担负载,分别处理部分 partition 的消息,这就是 Kafka 实现横向伸缩的主要手段
- 同一个 partition 只能被同一个消费者群组里面的一个消费者读取,不可能存在同一个 partition 被同一个消费者群里多个消费者共同读取的情况
- 在使用时应该合理设置消费者的数量,以免造成闲置和额外开销
分区再均衡
- 因为消费者组里的消费者共同读取 topic 的 partition,所以当一个消费者被关闭或发生崩溃时,它就离开了消费者组,原本由它读取的 partition 将由消费者组里的其他消费者来读取
- 在 topic 发生变化时, 比如添加了新的 partition,也会发生 partition 与消费者的重新分配,partition 的所有权从一个消费者转移到另一个消费者
- 正是因为这样的再均衡,消费者组才能保证高可用性和伸缩性
- 消费者通过向组协调器所在的 broker 发送心跳来维持它们和消费者组的从属关系以及它们对 partition 的所有权
- 消费者会在轮询消息或提交偏移量时发送心跳
- 只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取 partition 里的消息
- 如果消费者停止发送心跳的时间足够长,会话就会过期,组协调器认为它已经死亡,就会触发再均衡
创建消费者
- bootstrap.servers : 指定 broker 的地址清单,清单里不需要包含所有的 broker 地址
- key.deserializer : 指定 key 的反序列化器
- value.deserializer : 指定 value 的反序列化器
- group.id : 指定消费者组的名称
- consumer.subscribe(Collection<String> topics) :指明需要订阅的 topic 的集合
- consumer.subscribe(Pattern pattern) :使用正则来匹配需要订阅的 topic 的集合
- 最后只需要通过轮询API 定时向 broker 拉取数据, 一旦消费者订阅了主题, 轮询就会处理所有的细节, 包括组协调, 分区再均衡, 发送心跳, 拉取数据等
public static void main(String[] args) {String brokerList = "node0001:9092,node0002:9092,node0003:9092";String groupId = "group1";String topic = "hello-kafka";//配置消费者客户端final Properties properties = new Properties();properties.put("bootstrap.servers", brokerList);properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//配置消费者组名称properties.put("group.id", groupId);//创建 kafka 消费者实例KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);//订阅主题kafkaConsumer.subscribe(Collections.singletonList(topic));while (true) {final ConsumerRecords<String, String> records = kafkaConsumer.poll(1000L);for (ConsumerRecord<String, String> record : records) {System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n",record.topic(), record.partition(), record.key(), record.value(), record.offset());}}}
自动提交偏移量
- Kafka 的每一条消息
message
都有一个偏移量offset
属性记录了其在分区parition
中的位置,偏移量是一个单调递增的整数 - 消费者通过往一个叫作
_consumer_offset
的特殊 topic 发送消息,消息里包含每个 paritition 的偏移量 - 如果消费者一直处于运行状态,那么偏移量就没有什么用处
- 如果有消费者退出或者新 partition 加入,此时就会触发再均衡。完成再均衡之后,每个消费者可能分配到新的 partition,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个 partition 最后一次提交的offset,然后从偏移量指定的地方继续处理
自动提交 offset
- 自动提交偏移量只需要将消费者的
enable.auto.commit
属性配置为true
, 然后每隔固定的时间就会自动提交 - 提交间隔由
auto.commit.interval.ms
属性进行配置,默认值是 5s - 自动提交偏移量是有隐患的, 消息可能会被重复消费
手动提交偏移量
- 可以通过将
enable.auto.commit
设为false
,然后手动提交偏移量
同步提交
- 通过调用
consumer.commitSync()
来进行同步提交,不传递任何参数时提交的是当前轮询的最大偏移量 - 如果某个提交失败,同步提交还会进行重试,这可以保证数据能够最大限度提交成功,但是同时也会降低程序的吞吐量
异步提交
- 异步提交可以提高程序的吞吐量,因为此时可以尽管请求数据,而不用等待 Broker 的响应
consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {System.out.println("错误处理");offsets.forEach((x, y) -> System.out.printf("topic = %s,partition = %d, offset = %s \n",x.topic(), x.partition(), y.offset()));}}});
- 异步提交的问题是: 提交失败不会进行重试
消费者其他属性
- fetch.min.byte
消费者从服务器获取记录的最小字节数。如果可用的数据量小于设置值,broker 会等待有足够的可用数据时才会把它返回给消费者。
- fetch.max.wait.ms
broker 返回给消费者数据的等待时间,默认是 500ms。
- max.partition.fetch.bytes
该属性指定了服务器从每个分区返回给消费者的最大字节数,默认为 1MB。
- session.timeout.ms
消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。
- auto.offset.reset
该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
- latest (默认值) :在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的最新记录);
- earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录。
- enable.auto.commit
是否自动提交偏移量,默认值是 true。为了避免出现重复消费和数据丢失,可以把它设置为 false。
- client.id
客户端 id,服务器用来识别消息的来源。
- max.poll.records
单次调用 poll()
方法能够返回的记录数量。
- receive.buffer.bytes & send.buffer.byte
这两个参数分别指定 TCP socket 接收和发送数据包缓冲区的大小,-1 代表使用操作系统的默认值。
Kafka Consumer 详解相关推荐
- kafka Consumer详解
1.ZookeeperConsumer架构 ZookeeperConsumer类中consumer运行过程架构图: 图1 过程分析: ConsumerGroupExample类 2.消费者线程(con ...
- Kafka 原理详解
Kafka 原理详解 1 kakfa基础概念说明 Broker:消息服务器,就是我们部署的一个kafka服务 Partition:消息的水平分区,一个Topic可以有多个分区,这样实现了消息的无限量存 ...
- Kafka配置详解-Consumer配置
转载自:http://orchome.com/535 3.4 kafka消费者配置 在0.9.0.0中,我们引入了新的Java消费者来替代早期基于Scala的简单和高级消费者.新老客户端的配置如下. ...
- python使用kafka原理详解_Python操作Kafka原理及使用详解
Python操作Kafka原理及使用详解 一.什么是Kafka Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理 ...
- python使用kafka原理详解真实完整版_转:Kafka史上最详细原理总结 ----看完绝对不后悔...
消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务技术水平和最关键指标之一.下面将从Kafka文件存储机制和物理结构角度,分析Kafka是如何实现高效文件存储,及实际应用效果. 1.1 K ...
- python使用kafka原理详解真实完整版_史上最详细Kafka原理总结
Kafka Kafka是最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实 ...
- kafka 消费者详解
前言 读完本文,你将了解到如下知识点: kafka 的消费者 和 消费者组 如何正确使用 kafka consumer 常用的 kafka consumer 配置 消费者 和 消费者组 什么是消费者? ...
- kafka实战教程(python操作kafka),kafka配置文件详解
全栈工程师开发手册 (作者:栾鹏) 架构系列文章 应用往Kafka写数据的原因有很多:用户行为分析.日志存储.异步通信等.多样化的使用场景带来了多样化的需求:消息是否能丢失?是否容忍重复?消息的吞吐量 ...
- Kafka 安装详解
注意:确保有JDK1.8版本及以上 官方文档:https://kafka.apache.org/quickstart 清华镜像下载:https://mirrors.tuna.tsinghua.edu. ...
最新文章
- 教你用代码奏响天空之城! (C++中发声函数Beep详解)
- 使用php与mysql构建我们的网站
- hadoop中的jps是什么,Jps命令—使用详解【笔记自用】
- 带你了解『百度智能云发布云智一体的AI开发全栈模式』
- 0x0000007F蓝屏问题摸索解决-没有完成
- selenium+python笔记3
- 【Java】JDBC连接MySQL/SQLServer/Oracle三种数据库
- python--列表list
- python的json.dump参数使用
- eclipse中outline中图标含义
- C#.NET通用权限管理系统组件中数据集权限设置功能增加内部组织机构选项功能...
- 重新想象 Windows 8 Store Apps (23) - 文件系统: 文本的读写, 二进制的读写, 流的读写, 最近访问列表和未来访问列表...
- C#对STK11.4二次开发的Hello World
- Java Builder模式(设计模式之Builder模式)
- JRebel安装、最新激活方式
- MEMS传感市场,美/日/德企占主导地位
- n皇后问题回溯法-迭代实现
- php开发API接口的代码案例
- 实验二、贪吃蛇游戏开发
- Python编程沙龙召集令