Flume系列:Flume Sink使用
目录
Apache Hadoop生态-目录汇总-持续更新
1:HDFS Sink
HDFS小文件的处理
HDFS存入大量小文件的影响:
HDFS小文件处理:
2:logger Sink
3:写入Kafka - 可以使用kafka channel代替
Apache Hadoop生态-目录汇总-持续更新
系统环境:centos7
Java环境:Java8
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Sink 组件目的地包括 hdfs、logger(常用语测试)、avro、thrift、ipc、file、HBase、solr、自定义。
1:HDFS Sink
# 1:定义组件
kafka_flume_hdfs.sources = r1
kafka_flume_hdfs.channels = c1
kafka_flume_hdfs.sinks = k1# 2:定义source
这里主要介绍Channel顾这里省略,到source模块查看写法# 3:定义channel
这里主要介绍sources顾这里省略,到channel模块查看写法# 4:定义sink
kafka_flume_hdfs.sinks.k1.type = hdfs
kafka_flume_hdfs.sinks.k1.hdfs.path = hdfs://hadoop322ha/project/v4/log/topic_log/%Y-%m-%d# 上传文件的前缀
kafka_flume_hdfs.sinks.k1.hdfs.filePrefix = logs-# 设置是否需要滚动生成文件,比如1小时一个, 如果设置为true需要设置对应的,roundValue,roundUnit
kafka_flume_hdfs.sinks.k1.hdfs.round = false## 控制生成的小文件# 控制多久滚动一次文件,防止凑不够rollSize卡住, 正常设置3600,这里测试为了快速写出
kafka_flume_hdfs.sinks.k1.hdfs.rollInterval = 20# 控制文件多大后,滚动文件,128M滚动文件
kafka_flume_hdfs.sinks.k1.hdfs.rollSize = 134217728# 多少个events滚动文件,一般不指定写0
kafka_flume_hdfs.sinks.k1.hdfs.rollCount = 0## 配置输出类型CompressedStream(压缩流),DataStream(原样输出),与压缩# 压缩流
kafka_flume_hdfs.sinks.k1.hdfs.fileType = CompressedStream# 压缩类型
kafka_flume_hdfs.sinks.k1.hdfs.codeC = lzop# 5:定义关联关系
kafka_flume_hdfs.sources.r1.channels = c1
kafka_flume_hdfs.sinks.k1.channel = c1
sink到hdfs注意timestamp,默认是从Flume event headers里取的,如果header里没有不配useLocalTimeStamp 会直接报错
2022-12-06 11:47:33,481 ERROR hdfs.HDFSEventSink: process failed
java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
HDFS小文件的处理
HDFS存入大量小文件的影响:
元数据层面:每个小文件都有一份元数据,小文件过多会占用Namenode服务器大量内存,影响Namenode性能和使用寿命。
计算层面:默认情况下MR会对每个小文件启用一个Map任务计算(默认1G内存),非常影响计算性能。同时也影响磁盘寻址时间。
HDFS小文件处理:
通过调整:hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount 这三个参数,控制小文件的生成
①文件在达到128M时会滚动生成新文件
②文件创建超3600秒时会滚动生成新文件
hdfs.rollInterval=3600
hdfs.rollSize=134217728 #128M滚动文件
hdfs.rollCount =0## 对hdfs进行压缩
a1.sinks.k1.hdfs.fileType = CompressedStream # 压缩流
a1.sinks.k1.hdfs.codeC = lzop注意:hdfs要开启对应的压缩格式
2:logger Sink
# 1:定义组件
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 2:定义source
这里主要介绍Channel顾这里省略,到source模块查看写法# 3:定义channel
这里主要介绍sources顾这里省略,到channel模块查看写法# 4:定义sink
a1.sinks.k1.type = logger # 表示a1的输出目的地是控制台logger类型# 5:定义关联关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动方式:flume-ng agent --name a1 --conf-file flume-netcat-logger.conf -Dflume.root.logger=INFO,console
3:写入Kafka - 可以使用kafka channel代替
Flume系列:Flume Sink使用相关推荐
- flume系列之:监控JMX reporter
flume系列之:监控JMX reporter flume指标监控设置官方文档: https://flume.apache.org/FlumeUserGuide.html#monitoring 通过使 ...
- flume系列之:flume基于kafka.topics和kafka.topics.regex两种方式匹配Kafka Topic
flume系列之:flume基于kafka.topics和kafka.topics.regex两种方式匹配Kafka Topic 一.flume基于kafka.topics匹配Kafka Topic ...
- flume系列之:监控flume上个小时生成的HDFS文件是否有损毁,并发送告警信息
flume系列之:监控flume上个小时生成的HDFS文件是否有损毁,并发送告警信息 一.查看HDFS文件状态背景知识 二.登陆远程服务器代码 三.获取当前时间和上个小时时间 四.访问hdfs执行ke ...
- flume系列之:处理flume 678M损毁文件
flume系列之:处理flume 678M损毁文件 一.查看损毁文件 二.删除损毁文件 一.查看损毁文件 hdu -h /com-nio-insight-cn-prod/raw/kafka/*/*/* ...
- flume系列之:-Xmx20m -Xms1024m -Xmx2048m
flume系列之:-Xmx20m -Xms1024m -Xmx2048m Java HotSpot™ 64-Bit Server VM warning: INFO: os::commit_memory ...
- flume系列之:hdfs.timeZone设置中国北京或上海时区
flume系列之:hdfs.timeZone设置中国北京或上海时区 查看服务器时间 flume设置中国时区 查看服务器时间 date 2021年 12月 23日 星期四 10:09:49 CST hd ...
- flume系列之:使用通配符批量消费kafka的Topic
flume系列之:使用通配符批量消费kafka的Topic #指定kafka topic使用注释的这个 #kafka_topics: "optics-production-data" ...
- 【Flume】Flume入门
Flume 简介 Flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用.Flume 初始的发行版本目前被统称为 Flume OG(original generatio ...
- 日志采集框架Flume以及Flume的安装部署(一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统)...
Flume支持众多的source和sink类型,详细手册可参考官方文档,更多source和sink组件 http://flume.apache.org/FlumeUserGuide.html Flum ...
最新文章
- python多态的例子_Python编程之多态用法实例详解
- android碎片化的解决方法,解决 Android 设备碎片化--屏幕适配
- 企业开发中,git提交时屏蔽某些文件,怎么搞!【idea的处理方法】
- mysql left join 索引失效_MySQL索引列上做操作导致索引失效案例分析
- 【Spring Boot】Spring Boot之整合RabbitMQ并实现消息的发送和接收
- Atitit 如何设置与安放知识的trap陷阱 知识聚合 rss url聚合工具 以及与trap的对比
- WinForm窗体美化
- 以太网转串口代码C语言,基于STM32 串口转以太网收发数据(stm32移植代码+网络调试助手等)...
- 【2020.10.27 牛客 普及组 模拟赛5】T4 飞行棋
- 51单片机蜂鸣器演奏《我和我的祖国》
- itunes备份电脑C盘内存不够怎么办?
- wegame系统推荐头像_热点微信国旗头像刷屏,怎么回事?(附国旗获取方式)
- HALCON联合C#检测表面缺陷——检测缺陷原理(三)
- 推荐系统用户反馈延迟新解法!阿里提出CVR无偏估计算法
- js随机飘动的广告图片代码demo效果示例(整理)
- linux磁盘检测和修复
- [渝粤教育] 西南科技大学 建筑CAD 在线考试复习资料
- 骁龙768g和765g的差距大不大
- 查询注册表的命令行工具reg
- 【React Native】集成声网Agora语音通讯
热门文章
- Automatic Exposure Correction of Consumer Photographs 分析
- 对于为何地球上没有与人类媲美的存在的疑惑引发思考
- 数据挖掘学习小组简介!
- weex android 简书,weex打包Android和IOS
- POJ 动态规划题目列表
- postgresql的配置文件中找不到postgresql.conf和pg_hba.conf
- Effective Java 枚举和注解 第33条:用 EnumMap 代替序数索引
- 02 Hadoop概述
- 进入著名外企并非没有限制
- 100%与100vh/100vw的区别,为什么有时候100%不生效