1、前言

前面2部分主要是介绍以下2点:

  1. flink sql整体的执行流程大致为:sqlNode --> Operation --> RelNode --> 优化 --> execNode --> Transformation。
  2. 以及flink sql源码解析需要使用到的java spi。

现在具体来看看flink sql 在其内部转换的实现步骤,就是如何去调用连接器,主要是在sqlNode --> RelNode这一步。

当执行建表语句,主要是进行语法的校验。真正把输入源、输出源连接在一起执行的,还是通过insert语句。即当我们在定义输入表或者输出表,有错误的定义connecter='abc',也不会在执行建表语句时返回错误,而是在执行插入语句时报错。这里以一份简单的代码为例:

 def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(10)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()val tableEnv = StreamTableEnvironment.create(env, settings)tableEnv.getConfig.setLocalTimeZone(ofHours(8))tableEnv.getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled", true)tableEnv.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay", "5000ms")tableEnv.executeSql("""|CREATE TABLE input (|    userId STRING,|    pageId STRING,|    sign STRING,|    proctime AS PROCTIME(),   -- generates processing-time attribute using computed column|    eventTime TIMESTAMP(3),|    WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- defines watermark on ts column, marks ts as event-time attribute|) WITH (|    'connector' = 'kafka',  -- using kafka connector|    'topic' = 'flinksource',  -- kafka topic|    'scan.startup.mode' = 'latest-offset',  -- reading from the latest|    'properties.bootstrap.servers' = 'hadoop1:9092,hadoop2:9092,hadoop3:9092',  -- kafka broker address|    'format' = 'json'  -- the data format is json|)|""".stripMargin)tableEnv.executeSql("""|CREATE TABLE output (|    userId STRING,|    pageId STRING,|    cnt BIGINT,|    startTime timestamp,|    endTime timestamp|) WITH (|    'connector' = 'print'|)|""".stripMargin)tableEnv.executeSql("""| insert into output| select| userId,| pageId,| count(*) as cnt,| HOP_START(eventTime, interval '1' HOUR, interval '1' DAY) as startTime,| HOP_END(eventTime, interval '1' HOUR, interval '1' DAY) as endTime|from (| select * from input where sign = 'error'|) a group by userId,pageId, hop(eventTime, interval '1' HOUR, interval '1' DAY)""".stripMargin)env.execute("insertDemo")}

2.

FlinkSQL源码解析(三)执行流程相关推荐

  1. 3.MyBatis源码解析-CRUD执行流程--阿呆中二

    CRUD执行流程 MyBatis CRUD执行流程 与我联系 MyBatis 本文是对mybatis 3.x源码深度解析与最佳实践学习的总结,包括XML文件解析流程.SqlSession构建流程.CR ...

  2. skynet源码解析(三)——启动流程

    对于你不了解的框架或者引擎,介绍再多的逻辑结构都好像有点茫然的感觉.所以小编认为,最有效的方式就是搞清楚框架启动流程的步骤,让自己心中有一条线可以牵引着. 当你在终端输入./skeynet examp ...

  3. 熔断器 Hystrix 源码解析 —— 命令执行(三)之执行超时

    2019独角兽企业重金招聘Python工程师标准>>> 摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-third-ti ...

  4. Retrofit2源码解析——网络调用流程(下)

    Retrofit2源码解析系列 Retrofit2源码解析(一) Retrofit2源码解析--网络调用流程(上) 本文基于Retrofit2的2.4.0版本 implementation 'com. ...

  5. 以太坊EVM源码注释之执行流程

    以太坊EVM源码分析之执行流程 业务流程概述 EVM是用来执行智能合约的.输入一笔交易,内部会将之转换成一个Message对象,传入 EVM 执行.在合约中,msg 全局变量记录了附带当前合约的交易的 ...

  6. android 输入法如何启动流程_android输入法02:openwnn源码解析01—输入流程

    android 输入法 02:openwnn 源码解析 01-输入流程 之后要开始 android 日文输入法的测试,因此现在开始研究 android 输入法.之前两 篇文章已经对 android 自 ...

  7. 【Flink】 Flink 源码之 SQL 执行流程

    1.概述 转载:Flink 源码之 SQL 执行流程 2.前言 本篇为大家带来Flink执行SQL流程的分析.它的执行步骤概括起来包含: 解析.使用Calcite的解析器,解析SQL为语法树(SqlN ...

  8. Disruptor源码解析三 RingBuffer解析

    目录 系列索引 前言 主要内容 RingBuffer的要点 源码解析 系列索引 Disruptor源码解析一 Disruptor高性能之道 Disruptor源码解析二 Sequence相关类解析 D ...

  9. OkHttp3源码解析(三)——连接池复用

    OKHttp3源码解析系列 OkHttp3源码解析(一)之请求流程 OkHttp3源码解析(二)--拦截器链和缓存策略 本文基于OkHttp3的3.11.0版本 implementation 'com ...

最新文章

  1. 年卡在手,城墙我走: 记葡萄城控件团队建设
  2. Excel导入SQL数据库完整代码
  3. 2012 草莓音乐节 [组图]
  4. 【错误记录】Android NDK 错误排查记录 ( error: undefined reference to | Linking CXX shared library FAILED )
  5. linux搭建mq环境,Linux搭建servicemix、activemq环境
  6. keil C 51 strlen库函数使用
  7. 有关asp.net技术的外文文献_医学科技论文写作中参考文献的标准格式及常见问题...
  8. 【浅说】堆(heap)和栈(stack)区别
  9. 拓端tecdat|R语言用相关网络图可视化分析汽车配置和饮酒习惯
  10. PropertyChangeSupport 监听器模式的应用
  11. ACCESS数据库如何设置密码
  12. 存货的三个加权平均单价
  13. 身份证验证程序(一)
  14. 修改elementui 的默认样式element.style样式
  15. 接口的方式获取bing必应每天壁纸
  16. GDPR: Impact to Your Data Management Landscape: Part 1
  17. 无人机开发系列 Ubuntu18.04安装 含虚拟机与双系统
  18. 电销人员如何应对工作中的挫败感
  19. kotlin 两目运算符
  20. 搭建WordPress

热门文章

  1. Tessent专栏第五篇:TessentMemoryBIST用户手册第二章下
  2. 【JavaSE入门】:Java运算符及进制转换
  3. 如何往github开源项目提交代码
  4. 2019上交、上科、北航、中科大、自动化所计算机夏令营+浙大计算机预推免简记
  5. Android Spinner
  6. 中山培训学校python
  7. 深度学习之词向量Word Embedding总结
  8. python中try的作用_Python异常处理中try,except用法?
  9. OPPO互联网java后端二面题目
  10. 工业能耗在线监测系统助力企业节能降耗,跳出限电包围圈(安科瑞 须静燕)