问题导读:
1.Flume传输的数据的基本单位是是什么?

2.Event是什么,流向是怎么样的?
3.Source:完成对日志数据的收集,分成什么打入Channel中?
4.Channel的作用是什么?
5.取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器,由谁来完成?

6.Flume支那些数据格式?

7.对于直接读取文件Source,有两种方式,分别是什么?

8.Channel有多种方式有哪些方式?

概述 Flume是Cloudera公司的一款高性能、高可能的分布式日志收集系统。现在已经是Apache Top项目。 Github地址 。同Flume相似的日志收集系统还有 Facebook Scribe , Apache Chuwka , Apache Kafka (也是LinkedIn的)。Flume是后起之秀,本文尝试简要分析Flume数据流通过程中提供的组件、可靠性保证来介绍Flume的主要设计,不涉及Flume具体的安装使用,也不涉及代码层面的剖析。写博文来记录这个工具主要是觉得与最近开发的一个流式的数据搬运的工具在设计上有相似之处,想看看有没有可以参考的地方。在博文的基础上,还需要浏览一下源码。

数据流通 Flume传输的数据的基本单位是event,如果是文本文件,通常是一行记录,这也是事务的基本单位。flume运行的核心是agent。它是一个完整的数据收集工具,含有三个核心组件,分别是source、channel、sink。Event从Source,流向Channel,再到Sink,本身为一个byte数组,并可携带headers信息。Event代表着一个数据流的最小完整单元,从外部数据源来,向外部的目的地去。Source:完成对日志数据的收集,分成transtion 和 event 打入到channel之中。Channel:主要提供一个队列的功能,对source提供中的数据进行简单的缓存。Sink:取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。通过这些组件,event可以从一个地方流向另一个地方,如下图所示。

Source消费从外部流进的Events,如AvroSource接收外部客户端传来的或是从别的agent流出来的Avro Event。Source可以把event送往一个或多个channel。channel是一个队列,持有event等待sink来消费,一种Channel的实现:FileChannel使用本地文件系统来作为它的存储。Sink的作用是把Event从channel里移除,送往外部数据仓库或给下一站agent的Source,如HDFSEventSink送往HDFS。同个agent下的source和sink是异步的。下面再举几个数据流通的例子,说明不同的使用方式。

多agent模式

多对一的合并/Collector 场景

一对多路输出模型

Source接入 Client端操作消费数据的来源,Flume支持Avro,log4j,syslog和http post(body为json格式)。可以让应用程序同已有的Source直接打交道,如AvroSource,SyslogTcpSource。也可以写一个Source,以IPC或RPC的方式接入自己的应用,Avro和Thrift都可以(分别有NettyAvroRpcClient和ThriftRpcClient实现了RpcClient接口),其中Avro是默认的RPC协议。具体代码级别的Client端数据接入,可以参考 官方手册 。
对现有程序改动最小的使用方式是使用是直接读取程序原来记录的日志文件,基本可以实现无缝接入,不需要对现有程序进行任何改动。 
对于直接读取文件Source,有两种方式:

  • ExecSource:以运行Linux命令的方式,持续的输出最新的数据,如tail -F 文件名指令,在这种方式下,取的文件名必须是指定的。 ExecSource可以实现对日志的实时收集,但是存在Flume不运行或者指令执行出错时,将无法收集到日志数据,无法保证日志数据的完整性。
  • SpoolSource:监测配置的目录下新增的文件,并将文件中的数据读取出来。

需要注意两点: 拷贝到spool目录下的文件不可以再打开编辑;spool目录下不可包含相应的子目录。SpoolSource虽然无法实现实时的收集数据,但是可以使用以分钟的方式分割文件,趋近于实时。如果应用无法实现以分钟切割日志文件的话,可以两种收集方式结合使用。 在实际使用的过程中,可以结合log4j使用,使用log4j的时候,将log4j的文件分割机制设为1分钟一次,将文件拷贝到spool的监控目录。log4j有一个TimeRolling的插件,可以把log4j分割的文件到spool目录。基本实现了实时的监控。Flume在传完文件之后,将会修改文件的后缀,变为.COMPLETED(后缀也可以在配置文件中灵活指定)

  1. public class MySource extends AbstractSource implements Configurable, PollableSource {
  2. private String myProp;
  3. @Override
  4. public void configure(Context context) {
  5. String myProp = context.getString("myProp", "defaultValue");
  6. // Process the myProp value (e.g. validation, convert to another type, ...)
  7. // Store myProp for later retrieval by process() method
  8. this.myProp = myProp;
  9. }
  10. @Override
  11. public void start() {
  12. // Initialize the connection to the external client
  13. }
  14. @Override
  15. public void stop () {
  16. // Disconnect from external client and do any additional cleanup
  17. // (e.g. releasing resources or nulling-out field values) ..
  18. }
  19. @Override
  20. public Status process() throws EventDeliveryException {
  21. Status status = null;
  22. // Start transaction
  23. Channel ch = getChannel();
  24. Transaction txn = ch.getTransaction();
  25. txn.begin();
  26. try {
  27. // This try clause includes whatever Channel operations you want to do
  28. // Receive new data
  29. Event e = getSomeData();
  30. // Store the Event into this Source's associated Channel(s)
  31. getChannelProcessor().processEvent(e)
  32. txn.commit();
  33. status = Status.READY;
  34. } catch (Throwable t) {
  35. txn.rollback();
  36. // Log exception, handle individual exceptions as needed
  37. status = Status.BACKOFF;
  38. // re-throw all Errors
  39. if (t instanceof Error) {
  40. throw (Error)t;
  41. }
  42. } finally {
  43. txn.close();
  44. }
  45. return status;
  46. }}

复制代码

Channel
Channel有多种方式:有MemoryChannel,JDBC Channel,MemoryRecoverChannel,FileChannel。
MemoryChannel可以实现高速的吞吐,但是无法保证数据的完整性。
MemoryRecoverChannel在官方文档的建议上已经建义使用FileChannel来替换。
FileChannel保证数据的完整性与一致性。在具体配置不限的FileChannel时,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。

Sink Sink在设置存储数据时,可以向文件系统、数据库、hadoop存数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。
更多sink的内容可以参考 官方手册 。

  1. public class MySink extends AbstractSink implements Configurable {
  2. private String myProp;
  3. @Override
  4. public void configure(Context context) {
  5. String myProp = context.getString("myProp", "defaultValue");
  6. // Process the myProp value (e.g. validation)
  7. // Store myProp for later retrieval by process() method
  8. this.myProp = myProp;
  9. }
  10. @Override
  11. public void start() {
  12. // Initialize the connection to the external repository (e.g. HDFS) that
  13. // this Sink will forward Events to ..
  14. }
  15. @Override
  16. public void stop () {
  17. // Disconnect from the external respository and do any
  18. // additional cleanup (e.g. releasing resources or nulling-out
  19. // field values) ..
  20. }
  21. @Override
  22. public Status process() throws EventDeliveryException {
  23. Status status = null;
  24. // Start transaction
  25. Channel ch = getChannel();
  26. Transaction txn = ch.getTransaction();
  27. txn.begin();
  28. try {
  29. // This try clause includes whatever Channel operations you want to do
  30. Event event = ch.take();
  31. // Send the Event to the external repository.
  32. // storeSomeData(e);
  33. txn.commit();
  34. status = Status.READY;
  35. } catch (Throwable t) {
  36. txn.rollback();
  37. // Log exception, handle individual exceptions as needed
  38. status = Status.BACKOFF;
  39. // re-throw all Errors
  40. if (t instanceof Error) {
  41. throw (Error)t;
  42. }
  43. } finally {
  44. txn.close();
  45. }
  46. return status;
  47. }}

复制代码

可靠性 Flume的核心是把数据从数据源收集过来,再送到目的地。为了保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,删除自己缓存的数据。
Flume使用事务性的方式保证传送Event整个过程的可靠性。Sink必须在Event被存入Channel后,或者,已经被传达到下一站agent里,又或者,已经被存入外部数据目的地之后,才能把Event从Channel中remove掉。这样数据流里的event无论是在一个agent里还是多个agent之间流转,都能保证可靠,因为以上的事务保证了event会被成功存储起来。而Channel的多种实现在可恢复性上有不同的保证。也保证了event不同程度的可靠性。比如Flume支持在本地保存一份文件channel作为备份,而memory channel将event存在内存queue里,速度快,但丢失的话无法恢复。
具体看一下Transaction。Source和Sink封装了Channel提供的对Event的事务存、取接口,下图为一个transaction过程:

一个Channel的实现里会包括一个transaction的实现,每个与channel打交道的source和sink都得带有一个transaction对象。下面的例子中可以看到一个Event的状态和变化会在一次transation中完成。transaction的状态也对应了时序图中的各个状态。

  1. Channel ch = new MemoryChannel();
  2. Transaction txn = ch.getTransaction();
  3. txn.begin();
  4. try {
  5. // This try clause includes whatever Channel operations you want to do
  6. Event eventToStage = EventBuilder.withBody("Hello Flume!",
  7. Charset.forName("UTF-8"));
  8. ch.put(eventToStage);
  9. // Event takenEvent = ch.take();
  10. // ...
  11. txn.commit();
  12. } catch (Throwable t) {
  13. txn.rollback();
  14. // Log exception, handle individual exceptions as needed
  15. // re-throw all Errors
  16. if (t instanceof Error) {
  17. throw (Error)t;
  18. }
  19. } finally {
  20. txn.close();
  21. }

复制代码

分布式日志收集系统Apache Flume的设计详细介绍相关推荐

  1. 基于Flume的美团日志收集系统(一)架构和设计

    背景 美团的日志收集系统负责美团的所有业务日志的收集,并分别给Hadoop平台提供离线数据和Storm平台提供实时数据流.美团的日志收集系统基于Flume设计和搭建而成. <基于Flume的美团 ...

  2. 一起来解读分布式日志收集系统:Facebook Scribe

    1.分布式日志收集系统:背景介绍 许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等),处理这些日志需要特定的日志系统,一般而言,这些系统需要具有以下特征: (1) 构建应 ...

  3. 分布式日志收集系统: Facebook Scribe

    转载于博主新浪微博:http://weibo.com/freshairbrucewoo. 欢迎大家相互交流,共同提高技术. 以下是我在公司内部分享的关于分布式日志收集系统的PPT内容,现在与大家分享, ...

  4. python分布式日志收集系统_分布式日志收集系统Scribe原理

    1.分布式日志收集系统:背景介绍 许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等),处理这些日志需要特定的日志系统,一般而言,这些系统需要具有以下特征: (1) 构建应 ...

  5. 探秘Hadoop生态12:分布式日志收集系统Flume

    这位大侠,这是我的公众号:程序员江湖.  分享程序员面试与技术的那些事. 干货满满,关注就送.  在具体介绍本文内容之前,先给大家看一下Hadoop业务的整体开发流程:  从Hadoop的业务开发流程 ...

  6. flume分布式日志收集系统操作

    1.flume是分布式的日志收集系统,把收集来的数据传送到目的地去. 2.flume里面有个核心概念,叫做agent.agent是一个java进程,运行在日志收集节点. 3.agent里面包含3个核心 ...

  7. python分布式日志收集系统_Go实现海量日志收集系统(一)

    项目背景 每个系统都有日志,当系统出现问题时,需要通过日志解决问题 当系统机器比较少时,登陆到服务器上查看即可满足 当系统机器规模巨大,登陆到机器上查看几乎不现实 当然即使是机器规模不大,一个系统通常 ...

  8. 分布式日志收集系统scribe介绍

    Scribe是facebook开源的日志收集系统,在facebook内部已经得到大量的应用. Scribe是基于一个使用非阻断C++服务器的thrift服务的实现.它能够从各种日志源上收集日志,存储到 ...

  9. 分布式日志收集系统--Chukwa

    1. 安装部署 1.1 环境要求 1.使用的JDK的版本必须是1.6或者更高版本,本实例中使用的是JDK1.6 2.使用的hadoop的版本必须是Hadoop0.20.205.1及以上版本,本实例中使 ...

最新文章

  1. 不限文件类型的ftp服务器,ftp服务器文件类型
  2. DotNetTextBoxV3.0在线编辑器控件Ver3.4.2 Open Source开源版
  3. C 语言编程 — 高级数据类型 — 字符串
  4. 分支结构,循环结构,for循环,九九乘法表
  5. 什么样的数据集可以被分成两类?
  6. KMM Kotlin expect的几种声明方式
  7. 数据结构 - 简单选择排序法
  8. 笔记整理-信息系统开发基础-软件测试-模糊测试
  9. Android开源项目整理:个性化空间View篇(看遍论坛千万篇,不看此篇也枉然)
  10. css改变指针形状,css 指针样式
  11. A* a=new B ,会不会产生内存泄露了,露了B-A的部分?
  12. 升级版的数据透视表!用一工具,做出了HR羡慕的人力数据分析
  13. 堆排序算法讲解视频java版_堆排序算法的讲解及Java版实现
  14. CS231n第一次作业_问题1
  15. WAPI网络认证原理
  16. Ubuntu修改键盘映射
  17. java eofexception_EOFException异常详解
  18. Java 数据库连接池工作原理
  19. 2022五一劳动节虾皮仓库物流放假安排
  20. 客户端到服务器端的通信过程及 原理图很好

热门文章

  1. android fragment点击返回键实现内容切换?
  2. 一个servlet,多个dwr.xml配置文件
  3. 拼多多算法笔试2020
  4. 并查集——程序自动分析(洛谷 P1955)
  5. 并查集——团伙(洛谷 P1892)
  6. C语言课后习题(7)
  7. python 字符串替换换行,Python fstring:替换换行符/lineb
  8. dakai微信小程序 ios_iOS APP拉起微信小程序
  9. python是机器语言还是编程语言_Python vs R,谁才是机器学习编程语言的首选?
  10. 周四下午3小时,4个行业分享,尽在信创行业发展高端研讨会数据库专场