Kafka核心源码解析 - KafkaApis源码解析
KafkaApis负责处理所有请求,转发到相应到方法
1.KafkaApis初始化
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,fetchManager, brokerTopicStats, clusterId, time, tokenManager)
2.KafkaApis处理转发请求
客户端与broker交互、broker之间的所有交互请求全部都会经过此方法进行相应的转发处理。
/*** Top-level method that handles all requests and multiplexes to the right api*/def handle(request: RequestChannel.Request) {try {trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")request.header.apiKey match {case ApiKeys.PRODUCE => handleProduceRequest(request)//数据生产请求case ApiKeys.FETCH => handleFetchRequest(request)//1)follower从leader抓取数据请求;2)消费者从broker消费数据请求;case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)case ApiKeys.METADATA => handleTopicMetadataRequest(request)case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)//处理topic的partition的leader和isr变更请求;case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)//处理停止副本请求case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)//更新元数据请求case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)//处理controller的shutdown请求;case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)case ApiKeys.END_TXN => handleEndTxnRequest(request)case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)case ApiKeys.CREATE_ACLS => handleCreateAcls(request)case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)}} catch {case e: FatalExitError => throw ecase e: Throwable => handleError(request, e)} finally {request.apiLocalCompleteTimeNanos = time.nanoseconds}}
Kafka核心源码解析 - KafkaApis源码解析相关推荐
- Kafka 核心源码解读【一】--日志模块
文章目录 1 日志段:保存消息文件的对象是怎么实现的? 1.1 Kafka 日志结构概览 1.2 日志段代码解析 1.3 日志段类声明 1.4 append 方法 1.5 read 方法 1.6 re ...
- SpringKafka原理解析及源码学习-Spring生态(一)
Kafka 作为老牌消息中间件, 高吞吐是它的拿手好戏. Spring 的生态中也有官方提供的 spring-kafka.jar. 本文主要学习 Spring 官方提供的 spring-kafka.j ...
- Java定时任务(一) Timer及TimerTask的案例解析及源码分析
Java定时任务(一) Timer及TimerTask的案例解析及源码分析 一.概述: 定时任务这个概念在Java的学习以及项目的开发中并不陌生,应用场景也是多种多样.比如我们会注意到12306网站 ...
- php项目素材,PHP素材资源解析平台源码V8.0(thinkPHP框架内核)
PHP素材资源解析平台源码V8.0 第三方平台下载千图网千库网等素材网站下载站 V8版本.十月一日最 新更新,全新的解析架构. 小白问题一:为什么我不能解析? 答:解析是需要开通目标站VIP的. 小白 ...
- Go实现的5G核心网开源项目free5gc源码分析系列 | Gopher Daily (2021.01.08) ʕ◔ϖ◔ʔ
每日一谚:"Abstractions should be discovered, not created." Go技术新闻 Go实现的5G核心网开源项目free5gc源码分析系列 ...
- graylog+kafka+zookeeper(单机测试及源码),graylog设置URL报警方式(五)
graylog+kafka+zookeeper(单机测试及源码),graylog设置URL报警方式(五) 问题背景 graylog+kafka+zookeeper(单机测试及源码),graylog组件 ...
- graylog+kafka+zookeeper(单机测试及源码),kafka+zookeeper组件部署(二)
graylog+kafka+zookeeper(单机测试及源码),kafka+zookeeper组件部署(二) 问题背景 graylog+kafka+zookeeper(单机测试及源码),graylo ...
- graylog+kafka+zookeeper(单机测试及源码),微服务日志查询使用(七)
graylog+kafka+zookeeper(单机测试及源码),微服务日志查询使用(七) 问题背景 graylog+kafka+zookeeper(单机测试及源码),graylog组件部署,查找问题 ...
- graylog+kafka+zookeeper(单机测试及源码),graylog收集kafka(脚本创建发布订阅方式)存储的消息(四)
graylog+kafka+zookeeper(单机测试及源码),graylog收集kafka(脚本创建发布订阅方式)存储的消息(四) 问题背景 graylog+kafka+zookeeper(单机测 ...
最新文章
- 神经科学如何影响人工智能?看DeepMind在NeurIPS2020最新《神经科学人工智能》报告,126页ppt...
- Python Numpy包安装
- python json字符串类型的value换行方案
- EVERTEC是如何利用大型机帮客户省钱?
- 【信号处理】信号与系统 电子课本(郑君里)
- 做好固定资产管理,提升行政的工作绩效
- 疲劳检测(Fatigue Detection Algorithm)
- 五大免费采集器哪个好,火车头,海纳,ET,三人行,狂人采集
- pip‘s dependency resolver does not currently take into account 解决办法
- java提升的计划书
- 【英语语法】句子类型
- 前端知识解构脑图(一张)和工具
- 面试题精选:循环队列
- 算法:数据流中的中位数
- KVS(AWS Kinesis Video Stream)HLS IOS端播放声音停顿问题解决办法
- 凹凸世界服务器维护到几点,凹凸世界手游2021年7月21日版本更新停服维护公告_凹凸世界手游2021年7月21日更新了什么_玩游戏网...
- Ubuntu 14.04 安装 USB无线网卡驱动
- thirteen——sed
- 持续集成/持续部署(1)Git Gitlab
- Windows Server 2019 安装oracle11g
热门文章
- jQuery comet
- 这是我用Microsoft Word 2010 直接发布的测试用博客
- tomcat4 请求的处理——初步分析
- [网摘]动态SQL中set与select对变量赋值的影响
- [Axios] axios的请求合并以及参数配置
- 夯实基础,彻底掌握js的核心技术(一)
- 初识【jQuery】,入门必看!
- 机器学习中的数学(二)--梯度下降法
- OpenCV3学习(6.2)——霍夫(Hough)变换:霍夫线变换HoughLine,霍夫圆变换HoughCircles
- 处理veh调试器检测_越狱检测抖音逻辑???