[TOC]

加载保存功能

数据加载(json文件、jdbc)与保存(json、jdbc)

测试代码如下:

package cn.xpleaf.bigdata.spark.scala.sql.p1

import java.util.Properties

import org.apache.log4j.{Level, Logger}

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.sql.{SQLContext, SaveMode}

/**

* SparkSQL关于加载数据和数据落地的各种实战操作

*/

object _03SparkSQLLoadAndSaveOps {

def main(args: Array[String]): Unit = {

Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkSQLOps.getClass.getSimpleName)

val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)

// readOps(sqlContext)

writeOps(sqlContext)

sc.stop()

}

/**

* 在write结果到目录中的时候需要留意相关异常

* org.apache.spark.sql.AnalysisException: path file:/D:/data/spark/sql/people-1.json already exists

* 如果还想使用该目录的话,就需要设置具体的保存模式SaveMode

* ErrorIfExist

* 默认的,目录存在,抛异常

* Append

* 追加

* Ingore

* 忽略,相当于不执行

* Overwrite

* 覆盖

*/

def writeOps(sqlContext:SQLContext): Unit = {

val df = sqlContext.read.json("D:/data/spark/sql/people.json")

df.registerTempTable("people")

val retDF = sqlContext.sql("select * from people where age > 20")

// retDF.show()

// 将结果落地

//retDF.coalesce(1).write.mode(SaveMode.Overwrite).json("D:/data/spark/sql/people-1.json")

// 落地到数据库

val url = "jdbc:mysql://localhost:3306/test"

val table = "people1" // 会重新创建一张新表

val properties = new Properties()

properties.put("user", "root")

properties.put("password", "root")

retDF.coalesce(1).write.jdbc(url, table, properties)

}

/*

// sparkSQL读数据

// java.lang.RuntimeException: file:/D:/data/spark/sql/people.json is not a Parquet file

sparkSQL使用read.load加载的默认文件格式为parquet(parquet.apache.org)

加载其它文件格式怎么办?

需要指定加载文件的格式.format("json")

*/

def readOps(sqlContext:SQLContext): Unit = {

// val df = sqlContext.read.load("D:/data/spark/sql/users.parquet")

// val df = sqlContext.read.format("json").load("D:/data/spark/sql/people.json")

// val df = sqlContext.read.json("D:/data/spark/sql/people.json")

val url = "jdbc:mysql://localhost:3306/test"

val table = "people"

val properties = new Properties()

properties.put("user", "root")

properties.put("password", "root")

val df = sqlContext.read.jdbc(url, table, properties)

df.show()

}

}

当执行读操作时,输出结果如下:

+---+----+---+------+

| id|name|age|height|

+---+----+---+------+

| 1| 小甜甜| 18| 168.0|

| 2| 小丹丹| 19| 167.0|

| 3| 大神| 25| 181.0|

| 4| 团长| 38| 158.0|

| 5| 记者| 22| 169.0|

+---+----+---+------+

当执行写操作时:

1.如果保存到json文件

注意有各种写模式,另外其保存的是一个目录,与HDFS兼容的目录格式

2.如果保存到jdbc

则会在数据库中创建一个DataFrame所包含列的表,注意该表不能存在

Spark SQL和Hive的集成

需要先启动Hive,然后再进行下面的操作。

代码编写

测试代码如下:

package cn.xpleaf.bigdata.spark.scala.sql.p2

import cn.xpleaf.bigdata.spark.scala.sql.p1._01SparkSQLOps

import org.apache.log4j.{Level, Logger}

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.sql.hive.HiveContext

/**

* 通过创建HiveContext来操作Hive中表的数据

* 数据源:

* teacher_info.txt

* name(String) height(double)

* zhangsan,175

* lisi,180

* wangwu,175

* zhaoliu,195

* zhouqi,165

* weiba,185

*

* create table teacher_info(

* name string,

* height double

* ) row format delimited

* fields terminated by ',';

*

* teacher_basic.txt

* name(String) age(int) married(boolean) children(int)

* zhangsan,23,false,0

* lisi,24,false,0

* wangwu,25,false,0

* zhaoliu,26,true,1

* zhouqi,27,true,2

* weiba,28,true,3

*

* create table teacher_basic(

* name string,

* age int,

* married boolean,

* children int

* ) row format delimited

* fields terminated by ',';

* *

* 需求:

*1.通过sparkSQL在hive中创建对应表,将数据加载到对应表

*2.执行sparkSQL作业,计算teacher_info和teacher_basic的关联信息,将结果存放在一张表teacher中

*

* 在集群中执行hive操作的时候,需要以下配置:

* 1、将hive-site.xml拷贝到spark/conf目录下,将mysql connector拷贝到spark/lib目录下

2、在$SPARK_HOME/conf/spark-env.sh中添加一条记录

export SPARK_CLASSPATH=$SPARK_CLASSPATH:$SPARK_HOME/lib/mysql-connector-java-5.1.39.jar

*/

object _01HiveContextOps {

def main(args: Array[String]): Unit = {

Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

val conf = new SparkConf()

// .setMaster("local[2]")

.setAppName(_01SparkSQLOps.getClass.getSimpleName)

val sc = new SparkContext(conf)

val hiveContext = new HiveContext(sc)

//创建teacher_info表

hiveContext.sql("CREATE TABLE teacher_info(" +

"name string, " +

"height double) " +

"ROW FORMAT DELIMITED " +

"FIELDS TERMINATED BY ','")

hiveContext.sql("CREATE TABLE teacher_basic(" +

"name string, " +

"age int, " +

" married boolean, " +

"children int) " +

"ROW FORMAT DELIMITED " +

"FIELDS TERMINATED BY ','")

// 向表中加载数据

hiveContext.sql("LOAD DATA LOCAL INPATH '/home/uplooking/data/hive/sql/teacher_info.txt' INTO TABLE teacher_info")

hiveContext.sql("LOAD DATA LOCAL INPATH '/home/uplooking/data/hive/sql/teacher_basic.txt' INTO TABLE teacher_basic")

//第二步操作 计算两张表的关联数据

val joinDF = hiveContext.sql("SELECT " +

"b.name, " +

"b.age, " +

"if(b.married, '已婚', '未婚') as married, " +

"b.children, " +

"i.height " +

"FROM teacher_info i " +

"INNER JOIN teacher_basic b ON i.name = b.name")

joinDF.collect().foreach(println)

joinDF.write.saveAsTable("teacher")

sc.stop()

}

}

打包、上传与配置

打包后上传到集群环境中,然后针对Spark做如下配置:

在集群中执行hive操作的时候,需要以下配置:

1、将hive-site.xml拷贝到spark/conf目录下,将mysql connector拷贝到spark/lib目录下

2、在$SPARK_HOME/conf/spark-env.sh中添加一条记录

export SPARK_CLASSPATH=$SPARK_CLASSPATH:$SPARK_HOME/lib/mysql-connector-java-5.1.39.jar

提交spark作业

使用的spark提交作业的脚本如下:

[uplooking@uplooking01 spark]$ cat spark-submit-standalone.sh

#export HADOOP_CONF_DIR=/home/uplooking/app/hadoop/etc/hadoop

/home/uplooking/app/spark/bin/spark-submit \

--class $2 \

--master spark://uplooking02:7077 \

--executor-memory 1G \

--num-executors 1 \

$1 \

执行如下命令:

./spark-submit-standalone.sh spark-hive.jar cn.xpleaf.bigdata.spark.scala.sql.p2._01HiveContextOps

验证

可以在作业执行的输出结果有看到我们期望的输出,也可以直接在Hive中操作来进行验证:

hive> show tables;

OK

hpeople

people

t1

teacher

teacher_basic

teacher_info

Time taken: 0.03 seconds, Fetched: 6 row(s)

hive> select * from teacher;

OK

zhangsan 23 未婚 0 175.0

lisi 24 未婚 0 180.0

wangwu 25 未婚 0 175.0

zhaoliu 26 已婚 1 195.0

zhouqi 27 已婚 2 165.0

weiba 28 已婚 3 185.0

Time taken: 0.369 seconds, Fetched: 6 row(s)

Spark和ES的集成

需要确保ElasticSearch环境已经搭建好。

测试代码如下:

package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}

import org.apache.spark.sql.SQLContext

import org.apache.spark.{SparkConf, SparkContext}

import org.elasticsearch.spark.sql._

import org.elasticsearch.spark._

/**

* Spark和ES的集成操作

* 引入Spark和es的maven依赖

* elasticsearch-hadoop

* 2.3.0

* 将account.json加载到es的索引库spark/account

* 可以参考官方文档:https://www.elastic.co/guide/en/elasticsearch/hadoop/2.3/spark.html

*/

object _02SparkElasticSearchOps {

def main(args: Array[String]): Unit = {

Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

val conf = new SparkConf()

.setAppName(_02SparkElasticSearchOps.getClass().getSimpleName)

.setMaster("local[2]")

/**

* Spark和es的集成配置

*/

conf.set("es.index.auto.create", "true")

conf.set("es.nodes", "uplooking01")

conf.set("es.port", "9200")

val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)

// write2ES(sqlContext)

readFromES(sc)

sc.stop()

}

/**

* 从es中读数据

* (使用sparkContext进行操作)

*/

def readFromES(sc:SparkContext): Unit = {

val resources = "spark/account" // 索引库/类型

val jsonRDD = sc.esJsonRDD(resources)

jsonRDD.foreach(println)

}

/**

* 向es中写入数据

* (使用sqlContext进行操作)

*/

def write2ES(sqlContext:SQLContext): Unit = {

val jsonDF = sqlContext.read.json("D:/data/spark/sql/account.json")

val resources = "spark/account" // 索引库/类型

jsonDF.saveToEs(resources)

}

}

Spark SQL函数

概述(Spark 1.5.X ~ 1.6.X的内置函数)

使用Spark SQL中的内置函数对数据进行分析,Spark SQL API不同的是,DataFrame中的内置函数操作的结果是返回一个Column对象,而DataFrame天生就是"A distributed collection of data organized into named columns.",这就为数据的复杂分析建立了坚实的基础并提供了极大的方便性,例如说,我们在操作DataFrame的方法中可以随时调用内置函数进行业务需要的处理,这之于我们构建附件的业务逻辑而言是可以极大的减少不必须的时间消耗(基于上就是实际模型的映射),让我们聚焦在数据分析上,这对于提高工程师的生产力而言是非常有价值的Spark 1.5.x开始提供了大量的内置函数,还有max、mean、min、sum、avg、explode、size、sort_array、day、to_date、abs、acos、asin、atan

总体上而言内置函数包含了五大基本类型:

1、聚合函数,例如countDistinct、sumDistinct等;

2、集合函数,例如sort_array、explode等

3、日期、时间函数,例如hour、quarter、next_day

4、数学函数,例如asin、atan、sqrt、tan、round等;

5、开窗函数,例如rowNumber等

6、字符串函数,concat、format_number、rexexp_extract

7、其它函数,isNaN、sha、randn、callUDF

以下为Hive中的知识内容,但是显然Spark SQL也有同样的概念

UDF

用户自定义函数:User Definded Function

一路输入,一路输出

a--->A

strlen("adbad")=5

UDAF

用户自定义聚合函数:User Definded Aggregation Function

多路输入,一路输出

sum(a, b, c, d)---->汇总的结果

表函数

UDTF:用户自定义表函数:User Definded Table Function

多路输入,多路输出

"hello you"

"hello me" ---->转换操作,----->split("")---->Array[]

["hello, "you"]--->

"hello"

"you"

---->行列转换

一个基本的案例如下:

package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.sql.SQLContext

/**

* SparkSQL 内置函数操作

*/

object _03SparkSQLFunctionOps {

def main(args: Array[String]): Unit = {

Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

val conf = new SparkConf()

.setAppName(_03SparkSQLFunctionOps.getClass().getSimpleName)

.setMaster("local[2]")

val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)

val pdf = sqlContext.read.json("D:/data/spark/sql/people.json")

pdf.show()

pdf.registerTempTable("people")

// 统计人数

sqlContext.sql("select count(1) from people").show()

// 统计最小年龄

sqlContext.sql("select age, " +

"max(age) as max_age, " +

"min(age) as min_age, " +

"avg(age) as avg_age, " +

"count(age) as count " +

"from people group by age order by age desc").show()

sc.stop()

}

}

输出结果如下:

+---+------+-------+

|age|height| name|

+---+------+-------+

| 10| 168.8|Michael|

| 30| 168.8| Andy|

| 19| 169.8| Justin|

| 32| 188.8| Jack|

| 10| 158.8| John|

| 19| 179.8| Domu|

| 13| 179.8| 袁帅|

| 30| 175.8| 殷杰|

| 19| 179.9| 孙瑞|

+---+------+-------+

18/05/09 17:53:23 INFO FileInputFormat: Total input paths to process : 1

+---+

|_c0|

+---+

| 9|

+---+

18/05/09 17:53:24 INFO FileInputFormat: Total input paths to process : 1

+---+-------+-------+-------+-----+

|age|max_age|min_age|avg_age|count|

+---+-------+-------+-------+-----+

| 32| 32| 32| 32.0| 1|

| 30| 30| 30| 30.0| 2|

| 19| 19| 19| 19.0| 3|

| 13| 13| 13| 13.0| 1|

| 10| 10| 10| 10.0| 2|

+---+-------+-------+-------+-----+

Spark SQL开窗函数

1、Spark 1.5.x版本以后,在Spark SQL和DataFrame中引入了开窗函数,比如最经典的就是我们的row_number(),可以让我们实现分组取topn的逻辑。

2、做一个案例进行topn的取值(利用Spark的开窗函数),不知道同学们是否还有印象,我们之前在最早的时候,做过topn的计算,当时是非常麻烦的。但是现在用了Spark SQL之后,非常方便。

Spark SQL之UDF操作

测试代码如下:

package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}

import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

import org.apache.spark.sql.{Row, SQLContext}

import org.apache.spark.{SparkConf, SparkContext}

/**

* SparkSQL 内置函数操作

*/

object _04SparkSQLFunctionOps {

def main(args: Array[String]): Unit = {

Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

val conf = new SparkConf()

.setAppName(_04SparkSQLFunctionOps.getClass().getSimpleName)

.setMaster("local[2]")

val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)

/**

* hive中的用户自定义函数UDF操作(即在SparkSQL中类比hive来进行操作,因为hive和SparkSQL都是交互式计算)

* 1.创建一个普通的函数

* 2.注册(在SqlContext中注册)

* 3.直接使用即可

*

* 案例:创建一个获取字符串长度的udf

*/

// 1.创建一个普通的函数

def strLen(str:String):Int = str.length

// 2.注册(在SqlContext中注册)

sqlContext.udf.register[Int, String]("myStrLen", strLen)

val list = List("Hello you", "Hello he", "Hello me")

// 将RDD转换为DataFrame

val rowRDD = sqlContext.sparkContext.parallelize(list).flatMap(_.split(" ")).map(word => {

Row(word)

})

val scheme = StructType(List(

StructField("word", DataTypes.StringType, false)

))

val df = sqlContext.createDataFrame(rowRDD, scheme)

df.registerTempTable("test")

// 3.直接使用即可

sqlContext.sql("select word, myStrLen(word) from test").show()

sc.stop()

}

}

输出结果如下:

+-----+---+

| word|_c1|

+-----+---+

|Hello| 5|

| you| 3|

|Hello| 5|

| he| 2|

|Hello| 5|

| me| 2|

+-----+---+

Spark SQL之wordcount操作

测试代码如下:

package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}

import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.sql.{Row, SQLContext}

/**

* 这两部分都比较重要:

* 1.使用SparkSQL完成单词统计操作

* 2.开窗函数使用

*/

object _05SparkSQLFunctionOps2 {

def main(args: Array[String]): Unit = {

Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

val conf = new SparkConf()

.setAppName(_05SparkSQLFunctionOps2.getClass().getSimpleName)

.setMaster("local[2]")

val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)

val list = List("Hello you", "Hello he", "Hello me")

// 将RDD转换为DataFrame

val rowRDD = sqlContext.sparkContext.parallelize(list).map(line => {

Row(line)

})

val scheme = StructType(List(

StructField("line", DataTypes.StringType, false)

))

val df = sqlContext.createDataFrame(rowRDD, scheme)

df.registerTempTable("test")

df.show()

// 执行wordcount

val sql = "select t.word, count(1) as count " +

"from " +

"(select " +

"explode(split(line, ' ')) as word " +

"from test) as t " +

"group by t.word order by count desc"

sqlContext.sql(sql).show()

sc.stop()

}

}

输出结果如下:

+---------+

| line|

+---------+

|Hello you|

| Hello he|

| Hello me|

+---------+

+-----+-----+

| word|count|

+-----+-----+

|Hello| 3|

| me| 1|

| he| 1|

| you| 1|

+-----+-----+

sparksql 保存点_Spark SQL笔记整理(三):加载保存功能与Spark SQL函数相关推荐

  1. spark SQL(三)数据源 Data Source----通用的数据 加载/保存功能

    Spark SQL 的数据源------通用的数据 加载/保存功能 Spark SQL支持通过DataFrame接口在各种数据源上进行操作.DataFrame可以使用关系变换进行操作,也可以用来创建临 ...

  2. 7.3 TensorFlow笔记(基础篇):加载数据之从队列中读取

    前言 整体步骤 在TensorFlow中进行模型训练时,在官网给出的三种读取方式,中最好的文件读取方式就是将利用队列进行文件读取,而且步骤有两步: 1. 把样本数据写入TFRecords二进制文件 2 ...

  3. 7.1 TensorFlow笔记(基础篇):加载数据之预加载数据与填充数据

    TensorFlow加载数据 TensorFlow官方共给出三种加载数据的方式: 1. 预加载数据 2. 填充数据 预加载数据的缺点: 将数据直接嵌在数据流图中,当训练数据较大时,很消耗内存.填充的方 ...

  4. 【EF学习笔记07】----------加载关联表的数据 贪婪加载

    [EF学习笔记07]----------加载关联表的数据 贪婪加载 讲解之前,先来看一下我们的数据库结构:班级表 学生表 贪婪加载 //贪婪加载 using (var db = new Entitie ...

  5. Pandas将dataframe保存为pickle文件并加载保存后的pickle文件查看dataframe数据实战

    Pandas将dataframe保存为pickle文件并加载保存后的pickle文件查看dataframe数据实战 目录 Pandas将dataframe保存为pickle文件并加载保存后的pickl ...

  6. 加载执行预编译的Sql :prepareStatement

    1.获得连接:Connection con = null; con = DBUtil.getConnection(); 2.写sql语句:String sql=""; 3.用连接加 ...

  7. VS2010不能正确加载 'VSTS for Database Professionals Sql Server Data-tier Application'的解决方法...

    VS10出了点问题,卸载重装之后启动,弹出"VS2010不能正确加载 'VSTS for Database Professionals Sql Server Data-tier Applic ...

  8. 01_01 python机器学习_第一章学习内容整理_加载样本数据绘制散点图

    第一章学习内容整理_加载样本数据&绘制散点图 01 常用包说明 python可以解决很多问题,相应解决方案使用的包也很多,不太好记忆. 为了便于记忆,用大白话简单描述一下各个包的功能. # 科 ...

  9. SQL语句整理三--hive

    文章目录 字符串拼接: split函数(分割字符串): Hive中的replace方法: 行列转换: 创建数据库: 创建表: 添加或删除字段: insert into 和 insert overwri ...

  10. python爬虫动态加载页面_python3的爬虫笔记8——动态加载页面爬虫

    其实大部分主流网站都不是静态的html,html和Javascript相结合已经是大势所趋. 本篇以花瓣网主页为例子. 花瓣网主页,右键查看网页源代码,获得的页面是这样的: 如果还是用之前静态页面的那 ...

最新文章

  1. 终于收到微软的衬衫了!!!
  2. phpMyAdmin安装部署
  3. 温州大学《深度学习》课程课件(一)
  4. javascript返回上一页的三种写法
  5. SAP Spartacus powertools-spa site在Commerce Cloud后台的属性
  6. 阿里云视频点播解决方案使用教程
  7. hadoop版本升级到2.4.1
  8. 渗透测试攻击(二)——wireshark过滤数据包语法详解
  9. 经典人生感悟 看看你少了那一条
  10. gcc include lib路径扩展
  11. Kepware配置OPC UA实现匿名or用户名/密码连接
  12. 电子海图信息系统 (ECDIS)的发展及应用
  13. 通过ahocorasick快速构建一棵actree(AC自动机)
  14. 1395786-30-7,DBCO Maleimide,DBCO-Mal
  15. linux远程主机拒绝连接,linux – Telnet [无法连接到远程主机:拒绝连接]
  16. Java中mongodb指定DB通过aggregate聚合查询操作示例
  17. vivox21支持html,vivo X21支持快充吗_vivo X21支持无线充电吗-太平洋IT百科
  18. 如何从 Android 手机恢复丢失的联系人
  19. 石墨笔记,为知笔记和Effie哪个更适合采编?
  20. Graphite详解

热门文章

  1. 3D目标检测多模态融合综述
  2. 后缀表达式转中缀表达式(非常简单易懂)
  3. PCL中把点云拟合成曲面(附源代码)
  4. LeetCode之同构字符串
  5. 奇怪的比赛|2012年蓝桥杯B组题解析第四题-fishers
  6. python学习:猜数字游戏
  7. wxpython记录生词GUI程序
  8. HDR色调映射(一):基础概念
  9. leetcode刷题日记-leetcode刷题日记-71. 简化路径
  10. 斐波那契数列(剑指offer)