上周六在深圳分享了《Flink SQL 1.9.0 技术内幕和最佳实践》,会后许多小伙伴对最后演示环节的 Demo 代码非常感兴趣,迫不及待地想尝试下,所以写了这篇文章分享下这份代码。希望对于 Flink SQL 的初学者能有所帮助。完整分享可以观看 Meetup 视频回顾 :https://developer.aliyun.com/live/1416

演示代码已经开源到了 GitHub 上:https://github.com/wuchong/flink-sql-submit

这份代码主要由两部分组成:1) 能用来提交 SQL 文件的 SqlSubmit 实现。2) 用于演示的 SQL 示例、Kafka 启动停止脚本、 一份测试数据集、Kafka 数据源生成器。

通过本实战,你将学到:

  1. 如何使用 Blink Planner
  2. 一个简单的 SqlSubmit 是如何实现的
  3. 如何用 DDL 创建一个 Kafka 源表和 MySQL 结果表
  4. 运行一个从 Kafka 读取数据,计算 PVUV,并写入 MySQL 的作业
  5. 设置调优参数,观察对作业的影响

SqlSubmit 的实现

笔者一开始是想用 SQL Client 来贯穿整个演示环节,但可惜 1.9 版本 SQL CLI 还不支持处理 CREATE TABLE 语句。所以笔者就只好自己写了个简单的提交脚本。后来想想,也挺好的,可以让听众同时了解如何通过 SQL 的方式,和编程的方式使用 Flink SQL。

SqlSubmit 的主要任务是执行和提交一个 SQL 文件,实现非常简单,就是通过正则表达式匹配每个语句块。如果是 CREATE TABLE 或 INSERT INTO 开头,则会调用 tEnv.sqlUpdate(...)。如果是 SET 开头,则会将配置设置到 TableConfig 上。其核心代码主要如下所示:

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
// 创建一个使用 Blink Planner 的 TableEnvironment, 并工作在流模式
TableEnvironment tEnv = TableEnvironment.create(settings);
// 读取 SQL 文件
List<String> sql = Files.readAllLines(path);
// 通过正则表达式匹配前缀,来区分不同的 SQL 语句
List<SqlCommandCall> calls = SqlCommandParser.parse(sql);
// 根据不同的 SQL 语句,调用 TableEnvironment 执行
for (SqlCommandCall call : calls) {switch (call.command) {case SET:String key = call.operands[0];String value = call.operands[1];// 设置参数tEnv.getConfig().getConfiguration().setString(key, value);break;case CREATE_TABLE:String ddl = call.operands[0];tEnv.sqlUpdate(ddl);break;case INSERT_INTO:String dml = call.operands[0];tEnv.sqlUpdate(dml);break;default:throw new RuntimeException("Unsupported command: " + call.command);}
}
// 提交作业
tEnv.execute("SQL Job");

使用 DDL 连接 Kafka 源表

在 flink-sql-submit 项目中,我们准备了一份测试数据集(来自阿里云天池公开数据集,特别鸣谢),位于 src/main/resources/user_behavior.log。数据以 JSON 格式编码,大概长这个样子:

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

为了模拟真实的 Kafka 数据源,笔者还特地写了一个 source-generator.sh 脚本(感兴趣的可以看下源码),会自动读取 user_behavior.log 的数据并以默认每毫秒1条的速率灌到 Kafka 的 user_behavior topic 中。

有了数据源后,我们就可以用 DDL 去创建并连接这个 Kafka 中的 topic(详见 src/main/resources/q1.sql)。

CREATE TABLE user_log (user_id VARCHAR,item_id VARCHAR,category_id VARCHAR,behavior VARCHAR,ts TIMESTAMP
) WITH ('connector.type' = 'kafka', -- 使用 kafka connector'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本'connector.topic' = 'user_behavior',  -- kafka topic'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息'connector.properties.0.value' = 'localhost:2181', 'connector.properties.1.key' = 'bootstrap.servers','connector.properties.1.value' = 'localhost:9092', 'update-mode' = 'append','format.type' = 'json',  -- 数据源格式为 json'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
)

注:可能有用户会觉得其中的 connector.properties.0.key 等参数比较奇怪,社区计划将在下一个版本中改进并简化 connector 的参数配置。

使用 DDL 连接 MySQL 结果表

连接 MySQL 可以使用 Flink 提供的 JDBC connector。例如

CREATE TABLE pvuv_sink (dt VARCHAR,pv BIGINT,uv BIGINT
) WITH ('connector.type' = 'jdbc', -- 使用 jdbc connector'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url'connector.table' = 'pvuv_sink', -- 表名'connector.username' = 'root', -- 用户名'connector.password' = '123456', -- 密码'connector.write.flush.max-rows' = '1' -- 默认5000条,为了演示改为1条
)

PV UV 计算

假设我们的需求是计算每小时全网的用户访问量,和独立用户数。很多用户可能会想到使用滚动窗口来计算。但这里我们介绍另一种方式。即 Group Aggregation 的方式。

INSERT INTO pvuv_sink
SELECTDATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,COUNT(*) AS pv,COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')

它使用 DATE_FORMAT 这个内置函数,将日志时间归一化成“年月日小时”的字符串格式,并根据这个字符串进行分组,即根据每小时分组,然后通过 COUNT(*) 计算用户访问量(PV),通过 COUNT(DISTINCT user_id) 计算独立用户数(UV)。这种方式的执行模式是每收到一条数据,便会进行基于之前计算的值做增量计算(如+1),然后将最新结果输出。所以实时性很高,但输出量也大。

我们将这个查询的结果,通过 INSERT INTO 语句,写到了之前定义的 pvuv_sink MySQL 表中。

注:在深圳 Meetup 中,我们有对这种查询的性能调优做了深度的介绍。

实战演示

环境准备

本实战演示环节需要安装一些必须的服务,包括:

  • Flink 本地集群:用来运行 Flink SQL 任务。
  • Kafka 本地集群:用来作为数据源。
  • MySQL 数据库:用来作为结果表。
  • Flink 本地集群安装

1.下载 Flink 1.9.0 安装包并解压:https://www.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz
2.下载以下依赖 jar 包,并拷贝到 flink-1.9.0/lib/ 目录下。因为我们运行时需要依赖各个 connector 实现。

  • flink-sql-connector-kafka_2.11-1.9.0.jar
    http://central.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.9.0/flink-sql-connector-kafka_2.11-1.9.0.jar
  • flink-json-1.9.0-sql-jar.jar
    http://central.maven.org/maven2/org/apache/flink/flink-json/1.9.0/flink-json-1.9.0-sql-jar.jar
  • flink-jdbc_2.11-1.9.0.jar
    http://central.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.9.0/flink-jdbc_2.11-1.9.0.jar
  • mysql-connector-java-5.1.48.jar
    https://dev.mysql.com/downloads/connector/j/5.1.html

3.将 flink-1.9.0/conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots 修改成 10,因为我们的演示任务可能会消耗多于1个的 slot。
4.在 flink-1.9.0 目录下执行 ./bin/start-cluster.sh,启动集群。

运行成功的话,可以在 http://localhost:8081 访问到 Flink Web UI。

另外,还需要将 Flink 的安装路径填到 flink-sql-submit 项目的 env.sh 中,用于后面提交 SQL 任务,如我的路径是

FLINK_DIR=/Users/wuchong/dev/install/flink-1.9.0

Kafka 本地集群安装

下载 Kafka 2.2.0 安装包并解压:https://www.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz

将安装路径填到 flink-sql-submit 项目的 env.sh 中,如我的路径是

KAFKA_DIR=/Users/wuchong/dev/install/kafka_2.11-2.2.0

在 flink-sql-submit 目录下运行 ./start-kafka.sh 启动 Kafka 集群。

在命令行执行 jps,如果看到 Kafka 进程和 QuorumPeerMain 进程即表明启动成功。

MySQL 安装

可以在官方页面下载 MySQL 并安装:
https://dev.mysql.com/downloads/mysql/
如果有 Docker 环境的话,也可以直接通过 Docker 安装
https://hub.docker.com/_/mysql

$ docker pull mysql
$ docker run --name mysqldb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql

然后在 MySQL 中创建一个 flink-test 的数据库,并按照上文的 schema 创建 pvuv_sink 表。

提交 SQL 任务

1.在 flink-sql-submit 目录下运行 ./source-generator.sh,会自动创建 user_behavior topic,并实时往里灌入数据。

2.在 flink-sql-submit 目录下运行 ./run.sh q1, 提交成功后,可以在 Web UI 中看到拓扑。

在 MySQL 客户端,我们也可以实时地看到每个小时的 pv uv 值在不断地变化

结尾

本文带大家搭建基础集群环境,并使用 SqlSubmit 提交纯 SQL 任务来学习了解如何连接外部系统。flink-sql-submit/src/main/resources/q1.sql 中还有一些注释掉的调优参数,感兴趣的同学可以将参数打开,观察对作业的影响。关于这些调优参数的原理,可以看下我在 深圳 Meetup 上的分享《Flink SQL 1.9.0 技术内幕和最佳实践》。

原文链接
本文为云栖社区原创内容,未经允许不得转载。

Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL相关推荐

  1. goland sql 脚本运行_Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL

    SqlSubmit 的实现 笔者一开始是想用 SQL Client 来贯穿整个演示环节,但可惜 1.9 版本 SQL CLI 还不支持处理 CREATE TABLE 语句.所以笔者就只好自己写了个简单 ...

  2. kafka学习_《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ

    前言 之前有文章 <从0到1学习Flink>-- Flink 写入数据到 Kafka 写过 Flink 将处理后的数据后发到 Kafka 消息队列中去,当然我们常用的消息队列可不止这一种, ...

  3. Python3读取kafka消息写入HBASE

    全栈工程师开发手册 (作者:栾鹏) 架构系列文章 ubunut系统下hbase的安装和使用参考:https://blog.csdn.net/luanpeng825485697/article/deta ...

  4. java读写德卡数据_Spark Streaming 读取Kafka数据写入ES

    简介: 目前项目中已有多个渠道到Kafka的数据处理,本文主要记录通过Spark Streaming 读取Kafka中的数据,写入到Elasticsearch,达到一个实时(严格来说,是近实时,刷新时 ...

  5. fink sql 读取 kafka 的数据写到 kafka

    版本: flink 1.12  平台:streamX -- source CREATE TABLE `stg_access_its_rt_kafka` (`message` STRING COMMEN ...

  6. Flink1.11 读取kafka数据写入hive,未完待续

    昨天晚上Flink1.11出了,这次改动很多,我只关心hive这一部分. 目前尝试了几个小时用代码读取hive,安装官网的文档,没成功,先蹭个热点,记录下. 先贴一下依赖吧: 注意:反正各种报错,看社 ...

  7. Logstash读取Kafka数据写入HDFS详解

    强大的功能,丰富的插件,让logstash在数据处理的行列中出类拔萃 通常日志数据除了要入ES提供实时展示和简单统计外,还需要写入大数据集群来提供更为深入的逻辑处理,前边几篇ELK的文章介绍过利用lo ...

  8. sql注入新手入门 从实战讲解SQL注入(手动注入+MySQL+靶场源码)

    资源放在下面,自取 https://blog.csdn.net/qq_45514735?spm=1011.2266.3001.5343 1.我们首先用小皮面板和sqli-labs靶场源码搭建一个环境 ...

  9. spark读取oracle写入kafka,spark读取kafka文件写入hive

    1.将hdfs-site,core-site.hive-site文件拷贝到resources目录下 image.png 2.添加maven依赖 org.apache.spark spark-strea ...

最新文章

  1. DNC-cs6200 ospfv3
  2. HDU 1248 寒冰王座(全然背包:入门题)
  3. 临床观察性研究论文如何撰写“方法”?
  4. c语言修改elf文件crc32,ELF文件中调试信息的格式说明?
  5. Oracle(一):简介、安装、DDL、DML、JDBC、导入导出
  6. Java 用栈实现队列
  7. ppt怎么一次性改全部字体_PPT的字体怎么选择?
  8. 【转】C#的内存管理:堆栈、托管堆与指针
  9. Uber 开源深度学习分布训练库 Petastorm
  10. 【李宏毅2020 ML/DL】P57 Unsupervised Learning - Linear Methods | PCA Matrix Factorization
  11. Android通过第三方软件打开Word、Excel、PPT、PDF等文档
  12. 直接管理和维护计算机系统的程序称为,全国2008年4月高等教育自学考试计算机原理试题...
  13. 手把手教python发送邮件
  14. 深入理解计算机系统-cachelab
  15. 如何学习云计算这一文就够了!
  16. Tiny Heroes:道高一尺,魔高一丈
  17. matlab小数分数互换
  18. pagehelper插件显示total为-1或1
  19. tzc 1345 推箱子
  20. python dataset_数据管道Dataset

热门文章

  1. 打桩时不修改源码_考研网上报名点了修改报名,修改后到报名点时,出现部分信息不符合参考条件,请修改或重新报名...
  2. python怎么理解_讨论 - 廖雪峰的官方网站
  3. java逸出_Java并发编程 - 对象的共享
  4. php链接本地mysql数据库配置文件_php连接 mysql 数据库如何添加一个公共的配置文件...
  5. spring 2.0核心技术与最佳实践 pdf_推荐 Spring Boot 实践学习案例大全 数据缓存 和中间件 安全权限...
  6. java swt 下拉列表_求助:SWT 下拉列表
  7. leetcode 4 --- 寻找两个有序数组的中位数
  8. 开题报告方案论证_观点讨论研究方案、立项申报书、开题报告的联系和区别
  9. 各类木材强度_凯狄解析各类抽芯铆钉的工作原理
  10. 会议容易中吗_在装配式建筑中重要又容易被忽视的部分,你中招了吗?