Flink解析Kafka中的Json数据

公司的JSON数据格式:

{
    "data":[
        {
            "user_id":"",
            "role":"teacher",
            "stage":"after",
            "fullname":"徐朝晖",
            "id_card":null,
            "sno":"",
            "gender":"male",
            "phone":"",
            "department_code":"",
            "department_name":"",
            "guarder_name":null,
            "guarder_phone":null,
            "section_code":null,
            "section_name":null,
            "grade_name":null,
            "grade_code":null,
            "class_code":null,
            "class_name":null,
            "class_teacher_name":null,
            "class_teacher_jobno":"",
            "class_teacher_phone":null,
            "register_province_code":"",
            "register_city_code":"",
            "register_county_code":"",
            "register_province_name":"河南省",
            "register_city_name":"",
            "register_county_name":"",
            "zj_town_code":"",
            "zj_town_name":"陶朱街道",
            "zj_village_code":null,
            "zj_village_name":null,
            "zj_address":"江山村",
            "token":"",
            "comment":"徐朝晖徐朝晖徐朝晖",
            "device":"mobile",
            "add_time":"2021-01-20 10:20:21",
            "update_time":"2021-01-20 10:20:21",
            "deleted":"0"
        }
    ],
    "database":"",
    "es":,
    "id":14011,
    "isDdl":false,
    "mysqlType":{
        "user_id":"bigint(21) unsigned",
        "role":"varchar(16)",
        "stage":"varchar(12)",
        "fullname":"varchar(32)",
        "id_card":"varchar(32)",
        "sno":"varchar(50)",
        "gender":"varchar(12)",
        "phone":"varchar(16)",
        "department_code":"varchar(32)",
        "department_name":"varchar(64)",
        "guarder_name":"varchar(32)",
        "guarder_phone":"varchar(16)",
        "section_code":"varchar(32)",
        "section_name":"varchar(64)",
        "grade_name":"varchar(32)",
        "grade_code":"varchar(32)",
        "class_code":"varchar(32)",
        "class_name":"varchar(32)",
        "class_teacher_name":"varchar(32)",
        "class_teacher_jobno":"varchar(32)",
        "class_teacher_phone":"varchar(16)",
        "register_province_code":"varchar(16)",
        "register_city_code":"varchar(16)",
        "register_county_code":"varchar(16)",
        "register_province_name":"varchar(32)",
        "register_city_name":"varchar(32)",
        "register_county_name":"varchar(32)",
        "zj_town_code":"varchar(32)",
        "zj_town_name":"varchar(32)",
        "zj_village_code":"varchar(32)",
        "zj_village_name":"varchar(64)",
        "zj_address":"text",
        "token":"varchar(64)",
        "comment":"text",
        "device":"varchar(8)",
        "add_time":"datetime",
        "update_time":"timestamp",
        "deleted":"tinyint(1)"
    },
    "old":null,
    "pkNames":[
        "user_id"
    ],
    "sql":"",
    "sqlType":{
        "user_id":-5,
        "role":12,
        "stage":12,
        "fullname":12,
        "id_card":12,
        "sno":12,
        "gender":12,
        "phone":12,
        "department_code":12,
        "department_name":12,
        "guarder_name":12,
        "guarder_phone":12,
        "section_code":12,
        "section_name":12,
        "grade_name":12,
        "grade_code":12,
        "class_code":12,
        "class_name":12,
        "class_teacher_name":12,
        "class_teacher_jobno":12,
        "class_teacher_phone":12,
        "register_province_code":12,
        "register_city_code":12,
        "register_county_code":12,
        "register_province_name":12,
        "register_city_name":12,
        "register_county_name":12,
        "zj_town_code":12,
        "zj_town_name":12,
        "zj_village_code":12,
        "zj_village_name":12,
        "zj_address":2005,
        "token":12,
        "comment":2005,
        "device":12,
        "add_time":93,
        "update_time":93,
        "deleted":-7
    },
    "table":"",
    "ts":,
    "type":"INSERT"
}

//没意外的的疫情填报操作与eceb_epidemic_report_after表字段没变化
{
    "data":[
        {
            "report_id":"",
            "user_id":"",
            "rework_status":"work",
            "physical":"normal",
            "physical_des":"",
            "temperature_status":"normal",
            "temperature_count":"",
            "isolate_site":"",
            "isolate_desc":"",
            "isolate_reason":"",
            "inhome_reason":"",
            "inhome_desc":"",
            "hospital":"",
            "sick_desc":"",
            "is_went_areas":"no",
            "went_time":"",
            "went_address":"",
            "went_active":"",
            "add_time":"2021-01-20 08:00:00",
            "update_time":"2021-01-20 09:09:49",
            "deleted":null,
            "journey_desc":null
        }
    ],
    "database":"",
    "es":1611104989000,
    "id":12191,
    "isDdl":false,
    "mysqlType":{
        "report_id":"bigint(21) unsigned",
        "user_id":"bigint(21)",
        "rework_status":"varchar(12)",
        "physical":"char(10)",
        "physical_des":"text",
        "temperature_status":"varchar(12)",
        "temperature_count":"varchar(16)",
        "isolate_site":"tinytext",
        "isolate_desc":"tinytext",
        "isolate_reason":"varchar(64)",
        "inhome_reason":"varchar(12)",
        "inhome_desc":"tinytext",
        "hospital":"varchar(64)",
        "sick_desc":"tinytext",
        "is_went_areas":"char(6)",
        "went_time":"varchar(255)",
        "went_address":"varchar(128)",
        "went_active":"tinytext",
        "add_time":"datetime",
        "update_time":"timestamp",
        "deleted":"tinyint(1)",
        "journey_desc":"varchar(600)"
    },
    "old":null,
    "pkNames":[
        "report_id"
    ],
    "sql":"",
    "sqlType":{
        "report_id":-5,
        "user_id":-5,
        "rework_status":12,
        "physical":1,
        "physical_des":2005,
        "temperature_status":12,
        "temperature_count":12,
        "isolate_site":2005,
        "isolate_desc":2005,
        "isolate_reason":12,
        "inhome_reason":12,
        "inhome_desc":2005,
        "hospital":12,
        "sick_desc":2005,
        "is_went_areas":1,
        "went_time":12,
        "went_address":12,
        "went_active":2005,
        "add_time":93,
        "update_time":93,
        "deleted":-6,
        "journey_desc":12
    },
    "table":"",
    "ts":1611104945190,
    "type":"INSERT"
}

pom依赖:

    <!-- 解析json字符串 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency>

逻辑代码:

class MyMapFunction extends MapFunction[String,String]{@throws[Exception]override def map(value: String): String = {//解析Json字符串if (value.contains("eceb_epidemic_user")){val str1: String = JSON.toJSONString(value, SerializerFeature.EMPTY: _*)users = JSON.parseObject(str1,classOf[epidemicuser])import scala.collection.JavaConversions._println(users.data(users.data.toList.size - 1))JSON.parseArray(value).getJSONObject(0)if (users.`type`.contains("INSERT")){Number += 1}else if (users.`type`.contains("DELETE")){Number -= 1}(users.`type`,Number)}str}
}

报错信息:

Caused by: com.alibaba.fastjson.JSONException: syntax error,except start with { or [,but actually start with string

正确方式:

1.Json => Object

val result = kafkaDS.map(value => {import com.alibaba.fastjson.JSONObjectval nObject: JSONObject = JSON.parseObject(value)

2. Object转换为Array并取出data数组

val array: JSONArray = nObject.getJSONArray("data")
epidemicUsers = array.toJavaList(classOf[epidemicuser])
epidemicUserTemp = epidemicUsers.get(0)

3.操作data的数据

val temp1 = JSON.toJSON(epidemicUserTemp).toString
value1 = JSON.parseObject(temp1,classOf[epidemicuser])

Flink链接kafka并解析Json文件(三)相关推荐

  1. 解析json文件、执行批量修改sql

    要求:解析json文件,取出其中的参数,修改数据库中的数据 数据量:190万条 使用线程池批量处理sql 1.线程: public class DateHandleThread extends Thr ...

  2. 使用C/C++解析json文件

    目录 为什么? 怎么做? 为什么? 举个例子,我们在使用C/C++进行深度学习模型的测试,由于测试过程中可能有许多参数要传给model,比如在进行目标检测时,要传入nms阈值等.我们要最优化测试结果, ...

  3. Java性能优化:正确的解析JSON文件

    为什么80%的码农都做不了架构师?>>>    数据收集服务平均1小时OOM(java.lang.OutOfMemoryError: GC overhead limit exceed ...

  4. python解析json_python解析json文件

    概念 序列化(Serialization):将对象的状态信息转换为可以存储或可以通过网络传输的过程,传输的格式可以是JSON.XML等.反序列化就是从存储区域(JSON,XML)读取反序列化对象的状态 ...

  5. 如何使用PHP解析JSON文件? [重复]

    本文翻译自:How can I parse a JSON file with PHP? [duplicate] This question already has an answer here: 这个 ...

  6. java解析json文件_Java性能优化:正确的解析JSON文件

    数据收集服务平均1小时OOM(java.lang.OutOfMemoryError: GC overhead limit exceeded)一次,发现都是在下载处理 JSON Atom Feed时OO ...

  7. Python解析json文件

    Python解析json文件 实现代码 import json import sysstdout = sys.stdoutwith open("company.json", &qu ...

  8. 关于 pandas 解析 json 文件和其他类型文件的结果中日期格式数据类型不一致的问题

    问题: 我有两个文件,一个 .csv 文件和一个 .json 文件,数据截图分别如下: 我的目的是解析这些文件,并将结果统一交由下一个程序块进行处理. 在了解到 pandas 可以解析数据文件(csv ...

  9. 解析json文件的Go依赖包

    上一篇golang读取json配置文件介绍了使用encoding/json包来解析json文件,但是这种方法在面对结构复杂.字段较多的情况时,解析效率不是很高.上一篇中,我们在解析json文件时,需要 ...

  10. Android--------使用gson解析json文件

    ##使用gson解析json文件 **json的格式有两种:** **1. {}类型,及数据用{}包含:** **2. []类型,即数据用[]包含:** 下面用个例子,简单的介绍gson如何解析jso ...

最新文章

  1. 入华十年,一家互联网外企的“另类”视频广告模式
  2. php -- 检查是否存在
  3. QT的QMaterial类的使用
  4. 【LeetCode笔记】19.删除链表的倒数第N个结点(Java、快慢指针)
  5. modelsim10.1a安装破解说明
  6. 解锁树莓派root账号
  7. 禁止选择文字和文本超出显示省略号
  8. 文本框的值默认显示文本域上_13.4.4 键盘与文本事件
  9. git -- 忽略某个文件
  10. 关于水晶报表打包的一些注意的地方!
  11. 方差分析软件_手把手教你用Graphpad做单因素方差分析
  12. 【光纤传输特性】图文并茂,你该了解这些
  13. JQ ajax 请求事件处理
  14. 程序猿来找找自己的目标
  15. ArcMap制作TPK文件
  16. 1883:北京旅行日记1276695923新浪博客
  17. ffmpeg Intel硬件加速总结
  18. 萨班斯法案:由来、影响及争论
  19. JAVA计算机毕业设计学术会议信息网站Mybatis+源码+数据库+lw文档+系统+调试部署
  20. 1.Docker学习之基础知识

热门文章

  1. STM32通过SPI读W25Q64的ID时钟分析
  2. 学习手机软件开发学什么,怎样学?
  3. 蓝桥杯 算法训练 跳马
  4. mysql 实例结构体_C语言结构体实例-创建兔子
  5. 2007年浙江高考满分作文——行走在消逝中
  6. vs2015 linux开发 界面设计,Microsoft Office开发工具 Visual Studio 2015
  7. COMMAND NOT SUPPORTED 解决方法
  8. The Elder(hdu 5956 树上斜率dp + 队列还原)
  9. 基于python学生档案管理系统的设计与实现.rar(毕业论文+程序源码+答辩PPT)
  10. 激活win10企业长期服务版