flume java 安装部署_[Hadoop] Flume安装部署与简单使用
1. Flume
Flume是一个分布式的日志收集框架,针对日志数据进行采集汇总,把日志从A地方搬运到B地方去。
使用场景:
RDBMS ==> Sqoop ==> Hadoop
分散在各个服务器上的日志 ==> Flume ==> Hadoop
Flume三大组件:
collecting 采集 source
aggregating 聚合 channel (找个地方把采集过来的数据暂存下)
moving 移动 sink
Flume开发: 编写配置文件,组合source、channel、sink三者之间的关系。而Agent就是由source、channel、sink组成,所以编写flume的配置文件其实就是配置agent的构成。
2. Flume 安装部署
根据官方文档描述,市面上的Flume主流版本有两个:0.9.x and 1.x。这两个版本差异非常非常大,旧版本已经被淘汰了,要用的话就使用新版本。当然本文中既定版本为cdh的ng版本(flume-ng-1.6.0-cdh5.7.0)。
系统要求:
下载安装:
# 下载解压
[hadoop@hadoop01 software]$ wget http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.7.0.tar.gz
[hadoop@hadoop01 software]$ tar -zxvf flume-ng-1.6.0-cdh5.7.0.tar.gz -C ~/app/
# 配置环境变量(使用root用户)
[root@hadoop01 ~]# vi /etc/profile
export FLUME_HOME=/home/hadoop/app/apache-flume-1.6.0-cdh5.7.0-bin
export PATH=$FLUME_HOME/bin:$PATH
[root@hadoop01 ~]# source /etc/profile
# 修改配置文件
[hadoop@hadoop01 software]$ cd ~/app/apache-flume-1.6.0-cdh5.7.0-bin/
[hadoop@hadoop01 apache-flume-1.6.0-cdh5.7.0-bin]$ cp conf/flume-env.sh.template conf/flume-env.sh
[hadoop@hadoop01 apache-flume-1.6.0-cdh5.7.0-bin]$ vi conf/flume-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_45
# 验证安装完成
[hadoop@hadoop000 ~]$ flume-ng version
Flume 1.6.0-cdh5.7.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 8f5f5143ae30802fe79f9ab96f893e6c54a105d1
Compiled by jenkins on Wed Mar 23 11:38:48 PDT 2016
From source with checksum 50b533f0ffc32db9246405ac4431872e
[hadoop@hadoop000 ~]$
3. Flume 使用
概念解释
Flume的使用包含两个步骤:1-建立一个agent(写配置文件);2-启动agent。
(1) agent配置文件名词解释
a1:,agent的名称
r1:
k1:
c1:
(2) Flume支持的常用source、channel、sink有哪些
source
-- avro ==> 监听一个指定的Avro端口,通过Avro端口可以获取到Avro client发送过来的文件。只要应用程序通过Avro端口发送文件,source组件就可以获取到该文件中的内容(同thrift类似,都是一种RPC服务框架)
-- exec ==> 监听一个指定的命令,获取一条命令的结果作为它的数据源。常用的是tail -F file指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容。
-- Spooling Directory ==> 监听一个指定的目录,只要应用程序向这个指定的目录中添加新的文件,source组件就可以获取到该信息,并解析该文件的内容,然后写入到channel。写入完成后,标记该文件已完成或者删除该文件。已完成的文件不能再更新,否则source组件监控不到;也不能向文件夹下放入文件名相同的文件,且文件夹下面不能有子文件夹。
-- Taildir ==> 相当于前面两个的整合,既可以监控文件也可以监控文件夹,生产上95%以上都是这个场景
-- netcat ==> 监听一个指定的网络端口,只要应用程序向这个端口里面写数据,这个source组件就可以获取到信息。
sink
-- hdfs
-- logger ==> 表示打印到控制台
-- avro ==> 配合avro source使用
-- kafka
channel
-- memory ==> 数据过来会存在内存的队列里面
-- file ==> 数据过来存在本地文件
配置agent,就是各种组合source、channel、sink之间的关系。source出来可以去多个channel,一个sink只能对应一个channel。
(3) 需求分析过程
-- 需求:把一个文件中新增的内容收集到HDFS上去
-- 分析:exec - memory - hdfs
-- 需求:一个文件夹
-- 分析:spooling -memory - hdfs
-- 需求:文件数据写入kafka
-- 分析:exec - memory - kafka
其中,ETL过程可以放在:exec - memory - hdfs ==> spark/hive/mr ETL ==> hdfs(新的位置) <== 分析。Flume的定位是日志采集,数据清洗的过程不由flume完成,数据丢失了无法处理。
应用实例
3.1 需求一:从指定的网络端口上采集日志到控制台输出
3.1.1 建立一个agent
# 新建一个agent配置文件,从官网拷贝示例代码到文件中来修改
[hadoop@hadoop000 flume]$ pwd
/home/hadoop/script/flume
[hadoop@hadoop000 flume]$ vi simple-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0 # 改成本机IP地址
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3.1.2 启动agent
# 启动Agent,注意这条命令下面带了三个-Dflume参数,第一个是指定日志级别,第二个和第三个是指定http监控形式和端口号,这些参数都可以不带
[hadoop@hadoop000 flume]$ flume-ng agent \
> --name a1 \
> --conf $FLUME_HOME/conf \
> --conf-file /home/hadoop/script/flume/simple-flume.conf \
> -Dflume.root.logger=INFO,console \
> -Dflume.monitoring.type=http \
> -Dflume.monitoring.port=34343
...
2018-07-29 15:33:49,578 (conf-file-poller-0) [INFO - org.mortbay.log.Slf4jLog.info(Slf4jLog.java:67)] Started SelectChannelConnector@0.0.0.0:34343
...
# 开启另外一个终端窗口,telnet 本机44444端口,向该端口发送一些数据
[root@hadoop000 ~]# telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
hello
OK
flume
OK
# 此时回到flume agent的启动窗口,可以看到44444端口发送的数据已经回显到控制台上,成功被监听到
2018-07-29 15:42:03,785 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 0D hello. }
2018-07-29 15:42:03,791 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 66 6C 75 6D 65 0D flume. }
# 其中,返回日志对象为Event:一个Event代表一条数据。格式为:Event: { headers + body(字节数组) }
至此,flume的一个简单实例完成,它实现了从指定的网络端口(44444)上采集日志到控制台(logger)输出的整个过程。
3.2 需求二:采集指定文件的内容到HDFS。技术选型:exec - memory - hdfs
3.2.1 建立一个agent
[hadoop@hadoop000 flume]$ cat exec-memory-hdfs.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/data.log
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.1.8:9000/tmp/flume
a1.sinks.k1.hdfs.batchSize = 10 # 集满多少条数据就刷新到hdfs上去,默认是100
a1.sinks.k1.hdfs.fileType = DataStream # 默认是SequenceFile,如果是压缩的要设置CodeC
a1.sinks.k1.hdfs.writeFormat = Text
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3.2.2 启动agent
[hadoop@hadoop000 flume]$ flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/exec-memory-hdfs.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=34343
# 另起一个窗口,在/home/hadoop/data/下新建一个data.log文件,并且向里面写入1行数据
[hadoop@hadoop000 data]$ touch data.log
[hadoop@hadoop000 data]$ echo flume >> data.log
# 查看flume agent窗口日志,flume在hdfs上create了一个临时文件,是以.tmp结尾的
2018-07-31 21:10:15,338 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:58)] Serializer = TEXT, UseRawLocalFileSystem = false
2018-07-31 21:10:15,695 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:234)] Creating hdfs://192.168.1.8:9000/tmp/flume/FlumeData.1533042615339.tmp
# 继续向data.log里面写入数据,当data.log日志行数<10时,flume agent没有新的日志输出,临时文件名也没有发生变化。
# 当第10行数据写入时,agent日志有新的输出:flume刚才创建的.tmp文件关闭,并且把它rename成了一个正式文件,不带.tmp后缀
2018-07-31 20:36:45,296 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:363)] Closing hdfs://192.168.1.8:9000/tmp/flume/FlumeData.1533040575186.tmp
2018-07-31 20:36:45,311 (hdfs-k1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:629)] Renaming hdfs://192.168.1.8:9000/tmp/flume/FlumeData.1533040575186.tmp to hdfs://192.168.1.8:9000/tmp/flume/FlumeData.1533040575186
# 继续向data.log里面写入数据,fulme又在hdfs上面新建了一个临时文件,.tmp结尾。再次重复前面的过程。data.log写满第2个10条数据时,再次关闭文件,重命名成正式文件。这是因为,我们在agent配置里面定义了a1.sinks.k1.hdfs.batchSize = 10,每10条数据一个单位,满10条就转存一次。
# 查看hdfs
[hadoop@hadoop000 data]$ hdfs dfs -ls /tmp/flume
Found 2 items
-rw-r--r-- 1 hadoop supergroup 6 2018-07-31 20:40 /tmp/flume/FlumeData.1533040803755
-rw-r--r-- 1 hadoop supergroup 4 2018-07-31 20:45 /tmp/flume/FlumeData.1533041152328.tmp
生产中,我们绝对不会把hdfs.batchSize设为10,因为hdfs最怕小文件。应该根据实际情况来定。
3.3 需求三:采集指定文件夹的内容到控制台。技术选型:spooling - memory - logger
3.3.1 建立一个agent
[hadoop@hadoop000 flume]$ cat spooling-memory-logger.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/tmp/flume
a1.sources.r1.fileHeader = true
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[hadoop@hadoop000 flume]$
3.3.2 启动agent
# 启动agent
[hadoop@hadoop000 flume]$ flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/spooling-memory-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=34343
# 另起一个窗口,向/home/hadoop/tmp/flume/放一个文件进来
[hadoop@hadoop000 flume]$ pwd
/home/hadoop/tmp/flume
[hadoop@hadoop000 flume]$ cp ~/data/emp.txt .
# agent窗口日志
2018-07-31 21:44:33,180 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:258)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2018-07-31 21:44:33,181 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:348)] Preparing to move file /home/hadoop/tmp/flume/emp.txt to /home/hadoop/tmp/flume/emp.txt.COMPLETED
2018-07-31 21:44:35,781 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{file=/home/hadoop/tmp/flume/emp.txt} body: 37 33 36 39 09 53 4D 49 54 48 09 43 4C 45 52 4B 7369.SMITH.CLERK }
2018-07-31 21:44:35,781 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{file=/home/hadoop/tmp/flume/emp.txt} body: 37 34 39 39 09 41 4C 4C 45 4E 09 53 41 4C 45 53 7499.ALLEN.SALES }
2018-07-31 21:44:35,781 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{file=/home/hadoop/tmp/flume/emp.txt} body: 37 35 32 31 09 57 41 52 44 09 53 41 4C 45 53 4D 7521.WARD.SALESM }
2018-07-31 21:44:35,781 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{file=/home/hadoop/tmp/flume/emp.txt} body: 37 35 36 36 09 4A 4F 4E 45 53 09 4D 41 4E 41 47 7566.JONES.MANAG }
# 当监听路径(/home/hadoop/tmp/flume)下刚刚拷贝进去的文件被处理完成时,flume会把刚丢进去的文件重命名,在文件名后面加一个.COMPLETED的后缀
[hadoop@hadoop000 flume]$ ls -ltr
total 4
-rw-r--r--. 1 hadoop hadoop 700 Jul 31 21:44 emp.txt.COMPLETED
[hadoop@hadoop000 flume]$
注意:不能向该文件夹下再次丢入文件名相同的文件,即要保证这个文件的原子性!而且已经完成的文件不能再做修改,修改了flume是识别不到的。
3.4 需求四:同时监控文件和文件夹。技术选型:taildir - memory - logger
生产上95%以上都是这个场景。
3.4.1 建立一个agent
[hadoop@hadoop000 flume]$ cat taildir-memory-logger.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/hadoop/tmp/flume/position/taildir_position
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /home/hadoop/tmp/flume/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /home/hadoop/tmp/flume/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3.4.2 启动agent
# 启动agent
./flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/taildir-memory-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=34343
# 向test1文件夹下的example.log文件里写数据
[hadoop@hadoop000 flume]$ pwd
/home/hadoop/tmp/flume
[hadoop@hadoop000 flume]$ cd test1
[hadoop@hadoop000 test1]$ echo aaa >> example.log
[hadoop@hadoop000 test1]$ echo aaa >> example.log
[hadoop@hadoop000 test1]$ echo aaa >> example.log
[hadoop@hadoop000 test1]$ echo aaa >> example.log
[hadoop@hadoop000 test1]$ echo aaa >> example.log
[hadoop@hadoop000 test1]$
# agent这边已经有收到写入的数据
2018-07-31 22:09:52,578 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{headerKey1=value1} body: 61 61 61 aaa }
2018-07-31 22:10:07,580 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{headerKey1=value1} body: 61 61 61 aaa }
2018-07-31 22:10:07,581 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{headerKey1=value1} body: 61 61 61 aaa }
2018-07-31 22:10:07,581 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{headerKey1=value1} body: 61 61 61 aaa }
# 这时,test1下面的complete.log文件不会被重命名成完成状态,它一直还在被监听。flume记录了该文件的一个偏移量到taildir_position这个文件中。如果文件被追加新的内容,新的内容也会被flume监听到。
[hadoop@hadoop000 test1]$ ls -ltr
total 4
-rw-rw-r--. 1 hadoop hadoop 20 Jul 31 22:10 example.log
[hadoop@hadoop000 test1]$ cat ../position/taildir_position
[{"inode":91165116,"pos":20,"file":"/home/hadoop/tmp/flume/test1/example.log"}]
# 下面再来测试监控文件夹test2,向test2文件夹内拷入一些.log结尾的文件
[hadoop@hadoop000 test1]$ cd ../
[hadoop@hadoop000 flume]$ cd test2
[hadoop@hadoop000 test2]$ cp ../test1/example.log 1.log
[hadoop@hadoop000 test2]$ cp ../test1/example.log 2.log
[hadoop@hadoop000 test2]$ cp ../test1/example.log 3.log
[hadoop@hadoop000 test2]$
# agent控制台也收到了所有拷贝进去的文件的所有行的数据
2018-07-31 22:17:12,018 (PollableSourceRunner-TaildirSource-r1) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile(ReliableTaildirEventReader.java:300)] Opening file: /home/hadoop/tmp/flume/test2/1.log, inode: 145875746, pos: 0
2018-07-31 22:17:15,022 (PollableSourceRunner-TaildirSource-r1) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile(ReliableTaildirEventReader.java:300)] Opening file: /home/hadoop/tmp/flume/test2/2.log, inode: 145875749, pos: 0
2018-07-31 22:17:15,646 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{headerKey1=value2, headerKey2=value2-2} body: 61 61 61 aaa }
2018-07-31 22:17:15,646 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{headerKey1=value2, headerKey2=value2-2} body: 61 61 61 aaa }
# 再次查看偏移量,此时文件里面新增了刚放进去test2路径下的所有文件的偏移量。当拷贝进去的文件被更新,flume同样可以监听到,且偏移量会再次被刷新。
[hadoop@hadoop000 test2]$ cat ../position/taildir_position
[{"inode":91165116,"pos":20,"file":"/home/hadoop/tmp/flume/test1/example.log"},{"inode":145875746,"pos":20,"file":"/home/hadoop/tmp/flume/test2/1.log"},{"inode":145875749,"pos":20,"file":"/home/hadoop/tmp/flume/test2/2.log"},{"inode":145875750,"pos":20,"file":"/home/hadoop/tmp/flume/test2/3.log"},{"inode":145875751,"pos":20,"file":"/home/hadoop/tmp/flume/test2/4.log"}]
[hadoop@hadoop000 test2]$
注意:如果使用的是apache 1.6版本的flume,是没有偏移量这个功能的。
问题:
问题1. Flume如何解决小文件的问题?
hdfs.rollInterval、hdfs.rollSize和hdfs.rollCount这三个参数配合使用来做小文件滚动控制:
hdfs.rollSize应设置为差不多一个block大小(128M)
然后把hdfs.rollInterval和hdfs.rollCount设置成业务需求可以接受值(比如说hdfs.rollInterval 10分钟,hdfs.rollCount 1000000条),这样就可以避免小文件的产生。
其中:
hdfs.rollInterval:默认值30,每30秒滚动成目标文件。设置成0表示不根据时间来滚动文件。
hdfs.rollSize:默认值1024,当临时文件达到1024 bytes时,滚动成目标文件。设置成0表示不根据临时文件大小来滚动文件。
hdfs.rollCount:默认值10,当event数量达到10条时,将临时文件滚动成目标文件,设置成0表示不根据event数量来滚动文件。
hdfs.idleTimeout:默认值0。当超时时间达到时,非活动文件自动关闭。设置成0表示不自动关闭。
hdfs.batchSize:默认值100,当event数量达到100条时,将文件刷新到hdfs。
# 基于上面的示例二来做改进:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/data.log
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.1.8:9000/tmp/flume
a1.sinks.k1.hdfs.batchSize = 10
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 10240
a1.sinks.k1.hdfs.rollCount = 0
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
这样就可以控制写入文件的自动滚动成目标文件,避免了太多小文件产生。
问题2. 如何保证flume传输数据100%不丢失?
每个文件做md5 放到redis 里面,传输完成后检验完整性,校验完成后删除redis里面的数据。
3.5 需求五:采集指定文件的内容到HDFS,并且分区存储
技术选型:exec - memory - hdfs
3.5.1 建立一个agent
[hadoop@hadoop01 flume]$ vi exec-memory-hdfs-partition.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/data.log
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.1.8:9000/tmp/flume/page_views/%Y%m%d%H%M
a1.sinks.k1.hdfs.batchSize = 10
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 10485760 # 10M
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.firePrefix = page-views
a1.sinks.k1.hdfs.round = true # 用到了时间戳,所以这里要设置按照时间戳来滚动
a1.sinks.k1.hdfs.roundValue = 1 # 1分钟滚动,即1分钟形成一个文件夹
a1.sinks.k1.hdfs.roundUnit = minute
# a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3.5.1 启动agent
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/exec-memory-hdfs-partition.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=34343
最终hdfs存储的结果格式为:
/tmp/flume/page_views/201808041523
/tmp/flume/page_views/201808041524
/tmp/flume/page_views/201808041525
**注意:**
1. 不论是学习还是工作中应用,最好是选择cdh等集成版本,因为这种集成版本已经处理好了各个框架之间的兼容性等复杂问题,使用相同cdh版本的框架一起配合使用的时候(比如flume搭配spark steaming),出现问题的几率会小很多;
2. flume每个版本之间的差异非常大,查文档时一定要查询指定版本的文档;
3. 当生产环境需要用到更高版本的flume的某种特性时,最方便的处理方式是去高版本flume源码里面把对应新特性的那一部分源码抠下来贴到自己当前版本的源码中来编译。
flume java 安装部署_[Hadoop] Flume安装部署与简单使用相关推荐
- openshift 部署_在OpenShift上部署Java EE微服务
openshift 部署 我昨天用WildFly Swarm在博客上发布了有关简单JAX-RS微服务的博客. 您学习了如何使用Maven构建所谓的"胖子",还使用Maven Doc ...
- python安装方法_听说你安装Python包很慢,试试这个方法
使用Python有快五年了,最近这一年多,经常听到大家说在安装python第三方包很慢很慢.比如这速度,每秒十几kb而网络正常的情况下,pip下载的速度至少应该每秒几百kb才对,甚至还有可能更快,比如 ...
- directplay需要安装吗_燃气报警器需要安装吗这里告诉你
一:燃气报警器的作用 燃气报警器的核心是气体传感器,俗称"电子鼻".这是一个独特的电阻,当"闻"到燃气时,传感器电阻随燃气浓度而变化.燃气达到一定浓度.电阻达到 ...
- docker选择安装位置_监控摄像机的安装位置选择和焦距选择
监控探头的清晰度,越来越高,但是如果安装位置和焦距选择不恰当,那最终的监控图像的效果往往不尽人意.本节文章,介绍一下如果避开不恰当的安装位置.如何选择合适的焦距. 安装位置选择 1:避免背光和过曝 安 ...
- 3不能安装库_不锈钢水槽如何安装?3个细节要注意,不能忽视,别被套路了
不锈钢水槽如何安装?3个细节要注意,不能忽视,别被套路了 随着时代不断的进步.人们生活水平也是不断的提高.生活用品有时间来选,慢慢的步入了我们的日常中.你们觉得家中最重要的地方是什么呢?其实就是厨房, ...
- 将ubuntu光盘作为安装源_从光盘安装ubuntu
我本人也是照着PCPOP上的这个教程安装的ubuntu,这个教程很详细,就直接拿过来贴了跟大家分享分享.这篇文章的作者还写了怎么样从硬盘安装...... 刻录镜像完成后,将你的电脑设为光盘启动并放进U ...
- kali安装步骤失败 选择并安装软件_手机软件安装失败?吉米柚教你几招!
不少使用安卓手机的朋友,经常遇到安装软件时出现安装失败的情况,尤其是手机用久后可能会出现这一问题,千万不要以为是手机坏掉了....吉米柚帮你整理了几种方法解决这个问题~ 随着手机系统的版本更新,软件商 ...
- Acer 4750 安装黑苹果_黑苹果全套安装教程!
制作 | 整理 | 排版 | 测试 | @阿雷 未经许可,谢绝转载 每天分享的资源尽量满足大部分人的需求 大家好,我是阿雷,最近给我的好几台电脑都安装上了黑苹果!黑苹果就是在普通的windows上安装 ...
- nodejs安装不好_【nodejs安装错误2503】nodejs安装2503_nodejs 2503-系统城
2017-12-27 14:50:11 浏览量:959 最近,一位windows10系统用户反馈自己在电脑中安装nodejs时,突然遇到提示错误代码2503,因为错误提示是英文界面所有不知道到底是那里 ...
最新文章
- java实现随机验证码的图片
- python算法与数据结构-二叉树的代码实现(46)
- logback.xml配置详解
- 只需单插槽的空间,即可拥有极致的视觉计算性能
- js中用script 嵌套script块
- linux下图像分析程序,三 - Linux+C语言:数字图像处理源程序_Linux编程_Linux公社-Linux系统门户网站...
- tidyr | 对数据框分行或分列进行嵌套操作
- kaggle数据挖掘竞赛Home Credit Default Risk讲解
- 右键新建Excel时如何设定其版本即.xlsx转.xls格式
- ubuntu proxy
- DXUT框架剖析(8)
- 鸿蒙os2.0电脑版,鸿蒙系统2.0PC版
- jmeter安装配置教程
- SQLite实现在线电子词典
- 远区场matlab仿真,matlab结题报告(电偶极子的辐射场)博客_0.doc
- pytorch实现自己制作训练集和测试集
- CC2430 CC2530 AD转换分辨率之“争”
- ALS推荐算法学习总结
- linux安装多路径软件,IBM服务器多路径软件RDAC安装详解
- 案例分析|戴森如何以DTC全渠道营销打造百亿规模增长
热门文章
- React Native 开发豆瓣评分(五)屏幕适配方案
- AttributeError: 'NoneType' object has no attribute 'append'
- Struts2之类型转换器
- 在你的网站中使用 AdSense广告
- WIN8系统中 任务管理器 性能栏 显示CPU利用率(已暂停)怎么回事?
- php访问方法外变量
- [凯立德]2013.12.17凯立德发布秋季版(2F21J0E)最新增量包SP1
- C:\WINDOWS\system32\drivers\etc\hosts
- [转载] Python - filter()用法
- [转载] python while循环 打印菱形