当kafka发送消息的时候,在完成消息的序列化之后,如果没有指定消息的分区,将会通过Partitioner来选择该消息发往的分区,在默认情况下,将采用DefaultPartitioner来进行消息的分区选择。

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (keyBytes == null) {int nextValue = nextValue(topic);List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {int part = Utils.toPositive(nextValue) % availablePartitions.size();return availablePartitions.get(part).partition();} else {// no partitions are available, give a non-available partitionreturn Utils.toPositive(nextValue) % numPartitions;}} else {// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}
}private int nextValue(String topic) {AtomicInteger counter = topicCounterMap.get(topic);if (null == counter) {counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);if (currentCounter != null) {counter = currentCounter;}}return counter.getAndIncrement();
}

首先获得发送消息topic的分区数,如果消息定义了key,那么将会根据key的hash来选择具体发送到的分区编号,如果没有,则通过nextValue()方法内部维护了一个AtoimcInteger随着消息的个数顺序而增长而与总分区数取余达到分区轮流存放的目的。

而后,将会把所要发送的消息放入到客户端的缓冲区中等待发送。

在kafka客户端中,一份消息的内存被抽象为一份MemoryRecordsBuilder,用来存放具体的消息信息以及消息被序列化后的实体。而MemoryRecordsBuilder将会被包上一层ProducerBatch,这个ProducerBatch则是一组同样topic和分区的MemoryRecordsBuilder的集合。若干个同一个topic分区的ProducerBatch将会被保存在一个队列中,等待被获取信息被发送。

以上逻辑实现在kafka客户端的RecordAccumulator中。

Deque<ProducerBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {if (closed)throw new KafkaException("Producer closed while send in progress");RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);if (appendResult != null)return appendResult;
}

每个topic加分区都会有一个对应的队列存放相应的ProducerBatch,当一条新消息进入时,将会取队列最末端的ProducerBatch加入消息。

MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

如果消息的大小大于ProducerBatch的对应大小,将会重新从缓冲区中申请一分内存,来新建一个ProducerBatch加入到队列的尾部,存放消息的实体等待发送。

所以当客户端需要发送消息的时候,将会从队列的前端开始拉取消息进行发送。

当正式需要发送消息的时候,会直接拉取队列的第一个ProducerBatch,如果其中存储的消息实体大小小于消息体的剩余大小,将会全部加入到要发送的消息中,并移除该ProducerBatch。

kafka java客户端消息的分区与缓存发送相关推荐

  1. kafka Java客户端之 consumer API 消费消息

    背景:我使用docker-compose 搭建的kafka服务 kafka的简单介绍以及docker-compose部署单主机Kafka集群 使用consumer API消费指定Topic里面的消息 ...

  2. kafka Java客户端之 consumer API 多线程消费消息

    kafka consumer 线程设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心跳线程. 用户主线程,指的是启 ...

  3. kafka java消费者消息拉取

    版本2.4.0 Kafka的客户端消费者在启动的过程中会通过ensureActiveGroup()方法来确保自己是可用的消费者,在这个方法中,会向kafka的broker集群发送join请求,在joi ...

  4. Kafka Java客户端Stream API

    Kafka Stream概念及初识高层架构图 Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature,它提供了对存储于Kafka内的数据进行流式处理和分析的功能.简 ...

  5. Kafka JAVA客户端代码示例--高级应用

    2019独角兽企业重金招聘Python工程师标准>>> 什么时间使用高级应用? 针对一个消息读取多次 在一个process中,仅仅处理一个topic中的一组partitions 使用 ...

  6. kafka java客户端编程

    kafka_2.10-0.8.1.1 maven <dependencies> <dependency> <groupId>org.apache.kafka< ...

  7. kafka Java客户端之consumer 流量控制 以及 Rebalance解析

    Consumer 流量控制 为了避免Kafka中的流量剧增导致过大的流量打到Consumer端将Consumer给压垮的情况,我们就需要针对Consumer进行限流.例如,当处理的数据量达到某个阈值时 ...

  8. kafka Java客户端之Connect API

    kafka Connect 简单介绍 Kafka Connect 是一个可扩展.可靠的在Kafka和其他系统之间流传输的数据工具.它可以通过connectors(连接器)简单.快速的将大集合数据导入和 ...

  9. Kafka | Java 消费者是如何管理TCP连接的? | 极客时间

    今天我要和你分享的主题是:Kafka 的 Java 消费者是如何管理 TCP 连接的. 在专栏中,我们专门聊过"Java生产者是如何管理 TCP 连接资源的"这个话题,你应该还有印 ...

最新文章

  1. node开发环境(mac)和线上环境(linux)搭建
  2. Keras入门(一)搭建深度神经网络(DNN)解决多分类问题
  3. 【机器学习入门到精通系列】大规模机器学习图示
  4. linux sh 字符截取,shell字符截断
  5. SAP固定资产、管理会计模块习题-针对END-USER
  6. const 内联 枚举 宏
  7. 设计事件驱动的微服务
  8. 多亏了这篇文章,我的开发效率远远领先于我的同事
  9. HTML中del标记是什么意思,HTML del标记
  10. VSCode打开中文乱码
  11. HDU-5781 ATM Mechine(概率DP)
  12. c语言编程入门教程for,C语言编程入门教程精 简版.ppt
  13. 贝叶斯网络python实现_在Python中使用贝叶斯网络的实例
  14. 建筑竞赛获奖项目解析国外教程
  15. matlab中sumf,使用SUMIF函数根据日期区间统计的方法
  16. Tomcat和Http协议详细解析
  17. 【理论知识学习32】归纳偏差与选择性偏差(概念作用以及举例说明)
  18. 御坂御坂题解(出自北航校赛) 约瑟夫环问题高效解决方案
  19. 使用AutoFac组织多项目应用程序
  20. 大数据_Hive_Hsql

热门文章

  1. 诗与远方:无题(六十二)
  2. Linux下如何从普通用户切换到root用户
  3. eDiary电子日记本
  4. Kafka源码解析 - 副本迁移任务提交流程
  5. spring——autowire自动注入
  6. php处理ubb代码,过滤UBB代码的php类
  7. csgo 机器人模式_分享一个休闲模式机器人Bug
  8. 网络编程之 osi七层协议
  9. 使用 ale.js 制作一个小而美的表格编辑器(1)
  10. spring配置jdbc连接oracle,mysql,sqlserver