Spark SQL操作外部数据源
目录
- 一、Spark SQL支持的外部数据源
- 二、Spark SQL —> CSV
- 2.1 读CSV文件
- a.有列名
- b.无列名
- 2.2 写CSV文件
- 三、Spark SQL —> JSON
- 3.1 读JSON文件
- 3.2 写JSON文件
- 四、Spark SQL —> Parquet
- 读&写 Parquet文件
- 五、Spark SQL —>Hive
- 5.1 Spark集成Hive
- 5.2 IDEA连接Hive
- 六、Spark SQL —>MySQL
一、Spark SQL支持的外部数据源
Spark SQL开放了一系列接入外部数据源的接口,来让开发者可以实现Spark SQL可以加载任何地方的数据,例如mysql,hive,hdfs,hbase等,而且支持很多种格式如json, parquet, avro, csv格式…
- Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。
二、Spark SQL —> CSV
2.1 读CSV文件
a.有列名
e.g.
package cn.kgc.spark.Testimport org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}object Demo_LoadCSVFile {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]")val sc = new SparkContext(conf)val customer = sc.textFile("data/customer_details.csv")customer.map(x=>(x.split(",")))//创建SparkSession//通过SparkSession加载csc/json文件val spark = SparkSession.builder().config(conf).getOrCreate()//加载csv文件// .option("header",true)判断文件中的第一行是否为列的名称val df = spark.read.format("csv").option("header",true).load("data/customer_details.csv")df.show()df.printSchema()}
}
result:
"C:\Program Files\Java\jdk1.8.0_231\bin\java.exe" ...
2021-01-09 13:01:07,596 WARN [org.apache.spark.SparkContext] - Using an existing SparkContext; some configuration may not take effect.
+-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+--------------------+
|customer_id|first_name| last_name| email|gender| address|country| language| job| credit_type| credit_no|
+-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+--------------------+
| 1| Spencer| Raffeorty|sraffeorty0@dropb...| Male| 9274 Lyons Court| China| Khmer|Safety Technician...| jcb| 3589373385487660 |
| 2| Cherye| Poynor| cpoynor1@51.la|Female|1377 Anzinger Avenue| China| Czech| Research Nurse| instapayment| 6376594861844530 |
| 3| Natasha| Abendroth|nabendroth2@scrib...|Female| 2913 Evergreen Lane| China| Yiddish|Budget/Accounting...| visa| 4041591905616350 |
| 4| Huntley| Seally| hseally3@prlog.org| Male| 694 Del Sol Lane| China| Albanian|Environmental Spe...| laser| 677118310740263000 |
| 5| Druci| Coad| dcoad4@weibo.com|Female| 16 Debs Way| China| Hebrew| Teacher| jcb| 3537287259845040 |
| 6| Sayer| Brizell|sbrizell5@opensou...| Male| 71 Banding Terrace| China| Maltese| Accountant IV| americanexpress| 379709885387687 |
| 7| Becca| Brawley|bbrawley6@sitemet...|Female|7 Doe Crossing Ju...| China| Czech|Payment Adjustmen...| jcb| 3545380000000000 |
| 8| Michele| Bastable| mbastable7@sun.com|Female|98 Clyde Gallaghe...| China|Malayalam| Tax Accountant| jcb| 3588130000000000 |
| 9| Marla|Brotherhood|mbrotherhood8@ill...|Female|4538 Fair Oaks Trail| China| Dari| Design Engineer| china-unionpay|5602230000000000000 |
| 10| Lionello| Gogarty|lgogarty9@histats...| Male| 800 Sage Alley| China| Danish| Clinical Specialist|diners-club-carte...| 30290800000000 |
| 11| Camile| Ringer| cringera@army.mil|Female|5060 Fairfield Alley| China| Punjabi| Junior Executive| china-unionpay| 5602210000000000 |
| 12| Gillan| Banbridge|gbanbridgeb@wikip...|Female| 91030 Havey Point| China| Kurdish| Chemical Engineer| jcb| 3555950000000000 |
| 13| Guinna| Damsell|gdamsellc@spiegel.de|Female| 869 Ohio Park| China| Fijian| Analyst Programmer| jcb| 3532010000000000 |
| 14| Octavia| McDugal|omcdugald@rambler.ru|Female| 413 Forster Center| China| English|Desktop Support T...| maestro| 502018000000000000 |
| 15| Anjanette| Penk| apenke@lulu.com|Female| 8154 Schiller Road| China| Swedish| VP Sales| jcb| 3548040000000000 |
| 16| Maura| Teesdale|mteesdalef@globo.com|Female| 9568 Quincy Alley| China| Dutch| Dental Hygienist| jcb| 3582890000000000 |
| 17| Chastity| Neylon| cneylong@wix.com|Female|11952 Northwester...| China| Gujarati| Geologist I| jcb| 3543180000000000 |
| 18| Ralph| Coils|rcoilsh@artisteer...| Male| 5070 Hooker Pass| China| Khmer| Speech Pathologist| mastercard| 5505790000000000 |
| 19| Jehanna| Whybrow| jwhybrowi@wp.com|Female| 83 Maywood Way| China| Fijian| Assistant Professor| visa-electron| 4844570000000000 |
| 20| Ingeberg| Sutehall|isutehallj@feedbu...|Female| 272 Butternut Drive| China| Kazakh| Assistant Manager| visa-electron| 4405830000000000 |
+-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+--------------------+
only showing top 20 rowsroot|-- customer_id: string (nullable = true)|-- first_name: string (nullable = true)|-- last_name: string (nullable = true)|-- email: string (nullable = true)|-- gender: string (nullable = true)|-- address: string (nullable = true)|-- country: string (nullable = true)|-- language: string (nullable = true)|-- job: string (nullable = true)|-- credit_type: string (nullable = true)|-- credit_no: string (nullable = true)
b.无列名
加载的CSV文件无列名时,需要指定数据结构;
e.g.
"C:\Program Files\Java\jdk1.8.0_231\bin\java.exe" ...
package SparkSQLimport org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}object Demo9_LoadCSVFile {def main(args: Array[String]): Unit = {//创建SparkSessionval spark = SparkSession.builder().master("local[4]") //设置master.appName(this.getClass.getName) //设置appName.getOrCreate()//创建schema信息val schema = StructType(Array(StructField("number", StringType, true),StructField("date", StringType, true),StructField("orderNo", StringType, true),StructField("status", StringType, true)))val df = spark.read.format("csv").schema(schema).load("file:///F:\\notes\\java\\SparkFirst\\data2\\orders.csv")df.show()df.printSchema()}
}
result:
+------+-------------------+-------+---------------+
|number| date|orderNo| status|
+------+-------------------+-------+---------------+
| 1|2013-07-25 00:00:00| 11599| CLOSED|
| 2|2013-07-25 00:00:00| 256|PENDING_PAYMENT|
| 3|2013-07-25 00:00:00| 12111| COMPLETE|
| 4|2013-07-25 00:00:00| 8827| CLOSED|
| 5|2013-07-25 00:00:00| 11318| COMPLETE|
| 6|2013-07-25 00:00:00| 7130| COMPLETE|
| 7|2013-07-25 00:00:00| 4530| COMPLETE|
| 8|2013-07-25 00:00:00| 2911| PROCESSING|
| 9|2013-07-25 00:00:00| 5657|PENDING_PAYMENT|
| 10|2013-07-25 00:00:00| 5648|PENDING_PAYMENT|
| 11|2013-07-25 00:00:00| 918| PAYMENT_REVIEW|
| 12|2013-07-25 00:00:00| 1837| CLOSED|
| 13|2013-07-25 00:00:00| 9149|PENDING_PAYMENT|
| 14|2013-07-25 00:00:00| 9842| PROCESSING|
| 15|2013-07-25 00:00:00| 2568| COMPLETE|
| 16|2013-07-25 00:00:00| 7276|PENDING_PAYMENT|
| 17|2013-07-25 00:00:00| 2667| COMPLETE|
| 18|2013-07-25 00:00:00| 1205| CLOSED|
| 19|2013-07-25 00:00:00| 9488|PENDING_PAYMENT|
| 20|2013-07-25 00:00:00| 9198| PROCESSING|
+------+-------------------+-------+---------------+
only showing top 20 rowsroot|-- number: string (nullable = true)|-- date: string (nullable = true)|-- orderNo: string (nullable = true)|-- status: string (nullable = true)Process finished with exit code 0
2.2 写CSV文件
将DataFrame向外写成CSV格式文件时,除了指定数据追加模式外,还可以指定数据分隔符;
df.write.format("csv").mode("overwrite").option("sep", "\t").save("F:\\notes\\java\\SparkFirst\\output\\CSVdownload")
e.g.
package SparkSQLimport org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}object Demo10_WriteCSVFile {def main(args: Array[String]): Unit = {//创建SparkSessionval spark = SparkSession.builder().master("local[4]") //设置master.appName(this.getClass.getName) //设置appName.getOrCreate()val sc=spark.sparkContextimport spark.implicits._val peopleRDD = sc.textFile("data2/people.txt")//将RDD通过和Schema信息关联,得到DataFrame//通过StructType构建Schema信息,StructField代表一个字段//第一个参数是字段名称,第二个参数是字段类型,第三个参数是是否可以为空val schema = StructType(Array(StructField("name", StringType, true),StructField("age", IntegerType, true)))//将每行的字符串切割,切割成Array,将其转化为RDD[Row]类型val peopleRowRDD = peopleRDD.map(_.split(",")).map(x=>Row(x(0),x(1).toInt))//将Row类型的RDD和Schema信息关联,创建一个DataFrameval df = spark.createDataFrame(peopleRowRDD,schema)df.write.format("csv").mode("overwrite").save("F:\\notes\\java\\SparkFirst\\output\\CSVdownload")}
}
result:
三、Spark SQL —> JSON
3.1 读JSON文件
与读取CSV文件类似:
e.g.
package SparkSQLimport org.apache.spark.sql.SparkSessionobject Demo4_LoadJSONFile {def main(args: Array[String]): Unit = {//创建SparkSessionval spark = SparkSession.builder().master("local[4]") //设置master.appName(this.getClass.getName) //设置appName.getOrCreate()import spark.implicits._//两种加载json/csv文件:spark.read.format("json").load("PATH") / spark.read.json("PATH")val sc=spark.sparkContextval dataFrame = spark.read.json("data/users.json")dataFrame.show()}
}
3.2 写JSON文件
与写入CSV格式一样,通过DataFrame.write()方法写入:
df.write.format("json").mode("overwrite").save("F:\\notes\\java\\SparkFirst\\output\\JSONdownload")
四、Spark SQL —> Parquet
读&写 Parquet文件
Parquet文件:是一种流行的列式存储格式,以二进制存储,文件中包含数据和元数据
e.g.
package sparksql2import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._object Demo1_ReadWriteParquet {def main(args: Array[String]): Unit = {//创建SparkSessionval spark = SparkSession.builder().master("local[4]") //设置master.appName(this.getClass.getName) //设置appName.getOrCreate()import spark.implicits._val sc=spark.sparkContextval rdd = sc.parallelize(List(("curry", "yellow", Array(3, 9, 15, 20)), ("jack", "red", null)))// 1.定义schema信息val schema = StructType(Array(StructField("name", StringType, true),StructField("favorite_color", StringType, true),StructField("favorite_numbers", ArrayType(IntegerType), true)))// 2.准备Row类型的RDDval rowRDD = rdd.map(x=>Row(x._1,x._2,x._3))//3.通过Row类型的RDD和schema信息创建DataFrameval df = spark.createDataFrame(rowRDD,schema)//4.如果直接写入文件,默认就是Parquet格式df.write.save("F:\\notes\\java\\SparkFirst\\output\\parquet")//也可以手动指定parquet格式//.mode是写入模式,可以是overwrite(覆盖),也可以是append(追加)df.write.mode("overwrite").parquet("F:\\notes\\java\\SparkFirst\\output\\parquet")//5.使用.read.parquet方法可以读取parquet文件val df2 = spark.read.parquet(("F:\\notes\\java\\SparkFirst\\output\\parquet"))df2.show()}
}
result:
五、Spark SQL —>Hive
5.1 Spark集成Hive
- 将hive/conf目录下的hive-site.xml复制到spark/conf目录下;
[root@single ~]#cp /opt/software/hadoop/hive110/conf/hive-site.xml /opt/software/hadoop/spark244/conf
- 将hive/lib目录下的mysql-connector-java-5.1.38.jar,复制到spark/jars目录下;
[root@single lib]# cp mysql-connector-java-5.1.32.jar /opt/software/hadoop/spark244/jars/
- 启动spark-shell并用拷贝的jar包;
spark-shell --jars /opt/software/hadoop/spark244/jars/mysql-connector-java-5.1.32.jar
- 访问hive(记得先将Hive元数据服务启动)
scala>spark.sql("show databases").show()
5.2 IDEA连接Hive
- MAVEN工程增加依赖(版本不一致见官网:https://mvnrepository.com/);
<!-- spark-hive --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.4.4</version></dependency><!-- mysql-connector-java --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.31</version></dependency>
- 将hive-site.xml文件复制到rescourse资源文件夹内并将元数据地址补全;
- 创建工程连接Hive
package sparksql2import org.apache.spark.sql.SparkSessionobject Demo2_ConnectHive {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[4]") //设置master.appName(this.getClass.getName) //设置appName.enableHiveSupport() //开启Hive支持.getOrCreate()val df = spark.sql("select * from pv")df.show()}
}
TIPS:有可能出现的问题:MySQL访问被拒绝!可以考虑在linux下登录MySQL将登录用户重新授权修改密码并flush privileges刷新MySQL的系统权限相关表,或者也可以重新启动MySQL服务器使其生效。
mysql>grant all on *.* to 'root'@'%' identified by 'root';
mysql>grant all on *.* to 'root'@'localhost' identified by 'root';
mysql>flush privileges;
六、Spark SQL —>MySQL
- 创建工程连接MySQL
package sparksql2import java.util.Propertiesimport org.apache.spark.sql.SparkSessionobject Demo3_ConnectMysql {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[4]") //设置master.appName(this.getClass.getName) //设置appName.enableHiveSupport().getOrCreate()//设置要访问的mysql的url,表名val url = "jdbc:mysql://192.168.182.130:3306/mysqltest"val tablename ="student"val props = new Properties()//设置要访问的mysql的用户名,密码,Driverprops.setProperty("user","root")props.setProperty("password","root")props.setProperty("driver","com.mysql.jdbc.Driver")//通过spark.read.jdbc方法读取mysql中的数据val df = spark.read.jdbc(url,tablename,props)df.show()}
}
"C:\Program Files\Java\jdk1.8.0_231\bin\java.exe"...
+---+--------+------+--------+
|sid| sname|gender|class_id|
+---+--------+------+--------+
| 1| 孙尚香| 女| 1|
| 2| 貂蝉| 女| 1|
| 3| 刘备| 男| 2|
| 4| 孙二娘| 女| 2|
| 5| 张飞| 男| 3|
| 6| 关羽| 男| 4|
| 7| 林黛玉| 女| 5|
| 8| 薛宝钗| 女| 6|
| 9| 宋江| 男| 6|
| 10| 白骨精| 女| 7|
| 11| 猪八戒| 男| 8|
| 12| 王熙凤| 女| 1|
| 13| 李师师| 女| 2|
| 14| 金翠莲| 女| 9|
| 15| 如花| 女| 1|
| 16| 沙僧| 男| 2|
| 17| 李美丽| 女| 3|
| 18|金角大王| 男| 4|
+---+--------+------+--------+Process finished with exit code 0
PS:如果有写错或者写的不好的地方,欢迎各位大佬在评论区留下宝贵的意见或者建议,敬上!如果这篇博客对您有帮助,希望您可以顺手帮我点个赞!不胜感谢!
原创作者:wsjslient |
作者主页:https://blog.csdn.net/wsjslient |
Spark SQL操作外部数据源相关推荐
- Spark SQL操作多数据源
Spark SQL支持通过DataFrame接口操作的多种不同的数据源.DataFrame提供支持统一的接口加载和保存数据源中的数据,包括:结构化数据,Parquet文件,JSON文件,Hive表 , ...
- Spark SQL与外部数据源的操作(Spark SQL ——> CSV/JSON/Parquet/hive/mysql)
目录 一.Spark SQL支持的外部数据源 二.Spark SQL -> CSV 2.1 读CSV文件 a.有列名 b.无列名 2.2 写CSV文件 三.Spark SQL -> JSO ...
- Spark SQL连接外部数据源
一.Spark SQL支持的外部数据源 Spark SQL开放了一系列接入外部数据源的接口,来让开发者可以实现Spark SQL可以 加载任何地方的数据,例如mysql,hive,hdfs,hbase ...
- Spark SQL操作之-函数汇总篇-下
Spark SQL操作之-自定义函数篇-下 环境说明 自定义函数分类 用户自定义函数(UDF) 用户自定义聚合函数(UDAF) 环境说明 1. JDK 1.8 2. Spark 2.1 自定义函数分类 ...
- Spark操作外部数据源(RDBMS,Hive,HBase,Parquet)
文章目录 一.Spark SQL 二.Spark on Hive 三.Hive on Spark 四.Spark读取Parquet文件 五.Spark连接HBase 1.Maven工程添加依赖 2.代 ...
- Spark SQL操作Hive表
Spark SQL支持从Hive存储中读写数据.然而,Hive存在很多的依赖,而这些依赖又不包含在默认的各类Spark发型版本中.如果将Hive的依赖放入classpath中,Spark将自动加载它们 ...
- 第69课:Spark SQL通过Hive数据源JOIN实战 每天晚上20:00YY频道现场授课频道68917580
/* * *王家林老师授课http://weibo.com/ilovepains */ 每天晚上20:00YY频道现场授课频道68917580 源文件 person.txt Michael 29 A ...
- 2018年又传喜报!热烈祝贺王家林大师大数据经典著作《Spark SQL大数据实例开发教程》 畅销书籍 出版上市!
2018年又传喜报!热烈祝贺王家林大师大数据经典著作<Spark SQL大数据实例开发教程> 畅销书籍 出版上市! 作者: 王家林 段智华 条码书号:9787111591979 出版日期 ...
- Spark SQL之External DataSource外部数据源(二)源代码分析
上周Spark1.2刚公布,周末在家没事,把这个特性给了解一下,顺便分析下源代码,看一看这个特性是怎样设计及实现的. /** Spark SQL源代码分析系列文章*/ (Ps: External Da ...
最新文章
- 百度、长沙加码自动驾驶,湖南阿波罗智行科技公司成立...
- [BZOJ4994] [Usaco2017 Feb]Why Did the Cow Cross the Road III(树状数组)
- java中文 x_java环境url中文参数乱码处理
- android 点击侧滑代码,代码分析Android实现侧滑菜单
- javascript设计模式研究学习-设计模式类别
- Mybatis 输入映射
- [Python] random.uniform( ) 函数教程与实例解析
- Linux系统文件的隐藏属性
- 管理感悟:代码审查做哪些事情?
- 如何真正理解三极管饱和 放大的含义 (必收藏)
- paypal java_PaypalUtil PayPal付款JAVA工具类
- RAR解压、压缩命令
- 关于同比和环比的几个问题
- 佛山科目二仙塘考场(B场)-考试要点
- python爬虫匹配uniport数据库的Pathway字段是否存在(方法一)
- 第一天-2.安装vmware虚拟机kali系统
- Python的import
- 朴素贝叶斯算法:实现邮件分类
- 利用腾讯 优图visionseed硬件 实现人脸疲劳检测项目(包括数据读取,数据保存,数据web端展示)
- 云栖大会,一场边缘云计算的「超前瞻」之约