本文基于ThriftSource,MemoryChannel,HdfsSink三个组件,对Flume数据传输的事务进行分析,如果使用的是其他组件,Flume事务具体的处理方式将会不同。一般情况下,用MemoryChannel就好了,我们公司用的就是这个,FileChannel速度慢,虽然提供日志级别的数据恢复,但是一般情况下,不断电MemoryChannel是不会丢数据的。

Flume提供事物操作,保证用户的数据的可靠性,主要体现在:

  • 数据在传输到下个节点时(通常是批量数据),如果接收节点出现异常,比如网络异常,则回滚这一批数据。因此有可能导致数据重发
  • 同个节点内,Source写入数据到Channel,数据在一个批次内的数据出现异常,则不写入到Channel。已接收到的部分数据直接抛弃,靠上一个节点重发数据。

编程模型

Flume在对Channel进行Put和Take操作的时候,必须要用事物包住,比如:

Channel ch = new MemoryChannel();
Transaction txn = ch.getTransaction();
//事物开始
txn.begin();
try {Event eventToStage = EventBuilder.withBody("Hello Flume!",Charset.forName("UTF-8")); //往临时缓冲区Put数据 ch.put(eventToStage); //或者ch.take() //将这些数据提交到channel中 txn.commit(); } catch (Throwable t) { txn.rollback(); if (t instanceof Error) { throw (Error)t; } } finally { txn.close(); } 

Put事务流程

Put事务可以分为以下阶段:

  • doPut:将批数据先写入临时缓冲区putList
  • doCommit:检查channel内存队列是否足够合并。
  • doRollback:channel内存队列空间不足,抛弃数据

我们从Source数据接收到写入Channel这个过程对Put事物进行分析。

ThriftSource会spawn多个Worker线程(ThriftSourceHandler)去处理数据,Worker处理数据的接口,我们只看batch批量处理这个接口:

    @Overridepublic Status appendBatch(List<ThriftFlumeEvent> events) throws TException {List<Event> flumeEvents = Lists.newArrayList();for(ThriftFlumeEvent event : events) {flumeEvents.add(EventBuilder.withBody(event.getBody(), event.getHeaders())); } //ChannelProcessor,在Source初始化的时候传进来.将数据写入对应的Channel getChannelProcessor().processEventBatch(flumeEvents); ... return Status.OK; } 

事务逻辑都在processEventBatch这个方法里:

public void processEventBatch(List<Event> events) {...//预处理每行数据,有人用来做ETL嘛events = interceptorChain.intercept(events);...//分类数据,划分不同的channel集合对应的数据 // Process required channels Transaction tx = reqChannel.getTransaction(); ... //事务开始,tx即MemoryTransaction类实例 tx.begin(); List<Event> batch = reqChannelQueue.get(reqChannel); for (Event event : batch) { // 这个put操作实际调用的是transaction.doPut reqChannel.put(event); } //提交,将数据写入Channel的队列中 tx.commit(); } catch (Throwable t) { //回滚 tx.rollback(); ... } } ... } 

每个Worker线程都拥有一个Transaction实例,保存在Channel(BasicChannelSemantics)里的ThreadLocal变量currentTransaction.

那么,事务到底做了什么?

实际上,Transaction实例包含两个双向阻塞队列LinkedBlockingDeque(感觉没必要用双向队列,每个线程写自己的putList,又不是多个线程?),分别为:

  • putList
  • takeList

对于Put事物操作,当然是只用到putList了。putList就是一个临时的缓冲区,数据会先put到putList,最后由commit方法会检查channel是否有足够的缓冲区,有则合并到channel的队列。

channel.put -> transaction.doPut:

    protected void doPut(Event event) throws InterruptedException {//计算数据字节大小 int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize); //写入临时缓冲区putList if (!putList.offer(event)) { throw new ChannelException( "Put queue for MemoryTransaction of capacity " + putList.size() + " full, consider committing more frequently, " + "increasing capacity or increasing thread count"); } putByteCounter += eventByteSize; } 

transaction.commit:

@Overrideprotected void doCommit() throws InterruptedException { //检查channel的队列剩余大小是否足够 ... int puts = putList.size(); ... synchronized(queueLock) { if(puts > 0 ) { while(!putList.isEmpty()) { //写入到channel的队列 if(!queue.offer(putList.removeFirst())) { throw new RuntimeException("Queue add failed, this shouldn't be able to happen"); } } } //清除临时队列 putList.clear(); ... } ... } 

如果在事务期间出现异常,比如channel剩余空间不足,则rollback:

@Overrideprotected void doRollback() {...//抛弃数据,没合并到channel的内存队列 putList.clear(); ... } 

Take事务

Take事务分为以下阶段:

  • doTake:先将数据取到临时缓冲区takeList
  • 将数据发送到下一个节点
  • doCommit:如果数据全部发送成功,则清除临时缓冲区takeList
  • doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channel内存队列。

Sink其实是由SinkRunner线程调用Sink.process方法来了处理数据的。我们从HdfsEventSink的process方法说起,Sink类都有个process方法,用来处理传输数据的逻辑。:

public Status process() throws EventDeliveryException {...Transaction transaction = channel.getTransaction();...//事务开始transaction.begin();...for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) { //take数据到临时缓冲区,实际调用的是transaction.doTake Event event = channel.take(); if (event == null) { break; } ... //写数据到HDFS bucketWriter.append(event); ... // flush all pending buckets before committing the transaction for (BucketWriter bucketWriter : writers) { bucketWriter.flush(); } //commit transaction.commit(); ... } catch (IOException eIO) { transaction.rollback(); ... } finally { transaction.close(); } } 

大致流程图:

接着看看channel.take,作用是将数据放到临时缓冲区,实际调用的是transaction.doTake:

protected Event doTake() throws InterruptedException {...//从channel内存队列取数据synchronized(queueLock) {event = queue.poll();}...//将数据放到临时缓冲区 takeList.put(event); ... return event; } 

接着,HDFS写线程bucketWriter将take到的数据写到HDFS,如果批数据都写完了,则要commit了:

protected void doCommit() throws InterruptedException {...takeList.clear();...
}

很简单,其实就是清空takeList而已。如果bucketWriter在写数据到HDFS的时候出现异常,则要rollback:

protected void doRollback() {int takes = takeList.size();//检查内存队列空间大小,是否足够takeList写回去 synchronized(queueLock) { Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " + "queue to rollback takes. This should never happen, please report"); while(!takeList.isEmpty()) { queue.addFirst(takeList.removeLast()); } ... } ... }

转载于:https://www.cnblogs.com/whtydn/p/4384199.html

Flume数据传输事务分析[转]相关推荐

  1. 大数据系列(五)之 Flume 数据传输

    目录 一.Flume简介 二.Flume架构 2.1 Flume基本组件 2.2 Flume常见数据流模型 三.Source,Channel,Sink 详解 3.1 Source 3.2 Channe ...

  2. flume数据丢失与重复_Flume架构及常见面试

    一.Flume基础架构 1.Agent Agent是一个JVM进程,它以事件(Event)的形式将数据从源头送至目的. Agent主要有3个部分组成,Source.Channel.Sink. 2.Ev ...

  3. 大剑无锋之flume面试题【面试推荐】

    1 你是如何实现Flume数据传输的监控的 使用第三方框架Ganglia实时监控Flume. 2 Flume的Source,Sink,Channel的作用?你们Source是什么类型? 1.作用 (1 ...

  4. flume学习-含安装

    1.Flume是什么:Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集.聚合和传输的系统.Flume基于流式架构,灵活简单. Flume组成架构 下面我们来详细介绍一下F ...

  5. 大数据(9) - Flume的安装与使用

    Flume简介 --(实时抽取数据的工具) 1) Flume提供一个分布式的,可靠的,对大数据量的日志进行高效收集.聚集.移动的服务,Flume只能在Unix环境下运行. 2) Flume基于流式架构 ...

  6. 海量日志收集利器 —— Flume

    Flume 是什么? Flume是一个分布式.可靠.和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据:同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的 ...

  7. 【hadoop生态之Flume】概念【笔记+代码】

    一.Flume简介 Flume提供一个分布式的,可靠的,对大数据量的日志进行高效收集.聚集.移动的服务,Flume只能在Unix环境下运行. Flume基于流式架构,容错性强,也很灵活简单. Flum ...

  8. 大数据技术之Flume —— (1)一文入门学习Flume

    目录 一.什么是Flume? 1.1.flume的定义 1.2.flume的架构 二.Flume入门 2.1.下载flume 2.2.安装flume 2.3.flume小案例 2.3.1.官方案例-- ...

  9. Flume 数据采集组件

    目录 1.数据收集工具/系统产生背景 2.专业的数据收集工具 2.1.Chukwa 2.2.Scribe 2.3.Fluentd 2.4.Logstash 2.5.Apache Flume 3.Flu ...

最新文章

  1. 优先级队列(小顶堆)的dijkstra算法
  2. vscode快速注释_Python快速入门(一)
  3. python代码_如何使用 Sphinx 给 Python 代码写文档
  4. 自己写的一个报表,研究SAP CRM ibase保存问题
  5. C#中的thread和task之 Thread ThreadPool
  6. 1.10-linux三剑客之sed命令详解及用法
  7. stl标准模板库_C ++标准模板库(STL)中的数组及其常用功能
  8. 谁记录了mysql error log中的超长信息
  9. ubuntu ftp server-转
  10. golang的定时任务
  11. Adb refused a command 解决方法
  12. vue项目中通过图片url下载图片
  13. 我研究了3年,终于找出2021年完美的听歌方案!
  14. C++计算某天是该年的第几天
  15. tomcat Note: further occurrences of HTTP header parsing errors will be logged at DEBUG
  16. JavaSE学习笔记(七)(常用类)
  17. 视唱练耳——调式调号听辨
  18. [论文总结] 森林生态系统中的水生生境
  19. macos13 Ventura虚拟机安装无网络问题
  20. 拉格朗日插值法的Matlab实现

热门文章

  1. garmin USB: linux USB host驱动
  2. 银行流水你真的会看吗?
  3. 系统业务逻辑书籍_「樊登读书会强推:免费送10本绝密书」彻底改变你的逻辑思维能力...
  4. linux监控哪些目录,linux管理文件和目录的命令
  5. 卡牌大师怎么玩_用卡牌大师如何上分
  6. lds天线技术流程图_音箱耳机入门,蓝牙真无线耳机中的LDS天线 「Soomal」
  7. python将csv文件导入mysql-使用python将csv文件导入Mysql数据库
  8. Django 模板语言 标签
  9. Python 字典删除元素clear、pop、popitem
  10. reorder-list