最近看kafka源码,着实被它的客户端缓冲池技术优雅到了。忍不住要写篇文章赞美一下(哈哈)。

注:本文用到的源码来自kafka2.2.2版本。

背景

当我们应用程序调用kafka客户端 producer发送消息的时候,在kafka客户端内部,会把属于同一个topic分区的消息先汇总起来,形成一个batch。真正发往kafka服务器的消息都是以batch为单位的。如下图所示:

这么做的好处显而易见。客户端和服务端通过网络通信,这样批量发送可以减少网络带来的性能开销,提高吞吐量。

这个Batch的管理就非常值得探讨了。可能有人会说,这不简单吗?用的时候分配一个块内存,发送完了释放不就行了吗。

kafka是用java语言编写的(新版本大部分都是用java实现的了),用上面的方案就是使用的时候new一个空间然后赋值给一个引用,释放的时候把引用置为null等JVM GC处理就可以了。

看起来似乎也没啥问题。但是在并发量比较高的时候就会频繁的进行GC。我们都知道GC的时候有个stop the world,尽管最新的GC技术这个时间已经非常短,依然有可能成为生产环境的性能瓶颈。

kafka的设计者当然能考虑到这一层。下面我们就来学习下kafka是如何对batch进行管理的。

缓冲池技术原理解析

kafka客户端使用了缓冲池的概念,预先分配好真实的内存块,放在池子里。

每个batch其实都对应了缓冲池中的一个内存空间,发送完消息之后,batch不再使用了,就把内存块归还给缓冲池。

听起来是不是很耳熟啊?不错,数据库连接池,线程池等池化技术其实差不多都是这样的原理。通过池化技术降低创建和销毁带来的开销,提升执行效率。

代码是最好的文档,下面我们就来撸下源码。

我们撸代码的步骤采用的是从上往下的原则,先带你看看缓冲池在哪里使用,然后再深入到缓存池内部深入分析。

下面的代码做了一些删减,值保留了跟本文相关的部分便于分析。

public class KafkaProducer<K, V> implements Producer<K, V> {private final Logger log;private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);private static final String JMX_PREFIX = "kafka.producer";public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";@Overridepublic Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {// intercept the record, which can be potentially modified; this method does not throw exceptionsProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);return doSend(interceptedRecord, callback);}private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs);...}

当我们调用客户端的发送消息的时候,底层会调用doSend,然后里面使用一个记录累计器RecordAccumulator把消息append进来。我们继续往下看看。

public final class RecordAccumulator {private final Logger log;private volatile boolean closed;private final AtomicInteger flushesInProgress;private final AtomicInteger appendsInProgress;private final int batchSize;private final CompressionType compression;private final int lingerMs;private final long retryBackoffMs;private final int deliveryTimeoutMs;private final BufferPool free;private final Time time;private final ApiVersions apiVersions;private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;private final IncompleteBatches incomplete;// The following variables are only accessed by the sender thread, so we don't need to protect them.private final Map<TopicPartition, Long> muted;private int drainIndex;private final TransactionManager transactionManager;private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire.public RecordAppendResult append(TopicPartition tp,long timestamp,byte[] key,byte[] value,Header[] headers,Callback callback,long maxTimeToBlock) throws InterruptedException {// We keep track of the number of appending thread to make sure we do not miss batches in// abortIncompleteBatches().appendsInProgress.incrementAndGet();ByteBuffer buffer = null;buffer = free.allocate(size, maxTimeToBlock);synchronized (dq) {// Need to check if producer is closed again after grabbing the dequeue lock.if (closed)throw new KafkaException("Producer closed while send in progress");RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);if (appendResult != null) {// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...return appendResult;}MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));dq.addLast(batch);...

RecordAccumulator其实就是管理一个batch队列,我们看到append方法实现其实是调用BufferPool的free方法申请(allocate)了一块内存空间(ByteBuffer), 然后把这个内存空空间包装成batch添加到队列后面。

当消息发送完成不在使用batch的时候,RecordAccumulator会调用deallocate方法归还内存,内部其实是调用BufferPool的deallocate方法。

public void deallocate(ProducerBatch batch) {incomplete.remove(batch);// Only deallocate the batch if it is not a split batch because split batch are allocated outside the// buffer pool.if (!batch.isSplitBatch())free.deallocate(batch.buffer(), batch.initialCapacity());}

很明显,BufferPool就是缓冲池管理的类,也是我们今天要讨论的重点。我们先来看看分配内存块的方法。

public class BufferPool {static final String WAIT_TIME_SENSOR_NAME = "bufferpool-wait-time";private final long totalMemory;private final int poolableSize;private final ReentrantLock lock;private final Deque<ByteBuffer> free;private final Deque<Condition> waiters;/** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize.  */private long nonPooledAvailableMemory;private final Metrics metrics;private final Time time;private final Sensor waitTime;public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {if (size > this.totalMemory)throw new IllegalArgumentException("Attempt to allocate " + size+ " bytes, but there is a hard limit of "+ this.totalMemory+ " on memory allocations.");ByteBuffer buffer = null;this.lock.lock();try {// check if we have a free buffer of the right size pooledif (size == poolableSize && !this.free.isEmpty())return this.free.pollFirst();// now check if the request is immediately satisfiable with the// memory on hand or if we need to blockint freeListSize = freeSize() * this.poolableSize;if (this.nonPooledAvailableMemory + freeListSize >= size) {// we have enough unallocated or pooled memory to immediately// satisfy the request, but need to allocate the bufferfreeUp(size);this.nonPooledAvailableMemory -= size;} else {// we are out of memory and will have to blockint accumulated = 0;Condition moreMemory = this.lock.newCondition();try {long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);this.waiters.addLast(moreMemory);// loop over and over until we have a buffer or have reserved// enough memory to allocate onewhile (accumulated < size) {long startWaitNs = time.nanoseconds();long timeNs;boolean waitingTimeElapsed;try {waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);} finally {long endWaitNs = time.nanoseconds();timeNs = Math.max(0L, endWaitNs - startWaitNs);recordWaitTime(timeNs);}if (waitingTimeElapsed) {throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");}remainingTimeToBlockNs -= timeNs;// check if we can satisfy this request from the free list,// otherwise allocate memoryif (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {// just grab a buffer from the free listbuffer = this.free.pollFirst();accumulated = size;} else {// we'll need to allocate memory, but we may only get// part of what we need on this iterationfreeUp(size - accumulated);int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);this.nonPooledAvailableMemory -= got;accumulated += got;}...

首先整个方法是加锁操作的,所以支持并发分配内存。

逻辑是这样的,当申请的内存大小等于poolableSize,则从缓存池中获取。这个poolableSize可以理解成是缓冲池的页大小,作为缓冲池分配的基本单位。从缓存池获取其实就是从ByteBuffer队列取出一个元素返回。

如果申请的内存不等于特定的数值,则向非缓存池申请。同时会从缓冲池中取一些内存并入到非缓冲池中。这个nonPooledAvailableMemory指的就是非缓冲池的可用内存大小。非缓冲池分配内存,其实就是调用ByteBuffer.allocat分配真实的JVM内存。

缓存池的内存一般都很少回收。而非缓存池的内存是使用后丢弃,然后等待GC回收。

继续来看看batch释放的代码,

public void deallocate(ByteBuffer buffer, int size) {lock.lock();try {if (size == this.poolableSize && size == buffer.capacity()) {buffer.clear();this.free.add(buffer);} else {this.nonPooledAvailableMemory += size;}Condition moreMem = this.waiters.peekFirst();if (moreMem != null)moreMem.signal();} finally {lock.unlock();}}

很简单,也是分为两种情况。要么直接归还缓冲池,要么就是更新非缓冲池部分的可以内存。然后通知等待队列里的第一个元素。

最近看Kafka源码,着实被它的客户端缓冲池技术优雅到了相关推荐

  1. 带你了解下Kafka的客户端缓冲池技术

    最近看kafka源码,着实被它的客户端缓冲池技术优雅到了.忍不住要写篇文章赞美一下(哈哈). 注:本文用到的源码来自kafka2.2.2版本. 背景 当我们应用程序调用kafka客户端 produce ...

  2. 刚看完 Kafka 源码,各位随便问!

    Kafka 因其优越的特性广泛用于数据传输.消息中间件的设计.开发和维护等方面,也得到越来越多大厂(阿里.美团.百度.快手等)的青睐,很多 IT 界前辈更是在技术层面不断深挖.最近有位后端三年的朋友在 ...

  3. kafka源码_终于看到有人把Kafka讲清楚了,阿里面试官推荐你看这份源码笔记

    这几年,大数据发展迅猛,其中 Kakfa 凭借高可靠.高吞吐.高可用.可伸缩几大特性,成为数据管道技术的首选. 越来越多人开始使用 Kafka,对学习源码的需求也愈发强烈,原因主要有这么几个方面: 了 ...

  4. 跟我学Kafka源码Producer分析

    2019独角兽企业重金招聘Python工程师标准>>> 跟我学Kafka源码Producer分析 博客分类: MQ 本章主要讲解分析Kafka的Producer的业务逻辑,分发逻辑和 ...

  5. kafka源码分析之一server启动分析

    0. 关键概念 关键概念 Concepts Function Topic 用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上. Partition 是Kafka中横向扩展和一 ...

  6. 【kafka】Kafka 源码解析:Group 协调管理机制

    1.概述 转载:Kafka 源码解析:Group 协调管理机制 在 Kafka 的设计中,消费者一般都有一个 group 的概念(当然,也存在不属于任何 group 的消费者),将多个消费者组织成一个 ...

  7. kafka源码_Kafka日志段源码解析

    1 Kafka 日志结构 kafka 日志在磁盘上的组织架构如下: Kafka 日志对象由多个日志段对象组成,每个日志段对象在磁盘上创建一组文件,包括: 日志文件(.log) 索引文件(.index) ...

  8. Kafka源码分析10:副本状态机ReplicaStateMachine详解 (图解+秒懂+史上最全)

    文章很长,建议收藏起来,慢慢读! Java 高并发 发烧友社群:疯狂创客圈 奉上以下珍贵的学习资源: 免费赠送 经典图书:<Java高并发核心编程(卷1)> 面试必备 + 大厂必备 +涨薪 ...

  9. 【kafka源码】/log_dir_event_notification的LogDir脱机事件通知

    前言 我们会看到zk的数据中有一个节点/log_dir_event_notification/,这是一个序列号持久节点 这个节点在kafka中承担的作用是: 当某个Broker上的LogDir出现异常 ...

最新文章

  1. php中的几种跳转语句以及各自的特点,PHP中的跳转语句有且仅有break和continue两个语句。...
  2. Java设计模式-工厂方法模式和抽象工厂模式
  3. AI理论知识整理(18)-内积与范数
  4. spring入门案例plus
  5. 看YYModel源码的一些收获
  6. 前端学习(1843):前端面试题之vue管理状态
  7. 学习web标准、用户体验改善、Ajaxamp;Asp.Net
  8. 计蒜客——双重回文数
  9. if 与 while
  10. 手把手教你配置阿里云服务器搭建网站(图文教程)
  11. 映射表跟业务表的区别_方正飞鸿中间件开发平台
  12. 图像处理的Dither和Banding
  13. 今日小程序推荐:汇率即时查-打通微信直接搜一搜
  14. 机器人让你摆脱电销压力
  15. 谷歌应用商店现木马程序、百万WiFi路由器面临漏洞风险|12月6日全球网络安全热点
  16. IDEA settings.xml 阿里云配置备份
  17. JS高级---基础总结深入
  18. Centos8安装显卡驱动以及Cuda
  19. Android包体积优化上篇- 资源混淆优化
  20. Linux用户管理 day5

热门文章

  1. Python基础教程(十三):JSON、练习题100题
  2. Redis集群方案,Codis安装测试
  3. 技术大牛长成记之不要光看热闹
  4. bigapple之utils-update部分apk自动下载安装
  5. 解决stamp mismatch with graph file
  6. Javascript右键菜单类
  7. vs2008安装部署软件过程
  8. jupyter代码字体大小_Jupyter notebook设置背景主题,字体大小及自动补全代码的操作...
  9. 上下位机通讯协议_嵌入式中自定义协议的一些典型例子
  10. winsever 2008 r2 管理员账号没有权限_钉钉管理员攻略—主管理员①