RDD对象的基本操作

from pyspark.sql import SparkSessionspark = SparkSession.builder.appName('learn_RDD').getOrCreate()
# Create RDD of list.List = [2.3,3.4,4.3,2.4,2.3,4.0]rdd = spark.sparkContext.parallelize(List, 2)  # numSlices--the number of partitions of the new RDDrdd.collect()
[2.3, 3.4, 4.3, 2.4, 2.3, 4.0]
# 获取第一个元素,前两个rdd.first(), rdd.take(2)
(2.3, [2.3, 3.4])
# 分区数rdd.getNumPartitions()
2

对RDD对象中元素的处理

temp = [59,57.2,53.6,55.4,51.8,53.6,55.4]
rdd = spark.sparkContext.parallelize(temp,2)
rdd.collect()
[59, 57.2, 53.6, 55.4, 51.8, 53.6, 55.4]
# 转换
def func(x) :y = x*x/(x*2)return ynew_rdd = rdd.map(func)
new_rdd.collect()
[29.5, 28.6, 26.8, 27.7, 25.9, 26.8, 27.7]
new_rdd.filter(lambda x: x>=27).collect()
[29.5, 28.6, 27.7, 27.7]

基本的数据处理

# 建表
studentData = [["id_1","year1",62.08,62.4],["id_1","year2",75.94,76.75],["id_2","year1",68.26,72.95],["id_2","year2",85.49,75.8],["id_3","year1",75.08,79.84],["id_3","year2",54.98,87.72],["id_4","year1",50.03,66.85],["id_4","year2",71.26,69.77],["id_5","year1",52.74,76.27],["id_5","year2",50.39,68.58],["id_6","year1",74.86,60.8],["id_6","year2",58.29,62.38],["id_7","year1",63.95,74.51],["id_7","year2",66.69,56.92]]studentRDD = spark.sparkContext.parallelize(studentData,4)studentRDD.take(2)
[['id_1', 'year1', 62.08, 62.4], ['id_1', 'year2', 75.94, 76.75]]
# 计算特定列的均值
temp_mean = studentRDD.map(lambda x: [x[0], x[1], (x[2]+x[3])/2])temp_mean.take(2)
[['id_1', 'year1', 62.239999999999995], ['id_1', 'year2', 76.345]]
# 过滤 year2temp_mean.filter(lambda x: 'year2' in x).take(2)
[['id_1', 'year2', 76.345], ['id_2', 'year2', 80.645]]
# 第二年平均成绩最高的,top 3temp_year2 = temp_mean.filter(lambda x: 'year2' in x)temp_year2.sortBy(keyfunc = lambda x: -x[2]).take(3)
[['id_2', 'year2', 80.645],['id_1', 'year2', 76.345],['id_3', 'year2', 71.35]]
temp_year2.takeOrdered(num=3, key=lambda x: -x[2])
[['id_2', 'year2', 80.645],['id_1', 'year2', 76.345],['id_3', 'year2', 71.35]]

RDD上的集合操作

list_1 = ['RIN1', 'RIN2', 'RIN3', 'RIN4', 'RIN5', 'RIN6', 'RIN7']
list_2 = ['RIN3', 'RIN4', 'RIN7', 'RIN8', 'RIN9']
list_3 = ['RIN4', 'RIN8', 'RIN10', 'RIN11', 'RIN12']# parallelize
rdd_1 = spark.sparkContext.parallelize(list_1,2)
rdd_2 = spark.sparkContext.parallelize(list_2,2)
rdd_3 = spark.sparkContext.parallelize(list_3,2)# unionunion_12 = rdd_1.union(rdd_2)
union_12.collect()
['RIN1','RIN2','RIN3','RIN4','RIN5','RIN6','RIN7','RIN3','RIN4','RIN7','RIN8','RIN9']
union_12.union(rdd_3).collect()
['RIN1','RIN2','RIN3','RIN4','RIN5','RIN6','RIN7','RIN3','RIN4','RIN7','RIN8','RIN9','RIN4','RIN8','RIN10','RIN11','RIN12']
# 去重union_123 = union_12.union(rdd_3)union_123.distinct().collect()
['RIN1','RIN10','RIN12','RIN2','RIN3','RIN5','RIN8','RIN4','RIN9','RIN11','RIN6','RIN7']
# rdd_1 - rdd_2
rdd_1.subtract(rdd_2).collect()
['RIN1', 'RIN2', 'RIN5', 'RIN6']
# rdd_1 与 rdd_2的交集rdd_1.intersection(rdd_2).collect()
['RIN3', 'RIN4', 'RIN7']

计算一些统计值

temp = [12,13,15,12,11,12,11]
rdd = spark.sparkContext.parallelize(temp,2)# countrdd.count()
7
# sumrdd.sum()
86
# meanrdd.mean()
12.285714285714286
# var  /Nrdd.variance()
1.63265306122449
# Sample Variance  /(N-1)rdd.sampleVariance()
1.904761904761905
# stdrdd.stdev()
1.2777531299998799
# Sample Standard Deviation  /(N-1)rdd.sampleStdev()
1.3801311186847085
print(rdd.stats())
print(rdd.stats().asDict())
print(rdd.stats().mean())
print(rdd.stats().stdev())
print(rdd.stats().count())
print(rdd.stats().min())
print(rdd.stats().max())
(count: 7, mean: 12.285714285714286, stdev: 1.2777531299998799, max: 15.0, min: 11.0)
{'count': 7, 'mean': 12.285714285714286, 'sum': 86.0, 'min': 11.0, 'max': 15.0, 'stdev': 1.3801311186847085, 'variance': 1.904761904761905}
12.285714285714286
1.2777531299998799
7
11.0
15.0

PySpark-Recipes : RDD对象的基本操作相关推荐

  1. PySpark之RDD基本操作

    PySpark之RDD基本操作 Spark是基于内存的计算引擎,它的计算速度非常快.但是仅仅只涉及到数据的计算,并没有涉及到数据的存储,但是,spark的缺点是:吃内存,不太稳定 总体而言,Spark ...

  2. pyspark:RDD和DataFrame

    作为数据挖掘工程师,以后必不可免要用到并行计算,pyspark是python操作spark的API,本人因此入了坑. 1 pyspark的安装 见我另一篇博客:https://blog.csdn.ne ...

  3. django python3会员中心_python3开发进阶-Django框架的自带认证功能auth模块和User对象的基本操作...

    阅读目录 一.auth模块 from django.contrib import auth django.contrib.auth中提供了许多方法,这里主要介绍其中的三个: authenticate( ...

  4. pyspark中RDD基本操作

    写在前面 系统为ubuntu, spark为pyspark 一. 简单配置和读取txt,并打印 这里我们定义一个任务: 从txt中读取文件,并打印文件的每一行 from pyspark import ...

  5. pyspark操作 rdd dataframe,pyspark.sql.functions详解 行列变换

    官网文档可以参考:https://spark.apache.org/docs/latest/api/python/index.html dataframe读写 生成以逗号分隔的数据 stringCSV ...

  6. pyspark的rdd直接写入mysql

    Google搜索"RDD write into  mysql"前面5页得到:[5][6][7][8][9][10] 我们一个个来分析 [1][2][3]读出的是RDD,写入的是fo ...

  7. dataframe 转rdd java,在pyspark中将RDD转换为Dataframe

    我想在pyspark中将我的RDD转换为Dataframe . 我的RDD: [(['abc', '1,2'], 0), (['def', '4,6,7'], 1)] 我希望RDD以Dataframe ...

  8. python安装成功第三方库但import出问题_为什么会在pyspark在RDD中调用python第三方库失败?...

    问题描述 Hi, 我在公司线上运行pyspark时调用jieba分词, 发现可以成功import, 但是在RDD中调用分词函数时却提示没有 module jieba, 在本地虚拟机时没有这些问题 问题 ...

  9. 服务器json文件怎么创建对象,JavaScript中对JSON对象的基本操作示例

    JSON对象 1.对象的属性:对象的属性是有键值对组成的,其中key为一个字符串,value可以为任何的Javascript对象. //使用[]设置和获取对象的属性 var obj = new Obj ...

  10. javaScript中内置对象Date基本操作入门

    Date基础语法 /*** Date** 1970年1月1日(UTC)起经过的毫秒数.** 语法* new Date();* new Date(value);* new Date(dateString ...

最新文章

  1. 喜讯不断,BCH又迎来两个代币发行方案
  2. .Net Compact Framework 小技巧(1)
  3. libevent 1.4.13 / 源代码文件组织
  4. 《极乐迪斯科》:一趟充满新奇的旅程 ,一款里程碑式的 CRPG
  5. 小白开学Asp.Net Core 《六》 —— 探究.Net Core 跨平台的奥秘
  6. python_fullstack基础(十一)-常用模块
  7. 递归-计算字符串长度(代码、分析、汇编)
  8. html select onchange 失效
  9. inode对接微软服务器,14 普通用户接入-iNode客户端升级
  10. android 视频播放器框架 [饺子播放器](https://github.com/Jzvd/JiaoZiVideoPlayer) 源码解析与评估
  11. 记录几种敏捷开发应用的工具
  12. 小米手机 开启 开发者模式
  13. 批处理创建桌面快捷方式
  14. 教你如何使用Excel中的INT函数
  15. springcloud配置中心config上线时配置文件被其他服务无法读取的两种情况
  16. 【diannaoxitong】word安全模式怎么解除?word安全模式快速解除方法
  17. 连连跨境支付独立站收款,最高90天提现0费率!
  18. 第一行代码Android个人笔记(五)——文件存储
  19. 【装机吧】电脑上网慢的解决方法
  20. 算法教学 _ 决策树算法

热门文章

  1. 用简单 JS 实现代替 MYBATIS LOG PLUGIN 的功能
  2. 微信浪漫告白小程序java_厉害了,微信小程序可以这样表白,还怕他(她)拒绝你?...
  3. vue 获取安卓原生方法_H5-vue与原生Android、ios交互获取相册图片
  4. centos7下yum安装mysql_CentOS7下使用YUM安装MySQL5.6-Go语言中文社区
  5. 10个技巧帮你搞定IE 6
  6. ASP.NET页面生命周期和asp.net应用程序生命周期
  7. Aaron Swartz Rewriting Reddit中关于web.py的创建思路
  8. Jquery cookies 记忆菜单
  9. 2022 基于SpringBoot的高校食堂点评系统 大众点评系统
  10. springboot + mybatis 学英语网、背单词网站