private def checkAndTriggerPartitionRebalance(): Unit = {
  if (isActive()) {
    trace("checking need to trigger partition rebalance")
    // 获取(存活的broker,AR副本集) => (2,Map([message,0]-> List(2, 0), [hadoop,0] -> List(2, 1)))
   
var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
    inLock(controllerContext.controllerLock) {
      preferredReplicasForTopicsByBrokers=
        controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
          case(topicAndPartition, assignedReplicas) => assignedReplicas.head
        }
    }
    debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
    // 过滤每一个存活的broker,检查是否需要一个preferredreplica 选举被触发
   
preferredReplicasForTopicsByBrokers.foreach {
      case(leaderBroker, topicAndPartitionsForBroker) => {
        var imbalanceRatio: Double = 0
        var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
        inLock(controllerContext.controllerLock) {
          // 我们知道,正常情况下,broker的AR副本集第一个副本(preferred replica )就是leader
          // 如果leader不是preferred replica,比如 Leader : 0 ISR[2,0]
          // 我们需要过滤出这种topicPartition,然后我们好进行在平衡
         
topicsNotInPreferredReplica=
            topicAndPartitionsForBroker.filter {
              case(topicPartition, replicas) => {
                controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
                controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
              }
            }
          debug("topics not in preferred replica " + topicsNotInPreferredReplica)
          // broker 的AR副本数量
         
val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
          // 过滤出的leader不是AR副本集的preferred replica的数量
         
val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
          // 计算(过滤出的leader不是AR副本集的preferred replica的数量)/(broker 的AR副本数量)不平衡比例
         
imbalanceRatio= totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
          trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
        }
        // 如果比例大于我们配置的leader.imbalance.per.broker.percentage参数,比如50%,就触发这个topic partitions的再平衡操作
       
if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
          topicsNotInPreferredReplica.foreach {
            case(topicPartition, replicas) => {
              inLock(controllerContext.controllerLock) {
                // 首先确保broker存活,而且没有分区正在重新分配或者没有进行preferredreplica 选举,且没有分区将被删除
               
if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
                    controllerContext.partitionsBeingReassigned.isEmpty &&
                    controllerContext.partitionsUndergoingPreferredReplicaElection.isEmpty &&
                    !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
                    controllerContext.allTopics.contains(topicPartition.topic)) {
                  // 然后真正触发Prederred Replica选举操作
                 
onPreferredReplicaElection(Set(topicPartition), true)
                }
              }
            }
          }
        }
      }
    }
  }
}

KafkaController 分区Rebalance平衡机制相关推荐

  1. 高效实用Kafka-Kafka集群维护(分区平衡机制、kafka分区日志迁移)

    导语   昨天的分享中,从微观的层面上了解了关于Kafka消息处理机制,但是当面对一个kafka集群的时候从宏观的角度上来说怎么保证kafka集群的高可用性呢?下面就来看看 文章目录 Kafka集群基 ...

  2. kafka消费组与重平衡机制详解

    1.消费者组 1.1 介绍 消费者组,即 Consumer Group,应该算是 Kafka 比较有亮点的设计了. 那么何谓 Consumer Group 呢? Consumer Group 是 Ka ...

  3. 一文详细解析kafka重平衡机制

    前言 1.队列重平衡概述 如果对RocketMQ或者对消息中间件有所了解的话,消费端在进行消息消费时至少需要先进行队列(分区)的负载,即一个消费组内的多个消费者如何对订阅的主题中的队列进行负载均衡,当 ...

  4. Kafka从入门到精通(七)分区和副本机制

    1. 分区和副本机制 1.1 生产者分区写入策略 生产者写入消息到topic,Kafka将依据不同的策略将数据分配到不同的分区中 轮询分区策略 随机分区策略 按key分区分配策略 自定义分区策略 1. ...

  5. kafka 消息分发机制、分区和副本机制

    一.消息分发机制 1.1 kafka 消息分发策略 消息是 kafka 中最基本的数据单元,在 kafka 中,一条消息由key.value两部分构成,在发送一条消息 时,我们可以指定这个key,那么 ...

  6. paddlepaddle 27 支持任意维度数据的梯度平衡机制GHM Loss的实现(支持ignore_index、class_weight,支持反向传播训练,支持多分类)

    GHM Loss是Focal loss的升级版,它对难样本进行了深入的分析,认为并非所有的难样本都值得关注.有一些难样本属于标签错误的,不应该进行加强.GHM Loss根据loss的梯度模长(1-so ...

  7. pytorch 12 支持任意维度数据的梯度平衡机制GHM Loss的实现(支持ignore_index、class_weight,支持反向传播训练,支持多分类)

    梯度平衡机制GHM(Gradient Harmonized Mechanism) Loss是Focal loss的升级版,源自论文Gradient Harmonized Single-stage De ...

  8. paddle 40 支持任意维度数据的梯度平衡机制GHM Loss的实现(支持ignore_index、class_weight,支持反向传播训练,支持多分类)

    梯度平衡机制GHM(Gradient Harmonized Mechanism) Loss是Focal loss的升级版,源自论文Gradient Harmonized Single-stage De ...

  9. Kafka的分区和副本机制

    文章目录 Leader和Follower 生产者分区写入策略 轮询分区策略 随机策略(不用) 按key分配策略 乱序问题 自定义分区策略 消费者组Rebalance机制 消费者分区分配策略 Range ...

最新文章

  1. 使用OpenCV进行对象检测
  2. Layout两列定宽中间自适应三列布局
  3. Centos7修改默认网卡名(改为eth0)以及网卡启动报错RTNETLINK answers: File exists处理...
  4. MySQL数据库:SQL语句
  5. SPOJ 4564 Chop Ahoy! Revisited!
  6. 推荐一个在线查看.cer文件的网站
  7. css grid布局_如何使用CSS Grid重新创建Medium的文章布局
  8. sass、gulp应用
  9. SQL Server系统数据库–模型数据库
  10. 数学问题(二):质数、质因子
  11. 如你以安全模式启动计算机,如何以安全模式启动计算机?
  12. JSP学习——Eclipse自定义JSP模板(修改默认JSP文件)
  13. 智能硬件成在线教育救命稻草?
  14. 接收机PPM与SBUS
  15. Unity 动画系统:Animator
  16. python表示整除的符号_c语言中整除符号怎么表示?_后端开发
  17. dir 616 虚拟服务器,DIR-616(DLink)无线路由器设置指南
  18. 计算机联锁的检修与维护,tyjlⅱ计算机联锁检修作业指导书.docx
  19. 推进重大改革塑造韧性 吉林银行不良率大降近六成
  20. 【threejs】透视相机,实现相机根据模型大小自适应,将模型放置在视角内

热门文章

  1. 纯java程序意味着什么_Java到底是不是一种纯面向对象语言?
  2. Vue组件学习之组件自定义事件
  3. java并发编程之线程的生命周期详解
  4. android必看java_Android开发工程师必看笔试题:Java基础选择题(一)
  5. pip安装requirement.txt
  6. java js合并_JS合并单元格
  7. fluent并行 linux_windows 系统下启动linux主机群的fluent并行操作.docx
  8. java pdf转base64_后台返回pdf的base64编码到前端,如果pdf中有中文,不会显示问题?...
  9. Spring Cloud 之 Ribbon,Spring RestTemplate 调用服务使用Hystrix熔断器
  10. Python使用wordnet工具计算词集与词条基本用法(三)