
The previous article discussed the difference between ConsumerFetcherManager's MaxLag and the lag value reported by ConsumerOffsetChecker. MaxLag itself was not covered thoroughly there, so this post digs deeper: how to make ConsumerFetcherManager's MaxLag actually show a value.

AbstractFetcherThread#doWork

kafka_2.10-0.8.2.2-sources.jar!/kafka/server/AbstractFetcherThread.scala

override def doWork() {
  inLock(partitionMapLock) {
    if (partitionMap.isEmpty)
      partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
    partitionMap.foreach {
      case((topicAndPartition, offset)) =>
        fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
          offset, fetchSize)
    }
  }

  val fetchRequest = fetchRequestBuilder.build()
  if (!fetchRequest.requestInfo.isEmpty)
    processFetchRequest(fetchRequest)
}

Note how the fetchRequest is built here. In partitionMap the key is a TopicAndPartition and the value is the highest offset already fetched locally. On each round, the fetch request is constructed from that local high offset together with the fetch size.

FetchRequest

kafka_2.10-0.8.2.2-sources.jar!/kafka/api/FetchRequest.scala

def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {
  requestMap.put(TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize))
  this
}

As you can see, offset and fetchSize determine where this fetcher starts reading on the broker and how many bytes it pulls per request.
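For illustration, here is a minimal sketch of driving the same builder by hand. The clientId, topic, offsets and tuning values are made-up assumptions; this just mirrors what doWork() does for each tracked partition:

import kafka.api.FetchRequestBuilder

val builder = new FetchRequestBuilder()
  .clientId("demo-client")   // assumption: any client id
  .maxWait(100)              // fetch.wait.max.ms
  .minBytes(1)               // fetch.min.bytes
builder.addFetch("mtopic", 0, 353L, 1024 * 1024) // resume partition 0 at offset 353
builder.addFetch("mtopic", 1, 258L, 1024 * 1024) // resume partition 1 at offset 258
val fetchRequest = builder.build()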

ConsumerFetcherThread

kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ConsumerFetcherThread.scala

class ConsumerFetcherThread(name: String,
                            val config: ConsumerConfig,
                            sourceBroker: Broker,
                            partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                            val consumerFetcherManager: ConsumerFetcherManager)
        extends AbstractFetcherThread(name = name,
                                      clientId = config.clientId,
                                      sourceBroker = sourceBroker,
                                      socketTimeout = config.socketTimeoutMs,
                                      socketBufferSize = config.socketReceiveBufferBytes,
                                      fetchSize = config.fetchMessageMaxBytes,
                                      fetcherBrokerId = Request.OrdinaryConsumerId,
                                      maxWait = config.fetchWaitMaxMs,
                                      minBytes = config.fetchMinBytes,
                                      isInterruptible = true) {
  //...
}

The fetchSize used here comes from config.fetchMessageMaxBytes.

kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ConsumerConfig.scala

class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
  //...
  /** the number of bytes of messages to attempt to fetch */
  val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)
}

object ConsumerConfig extends Config {
  val RefreshMetadataBackoffMs = 200
  val SocketTimeout = 30 * 1000
  val SocketBufferSize = 64*1024
  val FetchSize = 1024 * 1024
  val MaxFetchSize = 10*FetchSize
  val NumConsumerFetchers = 1
  val DefaultFetcherBackoffMs = 1000
  val AutoCommit = true
  val AutoCommitInterval = 60 * 1000
  val MaxQueuedChunks = 2
  val MaxRebalanceRetries = 4
  val AutoOffsetReset = OffsetRequest.LargestTimeString
  val ConsumerTimeoutMs = -1
  val MinFetchBytes = 1
  val MaxFetchWaitMs = 100
  val MirrorTopicsWhitelist = ""
  val MirrorTopicsBlacklist = ""
  val MirrorConsumerNumThreads = 1
  val OffsetsChannelBackoffMs = 1000
  val OffsetsChannelSocketTimeoutMs = 10000
  val OffsetsCommitMaxRetries = 5
  val OffsetsStorage = "zookeeper"
  val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
  val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
  val ExcludeInternalTopics = true
  val DefaultPartitionAssignmentStrategy = "range" /* select between "range", and "roundrobin" */
  val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
  val DefaultClientId = ""
  //...
}

This fetchSize defaults to 1024 * 1024, i.e. 1048576. Note that it is a byte count, not a message count: each fetch pulls at most 1048576 bytes per partition. Overriding it is straightforward, as sketched below.
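A minimal sketch of shrinking the fetch size via ConsumerConfig (the connection settings are assumptions for a local setup):

import java.util.Properties
import kafka.consumer.ConsumerConfig

val props = new Properties()
props.put("zookeeper.connect", "localhost:2181") // assumed local ZooKeeper
props.put("group.id", "mgroup")
// cut the per-partition fetch size from the 1 MiB default down to 64 KiB
props.put("fetch.message.max.bytes", (64 * 1024).toString)
val config = new ConsumerConfig(props)
println(config.fetchMessageMaxBytes) // 65536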

AbstractFetcherThread#processFetchRequest

private def processFetchRequest(fetchRequest: FetchRequest) {
  val partitionsWithError = new mutable.HashSet[TopicAndPartition]
  var response: FetchResponse = null
  try {
    trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
    response = simpleConsumer.fetch(fetchRequest)
  } catch {
    case t: Throwable =>
      if (isRunning.get) {
        warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString))
        partitionMapLock synchronized {
          partitionsWithError ++= partitionMap.keys
        }
      }
  }
  fetcherStats.requestRate.mark()

  if (response != null) {
    // process fetched data
    inLock(partitionMapLock) {
      response.data.foreach {
        case(topicAndPartition, partitionData) =>
          val (topic, partitionId) = topicAndPartition.asTuple
          val currentOffset = partitionMap.get(topicAndPartition)
          // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
          if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
            partitionData.error match {
              case ErrorMapping.NoError =>
                try {
                  val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
                  val validBytes = messages.validBytes
                  // if the response is empty, newOffset falls back to the local high offset
                  val newOffset = messages.shallowIterator.toSeq.lastOption match {
                    case Some(m: MessageAndOffset) => m.nextOffset
                    case None => currentOffset.get
                  }
                  partitionMap.put(topicAndPartition, newOffset)
                  fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
                  fetcherStats.byteRate.mark(validBytes)
                  // the call below puts the fetched data into the queue
                  // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                  processPartitionData(topicAndPartition, currentOffset.get, partitionData)
                } catch {
                  case ime: InvalidMessageException =>
                    // we log the error and continue. This ensures two things
                    // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
                    // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                    //    should get fixed in the subsequent fetches
                    logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
                  case e: Throwable =>
                    throw new KafkaException("error processing data for partition [%s,%d] offset %d".format(topic, partitionId, currentOffset.get), e)
                }
              case ErrorMapping.OffsetOutOfRangeCode =>
                try {
                  val newOffset = handleOffsetOutOfRange(topicAndPartition)
                  partitionMap.put(topicAndPartition, newOffset)
                  error("Current offset %d for partition [%s,%d] out of range; reset offset to %d".format(currentOffset.get, topic, partitionId, newOffset))
                } catch {
                  case e: Throwable =>
                    error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
                    partitionsWithError += topicAndPartition
                }
              case _ =>
                if (isRunning.get) {
                  error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
                    ErrorMapping.exceptionFor(partitionData.error).getClass))
                  partitionsWithError += topicAndPartition
                }
            }
          }
      }
    }
  }

  if(partitionsWithError.size > 0) {
    debug("handling partitions with error for %s".format(partitionsWithError))
    handlePartitionsWithErrors(partitionsWithError)
  }
}

ConsumerFetcherThread#processPartitionData

kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ConsumerFetcherThread.scala

// process fetched data
def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
  val pti = partitionMap(topicAndPartition)
  if (pti.getFetchOffset != fetchOffset)
    throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d"
      .format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
  pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
}

PartitionTopicInfo#enqueue

kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/PartitionTopicInfo.scala

/**
 * Enqueue a message set for processing.
 */
def enqueue(messages: ByteBufferMessageSet) {
  val size = messages.validBytes
  if(size > 0) {
    val next = messages.shallowIterator.toSeq.last.nextOffset
    trace("Updating fetch offset = " + fetchedOffset.get + " to " + next)
    chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
    fetchedOffset.set(next)
    debug("updated fetch offset of (%s) to %d".format(this, next))
    consumerTopicStats.getConsumerTopicStats(topic).byteRate.mark(size)
    consumerTopicStats.getConsumerAllTopicStats().byteRate.mark(size)
  } else if(messages.sizeInBytes > 0) {
    chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
  }
}

If the fetched data is empty, nothing is put into the queue.

chunkQueue size

kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ZookeeperConsumerConnector.scala

def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
    : Map[String,List[KafkaStream[K,V]]] = {
  debug("entering consume ")
  if (topicCountMap == null)
    throw new RuntimeException("topicCountMap is null")

  val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)

  val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic

  // make a list of (queue,stream) pairs, one pair for each threadId
  val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
    threadIdSet.map(_ => {
      val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
      val stream = new KafkaStream[K,V](
        queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
      (queue, stream)
    })
  ).flatten.toList

  val dirs = new ZKGroupDirs(config.groupId)
  registerConsumerInZK(dirs, consumerIdString, topicCount)
  reinitializeConsumer(topicCount, queuesAndStreams)

  loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
}

The queue is created here, with capacity config.queuedMaxMessages; a typical client-side setup that reaches this path is sketched below.
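A minimal sketch of the high-level consumer that ends up calling consume (the topic name, thread count and connection settings are assumptions):

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

val props = new Properties()
props.put("zookeeper.connect", "localhost:2181") // assumed local ZooKeeper
props.put("group.id", "mgroup")

val connector = Consumer.create(new ConsumerConfig(props))
// one chunk queue + KafkaStream pair is created per consumer thread
val streams = connector.createMessageStreams(Map("mtopic" -> 3))("mtopic")
// in production each stream would be consumed on its own thread
for (stream <- streams; msg <- stream)
  println(new String(msg.message()))

The queue capacity itself comes from queued.max.message.chunks: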

/** max number of message chunks buffered for consumption, each chunk can be up to fetch.message.max.bytes */
val queuedMaxMessages = props.getInt("queued.max.message.chunks", MaxQueuedChunks)

val MaxQueuedChunks = 2

By default the queue holds at most 2 FetchedDataChunks, and each FetchedDataChunk holds at most fetchSize bytes, i.e. 1024*1024. So each consumer thread's chunkQueue buffers at most about 2*1024*1024 bytes by default.

When the queue is full, enqueue blocks (chunkQueue.put is a blocking call on a LinkedBlockingQueue), which throttles the speed of the whole fetch loop, as the sketch below illustrates.
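A minimal illustration of that back-pressure (not Kafka code, just a bounded queue with the default capacity of 2):

import java.util.concurrent.LinkedBlockingQueue

case class Chunk(bytes: Array[Byte]) // stand-in for FetchedDataChunk

val chunkQueue = new LinkedBlockingQueue[Chunk](2) // queued.max.message.chunks = 2

chunkQueue.put(Chunk(new Array[Byte](1024 * 1024))) // ok
chunkQueue.put(Chunk(new Array[Byte](1024 * 1024))) // ok, queue now full
// A third put() would block here until a consumer thread take()s a chunk,
// which is exactly how the fetcher thread is slowed down to consumer speed:
// chunkQueue.put(Chunk(new Array[Byte](1)))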

ConsumerFetcherManager's MaxLag

To make this metric show a value, reduce fetch.message.max.bytes, for example:

        props.put("fetch.message.max.bytes","10");props.put("queued.max.message.chunks","1");

With this setting each fetch pulls at most 10 bytes per partition, which here amounts to a single small message at a time. Suppose the current lag looks like this:

Group  Topic             Pid Offset          logSize         Lag             Owner
mgroup mtopic              0   353             8727            8374            demo-1514550322182-6d67873d-0
mgroup mtopic              1   258             8702            8444            demo-1514550322182-6d67873d-1
mgroup mtopic              2   307             8615            8308            demo-1514550322182-6d67873d-2

After one fetch:

val newOffset = messages.shallowIterator.toSeq.lastOption match {
  case Some(m: MessageAndOffset) => m.nextOffset
  case None => currentOffset.get
}
partitionMap.put(topicAndPartition, newOffset)
fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset

Here nextOffset = offset + 1, i.e. the highest fetched offset plus one: 258 + 1 = 259. The hw is 8702, so the lag is 8702 - 259 = 8443. (To reproduce this, the consumer thread was made to throw an exception and exit after consuming a single message.)
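The gauge can then be observed over JMX. A rough sketch of reading it from inside the consumer JVM follows; the exact ObjectName pattern is an assumption and may differ across Kafka versions and metrics reporters:

import java.lang.management.ManagementFactory
import javax.management.ObjectName
import scala.collection.JavaConverters._

val mbs = ManagementFactory.getPlatformMBeanServer
// query all MaxLag gauges registered by the consumer in this JVM
val pattern = new ObjectName("kafka.consumer:type=ConsumerFetcherManager,name=MaxLag,*")
for (name <- mbs.queryNames(pattern, null).asScala) {
  val maxLag = mbs.getAttribute(name, "Value")
  println(s"$name => MaxLag = $maxLag")
}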

Summary

In production, size the following parameters according to your message sizes and available memory, otherwise an OOM is easy to trigger (a back-of-envelope memory bound is sketched after the list):

  • queued.max.message.chunks: defaults to 2, controls the capacity of the chunkQueue
  • fetch.message.max.bytes: defaults to 1024*1024, caps the number of bytes per chunk
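A rough upper bound on the memory buffered per consumer stream under the defaults (this ignores decompression overhead and the chunk currently being processed; the stream count is an assumption for the example):

// hypothetical back-of-envelope calculation, not Kafka code
val queuedMaxChunks = 2            // queued.max.message.chunks (default)
val fetchMaxBytes   = 1024 * 1024  // fetch.message.max.bytes (default)
val numStreams      = 3            // assumption: consumer threads in this client

val perStreamBytes = queuedMaxChunks.toLong * fetchMaxBytes
val totalBytes     = perStreamBytes * numStreams
println(s"~${perStreamBytes / 1024 / 1024} MiB per stream, ~${totalBytes / 1024 / 1024} MiB total")
// prints: ~2 MiB per stream, ~6 MiB total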

As for ConsumerFetcherManager's MaxLag: it is only somewhat useful for monitoring when the two parameters above are set sensibly. The smaller the chunkQueue, the more MaxLag reflects how far the consumer's actual consumption lags; otherwise it only reflects how far the client fetcher thread lags in pulling data. Setting it too small, however, forces frequent fetches and hurts consumption throughput, so tune it to a middle ground. In practice these parameters are rarely changed, in which case ConsumerOffsetChecker's lag value remains the accurate way to monitor consumer lag.

doc

  • ConsumerFetcherManager MaxLag
  • Apache Kafka series: JMX monitoring metrics
  • Kafka source code analysis: Consumer (2) Fetcher

Reposted from: https://my.oschina.net/go4it/blog/1599800
