新版的 Kafka 使用一个选举出来的 controller 来监听 zookeeper,其他 node 再去和 controller 通信,这么做的目的是为了减少 zookeeper 的压力。bootstrap-servers 会自动发现其他 broker,这也是 bootstrap 的含义

前面我们讲到了消费者,接下来我们详细讲下消费者个数改变,分区重新分配是如何接着消费的

首先我们还是一样创建一个新的主题topic 主题是名字叫bigdata  这里报错原因我已经创建过了,主题已经存在

发送一条消息 hello  因为有两个分区,其中一个分区有变化,一个没有变化

我们开启一个消费者,让其中一个分区被消费

查看zookeeper  ,进入zk客户端

[root@backup01 bin]# ./zkCli.sh
Connecting to localhost:2181
2020-04-06 08:54:43,793 [myid:] - INFO  [main:Environment@100] - Client environment:zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 04:05 GMT
2020-04-06 08:54:43,816 [myid:] - INFO  [main:Environment@100] - Client environment:host.name=backup01
2020-04-06 08:54:43,816 [myid:] - INFO  [main:Environment@100] - Client environment:java.version=1.8.0_172
2020-04-06 08:54:43,817 [myid:] - INFO  [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
2020-04-06 08:54:43,818 [myid:] - INFO  [main:Environment@100] - Client environment:java.home=/usr/local/java/jdk1.8.0_172/jre
2020-04-06 08:54:43,818 [myid:] - INFO  [main:Environment@100] - Client environment:java.class.path=/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../build/classes:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../build/lib/*.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../lib/slf4j-log4j12-1.7.25.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../lib/slf4j-api-1.7.25.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../lib/netty-3.10.6.Final.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../lib/log4j-1.2.17.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../lib/jline-0.9.94.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../lib/audience-annotations-0.5.0.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../zookeeper-3.4.13.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../src/java/lib/*.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../conf:
2020-04-06 08:54:43,818 [myid:] - INFO  [main:Environment@100] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2020-04-06 08:54:43,818 [myid:] - INFO  [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
2020-04-06 08:54:43,818 [myid:] - INFO  [main:Environment@100] - Client environment:java.compiler=<NA>
2020-04-06 08:54:43,818 [myid:] - INFO  [main:Environment@100] - Client environment:os.name=Linux
2020-04-06 08:54:43,818 [myid:] - INFO  [main:Environment@100] - Client environment:os.arch=amd64
2020-04-06 08:54:43,818 [myid:] - INFO  [main:Environment@100] - Client environment:os.version=3.10.0-862.el7.x86_64
2020-04-06 08:54:43,818 [myid:] - INFO  [main:Environment@100] - Client environment:user.name=root
2020-04-06 08:54:43,818 [myid:] - INFO  [main:Environment@100] - Client environment:user.home=/root
2020-04-06 08:54:43,818 [myid:] - INFO  [main:Environment@100] - Client environment:user.dir=/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin
2020-04-06 08:54:43,819 [myid:] - INFO  [main:ZooKeeper@442] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@277050dc
2020-04-06 08:54:43,845 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1029] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
Welcome to ZooKeeper!
JLine support is enabled
2020-04-06 08:54:43,960 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@879] - Socket connection established to localhost/127.0.0.1:2181, initiating session
2020-04-06 08:54:43,985 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1303] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x1000393bf830000, negotiated timeout = 30000WATCHER::WatchedEvent state:SyncConnected type:None path:null

我们进入了zk客户端的根目录   看到的除了zookeeper其他都是kafka

这个controller是后面要聊的,是关于kafka的老大controller不是指定的,这个controller是争抢资源的,谁先抢到谁就是老大,正常情况下,先启动的就是老大

[zk: localhost:2181(CONNECTED) 1] get /controllercontroller_epoch   controller
[zk: localhost:2181(CONNECTED) 1] get /controller
{"version":1,"brokerid":0,"timestamp":"1586133981273"}
cZxid = 0x500000003
ctime = Mon Apr 06 08:46:21 CST 2020
mZxid = 0x500000003
mtime = Mon Apr 06 08:46:21 CST 2020
pZxid = 0x500000003
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x2000392dd2c0000
dataLength = 54
numChildren = 0
[zk: localhost:2181(CONNECTED) 2] 

我们看到brokerid:0   ---> backup01:9092

[zk: localhost:2181(CONNECTED) 3] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 4]

broker/ids    0    1     2

topics有5个主题

[zk: localhost:2181(CONNECTED) 2] ls /controller
[]
[zk: localhost:2181(CONNECTED) 3] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics
[secend, bigdata, three, first, __consumer_offsets]
[zk: localhost:2181(CONNECTED) 6]

在新的版本中topics中多了一个__consumer_offsets这样一个主题

查看到消费者的默认50个分区信息

这里我也不知道为啥在zk客户端查看consumers(消费者组consumer-group)信息为空,,从上面的消息可以知道,我的生产者和消费者都是正常的,不知道新版本有什么特殊的改变,

查资料可知:Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中,从 0.9 版本开始,consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。

1)修改配置文件 consumer.properties

exclude.internal.topics=false
cd /usr/local/hadoop/kafka/kafka_2.12-2.4.1/config
vim consumer.properties

所以我们知道了新版的kafka使用__consumer_offsets想要消费生成功 必须修改consumer.properties 这个配置文件才能正常记录消费信息

__consumer_offsets这个也是个主题,正常情况下,kafka并不希望我们去获取这个主题下的数据,我们这里是为了看里面的消费情况,所以我们就设置一下

2)读取 offset  (我们使用的是0.11版本之后)

0.11.0.0 之前:
bin/kafka-console-consumer.sh --topic __consumer_offsets zookeeper backup01:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties from-beginning
0.11.0.0 之后版本(含):
bin/kafka-console-consumer.sh --topic __consumer_offsets zookeeper backup01:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties frombeginning 

这个很关键,指定配置文件,如果不指定,那就不生效

这里抛出一个问题,为什么要使用zookeeper?

假设我们要启动一个消费者A去消费这个T1这个主题,我们连的是bootstrap-server,这个A 会把消息放到 topic 为__consumer_offsets这个主题下,那A是不是相当于__consumer_offsets这个主题的生产者,我们要消费的是__consumer_offsets,如果我们这里__consumer_offsets连的bootstrap-server,感觉这个__consumer_offsets又充当了生产者,感觉自己生产数据自己消费,有点怪怪的,所以我们这里使用了zookeeper,

但是在新的版本中

[root@backup01 kafka_2.12-2.4.1]# bin/kafka-console-consumer.sh --topic __consumer_offsets zookeeper backup01:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties frombeginning
Missing required argument "[bootstrap-server]"

我们从consumer.properties配置文件也可以只是这里用的是bootstrap.servers

Missing required argument "[bootstrap-server]"那我们不能用zookeeper了

[root@backup01 kafka_2.12-2.4.1]# bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server backup01:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties frombeginning

key value形式      [test-consumer-group,__consumer_offsets,3]的hash值我们不知道,但是程序自己是能找到那个分区(50个分区中)能确定唯一的offset

假如一个消费者组的的其中一个消费者挂了,其他消费者怎么能拿到offset,接着消费呢?

因为是同一个组的,挂了之后会通知其他消费者会带着同一个组名来继续消费

我们可以发现大概1秒钟更新一次,这个提交是速度是可以改的

补充一点消费者组kafka的命令操作:

我们通过其它方式查看在kafka消费者组的信息  ,查看consumer group列表,使用--list参数

查看consumer group列表有新、旧两种命令,分别查看新版(信息保存在broker中)consumer列表和老版(信息保存在zookeeper中)consumer列表,因而需要区分指定bootstrap--server和zookeeper参数:

注意:在新的版本中老版本的查询方式已经移除,这里只提供新版本的查询方式

bin/kafka-consumer-groups.sh new--consumer --bootstrap-server backup01:9092 --list

使用kafka的bin目录下面的kafka-consumer-groups.sh命令可以查看offset消费情况,注意,如果你的offset是存在kafka集群上的,就指定kafka服务器的地址bootstrap-server:

 ./bin/kafka-consumer-groups.sh --bootstrap-server backup01:9092 --describe --group console-consumer-63897
GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                            HOST            CLIENT-ID
console-consumer-63897 bigdata         0          -               3               -               consumer-console-consumer-63897-1-255b7119-e051-4ee1-ad64-d44581be1d20 /192.168.0.120  consumer-console-consumer-63897-1
console-consumer-63897 bigdata         1          -               2               -               consumer-console-consumer-63897-1-255b7119-e051-4ee1-ad64-d44581be1d20 /192.168.0.120  consumer-console-consumer-63897-1
[root@backup01 kafka_2.12-2.4.1]# ./bin/kafka-consumer-groups.sh --bootstrap-server backup01:9092 --describe --group console-consumer-69882GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                            HOST            CLIENT-ID
console-consumer-69882 bigdata         0          -               3               -               consumer-console-consumer-69882-1-7df49099-2e83-4871-8468-3279b3213edb /192.168.0.120  consumer-console-consumer-69882-1
console-consumer-69882 bigdata         1          -               2               -               consumer-console-consumer-69882-1-7df49099-2e83-4871-8468-3279b3213edb /192.168.0.120  consumer-console-consumer-69882-1

补充说明:

关于这个__consumer_offsets(后续再说,这里大概了解下)

抛出问题:

__consumer_offsets这个topic是由kafka自动创建的,默认50个,但是都存在一台kafka服务器上,这是不是就存在很明显的单点故障? 经测试,如果将存储consumer_offsets的这台机器kill掉,所有的消费者都停止消费了。请问这个问题是怎么解决的呢?

原因分析:

由于__consumer_offsets这个用于存储offset的分区是由kafka服务器默认自动创建的,那么它在创建该分区的时候,分区数和副本数的依据是什么? 分区数是固定的50,这个没什么可怀疑的,副本数呢?应该是一个默认值1,依据是,如果我们没有在server.properties文件中指定topic分区的副本数的话,它的默认值就是1。

__consumer_offsets是一个非常重要的topic,我们怎么能允许它只有一个副本呢?这样就存在单点故障,也就是如果该分区所在的集群宕机了的话,
我们的消费者就无法正常消费数据了。

我在笔记本上搭建了kafka集群,共3个Broker,来解决这个问题。下面是一些记录。

说明:如果你的__consumer_offsets这个topic已经被创建了,而且只存在一台broker上,如果你直接使用命令删除这个topic是会报错了,提示这是kafka内置的topic,禁止删除。可以在zookeeper中删除(我是使用ZooInspector这个客户端连上zookeeper,删除和__consumer_offsets这个topic有关的目录或节点)。

大数据技术之kafka (第 3 章 Kafka 架构深入 ) offset讲解相关推荐

  1. 大数据技术原理与应用——第一章 大数据概述

    第一章 大数据概述 1.1 大数据时代 1.1.1 第三次信息化浪潮 信息化浪潮 发生时间 标志 解决的问题 代表企业 第一次浪潮 1980年前后 个人计算机 信息处理 Intel.AMD.IBM.苹 ...

  2. 大数据技术原理与应用——第一章

    大数据时代 三次信息化浪潮 信息化浪潮 时间 标志 解决的问题 第一次信息化浪潮 1980 个人计算机 信息处理 第二次信息化浪潮 1995 互联网 信息传输 第三次信息化浪潮 2010 大数据.云计 ...

  3. 百分点大数据技术团队:可插拔OSS架构设计和实战经验

    编者按:随着互联网.大数据和人工智能等技术的发展,信息资源得到最大程度的共享,但随之而来的海量文件存取的功能和性能问题也日渐突出.在政务领域解决方案中,对象存储往往扮演着非常重要的角色,如全国各地的健 ...

  4. 大数据技术之Hive 第4章 DDL数据定义语言

    第4章 DDL数据定义语言 4.1 创建数据库 CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment] [LO ...

  5. 百分点大数据技术团队:互联网舆情系统的架构实践

    现代社会是一个信息驱动的社会,每天都有大量的信息产生. 据统计,互联网上每天有数十亿条媒体文章产生,在线信息检索超过500亿次. 伴随着互联网技术的发展和新媒体创新应用,人们越来越倾向于通过微博.微信 ...

  6. 大数据技术技能分析大赛——第一章 python数据分析概述

    目标:掌握python,进行数据处理.统计分析.回归建模和数据可视化. 教材:<大数据分析务实初级教程(python)## 标题> 第一章 python数据分析概述 1.数据分析概述 1 ...

  7. 大数据技术之Shell 第11章 企业真实面试题

    第10章 正则表达式入门 正则表达式使用单个字符串来描述.匹配一系列符合某个句法规则的字符串.在很多文本编辑器里,正则表达式通常被用来检索.替换那些符合某个模式的文本.在Linux中,grep,sed ...

  8. 大数据技术之 Flume

    为什么选用Flume Python爬虫数据 Java后台日志数据 服务器本地磁盘 文件夹 HDFS Flume Flume最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS. 网络端 ...

  9. axure9数据统计插件_WMDA:大数据技术栈的综合实践

    一.概述 WMDA是58自主开发的用户行为分析产品,同时也是一款支持无埋点的数据采集产品,只需要在第一次使用的时候加载一段SDK代码,即可采集全量.实时的PC.M.APP三端以及小程序的用户行为数据. ...

  10. 大数据技术之MapReduce

    大数据技术之MapReduce 目录 大数据技术之MapReduce 第 1 章 MapReduce 概述 1.1 MapReduce 定义 1.2 MapReduce 优缺点 1.2.1 优点 1. ...

最新文章

  1. 2021年大数据HBase(十五):HBase的Bulk Load批量加载操作
  2. Office Live for Small Business--开启您创业的大门
  3. python3项目-python3项目
  4. 自己写一个实现ApplicationListener​接口并且把该组件加入到容器中
  5. ubuntu下永久设置文件句柄数和虚拟内存,不需重启
  6. java继承与覆盖_简单的继承,方法重载与方法覆盖
  7. [vue] 如何在子组件中访问父组件的实例?
  8. count函数里加函数_PHP count()函数与示例
  9. java addobject_springMVC后台的值无法通过ModelAndView的addObject传到前台
  10. c#12星座速配代码_原来12星座的软件工程师是这样的
  11. SAP License:SAP收购Roambi,领导全球市场商务分析云迁移
  12. 如何用100美元和TensorFlow来造一个能“看”东西的机器人
  13. 前端综合学习笔记---异步、ES6/7、Module、Promise同步 vs 异步
  14. 输入n个字符串字典序排序_30个Python奇淫技巧集
  15. 【sklearn第十五讲】决策树之回归篇
  16. Android支付实践(三)之银联支付功能(客户端+服务端)
  17. 软件测试有效性指标,软件测试用例评审有效性的44个衡量标准[1]
  18. 计算机病毒防治——计算机病毒种类
  19. 写给独自站在人生面前的自己1-java加密算法
  20. UnicodeTOGB,能够将Unicode串转换成GB码,方便开发。

热门文章

  1. python实例 99,100
  2. 数学建模学习笔记(十二)——奇异值分解
  3. 安卓软件错误log_关于android程序的log错误信息的问题,请明白人看下吧,我是初学者,新建的android项目,一个代码都没改过...
  4. 百度贴吧发帖软件_贴吧自动发帖软件
  5. 解决$ is not define
  6. ERROR: Cannot unpack file C:\Users\admin\AppData\Local\Temp\pip-unpack-yo8pmupp\simple.htm (download
  7. 无法打开物理文件 XXX.mdf,操作系统错误 5.5(拒绝访问) 的解决办法
  8. 抽卡时下拉通知栏导致icon丢失
  9. 在Windows 7 x64 上编译libsvn
  10. linux下的pdf分割合并软件