需要实时采集MongoDB中的数据,所以考虑使用flink cdc mongodb,在flink cdc2.1版本后也支持了MongoDB的数据采集,是通过oplog.

MongoDB中的存储数据的文档结构(JSON):

{"_id": {"$numberLong": "375756968729522176"}, "tenantId": "1000000001", "grade": 1, "name": "VIP1", "growth": "0", "enable": true, "createdAt": {"$date": 1623849842573}, "createdBy": "system", "lastUpdatedAt": {"$date": 1656424909977}, "_class": "com.xiaoshouyi.member.service.document.GradeSetting", "updatedBy": "707", "gradeRightList": [{"gradeRightType": "POSTAGE_DISCOUNT", "postageRight": {"consumeAmount": "10", "postageType": 1, "discountPostageAmt": "10"}}, {"gradeRightType": "MULT_CREDIT", "creditMultRight": {"isMult": true, "multiple": "12", "productScopeList": [{"scopeType": "0", "isIncluding": true, "elementList": [{"name": "Fancl防晒霜", "code": "C-3002"}, {"name": "沈欣牌特大龙虾500g(±3g)", "code": "S-05030001-500"}, {"name": "测试PLU拦截01", "code": "01010005"}, {"name": "测试PLU拦截的", "code": "01010004"}, {"name": "文档测试2", "code": "01010003"}, {"name": "路飞的帽子", "code": "06010001"}, {"name": "沪溪河", "code": "02010001"}, {"name": "鲍师傅", "code": "01010002"}]}]}}], "growthTo": "50"}

这个文档数据结构中包含一个比较复杂的嵌套JSON数据:

所以本文最主要就是介绍如何解析复杂monggo中复杂嵌套JSON数据

flink SQL

create table members(
_id bigint,
_class string,
createdAt date,
createdBy string,
enable int,
grade int,
gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN,multiple STRING,productScopeList ARRAY< ROW<scopeType string,isIncluding BOOLEAN ,elementList ARRAY<ROW<name STRING ,code STRING>>>>> ,postageRight ROW<consumeAmount STRING,postageType int,discountPostageAmt STRING>>>,
growth String,
lastUpdatedAt date,
name string,
tenantId string,
updateBy string,
PRIMARY KEY(_id) NOT ENFORCED
)with(
'connector' = 'mongodb-cdc',
'hosts' = '10.150.20.12:27017',
'username' = 'readonly',
'password' = 'y5Gi2BjbK3',
'connection.options'='replicaSet=retailrs&connectTimeoutMS=3000',
'database' = 'member_db',
'collection' = 'm_grade_setting'
)

gradeRightList 字段就是嵌套JSON复杂的字段,如果有一样的复杂JSON嵌套,可以参考对应的解析,应该可以包括所有了。

gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN,multiple STRING,productScopeList ARRAY< ROW<scopeType string,isIncluding BOOLEAN ,elementList ARRAY<ROW<name STRING ,code STRING>>>>> ,postageRight ROW<consumeAmount STRING,postageType int,discountPostageAmt STRING>>>,

对应的查询解析:

 tableEnv.executeSql(mongoDBKafaSql)tableEnv.sqlQuery("select _id,gradeRightList[2].creditMultRight,gradeRightList[1].postageRight from members").execute().print()

当然也可以参考:Flink sql 对 array map ROW的使用和解析

完整的SQL 代码:

object FlinkMongoConnect {def main(args: Array[String]): Unit = {var env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentvar tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)var mongoDBKafaSql:String ="""|create table members(|_id bigint,|_class string,|createdAt date,|createdBy string,|enable int,|grade int,|gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN,multiple STRING,productScopeList ARRAY< ROW<scopeType string,isIncluding BOOLEAN ,elementList ARRAY<ROW<name STRING ,code STRING>>>>> ,postageRight ROW<consumeAmount STRING,postageType int,discountPostageAmt STRING>>>,|growth String,|lastUpdatedAt date,|name string,|tenantId string,|updateBy string,|PRIMARY KEY(_id) NOT ENFORCED|)with(|'connector' = 'mongodb-cdc',|'hosts' = '10.150.20.12:27017',|'username' = 'readonly',|'password' = 'y5Gi2BjbK3',|'connection.options'='replicaSet=retailrs&connectTimeoutMS=3000',|'database' = 'member_db',|'collection' = 'm_grade_setting'|)""".stripMargin/*** sql方式 375756968729522176*/tableEnv.executeSql(mongoDBKafaSql)tableEnv.sqlQuery("select _id,gradeRightList[2].creditMultRight,gradeRightList[1].postageRight from members").execute().print()env.execute()}
}

Flink 代码方式:

这种方式不多做介绍,代码方式处理JSON太容易了...

object FlinkMongoConnect {def main(args: Array[String]): Unit = {var env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentvar tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)var value: SourceFunction[String] = MongoDBSource.builder().hosts("10.150.20.12:27017").username("readonly").password("y5Gi2BjbK3").databaseList("member_db").collectionList("member_db.m_grade_setting").copyExisting(true).deserializer(new JsonDebeziumDeserializationSchema()).build()env.addSource(value).print().setParallelism(1)env.execute()}
}

参考官方文章:

1.CDC

本文主要就是介绍了MongoDB CDC的使用,后续会介绍flink SQL方式写入MongoDB,官方还没有很好地支持这一点,也是需要修改一些代码实现这一功能。更多原理精彩文章可以关注《迪答》公众号

Flink CDC mongoDB 使用及Flink sql解析monggo中复杂嵌套JSON数据实现相关推荐

  1. java json 嵌套解析_我们如何解析Java中的嵌套JSON对象?

    该JSON是一种轻量级的,基于文本和语言无关的数据交换格式.JSON可以表示两种结构化类型,如对象和数组.甲JSONArray可以从一个字符串解析文本以产生向量样的对象.我们可以使用JSONArray ...

  2. Flink CDC 系列(3)—— Flink CDC MySQL Connector 与 Flink SQL 的结合使用案例Demo

    Flink CDC 系列文章: <Flink CDC 系列(1)-- 什么是 Flink CDC> <Flink CDC 系列(2)-- Flink CDC 源码编译> < ...

  3. SQL Server 2008中新增的变更数据捕获(CDC)和更改跟踪

    SQL Server 2008中新增的变更数据捕获(CDC)和更改跟踪 SQL Server 2008中SQL应用系列--目录索引 本文主要介绍SQL Server中记录数据变更的四个方法:触发器.O ...

  4. 在SQL Server 2017中使用Python进行数据插值和转换

    As a continuation to my previous article, How to use Python in SQL Server 2017 to obtain advanced da ...

  5. Beanshell解析json-解析简单复杂json数据

    Beanshell解析json-解析简单&复杂json数据 1.概述 如果在beanshell中解析json数据,那么这篇文章你get到了重点.不仅给出了解决方案,同时还理清了解决的思路. 2 ...

  6. Android解析中国天气网的Json数据

    在Android开发中.一般的APP都是通过获取server端的数据来更新UI.从server获取到的数据能够是Json.它的数据量要比XML要小,这里解析中国天气网上获取的数据,尽管已经不再更新了. ...

  7. Flink CDC MongoDB Connector 的实现原理和使用实践

    本文整理自 XTransfer 资深 Java 开发工程师.Flink CDC Maintainer 孙家宝在 Flink CDC Meetup 的演讲.主要内容包括: MongoDB Change ...

  8. Flink CDC 2.0 数据处理流程全面解析

    点击上方蓝色字体,选择"设为星标" 回复"面试"获取更多惊喜 8月份 FlinkCDC 发布2.0.0版本,相较于1.0版本,在全量读取阶段支持分布式读取.支持 ...

  9. Flink CDC入门实践--基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL

    文章目录 前言 1.环境准备 2.准备数据 2.1 MySQL 2.2 postgres 3.启动flink和flink sql client 3.1启动flink 3.2启动flink SQL cl ...

最新文章

  1. linux 利用yum源安装mysql5.7
  2. 【原】Coursera—Andrew Ng机器学习—课程笔记 Lecture 16—Recommender Systems 推荐系统...
  3. 简单实用一分钟上手级权限控制
  4. P3301 [SDOI2013]方程
  5. DB2查询主键、索引、表约束
  6. std::list 循环删除指针_数据结构_006_线性表_循环链表
  7. 图邻接表拓扑排序算法c语言完整,在用邻接表表示图时,拓扑排序算法时间复杂度为()...
  8. java new 关键字到底做了什么?
  9. 51单片机——多文件的建立
  10. 如何查看Tomcat版本信息
  11. 李宏毅机器学习Regression
  12. 人工智能:卷积神经网络及YOLO算法 入门详解与综述(二)
  13. c语言json数据转换成字符串,C语言将字符串转json
  14. python模拟ios点击_使用Xcode + Python进行IOS运动轨迹模拟
  15. 阿里云商标注册购物车功能怎么用?在哪查看?
  16. 腾讯云对象储存-图片上传-删除图片
  17. oracle软件工程,.Net软件工程师学用Oracle系列(9):系统函数(上)
  18. 向Linux增加一个系统调用或内核模块
  19. Grandmaster 楼教主回忆录
  20. source insight如何设置背景

热门文章

  1. 反反爬须知:AES加密和解密
  2. 第十一届中国国际软件质量工程(iSQE)峰会在重庆顺利落幕
  3. 【观察】华为:金融数字化转型再提速,移动先行筑牢转型底座
  4. python制作购物网站开题报告_购物网站的设计与实现开题报告
  5. 开题报告html5游戏,开题报告基于html5的音乐网站
  6. js删除对象中的元素
  7. java 木琴_我的世界:Java19w09a快照发布,堪称演奏家的声音盛宴
  8. win10php测试,window_Win10对决Win8:测试表明两者相比没有性能优势,目前,要搞清楚Windows 10性能相 - phpStudy...
  9. 终于有阿里p8进行了大汇总(Redis+JVM+MySQL+Spring)还有面试题解全在这里了!
  10. 单词加ed 以及 es 的规则及发音