Apache Kafka源码剖析:第5篇 业务API处理
2019独角兽企业重金招聘Python工程师标准>>>
之前说过了,请求到达业务线程池后,会被处理,但是如何被处理呢?这就是接下来要说的。
-----------------------------------------------------------------------------------------------
业务线程属于 Kafka的API层,对请求的处理通过调用KafkaAPIs中的方法实现!
1 KafkaRequestHandler
首先我们得知道这个业务线程池是怎么创建的
回到KafkaServer.scala
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,config.numIoThreads)
class KafkaRequestHandlerPool(val brokerId: Int,val requestChannel: RequestChannel,val apis: KafkaApis,time: Time,numThreads: Int) extends Logging with KafkaMetricsGroup {/* a meter to track the average free capacity of the request handlers */private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "val runnables = new Array[KafkaRequestHandler](numThreads)for(i <- 0 until numThreads) {//创建这么多个runnable,放到线程里执行runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)Utils.daemonThread("kafka-request-handler-" + i, runnables(i)).start()}
取出请求执行的代码
def run() {while (true) {try {var req : RequestChannel.Request = nullwhile (req == null) {// We use a single meter for aggregate idle percentage for the thread pool.// Since meter is calculated as total_recorded_value / time_window and// time_window is independent of the number of threads, each recorded idle// time should be discounted by # threads.val startSelectTime = time.nanosecondsreq = requestChannel.receiveRequest(300)val endTime = time.nanosecondsif (req != null)req.requestDequeueTimeNanos = endTimeval idleTime = endTime - startSelectTimeaggregateIdleMeter.mark(idleTime / totalHandlerThreads)}if (req eq RequestChannel.AllDone) {debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))latch.countDown()return}trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))apis.handle(req)} catch {case e: FatalExitError =>latch.countDown()Exit.exit(e.statusCode)case e: Throwable => error("Exception when handling request", e)}}}
这样就不难理解了吧,
可见,API层使用kafkaRequestHandlerPool来管理所有的KafkaRequestHandler线程,它是1个简易版的线程池,其中创建了多个KafkaRequestHandler线程。
KafkaApis
是Kafka服务器处理请求的入口类,负责将KafkaRequestHandler.Request分发到不同的handle*()方法里执行,见图:
因为函数太多,这里就不展开,后面碰到的时候再详细展开!
转载于:https://my.oschina.net/qiangzigege/blog/1507362
Apache Kafka源码剖析:第5篇 业务API处理相关推荐
- Apache Kafka源码剖析:第1篇 网络引擎漫谈(类比法)
2019独角兽企业重金招聘Python工程师标准>>> 从这一篇开始,我们来研究kafka的网络引擎的源码. 可能很多读者有疑问,说好的Kafka讲解,怎么变成Thrift了? 答案 ...
- Kafka源码剖析 —— 网络I/O篇 —— 浅析KafkaSelector
为什么80%的码农都做不了架构师?>>> ##NioSelector和KafkaSelector有什么区别? 先说结论,KafkaSelector(org.apache.kaf ...
- 4.2.10 Kafka源码剖析, 阅读环境搭建, broker启动流程, topic创建流程, Producer生产者流程, Consumer消费者流程,
目录 4.1 Kafka源码剖析之源码阅读环境搭建 4.1.1 安装配置Gradle 4.1.2 Scala的安装和配置 4.1.3 Idea配置 4.1.4 源码操作 4.2 Kafka源码剖析之B ...
- 源码系列第1弹 | 带你快速攻略Kafka源码之旅入门篇
大家过年好,我是 华仔, 又跟大家见面了. 从今天开始我将为大家奉上 Kafka 源码剖析系列文章,正式开启 「Kafka的源码之旅」,跟我一起来掌握 Kafka 源码核心架构设计思想吧. 今天这篇我 ...
- Mongoose源码剖析:外篇之web服务器
引言 在深入Mongoose源码剖析之前,我们应该清楚web服务器是什么?它提供什么服务?怎样提供服务?使用什么协议?客户端如何唯一标识web服务器的资源?下面我们抛开Mongoose,来介绍一个we ...
- apache kafka源码分析-Producer分析---转载
原文地址:http://www.aboutyun.com/thread-9938-1-1.html 问题导读 1.Kafka提供了Producer类作为java producer的api,此类有几种发 ...
- Apache Kafka源码分析 – Log Management
LogManager LogManager会管理broker上所有的logs(在一个log目录下),一个topic的一个partition对应于一个log(一个log子目录) 首先loadLogs会加 ...
- Mongoose源码剖析:Introduction and Installation
引言 要剖析Mongoose的源码,首先你得知道它的一些基本情况和特性.并去使用它.本文就是介绍Mongoose是个什么东西?及如何安装和使用?这里假设你知道什么web服务器软件.web服务器使用什么 ...
- 一个完整的python项目源码-一个Python开源项目-腾讯哈勃沙箱源码剖析(上)
前言 2019年来了,2020年还会远吗? 请把下一年的年终奖发一下,谢谢... 回顾逝去的2018年,最大的改变是从一名学生变成了一位工作者,不敢说自己多么的职业化,但是正在努力往那个方向走. 以前 ...
最新文章
- L1,L2正则化分析
- 【论文解读】AAAI21最佳论文Informer:效果远超Transformer的长序列预测神器!
- sendData to ABAP backend via multiple form content type
- 联想拯救者y7000加内存条_关于2020款联想拯救者Y7000、R7000和Y7000P,r7000p选哪个好?看这里就对了...
- java中static、final、static final浅析
- python位运算符_Python位运算符
- 前端面试js-手写事件委托(一点小改进)
- 开放集合目标检测任务 Open-set Detection
- 分布式框架的基石之远程通信协议
- 【机器人学:运动规划】快速搜索随机树(RRT---Rapidly-exploring Random Trees)入门及在Matlab中演示
- linux系统下已分好区的磁盘如何删除,linux下如何删除磁盘分区
- 怎么给PDF插入一个文本框写注释?PDF添加注释文本框教程
- udacity-谷歌自动驾驶-课程笔记-localization
- CKA考试指南和攻略
- 简易特效制作ByUnityParticleSystem
- 流程图、数据关系绘图神器yEd
- hexo+yilia添加背景音乐
- 胡润研究院发布《2018胡润区块链富豪榜》
- python如何裁剪图像
- broker-list与bootstrap-server辨析
热门文章
- php todo和其他备注,Vim中列出TODO与FIXME等备注的方法
- python 旋转图像
- sob攻略超详细攻略_重庆旅游攻略~超详细攻略大全!必看篇!!
- 动手实现Kotlin协程同步切换线程,以及Kotlin协程是如何实现线程切换的
- matlab潮流程序,IEEE33节点matlab潮流程序.doc
- 小程序之 保存canvas生成商品图片附加小程序二维码 分享到朋友圈
- python实现冒泡排序算法的非递归版本_python排序算法速度比较:快速排序,归并排序,冒泡排序...
- 手机搭载云服务器_华为云发布“鲲鹏云手机”:原生安卓系统、鲲鹏920加持
- mysql创建行_如何在MySQL中创建行生成器?
- 百度翻译API的使用