









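  1. Load and record the state-marker files in each log directory (.kafka_cleanshutdown, recovery-point-offset-checkpoint, log-start-offset-checkpoint)
  2. Load and recover the log of every tp concurrently (detailed in the next subsection)
  3. Record the problematic log directories and handle them asynchronously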


/**
 * Recover and load all logs in the given data directories
 */


private[log] def loadLogs(topicConfigOverrides: Map[String, LogConfig]): Unit = {

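  // Load every available log directory (liveLogDirs). A Kafka server may be configured with several disk directories for storing log files, but not all of those disks are necessarily usable at startup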

  info(s"Loading logs from log dirs $liveLogDirs")

  val startMs = time.hiResClockMs()

  val threadPools = ArrayBuffer.empty[ExecutorService]

  val offlineDirs = mutable.Set.empty[(String, IOException)]

  val jobs = ArrayBuffer.empty[Seq[Future[_]]]

  var numTotalLogs = 0

  // Iterate over all disks and load/recover their logs; if an IOException occurs, record the directory in offlineDirs for later handling

  for (dir <- liveLogDirs) {

 val logDirAbsolutePath = dir.getAbsolutePath

 var hadCleanShutdown: Boolean = false

 try {

   val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)

   threadPools.append(pool)

   // If the .kafka_cleanshutdown file exists, delete it and record the hadCleanShutdown state; the log recovery flow is then skipped later on.

   val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)

   if (cleanShutdownFile.exists) {

     info(s"Skipping recovery for all logs in $logDirAbsolutePath since clean shutdown file was found")

     // Cache the clean shutdown status and use that for rest of log loading workflow. Delete the CleanShutdownFile

     // so that if broker crashes while loading the log, it is considered hard shutdown during the next boot up. KAFKA-10471

     Files.deleteIfExists(cleanShutdownFile.toPath)

     hadCleanShutdown = true

   } else {

     // log recovery itself is being performed by `Log` class during initialization

     info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")

   }

   // Read the recoveryPoint of every tp directory from the recovery-point-offset-checkpoint file

   var recoveryPoints = Map[TopicPartition, Long]()

   try {

     recoveryPoints = this.recoveryPointCheckpoints(dir).read()

   } catch {

     case e: Exception =>

       warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory " +

         s"$logDirAbsolutePath, resetting the recovery checkpoint to 0", e)

   }

   // Read the logStartOffset of every tp directory from the log-start-offset-checkpoint file

   var logStartOffsets = Map[TopicPartition, Long]()

   try {

     logStartOffsets = this.logStartOffsetCheckpoints(dir).read()

   } catch {

     case e: Exception =>

       warn(s"Error occurred while reading log-start-offset-checkpoint file of directory " +

         s"$logDirAbsolutePath, resetting to the base offset of the first segment", e)

   }

   // Main flow of log loading and recovery: run loadLog concurrently for the logs of all tps

   val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir =>

     logDir.isDirectory && Log.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)

   val numLogsLoaded = new AtomicInteger(0)

   numTotalLogs += logsToLoad.length

   val jobsForDir = logsToLoad.map { logDir =>

     val runnable: Runnable = () => {

       try {

         debug(s"Loading log $logDir")

         val logLoadStartMs = time.hiResClockMs()

         val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets, topicConfigOverrides)

         val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs

         val currentNumLoaded = numLogsLoaded.incrementAndGet()

         info(s"Completed load of $log with ${log.numberOfSegments} segments in ${logLoadDurationMs}ms " +

           s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)")

       } catch {

         case e: IOException =>

           offlineDirs.add((logDirAbsolutePath, e))

           error(s"Error while loading log dir $logDirAbsolutePath", e)

       }

     }

     runnable

   }

   jobs += jobsForDir.map(pool.submit)

 } catch {

   case e: IOException =>

     offlineDirs.add((logDirAbsolutePath, e))

     error(s"Error while loading log dir $logDirAbsolutePath", e)

 }

  }

  try {

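 // Wait for all concurrently running log-loading jobs to finish

 for (dirJobs <- jobs) {

   dirJobs.foreach(_.get)

 }

 // Record every problematic directory; ReplicaManager will later take these directories offline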

 offlineDirs.foreach { case (dir, e) =>

   logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while loading log dir $dir", e)

 }

  } catch {

 case e: ExecutionException =>

   error(s"There was an error in one of the threads during logs loading: ${e.getCause}")

   throw e.getCause

  } finally {

 threadPools.foreach(_.shutdown())

  }

  info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.")

}
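
The overall pattern of loadLogs (one fixed-size thread pool per directory, one job per log, directories that hit an IOException collected for later handling) can be sketched in isolation. This is a minimal, self-contained sketch and not Kafka's code: `ParallelLoadSketch`, `loadAll`, and the `load` callback are hypothetical names, directories and logs are plain strings, and Scala 2.13 or later is assumed for `scala.jdk.CollectionConverters`.

```scala
import java.io.IOException
import java.util.concurrent.{ConcurrentHashMap, Executors, Future}
import scala.jdk.CollectionConverters._

object ParallelLoadSketch {
  // Hypothetical helper: runs `load` for every (dir, log) pair using one pool per
  // directory and returns the directories in which a load threw an IOException.
  def loadAll(logsByDir: Map[String, Seq[String]],
              threadsPerDir: Int)(load: (String, String) => Unit): Set[String] = {
    // Thread-safe set, because it is written to from the pool threads.
    val offlineDirs = ConcurrentHashMap.newKeySet[String]()
    val pools = logsByDir.keys.toSeq.map(dir => dir -> Executors.newFixedThreadPool(threadsPerDir))
    try {
      // Submit one job per log; every job is submitted before any of them is awaited.
      val jobs: Seq[Future[_]] = pools.flatMap { case (dir, pool) =>
        logsByDir(dir).map { log =>
          val runnable: Runnable = () => {
            try load(dir, log)
            catch { case _: IOException => offlineDirs.add(dir); () }
          }
          pool.submit(runnable)
        }
      }
      // Block until every job has completed.
      jobs.foreach(_.get())
    } finally {
      pools.foreach { case (_, pool) => pool.shutdown() }
    }
    offlineDirs.asScala.toSet
  }
}
```

A caller would pass the real per-log loading function as `load`; any exception other than IOException surfaces from `get()` as an ExecutionException, which mirrors the catch block in the code above.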


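Loading and recovering the log of a single tp

The log of a single tp is loaded and recovered in the initialization code of the Log class, which runs when a Log instance is constructed. If the tp's directory name ends with -delete, the tp is treated as pending deletion and is added to the logsToBeDeleted collection, where a scheduled task cleans it up later.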
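
The -delete suffix check can be illustrated with a small standalone sketch. The object name, the `split` helper, and the example directory names are hypothetical; only the "-delete" suffix itself comes from the text above.

```scala
import java.io.File

object DeleteSuffixSketch {
  // The suffix named in the text above (referenced as Log.DeleteDirSuffix in the code).
  val DeleteDirSuffix = "-delete"

  // Splits tp directories into (to be loaded normally, pending deletion).
  def split(dirs: Seq[File]): (Seq[File], Seq[File]) =
    dirs.partition(dir => !dir.getAbsolutePath.endsWith(DeleteDirSuffix))
}
```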
The initialization code of the Log class loads the log through loadSegments:

private def loadSegments(): Long = {

  // Remove temporary files (.deleted and .cleaned suffixes) and keep the usable swap files

  val swapFiles = removeTempFilesAndCollectSwapFiles()

  // retryOnOffsetOverflow catches any LogSegmentOffsetOverflowException that may be thrown and handles it by splitting the segment.

  retryOnOffsetOverflow {

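    // Load all files in the directory and perform the necessary integrity checks

    logSegments.foreach(_.close())

    segments.clear()

    loadSegmentFiles()

  }

  // Complete all interrupted operations based on the swap files

  completeSwapOperations(swapFiles)

  // If this tp log is not pending deletion, run the recover flow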

  if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {

    val nextOffset = retryOnOffsetOverflow {

      recoverLog()

    }

    // reset the index size of the currently active log segment to allow more entries

    activeSegment.resizeIndexes(config.maxIndexSize)

    nextOffset

  } else {

     if (logSegments.isEmpty) {

        addSegment(LogSegment.open(dir = dir,

          baseOffset = 0,

          config,

          time = time,

          initFileSize = this.initFileSize))

     }

    0

  }

}





The core code of recoverLog is as follows:

// if we have the clean shutdown marker, skip recovery

// Recovery is needed only when there was no clean shutdown

if (!hadCleanShutdown) {

  // Take all segments after the recoveryPoint (normally there is only one)

  val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator

  var truncated = false

  while (unflushed.hasNext && !truncated) {

    val segment = unflushed.next()

    info(s"Recovering unflushed segment ${segment.baseOffset}")

    val truncatedBytes =

      try {

        // Clear the segment's indexes, read and validate the data batch by batch, and rebuild the indexes

        recoverSegment(segment, leaderEpochCache)

      } catch {

        case _: InvalidOffsetException =>

          val startOffset = segment.baseOffset

          warn("Found invalid offset during recovery. Deleting the corrupt segment and " +

            s"creating an empty one with starting offset $startOffset")

          segment.truncateTo(startOffset)

      }

    if (truncatedBytes > 0) {

      // If this segment was truncated, all segments after it are deleted outright

      // unflushed is an iterator, so unflushed.toList yields only the segments not yet visited, not all segments

      warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}")

      removeAndDeleteSegments(unflushed.toList,

        asyncDelete = true,

        reason = LogRecovery)

      truncated = true

    }

  }

}
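
The remark about unflushed.toList relies on standard Scala iterator behaviour: once some elements have been consumed with next(), toList returns only the elements not yet visited. A minimal sketch, with plain Long values standing in for segment base offsets (the object and method names are hypothetical):

```scala
object IteratorRemainderSketch {
  // Walks `baseOffsets` in order and stops at the first "corrupt" one.
  // Returns (visited, remaining), where `remaining` is what toList yields
  // on the partially consumed iterator.
  def splitAtFirstCorrupt(baseOffsets: Seq[Long],
                          isCorrupt: Long => Boolean): (List[Long], List[Long]) = {
    val unflushed = baseOffsets.iterator
    val visited = List.newBuilder[Long]
    var remaining = List.empty[Long]
    var truncated = false
    while (unflushed.hasNext && !truncated) {
      val segment = unflushed.next()
      visited += segment
      if (isCorrupt(segment)) {
        // Only the not-yet-visited elements are returned here.
        remaining = unflushed.toList
        truncated = true
      }
    }
    (visited.result(), remaining)
  }
}
```

For offsets 0, 100, 200, 300 with corruption detected at 100, the visited list is List(0, 100) and the remaining list is List(200, 300), which corresponds to the segments that the recovery loop deletes.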





private def completeSwapOperations(swapFiles: Set[File]): Unit = {

  for (swapFile <- swapFiles) {

    val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, ""))

    val baseOffset = offsetFromFile(logFile)

    val swapSegment = LogSegment.open(swapFile.getParentFile,

      baseOffset = baseOffset,

      config,

      time = time,

      fileSuffix = SwapFileSuffix)

    info(s"Found log file ${swapFile.getPath} from interrupted swap operation, repairing.")



    // We create swap files for two cases:

    // (1) Log cleaning where multiple segments are merged into one, and

    // (2) Log splitting where one segment is split into multiple.


    // Both of these mean that the resultant swap segments be composed of the original set, i.e. the swap segment

    // must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion

    // of that segment was successful. In such an event, we should simply rename the .swap to .log without having to

    // do a replace with an existing segment.

    val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.readNextOffset).filter { segment =>

      segment.readNextOffset > swapSegment.baseOffset

    }

    replaceSegments(Seq(swapSegment), oldSegments.toSeq, isRecoveredSwapFile = true)

  }

}




private def recoverSegment(segment: LogSegment,

                           leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = lock synchronized {

  val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)


  rebuildProducerState(segment.baseOffset, reloadFromCleanShutdown = false, producerStateManager)

  val bytesTruncated = segment.recover(producerStateManager, leaderEpochCache)

  // once we have recovered the segment's data, take a snapshot to ensure that we won't

  // need to reload the same segment again while recovering another segment.

  producerStateManager.takeSnapshot()

  bytesTruncated

}





// Rebuild producer state until lastOffset. This method may be called from the recovery code path, and thus must be

// free of all side-effects, i.e. it must not update any log-specific state.

private def rebuildProducerState(lastOffset: Long,

                                 reloadFromCleanShutdown: Boolean,

                                 producerStateManager: ProducerStateManager): Unit = lock synchronized {


  val messageFormatVersion = config.messageFormatVersion.recordVersion.value

  val segments = logSegments

  val offsetsToSnapshot =

    if (segments.nonEmpty) {

      val nextLatestSegmentBaseOffset = lowerSegment(segments.last.baseOffset).map(_.baseOffset)

      Seq(nextLatestSegmentBaseOffset, Some(segments.last.baseOffset), Some(lastOffset))

    } else {

      Seq(Some(lastOffset))

    }



  info(s"Loading producer state till offset $lastOffset with message format version $messageFormatVersion")

  // We want to avoid unnecessary scanning of the log to build the producer state when the broker is being

  // upgraded. The basic idea is to use the absence of producer snapshot files to detect the upgrade case,

  // but we have to be careful not to assume too much in the presence of broker failures. The two most common

  // upgrade cases in which we expect to find no snapshots are the following:


  // 1. The broker has been upgraded, but the topic is still on the old message format.

  // 2. The broker has been upgraded, the topic is on the new message format, and we had a clean shutdown.


  // If we hit either of these cases, we skip producer state loading and write a new snapshot at the log end

  // offset (see below). The next time the log is reloaded, we will load producer state using this snapshot

  // (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state

  // from the first segment.









  if (messageFormatVersion < RecordBatch.MAGIC_VALUE_V2 ||

      (producerStateManager.latestSnapshotOffset.isEmpty && reloadFromCleanShutdown)) {

    // To avoid an expensive scan through all of the segments, we take empty snapshots from the start of the

    // last two segments and the last offset. This should avoid the full scan in the case that the log needs

    // truncation.

    offsetsToSnapshot.flatten.foreach { offset =>

      producerStateManager.updateMapEndOffset(offset)

      producerStateManager.takeSnapshot()

    }

  } else {

    val isEmptyBeforeTruncation = producerStateManager.isEmpty && producerStateManager.mapEndOffset >= lastOffset

    producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds())

    // Only do the potentially expensive reloading if the last snapshot offset is lower than the log end

    // offset (which would be the case on first startup) and there were active producers prior to truncation

    // (which could be the case if truncating after initial loading). If there weren't, then truncating

    // shouldn't change that fact (although it could cause a producerId to expire earlier than expected),

    // and we can skip the loading. This is an optimization for users which are not yet using

    // idempotent/transactional features yet.

    if (lastOffset > producerStateManager.mapEndOffset && !isEmptyBeforeTruncation) {

      val segmentOfLastOffset = floorLogSegment(lastOffset)

      logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment =>

        val startOffset = Utils.max(segment.baseOffset, producerStateManager.mapEndOffset, logStartOffset)


        if (offsetsToSnapshot.contains(Some(segment.baseOffset)))


        val maxPosition = if (segmentOfLastOffset.contains(segment)) {




        else {



        val fetchDataInfo = segment.read(startOffset,

          maxSize = Int.MaxValue,

          maxPosition = maxPosition,

          minOneMessage = false)

        if (fetchDataInfo != null)

          loadProducersFromLog(producerStateManager, fetchDataInfo.records)













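What is terminationGracePeriodSeconds?
5. At the same time, K8S sends a SIGTERM signal to the old Pod and waits for up to terminationGracePeriodSeconds (30 seconds by default).
6. Once the terminationGracePeriodSeconds wait has elapsed, K8S forcibly terminates the old Pod.
At this point it should be clear: terminationGracePeriodSeconds is the final buffer K8S leaves your program to finish its work before it is shut down.

Summary: the grace period (terminationGracePeriodSeconds) defines how long a graceful shutdown may take, that is, after a stop request is received,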


apiVersion: v1
kind: Pod
metadata:
  name: busybox
spec:
  containers:
    - name: busybox
      image: busybox:stable
      command: ["/bin/sh", "-c", "sleep 3600"]
  terminationGracePeriodSeconds: 5



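K8S sends a SIGTERM signal to the old Pod and waits for up to terminationGracePeriodSeconds (30 seconds by default). In other words, if the kafka process finishes within the grace period, before the 30 seconds are up, the Pod exits on its own as soon as kafka exits. terminationGracePeriodSeconds only guarantees that the Pod is eventually force-terminated, which is a safeguard that reduces zombie Pods, so we only need to set terminationGracePeriodSeconds to the largest value we can accept.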
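
The reasoning above can be written down as a tiny model. This is only an illustrative sketch under simplifying assumptions: `GracePeriodSketch` and its members are hypothetical names, `shutdownSeconds` stands for how long the process needs after receiving SIGTERM, and a shutdown that takes exactly the grace period is counted as graceful.

```scala
object GracePeriodSketch {
  sealed trait Outcome
  case object ExitedGracefully extends Outcome // process finished within the grace period
  case object ForceKilled extends Outcome      // grace period elapsed first, Pod is forcibly terminated

  // Which of the two cases applies for a given shutdown duration.
  def outcome(shutdownSeconds: Int, terminationGracePeriodSeconds: Int = 30): Outcome =
    if (shutdownSeconds <= terminationGracePeriodSeconds) ExitedGracefully else ForceKilled

  // The Pod is gone as soon as the process exits, so the wait is capped by the grace period.
  def secondsUntilPodGone(shutdownSeconds: Int, terminationGracePeriodSeconds: Int = 30): Int =
    math.min(shutdownSeconds, terminationGracePeriodSeconds)
}
```

In this model a larger terminationGracePeriodSeconds never delays a process that shuts down quickly; it only raises the upper bound for a slow one, which is why the text suggests choosing the largest acceptable value.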


