icc校色文件使用教程_Flink教程-flink 1.11使用sql将流式数据写入文件系统
- 滚动策略
- 分区提交
- 分区提交触发器
- 分区时间的抽取
- 分区提交策略
- 完整示例
- 定义实体类
- 自定义source
- 写入file
flink提供了一个file system connector,可以使用DDL创建一个table,然后使用sql的方法将数据写入hdfs、local file等文件系统,支持的写入格式包括json、csv、avro、parquet、orc。
一个最简单的DDL如下:
CREATE TABLE fs_table ( user_id STRING, order_amount DOUBLE, dt STRING, h string, m string
) PARTITIONED BY (dt,h,m) WITH ( 'connector'='filesystem', 'path'='file:///tmp/abc', 'format'='orc' );
下面我们简单的介绍一下相关的概念和如何使用。
滚动策略
- 在写入列格式(比如parquet、orc)的时候,上述的配置和checkpoint的间隔一起来控制滚动策略,也就是说sink.rolling-policy.file-size、sink.rolling-policy.rollover-interval、checkpoint间隔,这三个选项,只要有一个条件达到了,然后就会触发分区文件的滚动,结束上一个文件的写入,生成新文件。
- 对于写入行格式的数据,比如json、csv,主要是靠sink.rolling-policy.file-size、sink.rolling-policy.rollover-interval,也就是文件的大小和时间来控制写入数据的滚动策略.
分区提交
在往一个分区写完了数据之后,我们希望做一些工作来通知下游。比如在分区目录写一个SUCCESS文件,或者是对于hive来说,去更新metastore的数据,自动刷新一下分区等等。
分区的提交主要依赖于触发器和提交的策略:
- 触发器:即什么时候触发分区的提交,
- 提交策略:也就是分区写完之后我们做什么,目前系统提供了两种内置策略:1.往分区目录写一个空SUCCESS文件;2.更新元数据.
分区提交触发器
- process-time. 这种提交方式依赖于系统的时间,一旦遇到数据延迟等情况,会造成分区和分区的数据不一致。
- partition-time :这种情况需要从分区字段里抽取出来相应的pattern,具体可参考下一个段落分区的抽取。
- sink.partition-commit.delay:一旦这个数值设置不为0,则在process-time情况下,当系统时间大于分区创建时间加上delay延迟,会触发分区提交; 如果是在partition-time 情况下,则需要水印大于分区创建时间加上delay时间,会触发分区提交.
第一个参数process-time、partition-time,我们不用做过多的解释,就类似于flink中的processtime和eventtime。
第二个参数sink.partition-commit.delay我们用实际案例解释下: 比如我们配置的是分区是/yyyy-MM-dd/HH/,写入的是ORC列格式,checkpoint配置的间隔是一分钟,也就是默认情况下会每分钟生成一个orc文件,最终会在每个分区(/yyyy-MM-dd/HH/)下面生成60个orc文件。
比如当前系统正在写入/day=2020-07-06/h=10/分区的数据,那么这个分区的创建时间是2020-07-06 10:00:00,如果这个delay配置采用的是默认值,也就是0s,这个时候当写完了一个ORC文件,也就是2020-07-06 10:01:00分钟的时候,就会触发分区提交,比如更新hive的元数据,这个时候我们去查询hive就能查到刚刚写入的文件;如果我们想/day=2020-07-06/h=10/这个分区的60个文件都写完了再更新分区,那么我们可以将这个delay设置成 1h,也就是等到2020-07-06 11:00:00的时候才会触发分区提交,我们才会看到/2020-07-06/10/分区下面的所有数据
分区时间的抽取
自定义抽取分区时间的话,需要实现PartitionTimeExtractor接口:
public interface PartitionTimeExtractor extends Serializable {String DEFAULT = "default";String CUSTOM = "custom";/*** Extract time from partition keys and values.*/LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues);...................
}
分区提交策略
定义了分区提交的策略,也就是写完分区数据之后做什么事情,目前系统提供了以下行为:
- metastore,只支持hive table,也就是写完数据之后,更新hive的元数据.
- success file: 写完数据,往分区文件写一个success file.
- 自定义
完整示例
定义实体类
public static class UserInfo implements java.io.Serializable{private String userId;private Double amount;private Timestamp ts;public String getUserId(){return userId;}public void setUserId(String userId){this.userId = userId;}public Double getAmount(){return amount;}public void setAmount(Double amount){this.amount = amount;}public Timestamp getTs(){return ts;}public void setTs(Timestamp ts){this.ts = ts;}}
自定义source
public static class MySource implements SourceFunction<UserInfo>{String userids[] = {"4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5","72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b","aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702","3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c","e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"};@Overridepublic void run(SourceContext<UserInfo> sourceContext) throws Exception{while (true){String userid = userids[(int) (Math.random() * (userids.length - 1))];UserInfo userInfo = new UserInfo();userInfo.setUserId(userid);userInfo.setAmount(Math.random() * 100);userInfo.setTs(new Timestamp(new Date().getTime()));sourceContext.collect(userInfo);Thread.sleep(100);}}@Overridepublic void cancel(){}}
写入file
通过sql的ddl创建一个最简单的基于process time的table,然后写入数据.
在这个实例中,我们开启了checkpoint的时间间隔是10s,所以会每隔10s写入一个orc文件.
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();bsEnv.enableCheckpointing(10000);StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);DataStream<UserInfo> dataStream = bsEnv.addSource(new MySource());String sql = "CREATE TABLE fs_table (n" +" user_id STRING,n" +" order_amount DOUBLE,n" +" dt STRING," +" h string," +" m string n" +") PARTITIONED BY (dt,h,m) WITH (n" +" 'connector'='filesystem',n" +" 'path'='file:///tmp/abc',n" +" 'format'='orc'n" +")";tEnv.executeSql(sql);tEnv.createTemporaryView("users", dataStream);String insertSql = "insert into fs_table SELECT userId, amount, " +" DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users";tEnv.executeSql(insertSql);
完整的代码请参考 https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/connectors/sql/StreamingWriteFile.java
更多精彩内容,欢迎关注我的公众号【大数据技术与应用实战】
icc校色文件使用教程_Flink教程-flink 1.11使用sql将流式数据写入文件系统相关推荐
- flink源码分析_Flink源码分析之深度解读流式数据写入hive
前言 前段时间我们讲解了flink1.11中如何将流式数据写入文件系统和hive [flink 1.11 使用sql将流式数据写入hive],今天我们来从源码的角度深入分析一下.以便朋友们对flink ...
- Flink 1.11 与 Hive 批流一体数仓实践
导读:Flink 从 1.9.0 开始提供与 Hive 集成的功能,随着几个版本的迭代,在最新的 Flink 1.11 中,与 Hive 集成的功能进一步深化,并且开始尝试将流计算场景与Hive 进行 ...
- Iceberg 在基于 Flink 的流式数据入库场景中的应用
本文以流式数据入库的场景为基础,介绍引入 Iceberg 作为落地格式和嵌入 Flink sink 的收益,并分析了当前可实现的框架及要点. 应用场景 流式数据入库,是大数据和数据湖的典型应用场景.上 ...
- icc校色文件使用教程_使用ICC特性文件进行校色的方法
使用 ICC 特性文件进行校色的方法 使用 ICC 特性文件进行校色的方法 使用 ICC 特性文件进行校色的方法 6 A5 s: l3 D5 M) N- \+ j9 V 武汉信息传播职业技术学院兼职 ...
- icc校色文件使用教程_浅谈如何用ICC文件进行校色,校色前后效果展示
在网上很多人用ICC校色文件对显示器进行校色,开始我不太明白为什么要对显示器进行校色,而且每台显示器的色彩在出厂的时候都已经设置好了,为什么还要进行校色?后来看了很多校色文件的帖子,才知道原来对显示器 ...
- Demo:基于 Flink SQL 构建流式应用
摘要:上周四在 Flink 中文社区钉钉群中直播分享了<Demo:基于 Flink SQL 构建流式应用>,直播内容偏向实战演示.这篇文章是对直播内容的一个总结,并且改善了部分内容,比如除 ...
- 使用 Flink Hudi 构建流式数据湖
简介: 本文介绍了 Flink Hudi 通过流计算对原有基于 mini-batch 的增量计算模型的不断优化演进. 本文介绍了 Flink Hudi 通过流计算对原有基于 mini-batch 的增 ...
- Apache Griffin+Flink+Kafka实现流式数据质量监控实战
点击上方蓝色字体,选择"设为星标" 回复"面试"获取更多惊喜 八股文教给我,你们专心刷题和面试 Hi,我是王知无,一个大数据领域的原创作者. 放心关注我,获取更 ...
- 1.30.Flink SQL案例将Kafka数据写入hive
1.30.Flink SQL案例将Kafka数据写入hive 1.30.1.1.场景,环境,配置准备 1.30.1.2.案例代码 1.30.1.2.1.编写pom.xml文件 1.30.1.2.2.M ...
最新文章
- Cnnot find System Java Compiler Ensure that you have installed a JDK
- 非标自动化企业前十名_非标自动化设计:非标自动化是如何被称做企业里的血液?...
- HTML精确定位:scrollLeft,scrollWidth,clientWidth,offsetWidth之全然具体解释
- jQuery选择器总结(上)
- 使用jsonpath解析json内容
- 人脸识别技术大总结(1)——Face Detection Alignment
- 本周学习总结(ng-zorro/MDN索引/读书笔记)
- pytorch 笔记: 扩展torch.autograd
- 鸿蒙智慧电视,华为的鸿蒙电视与智能电视有什么区别
- TCP/IP协议--ARP协议(有了IP地址为什么还需要ARP协议)
- linux-文件管理-不完整版
- [Editor][002][Emacs] 从零到高级的进阶 - 实践开发 - 帮助菜单主页
- 高性能Web动画和渲染原理系列(3)——transform和opacity为什么高性能
- **python基础类和对象(十二)
- 全面解读:戴尔”未来就绪的存储保障计划” —— SC系列存储60天无理由退货的影响与意义...
- AWT_Swing_图标按钮(Java)
- vue中 key 值的作用
- 软件需求文档模板及说明
- 微信小程序自定义导航栏
- 如何用p5js做一个可爱的鲶鲶并添加不倒翁效果
热门文章
- VRRP+MSTP 实现流量分流与核心层备份
- 应用层之E-mail服务及javaMail邮件发送的知识总结
- Sidebar 左右菜单的使用
- pta 输出三角形字符阵列_PTA实验6-8 简单计算器 (20分)
- html5代码大全文库,HTML颜色代码表
- android 内部存储 清空,Android清空应用内部文件缓存
- java集合代码_Java-集合(示例代码)
- JQuery中2个等号与3个等号的区别
- 使用睡袋_在户外一个关乎睡眠的重要因素——睡袋
- 范德蒙德矩阵在MATLAB中怎么表示,Python 之 Python与MATLAB 矩阵操作总结