Flink链接kafka并解析Json文件(三)
Flink解析Kafka中的Json数据
公司的JSON数据格式:
{
"data":[
{
"user_id":"",
"role":"teacher",
"stage":"after",
"fullname":"徐朝晖",
"id_card":null,
"sno":"",
"gender":"male",
"phone":"",
"department_code":"",
"department_name":"",
"guarder_name":null,
"guarder_phone":null,
"section_code":null,
"section_name":null,
"grade_name":null,
"grade_code":null,
"class_code":null,
"class_name":null,
"class_teacher_name":null,
"class_teacher_jobno":"",
"class_teacher_phone":null,
"register_province_code":"",
"register_city_code":"",
"register_county_code":"",
"register_province_name":"河南省",
"register_city_name":"",
"register_county_name":"",
"zj_town_code":"",
"zj_town_name":"陶朱街道",
"zj_village_code":null,
"zj_village_name":null,
"zj_address":"江山村",
"token":"",
"comment":"徐朝晖徐朝晖徐朝晖",
"device":"mobile",
"add_time":"2021-01-20 10:20:21",
"update_time":"2021-01-20 10:20:21",
"deleted":"0"
}
],
"database":"",
"es":,
"id":14011,
"isDdl":false,
"mysqlType":{
"user_id":"bigint(21) unsigned",
"role":"varchar(16)",
"stage":"varchar(12)",
"fullname":"varchar(32)",
"id_card":"varchar(32)",
"sno":"varchar(50)",
"gender":"varchar(12)",
"phone":"varchar(16)",
"department_code":"varchar(32)",
"department_name":"varchar(64)",
"guarder_name":"varchar(32)",
"guarder_phone":"varchar(16)",
"section_code":"varchar(32)",
"section_name":"varchar(64)",
"grade_name":"varchar(32)",
"grade_code":"varchar(32)",
"class_code":"varchar(32)",
"class_name":"varchar(32)",
"class_teacher_name":"varchar(32)",
"class_teacher_jobno":"varchar(32)",
"class_teacher_phone":"varchar(16)",
"register_province_code":"varchar(16)",
"register_city_code":"varchar(16)",
"register_county_code":"varchar(16)",
"register_province_name":"varchar(32)",
"register_city_name":"varchar(32)",
"register_county_name":"varchar(32)",
"zj_town_code":"varchar(32)",
"zj_town_name":"varchar(32)",
"zj_village_code":"varchar(32)",
"zj_village_name":"varchar(64)",
"zj_address":"text",
"token":"varchar(64)",
"comment":"text",
"device":"varchar(8)",
"add_time":"datetime",
"update_time":"timestamp",
"deleted":"tinyint(1)"
},
"old":null,
"pkNames":[
"user_id"
],
"sql":"",
"sqlType":{
"user_id":-5,
"role":12,
"stage":12,
"fullname":12,
"id_card":12,
"sno":12,
"gender":12,
"phone":12,
"department_code":12,
"department_name":12,
"guarder_name":12,
"guarder_phone":12,
"section_code":12,
"section_name":12,
"grade_name":12,
"grade_code":12,
"class_code":12,
"class_name":12,
"class_teacher_name":12,
"class_teacher_jobno":12,
"class_teacher_phone":12,
"register_province_code":12,
"register_city_code":12,
"register_county_code":12,
"register_province_name":12,
"register_city_name":12,
"register_county_name":12,
"zj_town_code":12,
"zj_town_name":12,
"zj_village_code":12,
"zj_village_name":12,
"zj_address":2005,
"token":12,
"comment":2005,
"device":12,
"add_time":93,
"update_time":93,
"deleted":-7
},
"table":"",
"ts":,
"type":"INSERT"
}
//没意外的的疫情填报操作与eceb_epidemic_report_after表字段没变化
{
"data":[
{
"report_id":"",
"user_id":"",
"rework_status":"work",
"physical":"normal",
"physical_des":"",
"temperature_status":"normal",
"temperature_count":"",
"isolate_site":"",
"isolate_desc":"",
"isolate_reason":"",
"inhome_reason":"",
"inhome_desc":"",
"hospital":"",
"sick_desc":"",
"is_went_areas":"no",
"went_time":"",
"went_address":"",
"went_active":"",
"add_time":"2021-01-20 08:00:00",
"update_time":"2021-01-20 09:09:49",
"deleted":null,
"journey_desc":null
}
],
"database":"",
"es":1611104989000,
"id":12191,
"isDdl":false,
"mysqlType":{
"report_id":"bigint(21) unsigned",
"user_id":"bigint(21)",
"rework_status":"varchar(12)",
"physical":"char(10)",
"physical_des":"text",
"temperature_status":"varchar(12)",
"temperature_count":"varchar(16)",
"isolate_site":"tinytext",
"isolate_desc":"tinytext",
"isolate_reason":"varchar(64)",
"inhome_reason":"varchar(12)",
"inhome_desc":"tinytext",
"hospital":"varchar(64)",
"sick_desc":"tinytext",
"is_went_areas":"char(6)",
"went_time":"varchar(255)",
"went_address":"varchar(128)",
"went_active":"tinytext",
"add_time":"datetime",
"update_time":"timestamp",
"deleted":"tinyint(1)",
"journey_desc":"varchar(600)"
},
"old":null,
"pkNames":[
"report_id"
],
"sql":"",
"sqlType":{
"report_id":-5,
"user_id":-5,
"rework_status":12,
"physical":1,
"physical_des":2005,
"temperature_status":12,
"temperature_count":12,
"isolate_site":2005,
"isolate_desc":2005,
"isolate_reason":12,
"inhome_reason":12,
"inhome_desc":2005,
"hospital":12,
"sick_desc":2005,
"is_went_areas":1,
"went_time":12,
"went_address":12,
"went_active":2005,
"add_time":93,
"update_time":93,
"deleted":-6,
"journey_desc":12
},
"table":"",
"ts":1611104945190,
"type":"INSERT"
}
pom依赖:
<!-- 解析json字符串 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency>
逻辑代码:
class MyMapFunction extends MapFunction[String,String]{@throws[Exception]override def map(value: String): String = {//解析Json字符串if (value.contains("eceb_epidemic_user")){val str1: String = JSON.toJSONString(value, SerializerFeature.EMPTY: _*)users = JSON.parseObject(str1,classOf[epidemicuser])import scala.collection.JavaConversions._println(users.data(users.data.toList.size - 1))JSON.parseArray(value).getJSONObject(0)if (users.`type`.contains("INSERT")){Number += 1}else if (users.`type`.contains("DELETE")){Number -= 1}(users.`type`,Number)}str} }
报错信息:
Caused by: com.alibaba.fastjson.JSONException: syntax error,except start with { or [,but actually start with string
正确方式:
1.Json => Object
val result = kafkaDS.map(value => {import com.alibaba.fastjson.JSONObjectval nObject: JSONObject = JSON.parseObject(value)
2. Object转换为Array并取出data数组
val array: JSONArray = nObject.getJSONArray("data") epidemicUsers = array.toJavaList(classOf[epidemicuser]) epidemicUserTemp = epidemicUsers.get(0)
3.操作data的数据
val temp1 = JSON.toJSON(epidemicUserTemp).toString value1 = JSON.parseObject(temp1,classOf[epidemicuser])
Flink链接kafka并解析Json文件(三)相关推荐
- 解析json文件、执行批量修改sql
要求:解析json文件,取出其中的参数,修改数据库中的数据 数据量:190万条 使用线程池批量处理sql 1.线程: public class DateHandleThread extends Thr ...
- 使用C/C++解析json文件
目录 为什么? 怎么做? 为什么? 举个例子,我们在使用C/C++进行深度学习模型的测试,由于测试过程中可能有许多参数要传给model,比如在进行目标检测时,要传入nms阈值等.我们要最优化测试结果, ...
- Java性能优化:正确的解析JSON文件
为什么80%的码农都做不了架构师?>>> 数据收集服务平均1小时OOM(java.lang.OutOfMemoryError: GC overhead limit exceed ...
- python解析json_python解析json文件
概念 序列化(Serialization):将对象的状态信息转换为可以存储或可以通过网络传输的过程,传输的格式可以是JSON.XML等.反序列化就是从存储区域(JSON,XML)读取反序列化对象的状态 ...
- 如何使用PHP解析JSON文件? [重复]
本文翻译自:How can I parse a JSON file with PHP? [duplicate] This question already has an answer here: 这个 ...
- java解析json文件_Java性能优化:正确的解析JSON文件
数据收集服务平均1小时OOM(java.lang.OutOfMemoryError: GC overhead limit exceeded)一次,发现都是在下载处理 JSON Atom Feed时OO ...
- Python解析json文件
Python解析json文件 实现代码 import json import sysstdout = sys.stdoutwith open("company.json", &qu ...
- 关于 pandas 解析 json 文件和其他类型文件的结果中日期格式数据类型不一致的问题
问题: 我有两个文件,一个 .csv 文件和一个 .json 文件,数据截图分别如下: 我的目的是解析这些文件,并将结果统一交由下一个程序块进行处理. 在了解到 pandas 可以解析数据文件(csv ...
- 解析json文件的Go依赖包
上一篇golang读取json配置文件介绍了使用encoding/json包来解析json文件,但是这种方法在面对结构复杂.字段较多的情况时,解析效率不是很高.上一篇中,我们在解析json文件时,需要 ...
- Android--------使用gson解析json文件
##使用gson解析json文件 **json的格式有两种:** **1. {}类型,及数据用{}包含:** **2. []类型,即数据用[]包含:** 下面用个例子,简单的介绍gson如何解析jso ...
最新文章
- 入华十年,一家互联网外企的“另类”视频广告模式
- php -- 检查是否存在
- QT的QMaterial类的使用
- 【LeetCode笔记】19.删除链表的倒数第N个结点(Java、快慢指针)
- modelsim10.1a安装破解说明
- 解锁树莓派root账号
- 禁止选择文字和文本超出显示省略号
- 文本框的值默认显示文本域上_13.4.4 键盘与文本事件
- git -- 忽略某个文件
- 关于水晶报表打包的一些注意的地方!
- 方差分析软件_手把手教你用Graphpad做单因素方差分析
- 【光纤传输特性】图文并茂,你该了解这些
- JQ ajax 请求事件处理
- 程序猿来找找自己的目标
- ArcMap制作TPK文件
- 1883:北京旅行日记1276695923新浪博客
- ffmpeg Intel硬件加速总结
- 萨班斯法案:由来、影响及争论
- JAVA计算机毕业设计学术会议信息网站Mybatis+源码+数据库+lw文档+系统+调试部署
- 1.Docker学习之基础知识
热门文章
- STM32通过SPI读W25Q64的ID时钟分析
- 学习手机软件开发学什么,怎样学?
- 蓝桥杯 算法训练 跳马
- mysql 实例结构体_C语言结构体实例-创建兔子
- 2007年浙江高考满分作文——行走在消逝中
- vs2015 linux开发 界面设计,Microsoft Office开发工具 Visual Studio 2015
- COMMAND NOT SUPPORTED 解决方法
- The Elder(hdu 5956 树上斜率dp + 队列还原)
- 基于python学生档案管理系统的设计与实现.rar(毕业论文+程序源码+答辩PPT)
- 激活win10企业长期服务版