0x00 概述

kafka server虽然原则上是兼容详细的client,但只是高版本的Server端兼容低版本的Client端;

在有高版本Client端连接时,会导致低版本Server集群会hang住,严重的话直接导致 kafka broker进程僵死,同时也会导致其他的客户端僵死。

0x01 案发现场

生产端疯狂告警

在一个月黑风高的夜晚,我们kafka生产端开始疯狂告警,出现大量程序队列堵塞、数据写入失败、写入性能下降的告警。

我们先看看生产端程序的日志:

在生产端采用参数调优、增大并发、服务重启等一系列手段而无果后,我们将问题排查锁定在后端kafka集群。

集群异常日志与分析

我们看到服务端频繁有如下异常日志:

从google的信息来看,可能是由于高版本的客户端连接集群而发送了kafka服务端不支持的请求。

0x02 问题追踪与解决

开启Trace日志

正常日志级别下,日志是比较稀疏,我们把异常前一条相关日志的消费组提取出来进行分析,发现其完全是一个正常版本的客户端。且其日志时间与异常日志时间间隔较大(约7s),直接相关性不大。

快速瞄了kafka服务端的源码得知: 想要精确查询到每个request日志需要开启trace日志。如图修改配置文件:

日志分析

我们检索日志,进行分析

事后复盘时发现从日志排查这类问题更方便一些

寻找异常任务

我们通过来源连接的ip与端口,定位到对应storm任务的日志,果然存在高版本客户端连接的问题。且任务启动时间与数据写入异常时间点完全吻合。

kill任务后集群逐渐恢复,数据写入恢复正常。

0x03 深入分析

现场临时恢复了,但我们对问题深入的分析才刚刚开始。

既然问题源自异常连接,那我们首先需要回顾一下kafka的网络通信模型。

kafka的网络通信模型

熟悉kafka的同学都知道,kafka的网络通信模型是1(1个Acceptor线程)+N(N个Processor线程)+M(M个业务处理线程)。

线程数

线程名

线程具体说明

1

kafka-socket-acceptor_%x

Acceptor线程,负责监听Client端发起的请求

N

kafka-network-thread_%d

Processor线程,负责对Socket进行读写

M

kafka-request-handler-_%d

Worker线程,处理具体的业务逻辑并生成Response返回

Kafka网络通信层的完整框架图如下图所示:

为什么会数组越界

从源码可以看出的kafka集群支持如下ApiKey的请求:

PRODUCE(0, "Produce"),
FETCH(1, "Fetch"),
LIST_OFFSETS(2, "Offsets"),
METADATA(3, "Metadata"),
LEADER_AND_ISR(4, "LeaderAndIsr"),
STOP_REPLICA(5, "StopReplica"),
UPDATE_METADATA_KEY(6, "UpdateMetadata"),
CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),
OFFSET_COMMIT(8, "OffsetCommit"),
OFFSET_FETCH(9, "OffsetFetch"),
GROUP_COORDINATOR(10, "GroupCoordinator"),
JOIN_GROUP(11, "JoinGroup"),
HEARTBEAT(12, "Heartbeat"),
LEAVE_GROUP(13, "LeaveGroup"),
SYNC_GROUP(14, "SyncGroup"),
DESCRIBE_GROUPS(15, "DescribeGroups"),

并将ApiKey整合成一个数组,这样getRequest()就能把客户端对应Apikey的请求引导到对应的处理方法。

static {int maxKey = -1;for (ApiKeys key : ()) {maxKey = (maxKey, );}codeToType = new ApiKeys[maxKey + 1];for (ApiKeys key : ()) {codeToType[] = key;}MAX_API_KEY = maxKey;
}

.0对应的发现,kafka随着版本升级已经新增了大量Apikey

SASL_HANDSHAKE(17, "SaslHandshake", (), ()),
API_VERSIONS(18, "ApiVersions", (), ()) {@Overridepublic Struct parseResponse(short version, ByteBuffer buffer) {// Fallback to version 0 for ApiVersions response. If a client sends an ApiVersionsRequest// using a version higher than that supported by the broker, a version 0 response is sent// to the client indicating UNSUPPORTED_VERSION.return parseResponse(version, buffer, (short) 0);}
},
CREATE_TOPICS(19, "CreateTopics", (), ()),DELETE_TOPICS(20, "DeleteTopics", (), ()),DELETE_RECORDS(21, "DeleteRecords", (), ()),
INIT_PRODUCER_ID(22, "InitProducerId", (),()),
OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch", false, (),()),
ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false, RecordBatch.MAGIC_VALUE_V2,AddPartitionsToTxnRequest.schemaVersions(), ()),
ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false, RecordBatch.MAGIC_VALUE_V2, AddOffsetsToTxnRequest.schemaVersions(),()),
END_TXN(26, "EndTxn", false, RecordBatch.MAGIC_VALUE_V2, EndTxnRequest.schemaVersions(),()),
WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true, RecordBatch.MAGIC_VALUE_V2, WriteTxnMarkersRequest.schemaVersions(),()),
TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false, RecordBatch.MAGIC_VALUE_V2, TxnOffsetCommitRequest.schemaVersions(),()),
DESCRIBE_ACLS(29, "DescribeAcls", (), ()),
CREATE_ACLS(30, "CreateAcls", (), ()),
DELETE_ACLS(31, "DeleteAcls", (), ()),
DESCRIBE_CONFIGS(32, "DescribeConfigs", (),()),
ALTER_CONFIGS(33, "AlterConfigs", (),()),
ALTER_REPLICA_LOG_DIRS(34, "AlterReplicaLogDirs", (),()),
DESCRIBE_LOG_DIRS(35, "DescribeLogDirs", (),()),
SASL_AUTHENTICATE(36, "SaslAuthenticate", (),()),
CREATE_PARTITIONS(37, "CreatePartitions", (),()),
CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", (), ()),RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", (), ()),EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", (), ()),DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", (), ()),DELETE_GROUPS(42, "DeleteGroups", (), ()),ELECT_PREFERRED_LEADERS(43, "ElectPreferredLeaders", ,

从日志可以看出这次服务端数组越界的index是18,对应的客户端ApiVersions请求,从名字可以推测还是高版本kafka客户端的高频请求,而只支持0~16的请求,所以有相关异常。

为什么会导致集群hang住

受益于java类语言的异常机制,scala程序还是相对健壮。我们需要深入研究一下为何一个小小的数组越界的影响范围超过了单次请求,甚至会导致整个集群不稳定。

借助日志,我们知道应该从的入手(kafka基于java nio实现了高性能SocketServer,据说实现相对优美,有时间再细品)。

override def run() {startupComplete()while(isRunning) {try {// setup any new connections that have been queued up        configureNewConnections()// register any new responses for writing        processNewResponses()// 重写了selectortry {(300)} catch {case e @ (_: IllegalStateException | _: IOException) =>error("Closing processor %s due to illegal state or IO exception".format(id))swallow(closeAll())shutdownComplete()throw e}{ receive =>try {val channel = ()val session = (new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),)val req = (processor = id, connectionId = , session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)(req)} catch {// 重点,这里没有捕获数组越界异常!导致被外围的异常捕获,后续逻辑没有处理case e @ (_: InvalidRequestException | _: SchemaException) =>// note that even though we got an exception, we can assume that  is valid. Issues with constructing a valid receive object were handled earliererror("Closing socket for " +  + " because of error", e)close(selector, )}()}{ send =>val resp = ().getOrElse {thrownew IllegalStateException(s"Send for ${} completed, but not in `inflightResponses`")}()()}{ connectionId =>val remoteHost = (connectionId).getOrElse {thrownew IllegalStateException(s"connectionId has unexpected format: $connectionId")}.remoteHost// the channel has been closed by the selector but the quotas still need to be updated          connectionQuotas.dec((remoteHost))}} catch {// We catch all the throwables here to prevent the processor thread from exiting. We do this because// letting a processor exit might cause bigger impact on the broker. Usually the exceptions thrown would// be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel// or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.case e : ControlThrowable => throw ecase e : Throwable =>error("Processor got uncaught exception.", e)}}debug("Closing selector - processor " + id)swallowError(closeAll())shutdownComplete()
}

SocketServer有小段的异常捕获,假如单端异常影响范围是有限。但是在第二小段的异常捕获却没有捕获数组越界异常,直接导致其被外围的异常捕获退出而不处理接下来的逻辑,从而会漏处理一些Request,从而导致一些关键的Request异常(如broker之间通信、生产程序与broker通信),从而导致整个集群出现问题数据写入异常。

0x04 事后总结

这次问题本质上是版本kafka的bug。

总所周知,kafka虽然原则上是支持向下兼容,但只是高版本的服务端兼容低版本的客户端。在有高版本客户端连接时,集群会hang住,严重的话直接导致broker进程僵死,同时也会导致其他的客户端僵死。

改进方案一览表

周期

改进方案

短期

做好宣导和力所能及的管控: 严禁用户使用高版本客户端连接集群;kafka服务端对数组越界的日志进行监控告警

中期

评估是否可以对kafka服务端代码打补丁

长期

升级或迁移到高版本集群

rdkafka解决方案

使用c++客户端rdkafka消费我们的kafka。经过沟通后,使用如下方法安全连接到kafka集群,供大家参考:

在rdkafka构建配置conf时,把配置下,就可以了。

0x05 参考

  • https://www.jianshu.com/p/d2cbaae38014

  • https://www.imooc.com/article/36519

  • 《 throws ArrayIndexOutOfBoundsException》https://issues.apache.org/jira/browse/KAFKA-359

0x06 转载

c++排查线程hang住_Kafka学习笔记之kafka高版本Client连接0.9Server引发的血案排查 - 时光飞逝,逝者如斯...相关推荐

  1. Java 线程同步与死锁 学习笔记

    Java 线程同步与死锁 学习笔记 Java 线程同步与死锁 学习笔记 1 多线程共享数据 2 线程同步 3 同步准则 4 线程死锁 1. 多线程共享数据 在多线程操作中,多个线程可能同时处理同一个资 ...

  2. Windows系统调用学习笔记(二)—— 3环进0环

    Windows系统调用学习笔记(二)-- 3环进0环 要点回顾 基本概念 _KUSER_SHARED_DATA 0x7FFE0300 实验:判断CPU是否支持快速调用 第一步:修改EAX=1 第二步: ...

  3. websocket 获取连接id_Swoole学习笔记七:搭建WebSocket长连接 之 使用 USER_ID 作为身份凭证...

    Swoole学习笔记七:搭建WebSocket长连接 之 使用 USER_ID 作为身份凭证 2年前 阅读 3678 评论 0 喜欢 0 ### 0.前言 前面基本的WebSocket操作,我们基本都 ...

  4. B站台湾大学郭彦甫|MATLAB 学习笔记|06 高阶绘图 Advanced Plot

    MATLAB学习笔记(06 高阶绘图 Advanced Plot) 如果想获得更好浏览体验的朋友可以转到下面链接 06 1. 对数图 (Logarithm Plots) x = logspace(-1 ...

  5. oracle 操作系统升级,学习笔记:Oracle升级 linux操作系统10.2.0.1 升级至 10.2.0.3详细过程...

    天萃荷净 记录一次用户现场Oracle 10G数据库升级的过程,Linux操作环境 FOR 10.2.0.1 to 10.2.0.3的版本升级过程 数据库版本linux 32位(10.2.0.1升级到 ...

  6. GD32学习笔记1(高难度工程,点亮一个LED灯)

    系列文章目录 第一章 GD32学习笔记1(高难度工程,点亮一个LED灯) 文章目录 系列文章目录 前言 一.工作流程 二.新建工程的准备工作 三.新建工程 四.工程目录管理 五.代码实现 1.初始化 ...

  7. OpenCV学习笔记(三十一)——让demo在他人电脑跑起来 OpenCV学习笔记(三十二)——制作静态库的demo,没有dll也能hold住 OpenCV学习笔记(三十三)——用haar特征训练自己

    OpenCV学习笔记(三十一)--让demo在他人电脑跑起来 这一节的内容感觉比较土鳖.这从来就是一个老生常谈的问题.学MFC的时候就知道这个事情了,那时候记得老师强调多次,如果写的demo想在人家那 ...

  8. ESP8266学习笔记6:ESP8266规范wifi连接操作

    一.前言 我整理了从2015年至今关于ESP8266的学习笔记,梳理出来了开发环境.基础功能.进阶学习三大部分.方便自己和他人.可点此查看,欢迎交流. 之前在笔记4<ESP8266的SmartC ...

  9. TWAIN学习笔记006 探索TWAIN之DS连接及扫描

    前一篇中我们已经成功连接了DSM并找到了所有可用的DS.本文中我们试着连接其中一个DS并完成一次扫描. 第一步,选择并连接一个DS,将TWAIN状态由3转到4. 注:本文选用了测试用的DS - TWA ...

最新文章

  1. cgroup 介绍 与使用
  2. 连接器篇(一) 低频系列
  3. tf.ConfigProto()函数
  4. 华为鸿蒙分布式系统2020,鸿蒙2.0来了!华为开发者大会HDC 2020宣布
  5. 一本通1613打印文章
  6. Spring Boot 2.0---使用Swagger2构建强大的API文档
  7. Disconnected from the target VM, address:xxxx 或者 Process finished with exit code 1 终极解决办法 idea
  8. python35安装教程_python详细安装教程,非常值得看的一篇文章
  9. 001-keras简介
  10. kali使用rz sz命令
  11. Centos7安装iNode客户端
  12. 【python使用】使用python读取mid/mif文件,高精地图解析
  13. 将 .qrc文件转换成 .py文件的解决方法
  14. 谷歌大脑2017总结下篇:Jeff Dean梳理6大领域研究
  15. 如何禁用电脑文件共享
  16. tungsten-replication实现mysql-mongo数据同步
  17. JavaScript获取浏览器可视区域的宽高
  18. 从国外官网github下载各种软件安装包项目太慢怎么办
  19. 高中数理化杂志高中数理化杂志社高中数理化编辑部2022年第23期目录
  20. 了解Robocode

热门文章

  1. 我的第一篇paper
  2. HTML5 WebSocket之HelloWorld
  3. 使用AWSTATS自动分析Nginx日志
  4. 用“组策略”修改注册表十大个经典范例
  5. 对付感冒的十二种偏方
  6. C【C#公共帮助类】分页逻辑处理类
  7. 【C# 复习总结】类、继承和接口
  8. vuex——做简单的购物车功能
  9. 一个强迫症的Git 选择
  10. ACM数论之旅17---反演定理 第一回 二项式反演(神说要有光 于是就有了光(´・ω・`))...