本篇教程探讨了大数据技术之一次KAFKA消费者异常引起的思考,希望阅读本篇文章以后大家有所收获,帮助大家对相关内容的理解更加深入。

问题描述:

线上出现一台服务器特别慢,于是关闭了服务器上的kafka broker. 关闭后发现一些kafka consumer无法正常消费数据了, 日志错误:

o.a.kakfa.clients.consumer.internals.AbstractCordinator  Marking the coordinator (39.0.2.100) as dead.

原因:

经过一番排查,发现consumer group信息:

(kafka.coordinator.GroupMetadataMessageFormatter类型):

groupId::[groupId,Some(consumer),groupState,Map(memberId -> [memberId,clientId,clientHost,sessionTimeoutMs], ...->[]...)],

存到了KAFKA内部topic: __consumer_offsets里, , 它的key是 groupId.

同时发现broker 参数 offsets.topic.replication.factor 错误地被设置为1. 这个参数表示TOPIC: __Consumer_offsets 的副本数.  这样一旦某个broker被关闭, 如果被关闭的Broker 是__Consumer_offsets的某些partition的Leader. 则导致某些consumer group 不可用. 如果一旦broker已经启动, 需要手工通过命令行来扩展副本数.reassignment.json:{"version":1, "partitions": [{"topic": "xxx", "partition": 0, "replicas": {brokerId1, brokerId2}}]}kafka-reassign-partitions  --zookeeper localhost:2818 --reassignment-json-file  reassignment.json --execute

客户端寻找Consumer Coordinator的过程:

客户端 org.apache.kafka.clients.consumer.internals.AbstractCoordinator

如果Coordinator 未知 (AbstractCoordinator.coordinatorUnknown()), 发起请求 lookupCoordinator,向负载最低的节点发送FindCoordinatorRequest

服务端 KafkaApis.handleFindCoordinatorRequest 接收请求:

首先调用  GroupMetaManager.partitionFor(consumerGroupId)   consunerGroupId 的 hashCode 对 __consumer_offsets 总的分片数取模获取partition id 再从 __consumer_offset 这个Topic 中找到partition对应的 Partition Metadata, 并且获取对应的Partition leader  返回给客户端

引伸思考

KAFKA 的failover机制究竟是怎么样的?假使 __consumer_offset 设置了正确的副本数,重选举的过程是怎样的. 如果broker宕机后导致某些副本不可用, 副本会自动迁移到其他节点吗?带着这些问题稍微阅读了一下KAFKA的相关代码:

当一个Broker 被关掉时, 会有两步操作:

KafkaController.onBrokerFailure ->KafkaController.onReplicasBecomeOffline

主要是通过 PartitionStateMachine.handleStateChanges 方法通知Partition状态机将状态置为offline. ReplicaStateMachine.handleStateChanges方法会将Replica 状态修改为OfflineReplica, 同时修改partition ISR. 如果被关闭broker 是partition leader 那么需要重新触发partition leader 选举,最后发送LeaderAndIsrRequest获取最新的Leader ISR 信息.

KafkaController.unregisterBrokerModificationsHandler 取消注册的BrokerModificationsHandler 并取消zookeeper 中broker 事件的监听.

当ISR请求被发出,KafkaApis.handleLeaderAndIsrRequest() 会被调用. 这里如果需要变更leader的partition是属于__consumer_offset这个特殊的topic,取决于当前的broker节点是不是partition leader. 会分别调用GroupCoordinator.handleGroupImmigration 和 GroupCoordinator.handleGroupEmmigration. 如果是partition leader, GroupCoordinator.handleGroupImmigration -> GroupMetadataManager.loadGroupsForPartition 会重新从 __consumer_offset 读取group数据到本地metadata cache, 如果是partition follower, GroupCoordniator.handleGroupImigration -> GroupMetadataManager.removeGroupsForPartition 会从metadata cache中移除group信息. 并在onGroupUnloaded回调函数中将group的状态变更为dead. 同时通知所有等待join或者sync的组成员.

KAFKA在Broker关闭时不会自动做partition 副本的迁移. 这时被关闭的Broker上的副本变为under replicated 状态. 这种状态将持续直到Broker被重新拉起并且追上新的数据, 或者用户通过命令行 手动复制副本到其他节点.

官方建议设置两个参数来保证graceful shutdown.   controlled.shutdown.enable=true     auto.leader.rebalance.enable=true前者保证关机之前将日志数据同步到磁盘,并进行重选举. 后者保证在broker重新恢复后再次获得宕机前leader状态. 避免leader分配不均匀导致读写热点.

本文由职坐标整理发布,学习更多的相关知识,请关注职坐标IT知识库!

python消费kafka逻辑处理导致cpu升高_大数据技术之一次KAFKA消费者异常引起的思考...相关推荐

  1. python消费kafka逻辑处理导致cpu升高_爬虫架构|利用Kafka处理数据推送问题(1)

    如下图1-1所示,我们之前爬虫集群在采集完数据之后是直接插入到MySQL数据库中,分发服务再消费MySQL里面的数据.这样的设计会有两个主要的问题: 随着数据量越来越大,数据保存和数据存取的响应效率是 ...

  2. python消费kafka逻辑处理导致cpu升高_请教:Python模块KafkaConsumer会被Kerberos的状态影响嘛?...

    请教大家一下Kafka队列和Kerberos票据的问题. 我在运行一段python代码的时候, from kafka import KafkaConsumer, KafkaProducer impor ...

  3. python消费kafka逻辑处理导致cpu升高_Kafka 消费迟滞监控工具 Burrow

    Kafka 官方对于自身的 LAG 监控并没有太好的方法,虽然Kafka broker 自带有 kafka-topic.sh, kafka-consumer-groups.sh, kafka-cons ...

  4. kafka分区与分组原理_大数据技术-Kafka入门

    在大数据学习当中,主要的学习重点就是大数据技术框架,针对于大数据处理的不同环节,需要不同的技术框架来解决问题.以Kafka来说,主要就是针对于实时消息处理,在大数据平台当中的应用也很广泛.大数据学习一 ...

  5. python2 json大数据_大数据技术之python 操作json

    本篇文章探讨了大数据技术之python 操作json,希望阅读本篇文章以后大家有所收获,帮助大家对相关内容的理解更加深入. #!/usr/bin/env python import json s = ...

  6. 大数据开发和python的区别_大数据技术和python开发工程师

    容易来说,从大数据的生命周期来看,无外乎四个方面:大数据采集.大数据预处理.大数据存储.大数据分析,共同组成了大数据生命周期里最核心的技术,下面分开来说: 一.大数据采集 大数据采集,即对各种来源的结 ...

  7. kafka修改分区数_大数据技术:解析SparkStreaming和Kafka集成的两种方式

    Spark Streaming是基于微批处理的流式计算引擎,通常是利用Spark Core或者Spark Core与Spark Sql一起来处理数据.在企业实时处理架构中,通常将Spark Strea ...

  8. python编程技术总结_大数据技术学习之Spark技术总结

    Spark是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合.需要反复操作的次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就相对较小(大数据库架构中这是 ...

  9. python大数据技术_大数据技术python

    {"moduleinfo":{"card_count":[{"count_phone":1,"count":1}],&q ...

最新文章

  1. 在windows上使用virt-manager
  2. NSTimer与Run loop Modes
  3. windows mobile C# net3.5 如何捕捉Arrow的CheckBox的KeyDown事件?
  4. 跨域解决请求限制(script标签)(热门搜索出现对应的词条)
  5. Common Knowledge_快速幂
  6. 【2017年第3期】从点状应用到大数据统一平台
  7. qt creator源码全方面分析(3-1)
  8. 性能测试知多少----性能测试分类之我见
  9. STM32使用HAL库驱动W5500
  10. 计数器java代码_计数器的java代码
  11. 经验模态分解(Empirical Mode Decomposition ,EMD)特征提取及其原理
  12. 计算机网络技术基础第5版答案,计算机网络基础 (第5版)课后习题及答案.doc
  13. matlab dsolve函数构造微分方程
  14. 英语单词记忆(词缀 / 前缀)
  15. 艾草减肚子方法非常有效 赛乐赛骗局是真的吗
  16. linux如何卸载oracle数据库实例,linux下删除oracle数据库实例
  17. linux百度云下载脚本,百度网盘Linux版下载
  18. U盘文件夹病毒,.exe病毒删除方法
  19. 接口参数加解密,代码无侵入这样做方便多了
  20. Linux进程间通信源码剖析,共享内存(shmget函数详解)

热门文章

  1. JavaScript使用正则表达式进行邮箱表单验证实例
  2. 如何得知mysql表结构发生变化了呢?
  3. sql常用语法命令及函数_SQL右连接命令:语法示例
  4. nodejs 实践项目_NodeJS:最佳生产实践
  5. mailto 附带附件_为什么附带项目如此重要
  6. 增删改查通用测试用例-禅道模板
  7. 羞羞的Python模块包
  8. 多线程服务器(python 版)
  9. 超过1.2W星的「机器学习路线图」,你的收藏夹可以更新了!
  10. 小程序问题记录:小程序云开发获取不到数据库的记录