flume源码学习8-hdfs sink的具体写入流程
上一篇说了HDFSEventSink的实现,这里根据hdfs sink的配置和调用分析来看下sink中整个hdfs数据写入的过程:
线上hdfs sink的几个重要设置
1
2
3
4
5
6
7
8
|
hdfs.path = hdfs: //xxxxx/%{logtypename}/%Y%m%d/%H:
hdfs.rollInterval = 60
hdfs.rollSize = 0 //想让文件只根据实际来roll
hdfs.rollCount = 0
hdfs.batchSize = 2000
hdfs.txnEventMax = 2000
hdfs.fileType = DataStream
hdfs.writeFormat = Text
|
这里说下和类相关的hdfs.fileType和hdfs.writeFormat,一个定义了文件流式用的类,一个定义了具体的数据序列化的类.
1)hdfs.fileType 有3个可选项:SequenceFile/DataStream/CompressedStream,DataStream可以想象成hdfs的textfile,默认是SequenceFileType,CompressedStream是用于压缩时设置
2)hdfs.writeFormat 定义了3种序列化方法,TEXT只写Event的body部分,HEADER_AND_TEXT写Event的body和header,AVRO_EVENT是avro的序列化方式
上面的设置,其数据写入流程大概如下:
1
|
SinkRunner.process->SinkProcessor.process->HDFSEventSink.process->HDFSEventSink.append->BucketWriter.append->HDFSWriter.append->HDFSDataStream.append->BodyTextEventSerializer.write->java.io.OutputStream.write
|
简单说下:
在HDFSEventSink中会实例化BucketWriter和HDFSWriter:
1
2
3
4
5
6
7
|
if (bucketWriter == null ) {
HDFSWriter hdfsWriter = writerFactory.getWriter(fileType ); //获取HDFSWriter 对象
....
bucketWriter = new BucketWriter(rollInterval , rollSize , rollCount ,
batchSize, context , realPath, realName, inUsePrefix, inUseSuffix,
suffix, codeC, compType, hdfsWriter, timedRollerPool,
proxyTicket, sinkCounter , idleTimeout , idleCallback, lookupPath); //根据HDFSWriter 对象获取BucketWriter对象
|
这里获取HDFSWriter 对象时用到了org.apache.flume.sink.hdfs.HDFSWriterFactory的getWriter方法,根据hdfs.fileType的设置会返回具体的org.apache.flume.sink.hdfs.HDFSWriter实现类的对象
目前只支持3种
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
static final String SequenceFileType = "SequenceFile" ;
static final String DataStreamType = "DataStream" ;
static final String CompStreamType = "CompressedStream" ;
....
public HDFSWriter getWriter(String fileType) throws IOException {
if (fileType.equalsIgnoreCase( SequenceFileType)) { //SequenceFile,sequencefile
return new HDFSSequenceFile();
} else if (fileType.equalsIgnoreCase(DataStreamType)) { //DataStream
return new HDFSDataStream();
} else if (fileType.equalsIgnoreCase(CompStreamType)) { //CompressedStream
return new HDFSCompressedDataStream();
} else {
throw new IOException( "File type " + fileType + " not supported" );
}
|
BucketWriter可以理解成是对下层数据操作的一个封装,比如数据写入时其实调用了其append方法,append主要有下面几个步骤:
1)首先判断文件是否打开:
1
2
3
4
5
6
7
|
if (! isOpen) {
if (idleClosed) {
throw new IOException( "This bucket writer was closed due to idling and this handle " +
"is thus no longer valid" );
}
open(); //如果没有打开,则调用open->doOpen->HDFSWriter.open方法打开bucketPath (bucketPath是临时写入目录,比如tmp结尾的目录,targetPath是最终目录)
}
|
doOpen的主要步骤
a.设置两个文件名:
1
2
3
|
bucketPath = filePath + DIRECTORY_DELIMITER + inUsePrefix
+ fullFileName + inUseSuffix;
targetPath = filePath + DIRECTORY_DELIMITER + fullFileName;
|
b.调用HDFSWriter.open方法打开bucketPath
1
2
3
4
5
6
7
8
9
10
11
12
|
if (codeC == null ) {
// Need to get reference to FS using above config before underlying
// writer does in order to avoid shutdown hook & IllegalStateExceptions
fileSystem = new Path(bucketPath ).getFileSystem(config);
LOG.info( "Creating " + bucketPath );
writer.open( bucketPath);
} else {
// need to get reference to FS before writer does to avoid shutdown hook
fileSystem = new Path(bucketPath ).getFileSystem(config);
LOG.info( "Creating " + bucketPath );
writer.open( bucketPath, codeC , compType );
}
|
c.如果设置了rollInterval ,则执行计划任务调用close方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
// if time-based rolling is enabled, schedule the roll
if (rollInterval > 0 ) {
Callable<Void> action = new Callable<Void>() {
public Void call() throws Exception {
LOG.debug( "Rolling file ({}): Roll scheduled after {} sec elapsed." ,
bucketPath, rollInterval );
try {
close();
} catch (Throwable t) {
LOG.error( "Unexpected error" , t);
}
return null ;
}
};
timedRollFuture = timedRollerPool.schedule(action, rollInterval ,
TimeUnit. SECONDS);
}
|
2)判断文件是否需要翻转(达到hdfs.rollSize或者hdfs.rollCount设置):
1
2
3
4
5
|
// check if it's time to rotate the file
if (shouldRotate()) {
close(); //close调用flush+doClose,flush调用doFlush,doFlush调用HDFSWriter.sync方法把数据同步到hdfs中
open();
}
|
其中shouldRotate(基于数量和大小的roll方式):
1
2
3
4
5
6
7
8
9
10
11
12
|
private boolean shouldRotate() {
boolean doRotate = false ;
if (( rollCount > 0 ) && (rollCount <= eventCounter )) { //hdfs.rollCount大于0并且处理的event的数量大于或等于hdfs.rollCount,doRotate 设置为true
LOG.debug( "rolling: rollCount: {}, events: {}" , rollCount , eventCounter );
doRotate = true ;
}
if (( rollSize > 0 ) && ( rollSize <= processSize)) { //hdfs.rollCount大于0并且处理的event的数量大于或等于hdfs.rollCount,doRotate 设置为true
LOG.debug( "rolling: rollSize: {}, bytes: {}" , rollSize , processSize );
doRotate = true ;
}
return doRotate;
}
|
其中doClose主要的步骤
a.调用HDFSWriter.close方法
b.调用renameBucket方法把tmp文件命名为最终文件:
1
2
3
4
|
if (bucketPath != null && fileSystem != null ) {
renameBucket(); // could block or throw IOException
fileSystem = null ;
}
|
其中renameBucket:
1
|
fileSystem.rename(srcPath, dstPath)
|
3)调用HDFSWriter.append方法写入Event
1
|
writer.append(event);
|
4) 更新计数器
1
2
3
4
|
// update statistics
processSize += event.getBody(). length;
eventCounter++;
batchCounter++;
|
5)判断是否需要flush(达到hdfs.batchSize的设置),batch写入数据到hdfs
1
2
3
|
if (batchCounter == batchSize) {
flush();
}
|
Event写入时BucketWriter的append方法调用org.apache.flume.sink.hdfs.HDFSWriter实现类的append方法,比如这里的HDFSDataStream类,HDFSDataStream的主要方法:
configure用于设置serializer:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
public void configure(Context context) {
serializerType = context.getString( "serializer" , "TEXT" ); //默认序列化方式为TEXT
useRawLocalFileSystem = context.getBoolean( "hdfs.useRawLocalFileSystem" ,
false );
serializerContext =
new Context(context.getSubProperties(EventSerializer.CTX_PREFIX));
logger.info( "Serializer = " + serializerType + ", UseRawLocalFileSystem = "
+ useRawLocalFileSystem);
}
append方法用于Event的写入,调用EventSerializer.write方法:
public void append(Event e) throws IOException {
// shun flumeformatter...
serializer.write(e); //调用EventSerializer.write方法写入Event
}
|
open方法主要步骤:
1)根据hdfs.append.support的设置(默认为false)打开或者新建文件
1
2
3
4
5
6
7
8
|
boolean appending = false ;
if (conf.getBoolean( "hdfs.append.support" , false ) == true && hdfs.isFile
(dstPath)) { //默认hdfs.append.support为false
outStream = hdfs.append(dstPath);
appending = true ;
} else {
outStream = hdfs.create(dstPath); //如果不支持append,则创建文件
}
|
2)使用EventSerializerFactory.getInstance方法创建EventSerializer的对象
1
2
|
serializer = EventSerializerFactory.getInstance(
serializerType, serializerContext , outStream ); //实例化EventSerializer对象
|
3)如果EventSerializer对象支持reopen,并且hdfs.append.support设置为true时会抛出异常
1
2
3
4
5
6
|
if (appending && ! serializer.supportsReopen()) {
outStream.close();
serializer = null ;
throw new IOException( "serializer (" + serializerType +
") does not support append" );
}
|
4)调用文件打开或者reopen之后的操作
1
2
3
4
5
6
|
if (appending) {
serializer.afterReopen();
} else {
serializer.afterCreate();
}
}
|
这里hdfs.writeFormat的3种设置和对应的类:
1
2
3
|
TEXT(BodyTextEventSerializer.Builder. class ), //支持reopen
HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder. class ), //支持reopen
AVRO_EVENT(FlumeEventAvroEventSerializer.Builder. class ), // 不支持reopen
|
默认设置为TEXT,即BodyTextEventSerializer类:
1
2
3
4
5
6
7
8
9
10
|
private BodyTextEventSerializer(OutputStream out, Context ctx) { //构造方法
this . appendNewline = ctx.getBoolean(APPEND_NEWLINE , APPEND_NEWLINE_DFLT ); //默认为true
this . out = out;
}
....
public void write(Event e) throws IOException { //write方法
out.write(e.getBody()); //java.io.OutputStream.write,只写Event的body
if (appendNewline) { //每一行之后增加一个回车
out.write( '\n' );
}
|
flume源码学习8-hdfs sink的具体写入流程相关推荐
- flume源码学习4-SourceRunner与ExecSource实现
在agent启动时,会启动Channel,SourceRunner,SinkRunner,比如在org.apache.flume.agent.embedded.EmbeddedAgent类的doSta ...
- Ejabberd源码学习——端口监听及报文转发流程
这篇文章是我之前在RYTong内部分享的一篇文章.上一篇文章说到Ejabberd在启动的时候会监听配置的端口,但没有详细解释监听的流程.这篇我们就来看看Ejabberd监听端口的实现逻辑,了解下一个X ...
- Picasso 源码 学习(一) 图片加载流程
Picasso地址 http://square.github.io/picasso/ 最简单的使用方法 Picasso.with(context).load("http://i.imgur. ...
- 【Android 源码学习】 init启动
目录 Android 源码学习 init启动 从main.cpp开始 init.cpp 部分逻辑 init启动zygote 属性服务 总结 Android 源码学习 init启动 Android 11 ...
- Hadoop HDFS源码学习之NameNode部分
NameNode源码学习 文章目录 NameNode源码学习 一.文件系统目录树(第一关系) 2.1 INode相关类 2.2 快照特性的实现 2.3 FSEditLog类 2.4 FSImage类 ...
- Shiro源码学习之二
接上一篇 Shiro源码学习之一 3.subject.login 进入login public void login(AuthenticationToken token) throws Authent ...
- Shiro源码学习之一
一.最基本的使用 1.Maven依赖 <dependency><groupId>org.apache.shiro</groupId><artifactId&g ...
- mutations vuex 调用_Vuex源码学习(六)action和mutation如何被调用的(前置准备篇)...
前言 Vuex源码系列不知不觉已经到了第六篇.前置的五篇分别如下: 长篇连载:Vuex源码学习(一)功能梳理 长篇连载:Vuex源码学习(二)脉络梳理 作为一个Web前端,你知道Vuex的instal ...
- vue实例没有挂载到html上,vue 源码学习 - 实例挂载
前言 在学习vue源码之前需要先了解源码目录设计(了解各个模块的功能)丶Flow语法. src ├── compiler # 把模板解析成 ast 语法树,ast 语法树优化,代码生成等功能. ├── ...
最新文章
- c语言流程图char,求救!各位大神,用程序流程图怎么描述下面这个???
- C#静态类 转载:(原文:http://www.cnblogs.com/chenlulouis/ )
- 自动化测试?看完这篇就够了
- [日常工作]WorkStation 使用端口转发的方式使用宿主机IP地址提供服务
- 22-while循环
- 1.C#基础之简介(完成)
- hiho 第155周 任务分配
- 数据治理的目的与意义
- c# 系列 - 基本知识
- 伍德里奇计量经济学导论之计算机操作题的R语言实现(虚拟变量)
- Java是什么?Java能干嘛?
- 加拿大大学计算机世界排名,加拿大计算机专业大学排名
- 主流营销渠道O2O营销平台特质有哪些
- UEFI shell - 脚本文件
- uva10158(并查集)
- 解决MySQL5.7在MAC下登录ERROR 1045 (28000): Access denied for user 'root'@'localhost' (using password: YES)
- 【常见的优化算法介绍】
- 市场周刊杂志市场周刊杂志社市场周刊编辑部2022年第6期目录
- 1062lcd在dxp哪个库_Protel DXP 自带常用元件库路径
- Item 2: Understand auto type deduction.