最近查看Kafka文档, 发现 Kafka 有个 Log Compaction 功能是我们之前没有留意到的, 但是有着很高的潜在实用价值.

什么是Log Compaction

Kafka 中的每一条数据都有一对 Key 和 Value, 数据存放在磁盘上, 一般不会被永久保留, 而是在到达一定的量或者时间后对最早写入的数据进行删除. Log Compaction 在默认的删除规则之外提供了另一种删除过时数据(或者说保留有价值的数据)的方式, 就是对于有相同 Key 的不同数据, 只保留最后一条, 前面的数据在合适的情况下删除.

Log Compaction 的应用场景

Log Compaction 特性, 就实时计算而言, 可以在灾难恢复方面有很好地应用场景. 比如说我们在 Storm 里做计算时, 需要长期在内存里维护一些数据, 这些数据可能是通过聚合了一天或者一周的日志得到的, 这些数据一旦由于偶然的原因(磁盘,网络等)崩溃了, 从头开始计算需要漫长的时间.一个可行的应对方法是定时将内存里的数据备份到外部存储中, 比如 Redis 或者 Mysql 等, 当崩溃发生的时候再从外部存储读回来继续计算.

使用 Log Compaction 来代替这些外部存储有以下好处.

  1. Kafka 既是数据源又是存储工具, 可以简化技术栈, 降低维护成本.

  2. 使用 Mysql 或者 Redis 作为外部存储的话, 需要将存储的 Key 记录下来, 恢复时再用这些 Key 将数据取回, 实现起来有一定的工程复杂度. 用Log Compaction 特性的话只要把数据一股脑儿地写进 Kafka, 等灾难恢复的时候再读回内存就行了.

  3. Kafka 针对磁盘读写都有很高的顺序性, 相对于 Mysql 没有索引查询等工作量的负担, 可以实现高性能, 相对于 Redis 而言, 它可以充分利用廉价的磁盘而对内存要求很低, 在接近的性能下能实现非常高的性价比(仅仅针对灾难恢复这个场景而言).

实现方式的简要介绍

当 topic 的 cleanup.policy (默认为delete) 设置为 compact 时, Kafka 的后台线程会定时把 topic 遍历两次, 第一次把每个 key 的哈希值最后一次出现的 offset 都存下来, 第二次检查每个 offset 对应的 key 是否在更后面的日志中出现过,如果出现了就删除对应的日志.

源码解析

Log Compaction 的大部分功能由CleanerThread完成, 核心逻辑在 Cleaner 的 clean方法

/*** Clean the given log** @param cleanable The log to be cleaned** @return The first offset not cleaned and the statistics for this round of cleaning*/private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = {val stats = new CleanerStats()info("Beginning cleaning of log %s.".format(cleanable.log.name))val log = cleanable.log// build the offset mapinfo("Building offset map for %s...".format(cleanable.log.name))val upperBoundOffset = cleanable.firstUncleanableOffsetbuildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap, stats)// <----- 这里第一次遍历所有offset将key索引val endOffset = offsetMap.latestOffset + 1stats.indexDone()// figure out the timestamp below which it is safe to remove delete tombstones// this position is defined to be a configurable time beneath the last modified time of the last clean segmentval deleteHorizonMs =log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {case None => 0Lcase Some(seg) => seg.lastModified - log.config.deleteRetentionMs}// determine the timestamp up to which the log will be cleaned// this is the lower of the last active segment and the compaction lagval cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)// group the segments and clean the groupsinfo("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize))cleanSegments(log, group, offsetMap, deleteHorizonMs, stats)// <-- 这里第二次遍历所有offset,删除冗余的日志,并且将多个小的segment合并为一个// record buffer utilizationstats.bufferUtilization = offsetMap.utilizationstats.allDone()(endOffset, stats)}

log compaction 通过两次遍历所有数据来实现, 两次遍历之间交流的媒介就是一个
OffsetMap, 下面是OffsetMap的签名

trait OffsetMap {def slots: Intdef put(key: ByteBuffer, offset: Long)def get(key: ByteBuffer): Longdef clear()def size: Intdef utilization: Double = size.toDouble / slotsdef latestOffset: Long
}

这基本就是个普通的mutable map, 在 Kafka 项目中,它的实现只有一个, 叫做SkimpyOffsetMap

put方法

put 方法会为每个 key 生成一份摘要,默认使用 md5 方法生成一个 16byte 的摘要, 根据这个摘要在 bytes 中哈希的到一个下标, 如果这个下标已经被别的摘要占据, 则线性查找到下个空余的下标为止, 然后在对应位置插入该 key 对应的 offset

/*** Associate this offset to the given key.* @param key The key* @param offset The offset*/
override def put(key: ByteBuffer, offset: Long) {require(entries < slots, "Attempt to add a new entry to a full offset map.")lookups += 1hashInto(key, hash1)// probe until we find the first empty slotvar attempt = 0var pos = positionOf(hash1, attempt)  while(!isEmpty(pos)) {bytes.position(pos)bytes.get(hash2)if(Arrays.equals(hash1, hash2)) {// we found an existing entry, overwrite it and return (size does not change)bytes.putLong(offset)lastOffset = offsetreturn}attempt += 1pos = positionOf(hash1, attempt)}// found an empty slot, update it--size grows by 1bytes.position(pos)bytes.put(hash1)bytes.putLong(offset)lastOffset = offsetentries += 1
}

get方法

get 方法使用和 put 同样的摘要算法获得 key 的摘要, 通过摘要获得 offset 的存储位置

/*** Get the offset associated with this key.* @param key The key* @return The offset associated with this key or -1 if the key is not found*/override def get(key: ByteBuffer): Long = {lookups += 1hashInto(key, hash1)// search for the hash of this key by repeated probing until we find the hash we are looking for or we find an empty slotvar attempt = 0var pos = 0//we need to guard against attempt integer overflow if the map is full//limit attempt to number of slots once positionOf(..) enters linear search modeval maxAttempts = slots + hashSize - 4do {if(attempt >= maxAttempts)return -1Lpos = positionOf(hash1, attempt)bytes.position(pos)if(isEmpty(pos))return -1Lbytes.get(hash2)attempt += 1} while(!Arrays.equals(hash1, hash2))bytes.getLong()}

可能的空间问题 性能问题 冲突问题

空间问题

默认情况下, Kafka 用 16 个 byte 存放key的摘要, 用 8 个 byte 存放摘要对应的 offset, 1GB 的空间可以保存 1024* 1024*1024 / 24 = 44,739,242.666... 个 key 对应的数据.

性能问题

这个 log compaction 的原理挺简单, 就是定期把所有日志读两遍,写一遍, cpu 的速度超过磁盘完全不是问题, 只要日志的量对应的读两遍写一遍的时间在可接受的范围内, 它的性能就是可以接受的.

冲突问题

现在的 OffsetMap 唯一的实现名字叫做 SkimpyOffsetMap, 相信你们已经从这个名字里看出端倪, 最初的作者本身也认为这样的实现不够严谨. 这个算法在两个 key 的 md5 值相同的情况下就判断 key 是相同的, 如果遇到了 key 不同而 md5 值相同的情况, 那两个 key 中其中一个的消息就丢失了. 虽然 md5 值相同的概率很低, 但如果真的碰上了, 那就是100%, 概率值再低也没用, 而且从网上的反映看似乎冲突还不少见.

我个人目前想到的处理方案是, 大部分的 key 总长度并不算长, 可以把这类 key 所有可能的情况都md5一遍看一下是否有冲突, 如果没有的话就放心用.

Kafka Log Compaction 解析相关推荐

  1. 【kafka原理】kafka Log存储解析以及索引机制

    本文设置到的配置项有 名称 描述 类型 默认 num.partitions topic的默认分区数 int 1 log.dirs 保存日志数据的目录.如果未设置,则使用log.dir中的值 strin ...

  2. Kafka日志清理之Log Compaction

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  3. kafka redis vs 发布订阅_发布订阅的消息系统 Kafka的深度解析

    背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统.主要设计目标如下: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能 高吞吐 ...

  4. 4.2.4 Kafka高级特性解析(物理存储、稳定性:事物,控制器,可靠性,一致性,_consumer_offsets、延时队列、自定义重试队列)

    Kafka高级特性解析 文章目录 Kafka高级特性解析 2.5 物理存储 2.5.1 日志存储概述 2.5.2 日志存储 2.5.2.1 索引 2.5.2.1.1 偏移量 2.5.2.1.2 时间戳 ...

  5. 发布订阅的消息系统 Kafka的深度解析

    发布&订阅的消息系统 Kafka的深度解析 2015-01-27 10:25 Jason Guo Jason Guo的博客 字号: T | T 一个典型的kafka集群中包含若干produce ...

  6. 图解kafka - 设计原理解析

    什么是消息队列? 简单来说,消息队列是存放消息的容器.客户端可以将消息发送到消息服务器,也可以从消息服务器获取消息. 问题导读: ********* 为什么需要消息系统? kafka架构? kafka ...

  7. 【kafka】Kafka 源码解析:Group 协调管理机制

    1.概述 转载:Kafka 源码解析:Group 协调管理机制 在 Kafka 的设计中,消费者一般都有一个 group 的概念(当然,也存在不属于任何 group 的消费者),将多个消费者组织成一个 ...

  8. Kafka设计解析(五): Kafka Consumer设计解析

    Kafka设计解析(五)- Kafka Consumer设计解析 大数据架构(郭俊_Jason) · 2015-09-18 08:24 点击上方 大数据架构   快速关注 Kafka Consumer ...

  9. 【Kafka】Error when freeing index buffer (kafka.log.OffsetIndex) NullPointException

    1.美图 2.背景 今天windows启动kafka的时候,突然报错 2015-07-14 17:00:45,197] WARN Error when freeing index buffer (ka ...

最新文章

  1. PU-Net:一种基于数据的3D点云上采样网络
  2. ERP与EWM集成配置-ERP端组织架构(二)
  3. vue-cli项目打包多个与static文件同级的静态资源目录(copy-webpack-plugin插件的使用)...
  4. 【BZOJ1492】[NOI2007]货币兑换Cash 斜率优化+cdq分治
  5. 【新星计划】Python OpenCV 形态学应用—图像开运算与闭运算
  6. 在 Asp.Net Core 中使用 worker services
  7. 让机器有温度:带你了解文本情感分析的两种模型
  8. web前端开发面临挑战有哪些?
  9. 给定一个数值,计算最合适的行列数量的代码
  10. xml配置service服务器文件路径,xml配置service服务器文件路径
  11. 网盘共享文件有病毒吗?
  12. CSS3 animation 动画用法介绍
  13. JAVA解析IP地址
  14. 大数据时代:数据收集比数据挖掘更有意义
  15. 规模决定利润 网吧规模扩充升级参考方案(转)
  16. 机器学习算法之聚类算法拓展:Mini Batch K-Means算法
  17. 大脑简史(3)-大脑的结构
  18. 干货学起来!分享4个简单实用的Word技巧,请低调收藏!
  19. 计算机网络vlan的作用,计算机网络之九:VLAN
  20. PTA 找单词 (15分)(bfs)

热门文章

  1. python之父叫什么-Python之父谈Python的未来形式
  2. python软件怎么用-用Python如何打出你的第一个程序
  3. python与excel的区别-Python比较两个excel文档内容的异同
  4. 无意间看到Pure-Mvc记录下
  5. 鸿蒙系统的功能如何,华为鸿蒙系统发布会,这个功能怎么那么像小米MIUI的
  6. java I/O思维导图
  7. UVa1112 - Mice and Maze(Dijkstra和Floyd_warshall)
  8. SQL Server 数据库表的统计信息的更新
  9. C# 4.0中的协变和逆变(一)
  10. CentOS7和其他版本的虚拟机,防火墙命令等各种相关笔记