• 1. Background
  • 2. Steps
  • 3. Custom Sink Code
  • 4. Using the Redis Sink
  • 5. Detailed Explanation
  • 6. How It Works
  • 7. References

1. Background

We are building an internal Flink SQL platform. Using a custom Redis Sink as the example, this article shows how to implement a custom sink for Flink SQL and how to use it once it is implemented.
Based on Flink 1.11.

2. Steps

  1. implement DynamicTableSinkFactory
  2. implement DynamicTableSink
  3. create the Redis sink function

3. Custom Sink Code

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import redis.clients.jedis.JedisCluster;

import java.util.*;

import static org.apache.flink.configuration.ConfigOptions.key;

/**
 * @author shengjk1
 * @date 2020/10/16
 */
public class RedisTableSinkFactory implements DynamicTableSinkFactory {

    public static final String IDENTIFIER = "redis";

    public static final ConfigOption<String> HOST_PORT = key("hostPort")
            .stringType()
            .noDefaultValue()
            .withDescription("redis host and port");

    public static final ConfigOption<String> PASSWORD = key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("redis password");

    public static final ConfigOption<Integer> EXPIRE_TIME = key("expireTime")
            .intType()
            .noDefaultValue()
            .withDescription("redis key expire time");

    public static final ConfigOption<String> KEY_TYPE = key("keyType")
            .stringType()
            .noDefaultValue()
            .withDescription("redis key type, such as hash, string and so on");

    public static final ConfigOption<String> KEY_TEMPLATE = key("keyTemplate")
            .stringType()
            .noDefaultValue()
            .withDescription("redis key template");

    public static final ConfigOption<String> FIELD_TEMPLATE = key("fieldTemplate")
            .stringType()
            .noDefaultValue()
            .withDescription("redis field template");

    public static final ConfigOption<String> VALUE_NAMES = key("valueNames")
            .stringType()
            .noDefaultValue()
            .withDescription("redis value name");

    // RedisTableSinkFactory is discovered (via SPI) only when the 'connector' option matches IDENTIFIER
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return new HashSet<>();
    }

    // all options we defined ourselves (everything after WITH) are declared here
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOST_PORT);
        options.add(PASSWORD);
        options.add(EXPIRE_TIME);
        options.add(KEY_TYPE);
        options.add(KEY_TEMPLATE);
        options.add(FIELD_TEMPLATE);
        options.add(VALUE_NAMES);
        return options;
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ReadableConfig options = helper.getOptions();
        return new RedisSink(
                context.getCatalogTable().getSchema().toPhysicalRowDataType(),
                options);
    }

    private static class RedisSink implements DynamicTableSink {

        private final DataType type;
        private final ReadableConfig options;

        private RedisSink(DataType type, ReadableConfig options) {
            this.type = type;
            this.options = options;
        }

        // which changelog row kinds (INSERT/UPDATE/DELETE) this sink accepts
        @Override
        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            return requestedMode;
        }

        // this is where the runtime work is wired up: the user-defined streaming sink is created here,
        // establishing the link between SQL and the DataStream API
        @Override
        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
            DataStructureConverter converter = context.createDataStructureConverter(type);
            return SinkFunctionProvider.of(new RowDataPrintFunction(converter, options, type));
        }

        // for a sink this can simply return a new instance; copy() mainly matters for sources
        // (e.g. predicate push-down)
        @Override
        public DynamicTableSink copy() {
            return new RedisSink(type, options);
        }

        @Override
        public String asSummaryString() {
            return "redis";
        }
    }

    /**
     * Same as a custom sink in the Flink DataStream API, except that the records here are RowData.
     */
    private static class RowDataPrintFunction extends RichSinkFunction<RowData> {

        private static final long serialVersionUID = 1L;

        private final DataStructureConverter converter;
        private final ReadableConfig options;
        private final DataType type;
        private RowType logicalType;
        private HashMap<String, Integer> fields;
        private JedisCluster jedisCluster;

        private RowDataPrintFunction(DataStructureConverter converter, ReadableConfig options, DataType type) {
            this.converter = converter;
            this.options = options;
            this.type = type;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            logicalType = (RowType) type.getLogicalType();
            fields = new HashMap<>();
            List<RowType.RowField> rowFields = logicalType.getFields();
            int size = rowFields.size();
            for (int i = 0; i < size; i++) {
                fields.put(rowFields.get(i).getName(), i);
            }
            jedisCluster = RedisUtil.getJedisCluster(options.get(HOST_PORT));
        }

        @Override
        public void close() throws Exception {
            RedisUtil.closeConn(jedisCluster);
        }

        /*
        Sample changelog records received by the sink:
        2> +I(1,30017323,1101)
        2> -U(1,30017323,1101)
        2> +U(2,30017323,1101)
        2> -U(2,30017323,1101)
        2> +U(3,30017323,1101)
        2> -U(3,30017323,1101)
        2> +U(4,30017323,1101)
        3> -U(3,980897,3208)
        3> +U(4,980897,3208)
        */
        @Override
        public void invoke(RowData rowData, Context context) {
            RowKind rowKind = rowData.getRowKind();
            Row data = (Row) converter.toExternal(rowData);
            if (rowKind.equals(RowKind.UPDATE_AFTER) || rowKind.equals(RowKind.INSERT)) {
                String keyTemplate = options.get(KEY_TEMPLATE);
                if (Objects.isNull(keyTemplate) || keyTemplate.trim().length() == 0) {
                    throw new NullPointerException(" keyTemplate is null or keyTemplate is empty");
                }
                if (keyTemplate.contains("${")) {
                    String[] split = keyTemplate.split("\\$\\{");
                    keyTemplate = "";
                    for (String s : split) {
                        if (s.contains("}")) {
                            String fieldName = s.substring(0, s.length() - 1);
                            int index = fields.get(fieldName);
                            keyTemplate = keyTemplate + data.getField(index).toString();
                        } else {
                            keyTemplate = keyTemplate + s;
                        }
                    }
                }

                String keyType = options.get(KEY_TYPE);
                String valueNames = options.get(VALUE_NAMES);
                // keyType=hash requires fieldTemplate
                if ("hash".equalsIgnoreCase(keyType)) {
                    String fieldTemplate = options.get(FIELD_TEMPLATE);
                    if (fieldTemplate.contains("${")) {
                        String[] split = fieldTemplate.split("\\$\\{");
                        fieldTemplate = "";
                        for (String s : split) {
                            if (s.contains("}")) {
                                String fieldName = s.substring(0, s.length() - 1);
                                int index = fields.get(fieldName);
                                fieldTemplate = fieldTemplate + data.getField(index).toString();
                            } else {
                                fieldTemplate = fieldTemplate + s;
                            }
                        }
                    }

                    // hash field name = fieldTemplate_valueName
                    if (valueNames.contains(",")) {
                        HashMap<String, String> map = new HashMap<>();
                        String[] fieldNames = valueNames.split(",");
                        for (String fieldName : fieldNames) {
                            String value = data.getField(fields.get(fieldName)).toString();
                            map.put(fieldTemplate + "_" + fieldName, value);
                        }
                        jedisCluster.hset(keyTemplate, map);
                    } else {
                        jedisCluster.hset(keyTemplate, fieldTemplate + "_" + valueNames,
                                data.getField(fields.get(valueNames)).toString());
                    }
                } else if ("set".equalsIgnoreCase(keyType)) {
                    jedisCluster.set(keyTemplate, data.getField(fields.get(valueNames)).toString());
                } else if ("sadd".equalsIgnoreCase(keyType)) {
                    jedisCluster.sadd(keyTemplate, data.getField(fields.get(valueNames)).toString());
                } else if ("zadd".equalsIgnoreCase(keyType)) {
                    // NOTE: kept as in the original article; a real zadd would also need a score
                    jedisCluster.sadd(keyTemplate, data.getField(fields.get(valueNames)).toString());
                } else {
                    throw new IllegalArgumentException(" not find this keyType:" + keyType);
                }

                if (Objects.nonNull(options.get(EXPIRE_TIME))) {
                    jedisCluster.expire(keyTemplate, options.get(EXPIRE_TIME));
                }
            }
        }
    }
}
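
The sink above calls a RedisUtil helper that the article does not include. Below is a minimal sketch of what such a helper could look like, assuming hostPort is a comma-separated list of host:port pairs; this is an assumption, not the author's actual class.

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.util.HashSet;
import java.util.Set;

public class RedisUtil {

    // builds a JedisCluster from a comma-separated "host:port,host:port" string
    public static JedisCluster getJedisCluster(String hostPort) {
        Set<HostAndPort> nodes = new HashSet<>();
        for (String node : hostPort.split(",")) {
            String[] parts = node.trim().split(":");
            nodes.add(new HostAndPort(parts[0], Integer.parseInt(parts[1])));
        }
        return new JedisCluster(nodes);
    }

    // closes the cluster connection if it was created
    public static void closeConn(JedisCluster jedisCluster) {
        if (jedisCluster != null) {
            jedisCluster.close();
        }
    }
}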

4. Using the Redis Sink

Because Flink discovers factories via SPI, the service file has to be configured first, as shown below. ![SPI service file configuration](https://img-blog.csdnimg.cn/2020102015530087.png#pic_center)
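
For reference, in Flink 1.11 this service file is named org.apache.flink.table.factories.Factory and lives under src/main/resources/META-INF/services; it lists the fully-qualified name of the factory class, one per line. The package below is hypothetical.

src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory:

# one fully-qualified factory class per line (the package here is an assumption)
com.example.flink.connector.redis.RedisTableSinkFactory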

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.scala.typeutils.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

/**
 * @author shengjk1
 * @date 2020/9/25
 */
public class SqlKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);

        // enable checkpointing
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));

        String sql = "CREATE TABLE sourcedata (`id` bigint,`status` int,`city_id` bigint,`courier_id` bigint," +
                "info_index int,order_id bigint,tableName String" +
                ") WITH (" +
                "'connector' = 'kafka','topic' = 'xxx'," +
                "'properties.bootstrap.servers' = 'xxx','properties.group.id' = 'testGroup'," +
                "'format' = 'json','scan.startup.mode' = 'earliest-offset')";
        tableEnv.executeSql(sql);

        //15017284 distinct
        Table bigtable = tableEnv.sqlQuery("select distinct a.id,a.courier_id,a.status,a.city_id,b.info_index from " +
                "(select id,status,city_id,courier_id from sourcedata where tableName = 'orders' and status=60)a " +
                "join (select order_id,max(info_index)info_index from sourcedata where tableName = 'infos' group by order_id )b " +
                "on a.id=b.order_id");

        sql = "CREATE TABLE redis (info_index BIGINT,courier_id BIGINT,city_id BIGINT" +
                ") WITH (" +
                "'connector' = 'redis'," +
                "'hostPort'='xxx'," +
                "'keyType'='hash'," +
                "'keyTemplate'='test2_${city_id}'," +
                "'fieldTemplate'='test2_${courier_id}'," +
                "'valueNames'='info_index,city_id'," +
                "'expireTime'='1000')";
        tableEnv.executeSql(sql);

        Table resultTable = tableEnv.sqlQuery("select sum(info_index)info_index,courier_id,city_id from " + bigtable +
                " group by city_id,courier_id");

        TupleTypeInfo<Tuple3<Long, Long, Long>> tupleType = new TupleTypeInfo<>(
                Types.LONG(),
                Types.LONG(),
                Types.LONG());
        tableEnv.toRetractStream(resultTable, tupleType).print("===== ");

        tableEnv.executeSql("INSERT INTO redis SELECT info_index,courier_id,city_id FROM " + resultTable);

        env.execute("");
    }
}
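
With the options above, for every result row (info_index, courier_id, city_id) the sink writes a Redis hash: the key is test2_<city_id>, the fields are test2_<courier_id>_info_index and test2_<courier_id>_city_id, and the key is set to expire after 1000 seconds.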

5. Detailed Explanation

create table test(
    `id` bigint,
    `url` string,
    `day` string,
    `pv` bigint,
    `uv` bigint
) with (
    'connector'='redis',
    'hostPort'='xxx',
    'password'='',
    'expireTime'='100',
    'keyType'='hash',
    'keyTemplate'='test_${id}',
    'fieldTemplate'='${day}',
    'valueNames'='pv,uv'
)

Redis result: assuming id=1, day=20201016, pv=20, uv=20, the sink writes a hash at key test_1 with fields 20201016_pv = 20 and 20201016_uv = 20.

Parameter reference:
connector      fixed value, must be redis
hostPort       the Redis address
password       the Redis password
expireTime     expiration time of the Redis key, in seconds
keyType        the Redis key type; currently hash, set, sadd and zadd are supported
keyTemplate    template for the Redis key, e.g. test_${id}; note that id must be a column of the table (see the sketch after this list)
fieldTemplate  required when keyType is hash; follows the same template rules as keyTemplate
valueNames     the column(s) written as the Redis value; several names can be given, separated by commas
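
To make the keyTemplate / fieldTemplate rules concrete, here is a small self-contained sketch of the substitution the sink performs in invoke(); the class and method names are hypothetical and the logic is slightly simplified compared to the code in section 3.

import java.util.HashMap;
import java.util.Map;

public class TemplateDemo {

    // replaces every ${column} in the template with that column's value from the current row
    static String render(String template, Map<String, Object> row) {
        if (!template.contains("${")) {
            return template;
        }
        StringBuilder result = new StringBuilder();
        for (String part : template.split("\\$\\{")) {
            int close = part.indexOf('}');
            if (close >= 0) {
                result.append(row.get(part.substring(0, close))).append(part.substring(close + 1));
            } else {
                result.append(part);
            }
        }
        return result.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("id", 1);
        row.put("day", "20201016");
        System.out.println(render("test_${id}", row)); // test_1
        System.out.println(render("${day}", row));     // 20201016
    }
}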

The SPI service file described in section 4 must be in place before the job is executed.

6. How It Works

  1. The overall flow is: CatalogTable —> DynamicTableSource and DynamicTableSink. In this step, DynamicTableSourceFactory and DynamicTableSinkFactory act as the bridge between the two.

  2. The (Source/Sink)Factory is located through connector='xxx' (in fact via SPI). In principle it performs three operations:
    1. validate options
    2. configure encoding/decoding formats (if required)
    3. create a parameterized instance of the table connector
    The formats themselves are located through format='xxx'.

  3. DynamicTableSource / DynamicTableSink
    The official docs say they can be regarded as stateful, but whether they really hold state depends on the concrete source and sink implementation.

  4. The runtime logic is generated next. It is implemented with Flink's core connector interfaces (such as InputFormat or SourceFunction); for Kafka, for example, the implementation is FlinkKafkaConsumer. These implementations are in turn wrapped in *Provider classes, and it is the *Provider that finally gets executed.

  5. *Provider is the code-level bridge between SQL and the Streaming API.

7. References

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
