Flink CDC mongoDB 使用及Flink sql解析monggo中复杂嵌套JSON数据实现
需要实时采集MongoDB中的数据,所以考虑使用flink cdc mongodb,在flink cdc2.1版本后也支持了MongoDB的数据采集,是通过oplog.
MongoDB中的存储数据的文档结构(JSON):
{"_id": {"$numberLong": "375756968729522176"}, "tenantId": "1000000001", "grade": 1, "name": "VIP1", "growth": "0", "enable": true, "createdAt": {"$date": 1623849842573}, "createdBy": "system", "lastUpdatedAt": {"$date": 1656424909977}, "_class": "com.xiaoshouyi.member.service.document.GradeSetting", "updatedBy": "707", "gradeRightList": [{"gradeRightType": "POSTAGE_DISCOUNT", "postageRight": {"consumeAmount": "10", "postageType": 1, "discountPostageAmt": "10"}}, {"gradeRightType": "MULT_CREDIT", "creditMultRight": {"isMult": true, "multiple": "12", "productScopeList": [{"scopeType": "0", "isIncluding": true, "elementList": [{"name": "Fancl防晒霜", "code": "C-3002"}, {"name": "沈欣牌特大龙虾500g(±3g)", "code": "S-05030001-500"}, {"name": "测试PLU拦截01", "code": "01010005"}, {"name": "测试PLU拦截的", "code": "01010004"}, {"name": "文档测试2", "code": "01010003"}, {"name": "路飞的帽子", "code": "06010001"}, {"name": "沪溪河", "code": "02010001"}, {"name": "鲍师傅", "code": "01010002"}]}]}}], "growthTo": "50"}
这个文档数据结构中包含一个比较复杂的嵌套JSON数据:
所以本文最主要就是介绍如何解析复杂monggo中复杂嵌套JSON数据
flink SQL
create table members(
_id bigint,
_class string,
createdAt date,
createdBy string,
enable int,
grade int,
gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN,multiple STRING,productScopeList ARRAY< ROW<scopeType string,isIncluding BOOLEAN ,elementList ARRAY<ROW<name STRING ,code STRING>>>>> ,postageRight ROW<consumeAmount STRING,postageType int,discountPostageAmt STRING>>>,
growth String,
lastUpdatedAt date,
name string,
tenantId string,
updateBy string,
PRIMARY KEY(_id) NOT ENFORCED
)with(
'connector' = 'mongodb-cdc',
'hosts' = '10.150.20.12:27017',
'username' = 'readonly',
'password' = 'y5Gi2BjbK3',
'connection.options'='replicaSet=retailrs&connectTimeoutMS=3000',
'database' = 'member_db',
'collection' = 'm_grade_setting'
)
gradeRightList 字段就是嵌套JSON复杂的字段,如果有一样的复杂JSON嵌套,可以参考对应的解析,应该可以包括所有了。
gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN,multiple STRING,productScopeList ARRAY< ROW<scopeType string,isIncluding BOOLEAN ,elementList ARRAY<ROW<name STRING ,code STRING>>>>> ,postageRight ROW<consumeAmount STRING,postageType int,discountPostageAmt STRING>>>,
对应的查询解析:
tableEnv.executeSql(mongoDBKafaSql)tableEnv.sqlQuery("select _id,gradeRightList[2].creditMultRight,gradeRightList[1].postageRight from members").execute().print()
当然也可以参考:Flink sql 对 array map ROW的使用和解析
完整的SQL 代码:
object FlinkMongoConnect {def main(args: Array[String]): Unit = {var env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentvar tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)var mongoDBKafaSql:String ="""|create table members(|_id bigint,|_class string,|createdAt date,|createdBy string,|enable int,|grade int,|gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN,multiple STRING,productScopeList ARRAY< ROW<scopeType string,isIncluding BOOLEAN ,elementList ARRAY<ROW<name STRING ,code STRING>>>>> ,postageRight ROW<consumeAmount STRING,postageType int,discountPostageAmt STRING>>>,|growth String,|lastUpdatedAt date,|name string,|tenantId string,|updateBy string,|PRIMARY KEY(_id) NOT ENFORCED|)with(|'connector' = 'mongodb-cdc',|'hosts' = '10.150.20.12:27017',|'username' = 'readonly',|'password' = 'y5Gi2BjbK3',|'connection.options'='replicaSet=retailrs&connectTimeoutMS=3000',|'database' = 'member_db',|'collection' = 'm_grade_setting'|)""".stripMargin/*** sql方式 375756968729522176*/tableEnv.executeSql(mongoDBKafaSql)tableEnv.sqlQuery("select _id,gradeRightList[2].creditMultRight,gradeRightList[1].postageRight from members").execute().print()env.execute()}
}
Flink 代码方式:
这种方式不多做介绍,代码方式处理JSON太容易了...
object FlinkMongoConnect {def main(args: Array[String]): Unit = {var env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentvar tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)var value: SourceFunction[String] = MongoDBSource.builder().hosts("10.150.20.12:27017").username("readonly").password("y5Gi2BjbK3").databaseList("member_db").collectionList("member_db.m_grade_setting").copyExisting(true).deserializer(new JsonDebeziumDeserializationSchema()).build()env.addSource(value).print().setParallelism(1)env.execute()}
}
参考官方文章:
1.CDC
本文主要就是介绍了MongoDB CDC的使用,后续会介绍flink SQL方式写入MongoDB,官方还没有很好地支持这一点,也是需要修改一些代码实现这一功能。更多原理精彩文章可以关注《迪答》公众号
Flink CDC mongoDB 使用及Flink sql解析monggo中复杂嵌套JSON数据实现相关推荐
- java json 嵌套解析_我们如何解析Java中的嵌套JSON对象?
该JSON是一种轻量级的,基于文本和语言无关的数据交换格式.JSON可以表示两种结构化类型,如对象和数组.甲JSONArray可以从一个字符串解析文本以产生向量样的对象.我们可以使用JSONArray ...
- Flink CDC 系列(3)—— Flink CDC MySQL Connector 与 Flink SQL 的结合使用案例Demo
Flink CDC 系列文章: <Flink CDC 系列(1)-- 什么是 Flink CDC> <Flink CDC 系列(2)-- Flink CDC 源码编译> < ...
- SQL Server 2008中新增的变更数据捕获(CDC)和更改跟踪
SQL Server 2008中新增的变更数据捕获(CDC)和更改跟踪 SQL Server 2008中SQL应用系列--目录索引 本文主要介绍SQL Server中记录数据变更的四个方法:触发器.O ...
- 在SQL Server 2017中使用Python进行数据插值和转换
As a continuation to my previous article, How to use Python in SQL Server 2017 to obtain advanced da ...
- Beanshell解析json-解析简单复杂json数据
Beanshell解析json-解析简单&复杂json数据 1.概述 如果在beanshell中解析json数据,那么这篇文章你get到了重点.不仅给出了解决方案,同时还理清了解决的思路. 2 ...
- Android解析中国天气网的Json数据
在Android开发中.一般的APP都是通过获取server端的数据来更新UI.从server获取到的数据能够是Json.它的数据量要比XML要小,这里解析中国天气网上获取的数据,尽管已经不再更新了. ...
- Flink CDC MongoDB Connector 的实现原理和使用实践
本文整理自 XTransfer 资深 Java 开发工程师.Flink CDC Maintainer 孙家宝在 Flink CDC Meetup 的演讲.主要内容包括: MongoDB Change ...
- Flink CDC 2.0 数据处理流程全面解析
点击上方蓝色字体,选择"设为星标" 回复"面试"获取更多惊喜 8月份 FlinkCDC 发布2.0.0版本,相较于1.0版本,在全量读取阶段支持分布式读取.支持 ...
- Flink CDC入门实践--基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL
文章目录 前言 1.环境准备 2.准备数据 2.1 MySQL 2.2 postgres 3.启动flink和flink sql client 3.1启动flink 3.2启动flink SQL cl ...
最新文章
- linux 利用yum源安装mysql5.7
- 【原】Coursera—Andrew Ng机器学习—课程笔记 Lecture 16—Recommender Systems 推荐系统...
- 简单实用一分钟上手级权限控制
- P3301 [SDOI2013]方程
- DB2查询主键、索引、表约束
- std::list 循环删除指针_数据结构_006_线性表_循环链表
- 图邻接表拓扑排序算法c语言完整,在用邻接表表示图时,拓扑排序算法时间复杂度为()...
- java new 关键字到底做了什么?
- 51单片机——多文件的建立
- 如何查看Tomcat版本信息
- 李宏毅机器学习Regression
- 人工智能:卷积神经网络及YOLO算法 入门详解与综述(二)
- c语言json数据转换成字符串,C语言将字符串转json
- python模拟ios点击_使用Xcode + Python进行IOS运动轨迹模拟
- 阿里云商标注册购物车功能怎么用?在哪查看?
- 腾讯云对象储存-图片上传-删除图片
- oracle软件工程,.Net软件工程师学用Oracle系列(9):系统函数(上)
- 向Linux增加一个系统调用或内核模块
- Grandmaster 楼教主回忆录
- source insight如何设置背景
热门文章
- 反反爬须知:AES加密和解密
- 第十一届中国国际软件质量工程(iSQE)峰会在重庆顺利落幕
- 【观察】华为:金融数字化转型再提速,移动先行筑牢转型底座
- python制作购物网站开题报告_购物网站的设计与实现开题报告
- 开题报告html5游戏,开题报告基于html5的音乐网站
- js删除对象中的元素
- java 木琴_我的世界:Java19w09a快照发布,堪称演奏家的声音盛宴
- win10php测试,window_Win10对决Win8:测试表明两者相比没有性能优势,目前,要搞清楚Windows 10性能相 - phpStudy...
- 终于有阿里p8进行了大汇总(Redis+JVM+MySQL+Spring)还有面试题解全在这里了!
- 单词加ed 以及 es 的规则及发音