Spark获取CSV文件导入ClickHouse

  • 环境配置
  • POM文件
  • ClickHouser创建临时表
  • 数据库连接工具类(Scala版)
  • 获取csv使用工具类导入ClickHouse(重点)
  • 线上运行语句

由于我们在工作中可能会用到导数需求,所以我就用sparkSQL进行开发了一个开发工具

环境配置

本地开发环境:WIN10、IDEA2019.3、Scala2.11.12、Spark2.4.0

POM文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><repositories><!-- 阿里云代码库 --><repository><id>maven-ali</id><url>http://maven.aliyun.com/nexus/content/groups/public//</url><releases><enabled>true</enabled></releases><snapshots><enabled>true</enabled><updatePolicy>always</updatePolicy><checksumPolicy>fail</checksumPolicy></snapshots></repository><repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository><repository><id>cloudera.public.repo</id><url>https://repository.cloudera.com/artifactory/public</url></repository></repositories><groupId>org.example</groupId><artifactId>UploadFileScala</artifactId><version>1.0-SNAPSHOT</version><properties><log4j.version>1.2.17</log4j.version><slf4j.version>1.7.22</slf4j.version><casbah.version>3.1.1</casbah.version><redis.version>2.9.0</redis.version><spark.version>2.4.0</spark.version><jblas.version>1.2.1</jblas.version><pg.version>42.2.5</pg.version><scala.version>2.11.12</scala.version></properties><dependencies><dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.2</version><exclusions><exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>*</artifactId></exclusion></exclusions></dependency><!-- 引入共同的日志管理工具 --><dependency><groupId>org.slf4j</groupId><artifactId>jcl-over-slf4j</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>${log4j.version}</version></dependency><!-- 引入Spark相关的Jar包 --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><version>${pg.version}</version></dependency><!-- https://mvnrepository.com/artifact/com.databricks/spark-csv --><dependency><groupId>com.databricks</groupId><artifactId>spark-csv_2.10</artifactId><version>1.5.0</version></dependency><!-- 该插件是限制lz4的版本为1.3.0 --><dependency><groupId>net.jpountz.lz4</groupId><artifactId>lz4</artifactId><version>1.3.0</version></dependency><!--ClickHouser 驱动包--><dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.2</version><exclusions><exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>*</artifactId></exclusion></exclusions></dependency></dependencies><!--maven打包编译插件--><build><resources><resource><directory>src/main/scala</directory><includes><include>**/*.properties</include><include>**/*.xml</include></includes><filtering>true</filtering></resource><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/resources/lib</directory><targetPath>BOOT-INF/lib/</targetPath><includes><include>**/*.jar</include></includes></resource></resources><!--scala待编译的文件目录--><sourceDirectory>src/main/scala</sourceDirectory><!--scala插件--><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions><configuration><scalaVersion>${scala.version}</scalaVersion><args><arg>-target:jvm-1.8</arg></args><jvmArgs><!--<jvmArg>-Xms256m</jvmArg><jvmArg>-Xmx1024m</jvmArg>--><jvmArg>-Xss4096k</jvmArg></jvmArgs></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>

ClickHouser创建临时表

我们上传的CSV文件数据是供于临时机器学习数据,故导入临时表

CREATE DATABASE temp;
CREATE TABLE temp.user_Detail_20200416  (user_id Int32,sale_amount Float32,trans_count Int32,offline_count Int32,online_count Int32,shopping_count Int32,tuihuo_count Int32,tuihuo_lv Float32,apru Float32,create_day Int32,is_gravida Int32,is_dobule_source Int32,baby_day Int32,active_code Int32
) ENGINE = MergeTree() ORDER BY (user_id) SETTINGS index_granularity = 8192;

数据库连接工具类(Scala版)

该类是工具类主类DBUtils

object DBUtils {val DEV = "dev"val PROD = "prod"var concurrentMode = ""private def DBUtils(mode: String): Unit = {concurrentMode = mode}private def getDBProperties(mode: String) = {val dbProperties = new Properties()if (mode == null) {throw new IllegalArgumentException("需要正确的参数 mode 属性")}if (DEV.equals(mode)) {dbProperties.load(DBUtils(mode).getClass.getClassLoader.getResourceAsStream("dev-db.properties"))}if (PROD.equals(mode)) {dbProperties.load(DBUtils(mode).getClass.getClassLoader.getResourceAsStream("prod-db.properties"))}dbProperties}def getDBConfig(mode: String) = {if (mode == null) {println("需要正确的参数 mode 属性")}val properties = getDBProperties(mode)val url = properties.getProperty("bi.bigdata.spark.ml.gp.url")val driver = properties.getProperty("bi.bigdata.spark.ml.gp.driver")val username = properties.getProperty("bi.bigdata.spark.ml.gp.username")val password = properties.getProperty("bi.bigdata.spark.ml.gp.password")DBConfig(url, driver, username, password)}def sparkDBProp(mode: String): Unit ={val config = getDBConfig(mode)val properties = new Properties()properties.setProperty("user",config.username)properties.setProperty("password",config.password)properties.setProperty("url",config.url)properties.setProperty("driver",config.driver)}
}

DBConfig为数据库连接工具配置类

case class DBConfig(url:String,driver:String,username:String,password:String) {}

获取csv使用工具类导入ClickHouse(重点)

localhostToCsv 此类为主要调用类

object localhostToCsv {def main(args: Array[String]): Unit = {//Spark集群配置的各种参数val sparkConf = new SparkConf();//SparkContext的初始化需要一个SparkConf对象sparkConf.set("spark.testing.memory", "2147480000")//获取SparkSessionval sess = SparkSession.builder().appName("SQLTest").master("local[*]").config(sparkConf).getOrCreate()//HDFS文件路径地址val csvPath = "/leyou/temp/result1.csv"val csvDf = sess.read.option("delimiter", ",") //分隔符,默认为逗号,.option("header", "true") //指定一个字符串代表 null 值.option("quote", "'")     //引号字符,默认为双引号".option("nullValue", "\\N")   //第一行不作为数据内容,作为标题.option("inferSchema", "true")  //自动推测字段类型.schema(ScalaReflection.schemaFor[UserDetail].dataType.asInstanceOf[StructType]) //指定csv字段类型.csv(csvPath)val csvCols = csvDf.columns//csvDf.show()  //打印csv明细数据,只展示20行//csvDf.printSchema()  //打印数据结构信息包含每列的名称及类型//设置sprark临时表csvDf.createTempView("csvView")//使用sparkSQL进行临时表数据val frame = sess.sql("select user_id,sale_amount,trans_count,offline_count,online_count,shopping_count,tuihuo_count,tuihuo_lv,apru,create_day,is_gravida,is_dobule_source,baby_day,active_code from csvView")//新建配置类val connProperties = new PropertiesconnProperties.setProperty("driver", "ru.yandex.clickhouse.ClickHouseDriver")connProperties.setProperty("user", "default")frame.write.mode(SaveMode.Append).option("batchsize", "100000").jdbc("jdbc:clickhouse://cdh2:8123/", "temp.user_Detail_20200416", connProperties)//关闭SparkSessionsess.stop()}}

UserDetail 该类为csv字段映射类,主要用于设置字段类型

case class UserDetail(user_id:Int,sale_amount:Double,trans_count:Int,offline_count:Int,online_count:Int,shopping_count:Int,tuihuo_count:Int,tuihuo_lv:Double,apru:Double,create_day:Int,is_gravida:Int,is_dobule_source:Int,baby_day:Int,active_code:Int) {}

由于该项目是在yarn上运行的,故我们需要将csv文件上传只HDFS上

线上运行语句

 spark-submit --class com.leyou.bi.uploadFile.localhostToCsv --master yarn --deploy-mode cluster --executor-memory 4G --num-executors 4 --driver-memory 4G --conf spark.default.parallelism=1000 --conf spark.memory.fraction=0.75 --conf spark.memory.storageFraction=0.5 --conf spark.network.timeout=10000000 /opt/apps/spark/jars/UploadFileCsvScala20200416-0.0.1.jar

执行语句完成之后我们可以去yarn WEB管理页面上去我们执行的任务,查看是否运行成功

Spark获取CSV文件导入ClickHouse相关推荐

  1. clickhouse将csv文件导入表中出现的 DB::ParsingException: Cannot parse input: expected ‘,‘ before: ‘\‘错误的解决方法

    将csv文件导入到clickhouse的表中 出现如下错误 是由于csv文件中的符号出现解析错误 注意:csv文件中的分隔符是",",而不是"制表符",仔细检查 ...

  2. spark解析csv文件_Spark:解析CSV文件并按列值分组

    spark解析csv文件 我发现自己经常使用大型CSV文件,并且意识到我现有的工具集不能让我快速浏览它们,我以为我会花一些时间在Spark上看看是否有帮助. 我正在使用芝加哥市发布的犯罪数据集 :它的 ...

  3. php导入csv文件,php实现CSV文件导入和导出

    项目开发中,很多时候要将外部CSV文件导入到数据库中或者将数据导出为CSV文件,那么具体该如何实现呢?本文将使用PHP并结合mysql,实现了CSV格式数据的导入和导出功能. 我们先准备mysql数据 ...

  4. php inputcsv,php实现CSV文件导入和导出

    这篇文章主要介绍了php实现CSV文件导入和导出的方法,具有一定的参考价值,需要的朋友可以参考下 项目开发中,很多时候要将外部CSV文件导入到数据库中或者将数据导出为CSV文件,那么具体该如何实现呢? ...

  5. 收藏!用Python一键批量将任意结构的CSV文件导入MySQL数据库。

    Python有很多库可以对CSV文件和Excel文件进行自动化和规模化处理.但是,使用数据库可以将计算机完成任务的能力提升成千上万倍! 那么问题来了,如果有很多个文件需要导入数据库,一个一个操作效率太 ...

  6. CSV文件导入数据库后中文乱码

    CSV文件导入数据库后中文乱码 背景:我用的是Dbeaver连接的clickhouse数据库,向现场的同事要了数据(CSV格式文件),导入后都是途中这样的,根本用不了.在网上找到同样的情况的文章,记录 ...

  7. C#创建和获取.csv文件

    1.调用方法 (1)创建.csv文件 bool Filebool = FileUtil.OutPutCsvFile(DATATABLE表, "A:\XXX.csv", false, ...

  8. mysql可视化导入csv文件_我们如何将数据从.CSV文件导入MySQL表?

    实际上,CSV也是一个文本文件,其中的值由逗号分隔,换句话说,我们可以说该文本文件带有CSV(逗号分隔的值).在将数据从.CSV文件导入到MySQL表时,我们需要将FIELDS SEPARATED O ...

  9. 将多个csv文件导入到pandas中并串联到一个DataFrame中

    本文翻译自:Import multiple csv files into pandas and concatenate into one DataFrame I would like to read ...

最新文章

  1. ios10申请权限以及弹出允许使用数据框
  2. python求函数曲率_【Python】车道线拟合曲线的曲率半径计算公式及代码
  3. Exception in thread main java.lang.UnsupportedClassVersionError的另类解决办法
  4. enable 华为交换机ntdp_华为交换机配置
  5. mysql查询临时表是否存在_[转]SQL判断临时表是否存在
  6. Ffmpeg下载WINDOWS、MAC编译结果
  7. 图像空域增强:灰度映射法
  8. 安卓手机反应慢又卡怎么办_安卓手机出现卡顿反应慢的具体处理方法
  9. PEmicro GDB Launch Failure : Could not bind socket.
  10. 深度学习常用软件包和基本环境配置
  11. 微观经济学之供给与需求--第一章第二章
  12. 【pytest】三、pytest用例管理框架的前后置(固件、夹具)setup和teardown,及封装
  13. dpdk-16.04 扩展新网卡驱动过程
  14. DialogBox函数参数
  15. Java 中有 goto 吗?
  16. 容联云sdk 打包maven
  17. 对比Mac OS上的PF与iptables
  18. PHP 零基础入门笔记(11):字符串 String
  19. C语言学习5:机器数、真值、原码、反码和补码详解
  20. Unity编辑器开发(五):实战、开发一个AB包编辑器工具

热门文章

  1. 【V2ray 报错 failed to read response header】
  2. windows11删除此电脑的6个图标,包括视频、图片、文档、下载、音乐、桌面
  3. 小米usb当前设备已被临时限制3-2
  4. 入门Retrofit,看这一篇就够了
  5. 骞云云原生运维管理平台7.0版本正式发布
  6. php rewrite 开启,Apache Rewrite 开启和使用方法
  7. Elasticsearch 7.X 性能优化
  8. STM32 HAL库获取系统时钟与标准库获取系统时钟
  9. poj 1013 模拟 天平问题
  10. [因果推断] 学习资料汇总