目录

  • 一、Spark SQL支持的外部数据源
  • 二、Spark SQL —> CSV
    • 2.1 读CSV文件
      • a.有列名
      • b.无列名
    • 2.2 写CSV文件
  • 三、Spark SQL —> JSON
    • 3.1 读JSON文件
    • 3.2 写JSON文件
  • 四、Spark SQL —> Parquet
    • 读&写 Parquet文件
  • 五、Spark SQL —>Hive
    • 5.1 Spark集成Hive
    • 5.2 IDEA连接Hive
  • 六、Spark SQL —>MySQL

一、Spark SQL支持的外部数据源

Spark SQL开放了一系列接入外部数据源的接口,来让开发者可以实现Spark SQL可以加载任何地方的数据,例如mysql,hive,hdfs,hbase等,而且支持很多种格式如json, parquet, avro, csv格式…


二、Spark SQL —> CSV

2.1 读CSV文件

a.有列名

e.g.

package cn.kgc.spark.Testimport org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}object Demo_LoadCSVFile {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]")val sc = new SparkContext(conf)val customer = sc.textFile("data/customer_details.csv")customer.map(x=>(x.split(",")))//创建SparkSession//通过SparkSession加载csc/json文件val spark = SparkSession.builder().config(conf).getOrCreate()//加载csv文件// .option("header",true)判断文件中的第一行是否为列的名称val df = spark.read.format("csv").option("header",true).load("data/customer_details.csv")df.show()df.printSchema()}
}

result:

"C:\Program Files\Java\jdk1.8.0_231\bin\java.exe" ...
2021-01-09 13:01:07,596 WARN [org.apache.spark.SparkContext] - Using an existing SparkContext; some configuration may not take effect.
+-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+--------------------+
|customer_id|first_name|  last_name|               email|gender|             address|country| language|                 job|         credit_type|           credit_no|
+-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+--------------------+
|          1|   Spencer|  Raffeorty|sraffeorty0@dropb...|  Male|    9274 Lyons Court|  China|    Khmer|Safety Technician...|                 jcb|   3589373385487660 |
|          2|    Cherye|     Poynor|      cpoynor1@51.la|Female|1377 Anzinger Avenue|  China|    Czech|      Research Nurse|        instapayment|   6376594861844530 |
|          3|   Natasha|  Abendroth|nabendroth2@scrib...|Female| 2913 Evergreen Lane|  China|  Yiddish|Budget/Accounting...|                visa|   4041591905616350 |
|          4|   Huntley|     Seally|  hseally3@prlog.org|  Male|    694 Del Sol Lane|  China| Albanian|Environmental Spe...|               laser| 677118310740263000 |
|          5|     Druci|       Coad|    dcoad4@weibo.com|Female|         16 Debs Way|  China|   Hebrew|             Teacher|                 jcb|   3537287259845040 |
|          6|     Sayer|    Brizell|sbrizell5@opensou...|  Male|  71 Banding Terrace|  China|  Maltese|       Accountant IV|     americanexpress|    379709885387687 |
|          7|     Becca|    Brawley|bbrawley6@sitemet...|Female|7 Doe Crossing Ju...|  China|    Czech|Payment Adjustmen...|                 jcb|   3545380000000000 |
|          8|   Michele|   Bastable|  mbastable7@sun.com|Female|98 Clyde Gallaghe...|  China|Malayalam|      Tax Accountant|                 jcb|   3588130000000000 |
|          9|     Marla|Brotherhood|mbrotherhood8@ill...|Female|4538 Fair Oaks Trail|  China|     Dari|     Design Engineer|      china-unionpay|5602230000000000000 |
|         10|  Lionello|    Gogarty|lgogarty9@histats...|  Male|      800 Sage Alley|  China|   Danish| Clinical Specialist|diners-club-carte...|     30290800000000 |
|         11|    Camile|     Ringer|   cringera@army.mil|Female|5060 Fairfield Alley|  China|  Punjabi|    Junior Executive|      china-unionpay|   5602210000000000 |
|         12|    Gillan|  Banbridge|gbanbridgeb@wikip...|Female|   91030 Havey Point|  China|  Kurdish|   Chemical Engineer|                 jcb|   3555950000000000 |
|         13|    Guinna|    Damsell|gdamsellc@spiegel.de|Female|       869 Ohio Park|  China|   Fijian|  Analyst Programmer|                 jcb|   3532010000000000 |
|         14|   Octavia|    McDugal|omcdugald@rambler.ru|Female|  413 Forster Center|  China|  English|Desktop Support T...|             maestro| 502018000000000000 |
|         15| Anjanette|       Penk|     apenke@lulu.com|Female|  8154 Schiller Road|  China|  Swedish|            VP Sales|                 jcb|   3548040000000000 |
|         16|     Maura|   Teesdale|mteesdalef@globo.com|Female|   9568 Quincy Alley|  China|    Dutch|    Dental Hygienist|                 jcb|   3582890000000000 |
|         17|  Chastity|     Neylon|    cneylong@wix.com|Female|11952 Northwester...|  China| Gujarati|         Geologist I|                 jcb|   3543180000000000 |
|         18|     Ralph|      Coils|rcoilsh@artisteer...|  Male|    5070 Hooker Pass|  China|    Khmer|  Speech Pathologist|          mastercard|   5505790000000000 |
|         19|   Jehanna|    Whybrow|    jwhybrowi@wp.com|Female|      83 Maywood Way|  China|   Fijian| Assistant Professor|       visa-electron|   4844570000000000 |
|         20|  Ingeberg|   Sutehall|isutehallj@feedbu...|Female| 272 Butternut Drive|  China|   Kazakh|   Assistant Manager|       visa-electron|   4405830000000000 |
+-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+--------------------+
only showing top 20 rowsroot|-- customer_id: string (nullable = true)|-- first_name: string (nullable = true)|-- last_name: string (nullable = true)|-- email: string (nullable = true)|-- gender: string (nullable = true)|-- address: string (nullable = true)|-- country: string (nullable = true)|-- language: string (nullable = true)|-- job: string (nullable = true)|-- credit_type: string (nullable = true)|-- credit_no: string (nullable = true)

b.无列名

加载的CSV文件无列名时,需要指定数据结构;
e.g.

"C:\Program Files\Java\jdk1.8.0_231\bin\java.exe" ...
package SparkSQLimport org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}object Demo9_LoadCSVFile {def main(args: Array[String]): Unit = {//创建SparkSessionval spark = SparkSession.builder().master("local[4]") //设置master.appName(this.getClass.getName) //设置appName.getOrCreate()//创建schema信息val schema = StructType(Array(StructField("number", StringType, true),StructField("date", StringType, true),StructField("orderNo", StringType, true),StructField("status", StringType, true)))val df = spark.read.format("csv").schema(schema).load("file:///F:\\notes\\java\\SparkFirst\\data2\\orders.csv")df.show()df.printSchema()}
}

result:

+------+-------------------+-------+---------------+
|number|               date|orderNo|         status|
+------+-------------------+-------+---------------+
|     1|2013-07-25 00:00:00|  11599|         CLOSED|
|     2|2013-07-25 00:00:00|    256|PENDING_PAYMENT|
|     3|2013-07-25 00:00:00|  12111|       COMPLETE|
|     4|2013-07-25 00:00:00|   8827|         CLOSED|
|     5|2013-07-25 00:00:00|  11318|       COMPLETE|
|     6|2013-07-25 00:00:00|   7130|       COMPLETE|
|     7|2013-07-25 00:00:00|   4530|       COMPLETE|
|     8|2013-07-25 00:00:00|   2911|     PROCESSING|
|     9|2013-07-25 00:00:00|   5657|PENDING_PAYMENT|
|    10|2013-07-25 00:00:00|   5648|PENDING_PAYMENT|
|    11|2013-07-25 00:00:00|    918| PAYMENT_REVIEW|
|    12|2013-07-25 00:00:00|   1837|         CLOSED|
|    13|2013-07-25 00:00:00|   9149|PENDING_PAYMENT|
|    14|2013-07-25 00:00:00|   9842|     PROCESSING|
|    15|2013-07-25 00:00:00|   2568|       COMPLETE|
|    16|2013-07-25 00:00:00|   7276|PENDING_PAYMENT|
|    17|2013-07-25 00:00:00|   2667|       COMPLETE|
|    18|2013-07-25 00:00:00|   1205|         CLOSED|
|    19|2013-07-25 00:00:00|   9488|PENDING_PAYMENT|
|    20|2013-07-25 00:00:00|   9198|     PROCESSING|
+------+-------------------+-------+---------------+
only showing top 20 rowsroot|-- number: string (nullable = true)|-- date: string (nullable = true)|-- orderNo: string (nullable = true)|-- status: string (nullable = true)Process finished with exit code 0

2.2 写CSV文件

将DataFrame向外写成CSV格式文件时,除了指定数据追加模式外,还可以指定数据分隔符;

df.write.format("csv").mode("overwrite").option("sep", "\t").save("F:\\notes\\java\\SparkFirst\\output\\CSVdownload")

e.g.

package SparkSQLimport org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}object Demo10_WriteCSVFile {def main(args: Array[String]): Unit = {//创建SparkSessionval spark = SparkSession.builder().master("local[4]") //设置master.appName(this.getClass.getName) //设置appName.getOrCreate()val sc=spark.sparkContextimport spark.implicits._val peopleRDD = sc.textFile("data2/people.txt")//将RDD通过和Schema信息关联,得到DataFrame//通过StructType构建Schema信息,StructField代表一个字段//第一个参数是字段名称,第二个参数是字段类型,第三个参数是是否可以为空val schema = StructType(Array(StructField("name", StringType, true),StructField("age", IntegerType, true)))//将每行的字符串切割,切割成Array,将其转化为RDD[Row]类型val peopleRowRDD = peopleRDD.map(_.split(",")).map(x=>Row(x(0),x(1).toInt))//将Row类型的RDD和Schema信息关联,创建一个DataFrameval df = spark.createDataFrame(peopleRowRDD,schema)df.write.format("csv").mode("overwrite").save("F:\\notes\\java\\SparkFirst\\output\\CSVdownload")}
}

result:


三、Spark SQL —> JSON

3.1 读JSON文件

与读取CSV文件类似:

e.g.

package SparkSQLimport org.apache.spark.sql.SparkSessionobject Demo4_LoadJSONFile {def main(args: Array[String]): Unit = {//创建SparkSessionval spark = SparkSession.builder().master("local[4]") //设置master.appName(this.getClass.getName) //设置appName.getOrCreate()import spark.implicits._//两种加载json/csv文件:spark.read.format("json").load("PATH") / spark.read.json("PATH")val sc=spark.sparkContextval dataFrame = spark.read.json("data/users.json")dataFrame.show()}
}

3.2 写JSON文件

与写入CSV格式一样,通过DataFrame.write()方法写入:

df.write.format("json").mode("overwrite").save("F:\\notes\\java\\SparkFirst\\output\\JSONdownload")

四、Spark SQL —> Parquet

读&写 Parquet文件

Parquet文件:是一种流行的列式存储格式,以二进制存储,文件中包含数据和元数据
e.g.

package sparksql2import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._object Demo1_ReadWriteParquet {def main(args: Array[String]): Unit = {//创建SparkSessionval spark = SparkSession.builder().master("local[4]") //设置master.appName(this.getClass.getName) //设置appName.getOrCreate()import spark.implicits._val sc=spark.sparkContextval rdd = sc.parallelize(List(("curry", "yellow", Array(3, 9, 15, 20)), ("jack", "red", null)))// 1.定义schema信息val schema = StructType(Array(StructField("name", StringType, true),StructField("favorite_color", StringType, true),StructField("favorite_numbers", ArrayType(IntegerType), true)))// 2.准备Row类型的RDDval rowRDD = rdd.map(x=>Row(x._1,x._2,x._3))//3.通过Row类型的RDD和schema信息创建DataFrameval df = spark.createDataFrame(rowRDD,schema)//4.如果直接写入文件,默认就是Parquet格式df.write.save("F:\\notes\\java\\SparkFirst\\output\\parquet")//也可以手动指定parquet格式//.mode是写入模式,可以是overwrite(覆盖),也可以是append(追加)df.write.mode("overwrite").parquet("F:\\notes\\java\\SparkFirst\\output\\parquet")//5.使用.read.parquet方法可以读取parquet文件val df2 = spark.read.parquet(("F:\\notes\\java\\SparkFirst\\output\\parquet"))df2.show()}
}

result:

五、Spark SQL —>Hive

5.1 Spark集成Hive

[root@single ~]#cp /opt/software/hadoop/hive110/conf/hive-site.xml  /opt/software/hadoop/spark244/conf
[root@single lib]# cp mysql-connector-java-5.1.32.jar  /opt/software/hadoop/spark244/jars/
spark-shell --jars /opt/software/hadoop/spark244/jars/mysql-connector-java-5.1.32.jar
scala>spark.sql("show databases").show()


5.2 IDEA连接Hive

 <!-- spark-hive --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.4.4</version></dependency><!-- mysql-connector-java --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.31</version></dependency>

package sparksql2import org.apache.spark.sql.SparkSessionobject Demo2_ConnectHive {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[4]") //设置master.appName(this.getClass.getName) //设置appName.enableHiveSupport() //开启Hive支持.getOrCreate()val df = spark.sql("select * from pv")df.show()}
}

TIPS:有可能出现的问题:MySQL访问被拒绝!可以考虑在linux下登录MySQL将登录用户重新授权修改密码并flush privileges刷新MySQL的系统权限相关表,或者也可以重新启动MySQL服务器使其生效。

mysql>grant all on *.* to 'root'@'%' identified by 'root';
mysql>grant all on *.* to 'root'@'localhost' identified by 'root';
mysql>flush privileges;

六、Spark SQL —>MySQL

package sparksql2import java.util.Propertiesimport org.apache.spark.sql.SparkSessionobject Demo3_ConnectMysql {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[4]") //设置master.appName(this.getClass.getName) //设置appName.enableHiveSupport().getOrCreate()//设置要访问的mysql的url,表名val url = "jdbc:mysql://192.168.182.130:3306/mysqltest"val tablename ="student"val props = new Properties()//设置要访问的mysql的用户名,密码,Driverprops.setProperty("user","root")props.setProperty("password","root")props.setProperty("driver","com.mysql.jdbc.Driver")//通过spark.read.jdbc方法读取mysql中的数据val df = spark.read.jdbc(url,tablename,props)df.show()}
}

"C:\Program Files\Java\jdk1.8.0_231\bin\java.exe"...
+---+--------+------+--------+
|sid|   sname|gender|class_id|
+---+--------+------+--------+
|  1|  孙尚香|    女|       1|
|  2|    貂蝉|    女|       1|
|  3|    刘备|    男|       2|
|  4|  孙二娘|    女|       2|
|  5|    张飞|    男|       3|
|  6|    关羽|    男|       4|
|  7|  林黛玉|    女|       5|
|  8|  薛宝钗|    女|       6|
|  9|    宋江|    男|       6|
| 10|  白骨精|    女|       7|
| 11|  猪八戒|    男|       8|
| 12|  王熙凤|    女|       1|
| 13|  李师师|    女|       2|
| 14|  金翠莲|    女|       9|
| 15|    如花|    女|       1|
| 16|    沙僧|    男|       2|
| 17|  李美丽|    女|       3|
| 18|金角大王|    男|       4|
+---+--------+------+--------+Process finished with exit code 0

Spark SQL与外部数据源的操作(Spark SQL ——> CSV/JSON/Parquet/hive/mysql)相关推荐

  1. spark集群配置以及java操作spark小demo

    spark 安装 配置 使用java来操作spark spark 安装 tar -zxvf spark-2.4.0-bin-hadoop2.7.tgz rm spark-2.4.0-bin-hadoo ...

  2. Spark SQL连接外部数据源

    一.Spark SQL支持的外部数据源 Spark SQL开放了一系列接入外部数据源的接口,来让开发者可以实现Spark SQL可以 加载任何地方的数据,例如mysql,hive,hdfs,hbase ...

  3. 命令行客户端MySQL基本命令的使用(登录、登出、数据库操作的SQL语句、表结构的SQL语句、表数据操作的SQL语句)

    1. 登录和登出数据库 登录数据库: 输入下面命令: mysql -uroot -p 说明: -u 后面是登录的用户名  [写成-u root也是可以的] -p 后面是登录密码, 如果不填写, 回车之 ...

  4. 数据库学习day_01:SQL的发展和数据库操作相关sql语句

    1.数据库 学习数据库主要学习的就是如何对数据进行增删改查操作. 增加(插入数据) 删除数据 修改数据 查询数据 为什么使用数据库软件? 之前在webserver时通过IO技术已经操作过数据,其实这部 ...

  5. Spark SQL操作外部数据源

    目录 一.Spark SQL支持的外部数据源 二.Spark SQL -> CSV 2.1 读CSV文件 a.有列名 b.无列名 2.2 写CSV文件 三.Spark SQL -> JSO ...

  6. Spark SQL之External DataSource外部数据源(二)源代码分析

    上周Spark1.2刚公布,周末在家没事,把这个特性给了解一下,顺便分析下源代码,看一看这个特性是怎样设计及实现的. /** Spark SQL源代码分析系列文章*/ (Ps: External Da ...

  7. Spark SQL External DataSource外部数据源

    一:介绍 官网:https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html 随着Spark1.2的发布 ...

  8. Spark SQL 外部数据源

    一.简介 1.1 多数据源支持 Spark 支持以下六个核心数据源,同时 Spark 社区还提供了多达上百种数据源的读取方式,能够满足绝大部分使用场景. CSV JSON Parquet ORC JD ...

  9. Spark RDD、DataFrame原理及操作详解

    RDD是什么? RDD (resilientdistributed dataset),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用. RDD内部可以 ...

最新文章

  1. c语言 int64 t占位符,为什么我会得到“您必须为dtype int64提供占位符张量输出值”?...
  2. 为什么是容器,Docker和Kubernetes?
  3. Linux/Unix 如何查看 man 搜索到的手册页(manual page)的位置及复制手册页的内容
  4. Android为网络请求自定义加载动画
  5. 成为LINUX系统管理员几点规则
  6. 手机端公告文本回滚(简单的jq代码)
  7. 1225 数数字
  8. Linux字符串转换函数汇总
  9. oracle数据的启动
  10. SCN和Checkpoint
  11. 桥架算量用什么软件_鹏业安装算量软件识别桥架
  12. 宇宙各种定律,也许可以改变你的命运。
  13. 系统更新win10服务器出错怎么办,windows10更新升级失败0x80072ee2解决方法
  14. mybatis中resultMap和resultType区别,三分钟读懂
  15. 玩机搞机---全网最详细的手机全机型 刷机教程 二
  16. Python数据分析实战【十二】:机器学习决策树算法案例实战【文末源码地址】
  17. 基于ZigBee和STM32的智能家居控制系统的设计与实现(四)
  18. 牛顿法和割线法方程求根(C语言)
  19. idea出现java___jb_old___
  20. 神仙爱情!年轻富翁捐5亿科研经费,让女友不用申请项目,专心搞科研!

热门文章

  1. 超过C++、压制Java与C,Python拔得TIOBE年度编程语言!
  2. 英特尔推深度学习加速工具包OpenVINO,布局边缘计算,发力物联网业务
  3. 专访 | 在AI 医疗这个热门的赛道上,阿里在怎么玩?
  4. Redlock——Redis集群分布式锁
  5. Java 中的语法糖,真甜。
  6. nginx 反向代理和负载均衡策略实战案例
  7. GNN教程:第六篇Spectral算法细节详解!
  8. 中国移动这个编程大赛来了!
  9. 从理论到实践,Top选手带你进入数据竞赛的大门
  10. 3亿人养老靠机器人?这家公司要在2030年实现,有谱