2019独角兽企业重金招聘Python工程师标准>>>

之前说过了,请求到达业务线程池后,会被处理,但是如何被处理呢?这就是接下来要说的。

-----------------------------------------------------------------------------------------------

业务线程属于 Kafka的API层,对请求的处理通过调用KafkaAPIs中的方法实现!

1 KafkaRequestHandler

首先我们得知道这个业务线程池是怎么创建的

回到KafkaServer.scala

        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,config.numIoThreads)
class KafkaRequestHandlerPool(val brokerId: Int,val requestChannel: RequestChannel,val apis: KafkaApis,time: Time,numThreads: Int) extends Logging with KafkaMetricsGroup {/* a meter to track the average free capacity of the request handlers */private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "val runnables = new Array[KafkaRequestHandler](numThreads)for(i <- 0 until numThreads) {//创建这么多个runnable,放到线程里执行runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)Utils.daemonThread("kafka-request-handler-" + i, runnables(i)).start()}

取出请求执行的代码

def run() {while (true) {try {var req : RequestChannel.Request = nullwhile (req == null) {// We use a single meter for aggregate idle percentage for the thread pool.// Since meter is calculated as total_recorded_value / time_window and// time_window is independent of the number of threads, each recorded idle// time should be discounted by # threads.val startSelectTime = time.nanosecondsreq = requestChannel.receiveRequest(300)val endTime = time.nanosecondsif (req != null)req.requestDequeueTimeNanos = endTimeval idleTime = endTime - startSelectTimeaggregateIdleMeter.mark(idleTime / totalHandlerThreads)}if (req eq RequestChannel.AllDone) {debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))latch.countDown()return}trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))apis.handle(req)} catch {case e: FatalExitError =>latch.countDown()Exit.exit(e.statusCode)case e: Throwable => error("Exception when handling request", e)}}}

这样就不难理解了吧,

可见,API层使用kafkaRequestHandlerPool来管理所有的KafkaRequestHandler线程,它是1个简易版的线程池,其中创建了多个KafkaRequestHandler线程。

KafkaApis

是Kafka服务器处理请求的入口类,负责将KafkaRequestHandler.Request分发到不同的handle*()方法里执行,见图:

因为函数太多,这里就不展开,后面碰到的时候再详细展开!

转载于:https://my.oschina.net/qiangzigege/blog/1507362

Apache Kafka源码剖析:第5篇 业务API处理相关推荐

  1. Apache Kafka源码剖析:第1篇 网络引擎漫谈(类比法)

    2019独角兽企业重金招聘Python工程师标准>>> 从这一篇开始,我们来研究kafka的网络引擎的源码. 可能很多读者有疑问,说好的Kafka讲解,怎么变成Thrift了? 答案 ...

  2. Kafka源码剖析 —— 网络I/O篇 —— 浅析KafkaSelector

    为什么80%的码农都做不了架构师?>>>    ##NioSelector和KafkaSelector有什么区别? 先说结论,KafkaSelector(org.apache.kaf ...

  3. 4.2.10 Kafka源码剖析, 阅读环境搭建, broker启动流程, topic创建流程, Producer生产者流程, Consumer消费者流程,

    目录 4.1 Kafka源码剖析之源码阅读环境搭建 4.1.1 安装配置Gradle 4.1.2 Scala的安装和配置 4.1.3 Idea配置 4.1.4 源码操作 4.2 Kafka源码剖析之B ...

  4. 源码系列第1弹 | 带你快速攻略Kafka源码之旅入门篇

    大家过年好,我是 华仔, 又跟大家见面了. 从今天开始我将为大家奉上 Kafka 源码剖析系列文章,正式开启 「Kafka的源码之旅」,跟我一起来掌握 Kafka 源码核心架构设计思想吧. 今天这篇我 ...

  5. Mongoose源码剖析:外篇之web服务器

    引言 在深入Mongoose源码剖析之前,我们应该清楚web服务器是什么?它提供什么服务?怎样提供服务?使用什么协议?客户端如何唯一标识web服务器的资源?下面我们抛开Mongoose,来介绍一个we ...

  6. apache kafka源码分析-Producer分析---转载

    原文地址:http://www.aboutyun.com/thread-9938-1-1.html 问题导读 1.Kafka提供了Producer类作为java producer的api,此类有几种发 ...

  7. Apache Kafka源码分析 – Log Management

    LogManager LogManager会管理broker上所有的logs(在一个log目录下),一个topic的一个partition对应于一个log(一个log子目录) 首先loadLogs会加 ...

  8. Mongoose源码剖析:Introduction and Installation

    引言 要剖析Mongoose的源码,首先你得知道它的一些基本情况和特性.并去使用它.本文就是介绍Mongoose是个什么东西?及如何安装和使用?这里假设你知道什么web服务器软件.web服务器使用什么 ...

  9. 一个完整的python项目源码-一个Python开源项目-腾讯哈勃沙箱源码剖析(上)

    前言 2019年来了,2020年还会远吗? 请把下一年的年终奖发一下,谢谢... 回顾逝去的2018年,最大的改变是从一名学生变成了一位工作者,不敢说自己多么的职业化,但是正在努力往那个方向走. 以前 ...

最新文章

  1. L1,L2正则化分析
  2. 【论文解读】AAAI21最佳论文Informer:效果远超Transformer的长序列预测神器!
  3. sendData to ABAP backend via multiple form content type
  4. 联想拯救者y7000加内存条_关于2020款联想拯救者Y7000、R7000和Y7000P,r7000p选哪个好?看这里就对了...
  5. java中static、final、static final浅析
  6. python位运算符_Python位运算符
  7. 前端面试js-手写事件委托(一点小改进)
  8. 开放集合目标检测任务 Open-set Detection
  9. 分布式框架的基石之远程通信协议
  10. 【机器人学:运动规划】快速搜索随机树(RRT---Rapidly-exploring Random Trees)入门及在Matlab中演示
  11. linux系统下已分好区的磁盘如何删除,linux下如何删除磁盘分区
  12. 怎么给PDF插入一个文本框写注释?PDF添加注释文本框教程
  13. udacity-谷歌自动驾驶-课程笔记-localization
  14. CKA考试指南和攻略
  15. 简易特效制作ByUnityParticleSystem
  16. 流程图、数据关系绘图神器yEd
  17. hexo+yilia添加背景音乐
  18. 胡润研究院发布《2018胡润区块链富豪榜》
  19. python如何裁剪图像
  20. broker-list与bootstrap-server辨析

热门文章

  1. php todo和其他备注,Vim中列出TODO与FIXME等备注的方法
  2. python 旋转图像
  3. sob攻略超详细攻略_重庆旅游攻略~超详细攻略大全!必看篇!!
  4. 动手实现Kotlin协程同步切换线程,以及Kotlin协程是如何实现线程切换的
  5. matlab潮流程序,IEEE33节点matlab潮流程序.doc
  6. 小程序之 保存canvas生成商品图片附加小程序二维码 分享到朋友圈
  7. python实现冒泡排序算法的非递归版本_python排序算法速度比较:快速排序,归并排序,冒泡排序...
  8. 手机搭载云服务器_华为云发布“鲲鹏云手机”:原生安卓系统、鲲鹏920加持
  9. mysql创建行_如何在MySQL中创建行生成器?
  10. 百度翻译API的使用