Working with External Data Sources in Spark SQL (Spark SQL —> CSV/JSON/Parquet/Hive/MySQL)
Contents
- 1. External data sources supported by Spark SQL
- 2. Spark SQL —> CSV
- 2.1 Reading CSV files
- a. With a header row
- b. Without a header row
- 2.2 Writing CSV files
- 3. Spark SQL —> JSON
- 3.1 Reading JSON files
- 3.2 Writing JSON files
- 4. Spark SQL —> Parquet
- Reading & writing Parquet files
- 5. Spark SQL —> Hive
- 5.1 Integrating Spark with Hive
- 5.2 Connecting to Hive from IDEA
- 6. Spark SQL —> MySQL
1. External data sources supported by Spark SQL
Spark SQL reads and writes external data through a uniform DataFrameReader/DataFrameWriter interface. The sections below walk through the sources used in this post: CSV, JSON, Parquet, Hive, and MySQL (over JDBC).
2. Spark SQL —> CSV
2.1 Reading CSV files
a. With a header row
package cn.kgc.spark.Test

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object Demo_LoadCSVFile {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]")
    val sc = new SparkContext(conf)
    // the plain RDD approach: each line must be split by hand (result unused, kept for comparison)
    val customer = sc.textFile("data/customer_details.csv")
    customer.map(x => x.split(","))
    // create the SparkSession; csv/json files are loaded through it
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // load the CSV file
    // .option("header", true) treats the first line of the file as the column names
    val df = spark.read.format("csv")
      .option("header", true)
      .load("data/customer_details.csv")
    df.show()
    df.printSchema()
  }
}
"C:\Program Files\Java\jdk1.8.0_231\bin\java.exe" ...
2021-01-09 13:01:07,596 WARN [org.apache.spark.SparkContext] - Using an existing SparkContext; some configuration may not take effect.
+-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+--------------------+
|customer_id|first_name| last_name| email|gender| address|country| language| job| credit_type| credit_no|
+-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+--------------------+
| 1| Spencer| Raffeorty|sraffeorty0@dropb...| Male| 9274 Lyons Court| China| Khmer|Safety Technician...| jcb| 3589373385487660 |
| 2| Cherye| Poynor| cpoynor1@51.la|Female|1377 Anzinger Avenue| China| Czech| Research Nurse| instapayment| 6376594861844530 |
| 3| Natasha| Abendroth|nabendroth2@scrib...|Female| 2913 Evergreen Lane| China| Yiddish|Budget/Accounting...| visa| 4041591905616350 |
| 4| Huntley| Seally| hseally3@prlog.org| Male| 694 Del Sol Lane| China| Albanian|Environmental Spe...| laser| 677118310740263000 |
| 5| Druci| Coad| dcoad4@weibo.com|Female| 16 Debs Way| China| Hebrew| Teacher| jcb| 3537287259845040 |
| 6| Sayer| Brizell|sbrizell5@opensou...| Male| 71 Banding Terrace| China| Maltese| Accountant IV| americanexpress| 379709885387687 |
| 7| Becca| Brawley|bbrawley6@sitemet...|Female|7 Doe Crossing Ju...| China| Czech|Payment Adjustmen...| jcb| 3545380000000000 |
| 8| Michele| Bastable| mbastable7@sun.com|Female|98 Clyde Gallaghe...| China|Malayalam| Tax Accountant| jcb| 3588130000000000 |
| 9| Marla|Brotherhood|mbrotherhood8@ill...|Female|4538 Fair Oaks Trail| China| Dari| Design Engineer| china-unionpay|5602230000000000000 |
| 10| Lionello| Gogarty|lgogarty9@histats...| Male| 800 Sage Alley| China| Danish| Clinical Specialist|diners-club-carte...| 30290800000000 |
| 11| Camile| Ringer| cringera@army.mil|Female|5060 Fairfield Alley| China| Punjabi| Junior Executive| china-unionpay| 5602210000000000 |
| 12| Gillan| Banbridge|gbanbridgeb@wikip...|Female| 91030 Havey Point| China| Kurdish| Chemical Engineer| jcb| 3555950000000000 |
| 13| Guinna| Damsell|gdamsellc@spiegel.de|Female| 869 Ohio Park| China| Fijian| Analyst Programmer| jcb| 3532010000000000 |
| 14| Octavia| McDugal|omcdugald@rambler.ru|Female| 413 Forster Center| China| English|Desktop Support T...| maestro| 502018000000000000 |
| 15| Anjanette| Penk| apenke@lulu.com|Female| 8154 Schiller Road| China| Swedish| VP Sales| jcb| 3548040000000000 |
| 16| Maura| Teesdale|mteesdalef@globo.com|Female| 9568 Quincy Alley| China| Dutch| Dental Hygienist| jcb| 3582890000000000 |
| 17| Chastity| Neylon| cneylong@wix.com|Female|11952 Northwester...| China| Gujarati| Geologist I| jcb| 3543180000000000 |
| 18| Ralph| Coils|rcoilsh@artisteer...| Male| 5070 Hooker Pass| China| Khmer| Speech Pathologist| mastercard| 5505790000000000 |
| 19| Jehanna| Whybrow| jwhybrowi@wp.com|Female| 83 Maywood Way| China| Fijian| Assistant Professor| visa-electron| 4844570000000000 |
| 20| Ingeberg| Sutehall|isutehallj@feedbu...|Female| 272 Butternut Drive| China| Kazakh| Assistant Manager| visa-electron| 4405830000000000 |
+-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+--------------------+
only showing top 20 rows

root
 |-- customer_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- address: string (nullable = true)
 |-- country: string (nullable = true)
 |-- language: string (nullable = true)
 |-- job: string (nullable = true)
 |-- credit_type: string (nullable = true)
 |-- credit_no: string (nullable = true)
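Note that every column comes back as string. If you want Spark to infer column types from the data, the standard inferSchema reader option does it, at the cost of an extra pass over the file. A minimal sketch:

// let Spark infer column types instead of defaulting everything to string
val typedDf = spark.read.format("csv")
  .option("header", true)
  .option("inferSchema", true)  // extra pass over the file to infer types
  .load("data/customer_details.csv")
typedDf.printSchema()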
b. Without a header row: when the file has no header line, define the schema yourself and pass it to the reader:
"C:\Program Files\Java\jdk1.8.0_231\bin\java.exe" ...
package SparkSQLimport org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}object Demo9_LoadCSVFile {def main(args: Array[String]): Unit = {//创建SparkSessionval spark = SparkSession.builder().master("local[4]") //设置master.appName(this.getClass.getName) //设置appName.getOrCreate()//创建schema信息val schema = StructType(Array(StructField("number", StringType, true),StructField("date", StringType, true),StructField("orderNo", StringType, true),StructField("status", StringType, true)))val df = spark.read.format("csv").schema(schema).load("file:///F:\\notes\\java\\SparkFirst\\data2\\orders.csv")df.show()df.printSchema()}
}
+------+-------------------+-------+---------------+
|number| date|orderNo| status|
+------+-------------------+-------+---------------+
| 1|2013-07-25 00:00:00| 11599| CLOSED|
| 2|2013-07-25 00:00:00| 256|PENDING_PAYMENT|
| 3|2013-07-25 00:00:00| 12111| COMPLETE|
| 4|2013-07-25 00:00:00| 8827| CLOSED|
| 5|2013-07-25 00:00:00| 11318| COMPLETE|
| 6|2013-07-25 00:00:00| 7130| COMPLETE|
| 7|2013-07-25 00:00:00| 4530| COMPLETE|
| 8|2013-07-25 00:00:00| 2911| PROCESSING|
| 9|2013-07-25 00:00:00| 5657|PENDING_PAYMENT|
| 10|2013-07-25 00:00:00| 5648|PENDING_PAYMENT|
| 11|2013-07-25 00:00:00| 918| PAYMENT_REVIEW|
| 12|2013-07-25 00:00:00| 1837| CLOSED|
| 13|2013-07-25 00:00:00| 9149|PENDING_PAYMENT|
| 14|2013-07-25 00:00:00| 9842| PROCESSING|
| 15|2013-07-25 00:00:00| 2568| COMPLETE|
| 16|2013-07-25 00:00:00| 7276|PENDING_PAYMENT|
| 17|2013-07-25 00:00:00| 2667| COMPLETE|
| 18|2013-07-25 00:00:00| 1205| CLOSED|
| 19|2013-07-25 00:00:00| 9488|PENDING_PAYMENT|
| 20|2013-07-25 00:00:00| 9198| PROCESSING|
+------+-------------------+-------+---------------+
only showing top 20 rows

root
 |-- number: string (nullable = true)
 |-- date: string (nullable = true)
 |-- orderNo: string (nullable = true)
 |-- status: string (nullable = true)

Process finished with exit code 0
2.2 Writing CSV files
When writing a DataFrame out as CSV, besides choosing the save mode you can also specify the field delimiter:
df.write.format("csv").mode("overwrite").option("sep", "\t").save("F:\\notes\\java\\SparkFirst\\output\\CSVdownload")
package SparkSQL

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object Demo10_WriteCSVFile {
  def main(args: Array[String]): Unit = {
    // create the SparkSession
    val spark = SparkSession.builder()
      .master("local[4]")              // set master
      .appName(this.getClass.getName)  // set appName
      .getOrCreate()
    val sc = spark.sparkContext
    import spark.implicits._
    val peopleRDD = sc.textFile("data2/people.txt")
    // Associate the RDD with schema information to get a DataFrame.
    // StructType builds the schema; each StructField is one column:
    // (column name, column type, nullable)
    val schema = StructType(Array(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))
    // split each line into an Array of fields and convert to RDD[Row]
    val peopleRowRDD = peopleRDD.map(_.split(",")).map(x => Row(x(0), x(1).toInt))
    // combine the Row RDD with the schema to create the DataFrame
    val df = spark.createDataFrame(peopleRowRDD, schema)
    df.write.format("csv").mode("overwrite").save("F:\\notes\\java\\SparkFirst\\output\\CSVdownload")
  }
}
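If the output was written with a custom delimiter, as in the option("sep", "\t") snippet above, the same delimiter must be supplied on the read side or Spark falls back to the default comma. A minimal sketch, reusing the name/age schema from this example and assuming the directory was written tab-separated:

// read tab-separated CSV back in; sep on read must match sep on write
val readBack = spark.read.format("csv")
  .option("sep", "\t")
  .schema(schema)  // CSV output carries no header by default, so supply the schema
  .load("F:\\notes\\java\\SparkFirst\\output\\CSVdownload")
readBack.show()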
3. Spark SQL —> JSON
3.1 Reading JSON files
package SparkSQL

import org.apache.spark.sql.SparkSession

object Demo4_LoadJSONFile {
  def main(args: Array[String]): Unit = {
    // create the SparkSession
    val spark = SparkSession.builder()
      .master("local[4]")              // set master
      .appName(this.getClass.getName)  // set appName
      .getOrCreate()
    import spark.implicits._
    // two ways to load a json/csv file:
    // spark.read.format("json").load("PATH")  or  spark.read.json("PATH")
    val sc = spark.sparkContext
    val dataFrame = spark.read.json("data/users.json")
    dataFrame.show()
  }
}
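One caveat worth knowing: by default spark.read.json expects JSON Lines input, i.e. one complete JSON object per line. If users.json were instead a single pretty-printed array or object, the standard multiLine option handles it. A minimal sketch:

// parse a file containing one large, pretty-printed JSON value (array or object)
val multiLineDf = spark.read
  .option("multiLine", true)
  .json("data/users.json")
multiLineDf.show()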
3.2 Writing JSON files
As with CSV, JSON output goes through the DataFrame.write API:
df.write.format("json").mode("overwrite").save("F:\\notes\\java\\SparkFirst\\output\\JSONdownload")
4. Spark SQL —> Parquet
Reading & writing Parquet files
Parquet is a popular columnar storage format. It is a binary format, and each file stores both the data and its metadata (the schema).
e.g.
package sparksql2

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

object Demo1_ReadWriteParquet {
  def main(args: Array[String]): Unit = {
    // create the SparkSession
    val spark = SparkSession.builder()
      .master("local[4]")              // set master
      .appName(this.getClass.getName)  // set appName
      .getOrCreate()
    import spark.implicits._
    val sc = spark.sparkContext
    val rdd = sc.parallelize(List(
      ("curry", "yellow", Array(3, 9, 15, 20)),
      ("jack", "red", null)
    ))
    // 1. define the schema
    val schema = StructType(Array(
      StructField("name", StringType, true),
      StructField("favorite_color", StringType, true),
      StructField("favorite_numbers", ArrayType(IntegerType), true)
    ))
    // 2. prepare an RDD of Row
    val rowRDD = rdd.map(x => Row(x._1, x._2, x._3))
    // 3. create a DataFrame from the Row RDD and the schema
    val df = spark.createDataFrame(rowRDD, schema)
    // 4. writing without an explicit format defaults to Parquet
    //    (mode("overwrite") added so reruns don't fail on an existing path)
    df.write.mode("overwrite").save("F:\\notes\\java\\SparkFirst\\output\\parquet")
    // the format can also be given explicitly;
    // .mode sets the save mode: overwrite or append
    df.write.mode("overwrite").parquet("F:\\notes\\java\\SparkFirst\\output\\parquet")
    // 5. read the Parquet files back with .read.parquet
    val df2 = spark.read.parquet("F:\\notes\\java\\SparkFirst\\output\\parquet")
    df2.show()
  }
}
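One common variation on the Parquet write is partitioning the output by a column, which puts each distinct value in its own subdirectory and speeds up filtered reads. A minimal sketch using the df from the example above (the parquet_partitioned directory name is made up for illustration):

// write Parquet partitioned by favorite_color; files land in
// .../favorite_color=yellow/, .../favorite_color=red/, ...
df.write
  .mode("overwrite")
  .partitionBy("favorite_color")
  .parquet("F:\\notes\\java\\SparkFirst\\output\\parquet_partitioned")
// reading the root directory recovers the partition column automatically
spark.read.parquet("F:\\notes\\java\\SparkFirst\\output\\parquet_partitioned").show()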
5. Spark SQL —> Hive
5.1 Integrating Spark with Hive
To let Spark reach the Hive metastore, copy hive-site.xml into Spark's conf directory and put the MySQL connector jar on Spark's classpath:
[root@single ~]# cp /opt/software/hadoop/hive110/conf/hive-site.xml /opt/software/hadoop/spark244/conf
[root@single lib]# cp mysql-connector-java-5.1.32.jar /opt/software/hadoop/spark244/jars/
spark-shell --jars /opt/software/hadoop/spark244/jars/mysql-connector-java-5.1.32.jar
scala> spark.sql("show databases").show()
5.2 Connecting to Hive from IDEA
Add the spark-hive and mysql-connector-java dependencies to pom.xml:
<!-- spark-hive -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.4.4</version>
</dependency>
<!-- mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.31</version>
</dependency>
package sparksql2

import org.apache.spark.sql.SparkSession

object Demo2_ConnectHive {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")              // set master
      .appName(this.getClass.getName)  // set appName
      .enableHiveSupport()             // enable Hive support
      .getOrCreate()
    val df = spark.sql("select * from pv")
    df.show()
  }
}
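For enableHiveSupport() to find the cluster's metastore when running from IDEA, hive-site.xml must also be on the application classpath (e.g. under src/main/resources). Alternatively, the metastore address can be set on the builder directly. A minimal sketch, where the thrift host and port are assumptions to replace with your metastore's address:

// point the session at the metastore explicitly instead of shipping hive-site.xml
val spark = SparkSession.builder()
  .master("local[4]")
  .appName("Demo2_ConnectHiveExplicit")
  .config("hive.metastore.uris", "thrift://192.168.182.130:9083")  // assumed host:port
  .enableHiveSupport()
  .getOrCreate()
spark.sql("show databases").show()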
If the metastore connection is rejected, make sure the MySQL account Hive uses is allowed to log in both remotely and locally:
mysql> grant all on *.* to 'root'@'%' identified by 'root';
mysql> grant all on *.* to 'root'@'localhost' identified by 'root';
mysql> flush privileges;
6. Spark SQL —> MySQL
package sparksql2

import java.util.Properties

import org.apache.spark.sql.SparkSession

object Demo3_ConnectMysql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")              // set master
      .appName(this.getClass.getName)  // set appName
      .enableHiveSupport()             // not actually required for JDBC access
      .getOrCreate()
    // the JDBC url of the MySQL database and the table to read
    val url = "jdbc:mysql://192.168.182.130:3306/mysqltest"
    val tablename = "student"
    val props = new Properties()
    // MySQL user name, password and JDBC driver class
    props.setProperty("user", "root")
    props.setProperty("password", "root")
    props.setProperty("driver", "com.mysql.jdbc.Driver")
    // read the MySQL table through spark.read.jdbc
    val df = spark.read.jdbc(url, tablename, props)
    df.show()
  }
}
"C:\Program Files\Java\jdk1.8.0_231\bin\java.exe"...
+---+--------+------+--------+
|sid| sname|gender|class_id|
+---+--------+------+--------+
| 1| 孙尚香| 女| 1|
| 2| 貂蝉| 女| 1|
| 3| 刘备| 男| 2|
| 4| 孙二娘| 女| 2|
| 5| 张飞| 男| 3|
| 6| 关羽| 男| 4|
| 7| 林黛玉| 女| 5|
| 8| 薛宝钗| 女| 6|
| 9| 宋江| 男| 6|
| 10| 白骨精| 女| 7|
| 11| 猪八戒| 男| 8|
| 12| 王熙凤| 女| 1|
| 13| 李师师| 女| 2|
| 14| 金翠莲| 女| 9|
| 15| 如花| 女| 1|
| 16| 沙僧| 男| 2|
| 17| 李美丽| 女| 3|
| 18|金角大王| 男| 4|
+---+--------+------+--------+

Process finished with exit code 0
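Writing back to MySQL is symmetric, via DataFrame.write.jdbc. A minimal sketch reusing url and props from the example above (student_copy is a hypothetical target table):

// write df into MySQL; "overwrite" drops and recreates the target table
df.write
  .mode("overwrite")
  .jdbc(url, "student_copy", props)  // student_copy is a hypothetical table name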