• 滚动策略
  • 分区提交
    • 分区提交触发器
    • 分区时间的抽取
    • 分区提交策略
  • 完整示例
    • 定义实体类
    • 自定义source
    • 写入file

flink提供了一个file system connector,可以使用DDL创建一个table,然后使用sql的方法将数据写入hdfs、local file等文件系统,支持的写入格式包括json、csv、avro、parquet、orc。

一个最简单的DDL如下:

CREATE TABLE fs_table ( user_id STRING, order_amount DOUBLE, dt STRING, h string, m string
) PARTITIONED BY (dt,h,m) WITH ( 'connector'='filesystem', 'path'='file:///tmp/abc', 'format'='orc' );

下面我们简单的介绍一下相关的概念和如何使用。

滚动策略

  • 在写入列格式(比如parquet、orc)的时候,上述的配置和checkpoint的间隔一起来控制滚动策略,也就是说sink.rolling-policy.file-size、sink.rolling-policy.rollover-interval、checkpoint间隔,这三个选项,只要有一个条件达到了,然后就会触发分区文件的滚动,结束上一个文件的写入,生成新文件。
  • 对于写入行格式的数据,比如json、csv,主要是靠sink.rolling-policy.file-size、sink.rolling-policy.rollover-interval,也就是文件的大小和时间来控制写入数据的滚动策略.

分区提交

在往一个分区写完了数据之后,我们希望做一些工作来通知下游。比如在分区目录写一个SUCCESS文件,或者是对于hive来说,去更新metastore的数据,自动刷新一下分区等等。
分区的提交主要依赖于触发器和提交的策略:

  • 触发器:即什么时候触发分区的提交,
  • 提交策略:也就是分区写完之后我们做什么,目前系统提供了两种内置策略:1.往分区目录写一个空SUCCESS文件;2.更新元数据.

分区提交触发器

  1. process-time. 这种提交方式依赖于系统的时间,一旦遇到数据延迟等情况,会造成分区和分区的数据不一致。
  2. partition-time :这种情况需要从分区字段里抽取出来相应的pattern,具体可参考下一个段落分区的抽取。
  3. sink.partition-commit.delay:一旦这个数值设置不为0,则在process-time情况下,当系统时间大于分区创建时间加上delay延迟,会触发分区提交; 如果是在partition-time 情况下,则需要水印大于分区创建时间加上delay时间,会触发分区提交.

第一个参数process-time、partition-time,我们不用做过多的解释,就类似于flink中的processtime和eventtime。

第二个参数sink.partition-commit.delay我们用实际案例解释下: 比如我们配置的是分区是/yyyy-MM-dd/HH/,写入的是ORC列格式,checkpoint配置的间隔是一分钟,也就是默认情况下会每分钟生成一个orc文件,最终会在每个分区(/yyyy-MM-dd/HH/)下面生成60个orc文件。

比如当前系统正在写入/day=2020-07-06/h=10/分区的数据,那么这个分区的创建时间是2020-07-06 10:00:00,如果这个delay配置采用的是默认值,也就是0s,这个时候当写完了一个ORC文件,也就是2020-07-06 10:01:00分钟的时候,就会触发分区提交,比如更新hive的元数据,这个时候我们去查询hive就能查到刚刚写入的文件;如果我们想/day=2020-07-06/h=10/这个分区的60个文件都写完了再更新分区,那么我们可以将这个delay设置成 1h,也就是等到2020-07-06 11:00:00的时候才会触发分区提交,我们才会看到/2020-07-06/10/分区下面的所有数据

分区时间的抽取

自定义抽取分区时间的话,需要实现PartitionTimeExtractor接口:

public interface PartitionTimeExtractor extends Serializable {String DEFAULT = "default";String CUSTOM = "custom";/*** Extract time from partition keys and values.*/LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues);...................
}

分区提交策略

定义了分区提交的策略,也就是写完分区数据之后做什么事情,目前系统提供了以下行为:

  • metastore,只支持hive table,也就是写完数据之后,更新hive的元数据.
  • success file: 写完数据,往分区文件写一个success file.
  • 自定义

完整示例

定义实体类

public static class UserInfo implements java.io.Serializable{private String userId;private Double amount;private Timestamp ts;public String getUserId(){return userId;}public void setUserId(String userId){this.userId = userId;}public Double getAmount(){return amount;}public void setAmount(Double amount){this.amount = amount;}public Timestamp getTs(){return ts;}public void setTs(Timestamp ts){this.ts = ts;}}

自定义source

public static class MySource implements SourceFunction<UserInfo>{String userids[] = {"4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5","72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b","aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702","3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c","e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"};@Overridepublic void run(SourceContext<UserInfo> sourceContext) throws Exception{while (true){String userid = userids[(int) (Math.random() * (userids.length - 1))];UserInfo userInfo = new UserInfo();userInfo.setUserId(userid);userInfo.setAmount(Math.random() * 100);userInfo.setTs(new Timestamp(new Date().getTime()));sourceContext.collect(userInfo);Thread.sleep(100);}}@Overridepublic void cancel(){}}

写入file

通过sql的ddl创建一个最简单的基于process time的table,然后写入数据.

在这个实例中,我们开启了checkpoint的时间间隔是10s,所以会每隔10s写入一个orc文件.

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();bsEnv.enableCheckpointing(10000);StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);DataStream<UserInfo> dataStream = bsEnv.addSource(new MySource());String sql = "CREATE TABLE fs_table (n" +"  user_id STRING,n" +"  order_amount DOUBLE,n" +"  dt STRING," +"  h string," +"  m string  n" +") PARTITIONED BY (dt,h,m) WITH (n" +"  'connector'='filesystem',n" +"  'path'='file:///tmp/abc',n" +"  'format'='orc'n" +")";tEnv.executeSql(sql);tEnv.createTemporaryView("users", dataStream);String insertSql = "insert into  fs_table SELECT userId, amount, " +" DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users";tEnv.executeSql(insertSql);

完整的代码请参考 https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/connectors/sql/StreamingWriteFile.java

更多精彩内容,欢迎关注我的公众号【大数据技术与应用实战】

icc校色文件使用教程_Flink教程-flink 1.11使用sql将流式数据写入文件系统相关推荐

  1. flink源码分析_Flink源码分析之深度解读流式数据写入hive

    前言 前段时间我们讲解了flink1.11中如何将流式数据写入文件系统和hive [flink 1.11 使用sql将流式数据写入hive],今天我们来从源码的角度深入分析一下.以便朋友们对flink ...

  2. Flink 1.11 与 Hive 批流一体数仓实践

    导读:Flink 从 1.9.0 开始提供与 Hive 集成的功能,随着几个版本的迭代,在最新的 Flink 1.11 中,与 Hive 集成的功能进一步深化,并且开始尝试将流计算场景与Hive 进行 ...

  3. Iceberg 在基于 Flink 的流式数据入库场景中的应用

    本文以流式数据入库的场景为基础,介绍引入 Iceberg 作为落地格式和嵌入 Flink sink 的收益,并分析了当前可实现的框架及要点. 应用场景 流式数据入库,是大数据和数据湖的典型应用场景.上 ...

  4. icc校色文件使用教程_使用ICC特性文件进行校色的方法

    使用 ICC 特性文件进行校色的方法 使用 ICC 特性文件进行校色的方法 使用 ICC 特性文件进行校色的方法 6 A5 s: l3 D5 M) N- \+ j9 V 武汉信息传播职业技术学院兼职 ...

  5. icc校色文件使用教程_浅谈如何用ICC文件进行校色,校色前后效果展示

    在网上很多人用ICC校色文件对显示器进行校色,开始我不太明白为什么要对显示器进行校色,而且每台显示器的色彩在出厂的时候都已经设置好了,为什么还要进行校色?后来看了很多校色文件的帖子,才知道原来对显示器 ...

  6. Demo:基于 Flink SQL 构建流式应用

    摘要:上周四在 Flink 中文社区钉钉群中直播分享了<Demo:基于 Flink SQL 构建流式应用>,直播内容偏向实战演示.这篇文章是对直播内容的一个总结,并且改善了部分内容,比如除 ...

  7. 使用 Flink Hudi 构建流式数据湖

    简介: 本文介绍了 Flink Hudi 通过流计算对原有基于 mini-batch 的增量计算模型的不断优化演进. 本文介绍了 Flink Hudi 通过流计算对原有基于 mini-batch 的增 ...

  8. Apache Griffin+Flink+Kafka实现流式数据质量监控实战

    点击上方蓝色字体,选择"设为星标" 回复"面试"获取更多惊喜 八股文教给我,你们专心刷题和面试 Hi,我是王知无,一个大数据领域的原创作者. 放心关注我,获取更 ...

  9. 1.30.Flink SQL案例将Kafka数据写入hive

    1.30.Flink SQL案例将Kafka数据写入hive 1.30.1.1.场景,环境,配置准备 1.30.1.2.案例代码 1.30.1.2.1.编写pom.xml文件 1.30.1.2.2.M ...

最新文章

  1. Cnnot find System Java Compiler Ensure that you have installed a JDK
  2. 非标自动化企业前十名_非标自动化设计:非标自动化是如何被称做企业里的血液?...
  3. HTML精确定位:scrollLeft,scrollWidth,clientWidth,offsetWidth之全然具体解释
  4. jQuery选择器总结(上)
  5. 使用jsonpath解析json内容
  6. 人脸识别技术大总结(1)——Face Detection Alignment
  7. 本周学习总结(ng-zorro/MDN索引/读书笔记)
  8. pytorch 笔记: 扩展torch.autograd
  9. 鸿蒙智慧电视,华为的鸿蒙电视与智能电视有什么区别
  10. TCP/IP协议--ARP协议(有了IP地址为什么还需要ARP协议)
  11. linux-文件管理-不完整版
  12. [Editor][002][Emacs] 从零到高级的进阶 - 实践开发 - 帮助菜单主页
  13. 高性能Web动画和渲染原理系列(3)——transform和opacity为什么高性能
  14. **python基础类和对象(十二)
  15. 全面解读:戴尔”未来就绪的存储保障计划” —— SC系列存储60天无理由退货的影响与意义...
  16. AWT_Swing_图标按钮(Java)
  17. vue中 key 值的作用
  18. 软件需求文档模板及说明
  19. 微信小程序自定义导航栏
  20. 如何用p5js做一个可爱的鲶鲶并添加不倒翁效果

热门文章

  1. VRRP+MSTP 实现流量分流与核心层备份
  2. 应用层之E-mail服务及javaMail邮件发送的知识总结
  3. Sidebar 左右菜单的使用
  4. pta 输出三角形字符阵列_PTA实验6-8 简单计算器 (20分)
  5. html5代码大全文库,HTML颜色代码表
  6. android 内部存储 清空,Android清空应用内部文件缓存
  7. java集合代码_Java-集合(示例代码)
  8. JQuery中2个等号与3个等号的区别
  9. 使用睡袋_在户外一个关乎睡眠的重要因素——睡袋
  10. 范德蒙德矩阵在MATLAB中怎么表示,Python 之 Python与MATLAB 矩阵操作总结