目前HDFS上日志一部分由MR清洗生成&二次计算,一部分直接从服务器离线上传,但在私有云环境下,离线日志的压缩上传可能会对服务造成性能影响,而且在很多日志已经实时传输到Kafka集群的情况下,考虑Kafka->Hdfs也不失为一条合理的路径。

1. Kafka-Flume-Hdfs

这种方法直接通过Flume-ng的Hdfs-Sink往Hdfs导数据,Hdfs-Sink用来将数据写入Hadoop分布式文件系统(HDFS)中。支持创建text和sequence文件及这2种文件类型的压缩;支持文件周期性滚动(就是关闭当前文件在建立一个新的),滚动可以基于时间、数据大小、事件数量;也支持通过event hearder属性timestamp或host分割数据。HDFS目录路径或文件名支持格式化封装,相应的封装串在Hdfs-Sink生成目录或文件时被恰当的替换。使用HDFSSink需要首先安装hadoop,Hdfs-Sink是通过hadoop jar和HDFS集群通信的。注意Hadoop版本需要支持sync()。具体配置类似:

dataAgent.channels.kafka-piwikGlobal.kafka.producer.type=sync
dataAgent.channels.kafka-piwikGlobal.topic=app_piwik
dataAgent.channels.kafka-piwikGlobal.groupId=AutoCollect-piwikGlobal-1
dataAgent.channels.kafka-piwikGlobal.zookeeperConnect=192.168.1.10:2181,192.168.1.11:2181
dataAgent.channels.kafka-piwikGlobal.brokerList=192.168.1.10:9092,192.168.1.11:9092
dataAgent.channels.kafka-piwikGlobal.is_avro_event=false
dataAgent.channels.kafka-piwikGlobal.transactionCapacity=100000
dataAgent.channels.kafka-piwikGlobal.capacity=6000000
dataAgent.channels.kafka-piwikGlobal.type=org.apache.flume.channel.kafka.KafkaChannel
dataAgent.channels.kafka-piwikGlobal.parseAsFlumeEvent=falsedataAgent.sinks.hdfs-piwikGlobal.channel=kafka-piwikGlobal
dataAgent.sinks.hdfs-piwikGlobal.type=hdfs
#使用gzip压缩算法
dataAgent.sinks.hdfs-piwikGlobal.hdfs.codeC=gzip
dataAgent.sinks.hdfs-piwikGlobal.hdfs.fileType=CompressedStream
#日志保存路径,这里按小时存放
dataAgent.sinks.hdfs-piwikGlobal.hdfs.path=hdfs://argo/data/logs/autoCollect/piwikGlobal/%Y-%m-%d/%H
#文件前缀,也可以使用封装串
dataAgent.sinks.hdfs-piwikGlobal.hdfs.filePrefix=piwikGlobal
#不按时间滚动
dataAgent.sinks.hdfs-piwikGlobal.hdfs.rollInterval=0
#不根据文件大小滚动
dataAgent.sinks.hdfs-piwikGlobal.hdfs.rollSize=0
#按事件条数滚动
dataAgent.sinks.hdfs-piwikGlobal.hdfs.rollCount=1000000
#hadoop集群响应时间较长时需要配置
dataAgent.sinks.hdfs-piwikGlobal.hdfs.callTimeout=40000
#100秒后这个文件还没有被写入数据,就会关闭它然后去掉.tmp,后续的events会新开一个.tmp文件来接收
dataAgent.sinks.hdfs-piwikGlobal.hdfs.idleTimeout=100
dataAgent.sinks.hdfs-piwikGlobal.hdfs.useLocalTimeStamp=true

这种方式在日志量大的情况下,需要启动多个Hdfs-Sink或多个Flume进程,甚至需要部署在多台机器上,不好管理,并且在特定需求下,还需要做定制开发。

2.Kafka-Storm-Hdfs

这种方法通过storm往hdfs写数据,可以做定制开发,可以根据日志量调整并发度,上下线方便,可根据Storm REST Api做监控报警。

官方源码:https://github.com/apache/storm/tree/master/external/storm-hdfs

主要的类为HdfsBolt和SequenceFileBolt,都在org.apache.storm.hdfs.bolt包中。HdfsBolt用来写text数据, SequenceFileBolt用来写二进制数据。

HdfsBolt的配置参数:

1、RecordFormat:定义字段分隔符,你可以使用换行符\n或者制表符\t;

2、SyncPolicy:定义每次写入的tuple的数量;

3、FileRotationPolicy:定义写入的hdfs文件的轮转策略,你可以以时间轮转(TimedRotationPolicy)、大小轮转(FileSizeRotationPolicy)、不轮转(NoRotationPolicy);

4、FileNameFormat:定义写入文件的路径(withPath)和文件名的前后缀(withPrefix、withExtension);

5、withFsUrl:定义hdfs的地址。

示例:

RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|");SyncPolicy syncPolicy = new CountSyncPolicy(1000);FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/data/logs");HdfsBolt bolt = new HdfsBolt().withFsUrl("hdfs://localhost:8020").withFileNameFormat(fileNameFormat).withRecordFormat(format).withRotationPolicy(rotationPolicy).withSyncPolicy(syncPolicy);

如果要连接开启了HA的Hadoop集群,可以改为withFsURL(“hdfs://nameserviceID”)。

nameserviceID可以在hdfs-site.xml中查到。

<property><name>dfs.nameservices</name><value>nameserviceID</value>
</property>

这里存在的问题是,一个线程只会写一个文件,不支持压缩存储,无法分目录,因此需要做一些修改。

1)Gzip压缩存储

this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(new Configuration());
CompressionCodec compressionCodec = compressionCodecFactory.getCodecByClassName("org.apache.hadoop.io.compress.GzipCodec");
FSDataOutputStream out = this.fs.create(new Path(parentPath, new Path(childStrPath)));
CompressionOutputStream compressionOutput = compressionCodec.createOutputStream(out, compressionCodec.createCompressor());
#写数据
compressionOutput.write(bulkStr.toString().getBytes());

Flush操作也需要做些修改,太过频繁会影响写入性能:

try {compressionOutput.flush();if (out instanceof HdfsDataOutputStream) {((HdfsDataOutputStream) out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));} else {out.hsync();}
} catch (IOException e) {LOG.error("flush error:{}",e.getMessage());
}

如果worker异常终止,造成gzip文件非正常关闭,通过hdfs -text命令是可以正常查看的,但一般MR程序无法读取此类文件,指标不治本的方法,可以简单设置mapred.max.map.failures.percent来跳过异常文件,或者自己实现InputStream类。

2)分目录写入

比如对于接收到的每一条日志,需要解析时间或类型,按/type/day/hour的方式存储,这就会导致一个hdfsBolt线程需要打开多个不同目录下的文件进行写入。

#每个目录对应一个Path对象,以防重复创建
private Map<String, Path> parentPathObjMap = Maps.newHashMap();#每个目录对应一个CompressionOutputStream对象,判断日志需要写入哪一个目录,则获取相应对象写入
private Map<String, CompressionOutputStream> pathToCompWriter = Maps.newHashMap();#每个目录对应一个StringBuilder对象,积攒一批日志写入,以提高性能
private Map<String, StringBuilder> pathToCache = Maps.newHashMap();#每个目录对应一个Long对象,判断积攒日志量是否满足写入阈值
private Map<String, Long> pathToCacheLineNum = Maps.newHashMap();#每个目录对应一个文件轮转对象
private Map<String, FileRotationPolicy> fileRotationMap = Maps.newHashMap();#每个目录写入的日志字节数,用来判断是否轮转
private Map<String, Long> offsetMap = Maps.newHashMap();#每个目录上次写入的时间,超过一定时间没有数据写入,则关闭文件
private Map<String, Long> lastFlushTimeMap = Maps.newHashMap();

因为一个线程在一个目录下只会往一个文件写,因此这些Map的key值都为目录路径。

在程序运行过程本来将日志解析单独作为一个bolt,后来将其融入HdfsBolt,以配置正则表达式的方式,减少网络传输开销,来提高性能。

Kafka实时流数据经Storm至Hdfs相关推荐

  1. sparkStreaming:实时流数据详解

    目录 一.概述 二.wordCount示例 三.初始化StreamingContext 四.DStreams(离散数据流) 五.输入DStream和接收器 Basic sources File Str ...

  2. 利用SparkSQL(java版)将离线数据或实时流数据写入hive的用法及坑点

    1. 通常利用SparkSQL将离线或实时流数据的SparkRDD数据写入Hive,一般有两种方法.第一种是利用org.apache.spark.sql.types.StructType和org.ap ...

  3. 在线实时大数据平台Storm集群组件学习

    Hadoop常用于离线的复杂的大数据处理,Spark常用于离线的快速(轻量级)的大数据处理, Storm常用于在线的实时的大数据处理:这句话一定程度上反应了三套大数据平台的鲜明特征.Storm是一套实 ...

  4. 实时流处理框架Storm、Spark Streaming、Samza、Flink,孰优孰劣?!

    https://mp.weixin.qq.com/s?__biz=MzU1NDA4NjU2MA==&mid=2247486490&idx=1&sn=e25a05be8cf98c ...

  5. 在线实时大数据平台Storm并行和通信机制理解

    1.storm系统角色和应用组件基本理解: 和Hadoop一起理解,清晰点. 1)物理节点Nimubus,负责资源分配和任务调度: 2)物理节点Supervisor负责接受nimbus分配的任务,启动 ...

  6. 重磅开源 KSQL:用于 Apache Kafka 的流数据 SQL 引擎 2017.8.29

    Kafka 的作者 Neha Narkhede 在 Confluent 上发表了一篇博文,介绍了Kafka 新引入的KSQL 引擎--一个基于流的SQL.推出KSQL 是为了降低流式处理的门槛,为处理 ...

  7. 在线实时大数据平台Storm集成redis开发(分布锁)

    1.需求场景:spout从ftp列表中拿到未读取的文件读取并发射行到Bolt,bolt进行业务处理后提交下一Bolt入库.用redis主要是:保存文件列表对象,使用分布锁来同步互斥访问共享对象,使文件 ...

  8. 在线实时大数据平台Storm并行度试验

    集群模式试验:同一文件输入数据如何处理,数据变量共享 1)集群模式一个worker内一个spout一个Bolt jps:1个worker storm list:1个wokers,4个tasks 2)集 ...

  9. 在线实时大数据平台Storm输入源共享试验

    1.背景:topology程序提交集群模式运行试验,验证在同一文件输入源情况下,worker之间是否会重复输入处理,以及数据变量能否在不同worker之间共享,如果文件新增数据,topology会不会 ...

最新文章

  1. Python新工具:用三行代码提取PDF表格数据
  2. IC/FPGA校招笔试题分析(一)
  3. matplotlib.transforms
  4. linux软件管理(Vim编辑器使用) ——(七)
  5. java泛型约束_JAVA泛型 - 约束与局限性
  6. 信息学奥赛一本通(1209:分数求和)
  7. BAT文件执行完成后如何删除自身的解决办法
  8. nodejs服务器部署教程一
  9. 各大公司官网都有哪些显而易见的Bug?
  10. STM8S——watchdog(IWDG)
  11. Newtonsoft.Json序列化和反序列之javascriptConvert.SerializeObject,DeserializeObject,JsonWriter,JsonReader...
  12. vivo解bl锁_黔隆科技刷机教程酷派Y82820忘记密码刷机解锁降级救砖解屏幕锁账户锁教程...
  13. 基于C# Winform的酒店管理系统
  14. dns检测工具命令总结
  15. Recap Language Model (LM) -- 自然语言处理中的预训练,对于Bert、ELMO、GPT的一些思考
  16. 2019年安徽大学ACM/ICPC实验室新生赛题解
  17. 15 个 Python 开源项目,使用 PyQt 做小型桌面应用!
  18. 自动控制原理-频率特性 G(jw ) 定义
  19. 数论-Lucas(卢卡斯定理)
  20. APP运营如何实现流量变现,获取更高收益?

热门文章

  1. 怎么把不能编辑的qlv转换成mp4
  2. Stata画图——散点图与折线图
  3. php采集喜马拉雅,喜马拉雅数据 JSSDK API 接入 demo WEB版 标准登录 、免登陆
  4. Java虚拟机内存模型简单介绍
  5. 小票打印机打印出空白
  6. Delphi的编程语言Object Pascal(3)
  7. 《勋伯格和声学》读书笔记(十):减七和弦
  8. 怎么做精准引流?如何精准引流加粉?怎样引流被加精准粉?
  9. shell入门基础知识
  10. pfs游戏识别 穿越火线、apex、csgo、吃鸡