flink connector phoenix
参考:https://github.com/lixz3321/flink-connector-jdbc-ext
下载:flink-release-1.13.2 源码
进入flink-connectors module 子module flink-connector-jdbc,添加 PhoenixDialect.java ,PhoenixRowConverter.java,修改 JdbcDialects.java
mvn clean package -DskipTests -Dfast -Dra
t.skip=true
替代 flink-connector-jdbc
<!-- <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>1.13.2</version>
</dependency> --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc-ext</artifactId><version>1.13.2</version>
</dependency>
PostgresDialect.java
org.apache.flink.connector.jdbc.dialect.PhoenixDialect
public class PhoenixDialect extends AbstractDialect{@Overridepublic int maxDecimalPrecision() {return 0;}@Overridepublic int minDecimalPrecision() {return 0;}@Overridepublic int maxTimestampPrecision() {return 0;}@Overridepublic int minTimestampPrecision() {return 0;}@Overridepublic List<LogicalTypeRoot> unsupportedTypes() {return null;}@Overridepublic String dialectName() {return "Phoenix";}@Overridepublic boolean canHandle(String url) {return url.startsWith("jdbc:phoenix:");}@Overridepublic JdbcRowConverter getRowConverter(RowType rowType) {return new PhoenixRowConverter(rowType);}@Overridepublic String getLimitClause(long limit) {return null;}@Overridepublic Optional<String> defaultDriverName() {return Optional.of("org.apache.phoenix.jdbc.PhoenixDriver");}@Overridepublic String quoteIdentifier(String identifier) {return "\"" + identifier + "\"";}@Overridepublic String getInsertIntoStatement(String tableName, String[] fieldNames) {String columns =Arrays.stream(fieldNames).map(this::quoteIdentifier).collect(Collectors.joining(", "));String placeholders =Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));return "UPSERT INTO "+ quoteIdentifier(tableName)+ "("+ columns+ ")"+ " VALUES ("+ placeholders+ ")";}
}
PhoenixRowConverter.java
org.apache.flink.connector.jdbc.internal.converter.PhoenixRowConverter
public class PhoenixRowConverter extends AbstractJdbcRowConverter{public PhoenixRowConverter(RowType rowType) {super(rowType);}@Overridepublic String converterName() {return "Phoenix";}
}
JdbcDialects.java
org.apache.flink.connector.jdbc.dialect.JdbcDialects
private static final List<JdbcDialect> DIALECTS =Arrays.asList(new DerbyDialect(),new MySQLDialect(),new PostgresDialect(),// 添加 phoenix 语义new PhoenixDialect());
======
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);System.out.println("tEnv--> " + tEnv);//加载clickhouse表tEnv.executeSql("create table test4(" +"`GOODS_ID` STRING," +"`ISWHITE` STRING," +"`DATA_DATE` STRING" +")WITH(" +"'connector' = 'jdbc'," +"'url' = 'jdbc:phoenix:172.16.34.121:2181'," +"'table-name' = 'WHO_GOODS_LABEL'" +")");tEnv.executeSql("select * from test4").print();
值得注意的是如下图 aaa / ccc 未在上图显示,因为是在 hbase shell 直接添加的、未查询的到…
注意字段大小写…
create table test4(
`goods_id` STRING,
`ISWHITE` STRING,
`DATA_DATE` STRING
)WITH(
'connector' = 'jdbc',
'url' = 'jdbc:phoenix:172.16.34.121:2181',
'table-name' = 'WHO_GOODS_LABEL'
)"
flink connector phoenix相关推荐
- 日志服务Flink Connector《支持Exactly Once》
摘要: Flink log connector是阿里云日志服务推出的,用于对接Flink的工具,包含两块,分别是消费者和生产者,消费者用于从日志服务中读数据,支持exactly once语义,生产者用 ...
- 扩展dlink-connector-phoenix使其phoenix-5.0.0支持flink1.16
感慨:玩大数据java必须要精通,不然遇到眼花缭乱的框架以及插件拓展的时候就会一下子傻眼了.各种框架之间版本不同有现成的插件或者方案到还可以但是没有就需要自己扩展.目前我使用的是CDH6.3.2,fl ...
- flink mysql connector_Flink JDBC Connector:Flink 与数据库集成最佳实践
整理:陈政羽(Flink 社区志愿者) 摘要:Flink 1.11 引入了 CDC,在此基础上, JDBC Connector 也发生比较大的变化,本文由 Apache Flink Contribut ...
- Flink JDBC Connector:Flink 与数据库集成最佳实践
整理:陈政羽(Flink 社区志愿者) 摘要:Flink 1.11 引入了 CDC,在此基础上, JDBC Connector 也发生比较大的变化,本文由 Apache Flink Contribut ...
- 说说Flink的连接器connector有哪些,怎么用?
标题: '说说Flink的连接器connector有哪些,怎么用?' 日期: 2021-07-31 10:26:51 标签: [flink,connector] 分类: 数据仓库 flink作为一个计 ...
- flink jdbc connector支持clickhouse
1.业务背景 业务需求把数据写入clickhouse,同时还需要支持主键更新.目前使用的开源flink1.11版本是不支持clickhouse的,项目中使用的是flink sql 所以需要对源代码进行 ...
- Flink SQL 自定义 redis connector
一般情况下,我们不需要创建新的 connector,因为 Flink SQL 已经内置了丰富的 connector 供我们使用,但是在实际生产环境中我们的存储是多种多样的,所以原生的 connecto ...
- flink redis connector(支持flink sql)
flink redis connector(支持flink sql) 1. 背景 工作原因,需要基于flink sql做redis sink,但bahir 分支的flink connector支持只是 ...
- 技术解析|Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once精准接入
本篇文档将演示如何使用 Apache Doris Flink Connector 结合 Flink CDC 以及 Doris Stream Load 的两阶段提交,实现 MySQL 数据库分库分表实时 ...
最新文章
- 虚拟化--图解交互式方式安装ESXi_5.0的过程
- DEDECMS站点内容自动更新到新浪微博的方法
- html重复标题,在HTML中重复表标题
- 冒泡排序选择排序 以及时间效率对比
- JSP(Servlet)中从连接池获取连接
- 超速问题的c语言编程,超速行驶问题--精选.doc
- 视觉SLAM笔记(47) 优化 PnP 的结果
- 顺丰拟发行58亿可转债:15亿投入航空运力,12亿还贷款
- Qt4.7中 默认的构造函数
- 转: 深入理解Linux修改hostname
- Windows 下 Git 的安装及如何与码云协作
- excel自动调整列宽_Excel双击鼠标的9种用法
- SQL Server 2005 技术内幕之T-SQL查询——逻辑查询处理(上)
- 俄罗斯方块、纯前端实现俄罗斯方块、俄罗斯方块代码
- python实现通讯录代码
- 机器学习系列手记(六):概率图模型之概率图模型的联合概率分布
- 2022手机号段大全、归属运营商整理—2022.01.04更新(包含三大运营商)
- 100~200以内素数
- actor 模型原理 (二)
- 亚马逊 kindle ebook 大合集资源多真的好吗?