Flink FileSink 自定义输出路径——BucketingSink
今天看到有小伙伴在问,就想着自己实现一下。
问题: Flink FileSink根据输入数据指定输出位置,比如讲对应日期的数据输出到对应目录
输入数据: 20190716 输出到路径 20190716
20190717 输出到路径 20190717
20190718 输出到路径 20190718
目前flink 对与输出到文件有两种实现(write 算子不算,只能指定目录):Rolling File Sink 和 Streaming File Sink,
Rolling File Sink 的实现就是 BucketingSink,使用也很简单,直接指定路径就可以了,
也可以设置如:目录名称格式(按时间格式滚动),输出文件格式,文件大小、滚动间隔、文件前缀、后缀一类的
// the SequenceFileWriter only works with Flink Tuples import org.apache.flink.api.java.tuple.Tuple2 val input: DataStream[Tuple2[A, B]] = ... val sink = new BucketingSink[Tuple2[IntWritable, Text]]("/base/path") sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles"))) sink.setWriter(new SequenceFileWriter[IntWritable, Text]) sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB, sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins input.addSink(sink)
当然,如果是这么简单,就不会有这篇博客了,下面进入主题
--------------------------------------
默认的 DateTimeBucketer 只能根据时间指定文件名的滚动是规则,没办法根据数据指定文件的输出位置,这需要实现 BasePathBucketer 自定义输出路径
实现如下:
import java.io.File import org.apache.flink.streaming.connectors.fs.Clock import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer import org.apache.hadoop.fs.Path/*** 根据实际数据返回数据输出的路径*/ class DayBasePathBucketer extends BasePathBucketer[String]{/*** 返回路径* @param clock* @param basePath* @param element* @return*/override def getBucketPath(clock: Clock, basePath: Path, element: String): Path = {// yyyyMMddval day = element.substring(1, 9)new Path(basePath + File.separator + day)} }
调用如下:
import java.io.File import java.text.SimpleDateFormat import com.venn.index.conf.Common import org.apache.flink.formats.json.JsonNodeDeserializationSchema import org.apache.flink.runtime.state.filesystem.FsStateBackend import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.fs.StringWriter import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.api.scala._/*** 使用BucketingSink 实现 根据‘数据’自定义输出目录*/ object RollingFileSinkDemo {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sdf = new SimpleDateFormat("yyyyMMddHHmmss")val source = new FlinkKafkaConsumer[ObjectNode]("roll_file_sink", new JsonNodeDeserializationSchema, Common.getProp)val sink = new BucketingSink[String]("D:\\idea_out\\rollfilesink")sink.setBucketer(new DayBasePathBucketer)sink.setWriter(new StringWriter[String])sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,// sink.setBatchRolloverInterval(24 * 60 * 60 * 1000) // this is 24 hour
// sink.setInProgressPrefix("inProcessPre")// sink.setPendingPrefix("pendingpre")// sink.setPartPrefix("partPre")
env.addSource(source).assignAscendingTimestamps(json => {sdf.parse(json.get("date").asText()).getTime}).map(json => {json.get("date") + "-" + json.toString // 将日期拼接到前面,方便后面使用}).addSink(sink)env.execute("rollingFileSink")}}
测试数据如下:
{"id" : 1, "name" : "venn1563288621091", "date" : "20190716230020"} {"id" : 2, "name" : "venn1563288621310", "date" : "20190716231020"} ... {"id" : 263, "name" : "venn1563288648926", "date" : "20190718184020"} {"id" : 264, "name" : "venn1563288649029", "date" : "20190718185020"} {"id" : 265, "name" : "venn1563288649132", "date" : "20190718190020"}
测试结果如下:
可以看到,当天的数据都输出到当天对应的目录中。
遇到个问题:
这里有个问题,因为重写了BasePathBucketer,自定义了输出文件,所有会同时打开多个输出文件,带来文件刷新的问题,在当前文件写完后(这里的表现是:当天的数据以及全部流过,下一天的文件以及开始写了),会发现当天的文件中的数据不全,因为数据还没有全部刷到文件,这个时候下一个文件又开始写了,会发现上一个文件还没刷完
猜想:
猜想:每个文件都有个输出缓冲,上一个文件最后一点数据还在缓冲区,下一个文件又使用新的缓冲区,没办法刷到上一个文件的数据,只有等缓冲区数据满、超时一类的操作触发刷写 ??
验证:
源码BucketingSink.closePartFilesByTime 默认每60秒或大于滚动时间间隔(batchRolloverInterval)(系统时间) 将当前park文件,将状态从 in-process 修改为 pending,随后关闭当前的part 文件,数据刷到磁盘
代码如下:
private void closePartFilesByTime(long currentProcessingTime) throws Exception {synchronized (state.bucketStates) {for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {if ((entry.getValue().lastWrittenToTime < currentProcessingTime - inactiveBucketThreshold)|| (entry.getValue().creationTime < currentProcessingTime - batchRolloverInterval)) {LOG.debug("BucketingSink {} closing bucket due to inactivity of over {} ms.",getRuntimeContext().getIndexOfThisSubtask(), inactiveBucketThreshold);closeCurrentPartFile(entry.getValue());}}}}
下篇: Flink FileSink 自定义输出路径——StreamingFileSink、BucketingSink 和 StreamingFileSink简单比较
搞定
转载于:https://www.cnblogs.com/Springmoon-venn/p/11198154.html
Flink FileSink 自定义输出路径——BucketingSink相关推荐
- MyBatis-Plus代码生成器,如何自定义代码生成路径
1. 导入代码生成依赖 主要依赖于mybatis-plus-generator和模板引擎,这里使用的是freemarker,也可以使用其他模板引擎. <dependency><gro ...
- log4j日志改json格式自定义输出内容源码及说明
log4j日志改json格式自定义输出内容源码及说明 最近项目需要接入日志管理平台,要求需要将项目log4j日志格式改为json,没系研究过log4j的我一时间还真被难住了,功夫不负有心人最后还是被我 ...
- 最短路径 输出路径 Dijkstra算法
这个问题难受了半天终于搞懂了,来给分享一下 先弄个例子看看: B.wzy的大冒险--出发咯QAQ wzy踏上了冒险的旅程. 现在他从地精手里买了一份地图,地图上有n个城镇. 他从第一个城镇出发,走向( ...
- 2021年大数据Spark(四十一):SparkStreaming实战案例六 自定义输出 foreachRDD
目录 SparkStreaming实战案例六 自定义输出-foreachRDD 需求 注意: 代码实现 SparkStreaming实战案例六 自定义输出-foreachRDD 需求 对上述案例的结果 ...
- Hadoop概念学习系列之为什么hadoop/spark执行作业时,输出路径必须要不存在?(三十九)...
很多人只会,但没深入体会和想为什么要这样? 拿Hadoop来说,当然,spark也一样的道理. 输出路径由Hadoop自己创建,实际的结果文件遵守part-nnnn的约定. 如何指定一个已有目录作为H ...
- Google Test(GTest)使用方法和源码解析——自定义输出技术的分析和应用
在介绍自定义输出机制之前,我们先了解下AssertResult类型函数.(转载请指明出于breaksoftware的csdn博客) 在函数中使用AssertionResult AssertionRes ...
- tomcat 设定自定义图片路径
1.问题 平常图片路径都是在项目目录下存放,都是ip地址+端口号+项目名+图片路径,因为项目需要要把图片从tomcat中分离出来,并且设置可以通过自定义地址访问自定义图片路径. 2.解决 在 tomc ...
- C++builder XE 安装控件 及输出路径
C++builder XE 安装控件 与cb6不一样了,和delphi可以共用一个包. 启动RAD Studio.打开包文件. Project>Options>Delphi Compile ...
- win10子系统ubuntu文件夹位置_win10子系统(WSL)自定义安装路径
1.下载linux安装包: 下载地址:https://docs.microsoft.com/en-us/windows/wsl/install-manual 官方提供的离线安装包有 Ubuntu 18 ...
最新文章
- CGpoint,CGSize,CGRect,NSRange
- python3 str bytes 字符串 字节 互相转换
- Windows下VS2015 MPI编译64位Boost1.64
- Django之路--第一篇
- 套用表格格式转化为普通区域_学会修改表格格式,让你的报表更美观
- 多角度分析,通讯时序数据的预测与异常检测挑战
- hive外部表/内部表路径知识点
- printf()的冷门用法+格子中输出--蓝桥杯
- idea中 Application Server not specified
- 使用java自带工具监控jvm运行状态
- iOS开发,自定义字体,字体名称查询
- html图片轮播15个自动,15个超强的jQuery/HTML5图片轮播插件
- java web 下载文件 浏览器弹出下载框
- Hadoop_day01_大数据的概念及磁盘存储
- android 同步短信到iphone,教你把短信从安卓同步到iPhone
- 拼图游戏(8 puzzle)
- linux系统pam配置文件,【PAM】 How to Configure and Use PAM in Linux?
- mysql5.7.10 64_mysql5.7.10win764安装
- ADX集团宣布2020年上市计划
- 企业数字化建设的三个典型误区
热门文章
- 广度优先搜索算法(Breath-first Search)是如何搜索一张图的?
- 2012总结--目录
- 理解UIApplication
- Linux下Gcc生成和使用静态库和动态库详解(转)
- 的稳定性 linux_Linux系统KDE桌面,打造最接近Windows的界面环境!不用才后悔
- [note]标点符号和数学符号所对应的英文
- [Java] 蓝桥杯ALGO-148 算法训练 5-1最小公倍数
- 蓝桥杯 ALGO-61 算法训练 奇偶判断
- 蓝桥杯 ADV-194 算法提高 盾神与积木游戏 java版
- 蓝桥杯 ADV-108算法提高 分数统计