goland sql 脚本运行_Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL
SqlSubmit 的实现
笔者一开始是想用 SQL Client 来贯穿整个演示环节,但可惜 1.9 版本 SQL CLI 还不支持处理 CREATE TABLE 语句。所以笔者就只好自己写了个简单的提交脚本。后来想想,也挺好的,可以让听众同时了解如何通过 SQL 的方式,和编程的方式使用 Flink SQL。
SqlSubmit 的主要任务是执行和提交一个 SQL 文件,实现非常简单,就是通过正则表达式匹配每个语句块。如果是 CREATE TABLE 或 INSERT INTO 开头,则会调用 tEnv.sqlUpdate(...) 。如果是 SET 开头,则会将配置设置到 TableConfig 上。其核心代码主要如下所示:
EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build();// 创建一个使用 Blink Planner 的 TableEnvironment, 并工作在流模式TableEnvironment tEnv = TableEnvironment.create(settings);// 读取 SQL 文件List sql = Files.readAllLines(path);// 通过正则表达式匹配前缀,来区分不同的 SQL 语句List calls = SqlCommandParser.parse(sql);// 根据不同的 SQL 语句,调用 TableEnvironment 执行for (SqlCommandCall call : calls) { switch (call.command) { case SET: String key = call.operands[0]; String value = call.operands[1]; // 设置参数 tEnv.getConfig().getConfiguration().setString(key, value); break; case CREATE_TABLE: String ddl = call.operands[0]; tEnv.sqlUpdate(ddl); break; case INSERT_INTO: String dml = call.operands[0]; tEnv.sqlUpdate(dml); break; default: throw new RuntimeException("Unsupported command: " + call.command); }}// 提交作业tEnv.execute("SQL Job");
使用 DDL 连接 Kafka 源表
在 flink-sql-submit 项目中,我们准备了一份测试数据集(来自 阿里云天池公开数据集 ,特别鸣谢),位于 src/main/resources/user_behavior.log 。数据以 JSON 格式编码,大概长这个样子:
{"user_id": "543462
goland sql 脚本运行_Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL相关推荐
- Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL
上周六在深圳分享了<Flink SQL 1.9.0 技术内幕和最佳实践>,会后许多小伙伴对最后演示环节的 Demo 代码非常感兴趣,迫不及待地想尝试下,所以写了这篇文章分享下这份代码.希望 ...
- kafka 的pom文件_Flink的sink实战之二:kafka
欢迎访问我的GitHub https://github.com/zq2599/blog_demos 内容:所有原创文章分类汇总及配套源码,涉及Java.Docker.Kubernetes.DevOPS ...
- PowerDesigner导出SQL脚本运行注释出现乱码问题
乱码就是字符集的问题,先检查一下自己数据库的字符集,然后在Database --> Generate Database ,在formata页面中的Encoding中选中适合的字符集就可以解决问题 ...
- linux执行db2的sql脚本,LinuxShell自动执行当前目录所有DB2 SQL语句
该shell脚本用于自动执行当前目录下所有的SQL语句,分为以下几个步骤: 步骤1.删除当前目录下已存在的log日志文件 #删除当前目录下log日志文件 for test1 in `ls` do re ...
- java 生成sql脚本_java导出insert语句并生成sql脚本
insertSQL = new ArrayList(); ResultSet rs = null; try { rs = getColumnNameAndColumeValue(sm, listSQL ...
- kafka maven 依赖_Flink的sink实战之二:kafka
本文是<Flink的sink实战>系列的第二篇,<Flink的sink实战之一:初探>对sink有了基本的了解,本章来体验将数据sink到kafka的操作: 版本和环境准备 本 ...
- java jdbc脚本_关于java:使用MySQL和JDBC运行.sql脚本
我开始使用MySQL和JDBC. Class.forName("com.mysql.jdbc.Driver"); conn = DriverManager.getConnectio ...
- MySQL 命令行下执行.sql脚本
1.首先编写sql脚本,保存为的:book.sql,内容如下: 1use test; 2create table book 3( 4tisbn varchar(20) primary ke ...
- MySQL命令行下执行.sql脚本详解
本文主要介绍一个在MySQL命令行下执行脚本文件的例子,通过这个例子让我们来了解一下在命令行下MySQL是怎样执行脚本的吧.现在我们开始介绍这一过程. 1.首先编写sql脚本,保存为的:book.sq ...
最新文章
- BZOJ.3004.[SDOI2012]吊灯(结论)
- linux环境下安装tomcat6
- Java 注解用法详解——@SuppressWarnings
- cms java垃圾回收_java cms垃圾回收器总结
- oracle set markup,oracle sql*plus set spool介绍(二)
- leetcode验证冒泡排序效率
- VBoxManage获取虚拟机IP地址
- [Ubuntu] apt 添加第三方库
- IOS开发之Storyboard应用
- P1502 窗口的星星(扫描线入门第一题)
- DEPRECATION: Python 2.7 reached the end of its life on January 1st, 2020. Please upgrade
- 【优化算法】广义正态分布优化算法(GNDO)【含Matlab源码 1531期】
- Skyline软件二次开发初级——10如何在WEB页面中的三维地图上控制图层对象
- IDEA 炫酷编辑器主题大全,真的好看
- linux系统维护诸多定时器,linux定时器总结
- Android自动打开省电模式,常规省电模式 | Android 开源项目 | Android Open Source Project...
- 关于Cookie和Session
- 构建初级前端页面重构开发环境
- 【openstack-rally】使用rally执行tempest api测试并导出测试报告
- 为什么北京人和上海人都成了“杭漂“?
热门文章
- 通过DataX从Oracle同步数据到MySQL-安装配置过程
- vue-slicksort拖拽组件
- php实现最简单的MVC框架实例教程
- 转载——三种编程命名规范(匈牙利命名法、驼峰式命名法、帕斯卡命名法)...
- 存储类、链接、内存管理
- 面试中经常会被问到的70个问题
- 有关Activity的Launch mode 以及Intent的setFlags(转载)
- uni app 调用网络打印机_uni-app封装一个request请求
- 用python开发一个影视网站_GitHub - lyzhanghai/movie_project: 一个使用Python+Flask开发的微电影网站...
- 全球最顶级的电脑配置_全球最顶级外汇交易员,非这10位莫属