今天看到有小伙伴在问,就想着自己实现一下。

问题: Flink FileSink根据输入数据指定输出位置,比如讲对应日期的数据输出到对应目录

输入数据:
20190716 输出到路径 20190716
20190717 输出到路径 20190717
20190718 输出到路径 20190718

目前flink 对与输出到文件有两种实现(write 算子不算,只能指定目录):Rolling File Sink 和 Streaming File Sink,

Rolling File Sink 的实现就是 BucketingSink,使用也很简单,直接指定路径就可以了,也可以设置如:目录名称格式(按时间格式滚动),输出文件格式,文件大小、滚动间隔、文件前缀、后缀一类的

// the SequenceFileWriter only works with Flink Tuples
import org.apache.flink.api.java.tuple.Tuple2
val input: DataStream[Tuple2[A, B]] = ... val sink = new BucketingSink[Tuple2[IntWritable, Text]]("/base/path")
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
sink.setWriter(new SequenceFileWriter[IntWritable, Text])
sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins

input.addSink(sink)

当然,如果是这么简单,就不会有这篇博客了,下面进入主题

--------------------------------------

默认的 DateTimeBucketer 只能根据时间指定文件名的滚动是规则,没办法根据数据指定文件的输出位置,这需要实现 BasePathBucketer 自定义输出路径

实现如下:

import java.io.File
import org.apache.flink.streaming.connectors.fs.Clock
import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer
import org.apache.hadoop.fs.Path/*** 根据实际数据返回数据输出的路径*/
class DayBasePathBucketer extends BasePathBucketer[String]{/*** 返回路径* @param clock* @param basePath* @param element* @return*/override def getBucketPath(clock: Clock, basePath: Path, element: String): Path = {// yyyyMMddval day = element.substring(1, 9)new Path(basePath + File.separator + day)}
}

调用如下:

import java.io.File
import java.text.SimpleDateFormat
import com.venn.index.conf.Common
import org.apache.flink.formats.json.JsonNodeDeserializationSchema
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.fs.StringWriter
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.scala._/*** 使用BucketingSink 实现 根据‘数据’自定义输出目录*/
object RollingFileSinkDemo {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val sdf = new SimpleDateFormat("yyyyMMddHHmmss")val source = new FlinkKafkaConsumer[ObjectNode]("roll_file_sink", new JsonNodeDeserializationSchema, Common.getProp)val sink = new BucketingSink[String]("D:\\idea_out\\rollfilesink")sink.setBucketer(new DayBasePathBucketer)sink.setWriter(new StringWriter[String])sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,//    sink.setBatchRolloverInterval(24 * 60 * 60 * 1000) // this is 24 hour
//    sink.setInProgressPrefix("inProcessPre")//    sink.setPendingPrefix("pendingpre")//    sink.setPartPrefix("partPre")
env.addSource(source).assignAscendingTimestamps(json => {sdf.parse(json.get("date").asText()).getTime}).map(json => {json.get("date") + "-" + json.toString // 将日期拼接到前面,方便后面使用}).addSink(sink)env.execute("rollingFileSink")}}

测试数据如下:

{"id" : 1, "name" : "venn1563288621091", "date" : "20190716230020"}
{"id" : 2, "name" : "venn1563288621310", "date" : "20190716231020"}
...
{"id" : 263, "name" : "venn1563288648926", "date" : "20190718184020"}
{"id" : 264, "name" : "venn1563288649029", "date" : "20190718185020"}
{"id" : 265, "name" : "venn1563288649132", "date" : "20190718190020"}

测试结果如下:

可以看到,当天的数据都输出到当天对应的目录中。

遇到个问题:

这里有个问题,因为重写了BasePathBucketer,自定义了输出文件,所有会同时打开多个输出文件,带来文件刷新的问题,在当前文件写完后(这里的表现是:当天的数据以及全部流过,下一天的文件以及开始写了),会发现当天的文件中的数据不全,因为数据还没有全部刷到文件,这个时候下一个文件又开始写了,会发现上一个文件还没刷完

猜想:

猜想:每个文件都有个输出缓冲,上一个文件最后一点数据还在缓冲区,下一个文件又使用新的缓冲区,没办法刷到上一个文件的数据,只有等缓冲区数据满、超时一类的操作触发刷写 ??

验证:

源码BucketingSink.closePartFilesByTime 默认每60秒或大于滚动时间间隔(batchRolloverInterval)(系统时间) 将当前park文件,将状态从 in-process 修改为 pending,随后关闭当前的part 文件,数据刷到磁盘

代码如下:

private void closePartFilesByTime(long currentProcessingTime) throws Exception {synchronized (state.bucketStates) {for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {if ((entry.getValue().lastWrittenToTime < currentProcessingTime - inactiveBucketThreshold)|| (entry.getValue().creationTime < currentProcessingTime - batchRolloverInterval)) {LOG.debug("BucketingSink {} closing bucket due to inactivity of over {} ms.",getRuntimeContext().getIndexOfThisSubtask(), inactiveBucketThreshold);closeCurrentPartFile(entry.getValue());}}}}

下篇: Flink FileSink 自定义输出路径——StreamingFileSink、BucketingSink 和 StreamingFileSink简单比较

搞定

转载于:https://www.cnblogs.com/Springmoon-venn/p/11198154.html

Flink FileSink 自定义输出路径——BucketingSink相关推荐

  1. MyBatis-Plus代码生成器,如何自定义代码生成路径

    1. 导入代码生成依赖 主要依赖于mybatis-plus-generator和模板引擎,这里使用的是freemarker,也可以使用其他模板引擎. <dependency><gro ...

  2. log4j日志改json格式自定义输出内容源码及说明

    log4j日志改json格式自定义输出内容源码及说明 最近项目需要接入日志管理平台,要求需要将项目log4j日志格式改为json,没系研究过log4j的我一时间还真被难住了,功夫不负有心人最后还是被我 ...

  3. 最短路径 输出路径 Dijkstra算法

    这个问题难受了半天终于搞懂了,来给分享一下 先弄个例子看看: B.wzy的大冒险--出发咯QAQ wzy踏上了冒险的旅程. 现在他从地精手里买了一份地图,地图上有n个城镇. 他从第一个城镇出发,走向( ...

  4. 2021年大数据Spark(四十一):SparkStreaming实战案例六 自定义输出 foreachRDD

    目录 SparkStreaming实战案例六 自定义输出-foreachRDD 需求 注意: 代码实现 SparkStreaming实战案例六 自定义输出-foreachRDD 需求 对上述案例的结果 ...

  5. Hadoop概念学习系列之为什么hadoop/spark执行作业时,输出路径必须要不存在?(三十九)...

    很多人只会,但没深入体会和想为什么要这样? 拿Hadoop来说,当然,spark也一样的道理. 输出路径由Hadoop自己创建,实际的结果文件遵守part-nnnn的约定. 如何指定一个已有目录作为H ...

  6. Google Test(GTest)使用方法和源码解析——自定义输出技术的分析和应用

    在介绍自定义输出机制之前,我们先了解下AssertResult类型函数.(转载请指明出于breaksoftware的csdn博客) 在函数中使用AssertionResult AssertionRes ...

  7. tomcat 设定自定义图片路径

    1.问题 平常图片路径都是在项目目录下存放,都是ip地址+端口号+项目名+图片路径,因为项目需要要把图片从tomcat中分离出来,并且设置可以通过自定义地址访问自定义图片路径. 2.解决 在 tomc ...

  8. C++builder XE 安装控件 及输出路径

    C++builder XE 安装控件 与cb6不一样了,和delphi可以共用一个包. 启动RAD Studio.打开包文件. Project>Options>Delphi Compile ...

  9. win10子系统ubuntu文件夹位置_win10子系统(WSL)自定义安装路径

    1.下载linux安装包: 下载地址:https://docs.microsoft.com/en-us/windows/wsl/install-manual 官方提供的离线安装包有 Ubuntu 18 ...

最新文章

  1. CGpoint,CGSize,CGRect,NSRange
  2. python3 str bytes 字符串 字节 互相转换
  3. Windows下VS2015 MPI编译64位Boost1.64
  4. Django之路--第一篇
  5. 套用表格格式转化为普通区域_学会修改表格格式,让你的报表更美观
  6. 多角度分析,通讯时序数据的预测与异常检测挑战
  7. hive外部表/内部表路径知识点
  8. printf()的冷门用法+格子中输出--蓝桥杯
  9. idea中 Application Server not specified
  10. 使用java自带工具监控jvm运行状态
  11. iOS开发,自定义字体,字体名称查询
  12. html图片轮播15个自动,15个超强的jQuery/HTML5图片轮播插件
  13. java web 下载文件 浏览器弹出下载框
  14. Hadoop_day01_大数据的概念及磁盘存储
  15. android 同步短信到iphone,教你把短信从安卓同步到iPhone
  16. 拼图游戏(8 puzzle)
  17. linux系统pam配置文件,【PAM】 How to Configure and Use PAM in Linux?
  18. mysql5.7.10 64_mysql5.7.10win764安装
  19. ADX集团宣布2020年上市计划
  20. 企业数字化建设的三个典型误区

热门文章

  1. 广度优先搜索算法(Breath-first Search)是如何搜索一张图的?
  2. 2012总结--目录
  3. 理解UIApplication
  4. Linux下Gcc生成和使用静态库和动态库详解(转)
  5. 的稳定性 linux_Linux系统KDE桌面,打造最接近Windows的界面环境!不用才后悔
  6. [note]标点符号和数学符号所对应的英文
  7. [Java] 蓝桥杯ALGO-148 算法训练 5-1最小公倍数
  8. 蓝桥杯 ALGO-61 算法训练 奇偶判断
  9. 蓝桥杯 ADV-194 算法提高 盾神与积木游戏 java版
  10. 蓝桥杯 ADV-108算法提高 分数统计