上一篇说了HDFSEventSink的实现,这里根据hdfs sink的配置和调用分析来看下sink中整个hdfs数据写入的过程:
线上hdfs sink的几个重要设置

1
2
3
4
5
6
7
8
hdfs.path = hdfs://xxxxx/%{logtypename}/%Y%m%d/%H:
hdfs.rollInterval = 60
hdfs.rollSize = 0 //想让文件只根据实际来roll
hdfs.rollCount = 0
hdfs.batchSize = 2000
hdfs.txnEventMax = 2000
hdfs.fileType = DataStream 
hdfs.writeFormat = Text

这里说下和类相关的hdfs.fileType和hdfs.writeFormat,一个定义了文件流式用的类,一个定义了具体的数据序列化的类.
1)hdfs.fileType 有3个可选项:SequenceFile/DataStream/CompressedStream,DataStream可以想象成hdfs的textfile,默认是SequenceFileType,CompressedStream是用于压缩时设置
2)hdfs.writeFormat 定义了3种序列化方法,TEXT只写Event的body部分,HEADER_AND_TEXT写Event的body和header,AVRO_EVENT是avro的序列化方式

上面的设置,其数据写入流程大概如下:

1
SinkRunner.process->SinkProcessor.process->HDFSEventSink.process->HDFSEventSink.append->BucketWriter.append->HDFSWriter.append->HDFSDataStream.append->BodyTextEventSerializer.write->java.io.OutputStream.write

简单说下:
在HDFSEventSink中会实例化BucketWriter和HDFSWriter:

1
2
3
4
5
6
7
        if (bucketWriter == null) {
          HDFSWriter hdfsWriter = writerFactory.getWriter(fileType ); //获取HDFSWriter 对象
....
          bucketWriter = new BucketWriter(rollInterval , rollSize , rollCount ,
              batchSize, context , realPath, realName, inUsePrefix, inUseSuffix,
              suffix, codeC, compType, hdfsWriter, timedRollerPool,
              proxyTicket, sinkCounter , idleTimeout , idleCallback, lookupPath); //根据HDFSWriter 对象获取BucketWriter对象

这里获取HDFSWriter 对象时用到了org.apache.flume.sink.hdfs.HDFSWriterFactory的getWriter方法,根据hdfs.fileType的设置会返回具体的org.apache.flume.sink.hdfs.HDFSWriter实现类的对象
目前只支持3种

1
2
3
4
5
6
7
8
9
10
11
12
13
14
  static final String SequenceFileType = "SequenceFile" ;
  static final String DataStreamType = "DataStream" ;
  static final String CompStreamType = "CompressedStream" ;
....
  public HDFSWriter getWriter(String fileType) throws IOException {
    if (fileType.equalsIgnoreCase( SequenceFileType)) { //SequenceFile,sequencefile
      return new HDFSSequenceFile();
    else if (fileType.equalsIgnoreCase(DataStreamType)) { //DataStream
      return new HDFSDataStream();
    else if (fileType.equalsIgnoreCase(CompStreamType)) { //CompressedStream
      return new HDFSCompressedDataStream();
    else {
      throw new IOException("File type " + fileType + " not supported");
    }

BucketWriter可以理解成是对下层数据操作的一个封装,比如数据写入时其实调用了其append方法,append主要有下面几个步骤:
1)首先判断文件是否打开:

1
2
3
4
5
6
7
    if (! isOpen) {
      if(idleClosed) {
        throw new IOException("This bucket writer was closed due to idling and this handle " +
            "is thus no longer valid");
      }
      open(); //如果没有打开,则调用open->doOpen->HDFSWriter.open方法打开bucketPath (bucketPath是临时写入目录,比如tmp结尾的目录,targetPath是最终目录)
    }

doOpen的主要步骤
a.设置两个文件名:

1
2
3
        bucketPath = filePath + DIRECTORY_DELIMITER + inUsePrefix
          + fullFileName + inUseSuffix;
        targetPath = filePath + DIRECTORY_DELIMITER + fullFileName;

b.调用HDFSWriter.open方法打开bucketPath

1
2
3
4
5
6
7
8
9
10
11
12
         if (codeC == null) {
          // Need to get reference to FS using above config before underlying
          // writer does in order to avoid shutdown hook & IllegalStateExceptions
          fileSystem = new Path(bucketPath ).getFileSystem(config);
          LOG.info("Creating " + bucketPath );
          writer.open( bucketPath);
        else {
          // need to get reference to FS before writer does to avoid shutdown hook
          fileSystem = new Path(bucketPath ).getFileSystem(config);
          LOG.info("Creating " + bucketPath );
          writer.open( bucketPath, codeC , compType );
        }

c.如果设置了rollInterval ,则执行计划任务调用close方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    // if time-based rolling is enabled, schedule the roll
    if (rollInterval > 0) {
      Callable<Void> action = new Callable<Void>() {
        public Void call() throws Exception {
          LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed." ,
              bucketPath, rollInterval );
          try {
            close();
          catch(Throwable t) {
            LOG.error("Unexpected error" , t);
          }
          return null ;
        }
      };
      timedRollFuture = timedRollerPool.schedule(action, rollInterval ,
          TimeUnit. SECONDS);
    }

2)判断文件是否需要翻转(达到hdfs.rollSize或者hdfs.rollCount设置):

1
2
3
4
5
    // check if it's time to rotate the file
    if (shouldRotate()) {
      close(); //close调用flush+doClose,flush调用doFlush,doFlush调用HDFSWriter.sync方法把数据同步到hdfs中
      open();
    }

其中shouldRotate(基于数量和大小的roll方式):

1
2
3
4
5
6
7
8
9
10
11
12
  private boolean shouldRotate() {
    boolean doRotate = false;
    if (( rollCount > 0) && (rollCount <= eventCounter )) { //hdfs.rollCount大于0并且处理的event的数量大于或等于hdfs.rollCount,doRotate 设置为true
      LOG.debug( "rolling: rollCount: {}, events: {}" , rollCount , eventCounter );
      doRotate = true;
    }
    if (( rollSize > 0) && ( rollSize <= processSize)) { //hdfs.rollCount大于0并且处理的event的数量大于或等于hdfs.rollCount,doRotate 设置为true
      LOG.debug( "rolling: rollSize: {}, bytes: {}" , rollSize , processSize );
      doRotate = true;
    }
    return doRotate;
  }

其中doClose主要的步骤
a.调用HDFSWriter.close方法
b.调用renameBucket方法把tmp文件命名为最终文件:

1
2
3
4
    if (bucketPath != null && fileSystem != null) {
      renameBucket(); // could block or throw IOException
      fileSystem = null;
    }

其中renameBucket:

1
fileSystem.rename(srcPath, dstPath)

3)调用HDFSWriter.append方法写入Event

1
writer.append(event);

4) 更新计数器

1
2
3
4
    // update statistics
    processSize += event.getBody(). length;
    eventCounter++;
    batchCounter++;

5)判断是否需要flush(达到hdfs.batchSize的设置),batch写入数据到hdfs

1
2
3
    if (batchCounter == batchSize) {
      flush();
    }

Event写入时BucketWriter的append方法调用org.apache.flume.sink.hdfs.HDFSWriter实现类的append方法,比如这里的HDFSDataStream类,HDFSDataStream的主要方法:
configure用于设置serializer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
  public void configure(Context context) {
    serializerType = context.getString( "serializer""TEXT" ); //默认序列化方式为TEXT
    useRawLocalFileSystem = context.getBoolean( "hdfs.useRawLocalFileSystem",
        false);
    serializerContext =
        new Context(context.getSubProperties(EventSerializer.CTX_PREFIX));
    logger.info( "Serializer = " + serializerType + ", UseRawLocalFileSystem = "
        + useRawLocalFileSystem);
  }
append方法用于Event的写入,调用EventSerializer.write方法:
  public void append(Event e) throws IOException {
    // shun flumeformatter...
    serializer.write(e); //调用EventSerializer.write方法写入Event
  }

open方法主要步骤:
1)根据hdfs.append.support的设置(默认为false)打开或者新建文件

1
2
3
4
5
6
7
8
    boolean appending = false;
    if (conf.getBoolean( "hdfs.append.support"false ) == true && hdfs.isFile
            (dstPath)) { //默认hdfs.append.support为false
      outStream = hdfs.append(dstPath);
      appending = true;
    else {
      outStream = hdfs.create(dstPath); //如果不支持append,则创建文件
    }

2)使用EventSerializerFactory.getInstance方法创建EventSerializer的对象

1
2
    serializer = EventSerializerFactory.getInstance(
        serializerType, serializerContext , outStream ); //实例化EventSerializer对象

3)如果EventSerializer对象支持reopen,并且hdfs.append.support设置为true时会抛出异常

1
2
3
4
5
6
    if (appending && ! serializer.supportsReopen()) {
      outStream.close();
      serializer = null;
      throw new IOException("serializer (" + serializerType +
          ") does not support append");
    }

4)调用文件打开或者reopen之后的操作

1
2
3
4
5
6
    if (appending) {
      serializer.afterReopen();
    else {
      serializer.afterCreate();
    }
  }

这里hdfs.writeFormat的3种设置和对应的类:

1
2
3
  TEXT(BodyTextEventSerializer.Builder. class), //支持reopen
  HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder. class), //支持reopen
  AVRO_EVENT(FlumeEventAvroEventSerializer.Builder. class), // 不支持reopen

默认设置为TEXT,即BodyTextEventSerializer类:

1
2
3
4
5
6
7
8
9
10
  private BodyTextEventSerializer(OutputStream out, Context ctx) { //构造方法
    this. appendNewline = ctx.getBoolean(APPEND_NEWLINE , APPEND_NEWLINE_DFLT ); //默认为true
    this. out = out;
  }
....
  public void write(Event e) throws IOException { //write方法
    out.write(e.getBody()); //java.io.OutputStream.write,只写Event的body
    if (appendNewline) { //每一行之后增加一个回车
      out.write('\n');
    }

本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1618343,如需转载请自行联系原作者

flume源码学习8-hdfs sink的具体写入流程相关推荐

  1. flume源码学习4-SourceRunner与ExecSource实现

    在agent启动时,会启动Channel,SourceRunner,SinkRunner,比如在org.apache.flume.agent.embedded.EmbeddedAgent类的doSta ...

  2. Ejabberd源码学习——端口监听及报文转发流程

    这篇文章是我之前在RYTong内部分享的一篇文章.上一篇文章说到Ejabberd在启动的时候会监听配置的端口,但没有详细解释监听的流程.这篇我们就来看看Ejabberd监听端口的实现逻辑,了解下一个X ...

  3. Picasso 源码 学习(一) 图片加载流程

    Picasso地址 http://square.github.io/picasso/ 最简单的使用方法 Picasso.with(context).load("http://i.imgur. ...

  4. 【Android 源码学习】 init启动

    目录 Android 源码学习 init启动 从main.cpp开始 init.cpp 部分逻辑 init启动zygote 属性服务 总结 Android 源码学习 init启动 Android 11 ...

  5. Hadoop HDFS源码学习之NameNode部分

    NameNode源码学习 文章目录 NameNode源码学习 一.文件系统目录树(第一关系) 2.1 INode相关类 2.2 快照特性的实现 2.3 FSEditLog类 2.4 FSImage类 ...

  6. Shiro源码学习之二

    接上一篇 Shiro源码学习之一 3.subject.login 进入login public void login(AuthenticationToken token) throws Authent ...

  7. Shiro源码学习之一

    一.最基本的使用 1.Maven依赖 <dependency><groupId>org.apache.shiro</groupId><artifactId&g ...

  8. mutations vuex 调用_Vuex源码学习(六)action和mutation如何被调用的(前置准备篇)...

    前言 Vuex源码系列不知不觉已经到了第六篇.前置的五篇分别如下: 长篇连载:Vuex源码学习(一)功能梳理 长篇连载:Vuex源码学习(二)脉络梳理 作为一个Web前端,你知道Vuex的instal ...

  9. vue实例没有挂载到html上,vue 源码学习 - 实例挂载

    前言 在学习vue源码之前需要先了解源码目录设计(了解各个模块的功能)丶Flow语法. src ├── compiler # 把模板解析成 ast 语法树,ast 语法树优化,代码生成等功能. ├── ...

最新文章

  1. c语言流程图char,求救!各位大神,用程序流程图怎么描述下面这个???
  2. C#静态类 转载:(原文:http://www.cnblogs.com/chenlulouis/ )
  3. 自动化测试?看完这篇就够了
  4. [日常工作]WorkStation 使用端口转发的方式使用宿主机IP地址提供服务
  5. 22-while循环
  6. 1.C#基础之简介(完成)
  7. hiho 第155周 任务分配
  8. 数据治理的目的与意义
  9. c# 系列 - 基本知识
  10. 伍德里奇计量经济学导论之计算机操作题的R语言实现(虚拟变量)
  11. Java是什么?Java能干嘛?
  12. 加拿大大学计算机世界排名,加拿大计算机专业大学排名
  13. 主流营销渠道O2O营销平台特质有哪些
  14. UEFI shell - 脚本文件
  15. uva10158(并查集)
  16. 解决MySQL5.7在MAC下登录ERROR 1045 (28000): Access denied for user 'root'@'localhost' (using password: YES)
  17. 【常见的优化算法介绍】
  18. 市场周刊杂志市场周刊杂志社市场周刊编辑部2022年第6期目录
  19. 1062lcd在dxp哪个库_Protel DXP 自带常用元件库路径
  20. Item 2: Understand auto type deduction.

热门文章

  1. Druid 配置 wallfilter
  2. Linux下task_struct详解
  3. as3自定义加载图片类
  4. swift 跳转网页写法
  5. 【跃迁之路】【554天】程序员高效学习方法论探索系列(实验阶段311-2018.08.13)...
  6. phpcms中调用外部网站数据
  7. android获取手机通讯录
  8. 团队项目—每日记录3(补4.25)
  9. thrift框架使用C++
  10. hibernate中多对多分解成一对多,