1. Spark的编程流程就是:

将数据加载为RDD(数据输入)

对RDD进行计算(数据计算)

将RDD转换为Python对象(数据输出)

2. 数据输出的方法

将RDD的结果输出为Python对象的各类方法

collect:将RDD内容转换为list

reduce:对RDD内容进行自定义聚合

take:取出RDD的前N个元素组成list返回

count:统计RDD元素个数返回

collect算子:

将RDD各个分区内的数据,统一收集到Drive中,形成一个list对象

reduce算子:

对RDD数据集按照传入的逻辑进行聚合,返回值等同于计算函数的返回

from pyspark import SparkConf, SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 准备RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])# collect算子,输出RDD为list对象
rdd_list: list = rdd.collect()
print(rdd_list)
print(type(rdd_list))
# reduce算子,对RDD进行两两聚合
num = rdd.reduce(lambda a, b: a + b)
print(num)
# take算子,取出RDD前N个元素,组成list返回
take_list = rdd.take(3)
print(take_list)
# count,统计rdd内有多少条数据,返回值为数字
num_count = rdd.count()
print(f"rdd内有{num_count}个元素")sc.stop()

将RDD的内容输出到文件中:

rdd.saveAsTextFile(路径),输出的结果是一个文件夹,有几个分区就输出多少个结果文件

修改RDD分区:

①SparkConf对象设置conf.set("spark.default.parallelism", "1")

②创建RDD的时候,sc.parallelize方法传入numSlices参数为1

from pyspark import SparkConf, SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
os.environ['HADOOP_HOME'] = "D:/dev/hadoop-3.0.0"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")sc = SparkContext(conf=conf)# 准备RDD1
rdd1 = sc.parallelize([1, 2, 3, 4, 5], numSlices=1)# 准备RDD2
rdd2 = sc.parallelize([("Hello", 3), ("Spark", 5), ("Hi", 7)], 1)# 准备RDD3
rdd3 = sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]], 1)# 输出到文件中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")

注意:

调用保存文件的算子,需要配置Hadoop依赖

下载Hadoop安装包:

http://archive.apache.org/dist/hadoop/common/hadoop-3.0.0/hadoop-3.0.0.tar.gz

解压到电脑任意位置

在Python代码中使用os模块配置:os.environ[‘HADOOP_HOME’] = ‘HADOOP解压文件夹路径’

下载winutils.exe,并放入Hadoop解压文件夹的bin目录内:https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/winutils.exe

下载hadoop.dll,并放入:C:/Windows/System32 文件夹内:https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/hadoop.dll 

3. pyspark综合案例

from pyspark import SparkConf, SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
os.environ['HADOOP_HOME'] = "D:/dev/hadoop-3.0.0"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)# 读取文件转换成RDD
file_rdd = sc.textFile("D:/search_log.txt")
# TODO 需求1: 热门搜索时间段Top3(小时精度)
# 1.1 取出全部的时间并转换为小时
# 1.2 转换为(小时, 1) 的二元元组
# 1.3 Key分组聚合Value
# 1.4 排序(降序)
# 1.5 取前3
result1 = file_rdd.map(lambda x: (x.split("\t")[0][:2], 1)).\reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(3)
print("需求1的结果:", result1)# TODO 需求2: 热门搜索词Top3
# 2.1 取出全部的搜索词
# 2.2 (词, 1) 二元元组
# 2.3 分组聚合
# 2.4 排序
# 2.5 Top3
result2 = file_rdd.map(lambda x: (x.split("\t")[2], 1)).\reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(3)
print("需求2的结果:", result2)# TODO 需求3: 统计老婆关键字在什么时段被搜索的最多
# 3.1 过滤内容,只保留关键词
# 3.2 转换为(小时, 1) 的二元元组
# 3.3 Key分组聚合Value
# 3.4 排序(降序)
# 3.5 取前1
result3 = file_rdd.map(lambda x: x.split("\t")).\filter(lambda x: x[2] == '老婆').\map(lambda x: (x[0][:2], 1)).\reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(1)
print("需求3的结果:", result3)# TODO 需求4: 将数据转换为JSON格式,写出到文件中
# 4.1 转换为JSON格式的RDD
# 4.2 写出为文件
file_rdd.map(lambda x: x.split("\t")).\map(lambda x: {"time": x[0], "user_id": x[1], "key_word": x[2], "rank1": x[3], "rank2": x[4], "url": x[5]}).\saveAsTextFile("D:/output_json")

4. 将案例提交到YARN集群中运行

提交命令:

bin/spark-submit --master yarn --num-executors 3 --queue root.teach --executor-cores 4 --executor-memory 4g /home/hadoop/demo.py

from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python'
os.environ['HADOOP_HOME'] = "/export/server/hadoop-3.3.1"
conf = SparkConf().setAppName("spark_cluster")
conf.set("spark.default.parallelism", "24")
sc = SparkContext(conf=conf)# 读取文件转换成RDD
file_rdd = sc.textFile("hdfs://m1:8020/data/search_log.txt")
# TODO 需求1: 热门搜索时间段Top3(小时精度)
# 1.1 取出全部的时间并转换为小时
# 1.2 转换为(小时, 1) 的二元元组
# 1.3 Key分组聚合Value
# 1.4 排序(降序)
# 1.5 取前3
result1 = file_rdd.map(lambda x: (x.split("\t")[0][:2], 1)).\reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(3)
print("需求1的结果:", result1)# TODO 需求2: 热门搜索词Top3
# 2.1 取出全部的搜索词
# 2.2 (词, 1) 二元元组
# 2.3 分组聚合
# 2.4 排序
# 2.5 Top3
result2 = file_rdd.map(lambda x: (x.split("\t")[2], 1)).\reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(3)
print("需求2的结果:", result2)# TODO 需求3: 统计老婆关键字在什么时段被搜索的最多
# 3.1 过滤内容,只保留关键词
# 3.2 转换为(小时, 1) 的二元元组
# 3.3 Key分组聚合Value
# 3.4 排序(降序)
# 3.5 取前1
result3 = file_rdd.map(lambda x: x.split("\t")).\filter(lambda x: x[2] == '老婆').\map(lambda x: (x[0][:2], 1)).\reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(1)
print("需求3的结果:", result3)# TODO 需求4: 将数据转换为JSON格式,写出到文件中
# 4.1 转换为JSON格式的RDD
# 4.2 写出为文件
file_rdd.map(lambda x: x.split("\t")).\map(lambda x: {"time": x[0], "user_id": x[1], "key_word": x[2], "rank1": x[3], "rank2": x[4], "url": x[5]}).\saveAsTextFile("hdfs://m1:8020/output/output_json")

(日常美图时间)

Python---pyspark中的数据输出(collect,reduce,take,count,saveAsTextFile),了解PySpark代码在大数据集群上运行相关推荐

  1. 大数据集群机房搬迁数据迁移

    [一.背景] 按照中心总体计划,目前部署在生产区的运营大数据集群需要搬迁至万国机房. 本次采用的搬迁的方案是通过万国机房的72台物理主机上新建运营大数据集群,老集群应用数据同步至新集群的方式,之后应用 ...

  2. pythonspark集群模式运行_有关python numpy pandas scipy 等 能在YARN集群上 运行PySpark

    有关这个问题,似乎这个在某些时候,用python写好,且spark没有响应的算法支持, 能否能在YARN集群上 运行PySpark方式, 将python分析程序提交上去? Spark Applicat ...

  3. matlab将数据输出到excel中,matlab将数据保存为excel表格-怎样将MATLAB中的数据输出到excel中...

    怎样将MATLAB中的数据输出到excel中? xlswrite()函数可以将matlab中数据保存到excel中,请面例子: >> data = magic(5) % 示例数据 data ...

  4. matlab将数据输出到excel中,matlab数据输出为excel表格-如何把matlab中的数据导到excel表格中...

    如何将matlab工作空间的数据导出到excel 1.很简单的用xlswrite函数就可以了.首先打开matlab,输入你的代码 2.找到你要存放文件的位置复制绝对路径(致谢文件名的话就会存放在当前目 ...

  5. Python数据分析中的数据预处理:数据标准化

    [小白从小学Python.C.Java] [Python全国计算机等级考试] [Python数据分析考试必会题] ● 标题与摘要 Python数据分析中的 数据预处理:数据标准化 ● 选择题 以下关于 ...

  6. 福州传一卓越编程培训第二天2023 05 23 数据库sqlite增删改查,excel中的数据输出为sql语句

    数据库基本概念 ​ 数据库即 存储数据的系统(DBS database sysytem) ​ 数据库系统 ​   数据库的管理系统软件 DBMS manager ​   数据文件 db文件 ​ 存储数 ...

  7. python数据预测代码_手把手教你用Python玩转时序数据,从采样、预测到聚类丨代码...

    原标题:手把手教你用Python玩转时序数据,从采样.预测到聚类丨代码 原作 Arnaud Zinflou 郭一璞 编译 时序数据,也就是时间序列的数据. 像股票价格.每日天气.体重变化这一类,都是时 ...

  8. python自动化部署hadoop集群_大数据集群的自动化运维实现思路

    原标题:大数据集群的自动化运维实现思路 | 作者简介 王晓伟 知数堂<大数据实战就业>课程讲师 六年大数据相关工作经验 清华大学软件工程硕士 曾就职于网易.搜狗等互联网企业 从事大数据及数 ...

  9. 一篇运维老司机的大数据平台监控宝典(1)-联通大数据集群平台监控体系进程详解

    一篇运维老司机的大数据平台监控宝典(1)-联通大数据集群平台监控体系进程详解 "如果你是一个经验丰富的运维开发人员,那么你一定知道ganglia.nagios.zabbix.elastics ...

最新文章

  1. Flutter开发之PageView指示器(31)
  2. C 语言获取系统时间
  3. Android --- build.gradle(Module:app)中各版本号讲解,例如targetSdkVersion
  4. 从Wiesloch火车站到SAP Walldorf总部的交通方式
  5. Integer源码解析
  6. 系统上线日期被老外逼得延期了!
  7. Redis 混合存储最佳实践指南
  8. 26岁创造UNIX的编程大佬,退休后却成为一名飞行员
  9. 什么叫直播秒开?如何实现秒开?
  10. 【PyQt5 知识点示例代码】布局、菜单、信号与槽、对话框、组件
  11. v3 微信api 请求微信_GitHub - helibin/wechatpay-postman-script: 微信支付API v3的调试工具...
  12. 数据结构——循环队列
  13. 在同一局域网连接其他电脑的MySQL数据库
  14. 软工实践第二次作业之个人项目
  15. Android耗电原理及飞书耗电治理
  16. 手写签名转化为电子版
  17. apex英雄机器人探路者怎么玩_Apex英雄探路者玩法技巧攻略
  18. ionic3 教程(一)安装和配置 1
  19. RuntimeWarning: Glyph 19979 missing from current font.
  20. android 自动加微信,Android实现微信自动向附近的人打招呼(AccessibilityService)

热门文章

  1. 有人说,现在创业的机会已经都错过了,现在创业一切都晚了,你怎么看?
  2. 2020第六届美亚杯中国电子数据取证大赛个人资格赛
  3. 拯救“百家讲坛”其实很简单
  4. Sentinel限流及其滑动窗口算法
  5. 【6G 新技术】6G数据面介绍
  6. 让卡巴斯基7.0支持Windows2003
  7. c语言怎么控制输出字符长度,C语言基础之格式化输出控制长度
  8. Linux --------- Linux 目录结构
  9. 【Hive】unsupport subquery expression
  10. io流FileOutputStream输出流的用法