Flink Redis connector (with Flink SQL support)

1. Background

  1. For work I needed a Redis sink usable from Flink SQL, but the Bahir branch of the Flink connector only supports the DataStream API; Flink SQL support still has to be built on top of it.
  2. According to the Flink community, Flink SQL and the Table API (including Python support) will be a key direction going forward. All technologies evolve toward ease of use: the external interface becomes friendlier, while the internals grow more complex for exactly that reason. (Whether this direction is right still needs to be confirmed.)

2. Steps

  1. Environment
    1. JDK 8
    2. IntelliJ IDEA 2020
    3. Scala 2.11
    4. pom:
<packaging>jar</packaging>
<name>Flink Connector SnowDrop</name>

<properties>
    <java.version>1.8</java.version>
    <flink.version>1.12.0</flink.version>
    <log4j.version>2.12.1</log4j.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <mongo.driver.version>3.8.2</mongo.driver.version>
    <fastjson.version>1.2.75</fastjson.version>
    <connector_redis>1.0</connector_redis>
</properties>

<repositories>
    <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases><enabled>false</enabled></releases>
        <snapshots><enabled>true</enabled></snapshots>
    </repository>
</repositories>

<dependencies>
    <!-- Flink core, Table/SQL, and format dependencies; all provided so they are not packaged into the JAR. -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <!-- This dependency is provided, because it should not be packaged into the JAR file. -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <!--
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    -->
    <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.34</version><scope>provided</scope></dependency>
    <!-- Duplicate of the flink-connector-jdbc entry above; harmless, but it could be removed. -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-parquet_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-orc_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>

    <!-- Dependencies added for the test code. -->
    <!-- The dependencies below need to be packaged into the JAR. -->
    <!--
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
        <version>${connector_redis}</version>
        <exclusions>
            <exclusion>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    -->
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.1-SNAPSHOT</version>
        <scope>system</scope>
        <systemPath>${project.basedir}/src/main/resources/flink-connector-redis_2.11-1.1-SNAPSHOT.jar</systemPath>
    </dependency>
    <!-- jedis 2.8.0 is problematic; pull in a newer version (2.9.0 or 3.1.0) separately. -->
    <dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version></dependency>
    <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency>
    <dependency><groupId>org.mongodb</groupId><artifactId>mongodb-driver</artifactId><version>${mongo.driver.version}</version></dependency>
    <!-- https://mvnrepository.com/artifact/org.mongodb/mongodb-driver-core -->
    <dependency><groupId>org.mongodb</groupId><artifactId>mongodb-driver-core</artifactId><version>${mongo.driver.version}</version></dependency>
    <!-- https://mvnrepository.com/artifact/org.mongodb/bson -->
    <dependency><groupId>org.mongodb</groupId><artifactId>bson</artifactId><version>${mongo.driver.version}</version></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
</dependencies>

<build>
    <plugins>
        <!-- Java Compiler -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>${java.version}</source>
                <target>${java.version}</target>
            </configuration>
        </plugin>
        <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
        <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <!-- Run shade goal on package phase -->
                <execution>
                    <phase>package</phase>
                    <goals><goal>shade</goal></goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>org.apache.flink:force-shading</exclude>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>org.apache.logging.log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                     Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
    <pluginManagement>
        <plugins>
            <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
            <plugin>
                <groupId>org.eclipse.m2e</groupId>
                <artifactId>lifecycle-mapping</artifactId>
                <version>1.0.0</version>
                <configuration>
                    <lifecycleMappingMetadata>
                        <pluginExecutions>
                            <pluginExecution>
                                <pluginExecutionFilter>
                                    <groupId>org.apache.maven.plugins</groupId>
                                    <artifactId>maven-shade-plugin</artifactId>
                                    <versionRange>[3.1.1,)</versionRange>
                                    <goals><goal>shade</goal></goals>
                                </pluginExecutionFilter>
                                <action><ignore/></action>
                            </pluginExecution>
                            <pluginExecution>
                                <pluginExecutionFilter>
                                    <groupId>org.apache.maven.plugins</groupId>
                                    <artifactId>maven-compiler-plugin</artifactId>
                                    <versionRange>[3.1,)</versionRange>
                                    <goals><goal>testCompile</goal><goal>compile</goal></goals>
                                </pluginExecutionFilter>
                                <action><ignore/></action>
                            </pluginExecution>
                        </pluginExecutions>
                    </lifecycleMappingMetadata>
                </configuration>
            </plugin>
        </plugins>
    </pluginManagement>
</build>

Note: jedis 2.8.0 reports a number-parsing error; upgrading to 2.9.0 or later resolves it.

  2. Caveats
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

This is the jar officially released by Bahir, but there are a few points to watch out for when actually using it:

  1. It does not support TTL or an additional key.
  2. The bundled jedis version is 2.8.0; against newer Redis releases (4.0 and above) it fails to parse the host and port information from cluster nodes, producing the error mentioned above.
  3. The outdated-jedis problem can be solved in the Maven pom by excluding the bundled jedis and adding a newer version such as 2.9.0 separately, but TTL and additional-key support require a jar built from source (see the snippet after this list).
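
As a concrete illustration of point 3, here is a minimal pom sketch of the exclude-and-replace approach; it mirrors the commented-out block in the full pom above and assumes the Bahir 1.0 release artifact:

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
    <version>1.0</version>
    <exclusions>
        <!-- Drop the bundled jedis 2.8.0, which fails to parse cluster nodes on newer Redis. -->
        <exclusion>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- Bring in a newer jedis explicitly. -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

Note that this only fixes the jedis issue; TTL and additional-key support still require the jar built from source, as described next.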

The Bahir Flink source is on GitHub: https://github.com/apache/bahir-flink

After downloading the Bahir Flink source, go to the pom of the Redis sub-project, run Maven clean and package, and take the jar produced under the target directory.
Then depend on that jar in the IDEA project that needs it as a local, system-scope Maven dependency, for example:

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1-SNAPSHOT</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/flink-connector-redis_2.11-1.1-SNAPSHOT.jar</systemPath>
</dependency>

If the jar is instead added directly as a local library, running package fails with errors saying the Redis-related classes cannot be found on the classpath; loading it through the local Maven (system-scope) dependency above makes them resolvable.

Note that Flink's modules are all independent and must be imported individually when needed; Flink does this so that users only pull in what they actually use. Even common modules such as flink-json and flink-csv have to be declared separately in the pom. The lib directory of a Flink installation does ship some common jars, so check the lib directory of your particular Flink version to see what is already included.

  3. Code
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;

import java.util.HashSet;
import java.util.Set;

// Factory looked up by Flink SQL via 'connector' = 'redis'; it declares the
// connector options and creates the dynamic table sink.
public class RedisDynamicTableSourceFactory implements DynamicTableSinkFactory {

    public static final ConfigOption<String> host = ConfigOptions.key("host").stringType().noDefaultValue();
    public static final ConfigOption<Integer> port = ConfigOptions.key("port").intType().noDefaultValue();

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        // Validate the WITH-clause options against the options declared below.
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ReadableConfig options = helper.getOptions();
        return new RedisDynamicTableSink(options);
    }

    @Override
    public String factoryIdentifier() {
        // Matches 'connector' = 'redis' in the CREATE TABLE statement.
        return "redis";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(host);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(port);
        return options;
    }
}
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

// Dynamic table sink that wraps the Bahir DataStream RedisSink so it can be used from Flink SQL.
public class RedisDynamicTableSink implements DynamicTableSink {

    private ReadableConfig options;

    public RedisDynamicTableSink(ReadableConfig options) {
        this.options = options;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        // Append-only sink: only INSERT rows are accepted.
        return ChangelogMode.insertOnly();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // Build the jedis pool configuration from the DDL options.
        String host = options.get(RedisDynamicTableSourceFactory.host);
        FlinkJedisPoolConfig.Builder builder = new FlinkJedisPoolConfig.Builder().setHost(host);
        Integer port = options.get(RedisDynamicTableSourceFactory.port);
        if (port != null) {
            builder.setPort(port);
        }
        FlinkJedisPoolConfig build = builder.build();

        // Map each RowData to a Redis SET command: field 0 is the key,
        // the row's toString() is the value.
        RedisMapper<RowData> stringRedisMapper = new RedisMapper<RowData>() {
            @Override
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.SET);
            }

            @Override
            public String getKeyFromData(RowData rowData) {
                StringData string = rowData.getString(0);
                return string.toString();
            }

            @Override
            public String getValueFromData(RowData rowData) {
                return rowData.toString();
            }
        };

        // Reuse the Bahir DataStream RedisSink as the SQL sink's runtime function.
        RedisSink<RowData> stringRedisSink = new RedisSink<>(build, stringRedisMapper);
        return SinkFunctionProvider.of(stringRedisSink);
    }

    @Override
    public DynamicTableSink copy() {
        return new RedisDynamicTableSink(this.options);
    }

    @Override
    public String asSummaryString() {
        return "my_redis_sink";
    }
}

Since the company's code cannot be pasted here, the code above is adapted from another blog post; the principle is the same. Link: https://blog.csdn.net/u010034713/article/details/109191934
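
To show how the factory and sink above could be exercised from Flink SQL, here is a minimal sketch. It assumes the two classes above are on the job's classpath and that the factory is registered for SPI discovery via a file src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory containing the factory's fully qualified class name; the table name, host, and port below are placeholders.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RedisSinkSqlDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 'connector' must match factoryIdentifier() in the factory above;
        // 'host' is the only required option, 'port' is optional.
        tableEnv.executeSql(
                "CREATE TABLE redis_sink (" +
                "  k STRING," +
                "  v STRING" +
                ") WITH (" +
                "  'connector' = 'redis'," +
                "  'host' = '127.0.0.1'," +
                "  'port' = '6379'" +
                ")");

        // Write a single row: the mapper above uses field 0 as the Redis key
        // and RowData.toString() as the value.
        tableEnv.executeSql("INSERT INTO redis_sink VALUES ('hello', 'world')").await();
    }
}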

  1. Note that the Redis sink is really a layer implemented on top of the DataStream API; the implementation distinguishes between command types such as SET, LPUSH, and so on.
  2. Redis itself has three deployment modes: cluster mode, master-replica mode, and the HA sentinel mode; cluster mode is used most often. For the relevant commands, see the runoob Redis tutorial. Recommendations:
    Redis installation tutorial
    Redis client: Another Redis Desktop Manager, an open-source tool, is recommended.
    If you run into MOVED errors, look up the relevant material; usually a redis-cli startup parameter or a setting in redis.conf needs adjusting. There is plenty written about it, so it is not repeated here.
  3. Note that in Flink's Redis implementation of the set, zset, hash, and list commands, some (such as LPUSH) have no additional key, so be careful when implementing the RedisMapper key and value methods. The zset case in particular effectively swaps key and value: because a zset carries a score, the source code stores the value as the score and the key as the member. See the sketch after this list.
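
To make the zset point concrete, here is a minimal RedisMapper sketch for ZADD against the Bahir interface; the sorted-set name user_scores and the row layout (member in field 0, score in field 1) are assumptions for illustration only.

import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.table.data.RowData;

// For ZADD the additional key is the sorted-set name, getKeyFromData() supplies the member,
// and getValueFromData() supplies the score — i.e. key and value do not mean what they do for SET.
public class ZsetRedisMapper implements RedisMapper<RowData> {

    @Override
    public RedisCommandDescription getCommandDescription() {
        // "user_scores" is a hypothetical sorted-set name used only for illustration.
        return new RedisCommandDescription(RedisCommand.ZADD, "user_scores");
    }

    @Override
    public String getKeyFromData(RowData rowData) {
        // Member of the sorted set (assumed to be in field 0).
        return rowData.getString(0).toString();
    }

    @Override
    public String getValueFromData(RowData rowData) {
        // Score of the member (assumed to be a DOUBLE in field 1).
        return String.valueOf(rowData.getDouble(1));
    }
}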
