本文将通过页面操作入口和程序代码进行reassign流程分析。reassign的大致流程为页面操作触发coordinator调用相应的receiver进行处理:reassign分为3个部分,preAssignment(这些replica set 只存在于之前,新的assignment中不存在)、newAssignment(这些replica set 只存在于新的,之前的assignment中没有此replica set)、commonAssignment(即存在于之前assignment也在新的assignment中)。其中commonAssignment不做任何处理;preAssignment将会通知各个replica set消费到当前receiver消费到的各partition的最晚offset,然后停止消费并将这些receiver上的segment由active变为immutable;newAssignment和正常流程一样进行assign分配消费,只是各partition消费的offset为preAssignment各partition消费的最晚offset。

一、页面操作

选择需要reassign的cube:Action——AssignMent——edit——save

二、源码分析

2.1 代码执行流程:

用户提交reassignment,会触发Coordinator.reAssignCube方法(调用Coordinator的reAssignCube方法时,coordinator的leader为本机则直接调用,非本机则通过http调用。),reAssignCube方法中主要分两种Case处理此次reassign:
Case1:之前此cube没有assignment,此种情况只会存在于之前整个集群没有可用的replica_set或手动将cube中的assignment删除了,则不存在处理之前的assignment的情况,直接走assign流程即调用coordinator.doAssignCube方法;
Case2:如果之前有assignment则需要涉及到处理之前assignment并切换到新的(大多数为这种情况),主要在coordinator.reassignCubeImpl方法中实现。
下面主要对Case2进行分析即对coordinator.reassignCubeImpl方法进行追踪分析。

2.2.1 Case1: 此cube之前没有assignment

StreamingV2Controller(master).reAssignStreamingCube——》StreamingV2Service.reAssignCube——》Coordinator.reAssignCube——》doAssignCube()——后续继续走正常的的分配消费流程 ,代码比较清晰易懂,本文不多做分析;

2.2.2 Case2: 此cube之前有assignment,此次为调整。

StreamingV2Controller(master).reAssignStreamingCube——》StreamingV2Service.reAssignCub——》reassignCubeImpl——》分两种情况,调整前后是否完全一样,如果完全一样则不做任何处理(newAssignments=preAssignments);如果调整前后有差别,则继续——》 doReassign,接下来分三大步:

2.2.2.1 第一步是处理preAssignments即调用syncAndStopConsumersInRs:

通知此cube之前所有的replicasets(此replica set且不再新的replica sets中)下的receiver停止消费并返回offset;得到每个partition当前已经消费的最晚offset;通知负责消费相应partition的receiver均消费到此partition的最晚offset然后停止消费。当然如果preAssignments中某个replicaset包含在新的里面,不做任何处理的.

  • 暂停此replicatSet下所有receiver消费此topic且返回每个receiver消费的offset:
    暂停本次要去掉的replicat_set下所有节点对此cube的消费:Coordinator.doReassign——》syncAndStopConsumersInRs——》pauseConsumersInReplicaSet(pause暂停消费)——》pauseConsumersForReceiver——》HttpReceiverAdminClient.pauseConsumers(http call receiver)——》AdminController.pauseConsumers——》StreamingServer.pauseConsumer(暂停消费并返回每个receiver当前消费的consumePosition,receiver每消费一条记录会通过StreamingSegmentManager.addEvent记录consumePosition)——》StreamingConsumerChannel.pause(true)暂停消费;
  • 根据上一步返回的offset并计算各replica
    set当前已经消费的最晚offset,然后通知所有相关的receiver均消费到此offset后停止:

    Coordinator.doReassign——》Coordinator.syncAndStopConsumersInRs——》KafkaPositionHandler.mergePositions(得到所有receiver消费此partition的最晚offset)——》Coordinator.resumeConsumersInReplicaSet——》resumeConsumersForReceiver——》AdminController.resumeConsumers——》AdminController.resumeConsumers(http 通知此replicaSet的所有receiver均消费到最后的offset)——》StreamingServer.resumeConsumer——》StreamingConsumerChannel.resumeToStopCondition——》setStopCondition——》run(会进行判断,消费到最晚的offset后自动停止),并返回每个的最新信息
2.2.2.2 第二步是处理所有的newAssignments:**

通知newAssignments的每个replicaset下的receiver进行assign并从指定的partition的offset(offset来源于第一步各个partition消费的最晚offset)开始消费start
consumer。当然如果newAssignments中的某个replicaset在preAssignments也不做任何处理,保持不变:

  • assign(仅assign不start),调用assignCubeToReplicaSet
    Coordinator.doReassign——》assignCubeToReplicaSet(assign所有replicaset下的receiver)——》assignCubeToReplicaSet——》assignToReceiver——》HttpReceiverAdminClient.assign——》AdminController.assign(http receiver)——》StreamingServer.assign(String cubeName, List partitions)(将此cube及partition放入内存StreamingServer.assignments中,等待后续start);

  • start consumer(通知receiver从指定的offset开始消费)即调用startConsumersInReplicaSet
    Coordinator.doReassign——》startConsumersInReplicaSet(触发消费start consumer)——》startConsumersForReceiver——》HttpReceiverAdminClient.startConsumers(http receiver)——》AdminController.startConsumers——》StreamingServer.startConsumer(String cubeName, ConsumerStartProtocol startProtocol) startProtocol来源于第一步中返回的各receiver关于此topic各partition的最新消费信息——》Coordinator.createNewConsumer ——》KafkaSource.createStreamingConnector(创建连接kafka的coonector设定topic、partition、offset集合等)——》createNewConsumer——》Coordinator.createNewConsumer——》new StreamingConsumerChannel——》StreamingConsumerChannel.start()——》kafkaConnect.open(为kafka的topic绑定本次消费的partition、以及offset)——》StreamingConsumerChannel.run(启动topic消费)

2.2.2.3 第三步是理removedAssignments,将preAssignments的相应receiver下的本地segment由active 变更为immutable:

将只在preAssignments中不在newAssignments中的replica_set,需要将这些replica_set下的receiver中的数据变更为Immutable,并上传build等后续处理。

  • removedAssignments相应receiver下的本地segment由active 变更为immutable:
    Coordinator.doReassign——》makeCubeImmutableInReplicaSet——》makeCubeImmutableForReceiver——》HttpReceiverAdminClient.makeCubeImmutable——》AdminController.immuteCube(http receiver)——》StreamingServer.makeCubeImmutable (1、停用此cube消费;2、从内存cubeConsumerMap中移除此cube)——》StreamingSegmentManager.makeAllSegmentsImmutable(状态变更,将此cube在此receiver上的所有active segments为immutable)——》convertImmutable(1、从内存activeSegments中移除,2、此segment的状态变为immutable【持久化此segment到磁盘、修改内存状态为immutable、修改磁盘此segment状态为immutable】;3、此segment放入内存immutableSegments;)

Kylin RT OLAP reassign流程即重新分配replica_set 流程相关推荐

  1. EC-PCA: 利润中心分配分摊流程-3KE5 / 4KE5

    文章目录 一.概述 二.分配-4KE1/4KE2/4KE3/4KE5 2.1 维护分配循环和段-4KE1/4KE2/4KE3 2.2 执行分配-4KE5 三.分摊-3KE1/3KE2/3KE3/3KE ...

  2. blf文件用什么软件打开_如何用皕杰流程创建一个blf演示流程文件?

    1. 打开BIOS Studio.exe报表流程设计器,新建项目,命名为演示流程项目: 2. 在项目下新建目录,命名为演示流程: 3. 在目录下新建工作流程bfl文件,流程名称leaveFlow,显示 ...

  3. android uefi 编译报错,【Android SDM660开机流程】- UEFI XBL 代码流程分析

    [Android SDM660开机流程]- UEFI XBL 代码流程分析 一.UEFI XBL 1.1 boot_images代码目录 1.2 UEFI代码运行流程 1.3 SEC (安全验证) 1 ...

  4. JAVAWEB开发之工作流详解(二)——Activiti核心API的使用(流程定义和流程实例的管理、流程变量、监听器...)以及与Spring的集成

    管理流程定义 设计流程定义文档 bpmn文件 设置方式可以直接使用插件图形化界面进行设置 为某任务节点指定任务执行者 保存后的BPMN文件可以使用XML编辑器打开 BPMN 2.0根节点是defini ...

  5. 学编程前博主是做测试的,当初在测试部作为一个小官还写了不少流程呢,今天突然翻到来跟大家分享一下测试流程(之测试内部流程)

    ··· 这个测试内部流程写于2010年6月,那个时候刚从大公司出来进了一个小公司.待惯大公司的人再去小公司真的不习惯,大公司分工分明,流程清晰:小公司就不一样了,什么都不明确,逮着什么干什么,逮着着谁 ...

  6. lte接口流程图_一种LTE系统内部X2接口切换流程和Uu接口信令流程的关联方法

    一种LTE系统内部X2接口切换流程和Uu接口信令流程的关联方法 [技术领域] [0001 ] 本发明涉及通信技术领域,尤其涉及一种LTE系统内部X2接口切换流程和Uu接口信令流程的关联方法. [背景技 ...

  7. 话里话外:浅淡对流程管理的认识及流程管理对企业价值

    1.经过两个月的项目,对流程的认识发生了哪些变化? 经过两个月的项目,对流程的认识发生深刻变化,从开始的对现状描述,对现状的满足,到在绘制流程图时发现的一系列问题,分析总结出企业的现状是:     ( ...

  8. 流程设计建模方法:流程的需求梳理之流程级别梳理

    一般而言,对于一个业务系统的开发过程,可以划分成:需求.设计.开发.测试.集成.部署等阶段.在需求阶段形成<需求规格说明书>之后,设计阶段需要对需求进行设计建模.业务流程需求是业务人员从业 ...

  9. 流程设计建模方法:流程的需求梳理之活动属性梳理

    业务需求建模是否合理,直接影响到流程的技术实现,在流程设计过程中首先需要对业务需求从技术实现的角度重新进行梳理.下图是业务流程需求梳理过程的示意图: 本文介绍活动属性梳理方法 活动属性梳理包括:人工活 ...

最新文章

  1. 响应式编程笔记(二):代码编写
  2. mysql 的connect 设置 无法点next_技术分享 | MySQL 使用 MariaDB 审计插件
  3. cent 8.0 安装tomcat 9.0_Linux服务器:安装tomcat并部署war应用
  4. 洛谷2015 二叉苹果树 树形DP
  5. boost::type_erasure::tuple相关的测试程序
  6. 验证二叉搜索数—leetcode98
  7. npm打包前端项目太慢问题分析以及暂时解决方案
  8. 虚拟机Ubuntu开机后提示:无法应用原保存的显示器配置(屏幕显示问题)
  9. IDE 插件新版本发布,总有一个功能帮到你——开发部署提速 8 倍
  10. egg直接取req_Egg服务器基础功能
  11. 月入1万,在北上广深能过什么样的生活?
  12. 永擎服务器主板稳定性,主板看不停 Computex2015华擎展台一览
  13. JAVAWeb项目 微型商城项目-------(七)后台添加用户管理和商品类型管理操作
  14. Java项目导出为jar包+导出第三方jar包+使用命令行调用+传参
  15. [转]Unity开发之-Unity入门简介(近万字攻略)
  16. Seat分布式事务学习
  17. 计算机无法识别佳能5d2,佳能相机连接后电脑显示无法识别
  18. 远程数据库对象 Mmzrmo4Delphidelphi盒子
  19. 面试题:看数字找规律
  20. [免费专栏] 车联网基础理论之车联网安全车端知识科普

热门文章

  1. [转] ReactNative Animated动画详解
  2. 【我的世界】自定义局域网服务器-LanServerPropertie-1.17.x-自定义端口+关正版验证
  3. web前端入门到实战:CSS mix-blend-mode滤色screen混合模式
  4. 浅尝springboot
  5. Centos下搭建个人网站
  6. jzoj. 3518. 【NOIP2013模拟11.6A组】进化序列(evolve)
  7. A股明日风口:央行工作会议要求推进法定数字货币研发
  8. 煦涵说Webpack-IE低版本兼容指南
  9. 路由器NAT 类型检测实现
  10. 未明学院学员报告:“民以食为天”?看了这份天猫超市数据分析报告你就知道了!