大数据学习笔记-------------------(17_3)
17.3 消费者群例子
消费群是多线程或多机器接收KafkaTopic。
17.3.1 消费者群
Ø 消费者可以通过使用相同的“group.id”来加入组。
Ø 组的最大并行数目是组中消费者数<=分区数。
Ø Kafka将Topic分区分配给组中的消费者,以便每个分区都由组中的一个消费者使用。
Ø Kafka保证消息只能被组中的一个消费者读取。
Ø 消费者可以按照消息存储在日志中的顺序查看消息。
17.3.2消费者重现平衡
添加更多进程/线程将导致Kafka重新平衡。如果任何消费者或broker无法向ZooKeeper发送心跳,则可以通过Kafka集群重新配置。在重新平衡期间,Kafka将分配可用分区到可用线程,可能将分区移动到另一个进程:
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ConsumerGroup {public static void main(String[] args) throws Exception {if(args.length < 2){System.out.println("Usage: consumer <topic> <groupname>");return;}String topic = args[0].toString();String group = args[1].toString();Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", group);props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(topic));System.out.println("Subscribed to topic " + topic);int i = 0;while (true) {ConsumerRecords<String, String> records = con-sumer.poll(100);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());}}
}
编译:应用程序用下面的命令进行编译。
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” ConsumerGroup.java
执行:用下面的命令进行执行。
java -cp“/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup<topic-name> my-group
java -cp"/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":.ConsumerGroup <topic-name> my-group
输入:打开生产者CLI,发送像下面的信息
Test consumer group 01
Test consumer group 02
第一个进程输出:
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumergroup 01
第二个进程输出:
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumergroup 02
转载于:https://my.oschina.net/u/2291665/blog/884970
大数据学习笔记-------------------(17_3)相关推荐
- 大数据学习笔记:Hadoop生态系统
文章目录 一.Hadoop是什么 二.Hadoop生态系统图 三.Hadoop生态圈常用组件 (一)Hadoop (二)HDFS (三)MapReduce (四)Hive (五)Hbase (六)Zo ...
- Hadoop 大数据学习笔记
Hadoop 大数据学习笔记1 大数据部门组织架构 Hadoop Hadoop是什么 Hadoop的优势 Hadoop的组成 HDFS架构 YARN架构 MapReduce 大数据技术生态体系![在这 ...
- 大数据学习笔记(一)
大数据学习笔记(一)大数据概论 大数据是什么 1大数据概念:(big data ) : 指无法在一定时间内用常规软件工具进行捕捉.管理和处理数据集合,是需要新处理模式才能具有更强的决策力.洞察发现力和 ...
- 大数据学习笔记第1课 Hadoop基础理论与集群搭建
大数据学习笔记第1课 Hadoop基础理论与集群搭建 一.环境准备 二.下载JDK 三.安装JDK 四.下载hadoop 五.安装hadoop集群 六.打通3台服务器的免密登录 七.hadoop集群配 ...
- 大数据学习笔记二:Ubuntu/Debian 下安装大数据框架Hadoop
文章目录 安装Java 为Hadoop创建用户 安装Hadoop 配置Hadoop 配置环境变量 设置配置文件 格式化namenode 启动hadoop集群 访问hadoop集群 大数据学习系列文章: ...
- 大数据学习笔记一:大数据的发展历程--MapReduce,Hive,Yarn,Hadoop,Spark,Flink
大数据学习系列文章:大数据-博客专栏 今天在学习极客时间专栏:<从0开始学大数据> 从预习 01 | 大数据技术发展史:大数据的前世今生到预习 03 | 大数据应用领域:数据驱动一切,系统 ...
- 此文献给正打算入门大数据的朋友:大数据学习笔记1000条(2)
501.MapReduce计算框架中的输入和输出的基本数据结构是键-值对. 502.Hadoop神奇的一部分在于sort和shuffle过程. 503.Hive驱动计算的"语言"是 ...
- 大数据学习笔记-hadoop(1)
目录 前言 一.什么是大数据? 二.Ubuntu基础配置 1.安装Ubuntu20.04 2.修改系统语言为中文 3.修复vi编辑器 4.使用root账户 5.网络配置 6.配置源 7.更新 三.ss ...
- 大数据学习笔记1000条
1.Zookeeper用于集群主备切换. 2.YARN让集群具备更好的扩展性. 3.Spark没有存储能力. 4.Spark的Master负责集群的资源管理,Slave用于执行计算任务. 5.Hado ...
最新文章
- 刚才遇到了关于C#使用外部DLL函数上的char*的问题。
- webstorm如何自动换行_怎样在word中自动生成目录
- Mybatis高级应用 查询缓存
- Java CAS无锁技术深度解析
- POJ 2777 ZOJ 1610 HDU 1698 --线段树--区间更新
- java oracle序列化_Java序列化(Serialization)的理解
- zuul网关_SpringCould之服务网关(zuul)介绍与配置
- Android底部菜单栏的两种实现方式 附完整源码
- 【EI会议推荐】机电一体化、自动化与智能控制领域
- 软件测试——selenium环境搭建及自动化测试
- 卷积码 c语言编码,利用c语言实现卷积码编码器示例
- Node-跟着李南江学编程
- 分析在智能语音对话流程
- vue项目运行后页面一片空白
- SpringCloud Alibaba 从入门到精通(精选)
- 增加/删除临时IP地址
- html控制智能家居,一种通过web控制的智能家居系统的制作方法
- OpenCV——角点检测原理分析(Harris,Shi-Tomasi、亚像素级角点检测)
- 【IT情感】个性是成功的利器
- bpnn——matlab工具箱-归一化函数 premnmx、tramnmx、postmnmx、mapminmax
热门文章
- Codeforces Round #375 (Div. 2) B. Text Document Analysis(字符串处理,简单题目)
- Ubuntu22.04更换国内镜像源(阿里、网易163、清华、中科大)
- 一个13年ABAP老兵的建议:了解这些基础知识,对ABAP开发有百利而无一害
- Android 自定义字体样式 及系统默认字体样式 的设置
- 一个等号= 二个等号== 三个等号=== 的区别
- 程序员漫画集连载[1]
- facebook网页版登录_微信网页版关闭登录将影响一大批使用itchat等Web Api方案的微信机器人...
- win7网站服务器空间怎么清理,window_Win7系统如何清理C盘空间?Win7系统清理C盘空间小技巧, Win7系统清理C盘空间小技 - phpStudy...
- 【Multisim 14.0】软件安装教程
- 有时候缘分来了,挡也挡不住,我们终究能等到对的那个人。