大数据之flume数据采集
Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。
它可以采集文件,socket数据包等各种形式源数据,又可以将采集到的数据输出到HDFS、hbase、hive、kafka等众多外部存储系统中。
一、flume结构
Flume分布式系统中最核心的角色是agent,每一个agent相当于一个数据传递员,内部有三个组件:
Source: 采集源,用于跟数据源对接,以获取数据;
Channel : angent内部的数据传输通道,用于从source将数据传递到sink。
Sink::下沉地,采集数据的传送目的,用于往下一级agent传递数据或者往最终存储系统传递数据;
数据在flume内部以Event的封装形式存在。
flume的事务控制机制:
1、source到channel
2、channel到sink
二、Flume多个agent串联
三、Flume安装使用(未安装)
1、上传安装包,解压
2、执行脚本,模拟日志生产
while true; do echo 111111111111111111111111_$RANDOM >> access.log; sleep 0.2; done
案例一、采集端口数据
1、增加netcat-logger.conf
# Name the components on this agent
#给那三个组件取个名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
#类型, 从网络端口接收数据,在本机启动, 所以localhost, type=spoolDir采集目录源,目录里有就采
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
#下沉的时候是一批一批的, 下沉的时候是一个个eventChannel参数解释:
#capacity:默认该通道中最大的可以存储的event数量
#trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2、启动
$ bin/flume-ng agent --conf conf --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console
3、传入数据:
$ telnet localhost 44444
案例二、采集文件夹数据
1、增加spooldir-hdfs.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
#监听目录,spoolDir指定目录, fileHeader要不要给文件夹前坠名
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/flumespool
a1.sources.r1.fileHeader = true# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2、启动
bin/flume-ng agent -c ./conf -f ./conf/spool-logger.conf -n a1 -Dflume.root.logger=INFO,console
3、传入数据:
往/home/hadoop/flumeSpool放文件
案例三:采集文件数据(方式一)
exec source 适用于监控一个实时追加的文件,没有偏移量,会出现数据丢失情况;
1、增加tail-hdfs.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# exec 指的是命令
a1.sources.r1.channels = c1
a1.sources.r1.type = exec
# F根据文件名追中, f根据文件的nodeid追踪,即使换了文件名,也能跟踪到
a1.sources.r1.command = tail -F /home/hadoop/log/test.log#下沉目标
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
# 指定目录, flum帮做目的替换
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
#文件的命名, 前缀
a1.sinks.k1.hdfs.filePrefix = events-#10 分钟就改目录
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute#文件滚动之前的等待时间(秒)
a1.sinks.k1.hdfs.rollInterval = 120
#文件滚动的大小限制(bytes)
a1.sinks.k1.hdfs.rollSize = 268435456
#写入多少个event数据后滚动文件(事件个数)
a1.sinks.k1.hdfs.rollCount = 20#1000个事件就往里面写入
a1.sinks.k1.hdfs.batchSize = 1000#用本地时间格式化目录
a1.sinks.k1.hdfs.useLocalTimeStamp = true#下沉后, 生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
2、启动命令
bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1
案例四:采集文件数据(方式二)
taildir Source 既能够实现断点续传,又可以保证数据不丢失,还能够进行实时监控。为了防止00:00的时候,今日的数据写到明日,在sink处增加拦截器,给数据一个时间戳,不使用节点机器上时间。
tail dir 是根据通配符监视多个文件,即使文件改了名,也不会重复采集,它是根据偏移量进行跟踪的;
1、增加tail-hdfs.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1a1.sources.r1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups=g1
a1.sources.r1.filegroups.g1= /logdata/a.*
a1.sources.r1.fileHeader = true# 加入拦截器
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = timestamp
a1.sources.s1.interceptors.i1.headerName= timestamp#下沉目标
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
# 指定目录, flum帮做目的替换
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
#文件的命名, 前缀
a1.sinks.k1.hdfs.filePrefix = events-#10 分钟就改目录
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute#文件滚动之前的等待时间(秒)
a1.sinks.k1.hdfs.rollInterval = 120
#文件滚动的大小限制(bytes)
a1.sinks.k1.hdfs.rollSize = 268435456
#写入多少个event数据后滚动文件(事件个数)
a1.sinks.k1.hdfs.rollCount = 20#1000个事件就往里面写入
a1.sinks.k1.hdfs.batchSize = 1000#用本地时间格式化目录
a1.sinks.k1.hdfs.useLocalTimeStamp = false#下沉后, 生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
2、启动命令
bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1
案例五、flume级联操作
使用前景:复杂的网络或者日志服务器特别多,每台服务器流量不多,需要进行汇集;
需要写两个配置文件,分别放在两个机器上,一个当发送者,一个当收集者(Kafka为例)
1、编写tail-avro-.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1a1.sources.r1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = g1
a1.sources.r1.filegroups.g1 = /logdata/a.*
a1.sources.r1.fileHeader = falsea1.channels.c1.type = filea1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = doitedu02
a1.sinks.k1.port = 4444
2、编写avro-fakfa.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = doitedu02
a1.sources.r1.port = 4444
a1.sources.r1.batchSize = 100a1.channels.c1.type = filea1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = doitedu01:9092,doitedu02:9092,doitedu03:9092
a1.sinks.k1.kafka.topic = doitedu17
a1.sinks.k1.kafka.producer.acks = 1
2、先启动avro-fakfa.conf,再启动tail-avro-.conf
bin/flume-ng agent -c conf -f conf/avro-fakfa.conf -n al -Dflume.root.logger=INFO,consolebin/flume-ng agent -c conf -f conf/tail-avro-.conf -n a1
3、kafka基本命令
## topic查看
bin/kafka-topics.sh --list --zookeeper doitedu01:2181## topic创建
bin/kafka-topics.sh --create --topic topic2 --partitions 2 --replication-factor 2 --zookeeper doitedu01:2181## 启动一个控制台生产者来生产数据
bin/kafka-console-producer.sh --broker-list doitedu01:9092,doitedu02:9092,doitedu03:9092 --topic topic2
>hello tom## 启动一个控制台消费者来消费数据
bin/kafka-console-consumer.sh --bootstrap-server doitedu01:9092,doitedu02:9092,doitedu03:9092 --topic topic2 --from-beginning
案例六:flume选择器
一个 source 可以对接多个 channel,那么,source 的数据如何在多个 channel 之间传递,就由 selector 来控制,配置应该挂载到 source 组件
1、复制选择器
一个连hdfs, 一个连kafka
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2a1.sources.r1.channels = c1 c2
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = g1
a1.sources.r1.filegroups.g1 = /logdata/a.*
a1.sources.r1.fileHeader = false
a1.sources.r1.selector.type = replicatinga1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.headerName = timestampa1.channels.c1.type = memory
a1.channels.c2.type = memorya1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = doitedu01:9092,doitedu02:9092,doitedu03:9092
a1.sinks.k1.kafka.topic = doitedu17
a1.sinks.k1.kafka.producer.acks = 1a1.sinks.k2.channel = c2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://doitedu01:8020/flumedata/%Y-%m-%d/%H
a1.sinks.k2.hdfs.filePrefix = doitedu-log-
a1.sinks.k2.hdfs.fileSuffix = .log
a1.sinks.k2.hdfs.rollSize = 268435456
a1.sinks.k2.hdfs.rollInterval = 120
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.batchSize = 1000
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.codeC = snappy
a1.sinks.k2.hdfs.useLocalTimeStamp = false
2、多路选择器
一个source里数据,可能有不同种类数据,需要使用拦截器,对数据进行区分,然后使用多路选择器插入到不同的channel里,一个写到kakfa,一个写到hdfs。
2.1 拦截器,并打包放到flume的lib下
package cn.doitedu.yiee.flume;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.util.List;public class MultiplexingInterceptor implements Interceptor {private Integer flagfield = 0;private Integer timestampfield = 0;public MultiplexingInterceptor(Integer flagfield,Integer timestampfield) {this.flagfield = flagfield;this.timestampfield = timestampfield;}/*** 拦截器构造实例后的初始化工作*/public void initialize() {}// 日志格式:// u01,ev1,mall,1568738583468public Event intercept(Event event) {// 根据event的数据内容,以及参数中指定的标记字段,来产生不同的header值byte[] body = event.getBody();String line = new String(body);String[] split = line.split(",");// 切出业务标记,并添加到headerevent.getHeaders().put("flag",split[flagfield]);// 切出行为(事件)时间戳,并添加到headerevent.getHeaders().put("timestamp",split[timestampfield]);return event;}public List<Event> intercept(List<Event> list) {for (Event event : list) {intercept(event);}return list;}/*** 拦截器销毁之前的一些清理工作*/public void close() {}public static class MultiplexingInterceptorBuilder implements Interceptor.Builder{Integer flagfield = 0;Integer timestampfield = 0;/*** 用户构建一个拦截器实例* @return*/public Interceptor build() {return new MultiplexingInterceptor(flagfield,timestampfield);}/*** 获取参数的入口* @param context*/public void configure(Context context) {flagfield = context.getInteger("flagfield");timestampfield = context.getInteger("timestampfield");}}
}
2.2 模拟日志生成脚本
while truedoif [ $(($RANDOM % 2)) -eq 0 ]thenecho "u$RANDOM,e1,waimai,`date +%s`000" >> a.logelseecho "u$RANDOM,e1,mall,`date +%s`000" >> a.logfisleep 0.2done
2.3变成配置文件
1.sources = r1a1.channels = c1 c2a1.sinks = k1 k2a1.sources.r1.channels = c1 c2a1.sources.r1.type = TAILDIRa1.sources.r1.filegroups = g1a1.sources.r1.filegroups.g1 = /logdata/a.*a1.sources.r1.fileHeader = falsea1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = cn.doitedu.yiee.flume.MultiplexingInterceptor$MultiplexingInterceptorBuildera1.sources.r1.interceptors.i1.flagfield = 2a1.sources.r1.interceptors.i1.timestampfield = 3a1.sources.r1.selector.type = multiplexinga1.sources.r1.selector.header = flaga1.sources.r1.selector.mapping.mall = c1a1.sources.r1.selector.mapping.waimai = c2a1.sources.r1.selector.default = c2a1.channels.c1.type = memorya1.channels.c1.capacity = 2000a1.channels.c1.transactionCapacity = 1000a1.channels.c2.type = memorya1.channels.c2.capacity = 2000a1.channels.c2.transactionCapacity = 1000a1.sinks.k1.channel = c1a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.bootstrap.servers = doitedu01:9092,doitedu02:9092,doitedu03:9092a1.sinks.k1.kafka.topic = malla1.sinks.k1.kafka.producer.acks = 1a1.sinks.k2.channel = c2a1.sinks.k2.type = hdfsa1.sinks.k2.hdfs.path = hdfs://doitedu01:8020/waimai/%Y-%m-%d/%Ha1.sinks.k2.hdfs.filePrefix = doitedu-log-a1.sinks.k2.hdfs.fileSuffix = .loga1.sinks.k2.hdfs.rollSize = 268435456a1.sinks.k2.hdfs.rollInterval = 120a1.sinks.k2.hdfs.rollCount = 0a1.sinks.k2.hdfs.batchSize = 1000a1.sinks.k2.hdfs.fileType = DataStreama1.sinks.k2.hdfs.useLocalTimeStamp = false
案例七:自动失败切换
多个sink连接一个channel,默认不需要专门去配置的, 相当于负载均衡,或者failover sink processor 自动失败,需要将多个 sink 创建成 group。正常情况下,只运行一个sink,只有当它失败后,才切换到别的sink上。
默认是走兰色的线,若是兰色的机器挂掉,就走绿色的线;
1、级联高可用配置第一级
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1a1.sources.r1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = g1
a1.sources.r1.filegroups.g1 = /logdata/a.*
a1.sources.r1.fileHeader = falsea1.channels.c1.type = memory
a1.channels.c1.capacity = 2000
a1.channels.c1.transactionCapacity = 1000a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = doitedu02
a1.sinks.k1.port = 4444a1.sinks.k2.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = doitedu03
a1.sinks.k2.port = 4444a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 200
a1.sinkgroups.g1.processor.priority.k2 = 100
a1.sinkgroups.g1.processor.maxpenalty = 5000
2、级联高可用配置第2级(节点1)
a1.sources = r1
a1.sinks = k1
a1.channels = c1a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = doitedu02
a1.sources.r1.port = 4444
a1.sources.r1.batchSize = 100a1.channels.c1.type = memory
a1.channels.c1.capacity = 2000
a1.channels.c1.transactionCapacity = 1000a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = doitedu01:9092,doitedu02:9092,doitedu03:9092
a1.sinks.k1.kafka.topic = failover
a1.sinks.k1.kafka.producer.acks = 1
3、级联高可用配置第2级(节点2)
a1.sources = r1
a1.sinks = k1
a1.channels = c1a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = doitedu03
a1.sources.r1.port = 4444
a1.sources.r1.batchSize = 100a1.channels.c1.type = memory
a1.channels.c1.capacity = 2000
a1.channels.c1.transactionCapacity = 1000a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = doitedu01:9092,doitedu02:9092,doitedu03:9092
a1.sinks.k1.kafka.topic = failover
a1.sinks.k1.kafka.producer.acks = 1
四、flume监控
flume 在运行时,状态是否正常,吞吐量是否正常,可以使用ganglia 进行展现:
-Dflume.monitoring.type=ganglia -Dflume.monitoring.port=34890
Ganglia 是一个通用的集群运维监控系统;
它在各台需要监控状态信息的机器上安装“探针”,然后这些“探针”会收集所在机器上的各种状态
信息(cpu 负载,内存负载,磁盘 IO 负载,网络 IO 负载,以及各类应用软件的状态信息),然后汇
聚到它的中心汇聚点,并提供 web 页面进行图形可视化查看
五、监控flume进程、自动拉起
#!/bin/bashexport FLUME_HOME=/opt/apps/flume-1.9.0
while true
do
pc=`ps -ef | grep flume | grep -v "grep" | wc -l`if [[ $pc -lt 1 ]]
thenecho "detected no flume process.... preparing to launch flume agent...... "${FLUME_HOME}/bin/flume-ng agent -n a1 -c ${FLUME_HOME}/conf/ -f ${FLUME_HOME}/agentconf/failover.properties 1>/dev/null 2>&1 &
elseecho "detected flume process number is : $pc "
fisleep 1done
大数据之flume数据采集相关推荐
- 大数据技术之数据采集篇
数据采集是进行大数据分析的前提也是必要条件,在整个流程中占据重要地位.本文将介绍大数据三种采集形式:系统日志采集法.网络数据采集法以及其他数据采集法. (一)系统日志采集法 系统日志是记录系统中硬件. ...
- 大数据技术——Flume原理分析
摘要 主要是分析和讲解Flum的原理源码分析 Flume概述 Flume是的一个分布式.高可用.高可靠的海量日志采集.聚合和传输的系统,支持在日志系统中定制各类数据发送方,用于收集数据,同时提供了对数 ...
- 剖析大数据平台的数据采集
我在一次社区活动中做过一次分享,演讲题目为<大数据平台架构技术选型与场景运用>.在演讲中,我主要分析了大数据平台架构的生态环境,并主要以数据源.数据采集.数据存储与数据处理四个方面展开分析 ...
- 大数据(9) - Flume的安装与使用
Flume简介 --(实时抽取数据的工具) 1) Flume提供一个分布式的,可靠的,对大数据量的日志进行高效收集.聚集.移动的服务,Flume只能在Unix环境下运行. 2) Flume基于流式架构 ...
- 大数据学习——Flume入门
文章目录 一.Flume概述 1.1.Flume定义 1.2.Flume基础架构 二.Flume快速入门 2.1.安装Flume部署 2.2.入门案例 2.2.1.监控端口数据(官方案例) 2.2.2 ...
- 【大数据之 Flume】入门到放弃
文章目录 1 Flume 概述 1.1 Flume 定义 1.2 Flume 基础架构 2 Flume 入门 2.1 Flume 安装部署 2.1.1 安装地址 2.1.2 安装部署 2.2 Flum ...
- 大数据之Flume原理
听完小吴老师讲的以后,做的笔记~~~ 1.Flume定义 Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集.聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方 ...
- 大数据之Flume:Flume概述
Flume概述 1.1 Flume定义 Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集.聚合和传输的系统.Flume基于流式架构,灵活简单. 1.2 Flume基础架构 ...
- 大数据-初识flume
目录 flume概述 flume基础架构 flume 细节 flume概述 flume百度百科 flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集.聚合和传输系统,基 ...
- flume数据采集_大数据采集系统Flume集群部署
集群规划 Flume集群,负载均衡和故障转移模式,笔者准备了3台机器安装了flume,其中webapp200是应用服务器,flume安装在这里,目的是收集应用服务器上的日志,通过2个avro sink ...
最新文章
- 第62天:手风琴效果
- 分享一款博客园皮肤及其解决方案
- 可以搜python题答案的app-Python数据分析与数据可视化知到APP答案
- Python zmq的三种简单模式
- 《疯狂Java讲义》7
- InterlockedIncrement函数详解
- 【ABAP】如何判断单据是否被锁定
- Python延迟打印字符
- linux界面版admin,linux下Nginx+Django Admin界面无样式问题解决方法
- SAP Spartacus Category Navigation的accessibility问题
- ERP Configurable product不会被CRM中间件下载
- 学生信息管理系统的价值PHP,php技术对学生管理系统实现的价值研究
- jerasure 2.0译文
- 酷应用背后,低代码正在被重估
- 二阶系统的性能分析(开环相幅和阶跃响应)——自动控制原理基础补充(三)
- matlab矩阵运算中只对部分数值进行计算的技巧
- 音频压缩算法ALaw,uLaw
- monthCalendar (日历)控件常用操作
- 2022年全球及中国舞台灯光系统行业头部企业市场占有率及排名调研报告
- Centos测试作死命令rm -rf /