文章目录

  • Spark内存计算框架
    • Spark SQL
      • SparkSQL概述
        • 1. SparkSQL的前世今生
        • 2. 什么是 SparkSQL
      • SparkSQL的四大特性
        • 1. 易整合
        • 2. 统一的数据源访问
        • 3. 兼容Hive
        • 4. 支持标准的数据库连接
      • DataFrame概述
        • 1. DataFrame发展
        • 2. DataFrame是什么
        • 3. DataFrame和RDD的优缺点
          • RDD
          • DataFrame
      • 初识DataFrame
        • 1. 读取文件文件
        • 2. 读取json文件
        • 3. 读取parquet文件
        • 4. 通过StructType动态指定Schema
      • DataFrame常用操作
        • 1. DSL风格语法
        • 2. SQL风格语法(常用)
      • DataSet概述
        • 1. DataSet是什么
        • 2. 如何构建DataSet
        • 3. RDD、DataFrame、DataSet对比
      • 读取外部数据源
        • 1. SparkSQL读取MySQL数据
        • 2. Spark操作CSV文件并将结果写入MySQL
        • 3. Spark与Hive整合
          • spark整合hive——通过SparkSql-shell
          • spark的thrift server与hive进行远程交互
        • 4. 读写Hive数据
        • 5. 读写HBase数据
      • SparkSQL自定义函数
        • 1. 自定义UDF函数:一对一
        • 2. 自定义UDAF函数:多对一
        • 3. 自定义UDTF函数:一对多

Spark内存计算框架

Spark SQL

SparkSQL概述

1. SparkSQL的前世今生

  • Shark 是专门针对于 spark 的,构建大规模数据仓库系统的一个框架。
  • Shark 依赖 hive、与 Hive 兼容、同时也依赖于 Spark 版本。
  • HiveSql底层把 sql 解析成了 mapreduce 程序,Shark 是把 sql 语句解析成了 Spark 任务。
  • 随着性能优化的上限,以及集成 SQL 的一些复杂的分析功能,发现 Hive 的 MapReduce 思想限制了 Shark 的发展。
  • 最后 Databricks 公司终止对 Shark 的开发,决定单独开发一个框架,不再依赖 hive,把重点转移到了 sparksql 这个框架上。

2. 什么是 SparkSQL

  • 官方文档:https://spark.apache.org/sql/

Spark SQL is Apache Spark’s module for working with structured data.

  • SparkSQL是 Apache Spark 用来处理结构化数据的一个模块。

SparkSQL的四大特性

1. 易整合

  • 将 SQL 查询与Spark 程序无缝混合

    • 即对结构化数据进行查询,可以使用 sql 分析;
    • 也可以使用 DataFrame、DataSet api;
    • 可以使用不同的语言进行代码开发java、scala、python、R

2. 统一的数据源访问

  • 以相同的方式(相同风格的API)连接到任何数据源
// sparksql 可以采用一种统一的方式去对接任意的外部数据源
val dataFrame = sparkSession.read.文件格式的方法名("该文件格式的路径")

3. 兼容Hive

  • Spark 支持 SQL 以及 HiveQL 语法;
  • 支持 Hive SerDes;
  • 支持 UDF;
  • 可以接入已存在的 Hive 数仓;
  • Spark SQL 使用 Hive 的 metastore 服务。

4. 支持标准的数据库连接

  • Spark SQL 支持标准的数据库连接JDBC或者ODBC。

DataFrame概述

  • Spark Core:操作 RDD ==>> 封装了数据 ==>> 对应的操作入口类 SparkContext。
  • Spark SQL:编程抽象 DataFrame ==>> 对应的操作入口类 SparkSession。
  • 从 Spark 2.0 开始,SparkSession 是 Spark 新的查询起始点,其内部封装了 SparkContext,所以计算的本质还是由 SparkContext 完成。

1. DataFrame发展

  • DataFrame 的前身是 schemaRDD,这个 schemaRDD 是直接继承自 RDD,它是 RDD 的一个实现类。
  • 在 Spark 1.3.0 之后把 schemaRDD 改名为 DataFrame
    • 它不再继承自 RDD;
    • 而是自己实现 RDD 上的一些功能。
  • 也可以把 DataFrame 转换成一个 RDD:通过调用 DataFrame 的一个方法 val rdd = dataFrame.rdd

2. DataFrame是什么

  • 在 Spark 中,DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库的二维表格。
  • DataFrame 带有数据的结构信息:
    • Schema元信息;
    • DataFrame所表示的二维表数据集的每一列都带有名称和类型,但底层做了更多的优化。
  • DataFrame 可以从很对数据源构建,比如:已经存在的RDD、结构化文件、外部数据库、Hive表等。
  • RDD 可以把它内部元素看成是一个java对象,DataFrame 可以把内部看成是一个个Row对象,它表示一行一行的数据。

  • 总结:

    • 可以把 DataFrame 这样去理解:RDD + schema 元信息
    • dataFrame相比于rdd来说,多了对数据的描述信息(schema元信息)。

3. DataFrame和RDD的优缺点

RDD
  • 优点:

    • 编译时类型安全:开发会进行类型检查,在编译的时候及时发现错误
    • 具有面向对象编程的风格
  • 缺点:
    • font color=red>构建大量的java对象占用了大量heap堆空间,导致频繁的GC

      • 由于数据集RDD它的数据量比较大,后期都需要存储在heap堆中,这里有heap堆中的内存空间有限,出现频繁的垃圾回收(GC)
      • 程序在进行垃圾回收的过程中,所有的任务都是暂停(STW stop the world),影响程序执行的效率
    • 数据的序列化和反序列性能开销很大
      • 在分布式程序中,对象(对象的内容和结构)是先进行序列化,发送到其他服务器,进行大量的网络传输
      • 然后接受到这些序列化的数据之后,再进行反序列化来恢复该对象
DataFrame
  • 优点:

    • DataFrame引入了schema元信息和off-heap(堆外内存)
    • DataFrame引入off-heap
      • 大量的对象构建直接使用操作系统层面上的内存,不在使用heap堆中的内存
      • 这样一来heap堆中的内存空间就比较充足,不会导致频繁GC,程序的运行效率比较高
      • 它是解决了RDD构建大量的java对象占用了大量heap堆空间,避免导致频繁的GC这个缺点。
    • DataFrame引入了schema元信息:就是数据结构的描述信息
      • spark程序中的大量对象在进行网络传输的时候,只需要把数据的内容本身进行序列化就可以,数据结构信息可以省略掉
      • 这样一来数据网络传输的数据量是有所减少
      • 数据的序列化和反序列性能开销就不是很大了
      • 它是解决了RDD数据的序列化和反序列性能开销很大这个缺点
  • 缺点:DataFrame引入了schema元信息和off-heap(堆外)它是分别解决了RDD的缺点,同时它也丢失了RDD的优点
    • 编译时类型不安全:编译时不会进行类型的检查,这里也就意味着前期是无法在编译的时候发现错误,只有在运行的时候才会发现
    • 不在具有面向对象编程的风格:类似二维表

初识DataFrame

1. 读取文件文件

  • resources目录下创建文件 person.txt,内容如下
1 youyou 38
2 Tony 25
3 laowang 18
4 dali 30
  • 代码实现:
object Case01_ReadText {def main(args: Array[String]): Unit = {// 创建 SparkSessionval spark = SparkSession.builder().appName(this.getClass.getSimpleName).config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[*]").getOrCreate()val df: DataFrame = spark.read.text(this.getClass.getClassLoader.getResource("person.txt").getPath)/*** 打印schema信息* root* |-- value: string (nullable = true)*/df.printSchemaprintln("----------------")println(df.count()) // 4/*** +------------+* |       value|* +------------+* | 1 youyou 38|* |   2 Tony 25|* |3 laowang 18|* |   4 dali 30|* +------------+*/println("----------------")df.show()ss.stop()}
}
  • 改造代码,输出成对象形式的二维表格
case class Person(id: String, name: String, age: Int)object Case02_ReadTextV2 {def main(args: Array[String]): Unit = {// 创建 SparkSessionval spark = SparkSession.builder().appName(this.getClass.getSimpleName).config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")// 添加隐式转换import spark.implicits._val rdd1: RDD[Array[String]] = sc.textFile(this.getClass.getClassLoader.getResource("person.txt").getPath).map(x => x.split(" "))// 将 rdd 与样例类进行关联val personRDD: RDD[Person] = rdd1.map(x => Person(x(0), x(1), x(2).toInt))// 将 rdd 转成 DataFrameval df = personRDD.toDF/*** root* |-- id: string (nullable = true)* |-- name: string (nullable = true)* |-- age: integer (nullable = false)*/df.printSchema()/*** +---+-------+---+* | id|   name|age|* +---+-------+---+* |  1| youyou| 38|* |  2|   Tony| 25|* |  3|laowang| 18|* |  4|   dali| 30|* +---+-------+---+*/df.show()spark.stop()}
}

2. 读取json文件

  • 在 resources 目录新建 person.json 文件,内容如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
  • 代码实现
object Case03_ReadJson {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName).config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[*]").getOrCreate()spark.sparkContext.setLogLevel("WARN")val df: DataFrame = spark.read.json(this.getClass.getClassLoader.getResource("person.json").getPath)/*** root* |-- age: long (nullable = true)* |-- name: string (nullable = true)*/df.printSchemaprintln("--------------")/*** +----+-------+* | age|   name|* +----+-------+* |null|Michael|* |  30|   Andy|* |  19| Justin|* +----+-------+*/df.show()spark.stop()}
}

3. 读取parquet文件

  • Spark 自带样例文件 spark-2.3.3-bin-hadoop2.7/examples/src/main/resources/users.parquet 复制到自己的工程的 resources 目录
  • 代码实现:
object Case04_ReadParquet {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName).config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[*]").getOrCreate()spark.sparkContext.setLogLevel("WARN")val df: DataFrame = spark.read.parquet(this.getClass.getClassLoader.getResource("users.parquet").getPath)/*** root* |-- name: string (nullable = true)* |-- favorite_color: string (nullable = true)* |-- favorite_numbers: array (nullable = true)* |    |-- element: integer (containsNull = true)*/df.printSchema/*** +------+--------------+----------------+* |  name|favorite_color|favorite_numbers|* +------+--------------+----------------+* |Alyssa|          null|  [3, 9, 15, 20]|* |   Ben|           red|              []|* +------+--------------+----------------+*/df.showspark.stop()}
}

4. 通过StructType动态指定Schema

  • 应用场景:在开发代码之前,无法确定需要的 DataFrame 对应的 Schema 元信息,这时需要在开发代码的过程中指定。
  • 代码实现:
object Case05_StructTypeSchema {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")val rdd: RDD[Array[String]] = sc.textFile(this.getClass.getClassLoader.getResource("person.txt").getPath).map(x => x.split(" "))// 将rdd与Row对象关联val rowRDD: RDD[Row] = rdd.map(x => Row(x(0), x(1), x(2).toInt))// 指定dataFrame的schema信息,这里指定的字段个数和类型必须要跟Row对象保持一致val schema = StructType(StructField("id", StringType) ::StructField("name", StringType) ::StructField("age", IntegerType) :: Nil)// 利用rdd生成DataFrameval df: DataFrame = spark.createDataFrame(rowRDD, schema)/*** root* |-- id: string (nullable = true)* |-- name: string (nullable = true)* |-- age: integer (nullable = true)*/df.printSchema/*** +---+-------+---+* | id|   name|age|* +---+-------+---+* |  1| youyou| 38|* |  2|   Tony| 25|* |  3|laowang| 18|* |  4|   dali| 30|* +---+-------+---+*/df.show()// 用sql的方式查询结构化数据df.createTempView("person")/*** +---+-------+---+* | id|   name|age|* +---+-------+---+* |  1| youyou| 38|* |  2|   Tony| 25|* |  3|laowang| 18|* |  4|   dali| 30|* +---+-------+---+*/spark.sql("select * from person").show()spark.stop()}
}

DataFrame常用操作

1. DSL风格语法

  • 就是 sparksql 中的 DataFrame 自身提供了一套自己的 Api,可以去使用这套 api 来做相应的处理
  • 在 Case02_ReadTextV2.scala 中:
 // 将 rdd 转成 DataFrameval personDF = personRDD.toDFpersonDF.printSchema()personDF.show()/************************** DSL风格语法 start *************************/// 1. 查询指定字段personDF.select("name").showpersonDF.select($"name").show// 2. 实现 age+1personDF.select($"name", $"age", $"age" + 1).show// 3. 实现 age>30 过滤personDF.filter($"age" > 30).show// 4. 按照 age 分组统计personDF.groupBy("age").count.show// 5. 按照age分组统计次数降序personDF.groupBy("age").count().sort($"age".desc).show/************************** DSL风格语法 end *************************/

2. SQL风格语法(常用)

  • 可以把 DataFrame 注册成一张表,然后通过 sparkSession.sql(sql语句) 操作
    /************************** SQL风格语法 start *************************/// 1. DataFrame注册成表personDF.createTempView("person")// 2. 使用SparkSession调用sql方法统计查询spark.sql("select * from person").showspark.sql("select name from person").showspark.sql("select name, age from person").showspark.sql("select * from person where age > 30").showspark.sql("select count(*) from person where age > 30").showspark.sql("select age, count(*) from person group by age").showspark.sql("select age, count(*) as count from person group by age").showspark.sql("select * from person order by age desc").show/************************** SQL风格语法 end *************************/

DataSet概述

1. DataSet是什么

  • DataSet 是分布式的数据集合,Dataset 提供了强类型支持,也是在 RDD 的每行数据加了类型约束。
  • DataSet 是 DataFrame 的一个扩展,是 SparkSQL1.6 后新增的数据抽象,API 友好
    • 它集中了 RDD 的优点(强类型和可以用强大lambda函数)
    • 以及使用了 Spark SQL 优化的执行引擎。
  • DataFrame 是 DataSet 的特例,type DataFrame=DataSet[Row]
    • 可以通过 as 方法将 DataFrame 转换成 DataSet
    • Row 是一个类型,可以是 Person、Animal,所有的表结构信息都用 Row 来表示
  • 优点:
    • DataSet 可以在编译时检查类型
    • 并且是面向对象的编程接口

2. 如何构建DataSet

  • 方式一:通过 sparkSession 调用 createDataset 方法
val ds = spark.createDataset(1 to 10)   // scala 集合
ds.showval ds = spark.createDataset(sc.textFile("/person.txt"))  //rdd
ds.show
  • 方式二:使用 scala 集合和 rdd 调用 toDS 方法
sc.textFile("/person.txt").toDS
List(1,2,3,4,5).toDS
  • 方式三:把一个 DataFrame 转换成 DataSet
val ds = dataFrame.as[强类型]
  • 方式四:通过一个 DataSet 转换生成一个新的 DataSet
List(1,2,3,4,5).toDS.map(x => x * 10)

3. RDD、DataFrame、DataSet对比

关系是怎样的?

  • 首先,Spark RDD、DataFrame 和 DataSet 是 Spark 的三类 API,他们的发展过程:

    • DataFrame 是 spark1.3.0 版本提出来的,spark1.6.0 版本又引入了 DateSet;
    • 但是在 spark2.0 版本中,DataFrame 和 DataSet 合并为DataSet。
  • 那么你可能会问了:那么,在2.0以后的版本里,RDD是不是不需要了呢?
    • 答案是:NO!
    • 首先,DataFrame 和 DataSet 都是基于 RDD 的,而且这三者之间可以通过简单的API调用进行无缝切换。

数据有什么区别?

三者 API 特点

  • RDD

    • 优点:相比于传统的 MapReduce 框架,Spark 在 RDD 中内置很多函数操作,group、map、filter等,方便处理结构化或非结构化数据。面向对象编程,直接存储的 java 对象,类型转化也安全。
    • 缺点:由于它基本和 hadoop 一样万能的,因此没有针对特殊场景的优化,比如对于结构化数据处理相对于 sql 来比非常麻烦。默认采用的是 java 序列号方式,序列化结果比较大,而且数据存储在 java 堆内存中,导致 gc 比较频繁。
  • DataFrame
    • 优点:结构化数据处理非常方便,支持Avro、CSV、ElasticSearch 和 Cassandra 等 kv 数据,也支持 HIVE tables、MySQL 等传统数据表。有针对性的优化:采用 Kryo 序列化;由于数据结构元信息 spark 已经保存,序列化时不需要带上元信息,大大的减少了序列化大小;而且数据保存在堆外内存中,减少了 gc 次数,所以运行更快。hive兼容,支持hql、udf等。
    • 缺点:编译时不能类型转化安全检查,运行时才能确定是否有问题。对于对象支持不友好,rdd 内部数据直接以 java 对象存储,dataframe 内存存储的是 row 对象而不能是自定义对象。
  • DataSet
    • 优点:DateSet 整合了 RDD 和 DataFrame 的优点,支持结构化和非结构化数据。和 RDD 一样,支持自定义对象存储。和 ataFrame 一样,支持结构化数据的 sql 查询。采用了堆外内存存储,gc 友好。类型转化安全,代码友好。

三者如何相互转换

  • 涉及到RDD,DataFrame,DataSet之间操作时,需要隐式转换导入:import spark.implicits._
  • 这里的 spark 不是包名,而是代表了 SparkSession 的那个对象名,所以必须先创建 SparkSession 对象再导入

case class Person(id: String, name: String, age: Int)object Case06_SparkConversion {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()val sc = spark.sparkContextsc.setLogLevel("WARN")// 隐式转换import spark.implicits._val rdd = sc.textFile(this.getClass.getClassLoader.getResource("person.txt").getPath).map(x => x.split(" "))// 把rdd与样例类进行关联val personRDD = rdd.map(x => Person(x(0), x(1), x(2).toInt))// 1. rdd -> dfval df1 = personRDD.toDFdf1.show// 2. rdd -> dsval ds1 = personRDD.toDSds1.show// 3. df -> rddval rdd1 = df1.rddprintln(rdd1.collect.toList)// 4. ds -> rddval rdd2 = ds1.rddprintln(rdd2.collect.toList)// 5. ds -> dfval df2: DataFrame = ds1.toDFdf2.show// df -> dsval ds2: Dataset[Person] = df2.as[Person]ds2.showspark.stop()}
}

读取外部数据源

1. SparkSQL读取MySQL数据

  • Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame,通过对 DataFrame 一些列的计算后,还可以将数据再写回关系型数据库中。
  • 代码示例:
/*** 使用 SparkSQL读写MySQL表中的数据*/
object Case07_ReadMySQL {def main(args: Array[String]): Unit = {// 1. 创建 SparkConf 对象val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")// 2. 创建 SparkSession 对象val spark = SparkSession.builder().config(conf).getOrCreate()// 3. 创建 DataFrameval url = "jdbc:mysql://192.168.254.132:3306/mydb?characterEncoding=UTF-8"val tableName = "jobdetail"val props = new Properties()props.setProperty("user", "root")props.setProperty("password", "123456")val mysqlDF: DataFrame = spark.read.jdbc(url, tableName, props)// 4. 读取 MySQL 表中的数据// 4.1 打印schema信息mysqlDF.printSchema()// 4.2 展示数据mysqlDF.show()// 4.3 将dataFrame注册成表mysqlDF.createTempView("job_detail")spark.sql("select * from job_detail where city = '广东'").show()spark.stop()}
}

2. Spark操作CSV文件并将结果写入MySQL

object Case08_ReadCsvWriteMySQL {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()spark.sparkContext.setLogLevel("WARN")val df: DataFrame = spark.read.format("csv").option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ") // 时间转换.option("header", "true") // 第一行数据都是head(字段属性的意思)
//      .option("multiLine", "true") // 数据可能换行.load(this.getClass.getClassLoader.getResource("data").getPath)df.createOrReplaceTempView("job_detail")spark.sql("select job_name,job_url,job_location,job_salary,job_company,job_experience,job_class,job_given,job_detail,company_type,company_person,search_key,city from job_detail where job_company = '北京无极慧通科技有限公司'").show(80)val props = new Properties()props.put("user", "root")props.put("password", "123456")df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.254.132:3306/mydb?useSSL=false&useUnicode=true&characterEncoding=UTF-8","mydb.jobdetail_copy", props)}
}

3. Spark与Hive整合

  • Spark on hive:Spark 通过 Spark SQL 使用 Hive 的语句操作 Hive,底层运行的还是 spark rdd:

    • 通过 spark sql,加载 hive 的配置文件,获取到 hive 的元数据信息;
    • spark sql 获取到 hive 的元数据信息之后就可以拿到 hive 的所有表的数据;
    • 接下来就可以通过 spark sql 来操作 hive 表中的数据。
  • Hive on spark:将 Hive 查询,从 MapReduce的 MR(Hadoop计算引擎) 操作替换为 spark rdd(spark执行引擎)操作。相对于 spark on hive,这个实现起来则麻烦很多,必须重新编译你的 spark 和导入 jar 包,不过目前大部分使用的是 spark on hive。
spark整合hive——通过SparkSql-shell
  • 拷贝 hive-site.xml 配置文件:将 node03 服务器安装的 hive 目录下 conf 文件夹下面的 hive-site.xml 拷贝到 spark 安装的各个机器节点,node03 执行以下命令进行拷贝
$ pwd
/bigdata/install/hive-3.1.2/conf
$ scp hive-site.xml node01:/bigdata/install/spark-2.3.3-bin-hadoop2.7/conf/
$ scp hive-site.xml node02:/bigdata/install/spark-2.3.3-bin-hadoop2.7/conf/
$ scp hive-site.xml node03:/bigdata/install/spark-2.3.3-bin-hadoop2.7/conf/
  • 拷贝 mysql 驱动包:将 hive 当中 mysql 的连接驱动包拷贝到 spark 安装家目录下的 jars 目录下,node03 执行下命令拷贝 mysql 的 lib 驱动包
$ ll mysql-connector-java-5.1.38.jar
-rw-rw-r--. 1 hadoop hadoop 983911 12月  6 2021 mysql-connector-java-5.1.38.jar
$ pwd
/bigdata/install/hive-3.1.2/lib
$ scp mysql-connector-java-5.1.38.jar node01:/bigdata/install/spark-2.3.3-bin-hadoop2.7/jars/
$ scp mysql-connector-java-5.1.38.jar node02:/bigdata/install/spark-2.3.3-bin-hadoop2.7/jars/
$ scp mysql-connector-java-5.1.38.jar node03:/bigdata/install/spark-2.3.3-bin-hadoop2.7/jars/
  • 进入 spark-sql 直接操作 hive 数据库当中的数据:

    • 在 spark2.0 版本后,由于出现了 sparkSession,在初始化 sqlContext 时,会设置默认的 spark.sql.warehouse.dir=spark-warehouse,此时将 hive 与 spark sql 整合完成后,在通过 spark-sql 脚本启动时,会在当前目录下创建一个 spark.sql.warehouse.dir 为 spark-warehouse 的目录,存放由 spark-sql 创建数据库和创建表的数据信息,与之前 hive 的数据信息不是放在同一个路径下(可以互相访问)。但是此时 spark-sql 中表的数据在本地,不利于操作,也不安全。
    • 所有在启动的时候需要加上下面这样一个参数,以保证 spark-sql 启动时不再产生新的存放数据的目录,sparksql 与 hive 最终使用的是 hive 统一存放数据的目录。
--conf spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse
  • 在 hive 中创建表并插入数据:
CREATE EXTERNAL TABLE `student`(`ID` bigint COMMENT '',`CreatedBy` string COMMENT '创建人',`CreatedTime` string COMMENT '创建时间',`UpdatedBy` string COMMENT '更新人',`UpdatedTime` string COMMENT '更新时间',`Version` int COMMENT '版本号',`name` string COMMENT '姓名'
) COMMENT '学生表'
PARTITIONED BY (`dt` String COMMENT 'partition')
row format delimited fields terminated by '\t'
location '/student';INSERT INTO TABLE student partition(dt='2022-07-12') VALUES(1, "xxx", "2022-07-12", "", "", 1, "zhangsan");
INSERT INTO TABLE student partition(dt='2022-07-12') VALUES(2, "xxx", "2022-07-12", "", "", 2, "lisi");

  • 通过 shell 方式:node01 直接执行以下命令,进入 spark-sql 交互界面,然后操作 hive 当中的数据
$ spark-sql --master local[2] \
--executor-memory 512m --total-executor-cores 3 \
--conf spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse# 执行查询
select * from student;

  • 通过脚本方式:编写如下脚本并执行
#!/bin/sh
# 定义 spark sql 提交脚本的头信息
SUBMIT_INFO="spark-sql --master spark://node01:7077 --executor-memory 1g --total-executor-cores 4 --conf spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse"
# 定义一个 sql 语句
SQL="select * from student;"
# 执行 sql 语句,类似于 hive -e sql
echo "$SUBMIT_INFO"
echo "$SQL"
$SUBMIT_INFO -e "$SQL"
  • 执行:
$ sh spark_on_hive.sh

spark的thrift server与hive进行远程交互
  • 除了可以通过 spark-shell 来与 hive 进行整合之外,我们也可以通过 spark 的 thrift 服务来远程与 hive 进行交互。
  • node03 执行以下命令修改 hive-site.xml 的配置属性,添加以下几个配置
<property><name>hive.metastore.uris</name><value>thrift://node03:9083</value><description>Thrift URI for the remote metastore</description>
</property>
<property><name>hive.server2.thrift.min.worker.threads</name><value>5</value>
</property>
<property><name>hive.server2.thrift.max.worker.threads</name><value>500</value>
</property>
  • 修改完的配置文件后,分发到其他机器:
$ pwd
/bigdata/install/hive-3.1.2/conf
$ scp hive-site.xml node01:/bigdata/install/spark-2.3.3-bin-hadoop2.7/conf/
$ scp hive-site.xml node02:/bigdata/install/spark-2.3.3-bin-hadoop2.7/conf/
$ scp hive-site.xml node03:/bigdata/install/spark-2.3.3-bin-hadoop2.7/conf/
  • node03 启动 metastore 服务
hive --service metastore
  • node03 执行以下命令启动 spark 的 thrift server:hive 安装在哪一台,就在哪一台服务器启动spark 的 thrift server
$ pwd
/bigdata/install/spark-2.3.3-bin-hadoop2.7/sbin
$ ./start-thriftserver.sh --master local[*] --executor-memory 2g --total-executor-cores 5
  • 直接使用 beeline 来连接:直接在 node03 服务器上面使用 beeline 来进行连接 spark-sql
$ beeline --color=true
beeline> !connect jdbc:hive2://node03:10000
Connecting to jdbc:hive2://node03:10000
Enter username for jdbc:hive2://node03:10000: hadoop
Enter password for jdbc:hive2://node03:10000: ******

4. 读写Hive数据

  • 添加依赖:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.3.3</version>
</dependency>
  • 将服务端配置 hive-site.xml,放入到 idea 的 resources 目录下
  • 代码实现:
object Case09_SparkSQLOnHive {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").enableHiveSupport() // 启用hive.config("spark.sql.warehouse.dir", "hdfs://node01:8020/user/hive/warehouse").getOrCreate()val df: DataFrame = spark.sql("select * from student")df.show()// 直接写表达式,通过 insert into 插入df.write.saveAsTable("student1")spark.sql("insert into student1 select * from student")}
}

5. 读写HBase数据

  • 需要添加依赖:
<dependency><groupId>org.json4s</groupId><artifactId>json4s-jackson_2.11</artifactId><version>3.3.0</version>
</dependency>
  • 创建 HBase 表,并插入数据:
create 'spark_hbase','info'
put 'spark_hbase','0001','info:name','tangseng'
put 'spark_hbase','0001','info:age','30'
put 'spark_hbase','0001','info:sex','0'
put 'spark_hbase','0001','info:addr','beijing'
put 'spark_hbase','0002','info:name','sunwukong'
put 'spark_hbase','0002','info:age','508'
put 'spark_hbase','0002','info:sex','0'
put 'spark_hbase','0002','info:addr','shanghai'
put 'spark_hbase','0003','info:name','zhubajie'
put 'spark_hbase','0003','info:age','715'
put 'spark_hbase','0003','info:sex','0'
put 'spark_hbase','0003','info:addr','shenzhen'
put 'spark_hbase','0004','info:name','bailongma'
put 'spark_hbase','0004','info:age','1256'
put 'spark_hbase','0004','info:sex','0'
put 'spark_hbase','0004','info:addr','donghai'
put 'spark_hbase','0005','info:name','shaheshang'
put 'spark_hbase','0005','info:age','1008'
put 'spark_hbase','0005','info:sex','0'
put 'spark_hbase','0005','info:addr','tiangong'create "spark_hbase_copy",'info'
  • 代码实现:
object Case10_SparkSQLOnHBase {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").getOrCreate()spark.sparkContext.setLogLevel("WARN")import spark.implicits._val hconf: Configuration = HBaseConfiguration.createhconf.set(HConstants.ZOOKEEPER_QUORUM, "node01:2181,node02:2181,node03:2181")val hbaseContext = new HBaseContext(spark.sparkContext, hconf)// 定义映射的 catalogval catalog: String = "{\"table\":{\"namespace\":\"default\",\"name\":\"spark_hbase\"},\"rowkey\":\"key\",\"columns\":{\"f0\":{\"cf\":\"rowkey\",\"col\":\"key\",\"type\":\"string\"},\"f1\":{\"cf\":\"info\",\"col\":\"addr\",\"type\":\"string\"},\"f2\":{\"cf\":\"info\",\"col\":\"age\",\"type\":\"boolean\"},\"f3\":{\"cf\":\"info\",\"col\":\"name\",\"type\":\"string\"}}}";// 读取HBase数据val ds: DataFrame = spark.read.format("org.apache.hadoop.hbase.spark").option(HBaseTableCatalog.tableCatalog, catalog).load()ds.show(10)val catalogCopy: String = catalog.replace("spark_hbase", "spark_hbase_out")// 数据写入HBaseds.write.format("org.apache.hadoop.hbase.spark").option(HBaseTableCatalog.tableCatalog, catalogCopy).mode(SaveMode.Overwrite).save()}
}

SparkSQL自定义函数

  • 用户自定义函数类别分为以下三种:

    • ① UDF:输入一行,返回一个结果(一对一)
    • ② UDAF:输入多行,返回一行,这里的是 aggregate,聚合的意思,如果业务复杂,需要自己实现聚合函数
    • ③ UDTF:输入一行,返回多行(一对多),在 SparkSQL 中没有,因为 Spark 中使用 flatMap 即可实现这个功能

1. 自定义UDF函数:一对一

  • 需求:读取深圳二手房成交数据,对房子的年份进行自定义函数处理,如果没有年份,那么就给默认值1990。
object Case11_SparkUDF {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()val df: DataFrame = spark.read.format("csv").option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").option("header", "true").option("multiLine", "true").load("/Volumes/F/MyGitHub/bigdata/spark-demo/src/main/resources/深圳链家二手房成交明细.csv")df.createOrReplaceTempView("house_sale")// 注册UDFspark.udf.register("house_udf", new UDF1[String, String] {val pattern: Pattern = Pattern.compile("^[0-9]*$")override def call(input: String): String = {val matcher = pattern.matcher(input)if (matcher.matches()) inputelse "1990"}}, DataTypes.StringType)// 使用UDFspark.sql("select house_udf(house_age) from house_sale limit 200").show()spark.stop()}
}

2. 自定义UDAF函数:多对一

  • 需求:自定义UDAF函数,读取深圳二手房数据,然后按照楼层进行分组,求取每个楼层的平均成交金额
object Case12_SparkUDAF {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()val df: DataFrame = spark.read.format("csv").option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").option("header", "true").option("multiLine", "true").load("/Volumes/F/MyGitHub/bigdata/spark-demo/src/main/resources/深圳链家二手房成交明细.csv")df.createOrReplaceTempView("house_sale")spark.sql("select floor from house_sale limit 30").show()spark.udf.register("udaf", new MyAverage)spark.sql("select floor, udaf(house_sale_money) from house_sale group by floor").show()df.printSchema()spark.stop()}
}
class MyAverage extends UserDefinedAggregateFunction {// 聚合函数输入函数的数据类型override def inputSchema: StructType = StructType(StructField("floor", DoubleType) :: Nil)// 聚合缓冲区中值的数据类型override def bufferSchema: StructType = {StructType(StructField("sum", DoubleType) :: StructField("count", LongType) :: Nil)}// 返回值类型override def dataType: DataType = DoubleType// 对于相同输入是否一直返回相同的输出override def deterministic: Boolean = true// 初始化override def initialize(buffer: MutableAggregationBuffer): Unit = {// 用于存储不同类型的楼房的总成交额buffer(0) = 0D// 用于存储不同类型的楼房的总个数buffer(1) = 0L}// 相同Execute间的数据合并(分区内聚合)override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {if (!input.isNullAt(0)) {buffer(0) = buffer.getDouble(0) + input.getDouble(0)buffer(1) = buffer.getLong(1) + 1}}// 不同Execute间的数据合并(分区外聚合)override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)}// 计算最终结果override def evaluate(buffer: Row): Any = buffer.getDouble(0) / buffer.getLong(1)
}

3. 自定义UDTF函数:一对多

  • 需求:自定义UDTF函数,读取深圳二手房数据,然后将 part_place(部分地区)以空格切分进行展示
object Case13_SparkUDTF {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()import spark.implicits._val df: DataFrame = spark.read.format("csv").option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").option("header", "true").option("multiLine", "true").load("/Volumes/F/MyGitHub/bigdata/spark-demo/src/main/resources/深圳链家二手房成交明细.csv")df.createOrReplaceTempView("house_sale")// 注册UDTF算子,这里无法使用sparkSession.udf.register(),注意包全路径spark.sql("CREATE TEMPORARY FUNCTION MySplit as 'com.yw.spark.example.sql.cases.MySplit'")spark.sql("select part_place, MySplit(part_place, ' ') from house_sale limit 50").show()spark.stop()}
}class MySplit extends GenericUDTF {override def initialize(args: Array[ObjectInspector]): StructObjectInspector = {if (args.length != 2) {throw new UDFArgumentLengthException("UserDefinedUDTF takes only two argument")}// 判断第一个参数是不是字符串参数if (args(0).getCategory() != ObjectInspector.Category.PRIMITIVE) {throw new UDFArgumentException("UserDefinedUDTF takes string as a parameter")}// 列名:会被用户传递的覆盖val fieldNames: ArrayList[String] = new ArrayList[String]()fieldNames.add("col1")// 返回列以什么格式输出,这里是string,添加几个就是几个列,和上面的名字个数对应个数val fieldOIs: ArrayList[ObjectInspector] = new ArrayList[ObjectInspector]()fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector)ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs)}override def process(objects: Array[AnyRef]): Unit = {// 获取数据val data: String = objects(0).toString// 获取分隔符val splitKey: String = objects(1).toString// 切分数据val words: Array[String] = data.split(splitKey)// 遍历写出words.foreach(x => {// 将数据放入集合val tmp: Array[String] = new Array[String](1)tmp(0) = xforward(tmp)})}override def close(): Unit = {// 没有流操作}
}
  • 相关代码 github 地址:https://github.com/shouwangyw/bigdata/tree/master/spark-demo/src/main/scala/com/yw/spark/example/sql

大数据高级开发工程师——Spark学习笔记(6)相关推荐

  1. 大数据高级开发工程师——Spark学习笔记(9)

    文章目录 Spark内存计算框架 Spark Streaming Spark Streaming简介 Spark Streaming架构流程 什么是DStream DStream算子操作 1. Tra ...

  2. 大数据高级开发工程师——Spark学习笔记(7)

    文章目录 Spark内存计算框架 Spark SQL SparkSQL架构设计 1. SparkSQL的架构设计实现 2. Catalyst执行过程 SQL 解析阶段 Parser 绑定逻辑计划 An ...

  3. 大数据高级开发工程师——Spark学习笔记(8)

    文章目录 Spark内存计算框架 Spark SQL Spark的动态资源划分 1. Executor动态调整范围? 2. 超时被杀的Executor中持久化数据如何处理? 3. 如何开启Spark的 ...

  4. 大数据高级开发工程师——Spark学习笔记(4)

    文章目录 Spark内存计算框架 Spark Core Spark的shuffle过程 1. HashShuffleManager 未经优化的HashShuffleManager 经过优化的HashS ...

  5. 大数据高级开发工程师——Spark学习笔记(10)

    文章目录 Spark内存计算框架 Spark Streaming Checkpoint 1. checkpoint的基本介绍 2. 什么时候需要使用checkpoint 3. 如何使用checkpoi ...

  6. 大数据高级开发工程师——HBase学习笔记(2)

    文章目录 大数据数据库之HBase HBase架构原理 HBase的数据存储原理 HBase读数据流程 HBase写数据流程 HBase的flush.compact机制 Flush触发条件 1. Me ...

  7. 大数据高级开发工程师——Hive学习笔记(2)

    文章目录 Hive提高篇 Hive的使用 Hive的分桶表 1. 分桶表的原理 2. 分桶表的作用 3. 案例演示 Hive数据导入 1. 直接向表中插入数据(强烈不推荐使用) 2. 通过load加载 ...

  8. 大数据高级开发工程师——HBase学习笔记(3)

    文章目录 Phoenix Phoenix介绍 什么是Phoenix Phoenix底层原理 安装部署 下载安装 配置环境变量 重启hbase集群 验证是否成功 Phoenix使用 批处理方式 命令行方 ...

  9. 大数据高级开发工程师——Hadoop学习笔记(3)

    文章目录 Hadoop进阶篇 HDFS:Hadoop分布式文件系统 NameNode和SecondaryNameNode功能剖析 1. NameNode和SecondaryNameNode解析 2. ...

最新文章

  1. 这两年的人工智能淘金热 真正赚钱的公司是这几家
  2. Python朴素贝叶斯
  3. 单例模式---懒汉模式与饿汉模式
  4. Semaphore、CountDownLatch和CyclicBarrier
  5. 4562亿元教育经费收从哪里花向何处
  6. 从零开始学PowerShell(7)编写一个函数体
  7. linux下gcc的编译过程详解
  8. 8月的最后一天,随意漫笔
  9. PS怎样删除文字成为背景颜色
  10. VLAN隔离技术 — MUX VLAN
  11. Win10 更改不了注册表,重启之后恢复原样
  12. ElasticSearch集群状态异常(Red、Yellow)原因分析
  13. 【R】【课程笔记】04+05 数据预处理+收益率计算
  14. 仿QQ设置头像(拍照/选择照片)
  15. AlexNet网络介绍
  16. AppScan详细使用教程
  17. QT/C++高级编程(上)
  18. nrf51822 --TWI(硬件IIC)
  19. 大数据知识图谱项目——基于知识图谱的医疗知识问答系统(详细讲解及源码)
  20. VMware VirtualCenter Servere服务不能启动的解决方法

热门文章

  1. php 获取酷狗音乐真实地址
  2. iPhone4 Siri
  3. 加密聊天应用依然安全
  4. 微信小程序怎么开发自己的小程序?
  5. 有道身份证查询接口API
  6. Memory cgroup out of memory
  7. 带上萌宠去上班 | IT办公室宠物报告
  8. s开头wifi测试软件,应对5G WiFi 新一代无线测试的挑战
  9. 2001年图灵奖--奥尔-约翰·戴尔和克里斯登·奈加特简介
  10. 通过BOMC制作微码更新介质方法