如图6-42所示,上一节将日志作为人- ,分析了日志的读写,但并没有分析日志相关的上下文。本节会从服务端的人- 出发,通过副本管理器、分区、副本,一直到日志,将整个读写流程串联起来。图中分区到日志的虚线表示:业务逻辑层的一个分区对应物理存储层的一个日志。消息集到数据文件的虚线表示:客户端发送的消息集最终会写入日志分段对应的数据文件,存储至UKafka的消息、代理节点。

Kafka服务在启动时会先创建各种相关的组件,最后才会创建KafkaApi.s。业务组件一般都有后台的线程,除了创建组件后,也要启动这些后台线程。比如,日志管理器会启动日志管理相关的后台线程(详见6.1.5节);网络服务端会启动处理器线程(详见2.3.1节)。相关代码如下:

从5.2.l节我们了解到,消费者客户端发送“加入组请求”和“同步组请求”给服务端,服务端通过KafkaApi.s~每请求的处理交给消费组的协调者(GroupCoordi.nator)。与之类似,本章客户端发送“生产请求”和“拉取请求”给服务端,服务端将请求的处理交给副本管理器(Repli.caManager)。与日志存储相关的业务组件是副本管理器,负责日志的底层类是日志管理器,副本管理器通过日志管理器间接地操作底层的日志。相关代码如下:

注意:服务端创建日志、读取日志、管理日志都只能通过日志管理器完成。实际上,日志管理器会传给副本管理器,而昌1]本管理器管理所有的分区,分区管理所有的副本,每个副本对应一个日志。分区在创建副本时才会创建日志。每个分区都需妥持有副本管理器的引用,才能通过日志管理器创建日志。

KafkaApis是服务端处理所有请求的人- ,它只负责将请求的具体处理将发给其他组件去处理。服务端处理客户端发送的生产请求和拉取请求,会先解析请求中的数据,不同的请求有不同的数据。生产请求包含:分区消息集、应答超时时间、是否需要应答。拉取请求包含:最长等待时间、最小拉取字节数、备份副本的编号。由于服务端处理请求最后都要返回响应结果给客户端,它们都要事先定义一个“发送响应结果的回调方法”,并作为参数传给副本管理器。相关代码如下:


服务端处理客户端的生产请求和拉取请求,都会通过副本管理器执行具体的业务处理逻辑。

6.2.1 副本管理器

追加消息时,生产者客户端会发送每个分区以及对应的消息集(l’lessagesPerParti.ti.on);拉取消息时,客户端会发送每个分区以及对应的拉取信息(fetchlnfo)。服务端返回给客户端的响应结果也会按照分区分别返回,生产请求的响应结果(Parti.ti.onResponse)包含追加消息集到分区后返回的起始偏移量。拉取请求的响应结果(FetchResponseParti.ti.onData)包含每个分区的最高水位、每个分区的消息集。相关代码如下:


注意:为了快速理解服务端处理读写消息的流程,下面的代码暂时省略了与“延迟操作”相关的内容。服务端追加消息集通过appendMessages()调用appendTolocalLog()方法,将多个分区的消息集分别追加到分区对应的本地日志文件中。拉取消息集通过fetchMessages()方法调用readFroMLocallog()方法,从多个分区对应的本地日志文件中拉取出消息集。

  1. 追加和读取本地日志
    服务端接收客户端发送的生产请求和拉取请求都会包括多个分区。追加消息集和读取消息集,首先都需要获取到分区对象,然后再获取到分区的主副本。针对本地日志文件的读写,追加和拉取在实现上稍微有些不同。
  • 追加消息集时,在分区(Part"i.t"i.on)中获取主副本,并写入本地的日志文件。
  • 读取消息集时,在副本管理器中获取分区的主副本,并读取本地的日志文件。

为了对比追加消息集和拉取消息集的不同,我们将追加消息集到分区的代码从分区中移动到副本管理器中。这样两者的使用方式就有点类似了,它们都会获取到本地副本的日志对象(localReplicalog),然后分别调用日志(Log)的append()方法和read()方法。每个分区的消息集追加到本地日志文件后,都会返回一个LogReadResult对象。从每个分区的本地日志文件拉取归消息集后,都会返回一个LogReadResult对象。相关代码如下:

注意:appendMessages()和fetchMessages()中分别调用localProduceResults和logReadResults的l’lapValues()方法,这是因为转换Map的值时不需妥利用Map的键。而上面两段代码在迭代Map时会用到Map的键Ur分区)执行一些其他的操作,不能用l’lapValues()方法,而只能采用Map()方法。另外Map()方法也有两种使用模式:在case模式匹配中可以用变量匹配、对象匹西己。

在appendTolocalLog()方法中,每个分区的消息集追加到主副本对应的日志文件后,有两个“延迟操作”相关的方法:尝试完成延迟的拉取请求(tryCol’lpleteDelayedFetch)、尝试完成当前分区的延迟请求(tryCol’lpleteDelayedReq四sts)。同样地,拉取每个分区的消息集时,也有类似的“延迟操作”,只不过在实现上相关的代码比较隐蔽。副本管理器的fetchMessages()方法,在读取所有分区的拉取结果后,判断拉取请求:如果是来向备份副本,就会通过updateFollowerLogReadResults()方法调用每个分区的updateRepli.calogReadResult()方法更新每个分区的备份副本。下面的前两段代码去掉了分区的迭代,只专注于一个分区的处理:

为了方便理解“读写本地日志”与“延迟操作”的关联关系,上面3段代码都对源码进行了改造。
实际的源码与上面代码有几个不同点,如下。

  • appendTolocallog()和readFr01TJLocallog()会循环所有的分区,每个分区都会读写日志。这里只列出一个分区的处理方式。
  • appendTolocallog()方法写入主副本日志的逻辑在分区类中,这里移到副本管理器。这是为了和readFr01TJLocallog()形成对比,readFrol’llocallog()方法在副本管理器中读取主副本的日志。
  • 副本管理器调用完readFr01TJLocallog(),才会调用叩dateRepli.calogReadRes1」lt()方法更新每个分区的备份副本。而这里把更新分区的备份副本,移到了readFrOl’llO日llog()读取主副本日志之后,这是为了和appendTolocallog()形成对比。
  • appendTolocallog()方法在写入主副本日志后,会尝试完成被延迟的拉取求。readFrOl’llocallog()方法在读取主副本日志后,会尝试完成被延迟的生产请求。
  • tryCol’lpleteDelayedRequests()方法在分区类中,它会尝试完成当前分区的生产请求和拉取请求。这里把它们移到副本管理器中,就需要给这个方法添加上分区参数。

如表6-4所示,副本管理器处理客户端发送的生产请求和拉取请求,整体上的处理方式是类似的。

生产者发送的生产请求会设置应答值,服务端会根据应答值判断是否需要创建延迟的生产对象。

  1. 生产者客户端设置的应答值
    在2.1.1节中,生产者可以用同步和异步模式发送生产请求给服务端:同步模式下,生产者发送一条消息后,必须等待收到响应结果,才会接着发送下一条消息;异步模式下,生产者发送一条消息后,不用等待收到上一条消息的响应结果,就可以接着发送下一条消息。生产者发送的生产请求还有一个设置项(request.requi.red.acks)一一是杏需要等待服务端的应答,应答的值表示:生产者要求主副本收到指定数量的应答,才会认为生产请求完成了。应答的值有下面3种场景。

“应答值等于0”,表示生产者不会等待服务端的任何应答。客户端将消息添加到网络缓冲区(socketbuffer)后,就认为生产请求已经完成了。这种情况下,主副本收到的应答数量为0,意味着主副本可能都没有收到客户端发送的消息,消息有可能会丢失。客户端发送生产请求后,对应的响应结果需要返回每条消息在服务端的偏移filo应答值等于0时,每条消息的偏移革者F是-10如图6-43所示,生产者设置的应答值等于0,可能会丢失数据,具体步骤如下。

(l)生产者将消息发送到网络通道后就认为生产请求完成。
(2)消息l和消息2发送州去,它们都算完成了。但消息3没完全发送出去,就不算完成。
(3)主副本只收到了消息l,但不保证备份副本会复制这条消息。
(4)主副本挂掉了,它接收的消息l也没有备份,这条消息就丢失了。
(5)备份副本变为主副本后,客户端认为成功的消息l和消息2实际上都丢失了。

“应答值等于l”,表示生产者会等待主副本收到一个应答后,认为生产请求完成了。这一个应答,实际上就是主副本向己的应答。主副本收到客户端发送的消息,并存储到本地日志后,生产请求就算完成,服务端就可以返回响应结果给客户端。这种情况下,由于主副本只收到向己发送的应答(实际上主副本自己不会发送应答,只不过主副本写入成功,就算作一个应答),没有收到备份副本发送的应答,仍然有可能丢失消息。比如主副本写入本地日志后,发送了它向己的应答,生产者认为请求完成了,但这时主副本挂掉了,备份副本都还没有及时地同步主副本写入本地日志的那些消息。

如图6-44所示,生产者设置的应答值等于l,也有可能会丢失数据,具体步骤如下。

(1)生产者将消息发送到网络通道后,需要等待一个副本的应答。
(2)主副本接收消息l和消息2并存储到本地日志。备份副本同步了消息l,但没有同步消息2。
(3)主副本将消息存储到主副本的本地日志,生产请求就算完成,消息3对应的生产请求不算完成。
(4)主副本挂掉了,它接收的消息2没有备份,但消息l在挂掉之前存在一个副本备份。
(5)备份副本变为主副本,客户端认为成功的消息l和消息2,只有消息1,丢失了消息2。

“应答值等于-1(或者all)”,表示生产者发送生产请求后,“所有处于同步的备份副本(ISRY’都向主副本发送了应答之后,生产请求才算完成。在这之前,如果这些备份副本只要有一个没有向主副本发送应答,主副本就会阻塞井等待,生产请求就不能完成。这种情况保证了:JSR中的副本只要有一个是存活的,消息就不会丢失。ISR一定会包括主副本,即使主副本挂掉了,只要还有一个备份副本存活,仍然可以保证消息不会丢失。

如图6-45所示,生产者设置的应答值等于-1,不会丢失数据,具体步骤如下。

(1)生产者将消息发送到网络通道后,需要等待多个副本的应答。
(2)主副本接收消息并存储到本地日志,备份副本都同步了消息l和消息2,但未完全同步消息3。
(3)主副本收到所有备份副本的应答,生产请求才算完成,消息3对应的生产请求不算完成。
(4)主副本挂掉了,消息1和消息2都有备份,但1肖息3不一定所有的副本都有备份。
(5)备份副本变为主副本,客户端认为成功的消息1和消息2,在所有备份副本中都存在,不会丢失。

对于应答值等于0,服务端并不保证一定会收到生产者客户端发送的消息集,但这并不意味着服务端不需要处理生产请求。服务端仍然需要处理生产请求,并将消息集追加到本地日志中。只不过消息存储到主副本的本地日志文件后,不再需要发送响应结果给生产者客户端。(因为生产者客户端在把生产请求发送到网络通道后,就会立即收到响应结果。如果服务端处理完成后还要发送响应结果给客户端,那客户端就收到了两次响应结果,而这显然是不必要的。)

对于应答值等于1,副本管理器的appendMessages()方法调用appendTolocallog()就确保消息集追加到主副本的木地日志文件中了。这时就可以调用回调方法,立即返回响应结果给客户端。

对于应答值等于-1,消息集只是追加到主副本还不够,主副本还需要等待ISR中的所有备份副本都向它发送应答。这意味着:消息集在写到主副本的本地日志文件之后,服务端还不能返回响应结果给客户端。服务端为了等待备份副本发送应答,可以采用阻塞的方式,但这种实现方式对服务端的性能影响较大。Kafka针对这种需要“延迟返回响应结果”给客户端的情况,专门会有一个“延迟的操作”。

注意:从服务端的角度来看,服务端会延迟返回响应结果给客户端。而从客户端的角度来看,客户端友送的请求需妥等待一段时间才能收到响应结果,生产请求的响应结果被延迟返回。

  1. 创建延迟的生产和延迟的拉取
    服务端延迟返回响应结果是有一定限制条件的。对于生产请求,delayedProduceRequest()方法必须同时满足下面3个条件,服务端才需要“延迟返回生产结果”给客户端。
  • 生产者等待所有ISR备份副本都向主副本发送应答:requi.redAcks==-1。
  • 生产者发送的消息有数据:MessagesPerParti.ti.on.si.ze>0。
  • 至少要有一个分区写入到主副本的本地日志文件是成功的。

如果不满足上面任意一个条件,服务端就会立即返回响应结果给客户端。比如,生产者设置的应答值等于l,或生产者发送的消息集根本没有数据,或所有分区写入到主副本的本地日志文件都失败了。服务端创建“延迟的生产”,意味着备份副本会向主副本拉取数据。如果没有创建“延迟的生产”,并不意味着备份副本不会向主副本拉取数据,只是生产者客户端不关心而已。相关代码如下:

对于拉取请求,必须同时满足下面4个条件,服务端才需要“延迟返回拉取结果”给客户端。

  • 拉取请求设置等待时间:ti.l’leout>0。
  • 拉取请求要有拉取分区:fetchinfo.si.ze>0。
  • 本次拉取还没有收集足够的数据:bytesReadable<fetchMi.nBytes。
  • 拉取分区时不能发生错误:!errOrReadi.ngDatao

追加消息集到主副本的本地日志文件后,如果满足“延迟生产”的限制条件,就会创建一个“延迟的生产”对象(DelayedProduce)。读取主副本的本地日志文件后,如果满足“延迟拉取”的限制条件,就会创建一个“延迟的拉取”对象(DelayedFetch)。相关代码如下:

回顾下第5章可知,协调者处理消费者“加入组请求”“同步组请求”“心跳请求”时,也会创建两种延迟操作:“延迟的加入”(DelayedJoin)、“延迟的心跳”(DelayedHeartbeat)。相关代码如下:

服务端创建延迟操作对象后,会立即尝试是否能够完成这个延迟的操作,如果不能完成会加入“延迟缓存”。如l入延迟缓存中的延迟操作对象,有两种完成方式:超时或者外部事件。超时后,服务端必须返回响应结果给客户端。外部事件导致“限制条件”不再满足,服务端可以立即返回响应结果给客户端。创建延迟操作对象,并加入延迟缓存的步骤如下。

(1)获取延迟操作的超时时间。“延迟生产”和“延迟拉取”的超时时间都是客户端设置的,“延迟加入”是消费组的再平衡时间,“延迟心跳”是消费组的会话时间。
(2)创建延迟的操作对象。除了设置超时时间,还要传递相关的元数据,即延迟操作对象是有状态的。
(3)指定延迟操作的键。“延迟生产”和“延迟拉取”的键都是分区,“延迟加入”的键是消费组编号,“延迟心跳”的键是消费者成员编号。
(4)通过延迟缓存执行第一次的尝试完成操作。如果完成不了,将延迟操作对象加入延迟缓存的监控。

如表6-5所示,列举了服务端创建的4种延迟操作对象,以及它们分别在对应的外部事件触发下,返回响应结果给客户端需要满足的条件。“延迟加入”的外部事件是:消费组中的所有||二|消费者全部重新发送了“加入组请求”。这时“延迟的加入”操作才认为不再被延迟了,服务端会返回“加入组响应结果”给每个消费者。“延迟生产”的外部事件是:!SR的所有备份副本都向主副本发送了应答。这时“延迟的生产”操作才认为不再被延迟,服务端会返回“生产响应结果”给生产者。

除了上面的外部事件会完成延迟的操作外,针对“延迟的生产”和“延迟的拉取”,还有其他事件会尝试完成延迟的操作对象。

  1. 延迟的操作与外部事件的关系
    如表6-6所示,服务端处理生产请求,追加消息集到主副本的本地日志后,会尝试完成延迟的拉取;服务端处理备份副本的拉取请求,向主副本的本地日志读取消息集后,会尝试完成延迟的生产。

服务端处理的拉取请求可以来自消费者和备份副本。备份副本拉取主副本的消息、,会尝试完成“延迟的生产”,而消费者拉取主副本的消息时,并不会尝试完成“延迟的生产”。不过,生产者追加消息、到主副本的本地日志后,则可能会尝试完成消费者创建的“延迟拉取”。如图6-46所示,生产者在追加消息集后(中图),会创建延迟的生产请求;消费者(左图)和备份副本(右图)在读取消息集后,会创建延迟的拉取请求。图中生产者、消费者、备份副本之间的虚线连线表示:不同的外部事件尝试完成不同的延迟操作。

生产者追加消息集到主副本的本地日志、备份副本读取主副本的本地日志,这两者看起来没有什么关联关系。但是在服务端中,通过“延迟的生产”和“延迟的拉取“,两者联系在一起了。理解上面几条虚线之间的关系,要理解下面两个问题。

(1)当不能返回响应结果时,什么限制条件需要让服务端创建一个延迟操作对象。
(2)当发生限制条件对应的外部事件时,就可以尝试完成第一步创建的延迟操作。

如图6-47所示,生产者追加消息创建延迟的生产(问题1),它的限制条件是:所有备份副本发送应答给主副本。当备份副本投取消息,表示备价副本发送应答给主副本,就会尝试完成延迟的生产请求(问题2)。同样地,备份副本拉取消息创建延迟的拉取(问题I),它的限制条件是:拉取到足够的消息。~生产者追加消息到主副本后,表示有新的消息,就会尝试完成延迟的拉取请求(问题2)。

关于延迟操作和延迟缓存,以及延迟操作的尝试完成、真正完成,会在6.3节再深入分析,接下来看存储层上的分区。副本管理器追加消息到分区,或者从分区拉取消息,都需要操作分区对象。

注意:分区是Kafka的一个重妥概念,Kafka源码中和分区相关的类也有很多。比如,2.1.1节的分区信息(Partitioninfo)记录了分区的信息,包括主题、分区、主副本、所有副本、同步的副本。3.2.3节的分区主题信息对象(PartitionTopicinfo)记录了消费者的分配信息,包括主题、分区、队列、偏移量、拉取大小。
另外,还有两个只记录了主题和分区编号的对象一一TopicAndPartition和TopicPartition,前者是一个Scala类,后者是一个Java类,它们主要用在客户端。下面分析的分区对象(Partition)则用于服务端。如果把客户端的两个分区类看作元状态的,服务端则是有状态的。服务端的分区对象除了有主题和分区编号,它还要管理所有的副本,包括主副本、备份副本。

6.2.1 副本管理器相关推荐

  1. 95-10-080-启动-replicaManager副本管理器

    文章目录 1.视界 1 概述 2 入口方法 3. 启动方法 4. 线程isr-expiration 4.1 maybeShrinkIsr 4.1.1 getOutOfSyncReplicas 4.1. ...

  2. spring 配置只读事务_只读副本和Spring Data第3部分:配置两个实体管理器

    spring 配置只读事务 我们之前的设置可以正常工作. 我们现在要做的是进一步发展,并配置两个单独的实体管理器,而不会影响我们之前实现的功能. 第一步是将默认实体管理器配置设置为主要配置. 这是第一 ...

  3. 只读副本和Spring Data第3部分:配置两个实体管理器

    我们之前的设置可以正常工作. 我们现在要做的是进一步发展,并配置两个单独的实体管理器,而不会影响我们之前实现的功能. 第一步是将默认的实体管理器配置设置为主要配置. 这是第一步 package com ...

  4. javascript原理_JavaScript程序包管理器工作原理简介

    javascript原理 by Shubheksha 通过Shubheksha JavaScript程序包管理器工作原理简介 (An introduction to how JavaScript pa ...

  5. python笔记2(函数 面向对象 文件编程 上下文管理器)

    记录python听课笔记 文章目录 记录python听课笔记 一,函数 1.介绍python里的函数 2.用户自定义函数 3.变量的作用域 4.参数的传递 5.参数的默认值 6.向函数内部批量传递数据 ...

  6. 转载:Hyper-V管理器和SCVMM 2008 R2区别

    Hyper-V管理器和SCVMM 2008 R2区别 2010-05-27 12:22:19 转载:http://yewind.blog.51cto.com/33144/322090 标签:管理器 S ...

  7. django 1.8 官方文档翻译: 2-5-1 管理器 (初稿)

    Django 文档协作翻译小组人手紧缺,有兴趣的朋友可以加入我们,完全公益性质. 交流群:467338606 网站:http://python.usyiyi.cn/django/index.html ...

  8. django 1.8 官方文档翻译: 2-5-1 管理器

    管理器 class Manager 管理器是一个接口,数据库查询操作通过它提供给django的模型.django应用的每个模型至少拥有一个 管理器. 管理器类的工作方式在 执行查询文档中阐述,而这篇文 ...

  9. 从零开始山寨Caffe·叁:全局线程管理器

    你需要一个管家,随手召唤的那种,想吃啥就吃啥. --设计一个全局线程管理器 一个机器学习系统,需要管理一些公共的配置信息,如何存储这些配置信息,是一个难题. 设计模式 MVC框架 在传统的MVC编程框 ...

最新文章

  1. python简单目标检测代码_Python Opencv实现单目标检测的示例代码
  2. 隔离级别(未提交读、提交读、可重复读、可串行化)、多版本并发控制、Next-Key Locks(Record Locks、Gap Locks)
  3. golang协程进行同步方法
  4. 60道Python面试题答案精选!找工作前必看
  5. 进阶篇-安卓系统:2.多点触控的交互处理
  6. SpringCloud Greenwich(七)集成dubbo先启动消费者(check=false),然后启动提供者无法自动发现注册
  7. 接口隔离原则(ISP)
  8. 《微软的软件测试之道》读书笔记 之 结构测试技术
  9. React:Conditional Rendering(条件渲染)
  10. 如何取到两个日期中的每一天,并且打印出来
  11. 【优化求解】基于matalb改进的遗传算法(GA+IGA)求解城市交通信号优化问题【含Matlab源码 213期】
  12. Tomcat日志分割
  13. 5 三层交换机实现VLAN间路由
  14. [Err] 1267 - Illegal mix of collations (utf8_unicode_ci,IMPLICIT) and (utf8_general_ci,IMPLICIT)
  15. win7网络里计算机登录失败,Win7访问网上邻居提示“登陆失败”原因及解决方法...
  16. 氟虫腈-13C2,15N2同位素内标的基质效应
  17. java微信二维码登录
  18. python 拆包_python3拆包详解
  19. 【从零开始】阿里云服务器配置web开发环境及部署本地web项目
  20. 大一c语言课程设计答辩ppt,c语言课程设计讲解内容图文.ppt

热门文章

  1. 7-12 编程实现两个分数相加
  2. 入侵网站的黑客被我抓到了!
  3. Streamline(流线),Pathline(迹线),Streakline(脉线) and Timeline(时间线)
  4. Machine Learning系列--L0、L1、L2范数
  5. C# 实现视频监控系统(附源码)
  6. Deep Retinex Decomposition for Low-Light Enhancement 论文阅读笔记
  7. c++对象的生存期 静态生存期和动态生存期
  8. ORA-01012: not logged on处理
  9. SI(crosstalk)对common path的影响(CPPR)
  10. 触动精灵怎么向服务器发送消息,触动精灵 函数说明及使用方法