聊聊kafka client chunkQueue 与 MaxLag值
为什么80%的码农都做不了架构师?>>>
序
前面一篇文章讨论了ConsumerFetcherManager的MaxLag与ConsumerOffsetChecker的lag值的区别。但是关于MaxLag的值还没有讲的太透彻,这里再深入一下,如何让ConsumerFetcherManager的MaxLag有值。
AbstractFetcherThread#processFetchRequest
kafka_2.10-0.8.2.2-sources.jar!/kafka/server/AbstractFetcherThread.scala
override def doWork() {inLock(partitionMapLock) {if (partitionMap.isEmpty)partitionMapCond.await(200L, TimeUnit.MILLISECONDS)partitionMap.foreach {case((topicAndPartition, offset)) =>fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,offset, fetchSize)}}val fetchRequest = fetchRequestBuilder.build()if (!fetchRequest.requestInfo.isEmpty)processFetchRequest(fetchRequest)}
值得注意,这里构建了fetchRequest 这里的partitionMap,key是TopicAndPartition,value就是本地最大的offset 每次拉取的时候,以本地已经拉取的最大值,还有拉取大小构造fetchRequest
FetchRequest
kafka_2.10-0.8.2.2-sources.jar!/kafka/api/FetchRequest.scala
def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {requestMap.put(TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize))this}
可以看到这里的offset与fetchSize决定了这个fetcher从broker拉取数据的开始位置和拉取数据的条数。
ConsumerFetcherThread
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ConsumerFetcherThread.scala
class ConsumerFetcherThread(name: String,val config: ConsumerConfig,sourceBroker: Broker,partitionMap: Map[TopicAndPartition, PartitionTopicInfo],val consumerFetcherManager: ConsumerFetcherManager)extends AbstractFetcherThread(name = name, clientId = config.clientId,sourceBroker = sourceBroker,socketTimeout = config.socketTimeoutMs,socketBufferSize = config.socketReceiveBufferBytes,fetchSize = config.fetchMessageMaxBytes,fetcherBrokerId = Request.OrdinaryConsumerId,maxWait = config.fetchWaitMaxMs,minBytes = config.fetchMinBytes,isInterruptible = true) {//...
}
这里使用的fetchSize来自config.fetchMessageMaxBytes
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ConsumerConfig.scala
class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(props) {//.../** the number of byes of messages to attempt to fetch */val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)
}
object ConsumerConfig extends Config {val RefreshMetadataBackoffMs = 200val SocketTimeout = 30 * 1000val SocketBufferSize = 64*1024val FetchSize = 1024 * 1024val MaxFetchSize = 10*FetchSizeval NumConsumerFetchers = 1val DefaultFetcherBackoffMs = 1000val AutoCommit = trueval AutoCommitInterval = 60 * 1000val MaxQueuedChunks = 2val MaxRebalanceRetries = 4val AutoOffsetReset = OffsetRequest.LargestTimeStringval ConsumerTimeoutMs = -1val MinFetchBytes = 1val MaxFetchWaitMs = 100val MirrorTopicsWhitelist = ""val MirrorTopicsBlacklist = ""val MirrorConsumerNumThreads = 1val OffsetsChannelBackoffMs = 1000val OffsetsChannelSocketTimeoutMs = 10000val OffsetsCommitMaxRetries = 5val OffsetsStorage = "zookeeper"val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"val ExcludeInternalTopics = trueval DefaultPartitionAssignmentStrategy = "range" /* select between "range", and "roundrobin" */val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"val DefaultClientId = ""//...
}
这个fetchSize默认是1024 * 1024,也就是1048576,即每次fetch的时候拉取1048576这么多条。
AbstractFetcherThread#processFetchRequest
private def processFetchRequest(fetchRequest: FetchRequest) {val partitionsWithError = new mutable.HashSet[TopicAndPartition]var response: FetchResponse = nulltry {trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))response = simpleConsumer.fetch(fetchRequest)} catch {case t: Throwable =>if (isRunning.get) {warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString))partitionMapLock synchronized {partitionsWithError ++= partitionMap.keys}}}fetcherStats.requestRate.mark()if (response != null) {// process fetched datainLock(partitionMapLock) {response.data.foreach {case(topicAndPartition, partitionData) =>val (topic, partitionId) = topicAndPartition.asTupleval currentOffset = partitionMap.get(topicAndPartition)// we append to the log if the current offset is defined and it is the same as the offset requested during fetchif (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {partitionData.error match {case ErrorMapping.NoError =>try {val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]val validBytes = messages.validBytes//这里请求之后,如果返回数据为空,那么newOffset就是取本地最大的offsetval newOffset = messages.shallowIterator.toSeq.lastOption match {case Some(m: MessageAndOffset) => m.nextOffsetcase None => currentOffset.get}partitionMap.put(topicAndPartition, newOffset)fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffsetfetcherStats.byteRate.mark(validBytes)//下面这个方法将拉回来的数据放进队列// Once we hand off the partition data to the subclass, we can't mess with it any more in this threadprocessPartitionData(topicAndPartition, currentOffset.get, partitionData)} catch {case ime: InvalidMessageException =>// we log the error and continue. This ensures two things// 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag// 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and// should get fixed in the subsequent fetcheslogger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)case e: Throwable =>throw new KafkaException("error processing data for partition [%s,%d] offset %d".format(topic, partitionId, currentOffset.get), e)}case ErrorMapping.OffsetOutOfRangeCode =>try {val newOffset = handleOffsetOutOfRange(topicAndPartition)partitionMap.put(topicAndPartition, newOffset)error("Current offset %d for partition [%s,%d] out of range; reset offset to %d".format(currentOffset.get, topic, partitionId, newOffset))} catch {case e: Throwable =>error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)partitionsWithError += topicAndPartition}case _ =>if (isRunning.get) {error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,ErrorMapping.exceptionFor(partitionData.error).getClass))partitionsWithError += topicAndPartition}}}}}}if(partitionsWithError.size > 0) {debug("handling partitions with error for %s".format(partitionsWithError))handlePartitionsWithErrors(partitionsWithError)}}
ConsumerFetcherThread#processPartitionData
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ConsumerFetcherThread.scala
// process fetched datadef processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {val pti = partitionMap(topicAndPartition)if (pti.getFetchOffset != fetchOffset)throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d".format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset))pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])}
PartitionTopicInfo#enqueue
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/PartitionTopicInfo.scala
/*** Enqueue a message set for processing.*/def enqueue(messages: ByteBufferMessageSet) {val size = messages.validBytesif(size > 0) {val next = messages.shallowIterator.toSeq.last.nextOffsettrace("Updating fetch offset = " + fetchedOffset.get + " to " + next)chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))fetchedOffset.set(next)debug("updated fetch offset of (%s) to %d".format(this, next))consumerTopicStats.getConsumerTopicStats(topic).byteRate.mark(size)consumerTopicStats.getConsumerAllTopicStats().byteRate.mark(size)} else if(messages.sizeInBytes > 0) {chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))}}
如果数据为空,则不放进队列
chunkQueue大小
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ZookeeperConsumerConnector.scala
def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V]): Map[String,List[KafkaStream[K,V]]] = {debug("entering consume ")if (topicCountMap == null)throw new RuntimeException("topicCountMap is null")val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic// make a list of (queue,stream) pairs, one pair for each threadIdval queuesAndStreams = topicThreadIds.values.map(threadIdSet =>threadIdSet.map(_ => {val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)val stream = new KafkaStream[K,V](queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)(queue, stream)})).flatten.toListval dirs = new ZKGroupDirs(config.groupId)registerConsumerInZK(dirs, consumerIdString, topicCount)reinitializeConsumer(topicCount, queuesAndStreams)loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]}
queue在这里创建了,大小为config.queuedMaxMessages
/** max number of message chunks buffered for consumption, each chunk can be up to fetch.message.max.bytes*/val queuedMaxMessages = props.getInt("queued.max.message.chunks", MaxQueuedChunks)val MaxQueuedChunks = 2
默认队列最大只能有2个FetchedDataChunk 而每个FetchedDataChunk里头最大的消息数目就是fetchSize大小也就是10241024 也就是说每个消费线程的chunkQueue里头默认最大的消息数目为21024*1024
当超过这个数目的时候,enquue就会阻塞,这样就形成了对整个fetch的拉取速度的控制。
ConsumerFetcherManager的MaxLag
要使得这个有值的话,那就是修改fetch.message.max.bytes的值,改小一点。比如
props.put("fetch.message.max.bytes","10");props.put("queued.max.message.chunks","1");
那么每次只拉10条消息,假设目前的lag如下
Group Topic Pid Offset logSize Lag Owner
mgroup mtopic 0 353 8727 8374 demo-1514550322182-6d67873d-0
mgroup mtopic 1 258 8702 8444 demo-1514550322182-6d67873d-1
mgroup mtopic 2 307 8615 8308 demo-1514550322182-6d67873d-2
拉取一次之后
val newOffset = messages.shallowIterator.toSeq.lastOption match {case Some(m: MessageAndOffset) => m.nextOffsetcase None => currentOffset.get}partitionMap.put(topicAndPartition, newOffset)fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
这里的nextOffset = offset + 1,也就是拉取回来的最大offset+1 = 259,hw的话是8702,那么lag值就是8702-259=8443 这里为了复现,让消费线程拉取一条之后抛异常退出
小结
生产环境注意根据消息大小以及环境内存等对如下参数进行配置,否则很容易引发OOM
- queued.max.message.chunks,默认是2,控制chunkQueue的容量
- fetch.message.max.bytes,默认是1024*1024,控制每个chunk的消息最大数目
另外关于ConsumerFetcherManager的MaxLag,只有在上面两个参数合理设置的情况下,才能对监控有点点帮助(chunkQueue越小越能从MaxLag反应消费者消费滞后的情况;否则只能反应client fetcher thread的消息拉取的滞后情况;不过设置太小的话就得频繁拉取,影响消费者消费,可以根据情况适中调整
)。从实际场景来看,还是一般比较少改动参数的话,那么还是得以ConsumerOffsetChecker的lag值做消费者消费滞后的监控才准确。
doc
- ConsumerFetcherManager MaxLag
- apache kafka系列之jmx监控指标参数
- Kafka源码分析 Consumer(2) Fetcher
转载于:https://my.oschina.net/go4it/blog/1599800
聊聊kafka client chunkQueue 与 MaxLag值相关推荐
- Windbg调优Kafka.Client内存泄露
从来没写过Blog,想想也是,工作十多年了,搞过N多的架构.技术,不与大家分享实在是可惜了.另外,从传统地ERP行业转到互联网,也遇到了很所前所未有的问题,原来知道有一些坑,但是不知道坑太多太深.借着 ...
- 聊聊 Kafka:Kafka 消息重复的场景以及最佳实践
一.前言 上一篇我们讲了 聊聊 Kafka:Kafka 消息丢失的场景以及最佳实践,这一篇我们来说一说 Kafka 消息重复的场景以及最佳实践. 我们下面会从以下两个方面来说一下 Kafka 消息重复 ...
- .net Kafka.Client多个Consumer Group对Topic消费不能完全覆盖研究总结(二)
依据Partition和Consumer的Rebalance策略,找到Kafka.Client Rebalance代码块,还原本地环境,跟踪调试,发现自定义Consumer Group 的Consum ...
- .net Kafka.Client多个Consumer Group对Topic消费不能完全覆盖研究总结(一)
我们知道Kafka支持Consumer Group的功能,但是最近在应用Consumer Group时发现了一个Topic 的Partition不能100%覆盖的问题. 程序部署后,发现Kafka在p ...
- 【解决方案】kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
终端中报错kafka: client has run out of available brokers to talk to (Is your cluster reachable?) 同时Ka ...
- [ERROR] sarama.NewSyncProducer error:kafka: client has run out of available brokers to talk to (Is y
[ERROR] sarama.NewSyncProducer error:kafka: client has run out of available brokers to talk to (Is y ...
- 聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 FIND_COORDINATOR
四.ApiKeys.FIND_COORDINATOR 我们前面的文章说过,与消费组相关的两个组件,一个是消费者客户端的 ConsumerCoordinator,一个是 Kafka Broker 服务端 ...
- 聊聊 Kafka: Kafka 为啥这么快?
一.前言 我们都知道 Kafka 是基于磁盘进行存储的,但 Kafka 官方又称其具有高性能.高吞吐.低延时的特点,其吞吐量动辄几十上百万.小伙伴们是不是有点困惑了,一般认为在磁盘上读写数据是会降低性 ...
- 聊聊 Kafka:如何避免消费组的 Rebalance
一.前言 我们上一篇聊了 Rebalance 机制,相信你对消费组的重平衡有个整体的认识.这里再简单回顾一下,Rebalance 就是让一个 Consumer Group 下所有的 Consumer ...
最新文章
- linux下压缩和解压的命令汇总
- 洛谷P2512 糖果传递
- android中volley通信框架简介
- java 源码分析_Java 源代码编译成 Class 文件的过程分析
- CDH5离线安装手册
- Java不兼容类型问题解决方案
- abs 不会整数 方法 溢出_asp cint clng的范围与防止cint和clng的溢出解决方法大全
- Web容器默认的servlet
- 数字证书转换cer---pem
- div和span标签(HTML)
- 微软切断XP供应 Vista成制造商惟一选择
- memento about Linux
- 新版火狐浏览器安装Flash组件,解决部分网站视频无法观看问题
- python读取视频文件大小,码率,帧率,以及通过码率计算文件大小与流量
- SpringBoot @Mapper注解实现类型转换bean无法注入
- 尝试关闭阿里云ESC的阿里云盾相关服务
- 博士毕业论文英文参考文献换行_写毕业论文时,需要掌握这10个最实用的Word技巧...
- 吉他的那些事-----------------吉他零基础入门
- 计算机的应用主要有哪几个,计算机的应用主要有哪几个方面?
- 最大化参数 火车头_火车头采集器教程:使用正则匹配模式采集数据