17.3 消费者群例子

消费群是多线程或多机器接收KafkaTopic。

17.3.1 消费者群

Ø  消费者可以通过使用相同的“group.id”来加入组。

Ø  组的最大并行数目是组中消费者数<=分区数。

Ø  Kafka将Topic分区分配给组中的消费者,以便每个分区都由组中的一个消费者使用。

Ø  Kafka保证消息只能被组中的一个消费者读取。

Ø  消费者可以按照消息存储在日志中的顺序查看消息。

17.3.2消费者重现平衡

添加更多进程/线程将导致Kafka重新平衡。如果任何消费者或broker无法向ZooKeeper发送心跳,则可以通过Kafka集群重新配置。在重新平衡期间,Kafka将分配可用分区到可用线程,可能将分区移动到另一个进程:

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ConsumerGroup {public static void main(String[] args) throws Exception {if(args.length < 2){System.out.println("Usage: consumer <topic> <groupname>");return;}String topic = args[0].toString();String group = args[1].toString();Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", group);props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(topic));System.out.println("Subscribed to topic " + topic);int i = 0;while (true) {ConsumerRecords<String, String> records = con-sumer.poll(100);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());}}
}

编译:应用程序用下面的命令进行编译。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” ConsumerGroup.java

执行:用下面的命令进行执行。

java -cp“/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup<topic-name> my-group
java -cp"/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":.ConsumerGroup <topic-name> my-group

输入:打开生产者CLI,发送像下面的信息

Test consumer group 01
Test consumer group 02

第一个进程输出:

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumergroup 01

第二个进程输出:

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumergroup 02

转载于:https://my.oschina.net/u/2291665/blog/884970

大数据学习笔记-------------------(17_3)相关推荐

  1. 大数据学习笔记:Hadoop生态系统

    文章目录 一.Hadoop是什么 二.Hadoop生态系统图 三.Hadoop生态圈常用组件 (一)Hadoop (二)HDFS (三)MapReduce (四)Hive (五)Hbase (六)Zo ...

  2. Hadoop 大数据学习笔记

    Hadoop 大数据学习笔记1 大数据部门组织架构 Hadoop Hadoop是什么 Hadoop的优势 Hadoop的组成 HDFS架构 YARN架构 MapReduce 大数据技术生态体系![在这 ...

  3. 大数据学习笔记(一)

    大数据学习笔记(一)大数据概论 大数据是什么 1大数据概念:(big data ) : 指无法在一定时间内用常规软件工具进行捕捉.管理和处理数据集合,是需要新处理模式才能具有更强的决策力.洞察发现力和 ...

  4. 大数据学习笔记第1课 Hadoop基础理论与集群搭建

    大数据学习笔记第1课 Hadoop基础理论与集群搭建 一.环境准备 二.下载JDK 三.安装JDK 四.下载hadoop 五.安装hadoop集群 六.打通3台服务器的免密登录 七.hadoop集群配 ...

  5. 大数据学习笔记二:Ubuntu/Debian 下安装大数据框架Hadoop

    文章目录 安装Java 为Hadoop创建用户 安装Hadoop 配置Hadoop 配置环境变量 设置配置文件 格式化namenode 启动hadoop集群 访问hadoop集群 大数据学习系列文章: ...

  6. 大数据学习笔记一:大数据的发展历程--MapReduce,Hive,Yarn,Hadoop,Spark,Flink

    大数据学习系列文章:大数据-博客专栏 今天在学习极客时间专栏:<从0开始学大数据> 从预习 01 | 大数据技术发展史:大数据的前世今生到预习 03 | 大数据应用领域:数据驱动一切,系统 ...

  7. 此文献给正打算入门大数据的朋友:大数据学习笔记1000条(2)

    501.MapReduce计算框架中的输入和输出的基本数据结构是键-值对. 502.Hadoop神奇的一部分在于sort和shuffle过程. 503.Hive驱动计算的"语言"是 ...

  8. 大数据学习笔记-hadoop(1)

    目录 前言 一.什么是大数据? 二.Ubuntu基础配置 1.安装Ubuntu20.04 2.修改系统语言为中文 3.修复vi编辑器 4.使用root账户 5.网络配置 6.配置源 7.更新 三.ss ...

  9. 大数据学习笔记1000条

    1.Zookeeper用于集群主备切换. 2.YARN让集群具备更好的扩展性. 3.Spark没有存储能力. 4.Spark的Master负责集群的资源管理,Slave用于执行计算任务. 5.Hado ...

最新文章

  1. 刚才遇到了关于C#使用外部DLL函数上的char*的问题。
  2. webstorm如何自动换行_怎样在word中自动生成目录
  3. Mybatis高级应用 查询缓存
  4. Java CAS无锁技术深度解析
  5. POJ 2777 ZOJ 1610 HDU 1698 --线段树--区间更新
  6. java oracle序列化_Java序列化(Serialization)的理解
  7. zuul网关_SpringCould之服务网关(zuul)介绍与配置
  8. Android底部菜单栏的两种实现方式 附完整源码
  9. 【EI会议推荐】机电一体化、自动化与智能控制领域
  10. 软件测试——selenium环境搭建及自动化测试
  11. 卷积码 c语言编码,利用c语言实现卷积码编码器示例
  12. Node-跟着李南江学编程
  13. 分析在智能语音对话流程
  14. vue项目运行后页面一片空白
  15. SpringCloud Alibaba 从入门到精通(精选)
  16. 增加/删除临时IP地址
  17. html控制智能家居,一种通过web控制的智能家居系统的制作方法
  18. OpenCV——角点检测原理分析(Harris,Shi-Tomasi、亚像素级角点检测)
  19. 【IT情感】个性是成功的利器
  20. bpnn——matlab工具箱-归一化函数 premnmx、tramnmx、postmnmx、mapminmax

热门文章

  1. Codeforces Round #375 (Div. 2) B. Text Document Analysis(字符串处理,简单题目)
  2. Ubuntu22.04更换国内镜像源(阿里、网易163、清华、中科大)
  3. 一个13年ABAP老兵的建议:了解这些基础知识,对ABAP开发有百利而无一害
  4. Android 自定义字体样式 及系统默认字体样式 的设置
  5. 一个等号= 二个等号== 三个等号=== 的区别
  6. 程序员漫画集连载[1]
  7. facebook网页版登录_微信网页版关闭登录将影响一大批使用itchat等Web Api方案的微信机器人...
  8. win7网站服务器空间怎么清理,window_Win7系统如何清理C盘空间?Win7系统清理C盘空间小技巧,  Win7系统清理C盘空间小技 - phpStudy...
  9. 【Multisim 14.0】软件安装教程
  10. 有时候缘分来了,挡也挡不住,我们终究能等到对的那个人。