目录

  • 一、Spark SQL支持的外部数据源
  • 二、Spark SQL —> CSV
    • 2.1 读CSV文件
      • a.有列名
      • b.无列名
    • 2.2 写CSV文件
  • 三、Spark SQL —> JSON
    • 3.1 读JSON文件
    • 3.2 写JSON文件
  • 四、Spark SQL —> Parquet
    • 读&写 Parquet文件
  • 五、Spark SQL —>Hive
    • 5.1 Spark集成Hive
    • 5.2 IDEA连接Hive
  • 六、Spark SQL —>MySQL

一、Spark SQL支持的外部数据源

Spark SQL开放了一系列接入外部数据源的接口,来让开发者可以实现Spark SQL可以加载任何地方的数据,例如mysql,hive,hdfs,hbase等,而且支持很多种格式如json, parquet, avro, csv格式…

  • Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。


二、Spark SQL —> CSV

2.1 读CSV文件

a.有列名

e.g.

package cn.kgc.spark.Testimport org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}object Demo_LoadCSVFile {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]")val sc = new SparkContext(conf)val customer = sc.textFile("data/customer_details.csv")customer.map(x=>(x.split(",")))//创建SparkSession//通过SparkSession加载csc/json文件val spark = SparkSession.builder().config(conf).getOrCreate()//加载csv文件// .option("header",true)判断文件中的第一行是否为列的名称val df = spark.read.format("csv").option("header",true).load("data/customer_details.csv")df.show()df.printSchema()}
}

result:

"C:\Program Files\Java\jdk1.8.0_231\bin\java.exe" ...
2021-01-09 13:01:07,596 WARN [org.apache.spark.SparkContext] - Using an existing SparkContext; some configuration may not take effect.
+-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+--------------------+
|customer_id|first_name|  last_name|               email|gender|             address|country| language|                 job|         credit_type|           credit_no|
+-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+--------------------+
|          1|   Spencer|  Raffeorty|sraffeorty0@dropb...|  Male|    9274 Lyons Court|  China|    Khmer|Safety Technician...|                 jcb|   3589373385487660 |
|          2|    Cherye|     Poynor|      cpoynor1@51.la|Female|1377 Anzinger Avenue|  China|    Czech|      Research Nurse|        instapayment|   6376594861844530 |
|          3|   Natasha|  Abendroth|nabendroth2@scrib...|Female| 2913 Evergreen Lane|  China|  Yiddish|Budget/Accounting...|                visa|   4041591905616350 |
|          4|   Huntley|     Seally|  hseally3@prlog.org|  Male|    694 Del Sol Lane|  China| Albanian|Environmental Spe...|               laser| 677118310740263000 |
|          5|     Druci|       Coad|    dcoad4@weibo.com|Female|         16 Debs Way|  China|   Hebrew|             Teacher|                 jcb|   3537287259845040 |
|          6|     Sayer|    Brizell|sbrizell5@opensou...|  Male|  71 Banding Terrace|  China|  Maltese|       Accountant IV|     americanexpress|    379709885387687 |
|          7|     Becca|    Brawley|bbrawley6@sitemet...|Female|7 Doe Crossing Ju...|  China|    Czech|Payment Adjustmen...|                 jcb|   3545380000000000 |
|          8|   Michele|   Bastable|  mbastable7@sun.com|Female|98 Clyde Gallaghe...|  China|Malayalam|      Tax Accountant|                 jcb|   3588130000000000 |
|          9|     Marla|Brotherhood|mbrotherhood8@ill...|Female|4538 Fair Oaks Trail|  China|     Dari|     Design Engineer|      china-unionpay|5602230000000000000 |
|         10|  Lionello|    Gogarty|lgogarty9@histats...|  Male|      800 Sage Alley|  China|   Danish| Clinical Specialist|diners-club-carte...|     30290800000000 |
|         11|    Camile|     Ringer|   cringera@army.mil|Female|5060 Fairfield Alley|  China|  Punjabi|    Junior Executive|      china-unionpay|   5602210000000000 |
|         12|    Gillan|  Banbridge|gbanbridgeb@wikip...|Female|   91030 Havey Point|  China|  Kurdish|   Chemical Engineer|                 jcb|   3555950000000000 |
|         13|    Guinna|    Damsell|gdamsellc@spiegel.de|Female|       869 Ohio Park|  China|   Fijian|  Analyst Programmer|                 jcb|   3532010000000000 |
|         14|   Octavia|    McDugal|omcdugald@rambler.ru|Female|  413 Forster Center|  China|  English|Desktop Support T...|             maestro| 502018000000000000 |
|         15| Anjanette|       Penk|     apenke@lulu.com|Female|  8154 Schiller Road|  China|  Swedish|            VP Sales|                 jcb|   3548040000000000 |
|         16|     Maura|   Teesdale|mteesdalef@globo.com|Female|   9568 Quincy Alley|  China|    Dutch|    Dental Hygienist|                 jcb|   3582890000000000 |
|         17|  Chastity|     Neylon|    cneylong@wix.com|Female|11952 Northwester...|  China| Gujarati|         Geologist I|                 jcb|   3543180000000000 |
|         18|     Ralph|      Coils|rcoilsh@artisteer...|  Male|    5070 Hooker Pass|  China|    Khmer|  Speech Pathologist|          mastercard|   5505790000000000 |
|         19|   Jehanna|    Whybrow|    jwhybrowi@wp.com|Female|      83 Maywood Way|  China|   Fijian| Assistant Professor|       visa-electron|   4844570000000000 |
|         20|  Ingeberg|   Sutehall|isutehallj@feedbu...|Female| 272 Butternut Drive|  China|   Kazakh|   Assistant Manager|       visa-electron|   4405830000000000 |
+-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+--------------------+
only showing top 20 rowsroot|-- customer_id: string (nullable = true)|-- first_name: string (nullable = true)|-- last_name: string (nullable = true)|-- email: string (nullable = true)|-- gender: string (nullable = true)|-- address: string (nullable = true)|-- country: string (nullable = true)|-- language: string (nullable = true)|-- job: string (nullable = true)|-- credit_type: string (nullable = true)|-- credit_no: string (nullable = true)

b.无列名

加载的CSV文件无列名时,需要指定数据结构;
e.g.

"C:\Program Files\Java\jdk1.8.0_231\bin\java.exe" ...
package SparkSQLimport org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}object Demo9_LoadCSVFile {def main(args: Array[String]): Unit = {//创建SparkSessionval spark = SparkSession.builder().master("local[4]") //设置master.appName(this.getClass.getName) //设置appName.getOrCreate()//创建schema信息val schema = StructType(Array(StructField("number", StringType, true),StructField("date", StringType, true),StructField("orderNo", StringType, true),StructField("status", StringType, true)))val df = spark.read.format("csv").schema(schema).load("file:///F:\\notes\\java\\SparkFirst\\data2\\orders.csv")df.show()df.printSchema()}
}

result:

+------+-------------------+-------+---------------+
|number|               date|orderNo|         status|
+------+-------------------+-------+---------------+
|     1|2013-07-25 00:00:00|  11599|         CLOSED|
|     2|2013-07-25 00:00:00|    256|PENDING_PAYMENT|
|     3|2013-07-25 00:00:00|  12111|       COMPLETE|
|     4|2013-07-25 00:00:00|   8827|         CLOSED|
|     5|2013-07-25 00:00:00|  11318|       COMPLETE|
|     6|2013-07-25 00:00:00|   7130|       COMPLETE|
|     7|2013-07-25 00:00:00|   4530|       COMPLETE|
|     8|2013-07-25 00:00:00|   2911|     PROCESSING|
|     9|2013-07-25 00:00:00|   5657|PENDING_PAYMENT|
|    10|2013-07-25 00:00:00|   5648|PENDING_PAYMENT|
|    11|2013-07-25 00:00:00|    918| PAYMENT_REVIEW|
|    12|2013-07-25 00:00:00|   1837|         CLOSED|
|    13|2013-07-25 00:00:00|   9149|PENDING_PAYMENT|
|    14|2013-07-25 00:00:00|   9842|     PROCESSING|
|    15|2013-07-25 00:00:00|   2568|       COMPLETE|
|    16|2013-07-25 00:00:00|   7276|PENDING_PAYMENT|
|    17|2013-07-25 00:00:00|   2667|       COMPLETE|
|    18|2013-07-25 00:00:00|   1205|         CLOSED|
|    19|2013-07-25 00:00:00|   9488|PENDING_PAYMENT|
|    20|2013-07-25 00:00:00|   9198|     PROCESSING|
+------+-------------------+-------+---------------+
only showing top 20 rowsroot|-- number: string (nullable = true)|-- date: string (nullable = true)|-- orderNo: string (nullable = true)|-- status: string (nullable = true)Process finished with exit code 0

2.2 写CSV文件

将DataFrame向外写成CSV格式文件时,除了指定数据追加模式外,还可以指定数据分隔符;

df.write.format("csv").mode("overwrite").option("sep", "\t").save("F:\\notes\\java\\SparkFirst\\output\\CSVdownload")

e.g.

package SparkSQLimport org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}object Demo10_WriteCSVFile {def main(args: Array[String]): Unit = {//创建SparkSessionval spark = SparkSession.builder().master("local[4]") //设置master.appName(this.getClass.getName) //设置appName.getOrCreate()val sc=spark.sparkContextimport spark.implicits._val peopleRDD = sc.textFile("data2/people.txt")//将RDD通过和Schema信息关联,得到DataFrame//通过StructType构建Schema信息,StructField代表一个字段//第一个参数是字段名称,第二个参数是字段类型,第三个参数是是否可以为空val schema = StructType(Array(StructField("name", StringType, true),StructField("age", IntegerType, true)))//将每行的字符串切割,切割成Array,将其转化为RDD[Row]类型val peopleRowRDD = peopleRDD.map(_.split(",")).map(x=>Row(x(0),x(1).toInt))//将Row类型的RDD和Schema信息关联,创建一个DataFrameval df = spark.createDataFrame(peopleRowRDD,schema)df.write.format("csv").mode("overwrite").save("F:\\notes\\java\\SparkFirst\\output\\CSVdownload")}
}

result:


三、Spark SQL —> JSON

3.1 读JSON文件

与读取CSV文件类似:

e.g.

package SparkSQLimport org.apache.spark.sql.SparkSessionobject Demo4_LoadJSONFile {def main(args: Array[String]): Unit = {//创建SparkSessionval spark = SparkSession.builder().master("local[4]") //设置master.appName(this.getClass.getName) //设置appName.getOrCreate()import spark.implicits._//两种加载json/csv文件:spark.read.format("json").load("PATH") / spark.read.json("PATH")val sc=spark.sparkContextval dataFrame = spark.read.json("data/users.json")dataFrame.show()}
}

3.2 写JSON文件

与写入CSV格式一样,通过DataFrame.write()方法写入:

df.write.format("json").mode("overwrite").save("F:\\notes\\java\\SparkFirst\\output\\JSONdownload")

四、Spark SQL —> Parquet

读&写 Parquet文件

Parquet文件:是一种流行的列式存储格式,以二进制存储,文件中包含数据和元数据
e.g.

package sparksql2import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._object Demo1_ReadWriteParquet {def main(args: Array[String]): Unit = {//创建SparkSessionval spark = SparkSession.builder().master("local[4]") //设置master.appName(this.getClass.getName) //设置appName.getOrCreate()import spark.implicits._val sc=spark.sparkContextval rdd = sc.parallelize(List(("curry", "yellow", Array(3, 9, 15, 20)), ("jack", "red", null)))// 1.定义schema信息val schema = StructType(Array(StructField("name", StringType, true),StructField("favorite_color", StringType, true),StructField("favorite_numbers", ArrayType(IntegerType), true)))// 2.准备Row类型的RDDval rowRDD = rdd.map(x=>Row(x._1,x._2,x._3))//3.通过Row类型的RDD和schema信息创建DataFrameval df = spark.createDataFrame(rowRDD,schema)//4.如果直接写入文件,默认就是Parquet格式df.write.save("F:\\notes\\java\\SparkFirst\\output\\parquet")//也可以手动指定parquet格式//.mode是写入模式,可以是overwrite(覆盖),也可以是append(追加)df.write.mode("overwrite").parquet("F:\\notes\\java\\SparkFirst\\output\\parquet")//5.使用.read.parquet方法可以读取parquet文件val df2 = spark.read.parquet(("F:\\notes\\java\\SparkFirst\\output\\parquet"))df2.show()}
}

result:

五、Spark SQL —>Hive

5.1 Spark集成Hive

  • 将hive/conf目录下的hive-site.xml复制到spark/conf目录下;
[root@single ~]#cp /opt/software/hadoop/hive110/conf/hive-site.xml  /opt/software/hadoop/spark244/conf
  • 将hive/lib目录下的mysql-connector-java-5.1.38.jar,复制到spark/jars目录下;
[root@single lib]# cp mysql-connector-java-5.1.32.jar  /opt/software/hadoop/spark244/jars/
  • 启动spark-shell并用拷贝的jar包;
spark-shell --jars /opt/software/hadoop/spark244/jars/mysql-connector-java-5.1.32.jar
  • 访问hive(记得先将Hive元数据服务启动)
scala>spark.sql("show databases").show()


5.2 IDEA连接Hive

  • MAVEN工程增加依赖(版本不一致见官网:https://mvnrepository.com/);
 <!-- spark-hive --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.4.4</version></dependency><!-- mysql-connector-java --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.31</version></dependency>
  • 将hive-site.xml文件复制到rescourse资源文件夹内并将元数据地址补全

  • 创建工程连接Hive
package sparksql2import org.apache.spark.sql.SparkSessionobject Demo2_ConnectHive {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[4]") //设置master.appName(this.getClass.getName) //设置appName.enableHiveSupport() //开启Hive支持.getOrCreate()val df = spark.sql("select * from pv")df.show()}
}

TIPS:有可能出现的问题:MySQL访问被拒绝!可以考虑在linux下登录MySQL将登录用户重新授权修改密码并flush privileges刷新MySQL的系统权限相关表,或者也可以重新启动MySQL服务器使其生效。

mysql>grant all on *.* to 'root'@'%' identified by 'root';
mysql>grant all on *.* to 'root'@'localhost' identified by 'root';
mysql>flush privileges;

六、Spark SQL —>MySQL

  • 创建工程连接MySQL
package sparksql2import java.util.Propertiesimport org.apache.spark.sql.SparkSessionobject Demo3_ConnectMysql {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[4]") //设置master.appName(this.getClass.getName) //设置appName.enableHiveSupport().getOrCreate()//设置要访问的mysql的url,表名val url = "jdbc:mysql://192.168.182.130:3306/mysqltest"val tablename ="student"val props = new Properties()//设置要访问的mysql的用户名,密码,Driverprops.setProperty("user","root")props.setProperty("password","root")props.setProperty("driver","com.mysql.jdbc.Driver")//通过spark.read.jdbc方法读取mysql中的数据val df = spark.read.jdbc(url,tablename,props)df.show()}
}

"C:\Program Files\Java\jdk1.8.0_231\bin\java.exe"...
+---+--------+------+--------+
|sid|   sname|gender|class_id|
+---+--------+------+--------+
|  1|  孙尚香|    女|       1|
|  2|    貂蝉|    女|       1|
|  3|    刘备|    男|       2|
|  4|  孙二娘|    女|       2|
|  5|    张飞|    男|       3|
|  6|    关羽|    男|       4|
|  7|  林黛玉|    女|       5|
|  8|  薛宝钗|    女|       6|
|  9|    宋江|    男|       6|
| 10|  白骨精|    女|       7|
| 11|  猪八戒|    男|       8|
| 12|  王熙凤|    女|       1|
| 13|  李师师|    女|       2|
| 14|  金翠莲|    女|       9|
| 15|    如花|    女|       1|
| 16|    沙僧|    男|       2|
| 17|  李美丽|    女|       3|
| 18|金角大王|    男|       4|
+---+--------+------+--------+Process finished with exit code 0

PS:如果有写错或者写的不好的地方,欢迎各位大佬在评论区留下宝贵的意见或者建议,敬上!如果这篇博客对您有帮助,希望您可以顺手帮我点个赞!不胜感谢!

原创作者:wsjslient

作者主页:https://blog.csdn.net/wsjslient


Spark SQL操作外部数据源相关推荐

  1. Spark SQL操作多数据源

    Spark SQL支持通过DataFrame接口操作的多种不同的数据源.DataFrame提供支持统一的接口加载和保存数据源中的数据,包括:结构化数据,Parquet文件,JSON文件,Hive表 , ...

  2. Spark SQL与外部数据源的操作(Spark SQL ——> CSV/JSON/Parquet/hive/mysql)

    目录 一.Spark SQL支持的外部数据源 二.Spark SQL -> CSV 2.1 读CSV文件 a.有列名 b.无列名 2.2 写CSV文件 三.Spark SQL -> JSO ...

  3. Spark SQL连接外部数据源

    一.Spark SQL支持的外部数据源 Spark SQL开放了一系列接入外部数据源的接口,来让开发者可以实现Spark SQL可以 加载任何地方的数据,例如mysql,hive,hdfs,hbase ...

  4. Spark SQL操作之-函数汇总篇-下

    Spark SQL操作之-自定义函数篇-下 环境说明 自定义函数分类 用户自定义函数(UDF) 用户自定义聚合函数(UDAF) 环境说明 1. JDK 1.8 2. Spark 2.1 自定义函数分类 ...

  5. Spark操作外部数据源(RDBMS,Hive,HBase,Parquet)

    文章目录 一.Spark SQL 二.Spark on Hive 三.Hive on Spark 四.Spark读取Parquet文件 五.Spark连接HBase 1.Maven工程添加依赖 2.代 ...

  6. Spark SQL操作Hive表

    Spark SQL支持从Hive存储中读写数据.然而,Hive存在很多的依赖,而这些依赖又不包含在默认的各类Spark发型版本中.如果将Hive的依赖放入classpath中,Spark将自动加载它们 ...

  7. 第69课:Spark SQL通过Hive数据源JOIN实战 每天晚上20:00YY频道现场授课频道68917580

    /* * *王家林老师授课http://weibo.com/ilovepains */ 每天晚上20:00YY频道现场授课频道68917580 源文件 person.txt Michael 29  A ...

  8. 2018年又传喜报!热烈祝贺王家林大师大数据经典著作《Spark SQL大数据实例开发教程》 畅销书籍 出版上市!

    2018年又传喜报!热烈祝贺王家林大师大数据经典著作<Spark SQL大数据实例开发教程> 畅销书籍 出版上市! 作者: 王家林 段智华  条码书号:9787111591979 出版日期 ...

  9. Spark SQL之External DataSource外部数据源(二)源代码分析

    上周Spark1.2刚公布,周末在家没事,把这个特性给了解一下,顺便分析下源代码,看一看这个特性是怎样设计及实现的. /** Spark SQL源代码分析系列文章*/ (Ps: External Da ...

最新文章

  1. 百度、长沙加码自动驾驶,湖南阿波罗智行科技公司成立...
  2. [BZOJ4994] [Usaco2017 Feb]Why Did the Cow Cross the Road III(树状数组)
  3. java中文 x_java环境url中文参数乱码处理
  4. android 点击侧滑代码,代码分析Android实现侧滑菜单
  5. javascript设计模式研究学习-设计模式类别
  6. Mybatis 输入映射
  7. [Python] random.uniform( ) 函数教程与实例解析
  8. Linux系统文件的隐藏属性
  9. 管理感悟:代码审查做哪些事情?
  10. 如何真正理解三极管饱和 放大的含义 (必收藏)
  11. paypal java_PaypalUtil PayPal付款JAVA工具类
  12. RAR解压、压缩命令
  13. 关于同比和环比的几个问题
  14. 佛山科目二仙塘考场(B场)-考试要点
  15. python爬虫匹配uniport数据库的Pathway字段是否存在(方法一)
  16. 第一天-2.安装vmware虚拟机kali系统
  17. Python的import
  18. 朴素贝叶斯算法:实现邮件分类
  19. 利用腾讯 优图visionseed硬件 实现人脸疲劳检测项目(包括数据读取,数据保存,数据web端展示)
  20. 云栖大会,一场边缘云计算的「超前瞻」之约

热门文章

  1. 关于bootstrap 对于 IE9 的兼容问题
  2. 服务器和数据库哪一个更重要
  3. 如果不知道这些小技巧,你的Mac触控板就浪费了!
  4. (二)以太坊——在私有链进行转账操作
  5. 比技术债更可怕的人债
  6. C# 自定义纸张大小打印 PDF
  7. 群辉安装自定义mysql_群晖下docker搭建mysql环境体会
  8. frame被废除_废除Microsoft Office:G Suite机会
  9. PPTX新员工入场三级安全教育培训教材(附下载)
  10. 移动端/PC端网页开发建议