前言

阿里云 AnalyticDB(mysql) 以下简称 adb数据库,是可以直接使用 Mysql的连接方式,但 adb与 mysql语法并不完全相同,有一些特殊的语法有些出入,比如我在使用 upset的时候,当我们在插入表时定义了主键,如果主键已经存在并不会继续插入数据而是会去修改之前的数据。

业务场景

使用flink实时流计算从 kafka 获取数据插入到 adb数据库,但是遇到一个问题,建表的时候定义主键然后插入重复的数据应该是修改并不是插入,并且在使用**(upset)** 的时候报错:cannot update pk column SHOP_CODE to expr,当数据库为mysql的时候就没问题,查询了一下相关资料,原来adb的upset语法和mysql的语法有一些出入所以这里才会报错。

当前使用的flink版本为 1.13.5,下载源码查看mysql的upset是如何实现 ,github源码下载 Release release-1.13.5 · apache/flink · GitHub 把源码导入到idea中
导入 idea后所有组件都在, maven依赖也是需要很长的时间,中间可能也会有一些包下载失败,我的处理办法是maven直接使用默认的镜像不使用阿里云镜像,但这样操作的前提是电脑开启了科学上网才行。
依赖加载成功后找到jdbc

flink JDBC源码

通过查看源码发现 Flink将onnector接口注册在org.apache.flink.connector.jdbc.dialect.JdbcDialects 中:

同时每个方言都扩展了 AbstractDialect,并且通过查看发现 MySQLDialect自己重写了 getUpsertStatement实现 upsert。

自定义adb方言实现upset思路

adb使用的驱动和方言都和 mysql一致,我们只需要重写 getUpsertStatement方法实现 adb的 upset即可,直接复制一份 MySQLDialect,然后把 canHandle的返回由" jdbc:mysql:"修改为" jdbc:adb:",再修改 getUpsertStatement方法,把 insert 替换为 replace

具体代码实现

修改 AdbDialect 类中两个方法

@OverridepublicbooleancanHandle(String url) {return url.startsWith("jdbc:adb:");
}@OverridepublicOptional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {returnOptional.of(getInsertIntoStatement(tableName, fieldNames).replace("INSERT", "REPLACE"));
}

在 org.apache.flink.connector.jdbc.internal.converter包中复制一份 MySQLRowConverter改为 AdbRowConverter,把 converterName修改为 " adb" 。

public classAdbRowConverterextendsAbstractJdbcRowConverter{private static final long serialVersionUID = 1L;@Overridepublic String converterName() {return"adb";}public AdbRowConverter(RowType rowType) {super(rowType);}
}

最后将 AdbDialect注册到 JdbcDialects 中

publicfinalclassJdbcDialects {privatestaticfinal List<JdbcDialect> DIALECTS =Arrays.asList(newDerbyDialect(),newMySQLDialect(),newPostgresDialect(),newAdbDialect());/** Fetch the JdbcDialect class corresponding to a given database url. */publicstatic Optional<JdbcDialect> get(String url){for (JdbcDialect dialect : DIALECTS) {if (dialect.canHandle(url)) {return Optional.of(dialect);}}return Optional.empty();}
}

最后还需要修改 org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider 中的 getOrEstablishConnection 方法,当加载驱动的时候传入 jdbc:adb 改为 jdbc:mysql 让它去获取 mysql 的驱动

@Overridepublic Connection getOrEstablishConnection()throws SQLException, ClassNotFoundException {if (connection != null) {return connection;}if (jdbcOptions.getDriverName() == null) {connection =DriverManager.getConnection(jdbcOptions.getDbURL(),jdbcOptions.getUsername().orElse(null),jdbcOptions.getPassword().orElse(null));} else {Driverdriver= getLoadedDriver();Propertiesinfo=newProperties();jdbcOptions.getUsername().ifPresent(user -> info.setProperty("user", user));jdbcOptions.getPassword().ifPresent(password -> info.setProperty("password", password));StringdbURL= jdbcOptions.getDbURL();if (jdbcOptions.getDbURL().substring(0, 8).equals("jdbc:adb")) {dbURL = "jdbc:mysql" + jdbcOptions.getDbURL().substring(8);}connection = driver.connect(dbURL, info);if (connection == null) {// Throw same exception as DriverManager.getConnection when no driver found to match// caller expectation.thrownewSQLException("No suitable driver found for " + jdbcOptions.getDbURL() + "08001");}}return connection;
}

连接方式

还需要注意一个问题,就是我们修改了这些在使用的时候这样进行链接处理,但是我们并没有 adb的驱动包,要做的就是当我们使用 adb的时候让它去加载 mysql的驱动。把 jdbc后面 mysql修改为 adb,具体连接方式:

) WITH ('connector' = 'jdbc','url' = 'jdbc:adb://127.0.0.1:3306/text','username' = '','password' = '','driver'     = 'com.mysql.jdbc.Driver','table-name' = 'sys_dict'
);

可能出现的问题

如果打包的 idea使用的 jdk11, flink服务器上运行的是 jdk 1.8 ,那么就会报错,我们要么就升级服务器版本,要么就在 idea中更改项目版本,点击 maven -> Profiles,把 jdk11去掉,选中 jdk1.8然后打包就可以解决问题了

flink jdbc(mysql)修改为阿里云AnalyticDB方言相关推荐

  1. 阿里云怎样操作mysql数据库_阿里云主机如何操作mysql数据库

    阿里云主机如何操作mysql数据库,阿里云上传mysql数据库. 在阿里云ecs云服务器上部署数据库后,在平常的操作中可能会遇到些问题,可以先做个大致的了解: 如果您想看更多的在ecs上的数据库的相关 ...

  2. 阿里云mysql容量_阿里云RDS的mysql数据库占用空间超过90%的处理

    阿里云RDS数据库最大支持2T,目前已经占用了90%,如果进行分库或者迁移比较麻烦,思路是找出占用空间过大的日志或不重要的文件进行删除操作 查询所有数据库占用磁盘空间大小的SQL语句: show bi ...

  3. 江门农商银行引入阿里云AnalyticDB,实现数据自助分析平台升级

    简介:文章来源:中金在线 3月5日,记者采访获悉,江门农商银行日前完成自助分析平台的升级换代,由阿里云云原生数据仓库AnalyticDB替代传统DB2数据库,满足银行业务报表.管理会计.风控.营销等多 ...

  4. 如何将数据仓库从 AWS Redshift 迁移到阿里云 AnalyticDB for PostgreSQL

    阿里云AnalyticDB for PostgreSQL(以下简称 ADB PG,即原HybridDB for PostgreSQL)为基于PostgreSQL内核的MPP架构的实时数据仓库服务,可以 ...

  5. 阿里云mysql宽带_阿里云数据库RDS MySQL版购买使用详细过程

    阿里云数据库 RDS MySQL 版如何购买使用?通过前面学习如何选择阿里云数据库 RDS MySQL 版,魏艾斯博客已经阿里云 MySQL 有了初步认识,接下来跟着老魏一起购买使用阿里云数据库 RD ...

  6. 把Maven本地仓库修改为阿里云仓库

    为什么要把Maven本地仓库修改为阿里云仓库?还不是因为下载jar包的时候慢嘛,几十kb,想练习一下项目都不行,还不是一般的慢,都把我急的~ 1,下载安装maven 2,修改settings.xml文 ...

  7. 创业公司为什么不自建MySQL而使用阿里云的 RDS

    创业公司为什么不自建MySQL而使用阿里云的 RDS 在脉脉看到一条职言:"不明白有人居然使用阿里云的 RDS?" RDS 是什么? 关系型数据库RDS(Relational Da ...

  8. 仓库处理中 无法修改_阿里云自研数据仓库 AnalyticDB 再捧 TPC 全球冠军

    作者 | 马超责编 | 伍杏玲出品 | CSDN(ID:CSDNnews) 5月14日,TPC 官网正式公布,阿里云自研的 AnalyticDB 通过了TPC-DS全流程测试,将前世界纪录的性能提升了 ...

  9. 阿里云linux 重启mysql数据库_阿里云linux服务器mysql修改密码教程

    阿里云机器最多的好像是linux系统了,我今天来为大家介绍一下阿里云服务器的linux系统修改mysql数据库密码的方法,希望对各位会有帮助,这里整理了几种方法. MYSQL的密码我们并不经常修改,但 ...

最新文章

  1. QIIME 2教程. 17鉴定和过滤嵌合体q2-vsearch(2021.2)
  2. 4、Eternal框架-持有者
  3. 【Java】日期/事件字符串包含TZ
  4. InnoDB索引原理详解
  5. linux不同发行版 程序通用吗,为什么各种Linux发行版使用不同的包管理器?
  6. 把angular(vue等)项目部署在局域网上
  7. Swagger3、SpringBoot学习、使用复盘
  8. Linux学习总结(七十四)自动化运维之ansible
  9. python入门经典代码-Python入门经典
  10. 服务器安装mysql文档_Linux 服务器安装MySQL数据库
  11. 知识图谱——TransE模型原理
  12. 概率论与数理统计(2.3-2.4)随机变量的分布函数和密度函数(连续型)
  13. 华为存储iscsi配置_使用华为存储配置ISCSI存储方法和iSCSI建立连接提示目标错误...
  14. 2023年陕西师范大学宗教学考研上岸前辈备考经验指导
  15. Linux 修改环境变量设置的三种方式
  16. 学生专用计算机怎样开启关机,电脑自动关机,教您笔记本怎么设置自动关机
  17. Matplotlib 绘制条形图
  18. 2019美国大学计算机专业,2019美国计算机专业什么大学好
  19. 7-229 sdut-C语言实验- 排序7-227 sdut- C语言实验-计算1到n的和(循环结构)
  20. oracle 不等于但包含空,sql语句不等于null

热门文章

  1. 使用python+poco+夜神模拟器进行自动化测试。
  2. 我踩了mysqldump的一个地雷(续)
  3. 人类完整基因组测序意味着什么
  4. Spark内存管理-UnifiedMemoryManager和StaticMemoryManager
  5. 【Redis】redis如何保存对象
  6. 生命在于学习——框架-中间件的学习(一)
  7. 【音频分类与检测】PANNs:用于音频模式识别的大规模预训练音频神经网络
  8. 怎样将无线鼠标连接到Mac电脑?
  9. WinRAR4.20注册文件rarreg.key
  10. 企业用什么留住人才???