目录

简介

核心概念

消息流转模型图

​生产者基本流程

API实现

消息模型

主题topic、分区partion、分区副本replica

生产者

消费者和消费者组

offset的保存

分配partition--reblance

消息投递语义

At least once

At most once

Exactly once

生产幂等性

消费事务

文件组织

常用配置项

broker配置

topic配置


简介

kafka是一个分布式消息队列。具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。官网地址:Apache Kafkahttps://kafka.apache.org/

kafka对外使用topic的概念,生产者往topic里写消息,消费者从topic读消息。为了做到水平扩展,一个topic实际是由多个partition组成的,遇到瓶颈时,可以通过增加partition的数量来进行横向扩容。单个parition内是保证消息有序。

核心概念

  • broker:节点,即集群中的一台主机。若存在多个broker,会选举出一个controller,用来访问zookeeper集群元数据实现集群管理
  • producer:生产者,即向broker发送消息的发送方
  • consumer:消费者,即从broker获取消息的接收方
  • consumer group:消费组,由多个消费者组成,同一条消息只会被组中的一个成员消费但可以被不同消费组消费
  • topic:主题,即一个消息队列
  • partition:分区,topic的物理组成,一个topic可以分为多个分区,分区可以分布在多个broker中。一个主题的一条消息只会发往一个分区
  • replica:分区副本,每个partition都有至少一个leader和若干个follower用于主备,实现高可用
  • leader:分区副本中的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
  • follower:分区副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步,不过不会和生产者消费者交互。leader 发生故障时,某个 follower 会成为新的 leader。
  • offset:消费者对某个分区的消费量,由group topic partition三者唯一标识。当消费者第一次连接kafka时会去读取该值,之后的时间会在自身内存中存储最新偏移量,消费者可以提交该值写入至kafka中(__consumer_offsets主题)

消息流转模型图

大概用法就是,Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉去指定Topic的消息,然后进行业务处理。
图中有两个topic,topic 0有两个partition,topic 1有一个partition,三副本备份。可以看到consumer gourp 1中的consumer 2没有分到partition处理,这是有可能出现的,下面会讲到。

Producer发送数据

我们看上面的架构图中,producer就是生产者,是数据的入口。注意看图中的红色箭头,Producer在写入数据的时候永远的找leader,不会直接将数据写入follower!那leader怎么找呢?写入的流程又是什么样的呢?我们看下图:

发送的流程就在图中已经说明了,就不单独在文字列出来了!需要注意的一点是,消息写入leader后,follower是主动的去leader进行同步的!producer采用push模式将数据发布到broker,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的!写入示意图如下:

 上面说到数据会写入到不同的分区,那kafka为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是:

  1. 方便扩展:因为一个topic可以有多个partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据量。
  2. 提高并发:以partition为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。

关于broker、topics、partitions的一些元信息用zk来存,监控和路由啥的也都会用到zk。


生产者基本流程

创建一条记录,记录中一个要指定对应的topic和value,key和partition可选。 先序列化,然后按照topic和partition,放进对应的发送队列中。kafka producer都是批量请求,会积攒一批,然后一起发送,不是调send()就进行立刻进行网络发包。
如果partition没填,那么情况会是这样的:

  1. key有填
    按照key进行哈希,相同key去一个partition。(如果扩展了partition的数量那么就不能保证了)
  2. key没填
    round-robin来选partition

这些要发往同一个partition的请求按照配置,攒一波,然后由一个单独的线程一次性发过去。

消息模型

1. 队列(点对点)模型

如上图所示,点对点模式通常是基于拉取或者轮询的消息传送模型,这个模型的特点是发送到队列的消息被一个且只有一个消费者进行处理。生产者将消息放入消息队列后,由消费者主动的去拉取消息进行消费。点对点模型的的优点是消费者拉取消息的频率可以由自己控制。

2. 发布订阅(广播)模型

如上图所示,发布订阅模式是一个基于消息送的消息传送模型,改模型可以有多种不同的订阅者。生产者将消息放入消息队列后,队列会将消息推送给订阅过该类消息的消费者。

主题topic、分区partion、分区副本replica

主题:是一个逻辑上的概念,代表着某一个消息队列,而分区则是其物理上的实现,它的表现形式为以log和index为后缀的文件(存储在log.dirs目录下),前者存储数据,后者存储索引用于快速定位数据。

partition:

当存在多副本replica的情况下,会尽量把多个副本replica,分配到不同的broker上。kafka会为partition选出一个leader,之后所有该partition的请求,实际操作的都是leader,然后再同步到其他的follower当一个broker down后,所有leader在该broker上的partition都会重新选举,选出一个leader。(这里不像分布式文件存储系统那样会自动进行复制保持副本数)

前面说过了每个topic都可以分为一个或多个partition,如果你觉得topic比较抽象,那partition就是比较具体的东西了!Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。

如上图,这个partition有三组segment文件,每个log文件的大小是一样的,但是存储的message数量是不一定相等的(每条的message大小不一致)。文件的命名是以该segment最小offset来命名的,如000.index存储offset为0~368795的消息,kafka就是利用分段+索引的方式来解决查找效率的问题。

在我们聊到了partition划分为多组segment,每个segment又包含.log、.index、.timeindex文件,存放的每条message包含offset、消息大小、消息体……我们多次提到segment和offset,查找消息的时候是怎么利用segment+offset配合查找的呢?

查找message举例如下:

假如现在需要查找一个offset为368801的message是什么样的过程呢?我们先看看下面的图:

  1. 先找到offset的368801message所在的segment文件(利用二分法查找),这里找到的就是在第二个segment文件。
  2. 打开找到的segment中的.index文件(也就是368796.index文件,该文件起始偏移量为368796+1,我们要查找的offset为368801的message在该index内的偏移量为368796+5=368801,所以这里要查找的相对offset为5)。由于该文件采用的是稀疏索引的方式存储着相对offset及对应message物理偏移量的关系,所以直接找相对offset为5的索引找不到,这里同样利用二分法查找相对offset小于或者等于指定的相对offset的索引条目中最大的那个相对offset,所以找到的是相对offset为4的这个索引。
  3. 根据找到的相对offset为4的索引确定message存储的物理偏移位置为256。打开数据文件,从位置为256的那个地方开始顺序扫描直到找到offset为368801的那条Message。

这套机制是建立在offset为有序的基础上,利用segment+有序offset+稀疏索引+二分查找+顺序查找等多种手段来高效的查找数据!至此,消费者就能拿到需要处理的数据进行处理了。那每个消费者又是怎么记录自己消费的位置呢?在早期的版本中,消费者将消费到的offset维护zookeeper中,consumer每间隔一段时间上报一次,这里容易导致重复消费,且性能不好!在新的版本中消费者消费到的offset已经直接维护在kafk集群的__consumer_offsets这个topic中!

那么对应Kafka怎么分配partition,怎么选leader?

关于partition的分配,还有leader的选举,总得有个执行者。在kafka中,这个执行者就叫controller。kafka使用zk在broker中选出一个controller,用于partition分配和leader选举。

partition的分配

  1. 将所有Broker(假设共n个Broker)和待分配的Partition排序
  2. 将第i个Partition分配到第(i mod n)个Broker上 (这个就是leader)
  3. 将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上

leader容灾

controller会在Zookeeper的/brokers/ids 如图节点上注册Watch,一旦有broker宕机,它就能知道。当broker宕机后,controller就会给受到影响的partition选出新leader。controller从zk的/brokers/topics/[topic]/partitions/[partition]/state中,例如下图中自定义topic为test-lc-b 的0号partition 中的isr

读取对应partition的ISR(in-sync replica已同步的副本)列表,选一个出来做leader。
选出leader后,更新zk,然后发送LeaderAndISRRequest给受影响的broker,让它们改变知道这事。

如果ISR列表是空,那么会根据配置,随便选一个replica做leader,或者干脆这个partition就是歇菜。如果ISR列表的有机器,但是也歇菜了,那么还可以等ISR的机器活过来。

多副本同步

这里的策略,服务端这边的处理是follower从leader批量拉取数据来同步。但是具体的可靠性,是由生产者来决定的。保证消息不丢失是一个消息队列中间件的基本保证,那producer在向kafka写入消息的时候,怎么保证消息不丢失呢?那就是通过ACK应答机制!
生产者生产消息的时候,通过request.required.acks参数来设置数据的可靠性。

acks what happen
0 which means that the producer never waits for an acknowledgement from the broker.发过去就完事了,不关心broker是否处理成功,可能丢数据。
1 which means that the producer gets an acknowledgement after the leader replica has received the data. 当写Leader成功后就返回,其他的replica都是通过fetcher去同步的,所以kafka是异步写,主备切换可能丢数据。
-1 which means that the producer gets an acknowledgement after all in-sync replicas have received the data. 要等到isr里所有机器同步成功,才能返回成功,延时取决于最慢的机器。强一致,不会丢数据。

在acks=-1的时候,如果ISR少于min.insync.replicas指定的数目,那么就会返回不可用。

这里ISR列表中的机器是会变化的,根据配置replica.lag.time.max.ms,多久没同步,就会从ISR列表中剔除。以前还有根据落后多少条消息就踢出ISR,在1.0版本后就去掉了,因为这个值很难取,在高峰的时候很容易出现节点不断的进出ISR列表。

从ISA中选出leader后,follower会从把自己日志中上一个高水位后面的记录去掉,然后去和leader拿新的数据。因为新的leader选出来后,follower上面的数据,可能比新leader多,所以要截取。这里高水位的意思,对于partition和leader,就是所有ISR中都有的最新一条记录。消费者最多只能读到高水位;

从leader的角度来说高水位的更新会延迟一轮,例如写入了一条新消息,ISR中的broker都fetch到了,但是ISR中的broker只有在下一轮的fetch中才能告诉leader。

也正是由于这个高水位延迟一轮,在一些情况下,kafka会出现丢数据和主备数据不一致的情况,0.11开始,使用leader epoch来代替高水位。(https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation#KIP-101-AlterReplicationProtocoltouseLeaderEpochratherthanHighWatermarkforTruncation-Scenario1:HighWatermarkTruncationfollowedbyImmediateLeaderElection)

ISR(in-sync replicas):已同步副本,之前提到过一个分区至少有一个leader和若干个follower,而ISR就是leader维护的一个集合,里面存放着已同步或者并未落后太多的follower,当leader挂了之后,会从ISR中挑选出一个新leader。这个"并未落后太多“可以通过replica.lag.time.max.ms配置指定,默认为30秒,指的是当follower在这个时间内还未向leader发起过同步数据请求,则将该follower踢出ISR集合

LEO(log end offset):每个副本的最后一个offset

HW(high watermark):同一组分区里每个副本中最小的LEO,HW之前的数据才对消费者可见。当leader挂了,也能保证消费者消费数据一致性,当挂掉的leader作为follower恢复后需要丢弃HW之后的消息数据,然后向新leader同步数据

分区分配策略:round-robin,range(默认)和sticky。当消费组中的消费者数量发生变化时,触发分区策略。

  • RangeAssignor策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n+1个分区,后面的(消费者数量-m)个消费者每个分配n个分区。
  • RoundRobinAssignor的原理是根据每个topic列出对应的消费者,然后轮转分配该topic下的分区。
  • StickyAssignor策略的目标则是(1)分区的分配要尽可能的均匀;(2)分区的分配尽可能的与上次分配的保持相同。当两者发生冲突时,第一个目标优先于第二个目标。

生产者

生产者在将消息发送到某个Topic ,需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能发送到对应的Broker,在发往Broker之前是需要确定它所发往的分区。

消费者和消费者组

consumer 采用 pull(拉)模式从 broker 中读取数据。push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。 它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适 当的速率消费消息。pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有 数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。

订阅topic是以一个消费组来订阅的,一个消费组里面可以有多个消费者。同一个消费组中的两个消费者,不会同时消费一个partition。换句话来说,就是一个partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费。因此,如果消费组内的消费者如果比partition多的话,那么就会有个别消费者一直空闲。

offset的保存

一个消费组消费partition,需要保存offset记录消费到哪,以前保存在zk中,由于zk的写性能不好,以前的解决方法都是consumer每隔一分钟上报一次。这里zk的性能严重影响了消费的速度,而且很容易出现重复消费。
在0.10版本后,kafka把这个offset的保存,从zk中剥离,保存在一个名叫__consumeroffsets topic的topic中。写进消息的key由groupid、topic、partition组成,value是偏移量offset。topic配置的清理策略是compact。总是保留最新的key,其余删掉。一般情况下,每个key的offset都是缓存在内存中,查询的时候不用遍历partition,如果没有缓存,第一次就会遍历partition建立缓存,然后查询返回。

看上去该 Offset 就是一个数值而已,其实对于 Consumer Group 而言,它是一组 KV 对,Key 是分区,V 对应 Consumer 消费该分区的最新位移。如果用 Java 来表示的话,你大致可以认为是这样的数据结构,即 Map<TopicPartition, Long>,其中 TopicPartition 表示一个分区,而 Long 表示位移的类型。当然,我必须承认 Kafka 源码中并不是这样简单的数据结构,而是要比这个复杂得多,不过这并不会妨碍我们对 Group 位移的理解。

分配partition--reblance( Consumer Group 端的重平衡)

Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。生产过程中broker要分配partition,消费过程这里,也要分配partition给消费者。类似broker中选了一个controller出来,消费也要从broker中选一个coordinator,用于分配partition。比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下,Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。
下面从顶向下,分别阐述一下

  1. 怎么选coordinator。
  2. 交互流程。
  3. reblance的流程。

选coordinator

  1. 看offset保存在那个partition
  2. 该partition leader所在的broker就是被选定的coordinator

这里我们可以看到,consumer group的coordinator,和保存consumer group offset的partition leader是同一台机器。

交互流程

把coordinator选出来之后,就是要分配了
整个流程是这样的:

  1. consumer启动、或者coordinator宕机了,consumer会任意请求一个broker,发送ConsumerMetadataRequest请求,broker会按照上面说的方法,选出这个consumer对应coordinator的地址。
  2. consumer 发送heartbeat请求给coordinator,返回IllegalGeneration的话,就说明consumer的信息是旧的了,需要重新加入进来,进行reblance。返回成功,那么consumer就从上次分配的partition中继续执行

reblance流程

  1. consumer给coordinator发送JoinGroupRequest请求。
  2. 这时其他consumer发heartbeat请求过来时,coordinator会告诉他们,要reblance了。
  3. 其他consumer发送JoinGroupRequest请求。
  4. 所有记录在册的consumer都发了JoinGroupRequest请求之后,coordinator就会在这里consumer中随便选一个leader。然后回JoinGroupRespone,这会告诉consumer你是follower还是leader,对于leader,还会把follower的信息带给它,让它根据这些信息去分配partition
  5. consumer向coordinator发送SyncGroupRequest,其中leader的SyncGroupRequest会包含分配的情况。
  6. coordinator回包,把分配的情况告诉consumer,包括leader。

当partition或者消费者的数量发生变化时,都得进行reblance。
列举一下会reblance的情况,触发条件有 3 个:

  1. 组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例崩溃被“踢出”组。

  2. 订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。

  3. 订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。

消息投递语义

kafka支持3种消息投递语义
At most once:最多一次,消息可能会丢失,但不会重复
At least once:最少一次,消息不会丢失,可能会重复
Exactly once:只且一次,消息不丢失不重复,只且消费一次(0.11中实现,仅限于下游也是kafka)

在业务中,常常都是使用At least once的模型,如果需要可重入的话,往往是业务自己实现。

At least once

先获取数据,再进行业务处理,业务处理成功后commit offset。
1、生产者生产消息异常,消息是否成功写入不确定,重做,可能写入重复的消息
2、消费者处理消息,业务处理成功后,更新offset失败,消费者重启的话,会重复消费

At most once

先获取数据,再commit offset,最后进行业务处理。
1、生产者生产消息异常,不管,生产下一个消息,消息就丢了
2、消费者处理消息,先更新offset,再做业务处理,做业务处理失败,消费者重启,消息就丢了

Exactly once

思路是这样的,首先要保证消息不丢,再去保证不重复。所以盯着At least once的原因来搞。 首先想出来的:

  1. 生产者重做导致重复写入消息----生产保证幂等性
  2. 消费者重复消费---消灭重复消费,或者业务接口保证幂等性重复消费也没问题

由于业务接口是否幂等,不是kafka能保证的,所以kafka这里提供的exactly once是有限制的,消费者的下游也必须是kafka。所以一下讨论的,没特殊说明,消费者的下游系统都是kafka(注:使用kafka conector,它对部分系统做了适配,实现了exactly once)。

生产者幂等性好做,没啥问题。

解决重复消费有两个方法:

  1. 下游系统保证幂等性,重复消费也不会导致多条记录。
  2. 把commit offset和业务处理绑定成一个事务。

本来exactly once实现第1点就ok了。

但是在一些使用场景下,我们的数据源可能是多个topic,处理后输出到多个topic,这时我们会希望输出时要么全部成功,要么全部失败。这就需要实现事务性。既然要做事务,那么干脆把重复消费的问题从根源上解决,把commit offset和输出到其他topic绑定成一个事务。

生产幂等性

思路是这样的,为每个producer分配一个pid,作为该producer的唯一标识。producer会为每一个<topic,partition>维护一个单调递增的seq。类似的,broker也会为每个<pid,topic,partition>记录下最新的seq。当req_seq == broker_seq+1时,broker才会接受该消息。因为:

  1. 消息的seq比broker的seq大超过时,说明中间有数据还没写入,即乱序了。
  2. 消息的seq不比broker的seq小,那么说明该消息已被保存。

事务性/原子性广播

场景是这样的:

  1. 先从多个源topic中获取数据。
  2. 做业务处理,写到下游的多个目的topic。
  3. 更新多个源topic的offset。

其中第2、3点作为一个事务,要么全成功,要么全失败。这里得益与offset实际上是用特殊的topic去保存,这两点都归一为写多个topic的事务性处理。

基本思路是这样的:
引入tid(transaction id),和pid不同,这个id是应用程序提供的,用于标识事务,和producer是谁并没关系。就是任何producer都可以使用这个tid去做事务,这样进行到一半就死掉的事务,可以由另一个producer去恢复。
同时为了记录事务的状态,类似对offset的处理,引入transaction coordinator用于记录transaction log。在集群中会有多个transaction coordinator,每个tid对应唯一一个transaction coordinator。
注:transaction log删除策略是compact,已完成的事务会标记成null,compact后不保留。

做事务时,先标记开启事务,写入数据,全部成功就在transaction log中记录为prepare commit状态,否则写入prepare abort的状态。之后再去给每个相关的partition写入一条marker(commit或者abort)消息,标记这个事务的message可以被读取或已经废弃。成功后在transaction log记录下commit/abort状态,至此事务结束。

数据流:

  1. 首先使用tid请求任意一个broker(代码中写的是负载最小的broker),找到对应的transaction coordinator。

  2. 请求transaction coordinator获取到对应的pid,和pid对应的epoch,这个epoch用于防止僵死进程复活导致消息错乱,当消息的epoch比当前维护的epoch小时,拒绝掉。tid和pid有一一对应的关系,这样对于同一个tid会返回相同的pid。

  3. client先请求transaction coordinator记录<topic,partition>的事务状态,初始状态是BEGIN,如果是该事务中第一个到达的<topic,partition>,同时会对事务进行计时;client输出数据到相关的partition中;client再请求transaction coordinator记录offset的<topic,partition>事务状态;client发送offset commit到对应offset partition。

  4. client发送commit请求,transaction coordinator记录prepare commit/abort,然后发送marker给相关的partition。全部成功后,记录commit/abort的状态,最后这个记录不需要等待其他replica的ack,因为prepare不丢就能保证最终的正确性了。

这里prepare的状态主要是用于事务恢复,例如给相关的partition发送控制消息,没发完就宕机了,备机起来后,producer发送请求获取pid时,会把未完成的事务接着完成。

当partition中写入commit的marker后,相关的消息就可被读取。所以kafka事务在prepare commit到commit这个时间段内,消息是逐渐可见的,而不是同一时刻可见。

详细细节可看:https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-TransactionalGuarantees

消费事务

前面都是从生产的角度看待事务。还需要从消费的角度去考虑一些问题。
消费时,partition中会存在一些消息处于未commit状态,即业务方应该看不到的消息,需要过滤这些消息不让业务看到,kafka选择在消费者进程中进行过来,而不是在broker中过滤,主要考虑的还是性能。kafka高性能的一个关键点是zero copy,如果需要在broker中过滤,那么势必需要读取消息内容到内存,就会失去zero copy的特性。

文件组织

kafka的数据,实际上是以文件的形式存储在文件系统的。topic下有partition,partition下有segment,segment是实际的一个个文件,topic和partition都是抽象概念。

在目录/${topicName}-{$partitionid}/下,存储着实际的log文件(即segment),还有对应的索引文件。

每个segment文件大小相等,文件名以这个segment中最小的offset命名,文件扩展名是.log;segment对应的索引的文件名字一样,扩展名是.index。有两个index文件,一个是offset index用于按offset去查message,一个是time index用于按照时间去查,其实这里可以优化合到一起,下面只说offset index。总体的组织是这样的:

为了减少索引文件的大小,降低空间使用,方便直接加载进内存中,这里的索引使用稀疏矩阵,不会每一个message都记录下具体位置,而是每隔一定的字节数,再建立一条索引。 索引包含两部分,分别是baseOffset,还有position。

baseOffset:意思是这条索引对应segment文件中的第几条message。这样做方便使用数值压缩算法来节省空间。例如kafka使用的是varint。

position:在segment中的绝对位置。

查找offset对应的记录时,会先用二分法,找出对应的offset在哪个segment中,然后使用索引,在定位出offset在segment中的大概位置,再遍历查找message。

常用配置项

broker配置

配置项 作用
broker.id broker的唯一标识
auto.create.topics.auto 设置成true,就是遇到没有的topic自动创建topic。
log.dirs log的目录数,目录里面放partition,当生成新的partition时,会挑目录里partition数最少的目录放。

topic配置

配置项 作用
num.partitions 新建一个topic,会有几个partition。
log.retention.ms 对应的还有minutes,hours的单位。日志保留时间,因为删除是文件维度而不是消息维度,看的是日志文件的mtime。
log.retention.bytes partion最大的容量,超过就清理老的。注意这个是partion维度,就是说如果你的topic有8个partition,配置1G,那么平均分配下,topic理论最大值8G。
log.segment.bytes 一个segment的大小。超过了就滚动。
log.segment.ms 一个segment的打开时间,超过了就滚动。
message.max.bytes message最大多大

关于日志清理,默认当前正在写的日志,是怎么也不会清理掉的。
还有0.10之前的版本,时间看的是日志文件的mtime,但这个指是不准确的,有可能文件被touch一下,mtime就变了。因此在0.10版本开始,改为使用该文件最新一条消息的时间来判断。
按大小清理这里也要注意,Kafka在定时任务中尝试比较当前日志量总大小是否超过阈值至少一个日志段的大小。如果超过但是没超过一个日志段,那么就不会删除。

参考文章:

震惊了!原来这才是kafka! - 简书

Pages - Apache Kafka - Apache Software Foundation

https://blog.csdn.net/li1669852599/category_10768923.html

kafka(1) 初识相关推荐

  1. 二十一、Hadoop学记笔记————kafka的初识

    这些场景的共同点就是数据由上层框架产生,需要由下层框架计算,其中间层就需要有一个消息队列传输系统 Apache flume系统,用于日志收集 Apache storm系统,用于实时数据处理 Spark ...

  2. 打怪升级之小白的大数据之旅(七十四)<初识Kafka>

    打怪升级之小白的大数据之旅(七十四) 初识Kafka 引言 学完Flume之后,接下来将为大家带来Kafka相关的知识点,在工作中,Kafka和Flume经常会搭配使用,那么Kafka究竟是什么呢?让 ...

  3. 初识Kafka-概念速览|安装与配置—《Kafka权威指南》笔记

    文章目录 初识Kafka 消息 批次 模式 主题与分区 生产者和消费者 broker和集群 保留消息 多集群 Kafka数据生态 安装与配置 安装 Java 安装 Zookeeper Zookeepe ...

  4. kafka学习(一)初识kafka

    本文借鉴:再过半小时,你就能明白kafka的工作原理了(特此感谢!) 一.简介 定义:kafka是一个分布式,基于zookeeper协调的发布/订阅模式的消息系统,本质是一个MQ(消息队列Messag ...

  5. kafka学习_kafka学习(第一章 初识kafka)

    kafka是什么? Kafka是由Apache开发的.开源的流处理平台,它是一个高吞吐量的.持久的.分布式的发布订阅模式的消息系统. 高吞吐量:普通配置的服务器即可满足每秒百万级别消息的生产和消费: ...

  6. 架构初识之 —— 使用kafka进行商品维度化缓存解决方案

    随着分布式,微服务越来越普遍,对开发的要求也在不断的增加,对架构的要求也提出了越来越多的要求,在那些分布式项目中,经常面临的一个问题就是,高效,解耦,举例来说,当一个小型电商网站越来越大的时候,单体架 ...

  7. Kafka核心设计与实践原理总结:基础篇

    作者:未完成交响曲,资深Java工程师!目前在某一线互联网公司任职,架构师社区合伙人! 一.基本概念 1.体系架构 Producer:生产者 Consumber:消费者 Broker:服务代理节点(k ...

  8. Kafka【入门】就这一篇!

    一.Kafka 简介 Kafka 创建背景 Kafka 是一个消息系统,原本开发自 LinkedIn,用作 LinkedIn 的活动流(Activity Stream)和运营数据处理管道(Pipeli ...

  9. 分布式系统保障—混沌工程—初识

    原文作者: 朱小厮的博客 原文地址:混沌工程(Chaos Engineering)初识 编辑推荐: 本文主要介绍什么是混沌工程.混沌工程的五大原则.混沌工程成熟度模型(CMM)以及混沌工程的目标--韧 ...

最新文章

  1. 谷歌全新轻量级新模型ALBERT刷新三大NLP基准
  2. linux ip add address,linux – ip地址范围参数
  3. uniapp自定义条件编译-定制化产品
  4. redis 缓存预热_Redis:缓存雪崩、缓存穿透、缓存预热、缓存更新、缓存降级
  5. mojoportal学习——文章翻译之使用Artisteer快捷的创建模板
  6. VS 内存不能read—堆栈空间解决栈溢出问题
  7. 《领域驱动设计精粹》DDD Domain-Driven Design Distilled -- Vaughn Vernon 读后感
  8. 【caffe】Layer解读之:Date
  9. 红帽子linux9百度云,linux安装--红帽子Linux REDHAT 9.0 ISO(3CD)
  10. 批处理文件获取计算机ip,批处理获取本机IP地址及MAC地址,输出到文件
  11. iOS searchbar实现汉字更具拼音首字母排序
  12. MATLAB 三维立体绘图
  13. 高德地图 SDK 的应用 01:绘制多边形区域图
  14. MySQL数据库基础备份-mysqldump备份
  15. PM Q7声卡使用教程
  16. 计算机毕业设计Android的手机音乐播放器app(源码+系统+mysql数据库+Lw文档)
  17. CSS开发技巧实用记(一)
  18. 游戏加盟和游戏代理有什么区别?哪个更好?
  19. 手把手教处理串口数据
  20. 微信小程序离线引入 iconfont 字体图标

热门文章

  1. 北京离职自己上社保哪个靠谱
  2. mac版Mysql可视化工具 - Sequel Pro
  3. 架构之——umi框架与dva的使用
  4. AppleScript 小试牛刀
  5. 计算机图形学 实验6 直线的裁剪—Cohen Sutherland裁剪算法(MFC中)
  6. 浅谈对dao层的理解
  7. [事件处理] js实现的文本框内容发生改变立马触发事件简单介绍
  8. mysql配置文件生效测试
  9. Linux 配置php
  10. PS2018学习笔记(03-18节)