写在前面

系统为ubuntu, spark为pyspark

一. 简单配置和读取txt,并打印

这里我们定义一个任务:

从txt中读取文件,并打印文件的每一行

from pyspark import SparkConf, SparkContext
import os
# 这里配置spark对用的python版本,如果版本不一致就会报错
os.environ["PYSPARK_PYTHON"] = "/home/wgq/anaconda3/envs/spark/bin/python3.7"# 配置单机模式
conf = SparkConf().setMaster('local').setAppName('my_app')
sc = SparkContext(conf=conf)# 文件路径,这个文件里的内容如下
"""
a b c
a c
d a
"""
path = 'file:///home/wgq/learn_spark/test.txt'# 读取文件
data = sc.textFile(path, 2)# 如果是从Python列表里读取
"""
arr = ['a b c', 'a c', 'd a']
# 读取文件
data = sc.parallelize(arr)
"""# 打印每一行
data.foreach(print)"""
输出:
a b c
a c
d a
"""

二. filter操作

定义任务:

从txt中读取文件,并且过滤掉没有c字母的行

from pyspark import SparkConf, SparkContext
import os
# 这里配置spark对用的python版本,如果版本不一致就会报错
os.environ["PYSPARK_PYTHON"] = "/home/wgq/anaconda3/envs/spark/bin/python3.7"# 配置单机模式
conf = SparkConf().setMaster('local').setAppName('my_app')
sc = SparkContext(conf=conf)# 文件路径,这个文件里的内容如下
"""
a b c
a c
d a
"""
path = 'file:///home/wgq/learn_spark/test.txt'# 读取文件
data = sc.textFile(path, 2)
data = data.filter(lambda line: 'c' in line)
# 打印每一行
data.foreach(print)"""
输出:
a b c
a c
"""

三. map操作

定义任务:

将[1, 2, 3, 4, 5]里的数据,奇数取反,偶数+1

from pyspark import SparkConf, SparkContext
import os
# 这里配置spark对用的python版本,如果版本不一致就会报错
os.environ["PYSPARK_PYTHON"] = "/home/wgq/anaconda3/envs/spark/bin/python3.7"# 配置单机模式
conf = SparkConf().setMaster('local').setAppName('my_app')
sc = SparkContext(conf=conf)arr = [1, 2, 3, 4, 5]
data = sc.parallelize(arr)
def change(x):if x % 2 == 1:return -xelse:return x + 1
# 打印每一行
data = data.map(change)
data.foreach(print)
"""
输出:
-1
3
-3
5
-5
"""

四. flatMap操作

将map后的所有元素打平

定义任务:

输出文件中的所有字母

from pyspark import SparkConf, SparkContext
import os
# 这里配置spark对用的python版本,如果版本不一致就会报错
os.environ["PYSPARK_PYTHON"] = "/home/wgq/anaconda3/envs/spark/bin/python3.7"# 配置单机模式
conf = SparkConf().setMaster('local').setAppName('my_app')
sc = SparkContext(conf=conf)arr = ['a b c', 'b c', 'd a']
data = sc.parallelize(arr)# 打印每一行
data = data.flatMap(lambda x: x.split(' '))
data.foreach(print)
"""
输出:
a
b
c
b
c
d
a
"""

五. groupByKey操作

这个操作会对key进行groupby,然后将value存在一个resultIterable里

from pyspark import SparkConf, SparkContext
import os
os.environ["PYSPARK_PYTHON"] = "/home/wgq/anaconda3/envs/spark/bin/python3.7"conf = SparkConf().setMaster('local').setAppName('my_app')
sc = SparkContext(conf=conf)arr = [('a', 1), ('b', 1), ('c', 1),('b', 1), ('c', 1), ('d', 1), ('a', 1)
]
data = sc.parallelize(arr)# 打印每一行
data = data.groupByKey()
data.foreach(print)
"""
输出:
('a', <pyspark.resultiterable.ResultIterable object at 0x7f3c1764f7d0>)
('b', <pyspark.resultiterable.ResultIterable object at 0x7f3c1764f790>)
('c', <pyspark.resultiterable.ResultIterable object at 0x7f3c1764f7d0>)
('d', <pyspark.resultiterable.ResultIterable object at 0x7f3c1764f790>)
"""

六. reduceByKey操作

按照key对value执行reduce

from pyspark import SparkConf, SparkContext
import os
os.environ["PYSPARK_PYTHON"] = "/home/wgq/anaconda3/envs/spark/bin/python3.7"conf = SparkConf().setMaster('local').setAppName('my_app')
sc = SparkContext(conf=conf)arr = [('a', 1), ('b', 1), ('c', 1),('b', 1), ('c', 1), ('d', 1), ('a', 1)
]
data = sc.parallelize(arr)# 打印每一行
data = data.reduceByKey(lambda a, b: a + b)
data.foreach(print)
"""
输出:
('a', 2)
('b', 2)
('c', 2)
('d', 1)
"""

七. 词频统计例子

from pyspark import SparkConf, SparkContext
import os
os.environ["PYSPARK_PYTHON"] = "/home/wgq/anaconda3/envs/spark/bin/python3.7"conf = SparkConf().setMaster('local').setAppName('my_app')
sc = SparkContext(conf=conf)path = 'file:///home/wgq/learn_spark/test.txt'# 读取文件
data = sc.textFile(path)data = data.flatMap(lambda x: x.split(' ')). \map(lambda x: (x, 1)). \reduceByKey(lambda a, b: a + b). \foreach(print)
"""
输出:
('a', 3)
('b', 1)
('c', 2)
('d', 1)
"""

八. 计算平均值例子

定义任务:

每个字母代表一本书,每个键值对表示某一天某本书被借用几次,计算每本书平均每天被借用多少次

from pyspark import SparkConf, SparkContext
import os
os.environ["PYSPARK_PYTHON"] = "/home/wgq/anaconda3/envs/spark/bin/python3.7"conf = SparkConf().setMaster('local').setAppName('my_app')
sc = SparkContext(conf=conf)arr = [('a', 2), ('a', 3), ('a', 7),('b', 1), ('b', 2),('c', 1), ('c', 2), ('c', 3)
]data = sc.parallelize(arr)data = data.map(lambda x: (x[0], (x[1], 1))). \reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])). \mapValues(lambda x: x[0] / x[1]). \foreach(print)
"""
输出:
('a', 4.0)
('b', 1.5)
('c', 2.0)
"""

九. 二次排序

定义任务:

一个元素包含两个数字,按第一个升序,如果第一个相同,第二个降序排列

from pyspark import SparkConf, SparkContext
import os
os.environ["PYSPARK_PYTHON"] = "/home/wgq/anaconda3/envs/spark/bin/python3.7"conf = SparkConf().setMaster('local').setAppName('my_app')
sc = SparkContext(conf=conf)arr = [(1, 2), (1, 1), (1, 7),(3, 3), (3, 1),(2, 1), (2, 2)
]data = sc.parallelize(arr)data = data.sortBy(lambda x: (x[0], -x[1])). \foreach(print)
"""
输入:
(1, 7)
(1, 2)
(1, 1)
(2, 2)
(2, 1)
(3, 3)
(3, 1)
"""

十. 文件排序

任务定义

有多个输入文件,文件中每一行为一个数字,要求读取所有文件,排序,第一个数字为rank值,第二个为整数

注意,读取多个文件以后,需要repartition在一起,否则是每个文件分开计算的

from pyspark import SparkConf, SparkContext
import os
os.environ["PYSPARK_PYTHON"] = "/home/wgq/anaconda3/envs/spark/bin/python3.7"conf = SparkConf().setMaster('local').setAppName('my_app')
sc = SparkContext(conf=conf)path = 'file:///home/wgq/learn_spark/nums*'
data = sc.textFile(path)# arr = [
#     (1, 2), (1, 1), (1, 7),
#     (3, 3), (3, 1),
#     (2, 1), (2, 2)
# ]
idx = -1
def getIndex():global idxidx += 1return idxdata = data.map(lambda x: int(x.strip())). \repartition(1). \sortBy(lambda x: x). \map(lambda x: (getIndex(), x)). \foreach(print)
"""
输出:
(0, 1)
(1, 2)
(2, 4)
(3, 5)
(4, 7)
(5, 9)
(6, 10)
"""

pyspark中RDD基本操作相关推荐

  1. PySpark之RDD基本操作

    PySpark之RDD基本操作 Spark是基于内存的计算引擎,它的计算速度非常快.但是仅仅只涉及到数据的计算,并没有涉及到数据的存储,但是,spark的缺点是:吃内存,不太稳定 总体而言,Spark ...

  2. pyspark rdd 基本操作

    pyspark rdd 基本操作 原文链接 #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on F ...

  3. [大数据]PySpark原理与基本操作

    一 PySpark Spark运行时架构 首先我们先回顾下Spark的基本运行时架构,如下图所示,其中橙色部分表示为JVM,Spark应用程序运行时主要分为Driver和Executor,Driver ...

  4. Spark RDD/Core 编程 API入门系列 之rdd实战(rdd基本操作实战及transformation和action流程图)(源码)(三)...

    本博文的主要内容是: 1.rdd基本操作实战 2.transformation和action流程图 3.典型的transformation和action RDD有3种操作: 1.  Trandform ...

  5. pyspark:RDD和DataFrame

    作为数据挖掘工程师,以后必不可免要用到并行计算,pyspark是python操作spark的API,本人因此入了坑. 1 pyspark的安装 见我另一篇博客:https://blog.csdn.ne ...

  6. Nilearn中的基本操作和查看

    目录 Nilearn简介 Nilearn操作 第一步:查看数据 第二步:平滑操作 第三步:保存结果到文件中 本分享为脑机学习者Rose整理发表于公众号:脑机接口社区 .QQ交流群:903290195 ...

  7. Spark中 RDD之coalesce与repartition区别

    Spark中 RDD之coalesce与repartition区别 coalesce def coalesce(numPartitions: Int, shuffle: Boolean = false ...

  8. pyspark 条件_删除pyspark中特定条件下的特定行

    我是火花的新手 . 我想删除一行使用spark sql.due来删除temptable中的不兼容性到目前为止我已经读过,操作删除像sql查询我需要永久保存pyspark中的表,这是hive表我猜 . ...

  9. ORCAD 原理图中的基本操作

    ORCAD 原理图中的基本操作 ================================================================ ORCAD支持单快捷键操作,这是太方便 ...

最新文章

  1. Mybatis系列:解决foreach标签内list为空的问题
  2. MySQL局域网内访问慢的原因及解决方法
  3. Unity3D面试问题
  4. matlab神经网络(二)-bp神经网络,MATLAB神经网络(2) BP神经网络的非线性系统建模——非线性函数拟合...
  5. pyzabbix 删除触发器_zabbix设置邮件报警, Zabbix常用Key值, zabbix触发器表达式详解, zabbix制作自己模板...
  6. mybatis基础,mybatis核心配置文件properties元素
  7. 打乱一个排好序的 list 对象 alist?
  8. eslint自动检测_GitHub - Noob-Lab/fis3-lint-noob-eslint: ~基于fis3的eslint检测插件
  9. 国内有哪些做得好的企业协同办公软件?这6款你知道吗?
  10. 关于华为设备远程登录telnet和ssh的配置
  11. 数据分析,如何支持管理层决策
  12. Verilog HDL 总结(1)
  13. 价格行为交易策略:锤子十字线,Fakey,内部日烛线
  14. 应聘时要问HR的7个问题
  15. 【id:180】【20分】D. DS二叉树--赫夫曼树解码(不含代码框架)
  16. 基于注入式木马病毒(浏览器绑架)实现及防御方法的研究
  17. 弥勒“甘丝蜜朵”旅游地产策划规划方案
  18. MIPI解决方案 ICN6202:MIPI DSI转LVDS转换芯片
  19. 每个python对象都具有布尔值、下列是true的是_智慧职教云课堂2020Linux云服务管理期末考试搜题公众号答案...
  20. 【Sass】437- 一文详解Sass新特性 - 模块

热门文章

  1. 关于处理公司内网办公系统登录不了的问题
  2. MATLAB中关于patch的用法(涉及vertice,faces等的基础的介绍)
  3. 小游戏之天选之人:通过幸运数字生成双色球号码(随机函数的应用)
  4. c语言中a lt b a b是什么意思,C语言中c=ab?a:b是什么意思
  5. AutoCAD.NET API2018二次开发第十一章
  6. 红外光波长对血氧饱和度的影响
  7. 特斯拉充电电流设置多大_【干货】特斯拉电动汽车4种充电方式详解!
  8. RPA 的优点与缺点
  9. 基于单目视觉的同时定位与地图构建方法综述
  10. html左侧下拉输入选项菜单栏,怎么实现html可输入下拉菜单