Flume-ng HDFS sink原理解析
HDFS sink主要处理过程在process方法:
//循环batchSize次或者Channel为空
for(txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
//该方法会调用BasicTransactionSemantics的具体实现
Event event = channel.take();
if (event == null) {
break;
}
......
//sfWriter是一个LRU缓存,缓存对文件Handler,最大打开文件由参数maxopenfiles控制
BucketWriter bucketWriter = sfWriters.get(lookupPath);
// 如果不存在,则构造一个缓存
if (bucketWriter == null) {
//通过HDFSWriterFactory根据filetype生成一个hdfswriter,由参数hdfs.Filetype控制;eg:HDFSDataStream
HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);
//idleCallback会在bucketWriter flush完毕后从LRU中删除;
bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
suffix, codeC, compType,hdfsWriter, timedRollerPool,
proxyTicket, sinkCounter, idleTimeout, idleCallback,
lookupPath, callTimeout, callTimeoutPool);
sfWriters.put(lookupPath, bucketWriter);
}
......
// track一个事务内的bucket
if (!writers.contains(bucketWriter)) {
writers.add(bucketWriter);
}
// 写数据到HDFS;
bucketWriter.append(event);->
open();//如果底层支持append,则通过open接口打开;否则create接口
//判断是否进行日志切换
//根据复制的副本书和目标副本数做对比,如果不满足则doRotate=false
if(doRotate) {
close();
open();
}
HDFSWriter.append(event);
if(batchCounter == batchSize) {//如果达到batchSize行进行一次flush
flush();->
doFlush()->
HDFSWriter.sync()->
FSDataoutputStream.flush/sync
}
// 提交事务之前,刷新所有的bucket
for(BucketWriter bucketWriter : writers){
bucketWriter.flush();
}
transaction.commit();
这里,无论是BucketWriter执行append,sync还是rename等操作都是提交到一个后台线程池进行异步处理:callWithTimeout,这个线程池的大小是由hdfs.threadsize来设置;
本文转自MIKE老毕 51CTO博客,原文链接:http://blog.51cto.com/boylook/1298627,如需转载请自行联系原作者
Flume-ng HDFS sink原理解析相关推荐
- Flume之HDFS Sink使用案例
前言 操作系统:CentOS 7 Java版本:1.8.0_221 Flume版本:1.8.0 HDFS版本:2.7.7 Flume agent配置:Netcat TCP Source.Memory ...
- 【Hadoop】HDFS操作、数据上传与下载原理解析、高级特性及底层原理
HDFS操作.数据上传与下载原理解析.高级特性及底层原理 1 HDFS操作 1.1 Web Console网页工具 1.2 命令行 1.2.1 普通的操作命令 1.2.2 管理员命令 1.3 Java ...
- [ETL] Flume 理论与demo(Taildir Source Hdfs Sink)
一.Flume简介 1. Flume概述 Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集.聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据: ...
- Flume中的HDFS Sink配置
Flume中的HDFS Sink配置参数说明 type:hdfs path:hdfs的路径,需要包含文件系统标识,比如:hdfs://namenode/flume/webdata/ filePrefi ...
- Flume中的HDFS Sink配置参数说明
Flume中的HDFS Sink应该是非常常用的,其中的配置参数也比较多,在这里记录备忘一下. channel type:hdfs path:写入hdfs的路径,需要包含文件系统标识,可以使用flum ...
- flume hdfs sink 文件滚动策略
一般使用hdfs sink都会采用滚动生成文件的方式,hdfs sink滚动生成文件的策略有: 基于时间 基于文件大小 基于hdfs文件副本数(一般要规避这种情况) 基于event数量 基于文件闲置时 ...
- flume组件之hdfs sink
简介 flume的三大组件 source\channel\sink对应着采集位置类型\缓存类型\下沉地类型 本文主要讲解sink中的hdfs sink的常见属性以及常见问题 常用属性 type:指定s ...
- Flume HDFS Sink配置详解
Name Default Description channel – type – 组件的名称,必须为:HDFS hdfs.path – HDFS目录路径,例如:hdfs://namenode/f ...
- Flume sinks案例HDFS Sink(每 5 秒在 hdfs 上创建一个新的文件夹)
参考网址:hdfs sinks %t Unix 时间戳,毫秒 %{host} 替换名为"host"的事件 header 的值.支持任意标题名称. %a 星期几的短名,即 Mon, ...
最新文章
- C++实现读取指定文件夹下的所有文件
- 列举web开发中,为满足高性能的架构技术实现
- 将数据导入到mysql_06955.10.2如何将CM的外部PostgreSQL数据库迁移至MySQL服务
- 软件工程师学硬件-怎么看原理图之协议类接口UART
- iphone最新款手机_泄密者称,iPhone 13将拥有苹果粉丝多年来一直想要的新功能|iphone|apple|泄密者|手机|安卓...
- Truebine聚合监控
- 数据结构 传统链表实现与Linux内核链表
- Exchange 2016 之分层通讯簿
- Windows Insider 最新重大升级
- springsoure.sts下载地址
- 宁海象山H5棋牌游戏定制开发
- SSH与SSM的区别
- ajax传参的3种形式
- FPGA仿锆石代码风格组合电路时序电路严格分开之(一)8通道16位AD采集
- 国产Si24R2F+2.4GHz超低功耗有源RFID无线发射芯片
- 2021年金属非金属矿山支柱考试题库及金属非金属矿山支柱找解析
- freeswitch ws php,针对FreeSwitch的呼叫中心接口
- 媒体邀约得3个步骤和5个注意事项
- 收藏了800道Java后端经典面试题,共享给大家
- Oracle中用户角色权限管理
热门文章
- linux hosts请求报错
- 读书笔记 - 《我的美国之路》
- BZOJ 4602: [Sdoi2016]齿轮 dfs
- DevExpress学习笔记之如何获取Repository Item的值
- oracle中的合并查询
- 【Java】MapReduce 程序五步走的思想详细描述
- C语言求两个数的最大公约数
- 华为鸿蒙系统手机匹配,【图片】华为鸿蒙系统的厉害之处在于 你可能非用不可
!【手机吧】_百度贴吧...
- Spring MVC JSR-303验证框架之Hibernate-Validator
- 04-02 接口协议分析工具