Kafka 的 Java 消费者如何管理 TCP 连接?
何时创建 TCP 连接?
和生产者不同的是,构建 KafkaConsumer 实例时是不会创建任何 TCP 连接的,TCP 连接是在调用 KafkaConsumer.poll 方法时被创建的。再细粒度地说,在 poll 方法内部有 3 个时机可以创建 TCP 连接。
1.发起 FindCoordinator 请求时。
Coordinator是消费者端的一个组件,驻留在 Broker 端的内存中,负责消费者组的组成员管理和各个消费者的位移提交管理。当消费者程序首次启动调用 poll 方法时,它需要向 Kafka 集群发送一个名为 FindCoordinator 的请求,希望 Kafka 集群告诉它哪个 Broker 是管理它的协调者。
消费者应该向哪个 Broker 发送这类请求呢?理论上任何一个 Broker 都能回答这个问题,也就是说消费者可以发送 FindCoordinator 请求给集群中的任意服务器。kafka的优化是,消费者程序会向集群中当前负载最小的那台 Broker 发送请求。负载是如何评估的呢?看消费者连接的所有 Broker 中,谁的待发送请求最少。这种评估显然是消费者端的单向评估,并非是站在全局角度,因此有的时候也不一定是最优解。
2.连接协调者时。
Broker 处理完上一步发送的 FindCoordinator 请求之后,会返还对应的响应结果(Response),显式地告诉消费者哪个 Broker 是真正的协调者,消费者知晓了真正的协调者后,会创建连向该 Broker 的 Socket 连接。只有成功连入协调者,协调者才能开启正常的组协调操作,比如加入组、等待组分配方案、心跳请求处理、位移获取、位移提交等。
3.消费数据时。
消费者会为每个要消费的分区创建与该分区领导者副本所在 Broker 连接的 TCP。假设消费者要消费 5 个分区的数据,这 5 个分区各自的领导者副本分布在 4 台 Broker 上,那么该消费者在消费时会创建与这 4 台 Broker 的 Socket 连接。
那么消费者总共创建多少个 TCP 连接?
[2019-05-27 10:00:54,142] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)…[2019-05-27 10:00:54,188] DEBUG [Consumer clientId=consumer-1, groupId=test] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name=‘t4’)], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node localhost:9092 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:1097)…[2019-05-27 10:00:54,188] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FIND_COORDINATOR {key=test,key_type=0} with correlation id 0 to node -1 (org.apache.kafka.clients.NetworkClient:496)[2019-05-27 10:00:54,203] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node -1 for FIND_COORDINATOR with correlation id 0, received {throttle_time_ms=0,error_code=0,error_message=null, node_id=2,host=localhost,port=9094} (org.apache.kafka.clients.NetworkClient:837)…[2019-05-27 10:00:54,204] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9094 (id: 2147483645 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)…[2019-05-27 10:00:54,237] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9094 (id: 2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)[2019-05-27 10:00:54,237] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092 (id: 0 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)[2019-05-27 10:00:54,238] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9093 (id: 1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)
日志的第一行是消费者程序创建的第一个 TCP 连接,主要是用于发送 FindCoordinator 请求。由于这是消费者程序创建的第一个连接,此时消费者对于要连接的 Kafka 集群一无所知,因此它连接的 Broker 节点的 ID 是 -1,表示消费者根本不知道要连接的 Kafka Broker 的任何信息。
第二行,消费者复用了刚才创建的那个 Socket 连接,向 Kafka 集群发送元数据请求以获取整个集群的信息。
第三行,消费者程序开始发送 FindCoordinator 请求给第一步中连接的 Broker,即 localhost:9092,也就是 nodeId 等于 -1 的那个。在十几毫秒之后,消费者程序成功地获悉协调者所在的 Broker 信息“node_id = 2”。
此时,消费者就已经知道协调者 Broker 的连接信息了,因此在日志的第五行发起了第二个 Socket 连接,创建了连向 localhost:9094 的 TCP。只有连接了协调者,消费者进程才能正常地开启消费者组的各种功能以及后续的消息消费。
在日志的最后三行中,消费者又分别创建了新的 TCP 连接,主要用于实际的消息获取。要消费的分区的领导者副本在哪台 Broker 上,消费者就要创建连向哪台 Broker 的 TCP。
在这个过程中,日志中的这些 Broker 节点的 ID 在不断变化。有时候是 -1,有时候是 2147483645,只有在最后的时候才回归正常值 0、1 和 2。
-1 :,即消费者程序(其实也不光是消费者,生产者也是这样的机制)首次启动时,对 Kafka 集群一无所知,因此用 -1 来表示尚未获取到 Broker 数据。
2147483645 :它是由 Integer.MAX_VALUE 减去协调者所在 Broker 的真实 ID 计算得来的。在第4行中,协调者 ID 是 2,这个 Socket 连接的节点 ID 就是 Integer.MAX_VALUE 减去 2,即 2147483647 减去 2,也就是 2147483645。这种节点 ID 的标记方式是 Kafka 社区特意为之的结果,目的就是要让组协调请求和真正的数据获取请求使用不同的 Socket 连接。
0、1、2:真实的 Broker ID,也就是在 server.properties 中配置的 broker.id 值。
通常来说,消费者程序会创建 3 类 TCP 连接:
1.确定协调者和获取集群元数据。
2.连接协调者,令其执行组成员管理操作。
3.执行实际的消息获取。
这些 TCP 连接是何时被关闭的呢?
消费者关闭 Socket 也分为主动关闭和 Kafka 自动关闭。主动关闭是指你显式地调用消费者 API 的方法去关闭消费者,具体方式就是手动调用 KafkaConsumer.close() 方法,或者是执行 Kill 命令,不论是 Kill -2 还是 Kill -9;
而 Kafka 自动关闭是由消费者端参数 connection.max.idle.ms控制的,该参数现在的默认值是 9 分钟,即如果某个 Socket 连接上连续 9 分钟都没有任何请求“过境”的话,那么消费者会强行“杀掉”这个 Socket 连接。
如果在编写消费者程序时,你使用了循环的方式来调用 poll 方法消费消息,那么上面提到的所有请求都会被定期发送到 Broker,因此这些 Socket 连接上总是能保证有请求在发送,从而也就实现了“长连接”的效果。
当第三类 TCP 连接成功创建后,消费者程序就会废弃第一类 TCP 连接,之后在定期请求元数据时,它会改为使用第三类 TCP 连接。最终你会发现,第一类 TCP 连接会在后台被默默地关闭掉。对一个运行了一段时间的消费者程序来说,只会有后面两类 TCP 连接存在。
Kafka 的 Java 消费者如何管理 TCP 连接?相关推荐
- tcp实时传输kafka数据_关于Kafka producer管理TCP连接的讨论
在Kafka中,TCP连接的管理交由底层的Selector类(org.apache.kafka.common.network)来维护.Selector类定义了很多数据结构,其中最核心的当属java.n ...
- Kafka | Java 消费者是如何管理TCP连接的? | 极客时间
今天我要和你分享的主题是:Kafka 的 Java 消费者是如何管理 TCP 连接的. 在专栏中,我们专门聊过"Java生产者是如何管理 TCP 连接资源的"这个话题,你应该还有印 ...
- 【Java 网络编程】TCP 连接 断开 机制 ( 三次握手 | 四次挥手 )
文章目录 I TCP 连接建立流程 ( 三次握手 ) II SYN 和 ACK 中的随机值 III TCP 连接建关闭流程 ( 四次挥手 ) IV TCP 连接断开的保证 V 四次挥手的必要性 I T ...
- 基于Java的Socket实现TCP连接
资源下载地址:https://download.csdn.net/download/sheziqiong/86159198 资源下载地址:https://download.csdn.net/downl ...
- TCP 连接的前世今生
前言 大家好!我是盼盼! 之前写了几篇关于算法和 linux 命令的文章,今天来学习下,网络协议相关的知识.不管你是客户端,还是服务端开发,网络协议这块都是要学习和了解的. 工作和面试中,网络协议都会 ...
- kafka partition java,kafka中partition数量与消费者对应关系以及Java实践
kafka中partition数量与消费者对应关系以及Java实践 kafka中partition数量与消费者对应关系以及Java实践 kafka是由Apache软件基金会开发的一个开源流处理平台.k ...
- tcp连接服务器需要响应吗,HTTP的TCP连接管理
Socket是大部分应用层协议的基础,常见的Socket主要有两种类型:TCP和UDP,HTTP协议默认使用的是TCP类型的Socket,80端口. 为什么呢?因为TCP连接可靠,用其它协议也是允许的 ...
- CC00034.kafka——|Hadoopkafka.V19|——|kafka.v19|消费者位移管理.v02|
一.消费者位移管理数据准备 ### --- 准备数据~~~ # 生成消息文件 [root@hadoop ~]# for i in `seq 60`; do echo "hello yanqi ...
- 7.【kafka运维】 kafka-consumer-groups.sh消费者组管理
文章目录 消费者组管理 kafka-consumer-groups.sh 1. 查看消费者列表`--list` 2. 查看消费者组详情`--describe` 3. 删除消费者组`--delete` ...
最新文章
- CentOS7.2中NFS1.3 安装
- Android内存管理之道
- 测量工具(keras)
- gc java root_一个两年Java程序员的面试总结
- SpringBoot入门(1)——创建springBoot项目
- java url特殊字符处理_简单实例处理url特殊符号处理(2种方法)
- Matlab并行编程方法
- TRUNCATE,DELETE,DROP的区别
- 希尔排序听起来有点难,其实很简单
- Linux 下如何查找 MySQL 数据库的数据根目录呢?
- 无法运行的愿意_分享减肥食谱一周瘦10斤,你愿意尝试吗?
- zookeeper专题:使用zookeeper客户端实现动态监听节点并获取数据
- Java基础篇:什么是FileWriter
- Linux中进程的基本概念
- npm下载地址的查询与切换
- 第68天-内网安全-域横向 PTHPTKPTT 哈希票据传
- Openlayers + Vue实现GIS地图的一些常见问题(整理)
- 【BZOJ1502】【NOI2005】月下柠檬树
- 创意示范:苹果如何使用无线充电技术
- 速看|快速软件开发框架突破信息孤岛,高效实现数字化发展!
热门文章
- 1635-超大型 LED 显示屏 ZCMU
- torch-geometric安装详细步骤
- Sqlserver 英文月份格式时间字符串转换为数字型日期
- AI编译器XLA调研
- 今天介绍一款事半功倍的Maya插件包
- 双网卡设置一个外网一个内网_双网卡同时上网,内网外网同时启用的解放办法...
- Linux sed在某行前一行和后一行添加内容
- java 判断日期是周末_java计算两个日期之前的天数实例(排除节假日和周末)
- conda 包安装位置 虚拟环境_conda指定位置配置虚拟环境
- 苹果退款_苹果 App Store 里自动订阅续费的应用可以退款吗?