一 Sender run方法调用流程

# 从Metadata获取集群元数据

# 调用RecordAccumulator.ready方法,根据RecordAccumulator的缓存情况,选出可以向哪些Node发送消息,返回ReadyCheckResult对象

# 如果ReadyCheckResult存在某些分区没有leader副本,则调用Metadata的requestUpdate方法,标记需要更新kafka的集群信息

# 针对ReadyCheckResult的readyNodes集合,循环调用NetworkClient的ready方法,目的是检测网络I/O方面是否符合发送条件,不符合发送条件的Node将会从readyNodes集合中删除

# 调用RecordAccumulator的drain方法获取待发送的消息集合

# 调用RecordAccumulator的abortExpiredBatchers方法处理RecordAccumulator中超时的消息

# 调用Sender的createProduceRequests方法,将发送的消息封装成ClientRequest请求

# 调用NetworkClient.send方法,将ClientRequest写入KafkaChannel的send字段

# 调用NetworkClient的poll方法,将KafkaChannel中send字段保存的ClientRequest发送出去,同时还会处理服务端发回的响应处理超时请求,调用用户自定义的函数等

void run(long now) {
    // 从Metadata获取集群元数据
   
Cluster cluster = metadata.fetch();
    // 根据RecordAccumulator的缓存情况,选出可以向哪些Node发送消息,返回ReadyCheckResult对象
   
RecordAccumulator.ReadyCheckResultresult = this.accumulator.ready(cluster, now);
    // 如果ReadyCheckResult存在某些分区没有leader副本,则调用Metadata的requestUpdate方法,标记需要更新kafka的集群信息
   
if (!result.unknownLeaderTopics.isEmpty()) {
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        this.metadata.requestUpdate();
    }

// 针对ReadyCheckResult的readyNodes集合,循环调用NetworkClient的ready方法,
    // 目的是检测网络I/O方面是否符合发送条件,不符合发送条件的Node将会从readyNodes集合中删除
   
Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout= Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

// 调用RecordAccumulator的drain方法获取待发送的消息集合
   
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
        this.maxRequestSize, now);
    // 是否需要保证消息的顺序
   
if (guaranteeMessageOrder) {
        // 遍历record batch
       
for (List<RecordBatch> batchList : batches.values()) {
            for (RecordBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }
    // 调用RecordAccumulator的abortExpiredBatchers方法处理RecordAccumulator中超时的消息
   
List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);

for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

sensors.updateProduceRequestMetrics(batches);
    // 创建生产者请求
   
List<ClientRequest> requests = createProduceRequests(batches, now);

long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready tosend: {}", result.readyNodes);
        log.trace("Created {} producerequests: {}", requests.size(), requests);
        pollTimeout = 0;
    }
    // 将ClientRequest写入KafkaChannel的send字段
   
for (ClientRequest request : requests)
        client.send(request, now);
    // 调用NetworkClient的poll方法,将KafkaChannel中send字段保存的ClientRequest发送出去,
    // 同时还会处理服务端发回的响应处理超时请求,调用用户自定义的函数等
   
this.client.poll(pollTimeout, now);
}

二 创建请求

我们先分析ProduceRequest和ProduceResponse消息体格式:

api_key: API标识

api_version: API版本号

correaltion_id: 一个单调递增序号

client_id: 客户端id

acks: 确认机制,0 不需要确认,1 只需要leader确认,-1所有副本都需要确认

timeout: 超时时间

topic: topic名称

partition: partition编号

record_set: 消息

correaltion_id: 一个单调递增序号

topic: topic名称

partition: partition编号

error_code: 错误码

base_offset: 服务端为消息生成的一个offset

timestamp: 瞬间戳

throttle_time_ms: 延长时间

private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {// 保存创建的ClientRequest列表List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size());for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())// 将发往同一个Node的RecordBatch封装成ClientRequestrequests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));return requests;
}
private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());// 将RecordBatch按照partiiton分类,同时构建集合for (RecordBatch batch : batches) {TopicPartition tp = batch.topicPartition;produceRecordsByPartition.put(tp, batch.records.buffer());recordsByPartition.put(tp, batch);}// 创建ProduceRequest和RequestSendProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);RequestSend send = new RequestSend(Integer.toString(destination),this.client.nextRequestHeader(ApiKeys.PRODUCE),request.toStruct());// 创建RequestCompletionHandler作为回调对象RequestCompletionHandler callback = new RequestCompletionHandler() {public void onComplete(ClientResponse response) {handleProduceResponse(response, recordsByPartition, time.milliseconds());}};// 创建ClientRequest对象return new ClientRequest(now, acks != 0, send, callback);
}

sender分析之创建请求相关推荐

  1. 权限控制相关数据表分析和创建

    权限控制相关数据表分析和创建 实体类分析:包含用户.角色.权限三大块 用户User,角色Role,权限Permission 为了方便进行动态的菜单管理,也就是不同权限用户进入到后台系统所看到的菜单是不 ...

  2. Tornado请求分析request, 获取请求参数

    全栈工程师开发手册 (作者:栾鹏) 架构系列文章 请求分析request WEB请求的那些事 这节中,我们来根据客户请求报文中的请求行(Request line) ,请求头部(Request Head ...

  3. java创建请求拦截器,HttpClient拦截器

    本篇文章帮大家学习HttpClient拦截器,包含了HttpClient拦截器使用方法.操作技巧.实例演示和注意事项,有一定的学习价值,大家可以用来参考. 拦截器(interceptor)是那些有助于 ...

  4. Django源码分析3:处理请求wsgi分析与视图View

    django源码分析 本文环境python3.5.2,django1.10.x系列 根据前上一篇runserver的博文,已经分析了本地调试服务器的大致流程,现在我们来分析一下当runserver运行 ...

  5. nginx日志分析查询异常请求IP之狙击网络黑客

    1 分析 使用nginx作为web端口分发时,只要请求服务器,nginx便会在access.log与error.log文件中留下记录,包括请求时间.请求方式.浏览器..以及访问的静态文件等信息. 网络 ...

  6. 报告分析工具创建完美的分析报告

    目录 前言 本书涵盖的内容 这本书需要什么 这本书是给谁的 约定 读者反馈 客户支持 下载示例代码 下载本书的彩色图像 勘误 海盗行为 问题 1. Pentaho报告简介 介绍Pentaho报告 引入 ...

  7. tp5源码分析之网络请求

    1 网络请求 网络请求 对客户端而言,指服务器发起的请求操作. 对服务器端而言,指客户端发起的请求信息. 服务器端主要用来对客户端发起的网络请求进行处理. 2 请求信息 2-1 Url相关 Reque ...

  8. SpringBoot(十二)启动流程分析之创建应用上下文AnnotationConfigServletWebServerApplicationContext

    SpringBoot版本:2.1.1      ==>启动流程分析汇总 接上篇博客Spring Boot 2.1.1(十一)启动流程分析之设置系统属性spring.beaninfo.ignore ...

  9. kubernetes源码分析-pod创建流程

    前言 首先放一张kubernetes的架构图.其中apiserver是整个架构的信息交互中心.所有组件都会与apisever交互. kubernetes中,每个node都部署了一个kubelet,通过 ...

最新文章

  1. Apache用户认证、域名跳转、Apache访问日志
  2. DedeCMS筛选简单实现方法不改后台源文件
  3. tf.squeeze
  4. Java并发编程之ConcurrentHashMap
  5. 借呗利息为什么比银行信用贷款高很多?
  6. 【C#程序设计】教学讲义——第一章:C#语言概述
  7. android之seekbar
  8. MyBatis 在xml文件中处理大于号小于号的方法
  9. OpenStack精华问答 | OpenStack是云吗?
  10. 广州市城市智能交通大数据体系研究与实践
  11. 默写标准答案0917
  12. 一个简单的Python爬虫
  13. mysql 7 for windows_Mysql 5.7 for windows 免安装版(解压版)安装和配置
  14. 不能不说的C#特性-表达式树
  15. 穿越火线显示无法连接服务器,穿越火线无法连接服务器是什么原因
  16. 谷歌的现实、摩托的无奈与联想的接盘
  17. Protocol Buffer 简介
  18. 安装Rhythmbox mp3插件
  19. about-page
  20. android动态修改桌面图标,Android动态更换桌面图标

热门文章

  1. Dubbo场景场景配置用法详解
  2. 【系统架构设计师】软考高级职称,一次通过,2017年下半年系统架构设计师考试论文真题(论软件架构风格)
  3. Must-read papers on deep learning to hash
  4. python是开源语言吗c,属于新十年的开发语言:Go语言可能很快会取代Python
  5. c语言内部堆排序的实现,内部排序之堆排序的实现详解
  6. haspusersetup 安装后har 蓝屏_电脑蓝屏90%的人不知道怎么办?
  7. 鸿蒙os在3月底推送,华为鸿蒙OS Beta 3将从3月31日起推送
  8. php 求 相似 比,php比较相似字符串的方法
  9. java linux解压_linux整套java环境解压版
  10. python统计缺失值