背景

目前我们需要将各渠道的数据进行入库然后进行分析,当前 api 返回的数据都是通过定制化代码开发来解析、建表、数据入库的。这种方式可复用性不高,因为每种渠道提供的接口数据格式是不一样的,每次都需要去修改代码,不能做到零代码通用方案入库。所以我们需要一个可以自动来根据 api 返回的 json 数据格式,来完成建表、自动入库的一种通用方案。

整体思路

因为接口返回的数据基本都不是就简单的一个层级的,可能是某个 key 对应的 value 还是 JSON 数据, 一直嵌套好多层,这个时候就需要给定嵌套的这种情况如何处理,生成多少张表,数据如何批量插入,如果生成多张表,表中的数据通过什么来关联?以上问题都是需要考虑的问题,具体该如何解决,下面详细讲解一下。

落地多少张表

关于落地多少张表,当前是支持二种方案:

单张宽表

单张宽表的思路就比较简单,一般是最外面一层数据是我们比较想要的数据,然后里面嵌套的可能是一些维度信息,因此我们会以最外层的 key 来作为表的字段,value 如果是基本类型(数据、字符串、布尔类型)就不做处理,如果是 json 数据(JSONObject 或者 JSONArray),则将其转化成 json 字符串,按照字符串类型来进行处理。

注:因为这边业务场景是使用 flink 来进行接下来的数据处理,所以,会写一个 udf 函数,来从 json 字符串中提取任意级别的 json 数据配合着使用。

多张表

多张表处理是将同一个 jsonObject 范围内的同一级别并且 value 是基本类型的所有 key 组成一个表,然后如果 value 是 json 数据(JSONObject 或者 JSONArray),则会再生成一张表,然后会添加一个上层 json 生成表的关联字段来进行表表之间的关联,这比较符合关系型数据的第三范式,也比较适合于那种没有 json 字符串和行列转换强功能支持的数据库类型,比如 mysql。

数据如何批量插入 & 解决幂等性

批量插入就是生成批量插入的 sql 语句,但是一个接口可能会重复导入,那如何解决幂等性的问题?

这就要从我们生成表那里的逻辑,就是在生成一个表的时候,会默认添加一列,列名就叫 md5_code,用来标志这条数据的唯一性,md5_code 取值就包括该行里面的数据(排除关联键的值)后经过 md5 加密后的一个值,来标志这行数据的唯一性,如果多次导入同一批数据,则只会更新导入时间,不会新插入一条数据。

落地多张表的情况,数据如何关联

这里的关联,包括建表的时候的关联键的添加和实际插入数据时,多张表之间行与行之间关联的值的关联二种情况。

关联键的添加

在建表的时候会生成关联键,关联键的名称生成规则就是外层表名拼上一个后缀。

多表之间的行数据关联

再往里解析 json 的时候,需要把关联键和关联键的值传递进去,关联键的值传递的就是外层表当前行数据的 md5 值,这样就生成了多表之间行数据关联的依据。

如何适配多种不同的数据源

这里就是整体设计方案来解决的问题了,我们这里采取的是工厂模式,根据不同的数据库类型,来选择不同的 builder 来处理。

再选择 SqlBuilder 的时候,根据要落库的数据类型来选择即可,在 buildSql 的时候在传入你想构建大宽表还是多表,即可根据你的需求进行定制化实现。

核心的json数据解析和生成统一的数据模型是在父类的代码里做的,不同的sql构建器则是根据解析和抽象好的数据模型来自定义生成不同类型的标准sql。

架构设计

上图是核心架构图,因为要支持同一个接口可以灵活入不同类型的库,所以我们这里设计思路有二大亮点:

  • 使用工厂模式的设计模式,使得用户可以随意选择入库的数据库类型,核心逻辑根据 json 生成下面统一的数据模型是在父类 SqlBuilder 中,子类在进行 buildSql 的时候只需要先调用父类的统一逻辑,再调用自己重写的 buildCreateSql 和 buildInsertSql 方法即可。

  • 抽象出来一个数据模型,将生成 sql 的必要的通用信息进行统一实现,不同数据库类型的 SqlBuilder 根据通用信息来根据自己的语法特征来生成对应的 sql 即可。通用信息包括:

    • 列信息(列名称、类型(长度))
    • 表名
    • 关联键
    • 要插入的数据集合

如何来使用呢,使用方法也比较简单,如下代码所示:

       // 首先根据入库的类型选择合适的sql构建器SqlBuilder sqlBuilder = SqlBuilderFactory.getSqlBuilder(SinkDbTypeEnum.MYSQL);  // 传入参数sqlBuilder.initRawJsonData(jsonData,"shop_order_list");// 构建sql(可选择是构建多表还是单个大宽表模式)sqlBuilder.buildSql(DataSinkTypeEnum.SINGLE_TABLE);// 后面即可将生成的sql打印出来sqlBuilder.getCreateSqlList().forEach(sql -> System.out.println(sql));sqlBuilder.getInsertSqlList().forEach(sql -> System.out.println(sql));

JSON 数据解析核心逻辑

解析成单表的逻辑

解析成单表的逻辑比较简单,主要考虑是一般接口直接返回的数据,最外层的数据一般是比较核心的数据,其中可能会有 json 嵌套,json 嵌套值会被认为是数据某个属性的扩展,类似是指标的维度的概念,所以我们在解析成单标的逻辑就是每个 key 会对应到表中的一个字段,value 类型和生成字段的类型的对应关系如下(Mysql 为例):

value 类型 字段类型 字段类型长度
字符串 varchar 取当前该字段要插入数据的最大长度和 255 中的最大值
整型 bigint bigint(11)
jsonObject varchar 序列化后字符串的长度和 255 取最大值
jsonArray varchar 序列化后字符串的长度和 255 取最大值

核心代码如下:

 /*** 生成单表的逻辑 ,只解析最外面一层做表结构*/private void buildSqlBlock(Object object) {SqlBlock sqlBlock = this.sqlMap.get(this.rootTableName);if (sqlBlock == null) {this.sqlMap.put(this.rootTableName, new SqlBlock(this.rootTableName));}if (object instanceof JSONObject) {buildSqlBlockForJSONObject((JSONObject) object);return;}if (object instanceof JSONArray) {JSONArray array = (JSONArray) object;for (int i = 0; i < array.size(); i++) {buildSqlBlockForJSONObject(array.getJSONObject(i));}return;}}private void buildSqlBlockForJSONObject(JSONObject row) {SqlBlock sqlBlock = this.sqlMap.get(this.rootTableName);for (Map.Entry<String, Object> entry : row.entrySet()) {if (entry.getValue() instanceof JSONObject || entry.getValue() instanceof JSONArray) {String value = JSON.toJSONString(entry.getValue());sqlBlock.addColumn(entry.getKey(), value);row.put(entry.getKey(), value);} else {sqlBlock.addColumn(entry.getKey(), entry.getValue());}}sqlBlock.addColumn(DEFAULT_COLUMN_MD5,"md5Column");String md5Code = md5Encode(row,null);row.put(DEFAULT_COLUMN_MD5,md5Code);sqlBlock.addRowData(row);}

解析成多表的逻辑

解析成多表的逻辑,就会比较复杂,从外向里遍历 json 结构的时候,如果发现某个 key 对应的 value 是 json 数据(JSONObject 或者 JSONArray)就会为其生成一个新表,并且在新表里面建立一个和外层表的关联键。

表名默认是 key 值,最外层没有 key 值,需要在进行初始化的时候进行指定。

核心代码如下:

/*** jsonArray 生成json语句** @param tableName* @param array* @param relationTable* @param relationTableValue*/private void buildSqlBlock(String tableName,JSONArray array,String relationTable, String relationTableValue) {if (array.size() <= 0) {return;}if (StringUtils.hasLength(relationTable)) {for (int i = 0; i < array.size(); i++) {array.getJSONObject(i).put(buildRelationKey(relationTable), relationTableValue);}}buildSqlBlock(tableName, array.getJSONObject(0), relationTable);}/*** jsonObject生成创建表语句** @param tableName* @param row*/private void buildSqlBlock( String tableName,JSONObject row,String relationTable) {SqlBlock sqlBlock = sqlMap.get(tableName);if (sqlBlock == null) {sqlBlock = new SqlBlock(tableName);}String rowDataMd5 = md5Encode(row, relationTable);Set<String> keySets = row.keySet();for (String key : keySets) {Object value = row.get(key);if (value instanceof JSONObject) {JSONObject object = row.getJSONObject(key);object.put(buildRelationKey(tableName), rowDataMd5);buildSqlBlock(key, object, tableName);} else if (value instanceof JSONArray) {JSONArray array = row.getJSONArray(key);buildSqlBlock(key, array, tableName, rowDataMd5);} else {sqlBlock.addColumn(key, value);}}sqlBlock.addColumn(DEFAULT_COLUMN_MD5, "string");row.put(DEFAULT_COLUMN_MD5, rowDataMd5);sqlBlock.addRowData(row);sqlBlock.addRelationTableKey(buildRelationKey(relationTable));sqlMap.put(tableName, sqlBlock);}private String buildRelationKey(String relationTable) {if (relationTable == null) {return null;}return relationTable.concat("_relation_key");}private String md5Encode(JSONObject row, String relationTable) {try {Map<String, Object> rowMap = new HashMap<>();row.entrySet().stream().filter(entry -> baseType(entry.getValue())).filter(entry -> StringUtils.isEmpty(relationTable) || !buildRelationKey(relationTable).equals(entry.getKey())).forEach(entry -> rowMap.put(entry.getKey(), entry.getValue()));return DigestUtils.md5DigestAsHex(JSON.toJSONString(rowMap).getBytes("utf-8"));} catch (UnsupportedEncodingException e) {log.error("DigestUtils.md5DigestAsHex execute Error : {}", e.getMessage(), e);}return row.toJSONString();}private Boolean baseType(Object object) {if (object instanceof JSONObject || object instanceof JSONArray) {return false;} else {return true;}}

Api 数据自动入库相关推荐

  1. 自动获取淘宝API数据访问的SessionKey

    原文地址为: 自动获取淘宝API数据访问的SessionKey 最近在忙与淘宝做对接的工作,总体感觉淘宝的api文档做的还不错,不仅有沙箱测试环境,而且对于每一个api都可以通过api测试工具生成想要 ...

  2. 【V2.0】基于运动步数API的自动更新系统(支持断线重连、数据补偿)

    [V2.0]基于运动步数API的自动更新系统(支持断线重连.数据补偿) 前文: https://blog.csdn.net/weixin_53403301/article/details/122882 ...

  3. api 数据gzip压缩_如何使用GZip和Jersey压缩Java REST API中的响应

    api 数据gzip压缩 在某些情况下,您的REST api会提供非常长的响应,我们都知道移动设备/网络上的传输速度和带宽仍然非常重要. 我认为这是开发支持移动应用程序的REST api时需要解决的第 ...

  4. spring、mybatis、测试用例执行后数据没有入库

    最近做一个例子,按照老的项目进行搭建, Junit测试对数据库的插入和删除操作 ,测试用例运行时一切正常,新增方法执行正常.但是结果数据没有入库. 经过一段时间的分析发现是由于spring进行了回滚( ...

  5. 快速搭建你的api数据交易平台-图文开发教程

    项目背景 如果你需要开发搭建自己的api数据交易平台,并且能在平台上面进行对客户管理.接口管理.套餐管理.账单管理.充值管理,那么下面将来介绍如何使用接口大师这个框架快速进行开发. 安装 PhalAp ...

  6. HENGSHI SENSE 4.0 预置明道云连接器,实现更灵活的数据自动传输

    摘要 衡石科技在最新发布的 HENGSHI SENSE 4.0 新版本中预置了明道云数据连接器.这意味着:明道云用户将更轻松便捷地集成至衡石平台,实现数据自动传输,提高数据分析处理能力. 背景 在数字 ...

  7. 用Python实现Flickr照片文本数据下载入库及图片保存(第一次帮忙)

    用Python实现Flickr照片文本数据下载入库及图片保存_fbc3173的博客-CSDN博客 来源 #原来的代码 #!/usr/bin/python import requests import ...

  8. 数字化外协生产综合管理系统,实现信息自动同步,数据自动统计分析!

    随着市场经济的不断发展,制造生产行业竞争不断加剧,精细化.无纸化办公已成为生产企业生存和发展的基本条件.要想将企业内部管理做的更精更细,就必须借助于现代先进的企业管理手段和工具,如企业资源计划系统.生 ...

  9. 老贼远古VOD自动入库使用说明

    我的机器没装远古的什么程序,只有装的数据库,数据库的数据是朋友给的,不是我机器上的,我也没什么片源,这个教程只是演示,告诉怎么设置. 本程序用VB6编写,在windwos 2003及SQL2000sp ...

最新文章

  1. 【学习笔记】超简单的多项式开方
  2. 台式电脑一般价钱多少_让你少走弯路,看我怎么花低价钱配出高配置的台式电脑的...
  3. ubuntu 安装 talib
  4. C++ Primer 5th笔记(chap 12 动态内存)动态数组
  5. ios 不同的数据类型转化为json类型
  6. 深入浅出CUDA编程
  7. 查看模拟器使用端口_为什么我们仍然使用模拟音频端口?
  8. 面向对象 封装 集成 特性
  9. Spring MVC DispatcherServlet改造为 CSE RestServlet 常见问题汇编
  10. 浅谈Fragment
  11. linux HZ 值_Linux操作系统中进程的操作命令(ps,kill,keep)
  12. js实现php中图片轮播,两种js实现轮播图的方式
  13. Docker搭建Nginx集群
  14. python图片尺寸大小修改_Python实现更改图片尺寸大小的方法(基于Pillow包)
  15. 【Python】如何用Python快速实现语音提醒功能
  16. Python下通过PR曲线值计算AP
  17. 详细了解如何在npm上发布自己的包?
  18. python提取XML信息保存为txt
  19. excel下拉隐藏_在Excel下拉列表中隐藏使用过的项目
  20. dw中css目标规则命名,css 常用样式命名规则

热门文章

  1. 58到家数据库30条军规解读
  2. Linux下批量修改文件名(rename)
  3. DLL 注入的三种方法详解
  4. (修订)MySQL优化/面试,看这一篇就够了
  5. php面试题3---php面试题系列
  6. C++ accumulate
  7. WrapPanel控件增加滚动条
  8. JavaScript execCommand函数
  9. JavaScript遍历数组和对象
  10. ubuntu下街机模拟器 mame 安装和玩拳王97