2019独角兽企业重金招聘Python工程师标准>>>

这个格式好点:https://github.com/haogrgr/haogrgr-test/blob/master/logs/kafka_source.txt

5.日志管理器启动 logManager.startup(){
    主要是启动一些定时任务:
    
    a)LogManager.cleanupLogs, 根据配置清理日志文件(cleanupExpiredSegments[根据时间], cleanupSegmentsToMaintainSize[根据大小]).
    Log.deleteOldSegments {
        按时间和按大小逻辑类似, 按时间则是根据每个Segment的最后修改时间判断; 按大小, 则是依次累加每个LogSegment的大小, 当累计大小大于配置大小后的所有Segment都删除.
        注意, 这里不删除当前活动的Segment
         获取到要删除的Segment后, 依次调用kafka.log.Log.deleteSegment方法{
            首先将Segment从Log.segments中删除, 
            然后重命名Segment的index和log文件为.deleted后缀
            然后创建异步任务, 异步的删除Segment文件
        }
    }
    
    b)LogManager.flushDirtyLogs, 根据配置定时刷盘, 刷盘后, 会更新恢复点(log.recoveryPoint).
    LogManager.flushDirtyLogs{
        遍历logs
        val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
        if(timeSinceLastFlush >= log.config.flushMs){
            log.flush
        }
    }
    Log.flush{
        通过treemap找到最新offset和上一次recoveryPoint(恢复点)的Segment, 依次调用Segment.flush方法, 内部调用index和logFile的force方法刷盘
        然后记录最新的recoveryPoint(恢复点), 更新lastflushedTime
    }
    
    c)LogManager.checkpointRecoveryPointOffsets, 根据配置, 将每个Log.recoveryPoint刷盘, 创建checkpointRecoveryPointOffset文件, 用途前面2.4.1有说.
    
    然后, 启动cleaner线程
    cleaner.startup() {
        大概作用就是, 清理日志, 具体方法是, 对同一个key的消息, 保留offset最大的消息体, 其他的丢弃, 对于空消息体, 则表示删除.
        可以看到, 适合某些场景, 比如说用kafka来同步某个属性的值, 每变动一次, 就发一次消息, 消费者更新为最新值, 这种情况, 新的客户端对于某一key只需要获取最新的一条消息的值就行了.
        有点类似于redis的aof rewrite.
        然后有时候清理后, 日志文件变得很小, 就需要合并多个日志文件.
        
        因为清理过程需要大量的IO操作, 所以通过Throttler类来对限速, 防止对正常的写日志产生影响
        因为清理是一个长时间的过程, 且是多线程的, 所以需要有个地方标记这个分区正在清理, 所以就有了LogCleanerManager, 来存放状态信息
        
        内容比较多, 具体见5.1
    }
    
}

5.1.Cleaner线程 {
    
    根据配置, 创建LogCleaner, 如果cleanerConfig.enableCleaner == true
    new LogCleaner(cleanerConfig, logDirs, logs, time = time) {
        创建清理管理器, 主要负责维护分区在clean过程中的状态信息
        private[log] val cleanerManager = new LogCleanerManager(logDirs, logs);
        
        限流器, 防止clean占用的IO可控, 线程安全, 多个清理线程共用一个.
        private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond, checkIntervalMs = 300, throttleDown = true, "cleaner-io", "bytes", time = time)
        
        //清理线程
        val cleaners = (0 until config.numThreads).map(new CleanerThread(_))
    }
    
    先看和业务关系不大的Throttler{
        累加计数器, 然后判断时间是不是该检查速度了, 
        如果要检查速度, 就算出当前的速度, 和配置的数度对比, 看看是不是需要调整
        如果需要调整, 具体算法思路为下:
        具体问题也抽象为这样的问题, 两个人以不同的速度走了一秒钟, 求, 一秒钟后, 快的人停多久, 才能等到慢的人.
        具体解法就是, 根据速度和时间, 算出距离差距, 然后通过   (慢人的速度 / 距离差距   = 慢的人要赶的时间  = 快的人要等的时间)
        
        这里kafka的实现类似, 不过省略了中间过程, 所以比较难看懂, 具体可以看成下面代码
        if(needAdjustment) {
            val desiredRateMs = desiredRatePerSec / Time.MsPerSec.toDouble //限制的速度(慢人速度)
            val elapsedMs = elapsedNs.toDouble / Time.NsPerMs              //已经跑了多久时间
            val diffBytes = observedSoFar - (desiredRateMs * elapsedMs)    //快人跑的距离   - (慢人速度  * 已经跑了多久时间  = 慢人跑的距离) = 差距
            val newSleepTime = round(diffBytes / desiredRateMs)            //差距   / 慢人速度  = 要等待的时间
            if(newSleepTime > 0) {
                time.sleep(newSleepTime)
            }
        }
        把中间过程化简后, 就是kafka的写法
    }
    
    在来看看维护清理状态的 new LogCleanerManager(logDirs, logs) {
        用来保存清理进度快照, 保存的是每个分区上次清理到的位置(firstDirtyOffset), firstDirtyOffset之前的表示已经清理过了, 每次清理完成会更新快照文件.
        写:当clean完成的时候, 会调用LogCleanerManager.doneCleaning来更新分区最新的offset快照
        读:Clean线程中, 会不断通过LogCleanerManager.grabFilthiestLog()来获取脏日志, 而判断脏日志, 是通过获取到快照中记录的firstDirtyOffset, 从而计算出脏日志大小, 来判断是否需要清理.
        清理需要策略, 如果不记录清理进度, 则每次都做全部的扫描, 低效, 所以通过保存clean进度快照来记录清理进度, 减少不必要的清理.
        val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, offsetCheckpointFile)))).toMap
        
        一个Map, 用来记录分区正在进行中的clean状态信息. 有三个状态(LogCleaningInProgress, LogCleaningAborted, LogCleaningPaused)
        当判断到分区脏日志大于阀值时, 会进入LogCleaningInProgress状态, 开始清理, 当清理完成时, 更新快照, 讲分区从inProgress中移除.
        LogCleaningInProgress => LogCleaningAborted : 日志截取(truncate)时, 需要等待Clean完成, 且开始日志截取后, 就不能进行Clean, 需要等待日志截取完成, 才能继续Clean, 这个是通过LogCleaningAborted状态来实现的
        过程是这样的, 当日志截取时, 如果正在进行Clean, 则LogCleaningInProgress => LogCleaningAborted, 然后等待状态到LogCleaningPaused, 当Clean完成时, 会进行LogCleaningAborted => LogCleaningPaused
        这样就实现了截取日志时, Clean已经完成, 且分区被标记为LogCleaningPaused状态, 这样Clean线程后续也会忽略这个分区, 完成日志截取后, 通过CleanerManager.resumeCleaning方法, 将分区状态信息从Map中移除, 等待下次clean
        同样, 删除Log时, 也要做类似操作, 这时, 先将Log实例从Log池中移除, 然后保证Clean已经完成, 因为Log已经不再池中了, 所以下次Clean也会再Clean了, 所以这里用的是Cleaner.abortCleaning, 里面组合了abortAndPauseCleaning, resumeCleaning
        val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()
        
        好了, LogCleanerManager基本功能清楚了, 接下来回到LogCleaner
    }
    
    最好看看Clean线程 CleanerThread extends ShutdownableThread {
        ShutdownableThread里面的run方法里循环调用子类的doWork方法, 所以具体逻辑我们看doWork方法
        
        应该是具体Clean操作逻辑的类
        val cleaner = new Cleaner() {
            清理过程中的状态信息, 如开始时间, 读取字节数等信息, 一个用来表是当前的状态, 一个用来表示上一次完成的状态
            val statsUnderlying = (new CleanerStats(time), new CleanerStats(time))
            
            用来合并同Key消息的Map, 主要是索引用, 先读一遍脏日志, 建立Map
            offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt, hashAlgorithm = config.hashAlgorithm){
                既然是hash, 就先看hash算法吧
                private def positionOf(hash: Array[Byte], attempt: Int): Int = {
                    具体就是hash之后的值是一个byte数组, 当冲突次数小于(hashSize - 4)时, 取Int(hash_byte[attempt, attempt + 3]), 
                    当冲突次数高于(hashSize - 4)时, 取Int(hash_byte[12, hashSize]) + (attempt - hashSize + 4)
                    即先求hash, 然后冲突时, 使用简单的策略来继续新的hash值, 来计算新的位置
                }
                内部维护一个ByteBuffer, 不存储原始Key, 只存key的hash值, 碰撞后, 继续新的位置, 读取时类似, 读取, 比对, hash不相对, 计算新的hash, 再读取比对
                碰撞挺高的, 越到后面, 插入效率越低, 碰撞高了, 读取效率也较低, 不支持删除(1000元素的容量, 插满后, 碰撞率为19左右)
            }
            
        }
        
        doWork逻辑 cleanOrSleep {
            调用cleanerManager.grabFilthiestLog(), 扫描所有Log获取要清理的分区, 具体条件为: 1)Log配置开启compact, 2)没有处于inProgress中, 3)脏日志大于minCleanableRatio比例, 4)取最脏的那个.
            grabFilthiestLog返回LogToClean对象, 里面包含脏日志比例, 字节数, firstDirtyOffset等信息
            
            清理逻辑 cleaner.clean(cleanable) {
                清理Clean过程中的统计信息stats.clear(), 准备开始新的Clean过程
                
                先构建出Key索引Map, 清理范围是[firstDirtyOffset, activeSegment.baseOffset], 即清理除活动segment外的段
                val upperBoundOffset = log.activeSegment.baseOffset
                val endOffset = buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap) + 1 {
                    map.clear(), 清空offsetMap, 准备建索引
                    根据offsetMap容量和负载因子, 计算出最多能清理到的offset(minStopOffset)        
                    然后遍历segment, 读取文件(有限流), 构建map, buildOffsetMapForSegment(){
                        不细说, 具体就是读取文件内容到buffer, 然后通过ByteBufferMessageSet来遍历消息, 建立map, 如果buffer不够读一条消息的, 就增大, 继续读
                        返回索引的最后一条消息的offset, 处理完后, 会恢复buffer
                    }
                    返回索引的最后一条消息的offset, 加1之后, 就是真实的清理上界offset(不含自身)
                }
                
                val deleteHorizonMs = 计算一个时间点, 用来判断消息是否需要忽略(删除), 即压缩的过程同时做清理的工作, 根据配置, 获取已清理过的最大offset的文件的修改时间, 没有文件则取0
                
                然后将offset[0, endOffset]的segment拿出来处理(即0到offsetMap中已索引的最大的offset), 根据大小进行group操作, 因为以前可能进行过clean操作, 导致segment可能变小, 这里group成配置指定的大小, 再处理.
                group后的结构是一个List<List<Segment>>, 内层的List是要合并的segement
                
                遍历group, 对里面的每个组做clean操作 cleanSegments(log, group, offsetMap, deleteHorizonMs){
                    接下来就是读老文件, 合并同key的offset, 在写到临时文件, 最后重命名文件, 删除老文件, 换成新文件, 具体4.1有提到
                    
                    先创建log和index的临时文件(.cleaned), 如果存在, 则表示上次处理到一半的文件, 直接删除, 处理过的消息, 先写到.clean文件中, 防止写到一半挂了, 保证一致完整性.
                    同时创建对应的OffsetIndex和LogSegment对象
                    
                    然后对每个segment进行处理 cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes){
                        处理分区前, 先检查分区是不是处于LogCleaningAborted状态(truncate时会进入这个状态, 具体看5.1), 是就抛异常结束这里处理
                        
                        然后清理readbuffer, writebuffer, 然后借助ByteBufferMessageSet对象来读取消息, 中间会判断readbuffer是不是太小(1条msg都读不到), 太小就扩容和前面buildOffsetMap类似,
                        先吧消息读到readbuffer里面(即一次读多条, 不够一条消息就需要扩容), 再对于每条消息, 获取到key和offset, 然后根据前面构建的OffsetMap来判断这条消息是不是留(offset要大于或等于OffsetMap里面的(map里面是最大的offset)), 
                        也会判断这个文件是不是达到了删除的条件, 要删的文件, 则消息不用处理, 最后, 如果消息体为空, 表示要忽略这条消息
                        
                        将消息写到writebuffer里面去, 因为readbuffer和writebuffer一样大, 且扩容的时候一起扩容的, 所以不怕writebuffer大小不够, 然后将writebuffer写到segment中去
                        
                        最后还原readbuff, writebuffer, clean过程中, 也会有类似于进度收集, IO限速等控制
                    }
                    
                    好了, 一组segment已经clean成一个了, 接着trim一下index文件到真实的大小, 然后flush日志和索引到文件, 接着恢复一下文件的lastModified, 因为不恢复会导致本来达到删除时间的文件继续存在
                    
                    最后, 交换一下, 用新的segment替换老的segment, log.replaceSegments(cleaned, segments){
                        具体4.1有提到, 先将新的segment文件的后缀从.cleaned重命名到.swap文件, 重命名成功后, 如果进程挂了, 启动时loadSegments会完成后续的操作
                        如果刚重命名为.swap, 老的log还没有删除掉, 则下次启动的时候, loadSegments中重命名会失败, 导致启动失败, 可以手动删除老的log文件再启动
                        
                        将新的segment放入到log.segments中, 将老的一个个移除(除了刚才put的, put新的seg相当于删除了这个老的seg, 所以这里不移除了), 
                        然后将老的文件重命名为.deleted文件, 异步线程中删除老的segment文件 , 最后, 将新的segment重命名, 去掉swap后缀, 完成clean
                    }
                    
                }cleanSegments
                
            } cleaner.clean
            
            日志记录  recordStats, 记录上次Clean状态this.lastStats = stats, 顺便交换Clean状态类(Cleaner.statsUnderlying.swap)
            好了, clean操作总数是结束了
            
        }cleanOrSleep
                
    }CleanerThread
    
}

6.LogManager总结{
    首先, kafka对于一个topic, 会分为多个partition, 一个partition一个文件夹, 分区下面又分为多个segment, segment中又分为log和index, .log文件是最终的消息存放文件
    
    log文件负责消息的读写, index负责index的读写, segment聚合index和log, 提供一个统一的读写接口, 屏蔽索引等相关的操作细节, 
    而Log聚合segment, 维护lastOffset, 统一flush操作, 屏蔽掉文件滚动操作的细节, 提供一个更加上层的接口, 屏蔽底层文件的读写.
    
    LogManager则负责提供快照, 日志Clean, 相关定时任务管理等功能, 外部通过LogManager来获取Log对象读写消息
}

转载于:https://my.oschina.net/haogrgr/blog/741574

Kafka代码走读-LogCleaner相关推荐

  1. java代码走读,Kafka代码走读-LogManager

    https://github.com/haogrgr/haogrgr-test/blob/master/logs/kafka_source.txt 源码阅读(0.8.2.2): (一)概览 1.调用k ...

  2. AM5728-Uboot SPL 代码走读

    1 执行流程 am5728的cpu上电后,执行流程:ROM->MLO(SPL)->u-boot.img 第一级bootloader:引导加载程序,板子上电后会自动执行这些代码,如启动方式( ...

  3. rtsp流媒体播放器----ffmpeg相关代码走读(一)

    rtsp流媒体播放器--ffmpeg相关代码走读(一) 本文介绍avformat_open_input和init_input两个函数 直接贴代码如下 avformat_open_input 函数 in ...

  4. TI单芯片毫米波雷达代码走读(二十五)—— 角度维(3D)处理流程

    专栏目录链接: TI单芯片毫米波雷达1642代码走读(〇)--总纲 我们从今天开始进入近的篇章--3D处理 毫米波雷达信号处理的3个维度: 第一维度:距离 第二维度:多普勒 第三维度:角度 第二维度知 ...

  5. TI单芯片毫米波雷达1642代码走读(〇)——总纲

    前言 近年来,自动驾驶行业发展如火如荼,雷达技术也逐渐从军工封闭圈走向了开放的市场. 毫米波雷达具有全天候探测能力,特别是在雨雪雾天气以及夜间都能可靠工作,并且探测距离相对其他车载传感器非常远,对运动 ...

  6. keras Mask Rcnn代码走读(九)-detect方法介绍

    keras Mask Rcnn代码走读(八)-detect方法介绍,主要用于图片实体分割的推断时调用的. 一,首先对图像进行处理,调用self.mold_inputs()函数,把原图等比例resize ...

  7. 代码走读测试案例分享

    现网使用zookeeper进行任务ID的保存,以便失败时进行恢复:zookeeper本身的客户端不支持级联删除节点,必须先逐个删除目录下的节点,再删除目录(简单理解是这样).为了维护zookeeper ...

  8. webrtc代码走读四(rtp包格式)

    一.rtp包头格式 V:版本号,必须为2 P:填充位 X:扩展位,标志是否存在扩展 CC:CSRC个数 M:Mark标志位,标志是否是一帧的结束,音频帧几乎每个包都是一个完整的帧,所以M一般情况都等于 ...

  9. 发送调节器 PacedSender代码走读

    一.简介 1.1.PacedSender(步长发送器) 无线网络最害怕的一个是干扰,一个是突然的大数据量冲击.视频编码后分关键帧I帧和非关键帧P帧,I帧一般是P帧的几十倍大小,比如一个I帧200k,一 ...

最新文章

  1. SAP IDoc Post不成功,报错 - Conventional invoice verification no longer maintained as of Release 4.6-
  2. Java 面试题基础概念收集(高级)
  3. 【Accelerated C++】重点回顾(续)
  4. 写了 30 万行基础设施代码后,我们得出 5 个有用的经验
  5. 单播,组播,广播的区别
  6. 1.cocos2dx记忆卡片游戏代码、并将游戏移植到“华为荣耀”手机上、移植中的问题总结
  7. Python连接MySQL的实例代码
  8. js 判断数组的4种方法
  9. 桌面计算机最小化,电脑不显示最小化窗口怎么办win7
  10. mac的几个垃圾文件目录
  11. 慢慢的,就没有了,就像从未存在过
  12. 在 JavaScript 中如何克隆对象?
  13. 【BZOJ1921】【CTSC2010】珠宝商(点分治,后缀自动机)
  14. btrace调试利器
  15. (翻译)Apache Kafka 官方文档:开始
  16. 计算机房间墙壁的布置,房间小怎么布置 小房间布置有妙招
  17. 分享150个PHP源码,总有一款适合您
  18. [附源码]计算机毕业设计Python+uniapp基于Android 工厂考勤系统4h291(程序+源码+LW+远程部署)
  19. 一毕业就失业?应届生错过秋招有多难受
  20. 中小企业信息化——ERP/CRM

热门文章

  1. 挑选适合自己的公司——网络工程师你是否真的已经倒下(二)
  2. Python制作当年第一款手机游戏-贪吃蛇游戏(练习)
  3. 最强python技术前沿!什么是python怎么学?通过这篇文字就知道了
  4. 卫星的性能和服务器比较,卫星通信的常用频段的详细对比
  5. 2018湖北计算机准考证打印,2018年3月湖北计算机等级考试准考证打印入口
  6. java将0到9随机输出_生成0到9之间的随机整数
  7. 基于单片机的超市储物柜设计_657【毕设课设】基于单片机智能存柜储物柜存储柜系统设计...
  8. git 修改仓库的描述_git简介、基本命令和仓库操作
  9. CSS初识(三):CSS字体文本相关属性
  10. mysql截取不含%_zp blog