[Spark]PySpark入门学习教程---例子RDD与DataFrame
一 例子说明
用spark的RDD与DataFrame两种方式实现如下功能
1.合并主特征与单特征
2.对标签进行过滤
3.标签与特征进行合并
4.输出指定格式最后的数据
二 数据说明
包括三个文件:
标签文件 | driver.txt |
1001|1|1|10 1002|1|0|5 1003|1|0|10 1004|1|0|10 |
主特征文件 | inst.txt |
1001|0:1 1:1 2:1 3:1 1002|0:1 1:1 2:2 1003|0:1 1:1 2:3 |
单特征文件 | feature.txt |
1001|10 |
三 使用RDD方式进行操作
1.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSessionimport sys
import loggingspark = SparkSession.builder.enableHiveSupport().getOrCreate()
sc = spark.sparkContext
2
org_inst_file = "./inst.txt"
label_input_file = "./driver.txt"
subscore_file = "./feature.txt"
3
def read_inst(line):cmid, inst_str = line.strip().split("|")return (cmid, inst_str)
org_inst = sc.textFile(org_inst_file).map(read_inst) # (id, inst_str)
print(org_inst.collect())
4
def read_label(line):contents = line.strip().split("|")cmid = contents[0]label = contents[1]return (cmid, label)def filter_label(line):contents = line.strip().split("|")condition1 = contents[-1]condition2 = contents[-2]return condition1 == "5" and condition2 == "0"label = sc.textFile(label_input_file).filter(filter_label).map(lambda line: read_label(line)) # (cmid, suffix_str)
print(label.collect())
5
def read_subscore(line):cmid, score = line.strip().split("|")return (cmid, score)subscore = sc.textFile(subscore_file).map(read_subscore) # (id, subscore)
print(subscore.collect())
6
subscore_index = "4"
def merge_subscore(values):# (cmid,(inst_str,subscore))inst_str = values[0]subscore = values[1]if subscore is None:return inst_strelse:return " ".join([inst_str, "{}:{}".format(subscore_index, subscore)])
new_inst = org_inst.leftOuterJoin(subscore).mapValues(merge_subscore) #
print(new_inst.collect())
7
def merge_inst_label(data):cmid = data[0]inst_str = data[1][0]label_str = data[1][1]out = label_str + "\t" + inst_str + " #{}".format(cmid)return outinst_with_label = new_inst.join(label).map(merge_inst_label)
print(inst_with_label.collect())
8
inst_with_label.saveAsTextFile("./output_rdd")
四 使用DataFrame方式进行操作
1.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSessionimport sys
import loggingspark = SparkSession.builder.enableHiveSupport().getOrCreate()
2
org_inst_file = "./inst.txt"
label_input_file = "./driver.txt"
subscore_file = "./feature.txt"
3
df_inst = spark.read.format('csv')\.option('delimiter', '|')\.load(org_inst_file)\.toDF('id', 'index_with_feature')
df_inst.show()
df_inst.printSchema()
4
df_subscore = spark.read.format('csv')\.option('delimiter', '|')\.load(subscore_file)\.toDF('id', 'feature')
df_subscore.show()
df_subscore.printSchema()
5
df_merge_feature = df_inst.join(df_subscore, on="id", how="left")
df_merge_feature.show()
6
df_label = spark.read.format('csv')\.option('delimiter', '|')\.load(label_input_file)\.toDF('id', 'label', "condition1", "condition2")
df_label.show()df_label = df_label.filter((df_label['condition1'] == 0) & (df_label['condition2'] == 5))
df_label.show()
7
df_merge = df_merge_feature.join(df_label, on="id", how="inner")
df_merge.show()
8
from pyspark.sql.types import *
from pyspark.sql.functions import udfsubscore_index = "4"
def fc2(a, b):return "{} {}:{}".format(a, subscore_index, b)fc2 = udf(fc2, StringType())
df_merge = df_merge.withColumn('inst_feature', fc2("index_with_feature",'feature'))
df_merge.show()df_merge2 = df_merge[["id", "inst_feature", "label"]]
df_merge2.show()
9
# 写到csv
file="./output_dataframe"
df_merge2.write.csv(path=file, header=False, sep="\t", mode='overwrite')df_merge2.rdd.map(lambda x : str(x[2]) + "\t" + x[1] + " #" +x[0]).saveAsTextFile('./output_dataframe2')
[Spark]PySpark入门学习教程---例子RDD与DataFrame相关推荐
- [Spark]PySpark入门学习教程---介绍(1)
一 安装指引 (91条消息) [Hadoop] mac搭建hadoop3.X 伪分布模式_小墨鱼的专栏-CSDN博客https://zengwenqi.blog.csdn.net/article/de ...
- [Spark]PySpark入门学习教程---RDD介绍(2)
一 RDD pyspark.RDD SparkRDD RDD指的是弹性分布式数据集(Resilient Distributed Dataset),它是spark计算的核心.尽管现在都使用 ...
- java中batch基础_详解Spring batch 入门学习教程(附源码)
详解Spring batch 入门学习教程(附源码) 发布时间:2020-09-08 00:28:40 来源:脚本之家 阅读:99 作者:achuo Spring batch 是一个开源的批处理框架. ...
- 【OpenCV图像处理入门学习教程六】基于Python的网络爬虫与OpenCV扩展库中的人脸识别算法比较
OpenCV图像处理入门学习教程系列,上一篇第五篇:基于背景差分法的视频目标运动侦测 一.网络爬虫简介(Python3) 网络爬虫,大家应该不陌生了.接下来援引一些Jack-Cui在专栏<Pyt ...
- CAD入门学习教程:用CAD软件计算面积时如何转换单位?
在使用CAD建筑软件绘制图纸的过程中,经常会涉及到单位转换的问题,很多刚开始CAD入门学习的小伙伴可能不清楚如何更快速地使用CAD软件来进行CAD面积单位转换,虽然直接使用CAD计算面积无法调整单位, ...
- MyBatis入门学习教程
MyBatis入门学习教程 1. MyBatis 原生使用入门篇 1.1 什么是MyBatis? 1.2 原生使用安装方法 1.3 使用示例 1.3.1 创建一个学习项目 1.3.1 添加依赖 1.3 ...
- MAYA 2022基础入门学习教程
流派:电子学习| MP4 |视频:h264,1280×720 |音频:AAC,48.0 KHz 语言:英语+中英文字幕(根据原英文字幕机译更准确)|大小解压后:3.41 GB |时长:4.5小时 包含 ...
- 3dmax Vray建筑可视化入门学习教程
面向初学者的3Ds Max Vray最佳Archviz可视化课程 从安装到最终图像的一切都将从头开始教授,不需要任何经验 大小解压后:3.25G 时长4h 6m 1280X720 MP4 语言:英语+ ...
- Blender 3.0基础入门学习教程 Introduction to Blender 3.0
成为Blender通才,通过这个基于项目的循序渐进课程学习所有主题的基础知识. 你会学到什么 教程获取:Blender 3.0基础入门学习教程 Introduction to Blender 3.0- ...
最新文章
- RHEL6.3 DNS高级技术二 通过DNS主从区域复制实现DNS View负载均衡和冗余备份
- jQuery基础---filter()和find()
- 一篇很全面的freemarker 前端web教程
- rank() over,dense_rank(),row_number() 的区别
- Leetcode题库191.位1的个数(C实现)
- esri geometry-api-java的maven创建
- 2015-03-17 current note creation logic in my task
- caffe caffe.cpp 程序入口分析
- 深度学习中的信息论——交叉熵
- 震惊!原来leetcode竟然真的能中奖?
- 一文看尽Google I/O大会:AI打电话以假乱真,TPU 3.0正式发布
- (转载)python re模块详解 正则表达式
- Python使用datetime模块进行简单的日期换算与计算
- 代码开源许可证 区别 Apache MIT GPL
- Activity的几种启动模式介绍
- Midnight Commander强大的命令行文件管理器
- 【U8+】去掉填制凭证界面金额中的网格
- android页面监听扫描枪,Android监听扫描枪内容(二)
- VB对象的事件和方法
- 斯坦福大学自然语言处理第四课 语言模型(Language Modeling)笔记
热门文章
- 重庆大学计算机学院就读,唐远炎(计算机学院)老师 - 重庆大学 - 院校大全
- ffmpeg编译gb28181_国标GB28181协议视频推流平台EasyGBD在Linux下编译报“UINT64_C在此作用领域中尚未声明”错误...
- 教你用 Newprep 一键封装工具 封装XP克隆系统- 视频教程
- 大数据学习路线copy自淘宝
- SSH-Struts第四弹:Struts2学习过程中遇到的问题
- 转:MVC3系列:~Html.BeginForm与Ajax.BeginForm
- 深入探究VC —— 编译器cl.exe(1)
- 进一步:BSD信号和异常同时捕获
- 了解恶意软件和插件!
- 测试mysql连接服务器_实现服务器与数据库的连接