flume八种采集方案

案例1)avro+memory+logger

logger通常用于测试,数据流中的event最终显示在屏幕上
1)采集方案的配置
[root@hadoop01 ~]# mkdir flumeconf
[root@hadoop01 ~]# vim ./flumeconf/avro-mem-logger.properties
#定义三大组件的名称  和关联
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
​
#定义Source的相关属性
a1.sources.r1.type = avro
#绑定本机的ip或者是hostname
a1.sources.r1.bind = hadoop01
#要监听的本机上的某一个端口号,  当程序启动时,该端口号就会被使用
a1.sources.r1.port = 10086
​
#定义channel的相关属性
a1.channels.c1.type = memory
#内存存储容量 event的最大数量
a1.channels.c1.capacity=1000
#从内存中出来时,一次性提交的event的数量
a1.channels.c1.transactionCapacity=100
​
#定义Sink的相关属性
a1.sinks.k1.type=logger
a1.sinks.k1.maxBytesToLog = 16
2)启动方案
flume-ng agent -c /usr/local/flume/conf -f ./flumeconf/avro-mem-logger.properties -n a1 -Dflume.root.logger=INFO,console
3)测试:因为用的是avro的source,那么必须使用avro-client进行测试
[root@hadoop01 ~]# mkdir flumedata
[root@hadoop01 ~]# echo "hellworld" > flumedata/data.txt
[root@hadoop01 ~]# flume-ng avro-client -c /usr/local/flume/conf/ -H hadoop01 -p 10086 -F ./flumedata/data.txt

案例2)exec+memory+logger

注意:使用exec源,监听的文件,要提前创建
1)采集方案的编写
[root@hadoop01 ~]# vim ./flumeconf/exec-mem-logger.properties
#定义三大组件的名称  和关联
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
​
#定义Source的相关属性
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F ./flumedata/data.txt
​
#定义channel的相关属性
a1.channels.c1.type = memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
​
#定义Sink的相关属性
a1.sinks.k1.type=logger
a1.sinks.k1.maxBytesToLog = 16
2)启动采集方案
flume-ng agent -f ./flumeconf/exec-mem-logger.properties -n a1 -Dflume.root.logger=INFO,console
3)测试
echo "helloworld" >> ./flumedata/data.txt

案例3)exec+memory+hdfs

1)采集方案的编写
[root@hadoop01 ~]# vim ./flumeconf/exec-mem-hdfs.conf
#定义三大组件的名称  和关联
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
​
#定义Source的相关属性
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F ./flumedata/data.txt
​
#定义channel的相关属性
a1.channels.c1.type = memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
​
#定义Sink的相关属性
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path = /flume/%Y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = wcm
a1.sinks.k1.hdfs.fileSuffix = .wsy
#下面三个条件满足其一,就会产生新文件
#新文件产生的时间周期,单位是秒,   如果设置为0表示不会产生新文件。
a1.sinks.k1.hdfs.rollInterval = 60
#当前文件达到1000字节,就会产生新文件
a1.sinks.k1.hdfs.rollSize = 1000
#当前文件的event数量达到10条,就会产生新文件
a1.sinks.k1.hdfs.rollCount = 10
#如果writeFormat指定了Text,那么fileType必须是DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType = DataStream
​
#round的作用,用于指定是否滚动文件夹  false 表示不滚动文件夹
a1.sinks.k1.hdfs.round = true
#设置文件夹滚动的时间单位
a1.sinks.k1.hdfs.roundUnit = minute
#设置文件夹固定的时间数字大小
a1.sinks.k1.hdfs.roundValue = 2
#如果目录上设置了时间格式字符串,比如%Y等,那么下面的属性应该设置为true,除非event的head里有一个叫timestamp的消息头
a1.sinks.k1.hdfs.useLocalTimeStamp = true
2)启动方案
flume-ng agent -f ./flumeconf/exec-mem-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
3)测试
[root@hadoop01 ~]# echo "aaa " >> flumedata/data.txt
[root@hadoop01 ~]# echo "aaa " >> flumedata/data.txt
[root@hadoop01 ~]# echo "aaa " >> flumedata/data.txt
[root@hadoop01 ~]# echo "aaa " >> flumedata/data.txt
[root@hadoop01 ~]# echo "aaa " >> flumedata/data.txt
[root@hadoop01 ~]# echo "aaa " >> flumedata/data.txt

案例4)spool+memory+logger

spool源,是用来监听目录下的新文件的,并通过更名的方式来决定该文件已经采集完。注意,监听的目录必须提前存在。是一个可靠源
​
exec源不可靠
1)采集方案的编写
[root@hadoop01 ~]# vim flumeconf/spool-mem-logger.properties
#列出每个组件的名称
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
​
#设置source组件的属性
a1.sources.r1.type=spooldir
#要监听的目录必须提前创建
a1.sources.r1.spoolDir=/root/data/subdir
a1.sources.r1.fileSuffix=.gyy
a1.sources.r1.deletePolicy=never
a1.sources.r1.fileHeader=false
a1.sources.r1.fileHeaderKey=file
a1.sources.r1.basenameHeader=false
a1.sources.r1.basenameHeaderKey=basename
a1.sources.r1.batchSize=100
a1.sources.r1.inputCharset=UTF-8
a1.sources.r1.bufferMaxLines=1000
​
#设置channel组件的属性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
​
#设置sink组件的属性
a1.sinks.s1.type=logger
a1.sinks.s1.maxBytesToLog=16
2)启动方案
flume-ng agent -f ./flumeconf/spool-mem-logger.properties -n a1 -Dflume.root.logger=INFO,console
3)测试
[root@hadoop01 ~]# echo "helloworld" >>data/subdir/a.txt
[root@hadoop01 ~]# cd data/subdir/
[root@hadoop01 subdir]# ll
总用量 4
-rw-r--r-- 1 root root 11 12月 23 16:25 a.txt.gyy
[root@hadoop01 subdir]# echo "helloworld" >>b.txt
[root@hadoop01 subdir]# echo "helloworld" >>c.txt
[root@hadoop01 subdir]# echo "helloworld" >>d.txt
[root@hadoop01 subdir]# ll
总用量 16
-rw-r--r-- 1 root root 11 12月 23 16:25 a.txt.gyy
-rw-r--r-- 1 root root 11 12月 23 16:25 b.txt.gyy
-rw-r--r-- 1 root root 11 12月 23 16:25 c.txt.gyy
-rw-r--r-- 1 root root 11 12月 23 16:25 d.txt.gyy
注意:因为每次监听都会更名,因此再次监听的文件名不能与之前的名字重复。

案例5)spool+file+hdfs

1)方案的编写
[root@hadoop01 ~]# vim flumeconf/spool-file-hdfs.properties
#命名,并关联
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
​
#设置spool源
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/root/data/subdir
a1.sources.r1.fileSuffix=.gyy
a1.sources.r1.deletePolicy=never
a1.sources.r1.fileHeader=false
a1.sources.r1.fileHeaderKey=file
a1.sources.r1.basenameHeader=false
a1.sources.r1.basenameHeaderKey=basename
a1.sources.r1.batchSize=100
a1.sources.r1.inputCharset=UTF-8
a1.sources.r1.bufferMaxLines=1000
​
#设置file的channel
a1.channels.c1.type=file
​
#设置hdfs的sink
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://hadoop01:8020/flume/hdfs/%Y
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s1.hdfs.filePrefix=michael
a1.sinks.s1.hdfs.fileSuffix=.gyy
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.round=false
a1.sinks.s1.hdfs.roundValue=2
a1.sinks.s1.hdfs.roundUnit=minute
2)启动采集方案
[root@hadoop01 ~]# flume-ng agent  -f ./flumeconf/spool-file-hdfs.properties -n a1  -Dflume.root.logger=INFO,console
3)测试
[root@hadoop01 subdir]# echo "helloworld" >>e.txt
[root@hadoop01 subdir]# echo "helloworld" >>f.txt
[root@hadoop01 subdir]# echo "helloworld" >>g.txt

案例6)http+memory+logger

1)采集方案的编写
[root@hadoop01 ~]# vim flumeconf/http-mem-logger.properties
#list name of three core
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
​
#设置每个组件的接口以及属性
a1.sources.r1.type=http
#该源要监听的host或者是ip
a1.sources.r1.bind=hadoop01
#该源要监听的port
a1.sources.r1.port=10086
a1.sources.r1.handler=org.apache.flume.source.http.JSONHandler
​
​
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
​
a1.sinks.s1.type=logger
a1.sinks.s1.maxBytesToLog=32
2)启动方案
[root@hadoop01 ~]# flume-ng agent  -f ./flumeconf/http-mem-logger.properties -n a1  -Dflume.root.logger=INFO,console
3)使用curl指令发送post协议进行测试
[root@hadoop03 ~]# curl -X POST -d '[{"headers":{"girlfriend1":"zhangjunning","girlfriend":"nazha"},"body":"they are my girlfriends"}]' http://hadoop01:10086
​
​
解析:
-X  用来指定http的请求方式,如post或者是get
-d  用来模拟要发送的数据
第三个参数表示要将数据发送到的地址。

案例7)syslogtcp+memory+logger

1)采集方案的编写
[root@hadoop01 ~]# vim flumeconf/syslogtcp-mem-logger.properties
#list name of three core
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
​
#设置每个组件的接口以及属性
a1.sources.r1.type=syslogtcp
a1.sources.r1.host=hadoop01
a1.sources.r1.port=10086
a1.sources.r1.eventSize=2500
​
​
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
​
a1.sinks.s1.type=logger
a1.sinks.s1.maxBytesToLog=32
2)启动方案
[root@hadoop01 ~]# flume-ng agent  -f ./flumeconf/syslogtcp-mem-logger.properties -n a1  -Dflume.root.logger=INFO,console
3)使用nc指令来发送tcp协议,进行测试
先安装nc指令:yum -y install nmap-ncat
[root@hadoop03 ~]#  echo "helloworld" | nc hadoop01 10086
nc的语法:   nc  host  port

案例8)taildir+memory+hdfs
taildir与spooling这两个源的比较

  • 相同点:

    1. 都是可靠源
    2. 监听的都是目录
    3. 该目录一定要提前创建
  • 不同点:
    1. spooling监听完的文件会被重命名
    2. spooling监听的目录里的文件不能重名
    3. spooling监听的是目录里的新文件
    4. taildir监听的文件不会被重命名,可以一直监听文件里的新行。
1)采集方案的编写
[root@hadoop01 ~]# vim flumeconf/taildir-mem-hdfs.properties
#命名,并关联
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
​
#设置spool源
a1.sources.r1.type=TAILDIR
#设置要监听的文件所属组,可以设置多个
a1.sources.r1.filegroups=g1 g2
#规定每一个组要监听的文件的绝对路径,可以使用正则表达式来表示一批文件
a1.sources.r1.filegroups.g1=/root/data/dir2/.*.txt
a1.sources.r1.filegroups.g2=/root/data/dir2/.*.csv
#a1.sources.r1.positionFile=/root/taildir_position.json
​
#设置channel组件的属性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
​
#设置hdfs的sink
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://hadoop01:8020/flume/hdfs/%Y-%m
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s1.hdfs.filePrefix=michael
a1.sinks.s1.hdfs.fileSuffix=.gyy
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=2
a1.sinks.s1.hdfs.roundUnit=minute
2)启动方案
[root@hadoop01 ~]# flume-ng agent  -f ./flumeconf/taildir-mem-hdfs.properties -n a1  -Dflume.root.logger=INFO,console
3)测试
目录要提前创建出来
[root@hadoop01 dir2]# echo "chenyun" > a.txt
[root@hadoop01 dir2]# echo "chenyun" >> a.txt
[root@hadoop01 dir2]# echo "chenyun" >> a.csv
[root@hadoop01 dir2]# echo "chenyun" >> a.csv
[root@hadoop01 dir2]# echo "chenyun" >> a.csv
[root@hadoop01 dir2]# echo "chenyun" >> a.csv
[root@hadoop01 dir2]# echo "chenyun" >> a.json   # 不会被采集的

Flume-----八种采集方案相关推荐

  1. 优化if-else代码的八种方案

    前言 代码中如果if-else比较多,阅读起来比较困难,维护起来也比较困难,很容易出bug,接下来,本文将介绍优化if-else代码的八种方案. 优化方案一:提前return,去除不必要的else 如 ...

  2. if else if else语句格式_if-else代码优化的八种方案

    作者:Jay_huaxiao来源:掘金 链接:https://juejin.im/post/5e5fa79de51d45271849e7bd 前言 代码中如果if-else比较多,阅读起来比较困难,维 ...

  3. Flume安装部署,采集方案配置文件编写案例,启动agent采集数据

    1.2 Flume实战案例 1.2.1 Flume的安装部署 1.Flume的安装非常简单,只需要解压即可,当然,前提是已有hadoop环境 上传安装包到数据源所在节点上 然后解压 tar -zxvf ...

  4. Python 实现定时任务的八种方案,定时发微信

    import datetime import timedef time_printer():now = datetime.datetime.now()ts = now.strftime("% ...

  5. if-else代码优化的八种方案

    目录 前言 优化方案一:提前return,去除不必要的else 优化方案二:使用条件三目运算符 优化方案三:使用枚举 优化方案四:合并条件表达式 优化方案五:使用 Optional 优化方案六:表驱动 ...

  6. android 加载动画素材,八种APP启动界面的Loading进度条设计动效方案

    在移动端APP应用中,从用户点击图标那一瞬间到用户进入主界面这段过程,同样也决定用户对该APP应用的第一印象,如何让用户产生好感并快速熟悉应用是这一阶段重点考虑的问题.这一过程是否给用户留下好的第一饮 ...

  7. 一种低成本的兰吉尔电表电量采集方案

    目前,全国工业企业的能耗在线监测项目建设正在大力推进中,其中电力企业(火电.水电.新能源电厂等)的电力计量表具很多采用了瑞士兰吉尔的智能电表,兰吉尔电表支持通信模式较多,光电通信口支持DLMS,有线通 ...

  8. 面试官:说出八种消息队列的应用场景。啊?八种?

    本文来源于公众号:胖滚猪学编程.转载请注明出处! 一个风度翩翩,穿着格子衬衣的中年男子,拿着一个满是划痕的mac向她走来,看着铮亮的头,胖滚猪心想,这肯定是尼玛顶级架构师吧!完了要挂了. 结果面试官第 ...

  9. java 数据类型转换的一场_Java基础 — 四类八种基本数据类型

    整型:整数类型int 一般的数据. long 极大的数据. short 用于特定的场合,比如底层的文件处理或者需要控制占用存储单元空间量的大数组. byte 用于特定的场合,比如底层的文件处理或者需要 ...

最新文章

  1. MVC+Ninject+三层架构+代码生成 -- 总结(四、數據層)
  2. TCP和UDP套接字编程
  3. Bumblebee微服务网关之请求统一验证
  4. 怎样打造一个分布式数据库
  5. 【HTML基础】表格和表单
  6. html5有本地存储吗,HTML5的本地存储
  7. MySQL中快速复制数据表方法汇总
  8. android 编译api,Android逆向利器,直接将apk转换为可二次开发Android工程,提供So hook Api,......
  9. 十天征服单片机百度云_郭天祥十天征服单片机视频下载地址
  10. Linux内核之——等待队列wait queue
  11. 建模语言UML在软件开发中的应用
  12. 夜光带你走进python开发 (十七)传奇语言
  13. Android免费小说阅读器—程序员自己的阅读器,没广告,所有小说可搜索,专注阅读体验
  14. 如何理解卷积(Convolution)?
  15. Call From xxx/127.0.1.1 to xxx:9000 failed on connection拒绝连接部分解决办法
  16. AUTOMATE THE BORING STUFF WITH PYTHON读书笔记 - 第8章:INPUT VALIDATION
  17. 关于 某讯QQ群的群文件上传和下载出现错误-134 的解决方法
  18. 如何将Dicom系列转换为一个Nifti文件(Python)
  19. Unity 变体探秘
  20. 付款条件(Payment Term)

热门文章

  1. 安卓学习笔记5.3 按钮、图像视图与图像按钮
  2. 哲理故事与管理之道 20 -用危机激励下属
  3. new(创建)一个对象时都发生了什么?
  4. Mybatis 源码分析(一)
  5. 刷题刷题(个人记录)
  6. 差商matlab编程,Matlab数值计算差商与插值
  7. 基础算法(三) --- 轮询
  8. matlab fir1 filter,Matlab滤波器设计
  9. SpringBoot Mybatis注解调用Mysql存储过程并接收多个OUT结果集(多个mode=IN和mode=OUT参数)
  10. php+点击图片跳转网页,怎么在图片上加超链接 点击图片跳转到指定网页