Following up on the previous post: distributed writes via the filter+SQL approach are inefficient, so this time we dig into the source code and build a local-write scheme for ClickHouse distributed tables.

Precompiled class files:

https://download.csdn.net/download/cakecc2008/81878962

1. Goal

Implement two write modes, row-hash and row-random, while staying compatible with the existing single-node mode and partition-random mode.

2. Approach

Two new parameters are added:

write_mode: the write mode
hash_fields: the field(s) to hash on; multiple fields are supported, comma-separated

Pseudocode:

```
if ([cluster] is valid):
    if ([write_mode] is valid):
        if (write_mode == 'rowhash' and [hash_fields] is valid):
            row-hash mode
        else if (write_mode == 'rowrandom'):
            row-random mode
        else:
            partition-random mode (default)
    else:
        partition-random mode (default)
else:
    single-node mode
```

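The same decision tree as a minimal Scala sketch. This is only an illustration: `resolveWriteMode` is a hypothetical helper, not part of the plugin; it assumes the `Config` type and the field names used by the full class in section 3.

```scala
import io.github.interestinglab.waterdrop.config.Config

// Hypothetical helper mirroring the mode selection done in process() below.
def resolveWriteMode(config: Config, clusterInfo: Seq[_], dfFields: Array[String]): String = {
  if (clusterInfo == null || clusterInfo.isEmpty) {
    "single"                                       // no cluster configured: single-node mode
  } else {
    val writeMode  = if (config.hasPath("write_mode"))  config.getString("write_mode")  else ""
    val hashFields = if (config.hasPath("hash_fields")) config.getString("hash_fields") else ""
    if (writeMode == "rowhash" && hashFields.nonEmpty &&
        hashFields.split(",").forall(dfFields.contains)) {
      "rowHash"                                    // every hash field exists in the stream
    } else if (writeMode == "rowrandom") {
      "rowRandom"
    } else {
      "partRandom"                                 // default for cluster mode
    }
  }
}
```
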
3. Source code changes:

Only the process method of the io.github.interestinglab.waterdrop.output.batch.Clickhouse class needs to change. The complete modified Clickhouse class is below (version 1.5.1):

```scala
package io.github.interestinglab.waterdrop.output.batch

import java.text.SimpleDateFormat
import java.util
import java.util.Properties
import java.math.BigDecimal
import java.sql.ResultSet

import io.github.interestinglab.waterdrop.config.{Config, ConfigFactory}
import io.github.interestinglab.waterdrop.apis.BaseOutput
import io.github.interestinglab.waterdrop.config.ConfigRuntimeException
import io.github.interestinglab.waterdrop.config.TypesafeConfigUtils
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import ru.yandex.clickhouse.except.{ClickHouseException, ClickHouseUnknownException}
import ru.yandex.clickhouse.settings.ClickHouseProperties
import ru.yandex.clickhouse.{BalancedClickhouseDataSource, ClickHouseConnectionImpl, ClickHousePreparedStatement, ClickhouseJdbcUrlParser}

import scala.collection.JavaConversions._
import scala.collection.immutable.HashMap
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.WrappedArray
import scala.util.matching.Regex
import scala.util.{Failure, Success, Try}

class Clickhouse extends BaseOutput {

  var tableSchema: Map[String, String] = new HashMap[String, String]()
  var jdbcLink: String = _
  var initSQL: String = _
  var table: String = _
  var fields: java.util.List[String] = _
  var cluster: String = _
  // contains basic cluster info: (cluster name, shard_num, shard_weight, host_address)
  var clusterInfo: ArrayBuffer[(String, Int, Int, String)] = _
  var retryCodes: java.util.List[Integer] = _
  var config: Config = ConfigFactory.empty()
  val clickhousePrefix = "clickhouse."
  val properties: Properties = new Properties()
  var writeMode: String = "single"
  var hashFieldsArray: Array[String] = _

  /**
   * Set Config.
   **/
  override def setConfig(config: Config): Unit = {
    this.config = config
  }

  /**
   * Get Config.
   **/
  override def getConfig(): Config = {
    this.config
  }

  override def checkConfig(): (Boolean, String) = {
    val requiredOptions = List("host", "table", "database")
    val nonExistsOptions = requiredOptions
      .map(optionName => (optionName, config.hasPath(optionName)))
      .filter { p =>
        val (optionName, exists) = p
        !exists
      }

    if (TypesafeConfigUtils.hasSubConfig(config, clickhousePrefix)) {
      val clickhouseConfig = TypesafeConfigUtils.extractSubConfig(config, clickhousePrefix, false)
      clickhouseConfig
        .entrySet()
        .foreach(entry => {
          val key = entry.getKey
          val value = String.valueOf(entry.getValue.unwrapped())
          properties.put(key, value)
        })
    }

    val hasUserName = config.hasPath("username")
    val hasPassword = config.hasPath("password")

    if (nonExistsOptions.nonEmpty) {
      (false,
        "please specify " + nonExistsOptions
          .map { option =>
            val (name, exists) = option
            "[" + name + "]"
          }
          .mkString(", ") + " as non-empty string")
    } else if (hasUserName && !hasPassword || !hasUserName && hasPassword) {
      (false, "please specify username and password at the same time")
    } else {
      if (hasPassword) {
        properties.put("user", config.getString("username"))
        properties.put("password", config.getString("password"))
      }
      (true, "")
    }
  }

  override def prepare(spark: SparkSession): Unit = {
    this.jdbcLink = String.format("jdbc:clickhouse://%s/%s", config.getString("host"), config.getString("database"))

    val balanced: BalancedClickhouseDataSource = new BalancedClickhouseDataSource(this.jdbcLink, properties)
    val conn = balanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]

    this.table = config.getString("table")
    this.tableSchema = getClickHouseSchema(conn, table)

    if (this.config.hasPath("fields")) {
      this.fields = config.getStringList("fields")
      val (flag, msg) = acceptedClickHouseSchema()
      if (!flag) {
        throw new ConfigRuntimeException(msg)
      }
    }

    val defaultConfig = ConfigFactory.parseMap(
      Map(
        "bulk_size" -> 20000,
        // "retry_codes" -> util.Arrays.asList(ClickHouseErrorCode.NETWORK_ERROR.code),
        "retry_codes" -> util.Arrays.asList(),
        "retry" -> 1
      )
    )

    // check whether the [cluster] option is present in the config file
    if (config.hasPath("cluster")) {
      this.cluster = config.getString("cluster")

      // fetch the cluster info from the database; it is used later in process().
      // clusterInfo is in fact an array
      this.clusterInfo = getClickHouseClusterInfo(conn, cluster)
      if (this.clusterInfo.size == 0) {
        val errorInfo = s"cloud not find cluster config in system.clusters, config cluster = $cluster"
        logError(errorInfo)
        throw new RuntimeException(errorInfo)
      }
      logInfo(s"get [$cluster] config from system.clusters, the replica info is [$clusterInfo].")
    }

    config = config.withFallback(defaultConfig)
    retryCodes = config.getIntList("retry_codes")
    super.prepare(spark)
  }

  override def process(df: Dataset[Row]): Unit = {
    val dfFields = df.schema.fieldNames
    val bulkSize = config.getInt("bulk_size")
    val retry = config.getInt("retry")

    if (!config.hasPath("fields")) {
      fields = dfFields.toList
    }

    this.initSQL = initPrepareSQL()
    logInfo(this.initSQL)

    // a non-empty clusterInfo means we are in cluster mode
    if (this.clusterInfo != null && this.clusterInfo.size > 0) {
      var writeMode = "single"
      var hashFields = "" // multiple fields are supported, comma-separated

      // read the two new options: write_mode and hash_fields
      if (config.hasPath("write_mode")) {
        writeMode = config.getString("write_mode")
      }
      if (config.hasPath("hash_fields")) {
        hashFields = config.getString("hash_fields")
        hashFieldsArray = hashFields.split(",")
      }

      logInfo(s"conf's writeMode: $writeMode")
      logInfo(s"conf's hashFields: $hashFields")

      // use the row-hash mode only if write_mode is rowhash, hash_fields is non-empty
      // and every hash field exists in the stream; otherwise fall back to a random mode
      if ("rowhash".equals(writeMode) && hashFields != null && !"".equals(hashFields)
          && checkFields(hashFields, dfFields)) {
        this.writeMode = "rowHash"
        logInfo(s"cluster rowHash mode, shard index is selected per row in the iterator")
      } else if ("rowrandom".equals(writeMode)) {
        this.writeMode = "rowRandom"
        logInfo(s"cluster rowRandom mode, shard index is selected per row in the iterator")
      } else {
        this.writeMode = "partRandom"
        logInfo(s"cluster partRandom mode (default), shard index is selected once per partition in foreachPartition")
      }
    } else {
      this.writeMode = "single"
      logInfo(s"single mode, the jdbc url is created in foreachPartition.")
    }

    logDebug(s"ready foreachPartition...")
    df.foreachPartition { iter =>
      var jdbcUrl = this.jdbcLink
      // start with arrays of size 1; in cluster mode they are re-allocated to the shard count
      var statementArray = new Array[ClickHousePreparedStatement](1)
      var lengthArray = new Array[Int](1)
      var shardIndex = 0

      logDebug(s"this.writeMode:" + this.writeMode)
      if ("partRandom".equals(this.writeMode) || "rowHash".equals(this.writeMode)
          || "rowRandom".equals(this.writeMode)) {
        statementArray = new Array[ClickHousePreparedStatement](this.clusterInfo.size)
        lengthArray = new Array[Int](this.clusterInfo.size)

        // initialize one prepared statement (one connection) per shard
        logInfo("this.clusterInfo.size is " + this.clusterInfo.size)
        for (i <- 0 until this.clusterInfo.size) {
          logInfo(s"create connection for shard[$i]")
          val shardInfo = this.clusterInfo.get(i)
          val host = shardInfo._4
          val port = getJDBCPort(this.jdbcLink)
          // the database name also comes from the config file; it must be the database
          // that holds the local tables
          val database = config.getString("database")
          // rebuild the jdbc url; effectively only the host changes
          jdbcUrl = s"jdbc:clickhouse://$host:$port/$database"
          statementArray(i) = new BalancedClickhouseDataSource(jdbcUrl, this.properties).getConnection
            .asInstanceOf[ClickHouseConnectionImpl]
            .createClickHousePreparedStatement(this.initSQL, ResultSet.TYPE_FORWARD_ONLY)
          lengthArray(i) = 0
        }

        if ("partRandom".equals(this.writeMode)) {
          // core of the partition-random mode: one random shard in [0, clusterInfo.size) per partition
          shardIndex = (Math.random() * this.clusterInfo.size).asInstanceOf[Int]
          logInfo(s"cluster partRandom mode, select shard index [$shardIndex] to insert data. the jdbc url is "
            + statementArray(shardIndex).getConnection.getMetaData.getURL)
        }
      } else {
        statementArray(0) = new BalancedClickhouseDataSource(jdbcUrl, this.properties).getConnection
          .asInstanceOf[ClickHouseConnectionImpl]
          .createClickHousePreparedStatement(this.initSQL, ResultSet.TYPE_FORWARD_ONLY)
        shardIndex = 0
        logInfo(s"single mode, the jdbc url is [$jdbcUrl].")
      }

      while (iter.hasNext) {
        val row = iter.next()
        if ("rowHash".equals(this.writeMode)) {
          // derive the shard index from the hash of the configured hash_fields
          val hashValue = getFieldsRowHash(hashFieldsArray, row)
          shardIndex = hashValue.abs % this.clusterInfo.size
          logTrace(s"cluster rowHash mode, select shard index [$shardIndex] to insert data. the jdbc url is "
            + statementArray(shardIndex).getConnection.getMetaData.getURL)
        } else if ("rowRandom".equals(this.writeMode)) {
          // pick a random shard for every row
          shardIndex = (Math.random() * this.clusterInfo.size).asInstanceOf[Int]
          logTrace(s"cluster rowRandom mode, select shard index [$shardIndex] to insert data. the jdbc url is "
            + statementArray(shardIndex).getConnection.getMetaData.getURL)
        }
        // in partRandom mode shardIndex is the random number generated before the loop;
        // in single mode shardIndex keeps its initial value 0
        lengthArray(shardIndex) += 1
        // fields: fields from the config file, row: the data row, dfFields: fields of the dataframe
        renderStatement(fields, row, dfFields, statementArray(shardIndex))
        statementArray(shardIndex).addBatch()
        // flush this shard's batch once it reaches the threshold (default 20000)
        if (lengthArray(shardIndex) >= bulkSize) {
          execute(statementArray(shardIndex), retry)
          lengthArray(shardIndex) = 0
        }
      }

      // flush whatever is left in every shard's batch
      for (i <- 0 until statementArray.length) {
        execute(statementArray(i), retry)
        lengthArray(i) = 0
      }
    }
  }

  private def checkFields(fields: String, dfFields: Array[String]): Boolean = {
    val fieldsArray = fields.split(",")
    checkFields(fieldsArray, dfFields)
  }

  private def checkFields(fields: Array[String], dfFields: Array[String]): Boolean = {
    for (field <- fields) {
      if (!dfFields.contains(field)) {
        return false
      }
    }
    true
  }

  private def getFieldsRowHash(fields: Array[String], row: Row): Int = {
    // same 31-based accumulation as java.util.Arrays.hashCode
    var rowHashCode: Int = 0
    for (field <- fields) {
      val fieldHash = row.getAs[Any](field).hashCode()
      rowHashCode = 31 * rowHashCode + fieldHash
    }
    rowHashCode
  }

  private def getJDBCPort(jdbcUrl: String): Int = {
    val clickHouseProperties: ClickHouseProperties = ClickhouseJdbcUrlParser.parse(jdbcUrl, properties)
    clickHouseProperties.getPort
  }

  private def execute(statement: ClickHousePreparedStatement, retry: Int): Unit = {
    val res = Try(statement.executeBatch())
    res match {
      case Success(_) => {
        logInfo("Insert into ClickHouse succeed")
        statement.close()
      }
      case Failure(e: ClickHouseException) => {
        val errorCode = e.getErrorCode
        if (retryCodes.contains(errorCode)) {
          logError("Insert into ClickHouse failed. Reason: ", e)
          if (retry > 0) {
            execute(statement, retry - 1)
          } else {
            logError("Insert into ClickHouse failed and retry failed, drop this bulk.")
            statement.close()
          }
        } else {
          throw e
        }
      }
      case Failure(e: ClickHouseUnknownException) => {
        statement.close()
        throw e
      }
      case Failure(e: Exception) => {
        throw e
      }
    }
  }

  private def getClickHouseSchema(conn: ClickHouseConnectionImpl, table: String): Map[String, String] = {
    val sql = s"desc $table"
    val resultSet = conn.createStatement.executeQuery(sql)
    var schema = new HashMap[String, String]()
    while (resultSet.next()) {
      schema += (resultSet.getString(1) -> resultSet.getString(2))
    }
    schema
  }

  private def getClickHouseClusterInfo(conn: ClickHouseConnectionImpl, cluster: String): ArrayBuffer[(String, Int, Int, String)] = {
    val sql =
      s"SELECT cluster, shard_num, shard_weight, host_address FROM system.clusters WHERE cluster = '$cluster' AND replica_num = 1"
    val resultSet = conn.createStatement.executeQuery(sql)

    val clusterInfo = ArrayBuffer[(String, Int, Int, String)]()
    while (resultSet.next()) {
      val shardWeight = resultSet.getInt("shard_weight")
      // a shard with weight N gets N entries, so weighted shards receive proportionally more data
      for (_ <- 1 to shardWeight) {
        val custerName = resultSet.getString("cluster")
        val shardNum = resultSet.getInt("shard_num")
        val hostAddress = resultSet.getString("host_address")
        val shardInfo = Tuple4(custerName, shardNum, shardWeight, hostAddress)
        clusterInfo += shardInfo
      }
    }
    clusterInfo
  }

  private def initPrepareSQL(): String = {
    val prepare = List.fill(fields.size)("?")
    val sql = String.format(
      "insert into %s (%s) values (%s)",
      this.table,
      this.fields.map(a => s"`$a`").mkString(","),
      prepare.mkString(","))
    sql
  }

  private def acceptedClickHouseSchema(): (Boolean, String) = {
    val nonExistsFields = fields
      .map(field => (field, tableSchema.contains(field)))
      .filter { case (_, exist) => !exist }

    if (nonExistsFields.nonEmpty) {
      (false,
        "field " + nonExistsFields.map { case (option) => "[" + option + "]" }.mkString(", ")
          + " not exist in table " + this.table)
    } else {
      val nonSupportedType = fields
        .map(field => (tableSchema(field), Clickhouse.supportOrNot(tableSchema(field))))
        .filter { case (_, exist) => !exist }
      if (nonSupportedType.nonEmpty) {
        (false,
          "clickHouse data type " + nonSupportedType.map { case (option) => "[" + option + "]" }.mkString(", ")
            + " not support in current version.")
      } else {
        (true, "")
      }
    }
  }

  private def renderDefaultStatement(index: Int, fieldType: String, statement: ClickHousePreparedStatement): Unit = {
    fieldType match {
      case "DateTime" | "Date" | "String" =>
        statement.setString(index + 1, Clickhouse.renderStringDefault(fieldType))
      case "Int8" | "UInt8" | "Int16" | "Int32" | "UInt32" | "UInt16" =>
        statement.setInt(index + 1, 0)
      case "UInt64" | "Int64" =>
        statement.setLong(index + 1, 0)
      case "Float32" => statement.setFloat(index + 1, 0)
      case "Float64" => statement.setDouble(index + 1, 0)
      case Clickhouse.lowCardinalityPattern(lowCardinalityType) =>
        renderDefaultStatement(index, lowCardinalityType, statement)
      case Clickhouse.arrayPattern(_) => statement.setArray(index + 1, List())
      case Clickhouse.nullablePattern(nullFieldType) => renderNullStatement(index, nullFieldType, statement)
      case _ => statement.setString(index + 1, "")
    }
  }

  private def renderNullStatement(index: Int, fieldType: String, statement: ClickHousePreparedStatement): Unit = {
    fieldType match {
      case "String" =>
        statement.setNull(index + 1, java.sql.Types.VARCHAR)
      case "DateTime" => statement.setNull(index + 1, java.sql.Types.DATE)
      case "Date" => statement.setNull(index + 1, java.sql.Types.TIME)
      case "Int8" | "UInt8" | "Int16" | "Int32" | "UInt32" | "UInt16" =>
        statement.setNull(index + 1, java.sql.Types.INTEGER)
      case "UInt64" | "Int64" =>
        statement.setNull(index + 1, java.sql.Types.BIGINT)
      case "Float32" => statement.setNull(index + 1, java.sql.Types.FLOAT)
      case "Float64" => statement.setNull(index + 1, java.sql.Types.DOUBLE)
      case "Array" => statement.setNull(index + 1, java.sql.Types.ARRAY)
      case Clickhouse.decimalPattern(_) => statement.setNull(index + 1, java.sql.Types.DECIMAL)
    }
  }

  private def renderBaseTypeStatement(
      index: Int,
      fieldIndex: Int,
      fieldType: String,
      item: Row,
      statement: ClickHousePreparedStatement): Unit = {
    fieldType match {
      case "DateTime" | "Date" | "String" =>
        statement.setString(index + 1, item.getAs[String](fieldIndex))
      case "Int8" | "UInt8" | "Int16" | "UInt16" | "Int32" =>
        statement.setInt(index + 1, item.getAs[Int](fieldIndex))
      case "UInt32" | "UInt64" | "Int64" =>
        statement.setLong(index + 1, item.getAs[Long](fieldIndex))
      case "Float32" => statement.setFloat(index + 1, item.getAs[Float](fieldIndex))
      case "Float64" => statement.setDouble(index + 1, item.getAs[Double](fieldIndex))
      case Clickhouse.arrayPattern(_) =>
        statement.setArray(index + 1, item.getAs[WrappedArray[AnyRef]](fieldIndex))
      case "Decimal" => statement.setBigDecimal(index + 1, item.getAs[BigDecimal](fieldIndex))
      case _ => statement.setString(index + 1, item.getAs[String](fieldIndex))
    }
  }

  private def renderStatementEntry(
      index: Int,
      fieldIndex: Int,
      fieldType: String,
      item: Row,
      statement: ClickHousePreparedStatement): Unit = {
    fieldType match {
      case "String" | "DateTime" | "Date" | Clickhouse.arrayPattern(_) =>
        renderBaseTypeStatement(index, fieldIndex, fieldType, item, statement)
      case Clickhouse.floatPattern(_) | Clickhouse.intPattern(_) | Clickhouse.uintPattern(_) =>
        renderBaseTypeStatement(index, fieldIndex, fieldType, item, statement)
      case Clickhouse.nullablePattern(dataType) =>
        renderStatementEntry(index, fieldIndex, dataType, item, statement)
      case Clickhouse.lowCardinalityPattern(dataType) =>
        renderBaseTypeStatement(index, fieldIndex, dataType, item, statement)
      case Clickhouse.decimalPattern(_) =>
        renderBaseTypeStatement(index, fieldIndex, "Decimal", item, statement)
      case _ => statement.setString(index + 1, item.getAs[String](fieldIndex))
    }
  }

  private def renderStatement(
      fields: util.List[String],
      item: Row,
      dsFields: Array[String],
      statement: ClickHousePreparedStatement): Unit = {
    for (i <- 0 until fields.size()) {
      val field = fields.get(i)
      val fieldType = tableSchema(field)
      if (dsFields.indexOf(field) == -1) {
        // specified field does not exist in the row
        renderDefaultStatement(i, fieldType, statement)
      } else {
        val fieldIndex = item.fieldIndex(field)
        if (item.isNullAt(fieldIndex)) {
          // specified field is null in the row
          renderDefaultStatement(i, fieldType, statement)
        } else {
          renderStatementEntry(i, fieldIndex, fieldType, item, statement)
        }
      }
    }
  }
}

object Clickhouse {

  val arrayPattern: Regex = "(Array.*)".r
  val nullablePattern: Regex = "Nullable\\((.*)\\)".r
  val lowCardinalityPattern: Regex = "LowCardinality\\((.*)\\)".r
  val intPattern: Regex = "(Int.*)".r
  val uintPattern: Regex = "(UInt.*)".r
  val floatPattern: Regex = "(Float.*)".r
  val decimalPattern: Regex = "(Decimal.*)".r

  /**
   * Waterdrop support this clickhouse data type or not.
   *
   * @param dataType ClickHouse Data Type
   * @return Boolean
   **/
  private[waterdrop] def supportOrNot(dataType: String): Boolean = {
    dataType match {
      case "Date" | "DateTime" | "String" =>
        true
      case arrayPattern(_) | nullablePattern(_) | floatPattern(_) | intPattern(_) | uintPattern(_) =>
        true
      case lowCardinalityPattern(_) =>
        true
      case decimalPattern(_) =>
        true
      case _ =>
        false
    }
  }

  private[waterdrop] def renderStringDefault(fieldType: String): String = {
    fieldType match {
      case "DateTime" =>
        val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        dateFormat.format(System.currentTimeMillis())
      case "Date" =>
        val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
        dateFormat.format(System.currentTimeMillis())
      case "String" =>
        ""
    }
  }
}
```

Waterdrop 1.x is packaged and its dependencies are managed with sbt, which can be troublesome to set up; it works best when the build machine can reach the official repositories.

If you would rather not build it yourself, download the precompiled class files from the link above, replace the corresponding class files inside the jar, and repack the jar.

4. Usage

cluster:

    Same as the original version: set the cluster name here. If it is not set, single-node mode is used.

write_mode:

    Two valid values: rowhash and rowrandom. rowhash hashes each row; rowrandom picks a random shard for each row; any other value gives the partition-random mode (the default).

hash_fields:

    The field(s) taken from the data stream to compute the hash value. This parameter must be set when write_mode is rowhash, and is not needed otherwise. If write_mode is rowhash but hash_fields is missing, or the fields cannot be found in the dataset, the writer falls back to partition-random mode. A short sketch of the shard computation follows below.

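For intuition, here is how a row's shard index is derived in rowhash mode, using the same 31-based accumulation as getFieldsRowHash in the code above. The sample values are made up for illustration:

```scala
// Hypothetical illustration of rowhash shard selection; mirrors getFieldsRowHash.
val shardCount = 2                              // e.g. a two-shard cluster
val fieldValues = Seq("aa", "1")                // one row's values for hash_fields = "user_name,id"

var rowHashCode = 0
for (v <- fieldValues) {
  rowHashCode = 31 * rowHashCode + v.hashCode   // same accumulation as in the class
}
val shardIndex = rowHashCode.abs % shardCount   // rows with equal keys always land on the same shard
println(s"row -> shard $shardIndex")
```
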
5. Tests

| cluster | write_mode | hash_fields | effective write mode |
|---------|------------|-------------|----------------------|
| not set | - | - | single-node |
| set but invalid | - | - | error (the job aborts; see test case 2) |
| valid | not set | - | partition-random |
| valid | invalid | - | partition-random |
| valid | valid | not set | partition-random |
| valid | valid | invalid | partition-random |
| valid | valid | valid | row-hash or row-random |

Before running the tests, make sure your ClickHouse distributed setup is already complete.
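
A quick way to confirm this is to run the same query that the plugin's getClickHouseClusterInfo issues during prepare; dw_cluster is the cluster name used in the tests below:

```sql
-- Should return one row per shard (replica_num = 1). If it returns nothing,
-- the plugin aborts with "cloud not find cluster config in system.clusters".
SELECT cluster, shard_num, shard_weight, host_address
FROM system.clusters
WHERE cluster = 'dw_cluster' AND replica_num = 1;
```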

5.1. Prepare the data:

```
vi /tmp/dist_test.csv
id,user_name
1,aa
2,bb
3,cc
4,dd
5,ee
6,ff
7,gg
8,hh
9,ii
10,jj
```

5.2. Test cases

| case | cluster | write_mode | hash_fields | expected | config excerpt | result |
|------|---------|------------|-------------|----------|----------------|--------|
| 1 | not set | - | - | single-node | `clickhouse { host = "10.1.99.191:8123" database = "dw_local" table = "dist_test_01" username = "myuser" password = "mypassword" }` | as expected |
| 2 | xxx | - | - | throws an error | `clickhouse { host = "10.1.99.191:8123" database = "dw_local" table = "dist_test_02" cluster = "xxx" username = "myuser" password = "mypassword" }` | as expected |
| 3 | dw_cluster | not set | - | partition-random | `clickhouse { host = "10.1.99.191:8123" database = "dw_local" table = "dist_test_03" cluster = "dw_cluster" username = "myuser" password = "mypassword" }` | as expected |
| 4 | dw_cluster | xxx | - | partition-random | `clickhouse { host = "10.1.99.191:8123" database = "dw_local" table = "dist_test_04" cluster = "dw_cluster" write_mode = "xxx" username = "myuser" password = "mypassword" }` | as expected |
| 5 | dw_cluster | rowrandom | not set | row-random | `clickhouse { host = "10.1.99.191:8123" database = "dw_local" table = "dist_test_05" cluster = "dw_cluster" write_mode = "rowrandom" username = "myuser" password = "mypassword" }` | as expected |
| 6 | dw_cluster | rowrandom | xxx | row-random | `clickhouse { host = "10.1.99.191:8123" database = "dw_local" table = "dist_test_06" cluster = "dw_cluster" write_mode = "rowrandom" hash_fields="xxx" username = "myuser" password = "mypassword" }` | as expected |
| 7 | dw_cluster | rowhash | not set | partition-random | `clickhouse { host = "10.1.99.191:8123" database = "dw_local" table = "dist_test_07" cluster = "dw_cluster" write_mode = "rowhash" username = "myuser" password = "mypassword" }` | as expected |
| 8 | dw_cluster | rowhash | xxx | partition-random | `clickhouse { host = "10.1.99.191:8123" database = "dw_local" table = "dist_test_08" cluster = "dw_cluster" write_mode = "rowhash" hash_fields="xxx" username = "myuser" password = "mypassword" }` | as expected |
| 9 | dw_cluster | rowhash | user_name | row-hash | `clickhouse { host = "10.1.99.191:8123" database = "dw_local" table = "dist_test_09" cluster = "dw_cluster" write_mode = "rowhash" hash_fields="user_name" username = "myuser" password = "mypassword" }` | as expected |
| 10 | dw_cluster | rowhash | user_name,id | row-hash | `clickhouse { host = "10.1.99.191:8123" database = "dw_local" table = "dist_test_10" cluster = "dw_cluster" write_mode = "rowhash" hash_fields="user_name,id" username = "myuser" password = "mypassword" }` | as expected |

5.3. Create the tables:

```sql
-- create the local tables; run this on every node
DROP TABLE IF EXISTS dw_local.dist_test_01;
CREATE TABLE dw_local.dist_test_01 (id String, user_name String) engine = MergeTree primary key (id) order by (id);
DROP TABLE IF EXISTS dw_local.dist_test_02;
CREATE TABLE dw_local.dist_test_02 (id String, user_name String) engine = MergeTree primary key (id) order by (id);
DROP TABLE IF EXISTS dw_local.dist_test_03;
CREATE TABLE dw_local.dist_test_03 (id String, user_name String) engine = MergeTree primary key (id) order by (id);
DROP TABLE IF EXISTS dw_local.dist_test_04;
CREATE TABLE dw_local.dist_test_04 (id String, user_name String) engine = MergeTree primary key (id) order by (id);
DROP TABLE IF EXISTS dw_local.dist_test_05;
CREATE TABLE dw_local.dist_test_05 (id String, user_name String) engine = MergeTree primary key (id) order by (id);
DROP TABLE IF EXISTS dw_local.dist_test_06;
CREATE TABLE dw_local.dist_test_06 (id String, user_name String) engine = MergeTree primary key (id) order by (id);
DROP TABLE IF EXISTS dw_local.dist_test_07;
CREATE TABLE dw_local.dist_test_07 (id String, user_name String) engine = MergeTree primary key (id) order by (id);
DROP TABLE IF EXISTS dw_local.dist_test_08;
CREATE TABLE dw_local.dist_test_08 (id String, user_name String) engine = MergeTree primary key (id) order by (id);
DROP TABLE IF EXISTS dw_local.dist_test_09;
CREATE TABLE dw_local.dist_test_09 (id String, user_name String) engine = MergeTree primary key (id) order by (id);
DROP TABLE IF EXISTS dw_local.dist_test_10;
CREATE TABLE dw_local.dist_test_10 (id String, user_name String) engine = MergeTree primary key (id) order by (id);

-- create the distributed tables
DROP TABLE IF EXISTS dw.dist_test_01;
CREATE TABLE dw.dist_test_01 (id String, user_name String) ENGINE = Distributed(dw_cluster, dw_local, dist_test_01);
DROP TABLE IF EXISTS dw.dist_test_02;
CREATE TABLE dw.dist_test_02 (id String, user_name String) ENGINE = Distributed(dw_cluster, dw_local, dist_test_02);
DROP TABLE IF EXISTS dw.dist_test_03;
CREATE TABLE dw.dist_test_03 (id String, user_name String) ENGINE = Distributed(dw_cluster, dw_local, dist_test_03);
DROP TABLE IF EXISTS dw.dist_test_04;
CREATE TABLE dw.dist_test_04 (id String, user_name String) ENGINE = Distributed(dw_cluster, dw_local, dist_test_04);
DROP TABLE IF EXISTS dw.dist_test_05;
CREATE TABLE dw.dist_test_05 (id String, user_name String) ENGINE = Distributed(dw_cluster, dw_local, dist_test_05);
DROP TABLE IF EXISTS dw.dist_test_06;
CREATE TABLE dw.dist_test_06 (id String, user_name String) ENGINE = Distributed(dw_cluster, dw_local, dist_test_06);
DROP TABLE IF EXISTS dw.dist_test_07;
CREATE TABLE dw.dist_test_07 (id String, user_name String) ENGINE = Distributed(dw_cluster, dw_local, dist_test_07);
DROP TABLE IF EXISTS dw.dist_test_08;
CREATE TABLE dw.dist_test_08 (id String, user_name String) ENGINE = Distributed(dw_cluster, dw_local, dist_test_08);
DROP TABLE IF EXISTS dw.dist_test_09;
CREATE TABLE dw.dist_test_09 (id String, user_name String) ENGINE = Distributed(dw_cluster, dw_local, dist_test_09);
DROP TABLE IF EXISTS dw.dist_test_10;
CREATE TABLE dw.dist_test_10 (id String, user_name String) ENGINE = Distributed(dw_cluster, dw_local, dist_test_10);
```

5.4. Waterdrop configs:

The config files are batch-generated with a python3 script:

```python
# Generate the ten Waterdrop config files used by the test cases above.
common = '''
spark {
  spark.app.name = "Waterdrop"
  spark.executor.instances = 1
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.sql.catalogImplementation = "hive"
}
input {
  file {
    path = "file:///tmp/dist_test.csv"
    format = "csv"
    options.header = "true"
    result_table_name = "dist_test"
  }
}
filter {
  repartition {
    "note" : "repartition the data."
    num_partitions = 2
  }
}
'''

# (table name, extra clickhouse output options) for each test case
cases = [
    ("dist_test_01", ''),
    ("dist_test_02", 'cluster = "xxx"'),
    ("dist_test_03", 'cluster = "dw_cluster"'),
    ("dist_test_04", 'cluster = "dw_cluster"\n    write_mode = "xxx"'),
    ("dist_test_05", 'cluster = "dw_cluster"\n    write_mode = "rowrandom"'),
    ("dist_test_06", 'cluster = "dw_cluster"\n    write_mode = "rowrandom"\n    hash_fields="xxx"'),
    ("dist_test_07", 'cluster = "dw_cluster"\n    write_mode = "rowhash"'),
    ("dist_test_08", 'cluster = "dw_cluster"\n    write_mode = "rowhash"\n    hash_fields="xxx"'),
    ("dist_test_09", 'cluster = "dw_cluster"\n    write_mode = "rowhash"\n    hash_fields="user_name"'),
    ("dist_test_10", 'cluster = "dw_cluster"\n    write_mode = "rowhash"\n    hash_fields="user_name,id"'),
]

for table, extra in cases:
    output = f'''
output {{
  clickhouse {{
    host = "10.1.99.191:8123"
    database = "dw_local"
    table = "{table}"
    {extra}
    username = "myuser"
    password = "mypassword"
  }}
}}
'''
    file_path = f"/tmp/{table}.conf"
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(common + output)
```

5.5. Run the imports:

```bash
bin/start-waterdrop.sh --master local[1] --deploy-mode client --config /tmp/dist_test_01.conf
bin/start-waterdrop.sh --master local[1] --deploy-mode client --config /tmp/dist_test_02.conf
bin/start-waterdrop.sh --master local[1] --deploy-mode client --config /tmp/dist_test_03.conf
bin/start-waterdrop.sh --master local[1] --deploy-mode client --config /tmp/dist_test_04.conf
bin/start-waterdrop.sh --master local[1] --deploy-mode client --config /tmp/dist_test_05.conf
bin/start-waterdrop.sh --master local[1] --deploy-mode client --config /tmp/dist_test_06.conf
bin/start-waterdrop.sh --master local[1] --deploy-mode client --config /tmp/dist_test_07.conf
bin/start-waterdrop.sh --master local[1] --deploy-mode client --config /tmp/dist_test_08.conf
bin/start-waterdrop.sh --master local[1] --deploy-mode client --config /tmp/dist_test_09.conf
bin/start-waterdrop.sh --master local[1] --deploy-mode client --config /tmp/dist_test_10.conf
```

5.6. Query the data:

```sql
-- node 1
select * from (
select 'dist_test_01' as table_name, count() from dw_local.dist_test_01 union all
select 'dist_test_02' as table_name, count() from dw_local.dist_test_02 union all
select 'dist_test_03' as table_name, count() from dw_local.dist_test_03 union all
select 'dist_test_04' as table_name, count() from dw_local.dist_test_04 union all
select 'dist_test_05' as table_name, count() from dw_local.dist_test_05 union all
select 'dist_test_06' as table_name, count() from dw_local.dist_test_06 union all
select 'dist_test_07' as table_name, count() from dw_local.dist_test_07 union all
select 'dist_test_08' as table_name, count() from dw_local.dist_test_08 union all
select 'dist_test_09' as table_name, count() from dw_local.dist_test_09 union all
select 'dist_test_10' as table_name, count() from dw_local.dist_test_10
) t order by table_name;

select * from (
select 'dist_test_01' as table_name, * from dw_local.dist_test_01 union all
select 'dist_test_02' as table_name, * from dw_local.dist_test_02 union all
select 'dist_test_03' as table_name, * from dw_local.dist_test_03 union all
select 'dist_test_04' as table_name, * from dw_local.dist_test_04 union all
select 'dist_test_05' as table_name, * from dw_local.dist_test_05 union all
select 'dist_test_06' as table_name, * from dw_local.dist_test_06 union all
select 'dist_test_07' as table_name, * from dw_local.dist_test_07 union all
select 'dist_test_08' as table_name, * from dw_local.dist_test_08 union all
select 'dist_test_09' as table_name, * from dw_local.dist_test_09 union all
select 'dist_test_10' as table_name, * from dw_local.dist_test_10
) t order by table_name desc, id;
```
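
Optionally, to see how the rows were spread across the shards, you can query a distributed table and group by ClickHouse's built-in hostName() function. A sketch for test case 9; for the rowhash cases, rows with equal hash_fields values should all land on the same host:

```sql
-- Example distribution check, run against the distributed table;
-- hostName() reports which shard each row was read from.
SELECT hostName() AS shard_host, count() AS rows
FROM dw.dist_test_09
GROUP BY shard_host
ORDER BY shard_host;
```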

5.7. Drop the test tables and files:

```sql
DROP TABLE IF EXISTS dw_local.dist_test_01;
DROP TABLE IF EXISTS dw_local.dist_test_02;
DROP TABLE IF EXISTS dw_local.dist_test_03;
DROP TABLE IF EXISTS dw_local.dist_test_04;
DROP TABLE IF EXISTS dw_local.dist_test_05;
DROP TABLE IF EXISTS dw_local.dist_test_06;
DROP TABLE IF EXISTS dw_local.dist_test_07;
DROP TABLE IF EXISTS dw_local.dist_test_08;
DROP TABLE IF EXISTS dw_local.dist_test_09;
DROP TABLE IF EXISTS dw_local.dist_test_10;

DROP TABLE IF EXISTS dw.dist_test_01;
DROP TABLE IF EXISTS dw.dist_test_02;
DROP TABLE IF EXISTS dw.dist_test_03;
DROP TABLE IF EXISTS dw.dist_test_04;
DROP TABLE IF EXISTS dw.dist_test_05;
DROP TABLE IF EXISTS dw.dist_test_06;
DROP TABLE IF EXISTS dw.dist_test_07;
DROP TABLE IF EXISTS dw.dist_test_08;
DROP TABLE IF EXISTS dw.dist_test_09;
DROP TABLE IF EXISTS dw.dist_test_10;
```

```bash
rm -f /tmp/dist_test_01.conf
rm -f /tmp/dist_test_02.conf
rm -f /tmp/dist_test_03.conf
rm -f /tmp/dist_test_04.conf
rm -f /tmp/dist_test_05.conf
rm -f /tmp/dist_test_06.conf
rm -f /tmp/dist_test_07.conf
rm -f /tmp/dist_test_08.conf
rm -f /tmp/dist_test_09.conf
rm -f /tmp/dist_test_10.conf
```

6. Performance test:

Test table: 68 million rows, 3.6 GB source file, 24 columns.

Number of nodes: 2

Per-node spec: 100 GB RAM, 8 cores / 32 threads

| write mode | run 1 (s) | run 2 (s) | run 3 (s) | average (s) |
|------------|-----------|-----------|-----------|-------------|
| single-node | 234.835144 | 242.014209 | 239.198289 | 238.68 |
| partition-random | 224.140249 | 230.265629 | 236.603885 | 230.34 |
| row-random | 238.440763 | 243.962106 | 245.21358 | 242.54 |
| row-hash | 228.066742 | 234.923924 | 227.100249 | 230.03 |

7. Summary

1. Both new write modes work per row: row-random draws a random number for every row, and row-hash computes a hash from one or more columns of every row. Compared with the default partition-random mode this should in theory cost some performance, but in the tests above the difference is not noticeable.

2. Because the JDBC connections are created inside each RDD partition and a connection to every shard has to be kept (in an array), each partition creates N connection objects, where N is the shard count; for example, with num_partitions = 2 and a two-shard cluster, 2 × 2 = 4 connections are opened in total. For clusters with many shards the resource usage can therefore be significant, but this seems hard to avoid with this approach.
