1.概述

本文是 https://www.cnblogs.com/huxi2b/p/11262995.html 的补充

Kafka Broker端处理请求采用Reactor模型。每台Broker上有个类似于Dispatcher的Acceptor线程,还有若干个处理请求的Processor线程(当然真正处理请求逻辑的线程不是Processor,实际上是KafkaRequestHandler)。每个Processor线程启动后大致做以下这么几件事情:

  1. 设置新的入站连接

  2. 处理新的请求响应(所谓的处理也就是放入到响应队列中)

  3. 执行Selector.select操作获取那些准备完毕的IO操作

  4. 接收新的入站请求

  5. 执行已发送响应的回调逻辑

  6. 处理已断开连接

每个Broker启动之后它创建的Processor线程会不停地执行以上这些动作,循环往复,直至Broker被关闭。

我们重点看看第一步中的逻辑,以下是1.1.1版本的源码(选择1.1.1版本不是特意的,其实所有2.3版本之前都是差不多的情形):

/*** Register any new connections that have been queued up*/private def configureNewConnections() {while (!newConnections.isEmpty) {val channel = newConnections.poll()try {debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")selector.register(connectionId(channel.socket), channel)} catch {// We explicitly catch all exceptions and close the socket to avoid a socket leak.case e: Throwable =>val remoteAddress = channel.socket.getRemoteSocketAddress// need to close the channel here to avoid a socket leak.close(channel)processException(s"Processor $id closed connection from $remoteAddress", e)}}}

注意我标成红色的语句。基本上Processor线程设置新入站连接的方式就是一次性处理完才罢休。代码中的newConnections是java.util.concurrent.ArrayBlockingQueue实例。Acceptor线程也会访问newConnections,因此必须是线程安全的。

这种一次性处理完成才收手的做法在某些情况下是有风险的,比如当Kafka集群遭遇到DDOS攻击时,外部IP会创建海量的入站连接全部砸向newConnections中。此时Processor线程运行时会一直尝试消耗掉这些新连接,否则它不会干其他事情——比如处理请求等。换句话说,目前Kafka对新入站连接的处理优先级要高于已有连接。当遭遇连接风暴时,Kafka Broker端会优先处理新连接,因此可能造成已有连接上的请求处理被暂停,并最终导致超时。这样客户端得到请求超时通知后会会进一步地发送新的请求,因而出现雪崩效应。

另外Broker端维护每个连接也不是没有开销的。连接信息本身肯定要占用一些内容资源。如果是启用了SSL的连接,Kafka为额外为其维护一个48KB的临时缓冲区。因此一旦遭遇连接风暴,OOM错误是很常见的。

鉴于这些原因,社区在2.3版本改进了Broker端处理新连接请求的方式。首先阻塞队列保存新连接的个数不再是没有限制了,而是被固定为20,即每个Processor的新连接队列最大就是20个连接——这个写死在代码里面了,目前没法修改。第二、社区引入了新参数max.connections,用于控制Broker端所允许连接的最大连接数。你可以调节这个参数来控制一个Broker最多能接收多少个入站连接。这个参数可以在server.properties中被设置,也可以使用kafka-configs脚本动态修改。max.connections是全局性的,你也可以给每个监听器设置不同的连接数上限。比如你的监听器中同时使用了PLAINTEXT和SSL,那么你能够使用listener.name.plaintext.max.connections和listener.name.ssl.max.connections来为这两个listeners配置各自的连接数,命令如下:

$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config max.connections=100$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config listener.name.plaintext.max.connections=80
Completed updating config for broker: 0.$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config listener.name.ssl.max.connections=80
Completed updating config for broker: 0.

第三是Kafka Broker的每个Processor线程会在每轮任务结束之前尝试去关闭多余的连接。判断是否需要关闭多余连接的依据有两点:1. 总的连接数超过了max.connections值;2. 你为Broker设置了多个监听器,但Kafka会保护Broker内部连接使用的那个监听器。比如你如果设置了多个监听器:PLAINTEXT://9092, SSL://9093,SASL://9094,然后设置inter.broker.listener.name=SSL,那么SSL这套监听器下的连接是不会被Processor强行关闭的。

最后提一句,如果所有Processor的阻塞队列都满了, 那么前面的Acceptor线程会阻塞住,不会再接收任何入站请求。社区新增加了一个JMX指标来计算Acceptor线程被阻塞的时间比例:kafka.network:type=Acceptor,name=AcceptorBlockedPercent,listener={listenerName}

2. kafka 2.3的代码

/*** Register any new connections that have been queued up. The number of connections processed* in each iteration is limited to ensure that traffic and connection close notifications of* existing channels are handled promptly.** 注册任何已排队的新连接。每个迭代中处理的连接数量有限,以确保及时处理现有通道的流量和连接关闭通知。** 从并发队列Q1里取出SocketChannel,添加到自身的nio selector中,监听读事件;* 队列中有新的SocketChannel,首先为通道注册OP_READ事件到统一的选择器上*/private def configureNewConnections() {var connectionsProcessed = 0// 队列中有新的SocketChannel,遍历newConnections队列while (connectionsProcessed < connectionQueueSize && !newConnections.isEmpty) {// 从队列中弹出SocketChannel,一个SocketChannel只会注册一个val channel = newConnections.poll()try {debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")// connectionId 从通道中获取本地服务端和远程客户端的地址和端口,构造唯一的 connectionId// selector.register 注册通道的读事件selector.register(connectionId(channel.socket), channel)connectionsProcessed += 1} catch {// We explicitly catch all exceptions and close the socket to avoid a socket leak.case e: Throwable =>val remoteAddress = channel.socket.getRemoteSocketAddress// need to close the channel here to avoid a socket leak.close(listenerName, channel)processException(s"Processor $id closed connection from $remoteAddress", e)}}}

【kafka】kafka 2.3 关于控制Broker端入站连接数的讨论相关推荐

  1. kafka部分重要参数配置-broker端参数

    broker端参数主要在config/server.properties目录下设置: 启动命令:nohup ./kafka-server-start.sh -daemon ../config/serv ...

  2. Kafka发送超过broker限定大小的消息时Client和Broker端各自会有什么异常?

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  3. kafka是什么_Kafka的Controller Broker是什么

    控制器组件(Controller),是 Apache Kafka 的核心组件.它的主要作用是在 Apache ZooKeeper 的帮助下管理和协调整个 Kafka 集群.集群中任意一台 Broker ...

  4. 【Kafka】测试集群中Broker故障对客户端的影响

    本文主要测试Kafka集群中Broker节点故障对客户端的影响. 集群信息:4个broker.topic:100+(每个topic30个partition).集群加密方式:plaintext.存储:c ...

  5. [kafka]kafka中的zookeeper是做什么的?

    前言 为什么自己要整理博客和学习笔记呢?是想把知识系统的,有条理的归纳在一起~ 而且一个东西的完成,也很有成就感,还可以打卡某一个知识点. 标红可以快速回忆自己整理过的知识~ ZooKeeper是什么 ...

  6. Kafka | Kafka副本机制详解

    今天我要和你分享的主题是:Apache Kafka 的副本机制. 所谓的副本机制(Replication),也可以称之为备份机制,通常是指分布式系统在多台网络互联的机器上保存有相同的数据拷贝.副本机制 ...

  7. [Kafka] Kafka基本架构

    [Kafka] Kafka基本架构 [Kafka] Kafka基本架构 [Kafka] Kafka基本架构 生产者Producer :生产信息: 消费者Consumer :订阅主题.消费信息: 代理B ...

  8. P2中ZYNQ的PS控制PL端LED

    第一个ZYNQ的实验,用于熟悉开发环境和板卡,通过GPIO控制LED,由于P2在ps端没有LED,所以需要通过axi总线控制PL端LED. 1.参考设计 参考的是黑金的<PL 端和 PS 端的协 ...

  9. html5手机排名,手机网站排名怎么做?移动端手机站SEO排名的13个要点

    手机网站最重要的就是不同机型的完美适配,现在大大小小尺寸的手机那么多,如果你的网站可以做到不管任何尺寸的手机都很好适配的话,那想做手机网站的排名工作已经赢在第一步了! 1:百度官方意见:使用合理的di ...

最新文章

  1. Redux 入门教程(二):中间件与异步操作
  2. php 接收多图片base64
  3. skimage.io.imread vs caffe.io.load_image
  4. JAVA面试题集收藏大放送
  5. linux vmware 安装后无法桥接到物理网卡的解决办法
  6. TopOn的两种测试方法
  7. ue4缓存位置怎么改_[UE4]动态液体材质浅谈
  8. PySpark-Recipes : RDD对象的基本操作
  9. Android自适应国际化语言
  10. Android mc怎么和win10联机,大更新我的世界手机版/win10版联机完美互通
  11. 用python实现简版区块链-交易(2)
  12. 蔡甸17万亩粮田丰收 国稻种芯:夏汛蓄洪水护住28天抗旱期
  13. deepfacelab训练多久_DeepFaceLab进阶:H128,DF,SAE模型有何不同?哪个最好?
  14. 2022年3月18到5月18的思考
  15. 云电脑有显卡吗?云电脑怎么做画面处理?
  16. iOS:xcode5 自定义模板
  17. Cisco服务器硬盘状态jbod,2018-11-06 JBOD模式下LSI9361RAID卡操作步骤
  18. 若梦博客-优质个人博客
  19. 【SpringBoot】11、SpringBoot中使用Lombok
  20. 电脑计算机word2007的介绍,word2007电脑版

热门文章

  1. 泡泡玛特上市首日涨79.22%报69港元 总市值953亿港元
  2. 三星官方确认:vivo将首发搭载Exynos 1080旗舰芯片
  3. 北京文化:截至10月8日 来源于《我和我的家乡》的收益约为8000万元-1亿元
  4. 迷惑行为!淘宝上线新版“相亲名片”:上来先告诉相亲对象你花了多少钱?...
  5. 滴滴顺风车上线新功能,特殊时期便捷出行
  6. 会玩!今年天猫双11可以买房了 还是特价 网友:满300减40吗?
  7. 有望年前发布?魅族16s Pro Plus曝光:下半年旗舰担当
  8. 任正非:6G华为也是领先世界 或在十年后投入使用
  9. 全新骁龙855 Plus加持!ROG游戏手机2下周发布:无惧逆风挑战
  10. 苹果这个酷炫的项目要流产了?市场未爆发或成主因