Flink 实践教程-进阶(11):SQL 关联:Regular Join
作者:腾讯云流计算 Oceanus 团队
流计算 Oceanus 简介
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
Flink SQL 提供了 Regular Joins、Interval Joins、Temporal Joins、Lookup Join、Array 展平和 Table Function 六种方式实现数据关联。
视频教程:Flink 实践教程-进阶(11):SQL 关联:Regular Join - 云+社区 - 腾讯云
本文将为您介绍如何使用 Regualr Joins 实现数据关联。Regualr Joins 在使用时有一定的限制条件,比如只能在 Equi-Join 条件下使用。下面将以 Kafka 作为源表的左右表为例,将商品订单 order-source
中商品 ID 与 product-info
中商品 ID 进行左关联得到商品名称,最终将结果数据到 Logger Sink 中去。
前置准备
创建流计算 Oceanus 集群
进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。
创建 Kafka Topic
进入 CKafka 控制台 [3],点击左上角【新建】,即可完成 CKafka 实例的创建,并创建 2 个 Topic,order-source
和 product-info
。
流计算 Oceanus 作业
1. 上传依赖
在 Oceanus 控制台,点击左侧【依赖管理】,点击左上角【新建】新建依赖,上传 Logger Sink[4] JAR 包。
2. 创建作业
在 Oceanus 控制台,点击左侧【作业管理】,点击左上角【新建】新建作业,作业类型选择 SQL 作业,点击【开发调试】进入作业编辑页面。
CREATE TABLE `order_source` (`id` INT,`user_id` INT,`product_id` INT,`create_time` TIMESTAMP(3)
) WITH ('connector' = 'kafka','topic' = 'order-source', 'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种'properties.bootstrap.servers' = 'x.x.x.x:9092', -- 替换为您的 Kafka 连接地址'properties.group.id' = 'testGroup0', -- 必选参数, 一定要指定 Group ID'format' = 'json','json.fail-on-missing-field' = 'false', -- 如果设置为 false, 则遇到缺失字段不会报错。'json.ignore-parse-errors' = 'true' -- 如果设置为 true,则忽略任何解析报错。
);CREATE TABLE `product_info` (`product_id` INT,`product_name` STRING,`update_time` TIMESTAMP(3)
) WITH ('connector' = 'kafka','topic' = 'product-info', -- 替换为您要消费的 Topic'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种'properties.bootstrap.servers' = 'x.x.x.x:9092', -- 替换为您的 Kafka 连接地址'properties.group.id' = 'testGroup0', -- 必选参数, 一定要指定 Group ID'format' = 'json','json.fail-on-missing-field' = 'false', -- 如果设置为 false, 则遇到缺失字段不会报错。'json.ignore-parse-errors' = 'true' -- 如果设置为 true,则忽略任何解析报错。
);CREATE TABLE logger_sink_table (`id` INT PRIMARY KEY NOT ENFORCED,`user_id` INT,`product_id` INT,`product_name` STRING,`create_time` TIMESTAMP(3)
) WITH ('connector' = 'logger','print-identifier' = 'DebugData'
);INSERT INTO logger_sink_table
SELECT order_source.id, order_source.user_id, order_source.product_id, product_info.product_name, order_source.create_time
FROM order_source left join product_info
on order_source.product_id = product_info.product_id;
3. 运行作业
点击【发布草稿】后启动作业,可通过【日志】面板 TaskManager 或 Flink UI 查看运行信息。
4. 模拟数据
通过 Kafka Client 发送数据到关联的左表 order-source
和右表 product-info
。 发送消息命令:
[root@VM-3-centos ~]# cd /opt/kafka_2.11-2.4.1/bin/
[root@VM-3-centos bin]# bash kafka-console-producer.sh --broker-list 10.0.0.29:9092 --topic order-source
Topic order-source
模拟数据示例:
{"id":1,"user_id":10,"product_id":1001,"create_time":"2022-03-17 16:47:00"}
{"id":2,"user_id":10,"product_id":1001,"create_time":"2022-03-17 16:48:00"}
{"id":3,"user_id":10,"product_id":1002,"create_time":"2022-03-17 16:49:00"}
发送消息命令:
[root@VM-3-centos ~]# cd /opt/kafka_2.11-2.4.1/bin/
[root@VM-3-centos bin]# bash kafka-console-producer.sh --broker-list 10.0.0.29:9092 --topic product-info
Topic product-info
模拟数据示例:
{"id":1,"user_id":10,"product_id":1001,"create_time":"2022-03-17 16:47:00"}
{"id":2,"user_id":10,"product_id":1001,"create_time":"2022-03-17 16:48:00"}
{"id":3,"user_id":10,"product_id":1002,"create_time":"2022-03-17 16:49:00"}
更多接入方式请参考 CKafka 收发消息 [5]
5. 查看运行结果
在【日志】面板的 TaskManager 中查看收到的数据,可以看到已经关联到了 product_id 为1001的商品名称。
总结
Regular Joins 比较适合批量加载数据的场景,而当关联的右表为时常更新的维表时会出现关联不到的情况。此外,从上述运行结果可以看出:Regular Joins关联的记录为 Retract Stream(回撤流)下游需为 Upsert 类型 Sink。
更多 SQL Join 详情请参考开源 Flink官方文章 SQL Join 章节[5]。 更多 Flink 实践教程详见 流计算 Oceanus 教程[6]
参考链接
[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview
[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298
[3] Kafka 控制台:https://console.cloud.tencent.com/ckafka
[4] Logger Sink 下载地址:https://cloud.tencent.com/document/product/849/58713
[5] Flink SQL Join:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sql/queries/joins
[6] 流计算 Oceanus 教程:https://cloud.tencent.com/developer/tag/10509
Flink 实践教程-进阶(11):SQL 关联:Regular Join相关推荐
- webgis从基础到开发实践_开源WebGIS教程系列——11.1 GISLite 的开发背景与设计
地理信息门户可以帮助人们更容易地发现.访问和使用地理空间信息, 是地理信息发布.服务和共享的重要环节.许多国家都很重视地理信息门户的 建设,把它作为国家空间数据基础设施(spatial data in ...
- Flink 1.11 SQL 十余项革新大揭秘,哪些演变在便捷你的使用体验?
简介: SQL 作为 Flink 中公认的核心模块之一,对推动 Flink 流批一体功能的完善至关重要.在 1.11 中,Flink SQL 也进行了大量的增强与完善,开发大功能 10 余项,不仅扩大 ...
- Java课设对对碰_第11章对对碰游戏(图形版)(Java游戏编程原理与实践教程课件).ppt...
游戏界面和相关图片素材 11.3 程序设计的步骤 11.3.1 设计游戏窗口类(GameRoom.java) 游戏窗口类GameRoom实现游戏全部功能,继承JFrame组件实现的.是由上方Panel ...
- 操作系统形式化验证实践教程(11) - 结构化证明语言Isar(转载)
操作系统形式化验证实践教程(11) - 结构化证明语言Isar 结构化证明语言Isar基本语法 apply方法和by方法虽然可以完成功能,但是看起来更像是命令式语言.使用Isar语言,还可以写得更加形 ...
- 操作系统形式化验证实践教程(11) - 结构化证明语言Isar
操作系统形式化验证实践教程(11) - 结构化证明语言Isar 结构化证明语言Isar基本语法 apply方法和by方法虽然可以完成功能,但是看起来更像是命令式语言.使用Isar语言,还可以写得更加形 ...
- FPE修改教程进阶(地址编辑部分)
FPE修改教程进阶(地址编辑部分) 需要具备的几个初步知识 1.十六进制 十六进制是进制中的一种,是我们在进行编辑的时候将要碰到的最多的问题,你接触修改,就不可避免的会遇到进制上的换算, ...
- sql计算留存_免费教程《图解SQL面试题》
运营.产品经理.数据分析.软件开发等职位,SQL是必会的技能. 面试经常考察SQL,但是不会做,怎么办? 工作里遇到的业务问题,需要用SQL实现,怎么办? 为了帮助你解决这些问题,我写了一本免费教程& ...
- 网页图表Highcharts实践教程标之添加题副标题版权信息
网页图表Highcharts实践教程标之添加题副标题版权信息 Highcharts辅助元素 辅助元素图表的非必要元素,如标题.版权信息.标签.载入动态.它们不和图表数据发生关联,只是额外说明一些基本 ...
- NGUI全面实践教程(大学霸内部资料)
NGUI全面实践教程(大学霸内部资料) 试读文档下载地址:链接:http://pan.baidu.com/s/1jGosC9g 密码:8jq5 介绍:NGUI全面实践教程(大学霸内部资料)本书是国内N ...
- Flink的Table API 与SQL介绍及调用
1 概述 DataSetAPI和DateStreamAPI是基于整个Flink的运行时环境做操作处理的,Table API和SQL是在DateStreamAPI上又包了一层.对于新版本的Blin ...
最新文章
- 4.Azure创建点到站点的***隧道(下)
- 下载服务 php,文件下载: 云---php服务---pc
- mysql(2)—— 由笛卡尔积现象分析数据库表的连接
- 撑起百万亿参数模型想象力!英伟达发布新一代SuperPOD超算,AI算力新巅峰!
- 软件设计模式之单例模式
- Linux 进入 5.0 时代!
- 20190906:(leetcode习题)Shuffle an Array
- python下载快手视频教程_[小玩意] 用Python写了个下载快手视频的小脚本
- C#中手动引用COM组建的例子
- 人工智能、机器学习、神经网络、深度学习之间的关系
- Android 补间动画之平移动画TranslateAnimation
- 企业征信查询工具--企信宝
- html5+JS制作音乐播放器
- python3入门与进阶笔记_Python3入门与进阶【笔记】
- HDU-5238 Calculator(线段树+中国剩余定理)
- PCDATA和CDATA的区别究竟是什么呢?
- 高光谱图像分析:分类 II
- SQL Sever数据库存储过程
- 封装 用户名.计算机,Windows系统自定义封装ISO镜像
- iPhone 连接电脑后一直断开重连,删除.DS_Store文件