KafkaApis负责处理所有请求,转发到相应到方法

1.KafkaApis初始化

/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,fetchManager, brokerTopicStats, clusterId, time, tokenManager)

2.KafkaApis处理转发请求

客户端与broker交互、broker之间的所有交互请求全部都会经过此方法进行相应的转发处理。

/*** Top-level method that handles all requests and multiplexes to the right api*/def handle(request: RequestChannel.Request) {try {trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")request.header.apiKey match {case ApiKeys.PRODUCE => handleProduceRequest(request)//数据生产请求case ApiKeys.FETCH => handleFetchRequest(request)//1)follower从leader抓取数据请求;2)消费者从broker消费数据请求;case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)case ApiKeys.METADATA => handleTopicMetadataRequest(request)case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)//处理topic的partition的leader和isr变更请求;case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)//处理停止副本请求case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)//更新元数据请求case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)//处理controller的shutdown请求;case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)case ApiKeys.END_TXN => handleEndTxnRequest(request)case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)case ApiKeys.CREATE_ACLS => handleCreateAcls(request)case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)}} catch {case e: FatalExitError => throw ecase e: Throwable => handleError(request, e)} finally {request.apiLocalCompleteTimeNanos = time.nanoseconds}}

Kafka核心源码解析 - KafkaApis源码解析相关推荐

  1. Kafka 核心源码解读【一】--日志模块

    文章目录 1 日志段:保存消息文件的对象是怎么实现的? 1.1 Kafka 日志结构概览 1.2 日志段代码解析 1.3 日志段类声明 1.4 append 方法 1.5 read 方法 1.6 re ...

  2. SpringKafka原理解析及源码学习-Spring生态(一)

    Kafka 作为老牌消息中间件, 高吞吐是它的拿手好戏. Spring 的生态中也有官方提供的 spring-kafka.jar. 本文主要学习 Spring 官方提供的 spring-kafka.j ...

  3. Java定时任务(一) Timer及TimerTask的案例解析及源码分析

    Java定时任务(一)  Timer及TimerTask的案例解析及源码分析 一.概述: 定时任务这个概念在Java的学习以及项目的开发中并不陌生,应用场景也是多种多样.比如我们会注意到12306网站 ...

  4. php项目素材,PHP素材资源解析平台源码V8.0(thinkPHP框架内核)

    PHP素材资源解析平台源码V8.0 第三方平台下载千图网千库网等素材网站下载站 V8版本.十月一日最 新更新,全新的解析架构. 小白问题一:为什么我不能解析? 答:解析是需要开通目标站VIP的. 小白 ...

  5. Go实现的5G核心网开源项目free5gc源码分析系列 | Gopher Daily (2021.01.08) ʕ◔ϖ◔ʔ

    每日一谚:"Abstractions should be discovered, not created." Go技术新闻 Go实现的5G核心网开源项目free5gc源码分析系列 ...

  6. graylog+kafka+zookeeper(单机测试及源码),graylog设置URL报警方式(五)

    graylog+kafka+zookeeper(单机测试及源码),graylog设置URL报警方式(五) 问题背景 graylog+kafka+zookeeper(单机测试及源码),graylog组件 ...

  7. graylog+kafka+zookeeper(单机测试及源码),kafka+zookeeper组件部署(二)

    graylog+kafka+zookeeper(单机测试及源码),kafka+zookeeper组件部署(二) 问题背景 graylog+kafka+zookeeper(单机测试及源码),graylo ...

  8. graylog+kafka+zookeeper(单机测试及源码),微服务日志查询使用(七)

    graylog+kafka+zookeeper(单机测试及源码),微服务日志查询使用(七) 问题背景 graylog+kafka+zookeeper(单机测试及源码),graylog组件部署,查找问题 ...

  9. graylog+kafka+zookeeper(单机测试及源码),graylog收集kafka(脚本创建发布订阅方式)存储的消息(四)

    graylog+kafka+zookeeper(单机测试及源码),graylog收集kafka(脚本创建发布订阅方式)存储的消息(四) 问题背景 graylog+kafka+zookeeper(单机测 ...

最新文章

  1. 神经科学如何影响人工智能?看DeepMind在NeurIPS2020最新《神经科学人工智能》报告,126页ppt...
  2. Python Numpy包安装
  3. python json字符串类型的value换行方案
  4. EVERTEC是如何利用大型机帮客户省钱?
  5. 【信号处理】信号与系统 电子课本(郑君里)
  6. 做好固定资产管理,提升行政的工作绩效
  7. 疲劳检测(Fatigue Detection Algorithm)
  8. 五大免费采集器哪个好,火车头,海纳,ET,三人行,狂人采集
  9. pip‘s dependency resolver does not currently take into account 解决办法
  10. java提升的计划书
  11. 【英语语法】句子类型
  12. 前端知识解构脑图(一张)和工具
  13. 面试题精选:循环队列
  14. 算法:数据流中的中位数
  15. KVS(AWS Kinesis Video Stream)HLS IOS端播放声音停顿问题解决办法
  16. 凹凸世界服务器维护到几点,凹凸世界手游2021年7月21日版本更新停服维护公告_凹凸世界手游2021年7月21日更新了什么_玩游戏网...
  17. Ubuntu 14.04 安装 USB无线网卡驱动
  18. thirteen——sed
  19. 持续集成/持续部署(1)Git Gitlab
  20. Windows Server 2019 安装oracle11g

热门文章

  1. jQuery comet
  2. 这是我用Microsoft Word 2010 直接发布的测试用博客
  3. tomcat4 请求的处理——初步分析
  4. [网摘]动态SQL中set与select对变量赋值的影响
  5. [Axios] axios的请求合并以及参数配置
  6. 夯实基础,彻底掌握js的核心技术(一)
  7. 初识【jQuery】,入门必看!
  8. 机器学习中的数学(二)--梯度下降法
  9. OpenCV3学习(6.2)——霍夫(Hough)变换:霍夫线变换HoughLine,霍夫圆变换HoughCircles
  10. 处理veh调试器检测_越狱检测抖音逻辑???