Flume是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume最主要是用在分布式系统中,例如读取服务器本地的磁盘数据,并将数据写入到HDFS中。

对Flume的学习,最好就是结合官方文档进行学习。文档中有各种使用场景的配置,在开发的过程中,可以通过编写flume的工作配置文件来调用flume实现数据提取。

Flume文档地址:http://flume.apache.org/FlumeUserGuide.html

文档内有各种source, channel, sink, selector, processor, interceptor(这些名词后面会解释)的配置参数,主要围绕这个进行开发。

Flume基础架构

Flume的基础架构如下。

Agent

flume中通过agent进行日志采集、聚合、传输。agent是一个JVM进程,它以事件的形式将数据从源头送至目的。Agent 主要有 3 个部分组成,Source、Channel、Sink。

Source

Source作为Agent的输入口,负责接收各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directorynetcattaildir、sequence generator、syslog、http、legacy。

Sink

Sink作为Agent的输出口,负责不断轮询Channel中的事件并批量移除事件,根据配置文件的配置将事件写入到HDFS等存储系统,或者发到另一个Agent中。Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。

Channel

Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运作在不同的速率上。Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个
Sink 的读取操作。

Flume 自带两种 Channel:Memory Channel 和 File Channel。

Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。

Event

event是flume的数据传输基本单元,event由两部分组成:Header和Body。

Header用来存放event的一些属性,为map结构。Body用来存放该条数据,以字节数组的形式存放。

Header Body
map结构 字节数组结构
public interface Event {/*** Returns a map of name-value pairs describing the data stored in the body.*/public Map<String, String> getHeaders();/*** Set the event headers* @param headers Map of headers to replace the current headers.*/public void setHeaders(Map<String, String> headers);/*** Returns the raw byte array of the data contained in this event.*/public byte[] getBody();/*** Sets the raw byte array of the data contained in this event.* @param body The data.*/public void setBody(byte[] body);}

下面用一个小例子来学习flume的基本使用。

Flume实例

需求:Flume实时监控目录下多个追加文件并上传到HDFS。

我们先看看flume的官方文档。此处我只截取了目录中一部分的组件。

Source:

Channel:

Sink:

为了实现这个任务,我们需要找到能完成任务的Source, Channel和Sink。根据文档的描述,我们可以得到以下配置。

  • 可以完成监控多个追加文件的Source,我选择Taildir Source,它可以实现多个实时追加的文件,并且能够实现断点续传;
  • 在测试中为了高效,我选择Memory Channel;
  • 最后,因为要上传到HDFS,那么当然是选择HDFS Sink。

那么,我们根据文档里,就可以找到需要写入的配置(必选或可选)。这里我截取一部分Taildir Source配置文档和官方示例。加粗为必选,非加粗为可选。我们根据这个文档编写配置文件。

那么,就开始动手写任务。首先,需要创建一个配置flume任务的文件。

vim flume-taildir-hdfs.conf

并在配置文件中进行配置,具体实现请看代码与注释:

#首先给source, sink, channel命名
a3.sources = r3
a3.sinks = k3
a3.channels = c3# 根据文档编写source配置
a3.sources.r3.type = TAILDIR
# positionFile记录一个json文件的位置
#这个json文件保存了filegroups中的日志监听断点,保证了断点续传功能
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
#filegroups表示文件组,定义了多个文件组就可以实现多个目录的监控
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*# 根据文档编写sink配置
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload2/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0# 根据文档编写channel配置
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100# 绑定channel的source和sink
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

启动监控命令。-c表示设置flume整体配置,这里做默认配置;-n为选择agent名字,这里为上面所写的a3;-f为flume本次任务的配置,这里为刚刚上面所写的文件路径。

bin/flume-ng agent -c conf -n a3 -f job/flume-taildir-hdfs.conf

在这之后,对配置文件中所指定的目录(比如 /opt/module/flume/files/.file.)加入符合的日志文件,就会自动被监测到并发到指定HDFS目录中。

Taildir 说明: Taildir Source 维护了一个 json 格式的 position File,其会定期的往 position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。Position File 的格式如下:

{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}
{"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"}

注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来识别文件。

Flume事务

Flume的Agent中存在事务操作,当数据传输中出现意外时会进行回滚。

Put事务:

  • doPut:source将数据送到channel时,会把批处理数据线写入到putList
  • doCommit:检查channel内存队列是否足够合并
  • doRollback:channel内存队列空间不足,回滚数据

Take事务:

  • doTake:将数据提取到临时缓冲区takeList,并将数据发送到HDFS
  • doCommit:如果数据全部发送成功,则清楚临时缓冲区takeList
  • doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channel内存队列。

Flume Agent内部原理

Flume Agent的内部原理如下图所示。

  • 首先由Source接受数据。
  • 然后叫给Channel Processor处理事件(这里还不是Channel)。
  • 将事件传给Interceptor Chain处理并返回。
  • 将每个事件给Channel Selector。Channel Selector 的作用就是选出 Event 将要被发往哪个 Channel。其共有两种类型,分别是 Replicating(复制)和 Multiplexing(多路复用)。Replicating Selector 会将同一个 Event 发往所有的 Channel,Multiplexing 会根据相应的原则,将不同的 Event 发往不同的 Channel。
  • 根据Channel Selector的选择写入相应的Channel中。
  • Sink通过SinkProcessor拉取Channel的数据。这里需要注明一点,一个sink只能接受一个channel的event(在配置文件中绑定时也是channel而不是channels),但是一个channel的数据可以传给多个sink。
  • SinkProcessor 共 有 三 种 类 型 , 分 别 是 DefaultSinkProcessor 、LoadBalancingSinkProcessor 和 FailoverSinkProcessor。DefaultSinkProcessor 表示一个Channel对应一个Sink , LoadBalancingSinkProcessor 和FailoverSinkProcessor 对应的是 Sink Group,LoadBalancingSinkProcessor 可以实现负载均衡的功能,FailoverSinkProcessor 可以错误恢复的功能。

Flume拓扑结构

简单串联

将多个agent连接在一起,前一个的sink连接后一个source。此处要求type为avro。此模式不建议桥接过多的 flume 数量, flume 数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统。

复制和多路复用

Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的地。比如,这里可以在实时任务中,既发送到运行程序中,也发送到HDFS中。

负载均衡和故障转移

Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能。

聚合

这种模式是我们最常见的,也非常实用,日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用 flume 的这种组合方式能很好的解决这一问题,每台服务器部署一个 flume 采集日志,传送到一个集中收集日志的flume,再由此 flume 上传到 hdfs、hive、hbase 等,进行日志分析。

参考:尚硅谷

Flume基本原理及使用相关推荐

  1. 离线计算七 辅助系统(flume、sqoop、oozie)

    课程大纲(辅助系统) 离线辅助系统 数据接入 Flume介绍 Flume组件 Flume实战案例 任务调度 调度器基础 市面上调度工具 Oozie的使用 Oozie的流程定义详解 数据导出 sqoop ...

  2. Computer OS系统基本原理

    Computer OS系统基本原理 第一章 绪论(考概念) 什么是OS? o 操作系统是一组控制和管理计算机软硬件资源.合理地对各类作业进行调度以及方便用户使用的程序集合. o 操作系统是位于硬件层( ...

  3. XGBoost4J-Spark基本原理

    XGBoost4J-Spark基本原理 XGBoost4J-Spark是一个项目,旨在通过使XGBoost适应Apache Spark的MLLIB框架,无缝集成XGBoost和Apache Spark ...

  4. Docker基本原理概述

    Docker基本原理概述 Docker是一个用于开发,交付和运行应用程序的开放平台.Docker能够将应用程序与基础架构分开,从而可以快速交付软件.借助Docker,可以以与管理应用程序相同的方式来管 ...

  5. 多机多卡训练基本原理

    多机多卡训练基本原理 在工业实践中,许多较复杂的任务需要使用更强大的模型.强大模型加上海量的训练数据,经常导致模型训练耗时严重.比如在计算机视觉分类任务中,训练一个在ImageNet数据集上精度表现良 ...

  6. MindSpore基本原理

    MindSpore基本原理 • MindSpore介绍 o 自动微分 o 自动并行 • 安装 o pip方式安装 o 源码编译方式安装 o Docker镜像 • 快速入门 • 文档 MindSpore ...

  7. Docker Context基本原理

    Docker Context基本原理 介绍 本指南介绍了上下文如何使单个Docker CLI轻松管理多个Swarm集群.多个Kubernetes集群和多个单独的Docker节点. 单个Docker C ...

  8. 垃圾回收器的基本原理是什么?垃圾回收器可以马上回收内存吗?有什么办法主动通知虚拟机进行垃圾回收?...

    一.垃圾回收器的基本原理是什么?垃圾回收器可以马上回收内存吗?有什么办法主动通知虚拟机进行垃圾回收?   1.对于GC来说,当程序员创建对象时,GC就开始监控这个对象的地址.大小以及使用情况. 通常, ...

  9. flume写入mysql_Flume高级之自定义MySQLSource

    1 自定义Source说明 Source是负责接收数据到Flume Agent的组件.Source组件可以处理各种类型.各种格式的日志数据,包括avro.thrift.exec.jms.spoolin ...

最新文章

  1. php path当局者迷,当局者迷_成语故事_有品有墨_品故事 写人生
  2. SQL Server:分离和重新附加数据库
  3. Python 技术篇-用smtplib和email库实现邮件发送并展示本地图片实例演示
  4. 先查询再插入的存储过程怎么写_谈一谈 InnoDB(1) - 底层存储文件结构
  5. a few ideas for cambridge career
  6. elasticsearch索引结构和配置优化
  7. 为什么饮料瓶大都是圆的,牛奶盒却是方的?原因你想不到
  8. webpack debug
  9. 抽象数据类型和Python类的基础
  10. micronaut pk spring boot
  11. 第一百八十四节,jQuery-UI,验证注册表单
  12. OPPO 物联网开放之路
  13. 堆、栈及静态数据区详解
  14. Android Studio 4.1一键生成代码Template
  15. 《软件工程》第5章系统建模
  16. 般若波罗蜜多心经(观音心经)注解
  17. 头条流量android,今日头条的免流量看视频是怎么操作的?
  18. 热分析技术清单:导热材料热扩散系数闪光法测量中的样品厚度选择
  19. 网页特效大公开(转)
  20. 计算机研究与发展 杂志,计算机研究与发展杂志

热门文章

  1. practical python and opencv_Practical Python and OpenCV + Case Studies, 3rd Edition
  2. 写秒杀器的一点心得。
  3. 安装vsftp软件包
  4. Python爬虫大作业(仿虎牙直播客户端)
  5. 【企业】韬盛和夫六精进
  6. destooon7.0装修网站源码带分站多地区带设计报价
  7. 交叉编译 SQLite
  8. Unity3d 坦克大战开发日志1
  9. 甲骨文(oracle) 美国,甲骨文全球大会.美国
  10. 【Linux内核系列】花90分钟了解4种红黑树的Linux内核应用场景