先引用一段官方output clickhouse插件中,对分布式表的说明

官方文档地址:https://interestinglab.github.io/seatunnel-docs/#/zh-cn/v1/configuration/output-plugins/Clickhouse

分布式表配置ClickHouse {host = "localhost:8123"database = "nginx"table = "access_msg"cluster = "no_replica_cluster"fields = ["date", "datetime", "hostname", "http_code", "data_size", "ua", "request_time"]
}
根据提供的cluster名称,会从system.clusters表里面获取当前table实际分布在那些节点上。单spark partition的数据会根据随机策略选择某一个ClickHouse节点执行具体的写入操作

从文字说明上可以得知,waterdrop实际上是写的本地表,数据的分配策略是随机

下面来实际测试:

在测试之前,请确保你的clickhouse分布式配置以及完成。可以参考:clickhouse集群模式配置_cakecc2008的专栏-CSDN博客

1、创建表:

-- 创建本地表,在所有节点中都需要执行
DROP TABLE IF EXISTS dw_local.dist_test;
CREATE TABLE  dw_local.dist_test(id String    COMMENT 'id' ,user_name String    COMMENT '用户姓名'
)
engine = MergeTree
primary key (id)
order by  (id)
;truncate table dw_local.dist_test;
-- 创建分布式表
DROP TABLE IF EXISTS dw.dist_test;
CREATE TABLE  dw.dist_test(id String    COMMENT 'id' ,user_name String    COMMENT '用户姓名' )
ENGINE = Distributed(dw_cluster, dw_local, dist_test);select * from  dw_local.dist_test t  ;
select * from  dw.dist_test t  ;

2、准备数据:

vi /tmp/dist_test.csv
id,user_name
1,zhangsan
2,lisi
3,wangwu
4,lili
5,lucy
6,poli
7,lilei
8,hanmeimei
9,liudehua
10,wuyanzu

3、waterdrop配置:

vi /tmp/dist_test.conf

spark {spark.app.name = "Waterdrop"spark.executor.instances = 1spark.executor.cores = 1spark.executor.memory = "1g"spark.sql.catalogImplementation = "hive"
}
input {file {path = "file:///tmp/dist_test.csv"format = "csv"options.header = "true"result_table_name = "dist_test"}
}
filter {repartition {"注释":"对数进进行重新分区。由于测试数据很少,如果不repartition,数据就会都进入同一个节点,后面源码分析的时候会提到"num_partitions = 5}
}
output {clickhouse {host = "10.1.99.191:8123""注释":"因为waterdrop是写本地表,所以这里database需要配置为本地表对于的库名"database = "dw_local"table = "dist_test"cluster = "dw_cluster"username = "user"password = "password"}
}

4、执行导入:

bin/start-waterdrop.sh --master local[1] --deploy-mode client --config /tmp/dist_test.conf

5、查询数据:

-- 节点1
select * from  dw_local.dist_test t  ;Query id: ff2dfdb8-1d58-413a-8fe6-f17992630d1a┌─id─┬─user_name─┐
│ 8  │ hanmeimei │
│ 9  │ liudehua  │
└────┴───────────┘
┌─id─┬─user_name─┐
│ 10 │ wuyanzu   │
│ 5  │ lucy      │
└────┴───────────┘
┌─id─┬─user_name─┐
│ 4  │ lili      │
│ 7  │ lilei     │
└────┴───────────┘
┌─id─┬─user_name─┐
│ 1  │ zhangsan  │
│ 2  │ lisi      │
└────┴───────────┘-- 节点2
select * from  dw_local.dist_test t  ;Query id: ed9ca714-d301-4691-b1bd-7fbf7f34be07┌─id─┬─user_name─┐
│ 3  │ wangwu    │
│ 6  │ poli      │
└────┴───────────┘

很可能你的测试结果和我的不一样,因为它是随机策略。下面从源码层面分析下Clickhouse output插件的集群分配策略 。

6、源码分析:

版本1.5.1

从官方文档中我们可以知道:

Output插件调用结构与Filter插件相似。在调用时会先执行checkConfig方法核对调用插件时传入的参数是否正确,然后调用prepare方法配置参数的缺省值以及初始化类的成员变量,最后调用process方法将 Dataset[Row] 格式数据输出到外部数据源。

所以我们重点就看2个方法prepare、process。相关的分析已经写在注释中

package io.github.interestinglab.waterdrop.output.batchclass Clickhouse extends BaseOutput { override def prepare(spark: SparkSession): Unit = {this.jdbcLink = String.format("jdbc:clickhouse://%s/%s", config.getString("host"), config.getString("database"))val balanced: BalancedClickhouseDataSource = new BalancedClickhouseDataSource(this.jdbcLink, properties)val conn = balanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]this.table = config.getString("table")this.tableSchema = getClickHouseSchema(conn, table)if (this.config.hasPath("fields")) {this.fields = config.getStringList("fields")val (flag, msg) = acceptedClickHouseSchema()if (!flag) {throw new ConfigRuntimeException(msg)}}val defaultConfig = ConfigFactory.parseMap(Map("bulk_size" -> 20000,// "retry_codes" -> util.Arrays.asList(ClickHouseErrorCode.NETWORK_ERROR.code),"retry_codes" -> util.Arrays.asList(),"retry" -> 1))if (config.hasPath("cluster")) {              //检查配置文件中是否存在cluster参数this.cluster = config.getString("cluster")this.clusterInfo = getClickHouseClusterInfo(conn, cluster)  //从数据库中获取集群信息,后面在process方法中用到,clusterInfo其实是一个数组if (this.clusterInfo.size == 0) {val errorInfo = s"cloud not find cluster config in system.clusters, config cluster = $cluster"logError(errorInfo)throw new RuntimeException(errorInfo)}logInfo(s"get [$cluster] config from system.clusters, the replica info is [$clusterInfo].")}config = config.withFallback(defaultConfig)retryCodes = config.getIntList("retry_codes")super.prepare(spark)}override def process(df: Dataset[Row]): Unit = {val dfFields = df.schema.fieldNamesval bulkSize = config.getInt("bulk_size")val retry = config.getInt("retry")if (!config.hasPath("fields")) {fields = dfFields.toList}this.initSQL = initPrepareSQL()logInfo(this.initSQL)df.foreachPartition { iter =>         //这里使用Dataset的foreachPartition变量分区,所以所谓的随机,是按分区随机。如果只有一个分区,那么数据就只会进入一个shardvar jdbcUrl = this.jdbcLinkif (this.clusterInfo != null && this.clusterInfo.size > 0) {                 //如果clusterInfo中有数据,就是集群模式//using random policy to select shard when insert dataval randomShard = (Math.random() * this.clusterInfo.size).asInstanceOf[Int]   //随机策略的核心代码,生成一个0~clusterInfo.size的随机数val shardInfo = this.clusterInfo.get(randomShard)                             //跟进上面的随机数,获取其中一个shardval host = shardInfo._4                                                       //从shard中获取host地址,其他信息使用的还是配置文件中的参数val port = getJDBCPort(this.jdbcLink)val database = config.getString("database")                                    //数据库名也是从配置文件中获取,所以配置文件中需要配置本地表对应的库名jdbcUrl = s"jdbc:clickhouse://$host:$port/$database"                          //重新对jdbcUrl赋值,其实主要就是hostlogInfo(s"cluster mode, select shard index [$randomShard] to insert data, the jdbc url is [$jdbcUrl].")} else {logInfo(s"single mode, the jdbc url is [$jdbcUrl].")}val executorBalanced = new BalancedClickhouseDataSource(jdbcUrl, this.properties)val executorConn = executorBalanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]val statement = executorConn.createClickHousePreparedStatement(this.initSQL, ResultSet.TYPE_FORWARD_ONLY)var length = 0while (iter.hasNext) {                                                          //添加数据到缓冲区val row = iter.next()length += 1renderStatement(fields, row, dfFields, statement)statement.addBatch()if (length >= bulkSize) {                                                     //如果缓冲区大小大于等于阈值(默认20000)则执行入库execute(statement, retry)length = 0}}execute(statement, retry)}}
}

7、总结

  1. clickhouse output插件写分布式表的时候,是直接写的本地表,性能上没有什么大大问题
  2. shard的分配策略是随机,核心代码:(Math.random() * this.clusterInfo.size).asInstanceOf[Int]。具体来说是按分区随机,即如果有N个分区,每个分区都会随机获取一次shard,同一个分区必定进入同一个shard。
  3. 随机策略导致了两个缺陷:

1)数据分布不均,笔者测试5000万的数据,2个节点,偏差可达到16%;

2)无法指定或预知数据进入哪一个shard,导致后续如果需要join或group时,效率不高。

需要注意的是,在2.x版本没有分布式表写入功能,可能也是基于以上两点原因。

8、思考

由于随机策略在实际应用中并不好用,那么如何解决这个问题呢?

1、修改源代码,增加hash策略,可指定字段进行hash

2、在不修改源代码的情况下,如何实现分布式表的本地写入?

waterdrop1.x导入clickhouse分布式表-默认方式相关推荐

  1. waterdrop1.x导入clickhouse分布式表-修改源码

    接上一篇,使用fiter+sql方式进行分布式写表,存在效率低的问题,现在尝试从源码入手,制定clickhouse的分布式表本地写入方案 编译好的class文件: https://download.c ...

  2. waterdrop1.x导入clickhouse分布式表-fitersql

    接上一篇,最后留下的两个问题, 针对问题2:在不修改源代码的情况下,如何实现分布式表的本地hash方式写入? 现在做一些尝试和验证. 思路: waterdrop是可以进行多数据流程处理的,官方说明文档 ...

  3. ClickHouse 分布式表创建细节

    ClickHouse 分布式表创建细节 记录一次创建分布式表的过程. 背景 ClickHouse服务器数量:10 需创建本地表(local)与分布式表 问题发现 创建本地表的过程中未出现问题,一切正常 ...

  4. clickhouse分布式表调研

    clickhouse分区表调研 文章目录 clickhouse分区表调研 1.搭建本地环境 1.1.搜索镜像是否存在 1.2.下载镜像 1.3.运行容器 1.4.修改密码 1.4.1.进入容器 1.4 ...

  5. clickhouse 分布式表

    分布式表一般用来查询,实际数据写入还是在本地式表 在操作分布式表之前: 1 连接到tutorial数据库. 2 在MergeTree 引擎上创建hits_v1表,该表将位于所有集群主机上:(ON CL ...

  6. 深入理解ClickHouse-本地表和分布式表

    在集群的每个机器上面建立本地表 这里需要谨记,在进行下面的操作前(使用ReplicatedMergeTree表引擎),必须保证集群配置中internal_replication=true且配置了zoo ...

  7. clickhouse删除表的问题

    文章目录 前言 测试 解决办法 前言 在日常使用clickhouse的时候,肯定会遇到删除表的操作,删除表的命令:DROP TABLE IF EXISTS test.test.有时候删除后表又想马上重 ...

  8. Clickhouse Distributed分布式表引擎的基本介绍和使用说明

    目录 1. 分布式的本地表 1.1 分布式的创建本地表 1.2 分布式的更改本地表表名 1.3 分布式的删除本地表 2. Distributed表 2.1 创建Distributed表 2.2 删除分 ...

  9. ClickHouse Replicated*MergeTree复制表原理、Distributed分布式表原理

    一.Replicated*MergeTree复制表原理 复制表通过zookeeper实现,其实就是通过zookeeper进行统一命名服务,并不依赖config.xml的remote_servers配置 ...

最新文章

  1. Android微信智能心跳方案 Android微信智能心跳方案
  2. 机器学习梯度下降法举例
  3. Windows 服务全攻略(1)
  4. rf调用的python函数报错_Robot Framework(15)- 扩展关键字
  5. c# u盘使用记录_C# 系统应用之通过注册表获取USB使用记录(一)
  6. 使用决策树算法对Iris数据构建决策树
  7. 武林外传服务器时间修改,浅谈武林外传关于2021年4月29日大合区
  8. 记忆力训练软件测试自学,[记忆力训练软件]记忆力训练软件有哪些?
  9. CSS复合选择器---后代选择器、子选择器、并集选择器、伪类选择器
  10. MongoDB可视化工具 Studio 3T
  11. db2iupgrade失败:DBI1205E One or more local databases cannot be upgraded
  12. 《白帽子讲Web安全 》 随手记(一)
  13. 单片机笔记五:改进无源蜂鸣片驱动电路
  14. VUE element-ui之form表单自定义验证11位手机号码(封装验证规则)
  15. 儿童用计算机吗,儿童如何正确使用电脑
  16. 立创开源 BGA162芯片开发
  17. js使用基础总结(简单封装,事件,foreach 原生js写法)
  18. [龙讯2号]我用带有胡伟武签名的龙芯电脑
  19. 区块链:对链式结构型 PoS 系统的 “虚假权益” 攻击
  20. android h5调用百度地图,h5页面如何调用百度地图获取当前位置(代码)

热门文章

  1. 谷歌浏览器版本大全地址
  2. 推荐一篇70年前的雄文
  3. Origin添加列的方法
  4. 自学深度学习 (一)机器学习基础1
  5. 夯实安全“三大体系”建设,腾讯云打造安全可靠的云上高速公路
  6. 开设计算机金融专业的学校,开设金融数学专业的大学
  7. 对“基于复杂网络的机器学习”的理解(机器学习、复杂网络、人工智能)
  8. 解决Lombok版本过低导致的编译出错问题(You aren‘t using a compiler supported by lombok)
  9. 微信小程序点赞功能,用 MySQL 还是 Redis ?
  10. 绘制 Logistic 映射分叉图