SqlSubmit 的实现

笔者一开始是想用 SQL Client 来贯穿整个演示环节,但可惜 1.9 版本 SQL CLI 还不支持处理 CREATE TABLE 语句。所以笔者就只好自己写了个简单的提交脚本。后来想想,也挺好的,可以让听众同时了解如何通过 SQL 的方式,和编程的方式使用 Flink SQL。

SqlSubmit 的主要任务是执行和提交一个 SQL 文件,实现非常简单,就是通过正则表达式匹配每个语句块。如果是 CREATE TABLE 或 INSERT INTO 开头,则会调用 tEnv.sqlUpdate(...) 。如果是 SET 开头,则会将配置设置到 TableConfig 上。其核心代码主要如下所示:

EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build();// 创建一个使用 Blink Planner 的 TableEnvironment, 并工作在流模式TableEnvironment tEnv = TableEnvironment.create(settings);// 读取 SQL 文件List sql = Files.readAllLines(path);// 通过正则表达式匹配前缀,来区分不同的 SQL 语句List calls = SqlCommandParser.parse(sql);// 根据不同的 SQL 语句,调用 TableEnvironment 执行for (SqlCommandCall call : calls) { switch (call.command) { case SET: String key = call.operands[0]; String value = call.operands[1]; // 设置参数 tEnv.getConfig().getConfiguration().setString(key, value); break; case CREATE_TABLE: String ddl = call.operands[0]; tEnv.sqlUpdate(ddl); break; case INSERT_INTO: String dml = call.operands[0]; tEnv.sqlUpdate(dml); break; default: throw new RuntimeException("Unsupported command: " + call.command); }}// 提交作业tEnv.execute("SQL Job");

使用 DDL 连接 Kafka 源表

在 flink-sql-submit 项目中,我们准备了一份测试数据集(来自 阿里云天池公开数据集 ,特别鸣谢),位于 src/main/resources/user_behavior.log 。数据以 JSON 格式编码,大概长这个样子:

{"user_id": "543462

goland sql 脚本运行_Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL相关推荐

  1. Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL

    上周六在深圳分享了<Flink SQL 1.9.0 技术内幕和最佳实践>,会后许多小伙伴对最后演示环节的 Demo 代码非常感兴趣,迫不及待地想尝试下,所以写了这篇文章分享下这份代码.希望 ...

  2. kafka 的pom文件_Flink的sink实战之二:kafka

    欢迎访问我的GitHub https://github.com/zq2599/blog_demos 内容:所有原创文章分类汇总及配套源码,涉及Java.Docker.Kubernetes.DevOPS ...

  3. PowerDesigner导出SQL脚本运行注释出现乱码问题

    乱码就是字符集的问题,先检查一下自己数据库的字符集,然后在Database --> Generate Database ,在formata页面中的Encoding中选中适合的字符集就可以解决问题 ...

  4. linux执行db2的sql脚本,LinuxShell自动执行当前目录所有DB2 SQL语句

    该shell脚本用于自动执行当前目录下所有的SQL语句,分为以下几个步骤: 步骤1.删除当前目录下已存在的log日志文件 #删除当前目录下log日志文件 for test1 in `ls` do re ...

  5. java 生成sql脚本_java导出insert语句并生成sql脚本

    insertSQL = new ArrayList(); ResultSet rs = null; try { rs = getColumnNameAndColumeValue(sm, listSQL ...

  6. kafka maven 依赖_Flink的sink实战之二:kafka

    本文是<Flink的sink实战>系列的第二篇,<Flink的sink实战之一:初探>对sink有了基本的了解,本章来体验将数据sink到kafka的操作: 版本和环境准备 本 ...

  7. java jdbc脚本_关于java:使用MySQL和JDBC运行.sql脚本

    我开始使用MySQL和JDBC. Class.forName("com.mysql.jdbc.Driver"); conn = DriverManager.getConnectio ...

  8. MySQL 命令行下执行.sql脚本

    1.首先编写sql脚本,保存为的:book.sql,内容如下:  1use test;  2create table book   3(   4tisbn varchar(20) primary ke ...

  9. MySQL命令行下执行.sql脚本详解

    本文主要介绍一个在MySQL命令行下执行脚本文件的例子,通过这个例子让我们来了解一下在命令行下MySQL是怎样执行脚本的吧.现在我们开始介绍这一过程. 1.首先编写sql脚本,保存为的:book.sq ...

最新文章

  1. BZOJ.3004.[SDOI2012]吊灯(结论)
  2. linux环境下安装tomcat6
  3. Java 注解用法详解——@SuppressWarnings
  4. cms java垃圾回收_java cms垃圾回收器总结
  5. oracle set markup,oracle sql*plus set spool介绍(二)
  6. leetcode验证冒泡排序效率
  7. VBoxManage获取虚拟机IP地址
  8. [Ubuntu] apt 添加第三方库
  9. IOS开发之Storyboard应用
  10. P1502 窗口的星星(扫描线入门第一题)
  11. DEPRECATION: Python 2.7 reached the end of its life on January 1st, 2020. Please upgrade
  12. 【优化算法】广义正态分布优化算法(GNDO)【含Matlab源码 1531期】
  13. Skyline软件二次开发初级——10如何在WEB页面中的三维地图上控制图层对象
  14. IDEA 炫酷编辑器主题大全,真的好看
  15. linux系统维护诸多定时器,linux定时器总结
  16. Android自动打开省电模式,常规省电模式  |  Android 开源项目  |  Android Open Source Project...
  17. 关于Cookie和Session
  18. 构建初级前端页面重构开发环境
  19. 【openstack-rally】使用rally执行tempest api测试并导出测试报告
  20. 为什么北京人和上海人都成了“杭漂“?

热门文章

  1. 通过DataX从Oracle同步数据到MySQL-安装配置过程
  2. vue-slicksort拖拽组件
  3. php实现最简单的MVC框架实例教程
  4. 转载——三种编程命名规范(匈牙利命名法、驼峰式命名法、帕斯卡命名法)...
  5. 存储类、链接、内存管理
  6. 面试中经常会被问到的70个问题
  7. 有关Activity的Launch mode 以及Intent的setFlags(转载)
  8. uni app 调用网络打印机_uni-app封装一个request请求
  9. 用python开发一个影视网站_GitHub - lyzhanghai/movie_project: 一个使用Python+Flask开发的微电影网站...
  10. 全球最顶级的电脑配置_全球最顶级外汇交易员,非这10位莫属