4.1.5 消费者获取记录

拉取器处理拉取响应时已经将原始的响应数据封装成了分区记录集,并放到全局的成员变量this.records中 。 但要真正被消费者可用,还需要封装成消费者记录( ConsumerRecord )。 相关代码如下:


拉取器的获取记录集方法会使用414节“处理拉取响应”方法生成的全局成员变茧,作为数据源构成最终的拉取结果。既然数据已经在全局成员变量中了,那么要提供给客户端使用,就可以直接返回。但实际上拉取器在这一步还做了下面几点优化。

  • 一次轮询发送两次拉取请求,必须确保第一个请求获取到结果后,才允许发送第二个请求。
  • 全局的thisrecords成员变革不会同时存放两个请求的拉取结果。
  • 客户端轮询时可以设置每个分区的拉取阔值和最大记录数,防止客户端处理不了。
  • 如果分区的记录集没有被客户端处理完,新的投取请求不会拉取这个分区。
  1. 保存每次拉取结果的全局成员变量

拉取器的thisrecor也是个全局变量,在客户端的一次轮询里会发送两次拉取请求。虽然第二次发送请求后是无阻塞的快速轮询,但第二次的请求也可能立即产生结果。而每个拉取请求的回调方法都会将向己请求的拉取结果添加到全局变量中。为了保证同一次轮询里两个拉取请求的结果数据不会互相泪淆,必须确保第一个请求获取到结果后,才允许发送第二个请求。

如图4-17(左)所示,如果没有获取到第一个请求的结果就发送第二个请求,快速轮询返回的结果也会放到全局变量中。最后客户端获取的全局变量包括了两个请求的结果,显然有问题。如图4-17(有)所示,快速轮询的第二个请求结果会先用临时变量保存。当第一个请求的结果返回给客户端时,会将临时变量赋值给全局变量,第二种方案的具体步骤如下。

{1)拉取器发送第一个请求,并轮询得到结果,放入全局变革’中。
(2)拉取器获得第一个请求的记录集。
(3)拉取器发送第二个请求,并快速轮询得到结果,暂存到临时变量。
(4)将步骤(1)生成的全局变量返回给客户端。
(5)将步骤(3)暂存的临时变量赋值给全局变量,用于下一次的轮询。

注意:两个请求不应该操作同一个全局变量,否则第二次请求产生的结果掺杂在全局变量中,
就会导致返回给客户端的结果不准确。

上面第二种做法虽然保证了返回结果的准确性,但是在具体的实现上一旦返回结果给客户端,就不好做步骤(5)的控制。一种更好的办法是用一个临时变量(drained)来保存第一个请求的结果,返回给客户端的结果也是这个临时变量。实际上,发送拉取请求→处理拉取结果→添加到全局变量→将全局变量赋值给临时变量→清空全局变量这几步是严格有序的,下一个请求只能接着上一个请求的最后一步“请空全局变茧”开始执行。第一个请求的临时变量肯定不会包含第二个请求的拉取结果:在生成第一个请求的临时变革时,第二个请求根本就还没有机会执行;而第二个请求开始执行时,第一个请求的临时变量已经尘埃落定,不会再被更改了。所以用临时变量作为第一个请求的拉取结果返回值是没有问题的。

如图4-18所示,还可以将步骤(5)提前到步骤(4)之前,并且将步骤(3)和步骤(5)进行合并:即不需要再用一个中间变盐(tl’lpRecords),而是直接更新全局变量。虽然步骤(孙中将临时变盘赋值给全局变量会更新全局变盐的值,但是因为最后要返回给客户端的并不是这个全局变量,而是临时变量,所以结果仍然是准确的。以第一次轮询为例,全局变量和临时变量的变化步骤如下。

(l)拉取器发送第一个请求,并轮询得到结果,放入全局变量中。
(2)拉取器获得第一个请求的记录集,将全局变量赋值给临时变量,并清空全局变量。
(3)拉取器发送第二个请求,并快速轮询得到结果,也放入全局变量中。
(4)拉取器获取步骤(2)的临时变革作为返回值。

如图4-19所示,以第二次轮询为例,全局变量和临时变量的变化步骤如下。

(1)拉取器的上一次轮询会将第二个请求的结果放入全局变量中。
(2)拉取器获得第二个请求的记录集,将全局变革赋值给临时变盏,井清空全局变盘。
(3)拉取器发送第三个请求,并快速轮询得到结果,也放入全局变盘中。
(4)拉取器获取步骤(2)的临时变量作为返回值。

如果在同一次轮询中,全局变革表示的第一个请求结果被赋值给临时变量后被清空,而客户端的快速轮询没有(或者没那么快)产生第二个请求结果放到全局变量中,那么,在上面两种轮询场景中,没有步骤(3)往全局变革-放入新请求的结果,全局变量最后就是空的。下一次轮询时要获取全局变量,它所表示的第二个请求结果就没有数据,只能等待第二个请求的回调方法执行后才有数据。因为两个请求执行的时间顺序没有任何交集,所以两次拉取请求不会互相影响,而且两次请求允许拉取的分区,都是分配给消费者的所有分区。

  1. 设置分区拉取闹值

KafkaConsu附riftJ用一次轮询方法只是拉取一次消息。客户端为了不断拉取消息,会用一个外部循环不断调用消费者的轮询方法。每次轮询到消息,在处理完这一批消息后,才会继续下一次轮询。但如果一次轮询返回的结果没办法及时处理,会有什么后果呢?比如服务端会约定客户端要在指定的会话时间内,必须发送一次轮询请求。如果客户端处理一批消息花费的时间超过了会话超时时间,就会导致下一次轮询没有被及时地调用,服务端可能就会把消费者客户端移除掉,显然这不是我们希望看到的。相关代码如下:

那么有什么办法来解决上面的问题呢?客户端拉取消息时有下面两个相关的配置项。

  • 消息大小闹值(MessageMaxbytes。服务端允许接收一条消息的最大字节,超过这个大小的消息不会被服务端接受,默认值为1000012(976阻)。
  • 分区拉取阔值(Maxpart’it’ionfetchbytes)。客户端拉取每个分区的消息时,返回的每个分区最大字节,默认值为1048576(lMB)。

注意,分区拉取阔值必须比消息大小阔值大。如图4-20所示,假设服务端设置消息大小阔值等于lMB,表示最大允许接收lMB的消息。如果分区拉取阔值设置为512KB,低于lMB,对于那些小于lMB但大于512阳的消息,就永远无法被消费者获取到,因为服务端返回的分区消息最多只有512KB。

消息大小阔值是服务端的选项,用户通常无法直接控制,但Kafka针对主题级别还提供了另外一个配置项m川咽ssagebytes,用来控制消息大小的阔值。分区拉取阔值是消费者客户端的选项,每个消费者都可以向定义这个阔值大小。所以如果消费者的处理性能不够好,可以将分区拉取阔值设置低一点,保证每次拉取的分区数据都能很快地处理完成。

  1. 设置轮询记录阎值

上面客户端伪代码中的轮询方法每次拉取到多少条消息,都要一次性处理完。下面的代码精简了拉取器获取记录的其他细节,最后返回的临时变量实际上就是全局变量的值,即通过拉取器获取到多少条记录,都会全部返回给客户端。客户端也必须一次性全部处理完所有也记录:3设置轮询记录阎值上面客户端伪代码中的轮询方法每次拉取到多少条消息,都要一次性处理完。下面的代码精简了拉取器获取记录的其他细节,最后返回的临时变量实际上就是全局变量的值,即通过拉取器获取到多少条记录,都会全部返回给客户端。客户端也必须一次性全部处理完所有也记录:

假设客户端一次拉取到了10000条消息,如果处理时并不想要(或者根本没办法)一次性处理完,而是期望按照每次100条分批处理,就做不到了。前面的分区拉取阔值选项只能用来控制拉取的消息大小,但无法精确控制消息的数量。新版本(010)的消费者客户端可以通过设置轮询记录阔值(!'laxpollrecords),控制客户端调用一次轮询方法最多允许处理多少条记录,默认值为2147483647(21亿)。实际上,这是一种批处理的方式,若一次处理不完全部记录,就会分成多次。

轮询记录阔值配置项表示的并非“服务端最多返回这些记录”,而是“客户端的一次轮询最多能处理多少条记录”。比如客户端发送一次拉取请求从服务端得到了1000条记录,但是客户端最多一次只处理100条记录,那么客户端需要分10次,才能处理完一次拉取请求的所有数据。下面的3段伪代码展示了客户端处理记录集的不同方案:

如图4-21(上)所示,(对应方案一)在获取记录集时将全局变量赋值给临时变量,并且清空了全局变毡,但客户端只处理小批量的记录就结束了,因此会丢失剩余的记录。解决这个问题的办法是(方案二):在客户端循环处理获取到的所有记录,每次只处理一小批数据。但这种方法无法解决客户端每次轮询时必须处理完所有拉取记录,才能再次执行新的轮询调用的问题,仍可能出现前面说过的“处理超时”问题。

如图4-21(下)所示(对应方案三),将全局变量添加到临时变量时,就限制消息数量,确保通过“获取记录集”返回的|临时变量只是一小部分消息,足够保证客户端的处理性能。同时,剩余未被处理的记录会继续留在全局变盘中。当下一次轮询时会继续获取全局变量中剩余的记录。即使下一次轮询时拉取到新的消息,也会一起放入全局变量。并且,我们还能保证上次剩余的消息相较于新拉取的消息会被优先处理。

注意:尽管方案三限制了一次处理的消息数量,会导致未被处理的消息在客户端不断“积累”,但总比丢失消息妥好得多。况且,客户端处理消息的速度本身娘不上拉取的消息量,有消息堆积也是情有可原的。

  1. 分区的记录集分多次消费

批处理的实现方式是,调用拉取器的fetchedRecords()方法从全局变盘中获取数据,一次最多只返回l’laxRecords条记录。因为全局变量包括了分配给消费者所有分区的数据,而每个分区可能需要调用多次获取记录的方法才会全部返回,所以在迭代每个分区的记录集时,只有一个分区完全被消费完,才会从迭代器中移除。相关代码如下:

一个分区记录集对象包括分区信息、记录集以及拉取偏移量。可能有些分区记录集的记录数比轮询记录|现值(l’laxRecords)配置项要多,那么拉取器每次获取一批记录集的方法就需要调用多次,才能完全消费完这个分区。相关代码如下:

在分区记录集对象的take()方法返回最多l’laxRecords条记录给“获取记录集”方法之前,会更新分区记录集对象的拉取变盏,然后在“获取记录集”方法中也会更新订阅状态中分区状态的拉取偏移盘,即分区记录集对象的拉取偏移盐和分区状态的拉取偏移量要保持一致的数据。

如图14-22所示,新AP1巾构建拉取请求使用的拉取偏移量来向于分区状态的拉取偏移量,而不是分区记录集对象。分区记录集表示的是拉取到的分区数据结果只会用于将分区结果放入全局变ill中,它的作用并不是很大。拉取器获取拉取记录集后更新偏移盘的具体步骤如下。

(1)拉取器发送拉取请求,将订阅状态中分区的position作为拉取偏移量。
(2)消费者收到拉取请求后,创建存储拉取结果的分区信息对象,并存储到全局变革。
(3)消费者调用拉取器的获取记录集方法,会从步骤(2)的全局变盘中获取数据。
(4)为了保证应用程序处理记录的性能,会对每次返回的记录集数量进行限制。
(5)在返回记录集给应用程序之前,会更新订阅状态中分区信息的position。

注意:在拉取过程中,持有“拉取偏移量”这个语义的不同对象都妥保持数据的一致性。就像高级AP1中,分区信息对象的拉取偏移量要和拉取线程中拉取状态的拉取偏移量保持一致。

  1. 新请求不会拉取没有处理完的分区

采用轮询记录阔值每次只处理一小批记录,而不是拉取到的全部记录。那么每个分区记录集可能会分成多次才被完全处理,这就带来了一个新的问题:如果这个分区还没有被客户端处理完成,新的拉取请求就不会处理这个分区。相关代码如下:

注意:如果没有采用批处理的方案,才主耳又器获取的所有记录会被一次性处理完成,说明所有分区的记录集都会被完全处理,那么新的拉取请求总是可以拉取分配给消费者的所有分区。

全局变量(thisrecords)表示的是“未被客户端处理的所有记录”,获取记录集方法中的临时变量(drained)表示的是“本次会被客户端处理的记录”。每次调用获取记录集方法,最多只会从全局变量取出l’laxRecords条记录放入临时变盏,返回给客户端处理,而剩余未被客户端处理的记录仍然保留在全局变盘中。拉取器在发送新的拉取请求时,如果分区记录集仍然存在全局变量,这个分区就不需要拉取。

如表4-4所示,假设“轮询记录阔值”为100,消费者分配了[PO-P9]共10个分区,每个分区都有1000条消息。第一次轮询后只会处理分区PO序号为[0-100)的100条消息。此时全局变量中的分区仍然是[PO-P坷,所以下一次发送拉取请求时,不会拉取任何分区,因为分配给消费者的10个分区都还没有处理完。分区PO的1000条消息要分成10次才能消费完,所以调用10次“获取记录集”方法(对应10次轮询)后,分区PO才会从全局变量中移除,接着才会处理Pl分区的消息。这时如果再次发送拉取请求,才会开始拉取分区内的现消息。

以第11次轮询为例,之前拉取请求返回的分区PO,它的所有消息已经都消费完成了(第一次到第十次轮询),所以新的拉取请求允许拉取分区PO。全局变量中还保留了[Pl-P9]共9个分区,调用“获取记录集”方法返回的是分区Pl的记录集。因为分区Pl的记录集也有1000条,超过了“轮询记录阔值”的JOO条限制,所以分区Pl仍然还在全局变量中。又因为新拉取请求返回了分区内的新消息,所以分区PO也会重新加入全局变盘中。最终全局变量的分区顺序是[Pl-P9,PO],注意:分区PO在最后,它表示的是第二次拉取请求的新消息,而分区[Pl-P9]还只是第一次拉取请求的旧消息。

本节分析了拉取器的 “获取记录集”方法的多个优化方案 , 这个方法是在消费者的轮询 中调用,并不是由消费者客户端应用程序调用的 。 否则 ,如果要让应用程序获取记录集,就要把消费者内部的“拉取器”对象也暴露给应用程序代码 。 应用程序应该只要调用消费者的轮询方法,就可以得到需要的数据。

消费者的轮询方法封装了拉取消息的流程,主要包括3个步骤 : 发送拉取请求 、 网络层轮询 、 获取记录集。 如果最后一步获取记录集没有得到数据,并且在超时时间内,轮询方法会再次发送拉取请求,并执行网络轮询,直到有数据返回给应用程序,供其进行实际的业务逻辑处理。

4.1.5 消费者获取记录相关推荐

  1. sql表格模型获取记录内容_SQL Server和BI –如何使用Excel记录表格模型

    sql表格模型获取记录内容 介绍 (Introduction) A few weeks back I had been working on an interesting proof of conce ...

  2. sql表格模型获取记录内容_SQL Server和BI –如何使用Reporting Services 2016记录表格模型

    sql表格模型获取记录内容 介绍 (Introduction) A few weeks back I had been working on an interesting proof of conce ...

  3. 解决OleDbDataReader重新获取记录时,使用GetString()方法出错的问题

    问题是这样的,我写了一个方法,在程序中我用定时器不停的调用此方法,该方法中的主要代码如下: if (dbReader.Read()) //判断是否已经读完 {Char=dbReader.GetStri ...

  4. spark rdd map java_如何在spark RDD(JavaRDD)中获取记录的文件名

    我正在使用多个文件加载到 JavaRDD中 JavaRDD allLines = sc.textFile(hdfs://path/*.csv); 加载文件后,我修改每条记录并想保存它们.但是,我还需要 ...

  5. python网页内容获取记录pkg

    最近为了获取网页数据,积累了一些经验,这里记录 一下.网页内容获取用python真的是很好用,编写代码也快,偶尔有一些Bug需要调一下.这里记录一下常用的包 bs4-----网页内容解析,还有一个好用 ...

  6. mongodb查询数据库表里总记录数count_documents()和获取记录里面的name字段的值docView[“name“].get_utf8().value.to_string()

    1.先开始写点起因,就是最近开始搞mongodb的c++开发,结果呢,光是装各种驱动就花了两天.再加上视频教程里的testmongo.cpp的代码,在新版本驱动下根本就不能运行,我就想着重写一下,功能 ...

  7. Oracle rownum 用法 --使用rownum 实现获取记录

    实现以下查询: (1)获取前两条的记录: (2)获取第三条到第六条的记录: 表 student ,记录数8条 (1)获取前两条的记录 直接生成 select * from student where ...

  8. rowspan 动态变化_php – 从数据库中获取记录时的动态rowspan

    抱歉我的英语不好: 在这里,我回答了这个问题 How to show data from database with dynamic rowspan.再次让我试着回答这个问题.首先,以免我们在mysq ...

  9. 通过getGeneratedKeys获取记录的主键

    Connection con=null;PreparedStatement ps=null;ResultSet rs=null;try {//建立连接con= JDBCUtils.getConnect ...

  10. rabbitmq消费者获取消息慢_RabbitMQ:快速生产者和慢速消费者

    我有一个应用程序使用RabbitMQ作为消息队列来发送/接收两个组件之间的消息:发送者和接收者.发送者以非常快的方式发送消息.接收器接收到该消息,然后执行一些非常耗时的任务(主要是为非常大的数据大小编 ...

最新文章

  1. [转] Transact_SQL手册
  2. case when用法java_Oracle CASE WHEN 用法介绍
  3. BAT的前端,不是技术牛就够了!还应该锻炼这些能力
  4. 将小写金额转换成大写金额[存储过程版]
  5. BZOJ 1066[SCOI2007]蜥蜴
  6. 高等数学常用符号大全及符号的含义
  7. Linux - 增加用户、添加用户组
  8. 世界33种名车标志及来历
  9. python将.tif格式图批量转化为.jpg格式图
  10. Google Sketchup 三维建模软件(含建筑装修摆设资源,包括 office)
  11. 通常在计算机中连接硬盘驱动器的接口为,解决方案:如何在Win10系统中向计算机添加硬盘驱动器...
  12. STM8S003FP6 TIM4配置
  13. kubernetes 亲和、反亲和、污点、容忍
  14. 准相位匹配二阶频率转换
  15. 怎么把计算机里的文件放到桌面上,ipad怎么把文件放到桌面
  16. 测绘资质办理需要注意的流程和规定
  17. 2018年年初的面试经验谈
  18. PostgreSQL数据库之国际化语言支持学习总结
  19. 百分百医学论文发表网是个骗子网站
  20. ImmuCellAI | 免疫浸润计算工具 R包学习

热门文章

  1. 深度挖掘积分墙,积分墙到底好不好?
  2. 前后端交互、Node、npm、Express、mysql基础
  3. node.js核心模块实例应用,基于nods.js环境向json文件添加数据
  4. android word分页,word文档如何设置分页以及取消分页
  5. 从零開始搭建微信硬件开发环境全过程——1小时掌握微信硬件开发流程
  6. 人工智能全球 2000 位最具影响力学者榜单
  7. 完整版PayPal支付(java后端教程)
  8. 计算机右键管理 已停止工作,管理器停止工作,详细教您怎么解决资源管理器已停止工作...
  9. 正态分布的峰度和偏度分别为_偏度与峰度的正态性分布判断
  10. 微信公众号程序开发接入流程