1 将Map展开为多列多行。

CREATE TABLE source_table (userId INT,eventTime as '2021-10-01 08:08:08',eventType as 'click',productId INT,-- 数组(Array)类型productImages as ARRAY['image1','image2'],-- 对象(Map)类型pageInfo as MAP['pageId','1','pageName','yyds']
) WITH ('connector' = 'datagen','number-of-rows' = '2','fields.userId.kind' = 'random','fields.userId.min' = '2','fields.userId.max' = '2','fields.productId.kind' = 'sequence','fields.productId.start' = '1','fields.productId.end' = '2'
);

将Map展开为多列多行。

基于UNNEST

SELECT userId,eventTime,eventType,productId,mapKey,mapValue
FROM source_table, UNNEST(pageInfo) as t(mapKey,mapValue);

基于UDTF

package com.bigdata.flink.sql.udtf;import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;import java.util.Map;@FunctionHint(input = @DataTypeHint("MAP<STRING, STRING>"),output = @DataTypeHint("ROW<mapKey STRING, mapValue STRING>"))
public class ExpandMapMultColumnMultRowUDTF extends TableFunction {public void eval(Map<String, String> pageInfo) {for (Map.Entry<String, String> entry : pageInfo.entrySet()) {// 原来的一行,每个Key都输出一行collect(Row.of(entry.getKey(), entry.getValue()));}}
}// SQL使用
SELECT userId,eventTime,eventType,productId,mapKey,mapValue FROM source_table, LATERAL TABLE (ExpandMapMultColumnMultRowUDTF(`pageInfo`)) AS t(mapKey,mapValue)

结果示例

userId      eventTime                eventType      productId        mapKey            mapValue
2       2021-10-01 08:08:08       click           1              pageId            1
2       2021-10-01 08:08:08       click           1              pageName         yyds
2       2021-10-01 08:08:08       click           2              pageId            1
2       2021-10-01 08:08:08       click           2              pageName         yyds

2 将数组展开为单列多行。

基于UNNEST

SELECT userId,eventTime,eventType,productId,productImage
FROM source_table, UNNEST(productImages) as t(productImage);

基于UDTF

package com.bigdata.flink.sql.udtf;import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;@FunctionHint(input = @DataTypeHint("ARRAY<String>"),output = @DataTypeHint("String"))
public class ExpandArrayOneColumnMultRowUDTF extends TableFunction {public void eval(String... productImages) {for (String productImage : productImages) {collect(productImage);}}
}// SQL
SELECT userId, eventTime, eventType, productId, productImage FROM source_table, LATERAL TABLE (ExpandArrayOneColumnMultRowUDTF(`productImages`)) AS t(productImage);

结果示例

userId      eventTime            eventType      productId      productImage
2       2021-10-01 08:08:08       click           1              image1
2       2021-10-01 08:08:08       click           1              image2
2       2021-10-01 08:08:08       click           2              image1
2       2021-10-01 08:08:08       click           2              image2

3 将Array<ROW<c1 type,c2 type>>展开成多列多行

在 SQL 任务里面经常会遇到一列转多行的需求,今天就来总结一下在 Flink SQL 里面如何实现列转行的,先来看下面的一个具体案例.需求
原始数据格式如下:name   data
JasonLee    [{"content_type":"flink","url":"111"},{"content_type":"spark","url":"222"},{"content_type":"hadoop","url":"333"}]
data 格式化
{"name": "JasonLee","data": [{"content_type": "flink","url": "111"}, {"content_type": "spark","url": "222"},{"content_type": "hadoop","url": "333"}]
}
现在希望得到的数据格式是这样的:name    content_type    url
JasonLee    flink   111
JasonLee    spark   222
JasonLee    hadoop  333
这是一个典型的列转行或者一行转多行的场景,需要将 data 列进行拆分成为多行多列,下面介绍两种实现方式.使用 Flink 自带的 unnest 函数解析
使用自定义 UDTF 函数解析
建表 DDL
CREATE TABLE kafka_table (
name string,
`data` ARRAY<ROW<content_type STRING,url STRING>>
)
WITH ('connector' = 'kafka', -- 使用 kafka connector'topic' = 'test','properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092',  -- broker连接信息'properties.group.id' = 'jason_flink_test', -- 消费kafka的group_id'scan.startup.mode' = 'latest-offset',  -- 读取数据的位置'format' = 'json',  -- 数据源格式为 json'json.fail-on-missing-field' = 'false', -- 字段丢失任务不失败'json.ignore-parse-errors' = 'true'  -- 解析失败跳过
)
这里在定义 data 字段类型的时候需要定义为 ARRAY 类型,因为 unnest 函数需要一个数组类型的参数.
--------------------------------------------------
1 unnest解析
select name,content_type,url
from kafka_table CROSS JOIN UNNEST(`data`) AS t (content_type,url)
select name,content_type,url
from kafka_table, UNNEST(`data`) AS t (content_type,url)
select name,content_type,url
from kafka_table left join UNNEST(`data`) AS t (content_type,url) on true2 自定义 UDTF 解析
自定义表值函数(UDTF),自定义表值函数,将 0 个、1 个或多个标量值作为输入参数(可以是变长参数)。与自定义的标量函数类似,但与标量函数不同。表值函数可以返回任意数量的行作为输出,而不仅是 1 个值。返回的行可以由 1 个或多个列组成。调用一次函数输出多行或多列数据。必须继承 TableFunction 基类,并实现一个或者多个名为 eval 的方法, 在使用 UDTF 时,需要带上 LATERAL TABLE两个关键字.@FunctionHint(output = @DataTypeHint("ROW<content_type STRING,url STRING>"))
public class ParserJsonArrayTest extends TableFunction<Row> {private static final Logger log = Logger.getLogger(ParserJsonArrayTest.class);public void eval(String value) {try {JSONArray snapshots = JSONArray.parseArray(value);Iterator<Object> iterator = snapshots.iterator();while (iterator.hasNext()) {JSONObject jsonObject = (JSONObject) iterator.next();String content_type = jsonObject.getString("content_type");String url = jsonObject.getString("url");collect(Row.of(content_type,url));}} catch (Exception e) {log.error("parser json failed :" + e.getMessage());}}
}
自定义 UDTF 解析的时候,就不需要把 data 字段定义成 ARRAY 类型了,直接定义成 STRING 类型就可以了,并且这种方式会更加的灵活,比如还需要过滤数据或者更复杂的一些操作时都可以在 UDTF 里面完成.Flink SQL 使用 UDTF
select name,content_type,url
from kafka_table CROSS JOIN lateral TABLE (ParserJsonArrayTest(`data`)) AS t (content_type,url)
select name,content_type,url
from kafka_table, lateral TABLE (ParserJsonArrayTest(`data`)) AS t (content_type,url)
select name,content_type,url
from kafka_table left join lateral TABLE (ParserJsonArrayTest(`data`)) AS t (content_type,url) on true
注意:
unnest 和 自定义 UDTF 函数在使用的时候都有 3 种写法,前面两种写法的效果其实是一样的,第三种写法相当于 left join 的用法.区别在于 CROSS JOIN/INNER JOIN: 对于左侧表的每一行,右侧 UDTF 不输出,则这一行不输出.LEFT JOIN: 对于左侧表的每一行,右侧 UDTF 不输出,则这一行会输出,右侧 UDTF 字段为 null打印的结果
2> JasonLee,flink,111
2> JasonLee,spark,222
2> JasonLee,hadoop,333
总结
在实际使用的时候如果 unnest 可以满足需求就直接用 unnest 不需要带来额外的开发,如果 unnest 函数满足不了需求,那么就自定义 UDTF 去完成.

end

FlinkSQL 列转行/解开map array/unnest/lateral table udtf相关推荐

  1. 数据库:SQLServer 实现行转列、列转行用法笔记

    在许多的互联网项目当中,报表开发是整个项目当中很重要的一个功能模块.其中会有一些比较复杂的报表统计需要行转列或者列转行的需求.今天给大家简单介绍一下在SQLServer当中如何使用PIVOT.UNPI ...

  2. Mysql中行转列和列转行

    一.行转列 即将原本同一列下多行的不同内容作为多个字段,输出对应内容. 建表语句 DROP TABLE IF EXISTS tb_score;   CREATE TABLE tb_score(     ...

  3. mysql 列换行 表设计 设计_MySql行转列、列转行

    现mysql中有一张表php_user表,表结构为: 表中数据有: 现在想查询出来不同学生的语数外成绩在一行显示,那么需要用到行转列的用法, 一.行转列 1.使用case...when....then ...

  4. SQL 行转列、列转行

    行转列,列转行是我们在开发过程中经常碰到的问题. 1.行转列一般通过CASE WHEN 语句来实现 2.也可以通过 SQL SERVER 2005 新增的运算符PIVOT来实现. 用传统的方法,比较好 ...

  5. mysql行转列和列转行_Mysql中行转列和列转行

    一.行转列 即将原本同一列下多行的不同内容作为多个字段,输出对应内容. 建表语句 DROP TABLE IF EXISTS tb_score; CREATE TABLE tb_score( id IN ...

  6. mysql 行转列 列转行

    一.行转列 即将原本同一列下多行的不同内容作为多个字段,输出对应内容. 建表语句 DROP TABLE IF EXISTS tb_score;CREATE TABLE tb_score(id INT( ...

  7. mysql 列转行union all_mysql的 行转列(7种方法) 和 列转行

    # 一.行转列 # 即将原本同一列下多行的不同内容作为多个字段,输出对应内容. 建表语句 DROP TABLE IF EXISTS tb_score; CREATE TABLE tb_score( i ...

  8. mysql php 列转行_MySql行转列、列转行

    现mysql中有一张表php_user表,表结构为: 表中数据有: 现在想查询出来不同学生的语数外成绩在一行显示,那么需要用到行转列的用法, 一.行转列 1.使用case...when....then ...

  9. sql的行转列(PIVOT)与列转行(UNPIVOT)

    在做数据统计的时候,行转列,列转行是经常碰到的问题.case when方式太麻烦了,而且可扩展性不强,可以使用 PIVOT,UNPIVOT比较快速实现行转列,列转行,而且可扩展性强 一.行转列 1.测 ...

  10. 【精】mysql行转列的7种方法及列转行

    文章目录 一.行转列 1.使用case-when-then 进行行转列 2.使用IF() 进行行转列: 3.利用SUM(IF()) 生成列 + WITH ROLLUP 生成汇总行,并利用 IFNULL ...

最新文章

  1. 控制输入框只能输入数字
  2. [转] Nodejs 进阶:Express 常用中间件 body-parser 实现解析
  3. 【C++】函数缺省参数的作用
  4. Google搜索技巧总结
  5. 全球云端数据仓库领导者 MaxCompute 将于本月10日正式开服美东节点
  6. CCPC2018(秦皇岛站)赛后反思
  7. C++对象内存布局测试总结
  8. include 头文件循环引用问题
  9. Java 基础 之 变量
  10. 大数据量JSONObject.fromObject性能问题(大数据传给前台)
  11. Oracle数据库练习题及答案(个人总结)
  12. Adding Animations之Zooming a View
  13. MyBatis源码简单分析
  14. 论文理解【IL - 数据增广】 —— Adversarial Imitation Learning with Trajectorial Augmentation and Correction
  15. 微信公众号开发之生成并扫描带参数的二维码(无需改动)
  16. Python输出指定位数的浮点数
  17. 049_jQuery 操作标签
  18. [python爬虫] 正则表达式使用技巧及爬取个人博客实例
  19. 【数电实验】移位寄存器与计数器
  20. python3 从入门到精通视频教程下载-Python 3.7从入门到精通(视频教学版)

热门文章

  1. android 小米读写权限,Android 小米手机的权限问题
  2. 插头插座新旧标准对比和安规测试设备
  3. 简述redis集群的实现原理
  4. 联想服务器如何恢复预装系统,Thinkcentre E73 E63z等预装Win7系统如何恢复出厂系统...
  5. 白话区块链技术-区块链工程师大讲堂
  6. SAP FICO面试题
  7. 单项选择题标准化考试系统设计
  8. Hightopo 2D 入门
  9. C++-实现matlab的fftshift(OpenCV)
  10. Contexts使用以及详细配置