1.概述

转载:Flink SQL 自定义 Source format

1.背景

由于 kafka 中的 json 属于嵌套,又不想二次序列化再把它展开,故自定义 format。

2.步骤

1.自定义 Factory 实现 DeserializationFormatFactory

2.自定义 DeserializationSchema 实现 DeserializationSchema

自定义 Factory 中 createDecodingFormat 方法返回 createDecodingFormat

3.自定义 Format

为了简单起见,我们自定义一个 NullFormat ,也就是无论 kafka 中的消息是什么都返回 null,相当于 kafka 中没有消息

自定义 Factory

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.json.JsonOptions;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;import java.util.Collections;
import java.util.HashSet;
import java.util.Set;import static org.apache.flink.formats.json.JsonOptions.*;/*** Table format factory for providing configured instances of JSON to RowData* {@link SerializationSchema} and {@link DeserializationSchema}.*/
public class NullFormatFactory implementsDeserializationFormatFactory {// Factory 的唯一标识public static final String IDENTIFIER = "null";@SuppressWarnings("unchecked")@Override// 解码的入口方法 基本上属于固定写法public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(DynamicTableFactory.Context context,ReadableConfig formatOptions) {FactoryUtil.validateFactoryOptions(this, formatOptions);validateFormatOptions(formatOptions);final boolean failOnMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD);final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);return new DecodingFormat<DeserializationSchema<RowData>>() {@Overridepublic DeserializationSchema<RowData> createRuntimeDecoder(DynamicTableSource.Context context,//ScanRuntimeProviderContextDataType producedDataType) { // 表的字段名和数据类型final RowType rowType = (RowType) producedDataType.getLogicalType();final TypeInformation<RowData> rowDataTypeInfo =(TypeInformation<RowData>) context.createTypeInformation(producedDataType);return new NullRowDataDeserializationSchema(rowType,rowDataTypeInfo,failOnMissingField,ignoreParseErrors,timestampOption);}@Overridepublic ChangelogMode getChangelogMode() {return ChangelogMode.insertOnly();}};}@Overridepublic String factoryIdentifier() {return IDENTIFIER;}@Overridepublic Set<ConfigOption<?>> requiredOptions() {return Collections.emptySet();}@Overridepublic Set<ConfigOption<?>> optionalOptions() {Set<ConfigOption<?>> options = new HashSet<>();options.add(FAIL_ON_MISSING_FIELD);options.add(IGNORE_PARSE_ERRORS);options.add(TIMESTAMP_FORMAT);return options;}// ------------------------------------------------------------------------//  Validation// ------------------------------------------------------------------------static void validateFormatOptions(ReadableConfig tableOptions) {boolean failOnMissingField = tableOptions.get(FAIL_ON_MISSING_FIELD);boolean ignoreParseErrors = tableOptions.get(IGNORE_PARSE_ERRORS);String timestampFormat = tableOptions.get(TIMESTAMP_FORMAT);if (ignoreParseErrors && failOnMissingField) {throw new ValidationException(FAIL_ON_MISSING_FIELD.key()+ " and "+ IGNORE_PARSE_ERRORS.key()+ " shouldn't both be true.");}if (!TIMESTAMP_FORMAT_ENUM.contains(timestampFormat)) {throw new ValidationException(String.format("Unsupported value '%s' for %s. Supported values are [SQL, ISO-8601].",timestampFormat, TIMESTAMP_FORMAT.key()));}}
}

自定义 DeserializationSchema

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;import java.io.IOException;
import java.util.Objects;import static org.apache.flink.util.Preconditions.checkNotNull;@Internal
public class NullRowDataDeserializationSchema implements DeserializationSchema<RowData> {private static final long serialVersionUID = 1L;/*** Flag indicating whether to fail if a field is missing.*/private final boolean failOnMissingField;/*** Flag indicating whether to ignore invalid fields/rows (default: throw an exception).*/private final boolean ignoreParseErrors;/*** TypeInformation of the produced {@link RowData}.**/private final TypeInformation<RowData> resultTypeInfo;/*** Runtime converter that converts {@link JsonNode}s into* objects of Flink SQL internal data structures.**//*** Object mapper for parsing the JSON.*/private final ObjectMapper objectMapper = new ObjectMapper();/*** Timestamp format specification which is used to parse timestamp.*/private final TimestampFormat timestampFormat;public NullRowDataDeserializationSchema(RowType rowType,TypeInformation<RowData> resultTypeInfo,boolean failOnMissingField,boolean ignoreParseErrors,TimestampFormat timestampFormat) {if (ignoreParseErrors && failOnMissingField) {throw new IllegalArgumentException("JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled.");}this.resultTypeInfo = checkNotNull(resultTypeInfo);this.failOnMissingField = failOnMissingField;this.ignoreParseErrors = ignoreParseErrors;this.timestampFormat = timestampFormat;}@Override// 这里其实是真正的反序列化逻辑,比如说将 json 拍平 (多层嵌套转化为一层嵌套 )// 这里是重点,记得关注重点public RowData deserialize(byte[] message) throws IOException {return null;}@Overridepublic boolean isEndOfStream(RowData nextElement) {return false;}@Overridepublic TypeInformation<RowData> getProducedType() {return resultTypeInfo;}@Overridepublic boolean equals(Object o) {if (this == o) {return true;}if (o == null || getClass() != o.getClass()) {return false;}NullRowDataDeserializationSchema that = (NullRowDataDeserializationSchema) o;return failOnMissingField == that.failOnMissingField &&ignoreParseErrors == that.ignoreParseErrors &&resultTypeInfo.equals(that.resultTypeInfo) &&timestampFormat.equals(that.timestampFormat);}@Overridepublic int hashCode() {return Objects.hash(failOnMissingField, ignoreParseErrors, resultTypeInfo, timestampFormat);}
}

4.使用自定义 Format

public class SqlKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);// enable checkpointingConfiguration configuration = tableEnv.getConfig().getConfiguration();configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));String sql = "CREATE TABLE sourcedata (`id` bigint,`status` int,`city_id` bigint,`courier_id` bigint,info_index int,order_id bigint,tableName String" +") WITH (" +"'connector' = 'kafka','topic' = 'canal_monitor_order'," +"'properties.bootstrap.servers' = 'bigdata-dev-mq:9092','properties.group.id' = 'testGroup'," +"'format' = 'null','scan.startup.mode' = 'earliest-offset')";tableEnv.executeSql(sql);......

‘format’ = ‘null’ Factory 的唯一标识

然后就可以直接执行了

【Flink】Flink SQL 自定义 Source format相关推荐

  1. Flink SQL 自定义 Connector

    文章目录 一.Flink Table 实现架构图: 二.Flink Table Conncetor流程解析 1.定义动态工厂类 2.定义Connector Source/Sink类 (1)定义Conn ...

  2. Flink SQL自定义connector

    本文翻译自:Flink v1.11官方文档 动态表是Flink的Table&SQL API的核心概念,用于以统一方式处理有界和无界数据. 因为动态表只是一个逻辑概念,所以Flink并不拥有数据 ...

  3. Flink 源码 | 自定义 Format 消费 Maxwell CDC 数据

    Flink 1.11 最重要的 Feature -- Hive Streaming 之前已经和大家分享过了,今天就和大家来聊一聊另一个特别重要的功能 -- CDC. CDC概述 何为CDC?Chang ...

  4. 《从0到1学习Flink》—— 如何自定义 Data Source ?

    前言 在 <从0到1学习Flink>-- Data Source 介绍 文章中,我给大家介绍了 Flink Data Source 以及简短的介绍了一下自定义 Data Source,这篇 ...

  5. Flink自带的Source源算子以及自定义数据源Source

    文章目录 Flink的DataStream API(基础篇) Source源算子 从集合中读取数据 从文件中读取数据 从Scoket中读取数据 从Kafka中读取数据 自定义Source Flink的 ...

  6. Flink SQL自定义聚合函数

    <2021年最新版大数据面试题全面开启更新> 本篇幅介绍Flink Table/SQL中如何自定义一个聚合函数,介绍其基本用法.撤回定义以及与源码结合分析每个方法的调用位置. 基本使用 F ...

  7. 实时数仓入门训练营:实时计算 Flink 版 SQL 实践

    简介:<实时数仓入门训练营>由阿里云研究员王峰.阿里云高级产品专家刘一鸣等实时计算 Flink 版和 Hologres 的多名技术/产品一线专家齐上阵,合力搭建此次训练营的课程体系,精心打 ...

  8. 【Flink】Flink Table SQL 用户自定义函数: UDF、UDAF、UDTF

    本文总结Flink Table & SQL中的用户自定义函数: UDF.UDAF.UDTF. UDF: 自定义标量函数(User Defined Scalar Function).一行输入一行 ...

  9. flink之SQL入门

    SQL部分学习 Table API的特点Table API和SQL都是Apache Flink中高等级的分析API,SQL所具备的特点Table API也都具有,如下: 声明式 - 用户只关心做什么, ...

最新文章

  1. js 自带的三种弹框
  2. html标签的显示模式(块级标签,行内标签,行内块标签)(转)
  3. ROS系统 常用命令行工具的使用
  4. 【GDAL】聊聊GDAL的数据模型(二)——Band对象
  5. 基于Spatial CNN的车道线检测和交通场景理解
  6. GPU Gems1 - 26 OpenEXR图像文件格式与HDR(The OpenEXR Image File Format and HDR)
  7. 商品WEB开发的商品定单与存储过程的应用
  8. 基于tornado的爬虫并发问题
  9. 客户商品生命周期应用
  10. log4net 不生成日志文件的解决办法
  11. 【Linux】如何在文件中写入感叹号
  12. 基于Java的线上诊疗系统毕业设计源码1617411
  13. Cortex-M3/M4(5)-软件移植
  14. 苹果icloud登录_如何在Windows电脑上使用苹果iCloud服务?
  15. Ubuntu 安装uwsgi出错
  16. 如何在Nintendo Switch上禁用截图按钮
  17. 2020.3.23 bugku(21-25)
  18. Java初学 通过接口实现猫狗跳高
  19. 信息处部门职责及岗位部门职责(附下载)
  20. 华为手机_text是什么文件_AS读取华为手机内置SD卡文件时,找不到文件问题

热门文章

  1. LG电子成功进行太赫兹频段6G无线信号传输,距离超过100米
  2. 苹果下调macbook AppleCare+价格 英特尔芯pro除外
  3. 定档6月11日 诺基亚手机新品发布亮点抢先看
  4. 上海美特斯邦威成被执行人 执行标的超79万
  5. 百度景鲲:9月15日发布小度真无线智能耳机
  6. 亚马逊云计算业务上半年营收210亿美元
  7. 搭载MIUI for Watch,支持eSIM独立通话!小米手表首发1299元起
  8. 在线支付巨头PayPal宣布退出Libra:继续就未来合作方式进行对话
  9. 传音控股回应“遭华为起诉侵权”:未收到起诉状 科创板上市不受影响
  10. 明年5G智能手机大爆发!出货量惊人