Kafka Zero-Copy 使用分析
前言
- NIO
- Zero Copy
- 磁盘顺序读写
- Queue数据结构的极致使用
- Kafka在什么场景下用了这个技术
- Zero-Copy 是如何被调用,并且发挥作用的。
Kafka在什么场景下使用该技术
java.nio.FileChannel.transferTo(
long position,
long count,
WritableByteChannel target)`
Kafka 如何使用Zero-Copy流程分析
数据的生成
kaka.server.KafkaApis
def handle(request: RequestChannel.Request) {ApiKeys.forId(request.requestId) match {case ApiKeys.PRODUCE => handleProducerRequest(request)case ApiKeys.FETCH => handleFetchRequest(request).....
replicaManager.fetchMessages( fetchRequest.maxWait.toLong, fetchRequest.replicaId, fetchRequest.minBytes, authorizedRequestInfo, sendResponseCallback)
replicaManager.fetchMessages
这个方法非常的长。我们只关注一句代码:
val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchInfo)
log.read(offset, fetchSize, maxOffsetOpt)
val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, maxPosition)
offsetMetadata
FileMessageSet
def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {......val bytesTransferred = (destChannel match {case tl: TransportLayer => tl.transferFrom(channel, position, count)case dc => channel.transferTo(position, count, dc)}).toIntbytesTransferred}
val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchInfo)
val fetchPartitionData = logReadResults.mapValues(result => FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))
responseCallback(fetchPartitionData)
def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {val mergedResponseStatus = responsePartitionData ++ unauthorizedResponseStatusdef fetchResponseCallback(delayTimeMs: Int) {val response = FetchResponse(fetchRequest.correlationId, mergedResponseStatus, fetchRequest.versionId, delayTimeMs)requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))}}
数据的发送
override def run() {startupComplete()while(isRunning) {try {// setup any new connections that have been queued upconfigureNewConnections()// register any new responses for writingprocessNewResponses()
private def processNewResponses() {var curr = requestChannel.receiveResponse(id)while(curr != null) {try {curr.responseAction match { case RequestChannel.SendAction =>selector.send(curr.responseSend)inflightResponses += (curr.request.connectionId -> curr)}} finally {curr = requestChannel.receiveResponse(id)}}}
//SocketServer.scala
public void send(Send send) {KafkaChannel channel = channelOrFail(send.destination());channel.setSend(send);}//KafkaChannel.scalapublic void setSend(Send send) {this.send = send; this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); }
override def run() {startupComplete()while(isRunning) {try {// setup any new connections that have been queued upconfigureNewConnections()// register any new responses for writingprocessNewResponses()try {selector.poll(300)} catch {case...}
//KafkaChannel.scala
public Send write() throws IOException {if (send != null && send(send)) }
//
//KafkaChannel.scala
private boolean send(Send send) throws IOException {send.writeTo(transportLayer);if (send.completed())transportLayer.removeInterestOps(SelectionKey.OP_WRITE);return send.completed();}
private val sends = new MultiSend(dest, JavaConversions.seqAsJavaList(fetchResponse.dataGroupedByTopic.toList.map {case(topic, data) => new TopicDataSend(dest, TopicData(topic,data.map{case(topicAndPartition, message) => (topicAndPartition.partition, message)}))}))
override def writeTo(channel: GatheringByteChannel): Long = {..... written += sends.writeTo(channel)....}
- topicAndPartition.partition: 分区
- message:FetchResponsePartitionData
//partitionData 就是 FetchResponsePartitionData
//messages 其实就是FileMessageSet
val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize)
def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {......val bytesTransferred = (destChannel match {case tl: TransportLayer => tl.transferFrom(channel, position, count)case dc => channel.transferTo(position, count, dc)}).toIntbytesTransferred}
@Overridepublic long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {return fileChannel.transferTo(position, count, socketChannel);}
总结
Kafka Zero-Copy 使用分析相关推荐
- 跟我学Kafka源码Producer分析
2019独角兽企业重金招聘Python工程师标准>>> 跟我学Kafka源码Producer分析 博客分类: MQ 本章主要讲解分析Kafka的Producer的业务逻辑,分发逻辑和 ...
- 【kafka】Kafka 之 Group 状态变化分析及 Rebalance 过程
文章目录 1.概述 2. Group 状态机 3.offset 那些事 4.Topic __consumer_offsets 5.GroupCoordinator 6.状态转移图 7.Consumer ...
- elk + kafka 简单搭建日志分析系统
elk + kafka 简单搭建日志分析系统 文介绍使用ELK(elasticsearch.logstash.kibana) + kafka来搭建一个日志系统. 想象一下淘宝,它可以对不同的用户实现推 ...
- ELK+Kafka集群日志分析系统
因为是自己本地写好的word文档复制进来的.格式有些出入还望体谅.如有错误请回复.谢谢! 一. 系统介绍 2 二. 版本说明 3 三. 服务部署 3 1) JDK部署 3 2) Elasticsear ...
- kafka原理和性能分析测试
1.Kafka写数据流程: producer先从zookeeper的broker-list的节点找到partition(分区)的leader: producer将消息发送给该leader的partit ...
- Kafka消息积压案例分析
案例 一个微服务同一个分组消费同一个topic的kafka消息,不通业务通过key值区分,由于其中一个业务消息量大,偶尔会出现消费滞后的情况,导致当前微服务消费组出现大量消息积压情况,影响业务. 简单 ...
- kafka性能测试、性能分析与性能调优
前言:最近在做kafka.mq.redis.fink.kudu等在中间件性能压测,压测kafka的时候参考了这篇文章,大家可以借鉴下! 一.测试环境 测试使用到三台机器,机器配置如下: 共同配置: I ...
- presto + kafka + logstash 实时监控分析nginx日志
文章目录 前言 一.方案选取 二.各项配置 1.logstash配置 2.presto 增加kafka connector 3. 分析监控 总结 前言 目前线上环境nginx日志一天10亿左右,日志已 ...
- std::copy性能分析与memmove机器级实现
复制数据的快速方法std::copy C++复制数据各种方法大家都会,很多时候我们都会用到std::copy这个STL函数,这个效率确实很不错,比我们一个一个元素复制或者用迭代器复制都来的要快很多. ...
- Python调用kafka构建完整实例分析与应用!
近期遇到一个需求就是我们需要把当前比较耗费资源的接口开发成异步通讯的机制,简单来说就是有一个消息队列来不停地进行消息的集中分发与任务处理,这里应用端给出的方案是使用kafka来做,但是这个我在之前没有 ...
最新文章
- python学习笔记 day44 数据库三范式
- JS 保持数组长度为3位并且值不重复
- SSH开发环境整合搭建
- nginx 内核优化参数
- OpenCV图像金字塔:高斯金字塔、拉普拉斯金字塔总结
- abap中的弹出窗口函数
- 事务超时时间无效_阿里分布式事务组件 fescar/seata 对 XA 2PC 的改进及其设计思想...
- linux下的二进制文件的编辑和查看
- java获取表主外键_通过 jdbc 分析数据库中的表结构和主键外键
- ios 旋转屏幕试图切换_iOS增强现实应用(AR)设计指南(上)
- Hemberg-lab单细胞转录组数据分析(六)
- mysql如何保证高可用_mysql怎么保证高可用
- 小米卢伟冰回应“低价误国”言论:华为是中国企业“低价”战略的榜样
- Spring boot 2.4开启静态资源缓存
- JavaScript—获取参数(23)
- 拓端tecdat|matlab对MCMC贝叶斯方法用于加筋复合板的冲击载荷识别
- Redis - 听说 Dragonfly 宣称比我快 25 倍
- Java编写一个公司员工类
- 安装ubuntu20.4桌面系统
- 基于肌肉骨骼模型的预测仿真
热门文章
- MYSQL数据库备份还原,并还原到最新状态(mysqldump,xtrabackup)
- 开源图像编辑器 GIMP 开发者透露 2019 年计划
- git恢复到上次提交
- Android Stuido启动提示No JVM installation found.Please install a 32-bit JDK....
- 看一家公司发展得如何就看这家公司的财务部门
- 手把手部署Linux下磁盘配额(quota)应用与实战
- redis windows
- 从茶叶蛋到互联网思维
- 把blogengine当作cms作公司网站
- Virtual PC 使用指南