• 背景
  • 示例
  • 源码解析

背景

1.11.0 之前,用户如果依赖 Flink 的 source/sink 读写关系型数据库或读取 changelog 时,必须要手动创建对应的 schema。但是这样会有一个问题,当数据库中的 schema 发生变化时,也需要手动更新对应的 Flink 任务以保持类型匹配,任何不匹配都会造成运行时报错使作业失败。这个操作冗余且繁琐,体验极差。

实际上对于任何和 Flink 连接的外部系统都可能有类似的上述问题,在 1.11.0 中重点解决了和关系型数据库对接的这个问题。提供了 JDBC catalog 的基础接口以及 Postgres catalog 的实现,这样方便后续实现与其它类型的关系型数据库的对接。

1.11.0 版本后,用户使用 Flink SQL 时可以自动获取表的 schema 而不再需要输入 DDL。除此之外,任何 schema 不匹配的错误都会在编译阶段提前进行检查报错,避免了之前运行时报错造成的作业失败。

示例

目前对于jdbc catalog,flink仅提供了postgres catalog,我们基于postgres的catalog讲解一下如何使用flink的catalog ,

  • 引入pom
   <dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><version>42.2.5</version></dependency>

  • 新建PostgresCatalog
    目前flink通过一个静态类来创建相相应的jdbc catalog,对于PostgresCatalog,没有提供public类型的构造方法。

通过JdbcCatalogUtils.createCatalog构造PostgresCatalog时这五个参数都是必填项,其中baseUrl要求是不能带有数据库名的

  String catalogName = "mycatalog";String defaultDatabase = "postgres";String username = "postgres";String pwd = "postgres";String baseUrl = "jdbc:postgresql://localhost:5432/";PostgresCatalog postgresCatalog = (PostgresCatalog) JdbcCatalogUtils.createCatalog(catalogName,defaultDatabase,username,pwd,baseUrl);

访问postgres 数据库指定表名的时候完整的路径名应该是以下格式:

<catalog>.<db>.`<schema.table>`

其中schema默认是public,如果是使用缺省值,public是可以省略的。比如下面的查询语句:

SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;

如果非缺省schema,则不能被省略:

SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;

  • 常见操作

我们PostgresCatalog将注册到StreamTableEnvironment 的变量tEnv中,然后就可以用tEnv进行一些操作了。

 tEnv.registerCatalog(postgresCatalog.getName(), postgresCatalog);tEnv.useCatalog(postgresCatalog.getName());

  1. 列出来所有的数据库:
        System.out.println("list databases :");String[] databases = tEnv.listDatabases();Stream.of(databases).forEach(System.out::println);

  1. 列出来所有的table
     tEnv.useDatabase(defaultDatabase);System.out.println("list tables :");String[] tables = tEnv.listTables(); // 也可以使用  postgresCatalog.listTables(defaultDatabase);Stream.of(tables).forEach(System.out::println);

  1. 列出所有函数
        System.out.println("list functions :");String[] functions = tEnv.listFunctions();Stream.of(functions).forEach(System.out::println);

  1. 获取table的schema
 CatalogBaseTable catalogBaseTable = postgresCatalog.getTable(new ObjectPath(defaultDatabase,"table1"));TableSchema tableSchema = catalogBaseTable.getSchema();System.out.println("tableSchema --------------------- :");System.out.println(tableSchema);

  1. 查询表的数据
  List<Row> results = Lists.newArrayList(tEnv.sqlQuery("select * from table1").execute().collect());results.stream().forEach(System.out::println);

  1. 插入数据
tEnv.executeSql("insert into table1 values (3,'c')");

完整的代码请参考:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/catalog/PostgresCatalogTest.java

源码解析

AbstractJdbcCatalog

这个类主要是对jdbc catalog一些公共的操作做了抽象.目前实现了实际功能的只有一个方法:getPrimaryKey,其他方式主要是对于Catalog的一些其他实现类做了特殊处理,比如类似create table 或者 alter table是不支持的,listView只是返回一个空列表,因为我们使用jdbc catalog主要是来做一些DML操作。

 @Overridepublic void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {throw new UnsupportedOperationException();}@Overridepublic List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {return Collections.emptyList();}

PostgresCatalog

在这里面,主要是实现了一些常用的操作数据库的方法,比如getTable、listTables、listDatabases等等,其实简单的来说就是从postgres元数据库里查询出来相应的信息,然后组装成flink的相关对象,返回给调用方。 以一个简单的方法listDatabases为例:

从元数据表pg_database中查询所有的tablename,然后去掉内置的数据库,也就是template0和template1,然后封装到一个list对象里,返回。

 @Overridepublic List<String> listDatabases() throws CatalogException {List<String> pgDatabases = new ArrayList<>();try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {PreparedStatement ps = conn.prepareStatement("SELECT datname FROM pg_database;");ResultSet rs = ps.executeQuery();while (rs.next()) {String dbName = rs.getString(1);if (!builtinDatabases.contains(dbName)) {pgDatabases.add(rs.getString(1));}}return pgDatabases;} catch (Exception e) {throw new CatalogException(String.format("Failed listing database in catalog %s", getName()), e);}}

有不兼容的地方需要做一些转换,比如getTable方法,有些数据类型是不匹配的,要做一些类型的匹配,如postgres里面的serial和int4都会转成flink的int类型,具体的参考下PostgresCatalog#fromJDBCType方法。

 private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws SQLException {String pgType = metadata.getColumnTypeName(colIndex);int precision = metadata.getPrecision(colIndex);int scale = metadata.getScale(colIndex);switch (pgType) {case PG_BOOLEAN:return DataTypes.BOOLEAN();case PG_BOOLEAN_ARRAY:return DataTypes.ARRAY(DataTypes.BOOLEAN());case PG_BYTEA:return DataTypes.BYTES();.........................

参考资料:
[1].https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html

更多内容,欢迎关注我的公众号【大数据技术与应用实战】

dosbox中out of memory_flink教程-详解flink 1.11 中的JDBC Catalog相关推荐

  1. linux基础配置脚本,Linux中selinux基础配置教程详解

    selinux(Security-Enhanced Linux)安全增强型linux,是一个Linux内核模块,也是Linux的一个安全子系统. 三种模式: Enforcing:强制模式,在selin ...

  2. 我的世界服务器权限组文件在哪,我的世界GroupManager权限组管理教程详解

    我的世界GroupManager权限组管理教程详解,详细描述了我的世界GroupManager权限组管理教程,希望这篇我的世界GroupManager权限组管理教程详解,能够帮助到各位正在玩我的世界的 ...

  3. Python中的select、epoll详解

    Python中的select.epoll详解 文章目录 Python中的select.epoll详解 一.select 1.相关概念 2.select的特性 1.那么单进程是如何实现多并发的呢??? ...

  4. Python中数组切片的用法详解

    Python中数组切片的用法详解 一.python中"::-1"代表什么? 二.python中":"的用法 三.python中数组切片 三.numpy中的整数数 ...

  5. Android中联系人和通话记录详解(2)

    在文章Android中联系人和通话记录详解(1)中对通话记录进行了分析,本章将对联系人的数据库表.字段以及Insert,Query,Delelte,Update四大基本数据操作进行分析. 与联系人相关 ...

  6. CSS3中font-face属性的用法详解

    CSS3中font-face属性的用法详解 @font-face是CSS3中的一个模块,主要是把自定义的Web字体嵌入到你的网页中,随着@font-face模块的出现,我们在Web的开发中使用字体不怕 ...

  7. mysql5.7.11 linux_CentOS 7 中以命令行方式安装 MySQL 5.7.11 for Linux Generic 二进制版本教程详解...

    MySQL 目前的最新版本是 5.7.11,在 Linux 下提供特定发行版安装包(如 .rpm)以及二进制通用版安装包(.tar.gz).一般情况下,很多项目都倾向于采用二进制通用安装包形式来进行安 ...

  8. pythondjango教程_Python 中Django安装和使用教程详解

    一.安装 一般使用cmd 安装就可以 手动安装通过下载方式 二.配置使用 1.通过cmd新建一个项目,我是在桌面新建 上面命令会在桌面新建pythonDjango文件夹,在里面会生成如下图两个文件 m ...

  9. win7 64位操作系统中Oracle 11g + plsql安装教程详解(图解)

    这篇文章主要介绍了win7 64位操作系统中Oracle 11g + plsql安装教程详解(图解),详细的介绍了Oracle 11g 安装的步骤,有兴趣的可以了解一下. 先去网上把下面列表里的文件下 ...

  10. 检查python是否安装成功的命令是_Python 中的pygame安装与配置教程详解

    安装软件环境及版本说明 OS: Win10 x 64 专业版 Python: 2.7 IDE: PyCharm Community 2018 1. 安装python 1)下载并安装python pyt ...

最新文章

  1. 数字图像处理——第七章 小波和多分辨处理
  2. AB_PLC_入门教程
  3. kohana分析之主程序加载流程
  4. mysql集群之MMM简单搭建
  5. ubuntu 18.04设置系统自带系统截图快捷键
  6. RTEMS 的 AT91SAM9260 移植(5): 调试串口驱动
  7. 小程序richtext_用于基于SWT的应用程序的RichText编辑器组件
  8. em算法 实例 正态分布_EM算法解GMM
  9. 米筐量化不支持c语言_从零开始学量化(三):数据获取途径
  10. 使用 ale.js 制作一个小而美的表格编辑器(4)
  11. Linux 命令(58)—— readelf 命令
  12. Android 异步处理AsyncTask
  13. Bean的生命周期简单过程
  14. android 8 ldac,小米Android 8.0机型支持LDAC功能
  15. CAD2019安装方法及步骤
  16. 微信小程序地图添加标记点
  17. matlab波浪号怎么打,数学中字母上面波浪号 波浪号怎么打到上面
  18. php中fastcgi和php-fpm是什么
  19. 泊松分布的期望和方差推导
  20. 2023你冲不冲,冲冲冲冲~~

热门文章

  1. textContent与innerText
  2. c#的const可以用于引用类型吗
  3. iOS 关于本地持久化存储的探讨
  4. Xamarin Android中引用Jar包的方法
  5. JS Bin 在线编辑代码,所见所得
  6. Indigo Untyped Channel
  7. webpack-dev-server‘ is not recognized as an internal or external command
  8. List、Set和Map的简单理解
  9. 设计模式 ( 三 ) 工厂方法模式
  10. 软件设计原则(六) 合成复用原则