目录

Spark简介

1 什么是Spark

2 Spark特点

3 Spark分布式环境安装

3.1 Spark HA的环境安装

3.2 动态增删一个worker节点到集群

4 Spark核心概念

5 Spark案例

5.2  Master URL

5.3 spark日志的管理

5.4 WordCount案例程序的执行过程

6 Spark作业运行架构图(standalone模式)

7 RDD操作

7.1 RDD初始化

7.2 RDD操作

7.3 transformation转换算子

7.3 action行动算子

8 高级排序

8.1 普通的排序

8.2 二次排序

8.3 分组TopN

8.4 优化分组TopN

9 持久化操作

9.1 为什要持久化

9.2 如何进行持久化

9.3 持久化策略

9.4 如何选择持久化策略

10 共享变量

10.1 概述

10.2 broadcast广播变量

10.3 accumulator累加器

10.4 自定义累加器


SparkCore基础

什么是Spark

Spark是一个通用的可扩展的处理海量数据集的计算引擎。

Spark集成离线计算,实时计算,SQL查询,机器学习,图计算为一体的通用的计算框架。

2 Spark特点

(1)快:相比给予MR,官方表明基于内存计算spark要快mr100倍,基于磁盘计算spark要快mr10倍

快的原因:①基于内存计算,②计算和数据的分离 ③基于DAGScheduler的计算划分 ④只有一次的Shuffle输出操作

(2)易用:Spark提供超过80多个高阶算子函数,来支持对数据集的各种各样的计算,使用的时候,可以使用java、scala、python、R,非常灵活易用。

(3)通用:在一个项目中,既可以使用离线计算,也可以使用其他比如,SQL查询,机器学习,图计算等等,而这时Spark最强大的优势

(4)到处运行

3 Spark分布式环境安装

(1)下载解压,添加环境变量

(2)修改配置文件

spark的配置文件,在$SPARK_HOME/conf目录下

①拷贝slaves和spark-env.sh文件 :cp slaves.template slaves和cp spark-env.sh.template spark-env.sh

②修改slaves配置,配置spark的从节点的主机名,spark中的从节点叫做worker,主节点叫做Master。vim slaves

bigdata02
bigdata03

③修改spark-env.sh文件,添加如下内容

export JAVA_HOME=/opt/jdk
export SCALA_HOME=/home/refuel/opt/mouldle/scala
export SPARK_MASTER_IP=bigdata01
export SPARK_MASTER_PORT=7077 ##rpc通信端口,类似hdfs的9000端口,不是50070
export SPARK_WORKER_CORES=2
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=1g
export HADOOP_CONF_DIR=/home/refuel/opt/mouldle/hadoop/etc/hadoop

(3)同步spark到其它节点中

3.1 Spark HA的环境安装

有两种方式解决单点故障,一种基于文件系统FileSystem(生产中不用),还有一种基于Zookeeper(使用)。 配置基于Zookeeper的一个ha是非常简单的,只需要在spark-env.sh中添加一句话即可。

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bigdata01:2181,bigdata02:2181,bigdata03:2181 -Dspark.deploy.zookeeper.dir=/spark"

spark.deploy.recoveryMode设置成 ZOOKEEPER spark.deploy.zookeeper.urlZooKeeper URL spark.deploy.zookeeper.dir ZooKeeper 保存恢复状态的目录,缺省为 /spark。因为ha不确定master在bigdata01上面启动,所以将export SPARK_MASTER_IP=bigdata01和export SPARK_MASTER_PORT=7077注释掉

3.2 动态增删一个worker节点到集群

(1)上线一个节点:不需要在现有集群的配置上做任何修改,只需要准备一台worker机器即可,可和之前的worker的配置相同。

(2)下线一个节点:kill或者stop-slave.sh都可以

4 Spark核心概念

ClusterManager:在Standalone(依托于spark集群本身)模式中即为Master(主节点),控制整个集群,监控Worker。在YARN模式中为资源管理器ResourceManager。

Worker:从节点,负责控制计算节点,启动Executor。在YARN模式中为NodeManager,负责计算节点的控制,启动的进程叫Container。

Driver:运行Application的main()函数并创建SparkContext(是spark中最重要的一个概念,是spark编程的入口,作用相当于mr中的Job)。

Executor:执行器,在worker node上执行任务的组件、用于启动线程池运行任务。每个Application拥有独立的一组Executors。

SparkContext:整个应用的上下文,控制应用的生命周期,是spark编程的入口。

RDD:弹性式分布式数据集。Spark的基本计算单元,一组RDD可形成执行的有向无环图RDD Graph。

DAGScheduler:实现将Spark作业分解成一到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Task set放到TaskScheduler中。 DAGScheduler就是Spark的大脑,中枢神经

TaskScheduler:将任务(Task)分发给Executor执行。

Stage:一个Spark作业一般包含一到多个Stage。

Task :一个Stage包含一到多个Task,通过多个Task实现并行运行的功能。 task的个数由rdd的partition分区决定

Transformations:转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。

Actions:操作/行动(Actions)算子 (如:count, collect, foreach等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。

SparkEnv:线程级别的上下文,存储运行时的重要组件的引用。SparkEnv内创建并包含如下一些重要组件的引用。

MapOutPutTracker:负责Shuffle元信息的存储

BroadcastManager:负责广播变量的控制与元信息的存储。

BlockManager:负责存储管理、创建和查找块。

MetricsSystem:监控运行时性能指标信息。

SparkConf:负责存储配置信息。作用相当于hadoop中的Configuration。

5 Spark案例

pom文件的依赖配置如下

<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><!-- scala去除,因为spark-core包里有了scala的依赖<dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.11.8</version></dependency>  --><!-- sparkcore --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.2.2</version></dependency><!-- sparksql --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.2.2</version></dependency><!-- sparkstreaming --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.2.2</version></dependency></dependencies>

注意:入口类为SparkContext,java版本的是JavaSparkContext,scala的版本就是SparkContext;SparkSQL的入口有SQLContext、HiveContext;SparkStreaming的入口又是StreamingContext。

java版本

public class JavaSparkWordCountOps {public static void main(String[] args) {//step 1、创建编程入口类SparkConf conf = new SparkConf();conf.setMaster("local[*]");conf.setAppName(JavaSparkWordCountOps.class.getSimpleName());JavaSparkContext jsc = new JavaSparkContext(conf);//step 2、加载外部数据 形成spark中的计算的编程模型RDDJavaRDD<String> linesRDD = jsc.textFile("E:/hello.txt");// step 3、对加载的数据进行各种业务逻辑操作---转换操作transformationJavaRDD<String> wordsRDD = linesRDD.flatMap(new FlatMapFunction<String, String>() {public Iterator<String> call(String line) throws Exception {return Arrays.asList(line.split("\\s+")).iterator();}});//JavaRDD<String> wordsRDD = linesRDD.flatMap(line -> Arrays.asList(line.split("\\s+")).iterator());System.out.println("-----经过拆分之后的rdd数据----");wordsRDD.foreach(new VoidFunction<String>() {public void call(String s) throws Exception {System.out.println(s);}});System.out.println("-----word拼装成键值对----");JavaPairRDD<String, Integer> pairsRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {public Tuple2<String, Integer> call(String word) throws Exception {return new Tuple2<String, Integer>(word, 1);}});//JavaPairRDD<String, Integer> pairsRDD = wordsRDD.mapToPair(word -> new Tuple2<String, Integer>(word, 1));pairsRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {public void call(Tuple2<String, Integer> t) throws Exception {System.out.println(t._1 + "--->" + t._2);}});System.out.println("------按照相同的key,统计value--------------");JavaPairRDD<String, Integer> retRDD = pairsRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {public Integer call(Integer v1, Integer v2) throws Exception {int i = 1 / 0; //印证出这些转换的transformation算子是懒加载的,需要action的触发return v1 + v2;}});//JavaPairRDD<String, Integer> retRDD = pairsRDD.reduceByKey((v1, v2) -> v1 + v2);retRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {public void call(Tuple2<String, Integer> t) throws Exception {System.out.println(t._1 + "--->" + t._2);}});}
}

scala版本

object SparkWordCountOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("SparkWordCount")val sc = new SparkContext(conf)//load data from fileval linesRDD:RDD[String] = sc.textFile("E:/hello.txt")val wordsRDD:RDD[String] = linesRDD.flatMap(line => line.split("\\s+"))val pairsRDD:RDD[(String, Int)] = wordsRDD.map(word => (word, 1))val ret = pairsRDD.reduceByKey((v1, v2) => v1 + v2)ret.foreach(t => println(t._1 + "---" + t._2))sc.stop()}
}

5.2  Master URL

master-url通过sparkConf.setMaster来完成。代表的是spark作业的执行方式,或者指定的spark程序的cluster-manager的类型。

master 含义
local 程序在本地运行,同时为本地程序提供一个线程来处理
local[M] 程序在本地运行,同时为本地程序分配M个工作线程来处理
local[*] 程序在本地运行,同时为本地程序分配机器可用的CPU core的个数工作线程来处理
local[M, N] 程序在本地运行,同时为本地程序分配M个工作线程来处理,如果提交程序失败,会进行最多N次的重试
spark://ip:port 基于standalone的模式运行,提交撑到ip对应的master上运行
spark://ip1:port1,ip2:port2 基于standalone的ha模式运行,提交撑到ip对应的master上运行
yarn/启动脚本中的deploy-mode配置为cluster 基于yarn模式的cluster方式运行,SparkContext的创建在NodeManager上面,在yarn集群中
yarn/启动脚本中的deploy-mode配置为client 基于yarn模式的client方式运行,SparkContext的创建在提交程序的那台机器上面,不在yarn集群中

5.3 spark日志的管理

(1)全局管理:项目classpath下面引入log4j.properties配置文件进行管理

# 基本日志输出级别为INFO,输出目的地为console
log4j.rootCategory=INFO, consolelog4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n# 输出配置的是spark提供的日志级别
log4j.logger.org.spark_project.jetty=INFO
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

(2)局部管理 :就是在当前类中进行日志的管理。

import org.apache.log4j.{Level, Logger}
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.spark_project").setLevel(Level.WARN)

5.4 WordCount案例程序的执行过程

当deploy-mode为client模式的时候,driver就在我们提交作业的本机,而spark的作业对应的executor在spark集群中运行。

在上图中可以发现相邻两个rdd之间有依赖关系,依赖分为宽依赖和窄依赖。

窄依赖:rdd中的partition中的数据只依赖于父rdd中的一个partition或者常数个partition。常见的窄依赖操作有:flatMap,map,filter,coalesce等

宽依赖:rdd中的partition中的数据只依赖于父rdd中的所有partition。常见的宽依赖操作有reduceByKey,groupByKey,join,sortByKey,repartition等

rdd和rdd之间的依赖关系构成了一个链条,这个链条称之为lineage(血缘)

6 Spark作业运行架构图(standalone模式)

①启动spark集群:通过spark的start-all.sh脚本启动spark集群,启动了对应的Master进程和Worker进程

②Worker启动之后向Master进程发送注册信息

③Worker向Master注册成功之后,worker要不断的向master发送心跳包,去监听主节点是否存在

④Driver向Spark集群提交作业,就是向Master提交作业,申请运行资源

⑤Master收到Driver的提交请求,向Worker节点指派相应的作业任务,就是在对应的Worker节点上启动对应的executor进程

⑥Worker节点接收到Master节点启动executor任务之后,就启动对应的executor进程,向master汇报成功启动,可以接收任务

⑦executor进程启动之后,就像Driver进程进行反向注册,告诉Driver谁可以执行spark任务

⑧Driver接收到注册之后,就知道向谁发送spark作业,那么这样在spark集群中就有一组独立的executor进程为该Driver服务

⑨DAGScheduler根据编写的spark作业逻辑,将spark作业分成若干个阶段Stage(基于Spark的transformation里是否有shuffle Dependency),然后为每一个阶段组装一批task组成taskSet(task里面包含了序列化之后的我们编写的spark transformation),然后将这些DAGScheduler组装好的taskSet,交给taskScheduler,由taskScheduler将这些任务发给对应的executor

⑩executor进程接收到了Driver发送过来的taskSet之后,进行反序列化,然后将这些task封装进一个叫tasksunner的线程中,然后放到本地线程池中调度我们的作业的执行。

7 RDD操作

7.1 RDD初始化

RDD的初始化,原生api提供的2中创建方式:

①是读取文件textFile

②加载一个scala集合parallelize。

当然,也可以通过transformation算子来创建的RDD。

7.2 RDD操作

RDD操作算子的分类,基本上分为两类:transformation和action,当然更加细致的分,可以分为输入算子,转换算子,缓存算子,行动算子。

输入:在Spark程序运行中,数据从外部数据空间(如分布式存储:textFile读取HDFS等,parallelize方法输入Scala集合或数据)输入Spark,数据进入Spark运行时数据空间,转化为Spark中的数据块,通过BlockManager进行管理。

运行:在Spark数据输入形成RDD后便可以通过变换算子,如filter等,对数据进行操作并将RDD转化为新的RDD,通过Action算子,触发Spark提交作业。 如果数据需要复用,可以通过Cache算子,将数据缓存到内存。

输出:程序运行结束数据会输出Spark运行时空间,存储到分布式存储中(如saveAsTextFile输出到HDFS),或Scala数据或集合中(collect输出到Scala集合,count返回Scala int型数据)。

7.3 transformation转换算子

(1)map

rdd.map(func):RDD,对rdd集合中的每一个元素,都作用一次该func函数,之后返回值为生成元素构成的一个新的RDD。

(2)flatMap

rdd.flatMap(func):RDD ==>rdd集合中的每一个元素,都要作用func函数,返回0到多个新的元素,这些新的元素共同构成一个新的RDD。

map操作是一个一到一的操作,flatMap操作是一个1到多的操作

(3)filter

rdd.filter(func):RDD ==> 对rdd中的每一个元素操作func函数,该函数的返回值为Boolean类型,保留返回值为true的元素,共同构成一个新的RDD,过滤掉哪些返回值为false的元素。

(4)sample

rdd.sample(withReplacement:Boolean, fraction:Double [, seed:Long]):RDD ===> 抽样,sample抽样不是一个精确的抽样。一个非常重要的作用,就是来看rdd中数据的分布情况,根据数据分布的情况,进行各种调优与优化,防止数据倾斜。

withReplacement:抽样的方式,true有放回抽样, false为无返回抽样

fraction: 抽样比例,取值范围就是0~1

seed: 抽样的随机数种子,有默认值,通常也不需要传值

(5)union

rdd1.union(rdd2),联合rdd1和rdd2中的数据,形成一个新的rdd,其作用相当于sql中的union all。

(6)join

join就是sql中的inner join。

注意:要想两个RDD进行连接,那么这两个rdd的数据格式,必须是k-v键值对的,其中的k就是关联的条件,也就是sql中的on连接条件。

RDD1的类型[K, V], RDD2的类型[K, W]

内连接 :val joinedRDD:RDD[(K, (V, W))] = rdd1.join(rdd2)

左外连接 :val leftJoinedRDD:RDD[(K, (V, Option[W]))] = rdd1.leftOuterJoin(rdd2)

右外连接 :val rightJoinedRDD:RDD[(K, (Option[V], W))] = rdd1.rightOuterJoin(rdd2)

全连接 :val fullJoinedRDD:RDD[(K, (Option[V], Option[W]))] = rdd1.fullOuterJoin(rdd2)

(7)groupByKey

rdd.groupByKey(),按照key进行分组,如果原始rdd的类型时[(K, V)] ,那必然其结果就肯定[(K, Iterable[V])],是一个shuffle dependency宽依赖shuffle操作,但是这个groupByKey不建议在工作过程中使用,除非非要用,因为groupByKey没有本地预聚合,性能较差,一般我们能用下面的reduceByKey或者combineByKey或者aggregateByKey代替就尽量代替。

(8)reduceByKey

rdd.reduceByKey(func:(V, V) => V):RDD[(K, V)] :在scala集合中学习过一个reduce(func:(W, W) => W)操作,是一个聚合操作,这里的reduceByKey按照就理解为在groupByKey(按照key进行分组[(K, Iterable[V])])的基础上,对每一个key对应的Iterable[V]执行reduce操作。

同时reduceByKey操作会有一个本地预聚合的操作,所以是一个shuffle dependency宽依赖shuffle操作。

(9)sortByKey

按照key进行排序

(10)combineByKey

这是spark最底层的聚合算子之一,按照key进行各种各样的聚合操作,spark提供的很多高阶算子,都是基于该算子实现的。

def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)] =  {...
}

createCombiner: V => C, 相同的Key在分区中会调用一次该函数,用于创建聚合之后的类型,为了和后续Key相同的数据进行聚合;mergeValue: (C, V) => C, 在相同分区中基于上述createCombiner基础之上的局部聚合;mergeCombiners: (C, C) => C) 将每个分区中相同key聚合的结果在分区间进行全局聚合

(11)aggregateByKey

aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]和combineByKey都是一个相对底层的聚合算子,可以完成系统没有提供的其它操作,相当于自定义算子。aggregateByKey底层使用combineByKeyWithClassTag来实现,所以本质上二者没啥区别,区别就在于使用时的选择而已。

aggregateByKey更为简单,但是如果聚合前后数据类型不一致,建议使用combineByKey;同时如果初始化操作较为复杂,也建议使用combineByKey。

7.3 action行动算子

这些算子都是在rdd上的分区partition上面执行的,不是在driver本地执行。

(1)foreach

用于遍历RDD,将函数f应用于每一个元素,无返回值(action算子)

(2)count

统计该rdd中元素的个数

(3)take(n)

返回该rdd中的前N个元素,如果该rdd的数据是有序的,那么take(n)就是Top N

(4)first

take(n)中比较特殊的一个take(1)(0)

(5)collect

将分布在集群中的各个partition中的数据拉回到driver中,进行统一的处理;但是这个算子有很大的风险存在,第一,driver内存压力很大,第二数据在网络中大规模的传输,效率很低;所以一般不建议使用,如果非要用,请先执行filter。

(6)reduce

reduce是一个action操作,reduceByKey是一个transformation。reduce对一个rdd执行聚合操作,并返回结果,结果是一个值。

(7)countByKey

统计key出现的次数

(8)saveAsTextFile

保存到文件,本质上是saveAsHadoopFile[TextOutputFormat[NullWritable, Text]]

(9)saveAsObjectFile和saveAsSequenceFile

saveAsObjectFile本质上是saveAsSequenceFile

(10)saveAsHadoopFile和saveAsNewAPIHadoopFile

这二者的主要区别就是OutputFormat的区别,接口org.apache.hadoop.mapred.OutputFormat,

抽象类org.apache.hadoop.mapreduce.OutputFormat   所以saveAshadoopFile使用的是接口OutputFormat,saveAsNewAPIHadoopFile使用的抽象类OutputFormat,建议使用后者。

8 高级排序

8.1 普通的排序

(1)sortByKey

object SortByKeyOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("SortByKeyOps").setMaster("local[2]")val sc = new SparkContext(conf)//sortByKey 数据类型为k-v,且是按照key进行排序val stuRDD:RDD[Student] = sc.parallelize(List(Student(1, "refuel01", 19, 168),Student(2, "refuel02", 25, 175),Student(3, "refuel03", 25, 176),Student(4, "refuel04", 16, 180),Student(5, "refuel05", 18, 168.5)))//按照学生身高进行降序排序val height2Stu = stuRDD.map(stu => (stu.height, stu))//注意:sortByKey是局部排序,不是全局排序,如果要进行全局排序,// 必须将所有的数据都拉取到一台机器上面才可以val sorted = height2Stu.sortByKey(ascending = false, numPartitions = 1)sorted.foreach{case (height, stu) => println(stu)}sc.stop()}
}case class Student(id:Int, name:String, age:Int, height:Double)

(2)sortBy

这个sortBy其实使用sortByKey来实现,但是比sortByKey更加灵活,因为sortByKey只能应用在k-v数据格式上,而这个sortBy可以应在非k-v键值对的数据格式上面。

val sortedBy = stuRDD.sortBy(stu => stu.height,ascending = true,numPartitions = 1)(new Ordering[Double](){override def compare(x: Double, y: Double) = y.compareTo(x)},ClassTag.Double.asInstanceOf[ClassTag[Double]])
sortedBy.foreach(println)

sortedBy的操作,除了正常的升序,分区个数以外,还需需要传递一个将原始数据类型,提取其中用于排序的字段;并且提供用于比较的方式,以及在运行时的数据类型ClassTag标记型trait。

(3)takeOrdered

takeOrdered也是对rdd进行排序,但是和上述的sortByKey和sortBy相比较,takeOrdered是一个action操作,返回值为一个集合,而前两者为transformation,返回值为rdd。如果我们想在driver中获取排序之后的结果,那么建议使用takeOrdered,因为该操作边排序边返回。其实是take和sortBy的一个结合体。

takeOrdered(n),获取排序之后的n条记录

//先按照身高降序排序,身高相对按照年龄升序排 ---> 二次排序
stuRDD.takeOrdered(3)(new Ordering[Student](){override def compare(x: Student, y: Student) = {var ret = y.height.compareTo(x.height)if(ret == 0) {ret = x.age.compareTo(y.age)}ret}
}).foreach(println)

8.2 二次排序

所谓二次排序,指的是排序字段不唯一,有多个,共同排序

object SortByKeyOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("SortByKeyOps").setMaster("local[2]")val sc = new SparkContext(conf)//sortByKey 数据类型为k-v,且是按照key进行排序val personRDD:RDD[Student] = sc.parallelize(List(Student(1, "refuel01", 19, 168),Student(2, "refuel02", 25, 175),Student(3, "refuel03", 25, 176),Student(4, "refuel04", 16, 180),Student(5, "refuel05", 18, 168.5)))personRDD.map(stu => (stu, null)).sortByKey(true, 1).foreach(p => println(p._1))sc.stop()}
}case class Person(id:Int, name:String, age:Int, height:Double) extends Ordered[Person] {//对学生的身高和年龄依次排序override def compare(that: Person) = {var ret = this.height.compareTo(that.height)if(ret == 0) {ret = this.age.compareTo(that.age)}ret}
}

8.3 分组TopN

在分组的情况之下,获取每个组内的TopN数据

object GroupSortTopN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("GroupSortTopN").setMaster("local[2]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data/topn.txt")//按照科目进行排序val course2Info:RDD[(String, String)] = lines.map(line => {val spaceIndex = line.indexOf(" ")val course = line.substring(0, spaceIndex)val info = line.substring(spaceIndex + 1)(course, info)})//按照科目排序,指的是科目内排序,不是科目间的排序,所以需要把每个科目的信息汇总val course2Infos:RDD[(String, Iterable[String])] = course2Info.groupByKey()//按照key进行分组//分组内的排序val sorted:RDD[(String, mutable.TreeSet[String])] = course2Infos.map{case (course, infos) => {val topN = mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})for(info <- infos) {topN.add(info)}(course, topN.take(3))}}sorted.foreach(println)sc.stop()}
}

8.4 优化分组TopN

上述在编码过程当中使用groupByKey,我们说着这个算子的性能很差,因为没有本地预聚合,所以应该在开发过程当中尽量避免使用,能用其它代替就代替。

(1)使用combineByKey优化1

object GroupSortByCombineByKeyTopN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("GroupSortByCombineByKeyTopN").setMaster("local[2]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data/topn.txt")//按照科目进行排序val course2Info:RDD[(String, String)] = lines.map(line => {val spaceIndex = line.indexOf(" ")val course = line.substring(0, spaceIndex)val info = line.substring(spaceIndex + 1)(course, info)})//按照科目排序,指的是科目内排序,不是科目间的排序,所以需要把每个科目的信息汇总val course2Infos= course2Info.combineByKey(createCombiner, mergeValue, mergeCombiners)//分组内的排序val sorted:RDD[(String, mutable.TreeSet[String])] = course2Infos.map{case (course, infos) => {val topN = mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})for(info <- infos) {topN.add(info)}(course, topN.take(3))}}sorted.foreach(println)sc.stop()}def createCombiner(info:String): ArrayBuffer[String] = {val ab = new ArrayBuffer[String]()ab.append(info)ab}def mergeValue(ab:ArrayBuffer[String], info:String): ArrayBuffer[String] = {ab.append(info)ab}def mergeCombiners(ab:ArrayBuffer[String], ab1: ArrayBuffer[String]): ArrayBuffer[String] = {ab.++:(ab1)}
}

此时这种写法和上面的groupByKey性能一模一样,没有任何的优化。

(2)使用combineByKey的优化2

object GroupSortByCombineByKeyTopN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("GroupSortByCombineByKeyTopN").setMaster("local[2]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data/spark/topn.txt")//按照科目进行排序val course2Info:RDD[(String, String)] = lines.map(line => {val spaceIndex = line.indexOf(" ")val course = line.substring(0, spaceIndex)val info = line.substring(spaceIndex + 1)(course, info)})//按照科目排序,指的是科目内排序,不是科目间的排序,所以需要把每个科目的信息汇总val sorted= course2Info.combineByKey(createCombiner, mergeValue, mergeCombiners)sorted.foreach(println)sc.stop()}def createCombiner(info:String): mutable.TreeSet[String] = {val ts = new mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})ts.add(info)ts}def mergeValue(ab:mutable.TreeSet[String], info:String): mutable.TreeSet[String] = {ab.add(info)if(ab.size > 3) {ab.take(3)} else {ab}}def mergeCombiners(ab:mutable.TreeSet[String], ab1: mutable.TreeSet[String]): mutable.TreeSet[String] = {for (info <- ab1) {ab.add(info)}if(ab.size > 3) {ab.take(3)} else {ab}}
}

9 持久化操作

9.1 为什要持久化

一个RDD如果被多次操作,为了提交后续的执行效率,我们建议对该RDD进行持久化操作。

9.2 如何进行持久化

rdd.persist()/cache()就完成了rdd的持久化操作,我们可以将该rdd的数据持久化到内存,磁盘,等等。

如果我们已经不再对该rdd进行一个操作,而此时程序并没有终止,可以卸载已经持久化的该rdd数据,rdd.unPersist()。

9.3 持久化策略

可以通过persist(StoreageLevle的对象)来指定持久化策略,eg:StorageLevel.MEMORY_ONLY。

持久化策略 含义
MEMORY_ONLY(默认) rdd中的数据以未经序列化的java对象格式,存储在内存中。如果内存不足,剩余的部分不持久化,使用的时候,没有持久化的那一部分数据重新加载。这种效率是最高,但是是对内存要求最高的。
MEMORY_ONLY_SER 就比MEMORY_ONLY多了一个SER序列化,保存在内存中的数据是经过序列化之后的字节数组,同时每一个partition此时就是一个比较大的字节数组。
MEMORY_AND_DISK 和MEMORY_ONLY相比就多了一个,内存存不下的数据存储在磁盘中。
MEMEORY_AND_DISK_SER 比MEMORY_AND_DISK多了个序列化。
DISK_ONLY 就是MEMORY_ONLY对应,都保存在磁盘,效率太差,一般不用。
xxx_2 就是上述多个策略后面加了一个_2,比如MEMORY_ONLY_2,MEMORY_AND_DISK_SER_2等等,就多了一个replicate而已,备份,所以性能会下降,但是容错或者高可用加强了。所以需要在二者直接做权衡。如果说要求数据具备高可用,同时容错的时间花费比从新计算花费时间少,此时便可以使用,否则一般不用。
HEAP_OFF(experimental) 使用非Spark的内存,也即堆外内存,比如Tachyon,HBase、Redis等等内存来补充spark数据的缓存。

9.4 如何选择持久化策略

(1)如果要持久化的数据是可以在内存中进行保存,那么毫无疑问,选择MEMEORY_ONLY,因为这种方式的效率是最高的,但是在生成中往往要进行缓存的数据量还是蛮大的,而且因为数据都是未经序列化的java对象,所以很容易引起频繁的gc。

(2)如果上述满足不了,就退而求其次,MEMORY_ONLY_SER,这种方式增加的额外的性能开销就是序列化和反序列化,经过反序列化之后的对象就是纯java对象,因此性能还是蛮高的。

(3)如果还是扛不住,再退而求其次,MEMOEY_AND_DISK_SER,因为到这一步的话,那说明对象体积确实很多,为了提交执行效率,应该尽可能的将数据保存在内存,所以就对数据进行序列化,其次在序列化到磁盘。

(4)一般情况下DISK_ONLY,DISK_SER不用,效率太低,有时候真的不容从源头计算一遍。

(5)一般情况下我们都不用XXX_2,代备份的种种持久化策略,除非程序对数据的安全性要求非常高,或者说备份的对性能的消耗低于从头再算一遍,我们可以使用这种xxx_2以外,基本不用。

10 共享变量

10.1 概述

如果transformation使用到Driver中的变量,在executor中执行的时候,就需要通过网络传输到对应的executor,如果该变量很大,那么网络传输一定会成为性能的瓶颈。Spark就提供了两种有限类型的共享变量:累加器和广播变量

10.2 broadcast广播变量

广播变量:为每个task都拷贝一份变量,将变量包装成为一个广播变量(broadcast),只需要在executor中拷贝一份,在task运行的时候,直接从executor调用即可,相当于局部变量变成成员变量,性能就得到了提升。

val num:Any = xxxval numBC:Broadcast[Any] = sc.broadcast(num)调用:val n = numBC.value注意:该num需要进行序列化。

10.3 accumulator累加器

累加器的一个好处是,不需要修改程序的业务逻辑来完成数据累加,同时也不需要额外的触发一个action job来完成累加,反之必须要添加新的业务逻辑,必须要触发一个新的action job来完成,显然这个accumulator的操作性能更佳!

构建一个累加器val accu = sc.longAccumuator()累加的操作accu.add(参数)获取累加器的结果val ret = accu.value
val conf = new SparkConf()
.setAppName("AccumulatorOps")
.setMaster("local[*]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/data.txt")
val words = lines.flatMap(_.split("\\s+"))//统计每个单词出现的次数
val accumulator = sc.longAccumulatorval rbk = words.map(word => {if(word == "is")accumulator.add(1)(word, 1)
}).reduceByKey(_+_)
rbk.foreach(println)
println("================使用累加器===================")
println("is: " + accumulator.value)Thread.sleep(1000000)
sc.stop()

注意:累加器的调用,在action之后被调用,也就是说累加器必须在action触发之后;多次使用同一个累加器,应该尽量做到用完即重置;尽量给累加器指定name,方便我们在web-ui上面进行查看

10.4 自定义累加器

自定义一个类继承AccumulatorV2,重写方法

/*自定义累加器IN 指的是accmulator.add(sth.)中sth的数据类型OUT 指的是accmulator.value返回值的数据类型*/
class MyAccumulator extends AccumulatorV2[String, Map[String, Long]] {private var map = mutable.Map[String, Long]()/*** 当前累加器是否有初始化值* 如果为一个long的值,0就是初始化值,如果为list,Nil就是初始化值,是map,Map()就是初始化值*/override def isZero: Boolean = trueoverride def copy(): AccumulatorV2[String, Map[String, Long]] = {val accu = new MyAccumulatoraccu.map = this.mapaccu}override def reset(): Unit = map.clear()//分区内的数据累加 is: 5, of:4override def add(word: String): Unit = {if(map.contains(word)) {val newCount = map(word) + 1map.put(word, newCount)} else {map.put(word, 1)}
//        map.put(word, map.getOrElse(word, 0) + 1)}//多个分区间的数据累加override def merge(other: AccumulatorV2[String, Map[String, Long]]): Unit = {other.value.foreach{case (word, count) => {if(map.contains(word)) {val newCount = map(word) + countmap.put(word, newCount)} else {map.put(word, count)}
//            map.put(word, map.getOrElse(word, 0) + count)}}}override def value: Map[String, Long] = map.toMap
}

注册使用:

object _08AccumulatorOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("$AccumulatorOps").setMaster("local[*]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data.txt")val words = lines.flatMap(_.split("\\s+"))//注册val myAccu = new MyAccumulator()sc.register(myAccu, "myAccu")//统计每个单词出现的次数val pairs = words.map(word => {if(word == "is" || word == "of" || word == "a")myAccu.add(word)(word, 1)})val rbk = pairs.reduceByKey(_+_)rbk.foreach(println)println("=============累加器==========")myAccu.value.foreach(println)Thread.sleep(10000000)sc.stop()}
}

SparkCore基础相关推荐

  1. Spark-Core基础练习题30道

    Spark-Core练习题 1.创建一个1-10数组的RDD,将所有元素*2形成新的RDD val inputRDD = sc.parallelize(1 to 10) val newRDD = in ...

  2. spark学习基础篇1--spark概述与入门

    spark总结 一: spark 概述 1. Spark 是 分布式内存计算框架 Apache Spark 是一个快速的, 多用途的集群计算系统, 相对于 Hadoop MapReduce 将中间结果 ...

  3. 大数据自学——Spark

    Spark自学之路 Spark基础--思维导图 #1.1Spark是什么 Apache Spark 是一个快速的,多用途的计算系统,相对于Hadoop MapReduce将中间结果保存在磁盘中,Spa ...

  4. SparkSQL入门

    介绍 构建在SparkCore基础之上的一个SQL计算模块 前身叫shark,基于hive,hive发展慢,制约了shark的发展 独立出来新的项目spark sql 处理结构化数据 Spark SQ ...

  5. SparkSql学习记录

    目录 一.SparkSQL介绍 二.SparkSQL的编程模型(DataFrame和DataSet) 2.1 编程模型简介 2.2 RDD\DataSet\DataFrame 三者的区别 2.3 Sp ...

  6. Day69_SparkSQL(一)

    课程大纲 课程内容 学习效果 掌握目标 SparkSQL简介 SparkSQL简介 了解 SparkSQL特点 SparkSQL编程 编程模型 掌握 API操作 掌握 SparkSQL函数 Spark ...

  7. SparkSQL知识点总结

    一.SparkSql的概述 1.1 SparkSql是什么 1. SparkSql 是Spark生态体系中的一个基于SparkCore的SQL处理模块 2. 用途是处理具有结构化的数据文件的 3. 前 ...

  8. 大数据之Spark(四):Spark SQL

    一.SparkSQL的发展 1.1 概述 SparkSQL是Spark⽣态体系中的构建在SparkCore基础之上的⼀个基于SQL的计算模块. SparkSQL的前身不叫SparkSQL,⽽叫Shar ...

  9. Spark高手之路1—Spark简介

    文章目录 Spark 概述 1. Spark 是什么 2. Spark与Hadoop比较 2.1 从时间节点上来看 2.2 从功能上来看 3. Spark Or Hadoop 4. Spark 4.1 ...

最新文章

  1. gis 数据框裁剪_BIM+GIS的八大挑战!大挑战,见未来
  2. 图像去雾----暗通道
  3. 「高并发秒杀」mysql数据库引擎区别
  4. 关于一类docker容器闪退问题定位
  5. python随机产生10个数然后前5个升序后5个降序_编写程序,生成包含 20 个随机数的列表,然后将前 10 个元素升序排列,后 10 个元素降序排列,并输出结果。_学小易找答案...
  6. 【风马一族_软件】微软卸载工具_msicuu2.exe
  7. 20171018 在小程序页面去获取用户的OpenID
  8. 富士施乐m115b怎么连接电脑_富士施乐 Fuji Xerox DocuPrint M118w/M118z打印机无线连接设置详解...
  9. 四元数、欧拉角及方向余弦矩阵的相互转换公式
  10. PCM音频压缩A-Law算法,uLaw
  11. Tomcat文件包含漏洞:CNVD-2020-10487(简介/验证/利用/修复)
  12. 面由心生,由脸观心:基于AI的面部微表情分析技术解读
  13. 荧光染料 ICG-HSA 吲哚菁绿修饰人血白蛋白
  14. 文件头格式标准魔数-magic number和mime.types
  15. 缓冲区溢出漏洞_缓冲区溢出漏洞简介
  16. Hadoop生态圈(十五)- HDFS Trash垃圾回收详解
  17. Vulnhub 靶机 Stapler write up samba+wp advanced-video ->mysql 密码 连接 john解密 登录后台 wp插件getshell sudo提权
  18. 石油大学计算机第三次在线作业,石油大学管理会计第三次在线作业答案
  19. 天俊注塑机伺服每小时能省多少电?
  20. MyBatisPlus批量新增或修改执行器

热门文章

  1. 直击中关村创业大街,新街头霸王来了
  2. PHP list的赋值
  3. 系统权限管理设计 (转)
  4. Delphi下物理删除dBase数据库的*.dbf文件
  5. [翻译] python Tutorial 之一
  6. SQL PROCEDURE和 FUNCTION的区别
  7. 会考计算机考试vb知识点,高中会考计算机vb知识点.doc
  8. STM32——SPI接口
  9. make 命令_make考点总结(建议中、高考学生收藏)
  10. mysql存储过程触发器游标_MySQL存储过程,触发器,游标