Flink SQL Custom Sink
- 1. Background
- 2. Steps
- 3. Custom sink code
- 4. Using the Redis Sink
- 5. Detailed explanation
- 6. How it works
- 7. References
1. Background
We are building an internal Flink SQL platform. Using a custom Redis sink as the example, this post walks through how to implement a custom sink for Flink SQL and how to use it once it is written.
Based on Flink 1.11.
2. Steps
- Implement DynamicTableSinkFactory
- Implement DynamicTableSink
- Create the Redis sink function
3. Custom sink code
```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import redis.clients.jedis.JedisCluster;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

import static org.apache.flink.configuration.ConfigOptions.key;

/**
 * @author shengjk1
 * @date 2020/10/16
 */
public class RedisTableSinkFactory implements DynamicTableSinkFactory {

    public static final String IDENTIFIER = "redis";

    public static final ConfigOption<String> HOST_PORT = key("hostPort")
            .stringType().noDefaultValue().withDescription("redis host and port");

    public static final ConfigOption<String> PASSWORD = key("password")
            .stringType().noDefaultValue().withDescription("redis password");

    public static final ConfigOption<Integer> EXPIRE_TIME = key("expireTime")
            .intType().noDefaultValue().withDescription("redis key expire time");

    public static final ConfigOption<String> KEY_TYPE = key("keyType")
            .stringType().noDefaultValue().withDescription("redis key type, such as hash, string and so on");

    public static final ConfigOption<String> KEY_TEMPLATE = key("keyTemplate")
            .stringType().noDefaultValue().withDescription("redis key template");

    public static final ConfigOption<String> FIELD_TEMPLATE = key("fieldTemplate")
            .stringType().noDefaultValue().withDescription("redis field template");

    public static final ConfigOption<String> VALUE_NAMES = key("valueNames")
            .stringType().noDefaultValue().withDescription("redis value names");

    // this factory is only picked when 'connector' in the DDL matches IDENTIFIER
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return new HashSet<>();
    }

    // every option we define (everything after WITH) is declared here
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOST_PORT);
        options.add(PASSWORD);
        options.add(EXPIRE_TIME);
        options.add(KEY_TYPE);
        options.add(KEY_TEMPLATE);
        options.add(FIELD_TEMPLATE);
        options.add(VALUE_NAMES);
        return options;
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ReadableConfig options = helper.getOptions();
        return new RedisSink(
                context.getCatalogTable().getSchema().toPhysicalRowDataType(),
                options);
    }

    private static class RedisSink implements DynamicTableSink {

        private final DataType type;
        private final ReadableConfig options;

        private RedisSink(DataType type, ReadableConfig options) {
            this.type = type;
            this.options = options;
        }

        // accept whatever changelog the planner requests (inserts, updates, deletes)
        @Override
        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            return requestedMode;
        }

        // this is where things really start: the user-defined streaming sink is
        // instantiated here, establishing the link between SQL and streaming
        @Override
        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
            DataStructureConverter converter = context.createDataStructureConverter(type);
            return SinkFunctionProvider.of(new RowDataPrintFunction(converter, options, type));
        }

        // for a sink a plain copy is enough; copy() mainly matters for predicate
        // push-down on sources
        @Override
        public DynamicTableSink copy() {
            return new RedisSink(type, options);
        }

        @Override
        public String asSummaryString() {
            return "redis";
        }
    }

    /**
     * The same as a custom sink in the Flink streaming API, except that this time
     * we handle RowData; nothing special beyond that.
     */
    private static class RowDataPrintFunction extends RichSinkFunction<RowData> {

        private static final long serialVersionUID = 1L;

        private final DataStructureConverter converter;
        private final ReadableConfig options;
        private final DataType type;
        private RowType logicalType;
        private HashMap<String, Integer> fields;
        private JedisCluster jedisCluster;

        private RowDataPrintFunction(DataStructureConverter converter, ReadableConfig options, DataType type) {
            this.converter = converter;
            this.options = options;
            this.type = type;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            logicalType = (RowType) type.getLogicalType();
            fields = new HashMap<>();
            List<RowType.RowField> rowFields = logicalType.getFields();
            int size = rowFields.size();
            for (int i = 0; i < size; i++) {
                fields.put(rowFields.get(i).getName(), i);
            }
            jedisCluster = RedisUtil.getJedisCluster(options.get(HOST_PORT));
        }

        @Override
        public void close() throws Exception {
            RedisUtil.closeConn(jedisCluster);
        }

        /*
        Sample of the changelog this sink receives:
        2> +I(1,30017323,1101)
        2> -U(1,30017323,1101)
        2> +U(2,30017323,1101)
        2> -U(2,30017323,1101)
        2> +U(3,30017323,1101)
        2> -U(3,30017323,1101)
        2> +U(4,30017323,1101)
        3> -U(3,980897,3208)
        3> +U(4,980897,3208)
        */
        @Override
        public void invoke(RowData rowData, Context context) {
            RowKind rowKind = rowData.getRowKind();
            Row data = (Row) converter.toExternal(rowData);
            if (rowKind.equals(RowKind.UPDATE_AFTER) || rowKind.equals(RowKind.INSERT)) {
                String keyTemplate = options.get(KEY_TEMPLATE);
                if (Objects.isNull(keyTemplate) || keyTemplate.trim().length() == 0) {
                    throw new NullPointerException(" keyTemplate is null or keyTemplate is empty");
                }
                if (keyTemplate.contains("${")) {
                    String[] split = keyTemplate.split("\\$\\{");
                    keyTemplate = "";
                    for (String s : split) {
                        if (s.contains("}")) {
                            String fieldName = s.substring(0, s.length() - 1);
                            int index = fields.get(fieldName);
                            keyTemplate = keyTemplate + data.getField(index).toString();
                        } else {
                            keyTemplate = keyTemplate + s;
                        }
                    }
                }

                String keyType = options.get(KEY_TYPE);
                String valueNames = options.get(VALUE_NAMES);
                // keyType=hash requires fieldTemplate
                if ("hash".equalsIgnoreCase(keyType)) {
                    String fieldTemplate = options.get(FIELD_TEMPLATE);
                    if (fieldTemplate.contains("${")) {
                        String[] split = fieldTemplate.split("\\$\\{");
                        fieldTemplate = "";
                        for (String s : split) {
                            if (s.contains("}")) {
                                String fieldName = s.substring(0, s.length() - 1);
                                int index = fields.get(fieldName);
                                fieldTemplate = fieldTemplate + data.getField(index).toString();
                            } else {
                                fieldTemplate = fieldTemplate + s;
                            }
                        }
                    }

                    // hash field name = fieldTemplate_valueName
                    if (valueNames.contains(",")) {
                        HashMap<String, String> map = new HashMap<>();
                        String[] fieldNames = valueNames.split(",");
                        for (String fieldName : fieldNames) {
                            String value = data.getField(fields.get(fieldName)).toString();
                            map.put(fieldTemplate + "_" + fieldName, value);
                        }
                        jedisCluster.hset(keyTemplate, map);
                    } else {
                        jedisCluster.hset(keyTemplate, fieldTemplate + "_" + valueNames,
                                data.getField(fields.get(valueNames)).toString());
                    }
                } else if ("set".equalsIgnoreCase(keyType)) {
                    jedisCluster.set(keyTemplate, data.getField(fields.get(valueNames)).toString());
                } else if ("sadd".equalsIgnoreCase(keyType)) {
                    jedisCluster.sadd(keyTemplate, data.getField(fields.get(valueNames)).toString());
                } else if ("zadd".equalsIgnoreCase(keyType)) {
                    // the original called sadd here, which looks like a copy-paste slip;
                    // zadd needs a score, so a constant 0 is assumed
                    jedisCluster.zadd(keyTemplate, 0, data.getField(fields.get(valueNames)).toString());
                } else {
                    throw new IllegalArgumentException(" not find this keyType:" + keyType);
                }

                if (Objects.nonNull(options.get(EXPIRE_TIME))) {
                    jedisCluster.expire(keyTemplate, options.get(EXPIRE_TIME));
                }
            }
        }
    }
}
```
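The RedisUtil helper referenced in open() and close() above is not shown in the original post. Here is a minimal sketch of what it could look like, assuming hostPort is a comma-separated list of host:port pairs; the class and method names come from the calls above, everything else is an assumption:

```java
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.util.HashSet;
import java.util.Set;

public class RedisUtil {

    // builds a JedisCluster from a "host1:port1,host2:port2" string (assumed format)
    public static JedisCluster getJedisCluster(String hostPort) {
        Set<HostAndPort> nodes = new HashSet<>();
        for (String node : hostPort.split(",")) {
            String[] parts = node.split(":");
            nodes.add(new HostAndPort(parts[0], Integer.parseInt(parts[1])));
        }
        return new JedisCluster(nodes);
    }

    public static void closeConn(JedisCluster jedisCluster) {
        if (jedisCluster != null) {
            jedisCluster.close();
        }
    }
}
```

Note that the PASSWORD option is declared in the factory but never used by the sink; a production version would pass it along when building the JedisCluster.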
4. Using the Redis Sink
Because Flink discovers factories through SPI, the service file has to be configured first. ![SPI service file](https://img-blog.csdnimg.cn/2020102015530087.png#pic_center)
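Concretely (the screenshot shows this file; the path below follows the standard Java SPI convention used by Flink 1.11's FactoryUtil, since the original only shows an image), add a service file on the classpath whose content is the factory's class name:

```
# src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
RedisTableSinkFactory
```

If RedisTableSinkFactory lives in a package, use its fully qualified name instead.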
```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

/**
 * @author shengjk1
 * @date 2020/9/25
 */
public class SqlKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);

        // enable checkpointing
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));

        String sql = "CREATE TABLE sourcedata (`id` bigint,`status` int,`city_id` bigint,`courier_id` bigint," +
                "info_index int,order_id bigint,tableName String" +
                ") WITH (" +
                "'connector' = 'kafka','topic' = 'xxx'," +
                "'properties.bootstrap.servers' = 'xxx','properties.group.id' = 'testGroup'," +
                "'format' = 'json','scan.startup.mode' = 'earliest-offset')";
        tableEnv.executeSql(sql);

        // deduplicated join between orders and their info records
        Table bigtable = tableEnv.sqlQuery(
                "select distinct a.id,a.courier_id,a.status,a.city_id,b.info_index from " +
                "(select id,status,city_id,courier_id from sourcedata where tableName = 'orders' and status=60)a " +
                "join (select order_id,max(info_index)info_index from sourcedata where tableName = 'infos' group by order_id )b " +
                "on a.id=b.order_id");

        sql = "CREATE TABLE redis (info_index BIGINT,courier_id BIGINT,city_id BIGINT" +
                ") WITH (" +
                "'connector' = 'redis'," +
                "'hostPort'='xxx'," +
                "'keyType'='hash'," +
                "'keyTemplate'='test2_${city_id}'," +
                "'fieldTemplate'='test2_${courier_id}'," +
                "'valueNames'='info_index,city_id'," +
                "'expireTime'='1000')";
        tableEnv.executeSql(sql);

        Table resultTable = tableEnv.sqlQuery(
                "select sum(info_index)info_index,courier_id,city_id from " + bigtable + " group by city_id,courier_id");

        // Types here is the Java org.apache.flink.api.common.typeinfo.Types
        // (the original imported the Scala Types, which does not compile from Java)
        TupleTypeInfo<Tuple3<Long, Long, Long>> tupleType = new TupleTypeInfo<>(
                Types.LONG,
                Types.LONG,
                Types.LONG);
        tableEnv.toRetractStream(resultTable, tupleType).print("===== ");

        tableEnv.executeSql("INSERT INTO redis SELECT info_index,courier_id,city_id FROM " + resultTable);
        env.execute("");
    }
}
```
5. Detailed explanation
```sql
create table test (
    `id` bigint,
    `url` string,
    `day` string,
    `pv` bigint,
    `uv` bigint
) with (
    'connector' = 'redis',
    'hostPort' = 'xxx',
    'password' = '',
    'expireTime' = '100',
    'keyType' = 'hash',
    'keyTemplate' = 'test_${id}',
    'fieldTemplate' = '${day}',
    'valueNames' = 'pv,uv'
)
```
Redis result: assuming id=1, day=20201016, pv=20, uv=20, the sink writes a hash with key test_1 and fields 20201016_pv = 20 and 20201016_uv = 20 (the code joins the expanded fieldTemplate and the value name with an underscore).
Parameter explanation:
- connector: fixed value, always redis for this sink
- hostPort: the address of the Redis cluster
- password: the Redis password
- expireTime: expiration time of the Redis key, in seconds
- keyType: the type of Redis key/command; currently hash, set, sadd, and zadd are supported
- keyTemplate: template for the Redis key, e.g. test_${id}, where id must be a field name of the table (see the expansion sketch after this list)
- fieldTemplate: required when keyType is hash; follows the same template rules as keyTemplate
- valueNames: the table fields to write as Redis values; multiple names can be given, comma-separated
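For clarity, here is the ${field} expansion used by keyTemplate and fieldTemplate, pulled out of invoke() into a standalone method. The method name expandTemplate is mine, and this version is slightly generalized (the original assumes each } ends its segment):

```java
import org.apache.flink.types.Row;

import java.util.HashMap;
import java.util.Map;

public class TemplateSketch {

    // expands "test_${id}" to "test_1" given a row whose field "id" is 1
    static String expandTemplate(String template, Map<String, Integer> fieldIndex, Row row) {
        if (!template.contains("${")) {
            return template;
        }
        StringBuilder out = new StringBuilder();
        for (String part : template.split("\\$\\{")) {
            int close = part.indexOf('}');
            if (close >= 0) {
                String fieldName = part.substring(0, close);
                out.append(row.getField(fieldIndex.get(fieldName)));
                out.append(part.substring(close + 1));
            } else {
                out.append(part);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, Integer> fieldIndex = new HashMap<>();
        fieldIndex.put("id", 0);
        // prints test_1
        System.out.println(expandTemplate("test_${id}", fieldIndex, Row.of(1L)));
    }
}
```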
Remember that the SPI service file from section 4 must be in place before the job is executed.
6. How it works
The whole flow: a CatalogTable is resolved into a DynamicTableSource and a DynamicTableSink, and in that process DynamicTableSourceFactory and DynamicTableSinkFactory act as the bridge.
The (Source/Sink)Factory is located via connector='xxx' (under the hood via SPI) and in principle performs three operations:
1. validate options
2. configure encoding/decoding formats (if required)
3. create a parameterized instance of the table connector
Formats, in turn, are located via format='xxx'.
DynamicTableSource and DynamicTableSink: the official docs say they can be regarded as stateful, but whether they actually hold state depends on the concrete source or sink implementation. They produce the runtime logic, which is implemented against Flink's core connector interfaces such as InputFormat or SourceFunction (for Kafka it is FlinkKafkaConsumer); these implementations are in turn wrapped as *Provider classes, and it is the *Provider that gets executed.
The *Provider is the code-level bridge between SQL and the streaming runtime, as the sketch below illustrates.
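As a rough illustration of that bridge (a simplified sketch, not the actual planner code), this is conceptually what happens to a SinkFunctionProvider at the point where SQL hands over to the DataStream runtime:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;

public class ProviderBridgeSketch {

    // the planner asks the sink for its runtime provider and, when it is a
    // SinkFunctionProvider, unwraps the SinkFunction and attaches it to the stream
    public static void applySink(
            DataStream<RowData> stream,
            DynamicTableSink sink,
            DynamicTableSink.Context context) {
        DynamicTableSink.SinkRuntimeProvider provider = sink.getSinkRuntimeProvider(context);
        if (provider instanceof SinkFunctionProvider) {
            SinkFunction<RowData> fn = ((SinkFunctionProvider) provider).createSinkFunction();
            stream.addSink(fn);
        }
    }
}
```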
7. References
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html