前提

本文的分析基于kafka 0.9版本的client, 0.10.1.0中已经修改心跳线程为后台线程,并支持设置max.poll.records,参见ChangeLog

使用场景

Kafka是一个高吞吐量的分布式消息系统,在APM的移动端请求数据的处理中,使用了Kafka。Kafka数据使用多线程阻塞的方式进行消费,即每个线程通过poll()的形式消费一个或者多个partition, 每次得到的消息集处理完成之后才会继续进行下一次poll()操作,同时使用了自动提交offset的模式。Rebalance发生的原因有可能是集群的问题,但大部分都在客户端,一旦服务端在设定的超时时间内没有收到消费者发起的心跳,则认为这个消费者已经死掉,就会执行Rebalance动作。

从源码上,我们一路从KafkaConsumer.poll(timeout)跟进来可以看到

    /*** Do one round of polling. In addition to checking for new data, this does any needed* heart-beating, auto-commits, and offset updates.* @param timeout The maximum time to block in the underlying poll* @return The fetched records (may be empty)*/private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {...// 上面是一些检查动作fetcher.initFetches(cluster);client.poll(timeout);return fetcher.fetchedRecords();}

从注释中,我们可以看出poll动作会发出一些列的心跳、自动offset提交和更新的动作。这是我们设定了自动提交的时候,我们的消费者发出心跳和offset的地方。
再进client.poll(timeout)方法中可以看到

    //ConsumerNetworkClient.javaprivate void poll(long timeout, long now, boolean executeDelayedTasks) {...//一些前置的判断// execute scheduled tasksif (executeDelayedTasks)delayedTasks.poll(now);...//其他动作}

从源码里面可以看到会吧delayedTask里面的所有任务执行掉,其中就有我们的心跳任务。 那么,很明显,如果我们在两次poll()调用的间隔做了太多的事情,也就是消费拉取下来的数据花了过长的时间,而没有及时发出心跳,则我们会被判定为死掉的节点,这个时候集群就会发起Rebalance。

Rebalance有什么影响

Rebalance本身是Kafka集群的一个保护设定,用于剔除掉无法消费或者过慢的消费者,然后由于我们的数据量较大,同时后续消费后的数据写入需要走网络IO,很有可能存在依赖的第三方服务存在慢的情况而导致我们超时。
Rebalance对我们数据的影响主要有以下几点:

  1. 数据重复消费: 消费过的数据由于提交offset任务也会失败,在partition被分配给其他消费者的时候,会造成重复消费,数据重复且增加集群压力
  2. Rebalance扩散到整个ConsumerGroup的所有消费者,因为一个消费者的退出,导致整个Group进行了Rebalance,并在一个比较慢的时间内达到稳定状态,影响面较大
  3. 频繁的Rebalance反而降低了消息的消费速度,大部分时间都在重复消费和Rebalance
  4. 数据不能及时消费,会累积lag,在Kafka的TTL之后会丢弃数据

上面的影响对于我们系统来说,都是致命的。

我们遇到Rebalance的场景

首先为了看下我们的rebalance有多么严重,我们增加了ConsumerRebalanceListener,并计算Rebalance发生的频率,同时将Rebalance的信息上报到监控平台上。
我们可以看到,Rebalance出现的非常频繁,一旦开始Rebalance则通常是多个机器多个消费线程同时开始Rebalance,并在一定时间后达到稳定。

同时加了一些日志看看每个partition rebalance需要多长的时间,每个partition rebalance完成都需要20秒左右(当然有些partition会被rebalance到其他消费者去,因为没有响应partition的成对的开始和结束日志),可想而知很频繁的rebalance会有很严重的问题。

2017-05-15 18:00:20,343-consumer:consumer-pool-3 ,topic:foreground.-_-.apm.online,partition:37 rebalance end, rebalance time:14818...
2017-05-15 18:00:20,343-consumer:consumer-pool-3 ,topic:foreground.-_-.apm.online,partition:36 rebalance end, rebalance time:14818...
2017-05-15 18:00:20,344-consumer:consumer-pool-9 ,topic:foreground.-_-.apm.interaction.online,partition:7 rebalance end, rebalance time:20035...
2017-05-15 18:00:20,344-consumer:consumer-pool-9 ,topic:foreground.-_-.apm.interaction.online,partition:6 rebalance end, rebalance time:20035...
2017-05-15 18:00:20,344-consumer:consumer-pool-8 ,topic:foreground.-_-.apm.interaction.online,partition:11 rebalance end, rebalance time:20322...
2017-05-15 18:00:20,344-consumer:consumer-pool-8 ,topic:foreground.-_-.apm.interaction.online,partition:10 rebalance end, rebalance time:20322...
2017-05-15 18:00:20,344-consumer:consumer-pool-1 ,topic:foreground.-_-.apm.web.online,partition:8 rebalance end, rebalance time:15162...
2017-05-15 18:00:20,344-consumer:consumer-pool-1 ,topic:foreground.-_-.apm.web.online,partition:9 rebalance end, rebalance time:15162...
2017-05-15 18:00:20,344-consumer:consumer-pool-6 ,topic:foreground.-_-.apm.diagnose.online,partition:22 rebalance end, rebalance time:15162...
2017-05-15 18:00:20,344-consumer:consumer-pool-6 ,topic:foreground.-_-.apm.diagnose.online,partition:23 rebalance end, rebalance time:15162...

常见且简单的Rebalance场景

我们的业务数据会写入Hbase,最经典的场景就是Hbase集群服务抖动或者我们写入数据造成Hbase RegionServer过热会造成消费到的消息过慢触发心跳超时。这种场景下,我们可以在日志里面明显看到Hbase写入抛出的异常。例如:

  1. 由于集群的抖动,导致我们无法正常写入数据,会造成Rebalance

     2017-03-08 14:31:45,593 411615 [htable-pool1-t99] (AsyncProcess.java:713) INFO org.apache.hadoop.hbase.client.AsyncProcess - #5, table=mam:MobileNetData, attempt=2/10 failed 56 ops, last exception: org.apache.hadoop.hbase.RegionTooBusyException: org.apache.hadoop.hbase.RegionTooBusyException: Above memstore limit, regionName=mam:MobileNetData,5852818ca18dc5fdf63bec8eded1db3c_9223370556190336483,1483673115059.19745fc99b3370aab016af1c8cc70d69., server=hbase8.photo.163.org,60020,1488738168459, memstoreSize=1077613864, blockingMemStoreSize=1073741824at org.apache.hadoop.hbase.regionserver.HRegion.checkResources(HRegion.java:2937)at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2249)at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2216)at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2220)at org.apache.hadoop.hbase.regionserver.HRegionServer.doBatchOp(HRegionServer.java:4478)at org.apache.hadoop.hbase.regionserver.HRegionServer.doNonAtomicRegionMutation(HRegionServer.java:3661)at org.apache.hadoop.hbase.regionserver.HRegionServer.multi(HRegionServer.java:3550)at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:29949)at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2027)at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:108)at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:110)at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:90)at java.lang.Thread.run(Thread.java:662)
    
  2. 由于写入热点的问题导致的Rebalance

     2017-04-13 13:18:09,809 35209 [htable-pool10-t16] INFO org.apache.hadoop.hbase.client.AsyncProcess - #9, table=mam:MobileDiagnoseData, attempt=3/10 failed 86 ops, last exception: org.apache.hadoop.hbase.NotServingRegionException: org.apache.hadoop.hbase.NotServingRegionException: mam:MobileDiagnoseData,7d56fba948c044a0c7b95709a1d9084e_9223370546420477958_487c7012778ee696320ac91264d33fd1,1491464751441.e2d1589b4b227a56789cf4a5e0d6ec21. is closingat org.apache.hadoop.hbase.regionserver.HRegion.startRegionOperation(HRegion.java:5906)at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2254)at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2216)at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2220)at org.apache.hadoop.hbase.regionserver.HRegionServer.doBatchOp(HRegionServer.java:4478)at org.apache.hadoop.hbase.regionserver.HRegionServer.doNonAtomicRegionMutation(HRegionServer.java:3661)at org.apache.hadoop.hbase.regionserver.HRegionServer.multi(HRegionServer.java:3550)at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:29949)at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2027)at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:108)at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:110)at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:90)at java.lang.Thread.run(Thread.java:745)on hbase12.xs.163.org,60020,1491464637625, tracking started Thu Apr 13 13:18:06 CST 2017, retrying after 3004 ms, replay 86 ops.
    

上面两种都是很容易解决的Rebalance,这种处理起来也简单,从发生的源头,要么查看是否集群出现问题,要么解决一下写入热点问题就可以了。

然而难解决的是没有出错日志,但是依旧会频繁的Rebalance。

需要进一步分析的异常

为了减少消费Kafka后写入到Hbase的数据不会产生明显的峰谷,我们采取了限流的策略,在写入Hbase的时候,使用RateLimiter获取令牌后写入到Hbase中,这加剧了Rebalance问题的产生,因为每次消费的时间会加上等待令牌产生的时间。 从统计上来看,左边是增加了限流后的Rebalance情况,右侧这部分红框中,则是去掉了限流之后的情况,21点左右Rebalance情况减少。

前面提到了Rebalance的原因就是同步消费poll()操作得到的数据的时间过长导致的,我们解决了这些简单的Case之后,发现还是很经常发生Rebalance,同时为什么限流会加剧Rebalance,只能增加日志来看poll()动作得到的数据的消费时间到底是多长。
这个日志中,poll record size是每次渠道的数据的条数,consumer time是消费这部分数据的时间,poll interval是消费线程两次poll的间隔。
从日志中,我们可以看到每次poll,在数据较多时会poll到1w多条数据,消费时间是3秒多,两次poll之间的间隔是12秒左右。而我们的参数中heartbeat.interval.ms设置的是10秒,而session.timeout.ms设置的是30秒,两次poll间隔12秒显然已经超出了心跳的10秒的间隔。从左边的日志也可以看出,集群发生了rebalance。

上述的日志中,我们有两个疑问:

  1. 能否调整每次拉取到的数据的条数,条数少一些,每次消费也会比较快一些
  2. 消费的时间只有3秒多,总的一次poll加上消费的时间竟然达到了12秒,poll(timeout)我们指定了超时时间为1秒,中间也没有其他操作了,所以只能怀疑poll动作有问题。

对于第1个问题,kafka0.9版本没有设置每次获取的数据条数的参数,在0.10版本中新增了,但是因为集群是0.9,所以这个暂时也没有办法,对于第2个问题,我们先加一些日志看看poll动作

显然poll()的超时时间已经超过1秒。
我们再看一下poll()和超时相关的代码

    @Overridepublic ConsumerRecords<K, V> poll(long timeout) {acquire();try {if (timeout < 0)throw new IllegalArgumentException("Timeout must not be negative");// poll for new data until the timeout expireslong start = time.milliseconds();long remaining = timeout;//循环退出条件是超过了超时时间do {//传入的也是剩余的超时时间Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);if (!records.isEmpty()) {fetcher.initFetches(metadata.fetch());client.quickPoll();return new ConsumerRecords<>(records);}//减去这次poll花掉的时间long elapsed = time.milliseconds() - start;remaining = timeout - elapsed;} while (remaining > 0);return ConsumerRecords.empty();} finally {release();}}

poll()会进行多次的pollOnce(),直到时间用尽。从这个代码片段来看,并不可能出现大于两倍的超时时间的情况,所以我们设定超时时间1秒,但是实际的poll()调用在2秒以内是正常的,可能是做了两次pollOnce()动作,但是日志中的7秒+和8秒+的调用时间是比较奇怪的。我们继续跟踪了代码。最终跟踪到了org.apache.kafka.common.network.Selector,最终poll()的超时逻辑会走到

    private int select(long ms) throws IOException {if (ms < 0L)throw new IllegalArgumentException("timeout should be >= 0");if (ms == 0L)return this.nioSelector.selectNow();elsereturn this.nioSelector.select(ms);}

可以看到这个超时时间只是nio中select的超时时间,并不包含读取数据的时间,所以从kafka集群读取数据的时间过长会导致单次poll的时间超长,感觉与超时的语义不符,应该保证在设定的timeout时间内返回设计才比较合理。这个受集群负载影响,不是我们所能控制的。

回到上面的问题,为什么正常消费也会产生Rebalance以及为什么限流会加剧Rebalance也就有了解释,因为poll()的时间会受集群影响,导致单次poll的时间超长,限流则因为限流获取令牌的等待时间会导致单次消费时间较长,加剧了Rebalance。

解决方案

那么我们如何解决这种问题呢,我们期待的结果是可以Rebalance,但是不应该因为一个消费者消费较慢或者突然的波动,而影响整个集群,导致整个集群Rebalance。很容易想到的方案就是单独控制心跳任务,让Kafka集群知道消费者还活着,但是如果依旧采用目前的消费模式,是做不到的。不过还是有解决方案的,那就是关闭offset的自动提交,由我们手动的管理offset的提交和心跳。方案网上有现成的,可以使用Spring-Kafka参考简书上的文章, 当然也可以自己编码实现,因为项目的紧迫性,我们使用了现成的方案。
我们在另一个业务里面先进行了验证性的部署,修改后的效果可以如图中所示:
上线前晚上的rebalance情况,高峰时期,数据量也较多,Kafka集群负载也较大

 然后我们在第二天下午15点上线了新的Kafka消费者逻辑,可以看到晚上高峰时期也没有Rebalance了

优缺点

陆陆续续经过好久,终于基本解决了Kafka消费者不断的Rebalance的问题,好处很明显,可以解决上述的会导致我们数据的各种问题,集群也不会因为某一个消费线程比较慢而影响整个集群,这是一个恶性循环的过程。但是也有缺点,比如出现过消费线程出现问题,心跳却在继续,数据不会被消费,也不会进行Rebalance。原因还在分析中,但是这种现象毕竟非常少数,可以通过报警和手动操作的方式进行重启等操作。

困扰许久的Kafka Rebalance问题相关推荐

  1. 数据库安全小白紧急求助论坛大神,困扰许久

    数据库安全小白紧急求助论坛大神,困扰许久 mircosoft SQL server 2008 R2怎样才能逃过甚至杜绝外来IP不断尝试SA密码? 求助各位论坛大神,封闭1433端口,设置数据库访问策略 ...

  2. 困扰许久的文泉驿显示慢问题终于解决了

    前言:很久没写了.5月忙着申请Google Summer of Code,六月忙着申请实习,现在忙着实习,果然,blog只有闲着的时候才能写-- 现在在 北京千橡实习中,做ATL/WTL,一切都还算顺 ...

  3. 一个困扰许久的Word嵌入文档问题(求助)

    最近碰到个Word问题,研究了很久,一直没能解决,看各位读者朋友,能不能提供一些更好的解决方案. 软件信息:Microsoft Word 2016(正式版) (1)操作 将某个word或者excel文 ...

  4. 一段困扰许久的防注入代码

    有段时间一直热衷于研究各种waf绕过,一般来说,云WAF可以通过找到网站真实IP来绕过,硬件waf也常因为HTTP协议解析差异导致绕过,但是,代码层的防护往往只能从代码逻辑里寻找绕过思路. 在一些网站 ...

  5. 纯属巧合,解决了一个困扰许久的问题,关于网卡设置时提示“系统找不到指定文件”。

    两年前arp病毒泛滥,我就装了一个arp防火墙,,,结果导致系统出问题不能正常使用,自动卸载无效换成手动卸载. 之后就出现头痛的问题-- 不能设置网卡,包括DNS.IP地址.掩码.网关以及是否自动获取 ...

  6. 每日一皮:产品和开发在线上吵了许久...

    产品和开发在线上吵了许久 于是领导开了一个会议室让他们聊聊 - 往期推荐 每日一皮:软件从业人员表情图... 每日一皮:强大的sudo ... 每日一皮:周六了,想跟你说一句... 每日一皮:一个难以 ...

  7. 犹豫了许久,还是写个年总结记录一下吧

    之所以犹豫了许久,是因为,直觉告诉我,发这篇文章出来,园子里肯定会掀起波澜,而我却身处于漩涡中央.看到园子没以前那么热闹了,那就来吧,豁出去了,生死看淡,不服就干!一起热闹热闹吧!毕竟很快就要过年啦~ ...

  8. Delphi字符串转日期,强大到窒息,VarToDateTime解决了困扰很久的小问题

    Delphi字符串转日期,强大到窒息,VarToDateTime解决了困扰很久的小问题 参考文章: (1)Delphi字符串转日期,强大到窒息,VarToDateTime解决了困扰很久的小问题 (2) ...

  9. 许久不动笔,,再来点人生感悟吧

    过了许久,方才又上来写点东西了, 人似乎都有很强的惰性,如果不靠纸笔,好像就不能有所沉淀,所有的思绪就如同漂絮一般,虚空的让我只能由以种种好似存在过的感觉. 很久没有更新东西了,真的是忘了怎么样的学习 ...

最新文章

  1. DeepLab v2的摘要部分(翻译加理解)
  2. ACM入门之【前缀和】
  3. 【渝粤教育】国家开放大学2018年秋季 1379T人文英语3 参考试题
  4. topcoder SRM712 Div1 LR
  5. PHPExcel导出excel 复制代码
  6. 论策谈百度快照回档和后退的原因
  7. stream去重_List stream 对象 属性去重
  8. 手机端和wap端页面的自适应技术方案
  9. 菜鸟学习oracle
  10. arcgis绘制shp文件
  11. SHOPEX网店系统测试 旗下50万家网站的安全另人担忧
  12. CodeSmith介绍
  13. 债券收益率预测模型_股债收益率模型看A股估值 股债收益率模型(EYBY)是一个经典的股市估值模型,其基本思想是将“股票收益率”(EY)与“债券收益率”(BY)进行对比... - 雪球...
  14. Vue中ref的三种用法
  15. 1125: 上三角矩阵的判断
  16. ios11 mjrefresh 上拉加载更多怎么停不下来了??
  17. 微信视频号视频或直播预约二维码如何生成?
  18. 手机号码归属地api文档
  19. fastapi中使用tortoise-orm(入门)
  20. 程序员提问的智慧(How-To-Ask-Questions-The-Smart-Way)

热门文章

  1. sm是什么职位_SM机油是什么级别,SM级是不是合成机油
  2. 高斯白噪声及matlab语言,matlab 给信号加高斯白噪声
  3. IOS开发之——上传-获取文件的MIMEType(03)
  4. 无人机工程安全巡检具体如何实践?
  5. 一、数据可视化之堆叠面积图 - Stacked Area Graph
  6. MyBatis的基础使用
  7. Python小白的数学建模课-12.非线性规划
  8. 游戏服务器端编程书籍
  9. Vue自定义动画/过渡
  10. 07、ADC的有效位数与有效分辨率的区别