接着上文kafka的简述,这一章我们一探kafka生产者是如何发送消息到消息服务器的。

代码的入口还是从

kafkaTemplate.send开始

最终我们就会到

org.springframework.kafka.core.KafkaTemplate#doSend方法

这里的关键就是

org.apache.kafka.clients.producer.Producer#send(org.apache.kafka.clients.producer.ProducerRecord<K,V>, org.apache.kafka.clients.producer.Callback)

我们再一路点击下去,一直到

 org.apache.kafka.clients.producer.KafkaProducer#doSend方法

这里将步骤分为五步

1.更新Metadata,Metadata用于存储部分topic数据
2.将发送内容序列化
3.如果我们有多个分区的话,在这里会根据算法选择相应的分区
4.向accumulator写入数据,accumulator是一种ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;结构,在这里对发送数据做零时缓存 5.缓存的够多了,唤醒线程发送数据。

所以看到这里我们就明白了,kafka不是直接将数据发送到服务器。而是缓存到内存中,知道大于batchsize才去做发送

接下来我们看下sender线程做了什么

直接来到

org.apache.kafka.clients.producer.internals.Sender#run(long)

1.连接的获取,

org.apache.kafka.clients.NetworkClient#initiateConnect

具体的connect代码如下

首先与kafka serve端建立了一个non blocking 的SocketChannel,然后将该channel注册到一个java.nio.channels.Selector上面,并注册OP_CONNECT事件。

接下来,我们再看下消息的发送

首先调用

client.send(request, now);

这个方法最终会调用

org.apache.kafka.common.network.KafkaChannel#setSend

为每个request注册

OP_WRITE事件

同时把send传递进来

接下来调用

this.client.poll(pollTimeout, now);

这个的调用链是

org.apache.kafka.common.network.Selector#poll----> org.apache.kafka.common.network.Selector#pollSelectionKeys--->

这里的

key.isWritable()

就是我们上文注册写事件,当所有的都准备好了,我们调用channel将消息发送到服务端

到这里我们就知道了kafka发送消息的大致流程。本文并没有对细节深入,只想对kafka做出快速的了解。

转载于:https://www.cnblogs.com/xmzJava/p/9536351.html

Kafka Producer源码简述相关推荐

  1. Kafka Eagle 源码解读

    1.概述 在<Kafka 消息监控 - Kafka Eagle>一文中,简单的介绍了 Kafka Eagle这款监控工具的作用,截图预览,以及使用详情.今天笔者通过其源码来解读实现细节.目 ...

  2. kafka消费者源码解析

    在分析kafka消费者源码之前,我先提出以下问题,大家带着问题去看. 1.消费者如何知道需要消费的topic分区分布在集群的哪些broker上呢? 2.由于消费组内存在多个消费者消费同一topic的情 ...

  3. 读Kafka Consumer源码

    最近一直在关注阿里的一个开源项目:OpenMessaging OpenMessaging, which includes the establishment of industry guideline ...

  4. KClient——kafka消息中间件源码解读

    目录 kclient消息中间件 kclient-processor top.ninwoo.kclient.app.KClientApplication top.ninwoo.kclient.app.K ...

  5. kafka 生产者源码解析

    为学日益,为道日损,损之又损,以至于无为,无为而无不为 0x01: 概述 kafka作为大数据领域消息系统一哥,其架构与代码设计十分巧妙与优雅,从中我们可以学习与借鉴到很多分布式高性能并发与缓存方案, ...

  6. Flink kafka connectors 源码详解---<1>

    先抛几个简单问题,1问, 4个topic,每个topic 5个分区,问并行度10 ,这个并行度是怎么划分这些topic 分区的.2问,topic 分区 动态更新怎么做的.3问,就1问中的tm 是怎么产 ...

  7. Kafka生产者源码解析

    在讲解之前,我们带着以下疑问去看 1.生产者客户端如何获取要生产数据的topic元数据: 2.生产者如何组装消息: 3.生产者组装好消息后是直接发送到broker端吗? 4.消息是如何发送到broke ...

  8. 跟我学Kafka源码Producer分析

    2019独角兽企业重金招聘Python工程师标准>>> 跟我学Kafka源码Producer分析 博客分类: MQ 本章主要讲解分析Kafka的Producer的业务逻辑,分发逻辑和 ...

  9. 刚看完 Kafka 源码,各位随便问!

    Kafka 因其优越的特性广泛用于数据传输.消息中间件的设计.开发和维护等方面,也得到越来越多大厂(阿里.美团.百度.快手等)的青睐,很多 IT 界前辈更是在技术层面不断深挖.最近有位后端三年的朋友在 ...

最新文章

  1. 利用数据寿命增加基于闪存的存储类存储器的使用寿命
  2. ios(iphone/ipad)一个简单的用代码判断当前设备的方法
  3. c程序设计语言中printf,编程序 用getchar函数读入两个字符c1和c2,然后分别用putchar和printf函数输出这两个字符。请问...
  4. FTP服务器管理【Linux运维之道之脚本案例】
  5. linux shell 判断文件 修改时间和系统时间差
  6. 基于OpenCV高斯模型的公共场景人流量统计
  7. 从 Vuex 0.6.x 迁移到 1.0
  8. c++ 回调函数与std::function使用实例
  9. 【大数据论文笔记】大数据技术研究综述
  10. xfire客户端对返回list很挑剔,所以需要使用泛型。
  11. 绿茶软件测试自学,7号心理测试小程序
  12. composer 更新php位置,使用composer安装/更新PHP包
  13. AlphaGo浅析——浅析卷积神经网络
  14. 串标是什么意思?用同一台电脑制作或上传投标书,会被认定为串标吗?
  15. 教你如何批量下载QQ相册或是手机相册里原照片
  16. 很多次游戏的最后取胜实际上都有很强的偶然性
  17. ZOJ - 3939 The Lucky Week【简单方法】
  18. D类数字功放输出接到传统模拟功放,结果效果奇差,看来没这么简单
  19. 《Asp.Net MVC 》复习题目
  20. 每月一书(202201):《知识星球绝非偶然》

热门文章

  1. Swift - 添加纯净的Alamofire
  2. Loading 遮蔽层 简单实现。
  3. 关键字restrict简介
  4. 使用线程——创建线程
  5. Oracle 存储过程错误之PLS-00201: 必须声明标识符
  6. 『协议』XML-RPC 协议规格说明
  7. android传递数据bundle封装传递map对象
  8. WPF自定义控件与样式(1)-矢量字体图标(iconfont)
  9. PXE装机+kickstart无人值守安装
  10. 服务器USB启动故障一例