Spark SQL支持通过DataFrame接口操作的多种不同的数据源。DataFrame提供支持统一的接口加载和保存数据源中的数据,包括:结构化数据,Parquet文件,JSON文件,Hive表 ,以及通过JDBC连接外部数据源。

转载请标明原文地址:原文链接

与Hive类似的,Spark SQL也可以创建临时表和持久表(即管理表),使用registerTempTable命令创建临时表,使用saveAsTable命令将数据保存到值就表,该命令将创建一个“管理表”,这也就意味着 数据的位置由Metastore控制,当表删除的时候,管理表将表数据自动删除。

也可以通过配置SaveMode指定如何处理现有数据,实现保存模式不使用任何锁定,而且不是原子操作,因此,在并发环境下操作不能保证数据的安全性,保存模式参数选项如下:

|Scala/Java |python|含义
|-
|SaveMode.ErrorIfExists(default)|“error”|如果保存数据已经存在,抛出异常
|SaveMode.Append|“append”|如果保存数据已经存在,追写DataFrame数据
|SaveMode.Overwrite|“overwrite”|如果保存数据已经存在,重写DataFrame数据
|SaveMode.Ignore|“ignore”|如果保存数据已经存在,忽略DataFrame数据

文本数据

Spark SQL可以加载普通的文本文件来创建DataFrame来进行操作。
以下面数据为例:

shinelon,19
mike,20
wangwu,25

操作代码如下:

 val sqlContext=new sql.SQLContext(sc)import sqlContext.implicits._              //隐式转换,将一个RDD转换为DataFrame//使用前缀hdfs://来标识HDFS存储系统的文件val people=sc.textFile("file:///F:/spark-2.0.0/SparkApp/src/cn/just/shinelon/txt/SparkSql02.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF()//DataFrame注册临时表people.registerTempTable("person")//使用sql运行SQL表达式val result=sqlContext.sql("SELECT name,age from person WHERE age>=19")println(result.map(t=>"Name:"+t(0)).collect())

其实在上面将RDD转换为DataFrame,有两种方法,通过反射推断RDD模式和以编程方式定义模式,上面使用了反射方式,另一种方式参考上一篇文章末尾Spark SQL与DataFrame详解以及使用。

Parquet格式文件

使用parquet格式文件,高效,因为其列式存储避免读入不需要的数据,有极好的性能和GC。而且方便压缩和解压缩,有更好的缓存效果。

操作代码如下:

/*** 将普通文本文件转换为parquet数据源来创建临时表* @param sc*/def parquetTable(sc:SparkContext): Unit ={val sqlContext=new SQLContext(sc)//隐式转换为一个DataFrameimport sqlContext.implicits._val peopleDF=sc.textFile("file:///F:/spark-2.0.0/SparkApp/src/cn/just/shinelon/txt/SparkSql02.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF()peopleDF.saveAsParquetFile("hdfs://hadoop-senior.shinelon.com:8020/user/shinelon/SparkSql/people.parquet")//加载Parquet为DataFrameval parquetFile=sqlContext.parquetFile("hdfs://hadoop-senior.shinelon.com:8020/user/shinelon/SparkSql/people.parquet")//将DataFrame注册为临时表,提供SQL查询使用parquetFile.registerTempTable("parquetTable")val result=sqlContext.sql("select name from parquetTable")result.map(t=>"Name:"+t(0)).collect().foreach(println)/*** 运行结果如下:* Name:shinelonName:mikeName:wangwu*/}

上面程序需要注意的是,parquet文件不能在本地读写,需要在集群上操作,在windows本地操作有可能会报错。

分区表
与Hive类似的,Spark SQL也可以进行分区,下面是一个简单的分区表的创建:

  /***scala> df1.printSchema()
root|-- single: integer (nullable = false)|-- double: integer (nullable = false)* @param sc*/def testPartition(sc:SparkContext): Unit ={val sqlContext=new SQLContext(sc)//隐式转换import sqlContext.implicits._val df1=sc.makeRDD(1 to 5).map(i=>(i,i*2)).toDF("single","double")df1.saveAsParquetFile("data/test/key=1")df1.printSchema()
}

Json文件

Spark SQL可以自动推断出一个JSON数据集的Schema并作为一个DataFrame加载。下面是一个简单的实例。

json数据如下:

{"name":"Mirckel"}
{"name":"Andy","age":30}
{"name":"Jsutin","age":13}
def test01(sc:SparkContext): Unit ={val sqlContext=new sql.SQLContext(sc)val df=sqlContext.jsonFile("file:///F:/spark-2.0.0/SparkApp/src/cn/just/shinelon/txt/SparkSql01.json")df.registerTempTable("people")println(df.show())      //打印表数据println(df.printSchema())  //以树的形式打印DataFrame的Schemaprintln(df.select(df("name"),df("age")+1).show())println("===============================")val result=sqlContext.sql("select name from people")result.foreach(println)}

Hive表

Spark SQL也可以从Hive表中读写数据,通过创建HiveContext进行一系列的操作。操作Hive表就需要将HIve的相关依赖或者jar包导入项目中,如果创建的是maven工程或者scala工程还必须将hive-site.xml和core-site.xml以及hdfs-site.xml配置文件拷贝到资源文件目录下。

测试的相关数据集格式如下:

238val_238
86val_86
311val_311
27val_27
165val_165
409val_409
255val_255
278val_278
98val_98

即Spark源码中examples\src\main\resources目录下的数据集kv1.txt。读者可以去github中的Spark源码中下载。

操作代码如下:

/*** Spark SQL集成Hive表* 在Spark-shell中可以运行* @param sc*/def testHive(sc:SparkContext): Unit ={val hiveContext=new HiveContext(sc)//创建表hiveContext.sql("create table if not exists src (key int,value string)")//加载数据hiveContext.sql("load data local inpath 'file:///opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/data/kv1.txt' into table src")//查询hiveContext.sql("from src select key,value").collect().foreach(println)/*** 运行部分结果如下:*[238,val_238][86,val_86][311,val_311][27,val_27][165,val_165][409,val_409][255,val_255]*/sc.stop()}

JDBC操作数据库

除了上面介绍的一系列数据源之外,Spark SQL还支持使用JDBC操作关系型数据库,从关系型数据库中读写数据。这里连接mysql数据库。

在mysql中,表中的数据如下所示:

操作代码如下:

  def testJDBC(sc:SparkContext): Unit ={val sqlContext=new SQLContext(sc)val jdbcDF=sqlContext.load("jdbc",Map("url"->"jdbc:mysql://127.0.0.1:3306/library?user=root&password=123456","dbtable"->"book","driver"->"com.mysql.jdbc.Driver"
//      "user"->"root",
//      "password"->"123456"))jdbcDF.show()}

代码运行结果如下:

至此,本篇文章介绍完了Spark SQL如何对多数据源进行操作,完整的代码下载地址为:Spark SQL操作多数据源完整代码下载地址。


如果你想和我们一起学习交流,共同进步,欢迎加群:

Spark SQL操作多数据源相关推荐

  1. Spark SQL操作外部数据源

    目录 一.Spark SQL支持的外部数据源 二.Spark SQL -> CSV 2.1 读CSV文件 a.有列名 b.无列名 2.2 写CSV文件 三.Spark SQL -> JSO ...

  2. Spark SQL操作之-函数汇总篇-下

    Spark SQL操作之-自定义函数篇-下 环境说明 自定义函数分类 用户自定义函数(UDF) 用户自定义聚合函数(UDAF) 环境说明 1. JDK 1.8 2. Spark 2.1 自定义函数分类 ...

  3. Spark SQL与外部数据源的操作(Spark SQL ——> CSV/JSON/Parquet/hive/mysql)

    目录 一.Spark SQL支持的外部数据源 二.Spark SQL -> CSV 2.1 读CSV文件 a.有列名 b.无列名 2.2 写CSV文件 三.Spark SQL -> JSO ...

  4. Spark SQL连接外部数据源

    一.Spark SQL支持的外部数据源 Spark SQL开放了一系列接入外部数据源的接口,来让开发者可以实现Spark SQL可以 加载任何地方的数据,例如mysql,hive,hdfs,hbase ...

  5. Spark SQL操作Hive表

    Spark SQL支持从Hive存储中读写数据.然而,Hive存在很多的依赖,而这些依赖又不包含在默认的各类Spark发型版本中.如果将Hive的依赖放入classpath中,Spark将自动加载它们 ...

  6. 第69课:Spark SQL通过Hive数据源JOIN实战 每天晚上20:00YY频道现场授课频道68917580

    /* * *王家林老师授课http://weibo.com/ilovepains */ 每天晚上20:00YY频道现场授课频道68917580 源文件 person.txt Michael 29  A ...

  7. 2018年又传喜报!热烈祝贺王家林大师大数据经典著作《Spark SQL大数据实例开发教程》 畅销书籍 出版上市!

    2018年又传喜报!热烈祝贺王家林大师大数据经典著作<Spark SQL大数据实例开发教程> 畅销书籍 出版上市! 作者: 王家林 段智华  条码书号:9787111591979 出版日期 ...

  8. Spark SQL之External DataSource外部数据源(二)源代码分析

    上周Spark1.2刚公布,周末在家没事,把这个特性给了解一下,顺便分析下源代码,看一看这个特性是怎样设计及实现的. /** Spark SQL源代码分析系列文章*/ (Ps: External Da ...

  9. Hive on Spark和Spark sql on Hive,你能分的清楚么

    摘要:结构上Hive On Spark和SparkSQL都是一个翻译层,把一个SQL翻译成分布式可执行的Spark程序. 本文分享自华为云社区<Hive on Spark和Spark sql o ...

最新文章

  1. Java网络编程基础(三)---基于UDP编程
  2. HTML5最新漏洞:用户硬盘或被垃圾数据塞满
  3. Segment Routing — SRv6 — Overview
  4. java sql超过32k_db2 clob类型如何能存储大于32k的字符串
  5. case里面两个条件_Go语言条件语句之 switch 语句
  6. 《需求工程——软件建模与分析》阅读笔记之二
  7. Git笔记(25) 选择修订版本
  8. 高通855比高通675贵多少钱,性能差距有多大?
  9. html箭头实现流程箭头,js实现带箭头的进度流程
  10. Python翻译Excel文件
  11. HTML YouTube 视频
  12. 微信开发者工具保存的时候,提示权限不足,选择以管理员的身份重试个人解决方法
  13. 关于 error: invalid types ‘int[int]‘ for array subscript 的解决
  14. 23位子网掩码是多少_23位子网掩码包含哪几个网段
  15. 2022年高考送祝福,金秋9月,CSDN等你哦!
  16. 蒙特卡洛方法求圆周率
  17. Json是什么?要怎样理解?
  18. 印象笔记、为知笔记、有道云笔记使用比较
  19. _012_IDEA_idea 创建工作空间(空项目) 项目组
  20. 回家,一朵花开的时间

热门文章

  1. t420i升级固态硬盘提升_SSD固态硬盘读写速度测试工具(附SSD性能提升知识干货)...
  2. rails 调试工具pry 换掉debugger 和 rails c
  3. iOS实现网速实时监测
  4. Nodejs 中的非阻塞I/O、异步和事件驱动
  5. 西雅图又一家科技公司准备上市!
  6. 基于ZFAKA二次开发,添加PayJS支付渠道
  7. explain的使用
  8. mysql8.017安装教程_mysql 8.0.17 安装图文教程
  9. windows不安装虚拟机如何使用Linux系统作为开发工具?
  10. TPS77618DR PMIC - 稳压器 - 线性 正 固定 1 输出 500mA 8-SOIC