flink redis connector (with Flink SQL support)
1. Background
- For work I needed a Redis sink usable from Flink SQL, but the bahir branch of the Flink Redis connector only supports the DataStream API, so Flink SQL support has to be built on top of it.
- According to the Flink community, Flink SQL and the Table API are a major direction going forward, including Python support. Every technology evolves toward friendlier usage: the external interface gets simpler while the internals grow more complex for exactly that reason. (Whether that reading of the direction is correct still needs confirming.)
2. Steps
- Environment
- jdk8
- idea 2020
- scala 2.11
- pom
<packaging>jar</packaging>
<name>Flink Connector SnowDrop</name>

<properties>
    <java.version>1.8</java.version>
    <flink.version>1.12.0</flink.version>
    <log4j.version>2.12.1</log4j.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <mongo.driver.version>3.8.2</mongo.driver.version>
    <fastjson.version>1.2.75</fastjson.version>
    <connector_redis>1.0</connector_redis>
</properties>

<repositories>
    <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases><enabled>false</enabled></releases>
        <snapshots><enabled>true</enabled></snapshots>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- This dependency is provided, because it should not be packaged into the JAR file. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!--
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.34</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-orc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Dependencies added for test code -->
    <!-- The dependencies below must be bundled into the packaged jar -->
    <!--
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
        <version>${connector_redis}</version>
        <exclusions>
            <exclusion>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    -->
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.1-SNAPSHOT</version>
        <scope>system</scope>
        <systemPath>${project.basedir}/src/main/resources/flink-connector-redis_2.11-1.1-SNAPSHOT.jar</systemPath>
    </dependency>
    <!-- jedis 2.8.0 is buggy, so jedis is pulled in separately (2.9.0 or 3.1.0) -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongodb-driver</artifactId>
        <version>${mongo.driver.version}</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongodb-driver-core</artifactId>
        <version>${mongo.driver.version}</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>bson</artifactId>
        <version>${mongo.driver.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <!-- Java Compiler -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>${java.version}</source>
                <target>${java.version}</target>
            </configuration>
        </plugin>
        <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
        <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <!-- Run shade goal on package phase -->
                <execution>
                    <phase>package</phase>
                    <goals><goal>shade</goal></goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>org.apache.flink:force-shading</exclude>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>org.apache.logging.log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                     Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
    <pluginManagement>
        <plugins>
            <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
            <plugin>
                <groupId>org.eclipse.m2e</groupId>
                <artifactId>lifecycle-mapping</artifactId>
                <version>1.0.0</version>
                <configuration>
                    <lifecycleMappingMetadata>
                        <pluginExecutions>
                            <pluginExecution>
                                <pluginExecutionFilter>
                                    <groupId>org.apache.maven.plugins</groupId>
                                    <artifactId>maven-shade-plugin</artifactId>
                                    <versionRange>[3.1.1,)</versionRange>
                                    <goals><goal>shade</goal></goals>
                                </pluginExecutionFilter>
                                <action><ignore/></action>
                            </pluginExecution>
                            <pluginExecution>
                                <pluginExecutionFilter>
                                    <groupId>org.apache.maven.plugins</groupId>
                                    <artifactId>maven-compiler-plugin</artifactId>
                                    <versionRange>[3.1,)</versionRange>
                                    <goals>
                                        <goal>testCompile</goal>
                                        <goal>compile</goal>
                                    </goals>
                                </pluginExecutionFilter>
                                <action><ignore/></action>
                            </pluginExecution>
                        </pluginExecutions>
                    </lifecycleMappingMetadata>
                </configuration>
            </plugin>
        </plugins>
    </pluginManagement>
</build>
Note: jedis 2.8.0 reports a number-parsing error; upgrading to 2.9.0 or later resolves it.
- Caveats
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
This is the jar officially released by bahir, but in practice there are several caveats:
- It does not support TTL or an additional key.
- The bundled jedis is 2.8.0, which fails against newer Redis servers (4.0 and above) when parsing the host and port from the cluster nodes reply.
- The outdated-jedis problem can be solved in the pom by excluding the bundled jedis and declaring a newer version such as 2.9.0 yourself (see the snippet below), but TTL and additional-key support require a jar built from source.
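A minimal sketch of that exclusion, assuming the released bahir 1.0 artifact and jedis 2.9.0 (it mirrors the commented-out block in the pom above):

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
    <exclusions>
        <!-- drop the bundled jedis 2.8.0 -->
        <exclusion>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- declare a newer jedis explicitly -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>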
bahir-flink on GitHub: https://github.com/apache/bahir-flink
After downloading the bahir-flink source, open the pom of the Redis sub-project, run Maven's clean and package goals, and take the jar from the target directory.
Then depend on that jar from the IDEA project that needs it, as a local Maven dependency, for example:
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1-SNAPSHOT</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/flink-connector-redis_2.11-1.1-SNAPSHOT.jar</systemPath>
</dependency>
If instead you add the jar directly as an IDE library, mvn package fails with errors saying the Redis classes cannot be found on the classpath; loading it as the system-scoped Maven dependency above makes them resolvable.
Note that Flink's modules are deliberately independent and pulled in individually, so that users depend only on what they need; common formats such as flink-json and flink-csv also have to be declared separately in the pom. The lib directory of a Flink installation does ship some common jars; check the lib directory of your Flink version to see exactly which.
- Code
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;

import java.util.HashSet;
import java.util.Set;

// Despite the "Source" in its name, this factory creates the Redis *sink*;
// it is looked up through the 'connector' = 'redis' option in the DDL.
public class RedisDynamicTableSourceFactory implements DynamicTableSinkFactory {

    public static final ConfigOption<String> host = ConfigOptions.key("host").stringType().noDefaultValue();
    public static final ConfigOption<Integer> port = ConfigOptions.key("port").intType().noDefaultValue();

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ReadableConfig options = helper.getOptions();
        return new RedisDynamicTableSink(options);
    }

    @Override
    public String factoryIdentifier() {
        return "redis"; // the value used in WITH ('connector' = 'redis')
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(host);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(port);
        return options;
    }
}
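One thing the factory code alone does not show: Flink discovers DynamicTableFactory implementations through Java SPI, so the factory must be registered in a service file, otherwise 'connector' = 'redis' fails with a factory-not-found error. The package below is a placeholder; use the fully qualified name of your own factory class:

# File: src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
com.example.redis.RedisDynamicTableSourceFactory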
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

public class RedisDynamicTableSink implements DynamicTableSink {

    private ReadableConfig options;

    public RedisDynamicTableSink(ReadableConfig options) {
        this.options = options;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        return ChangelogMode.insertOnly();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        String host = options.get(RedisDynamicTableSourceFactory.host);
        FlinkJedisPoolConfig.Builder builder = new FlinkJedisPoolConfig.Builder().setHost(host);
        Integer port = options.get(RedisDynamicTableSourceFactory.port);
        if (port != null) {
            builder.setPort(port);
        }
        FlinkJedisPoolConfig build = builder.build();
        // Map each RowData to a Redis SET: field 0 is the key, the whole row's toString() is the value.
        RedisMapper<RowData> stringRedisMapper = new RedisMapper<RowData>() {
            @Override
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.SET);
            }

            @Override
            public String getKeyFromData(RowData rowData) {
                StringData string = rowData.getString(0);
                return string.toString();
            }

            @Override
            public String getValueFromData(RowData rowData) {
                return rowData.toString();
            }
        };
        RedisSink<RowData> stringRedisSink = new RedisSink<>(build, stringRedisMapper);
        return SinkFunctionProvider.of(stringRedisSink);
    }

    @Override
    public DynamicTableSink copy() {
        return new RedisDynamicTableSink(this.options);
    }

    @Override
    public String asSummaryString() {
        return "my_redis_sink";
    }
}
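To see the two classes end to end, here is a hedged usage sketch; the table name, columns, and values are made up for illustration, and host/port correspond to the options declared in the factory above:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RedisSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 'connector' = 'redis' resolves to RedisDynamicTableSourceFactory via SPI;
        // 'host' is required, 'port' is optional (see requiredOptions/optionalOptions).
        tEnv.executeSql(
                "CREATE TABLE redis_sink (k STRING, v STRING) WITH ("
                        + " 'connector' = 'redis',"
                        + " 'host' = '127.0.0.1',"
                        + " 'port' = '6379'"
                        + ")");

        // The mapper above uses field 0 as the Redis key and RowData.toString() as the value.
        tEnv.executeSql("INSERT INTO redis_sink VALUES ('hello', 'world')").await();
    }
}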
Since company code cannot be pasted here, the code above is adapted from another blog; the principle is the same. Link: https://blog.csdn.net/u010034713/article/details/109191934
- Note that the Redis sink is really a layer on top of the DataStream implementation, which dispatches on the command type (SET, LPUSH, and so on).
- Redis itself has three deployment modes: cluster mode, master/replica mode, and the HA sentinel mode; cluster mode is the most widely used. For the relevant commands, see the runoob Redis tutorial, which also has a recommended installation guide.
- As a Redis client, Another Redis Desktop Manager, an open-source tool, is recommended.
- If you hit a MOVED error, look it up in the usual references; it generally means a redis-cli startup flag or a redis.conf setting needs adjusting. The material on this is plentiful, so it is not repeated here.
- Note that in Flink's Redis implementations for SET, ZSET, HASH and LIST, some commands such as LPUSH take no additional key, so be careful when implementing the RedisMapper key and value methods. ZSET in particular effectively flips key and value: because a zset carries a score, the source code uses the mapper's value as the score and its key as the member. A sketch follows below.
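For that zset pitfall, a hedged sketch of a ZADD mapper, as a drop-in for the mapper inside getSinkRuntimeProvider above; the two-STRING-column row layout and the sorted-set name my_zset are assumptions for illustration:

// Assumed layout: field 0 = member, field 1 = score (both STRING columns here).
// In the ZADD implementation the mapper's "value" ends up as the score and the
// "key" as the member, so the accessors read inverted compared to SET.
RedisMapper<RowData> zaddMapper = new RedisMapper<RowData>() {
    @Override
    public RedisCommandDescription getCommandDescription() {
        // "my_zset" is the additional key: the name of the sorted set.
        return new RedisCommandDescription(RedisCommand.ZADD, "my_zset");
    }

    @Override
    public String getKeyFromData(RowData rowData) {
        return rowData.getString(0).toString(); // stored as the zset member
    }

    @Override
    public String getValueFromData(RowData rowData) {
        return rowData.getString(1).toString(); // used as the score
    }
};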