HDFS sink主要处理过程在process方法:

//循环batchSize次或者Channel为空

for(txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {

//该方法会调用BasicTransactionSemantics的具体实现

Event event = channel.take();

if (event == null) {

break;

}

......

//sfWriter是一个LRU缓存,缓存对文件Handler,最大打开文件由参数maxopenfiles控制

BucketWriter bucketWriter = sfWriters.get(lookupPath);

// 如果不存在,则构造一个缓存

if (bucketWriter == null) {

//通过HDFSWriterFactory根据filetype生成一个hdfswriter,由参数hdfs.Filetype控制;eg:HDFSDataStream

HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);

//idleCallback会在bucketWriter flush完毕后从LRU中删除;

bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,

batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,

suffix, codeC, compType,hdfsWriter, timedRollerPool,

proxyTicket, sinkCounter, idleTimeout, idleCallback,

lookupPath, callTimeout, callTimeoutPool);

sfWriters.put(lookupPath, bucketWriter);

}

......

// track一个事务内的bucket

if (!writers.contains(bucketWriter)) {

writers.add(bucketWriter);

}

// 写数据到HDFS;

bucketWriter.append(event);->

open();//如果底层支持append,则通过open接口打开;否则create接口

//判断是否进行日志切换

//根据复制的副本书和目标副本数做对比,如果不满足则doRotate=false

if(doRotate) {

close();

open();

}

HDFSWriter.append(event);

if(batchCounter == batchSize) {//如果达到batchSize行进行一次flush

flush();->

doFlush()->

HDFSWriter.sync()->

FSDataoutputStream.flush/sync

}

// 提交事务之前,刷新所有的bucket

for(BucketWriter bucketWriter : writers){

bucketWriter.flush();

}

transaction.commit();

这里,无论是BucketWriter执行append,sync还是rename等操作都是提交到一个后台线程池进行异步处理:callWithTimeout,这个线程池的大小是由hdfs.threadsize来设置;

本文转自MIKE老毕 51CTO博客,原文链接:http://blog.51cto.com/boylook/1298627,如需转载请自行联系原作者

Flume-ng HDFS sink原理解析相关推荐

  1. Flume之HDFS Sink使用案例

    前言 操作系统:CentOS 7 Java版本:1.8.0_221 Flume版本:1.8.0 HDFS版本:2.7.7 Flume agent配置:Netcat TCP Source.Memory ...

  2. 【Hadoop】HDFS操作、数据上传与下载原理解析、高级特性及底层原理

    HDFS操作.数据上传与下载原理解析.高级特性及底层原理 1 HDFS操作 1.1 Web Console网页工具 1.2 命令行 1.2.1 普通的操作命令 1.2.2 管理员命令 1.3 Java ...

  3. [ETL] Flume 理论与demo(Taildir Source Hdfs Sink)

    一.Flume简介 1. Flume概述 Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集.聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据: ...

  4. Flume中的HDFS Sink配置

    Flume中的HDFS Sink配置参数说明 type:hdfs path:hdfs的路径,需要包含文件系统标识,比如:hdfs://namenode/flume/webdata/ filePrefi ...

  5. Flume中的HDFS Sink配置参数说明

    Flume中的HDFS Sink应该是非常常用的,其中的配置参数也比较多,在这里记录备忘一下. channel type:hdfs path:写入hdfs的路径,需要包含文件系统标识,可以使用flum ...

  6. flume hdfs sink 文件滚动策略

    一般使用hdfs sink都会采用滚动生成文件的方式,hdfs sink滚动生成文件的策略有: 基于时间 基于文件大小 基于hdfs文件副本数(一般要规避这种情况) 基于event数量 基于文件闲置时 ...

  7. flume组件之hdfs sink

    简介 flume的三大组件 source\channel\sink对应着采集位置类型\缓存类型\下沉地类型 本文主要讲解sink中的hdfs sink的常见属性以及常见问题 常用属性 type:指定s ...

  8. Flume HDFS Sink配置详解

    Name Default Description channel –   type – 组件的名称,必须为:HDFS hdfs.path – HDFS目录路径,例如:hdfs://namenode/f ...

  9. Flume sinks案例HDFS Sink(每 5 秒在 hdfs 上创建一个新的文件夹)

    参考网址:hdfs sinks %t Unix 时间戳,毫秒 %{host} 替换名为"host"的事件 header 的值.支持任意标题名称. %a 星期几的短名,即 Mon, ...

最新文章

  1. C++实现读取指定文件夹下的所有文件
  2. 列举web开发中,为满足高性能的架构技术实现
  3. 将数据导入到mysql_06955.10.2如何将CM的外部PostgreSQL数据库迁移至MySQL服务
  4. 软件工程师学硬件-怎么看原理图之协议类接口UART
  5. iphone最新款手机_泄密者称,iPhone 13将拥有苹果粉丝多年来一直想要的新功能|iphone|apple|泄密者|手机|安卓...
  6. Truebine聚合监控
  7. 数据结构 传统链表实现与Linux内核链表
  8. Exchange 2016 之分层通讯簿
  9. Windows Insider 最新重大升级
  10. springsoure.sts下载地址
  11. 宁海象山H5棋牌游戏定制开发
  12. SSH与SSM的区别
  13. ajax传参的3种形式
  14. FPGA仿锆石代码风格组合电路时序电路严格分开之(一)8通道16位AD采集
  15. 国产Si24R2F+2.4GHz超低功耗有源RFID无线发射芯片
  16. 2021年金属非金属矿山支柱考试题库及金属非金属矿山支柱找解析
  17. freeswitch ws php,针对FreeSwitch的呼叫中心接口
  18. 媒体邀约得3个步骤和5个注意事项
  19. 收藏了800道Java后端经典面试题,共享给大家
  20. Oracle中用户角色权限管理

热门文章

  1. linux hosts请求报错
  2. 读书笔记 - 《我的美国之路》
  3. BZOJ 4602: [Sdoi2016]齿轮 dfs
  4. DevExpress学习笔记之如何获取Repository Item的值
  5. oracle中的合并查询
  6. 【Java】MapReduce 程序五步走的思想详细描述
  7. C语言求两个数的最大公约数
  8. 华为鸿蒙系统手机匹配,【图片】华为鸿蒙系统的厉害之处在于 你可能非用不可 !【手机吧】_百度贴吧...
  9. Spring MVC JSR-303验证框架之Hibernate-Validator
  10. 04-02 接口协议分析工具