文章目录

  • 一、RDD详解
    • 1.为什么需要RDD?
    • 2.什么是RDD?
    • 3.RDD的5大特性
    • 4.WordCount案例中的RDD
    • 5.总结
  • 二、RDD编程入门
    • 1.程序入口 SparkContext对象
    • 2.RDD的创建
    • 3.RDD算子概念和分类
    • 4.常用Transformation算子
      • 4.1 转换算子——map
      • 4.2 转换算子——flatMap
      • 4.3 转换算子——reduceByKey
      • 4.4 转换算子——mapValues
      • 4.5 转换算子——groupBy
      • 4.6 转换算子——filter
      • 4.7 转换算子——distinct
      • 4.8 转换算子——union
      • 4.9 转换算子——join
      • 4.10 转换算子——intersection
      • 4.11 转换算子——glom
      • 4.12 转换算子——groupByKey
      • 4.13 转换算子——sortBy
      • 4.14 转换算子——sortByKey
      • 4.15 案例
    • 5.常用Action算子
      • 5.1 Action算子——countByKey
      • 5.2 Action算子——collect
      • 5.3 Action算子——reduce
      • 5.4 Action算子——fold
      • 5.5 Action算子——first
      • 5.6 Action算子——take
      • 5.7 Action算子——top
      • 5.8 Action算子——count
      • 5.9 Action算子——takeSample
      • 5.10 Action算子——takeOrdered
      • 5.11 Action算子——foreach
      • 5.12 Action算子——saveAsTextFile
    • 6.分区操作算子
      • 6.1 分区操作算子——mapPartitions
      • 6.2 分区操作算子——foreachPartitions
      • 6.3 分区操作算子——partitionBy
      • 6.4 分区操作算子——repartition & coalesce
    • 7. 总结

传送门:

  • 视频地址:黑马程序员Spark全套视频教程
  • 1.PySpark基础入门(一)
  • 2.PySpark基础入门(二)
  • 3.PySpark核心编程(一)
  • 4.PySpark核心编程(二)
  • 5.PySaprk——SparkSQL学习(一)
  • 6.PySaprk——SparkSQL学习(二)
  • 7.Spark综合案例——零售业务统计分析
  • 8. Spark3新特性及核心概念(背)

一、RDD详解

1.为什么需要RDD?

  分布式计算需要:

  • 分区控制:多台机器同时并行计算
  • Shuffle控制:不同分区之间数据的关联
  • 数据存储\序列化\发送:
  • 数据计算API:

等一系列功能。这些功能,不能简单的通过Python内置的本地集合对象(如 List\ 字典等)去完成。我们在分布式框架中,需要有一个统一的数据抽象对象,来实现上述分布式计算所需功能。这个抽象对象, 就是RDD。

2.什么是RDD?

  RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。所有的运算以及操作都建立在 RDD 数据结构的基础之上。

  • Dataset:一个数据集合,用于存放数据的。
  • Distributed:RDD中的数据是分布式存储的,可用于分布式计算
  • Resilient:RDD中的数据可以存储在内存中或者磁盘中

3.RDD的5大特性

  RDD 数据结构内部有五个特性:前三个特性每个RDD都具备的,后两个特性可选的。

  • 特性1:RDD是有分区的
    RDD的分区是数据存储的最小单位,一份RDD数据本质上是分隔成了多个分区。

  • 特性2:RDD的方法会作用在其所有的分区上
    如图,RDD3个分区在执行了map操作将数据都乘以10后,可以看到,3个分区的数据都乘以了10,体现了.map方法,是作用在了每一个分区之上。

  • 特性3:RDD之间是有依赖关系的(迭代计算关系)

    sc = SparkContext(conf=conf)
    rdd1 = sc.textFile('hdfs://node1:8020/test/input/wordcount.txt')
    # 将单词进行切割,得到一个存储全部单词的集合对象
    rdd2 = rdd1.flatMap(lambda line: line.split(" "))
    # 将单词转换为元祖对象,key是单词,value是数字1
    rdd3 = rdd2.map(lambda x: (x, 1))
    # 将元祖的value按照key来分组,对所有value执行聚合相加操作
    rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
    # 通过collect方法手机RDD的数据打印输出结果
    print(rdd4.collect())
    

    如上代码,RDD之间是有依赖的。比如,RDD2会产生RDD3,但是RDD2依赖RDD1。同样,RDD3会产生RDD4,但是RDD3依赖RDD2会形成一个依赖链条。这个链条称之为RDD的血缘关系。

  • 特性4:Key-Value型的RDD可以有分区器
    默认分区器: Hash分区规则,可以手动设置一个分区器(rdd.partitionBy的方法来设置)。这个特性是可能的,因为不是所有RDD都是Key-Value型。
    Key-Value RDD:RDD中存储的是二元元组(只有2个元素的元组,比如: (“hadoop”,6)),这就是Key-Value型RDD。

  • 特性5:RDD的分区规划,会尽量靠近数据所在的服务器
    在初始RDD(读取数据的时候)规划的时候,分区会尽量规划到存储数据所在的服务器上。因为这样可以走本地读取,避免网络读取

    总结, Spark会在确保并行计算能力的前提下,尽量确保本地读取
    这里是尽量确保,而不是100%确保
    所以这个特性也是:可能的

4.WordCount案例中的RDD

  以WordCount案例的执行流程来分析,

sc = SparkContext(conf=conf)
rdd1 = sc.textFile('hdfs://node1:8020/test/input/wordcount.txt')
# 将单词进行切割,得到一个存储全部单词的集合对象
rdd2 = rdd1.flatMap(lambda line: line.split(" "))
# 将单词转换为元祖对象,key是单词,value是数字1
rdd3 = rdd2.map(lambda x: (x, 1))
# 将元祖的value按照key来分组,对所有value执行聚合相加操作
rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
# 通过collect方法手机RDD的数据打印输出结果
print(rdd4.collect())

5.总结

  • 如何正确理解RDD?
    弹性分布式数据集,分布式计算的实现载体(数据抽象)
  • RDD五大特点分别是?
    • 特性1:RDD是有分区的
    • 特性2:RDD的方法会作用在其所有的分区上
    • 特性3:RDD之间是有依赖关系的(迭代计算关系)
    • 特性4:Key-Value型的RDD可以有分区器
    • 特性5:RDD的分区规划,会尽量靠近数据所在的服务器

二、RDD编程入门

1.程序入口 SparkContext对象

  Spark RDD 编程的程序入口对象是SparkContext对象(不论何种编程语言)。只有构建出SparkContext,基于它才能执行后续的API调用和计算。本质上,SparkContext对编程来说,主要功能就是创建第一个RDD出来。

2.RDD的创建

  RDD的创建主要有二种方式:

  • 通过并行化创建(本地集合转分布式RDD)

    • sc.parallelize:本地集合转化分布式RDD
    • rdd.getNumPartitions():获取RDD分区数量,返回值是Int数字
    • rdd.collect():分布式RDD转化为本地集合
  • 读取外部数据源(读取文件)
    • sparkcontext.textFile(参数1,参数2):既可以读取本地数据,也可以读取HDFS数据
    • sparkcontext.wholeTextFiles:小文件读取专用
  • 并行化创建(本地集合转分布式RDD)
    API:parallelize

    rdd = sparkcontext.parallelize(参数1,参数2)
    # 参数1:集合对象即可,比如list
    # 参数2:分区数
    
    #!usr/bin/env python
    # -*- coding:utf-8 -*-# 导入pyspark的相关包
    from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName("create rdd").setMaster("local[*]")sc = SparkContext(conf=conf)# TODO:演示通过并行化集合的方式创建RDD# 本地集合转分布式RDDrdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])# parallelize方法:在没有给定分区数的情况下,打印分区数(根据CPU核心数来定)print("默认分区数:", rdd.getNumPartitions())# rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)print("分区数:", rdd.getNumPartitions())# collect方法:是将RDD(分布式对象)中每个分区的数据,都发送到Driver中,形成一个Python List对象# collect:分布式RDD转本地集合print("RDD的内容是:", rdd.collect())
    
    默认分区数: 2
    分区数: 3
    RDD的内容是: [1, 2, 3, 4, 5, 6, 7, 8, 9]
    
  • 读取外部数据源(读取文件)
    API:textFile

    sparkcontext.textFile(参数1,参数2)
    # 这个API既可以读取本地数据,也可以读取HDFS数据
    # 参数1,必填,文件路径支持本地文件,支持HDFS,也支持一些比如S3协议
    # 参数2,可选,表示最小分区数量
    # 注意:textFile一般除了你有很明确的指向性,一般情况下,我们不设置分区参数。参数2话语权不足,spark有自己的判断,在它允许的范围内,参数2有效果,超出spank允许的范围,参数2失效。
    
    #!usr/bin/env python
    # -*- coding:utf-8 -*-# 导入pyspark的相关包
    from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建sparkcontext对象conf = SparkConf().setAppName('create rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# TODO:读取外部数据源(读取本地文件)# 2.textFile()读取文件file_rdd1 = sc.textFile('../data/input/words.txt')print('默认读取分区数:', file_rdd1.getNumPartitions())print('file_rdd1内容:', file_rdd1.collect())# 加最小分区数参数的测试file_rdd2 = sc.textFile('../data/input/words.txt', 3)print('file_rdd2分区数:', file_rdd2.getNumPartitions())file_rdd3 = sc.textFile('../data/input/words.txt', 100)print('file_rdd3分区数:', file_rdd3.getNumPartitions())# TODO:读取外部数据源(读取HDFS文件)hdfs_rdd = sc.textFile('hdfs://node1:8020/test/input/wordcount.txt')print("hdfs_rdd 内容:", hdfs_rdd.collect())
    
    默认读取分区数: 2
    file_rdd1内容: ['hello spark', 'hello hadoop', 'hello flink']
    file_rdd2分区数: 4
    file_rdd3分区数: 38
    hdfs_rdd 内容: ['hadoop hello world hive', 'hive hadoop hello hadoop', 'hadoop hive hive']
    

    API:wholeTextFile

    sparkcontext.wholeTextFiles(参数1,参数2)
    # 读取文件的API,有个适用场景:适合读取一堆小文件
    #参数1,必填,文件路径支持本地文件支持HDFS也支持一些比如S3协议
    #参数2,可选,表示最小分区数量.
    #注意:参数2话语权不足,这个API分区数量最多也只能开到文件数量
    # 这个API偏向于少量分区读取数据。因为,这个API表明了自己是小文件读取专用,那么文件的数据很小分区很多,导致shuffle的几率更高。所以尽量少分区读取数据。
    
    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('create rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# TODO:读取小文件file_rdd1 = sc.wholeTextFiles('../data/input/tiny_files')print(file_rdd1.collect())print(file_rdd1.map(lambda x: x[1]).collect())
    
    [('file:/tmp/pycharm_project_189/data/input/tiny_files/1.txt', 'hello spark\r\nhello hadoop\r\nhello flink'), ('file:/tmp/pycharm_project_189/data/input/tiny_files/2.txt', 'hello spark\r\nhello hadoop\r\nhello flink'), ('file:/tmp/pycharm_project_189/data/input/tiny_files/3.txt', 'hello spark\r\nhello hadoop\r\nhello flink'), ('file:/tmp/pycharm_project_189/data/input/tiny_files/4.txt', 'hello spark\r\nhello hadoop\r\nhello flink'), ('file:/tmp/pycharm_project_189/data/input/tiny_files/5.txt', 'hello spark\r\nhello hadoop\r\nhello flink')]
    ['hello spark\r\nhello hadoop\r\nhello flink', 'hello spark\r\nhello hadoop\r\nhello flink', 'hello spark\r\nhello hadoop\r\nhello flink', 'hello spark\r\nhello hadoop\r\nhello flink', 'hello spark\r\nhello hadoop\r\nhello flink']
    

3.RDD算子概念和分类

  • RDD算子的概念:

    算子:分布式对象API
    方法/函数:本地对象API

  • RDD算子分类:

    RDD算子分为两类:Transformation算子和Action算子

    • Transformation算子
      定义:RDD的算子,返回值仍然是RDD的算子,称之为转换算子
      特性:这类算子是懒加载的。如果没有action算子,Transformation算子是不工作的
    • Action算子
      定义:返回值不是RDD的就是Action算子

    对于这两类算子来说,Transformation算子相当于在构建执行计划,action是一个指令让这个执行计划开始工作。如果没有action,Transformation算子之间的迭代关系就是一个没有通电的流水线。只有action到来,这个数据处理的流水线才开始工作。

4.常用Transformation算子

4.1 转换算子——map

  • 功能
    map算子将RDD的数据一条条处理(处理逻辑:基于map算子中接受的函数),返回新的RDD

  • 语法

    rdd.map(func)
    # func:  f(T) -> U,传入参数并给予返回值
    # map算子作用在每一个分区上,
    
  • 实战

    """
    对于算子的接收函数来说,两种方法都可以- lambda表达式适用于一行代码就搞定的函数体- 如果多行代码,需要定义独立的方法
    """
    from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('create rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# TODO:map算子——定义add函数rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)# 定义方法,作为算子的传入函数体def add(data):return data * 10rdd1 = rdd.map(add)print(rdd1.collect())# TODO:map算子——定义lambda表达式写匿名函数rdd1 = rdd.map(lambda x: x * 10)print(rdd1.collect())
    
    [10, 20, 30, 40, 50, 60]
    [10, 20, 30, 40, 50, 60]
    

4.2 转换算子——flatMap

  • 功能
    对RDD执行map操作,然后进行解除嵌套操作。

  • 实战

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('create rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.并行化创建——本地集合转分布式RDDrdd = sc.parallelize(['hadoop spark hadoop', 'spark hadoop hadoop', 'hadoop flink spark'])# TODO:map操作,返回嵌套式结构rdd2 = rdd.map(lambda line: line.split(' '))print(rdd2.collect())# TODO:flatMap操作,执行完map操作后,然后解除嵌套rdd3 = rdd.flatMap(lambda line: line.split(' '))print(rdd3.collect())
    
    [['hadoop', 'spark', 'hadoop'], ['spark', 'hadoop', 'hadoop'], ['hadoop', 'flink', 'spark']]
    ['hadoop', 'spark', 'hadoop', 'spark', 'hadoop', 'hadoop', 'hadoop', 'flink', 'spark']
    

4.3 转换算子——reduceByKey

  • 功能:
    针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作。简单来说,自动按key分组,按value聚合

  • 语法:

    rdd.reduceByKey(func)
    # func:(V, V) → V
    #接受2个传入参数(类型要一致),返回一个返回值,类型和传入要求—致
    

    以聚合函数:lambda a,b : a+b为例,其组内数据聚合逻辑为:

  • 实战

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkContext, SparkConfif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('create rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.并行化创建rdd = sc.parallelize([('hadoop', 1), ('hadoop', 2), ('hadoop', 3), ('spark', 1), ('hadoop', 4), ('spark', 2), ('spark', 3),('flink', 1), ('flink', 2)])# TODO:reduceByKey:自动按key分组,按照自定义聚合逻辑对value进行聚合rdd2 = rdd.reduceByKey(lambda a, b: a + b)print(rdd2.collect())
    
    [('hadoop', 10), ('spark', 6), ('flink', 3)]
    

4.4 转换算子——mapValues

  • 功能:
    针对二元元祖RDD,对其内部的二元元祖的value执行map操作

  • 语法:

    rdd.mapvalues(func)
    #func:(V) → U
    #注意,传入的参数,是二元元组的value值
    #我们这个传入的方法,只对value进行处理
    
  • 实战

    #!usr/bin/env python
    # -*- coding:utf-8 -*-
    from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('create rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.并行化创建rdd = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('d', 4)])# TODO:对二元元祖内部value执行map操作rdd2 = rdd.map(lambda x: (x[0], x[1] * 10))print(rdd2.collect())rdd3 = rdd.mapValues(lambda x: x * 10)print(rdd3.collect())
    
    [('a', 10), ('b', 20), ('c', 30), ('d', 40)]
    [('a', 10), ('b', 20), ('c', 30), ('d', 40)]
    

4.5 转换算子——groupBy

  • 功能:
    将rdd数据进行分组

  • 语法:

    rdd . groupBy(func)# func函数
    # func: (T) → K
    #函数要求传入一个参数,返回一个返回值,类型无所谓
    #这个函数是拿到你的返回值后,将所有相同返回值的放入一个组中
    #分组完成后,每一个组是一个二元元组,key就是返回值,所有同组的数据放入一个迭代器对象中作为value
    
  • 实战

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('create rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.将本地集合转换为分布式RDDrdd = sc.parallelize([('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2), ('c', 2)])# TODO:groupBy算子对数据进行分组# groupBy传入的函数表示通过这个函数确定按照谁来分组# 分组规则:相同的在一个组rdd1 = rdd.groupBy(lambda t: t[0])rdd2 = rdd1.map(lambda t: (t[0], list(t[1])))print(rdd2.collect())# 将本地集合转化为分布式RDDrdd = sc.parallelize([1, 2, 3, 4, 5])# 分组,将数字分为奇数和偶数rdd1 = rdd.groupBy(lambda x: 'even' if (x % 2 == 0) else 'add')print(rdd1.map(lambda x: (x[0], list(x[1]))).collect())
    
    [('b', [('b', 1), ('b', 2)]), ('c', [('c', 2)]), ('a', [('a', 1), ('a', 2), ('a', 3)])]
    [('even', [2, 4]), ('add', [1, 3, 5])]
    

4.6 转换算子——filter

  • 功能:
    过滤想要的数据进行保留

  • 语法:

    rdd.filter(func)
    # func: (T) -> bool传入1个参数(随意类型)进来,返回值必须是True or False
    # 将结果为True的保留,结果为False的丢弃
    
  • 实战

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('create rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.将本地集合转化为分布式RDDrdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])# TODO:filter算子——过滤想要的数据进行保留# 保留奇数rdd2 = rdd.filter(lambda x : x % 2 == 1)print(rdd2.collect())
    
    [1, 3, 5, 7, 9]
    

4.7 转换算子——distinct

  • 功能:
    对RDD算子进行去重,返回新RDD

  • 语法:

    rdd.distinct(参数1)
    #参数1,去重分区数量,一般不用传
    
  • 实战:

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('create rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.将本地集合转化为分布式RDDrdd = sc.parallelize([1, 1, 1, 1, 2, 2, 2, 3, 3])# TODO:distinct算子进行去重操作print(rdd.distinct().collect())rdd = sc.parallelize([('a', 1), ('a', 1), ('a', 3)])print(rdd.distinct().collect())
    
    [2, 1, 3]
    [('a', 1), ('a', 3)]
    

4.8 转换算子——union

  • 功能:
    将2个rdd合并成一个rdd进行返回

    注意:

    • rdd之间只合并不去重
    • 不同类型的rdd依旧可以混合
  • 语法:

    rdd.union(other_rdd)
    
  • 实战

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('create rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# TODO:union算子将两个RDD合并返回rdd1 = sc.parallelize([1, 1, 3, 3])rdd2 = sc.parallelize(['a', 'b', 'c'])union_rdd = rdd1.union(rdd2)print(union_rdd.collect())
    
    [1, 1, 3, 3, 'a', 'b', 'c']
    

4.9 转换算子——join

  • 功能:
    对两个RDD实现JOIN操作(可实现SQL的内/外连接)

    注意:join算子只能用于二元元祖,按照key进行关联,如果想按照value进行关联,需要利用map算子将key与value调换

  • 语法:

    rdd .join(other_rdd) #内连接
    rdd.leftouterJoin(other_rdd) #左外连接
    rdd .rightOuterJoin(other_rdd) #右外连接
    
  • 实战:

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('create rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.本地集合转分布式RDD# 部门ID和员工姓名rdd1 = sc.parallelize([(1001, 'zhangsan'), (1002, 'lisi'), (1003, 'wangwu'), (1004, 'zhangliu')])# 部门ID和部门名称rdd2 = sc.parallelize([(1001, 'sales'), (1002, 'tech')])# TODO:join算子是内连接print(rdd1.join(rdd2).collect())# TODO:leftOuterJoin是左外连接print(rdd1.leftOuterJoin(rdd2).collect())# TODO:rightOuterJoin是右外连接print(rdd1.rightOuterJoin(rdd2).collect())
    
    [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
    [(1004, ('zhangliu', None)), (1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech')), (1003, ('wangwu', None))]
    [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
    

4.10 转换算子——intersection

  • 功能:
    求两个rdd的交集,返回一个新的rdd

  • 语法

    rdd.intersection(other_rdd)
    
  • 实战

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('create rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.TODO:intersection算子是求2个rdd的交集,返回新的rddrdd1 = sc.parallelize([('a', 1), ('b', 1)])rdd2 = sc.parallelize([('a', 1), ('c', 1)])inter_rdd = rdd1.intersection(rdd2)print(inter_rdd.collect())
    
    [('a', 1)]
    

4.11 转换算子——glom

  • 功能:
    将RDD的数据,加上嵌套,这个嵌套按照分区来进行。

    比如:RDD数据[1,2,3,4,5]有2个分区,那么,被glom后,数据变成:[[1,2,3],[4,5]]

  • 语法:

    rdd.glom()
    
  • 实战

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('create rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.TODO:glom算子是将RDD的数据,加上嵌套,这个嵌套按照分区来进行。rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)# 加上嵌套print(rdd.glom().collect())# 解除嵌套print(rdd.glom().flatMap(lambda x : x).collect())
    
    [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
    [1, 2, 3, 4, 5, 6, 7, 8, 9]
    

4.12 转换算子——groupByKey

  • 功能:
    针对KV型RDD,自动按照key分组

  • 语法:

    rdd.groupByKey()
    
  • 实战

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('create rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.TODO:groupByKey算子是针对KV型RDD,自动按照key分组rdd = sc.parallelize([('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2), ('c', 1), ('c', 3)])group_rdd = rdd.groupByKey()print(group_rdd.map(lambda x: (x[0], list(x[1]))).collect())
    
    [('b', [1, 2]), ('c', [1, 3]), ('a', [1, 2, 3])]
    

4.13 转换算子——sortBy

  • 功能:
    对RDD数据进行排序(基于指定的排序依据)

  • 语法:

    rdd.sortBy(func,ascending=False,numPartitions=1)
    #func: (T) → U:告知按照rdd中的哪个数据进行排序,比如lambda x: x[1]表示按照rdd中的第二列元素进行排序
    #ascending True升序False降序
    #numPartitions:用多少分区排序
    
  • 实战

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('create rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.TODO:sortBy算子对RDD数据进行排序rdd = sc.parallelize([('a', 1), ('b', 2), ('c', 1), ('f', 5), ('e', 3), ('g', 6), ('h', 2)])# 参数1:函数,告知Spark按照数据的哪个列排序# 参数2:True表示升序;False表示降序# 参数3:排序的分区数"""注意:如果要全局有序,排序分区数请设置为1"""print(rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=3).collect())print(rdd.sortBy(lambda x: x[0], ascending=False, numPartitions=3).collect())
    
    [('a', 1), ('c', 1), ('b', 2), ('h', 2), ('e', 3), ('f', 5), ('g', 6)]
    [('h', 2), ('g', 6), ('f', 5), ('e', 3), ('c', 1), ('b', 2), ('a', 1)]
    

4.14 转换算子——sortByKey

  • 功能:
    针对KV型RDD,按照key进行排序

  • 语法:

    rdd.sortByKey(ascending=True,numPartitions=None,keyfunc=<function RDD.<lambda>>)
    #ascending:升序or降序,True升序, False降序,默认是升序
    #numPartitions:按照几个分区进行排序,如果全局有序,设置1
    #keyfunc:在排序前对key进行处理,语法: (k)→ U ,一个参数传入,返回一个值
    
  • 实战

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('create rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.TODO:sortByKey算子:针对KV型RDD,按照key进行排序rdd = sc.parallelize([('a', 1), ('E', 1), ('C', 1), ('D', 1), ('b', 1), ('g', 1), ('f', 1),('y', 1), ('u', 1), ('i', 1), ('o', 1), ('p', 1),('m', 1), ('n', 1), ('j', 1), ('k', 1), ('l', 1)], 3)# 将key强制转化为小写并按照key进行排序sort_rdd = rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda key: str(key).lower())print(sort_rdd.collect())
    
    [('a', 1), ('b', 1), ('C', 1), ('D', 1), ('E', 1), ('f', 1), ('g', 1), ('i', 1), ('j', 1), ('k', 1), ('l', 1), ('m', 1), ('n', 1), ('o', 1), ('p', 1), ('u', 1), ('y', 1)]
    

4.15 案例

  • 数据集

  • 需求:
    读取data文件夹中的order.text文件,提取北京的数据,组合北京和商品类别进行输出同时对结果集进行去重,得到北京售卖的商品类别信息

  • 代码:

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContext
    import jsonif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('create rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.读取数据源文件file_rdd = sc.textFile('../data/input/order.text')# 3.rdd数据按照|符号分割,得到一个个json数据jsons_rdd = file_rdd.flatMap(lambda line: line.split('|'))# 4.通过python内置的json库,完成json字符串到字典对象的转换dict_rdd = jsons_rdd.map(lambda json_str: json.loads(json_str))# print(dict_rdd.collect())# 5.过滤数据,只保留北京的数据beijing_rdd = dict_rdd.filter(lambda dict: dict['areaName'] == '北京')# 6.组合北京和商品类型形成新的字符串category_rdd = beijing_rdd.map(lambda x: x['areaName'] + '_' + x['category'])# 7.对结果集进行去重操作result_rdd = category_rdd.distinct()print(result_rdd.collect())
    
    ['北京_平板电脑', '北京_家具', '北京_书籍', '北京_食品', '北京_服饰', '北京_手机', '北京_家电', '北京_电脑']
    
  • 将案例提交到yarn集群中运行
    在Pycharm中执行

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContext
    import json
    # 赋予环境变量
    import os
    os.environ['HADOOP_CONF_DIR'] = "/export/server/hadoop/etc/hadoop"if __name__ == '__main__':# 1.构建SparkContext对象# 提交到yaen集群,master设置为yarnconf = SparkConf().setAppName('test-yarn 1').setMaster('yarn')sc = SparkContext(conf=conf)# 2.读取HDFS文件file_rdd = sc.textFile('hdfs://node1:8020/test/input/order.text')# 3.rdd数据按照|符号分割,得到一个个json数据jsons_rdd = file_rdd.flatMap(lambda line: line.split('|'))# 4.通过python内置的json库,完成json字符串到字典对象的转换dict_rdd = jsons_rdd.map(lambda json_str: json.loads(json_str))# print(dict_rdd.collect())# 5.过滤数据,只保留北京的数据beijing_rdd = dict_rdd.filter(lambda dict: dict['areaName'] == '北京')# 6.组合北京和商品类型形成新的字符串category_rdd = beijing_rdd.map(lambda x: x['areaName'] + '_' + x['category'])# 7.对结果集进行去重操作result_rdd = category_rdd.distinct()print(result_rdd.collect())
    
    ['北京_书籍', '北京_食品', '北京_服饰', '北京_平板电脑', '北京_家具', '北京_手机', '北京_家电', '北京_电脑']
    

    如果在PyCharm中直接提交到yarn,依赖了其它的python文件,可以通过设置属性来指定依赖代码。

    在服务器上通过spark-submit提交到集群运行

    [root@node1 spark_test]# /export/server/spark/bin/spark-submit --master yarn /spark_test/main.py
    22/06/22 23:54:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    ['北京_平板电脑', '北京_家具', '北京_书籍', '北京_食品', '北京_服饰', '北京_电脑', '北京_手机', '北京_家电']
    

5.常用Action算子

  Action算子返回值不是RDD。其中,

  • foreach
  • saveAsTextFile

这两个算子是分区(Executor)直接执行的,跳过Driver,由分区所在的Executor直接执行。反之,其余的Action算子都会将结果发送至Driver。

5.1 Action算子——countByKey

  • 功能:
    统计key出现的次数(一般用于KV型RDD)

  • 实战:

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('creat rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.读取源文件rdd1 = sc.textFile('../data/input/words.txt')rdd2 = rdd1.flatMap(lambda x: x.split(' '))rdd3 = rdd2.map(lambda x: (x, 1))# TODO:统计key出现的次数result = rdd3.countByKey()print(result)
    
    defaultdict(<class 'int'>, {'hello': 3, 'spark': 1, 'hadoop': 1, 'flink': 1})
    

5.2 Action算子——collect

  • 功能:
    将RDD各个分区内的数据,统一收集到Driver中,形成一个List形象

  • 用法:

    rdd.collect()
    

    这个算子,是将RDD各个分区数据都拉取到Driver
    注意:RDD是分布式对象,其数据量可以很大,所以用这个算子之前,要心知肚明的了解结果数据集不会太大。不然,会把Driver内存撑爆。

5.3 Action算子——reduce

  • 功能:
    对RDD数据集按照你传入的逻辑进行聚合

  • 语法:

    rdd.reduce(func)
    # func:(T,T) → T
    # 2参数传入1个返回值,返回值和参数要求类型一致
    
  • 代码:

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('creat rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.本地集合转分布式RDDrdd = sc.parallelize(range(1, 10))# TODO:reduce算子是对RDD数据集按照传入的逻辑进行聚合rdd2 = rdd.reduce(lambda a, b: a + b)print(rdd2)
    

5.4 Action算子——fold

  • 功能:
    和reduce一样,接受传入逻辑进行聚合,聚合是带有初始值的。fold算子就是一个带有初始值的reduce算子。

    这个初始值聚合,会作用在

    • 分区内聚合
    • 分区间聚合
  • 实战:

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('creat rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.本地集合转分布式RDDrdd = sc.parallelize(range(1, 10), 3)# TODO:fold算子接受传入逻辑进行聚合,聚合是带有初始值的。fold算子就是一个带有初始值的reduce算子。rdd2 = rdd.fold(10, lambda a, b: a + b)print(rdd2)
    
    85
    

5.5 Action算子——first

  • 功能:
    取出RDD的第一个元素

  • 实战

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('creat rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.本地集合转分布式RDDrdd = sc.parallelize([3, 2, 1])# TODO:first算子是取出RDD的第一个元素rdd2 = rdd.first()print(rdd2)
    
    3
    

5.6 Action算子——take

  • 功能:
    取RDD的前N个元素,组合成list返回

  • 实战

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('creat rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.本地集合转分布式RDDrdd = sc.parallelize([1, 2, 3, 4, 5, 6])# TODO:take算子是取出RDD的前N个元素rdd2 = rdd.take(5)print(rdd2)
    
    [1, 2, 3, 4, 5]
    

5.7 Action算子——top

  • 功能:
    对RDD数据集进行降序排序,取前N个

  • 实战

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('creat rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.本地集合转分布式RDDrdd = sc.parallelize([5, 2, 3, 1, 5, 6])# TODO:top算子是对RDD数据集进行降序排序,取前N个rdd2 = rdd.top(3)print(rdd2)
    
    [6, 5, 5]
    

5.8 Action算子——count

  • 功能:
    计算RDD有多少条数据,返回值是一个数字

  • 实战

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('creat rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.本地集合转分布式RDDrdd = sc.parallelize([5, 2, 3, 1, 5, 6])# TODO:count算子是计算RDD数据集有多少条数据,返回值是一个数字rdd2 = rdd.count()print(rdd2)
    
    6
    

5.9 Action算子——takeSample

  • 功能:
    随机抽样RDD的数据

  • 语法:

    takeSample(参数1: True or False, 参数2 :采样数,参数3: 随机数种子)
    #-参数1: True表示允许取同一个数据,False表示不允许取同一个数据。和数据内容无关,是否重复表示的是抽取同一个位置的数据
    #-参数2:抽样要几个
    #-参数3:随机数种子,这个参数传入一个数字即可,随意给
    
  • 实战:

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('creat rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.本地集合转分布式RDDrdd = sc.parallelize([5, 2, 3, 1, 4, 7, 9, 6])# TODO:takeSample算子是随机抽样RDD的数据rdd2 = rdd.takeSample(True, 22)print(rdd2)rdd3 = rdd.takeSample(False, 22)print(rdd3)rdd4 = rdd.takeSample(False, 5)print(rdd4)
    
    [3, 5, 2, 6, 7, 6, 6, 4, 1, 4, 1, 4, 9, 6, 6, 4, 4, 3, 4, 7, 3, 1]
    [3, 6, 1, 5, 9, 2, 4, 7]
    [3, 7, 6, 5, 1]
    

5.10 Action算子——takeOrdered

  • 功能:
    对RDD进行排序,取前N个;与top算子相比,既可以升序排序,又可以降序排序

  • 语法:

    rdd.takeOrdered(参数1,参数2)
    #-参数1要几个数据
    #-参数2对排序的数据进行更改(不会更改数据本身,只是在排序的时候换个样子)
    #这个方法使用安装元素自然顺序升序排序,如果你想玩倒叙,需要用参数2,来对排序的数据进行处理
    
  • 实战

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('creat rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.本地集合转分布式RDDrdd = sc.parallelize([5, 2, 3, 1, 4, 7, 9, 6])# TODO:takeOrdered算子是对RDD进行排序,既可以升序,也可以降序# 升序排序rdd1 = rdd.takeOrdered(5)print(rdd1)# 降序排序rdd2 = rdd.takeOrdered(5, lambda x: -x)print(rdd2)# top算子只能降序排序rdd3 = rdd.top(5)print(rdd3)
    
    [1, 2, 3, 4, 5]
    [9, 7, 6, 5, 4]
    [9, 7, 6, 5, 4]
    

5.11 Action算子——foreach

  • 功能:
    对RDD的每一个元素,执行你提供的逻辑操作(跟map类似),但是这个方法没有返回值。这个算子由分区执行,跳过Driver。

  • 语法:

    rdd.foreach(func)
    # func:(T) -> None
    # 接受一个传入参数,但是没有返回值
    
  • 实战

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('creat rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.本地集合转分布式RDDrdd = sc.parallelize([1, 2, 3, 4, 5, 6])# TODO:foreach算子是对RDD的每一个元素,执行提供的逻辑操作# 注意:函数没有返回值,所以在里面直接打印# 打印操作在excutor上完成,而不是在Driver端result = rdd.foreach(lambda x: print(x * 10))# 没有返回值,所以为Noneprint(result)
    
    40
    50
    60
    10
    20
    30
    None
    

5.12 Action算子——saveAsTextFile

  • 功能:
    将RDD数据写入文本文件中,支持本地写出,hdfs等文件系统。这个算子由分区执行,跳过Driver。

  • 实战:

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('creat rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.本地集合转分布式RDD# 由于saveAsTextFile算子是在分区上执行,跳过Driver。因此,需要设置分区数为1rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 1)# TODO:saveAsTextFile算子是将RDD数据写入到文本文件中rdd.saveAsTextFile('../data/output/out1')
    

6.分区操作算子

6.1 分区操作算子——mapPartitions

  • 功能:
    mapPartitions是对每个分区的RDD数据进行处理。一次被传递的是一整个分区的数据,作为一个迭代器对象传入进来,返回迭代器对象。

    在map场景下,如果是100万条数据,那么需要100万次计算与100万次IO
    在mapPartitions场景下,100万条数据被打包成迭代器对象,每个迭代器对象只有一个IO
    总结:在CPU执行层面,没有节省资源;但是,在网络IO,空间利用率上有了很大的提升

  • 语法:

    rdd.mapPartitions()
    #传入参数与返回值均是迭代器对象(一整个分区的数据)
    
  • 实战:

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('creat rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.本地集合转分布式RDD# 由于saveAsTextFile算子是在分区上执行,跳过Driver。因此,需要设置分区数为1rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)# TODO:mapPartitions是对每个分区的RDD数据一条条进行处理def process(iter):result = list()for it in iter:result.append(it * 10)return resultprint(rdd.mapPartitions(process).collect())
    
    [10, 20, 30, 40, 50, 60]
    

    运行结果虽然与map算子一致,但是其运行性能比map算子好很多。

6.2 分区操作算子——foreachPartitions

  • 功能:
    和foreach算子一致。一次处理的是一整个分区的数据,没有返回值

    foreachPartitions就是一个没有返回值的mapPartitions

  • 实战:

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('creat rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.本地集合转分布式RDD# 由于saveAsTextFile算子是在分区上执行,跳过Driver。因此,需要设置分区数为1rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)# TODO:foreachPartitions是对每个分区的RDD数据进行处理def process(iter):print("-------------------")result = list()for it in iter:result.append(it * 10)print(result)rdd.foreachPartition(process)
    
    -------------------
    [30, 40]
    -------------------
    [10, 20]
    -------------------
    [50, 60]
    

6.3 分区操作算子——partitionBy

  • 功能:
    对RDD进行自定义分区操作

    注意:

    • 分区号不要超标,你设置3个分区,分区号只能是0,1,2
    • 设置5个分区,分区号只能是0,1,2,3,4
  • 语法:

    rdd.partitionBy(参数1,参数2)
    #-参数1重新分区后有几个分区
    #-参数2自定义分区规则,函数传入#参数2: (K) + int
    #一个传入参数进来,类型无所谓,但是返回值一定是int类型。将key传给这个函数,你自己写逻辑,决定返回一个分区编号。分区编号从0开始,不要超出分区数-1
    
  • 实战

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('creat rdd').setMaster('local[*]')sc = SparkContext(conf=conf)# 2.本地集合转分布式RDDrdd = sc.parallelize([('hadoop', 1), ('spark', 1), ('hello', 1), ('flink', 1), ('hadoop', 1), ('spark', 1)])# TODO:partitionBy是对RDD进行自定义分区操作# 基于key,返回不同的分区号def process(k):if 'hadoop' == k or 'hello' == k: return 0if 'spark' == k: return 1if 'flink' == k: return 2# 按照自定义的分区加好嵌套并收集print(rdd.partitionBy(3, process).glom().collect())
    
    [[('hadoop', 1), ('hello', 1), ('hadoop', 1)], [('spark', 1), ('spark', 1)], [('flink', 1)]]
    

6.4 分区操作算子——repartition & coalesce

  • 功能:
    对RDD分区执行重新分区(仅数量),repartition底层都是在调用coalesce,只是将shuffle设置为True。coalesce多了一个安全阀。

    注意:对分区的数量进行操作,一定要慎重
    一般情况下,我们写Spark代码,除了要求全局排序设置为1个分区外,多数时候,所有API中关于分区相关的代码我们都不太理会。
    因为,如果你改分区了

    • 会影响并行计算(内存迭代的并行管道数量)
    • 分区如果增加, 极大可能导致shuffle
  • 语法:

    rdd.repartition(N)
    #传入N,决定新的分区数
    
    rdd.coalesce(参数1,参数2)
    #-参数1,分区数
    #-参数2,True or False
    #True表示允许shuffle,也就是可以加分区
    #False表示不允许shuffle,也就是不能加分区,False是默认
    
  • 实现:

    #!usr/bin/env python
    # -*- coding:utf-8 -*-from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 1.构建SparkContext对象conf = SparkConf().setAppName('creat rdd').setMaster('local[*]')sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)# TODO:repartition算子修改分区print(rdd.repartition(1).getNumPartitions())print(rdd.repartition(5).getNumPartitions())# TODO:coalesce算子修改分区# 这个算子有安全机制,如果想增加分区,必须设置shuffle=Trueprint(rdd.coalesce(1).getNumPartitions())print(rdd.coalesce(5, shuffle=True).getNumPartitions())
    
    1
    5
    1
    5
    

7. 总结

  1. RDD创建有哪几种方法?
    通过并行化集合的方式(本地集合转分布式集合)
    或者读取数据的方式创建(TextFile\WholeTextFile)
  2. RDD分区数如何查看?
    通过getNumPartitions API查看,返回值Int
  3. Transformation和Action的区别?
    转换算子的返回值100%是RDD,而Action算子的返回值100%不是RDD。
    转换算子是懒加载的,只有遇到Action才会执行。 Action就是转换算子处理链条的开关。
  4. 哪两个Action算子的结果不经过Driver,直接输出?
    foreach和saveAsTextFile直接由Executor执行后输出,不会将结果发送到Driver上去。
  5. reduceByKey和groupByKey的区别?
    reduceByKey自带聚合逻辑 ,groupByKey不带。如果做数据聚合reduceByKey的效率更好,因为可以先聚合后shuffle再最终聚合,传输的IO小。
  6. mapPartitions和foreachPartition的区别?
    mapPartitions带有返回值,foreachPartition不带。
    7.对于分区操作有什么要注意的地方?
    尽量不要增加分区,可能破坏内存迭代的计算管道。

PySpark | RDD相关推荐

  1. pyspark rdd 基本操作

    pyspark rdd 基本操作 原文链接 #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on F ...

  2. pyspark rdd 数据持久化

    pyspark rdd 数据持久化 from pyspark import SparkContext ,SparkConfconf=SparkConf().setAppName("miniP ...

  3. PySpark | RDD持久化 | 共享变量 | Spark内核调度

    文章目录 一.RDD持久化 1.RDD的数据是过程数据 2.RDD缓存 2.1 RDD缓存的特点 2.2 cache()与unpersist()实战 3.RDD CheckPoint 3.1 Chec ...

  4. pyspark RDD详细教程

    Spark的核心是RDD(Resilient Distributed Dataset)即弹性分布式数据集,属于一种分布式的内存系统的数据集应用,这些元素在多个节点上运行和操作,以便在集群上进行并行处理 ...

  5. PySpark RDD操作

    前提条件: 1.拥有Ubuntu16.04环境 2.Ubuntu下安装好Spark和PySpark 题目一:RDD创建 首先进入pyspark命令行 $ pyspark (1)从文件中加载 从本地文件 ...

  6. PySpark RDD 之 filter

    1. pyspark 版本 2.3.0版本 2. 官网 filter(f)[source] Return a new RDD containing only the elements that sat ...

  7. PySpark RDD 之 reduce

    1. pyspark 版本 2.3.0版本 2. 官网     reduce(f)[source] Reduces the elements of this RDD using the specifi ...

  8. [Spark]PySpark入门学习教程---RDD介绍(2)

    一 RDD pyspark.RDD        SparkRDD RDD指的是弹性分布式数据集(Resilient Distributed Dataset),它是spark计算的核心.尽管现在都使用 ...

  9. 弹性式分布数据集RDD——Pyspark基础 (二)

    连载中:http://ihoge.cn/tags/pyspark/ title: 弹性式分布数据集RDD--Pyspark基础 (二) date: 2018-04-15 17:59:21 commen ...

最新文章

  1. 【C#】集合_哈希表_字典_泛型_文件
  2. rocksdb学习笔记
  3. shell安装mysql5.7_一键部署----shell脚本安装MySQL5.7
  4. java 遍历属性文件路径_Java项目中读取properties文件,以及六种获取路径的方法...
  5. 激光slam_激光SLAM与视觉SLAM的特点
  6. php的注入点,Php注入点构造代码
  7. 添加class值_Java 虚拟机(二) - Class 文件结构
  8. linux自动截屏脚本,Android实现自动截图脚本
  9. 【机器学习_2】机器学习资料
  10. .Net的文件格式(zz)
  11. sqlite:WAL模式
  12. 联想笔记本关闭锁定计算机,联想笔记本电脑键盘锁了怎么解开
  13. D3D处理2D图像: NV12格式及其转换(1)
  14. 普元EOS之性能调优
  15. python开根号_python 开根号
  16. MedianFlow代码 程序示例
  17. 【运维面试】面试官: 你每天在公司都做啥?
  18. 英语语言用计算机研究什么软件,高中英语新课程标准词汇习得研究—多媒体计算机辅助语言软件在教学中的应用.doc...
  19. win7系统开启snmp服务器配置,Win7系统怎么开启snmp服务【图文】
  20. 还在为制作大屏苦恼?这个Smartbi神器才是你的出路

热门文章

  1. 国产男装「升级潮」下,九牧王、劲霸、海澜之家们顺利「上分」了吗?
  2. CentOS7安装k8s服务--Master节点和Node节点
  3. python dataframe 写入到doc文件_将Python Pandas DataFrame写入Word文档
  4. java 中有几种方法可以实现一个线程?用什么关键字修饰同步方法 ? stop() 和 suspend() 方 法为何不推荐使用?
  5. VB.net是个弥天大谎,VB.net已死(海康威视 SDK 开发有感)
  6. C# 关键字 使用where来限定泛型约束
  7. Pyhon中利用GM(1,1)和ARIMA模型对卫星DCB值进行预测
  8. Android实现百度网盘一些功能
  9. 中国大数据分析行业研究报告
  10. 关于我国计算机软件著作权保护的调研报告,我国计算机软件著作权保护问题研究...