FlinkSQL源码解析(三)执行流程
1、前言
前面2部分主要是介绍以下2点:
- flink sql整体的执行流程大致为:sqlNode --> Operation --> RelNode --> 优化 --> execNode --> Transformation。
- 以及flink sql源码解析需要使用到的java spi。
现在具体来看看flink sql 在其内部转换的实现步骤,就是如何去调用连接器,主要是在sqlNode --> RelNode这一步。
当执行建表语句,主要是进行语法的校验。真正把输入源、输出源连接在一起执行的,还是通过insert语句。即当我们在定义输入表或者输出表,有错误的定义connecter='abc',也不会在执行建表语句时返回错误,而是在执行插入语句时报错。这里以一份简单的代码为例:
def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(10)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()val tableEnv = StreamTableEnvironment.create(env, settings)tableEnv.getConfig.setLocalTimeZone(ofHours(8))tableEnv.getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled", true)tableEnv.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay", "5000ms")tableEnv.executeSql("""|CREATE TABLE input (| userId STRING,| pageId STRING,| sign STRING,| proctime AS PROCTIME(), -- generates processing-time attribute using computed column| eventTime TIMESTAMP(3),| WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- defines watermark on ts column, marks ts as event-time attribute|) WITH (| 'connector' = 'kafka', -- using kafka connector| 'topic' = 'flinksource', -- kafka topic| 'scan.startup.mode' = 'latest-offset', -- reading from the latest| 'properties.bootstrap.servers' = 'hadoop1:9092,hadoop2:9092,hadoop3:9092', -- kafka broker address| 'format' = 'json' -- the data format is json|)|""".stripMargin)tableEnv.executeSql("""|CREATE TABLE output (| userId STRING,| pageId STRING,| cnt BIGINT,| startTime timestamp,| endTime timestamp|) WITH (| 'connector' = 'print'|)|""".stripMargin)tableEnv.executeSql("""| insert into output| select| userId,| pageId,| count(*) as cnt,| HOP_START(eventTime, interval '1' HOUR, interval '1' DAY) as startTime,| HOP_END(eventTime, interval '1' HOUR, interval '1' DAY) as endTime|from (| select * from input where sign = 'error'|) a group by userId,pageId, hop(eventTime, interval '1' HOUR, interval '1' DAY)""".stripMargin)env.execute("insertDemo")}
2.
FlinkSQL源码解析(三)执行流程相关推荐
- 3.MyBatis源码解析-CRUD执行流程--阿呆中二
CRUD执行流程 MyBatis CRUD执行流程 与我联系 MyBatis 本文是对mybatis 3.x源码深度解析与最佳实践学习的总结,包括XML文件解析流程.SqlSession构建流程.CR ...
- skynet源码解析(三)——启动流程
对于你不了解的框架或者引擎,介绍再多的逻辑结构都好像有点茫然的感觉.所以小编认为,最有效的方式就是搞清楚框架启动流程的步骤,让自己心中有一条线可以牵引着. 当你在终端输入./skeynet examp ...
- 熔断器 Hystrix 源码解析 —— 命令执行(三)之执行超时
2019独角兽企业重金招聘Python工程师标准>>> 摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-third-ti ...
- Retrofit2源码解析——网络调用流程(下)
Retrofit2源码解析系列 Retrofit2源码解析(一) Retrofit2源码解析--网络调用流程(上) 本文基于Retrofit2的2.4.0版本 implementation 'com. ...
- 以太坊EVM源码注释之执行流程
以太坊EVM源码分析之执行流程 业务流程概述 EVM是用来执行智能合约的.输入一笔交易,内部会将之转换成一个Message对象,传入 EVM 执行.在合约中,msg 全局变量记录了附带当前合约的交易的 ...
- android 输入法如何启动流程_android输入法02:openwnn源码解析01—输入流程
android 输入法 02:openwnn 源码解析 01-输入流程 之后要开始 android 日文输入法的测试,因此现在开始研究 android 输入法.之前两 篇文章已经对 android 自 ...
- 【Flink】 Flink 源码之 SQL 执行流程
1.概述 转载:Flink 源码之 SQL 执行流程 2.前言 本篇为大家带来Flink执行SQL流程的分析.它的执行步骤概括起来包含: 解析.使用Calcite的解析器,解析SQL为语法树(SqlN ...
- Disruptor源码解析三 RingBuffer解析
目录 系列索引 前言 主要内容 RingBuffer的要点 源码解析 系列索引 Disruptor源码解析一 Disruptor高性能之道 Disruptor源码解析二 Sequence相关类解析 D ...
- OkHttp3源码解析(三)——连接池复用
OKHttp3源码解析系列 OkHttp3源码解析(一)之请求流程 OkHttp3源码解析(二)--拦截器链和缓存策略 本文基于OkHttp3的3.11.0版本 implementation 'com ...
最新文章
- 年卡在手,城墙我走: 记葡萄城控件团队建设
- Excel导入SQL数据库完整代码
- 2012 草莓音乐节 [组图]
- 【错误记录】Android NDK 错误排查记录 ( error: undefined reference to | Linking CXX shared library FAILED )
- linux搭建mq环境,Linux搭建servicemix、activemq环境
- keil C 51 strlen库函数使用
- 有关asp.net技术的外文文献_医学科技论文写作中参考文献的标准格式及常见问题...
- 【浅说】堆(heap)和栈(stack)区别
- 拓端tecdat|R语言用相关网络图可视化分析汽车配置和饮酒习惯
- PropertyChangeSupport 监听器模式的应用
- ACCESS数据库如何设置密码
- 存货的三个加权平均单价
- 身份证验证程序(一)
- 修改elementui 的默认样式element.style样式
- 接口的方式获取bing必应每天壁纸
- GDPR: Impact to Your Data Management Landscape: Part 1
- 无人机开发系列 Ubuntu18.04安装 含虚拟机与双系统
- 电销人员如何应对工作中的挫败感
- kotlin 两目运算符
- 搭建WordPress