An example of a Flume BucketPath bug
A problem I ran into today while using Flume with Kerberos to write to HDFS.
The test configuration file:
|
agent-server1.sources = testtail
agent-server1.sinks = hdfs-sink
agent-server1.channels = hdfs-channel
agent-server1.sources.testtail.type = netcat
agent-server1.sources.testtail.bind = localhost
agent-server1.sources.testtail.port = 9999
agent-server1.sinks.hdfs-sink.hdfs.kerberosPrincipal = hdfs/_HOST@KERBEROS_HADOOP
agent-server1.sinks.hdfs-sink.hdfs.kerberosKeytab = /home/vipshop/conf/hdfs.keytab
agent-server1.channels.hdfs-channel.type = memory
agent-server1.channels.hdfs-channel.capacity = 200000000
agent-server1.channels.hdfs-channel.transactionCapacity = 10000
agent-server1.sinks.hdfs-sink.type = hdfs
agent-server1.sinks.hdfs-sink.hdfs.path = hdfs://bipcluster/tmp/flume/%Y%m%d
agent-server1.sinks.hdfs-sink.hdfs.rollInterval = 60
agent-server1.sinks.hdfs-sink.hdfs.rollSize = 0
agent-server1.sinks.hdfs-sink.hdfs.rollCount = 0
agent-server1.sinks.hdfs-sink.hdfs.threadsPoolSize = 10
agent-server1.sinks.hdfs-sink.hdfs.round = false
agent-server1.sinks.hdfs-sink.hdfs.roundValue = 30
agent-server1.sinks.hdfs-sink.hdfs.roundUnit = minute
agent-server1.sinks.hdfs-sink.hdfs.batchSize = 100
agent-server1.sinks.hdfs-sink.hdfs.fileType = DataStream
agent-server1.sinks.hdfs-sink.hdfs.writeFormat = Text
agent-server1.sinks.hdfs-sink.hdfs.callTimeout = 60000
agent-server1.sinks.hdfs-sink.hdfs.idleTimeout = 100
agent-server1.sinks.hdfs-sink.hdfs.filePrefix = ip
agent-server1.sinks.hdfs-sink.channel = hdfs-channel
agent-server1.sources.testtail.channels = hdfs-channel
|
After starting the agent and sending data with telnet for a test, the following error appeared:
|
14/03/24 18:03:07 ERROR hdfs.HDFSEventSink: process failed
java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing.
Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
	at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
	at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
	at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NumberFormatException: null
	at java.lang.Long.parseLong(Long.java:375)
	at java.lang.Long.valueOf(Long.java:525)
	at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
	... 5 more
14/03/24 18:03:07 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to
resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:461)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
	at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
	at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
	at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
	... 3 more
Caused by: java.lang.NumberFormatException: null
	at java.lang.Long.parseLong(Long.java:375)
	at java.lang.Long.valueOf(Long.java:525)
	at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
	... 5 more
|
Judging from the stack trace, the error occurs in the replaceShorthand method of the org.apache.flume.formatter.output.BucketPath class.
In the org.apache.flume.sink.hdfs.HDFSEventSink class, the process method builds the HDFS path; it does this mainly by calling BucketPath's escapeString method to substitute the escape sequences, which in turn calls replaceShorthand.
The relevant part of replaceShorthand looks like this:
|
public static String replaceShorthand(char c, Map<String, String> headers,
    TimeZone timeZone, boolean needRounding, int unit, int roundDown) {
  String timestampHeader = headers.get("timestamp");
  long ts;
  try {
    ts = Long.valueOf(timestampHeader);
  } catch (NumberFormatException e) {
    throw new RuntimeException("Flume wasn't able to parse timestamp header"
        + " in the event to resolve time based bucketing. Please check that"
        + " you're correctly populating timestamp header (for example using"
        + " TimestampInterceptor source interceptor).", e);
  }
  if (needRounding) {
    ts = roundDown(roundDown, unit, ts);
  }
  ........
|
As the code shows, when the timestamp header is absent, headers.get("timestamp") returns null, and passing that null to Long.valueOf throws a NumberFormatException during the assignment to ts.
This is in fact a known Flume bug:
https://issues.apache.org/jira/browse/FLUME-1419
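The root cause can be reproduced in a few lines outside Flume: Long.parseLong rejects a null string with the message "null", which is exactly the "Caused by: java.lang.NumberFormatException: null" in the log above. A minimal standalone sketch (class and variable names are my own, not Flume's):

```java
public class NullTimestampDemo {
    public static void main(String[] args) {
        // What headers.get("timestamp") returns for a bare netcat event: null
        String timestampHeader = null;
        try {
            long ts = Long.valueOf(timestampHeader); // parseLong(null) throws immediately
            System.out.println(ts);
        } catch (NumberFormatException e) {
            // Same exception as in the Flume log: message is the string "null"
            System.out.println("NumberFormatException: " + e.getMessage());
        }
    }
}
```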
There are three ways to work around it:
1. Change the configuration so the HDFS file path no longer uses time-based escapes
|
agent-server1.sinks.hdfs-sink.hdfs.path = hdfs://bipcluster/tmp/flume
|
But then the logs can no longer be stored by day.
2. Patch the code
(patch: https://issues.apache.org/jira/secure/attachment/12538891/FLUME-1419.patch)
If the timestamp value cannot be found in the headers, fall back to the current timestamp.
Relevant code:
|
String timestampHeader = headers.get("timestamp");
long ts;
try {
  if (timestampHeader == null) {
    ts = System.currentTimeMillis();
  } else {
    ts = Long.valueOf(timestampHeader);
  }
} catch (NumberFormatException e) {
  throw new RuntimeException("Flume wasn't able to parse timestamp header"
      + " in the event to resolve time based bucketing. Please check that"
      + " you're correctly populating timestamp header (for example using"
      + " TimestampInterceptor source interceptor).", e);
}
|
3. Define a timestamp-based interceptor for the source
Just add two lines to the configuration:
|
agent-server1.sources.testtail.interceptors = i1
agent-server1.sources.testtail.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
|
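For reference, what the timestamp interceptor effectively does to each event is stamp its headers with the current epoch millis before the event reaches the sink. A simplified sketch of that behavior (not the actual Flume source; the class and method names here are my own):

```java
import java.util.HashMap;
import java.util.Map;

public class TimestampHeaderDemo {
    // Simplified sketch: stamp the event headers with the current time,
    // which is what lets BucketPath resolve %Y%m%d later.
    static Map<String, String> intercept(Map<String, String> headers) {
        headers.put("timestamp", Long.toString(System.currentTimeMillis()));
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> headers = intercept(new HashMap<>());
        // With the header present, Long.valueOf in replaceShorthand succeeds.
        System.out.println(Long.parseLong(headers.get("timestamp")) > 0);
    }
}
```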
One last tip:
When debugging Flume problems, you can add a startup parameter to send debug logs to the console.
|
-Dflume.root.logger=DEBUG,console,LOGFILE
|