一般情况下,我们不需要创建新的 connector,因为 Flink SQL 已经内置了丰富的 connector 供我们使用,但是在实际生产环境中我们的存储是多种多样的,所以原生的 connector 并不能满足所有用户的需求,这个时候就需要我们自定义 connector,这篇文章的重点就是介绍一下如何实现自定义 Flink SQL connector ?

先来看一下官网的一张 connector 架构图:

Metadata

Table API 和 SQL 都是声明式 API。这包括表的声明。因此,执行 CREATE TABLE 语句会导致目标目录中的元数据更新。对于大多数目录实现,不会为此类操作修改外部系统中的物理数据。特定于连接器的依赖项不必出现在类路径中。WITH 子句中声明的选项既不经过验证也不以其他方式解释。动态表的元数据(通过 DDL 创建或由目录提供)表示为 CatalogTable 的实例。必要时,表名将在内部解析为 CatalogTable。

Planning

在规划和优化表程序时,需要将 CatalogTable 解析为 DynamicTableSource(用于在 SELECT 查询中读取)和 DynamicTableSink(用于在 INSERT INTO 语句中写入)。

DynamicTableSourceFactory 和 DynamicTableSinkFactory 提供特定于连接器的逻辑,用于将 CatalogTable 的元数据转换为 DynamicTableSource 和 DynamicTableSink 的实例。在大多数情况下,工厂的目的是验证选项(例如示例中的 'port' = '5022')、配置编码/解码格式(如果需要)以及创建表连接器的参数化实例。

默认情况下,使用 Java 的服务提供者接口 (SPI) 发现 DynamicTableSourceFactory 和 DynamicTableSinkFactory 的实例。连接器选项(例如示例中的 'connector' = 'custom')必须对应于有效的工厂标识符。

尽管在类命名中可能不明显,但 DynamicTableSource 和 DynamicTableSink 也可以被视为有状态的工厂,它们最终为读取/写入实际数据生成具体的运行时实现。

规划器使用源和接收器实例来执行特定于连接器的双向通信,直到找到最佳逻辑计划。根据可选声明的能力接口(例如 SupportsProjectionPushDown 或 SupportsOverwrite),规划器可能会对实例应用更改,从而改变生成的运行时实现。

Runtime

一旦逻辑规划完成,规划器将从表连接器获取运行时实现。运行时逻辑在 Flink 的核心连接器接口中实现,例如 InputFormat 或 SourceFunction。

这些接口按另一个抽象级别分组为 ScanRuntimeProvider、LookupRuntimeProvider 和 SinkRuntimeProvider 的子类。

例如,OutputFormatProvider(提供 org.apache.flink.api.common.io.OutputFormat)和 SinkFunctionProvider(提供 org.apache.flink.streaming.api.functions.sink.SinkFunction)都是 SinkRuntimeProvider 的具体实例,规划器可以 处理。

自定义 redis sink connector

大概需要下面 4 个过程:

  1. 自定义 Factory,根据需要实现 DynamicTableSourceFactory, DynamicTableSinkFactory.

  2. 自定义 TableSink, 实现 DynamicTableSink

  3. 定义 Options 也就是 connector 相关的属性

  4. 在 resource 下面添加配置文件 org.apache.flink.table.factories.Factory 里面添加 Factory 的全限定名

Factory

package flink.connector.redis;import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import java.util.HashSet;
import java.util.Set;/*** 自定义 Factory **/
public class RedisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {@Overridepublic DynamicTableSink createDynamicTableSink(Context context) {final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);helper.validate();ReadableConfig options = helper.getOptions();return new RedisDynamicTableSink(options);}/*** 这里没有实现 source * @param context* @return*/@Overridepublic DynamicTableSource createDynamicTableSource(Context context) {return null;}@Overridepublic String factoryIdentifier() {return "redis";}@Overridepublic Set<ConfigOption<?>> requiredOptions() {final Set<ConfigOption<?>> options = new HashSet();options.add(RedisOptions.HOST);options.add(RedisOptions.PORT);return options;}@Overridepublic Set<ConfigOption<?>> optionalOptions() {final Set<ConfigOption<?>> options = new HashSet();options.add(RedisOptions.EXPIRE);return options;}
}

TableSink

package flink.connector.redis;import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.types.RowKind;
import static flink.connector.redis.RedisOptions.*;/*** 自定义 DynamicTableSink **/
public class RedisDynamicTableSink implements DynamicTableSink {private ReadableConfig options;public RedisDynamicTableSink(ReadableConfig options) {this.options = options;}@Overridepublic ChangelogMode getChangelogMode(ChangelogMode requestedMode) {return ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).addContainedKind(RowKind.UPDATE_BEFORE).addContainedKind(RowKind.UPDATE_AFTER).build();}@Overridepublic SinkRuntimeProvider getSinkRuntimeProvider(Context context) {// 获取 redis 的 host 和 portString host = options.getOptional(HOST).get();Integer port = options.getOptional(PORT).get();Integer expire = options.getOptional(EXPIRE).get();MyRedisSink myRedisSink = new MyRedisSink(host, port, expire);return SinkFunctionProvider.of(myRedisSink);}@Overridepublic DynamicTableSink copy() {return new RedisDynamicTableSink(this.options);}@Overridepublic String asSummaryString() {return "redis table sink";}
}
package flink.connector.redis;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import redis.clients.jedis.Jedis;/*** 自定义 sink 写入 redis**/
public class MyRedisSink extends RichSinkFunction<RowData> {private final String host;private final int port;private int expire;private Jedis jedis;public MyRedisSink(String host, int port, int expire) {this.host = host;this.port = port;this.expire = expire;}@Overridepublic void open(Configuration parameters) throws Exception {this.jedis = new Jedis(host, port);}@Overridepublic void invoke(RowData value, Context context) throws Exception {this.jedis.set(String.valueOf(value.getString(0)), String.valueOf(value.getInt(1)), "NX", "EX", expire);}@Overridepublic void close() throws Exception {this.jedis.close();}
}

这里用的是 Jedis 没有使用 Flink 自带的 redis connector ,因为 Flink 自带的功能有限,很多功能都需要自己扩展,所以就直接使用 Jedis.我这里只是为了演示,只实现了最简单的 set 功能.

Options

package flink.connector.redis;import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;/** * Option utils for redis table source sink. */
public class RedisOptions {private RedisOptions() {}public static final ConfigOption<String> HOST =ConfigOptions.key("host").stringType().noDefaultValue().withDescription("The Redis table host.");public static final ConfigOption<Integer> PORT =ConfigOptions.key("port").intType().defaultValue(6379).withDescription("The Redis table port.");public static final ConfigOption<Integer> EXPIRE =ConfigOptions.key("expire").intType().noDefaultValue().withDescription("The Redis table expire time.");
}

所有 redis 相关的属性都可以在这里添加,比如用户名密码,连接池相关的配置等.

配置文件

最后也是最重要的一点就是在 resource 下面添加配置文件,因为 Flink 是通过 SPI 机制来发现工厂的,如果有不了解 SPI 机制的可以看前面这篇文章,注意这个路径一定不要写错.

到这里基本就完成了,下面来测试一下自定义的 connector 能否把数据准确的写入到 redis 里面.

使用&测试

// 定义数据源表
tEnv.executeSql("""|CREATE TABLE datagen (| f_sequence INT,| f_random INT,| f_random_str STRING,| ts AS localtimestamp,| WATERMARK FOR ts AS ts|) WITH (| 'connector' = 'datagen',| -- optional options --| 'rows-per-second'='1',| 'fields.f_sequence.kind'='sequence',| 'fields.f_sequence.start'='1',| 'fields.f_sequence.end'='20',| 'fields.f_random.min'='1',| 'fields.f_random.max'='1000',| 'fields.f_random_str.length'='10'|)|""".stripMargin)// 定义 redis 表tEnv.executeSql("""|create table redis_sink (|f1 STRING,|f2 INT|) WITH (|'connector' = 'redis',|'host' = 'xxx',|'port' = '6379',|'expire' = '100'|)|""".stripMargin)// 执行插入 SQLtEnv.executeSql("""|insert into redis_sink|select f_random_str,f_random|from datagen|""".stripMargin)

上面的 datagen 会产生 20 条数据.执行上面的 SQL 然后查询一下 redis 打印的数据如下:

68652c3a52 : 396
de3044d6d0 : 248
b09690ec10 : 436
dab4bb9ea9 : 821
d57a47d883 : 134
4d3d23767a : 63
9ca712a25f : 527
cb3019326d : 164
4a4af63f89 : 803
3cb960dbf1 : 575
db95bf7590 : 500
4274665b4b : 910
5c27396cb1 : 993
c1d957a2c8 : 951
8b24d7abe2 : 66
817b59d742 : 354
baa51bb58a : 14
db32f9cd53 : 510
3c5db2220b : 44
7c169eaef9 : 160

通过上面的 Demo,相信大家对自定义 Flink SQL connector 已经有所了解,那在生产环境中就可以根据自己的需求去定制各种 connector 了.

推荐阅读

Flink SQL 如何实现列转行?

Flink SQL 结合 HiveCatalog 使用

Flink SQL 解析嵌套的 JSON 数据

Flink SQL 中动态修改 DDL 的属性

Flink WindowAssigner 源码解析

Flink 1.11.x WatermarkStrategy 不兼容问题

Flink mysql-cdc connector 源码解析

Java SPI 机制在 Flink 中的应用(源码分析)

Apache Flink 1.13.0 发布公告

Flink 1.13.0 反压监控的优化

如果你觉得文章对你有帮助,麻烦点一下在看吧,你的支持是我创作的最大动力.

Flink SQL 自定义 redis connector相关推荐

  1. 【Flink】Flink SQL 自定义 Source format

    1.概述 转载:Flink SQL 自定义 Source format 1.背景 由于 kafka 中的 json 属于嵌套,又不想二次序列化再把它展开,故自定义 format. 2.步骤 1.自定义 ...

  2. Flink SQL 自定义 Connector

    文章目录 一.Flink Table 实现架构图: 二.Flink Table Conncetor流程解析 1.定义动态工厂类 2.定义Connector Source/Sink类 (1)定义Conn ...

  3. Flink SQL自定义connector

    本文翻译自:Flink v1.11官方文档 动态表是Flink的Table&SQL API的核心概念,用于以统一方式处理有界和无界数据. 因为动态表只是一个逻辑概念,所以Flink并不拥有数据 ...

  4. Flink SQL 自定义 Sink

    1. 背景 2. 步骤 3.自定义 sink 代码 4. 使用 Redis Sink 5.详细解释 6.原理 7.参考 1.背景 内部要做 Flink SQL 平台,本文以自定义 Redis Sink ...

  5. 【Flink 实战系列】Flink SQL 使用 filesystem connector 同步 Kafka 数据到 HDFS(parquet 格式 + snappy 压缩)

    Flink SQL 同步 Kafka 数据到 HDFS(parquet + snappy) 在上一篇文章中,我们用 datastream API 实现了从 Kafka 读取数据写到 HDFS 并且用 ...

  6. Flink SQL自定义聚合函数

    <2021年最新版大数据面试题全面开启更新> 本篇幅介绍Flink Table/SQL中如何自定义一个聚合函数,介绍其基本用法.撤回定义以及与源码结合分析每个方法的调用位置. 基本使用 F ...

  7. flink sql 知其所以然(二)| 自定义 redis 数据维表(附源码)

    感谢您的关注  +  点赞 + 再看,对博主的肯定,会督促博主持续的输出更多的优质实战内容!!! 1.序篇-本文结构 背景篇-为啥需要 redis 维表 目标篇-做 redis 维表的预期效果是什么 ...

  8. flink redis connector(支持flink sql)

    flink redis connector(支持flink sql) 1. 背景 工作原因,需要基于flink sql做redis sink,但bahir 分支的flink connector支持只是 ...

  9. 基于数据库数据增量同步_基于 Flink SQL CDC 的实时数据同步方案

    简介:Flink 1.11 引入了 Flink SQL CDC,CDC 能给我们数据和业务间能带来什么变化?本文由 Apache Flink PMC,阿里巴巴技术专家伍翀 (云邪)分享,内容将从传统的 ...

最新文章

  1. Ie6/ie7 不支持 JSON
  2. codesmith使用的一个小问题
  3. 爬取热门网站的热榜,集中展示
  4. java adf是什么_在ArcIMS9.2中使用JAVA ADF实现图层要素的查询
  5. 群同态基本定理证明_群同态基本定理II
  6. Android中WebView和JavaScript进行简单通信
  7. JAVA里plain_Java中POJO及其细分XO、DAO的概念
  8. 主成分分析 与 因子分析
  9. python nonetype iterable_无法解决“NoneType”对象不是iterable类型
  10. CleanMyMac X 4.8版本更新!
  11. curl_multi异步高并发服务实现
  12. 信息安全等级测评内容
  13. Java学习 day7 (常用API)Scanner类.Random类.Arraylist类
  14. 免校准的电量计量芯片_免校准电能计量芯片,让家电智能化更简单
  15. JVM和ART、DVM(dalvik VM)的区别
  16. 烧钱数亿后,趣店罗敏的预制菜业务从兴到衰
  17. 回顾过去 展望未来(写给自己)
  18. 基于 LSTM 的分布式能源发电预测(Matlab代码实现)
  19. 04.TFT_RGB接口时序分析
  20. 广告营销DSP和DMP概念解释

热门文章

  1. Axure RP8 图片手风琴
  2. 科普介绍 | 想要开展人工智能教育?一文了解创客教育中常见的视觉摄像头
  3. 生活里的插曲——致《奔跑吧,兄弟》
  4. 分享一些web前端工程师常用且实用的网站
  5. 渗透测试常用术语总结
  6. linux下pfam使用方法,使用pfam-scan进行预测
  7. VR资讯——局势明朗下的前景展望(V客学院知识分享)
  8. Android应用开发 00:Jetpack Compose学习 生日贺卡 图片 Compose象限 名片
  9. (2008-10-31)山西五日自助游记
  10. Linux网络环境搭建