Spark SQL and DataFrame

1.为什么要用Spark Sql

原来我们使用Hive,是将Hive Sql 转换成Map Reduce 然后提交到集群上去执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢,所以Spark Sql的应运而生,它是将SparkSql转换成RDD,然后提交到集群执行,执行效率非常的快。

Spark Sql的有点:1、易整合  2、统一的数据访问方式 3、兼容Hvie 4、标准的数据连接

2、DataFrames

什么是DataFrames?

与RDD类似,DataFrames也是一个分布式数据容器,然而DataFrame更像是传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame与支持嵌套数据类型(struct、array和map)。从API的易用性上看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。由于与R和Pandas的DataFrame类似,Spark DataFrame很好地继承了传统单机数据分析的开发体验。

创建DataFrames

在Spark SQL中SQLContext是创建DataFrames和执行SQL的入口,在spark-1。5.2中已经内置了一个sqlContext。

1.在本地创建一个文件,有三列,分别是id、name、age,用空格分隔,然后上传到hdfs上

hdfs dfs -put person.txt /

2.在spark shell执行下面命令,读取数据,将每一行的数据使用列分隔符分割

val lineRDD = sc.textFile("hdfs://hadoop01:9000/person.txt").map(_.split(" "))

3.定义case class(相当于表的schema)

case class Person(id:Int, name:String, age:Int)

4.将RDD和case class关联

val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))

5.将RDD转换成DataFrame

val personDF = personRDD.toDF

6.对DataFrame进行处理

personDF.show

代码:

object SparkSqlTest {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local").setAppName("SQL-1")val sc = new SparkContext(conf)fun1(sc)}//定义case class 相当于表的schemacase class Person(id:Int,name:String,age:Int)def fun1(sc:SparkContext): Unit ={val sqlContext = new SQLContext(sc)  // 位置一般情况下是换成HDFS文件路径val lineRdd = sc.textFile("D:\\data\\person.txt").map(_.split(" "))val personRdd = lineRdd.map(x=>Person(x(0).toInt,x(1),x(2).toInt))import sqlContext.implicits._val personDF = personRdd.toDF//注册表personDF.registerTempTable("person_df")//传入sqlval df = sqlContext.sql("select * from person_df order by age desc")//将结果以JSON的方式存储到指定位置df.write.json("D:\\data\\personOut")sc.stop()}

DataFrame 常用操作

DSL风格语法(个人理解短小精悍的含义)

     // 查看DataFrame部分列中的内容df.select(personDF.col("name")).show()df.select(col = "age").show()df.select("id").show()// 打印DataFrame的Schema信息
    df.printSchema()//查询所有的name 和 age ,并将 age+2df.select(df("id"),df("name"),df("age")+2).show()//查询所有年龄大于20的df.filter(df("age")>20).show()// 按年龄分组并统计相同年龄人数df.groupBy("age").count().show()

SQL风格语法(前提:需要将DataFrame注册成表)

//注册成表personDF.registerTempTable("person_df")// 查询年龄最大的两位 并用对象接接收val persons = sqlContext.sql("select * from person_df order by age desc limit 2")persons.foreach(x=>print(x(0),x(1),x(2)))

 

通过StructType直接指定Schema

 1  /*通过StructType直接指定Schema*/
 2   def fun2(sc: SparkContext): Unit = {
 3     val sqlContext = new SQLContext(sc)
 4     val personRDD = sc.textFile("D:\\data\\person.txt").map(_.split(" "))
 5     // 通过StructType直接指定每个字段的Schema
 6     val schema = StructType(List(StructField("id", IntegerType, true), StructField("name", StringType, true), StructField("age", IntegerType)))
 7     //将rdd映射到RowRDD
 8     val rowRdd = personRDD.map(x=>Row(x(0).toInt,x(1).trim,x(2).toInt))
 9     //将schema信息应用到rowRdd上
10     val dataFrame = sqlContext.createDataFrame(rowRdd,schema)
11     //注册
12     dataFrame.registerTempTable("person_struct")
13
14     sqlContext.sql("select * from person_struct").show()
15
16     sc.stop()
17
18   }

连接数据源

1 /*连接mysql数据源*/
2   def fun3(sc:SparkContext): Unit ={
3     val sqlContext = new SQLContext(sc)
4     val jdbcDF = sqlContext.read.format("jdbc").options(Map("url"->"jdbc:mysql://192.168.180.100:3306/bigdata","driver"->"com.mysql.jdbc.Driver","dbtable"->"person","user"->"root","password"->"123456")).load()
5     jdbcDF.show()
6     sc.stop()
7   }

 再回写到数据库中

 1  // 写入数据库
 2     val personTextRdd = sc.textFile("D:\\data\\person.txt").map(_.split(" ")).map(x=>Row(x(0).toInt,x(1),x(2).toInt))
 3
 4     val schema = StructType(List(StructField("id", IntegerType), StructField("name", StringType), StructField("age", IntegerType)))
 5
 6     val personDataFrame = sqlContext.createDataFrame(personTextRdd,schema)
 7
 8     val prop = new Properties()
 9     prop.put("user","root")
10     prop.put("password","123456")
11     //写入数据库
12     personDataFrame.write.mode("append").jdbc("jdbc:mysql://192.168.180.100:3306/bigdata","bigdata.person",prop)
13
14     sc.stop()

转载于:https://www.cnblogs.com/chengzhihua/p/9517229.html

SparkSql学习笔记(包含IDEA编写的本地代码)相关推荐

  1. 最优控制和轨迹规划学习笔记 包含多个实际案例 主要思路是使用优化算法来找到车辆的最佳路径

    最优控制和轨迹规划学习笔记 包含多个实际案例 倒立摆上翻控制 满足车辆运动学约束的路径规划 离散点参考线优化 lattice横向距离规划 这段代码包含了三个程序,我们将分别对它们进行详细的分析. 最速 ...

  2. iQQ 学习笔记3 :编写代码打包Ant脚本

    iQQ 学习笔记声明 本文仅供学习研究使用,不得用于任何非法及侵权用途. 转贴请注明原发位置: http://xuekaiyuan.com/forum.php?mod=viewthread&t ...

  3. Python学习笔记:Day11 编写日志创建页

    前言 最近在学习深度学习,已经跑出了几个模型,但Pyhton的基础不够扎实,因此,开始补习Python了,大家都推荐廖雪峰的课程,因此,开始了学习,但光学有没有用,还要和大家讨论一下,因此,写下这些帖 ...

  4. Python学习笔记:Day5 编写web框架

    前言 最近在学习深度学习,已经跑出了几个模型,但Pyhton的基础不够扎实,因此,开始补习Python了,大家都推荐廖雪峰的课程,因此,开始了学习,但光学有没有用,还要和大家讨论一下,因此,写下这些帖 ...

  5. Python学习笔记:Day4 编写Model

    前言 最近在学习深度学习,已经跑出了几个模型,但Pyhton的基础不够扎实,因此,开始补习Python了,大家都推荐廖雪峰的课程,因此,开始了学习,但光学有没有用,还要和大家讨论一下,因此,写下这些帖 ...

  6. ROS学习笔记三:编写第一个ROS节点程序

    在编写第一个ROS节点程序之前需要创建工作空间(workspace)和功能包(package). 一.创建工作空间(workspace) 创建一个catkin_ws: #注意:如果使用sudo一次性创 ...

  7. python学习笔记12-类代码编写细节

    一.class语句 一般形式 class         <name>(superclass,...): data=value def mothod(self,...): self.mem ...

  8. “Spark三剑客”之SparkCore和SparkSql学习笔记(零基础入门)(一)

    目录 1 Spark的介绍 1.1 Spark的定义 1.2 Spark为什么比MapReduce快? 1.3 RDD 弹性式分布式数据集 1.4 MasterURL 1.5 Spark为什么很占内存 ...

  9. Gatling学习笔记(四)---脚本编写及功能介绍

    文章目录 1.脚本编写 1.1 脚本示例 1.2 脚本编写 2.SSL使用 3.条件语句 4.Check和Session使用 5.Feeder 1.脚本编写 其实在压测的过程中我们主要也是压测http ...

最新文章

  1. 比特币vs分布式账本vs以太坊vs区块链
  2. LeetCode实战:两两交换链表中的节点
  3. 泊松分布与正太分布在指导武器理论方面的使用
  4. 线性代数里的最小二乘法介绍
  5. 用Jquery自己开发个代阴影的对话框吧!
  6. python3-Python3 数字(Number)
  7. 【leetcode】1032. Stream of Characters
  8. 遍历目录下的文件每250M打包一个文件
  9. iOSUI视图面试及原理总结
  10. AIDE支持实时错误检查、代码重构、代码智能导航、生成APK
  11. SkinMagic使用后按钮加自定义图标或菜单GetMneu返回NULL的解决方法
  12. Blazor University (1)介绍 - 什么是 Blazor?
  13. 在Eclipse中配置Tomcat7.0
  14. Quartz调度源码分析
  15. Linux之Firewall防火墙、iptables、firewalld
  16. 计算机怎么删除表格,EXCEL如何删除表格内容中的部分文本
  17. 74HC/LS/HCT/F系列芯片的区别
  18. 天津理工大学物联网通信技术实验1:数字基带信号(NRZ、NRZ-I、AMI、HDB3信道编码)
  19. 巴菲特十大量化选股经
  20. pragma HLS interface 端口综合

热门文章

  1. python绑定句柄容易么_Python 有什么奇技淫巧?
  2. python随机数权重_Python实现基于权重的随机数2种方法
  3. Nginx一个server配置多个location
  4. springboot线程池使用
  5. Android开发笔记(一百七十八)更安全的数据仓库DataStore
  6. linux查看共享内存max,浅析Linux的共享内存与tmpfs文件系统
  7. 写论文的第三天 自建zookeeper集群
  8. [译]技术之外,工作之内,非常实际有用的技巧--如何宣布坏消息?
  9. Java10的新特性
  10. Spring Cloud with Turbine