简介
     flume1.7新增了组件Taildir Source(详情参见官方链接:http://flume.apache.org/FlumeUserGuide.html#taildir-source),此组件支持断点续传功能。但是此组件有个bug,即如果有个A文件,被更名为B文件后,A中的数据会被重复采集一次。这里需要做出修复

环境及软件准备
       flume采集环境,flume源码包(下载地址:http://mirror.bit.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-src.tar.gz)
       注意:为了明显看到效果,我的环境是flume采集数据到kafka,由sparkstreaming来消费打印出结果。

修改及测试
     1.源码修改及编译。由于时间关系,我的操作都是简化的,没有完全重新编译整个flume,只编译了Taildir Source 组件。在eclipse中新建一个maven项目,将源码包中Taildir Source的代码拷贝过来。注意包的路径要于源码一致。

2.导入jdk1.7(注意这里的JDK版本,要与服务器上flume运行的Java环境一致,最好完全一样,绝对不能跨大版本。一开始我用jdk1.8编译,在1.7环境上运行直接报错。)同时,在eclipse项目上右键->properties,按照下图调节编译环境。

pom依赖必须导入flume。如果少了哪些依赖,在flume源码包根目录下的pom文件中找就行了

<dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.7.0</version>
</dependency>

3.更改源码如下:将第248行改为249行,第136行改为137行。原理就是如果有文件绝对路径的判断条件,那么当文件更名后,绝对路径就变了,在程序中就相当于要采集一个新文件,造成数据重复,这里要这么做,就是取消掉文件绝对路径的判断。
       
248        //if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
249        if (tf == null) {

136       //if (tf != null && tf.updatePos(path, inode, pos)) {
137        if (tf != null && tf.updatePos(tf.getPath(), inode, pos)) {

然后运行maven,用package打成jar包。
     
          3.在服务器上flume的lib下,将 flume-taildir-source-1.7.0.jar备份 flume-taildir-source-1.7.0.jar.bak。然后将刚刚打的jar包上传到本路径下,重命名为 flume-taildir-source-1.7.0.jar

4.启动flume
            配置文件flume_taildirsource.conf 如下:

[root@kjtlxsvr7 conf]# cat flume_taildirsource.conf
#定义agent名, source、channel、sink的名称
a3.sources = r1
a3.channels = c1
a3.sinks = k1

#具体定义source
a3.sources.r1.type = TAILDIR
a3.sources.r1.positionFile = /var/log/flume/taildir_position.json
a3.sources.r1.filegroups=f1
a3.sources.r1.filegroups.f1=/home/flume_test_dir/test.log.*
a3.sources.r1.fileHeader=true

#具体定义channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

#具体定义sink
a3.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a3.sinks.k1.topic = test
a3.sinks.k1.brokerList = 192.168.60.46:9092
a3.sinks.k1.requiredAcks = 1
a3.sinks.k1.batchSize = 20
#组装source、channel、sink
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
#指定编码。由于中文编码问题,如果采集的文件中含有中文,如果此处编码和文件不一致则可能报错
a3.sources.r1.inputCharset = utf-8

启动命令:
bin/flume-ng agent -n a3 -c conf -f conf/flume_taildirsource.conf -Dflume.root.logger=INFO,console
          5.如果没有报错,则可以进行测试,如果报错了,多半就是编译环境的问题,要自行调整。

6.测试:在/home/flume_test_dir/路径下,新建2个文件test.log.1  test.log.2。
                    首先向test.log.2中echo数据
[root@kjtlxsvr7 flume_test_dir]# echo "first" >> test.log.2
                    eclipse中打印出结果为:

(first,1)

-------------------------------------------
Time: 1499138450000 ms
-------------------------------------------
1
                    然后将test.log.2更名,再echo数据
                
[root@kjtlxsvr7 flume_test_dir]# mv test.log.2 test.log.3
[root@kjtlxsvr7 flume_test_dir]# ls
test.log.0  test.log.3
[root@kjtlxsvr7 flume_test_dir]# echo "second" >> test.log.3
                     eclipse中打印的结果为:
               
Time: 1499138475000 ms
-------------------------------------------
(second,1)

-------------------------------------------
Time: 1499138475000 ms
-------------------------------------------
1

通过上面的测试,可以发现文件更名后,已经采集的数据不会重复采集了。
             
              7. 关于flume_taildirsource.conf:观察配置文件,可以发现定义source时定义了一个taildir_position.json文件,这个文件就是flume用来记录文件信息如id,偏移量,绝对路径等。其中的内容如下:

[root@kjtlxsvr7 flume]# cat taildir_position.json
[{"inode":622633,"pos":63,"file":"/home/flume_test_dir/test.log.8"},{"inode":622639,"pos":13,"file":"/home/flume_test_dir/test.log.2"}][root@kjtlxsvr7 flume]#
        这里inode就是标记文件的,文件名称改变,这个iNode不会变,pos记录偏移量,file就是绝对路径

8.关于断点续传。可以吧flume停掉,然后往文件中echo数据,再启动flume。经过测试已经通过。

flume1.8 TailDirSource断点续传与文件更名后数据重复采集的bug修复相关推荐

  1. flume1.7 TailDirSource断点续传与文件更名后数据重复采集的bug修复

    flume1.7 TailDirSource断点续传与文件更名后数据重复采集的bug修复 一:flume1.7新增了组件Taildir Source 官方链接:http://flume.apache. ...

  2. oracle使用(五)表空间创建、删除以及删除后数据文件还存在的问题

    目录 概述 一.表空间创建 二.表空间删除 三.表空间及数据文件删除后数据文件还存在 其他有空再补充 概述 待补充 一.表空间创建 --表空间 create tablespace space_name ...

  3. fastdfs redis java,大文件上传_断点续传_文件分片传输_fastdfs_前后端一站式解决方案...

    大文件上传,断点续传,秒传,fastdfs 项目介绍 实现h5与fastdfs之间的断点续传,大文件上传,秒传 软件架构 软件架构说明 webuploader+springboot+redis+fas ...

  4. hadoop HDFS的文件夹创建、文件上传、文件下载、文件夹删除,文件更名、文件详细信息、文件类型判断(文件夹或者文件)

    摘要: 本篇文章主要介绍的是hadoop hdfs的基础api的使用.包括Windows端依赖配置,Maven依赖配置.最后就是进行实际的操作,包括:获取远程hadoop hdfs连接,并对其进行的一 ...

  5. 断点续传大文件,视频

    1,准备工作: 1,运行环境:jdk1.7; tomcat7;eclipse; 2,运用框架:ssm,webuploader,servlet; 2,   前端部分: 1,html: <scrip ...

  6. Java中实现文件更名操作

    Java中实现文件更名操作 要点:与此文件相连接的输入输出流要先关闭,后更名,才能成功更名. 代码如下: import java.io.*; class FileManager {static fin ...

  7. Pandas将dataframe保存为pickle文件并加载保存后的pickle文件查看dataframe数据实战

    Pandas将dataframe保存为pickle文件并加载保存后的pickle文件查看dataframe数据实战 目录 Pandas将dataframe保存为pickle文件并加载保存后的pickl ...

  8. 模拟数据库,表空间和数据文件损坏后的恢复操作

    1环境准备 对数据库做一次全备份: 验证当前的备份文件: 2数据库损坏的恢复 2.1模拟数据库损坏 尝试重启数据库查看报错: 这里需要重点说明的是因为我们用的是CATLOG数据库作为目录数据库,所以即 ...

  9. oracle11gr2查看数据库状态,Oracle 11gR2数据库文件丢失后的恢复测试

    一.测试环境 数据库版本是Oracle 11gR2,在做完一份完全备份之后,关机,做一份快照,每一次开机之后都执行数次alter system switch logfile以产生归档日志. 之后的测试 ...

  10. mysql 拷贝数据库 表存在却打不开_mysql数据库文件复制后表打不开

    mysql数据库文件复制后表打不开找了很多方法最终解决了.InnoDB只有frm表结构,拷贝过去mysql后说表不存在网上说还要拷贝ibdata1文件,但这样的话会覆盖掉mysql本来有的ibdata ...

最新文章

  1. oracle 辅助实例,初学rman问题小记三:创建辅助实例遇到的ORA-01031
  2. AQO.NET实现数据操作封装
  3. 微软Build2021今日召开,共同期待VS2022+.NET6!
  4. Java –从列表中删除所有空值
  5. 工作不能混日子,给自己留言
  6. Rwordseg和tmcn安装-2017.09.23
  7. jdialog 数据量大加载出现白板_王者荣耀:队友真的有人机?白板熟练进排位,资料面都是假的...
  8. mysql xtrabackup 保护模式_MySQL Xtrabackup备份原理和实现细节
  9. java用来存储键值的容器是_Java容器 - osc_y0caef0i的个人空间 - OSCHINA - 中文开源技术交流社区...
  10. 开源视频平台:Kaltura
  11. 也说说angularJs里的evalAsync
  12. SpringBoot源码篇:Spring5内置tomcat实现code-based的web.xml实现
  13. TIMESTEN安装配置指南-中文版
  14. traceroute和tracert原理
  15. 射频功率dbm-w换算表
  16. 机器学习基础(林軒田)笔记之七
  17. Makefile语法基础
  18. html页分割线最简单的实现方式
  19. 【亲测有效】win10修改电脑系统字体大小
  20. 用程序实现基本计算器功能

热门文章

  1. 计算机主机自动关机如何设置,电脑怎么设置自动关机?电脑自动关机方法教程 电脑维修技术网...
  2. 按字节编址、按字编址、按字节寻址、按字寻址。
  3. FatalThrowableError in Encrypter.php line 66: Call to undefined function openssl_encrypt()
  4. 双边功率谱密度和单边功率谱密度_以高斯信号为例,计算幅度谱、相位谱、双边功率谱、双边功率谱密度、单边功率谱、单边功率谱密度。...
  5. matlab s域转时域,时域 S域 Z域转换
  6. Systemverilog always_comb 过程块
  7. java程序的结构与类型实验报告_20172301 《Java软件结构与数据结构》实验三报告...
  8. 湘潭十八中2021年高考成绩查询,2021年 湖南省湘潭市高中学校排名top10
  9. 调用百度API(七)——获取百度API token 通用代码
  10. Gentoo 教程:系统完善