The KafkaApis.handleProduceRequest method

The method proceeds as follows:

  • Pre-checks: determine whether the ProduceRequest contains transactional or idempotent records, and verify the corresponding authorization.
  • Iterate over the ProduceRequest's partition records; each entry is a (topicPartition, memoryRecords) pair.
  • If the broker's metadata cache contains the topicPartition (and the write is authorized and the records are valid), add the pair to the authorizedRequestInfo map.
  • Call ReplicaManager to append authorizedRequestInfo to the leader replicas.
  • Once the append completes, the sendResponseCallback() callback is invoked.
```scala
/**
 * Handle a produce request
 */
def handleProduceRequest(request: RequestChannel.Request) {
  val produceRequest = request.body[ProduceRequest]
  val numBytesAppended = request.header.toStruct.sizeOf + request.sizeOfBodyInBytes

  // If the produce request contains transactional records
  if (produceRequest.hasTransactionalRecords) {
    val isAuthorizedTransactional = produceRequest.transactionalId != null &&
      authorize(request.session, Write, Resource(TransactionalId, produceRequest.transactionalId, LITERAL))
    // If the session is not authorized for this transactional id
    if (!isAuthorizedTransactional) {
      sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
      return
    }
    // Note that authorization to a transactionalId implies ProducerId authorization
  // If the produce request contains idempotent records but the session lacks IdempotentWrite authorization
  } else if (produceRequest.hasIdempotentRecords && !authorize(request.session, IdempotentWrite, Resource.ClusterResource)) {
    sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
    return
  }

  // Responses for unauthorized topics
  val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
  // Responses for non-existing topics
  val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
  // Responses for invalid requests
  val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]()
  val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()

  // Iterate over the (topicPartition, memoryRecords) pairs of the produce request
  for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) {
    if (!authorize(request.session, Write, Resource(Topic, topicPartition.topic, LITERAL)))
      // The topic is not authorized: record an authorization-failed response
      unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
    else if (!metadataCache.contains(topicPartition))
      // The broker's metadata cache does not contain the partition: record an unknown-topic response
      nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
    else
      try {
        ProduceRequest.validateRecords(request.header.apiVersion(), memoryRecords)
        // The partition is known to this broker: add the pair to the authorizedRequestInfo map
        authorizedRequestInfo += (topicPartition -> memoryRecords)
      } catch {
        case e: ApiException =>
          invalidRequestResponses += topicPartition -> new PartitionResponse(Errors.forException(e))
      }
  }

  // the callback for sending a produce response
  def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
    val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses ++ invalidRequestResponses
    var errorInResponse = false

    mergedResponseStatus.foreach { case (topicPartition, status) =>
      if (status.error != Errors.NONE) {
        errorInResponse = true
        debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
          request.header.correlationId,
          request.header.clientId,
          topicPartition,
          status.error.exceptionName))
      }
    }

    // When this callback is triggered, the remote API call has completed
    request.apiRemoteCompleteTimeNanos = time.nanoseconds

    // Record both bandwidth and request quota-specific values and throttle by muting the channel if any of the quotas
    // have been violated. If both quotas have been violated, use the max throttle time between the two quotas. Note
    // that the request quota is not enforced if acks == 0.
    val bandwidthThrottleTimeMs = quotas.produce.maybeRecordAndGetThrottleTimeMs(request, numBytesAppended, time.milliseconds())
    val requestThrottleTimeMs = if (produceRequest.acks == 0) 0 else quotas.request.maybeRecordAndGetThrottleTimeMs(request)
    val maxThrottleTimeMs = Math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
    if (maxThrottleTimeMs > 0) {
      if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
        quotas.produce.throttle(request, bandwidthThrottleTimeMs, sendResponse)
      } else {
        quotas.request.throttle(request, requestThrottleTimeMs, sendResponse)
      }
    }

    // Send the response immediately. In case of throttling, the channel has already been muted.
    if (produceRequest.acks == 0) {
      // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
      // the request, since no response is expected by the producer, the server will close socket server so that
      // the producer client will know that some error has happened and will refresh its metadata
      if (errorInResponse) {
        val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
          topicPartition -> status.error.exceptionName
        }.mkString(", ")
        info(
          s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
            s"from client id ${request.header.clientId} with ack=0\n" +
            s"Topic and partition to exceptions: $exceptionsSummary")
        closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)
      } else {
        // Note that although request throttling is exempt for acks == 0, the channel may be throttled due to
        // bandwidth quota violation.
        sendNoOpResponseExemptThrottle(request)
      }
    } else {
      sendResponse(request, Some(new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs)), None)
    }
  }

  def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
    processingStats.foreach { case (tp, info) =>
      updateRecordConversionStats(request, tp, info)
    }
  }

  if (authorizedRequestInfo.isEmpty)
    sendResponseCallback(Map.empty)
  else {
    val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId

    // call the replica manager to append messages to the replicas
    // Append the records to the replicas
    replicaManager.appendRecords(
      timeout = produceRequest.timeout.toLong,
      requiredAcks = produceRequest.acks,
      internalTopicsAllowed = internalTopicsAllowed,
      isFromClient = true,
      entriesPerPartition = authorizedRequestInfo,
      responseCallback = sendResponseCallback,
      recordConversionStatsCallback = processingStatsCallback)

    // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
    // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
    produceRequest.clearPartitionRecords()
  }
}
```
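The routing of partition records into the four response buckets can be distilled into a small language-agnostic sketch (Python here; `authorize`, `metadata_contains`, and `validate` are hypothetical stand-ins for the broker's authorizer, metadata cache, and record validation, not Kafka APIs):

```python
def route_partitions(entries, authorize, metadata_contains, validate):
    """Split (topic_partition -> records) entries into the four buckets
    used by handleProduceRequest: unauthorized topics, unknown topics,
    invalid records, and entries cleared for the local append."""
    unauthorized, non_existing, invalid, authorized = {}, {}, {}, {}
    for tp, records in entries.items():
        if not authorize(tp):
            unauthorized[tp] = "TOPIC_AUTHORIZATION_FAILED"
        elif not metadata_contains(tp):
            non_existing[tp] = "UNKNOWN_TOPIC_OR_PARTITION"
        else:
            try:
                validate(records)       # may raise on a malformed batch
                authorized[tp] = records
            except ValueError:
                invalid[tp] = "INVALID_RECORD"
    return unauthorized, non_existing, invalid, authorized
```

Only the `authorized` bucket is handed to the ReplicaManager; the other three are merged back into the response inside sendResponseCallback.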

The ReplicaManager.appendRecords method

Appends records to the leader replicas of the given partitions, and waits for them to be replicated to the other (follower) replicas. The callback is fired either when the timeout given by the timeout parameter expires or when the acknowledgement requirement given by requiredAcks is satisfied, whichever comes first. The callback itself must be thread-safe.

The main flow of the method is as follows:

  • The leader replica writes the records to its local log.
  • If acks is -1, i.e. the response may only be sent after all in-sync replicas have written the records, a DelayedProduce operation is created; otherwise the response is returned immediately by invoking the responseCallback callback.
```scala
def appendRecords(timeout: Long,
                  requiredAcks: Short,
                  internalTopicsAllowed: Boolean,
                  isFromClient: Boolean,
                  entriesPerPartition: Map[TopicPartition, MemoryRecords],
                  responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                  delayedProduceLock: Option[Lock] = None,
                  recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()) {
  // A valid acks value is 1, -1 or 0
  if (isValidRequiredAcks(requiredAcks)) {
    val sTime = time.milliseconds
    // Append to the local log
    val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
      isFromClient = isFromClient, entriesPerPartition, requiredAcks)
    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

    val produceStatus = localProduceResults.map { case (topicPartition, result) =>
      topicPartition ->
        ProducePartitionStatus(
          result.info.lastOffset + 1, // required offset
          new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1),
            result.info.logAppendTime, result.info.logStartOffset)) // response status
    }

    recordConversionStatsCallback(localProduceResults.mapValues(_.info.recordConversionStats))

    // If the request requires all in-sync replicas to write the records before responding
    if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
      // create delayed produce operation
      val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
      // Create a DelayedProduce operation
      val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

      // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
      val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

      // try to complete the request immediately, otherwise put it into the purgatory
      // this is because while the delayed produce operation is being created, new
      // requests may arrive and hence make this operation completable.
      delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
    } else {
      // Otherwise we can respond immediately
      val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
      // Invoke the response callback
      responseCallback(produceResponseStatus)
    }
  } else {
    // The acks value is invalid: return an error response
    // If required.acks is outside accepted range, something is wrong with the client
    // Just return an error and don't handle the request at all
    val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
      topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
        LogAppendInfo.UnknownLogAppendInfo.firstOffset.getOrElse(-1), RecordBatch.NO_TIMESTAMP,
        LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
    }
    responseCallback(responseStatus)
  }
}
```
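The "delayed or immediate response" decision above can be sketched in a few lines (Python; a simplified rendering of the `delayedProduceRequestRequired` condition, not the Kafka API — `local_errors` maps each partition to a boolean error flag from the local append):

```python
def needs_delayed_produce(required_acks, local_errors):
    """Return True when the response must wait for follower replication:
    acks == -1, at least one partition was appended locally, and at least
    one of those local appends succeeded (a partition that failed locally
    can never become acknowledgeable, so an all-failed request is
    answered immediately)."""
    if required_acks != -1:
        return False  # acks 0 or 1: the local append alone is enough
    if not local_errors:
        return False  # nothing was appended at all
    failed = sum(1 for err in local_errors.values() if err)
    return failed < len(local_errors)  # at least one success to wait for
```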

The ReplicaManager#appendToLocalLog method

The method proceeds as follows:

  1. First check whether the target topic is a Kafka internal topic; appending to internal topics is rejected unless it is explicitly allowed.
  2. Look up the Partition object for the topic-partition in the ReplicaManager field allPartitions. If the partition is found, call partition.appendRecordsToLeader() to append the records; otherwise an exception is returned to the client.
```scala
/**
 * Append the messages to the local replica logs
 */
private def appendToLocalLog(internalTopicsAllowed: Boolean,
                             isFromClient: Boolean,
                             entriesPerPartition: Map[TopicPartition, MemoryRecords],
                             requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
  trace(s"Append [$entriesPerPartition] to local log")

  // Iterate over every topicPartition to be written and its records
  entriesPerPartition.map { case (topicPartition, records) =>
    brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
    brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()

    // reject appending to internal topics if it is not allowed
    if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
      (topicPartition, LogAppendResult(
        LogAppendInfo.UnknownLogAppendInfo,
        Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
    } else {
      try {
        // Look up the Partition object for this topicPartition; the ReplicaManager
        // field allPartitions is a map of [topicPartition, Partition]
        val partition = getPartitionOrException(topicPartition, expectLeader = true)
        // Append the records
        val info = partition.appendRecordsToLeader(records, isFromClient, requiredAcks)
        val numAppendedMessages = info.numMessages

        // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
        brokerTopicStats.topicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes)
        brokerTopicStats.allTopicsStats.bytesInRate.mark(records.sizeInBytes)
        brokerTopicStats.topicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
        brokerTopicStats.allTopicsStats.messagesInRate.mark(numAppendedMessages)

        trace(s"${records.sizeInBytes} written to log $topicPartition beginning at offset " +
          s"${info.firstOffset.getOrElse(-1)} and ending at offset ${info.lastOffset}")
        (topicPartition, LogAppendResult(info))
      } catch {
        // NOTE: Failed produce requests metric is not incremented for known exceptions
        // it is supposed to indicate un-expected failures of a broker in handling a produce request
        case e@ (_: UnknownTopicOrPartitionException |
                 _: NotLeaderForPartitionException |
                 _: RecordTooLargeException |
                 _: RecordBatchTooLargeException |
                 _: CorruptRecordException |
                 _: KafkaStorageException |
                 _: InvalidTimestampException) =>
          (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
        case t: Throwable =>
          val logStartOffset = getPartition(topicPartition) match {
            case Some(partition) => partition.logStartOffset
            case _ => -1
          }
          brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark()
          brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark()
          error(s"Error processing append operation on partition $topicPartition", t)
          (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset), Some(t)))
      }
    }
  }
}
```
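A key property of this loop is per-partition error isolation: a failure on one partition becomes that partition's result instead of aborting the whole request. A minimal sketch (Python; `append_fn` is a hypothetical stand-in for the partition append, and topic-partitions are modeled as "topic-partition" strings):

```python
def append_to_local_log(entries, append_fn, internal_topics_allowed=False,
                        internal_topics=frozenset({"__consumer_offsets"})):
    """Append each partition's records independently, collecting a
    per-partition (result, error) pair instead of failing the batch."""
    results = {}
    for tp, records in entries.items():
        topic = tp.rsplit("-", 1)[0]  # strip the partition suffix
        if topic in internal_topics and not internal_topics_allowed:
            results[tp] = (None, "INVALID_TOPIC")
            continue
        try:
            results[tp] = (append_fn(tp, records), None)
        except Exception as e:  # one partition's error never aborts the rest
            results[tp] = (None, type(e).__name__)
    return results
```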

The Partition#appendRecordsToLeader method

Based on the topic's min.insync.replicas configuration and the partition's current ISR, this method decides whether the write can proceed. If the requirement is not met, it throws a NotEnoughReplicasException; otherwise it calls log.appendAsLeader() to append the records to the leader replica's log.

```scala
def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = {
  val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
    leaderReplicaIfLocal match {
      case Some(leaderReplica) =>
        // Get the Log object
        val log = leaderReplica.log.get
        // Minimum number of in-sync replicas required by the configuration
        val minIsr = log.config.minInSyncReplicas
        val inSyncSize = inSyncReplicas.size

        // Avoid writing to leader if there are not enough insync replicas to make it safe
        // If acks is -1 and the ISR is smaller than min.insync.replicas, throw an exception
        if (inSyncSize < minIsr && requiredAcks == -1) {
          throw new NotEnoughReplicasException(s"The size of the current ISR ${inSyncReplicas.map(_.brokerId)} " +
            s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")
        }

        // Append the records to the Log object
        val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient,
          interBrokerProtocolVersion)

        // we may need to increment high watermark since ISR could be down to 1
        // Check whether the high watermark needs to be advanced after the append
        (info, maybeIncrementLeaderHW(leaderReplica))

      case None =>
        throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
          .format(topicPartition, localBrokerId))
    }
  }

  // some delayed operations may be unblocked after HW changed
  if (leaderHWIncremented)
    tryCompleteDelayedRequests()
  else {
    // probably unblock some follower fetch requests since log end offset has been updated
    replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(topicPartition))
  }

  info
}
```
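The ISR guard at the top of the method reduces to a single condition; note that it only rejects acks=-1 writes, so acks=0/1 producers can still write to a partition whose ISR has shrunk below min.insync.replicas. A sketch of just that check (Python, hypothetical function name):

```python
def check_can_append(in_sync_size, min_isr, required_acks):
    """Reject the write only when acks=-1 demands durability the current
    ISR cannot provide; acks 0/1 writes proceed even below min.isr."""
    if in_sync_size < min_isr and required_acks == -1:
        raise RuntimeError(
            f"NotEnoughReplicas: ISR size {in_sync_size} < "
            f"min.insync.replicas {min_isr}")
    return True
```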

The DelayedOperationPurgatory.tryCompleteElseWatch() method

Checks whether the operation can be completed; if not, watches it based on the given keys. A delayed operation can be watched on multiple keys. It is possible that an operation completes after it has been added to the watch lists for some, but not all, of the keys. In that case the operation is considered completed and is not added to the watch lists for the remaining keys; the expiration reaper thread will remove it from any watch lists it was already added to.

The cost of tryComplete() is typically proportional to the number of keys, so calling tryComplete() once per key would be expensive. Instead the check is performed as follows: call tryComplete(); if the operation is not complete, add it to the watch lists of all keys, then call tryComplete() again. At this point, if the operation is still not complete, it is guaranteed not to miss any future triggering event, since it is already on the watch list of every key. If the operation is completed by another thread between the two tryComplete() calls, adding it to the watch lists was unnecessary, but this is harmless: the expiration reaper thread periodically purges completed operations from the watch lists.

```scala
/**
 * Check if the operation can be completed, if not watch it based on the given watch keys
 *
 * @param operation the delayed operation to be checked
 * @param watchKeys keys for bookkeeping the operation
 * @return true iff the delayed operations can be completed by the caller
 */
def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
  assert(watchKeys.nonEmpty, "The watch key list can't be empty")

  // First tryComplete() attempt. At this point only the current thread can
  // access the operation, so this is thread-safe even without a lock.
  var isCompletedByMe = operation.tryComplete()
  if (isCompletedByMe)
    return true

  var watchCreated = false
  // Iterate over watchKeys and add the operation to each key's watch list
  for (key <- watchKeys) {
    // If the operation is already completed, stop adding it to the rest of the watcher list.
    if (operation.isCompleted)
      return false
    // Add the operation to this key's watch list
    watchForOperation(key, operation)

    if (!watchCreated) {
      watchCreated = true
      estimatedTotalOperations.incrementAndGet()
    }
  }

  // Call tryComplete() a second time, this time under lock protection
  isCompletedByMe = operation.maybeTryComplete()
  if (isCompletedByMe)
    return true

  // if it cannot be completed by now and hence is watched, add to the expire queue also
  if (!operation.isCompleted) {
    if (timerEnabled)
      timeoutTimer.add(operation)
    if (operation.isCompleted) {
      // cancel the timer task
      operation.cancel()
    }
  }

  false
}
```
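The essence of this method is the check → watch → re-check pattern: the second tryComplete() after registering the watchers closes the window in which a completion event could fire between the first check and the registration. A minimal sketch, stripped of locking details and the expiration timer (Python; `Purgatory` and `Operation` here are toy classes, not the Kafka implementation):

```python
import threading

class Operation:
    """A delayed operation that completes once its condition holds."""
    def __init__(self, condition):
        self._condition = condition
        self.completed = False

    def try_complete(self):
        if not self.completed and self._condition():
            self.completed = True
        return self.completed

class Purgatory:
    def __init__(self):
        self._lock = threading.Lock()
        self._watchers = {}  # key -> list of watched operations

    def try_complete_else_watch(self, operation, watch_keys):
        if operation.try_complete():      # cheap first attempt, no watchers yet
            return True
        for key in watch_keys:            # register before re-checking
            if operation.completed:
                return False
            with self._lock:
                self._watchers.setdefault(key, []).append(operation)
        # Re-check: any event from now on finds the operation in the
        # watch lists, so nothing can be missed.
        return operation.try_complete()

    def check_and_complete(self, key):
        """Called when an event on `key` may have made operations completable."""
        with self._lock:
            ops = list(self._watchers.get(key, []))
        for op in ops:
            op.try_complete()
```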

The DelayedOperation.maybeTryComplete() method

maybeTryComplete() is a thread-safe variant of tryComplete(): it makes concurrent completion attempts safe. It tries to acquire the lock via Lock#tryLock(): if the lock is free, tryLock() acquires it immediately and returns true; otherwise it returns false right away instead of blocking until the lock is released.

```scala
/**
 * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
 * without blocking.
 *
 * If threadA acquires the lock and performs the check for completion before completion criteria is met
 * and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
 * yet released the lock, we need to ensure that completion is attempted again without blocking threadA
 * or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
 * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
 * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
 * the operation is actually completed.
 */
private[server] def maybeTryComplete(): Boolean = {
  var retry = false
  var done = false
  do {
    if (lock.tryLock()) {
      try {
        tryCompletePending.set(false)
        done = tryComplete()
      } finally {
        lock.unlock()
      }
      // While we were holding the lock, another thread may have invoked `maybeTryComplete` and set
      // `tryCompletePending`. In this case we should retry.
      retry = tryCompletePending.get()
    } else {
      // Another thread is holding the lock. If `tryCompletePending` is already set and this thread failed to
      // acquire the lock, then the thread that is holding the lock is guaranteed to see the flag and retry.
      // Otherwise, we should set the flag and retry on this thread since the thread holding the lock may have
      // released the lock and returned by the time the flag is set.
      retry = !tryCompletePending.getAndSet(true)
    }
  } while (!isCompleted && retry)
  done
}
```
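The tryLock-plus-pending-flag handshake above can be sketched in Python. This is a simplified analogue, not the Kafka code: `threading.Event` stands in for the `AtomicBoolean` (its check-then-set below is not a true `getAndSet`, which is acceptable for a sketch), and `Lock.acquire(blocking=False)` plays the role of `Lock#tryLock()`:

```python
import threading

class DelayedOp:
    """Sketch of maybeTryComplete(): a thread that loses the lock race sets
    a pending flag so that either it or the lock holder re-runs
    try_complete() afterwards, ensuring no completion attempt is lost."""
    def __init__(self, condition):
        self._condition = condition
        self._lock = threading.Lock()
        self._pending = threading.Event()  # stands in for tryCompletePending
        self.completed = False

    def _try_complete(self):
        if not self.completed and self._condition():
            self.completed = True
        return self.completed

    def maybe_try_complete(self):
        done = False
        while True:
            if self._lock.acquire(blocking=False):  # tryLock analogue
                try:
                    self._pending.clear()
                    done = self._try_complete()
                finally:
                    self._lock.release()
                # Another thread may have set the flag while we held the lock.
                retry = self._pending.is_set()
            else:
                # Retry on this thread only if we were first to set the flag;
                # otherwise the lock holder is guaranteed to see it and retry.
                retry = not self._pending.is_set()
                self._pending.set()
            if self.completed or not retry:
                return done
```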

The DelayedProduce.tryComplete method

```scala
/**
 * The delayed produce operation can be completed if every partition
 * it produces to is satisfied by one of the following:
 *
 * Case A: This broker is no longer the leader: set an error in response
 * Case B: This broker is the leader:
 *   B.1 - If there was a local error thrown while checking if at least requiredAcks
 *         replicas have caught up to this operation: set an error in response
 *   B.2 - Otherwise, set the response with no error.
 */
override def tryComplete(): Boolean = {
  // check for each partition if it still has pending acks
  produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
    trace(s"Checking produce satisfaction for $topicPartition, current status $status")
    // skip those partitions that have already been satisfied
    if (status.acksPending) {
      val (hasEnough, error) = replicaManager.getPartition(topicPartition) match {
        case Some(partition) =>
          if (partition eq ReplicaManager.OfflinePartition)
            (false, Errors.KAFKA_STORAGE_ERROR)
          else
            partition.checkEnoughReplicasReachOffset(status.requiredOffset)
        case None =>
          // Case A
          (false, Errors.UNKNOWN_TOPIC_OR_PARTITION)
      }
      // Case B.1 || B.2
      if (error != Errors.NONE || hasEnough) {
        status.acksPending = false
        status.responseStatus.error = error
      }
    }
  }

  // check if every partition has satisfied at least one of case A or B
  if (!produceMetadata.produceStatus.values.exists(_.acksPending))
    forceComplete()
  else
    false
}
```
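The per-partition acknowledgement accounting above can be sketched as follows (Python; the replica check is simplified to a high-watermark comparison, whereas the real checkEnoughReplicasReachOffset also consults min.insync.replicas — `produce_status` dicts are hypothetical stand-ins for ProducePartitionStatus):

```python
def try_complete_produce(produce_status, high_watermark):
    """A partition's ack becomes satisfied once the leader's high watermark
    reaches the offset the produce wrote up to; a vanished partition gets
    an error (Case A). The operation completes when nothing is pending."""
    for tp, status in produce_status.items():
        if status["acks_pending"]:
            hw = high_watermark.get(tp)
            if hw is None:                          # Case A: no longer leader
                status["acks_pending"] = False
                status["error"] = "UNKNOWN_TOPIC_OR_PARTITION"
            elif hw >= status["required_offset"]:   # Case B.2: enough replicas
                status["acks_pending"] = False
                status["error"] = None
    return not any(s["acks_pending"] for s in produce_status.values())
```

Each replication-progress event re-runs this check via the purgatory until every partition is either satisfied or failed.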
