文章目录

  • 1. kafka日志清理策略概述
  • 2. kafka segment
    • 2.1 segmnet 的作用
    • 2.2 segment生成相关的配置
  • 3. 日志清理delete策略
    • 3.1 delete 相关配置
    • 3.2 简单总结
  • 4. 日志清理compact策略
    • 4.1 日志compact的使用场景
    • 4.2 compact的工作模式
    • 4.3 tombstone 消息
    • 4.4 低流量topic的注意事项
    • 4.5 简单总结compact的配置
  • 5. kafka创建修改topic的命令
    • 5.1. 创建topic的时候指定配置
    • 5.2 topic创建时忘了设置,修改方式
    • 5.3 查看你的配置

1. kafka日志清理策略概述

kafka log的清理策略有两种:delete,compact,默认是delete
这个对应了kafka中每个topic对于record的管理模式

  1. delete:一般是使用按照时间保留的策略,当不活跃的segment的时间戳是大于设置的时间的时候,当前segment就会被删除
  2. compact: 日志不会被删除,会被去重清理,这种模式要求每个record都必须有key,然后kafka会按照一定的时机清理segment中的key,对于同一个key只保留罪行的那个key.同样的,compact也只针对不活跃的segment
    配置为
cleanup.policy: delete
cleanup.policy: compact

如果没有特殊说明,本文中的配置均为kafka-1.1, 且为topic级别是设置

2. kafka segment

在学习日志清理策略之前,首先了解一下kafka是如何存储和管理日志的,因为他的管理都是基于segment的,所以有必要先了解清楚这个
segement的产生策略。

2.1 segmnet 的作用

  1. kafka的日志存储和消费,对外的最小粒度是partion,也就是producer和consumer最小的选择粒度是某个topic的某些partition。
  2. 每个partition又多个segment组成,这些segment一般是按照时间顺序产生的。
  3. 在单个partition中只有一个处于active的segment,这个segment是正在写入的segment(假设为segmentA),当segmentA的大小达到一定的程度(或者是经过了一定的时长),就会产生一个新的segmentB,这个时候segmentA就不再有数据写入了,变成了不活跃的segment,而segmentB就是当前Active的segment.
  4. 日志清理的策略总是针对不活跃的segment进行的。

2.2 segment生成相关的配置

  1. segment.bytes: 每个segment的大小,达到这个大小会产生新的segment, 默认是1G
  2. segment.ms: 配置每隔n ms产生一个新的segment,默认是168h,也就是7天

这两个配置是同时起作用的,那个条件先满足都会执行产生新的segment的动作
在这里,我们就需要注意,需要理解这两个配置在日志的不同场景下可能带来的影响,在下面介绍具体的日志清理策略的时候会再回来看这一块儿。

3. 日志清理delete策略

3.1 delete 相关配置

假如对某个topic(假设为user_topic)设置了 cleanup.policy: delete
那么当前topic使用的log删除策略就是 delete,这个策略会周期性的检查partion中的不活跃的segment,根据配置采用两种方式删除一些旧的segment.

retention.bytes: 总的segment的大小限制,达到这个限制后会删除旧的segment,默认值为-1,就是不会删除
retention.ms: segment的最后写入record的时间-当前时间 > retention.ms 的segment会被删除,默认是168h, 7天

一些其他的辅助性配置

log.retention.check.interval.ms: 每隔多久检查一次是否有可以删除的log,默认是300s,5分钟 这个是broker级别的设置
file.delete.delay.ms: 在彻底删除文件前保留的时间,默认为1分钟   这个是broker级别的设置

在delete的日志策略下,我们来讨论一下,假如我们想要日志保留3天
我们可以通过设置

retention.ms: 259200000

假设我们保留其他的配置不便。想想一下这个场景:
日志产生的很慢,3天还没有达到1g,这个时候根据segment的产生策略,还只有一个segment,而且这个segment是活跃的,所以不能被删除,即使到了第4天可能依然不能被删除。这个时候如果我们依然想要优化存储,就可以使用另一个配置来使删除依然能够按时触发,对的,我们可以设置

segment.ms: 43200000 # 12个小时

这样的话,每隔12个小时,只有有新的数据进来,都会产生一个新的segment,这样的话,就可以触发3天的删除策略了。
所以,比较重要的一点是,对于流量比较低的topic,要注意控制 segment.ms< retention.ms

3.2 简单总结

kafka启用delete的清理策略的时候需要注意配置

cleanup.policy: delete
segment.bytes: 每个segment的大小,达到这个大小会产生新的segment, 默认是1G
segment.ms: 配置每隔n ms产生一个新的segment,默认是168h,也就是7天
retention.bytes: 总的segment的大小限制,达到这个限制后会删除旧的segment,默认值为-1,就是不会删除
retention.ms: segment的最后写入record的时间-当前时间 > retention.ms 的segment会被删除,默认是168h, 7天

对于低流量的topic需要关注使用segment.ms 来配合日志的清理

4. 日志清理compact策略

4.1 日志compact的使用场景

日志清理的compact策略,对于那种需要留存一份全量数据的需求比较有用,什么意思呢,比如,

我用flink计算了所有用户的粉丝数,而且每5分钟更新一次,结果都存储到kafka当中。
这个时候kafka相当于是一个数据总线,任何需要用户粉丝数的业务部门都可以从kafka中拿到这个数据。
这个时候如果数据的保存使用delete策略,为了保存所有用户的粉丝数,只能设置不删除,也就是

retention.bytes: -1
retention.ms: Long.MAX #这个值需要自己去设置实际的数值值

这样的话,数据会无限膨胀,而且,很多数据是无意义的,因为业务方从kafka中消费数据的时候,实际上只是想知道用户的当前粉丝数是多少,不关注一个月前这个用户有多少粉丝数,但是这些数据都在kafka中存储,会造成无意义的消费。
kafka提供了一种叫做compact的清理策略,这个策略可以很好的帮助我们应对这种情况。

kafka的compact 策略要求每个record都要有key,kafka是根据key来进行去重合并的。每个key至少保留一个最新的值。

4.2 compact的工作模式

对于每一个kafka partition的日志,以segment为单位,都会被分为两部分,已清理和未清理的部分。同时,未清理的那部分又分为可以清理的和不可清理的。对于可以清理的segment大致是下面的一个清理思路。

同时对于清理过后的segment如果太小,kafka也会有一定的策略去合并这些segemnt,防止segment碎片化。
我们通过配置

cleanup.policy: compact

来开启compact的日志清理策略
配套的配置还有

  1. min.cleanable.dirty.ratio: 可以进行compact的脏数据的比例,dirtyRatio = dirtyBytes / (cleanBytes + dirtyBytes) 其中dirtyBytes表示可清理部分的日志大小,cleanBytes表示已清理部分的日志大小。这个配置也是为了提升清理的性价比设置的,因为清理数据需要对磁盘进行读写,开销并不小,如果你的数据只有很小的重复比例,实际上是没有清理的必要的。这个值默认是0.5 也就是脏了的数据达到了总数据的50%才会清理,一般情况下我如果开启了compact策略,都会将这个值设置为0.1,感觉这样对于想要消费当前topic的业务方更加友好。

  2. min.compaction.lag.ms: 这个设置了在一条消息在被produer发送到kafka当中之后,多久时间以内不会被compact,为了满足有些想要获取一定时间内的历史快照的业务,默认是0,就是不会根据消息投递的时间来决定消息是否应该被compacted

4.3 tombstone 消息

在compact下,还有一类比较特殊的消息,只有key,value值为null的消息,这一类消息如果合并了实际上也是没有意义的,因为没有值,所以kafka在compact的时候会删除value为null的消息,但是并不是在第一次去重的时候立刻删除,而是允许存储的更久一些。有一个特殊的配置来处理。
delete.retention.ms: 这个配置就是专门针对tombstone类型的消息进行设置的。默认为24小时,也就是这个tombstone在当次compact完成后并不会被清理,在下次compact的时候,他的最后修改时间+delete.retention.ms>当前时间,才会被删掉。

这里面还有一点需要注意的是,如果你想测试tombstone的删除功能的话,请注意不要使用console-producer,他并不能产生value为null的record,坑死个人的,还是老老实实的用java-client跑一跑吧。

4.4 低流量topic的注意事项

同样,因为compact针对的是不活跃的segment,所以我们要对低流量的topic特别小心。
在流量较低的情况下。假如我们设置

segment.bytes: 每个segment的大小,达到这个大小会产生新的segment, 默认是1G
segment.ms: 配置每隔n ms产生一个新的segment,默认是168h,也就是7天

这样的话,新的segment没有办法产生,也就无从进行compact了。

4.5 简单总结compact的配置

kafka启用delete的清理策略的时候需要注意配置

cleanup.policy: compact
segment.bytes: 每个segment的大小,达到这个大小会产生新的segment, 默认是1G
segment.ms: 配置每隔n ms产生一个新的segment,默认是168h,也就是7天
retention.bytes: 总的segment的大小限制,达到这个限制后会删除旧的segment,默认值为-1,就是不会删除
retention.ms: segment的最后写入record的时间-当前时间 > retention.ms 的segment会被删除,默认是168h, 7天
min.cleanable.dirty.ratio: 脏数据可以容忍的比例,如果你的机器性能可以,而且数据量较大的话,建议这个值设置更小一些,对consumer更友好
min.compaction.lag.ms: 看业务有需要的话可以设置

对于低流量的topic需要关注使用segment.ms 来配合日志的清理

5. kafka创建修改topic的命令

5.1. 创建topic的时候指定配置

#!/bin/bashZOOKEEPER="127.0.0.1:2181"
BROKER="127.0.0.1:9092"TOPIC="post-count-processed"## 删除操作
../kafka_2.12-2.2.0/bin/kafka-topics.sh --bootstrap-server $BROKER  --delete --topic $TOPIC
sleep 3echo "----------------------"../kafka_2.12-2.2.0/bin/kafka-topics.sh --zookeeper ${ZOOKEEPER} --create --topic ${TOPIC} --partitions 1    --replication-factor 3  --config cleanup.policy=compact --config segment.ms=86400000 --config min.cleanable.dirty.ratio=0.1

5.2 topic创建时忘了设置,修改方式

#!/bin/bashZOOKEEPER="127.0.0.1:2181"
BROKER="127.0.0.1:9092"
TOPIC="post-count-processed"./kafka_2.12-2.2.0/bin/kafka-configs.sh --zookeeper ${ZOOKEEPER} --entity-type topics --entity-name ${TOPIC}    --alter --add-config "delete.retention.ms=86400000,segment.ms=3600000,min.cleanable.dirty.ratio=0.1"

5.3 查看你的配置

#!/bin/bashZOOKEEPER="127.0.0.1:2181"
BROKER="127.0.0.1:9092"
TOPIC="post-count-processed"../kafka_2.12-2.2.0/bin/kafka-configs.sh --zookeeper  ${ZOOKEEPER} --entity-type topics --entity-name ${TOPIC} --describe

参考
http://kafka.apache.org/11/documentation.html#compaction
https://www.linkedin.com/pulse/introduction-topic-log-compaction-apache-kafka-nihit-saxena
https://blog.csdn.net/u013332124/article/details/82793381

kafka日志清理策略,compact和delete相关推荐

  1. 清理offset_关于 kafka 日志清理策略的问题

    现象: 搭建了一个 kafka 服务, 使用 kafka-python 包正常生产数据, 但是 kafka 过五分钟就把我的 topic 删除掉. 但是配置 log 的已经配置了, 我认为 kafka ...

  2. Kafka日志清理之Log Deletion

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  3. Kafka日志清理之Log Compaction

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  4. Kafka日志清除策略

    日志清除策略 基于时间的删除策略 在server.properity文件中设置如下: log.retention.hours=168 //7d log.retention.check.interval ...

  5. kafka历史数据清理策略以及配置

    1.关于Kafka的日志 日志的英语是"log",但Kafka的数据文件也被称为log,所以很多时候会造成一定的歧义.在Kafka中,日志分为两种: 数据日志 操作日志 数据日志是 ...

  6. Kafka日志刷新策略

    Kafka的日志实际上是开始是在缓存中的,然后根据策略定期一批一批写入到日志文件中去,以提高吞吐率. 属性名 含义 默认值 log.flush.interval.messages 消息达到多少条时将数 ...

  7. PG 备份策略及 WAL 日志清理策略

    一.Postgresql 备份策略 备份时间:每天 05:10(根据备份量级可灵活调整) 备份模式:全量备份 备份保存天数:7 天 备份地址:/home/postgres/postgresql_bac ...

  8. 你花了多久弄明白架构设计?kafka日志清理

    大数据.算法项目在任何大厂无论是面试还是工作运用都是非常广泛的,我们精选了50个百度.腾讯.阿里等大厂的大数据.算法落地经验甩给大家,千万不要做收藏党哦,空闲时间记得随时看看! 如果你没有大厂项目经验 ...

  9. kafka 日志相关配置

    日志目录 ${kafka.logs.dir}/server.log :服务器日志 ${kafka.logs.dir}/state-change.log:状态变化日志 ${kafka.logs.dir} ...

最新文章

  1. 基于用户投票的排名算法(三):Stack Overflow
  2. 记一次生产事故-mysql执行update导致锁整表
  3. 水环境模型与大数据技术融合研究
  4. 关于BCT,你需要知道的是...
  5. java 查询sql_Java 中如何使用 SQL 查询文本
  6. We change lives !
  7. 【Foreign】朗格拉日计数 [暴力]
  8. sar命令和vmstat命令详解
  9. kali 安装vmware 14 for linux 出现问题
  10. Java对二维数组排序
  11. JS 获取 URL 地址/参数
  12. Charles抓包安卓端
  13. 阿里矢量图iconfont的两种使用方法
  14. 微信小程序请求函数的封装
  15. FFmpeg学习(音视频理论知识)
  16. html文本框后面紧挨着按钮,Word题目与答案
  17. 西班牙建筑中的突起感是什么呀
  18. drf路由组件Routers
  19. 组织病理学的生存模型综述
  20. 单片机计数器实验代码c语言,单片机计数器功能实验程序

热门文章

  1. 人类会被人工智能取代吗
  2. 详解程序员驻场开发服务的具体流程
  3. 我的二十条择偶标准 [转]
  4. Mac Unity导入FBX模型时出现材质丢失,模型为白膜的情况
  5. 艾 宾 浩 斯 记 忆 法
  6. RDP协议简介与通讯数据加密等级及设置说明
  7. x86上 /proc/cpuinfo中的cpufreq和scaling_cur_freq怎么算
  8. 计算机ipad手机组成,如何为自适应手机,计算机和iPad制作网页的摘要
  9. 美的 Dell 国信证券面经整理
  10. Pytorch数据使用列表的卷积层时报错及解决-RuntimeError: Input type (torch.cuda.HalfTensor) and weight type (torch.Floa