Flink on Zeppelin (2) - Batch 篇
在 Flink on Zeppelin 入门篇 中我们讲述了如何配置 Zeppelin + Flink 来运行一个最简单的 WordCount 例子。本文将讲述如何使用 Flink SQL + UDF 来做 Batch ETL 和 BI 数据分析的任务。
Flink Interpreter 类型
首先介绍下 Zeppelin 中的 Flink Interpreter 类型。Zeppelin 的 Flink Interpreter 支持 Flink 的所有 API (DataSet, DataStream, Table API )。语言方面支持 Scala,Python,SQL。下图是 Zeppelin 中支持的不同场景下的 Flink Interpreter。
Name |
Class |
Description |
%flink |
FlinkInterpreter |
Creates ExecutionEnvironment/StreamExecutionEnvironment/BatchTableEnvironment/StreamTableEnvironment and provides a Scala environment |
%flink.pyflink |
PyFlinkInterpreter |
Provides a python environment |
%flink.ipyflink |
IPyFlinkInterpreter |
Provides an ipython environment |
%flink.ssql |
FlinkStreamSqlInterpreter |
Provides a stream sql environment |
%flink.bsql |
FlinkBatchSqlInterpreter |
Provides a batch sql environment |
配置 Flink Interpreter
下图例举了所有重要的 Flink 配置信息,除此之外你还可以配置任意 Flink 的Configuration。(https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html)
Property |
Default |
Description |
FLINK_HOME |
Flink 的安装目录 |
|
HADOOP_CONF_DIR |
Hadoop 配置信息目录 |
|
HIVE_CONF_DIR |
Hive 配置信息目录 |
|
flink.execution.mode |
local |
Execution mode of flink, e.g. local | yarn | remote |
flink.execution.remote.host |
jobmanager hostname if it is remote mode |
|
flink.execution.remote.port |
jobmanager port if it is remote mode |
|
flink.jm.memory |
1024 |
Total number of memory(mb) of JobManager |
flink.tm.memory |
1024 |
Total number of memory(mb) of TaskManager |
flink.yarn.appName |
Zeppelin Flink Session |
Yarn app name |
flink.yarn.queue |
queue name of yarn app |
|
flink.execution.jars |
additional user jars (comma separated) |
|
flink.execution.packages |
additional user packages (comma separated), e.g. org.apache.flink:flink-connector-kafka_2.11:1.10,org.apache.flink:flink-connector-kafka-base_2.11:1.10,org.apache.flink:flink-json:1.10 |
|
zeppelin.pyflink.python |
python |
python binary executable for PyFlink |
table.exec.resource.default-parallelism |
1 |
Default parallelism for flink sql job |
zeppelin.flink.scala.color |
true |
whether display scala shell output in colorful format |
zeppelin.flink.enableHive |
false |
whether enable hive |
zeppelin.flink.printREPLOutput |
true |
Print REPL output |
zeppelin.flink.maxResult |
1 |
max number of rows returned by sql interpreter |
StreamExecutionEnvironment, ExecutionEnvironment, StreamTableEnvironment, BatchTableEnvironment
Flink Interpreter (%flink) 为用户自动创建了下面 6 个变量作为 Flink Scala 程序的入口。
senv (StreamExecutionEnvironment),
benv (ExecutionEnvironment)
stenv (StreamTableEnvironment for blink planner)
btenv (BatchTableEnvironment for blink planner)
stenv_2 (StreamTableEnvironment for flink planner)
btenv_2 (BatchTableEnvironment for flink planner)
PyFlinkInterpreter (%flink.pyflink, %flink.ipyflink) 为用户自动创建了 6 个 Python 变量作为 PyFlink 程序的入口
s_env (StreamExecutionEnvironment),
b_env (ExecutionEnvironment)
st_env (StreamTableEnvironment for blink planner)
bt_env (BatchTableEnvironment for blink planner)
st_env_2 (StreamTableEnvironment for flink planner)
bt_env_2 (BatchTableEnvironment for flink planner)
Blink/Flink Planner
Flink 1.10 中有 2 种 table api 的 planner:flink & blink。
如果你用 DataSet API 以及需要把 DataSet 转换成 Table,那么就需要使用 Flink planner 的 TableEnvironment (btenv_2 and stenv_2)。
其他场景下, 我们都会建议用户使用 blink planner。这也是 Flink SQL 使用的 planner(%flink.bsql & %flink.ssql)。
使用 Flink Batch SQL
%flink.bsql 是用来执行 Flink 的 batch sql. 运行 help 命令可以得到所有可用的命令。
总的来说,Flink Batch SQL 可以用来做 2 大任务:
使用 insert into 语句来做 Batch ETL
使用 select 语句来做 BI 数据分析
基于 Bank 数据的 Batch ETL
下面我们基于 Bank (https://archive.ics.uci.edu/ml/datasets/bank+marketing)数据来做 Batch ETL 任务。首先用 Flink SQL 创建一个 raw 数据的 source table,以及清洗干净后的 sink table。
然后再定义 Table Function 来 parse raw data。
接下来就可以用 insert into 语句来进行数据转换(source table --> sink table)。
用 select 语句来 Preview 最终数据,验证 insert into 语句的正确性。
基于 Bank 数据的 BI 数据分析
经过上面的数据清洗工作,接下来就可以对数据进行分析了。用户不仅可以使用标准的 SQL Select 语句进行分析,也可以使用 Zeppelin 的 dynamic forms 来增加交互性(TextBox,Select,Checkbox)。
使用 Flink UDF
SQL 虽然强大,但表达能力毕竟有限。有时候就要借助于 UDF 来表达更复杂的逻辑。Flink Interpreter 支持 2 种 UDF (Scala + Python)。下面是 2 个简单的例子。
Scala UDF
%flinkclass ScalaUpper extends ScalarFunction {def eval(str: String) = str.toUpperCase
}btenv.registerFunction("scala_upper", new ScalaUpper()
Python UDF
%flink.pyflinkclass PythonUpper(ScalarFunction):def eval(self, s):return s.upper()bt_env.register_function("python_upper", udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))
创建完 UDF 之后,你就可以在 SQL 里使用了。
对 Hive 数据的数据分析
除了可以分析 Flink SQL 创建的 table 之外,Flink 也可以分析 Hive 上已有的 table。如果要让 Flink Interpreter 使用 Hive,那么需要做以下配置:
设置 zeppelin.flink.enableHive 为 true
Copy 下面这些 dependencies 到 Flink 的 lib 目录
flink-connector-hive_{scala_version}-{flink.version}.jar
flink-hadoop-compatibility_{scala_version}-{flink.version}.jar
flink-shaded-hadoop-2-uber-{hadoop.version}-{flink-shaded.version}.jar
hive-exec-2.x.jar (for Hive 1.x, you need to copy hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303-0.9.2.jar and libthrift-0.9.2.jar)
在 Flink interpreter setting 里或者 zeppelin-env.sh 里指定 HIVE_CONF_DIR
在 Flink interpreter setting 指定 zeppelin.flink.hive.version 为你使用的 Hive 版本
下面就用一个简单的例子展示如何在 Zeppelin 中用 Flink 查询 Hive table。
1. 用 Zeppelin 的 jdbc interpreter 查询 Hive tables
2. 用 Flink SQL 查询 Hive table 的 schema
3. 用 Flink SQL 查询 Hive table
更多 Flink SQL 资料
本文只是简单介绍如何在 Zeppelin 中使用 Flink SQL + UDF,关于更多 Flink SQL 和 UDF 请参考 Flink 官方文档:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/systemFunctions.html
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html
如果有碰到任何问题,请加入下面这个钉钉群讨论。后续我们会有更多 Tutorial 的文章,敬请期待。
关注 Flink 中文社区,获取更多技术干货
你也「在看」吗?????
Flink on Zeppelin (2) - Batch 篇相关推荐
- Flink on Zeppelin (4) - 机器学习篇
今天我来讲下如何在 Zeppelin 里做机器学习.机器学习的重要性我就不多说了,我们直奔主题. Flink 在机器学习这个领域发力较晚,社区版没有一个完整的机器学习算法库可以用,Alink[1]是目 ...
- Flink on Zeppelin 系列之:Yarn Application 模式支持
简介:Zeppelin 如何实现并使用 Yarn Application 模式. 作者:章剑锋(简锋) 去年 Flink Forward 在讲 Flink on Zeppelin 这个项目的未来时我们 ...
- Flink x Zeppelin ,Hive Streaming 实战解析
行业解决方案.产品招募中!想赚钱就来传!>>> Flink 1.11 正式发布已经三周了,其中最吸引我的特性就是 Hive Streaming.正巧 Zeppelin-0.9-pre ...
- Flink on zeppelin第五弹设置checkpoint
概述 Flink的exactly-once语义实现是需要依赖checkpoint的,对于一个有状态的Flink任务来说如果想要在任务发生failover,或者手动重启任务的时候任务的状态不丢失是必须要 ...
- Flink on Zeppelin 流计算处理最佳实践
简介: 欢迎钉钉扫描文章底部二维码进入 EMR Studio 用户交流群 直接和讲师交流讨论~ 点击以下链接直接观看直播回放:https://developer.aliyun.com/live/247 ...
- 从flink-example分析flink组件(1)WordCount batch实战及源码分析
上一章<windows下flink示例程序的执行> 简单介绍了一下flink在windows下如何通过flink-webui运行已经打包完成的示例程序(jar),那么我们为什么要使用fli ...
- Flink面试,看这篇就足够了
概述 2019 年是大数据实时计算领域最不平凡的一年,2019 年 1 月阿里巴巴 Blink (内部的 Flink 分支版本)开源,大数据领域一夜间从 Spark 独步天下走向了两强争霸的时代.Fl ...
- Flink应用实战案例50篇(一)- Flink SQL 在京东的优化实战
一.背景 目前,京东搜索推荐的数据处理流程如上图所示.可以看到实时和离线是分开的,离线数据处理大部分用的是 Hive / Spark,实时数据处理则大部分用 Flink / Storm. 这就造成了以 ...
- Flink入门看完这篇文章就够了
文章目录 第一章:概述 第一节:什么是Flink? 第二节:Flink特点? 第三节:Flink应用场景? 第四节:Flink核心组成 第五节:Flink处理模型:流处理和批处理 第六节:流处理引擎的 ...
- Flink应用实战案例50篇(五)-Apache Flink 在 bilibili 的多元化探索与实践
一.B 站实时的前世与今生 1. 生态场景辐射 说起实时计算的未来,关键词就在于数据的实效性.首先从整个大数据发展的生态上,来看它的核心场景辐射:在大数据发展的初期,核心是以面向天为粒度的离线 ...
最新文章
- Codeforces 300E(数学)
- C#进行单击操作、单击位置记录、捕获全局左右键单击事件
- CVPR2020|Facebook PIFuHD:二维图像生成高质量、高细节三维人物
- HashMap 1.8 源码解析以及非线程安全分析
- 新加坡暂停建设新的数据中心
- 一步步学习微软InfoPath2010和SP2010--第三章节--表单设计基础:处理InfoPath布局、控件和视图(4)--控件属性功能区...
- Java实现插入排序及其优化 Shell Sort
- UNIX 高手的 20 个习惯
- java和opencv配置_Java——OpenCVWindows配置和项目中jar包的简单配置
- MySQL名字的年夜小写敏感性
- 第四代双模5G旗舰:vivo X30系列为啥「超有梗」?
- python opencv findcontours_OpenCV之视频分析 – 背景消除与前景ROI提取
- 嵌入式电路设计(符号库和封装库)
- 美团打车低至1分,前三月司机零抽成,快车市场将再起波澜
- sizeof(std::string) 的大小
- 机器之心深度研学社每周干货:2017年第13周
- 转行做程序员,培训or自学?过来人亲身经历良心分享
- 【田间连着车间、佘太酒业这十年!
- Hadoop高可用原理及环境搭建
- 拼多多618来了:500余家官旗齐聚“品牌专场”,发放30亿消费红包
热门文章
- 实战Node—幼教平台
- cmd for语句的用法
- 论文笔记_S2D.24_2014-ECCV_LSD-SLAM: 基于直接法的大范围单目即时定位和地图构建方法
- 巨星陨落!图灵奖得主Edmund Clarke感染新冠逝世,教计算机自己检查错误的人走了...
- 请实现一个函数,将一个字符串中的每个空格替换成...
- Excel word PDF导入导出 Easy POI
- 结队编程思路及感悟(python、tkinter、mysql、腾讯云短信等)
- Angular2 的 View Encapsulation(样式封装)
- Orchard详解--第五篇 CacheManager
- Makedown文件保存问题