文章目录

  • 一、Spark对接Hive准备工作
    • 1.1 集群文件下载
    • 1.2 导入依赖
    • 1.3 打开集群metastore服务
  • 二、Spark对接Hive
    • 2.1 查询Hive
    • 2.2 读取MySQL中的数据,存入Hive
    • 2.3 读取Hive数据,导入MySQL
    • 2.4 读取本地MySQL,导入集群MySQL
  • 三、Row类
  • 四、SparkSQL函数
    • 4.1 开窗函数
    • 4.2 UDF函数(用户自定义函数)
    • 4.3 UDAF函数
    • 4.4 性能调优
      • 4.4.1 缓存数据到内存
      • 4.4.2 调优参数
    • 4.5 SparkSQL解决数据倾斜

一、Spark对接Hive准备工作

1.1 集群文件下载

下载hive/conf/hive-site.xml、hadoop/etc/hadoop/core-site.xml、hadoop/etc/hadoop/hdfs-site.xml
添加这些配置到idea中的resources中
我们的代码在读取文件的时候默认是在HDFS集群中读取的。此时如果需要读取本地文件时,则需要在本地路径书写前,添加file:///,让我们的代码读取本地文件

添加完成以后在hive-site.xml中添加以下配置

 <property><!-- Spark中内置了一个Hive,版本是1.x的版本 --><!-- 关闭掉Hive的版本检查 --><name>hive.metastore.schema.verification</name><value>false</value></property>

1.2 导入依赖

pom.xml
注意版本信息

        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.2.0</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.18</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.2.0</version></dependency><dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>1.2.1</version>

1.3 打开集群metastore服务

hive --service metastore &

二、Spark对接Hive

准备工作
提醒:我的windows本地的mysql版本为8.0.17

需要启动集群spark,否则会报错,报错解决方法
如果将master设置为local,则不需要启动spark也可以远程连接
master(“spark://host01:7077”) --表示集群模式,需要yarn环境
master(“local”) --表示本地模式,standalone

    val spark: SparkSession = SparkSession.builder().master("spark://host01:7077").enableHiveSupport().appName("sparkSQL").getOrCreate()

如果遇到权限问题,则添加一条语句

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=18870, access=WRITE, inode="/user/hive/warehouse/mydb5.db":root:supergroup:drwxr-xr-x
请让这条语句最先执行
System.setProperty("HADOOP_USER_NAME","root")
 System.setProperty("HADOOP_USER_NAME", "root")private val spark: SparkSession = SparkSession.builder().master("local").appName("HiveOnSpark").enableHiveSupport().getOrCreate()

2.1 查询Hive

    @Test def basicOperation1(): Unit = {spark.sql("show databases").show()spark.sql("select * from mydb.account").show()}

2.2 读取MySQL中的数据,存入Hive

应当注意saveAsTableinsertInto的区别

    @Test def mysql2Hive(): Unit ={// 这里不要设置成localhost,会抛异常,// 这是因为配置中配置了spark和hdfs的配置,设置了localhost以后会从这些配置中读取ip// Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure//The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.//1. 配置urlval url: String = "jdbc:mysql://10.20.152.64:3306/mydb1?serverTimezone=UTC"val tableName: String = "emp"val properties: Properties = new Properties()properties.setProperty("user", "root")properties.setProperty("password", "123456")// 2. 连接数据库val df: DataFrame = spark.read.jdbc(url, tableName, properties)// saveAsTable: 以Hive表的形式存储,会自动的创建一张Hive的表// insertInto: 向一张已经存在的表中插入数据,需要手动创建表// 向Hive中存储的时候,最好事先将表创建好df.write.saveAsTable("emp1")df.write.insertInto("emp1")}

2.3 读取Hive数据,导入MySQL

    def hive2Mysql(): Unit ={val url: String = "jdbc:mysql://10.20.152.64:3306/mydb1?serverTimezone=UTC"val tableName: String = "emp1"val properties: Properties = new Properties()properties.setProperty("user", "root")properties.setProperty("password","123456")// val frame: DataFrame = spark.sql("select * from emp1")val hiveDF: DataFrame = spark.read.table("emp1")// 将数据导出到MySQLhiveDF.write.jdbc(url, tableName, properties)}

2.4 读取本地MySQL,导入集群MySQL

    @Test def mysql2mysql(): Unit = {val winUrl: String = "jdbc:mysql://10.20.152.64:3306/mydb1?serverTimezone=UTC"val linuxUrl: String = "jdbc:mysql://192.168.10.131:3306/mydb1?serverTimezone=UTC"val tableName: String = "emp1"val properties: Properties = new Properties()properties.setProperty("user", "root")properties.setProperty("password","123456")val frame: DataFrame = spark.read.jdbc(winUrl, tableName, properties)frame.write.jdbc(linuxUrl,tableName,properties)}

三、Row类

DataFrame会将源数据中的表头按照字典顺序排列,那么得到的RDD类型的数据顺序可能和源文件的数据顺序是不一样的

我们可以使用getAs[T](文件字段名)获取到指定的列,在使用该方法时应当指定泛型类型,或者在定义该方法的返回值时将Nothing类型改为相应的类型

account.json

{"account_number":1,"balance":39225,"firstname":"Amber","lastname":"Duke","age":32,"gender":"M","address":"880 Holmes Lane","employer":"Pyrami","email":"amberduke@pyrami.com","city":"Brogan","state":"MD"}
{"account_number":6,"balance":5686,"firstname":"Hattie","lastname":"Bond","age":36,"gender":"M","address":"671 Bristol Street","employer":"Netagy","email":"hattiebond@netagy.com","city":"Dante","state":"MD"}
{"account_number":13,"balance":32838,"firstname":"Nanette","lastname":"Bates","age":28,"gender":"F","address":"789 Madison Street","employer":"Quility","email":"nanettebates@quility.com","city":"Nogal","state":"VA"}
{"account_number":18,"balance":4180,"firstname":"Dale","lastname":"Adams","age":33,"gender":"M","address":"467 Hutchinson Court","employer":"Boink","email":"daleadams@boink.com","city":"Orick","state":"MD"}
{"account_number":20,"balance":16418,"firstname":"Elinor","lastname":"Ratliff","age":36,"gender":"M","address":"282 Kings Place","employer":"Scentric","email":"elinorratliff@scentric.com","city":"Ribera","state":"MD"}
class _04_Row {// DataFrame = DataSet[Row]private val spark: SparkSession = SparkSession.builder().master("local").getOrCreate()private val accountDF: DataFrame = spark.read.json("file:///...\account.json")@Test def rowTest(): Unit = {accountDF.printSchema()val rdd: RDD[Row] = accountDF.rddrdd.foreach(println)// map函数将RDD[Row]类型数据中的每一行进行映射rdd.map(row => {// 通过列的下标,从0开始,获取到指定的列的数据val address: String = row.getString(1)// 通过指定的下标或者的列的名字,获取到这一列的数据,需要指定泛型val employee: String = row.getAs[String]("employer")(employee, address)}).foreach(println)}
}

四、SparkSQL函数

4.1 开窗函数

row_number
rank
dense_rank

案例:统计每一个学科每一个老师的排名,结果在学科内降序排序,求其名次

teacher.txt

php guangkun
php liuneng
php guangkun
php liuneng
php guangkun
php liuneng
php guangkun
php liuneng
php guangkun
php liuneng
php guangkun
php guangkun
php laoli
bigdata laosi
bigdata laozhao
bigdata laosi
bigdata laosi
bigdata laosi
bigdata laosi
bigdata laozhao
bigdata laozhao
javaee laowang
javaee zhangsan
javaee laoqi
javaee zhangsan
javaee zhangsan
javaee zhangsan
javaee zhangsan
javaee zhangsan
javaee zhangsan
javaee zhangsan
object _05_OverFunctions {private val spark: SparkSession = SparkSession.builder().master("local").getOrCreate()import spark.implicits._def main(args: Array[String]): Unit = {// 需求: 统计每一个学科每一个老师的排名,结果在学科内降序排序求出名词val rdd: RDD[String] = spark.sparkContext.textFile("file:///C:\\Users\\luds\\Desktop\\dataSource\\teacher.txt")val df: DataFrame = rdd.map(line => {val parts: Array[String] = line.split("\\s+")(parts(0), parts(1))}).toDF("subject", "name")// 注册一个临时表df.createTempView("teacher")spark.sql("select subject, name, count(*) as count from teacher group by subject, name").createTempView("teacher_tmp")  // subject, name, count// spark.sql("select *, rank() over(partition by subject order by count desc) as rank from teacher_tmp").show()// 需求新: topn,查询每一个学科的前两名val sql:String ="""|select subject, name, rank from|(select *, dense_rank() over(partition by subject order by count desc) as rank from teacher_tmp) tmp|where rank < 3|""".stripMarginspark.sql(sql).show()}
}

4.2 UDF函数(用户自定义函数)

用户自定义函数

当一个函数太长的时候,我们可以在外部自定义一个方法,所以别忘记了怎么将方法转成函数-----使用 空格_ 方式

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}object _06_UDF {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local").getOrCreate()import spark.implicits._// 1. 自定义一个UDF函数val myLength: String => Int = _.length// 2. 注册UDF函数spark.udf.register("myLen", myLength)spark.udf.register("revStr", reverseString _)// 3. 使用UDF函数val rdd: RDD[String] = spark.sparkContext.parallelize(Array("Li Lei", "han Meimei", "tom", "Jerry", "Snoopy", "shuke", "beita"))val df: DataFrame = rdd.toDF("name")df.createTempView("student")spark.sql("select name, myLen(name) as len from student").show()df.selectExpr("name", "myLen(name) as len").show()df.selectExpr("name", "myLen(name) as len", "revStr(name) as rev").show()}/*** 对参数字符串进行大小写翻转* 如果首字母是大写,全部转成小写* 如果首字母是小写,全部转成大写* @param input 输入的字符串* @return 输出的字符串*/def reverseString(input: String): String = {val firstLetter: Char = input.charAt(0)if (Character.isUpperCase(firstLetter)) {input.toLowerCase()} else if (Character.isLowerCase(firstLetter)) {input.toUpperCase()} else {input}}
}

4.3 UDAF函数

如果要自定义UDAF函数,需要继承自一个类UserDefinedAggregateFunction

这个类居然没有子类,要不然就可以看着子类写,好难写!

import org.apache.spark.sql.{Dataset, RelationalGroupedDataset, Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, StructField, StructType}object _07_UDAF {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local").getOrCreate()import spark.implicits._// 1. 注册UDAF函数spark.udf.register("myAvg", new MyAVG)val scores: Array[Score] = Array(Score(1, "zhangsan", "chinese", 98),Score(1, "zhangsan", "math", 88),Score(1, "zhangsan", "english", 89),Score(2, "lisi", "chinese", 99),Score(2, "lisi", "math", 79),Score(2, "lisi", "english", 80),Score(3, "liuneng", "chinese", 88),Score(3, "liuneng", "math", 79),Score(3, "liuneng", "english", 88))val ds: Dataset[Score] = spark.sparkContext.parallelize(scores).toDS()ds.printSchema()ds.groupBy("id", "name").agg("score" -> "avg").show()ds.createTempView("scores")spark.sql("select id, name, round(avg(score), 1) as avg from scores group by id, name").show()spark.sql("select id, name, round(myAvg(score), 1) as avg from scores group by id, name").show()}case class Score(id: Int, name: String, subject: String, score: Int)
}class MyAVG extends UserDefinedAggregateFunction {// 指定需要聚合的字段的类型override def inputSchema: StructType = StructType(Array(StructField("score", IntegerType, nullable = false)))// 指定在计算过程中使用到的临时的字段类型// 在当前的案例中,要计算的是一个平均值,需要有一个和还有一个数量override def bufferSchema: StructType = {StructType(Array(StructField("sum", IntegerType, nullable = false),StructField("count", IntegerType, nullable = false)))}// 最终的计算结果的类型override def dataType: DataType = DoubleType// 检测输入类型是否一致override def deterministic: Boolean = true// 初始化计算过程中,设置的临时变量的值override def initialize(buffer: MutableAggregationBuffer): Unit = {// 在上方的bufferSchema方法中,定义了两个临时的数据,用来计算最终的结果// 这两个临时的数据,存放在一个数组中// 用下标0获取sum,用下标1获取countbuffer.update(0, 0)buffer.update(1, 0)// buffer(0) = 0// buffer(1) = 0}// 一个分区内的数据累计计算override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {buffer.update(0, buffer.getInt(0) + input.getInt(0))        // 累加成绩的和buffer.update(1, buffer.getInt(1) + 1)                      // 成绩数量+1}// 合并多个bufferoverride def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0))    // 用buffer1里面的第0列,也就是sum列,和buffer2中的第0列进行累加,得到总成绩buffer1.update(1, buffer1.getInt(1) + buffer2.getInt(1))    //}// 返回最终的值override def evaluate(buffer: Row): Any = buffer.getInt(0) / buffer.getInt(1).toDouble
}

4.4 性能调优

4.4.1 缓存数据到内存

Spark SQL可以通过调⽤sqlContext.cacheTable("tableName")或者dataFrame.cache(),将表⽤⼀种柱状格式(an inmemory columnar format)缓存⾄内存中。然后Spark SQL在执⾏查询任务时,只需扫描必需的列,从⽽以减少扫描数据量、提⾼性能。通过缓存数据,Spark SQL还可以⾃动调节压缩,从⽽达到最⼩化内存使⽤率和降低GC压⼒的⽬的。调⽤sqlContext.uncacheTable("tableName")可将缓存的数据移出内存。

可通过两种配置⽅式开启缓存数据功能:

  • 使⽤SQLContext的setConf⽅法
  • 执⾏SQL命令 SET key=value

4.4.2 调优参数

4.5 SparkSQL解决数据倾斜

 * 使用SQL解决数据倾斜的问题,以groupBy为例** 什么是数据倾斜?*      数据分布不均匀,体现在有某几个字段数据量过大,此时在进行聚合的时候数据会重新分布。*      这个过程,其实就是Shuffle的过程,有可能会导致有些分区的数据量过大。** 数据倾斜有什么表现?*      1. 大多数的Task都很快执行完了,但是个别Task需要耗费好长时间。极端的情况下,甚至会失败,导致整个Appliction的失败。*         比如: MR中,Map任务很快执行完了,Reduce会卡在99%*      2. 某一天,程序忽然出现OOM了。** 解决数据倾斜:*      需要对Task重新分配,打散原来的Task分配,打散数据。*      可以给数据添加一个随机的前缀,对这些数据进行聚合操作。*      之后,再去掉随机的前缀,再聚合。(两个阶段的聚合)

案例:制造一个数据倾斜情形,并将其解决

object _08_Leap {// wordcount:def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("spark://qianfeng01:7077").getOrCreate()import spark.implicits._// 准备倾斜的数据val list: List[String] = List("hello hello hello hello hello hello hello hello hello hello","hello hello hello world hello hello hello hello hello hello","hello hello hello hello ni hello hello world hello hello hello","hello hello hello hello hello world hello hello hello hello")val df: DataFrame = list.toDF("line")df.createTempView("words")// 1. 倾斜的情况                                          println("1. 数据的倾斜")val sql: String ="""|select|word, count(1)|from|(select explode(split(line, "\\s+")) word from words) tmp|group by word|""".stripMarginspark.sql(sql).show()// 2. 添加随机前缀                                           println("2. 添加随机前缀")spark.udf.register("prefix", randomPrefix _)// val sql2: String =//     """//       |select//       |prefix(word) prefixed_word//       |from//       |(select explode(split(line, "\\s+")) word from words) tmp//       |""".stripMarginval sql2: String ="""|select|concat_ws("_", cast(floor(rand() * 3) as string), word) as prefixed_word|from|(select explode(split(line, "\\s+")) word from words) tmp|""".stripMarginspark.sql(sql2).show()// 3. 对添加前缀之后的单词进行聚合                         println("3. 对添加前缀之后的单词进行聚合")val sql3: String ="""|select|concat_ws("_", cast(floor(rand() * 3) as string), word) as prefixed_word,|count(1) as prefixed_count|from|(select explode(split(line, "\\s+")) word from words) tmp|group by prefixed_word|""".stripMarginspark.sql(sql3).show()// 4. 去除之前添加的随机前缀                               println("4. 去除之前添加的随机前缀")val sql4: String ="""|select|substr(prefixed_word, instr(prefixed_word, "_") + 1) word,|prefixed_count as count|from|(select|concat_ws("_", cast(floor(rand() * 3) as string), word) as prefixed_word,|count(1) as prefixed_count|from|(select explode(split(line, "\\s+")) word from words) tmp|group by prefixed_word) tmp2|""".stripMarginspark.sql(sql4).show()// 5. 聚合                                                 println("5. 最终的聚合")val sql5: String ="""|select|substr(prefixed_word, instr(prefixed_word, "_") + 1) word,|sum(prefixed_count) as count|from|(select|concat_ws("_", cast(floor(rand() * 3) as string), word) as prefixed_word,|count(1) as prefixed_count|from|(select explode(split(line, "\\s+")) word from words) tmp|group by prefixed_word) tmp2|group by word|""".stripMarginspark.sql(sql5).show()}/*** UDF: 给一个单词添加一个随机的前缀* @param word* @return*/def randomPrefix(word: String): String = {val num: Int = Random.nextInt(3)num + "_" + word}
}

【大数据开发】SparkSQL——Spark对接Hive、Row类、SparkSQL函数、UDF函数(用户自定义函数)、UDAF函数、性能调优、SparkSQL解决数据倾斜相关推荐

  1. jvm性能调优实战 -59数据同步系统频繁OOM内存溢出故障排查

    文章目录 背景 从现象看到本质 通过jstat来确认我们的推断 通过MAT找到占用内存最大的对象 背景 首先说一下案例背景,线上有一个数据同步系统,是专门负责从另外一个系统去同步数据的,简单来说,另外 ...

  2. jvm性能调优实战 -57数据日志分析系统的OOM问题排查

    文章目录 Pre Case 初步分析内存快照 功夫在诗外:问题在JVM参数上 分析一下JVM的GC日志 分析一下JVM运行时内存使用模型 优化第一步:增加堆内存大小 优化第二步:改写代码 总结 Pre ...

  3. 大数据技术之_19_Spark学习_07_Spark 性能调优 + 数据倾斜调优 + 运行资源调优 + 程序开发调优 + Shuffle 调优 + GC 调优 + Spark 企业应用案例

    大数据技术之_19_Spark学习_07 第1章 Spark 性能优化 1.1 调优基本原则 1.1.1 基本概念和原则 1.1.2 性能监控方式 1.1.3 调优要点 1.2 数据倾斜优化 1.2. ...

  4. 大数据开发复习Spark篇

    11.spark 11.1.spark介绍 Apache Spark是用于大规模数据处理的统一分析计算引擎 Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性 ...

  5. 大数据培训:Spark性能调优与参数配置

    Spark性能调优-基础篇 众所周知,正确的参数配置对提升Spark的使用效率具有极大助力,帮助相关数据开发.分析人员更高效地使用Spark进行离线批处理和SQL报表分析等作业. 推荐参数配置模板如下 ...

  6. 王家林大咖新书预发布:清华大学出版社即将出版《Spark大数据商业实战三部曲:内核解密|商业案例|性能调优》第二版 及《企业级AI技术内幕讲解》

    王家林大咖新书预发布:清华大学出版社即将出版<Spark大数据商业实战三部曲:内核解密|商业案例|性能调优>第二版,新书在第一版的基础上以Spark 2.4.3版本全面更新源码,并以Ten ...

  7. 《Spark商业案例与性能调优实战100课》第6课:商业案例之通过Spark SQL实现大数据电影用户行为分析

    <Spark商业案例与性能调优实战100课>第6课:商业案例之通过Spark SQL实现大数据电影用户行为分析 package com.dt.spark.sparksqlimport or ...

  8. Spark 性能常规性能调优广播大变量_大数据培训

    常规性能调优四:广播大变量 默认情况下,task中的算子中如果使用了外部的变量,每个task都会获取一份变量的复本,这就造成了内存的极大消耗.一方面,如果后续对RDD进行持久化,可能就无法将RDD数据 ...

  9. Spark商业案例与性能调优实战100课》第2课:商业案例之通过RDD实现分析大数据电影点评系统中电影流行度分析

    Spark商业案例与性能调优实战100课>第2课:商业案例之通过RDD实现分析大数据电影点评系统中电影流行度分析 package com.dt.spark.coresimport org.apa ...

最新文章

  1. 这可能是 π 被黑得最惨的一次
  2. JQuery元素选择器:和||,逻辑选择
  3. 重没想过好好走下去----我的IT成长路
  4. Gradle Goodness: Set Java Compiler Encoding--转载
  5. 从RocketMQ看长轮询(Long Polling)
  6. PHP发起POST DELETE GET POST 请求
  7. 计算机竞赛游戏探险岛,冒险岛2五大全新团本综合分析
  8. 智慧气象机器_智慧电缆隧道火热建设中 传感器+机器人成标配
  9. 牛客练习赛15A-吉姆的运算式(Python正则表达式瞎搞)
  10. Mysql中key与index区别
  11. nlogn最长单调递增
  12. 深度学习自学(十):人脸检测android端-JNI调试调用底层检测识别库
  13. 利用Minst数据集训练原生GAN网络
  14. 云计算平台能够提供计算服务器,云计算平台提供了什么服务器
  15. Skiller V3
  16. java开发加入购物车功能_java web开发——购物车功能实现
  17. 计算机专业装win几,低配电脑装win10还是win7系统比较合适
  18. python3 cookie_Python3标准库:http.cookies HTTP cookie
  19. 超简单Photoshop2019安装与教程(一步成功)
  20. 陀螺财经携百家产业合作伙伴推出“史上最强产业区块链案例集”

热门文章

  1. nginx 如何查看访问ip和对应访问路径
  2. STM32 USB组合设备HID+MIDI
  3. Annoy算法简单介绍
  4. 【哈佛公开课】积极心理学笔记-06乐观主义(下)
  5. Android状态栏下拉处理
  6. 一个简单的例子解释什么是量子计算机
  7. 计算机机原理是什么意思,计算机工作原理及与工控机的区别
  8. “景驰科技杯”2018年华南理工大学程序设计竞赛 C Youhane's Undergraduate Thesis (大模拟)
  9. 《FLUENT 14.0超级学习手册》——第1章 流体力学与计算流体力学基础1.1 流体力学基础...
  10. Android技术内幕.系统卷