waterdrop1.x导入clickhouse分布式表-默认方式
先引用一段官方output clickhouse插件中,对分布式表的说明
官方文档地址:https://interestinglab.github.io/seatunnel-docs/#/zh-cn/v1/configuration/output-plugins/Clickhouse
分布式表配置ClickHouse {host = "localhost:8123"database = "nginx"table = "access_msg"cluster = "no_replica_cluster"fields = ["date", "datetime", "hostname", "http_code", "data_size", "ua", "request_time"]
}
根据提供的cluster名称,会从system.clusters表里面获取当前table实际分布在那些节点上。单spark partition的数据会根据随机策略选择某一个ClickHouse节点执行具体的写入操作
从文字说明上可以得知,waterdrop实际上是写的本地表,数据的分配策略是随机
下面来实际测试:
在测试之前,请确保你的clickhouse分布式配置以及完成。可以参考:clickhouse集群模式配置_cakecc2008的专栏-CSDN博客
1、创建表:
-- 创建本地表,在所有节点中都需要执行
DROP TABLE IF EXISTS dw_local.dist_test;
CREATE TABLE dw_local.dist_test(id String COMMENT 'id' ,user_name String COMMENT '用户姓名'
)
engine = MergeTree
primary key (id)
order by (id)
;truncate table dw_local.dist_test;
-- 创建分布式表
DROP TABLE IF EXISTS dw.dist_test;
CREATE TABLE dw.dist_test(id String COMMENT 'id' ,user_name String COMMENT '用户姓名' )
ENGINE = Distributed(dw_cluster, dw_local, dist_test);select * from dw_local.dist_test t ;
select * from dw.dist_test t ;
2、准备数据:
vi /tmp/dist_test.csv
id,user_name
1,zhangsan
2,lisi
3,wangwu
4,lili
5,lucy
6,poli
7,lilei
8,hanmeimei
9,liudehua
10,wuyanzu
3、waterdrop配置:
vi /tmp/dist_test.conf
spark {spark.app.name = "Waterdrop"spark.executor.instances = 1spark.executor.cores = 1spark.executor.memory = "1g"spark.sql.catalogImplementation = "hive"
}
input {file {path = "file:///tmp/dist_test.csv"format = "csv"options.header = "true"result_table_name = "dist_test"}
}
filter {repartition {"注释":"对数进进行重新分区。由于测试数据很少,如果不repartition,数据就会都进入同一个节点,后面源码分析的时候会提到"num_partitions = 5}
}
output {clickhouse {host = "10.1.99.191:8123""注释":"因为waterdrop是写本地表,所以这里database需要配置为本地表对于的库名"database = "dw_local"table = "dist_test"cluster = "dw_cluster"username = "user"password = "password"}
}
4、执行导入:
bin/start-waterdrop.sh --master local[1] --deploy-mode client --config /tmp/dist_test.conf
5、查询数据:
-- 节点1
select * from dw_local.dist_test t ;Query id: ff2dfdb8-1d58-413a-8fe6-f17992630d1a┌─id─┬─user_name─┐
│ 8 │ hanmeimei │
│ 9 │ liudehua │
└────┴───────────┘
┌─id─┬─user_name─┐
│ 10 │ wuyanzu │
│ 5 │ lucy │
└────┴───────────┘
┌─id─┬─user_name─┐
│ 4 │ lili │
│ 7 │ lilei │
└────┴───────────┘
┌─id─┬─user_name─┐
│ 1 │ zhangsan │
│ 2 │ lisi │
└────┴───────────┘-- 节点2
select * from dw_local.dist_test t ;Query id: ed9ca714-d301-4691-b1bd-7fbf7f34be07┌─id─┬─user_name─┐
│ 3 │ wangwu │
│ 6 │ poli │
└────┴───────────┘
很可能你的测试结果和我的不一样,因为它是随机策略。下面从源码层面分析下Clickhouse output插件的集群分配策略 。
6、源码分析:
版本1.5.1
从官方文档中我们可以知道:
Output插件调用结构与Filter插件相似。在调用时会先执行checkConfig方法核对调用插件时传入的参数是否正确,然后调用prepare方法配置参数的缺省值以及初始化类的成员变量,最后调用process方法将 Dataset[Row] 格式数据输出到外部数据源。
所以我们重点就看2个方法prepare、process。相关的分析已经写在注释中
package io.github.interestinglab.waterdrop.output.batchclass Clickhouse extends BaseOutput { override def prepare(spark: SparkSession): Unit = {this.jdbcLink = String.format("jdbc:clickhouse://%s/%s", config.getString("host"), config.getString("database"))val balanced: BalancedClickhouseDataSource = new BalancedClickhouseDataSource(this.jdbcLink, properties)val conn = balanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]this.table = config.getString("table")this.tableSchema = getClickHouseSchema(conn, table)if (this.config.hasPath("fields")) {this.fields = config.getStringList("fields")val (flag, msg) = acceptedClickHouseSchema()if (!flag) {throw new ConfigRuntimeException(msg)}}val defaultConfig = ConfigFactory.parseMap(Map("bulk_size" -> 20000,// "retry_codes" -> util.Arrays.asList(ClickHouseErrorCode.NETWORK_ERROR.code),"retry_codes" -> util.Arrays.asList(),"retry" -> 1))if (config.hasPath("cluster")) { //检查配置文件中是否存在cluster参数this.cluster = config.getString("cluster")this.clusterInfo = getClickHouseClusterInfo(conn, cluster) //从数据库中获取集群信息,后面在process方法中用到,clusterInfo其实是一个数组if (this.clusterInfo.size == 0) {val errorInfo = s"cloud not find cluster config in system.clusters, config cluster = $cluster"logError(errorInfo)throw new RuntimeException(errorInfo)}logInfo(s"get [$cluster] config from system.clusters, the replica info is [$clusterInfo].")}config = config.withFallback(defaultConfig)retryCodes = config.getIntList("retry_codes")super.prepare(spark)}override def process(df: Dataset[Row]): Unit = {val dfFields = df.schema.fieldNamesval bulkSize = config.getInt("bulk_size")val retry = config.getInt("retry")if (!config.hasPath("fields")) {fields = dfFields.toList}this.initSQL = initPrepareSQL()logInfo(this.initSQL)df.foreachPartition { iter => //这里使用Dataset的foreachPartition变量分区,所以所谓的随机,是按分区随机。如果只有一个分区,那么数据就只会进入一个shardvar jdbcUrl = this.jdbcLinkif (this.clusterInfo != null && this.clusterInfo.size > 0) { //如果clusterInfo中有数据,就是集群模式//using random policy to select shard when insert dataval randomShard = (Math.random() * this.clusterInfo.size).asInstanceOf[Int] //随机策略的核心代码,生成一个0~clusterInfo.size的随机数val shardInfo = this.clusterInfo.get(randomShard) //跟进上面的随机数,获取其中一个shardval host = shardInfo._4 //从shard中获取host地址,其他信息使用的还是配置文件中的参数val port = getJDBCPort(this.jdbcLink)val database = config.getString("database") //数据库名也是从配置文件中获取,所以配置文件中需要配置本地表对应的库名jdbcUrl = s"jdbc:clickhouse://$host:$port/$database" //重新对jdbcUrl赋值,其实主要就是hostlogInfo(s"cluster mode, select shard index [$randomShard] to insert data, the jdbc url is [$jdbcUrl].")} else {logInfo(s"single mode, the jdbc url is [$jdbcUrl].")}val executorBalanced = new BalancedClickhouseDataSource(jdbcUrl, this.properties)val executorConn = executorBalanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]val statement = executorConn.createClickHousePreparedStatement(this.initSQL, ResultSet.TYPE_FORWARD_ONLY)var length = 0while (iter.hasNext) { //添加数据到缓冲区val row = iter.next()length += 1renderStatement(fields, row, dfFields, statement)statement.addBatch()if (length >= bulkSize) { //如果缓冲区大小大于等于阈值(默认20000)则执行入库execute(statement, retry)length = 0}}execute(statement, retry)}}
}
7、总结
- clickhouse output插件写分布式表的时候,是直接写的本地表,性能上没有什么大大问题
- shard的分配策略是随机,核心代码:(Math.random() * this.clusterInfo.size).asInstanceOf[Int]。具体来说是按分区随机,即如果有N个分区,每个分区都会随机获取一次shard,同一个分区必定进入同一个shard。
- 随机策略导致了两个缺陷:
1)数据分布不均,笔者测试5000万的数据,2个节点,偏差可达到16%;
2)无法指定或预知数据进入哪一个shard,导致后续如果需要join或group时,效率不高。
需要注意的是,在2.x版本没有分布式表写入功能,可能也是基于以上两点原因。
8、思考
由于随机策略在实际应用中并不好用,那么如何解决这个问题呢?
1、修改源代码,增加hash策略,可指定字段进行hash
2、在不修改源代码的情况下,如何实现分布式表的本地写入?
waterdrop1.x导入clickhouse分布式表-默认方式相关推荐
- waterdrop1.x导入clickhouse分布式表-修改源码
接上一篇,使用fiter+sql方式进行分布式写表,存在效率低的问题,现在尝试从源码入手,制定clickhouse的分布式表本地写入方案 编译好的class文件: https://download.c ...
- waterdrop1.x导入clickhouse分布式表-fitersql
接上一篇,最后留下的两个问题, 针对问题2:在不修改源代码的情况下,如何实现分布式表的本地hash方式写入? 现在做一些尝试和验证. 思路: waterdrop是可以进行多数据流程处理的,官方说明文档 ...
- ClickHouse 分布式表创建细节
ClickHouse 分布式表创建细节 记录一次创建分布式表的过程. 背景 ClickHouse服务器数量:10 需创建本地表(local)与分布式表 问题发现 创建本地表的过程中未出现问题,一切正常 ...
- clickhouse分布式表调研
clickhouse分区表调研 文章目录 clickhouse分区表调研 1.搭建本地环境 1.1.搜索镜像是否存在 1.2.下载镜像 1.3.运行容器 1.4.修改密码 1.4.1.进入容器 1.4 ...
- clickhouse 分布式表
分布式表一般用来查询,实际数据写入还是在本地式表 在操作分布式表之前: 1 连接到tutorial数据库. 2 在MergeTree 引擎上创建hits_v1表,该表将位于所有集群主机上:(ON CL ...
- 深入理解ClickHouse-本地表和分布式表
在集群的每个机器上面建立本地表 这里需要谨记,在进行下面的操作前(使用ReplicatedMergeTree表引擎),必须保证集群配置中internal_replication=true且配置了zoo ...
- clickhouse删除表的问题
文章目录 前言 测试 解决办法 前言 在日常使用clickhouse的时候,肯定会遇到删除表的操作,删除表的命令:DROP TABLE IF EXISTS test.test.有时候删除后表又想马上重 ...
- Clickhouse Distributed分布式表引擎的基本介绍和使用说明
目录 1. 分布式的本地表 1.1 分布式的创建本地表 1.2 分布式的更改本地表表名 1.3 分布式的删除本地表 2. Distributed表 2.1 创建Distributed表 2.2 删除分 ...
- ClickHouse Replicated*MergeTree复制表原理、Distributed分布式表原理
一.Replicated*MergeTree复制表原理 复制表通过zookeeper实现,其实就是通过zookeeper进行统一命名服务,并不依赖config.xml的remote_servers配置 ...
最新文章
- Android微信智能心跳方案 Android微信智能心跳方案
- 机器学习梯度下降法举例
- Windows 服务全攻略(1)
- rf调用的python函数报错_Robot Framework(15)- 扩展关键字
- c# u盘使用记录_C# 系统应用之通过注册表获取USB使用记录(一)
- 使用决策树算法对Iris数据构建决策树
- 武林外传服务器时间修改,浅谈武林外传关于2021年4月29日大合区
- 记忆力训练软件测试自学,[记忆力训练软件]记忆力训练软件有哪些?
- CSS复合选择器---后代选择器、子选择器、并集选择器、伪类选择器
- MongoDB可视化工具 Studio 3T
- db2iupgrade失败:DBI1205E One or more local databases cannot be upgraded
- 《白帽子讲Web安全 》 随手记(一)
- 单片机笔记五:改进无源蜂鸣片驱动电路
- VUE element-ui之form表单自定义验证11位手机号码(封装验证规则)
- 儿童用计算机吗,儿童如何正确使用电脑
- 立创开源 BGA162芯片开发
- js使用基础总结(简单封装,事件,foreach 原生js写法)
- [龙讯2号]我用带有胡伟武签名的龙芯电脑
- 区块链:对链式结构型 PoS 系统的 “虚假权益” 攻击
- android h5调用百度地图,h5页面如何调用百度地图获取当前位置(代码)
热门文章
- 谷歌浏览器版本大全地址
- 推荐一篇70年前的雄文
- Origin添加列的方法
- 自学深度学习 (一)机器学习基础1
- 夯实安全“三大体系”建设,腾讯云打造安全可靠的云上高速公路
- 开设计算机金融专业的学校,开设金融数学专业的大学
- 对“基于复杂网络的机器学习”的理解(机器学习、复杂网络、人工智能)
- 解决Lombok版本过低导致的编译出错问题(You aren‘t using a compiler supported by lombok)
- 微信小程序点赞功能,用 MySQL 还是 Redis ?
- 绘制 Logistic 映射分叉图