今天在做flume+kerberos写入hdfs时遇到的问题。
测试的配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
agent-server1.sources= testtail
agent-server1.sinks = hdfs-sink
agent-server1.channels= hdfs-channel
agent-server1.sources.testtail.type = netcat
agent-server1.sources.testtail.bind = localhost
agent-server1.sources.testtail.port = 9999
agent-server1.sinks.hdfs-sink.hdfs.kerberosPrincipal = hdfs/_HOST@KERBEROS_HADOOP
agent-server1.sinks.hdfs-sink.hdfs.kerberosKeytab = /home/vipshop/conf/hdfs.keytab
agent-server1.channels.hdfs-channel.type = memory
agent-server1.channels.hdfs-channel.capacity = 200000000
agent-server1.channels.hdfs-channel.transactionCapacity = 10000
agent-server1.sinks.hdfs-sink.type = hdfs
agent-server1.sinks.hdfs-sink.hdfs.path = hdfs://bipcluster/tmp/flume/%Y%m%d
agent-server1.sinks.hdfs-sink.hdfs.rollInterval = 60
agent-server1.sinks.hdfs-sink.hdfs.rollSize = 0
agent-server1.sinks.hdfs-sink.hdfs.rollCount = 0
agent-server1.sinks.hdfs-sink.hdfs.threadsPoolSize = 10
agent-server1.sinks.hdfs-sink.hdfs.round = false
agent-server1.sinks.hdfs-sink.hdfs.roundValue = 30
agent-server1.sinks.hdfs-sink.hdfs.roundUnit = minute
agent-server1.sinks.hdfs-sink.hdfs.batchSize = 100
agent-server1.sinks.hdfs-sink.hdfs.fileType = DataStream
agent-server1.sinks.hdfs-sink.hdfs.writeFormat = Text
agent-server1.sinks.hdfs-sink.hdfs.callTimeout = 60000
agent-server1.sinks.hdfs-sink.hdfs.idleTimeout = 100
agent-server1.sinks.hdfs-sink.hdfs.filePrefix = ip
agent-server1.sinks.hdfs-sink.channel = hdfs-channel
agent-server1.sources.testtail.channels = hdfs-channel

在启动服务后,使用telnet进行测试,发现如下报错:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
14/03/24 18:03:07 ERROR hdfs.HDFSEventSink: process failed
java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing.
 Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
        at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NumberFormatException: null
        at java.lang.Long.parseLong(Long.java:375)
        at java.lang.Long.valueOf(Long.java:525)
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
        ... 5 more
14/03/24 18:03:07 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to
resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:461)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
        at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
        ... 3 more
Caused by: java.lang.NumberFormatException: null
        at java.lang.Long.parseLong(Long.java:375)
        at java.lang.Long.valueOf(Long.java:525)
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
        ... 5 more

从调用栈的信息来看,错误出在org.apache.flume.formatter.output.BucketPath类的replaceShorthand方法。
在org.apache.flume.sink.hdfs.HDFSEventSink类中,使用process方法来生成hdfs的url,其中主要是调用了BucketPath类的escapeString方法来进行字符的转换,并最终调用了replaceShorthand方法。
其中replaceShorthand方法的相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  public static String replaceShorthand(char c, Map<String, String> headers,
      TimeZone timeZone, boolean needRounding, int unit, int roundDown) {
    String timestampHeader = headers.get("timestamp");
    long ts;
    try {
      ts = Long.valueOf(timestampHeader);
    catch (NumberFormatException e) {
      throw new RuntimeException("Flume wasn't able to parse timestamp header"
        " in the event to resolve time based bucketing. Please check that"
        " you're correctly populating timestamp header (for example using"
        " TimestampInterceptor source interceptor).", e);
    }
    if(needRounding){
      ts = roundDown(roundDown, unit, ts);
    }
........

从代码中可以看到,timestampHeader 的值如果取不到,在向ts赋值时就会报错。。
这其实是flume的一个bug,bug id:
https://issues.apache.org/jira/browse/FLUME-1419
解决方法有3个:
1.更改配置,更新hdfs文件的路径格式

1
agent-server1.sinks.hdfs-sink.hdfs.path = hdfs://bipcluster/tmp/flume

但是这样就不能按天来存放日志了
2.通过更改相关的代码
(patch:https://issues.apache.org/jira/secure/attachment/12538891/FLUME-1419.patch)
如果在headers中获取不到timestamp的值,就给它一个当前timestamp的值。
相关代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
     String timestampHeader = headers.get("timestamp");
     long ts;
     try {
      if (timestampHeader == null) {
        ts = System.currentTimeMillis();
      else {
        ts = Long.valueOf(timestampHeader);
      }
     } catch (NumberFormatException e) {
       throw new RuntimeException("Flume wasn't able to parse timestamp header"
         " in the event to resolve time based bucketing. Please check that"
         " you're correctly populating timestamp header (for example using"
                  " TimestampInterceptor source interceptor).", e);
}

3.为source定义基于timestamp的interceptors 
在配置中增加两行即可:

1
2
agent-server1.sources.testtail.interceptors = i1
agent-server1.sources.testtail.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

一个技巧:
在debug flume的问题时,可以在flume的启动参数中设置把debug日志打到console中。

1
-Dflume.root.logger=DEBUG,console,LOGFILE

本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1384187,如需转载请自行联系原作者

flume bucketpath的bug一例相关推荐

  1. 浅谈android网络框架——以课程格子的bug为例

    大家好! 在使用课程格子过程中 ,发现如果网络断开,点击树洞秘密,课程格子会因为无法从网络上刷新数据而崩溃掉.今天借解决此bug的为例,浅谈android的网络框架.

  2. MySQL Bug一例-----ibuf cursor restoration fails

    产生原因: 1.开启change buffer(innodb_change_buffering) 2.对表进行大量delete 操作 3.对相同表进行truncate bug名称:ibuf curso ...

  3. VC6.0下调bug的流程

    1.       首先在要调试的项目下建立一个文件夹CurrentUse,把要修改的那个类拖到此文件夹下,免得查看其它类后再次查找,如果类文件很多的话,查找一个类文件会很耽误时间 2.       然 ...

  4. Flume日志收集系统架构详解--转

    2017-09-06朱洁大数据和云计算技术 任何一个生产系统在运行过程中都会产生大量的日志,日志往往隐藏了很多有价值的信息.在没有分析方法之前,这些日志存储一段时间后就会被清理.随着技术的发展和分析能 ...

  5. 这么简单的bug,你改了2天?

    大家好,我是Z哥. "这个 bug 的问题不是很明显吗?怎么这么久才搞定?" "就改一行代码,你怎么弄了这么久?" 我想上面的言语几乎每个程序员都听到过.特别是 ...

  6. 自定义Flume Sink:ElasticSearch Sink

    Flume Sink的目的是从Flume Channel中获取数据然后输出到存储或者其他Flume Source中.Flume Agent启动的时候,它会为每一个Sink都启动一个SinkRunner ...

  7. itest work(爱测试) 开源接口测试敏捷测试管理平台 9.5.0 发布,接口测试及脑图用例重大升级

    (一)itest work 简介 itest work (爱测试)  一站式工作站让测试变得简单.敏捷,"好用.好看,好敏捷" ,是itest wrok 追求的目标.itest w ...

  8. 魅族手机突然显示无服务器,魅族Flyme6是悟空请来的?Bug竟然有这么多?

    原标题:魅族Flyme6是悟空请来的?Bug竟然有这么多? 赶在2017元旦之前,魅族终于发布了第一批二十多款机型的公测版,将近一周的时间过去,魅族官网也已经全面放出了Flyme6.0(A/Y,分别代 ...

  9. day06:如何定位分析前后端bug(详细版)

    文章目录 一. 为什么要区分? 二.如何定位分析? 三.借助什么工具? 四.如何复现bug? 五.案例分析 一. 为什么要区分? 第一,前端bug提交给后端,后端bug提交给前端,不仅给开发双方带来了 ...

最新文章

  1. Android adb shell 命令
  2. php 多维数组 array sort 排序 :array_multisort
  3. BRD、MRD 和 PRD 之间的区别与联系有哪些?
  4. 中兴通讯遭大股东减持逾两千万股 盘中跌逾6%
  5. python3 selenium安装教程_Mac OS下搭建 python3+pycharm+selenium+Chrome环境
  6. 中国通风外墙系统市场趋势报告、技术动态创新及市场预测
  7. 【SQL】结构化查询语言
  8. 点击复制公众号按钮_96编辑器如何复制文章到公众号发布?
  9. java流 视频_java如何对视频文件处理?包括拉流推流视频截取等?
  10. SQL server 2005下载地址
  11. 用mapgis数据转成arcgis中shape格式的方法
  12. Word学习笔记:P12-合并打印信封与标签设定
  13. choco 使用详解
  14. 人的判断力受制于他的知识和经验:明月当空叫,黄犬卧花心
  15. 正则-完美的身份证以及真实姓名
  16. 微博博主侮辱女性 街猫koryili
  17. IOS下载资源zip到本地然后读取
  18. rx580和gtx1650哪个好
  19. 教你怎样来优化Apache服务器的性能
  20. 浅谈:智能化变电站在线监测系统

热门文章

  1. Linux文本处理必杀技之awk应用详解
  2. centos升级mysql到5.5
  3. [转]Android UI 自动化测试
  4. Android在xml中定义Shape
  5. 9102年webpack4搭建vue项目
  6. Tencent云联网灾备方案
  7. 将不确定变成确定~frameset页面不能正确加载
  8. ul ol li的序号编号样式
  9. Disruptor技术调研之配置参数一览
  10. ubuntu 安装mysql ,postgresql (转)