在 Flink on Zeppelin 入门篇 中我们讲述了如何配置 Zeppelin + Flink 来运行一个最简单的 WordCount 例子。本文将讲述如何使用 Flink SQL + UDF 来做 Batch ETL 和 BI 数据分析的任务。

Flink Interpreter 类型

首先介绍下 Zeppelin 中的 Flink Interpreter 类型。Zeppelin 的 Flink Interpreter 支持 Flink 的所有 API (DataSet, DataStream, Table API )。语言方面支持 Scala,Python,SQL。下图是 Zeppelin 中支持的不同场景下的 Flink Interpreter。

Name

Class

Description

%flink

FlinkInterpreter

Creates ExecutionEnvironment/StreamExecutionEnvironment/BatchTableEnvironment/StreamTableEnvironment and provides a Scala environment

%flink.pyflink

PyFlinkInterpreter

Provides a python environment

%flink.ipyflink

IPyFlinkInterpreter

Provides an ipython environment

%flink.ssql

FlinkStreamSqlInterpreter

Provides a stream sql environment

%flink.bsql

FlinkBatchSqlInterpreter

Provides a batch sql environment

配置 Flink Interpreter

下图例举了所有重要的 Flink 配置信息,除此之外你还可以配置任意 Flink 的Configuration。(https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html)

Property

Default

Description

FLINK_HOME

Flink 的安装目录

HADOOP_CONF_DIR

Hadoop 配置信息目录

HIVE_CONF_DIR

Hive 配置信息目录

flink.execution.mode

local

Execution mode of flink, e.g. local | yarn | remote

flink.execution.remote.host

jobmanager hostname if it is remote mode

flink.execution.remote.port

jobmanager port if it is remote mode

flink.jm.memory

1024

Total number of memory(mb) of JobManager

flink.tm.memory

1024

Total number of memory(mb) of TaskManager

flink.yarn.appName

Zeppelin Flink Session

Yarn app name‍

flink.yarn.queue

queue name of yarn app

flink.execution.jars

additional user jars (comma separated)‍‍‍

flink.execution.packages

additional user packages (comma separated), e.g. org.apache.flink:flink-connector-kafka_2.11:1.10,org.apache.flink:flink-connector-kafka-base_2.11:1.10,org.apache.flink:flink-json:1.10

zeppelin.pyflink.python

python

python binary executable for PyFlink

table.exec.resource.default-parallelism

1

Default parallelism for flink sql job

zeppelin.flink.scala.color

true

whether display scala shell output in colorful format

zeppelin.flink.enableHive

false

whether enable hive

zeppelin.flink.printREPLOutput

true

Print REPL output

zeppelin.flink.maxResult

1

max number of rows returned by sql interpreter

‍‍‍

StreamExecutionEnvironment, ExecutionEnvironment, StreamTableEnvironment, BatchTableEnvironment

Flink Interpreter (%flink) 为用户自动创建了下面 6 个变量作为 Flink Scala 程序的入口。

  • senv (StreamExecutionEnvironment),

  • benv (ExecutionEnvironment)

  • stenv (StreamTableEnvironment for blink planner)

  • btenv (BatchTableEnvironment for blink planner)

  • stenv_2 (StreamTableEnvironment for flink planner)

  • btenv_2 (BatchTableEnvironment for flink planner)

PyFlinkInterpreter (%flink.pyflink, %flink.ipyflink) 为用户自动创建了 6 个 Python 变量作为 PyFlink 程序的入口

  • s_env (StreamExecutionEnvironment),

  • b_env (ExecutionEnvironment)

  • st_env (StreamTableEnvironment for blink planner)

  • bt_env (BatchTableEnvironment for blink planner)

  • st_env_2 (StreamTableEnvironment for flink planner)

  • bt_env_2 (BatchTableEnvironment for flink planner)

Blink/Flink Planner

Flink 1.10 中有 2 种 table api 的 planner:flink & blink。

  • 如果你用 DataSet API 以及需要把 DataSet 转换成 Table,那么就需要使用 Flink planner 的 TableEnvironment (btenv_2 and stenv_2)。

  • 其他场景下, 我们都会建议用户使用 blink planner。这也是 Flink SQL 使用的 planner(%flink.bsql & %flink.ssql)。

使用 Flink Batch SQL

%flink.bsql 是用来执行 Flink 的 batch sql. 运行 help 命令可以得到所有可用的命令。

总的来说,Flink Batch SQL 可以用来做 2 大任务:

  • 使用 insert into 语句来做 Batch ETL

  • 使用 select 语句来做 BI 数据分析

基于 Bank 数据的 Batch ETL

下面我们基于 Bank (https://archive.ics.uci.edu/ml/datasets/bank+marketing)数据来做 Batch ETL 任务。首先用 Flink SQL 创建一个 raw 数据的 source table,以及清洗干净后的 sink table。

然后再定义 Table Function 来 parse raw data。

接下来就可以用 insert into 语句来进行数据转换(source table --> sink table)。

用 select 语句来 Preview 最终数据,验证 insert into 语句的正确性。

基于 Bank 数据的 BI 数据分析

经过上面的数据清洗工作,接下来就可以对数据进行分析了。用户不仅可以使用标准的 SQL Select 语句进行分析,也可以使用 Zeppelin 的 dynamic forms 来增加交互性(TextBox,Select,Checkbox)。

  • 使用 Flink UDF

SQL 虽然强大,但表达能力毕竟有限。有时候就要借助于 UDF 来表达更复杂的逻辑。Flink Interpreter 支持 2 种 UDF (Scala + Python)。下面是 2 个简单的例子。

  • Scala UDF

%flinkclass ScalaUpper extends ScalarFunction {def eval(str: String) = str.toUpperCase
}btenv.registerFunction("scala_upper", new ScalaUpper()
  • Python UDF

%flink.pyflinkclass PythonUpper(ScalarFunction):def eval(self, s):return s.upper()bt_env.register_function("python_upper", udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))

创建完 UDF 之后,你就可以在 SQL 里使用了。

对 Hive 数据的数据分析

除了可以分析 Flink SQL 创建的 table 之外,Flink 也可以分析 Hive 上已有的 table。如果要让 Flink Interpreter 使用 Hive,那么需要做以下配置:

  • 设置 zeppelin.flink.enableHive 为 true

  • Copy 下面这些 dependencies 到 Flink 的 lib 目录

    • flink-connector-hive_{scala_version}-{flink.version}.jar

    • flink-hadoop-compatibility_{scala_version}-{flink.version}.jar

    • flink-shaded-hadoop-2-uber-{hadoop.version}-{flink-shaded.version}.jar

    • hive-exec-2.x.jar (for Hive 1.x, you need to copy hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303-0.9.2.jar and libthrift-0.9.2.jar)

  • 在 Flink interpreter setting 里或者 zeppelin-env.sh 里指定 HIVE_CONF_DIR

  • 在 Flink interpreter setting 指定 zeppelin.flink.hive.version 为你使用的 Hive 版本

下面就用一个简单的例子展示如何在 Zeppelin 中用 Flink 查询 Hive table。

1. 用 Zeppelin 的 jdbc interpreter 查询 Hive tables

2. 用 Flink SQL 查询 Hive table 的 schema

3. 用 Flink SQL 查询 Hive table

更多 Flink SQL 资料

本文只是简单介绍如何在 Zeppelin 中使用 Flink SQL + UDF,关于更多 Flink SQL 和 UDF 请参考 Flink 官方文档:

  • https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/

  • https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html

  • https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/systemFunctions.html

  • https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html

如果有碰到任何问题,请加入下面这个钉钉群讨论。后续我们会有更多 Tutorial 的文章,敬请期待。


关注 Flink 中文社区,获取更多技术干货

你也「在看」吗?????

Flink on Zeppelin (2) - Batch 篇相关推荐

  1. Flink on Zeppelin (4) - 机器学习篇

    今天我来讲下如何在 Zeppelin 里做机器学习.机器学习的重要性我就不多说了,我们直奔主题. Flink 在机器学习这个领域发力较晚,社区版没有一个完整的机器学习算法库可以用,Alink[1]是目 ...

  2. Flink on Zeppelin 系列之:Yarn Application 模式支持

    简介:Zeppelin 如何实现并使用 Yarn Application 模式. 作者:章剑锋(简锋) 去年 Flink Forward 在讲 Flink on Zeppelin 这个项目的未来时我们 ...

  3. Flink x Zeppelin ,Hive Streaming 实战解析

    行业解决方案.产品招募中!想赚钱就来传!>>> Flink 1.11 正式发布已经三周了,其中最吸引我的特性就是 Hive Streaming.正巧 Zeppelin-0.9-pre ...

  4. Flink on zeppelin第五弹设置checkpoint

    概述 Flink的exactly-once语义实现是需要依赖checkpoint的,对于一个有状态的Flink任务来说如果想要在任务发生failover,或者手动重启任务的时候任务的状态不丢失是必须要 ...

  5. Flink on Zeppelin 流计算处理最佳实践

    简介: 欢迎钉钉扫描文章底部二维码进入 EMR Studio 用户交流群 直接和讲师交流讨论~ 点击以下链接直接观看直播回放:https://developer.aliyun.com/live/247 ...

  6. 从flink-example分析flink组件(1)WordCount batch实战及源码分析

    上一章<windows下flink示例程序的执行> 简单介绍了一下flink在windows下如何通过flink-webui运行已经打包完成的示例程序(jar),那么我们为什么要使用fli ...

  7. Flink面试,看这篇就足够了

    概述 2019 年是大数据实时计算领域最不平凡的一年,2019 年 1 月阿里巴巴 Blink (内部的 Flink 分支版本)开源,大数据领域一夜间从 Spark 独步天下走向了两强争霸的时代.Fl ...

  8. Flink应用实战案例50篇(一)- Flink SQL 在京东的优化实战

    一.背景 目前,京东搜索推荐的数据处理流程如上图所示.可以看到实时和离线是分开的,离线数据处理大部分用的是 Hive / Spark,实时数据处理则大部分用 Flink / Storm. 这就造成了以 ...

  9. Flink入门看完这篇文章就够了

    文章目录 第一章:概述 第一节:什么是Flink? 第二节:Flink特点? 第三节:Flink应用场景? 第四节:Flink核心组成 第五节:Flink处理模型:流处理和批处理 第六节:流处理引擎的 ...

  10. Flink应用实战案例50篇(五)-Apache Flink 在 bilibili 的多元化探索与实践

    一.B 站实时的前世与今生 1. 生态场景辐射     说起实时计算的未来,关键词就在于数据的实效性.首先从整个大数据发展的生态上,来看它的核心场景辐射:在大数据发展的初期,核心是以面向天为粒度的离线 ...

最新文章

  1. Codeforces 300E(数学)
  2. C#进行单击操作、单击位置记录、捕获全局左右键单击事件
  3. CVPR2020|Facebook PIFuHD:二维图像生成高质量、高细节三维人物
  4. HashMap 1.8 源码解析以及非线程安全分析
  5. 新加坡暂停建设新的数据中心
  6. 一步步学习微软InfoPath2010和SP2010--第三章节--表单设计基础:处理InfoPath布局、控件和视图(4)--控件属性功能区...
  7. Java实现插入排序及其优化 Shell Sort
  8. UNIX 高手的 20 个习惯
  9. java和opencv配置_Java——OpenCVWindows配置和项目中jar包的简单配置
  10. MySQL名字的年夜小写敏感性
  11. 第四代双模5G旗舰:vivo X30系列为啥「超有梗」?
  12. python opencv findcontours_OpenCV之视频分析 – 背景消除与前景ROI提取
  13. 嵌入式电路设计(符号库和封装库)
  14. 美团打车低至1分,前三月司机零抽成,快车市场将再起波澜
  15. sizeof(std::string) 的大小
  16. 机器之心深度研学社每周干货:2017年第13周
  17. 转行做程序员,培训or自学?过来人亲身经历良心分享
  18. 【田间连着车间、佘太酒业这十年!
  19. Hadoop高可用原理及环境搭建
  20. 拼多多618来了:500余家官旗齐聚“品牌专场”,发放30亿消费红包

热门文章

  1. 实战Node—幼教平台
  2. cmd for语句的用法
  3. 论文笔记_S2D.24_2014-ECCV_LSD-SLAM: 基于直接法的大范围单目即时定位和地图构建方法
  4. 巨星陨落!图灵奖得主Edmund Clarke感染新冠逝世,教计算机自己检查错误的人走了...
  5. 请实现一个函数,将一个字符串中的每个空格替换成...
  6. Excel word PDF导入导出 Easy POI
  7. 结队编程思路及感悟(python、tkinter、mysql、腾讯云短信等)
  8. Angular2 的 View Encapsulation(样式封装)
  9. Orchard详解--第五篇 CacheManager
  10. Makedown文件保存问题