1.Clickhouse的Nested数据结构

Nested是一种嵌套表结构。一张数据表,可以定义任意多个嵌套类型字段,但每个字段的嵌套层级只支持一级,即嵌套表内不能继续使用嵌套类型。对于简单场景的层级关系或关联关系,使用嵌套类型也是一种不错的选择。

create table test_nested(uid Int8 ,name String ,props Nested(pid Int8,pnames String ,pvalues String)
) engine = MergeTree ORDER BY uid ;
desc test_nested;
┌─name──────────┬─type───────┬
│ uid           │ Int8          │
│ name          │ String        │
│ props.pid     │ Array(Int8)   │
│ props.pnames  │ Array(String) │
│ props.pvalues │ Array(String) │
└─────────────┴─────────────┴

嵌套类型本质是一种多维数组的结构。嵌套表中的每个字段都是一个数组,并且行与行之间数组的长度无须对齐。需要注意的是,在同一行数据内每个数组字段的长度必须相等。

insert into test_nested values(1,'hadoop',[1,2,3],['p1','p2','p3'],['v1','v2','v3']);
-- 行和行之间的属性的个数可以不一致 ,但是当前行的Nested类型中的数组个数必须一致
insert into test_nested values(2,'spark',[1,2],['p1','p2'],['v1','v2']);
SELECT *
FROM test_nested┌─uid─┬─name───┬─props.pid─┬─props.pnames─────┬─props.pvalues────┐
│   1 │ hadoop │ [1,2,3]   │ ['p1','p2','p3'] │ ['v1','v2','v3'] │
└─────┴────────┴───────────┴──────────────────┴──────────────────┘
┌─uid─┬─name──┬─props.pid─┬─props.pnames─┬─props.pvalues─┐
│   2 │ spark │ [1,2]     │ ['p1','p2']  │ ['v1','v2']   │
└─────┴───────┴───────────┴──────────────┴───────────────┘
SELECTuid,name,props.pid,props.pnames[1]
FROM test_nested;
┌─uid─┬─name───┬─props.pid─┬─arrayElement(props.pnames, 1)─┐
│   1 │ hadoop │ [1,2,3]   │ p1                            │
└─────┴────────┴───────────┴───────────────────────────────┘
┌─uid─┬─name──┬─props.pid─┬─arrayElement(props.pnames, 1)─┐
│   2 │ spark │ [1,2]     │ p1                            │
└─────┴───────┴───────────┴───────────────────────────────┘

2.使用JDBC插入Nested数据

通过查询表结构可以看到Clickhouse存储Nested数据,本质上是将Nested数据拆成了多列存储,列数等于元素属性的个数,每一列存储的是一个Array类型的数据
因此使用insert into table values(?,?...)时,计算占位符的个数应当等于拆完Nested后总的列数

-- 使用以下语句作为PreparedStatement时, 计算占位符的个数应当等于拆完Nested后总的列数
insert into test_nested values(?,?,?,?,?)
-- 也可以指定需要插入的列, 例如
insert into test_nested (uid, name, props.pid) values(?,?,?)

3.基于Flink的JDBCSink插入案例

① Pojo定义

  • Nested
package com.zyx.flinkdemo.pojo;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;import java.util.List;@Data
@AllArgsConstructor
@NoArgsConstructor
@NonNull
public class Nested {private String uid;private String name;private List<Props> props;}
  • Props
package com.zyx.flinkdemo.pojo;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;@Data
@NonNull
@AllArgsConstructor
@NoArgsConstructor
public class Props {private String pid;private String pnames;private String pvalues;
}

② Clickhouse工具类

package com.zyx.flinkdemo.stream.utils;import com.zyx.flinkdemo.pojo.Nested;
import com.zyx.flinkdemo.pojo.Props;
import com.zyx.flinkdemo.pojo.TransientSink;
import com.zyx.flinkdemo.stream.cons.CommonConfig;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.lang.reflect.Field;
import java.util.List;public class ClickHouseUtil {public static <T> SinkFunction<T> getNestedJdbcSink(String sql) {// obj  就是流中的一条数据对象return JdbcSink.sink(//要执行的SQL语句sql,// 执行写入操作 就是将当前流中的对象属性赋值给SQL的占位符(JdbcStatementBuilder<T>) (ps, obj) -> {// 获取当前类中  所有的属性Field[] fields = obj.getClass().getDeclaredFields();int j = 1;for (Field field : fields) {// 设置私有属性可访问field.setAccessible(true);if ("props".equals(field.getName())) {Nested nested = (Nested) obj;List<String[]> listStrArray = CommonUtils.listToStringArrayList(nested.getProps(), Props.class);for (String[] strArray : listStrArray) {ps.setArray(j++, ps.getConnection().createArrayOf("String", strArray));}continue;}try {// 获取属性值Object o = field.get(obj);ps.setObject(j++, o);} catch (IllegalAccessException e) {e.printStackTrace();}}},new JdbcExecutionOptions.Builder().withBatchSize(5).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(CommonConfig.CLICKHOUSE_URL).withDriverName("ru.yandex.clickhouse.ClickHouseDriver").build());}
}

List转换成List<String[]>工具类

package com.zyx.flinkdemo.stream.utils;import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;public class CommonUtils {public static <T> List<String[]> listToStringArrayList(List<T> list, Class<T> tClass) {// 取出list中的元素并添加到字符串数组中Field[] fields = tClass.getDeclaredFields();List<String[]> resList = new ArrayList<>();if (list != null && list.size() > 0) {try {int listSize = list.size();for (Field field : fields) {field.setAccessible(true);String[] strArray = new String[listSize];for (int j = 0; j < listSize; j++) {Object obj = field.get(list.get(j));strArray[j] = obj == null ? "" : obj.toString();}resList.add(strArray);}} catch (IllegalAccessException e) {e.printStackTrace();}return resList;} else {for (int i = 0; i < fields.length; i++) {String[] init = {};resList.add(init);}return resList;}}
}

④ Flink主程序代码

package com.zyx.flinkdemo.stream.sink;import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.zyx.flinkdemo.pojo.Nested;
import com.zyx.flinkdemo.pojo.Props;
import com.zyx.flinkdemo.stream.utils.ClickHouseUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class ClickHouseNestedSinkDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);Props props = new Props();props.setPid("1002");props.setPnames("p1");JSONArray jsonArray = new JSONArray();jsonArray.add(props);JSONObject jsonObj1 = new JSONObject();jsonObj1.put("uid", "1001");jsonObj1.put("name", "zhangsan");jsonObj1.put("props", jsonArray);Nested nested1 = JSONObject.parseObject(jsonObj1.toJSONString(), Nested.class);JSONObject jsonObj2 = new JSONObject();jsonObj2.put("uid", "1001");Nested nested2 = JSONObject.parseObject(jsonObj2.toJSONString(), Nested.class);env.fromElements(nested1, nested2).addSink(ClickHouseUtil.getNestedJdbcSink("insert into test_nested values(?,?,?,?,?)"));env.execute();}
}

基于Flink的JDBC插入Nested结构数据到Clickhouse相关推荐

  1. Demo:基于 Flink SQL 构建流式应用

    摘要:上周四在 Flink 中文社区钉钉群中直播分享了<Demo:基于 Flink SQL 构建流式应用>,直播内容偏向实战演示.这篇文章是对直播内容的一个总结,并且改善了部分内容,比如除 ...

  2. 基于数据库数据增量同步_基于 Flink SQL CDC 的实时数据同步方案

    简介:Flink 1.11 引入了 Flink SQL CDC,CDC 能给我们数据和业务间能带来什么变化?本文由 Apache Flink PMC,阿里巴巴技术专家伍翀 (云邪)分享,内容将从传统的 ...

  3. cdc工具 postgresql_基于 Flink SQL CDC 的实时数据同步方案

    作者:伍翀 (云邪) 整理:陈政羽(Flink 社区志愿者) Flink 1.11 引入了 Flink SQL CDC,CDC 能给我们数据和业务间能带来什么变化?本文由 Apache Flink P ...

  4. hive增量表和全量表_基于 Flink + Hive 构建流批一体准实时数仓

    基于 Hive 的离线数仓往往是企业大数据生产系统中不可缺少的一环.Hive 数仓有很高的成熟度和稳定性,但由于它是离线的,延时很大.在一些对延时要求比较高的场景,需要另外搭建基于 Flink 的实时 ...

  5. 网易云音乐基于 Flink + Kafka 的实时数仓建设实践

    简介:本文由网易云音乐实时计算平台研发工程师岳猛分享,主要从以下四个部分将为大家介绍 Flink + Kafka 在网易云音乐的应用实战: 背景.Flink + Kafka 平台化设计.Kafka 在 ...

  6. 网络安全公司奇安信集团是如何基于 Flink 构建 CEP 引擎实时检测网络攻击【未来不可忽视的网络安全】

    摘要: 奇安信集团作为一家网络安全公司是如何基于 Flink 构建 CEP 引擎实时检测网络攻击?其中面临的挑战以及宝贵的实践经验有哪些?本文主要内容分为以下四个方面: 背景及现状 技术架构 产品及运 ...

  7. flink 写kafka_网易云音乐基于 Flink + Kafka 的实时数仓建设实践

    简介:本文由网易云音乐实时计算平台研发工程师岳猛分享,主要从以下四个部分将为大家介绍 Flink + Kafka 在网易云音乐的应用实战: 背景 Flink + Kafka 平台化设计 Kafka 在 ...

  8. Flink原理解析50篇(四)-基于 Flink CDC 打通数据实时入湖

    在构建实时数仓的过程中,如何快速.正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎Flink和数据湖Apache Iceberg两种技术,来解决业务数据实时入湖相关的问题. 0 ...

  9. 基于 Flink + Hive 构建流批一体准实时数仓

    简介: 想要搭建流式链路就必须得抛弃现有的 Hive 数仓吗?并不是,借助 Flink 可以实现已有的 Hive 离线数仓准实时化.本文整理自 Apache Flink Committer.阿里巴巴技 ...

  10. php怎么把日志推送过去_实践 | 基于Flink的用户行为日志分析系统

    用户行为日志分析是实时数据处理很常见的一个应用场景,比如常见的PV.UV统计.本文将基于Flink从0到1构建一个用户行为日志分析系统,包括架构设计与代码实现.本文分享将完整呈现日志分析系统的数据处理 ...

最新文章

  1. C# Sato CL4NX打印机发送SBPL指令打印表面信息、RFID芯片数据写入
  2. 报错笔记:linux 命令行中的print输出内容无法重定向到文件中
  3. 学习笔记Hadoop(十四)—— MapReduce开发入门(2)—— MapReduce API介绍、MapReduce实例
  4. 致远表单代办状态删除
  5. iframe 页面刷新
  6. Java多线程(五)之BlockingQueue深入分析
  7. npm run dev (明明有.vue文件),却报错 cannot GET
  8. mysql 触发器 sql日志_触发器实现记录操作表的日志
  9. SBT管理java项目
  10. 山东大学计算机专业毕业后,儿子被山东大学数学系录取,毕业之后的前景将会如何...
  11. Vue进阶(贰零陆):Vue 培训课件
  12. 用Redis客户端工具连接Redis
  13. 2022年中国研究生数学建模竞赛
  14. 软考高级系统架构设计师你想知道的全在这
  15. mysql password_expired
  16. 黄太吉如何把煎饼卖到4000万估值
  17. maka做出好设计_MAKA下载-MAKA做出好设计最新版v4.15.1下载
  18. 人啊,就要对自己狠一点!
  19. Win7系统截图小工具
  20. 穷人思维和富人思维的根本区别在于底层系统不一样

热门文章

  1. NVIDIA SPADE 风景合成
  2. 基于JAVA图书商城购物系统的设计与实现
  3. 前端vue点击切换(黑夜/白天模式)主题最新(源码)
  4. Hive 异常,长期更新帖
  5. 实现外网Ping通WSL(网卡桥接方式实现)
  6. Idea中发布JAR包到中央仓库报错问题处理 unable to find valid certification path to requested target
  7. 谷歌浏览器打不开的解决方法
  8. VUCA时代的领导力开发
  9. 一个简单的多线程实现
  10. mysql数据库行列矩阵调换位置(行与列调换)