作者:腾讯云流计算 Oceanus 团队

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

Flink SQL 提供了 Regular Joins、Interval Joins、Temporal Joins、Lookup Join、Array 展平和 Table Function 六种方式实现数据关联。

视频教程:Flink 实践教程-进阶(11):SQL 关联:Regular Join - 云+社区 - 腾讯云

本文将为您介绍如何使用 Regualr Joins 实现数据关联。Regualr Joins 在使用时有一定的限制条件,比如只能在 Equi-Join 条件下使用。下面将以 Kafka 作为源表的左右表为例,将商品订单 order-source 中商品 ID 与 product-info 中商品 ID 进行左关联得到商品名称,最终将结果数据到 Logger Sink 中去。

前置准备

创建流计算 Oceanus 集群

进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。

创建 Kafka  Topic

进入 CKafka 控制台 [3],点击左上角【新建】,即可完成 CKafka 实例的创建,并创建 2 个 Topic,order-sourceproduct-info

流计算 Oceanus 作业

1. 上传依赖

在 Oceanus 控制台,点击左侧【依赖管理】,点击左上角【新建】新建依赖,上传 Logger Sink[4]  JAR 包。

2. 创建作业

在 Oceanus 控制台,点击左侧【作业管理】,点击左上角【新建】新建作业,作业类型选择 SQL 作业,点击【开发调试】进入作业编辑页面。

CREATE TABLE `order_source` (`id` INT,`user_id` INT,`product_id` INT,`create_time` TIMESTAMP(3)
) WITH ('connector' = 'kafka','topic' = 'order-source',  'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种'properties.bootstrap.servers' = 'x.x.x.x:9092',  -- 替换为您的 Kafka 连接地址'properties.group.id' = 'testGroup0', -- 必选参数, 一定要指定 Group ID'format' = 'json','json.fail-on-missing-field' = 'false',  -- 如果设置为 false, 则遇到缺失字段不会报错。'json.ignore-parse-errors' = 'true'    -- 如果设置为 true,则忽略任何解析报错。
);CREATE TABLE `product_info` (`product_id` INT,`product_name` STRING,`update_time` TIMESTAMP(3)
) WITH ('connector' = 'kafka','topic' = 'product-info',  -- 替换为您要消费的 Topic'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种'properties.bootstrap.servers' = 'x.x.x.x:9092',  -- 替换为您的 Kafka 连接地址'properties.group.id' = 'testGroup0', -- 必选参数, 一定要指定 Group ID'format' = 'json','json.fail-on-missing-field' = 'false',  -- 如果设置为 false, 则遇到缺失字段不会报错。'json.ignore-parse-errors' = 'true'    -- 如果设置为 true,则忽略任何解析报错。
);CREATE TABLE logger_sink_table (`id` INT PRIMARY KEY NOT ENFORCED,`user_id` INT,`product_id` INT,`product_name` STRING,`create_time` TIMESTAMP(3)
) WITH ('connector' = 'logger','print-identifier' = 'DebugData'
);INSERT INTO logger_sink_table
SELECT order_source.id, order_source.user_id, order_source.product_id, product_info.product_name, order_source.create_time
FROM order_source left join product_info
on order_source.product_id = product_info.product_id;

3. 运行作业

点击【发布草稿】后启动作业,可通过【日志】面板 TaskManager 或 Flink UI 查看运行信息。

4. 模拟数据

通过 Kafka Client 发送数据到关联的左表 order-source 和右表 product-info。  发送消息命令:

[root@VM-3-centos ~]# cd /opt/kafka_2.11-2.4.1/bin/
[root@VM-3-centos bin]# bash kafka-console-producer.sh --broker-list 10.0.0.29:9092 --topic order-source

Topic order-source 模拟数据示例:

{"id":1,"user_id":10,"product_id":1001,"create_time":"2022-03-17 16:47:00"}
{"id":2,"user_id":10,"product_id":1001,"create_time":"2022-03-17 16:48:00"}
{"id":3,"user_id":10,"product_id":1002,"create_time":"2022-03-17 16:49:00"}

发送消息命令:

[root@VM-3-centos ~]# cd /opt/kafka_2.11-2.4.1/bin/
[root@VM-3-centos bin]# bash kafka-console-producer.sh --broker-list 10.0.0.29:9092 --topic product-info

Topic product-info 模拟数据示例:

{"id":1,"user_id":10,"product_id":1001,"create_time":"2022-03-17 16:47:00"}
{"id":2,"user_id":10,"product_id":1001,"create_time":"2022-03-17 16:48:00"}
{"id":3,"user_id":10,"product_id":1002,"create_time":"2022-03-17 16:49:00"}

更多接入方式请参考 CKafka 收发消息 [5]

5. 查看运行结果

在【日志】面板的 TaskManager 中查看收到的数据,可以看到已经关联到了 product_id 为1001的商品名称。

总结

Regular Joins 比较适合批量加载数据的场景,而当关联的右表为时常更新的维表时会出现关联不到的情况。此外,从上述运行结果可以看出:Regular Joins关联的记录为 Retract Stream(回撤流)下游需为 Upsert 类型 Sink。

更多 SQL Join 详情请参考开源 Flink官方文章 SQL Join 章节[5]。  更多 Flink 实践教程详见 流计算 Oceanus 教程[6]

参考链接

[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview

[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298

[3] Kafka 控制台:https://console.cloud.tencent.com/ckafka

[4] Logger Sink 下载地址:https://cloud.tencent.com/document/product/849/58713

[5] Flink SQL Join:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sql/queries/joins

[6] 流计算 Oceanus 教程:https://cloud.tencent.com/developer/tag/10509

Flink 实践教程-进阶(11):SQL 关联:Regular Join相关推荐

  1. webgis从基础到开发实践_开源WebGIS教程系列——11.1 GISLite 的开发背景与设计

    地理信息门户可以帮助人们更容易地发现.访问和使用地理空间信息, 是地理信息发布.服务和共享的重要环节.许多国家都很重视地理信息门户的 建设,把它作为国家空间数据基础设施(spatial data in ...

  2. Flink 1.11 SQL 十余项革新大揭秘,哪些演变在便捷你的使用体验?

    简介: SQL 作为 Flink 中公认的核心模块之一,对推动 Flink 流批一体功能的完善至关重要.在 1.11 中,Flink SQL 也进行了大量的增强与完善,开发大功能 10 余项,不仅扩大 ...

  3. Java课设对对碰_第11章对对碰游戏(图形版)(Java游戏编程原理与实践教程课件).ppt...

    游戏界面和相关图片素材 11.3 程序设计的步骤 11.3.1 设计游戏窗口类(GameRoom.java) 游戏窗口类GameRoom实现游戏全部功能,继承JFrame组件实现的.是由上方Panel ...

  4. 操作系统形式化验证实践教程(11) - 结构化证明语言Isar(转载)

    操作系统形式化验证实践教程(11) - 结构化证明语言Isar 结构化证明语言Isar基本语法 apply方法和by方法虽然可以完成功能,但是看起来更像是命令式语言.使用Isar语言,还可以写得更加形 ...

  5. 操作系统形式化验证实践教程(11) - 结构化证明语言Isar

    操作系统形式化验证实践教程(11) - 结构化证明语言Isar 结构化证明语言Isar基本语法 apply方法和by方法虽然可以完成功能,但是看起来更像是命令式语言.使用Isar语言,还可以写得更加形 ...

  6. FPE修改教程进阶(地址编辑部分)

    FPE修改教程进阶(地址编辑部分)     需要具备的几个初步知识 1.十六进制    十六进制是进制中的一种,是我们在进行编辑的时候将要碰到的最多的问题,你接触修改,就不可避免的会遇到进制上的换算, ...

  7. sql计算留存_免费教程《图解SQL面试题》

    运营.产品经理.数据分析.软件开发等职位,SQL是必会的技能. 面试经常考察SQL,但是不会做,怎么办? 工作里遇到的业务问题,需要用SQL实现,怎么办? 为了帮助你解决这些问题,我写了一本免费教程& ...

  8. ​网页图表Highcharts实践教程标之添加题副标题版权信息

    ​网页图表Highcharts实践教程标之添加题副标题版权信息 Highcharts辅助元素 辅助元素图表的非必要元素,如标题.版权信息.标签.载入动态.它们不和图表数据发生关联,只是额外说明一些基本 ...

  9. NGUI全面实践教程(大学霸内部资料)

    NGUI全面实践教程(大学霸内部资料) 试读文档下载地址:链接:http://pan.baidu.com/s/1jGosC9g 密码:8jq5 介绍:NGUI全面实践教程(大学霸内部资料)本书是国内N ...

  10. Flink的Table API 与SQL介绍及调用

    1 概述    DataSetAPI和DateStreamAPI是基于整个Flink的运行时环境做操作处理的,Table API和SQL是在DateStreamAPI上又包了一层.对于新版本的Blin ...

最新文章

  1. 4.Azure创建点到站点的***隧道(下)
  2. 下载服务 php,文件下载: 云---php服务---pc
  3. mysql(2)—— 由笛卡尔积现象分析数据库表的连接
  4. 撑起百万亿参数模型想象力!英伟达发布新一代SuperPOD超算,AI算力新巅峰!
  5. 软件设计模式之单例模式
  6. Linux 进入 5.0 时代!
  7. 20190906:(leetcode习题)Shuffle an Array
  8. python下载快手视频教程_[小玩意] 用Python写了个下载快手视频的小脚本
  9. C#中手动引用COM组建的例子
  10. 人工智能、机器学习、神经网络、深度学习之间的关系
  11. Android 补间动画之平移动画TranslateAnimation
  12. 企业征信查询工具--企信宝
  13. html5+JS制作音乐播放器
  14. python3入门与进阶笔记_Python3入门与进阶【笔记】
  15. HDU-5238 Calculator(线段树+中国剩余定理)
  16. PCDATA和CDATA的区别究竟是什么呢?
  17. 高光谱图像分析:分类 II
  18. SQL Sever数据库存储过程
  19. 封装 用户名.计算机,Windows系统自定义封装ISO镜像
  20. iPhone 连接电脑后一直断开重连,删除.DS_Store文件

热门文章

  1. 百度翻译API的调用
  2. Deep Ordinal Regression Network for Monocular Depth Estimation
  3. 【渝粤题库】陕西师范大学209020 史记研究 作业(专升本)
  4. SQL 升序、降序排列
  5. 设置计算机网络文件共享,局域网共享文件设置方法图文教程
  6. [UVALive 4490] Help Bubu
  7. 文章-编程需要知道多少数学知识?
  8. 基于X86汇编语言的简易打字游戏实现
  9. 愤怒的牛(重回基础二分)
  10. Java 数组的三种创建方法