一、需求

Flume向kafka发送数据时,同一个flume发送到kafka的数据总是固定在某一个partition中。而业务需求是发送的数据在所有的partition平均分布

二、实现

Flume的官方文档:

Kafka Sink uses the topic and key properties from the FlumeEvent headers to send events to Kafka. If topic exists in the headers, the event will be sent to that specific topic, overriding the topic configured for the Sink. If key exists in the headers, the key will used by Kafka to partition the data between the topic partitions. Events with same key will be sent to the same partition. If the key is null, events will be sent to random partitions.

kafka-sink是从header里的key参数的value值来hash到kafka的某个分区中。如果key为null,那么就会随机发布至分区中。事实上key为null被指定到kafka的某个固定分区。

要partition平均分布数据,就向header中写上随机的key,然后数据才会真正的向kafka分区进行随机发布。

官方文档有一个UUID Interceptor,会为每个event的head添加一个随机唯一的key,向flume添加拦截器达到随机分区发送。

在flume添加的配置文件如下:

agent1.sources.nginxlogSource.interceptors = UUIDi1
agent1.sources.nginxlogSource.interceptors.UUIDi1.type=org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
agent1.sources.nginxlogSource.interceptors.UUIDi1.headerName=key
agent1.sources.nginxlogSource.interceptors.UUIDi1.preserveExisting=false

三、出现的问题

由于网络抖动,国外nginx机器连接不上国内的kafka集群的部分机器,nginx机器的flume通道堵塞,内存占有率高,导致Nginx机器的cpu飙升100%,持续几十台机器崩溃发送告警。

故障原因:

flume添加了UUID拦截器,UUID拦截器给Event的header添加了一个key值,flume在发送到kafka中根据key指定了固定分区。由于网络抖动,该kafka分区连接不上,分区的所有数据发送失败回滚到channel通道,失败数据还是以key指定的分区进行重新发送,发送数据一直失败回滚channel通道,直到机器崩溃故障发生。

四、总结

不要使用UUID拦截器进行固定的分区发送,数据量大或者网络抖动容易导致机器崩溃。应该重新编写kafkaSink,flume在发送数据的时候随机生成一个key,发送到不同的分区。就算失败回滚到channel通道也会发送到新的分区。

示例:

KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(eventTopic, UUID.randomUUID().toString(), eventBody);
messageList.add(data);

Flume均匀发送数据到kafka的partition配置UUID Interceptor生成key的坑相关推荐

  1. 【kafka】flink 发送 数据到 kafka 报错 Memory records is not writable

    1.场景1 1.1 概述 本次环境,kafka的版本为0.9.0.1,主要情况是flink写入数据到kafka shell端是可以发送的,我的程序重启后也是好的,运行一端时间后消息就发不出去了 然后程 ...

  2. JMeter发送数据到Kafka

    最近为了简单做一下性能测试,开始使用JMeter了,因为要先往Kafka中灌入数据,毕竟没用过JMeter,就在网上各种搜,发现写的都不是自己所需要的那种,即使是有的博客看着好像是符合自己要求,但是还 ...

  3. Flume实时采集mysql数据到kafka中并输出

    环境说明 centos7 flume1.9.0(flume-ng-sql-source插件版本1.5.3) jdk1.8 kafka 2.1.1 zookeeper(这个我用的kafka内置的zk) ...

  4. kafka partition java,kafka中partition数量与消费者对应关系以及Java实践

    kafka中partition数量与消费者对应关系以及Java实践 kafka中partition数量与消费者对应关系以及Java实践 kafka是由Apache软件基金会开发的一个开源流处理平台.k ...

  5. java connection 共享_java 使用HttpURLConnection发送数据简单实例

    java 使用HttpURLConnection发送数据简单实例 每个 HttpURLConnection 实例都可用于生成单个请求,但是其他实例可以透明地共享连接到 HTTP 服务器的基础网络.请求 ...

  6. android串口发送二进位,stm32107串口发送数据的数据用串口助手接收发现数据不对...

    下面是我用的STM32单片机串口5的初始化 然后再主函数里面一直发送数据 串口助手的相关配置和串口的配置一样 但是接收的数据和自己发送的数据不一致下面是串口5的初始化(单片机用的是CP2102芯片转U ...

  7. flume avro java 发送数据_flume将数据发送到kafka、hdfs、hive、http、netcat等模式的使用总结...

    1.source为http模式,sink为logger模式,将数据在控制台打印出来. conf配置文件如下: # Name the components on this agent a1.source ...

  8. discard connection丢失数据_python kafka 生产者发送数据的三种方式

    python kafka 生产者发送数据的三种方式 发送方式 同步发送 发送数据耗时最长 有发送数据的状态,不会丢失数据,数据可靠性高 以同步的方式发送消息时,一条一条的发送,对每条消息返回的结果判断 ...

  9. Kafka Sender线程如何发送数据

    文章目录 1.内存缓冲中的Batch,如何被判定可发送出去 2.标识那些元数据未拉取成功的 3.检查筛选出来的Broker是否可以发送数据 3.1 元数据是否已经就位 3.2 是否可以发送请求 3.3 ...

最新文章

  1. java如何确保单线程_java – 任何单线程程序如何成为有效的多线程程序?
  2. 关于DPM(Deformable Part Model)算法中模型可视化的解释
  3. supervisor配置文件中如何添加多个环境变量
  4. 微信公众号之微信买单
  5. YBTOJ洛谷P4331:数字序列(左偏树)
  6. Django(四)数据库
  7. 百度SEO站群PHP进销存源码ERP多仓库管理源码
  8. java中位操作_Java中使用位操作的几个小技巧
  9. 使用join()方法 分隔拆分后的数组
  10. sql server跨服务器修改数据,SQL Server跨数据库服务器查询和跨表更新的详细操作...
  11. 大型网站技术架构(八)——网站的安全架构
  12. Ionic4—JS扩展之ion-refresher下拉刷新
  13. 前端生成二维码 微信小程序
  14. 你会copying了吗?(Effective C++ 12 复制对象时勿忘其每一个成分)
  15. C#实现把图片转换为ico格式
  16. 宝塔Linux面板的安装配置以及基本使用教程(超详细)
  17. 软件测试实习生培训大纲
  18. el-table设置封装
  19. 如何利用拼音首字母查询数据库
  20. 区块链政策频频出台,政务应用多点开花 | 产业区块链一周动态

热门文章

  1. 机器学习——决策树学习
  2. 1010 Lehmer Code (35 分)(思路+详解+树状数组的学习+逆序对+map+vector) 超级详细 Come baby!!!
  3. mysql qps如何查看_一款查看mysql QPS的脚本
  4. 蓝桥杯2015初赛-加法变乘法-枚举
  5. G - 水陆距离 HihoCoder - 1478(广搜+队列先进先出性质)
  6. 2020年学python_Python学习路线图(2020年最新版)
  7. HDU 2588 GCD(欧拉函数)
  8. 邻值查找—算法进阶指南
  9. Codeforces Round #736 (Div. 2) D. Integers Have Friends ST表gcd + 尺取
  10. Codeforces Round #686 (Div. 3) E. Number of Simple Paths 基环树 + 容斥