SparkSql学习笔记(包含IDEA编写的本地代码)
Spark SQL and DataFrame
1.为什么要用Spark Sql
原来我们使用Hive,是将Hive Sql 转换成Map Reduce 然后提交到集群上去执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢,所以Spark Sql的应运而生,它是将SparkSql转换成RDD,然后提交到集群执行,执行效率非常的快。
Spark Sql的有点:1、易整合 2、统一的数据访问方式 3、兼容Hvie 4、标准的数据连接
2、DataFrames
什么是DataFrames?
与RDD类似,DataFrames也是一个分布式数据容器,然而DataFrame更像是传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame与支持嵌套数据类型(struct、array和map)。从API的易用性上看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。由于与R和Pandas的DataFrame类似,Spark DataFrame很好地继承了传统单机数据分析的开发体验。
创建DataFrames
在Spark SQL中SQLContext是创建DataFrames和执行SQL的入口,在spark-1。5.2中已经内置了一个sqlContext。
1.在本地创建一个文件,有三列,分别是id、name、age,用空格分隔,然后上传到hdfs上
hdfs dfs -put person.txt /
2.在spark shell执行下面命令,读取数据,将每一行的数据使用列分隔符分割
val lineRDD = sc.textFile("hdfs://hadoop01:9000/person.txt").map(_.split(" "))
3.定义case class(相当于表的schema)
case class Person(id:Int, name:String, age:Int)
4.将RDD和case class关联
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
5.将RDD转换成DataFrame
val personDF = personRDD.toDF
6.对DataFrame进行处理
personDF.show
代码:
object SparkSqlTest {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local").setAppName("SQL-1")val sc = new SparkContext(conf)fun1(sc)}//定义case class 相当于表的schemacase class Person(id:Int,name:String,age:Int)def fun1(sc:SparkContext): Unit ={val sqlContext = new SQLContext(sc) // 位置一般情况下是换成HDFS文件路径val lineRdd = sc.textFile("D:\\data\\person.txt").map(_.split(" "))val personRdd = lineRdd.map(x=>Person(x(0).toInt,x(1),x(2).toInt))import sqlContext.implicits._val personDF = personRdd.toDF//注册表personDF.registerTempTable("person_df")//传入sqlval df = sqlContext.sql("select * from person_df order by age desc")//将结果以JSON的方式存储到指定位置df.write.json("D:\\data\\personOut")sc.stop()}
DataFrame 常用操作
DSL风格语法(个人理解短小精悍的含义)
// 查看DataFrame部分列中的内容df.select(personDF.col("name")).show()df.select(col = "age").show()df.select("id").show()// 打印DataFrame的Schema信息 df.printSchema()//查询所有的name 和 age ,并将 age+2df.select(df("id"),df("name"),df("age")+2).show()//查询所有年龄大于20的df.filter(df("age")>20).show()// 按年龄分组并统计相同年龄人数df.groupBy("age").count().show()
SQL风格语法(前提:需要将DataFrame注册成表)
//注册成表personDF.registerTempTable("person_df")// 查询年龄最大的两位 并用对象接接收val persons = sqlContext.sql("select * from person_df order by age desc limit 2")persons.foreach(x=>print(x(0),x(1),x(2)))
通过StructType直接指定Schema
1 /*通过StructType直接指定Schema*/ 2 def fun2(sc: SparkContext): Unit = { 3 val sqlContext = new SQLContext(sc) 4 val personRDD = sc.textFile("D:\\data\\person.txt").map(_.split(" ")) 5 // 通过StructType直接指定每个字段的Schema 6 val schema = StructType(List(StructField("id", IntegerType, true), StructField("name", StringType, true), StructField("age", IntegerType))) 7 //将rdd映射到RowRDD 8 val rowRdd = personRDD.map(x=>Row(x(0).toInt,x(1).trim,x(2).toInt)) 9 //将schema信息应用到rowRdd上 10 val dataFrame = sqlContext.createDataFrame(rowRdd,schema) 11 //注册 12 dataFrame.registerTempTable("person_struct") 13 14 sqlContext.sql("select * from person_struct").show() 15 16 sc.stop() 17 18 }
连接数据源
1 /*连接mysql数据源*/ 2 def fun3(sc:SparkContext): Unit ={ 3 val sqlContext = new SQLContext(sc) 4 val jdbcDF = sqlContext.read.format("jdbc").options(Map("url"->"jdbc:mysql://192.168.180.100:3306/bigdata","driver"->"com.mysql.jdbc.Driver","dbtable"->"person","user"->"root","password"->"123456")).load() 5 jdbcDF.show() 6 sc.stop() 7 }
再回写到数据库中
1 // 写入数据库 2 val personTextRdd = sc.textFile("D:\\data\\person.txt").map(_.split(" ")).map(x=>Row(x(0).toInt,x(1),x(2).toInt)) 3 4 val schema = StructType(List(StructField("id", IntegerType), StructField("name", StringType), StructField("age", IntegerType))) 5 6 val personDataFrame = sqlContext.createDataFrame(personTextRdd,schema) 7 8 val prop = new Properties() 9 prop.put("user","root") 10 prop.put("password","123456") 11 //写入数据库 12 personDataFrame.write.mode("append").jdbc("jdbc:mysql://192.168.180.100:3306/bigdata","bigdata.person",prop) 13 14 sc.stop()
转载于:https://www.cnblogs.com/chengzhihua/p/9517229.html
SparkSql学习笔记(包含IDEA编写的本地代码)相关推荐
- 最优控制和轨迹规划学习笔记 包含多个实际案例 主要思路是使用优化算法来找到车辆的最佳路径
最优控制和轨迹规划学习笔记 包含多个实际案例 倒立摆上翻控制 满足车辆运动学约束的路径规划 离散点参考线优化 lattice横向距离规划 这段代码包含了三个程序,我们将分别对它们进行详细的分析. 最速 ...
- iQQ 学习笔记3 :编写代码打包Ant脚本
iQQ 学习笔记声明 本文仅供学习研究使用,不得用于任何非法及侵权用途. 转贴请注明原发位置: http://xuekaiyuan.com/forum.php?mod=viewthread&t ...
- Python学习笔记:Day11 编写日志创建页
前言 最近在学习深度学习,已经跑出了几个模型,但Pyhton的基础不够扎实,因此,开始补习Python了,大家都推荐廖雪峰的课程,因此,开始了学习,但光学有没有用,还要和大家讨论一下,因此,写下这些帖 ...
- Python学习笔记:Day5 编写web框架
前言 最近在学习深度学习,已经跑出了几个模型,但Pyhton的基础不够扎实,因此,开始补习Python了,大家都推荐廖雪峰的课程,因此,开始了学习,但光学有没有用,还要和大家讨论一下,因此,写下这些帖 ...
- Python学习笔记:Day4 编写Model
前言 最近在学习深度学习,已经跑出了几个模型,但Pyhton的基础不够扎实,因此,开始补习Python了,大家都推荐廖雪峰的课程,因此,开始了学习,但光学有没有用,还要和大家讨论一下,因此,写下这些帖 ...
- ROS学习笔记三:编写第一个ROS节点程序
在编写第一个ROS节点程序之前需要创建工作空间(workspace)和功能包(package). 一.创建工作空间(workspace) 创建一个catkin_ws: #注意:如果使用sudo一次性创 ...
- python学习笔记12-类代码编写细节
一.class语句 一般形式 class <name>(superclass,...): data=value def mothod(self,...): self.mem ...
- “Spark三剑客”之SparkCore和SparkSql学习笔记(零基础入门)(一)
目录 1 Spark的介绍 1.1 Spark的定义 1.2 Spark为什么比MapReduce快? 1.3 RDD 弹性式分布式数据集 1.4 MasterURL 1.5 Spark为什么很占内存 ...
- Gatling学习笔记(四)---脚本编写及功能介绍
文章目录 1.脚本编写 1.1 脚本示例 1.2 脚本编写 2.SSL使用 3.条件语句 4.Check和Session使用 5.Feeder 1.脚本编写 其实在压测的过程中我们主要也是压测http ...
最新文章
- 比特币vs分布式账本vs以太坊vs区块链
- LeetCode实战:两两交换链表中的节点
- 泊松分布与正太分布在指导武器理论方面的使用
- 线性代数里的最小二乘法介绍
- 用Jquery自己开发个代阴影的对话框吧!
- python3-Python3 数字(Number)
- 【leetcode】1032. Stream of Characters
- 遍历目录下的文件每250M打包一个文件
- iOSUI视图面试及原理总结
- AIDE支持实时错误检查、代码重构、代码智能导航、生成APK
- SkinMagic使用后按钮加自定义图标或菜单GetMneu返回NULL的解决方法
- Blazor University (1)介绍 - 什么是 Blazor?
- 在Eclipse中配置Tomcat7.0
- Quartz调度源码分析
- Linux之Firewall防火墙、iptables、firewalld
- 计算机怎么删除表格,EXCEL如何删除表格内容中的部分文本
- 74HC/LS/HCT/F系列芯片的区别
- 天津理工大学物联网通信技术实验1:数字基带信号(NRZ、NRZ-I、AMI、HDB3信道编码)
- 巴菲特十大量化选股经
- pragma HLS interface 端口综合
热门文章
- python绑定句柄容易么_Python 有什么奇技淫巧?
- python随机数权重_Python实现基于权重的随机数2种方法
- Nginx一个server配置多个location
- springboot线程池使用
- Android开发笔记(一百七十八)更安全的数据仓库DataStore
- linux查看共享内存max,浅析Linux的共享内存与tmpfs文件系统
- 写论文的第三天 自建zookeeper集群
- [译]技术之外,工作之内,非常实际有用的技巧--如何宣布坏消息?
- Java10的新特性
- Spring Cloud with Turbine