一、场景分析

在前面几篇分析过,Log类用于管理服务端日志相关的各种操作,如:

  • 日志段管理:滚动生成新日志段、组织并管理分区下的所有日志段等

  • 关键偏移量管理:如LogStartOffset、LEO等

  • 读写操作:进行日志的读写

  • 高水位操作管理:定义了对于高水位值的各种操作,包括更新和读取

这篇主要分析第三个:日志的读写操作

二、图示说明

1.日志对象写数据操作流程:

2.日志对象读数据操作流程:

三、源码分析1.写数据操作

日志写数据操作的相关方法共有三个,appendAsLeader、appendAsFollower、append,它们的关系如下:

其中,appendAsLeader 用来往leader副本写数据,appendAsFollower 用于follower副本同步。两个方法的底层都调用了 append 方法。appendAsLeader():

def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true,                   interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {  append(records, isFromClient, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch)}

appendAsFollower():

def appendAsFollower(records: MemoryRecords): LogAppendInfo = {  append(records, isFromClient = false, interBrokerProtocolVersion = ApiVersion.latestVersion, assignOffsets = false, leaderEpoch = -1)}

从上面两个方法的传参中,可以看到有几个关键的差异:

  • isFromClient:标识写入的消息是否来自于客户端,由于follower是从leader副本拉取消息,故该值为 false
  • assignOffsets:标识是否需要分配偏移量,如果是leader副本则需要给消息分配偏移量;而如果是follower副本则直接从leader副本上拉取消息,偏移量已经分配好了,故该值为 false
  • leaderEpoch:leader 的纪元值,如果是follower则该方法中传入固定值 -1

下面看一下 append 方法的具体流程,该方法较复杂,可以归纳为12个步骤,后面逐一进行分析:

private def append(records: MemoryRecords, isFromClient: Boolean, interBrokerProtocolVersion: ApiVersion, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {    maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {      //TODO 步骤一:消息校验,返回一个LogAppendInfo对象      val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)      //如果没有需要写入的消息,直接返回      if (appendInfo.shallowCount == 0)        return appendInfo      //TODO 步骤二:调整消息大小      var validRecords = trimInvalidBytes(records, appendInfo)      lock synchronized {        //检查Log对象未关闭        checkIfMemoryMappedBufferClosed()        //判断是否需要分配偏移量,如果是leader副本则为true        if (assignOffsets) {          //TODO 步骤三:使用当前LEO值作为待写入消息集合中第一条消息的位移值          val offset = new LongRef(nextOffsetMetadata.messageOffset)          appendInfo.firstOffset = Some(offset.value)          //当前时间          val now = time.milliseconds          val validateAndOffsetAssignResult = try {            LogValidator.validateMessagesAndAssignOffsets(              validRecords,//有效消息              offset,//偏移量              time,              now,              appendInfo.sourceCodec,//来源数据的压缩类型              appendInfo.targetCodec,//写入时的目标压缩类型              config.compact,//是否采用日志压缩,Boolean类型              config.messageFormatVersion.recordVersion.value,//消息格式              config.messageTimestampType,//消息时间戳类型,默认为CreatTime,由服务端参数 message.timestamp.type 配置              config.messageTimestampDifferenceMaxMs,//消息从生产到写入的最大时间间隔,如果超过则消息会被拒绝,默认为Long.MaxValue                                                      // 由服务端参数 message.timestamp.difference.max.ms 配置,当消息时间戳类型为LogAppendTime时,该忽略该参数              leaderEpoch,//epoch值              isFromClient,              interBrokerProtocolVersion)          } catch {            case e: IOException =>              throw new KafkaException(s"Error validating messages while appending to log $name", e)          }          //更新校验结果          validRecords = validateAndOffsetAssignResult.validatedRecords          appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp          appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp          appendInfo.lastOffset = offset.value - 1          appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats          //如果时间戳的类型是:LogAppendTime,设置 appendInfo.logAppendTime 为当前时间          if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)            appendInfo.logAppendTime = now          //TODO 步骤四:验证消息批次大小不超限,即是否大于Broker端参数max.message.bytes值,默认为:1000000 + Records.LOG_OVERHEAD(12字节)          if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {            for (batch               //如果批次大小超过:1000000 + Records.LOG_OVERHEAD(12字节)              if (batch.sizeInBytes > config.maxMessageSize) {                brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)                brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)                throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +                  s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")              }            }          }          //如果不需要分配偏移量,说明是follower副本拉取到的消息        } else {          //如果消息偏移量不是单调递增,则抛出异常          if (!appendInfo.offsetsMonotonic)            throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +                                                 records.records.asScala.map(_.offset))          //获取appendInfo中的firstOffset,如果不存在则取第一个批次的最后一个偏移量,默认firstOffset不存在          //如果消息集中的最大偏移量都小于当前LEO,则说明没有要更新的消息          if (appendInfo.firstOrLastOffsetOfFirstBatch             //更新firstOffset,由于默认为None,所有这里取:records.batches.asScala.head.baseOffset(),即第一个批次的起始偏移量            val firstOffset = appendInfo.firstOffset match {              case Some(offset) => offset              case None => records.batches.asScala.head.baseOffset()            }            val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"            throw new UnexpectedAppendOffsetException(              s"Unexpected offset in append to $topicPartition. $firstOrLast " +              s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +              s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +              s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",              firstOffset, appendInfo.lastOffset)          }        }        //TODO 步骤五:更新leader epoch缓存        validRecords.batches.asScala.foreach { batch =>          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {            maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)          } else {            leaderEpochCache.filter(_.nonEmpty).foreach { cache =>              warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")              cache.clearAndFlush()            }          }        }        //TODO 步骤六:确保消息大小不超限,不超过一个Segment的大小,即1G        if (validRecords.sizeInBytes > config.segmentSize) {          throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +            s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")        }        //TODO 步骤七:验证事务状态        val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(validRecords, isFromClient)        maybeDuplicate.foreach { duplicate =>          appendInfo.firstOffset = Some(duplicate.firstOffset)          appendInfo.lastOffset = duplicate.lastOffset          appendInfo.logAppendTime = duplicate.timestamp          appendInfo.logStartOffset = logStartOffset          return appendInfo        }        //TODO 步骤八:判断是否执行日志切分。        // 当前日志段剩余容量可能无法容纳新消息集合,因此有必要创建一个新的日志段来保存待写入的所有消息        val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)        val logOffsetMetadata = LogOffsetMetadata(          messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,          segmentBaseOffset = segment.baseOffset,          relativePositionInSegment = segment.size)        //TODO 步骤九:真正执行写入操作,追加数据到日志段文件,        // 主要调用日志段对象的append方法实现        segment.append(largestOffset = appendInfo.lastOffset,          largestTimestamp = appendInfo.maxTimestamp,          shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,          records = validRecords)        //TODO 步骤十:更新LEO对象,其中,LEO值是消息集合中最后一条消息位移值+1        updateLogEndOffset(appendInfo.lastOffset + 1)        //TODO 步骤十一:更新事务状态        for ((_, producerAppendInfo)           producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)          producerStateManager.update(producerAppendInfo)        }        for (completedTxn           val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)          segment.updateTxnIndex(completedTxn, lastStableOffset)          producerStateManager.completeTxn(completedTxn)        }        producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)        updateFirstUnstableOffset()        trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +          s"first offset: ${appendInfo.firstOffset}, " +          s"next offset: ${nextOffsetMetadata.messageOffset}, " +          s"and messages: $validRecords")        //是否需要手动落盘。一般情况下不需要设置Broker端参数log.flush.interval.messages        // 落盘操作交由操作系统来完成。但某些情况下,可以设置该参数来确保高可靠性        //config.flushInterval默认值为Long.MaxValue,即默认这个分支条件不会满足,不执行flush方法        if (unflushedMessages >= config.flushInterval)          flush()        //TODO 步骤十二:返回写入结果        appendInfo      }    }  }
  • 第一步:校验消息,返回一个LogAppendInfo对象:
//TODO 步骤一:消息校验,返回一个LogAppendInfo对象val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)

首先看一下LogAppendInfo类的定义:

case class LogAppendInfo(var firstOffset: Option[Long],//消息集中的第一个偏移量,只有消息版本小于V2,且追加给leader副本时才有                         var lastOffset: Long,// 消息集合最后一条消息的偏移量                         var maxTimestamp: Long,// 消息集合最大消息时间戳                         var offsetOfMaxTimestamp: Long,// 消息集合最大消息时间戳所属消息的偏移量                         var logAppendTime: Long,// 写入消息时间戳                         var logStartOffset: Long,// 消息集合首条消息的偏移量                         var recordConversionStats: RecordConversionStats,//消息转换统计类,里面记录了执行了格式转换的消息数等数据                         sourceCodec: CompressionCodec,//接收消息的压缩格式                         targetCodec: CompressionCodec,//写入日志的压缩格式                         shallowCount: Int,//消息批次数,每个消息批次下可能包含多条消息                         validBytes: Int,//写入消息总字节数                         offsetsMonotonic: Boolean,//消息位移值是否是单调递增的                         lastOffsetOfFirstBatch: Long//首个消息批次中最后一条消息的偏移量                        ) {                        ...

各个属性都已经加了注释,这里不再赘述。

analyzeAndValidateRecords 方法用来校验待写入的消息集合,然后将校验结果封装成LogAppendInfo对象返回,具体流程如下:

private def analyzeAndValidateRecords(records: MemoryRecords, isFromClient: Boolean): LogAppendInfo = {    var shallowMessageCount = 0    var validBytesCount = 0    var firstOffset: Option[Long] = None    var lastOffset = -1L    var sourceCodec: CompressionCodec = NoCompressionCodec    var monotonic = true    var maxTimestamp = RecordBatch.NO_TIMESTAMP    var offsetOfMaxTimestamp = -1L    var readFirstMessage = false    var lastOffsetOfFirstBatch = -1L    //遍历 MemoryRecords 中的 batch    for (batch       //V2版本的消息批次的起始偏移量必须为0,如果不为0则抛异常。      //TODO 注意这里还没有给消息分配存储时的偏移量,只是批次中的偏移量      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && isFromClient && batch.baseOffset != 0)        throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +          s"be 0, but it is ${batch.baseOffset}")      //只有第一个批次走这个分支      if (!readFirstMessage) {        //如果是V2版本的消息        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)          //更新初始偏移量          firstOffset = Some(batch.baseOffset)        //更新第一个批次的最大偏移量        lastOffsetOfFirstBatch = batch.lastOffset        readFirstMessage = true      }      //如果当前批次的lastOffset小于等于上一个批次的lastOffset,说明上一个批次中有偏移量大于后面batch的消息      //这违反了偏移量的单调递增性      if (lastOffset >= batch.lastOffset)        monotonic = false      //更新lastOffset为当前批次的最后的偏移量      lastOffset = batch.lastOffset      val batchSize = batch.sizeInBytes      //判断批次的大小是否大于配置的最大消息大小:由 broker 端参数 max.message.bytes 配置,默认为:1000000 + Records.LOG_OVERHEAD(12)      if (batchSize > config.maxMessageSize) {        brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)        brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)        throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +          s"which exceeds the maximum configured value of ${config.maxMessageSize}.")      }      //通过CRC 校验批次,如果不完整则抛异常      batch.ensureValid()      if (batch.maxTimestamp > maxTimestamp) {        //更新最大时间戳        maxTimestamp = batch.maxTimestamp        //更新最大时间戳对应的偏移量        offsetOfMaxTimestamp = lastOffset      }      //累加消息批次计数器以及有效字节数,更新shallowMessageCount字段      shallowMessageCount += 1      validBytesCount += batchSize      //获取生产消息的压缩格式      val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id)      //如果消息进行了压缩      if (messageCodec != NoCompressionCodec)        //更新压缩格式        sourceCodec = messageCodec    }    //获取broker端的压缩格式。即Broker端参数compression.type值。    //该参数默认值是producer,表示targetCodec 和 sourceCodec 一致    val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)    //定义 LogAppendInfo 对象,里面包含了压缩类型,这里firstOffset默认为None    LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset,      RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch)  }

由于一个待写入的消息集合中可能包含多个批次,所以这里会遍历消息集合中的批次进行处理:

a. 如果消息版本大于等于V2,且消息来自于客户端,那么该批次消息的起始偏移量必须为0,否则就会抛出异常:

注意:这里的偏移量不是消息往日志中追加时的偏移量,只是生产的消息在所属批次中的偏移量,V2格式的消息要求这个偏移量必须从0开始。

if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && isFromClient && batch.baseOffset != 0)    throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +      s"be 0, but it is ${batch.baseOffset}")

b. 如果是第一个批次,那么更新firstOffset和lastOffsetOfFirstBatch的值。变量readFirstMessage的初始值为false,更新完上面两个值后会修改为true,之后的其它批次不再走这个分支:

//只有第一个批次走这个分支if (!readFirstMessage) {  //如果是V2版本的消息  if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)    //更新初始偏移量    firstOffset = Some(batch.baseOffset)  //更新第一个批次的最大偏移量  lastOffsetOfFirstBatch = batch.lastOffset  readFirstMessage = true}

c. 检查当前批次的最后一个偏移量是否不大于之前批次的最大偏移量,如果是,则违反了偏移量单调递增的原则,标记 monotonic 为 false

//如果当前批次的lastOffset小于等于上一个批次的lastOffset,说明上一个批次中有偏移量大于后面batch的消息//这违反了偏移量的单调递增性if (lastOffset >= batch.lastOffset)  monotonic = false

d. 更新lastOffset

//更新lastOffset为当前批次的最后的偏移量lastOffset = batch.lastOffset

e. 检查当前批次的大小是否超过最大限制,该限制由broker端参数:max.message.bytes 配置,默认为 1000000+12 个字节

//判断批次的大小是否大于配置的最大消息大小:由 broker 端参数 max.message.bytes 配置,默认为:1000000 + Records.LOG_OVERHEAD(12)if (batchSize > config.maxMessageSize) {  brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)  brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)  throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +    s"which exceeds the maximum configured value of ${config.maxMessageSize}.")}

f. 通过CRC校验批次数据,如果不完整则抛出异常

//通过CRC 校验批次,如果不完整则抛异常batch.ensureValid()

g. 判断是否需要更新待写入消息的最大时间戳和最大时间戳对应的偏移量

if (batch.maxTimestamp > maxTimestamp) {  //更新最大时间戳  maxTimestamp = batch.maxTimestamp  //更新最大时间戳对应的偏移量  offsetOfMaxTimestamp = lastOffset}

h. 更新批次计数器和待写入消息的有效字节数

//累加消息批次计数器以及有效字节数,更新shallowMessageCount字段shallowMessageCount += 1validBytesCount += batchSize

i. 获取消息的压缩格式。客户端生产消息时可供选择的压缩格式有四种:gzip、snappy、lz4、zstd,默认为不压缩

//获取生产消息的压缩格式val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id)//如果消息进行了压缩if (messageCodec != NoCompressionCodec)  //更新压缩格式  sourceCodec = messageCodec

j. 遍历完所有的批次后,获取写入日志时的压缩格式,通过compression.type 配置,默认为 "producer" ,即和生产消息时采用的压缩格式一致

//获取broker端的压缩格式。即Broker端参数compression.type值。//该参数默认值是producer,表示targetCodec 和 sourceCodec 一致val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)

k. 封装LogAppendInfo对象并返回:

LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset,      RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch)
  • 第二步:调整待写入消息集的大小

var validRecords = trimInvalidBytes(records, appendInfo)

trimInvalidBytes方法如下:

private def trimInvalidBytes(records: MemoryRecords, info: LogAppendInfo): MemoryRecords = {  val validBytes = info.validBytes  if (validBytes < 0)    throw new CorruptRecordException(s"Cannot append record batch with illegal length $validBytes to " +      s"log for $topicPartition. A possible cause is a corrupted produce request.")  if (validBytes == records.sizeInBytes) {    records  } else {    //裁剪至有效大小    val validByteBuffer = records.buffer.duplicate()//创建一个和和recordds.buffer内容一样的新缓冲区    validByteBuffer.limit(validBytes)//设置新缓冲区可用大小    MemoryRecords.readableRecords(validByteBuffer)//用新缓冲区构建MemoryRecords对象  }}

这一步就是根据第一步校验数据的结果来调整待写入消息的大小。

  • 如果校验结果中有效消息字节数小于0,则抛异常

  • 如果消息大小和校验结果中一致,则直接返回待写入消息集合;

  • 如果校验结果中有效消息大小比待写入的消息总字节数要小,则重新封装一个MemoryRecords对象并返回,消息大小就是有效字节总数。

  • 第三步:分配偏移量。如果需要分配偏移量,即 assignoffsets = true,则采用当前LEO值作为待写入消息集合中第一条消息的偏移量
if (assignOffsets) {    //TODO 步骤三:使用当前LEO值作为待写入消息集合中第一条消息的位移值    val offset = new LongRef(nextOffsetMetadata.messageOffset)    appendInfo.firstOffset = Some(offset.value)

封装ValidationAndOffsetAssignResult对象并更新校验结果

val validateAndOffsetAssignResult = try {    LogValidator.validateMessagesAndAssignOffsets(      validRecords,//有效消息      offset,//偏移量      time,      now,      appendInfo.sourceCodec,//来源数据的压缩类型      appendInfo.targetCodec,//写入时的目标压缩类型      config.compact,//是否采用日志压缩,Boolean类型      config.messageFormatVersion.recordVersion.value,//消息格式      config.messageTimestampType,//消息时间戳类型,默认为CreatTime,由服务端参数 message.timestamp.type 配置      config.messageTimestampDifferenceMaxMs,//消息从生产到写入的最大时间间隔,如果超过则消息会被拒绝,默认为Long.MaxValue                                              // 由服务端参数 message.timestamp.difference.max.ms 配置,当消息时间戳类型为LogAppendTime时,该忽略该参数      leaderEpoch,//epoch值      isFromClient,      interBrokerProtocolVersion)    } catch {    case e: IOException =>      throw new KafkaException(s"Error validating messages while appending to log $name", e)    }    //更新校验结果    validRecords = validateAndOffsetAssignResult.validatedRecords    appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp    appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp    appendInfo.lastOffset = offset.value - 1    appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats    //如果时间戳的类型是:LogAppendTime,设置 appendInfo.logAppendTime 为当前时间    if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)    appendInfo.logAppendTime = now
  • 第四步:验证批次大小是否超过限制。如果消息未设置压缩算法,messageSizeMaybeChanged = false,其实是不走这个分支的。

if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {  for (batch     //如果批次大小超过:1000000 + Records.LOG_OVERHEAD(12字节)    if (batch.sizeInBytes > config.maxMessageSize) {      brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)      brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)      throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +        s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")    }  }}
  • 第五步:更新leader epoch 缓存

validRecords.batches.asScala.foreach { batch =>  //消息版本为V2  if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {    //更新leader epoch    maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)  } else {    leaderEpochCache.filter(_.nonEmpty).foreach { cache =>      warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")      cache.clearAndFlush()    }  }}
  • 第六步:确保待写入消息的大小不超过一个LogSegment的容量,即 1G

//TODO 步骤六:确保消息大小不超限,不超过一个Segment的大小,即1Gif (validRecords.sizeInBytes > config.segmentSize) {  throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +    s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")}
  • 第七步:验证事务状态

//TODO 步骤七:验证事务状态val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(validRecords, isFromClient)maybeDuplicate.foreach { duplicate =>  appendInfo.firstOffset = Some(duplicate.firstOffset)  appendInfo.lastOffset = duplicate.lastOffset  appendInfo.logAppendTime = duplicate.timestamp  appendInfo.logStartOffset = logStartOffset  return appendInfo}
  • 第八步:判断是否需要执行日志滚动如果需要,返回一个滚动后的新的日志段,如果不需要,返回当前的active segment

//TODO 步骤八:判断是否执行日志切分。// 当前日志段剩余容量可能无法容纳新消息集合,因此有必要创建一个新的日志段来保存待写入的所有消息val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
  • 第九步:调用LogSegment.append() 方法执行真正的写入操作

//TODO 步骤九:真正执行写入操作,追加数据到日志段文件,// 主要调用日志段对象的append方法实现segment.append(largestOffset = appendInfo.lastOffset,  largestTimestamp = appendInfo.maxTimestamp,  shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,  records = validRecords)
  • 第十步:更新LEO对象,其中LEO值就是消息集合中最大的偏移量 + 1

updateLogEndOffset(appendInfo.lastOffset + 1)
  • 第十一步:更新事务状态

//TODO 步骤十一:更新事务状态for ((_, producerAppendInfo)   producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)  producerStateManager.update(producerAppendInfo)}
  • 第十二步:返回写入结果

//TODO 步骤十二:返回写入结果appendInfo

除了这12步外,还有一个地方需要注意:flush()刷写磁盘

if (unflushedMessages >= config.flushInterval)    flush()

config.flushInterval 参数通过broker端参数 log.flush.interval.messages 设置,默认值为Long.MaxValue,说明这里的flush()方法默认不会调用。Kafka默认将落盘的操作交给操作系统来完成,程序中一般不会主动调用该方法进行刷盘。但是如果有特殊需求,可以通过这个参数来设置刷盘的时机。

2. 读数据操作

读数据操作通过read()方法完成,代码如下:

def read(startOffset: Long,//读取的起始偏移量         maxLength: Int,//读取的最大字节数         maxOffset: Option[Long],//读取的最大的偏移量         minOneMessage: Boolean,//是否至少读取一条消息         includeAbortedTxns: Boolean): FetchDataInfo = {  maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {    trace(s"Reading $maxLength bytes from offset $startOffset of length $size bytes")    //TODO 第一步:获取当前的LEO对象及LEO值    val currentNextOffsetMetadata = nextOffsetMetadata    val next = currentNextOffsetMetadata.messageOffset    //如果读取的起始偏移量 = LEO ,那么将不会返回任何数据    if (startOffset == next) {      val abortedTransactions =        if (includeAbortedTxns) Some(List.empty[AbortedTransaction])        else None      return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false,        abortedTransactions = abortedTransactions)    }    //TODO 第二步:获取日志段集合中起始偏移量小于等于startOffset的Entry:    var segmentEntry = segments.floorEntry(startOffset)    //TODO 第三步:判断读取的消息是否越界    // 满足以下条件之一将被视为消息越界,即你要读取的消息不在该Log对象中:    // 1. 要读取的消息位移超过了LEO值    // 2. 没找到对应的日志段对象    // 3. 要读取的消息在Log Start Offset之下,同样是对外不可见的消息    if (startOffset > next || segmentEntry == null || startOffset < logStartOffset)      throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +        s"but we only have log segments in the range $logStartOffset to $next.")    //遍历日志段对象,直到读出数据或者读到日志末尾    while (segmentEntry != null) {      //获取起始偏移量所在的日志段对象      val segment = segmentEntry.getValue      //最大的读取位置      val maxPosition = {        //如果是最后一个日志段,即active segment        if (segmentEntry == segments.lastEntry) {          //exposedPos 就是LEO对应的位置          val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong          //再次检查是否为active segment,避免滚动生成了新的日志段          if (segmentEntry != segments.lastEntry)            //如果滚动生成了新的日志段,则可以读取当前日志段的全部            segment.size          else          //如果没有生成新的日志段,则可以读取到LEO对应的位置            exposedPos          //如果不是active segment,        } else {          segment.size        }      }      //TODO 第四步:调用LogSegment.read方法进行数据读取      val fetchInfo = segment.read(startOffset, maxOffset, maxLength, maxPosition, minOneMessage)      if (fetchInfo == null) {        //如果没有读到数据,继续读取下一个日志段        segmentEntry = segments.higherEntry(segmentEntry.getKey)      } else {        return if (includeAbortedTxns)          addAbortedTransactions(startOffset, segmentEntry, fetchInfo)        else        //TODO 第五步:返回读取到的消息          fetchInfo      }    }    // 已经读到日志末尾还是没有数据返回,只能返回空消息集合    FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)  }}

读取日志的操作可以归纳为5步:

  • 第一步:从LEO对象中获取LEO值。判断读取的起始偏移量是否等于LEO,如果是则什么都读不到,返回空对象

val currentNextOffsetMetadata = nextOffsetMetadataval next = currentNextOffsetMetadata.messageOffset//如果读取的起始偏移量 = LEO ,那么将不会返回任何数据if (startOffset == next) {  val abortedTransactions =    if (includeAbortedTxns) Some(List.empty[AbortedTransaction])    else None  return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false,    abortedTransactions = abortedTransactions)}
  • 第二步:找到读取的起始偏移量对应的日志段。即日志段的起始偏移量小于等于读取的起始偏移量的Entry

//获取日志段集合中起始偏移量小于等于startOffset的Entry:var segmentEntry = segments.floorEntry(startOffset)
  • 第三步:判断要读取的消息是否越界。满足下面条件之一就被视为越界,即要读取的消息不在Log对象中:

    • 要读取消息的偏移量超过了LEO
    • 没找到对应的日志段对象
    • 要读取的消息偏移量小于LogStartOffset,即对外不可见的消息
if (startOffset > next || segmentEntry == null || startOffset < logStartOffset)    throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +      s"but we only have log segments in the range $logStartOffset to $next.")
  • 第四步:遍历包含消息的日志段读取数据。这里调用了LogSegment.read()方法进行数据的读取,读取的消息被封装成了FetchDataInfo对象:

val fetchInfo = segment.read(startOffset, maxOffset, maxLength, maxPosition, minOneMessage)
  • 第五步:返回读取到的数据。

总结:

日志对象的写数据操作分为十二个步骤:

  • 校验消息
  • 调整待写入消息集合的大小
  • 分配偏移量
  • 验证批次大小是否超过限制
  • 更新leader epoch 缓存
  • 确保待写入消息集合大小不超过1个日志段容量
  • 验证事务状态
  • 判断是否需要进行日志滚动
  • 调用日志段的append方法写入数据
  • 更新LEO对象
  • 更新事务状态
  • 返回写入结果

日志对象的读数据操作分为五个步骤:

  • 从LEO对象中获取LEO值
  • 找到读取的起始偏移量对应的日志段
  • 判断要读取的消息是否越界
  • 遍历包含要读取消息的日志段,调用日志段的read方法进行数据读取
  • 返回读取结果

服务端设置忽略更新_深入理解Kafka服务端之日志对象的读写数据流程相关推荐

  1. 取本地数据_深入理解Kafka服务端之Follower副本如何同步Leader副本的数据

    一.场景分析Kafka采用的是主写主读的方式,即客户端的读写请求都由分区的Leader副本处理,那么Follower副本要想保证和Leader副本数据一致,就需要不断地从Leader副本拉取消息来进行 ...

  2. 服务端设置忽略更新_react服务端渲染: cookie如何透传给后端,后端如何设置cookie...

    原文链接: react服务端渲染: cookie如何透传给后端,后端如何设置cookie​www.douyacun.com react 服务端渲染 cookie 有2个问题: 服务端执行的代码如何拿到 ...

  3. kafka 启动_深入理解Kafka服务端之Acceptor线程是如何启动和工作的

    一.场景分析上一篇讲到了Kafka网络通信采用了Java NIO的主从Reactor多线程模型,而Acceptor就是Kafka网络通信中很重要的一个线程对象.它通过selector接收客户端的连接请 ...

  4. 异步通信还要设置波特率?_深入理解同步/异步通信

    异步通信还要设置波特率?_深入理解同步/异步通信   上一篇我们解释了串口通信中同步通信和异步通信的区别,详见上篇链接.其中我们分析同步/异步通信最重要的不同点就是是否同步时钟,可能就有很多小伙伴不理 ...

  5. netty tcp服务端主动断开客户端_【Netty】服务端和客户端

    欢迎关注公众号:[爱编程] 如果有需要后台回复2019赠送1T的学习资料哦!! 本文是基于Netty4.1.36进行分析 服务端 Netty服务端的启动代码基本都是如下: private void s ...

  6. 微服务平台的设计要点_我在微服务方面的经验中有5点要点

    微服务平台的设计要点 I am a big fan of microservices. And at the same time, I am not always so fond of them. S ...

  7. 楚留香pc端连接服务器未响应,楚留香游戏pc端闪退怎么办_楚留香游戏中pc端闪退解决办法汇总...

    楚留香手游的内存相对其他小型游戏来说比较大,所以游戏的过程中容易出现pc端闪退的情况,楚留香游戏pc端闪退怎么办?快啦小编给大家带来楚留香游戏中pc端闪退解决办法汇总. 楚留香游戏中pc端闪退解决办法 ...

  8. 电脑音频服务未运行怎么解决_电脑提示音频服务未运行怎么办,快来看看吧,图片描述(最多50字)...

    我们使用电脑放歌的时候,会发现音频服务不能正常使用,还会出现音频服务未运行的提示,那这时候我们该怎么解决这个问题呢?小编就来告诉你们怎么解决音频服务未运行的问题. 小伙伴们要耐心的看下去哟~~ 首先, ...

  9. 服务启动类型是灰色_系统Software Protection服务无法启用的解决方法

    电脑Software Protection服务打不开怎么办?Win7系统Software Protection系统服务无法启用该如何解决?下面就给大家介绍具体解决方法. 解决方法: 1.开始→运行→输 ...

最新文章

  1. Bert代码详解(一)重点详细
  2. 报错 org.springframework.beans.factory.BeanCreationException
  3. Tensorflow—Droupout
  4. mysql数据库报Access denied for user 的解决方法
  5. Redhat、CentOS进单用户模式进行维护
  6. 【CyberSecurityLearning 62】文件包含
  7. java json的使用方法_JAVA编写JSON常用的三种方法
  8. 永远年轻,永远热泪盈眶----致所有奋斗的ACMer
  9. simulink和psim仿真结果不同_(格麟倍)航空航天零件硬铬电镀工艺专业仿真评估工具...
  10. 百度大脑 EasyDL 专业版最新上线自研超大规模视觉预训练模型
  11. JenkinsDay18-查看服务器有哪些JOB
  12. C++内存机制中内存溢出、内存泄露、内存越界和栈溢出的区别和联系
  13. [BZOJ4987] Tree
  14. Python并发编程之线程池/进程池
  15. android 输入法出现挤压屏幕、android输入键盘覆盖了屏幕控件的解决办法
  16. 青岛工学院计算机专业分数线,青岛工学院分数线
  17. Tor 正在开发匿名即时聊天工具
  18. html常用代码大全
  19. Excel 2010 VBA 入门 108 个人所得税计算函数
  20. PHP屏蔽错误警告提示

热门文章

  1. 怎么查找电脑中的流氓软件_玻璃丝网印刷过程中油墨出现问题怎么查找原因解决问题?...
  2. 第四(装饰器、迭代器、生成器)
  3. 论文趣读:人工智能里程碑?回顾2015年登上Nature的DQN(全文翻译+批注)
  4. 深入浅出DDoS***
  5. java中的远程debug调试
  6. mysql5.6.25及以上下载衔接
  7. Fragment与FragmentActivity通信封装
  8. java可以实现agv调度吗_AGV路线优化及实时调度
  9. win7计算机用户配置文件存储路径,Win7用户配置文件夹位置怎么修改?
  10. 复用类库内部已有功能