1.目标

在我们的上一篇文章中,我们讨论了Kafka Producer。今天,我们将讨论Kafka Consumer。首先,我们将看到什么是Kafka Consumer和Kafka Consumer的例子。之后,我们将学习Kafka Consumer Group。此外,我们将看到Kafka Consumer的消费者记录API和配置设置。
创建Kafka Producer后,将消息发送到Apache Kafka集群。现在,我们正在创建一个Kafka Consumer来使用来自Kafka集群的消息。
所以,让我们详细讨论Kafka Consumer。

Apache Kafka Consumer | 卡夫卡消费者集团

2.什么是卡夫卡消费者?

从Kafka Topics读取数据的应用程序就是我们所说的Consumer。基本上,Kafka Consumer订阅了Kafka集群中的一个或多个主题,然后进一步提供来自Kafka主题的令牌或消息。  
此外,使用Heartbeat,我们可以了解Consumer与Kafka Cluster的连接性。但是,让我们定义Heartbeat。它设置在Consumer,让Zookeeper或Broker Coordinator知道Consumer是否仍然连接到Cluster。因此,如果心跳不存在,Kafka Consumer将不再连接到群集。在这种情况下,经纪协调员必须重新平衡负载。此外,Heartbeat是群集的开销。此外,通过考虑数据吞吐量和开销,我们可以配置心跳为消费者的时间间隔。

什么是Apache Kafka Consumer

此外,我们可以对消费者进行分组,而Kafka中的消费者群体中的消费者可以共享他们订阅的Kafka主题的分区。要理解,如果主题中有N个分区,Kafka Consumer Group中的N个消费者和该组已订阅主题,则每个消费者将从主题的分区中读取数据。因此,我们可以说,这只是一个消费者可以成群结队的提醒。
让我们用命令修改Apache Kafka Operations
要具体来说,要连接到Kafka集群并使用数据流,Kafka的Consumer API会有所帮助。
下面是显示Apache Kafka Consumer的图片:

Apache Kafka Consumer的工作

要订阅一个或多个主题并处理在应用程序中生成的记录流,我们使用此Kafka Consumer API。换句话说,我们使用KafkaConsumer API来使用来自Kafka集群的消息。而且,下面看KafkaConsumer类的构造函数。

  1. public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

  • CONFIGS

返回消费者配置图。
KafkaConsumer类有以下重要方法:1。public 
java.util.Set <TopicPar-tition> assignment()
获取当前由使用者分配的分区集。
2. public string subscription()
为了订阅给定的主题列表,获取动态分配的分区。
探索Kafka性能调优 - Kafka优化的方法
3. public void sub-scribe(java.util.List <java.lang.String> topics,ConsumerRe-balanceListener listener)
此外,订阅给定动态分配的主题列表分区。
4。 public void unsubscribe()
现在,取消订阅给定分区列表中的主题。
5. public void sub-scribe(java.util.List <java.lang.String> topics)
为了订阅给定的主题列表以获取动态分配的分区。如果给定的主题列表为空,则将其视为与unsubscribe()相同。
6.  public void subscribe(java.util.regex.Pattern pattern,ConsumerRebalanceLis-tener listener)
这里,参数模式引用正则表达式格式的订阅模式,并且listener参数从订阅模式获取通知。
7. public void as-sign(java.util.List <TopicParti-tion> partitions)
手动为客户分配分区列表。
8.  民意调查()
获取使用其中一个subscribe / assign API指定的主题或分区的数据。如果在轮询数据之前未订阅主题,则会返回错误。
9. public void commitSync()
为了提交所有订阅的主题和分区列表的最后一个poll()中返回的偏移量。对commitAsyn()应用相同的操作。
10.  public void seek(TopicPartition partition,long offset)
获取消费者将在下一个poll()方法中使用的当前偏移值。
阅读Kafka的优点和缺点
11. public void resume()
为了恢复暂停的分区。
12. public void wakeup()
唤醒消费者。

3. ConsumerRecord API

基本上,要从Kafka集群接收记录,我们使用ConsumerRecord API。它包括一个主题名称,分区号,从中接收记录的偏移量也指向Kafka分区中的记录。此外,要创建具有特定主题名称,分区计数和<key,value>对的使用者记录,我们使用consumerRecord类。它的签名是:

  1. public ConsumerRecord (字符串主题,int分区,长偏移量,K键,V值)
  2. public ConsumerRecord(string topic,int partition, long offset,K key, V value)

  • 话题

从Kafka群集收到的消费者记录的主题名称。

  • 划分

主题的分区。

记录的密钥,如果没有密钥存在,则返回null。

记录内容。
学习Apache Kafka Streams | 流处理拓扑

4. ConsumerRecords API

基本上,它是ConsumerRecord的容器。要保留特定主题的每个分区的ConsumerRecord列表,我们使用此API。它的构造函数是:

  1. public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
    <Consumer-Record>K,V>>> records)

  • TopicPartition

返回特定主题的分区映射。

  • 记录

返回ConsumerRecord列表。
这些是ConsumerRecords类的以下方法:
1。public  int count()
所有主题的记录数。
2.  public Set partitions()
具有此记录集中数据的分区集(如果未返回任何数据,则该集为空)。
3.  public Iterator iterator()
通常,迭代器使您可以遍历集合,获取或删除元素。
4.  public list records()
基本上,获取给定分区的记录列表。

5. ConsumerRecord API与ConsumerRecords API

一个。ConsumerRecord API
ConsumerRecord API是从Kafka接收的键/值对。它包含主题名称和分区号,从中接收记录以及指向Kafka分区中记录的偏移量。
湾 ConsumerRecords API
然而,ConsumerRecords API是一个容器,它为特定主题的每个分区保存ConsumerRecord列表。基本上,Consumer.poll(long)操作返回的每个主题分区都有一个ConsumerRecord列表。
Apache Kafka工作流程| Kafka Pub-Sub Messaging

6.配置设置

在这里,我们列出了Consumer客户端API的配置设置 - 
1.  bootstrap.servers
它引导了代理列表。
2.  group.id
将个人消费者分配给一个组。
3.  enable.auto.commit
基本上,如果值为true,则启用偏移的自动提交,否则不提交。
4.  auto.commit.interval.ms
基本上,它返回更新的消耗偏移量写入ZooKeeper的频率。
5.  session.timeout.ms
它表示Kafka在放弃并继续使用消息之前等待ZooKeeper响应请求(读或写)的毫秒数。

7. SimpleConsumer应用程序

确保生产者应用程序步骤在此处保持不变。这里也开始你的ZooKeeper和Kafka经纪人。此外,使用名为SimpleCon-sumer.java 的java类创建SimpleConsumer应用程序。然后键入以下代码:
阅读Apache Kafka职业范围与薪资趋势。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class SimpleConsumer {public static void main(String[] args) throws Exception {if(args.length == 0){System.out.println("Enter topic name");return;}//Kafka consumer configuration settingsString topicName = args[0].toString();Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer","org.apache.kafka.common.serializa-tion.StringDeserializer");props.put("value.deserializer","org.apache.kafka.common.serializa-tion.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);//Kafka Consumer subscribes list of topics here.
     consumer.subscribe(Arrays.asList(topicName))//print the topic nameSystem.out.println("Subscribed to topic " + topicName);int i = 0;while (true) {ConsumerRecords<String, String> records = con-sumer.poll(100);for (ConsumerRecord<String, String> record : records)// print the offset,key and value for the consumer records.System.out.printf("offset = %d, key = %s, value = %s\n",record.offset(), record.key(), record.value());}}
}

A。汇编

通过使用以下命令,我们可以编译应用程序。

  1. javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

B.执行

而且,使用以下命令我们可以执行应用程序。

  1. java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

C.输入

此外,打开生产者CLI并向主题发送一些消息。我们可以将简单输入作为'Hello Consumer'。

d. 产量

该输出是

  1. 订阅主题Hello-Kafka
  2. offset = 3 ,key = null,value = Hello Consumer
  3. Subscribed to topic Hello-Kafka
    offset = 3, key = null, value = Hello Consumer

8.卡夫卡消费者集团

基本上,Kafka中的Consumer组是来自Kafka主题的多线程或多机器消费。

卡夫卡消费者 - 卡夫卡消费者集团

  • 通过使用相同的group.id,消费者可以加入一个组。
  • 组的最大并行度是组中的消费者数量←分区数。
  • 此外,Kafka将主题的分区分配给组中的消费者。因此,每个分区仅由该组中的一个消费者使用。
  • 此外,Kafka保证消息只能由组中的单个消费者读取。
  • 消费者可以按照日志中存储的顺序查看消息。

看看Storm Kafka与配置和代码
的集成 a。重新平衡消费者
基本上,增加更多流程/线程将导致Kafka重新平衡。基本上,如果任何消费者或代理以某种方式无法向ZooKeeper发送心跳,则可以通过Kafka群集重新配置它。此外,在此重新平衡期间,Kafka将可用分区分配给可用线程,可能将分区移动到另一个进程。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ConsumerGroup {public static void main(String[] args) throws Exception {if(args.length < 2){System.out.println("Usage: consumer <topic> <groupname>");return;}String topic = args[0].toString();String group = args[1].toString();Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", group);props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer","org.apache.kafka.common.serializa-tion.StringDeserializer");props.put("value.deserializer","org.apache.kafka.common.serializa-tion.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(topic));System.out.println("Subscribed to topic " + topic);int i = 0;while (true) {ConsumerRecords<String, String> records = con-sumer.poll(100);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s\n",record.offset(), record.key(), record.value());}}
}
ii. Compilation
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java
iii. Execution
>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group

因此,我们可以看到我们创建了名称的样本组,my-group和两个消费者。
湾 输入
现在,在打开生产者CLI后,发送一些消息,如 -

Test consumer group 01
Test consumer group 02

  1. 测试消费者组01
  2. 测试消费者组02

C。第一个过程的输出
学习Apache Kafka用例| 卡夫卡应用程序

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

  1. 订阅主题Hello-kafka
  2. offset = 3 ,key = null,value =测试消费者组01

d。此外,第二个过程的输出

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02

  1. 订阅主题Hello-kafka
  2. offset = 3 ,key = null,value =测试消费者组02

所以,这完全是关于Kafka的Apache Kafka消费者和消费者群体的例子。希望你喜欢我们的解释。

9.结论:卡夫卡消费者

因此,我们通过使用Java客户端演示详细了解了Kafka Consumer和ConsumerGroup。此外,通过这个,我们了解了如何使用Java客户端发送和接收消息。此外,我们讨论了Kafka Consumer记录API和Consumer Records API以及两者的比较。此外,我们还学习了Kafka Consumer客户端API的配置设置。但是,如果有任何疑问,请随时在评论部分询问。
另请参阅 -  
Kafka Broker 
供参考

转载于:https://www.cnblogs.com/a00ium/p/10850141.html

Apache Kafka Consumer 消费者集相关推荐

  1. kafka consumer消费者 offset groupID详解

    kafka consumer:消费者可以从多个broker中读取数据.消费者可以消费多个topic中的数据. 因为Kafka的broker是无状态的,所以consumer必须使用partition o ...

  2. java kafka 集群消费_kafka集群搭建和使用Java写kafka生产者消费者

    转自:http://chengjianxiaoxue.iteye.com/blog/2190488 1 kafka集群搭建 1.zookeeper集群 搭建在110, 111,112 2.kafka使 ...

  3. 【记一次kafka报org.apache.kafka.clients.consumer.CommitFailedException异常处理】

    项目场景: 项目中,使用到了kafka作为消息中间件,项目作为消费端,消费消息并进行业务处理 问题描述 在实际应用的过程中,发现偶尔但是一直存在的,有消费数据报:org.apache.kafka.cl ...

  4. 4.2.10 Kafka源码剖析, 阅读环境搭建, broker启动流程, topic创建流程, Producer生产者流程, Consumer消费者流程,

    目录 4.1 Kafka源码剖析之源码阅读环境搭建 4.1.1 安装配置Gradle 4.1.2 Scala的安装和配置 4.1.3 Idea配置 4.1.4 源码操作 4.2 Kafka源码剖析之B ...

  5. AKHQ:用于Apache Kafka管理主题、主题数据、消费者组、模式注册表、连接等的Kafka GUI。。。

    参考文章:https://www.5axxw.com/wiki/content/q7nyiu AKHQ(以前称为KafkaHQ) 用于Apache Kafka管理主题.主题数据.消费者组.模式注册表. ...

  6. Apache Kafka教程A系列:消费者群体示例

    原文地址:https://www.tutorialspoint.com/apache_kafka/apache_kafka_consumer_group_example.htm 消费者群体(group ...

  7. 关于Kafka 的 consumer 消费者手动提交详解

    前言 在上一篇 Kafka使用Java实现数据的生产和消费demo 中介绍如何简单的使用kafka进行数据传输.本篇则重点介绍kafka中的 consumer 消费者的讲解. 应用场景 在上一篇kaf ...

  8. 《Apache Kafka实战》读书笔记-调优Kafka集群

    <Apache Kafka实战>读书笔记-调优Kafka集群 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 一.确定调优目标 1>.常见的非功能性要求 一.性能( ...

  9. kafka consumer配置拉取速度慢_Kafka消费者的使用和原理

    这周我们学习下消费者,仍然还是先从一个消费者的Hello World学起: public class Consumer { public static void main(String[] args) ...

最新文章

  1. 多个矩形,求覆盖面积,周长,及交点
  2. 中文版!学习TensorFlow、PyTorch、机器学习、深度学习和数据结构五件套!(附免费下载)...
  3. 解吧源码解析重点看withWeight
  4. 豆瓣评分9.3,陪伴无数程序员成长的神作,终于升级了!
  5. Symantec Backup Exec System Recovery还原向导
  6. Android开发之如何保证Service不被杀掉(前台服务)
  7. django render_2020年最新Django经典面试问题与答案汇总(下)大江狗整理
  8. C# Winform编程之Button
  9. Google高性能RPC框架gRPC 1.0.0发布
  10. Makefile用法链接
  11. php嵌套查询mysql语句_mysql 查询嵌套
  12. 项目微管理 - 总结也是新的开始
  13. “ create-react-app”和创建React应用程序的未来
  14. 源代码安装apache遇到的问题解决
  15. 关于提BUG的一点思考以及工作中总结的规范
  16. 小D课堂 - 新版本微服务springcloud+Docker教程_4-02 微服务调用方式之ribbon实战 订单调用商品服务...
  17. 小鬼授权系统源码全解密源码 附授权代码
  18. 【OS笔记 9】操作系统内核的功能
  19. OBCA认证知识点-part3
  20. os.path.dirname()用法

热门文章

  1. 【Android】高德地图在Debug模式下运行正常但是打Release包时则闪退解决办法
  2. 多相机BEV感知表达
  3. 51单片机模拟PS2协议制作5X5矩阵工业键盘
  4. 1.学Python后到底能干什么?
  5. 为什么瓜子一嗑就停不下来
  6. 科技企业捐赠武汉最新最全排名(截止2月13日)
  7. 【学习笔记】信息系统项目监理“四控三管一协调”以及监理工作的分类和监理单位的作用
  8. 签名不对,请检查签名是否与开放平台上填写的一致。
  9. 动画程序时长缩放是什么意思_1分钟做出高逼格动画!PPT中自带的小功能帮你一键搞定!...
  10. 二向箔-百日打卡writeup 1-5