继之前 入门篇 和 Batch 篇 之后,今天这篇 Flink on Zeppelin 主要讲述如何在 Zeppelin 中使用 Flink 的 Streaming 功能,我们会以 2 个主要的场景来讲:

  • Streaming ETL

  • Streaming Data Analytics

准备工作

本文我们会用 Kafka 作为我们的数据源,使用 Flink SQL 处理 Kafka 中的某个 topic 数据,然后写入到另外一个 Kafka Topic。为了使用 Flink 的 Kafka connector,你需要在 Flink Interpreter 中配置 flink.execution.packages。

  • flink.execution.packages org.apache.flink:flink-connector-kafka_2.11:1.10-SNAPSHOT,org.apache.flink:flink-connector-kafka-base_2.11:1.10-SNAPSHOT,org.apache.flink:flink-json:1.10-SNAPSHOT

本文使用的 kafka 数据源是 json 格式,所以需要添加 org.apache.flink:flink-json。

另外本文的例子会使用这个 docker compose 来创建 Kafka Cluster,https://github.com/xushiyan/kafka-connect-datagen/

需要运行下面2个命令来启动kafka集群和创建kafka topic:

‍
docker-compose up -d
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type:application/json' \
-H 'Accept:application/json' \
-d @connect.source.datagen.json

具体请参考这个官方链接:https://kafka-connect-datagen.readthedocs.io/en/latest/

Streaming ETL

接下里我们会用 Flink SQL 来做基于 Kafka 的 Streaming ETL。首先我们需要创建 Kafka source table 代表 kafka 中的源数据。

‍
%flink.ssqlDROP TABLE IF EXISTS source_kafka;CREATE TABLE source_kafka (status  STRING,direction STRING,event_ts BIGINT
) WITH ('connector.type' = 'kafka',       'connector.version' = 'universal','connector.topic' = 'generated.events','connector.startup-mode' = 'earliest-offset','connector.properties.zookeeper.connect' = 'localhost:2181','connector.properties.bootstrap.servers' = 'localhost:9092','connector.properties.group.id' = 'testGroup','connector.startup-mode' = 'earliest-offset','format.type'='json','update-mode' = 'append'
);

然后创建 Kafka sink table,代表清洗后的数据 (这里我们定义了 WATERMARK,是为了下一步做基于 window 的流式数据分析)。

%flink.ssqlDROP TABLE IF EXISTS sink_kafka;CREATE TABLE sink_kafka (status  STRING,direction STRING,event_ts TIMESTAMP(3),WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND
) WITH ('connector.type' = 'kafka',       'connector.version' = 'universal',    'connector.topic' = 'generated.events2','connector.properties.zookeeper.connect' = 'localhost:2181','connector.properties.bootstrap.servers' = 'localhost:9092','connector.properties.group.id' = 'testGroup','format.type'='json','update-mode' = 'append'
)

接下来我们就可以用 Insert Into 语句来做 Streaming ETL 的工作了。

%flink.ssqlinsert into sink_kafka select status, direction, cast(event_ts/1000000000 as timestamp(3)) from source_kafka where status <> 'foo'

这条 Insert into 语句非常简单,我们过滤掉了status 为 foo 的数据,以及将 event_ts 转化为 timestamp 类型。

然后可以用 select 语句来预览 sink table 中的数据来确认 Streaming ETL 正常工作。

Streaming Data Analytics

在完成了上面的 Streaming ETL 工作之后,我们就可以在 Zeppelin 中做流式数据分析了。在 Zeppelin 中可以用 Select 语句来做 Flink 流数据分析,Select 的结果会 push 到 Zeppelin 前端展示,可以用来做流式数据的 dashboard。

Zeppelin 支持 3 种模式的流式数据分析:

  • Single 模式

  • Update 模式

  • Append 模式

Single 模式

Single 模式适合当输出结果是一行的情况,比如下面的 Select 语句。这条 SQL 语句永远只有一行数据,但这行数据会持续不断的更新。这种模式的数据输出格式是 html 形式,用户可以用 template 来指定输出模板,{i} 是第 i 列的 placeholder。

%flink.ssql(type=single, parallelism=1, refreshInterval=3000, template=<h1>{1}</h1> until <h2>{0}</h2>)select max(event_ts), count(1) from sink_kafka

Update 模式

Update 模式适合多行输出的情况,比如下面的 select group by 语句。这种模式会定期更新这多行数据,输出是 Zeppelin 的 table 格式,所以可以用 Zeppelin 自带的可视化控件。

%flink.ssql(type=update, refreshInterval=2000, parallelism=1)select status, count(1) as pv from sink_kafka group by status

Append 模式

Append 模式适合不断有新数据输出,但不会覆盖原有数据,只会不断 append 的情况。比如下面的基于窗口的 group by 语句。Append 模式要求第一列数据类型是 timestamp,这里的 start_time 就是 timestamp 类型。

%flink.ssql(type=append, parallelism=1, refreshInterval=2000, threshold=60000)select TUMBLE_START(event_ts, INTERVAL '5' SECOND) as start_time, status, count(1) from sink_kafka
group by TUMBLE(event_ts, INTERVAL '5' SECOND), status

更多 Flink SQL 资料

本文只是简单介绍如何在 Zeppelin 中使用 Flink Streaming SQL,关于更多 Flink SQL 请参考 Flink 官方文档

  • https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/

  • https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html

Zeppelin on Flink 系列

Zeppelin on Flink (1) 入门篇

Zeppelin on Flink (2)Batch 篇

如果有碰到任何问题,请加入下面这个钉钉群讨论。后续我们会有更多 Tutorial 的文章,敬请期待。


关注 Flink 中文社区,获取更多技术干货

你也「在看」吗?????

Flink on Zeppelin (3) - Streaming 篇相关推荐

  1. Flink on Zeppelin (4) - 机器学习篇

    今天我来讲下如何在 Zeppelin 里做机器学习.机器学习的重要性我就不多说了,我们直奔主题. Flink 在机器学习这个领域发力较晚,社区版没有一个完整的机器学习算法库可以用,Alink[1]是目 ...

  2. Flink x Zeppelin ,Hive Streaming 实战解析

    行业解决方案.产品招募中!想赚钱就来传!>>> Flink 1.11 正式发布已经三周了,其中最吸引我的特性就是 Hive Streaming.正巧 Zeppelin-0.9-pre ...

  3. Flink on zeppelin第五弹设置checkpoint

    概述 Flink的exactly-once语义实现是需要依赖checkpoint的,对于一个有状态的Flink任务来说如果想要在任务发生failover,或者手动重启任务的时候任务的状态不丢失是必须要 ...

  4. Flink on Zeppelin 系列之:Yarn Application 模式支持

    简介:Zeppelin 如何实现并使用 Yarn Application 模式. 作者:章剑锋(简锋) 去年 Flink Forward 在讲 Flink on Zeppelin 这个项目的未来时我们 ...

  5. Flink面试,看这篇就足够了

    概述 2019 年是大数据实时计算领域最不平凡的一年,2019 年 1 月阿里巴巴 Blink (内部的 Flink 分支版本)开源,大数据领域一夜间从 Spark 独步天下走向了两强争霸的时代.Fl ...

  6. Flink on Zeppelin 流计算处理最佳实践

    简介: 欢迎钉钉扫描文章底部二维码进入 EMR Studio 用户交流群 直接和讲师交流讨论~ 点击以下链接直接观看直播回放:https://developer.aliyun.com/live/247 ...

  7. Flink入门看完这篇文章就够了

    文章目录 第一章:概述 第一节:什么是Flink? 第二节:Flink特点? 第三节:Flink应用场景? 第四节:Flink核心组成 第五节:Flink处理模型:流处理和批处理 第六节:流处理引擎的 ...

  8. Flink应用实战案例50篇(五)-Apache Flink 在 bilibili 的多元化探索与实践

    一.B 站实时的前世与今生 1. 生态场景辐射     说起实时计算的未来,关键词就在于数据的实效性.首先从整个大数据发展的生态上,来看它的核心场景辐射:在大数据发展的初期,核心是以面向天为粒度的离线 ...

  9. Flink应用实战案例50篇(一)- Flink SQL 在京东的优化实战

    一.背景 目前,京东搜索推荐的数据处理流程如上图所示.可以看到实时和离线是分开的,离线数据处理大部分用的是 Hive / Spark,实时数据处理则大部分用 Flink / Storm. 这就造成了以 ...

  10. Apache Zeppelin:可能是开源届最好的Flink开发平台

    原文:https://www.codenong.com/cs106935099/ 这个集成有点类似后来dinky 开源项目. 如果你是Flink的学习者或者爱好者,除了学习Flink本身之外,你是否在 ...

最新文章

  1. Python基础学习3
  2. 【学术技巧】让你的 GitHub 秒变高大上!
  3. opencv学习笔记(六)直方图比较图片相似度
  4. jquery与Ajax() 调用后台方法
  5. Powershell: powershell 获取本机IP地址
  6. python pymysql模块 链接mysql 遍历查询结果的方法 详解
  7. apache服务上配置https安全与域名请求
  8. Linux 更新 CPU microcode
  9. 模拟退火算法及MATLAB代码
  10. C++ 多线程--STL库 总结版 (详细)
  11. PhotoShopnbsp;CS5nbsp;官方中文正式原版下…
  12. 电脑接显示屏后提示计算机休眠,解决方法:主机正常。电脑显示器(hp)睡眠状态...
  13. ubuntu从一个单纯的系统到装上自己需要的一些软件的过程
  14. 【Oracle】record varray (associative array 关联数组) table (nested table type 嵌套表类型)和%type、%rowtype的使用详解
  15. 齐振宏教授 变革领导力导师
  16. Hadoop分布式集群搭建完全详细教程
  17. Distance correlation(距离相关系数)
  18. windows vmware虚拟机安装ubuntu蓝屏终极解决方法
  19. 华为云计算IE面试笔记-其它知识点
  20. 【C语言】数据的存储——M,E

热门文章

  1. Go基础:产生随机数
  2. SecureRandom生成随机数慢(阻塞)问题解决记录
  3. SLAM 领域国内外优秀实验室/开源方案(汇总)
  4. 吴恩达教授机器学习课程笔记【六】- Part 6 学习理论
  5. 终于有人把深度学习讲明白了!
  6. 树莓派 4B 配置 Ubuntu20.04 和 ROS2
  7. spark-submit(spark版本2.3.2)
  8. 【Java例题】7.5 文件题2-学生成绩统计
  9. python正则表达式(1)--特殊字符
  10. 嵌套的SQL另外一种写法