Spark学习笔记总结

01. Spark基础

1. 介绍

Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。
Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生态系统,以弥补MapReduce的不足。

2. Spark-Shell

  1. spark-shell是Spark自带的交互式Shell程序,用户可以在该命令行下用scala编写spark程序。
  2. 直接启动spark-shell,实质是spark的local模式,在master:8080中并未显示客户端连接。
  3. 集群模式:
    /usr/local/spark/bin/spark-shell --master spark://172.23.27.19:7077 --executor-memory 2g --total-executor-cores 2
  4. spark-shell中编写wordcount
    sc.textFile("hdfs://172.23.27.19:9000/wrd/wc/srcdata/").flatMap(.split(" ")).map((,1)).reduceByKey(+).sortBy(_._2,false).collect

3. RDD介绍与属性

1. 介绍

RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变(创建了内容不可变)、可分区、里面的元素可并行计算的集合。

2. 属性:

  1. 由多个分区组成。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。
  2. 一个计算函数用于每个分区。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。
  3. RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。数据丢失时,根据依赖重新计算丢失的分区而不是整个分区。
  4. 一个Partitioner,即RDD的分片函数。默认是HashPartition
  5. 分区数据的最佳位置去计算。就是将计算任务分配到其所要处理数据块的存储位置。数据本地化。
3. 创建方式:
  1. 可通过并行化scala集合创建RDD
    val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
  2. 通过HDFS支持的文件系统创建,RDD里没有真的数据,只是记录了元数据
    val rdd2 = sc.textFile("hdfs://172.23.27.19:9000/wrd/wc/srcdata/")

查看该rdd的分区数量
rdd1.partitions.length

3. 基础的transformation和action

RDD中两种算子:
transformation转换,是延迟加载的

常用的transformation:
(1)map、flatMap、filter
(2)intersection求交集、union求并集:注意类型要一致
distinct:去重
(3)join:类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
(4)groupByKey:在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
但是效率reduceByKey较高,因为有一个本地combiner的过程。
(5)cartesian笛卡尔积

常用的action
(1)collect()、count()
(2)reduce:通过func函数聚集RDD中的所有元素
(3)take(n):取前n个;top(2):排序取前两个
(4)takeOrdered(n),排完序后取前n个

4. 较难的transformation和action

参考《http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html》

(1)mapPartitions(func)和
mapPartitions(func):
独立地在RDD的每一个分片上运行,但是返回值;foreachPartition(func)也常用,不需要返回值

mapPartitionsWithIndex(func):
可以看到分区的编号,以及该分区数据。
类似于mapPartitions,但func带有一个整数参数表示分片的索引值,func的函数类型必须是
(Int, Interator[T]) => Iterator[U]

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
val func = (index: Int, iter: Iterator[(Int)]) => {iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator}
rdd1.mapPartitionsWithIndex(func).collect

(2)aggregate
action操作,
第一个参数是初始值,
第二个参数:是2个函数[每个函数都是2个参数(第一个参数:先对个个分区进行的操作, 第二个:对个个分区合并后的结果再进行合并), 输出一个参数]

例子:

rdd1.aggregate(0)(_+_, _+_)
//前一个是对每一个分区进行的操作,第二个是对各分区结果进行的结果rdd1.aggregate(5)(math.max(_, _), _ + _)
//结果:5 + (5+9) = 19val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
//结果:24或者42val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
//结果01或者10

(3)aggregateByKey
将key值相同的,先局部操作,再整体操作。。和reduceByKey内部实现差不多

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
//结果:Array((dog,12), (cat,17), (mouse,6))

PS:
和reduceByKey(+)调用的都是同一个方法,只是aggregateByKey要底层一些,可以先局部再整体操作。

(4)combineByKey
和reduceByKey是相同的效果,是reduceByKey的底层。
第一个参数x:原封不动取出来, 第二个参数:是函数, 局部运算, 第三个:是函数, 对局部运算后的结果再做运算
每个分区中每个key中value中的第一个值,

val rdd1 = sc.textFile("hdfs://master:9000/wordcount/input/").flatMap(_.split(" ")).map((_, 1))
val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd2.collect

第一个参数的含义:
每个分区中相同的key中value中的第一个值
如:
(hello,1)(hello,1)(good,1)-->(hello(1,1),good(1))-->x就相当于hello的第一个1, good中的1

val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd3.collect
//每个会多加3个10val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val rdd6 = rdd5.zip(rdd4)
val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n)
//将key相同的数据,放入一个集合中

(5)collectAsMap
Action
Map(b -> 2, a -> 1)//将Array的元祖转换成Map,以后可以通过key取值

val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
rdd.collectAsMap
//可以下一步使用

(6)countByKey
根据key计算key的数量
Action

val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
rdd1.countByKey
rdd1.countByValue//将("a", 1)当做一个元素,统计其出现的次数

(7)flatMapValues
对每一个value进行操作后压平

转载于:https://www.cnblogs.com/wangrd/p/6216924.html

Spark RDD算子介绍相关推荐

  1. Spark学习之Spark RDD算子

    个人主页zicesun.com 这里,从源码的角度总结一下Spark RDD算子的用法. 单值型Transformation算子 map /*** Return a new RDD by applyi ...

  2. Spark RDD算子(八)mapPartitions, mapPartitionsWithIndex

    Spark RDD算子(八) mapPartitions scala版本 java版本 mapPartitionsWithIndex scala版本 java版本 mapPartitions mapP ...

  3. Spark RDD算子(transformation + action)

    概念 RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变.可分区.里面的元素可并行计算的集合.RDD具有数据流模 ...

  4. spark RDD算子大全

    目录 map()&&flatMap() map()&&mapPartitions() mapPartitionsWithIndex() filter() take()& ...

  5. Spark—RDD算子使用IDEA-Scala操作练习:请根据磁盘文件“数据集”data01.txt,该数据集包含了某大学计算机系的成绩,计算下列问题。

    一.数据源文件下载 https://download.csdn.net/download/weixin_45947938/66589736 二.问题描述 请根据给定的实验数据,在idea中通过Scal ...

  6. Spark RDD算子(四) mapToPair、flatMapToPair

    目录 一.mapToPair 二.flatMapToPair 一.mapToPair 将每一行的第一个单词作为键,1 作为value创建pairRDD scala版本 scala没有mapToPair ...

  7. spark常用RDD算子 汇总(java和scala版本)

    github: https://github.com/zhaikaishun/spark_tutorial  spark RDD的算子挺多,有时候如何灵活的使用,该如何用一下子想不起来,这一段时间将s ...

  8. Spark-----Spark 与 Hadoop 对比,Spark 集群搭建与示例运行,RDD算子简单入门

    目录 一.Spark 概述 1.1. Spark是什么 1.2. Spark的特点(优点) 1.3. Spark组件 1.4. Spark和Hadoop的异同 二.Spark 集群搭建 2.1. Sp ...

  9. 学习笔记Spark(四)—— Spark编程基础(创建RDD、RDD算子、文件读取与存储)

    文章目录 一.创建RDD 1.1.启动Spark shell 1.2.创建RDD 1.2.1.从集合中创建RDD 1.2.2.从外部存储中创建RDD 任务1: 二.RDD算子 2.1.map与flat ...

最新文章

  1. JS进阶篇--ckplayer.js视频播放插件
  2. mysql 执行计划大于_Mysql执行计划(大章)
  3. boost:is_straight_line_drawing用法的测试程序
  4. 机器学习--Lasso回归(LassoRegression)
  5. codis配置_Codis的安装配置
  6. 产品必备-产品FDD模板(PRD)
  7. win10哪个版本最好用,推荐win10企业版LTSC
  8. Ubuntu18.04修改登录页面背景
  9. iphone11 android,iphone11launcher
  10. 做到这几点在家也能拍出好看的证件照
  11. Vagrant 安装 Centos7
  12. CF1132D Stressful Training
  13. CentOS6.5挂载大于2TB的磁盘使用parted和GPT类型
  14. DirectX12 3D游戏开发实践(龙书)第二章_矩阵代数
  15. unity中的四元数,欧拉角,方向向量之间的相互转换方法。
  16. luogu P4233 射命丸文的笔记
  17. C++代码封装成dll供C#中调用、调用dll无可用源
  18. about Red_Hat_Enterprise_Linux_7
  19. keras自定义simm作为损失函数,并且实现Tensor和数组之间的转换
  20. 解决Visual Studio 2008安装时出现的1330数字签名错误

热门文章

  1. Java线程面试题 Top 53
  2. C++std命名空间和头文件详解
  3. 求完全二叉树的结点个数
  4. 返回地址【数据结构】
  5. C++:15---异常机制
  6. caffe各层参数详解
  7. 学点数学(2)-特征函数
  8. 13个好习惯 教你健康一整年
  9. 科目三电子考的通过率普遍偏低
  10. 简述 maven 命令 package、install、deploy 的区别