前言

Kafka 我个人感觉是性能优化的典范。而且使用Scala开发,代码写的也很漂亮的。重点我觉得有四个:
  • NIO
  • Zero Copy
  • 磁盘顺序读写
  • Queue数据结构的极致使用
Zero-Copy 实际的原理,大家还是去Google下。这篇文章重点会分析这项技术是怎么被嵌入到Kafa里的。包含两部分:
  • Kafka在什么场景下用了这个技术
  • Zero-Copy 是如何被调用,并且发挥作用的。

Kafka在什么场景下使用该技术

答案是:
消息消费的时候 
包括外部Consumer以及Follower 从partiton Leader同步数据,都是如此。简单描述就是:
Consumer从Broker获取文件数据的时候,直接通过下面的方法进行channel到channel的数据传输。
java.nio.FileChannel.transferTo(
long position,
long count,
WritableByteChannel target)`

也就是说你的数据源是一个Channel,数据接收端也是一个Channel(SocketChannel),则通过该方式进行数据传输,是直接在内核态进行的,避免拷贝数据导致的内核态和用户态的多次切换。

Kafka 如何使用Zero-Copy流程分析

估计看完这段内容,你对整个Kafka的数据处理流程也差不多了解了个大概。为了避免过于繁杂,以至于将整个Kafka的体系都拖进来,我们起始点从KafkaApis相关的类开始。

数据的生成

对应的类名称为:
kaka.server.KafkaApis

该类是负责真正的Kafka业务逻辑处理的。在此之前的,譬如 SocketServer等类似Tomcat服务器一样,侧重于交互,属于框架层次的东西。KafkaApis 则类似于部署在Tomcat里的应用。
def handle(request: RequestChannel.Request) {ApiKeys.forId(request.requestId) match {case ApiKeys.PRODUCE => handleProducerRequest(request)case ApiKeys.FETCH => handleFetchRequest(request).....

handle 方法是所有处理的入口,然后根据请求的不同,有不同的处理逻辑。这里我们关注ApiKeys.FETCH这块,也就是有消费者要获取数据的逻辑。进入 handleFetchRequest方法,你会看到最后一行代码如下:
replicaManager.fetchMessages(  fetchRequest.maxWait.toLong, fetchRequest.replicaId, fetchRequest.minBytes,  authorizedRequestInfo,  sendResponseCallback)

ReplicaManager 包含所有主题的所有partition消息。大部分针对Partition的操作都是通过该类来完成的。
replicaManager.fetchMessages 这个方法非常的长。我们只关注一句代码:
val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchInfo)

该方法获取本地日志信息数据。内部会调用kafka.cluster.Log对象的read方法:
log.read(offset, fetchSize, maxOffsetOpt)

Log 对象是啥呢?其实就是对应的一个Topic的Partition. 一个Partition是由很多端(Segment)组成的,这和Lucene非常相似。一个Segment就是一个文件。实际的数据自然是从这里读到的。代码如下:
val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, maxPosition)

这里的fetchInfo(FetchDataInfo)对象包含两个字段:
offsetMetadata
FileMessageSet 

FileMessageSet 其实就是用户在这个Partition这一次消费能够拿到的数据集合。当然,真实的数据还躺在byteBuffer里,并没有记在到内存中。FileMessageSet 里面包含了一个很重要的方法:
def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {......val bytesTransferred = (destChannel match {case tl: TransportLayer => tl.transferFrom(channel, position, count)case dc => channel.transferTo(position, count, dc)}).toIntbytesTransferred}

这里我们看到了久违的transferFrom方法。那么这个方法什么时候被调用呢?我们先搁置下,因为那个是另外一个流程。我们继续分析上面的代码。也就是接着从这段代码开始分析:
val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchInfo)

获取到这个信息后,会执行如下操作:
val fetchPartitionData = logReadResults.mapValues(result =>  FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))
responseCallback(fetchPartitionData)

logReadResults 的信息被包装成FetchResponsePartitionData, FetchResponsePartitionData 包喊了我们的FileMessageSet 对象。还记得么,这个对象包含了我们要跟踪的tranferTo方法。然后FetchResponsePartitionData 会给responseCallback作为参数进行回调。
responseCallback 的函数签名如下(我去掉了一些我们不关心的信息):
def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {val mergedResponseStatus = responsePartitionData ++ unauthorizedResponseStatusdef fetchResponseCallback(delayTimeMs: Int) {val response = FetchResponse(fetchRequest.correlationId, mergedResponseStatus, fetchRequest.versionId, delayTimeMs)requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))}}

我们重点关注这个回调方法里的fetchResponseCallback。 我们会发现这里 FetchResponsePartitionData 会被封装成一个FetchResponseSend ,然后由requestChannel发送出去。
因为Kafka完全应用是NIO的异步机制,所以到这里,我们无法再跟进去了,需要从另外一部分开始分析。

数据的发送

前面只是涉及到数据的获取。读取日志,并且获得对应MessageSet对象。MessageSet 是一段数据的集合,但是该数据没有真实的被加载。这里会涉及到Kafka 如何将数据发送回Consumer端。
在SocketServer,也就是负责和所有的消费者打交道,建立连接的中枢里,会不断的进行poll操作
override def run() {startupComplete()while(isRunning) {try {// setup any new connections that have been queued upconfigureNewConnections()// register any new responses for writingprocessNewResponses()

首先会注册新的连接,如果有的话。接着就是处理新的响应了。还记得刚刚上面我们通过requestChannel把FetchResponseSend发出来吧。
private def processNewResponses() {var curr = requestChannel.receiveResponse(id)while(curr != null) {try {curr.responseAction match {         case RequestChannel.SendAction =>selector.send(curr.responseSend)inflightResponses += (curr.request.connectionId -> curr)}} finally {curr = requestChannel.receiveResponse(id)}}}

这里类似的,processNewResponses方法会先通过send方法把FetchResponseSend注册到selector上。 这个操作其实做的事情如下:
//SocketServer.scala
public void send(Send send) {KafkaChannel channel = channelOrFail(send.destination());channel.setSend(send);}//KafkaChannel.scalapublic void setSend(Send send) {this.send = send;          this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);     }

为了方便看代码,我对代码做了改写。我们看到,其实send就是做了一个WRITE时间注册。这个是和NIO机制相关的。如果大家看的有障碍,不妨先学习下相关的机制。
回到 SocketServer 的run方法里,也就是上面已经贴过的代码:
  override def run() {startupComplete()while(isRunning) {try {// setup any new connections that have been queued upconfigureNewConnections()// register any new responses for writingprocessNewResponses()try {selector.poll(300)} catch {case...}

SocketServer 会poll队列,一旦对应的KafkaChannel 写操作ready了,就会调用KafkaChannel的write方法:
//KafkaChannel.scala
public Send write() throws IOException {if (send != null && send(send)) }
//
//KafkaChannel.scala
private boolean send(Send send) throws IOException {send.writeTo(transportLayer);if (send.completed())transportLayer.removeInterestOps(SelectionKey.OP_WRITE);return send.completed();}

依然的,为了减少代码,我做了些调整,其中write会调用 send方法,对应的Send对象其实就是上面我们注册的FetchResponseSend 对象。
这段代码里真实发送数据的代码是send.writeTo(transportLayer);,
对应的writeTo方法为:
private val sends = new MultiSend(dest, JavaConversions.seqAsJavaList(fetchResponse.dataGroupedByTopic.toList.map {case(topic, data) => new TopicDataSend(dest, TopicData(topic,data.map{case(topicAndPartition, message) => (topicAndPartition.partition, message)}))}))
override def writeTo(channel: GatheringByteChannel): Long = {.....    written += sends.writeTo(channel)....}

这里我依然做了代码简化,只让我们关注核心的。 这里最后是调用了sends的writeTo方法,而sends 其实是个MultiSend。这个MultiSend 里有两个东西:
  • topicAndPartition.partition: 分区
  • message:FetchResponsePartitionData
还记得这个FetchResponsePartitionData  么?我们的MessageSet 就被放在了FetchResponsePartitionData这个对象里。
TopicDataSend 也包含了sends,该sends 包含了 PartitionDataSend,而 PartitionDataSend则包含了FetchResponsePartitionData。
最后进行writeTo的时候,其实是调用了
//partitionData 就是 FetchResponsePartitionData
//messages 其实就是FileMessageSet
val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize)

如果你还记得的话,FileMessageSet 也有个writeTo方法,就是我们之前已经提到过的那段代码:
def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {......val bytesTransferred = (destChannel match {case tl: TransportLayer => tl.transferFrom(channel, position, count)case dc => channel.transferTo(position, count, dc)}).toIntbytesTransferred}

终于走到最底层了,最后其实是通过tl.transferFrom(channel, position, count) 来完成最后的数据发送的。这里你可能比较好奇,不应该是调用transferTo 方法么? transferFrom其实是Kafka自己封装的一个方法,最终里面调用的也是transerTo:
  @Overridepublic long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {return fileChannel.transferTo(position, count, socketChannel);}

总结

Kafka的整个调用栈还是非常绕的。尤其是引入了NIO的事件机制,有点类似Shuffle,把流程调用给切断了,无法简单通过代码引用来进行跟踪。Kafka还有一个非常优秀的机制就是DelayQueue机制,我们在分析的过程中,为了方便,把这块完全给抹掉了。

Kafka Zero-Copy 使用分析相关推荐

  1. 跟我学Kafka源码Producer分析

    2019独角兽企业重金招聘Python工程师标准>>> 跟我学Kafka源码Producer分析 博客分类: MQ 本章主要讲解分析Kafka的Producer的业务逻辑,分发逻辑和 ...

  2. 【kafka】Kafka 之 Group 状态变化分析及 Rebalance 过程

    文章目录 1.概述 2. Group 状态机 3.offset 那些事 4.Topic __consumer_offsets 5.GroupCoordinator 6.状态转移图 7.Consumer ...

  3. elk + kafka 简单搭建日志分析系统

    elk + kafka 简单搭建日志分析系统 文介绍使用ELK(elasticsearch.logstash.kibana) + kafka来搭建一个日志系统. 想象一下淘宝,它可以对不同的用户实现推 ...

  4. ELK+Kafka集群日志分析系统

    因为是自己本地写好的word文档复制进来的.格式有些出入还望体谅.如有错误请回复.谢谢! 一. 系统介绍 2 二. 版本说明 3 三. 服务部署 3 1) JDK部署 3 2) Elasticsear ...

  5. kafka原理和性能分析测试

    1.Kafka写数据流程: producer先从zookeeper的broker-list的节点找到partition(分区)的leader: producer将消息发送给该leader的partit ...

  6. Kafka消息积压案例分析

    案例 一个微服务同一个分组消费同一个topic的kafka消息,不通业务通过key值区分,由于其中一个业务消息量大,偶尔会出现消费滞后的情况,导致当前微服务消费组出现大量消息积压情况,影响业务. 简单 ...

  7. kafka性能测试、性能分析与性能调优

    前言:最近在做kafka.mq.redis.fink.kudu等在中间件性能压测,压测kafka的时候参考了这篇文章,大家可以借鉴下! 一.测试环境 测试使用到三台机器,机器配置如下: 共同配置: I ...

  8. presto + kafka + logstash 实时监控分析nginx日志

    文章目录 前言 一.方案选取 二.各项配置 1.logstash配置 2.presto 增加kafka connector 3. 分析监控 总结 前言 目前线上环境nginx日志一天10亿左右,日志已 ...

  9. std::copy性能分析与memmove机器级实现

    复制数据的快速方法std::copy C++复制数据各种方法大家都会,很多时候我们都会用到std::copy这个STL函数,这个效率确实很不错,比我们一个一个元素复制或者用迭代器复制都来的要快很多. ...

  10. Python调用kafka构建完整实例分析与应用!

    近期遇到一个需求就是我们需要把当前比较耗费资源的接口开发成异步通讯的机制,简单来说就是有一个消息队列来不停地进行消息的集中分发与任务处理,这里应用端给出的方案是使用kafka来做,但是这个我在之前没有 ...

最新文章

  1. python学习笔记 day44 数据库三范式
  2. JS 保持数组长度为3位并且值不重复
  3. SSH开发环境整合搭建
  4. nginx 内核优化参数
  5. OpenCV图像金字塔:高斯金字塔、拉普拉斯金字塔总结
  6. abap中的弹出窗口函数
  7. 事务超时时间无效_阿里分布式事务组件 fescar/seata 对 XA 2PC 的改进及其设计思想...
  8. linux下的二进制文件的编辑和查看
  9. java获取表主外键_通过 jdbc 分析数据库中的表结构和主键外键
  10. ios 旋转屏幕试图切换_iOS增强现实应用(AR)设计指南(上)
  11. Hemberg-lab单细胞转录组数据分析(六)
  12. mysql如何保证高可用_mysql怎么保证高可用
  13. 小米卢伟冰回应“低价误国”言论:华为是中国企业“低价”战略的榜样
  14. Spring boot 2.4开启静态资源缓存
  15. JavaScript—获取参数(23)
  16. 拓端tecdat|matlab对MCMC贝叶斯方法用于加筋复合板的冲击载荷识别
  17. Redis - 听说 Dragonfly 宣称比我快 25 倍
  18. Java编写一个公司员工类
  19. 安装ubuntu20.4桌面系统
  20. 基于肌肉骨骼模型的预测仿真

热门文章

  1. MYSQL数据库备份还原,并还原到最新状态(mysqldump,xtrabackup)
  2. 开源图像编辑器 GIMP 开发者透露 2019 年计划
  3. git恢复到上次提交
  4. Android Stuido启动提示No JVM installation found.Please install a 32-bit JDK....
  5. 看一家公司发展得如何就看这家公司的财务部门
  6. 手把手部署Linux下磁盘配额(quota)应用与实战
  7. redis windows
  8. 从茶叶蛋到互联网思维
  9. 把blogengine当作cms作公司网站
  10. Virtual PC 使用指南