一 例子说明

用spark的RDD与DataFrame两种方式实现如下功能

1.合并主特征与单特征

2.对标签进行过滤

3.标签与特征进行合并

4.输出指定格式最后的数据

二 数据说明

包括三个文件:

标签文件 driver.txt 1001|1|1|10
1002|1|0|5
1003|1|0|10
1004|1|0|10
主特征文件 inst.txt 1001|0:1 1:1 2:1 3:1
1002|0:1 1:1 2:2
1003|0:1 1:1 2:3
单特征文件 feature.txt

1001|10
1002|11
1003|12
1004|13

三 使用RDD方式进行操作

1.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSessionimport sys
import loggingspark = SparkSession.builder.enableHiveSupport().getOrCreate()
sc = spark.sparkContext

2

org_inst_file = "./inst.txt"
label_input_file = "./driver.txt"
subscore_file = "./feature.txt"

3

def read_inst(line):cmid, inst_str = line.strip().split("|")return (cmid, inst_str)
org_inst = sc.textFile(org_inst_file).map(read_inst)  # (id, inst_str)
print(org_inst.collect())

4

def read_label(line):contents = line.strip().split("|")cmid = contents[0]label = contents[1]return (cmid, label)def filter_label(line):contents = line.strip().split("|")condition1 = contents[-1]condition2 = contents[-2]return condition1 == "5" and condition2 == "0"label = sc.textFile(label_input_file).filter(filter_label).map(lambda line: read_label(line))  # (cmid, suffix_str)
print(label.collect())

5

def read_subscore(line):cmid, score = line.strip().split("|")return (cmid, score)subscore = sc.textFile(subscore_file).map(read_subscore)  # (id, subscore)
print(subscore.collect())

6

subscore_index = "4"
def merge_subscore(values):# (cmid,(inst_str,subscore))inst_str = values[0]subscore = values[1]if subscore is None:return inst_strelse:return " ".join([inst_str, "{}:{}".format(subscore_index, subscore)])
new_inst = org_inst.leftOuterJoin(subscore).mapValues(merge_subscore)  #
print(new_inst.collect())

7

def merge_inst_label(data):cmid = data[0]inst_str = data[1][0]label_str = data[1][1]out = label_str + "\t" + inst_str + " #{}".format(cmid)return outinst_with_label = new_inst.join(label).map(merge_inst_label)
print(inst_with_label.collect())

8

inst_with_label.saveAsTextFile("./output_rdd")

四 使用DataFrame方式进行操作

1.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSessionimport sys
import loggingspark = SparkSession.builder.enableHiveSupport().getOrCreate()

2

org_inst_file = "./inst.txt"
label_input_file = "./driver.txt"
subscore_file = "./feature.txt"

3

df_inst = spark.read.format('csv')\.option('delimiter', '|')\.load(org_inst_file)\.toDF('id', 'index_with_feature')
df_inst.show()
df_inst.printSchema()

4

df_subscore = spark.read.format('csv')\.option('delimiter', '|')\.load(subscore_file)\.toDF('id', 'feature')
df_subscore.show()
df_subscore.printSchema()

5

df_merge_feature = df_inst.join(df_subscore, on="id", how="left")
df_merge_feature.show()

6

df_label = spark.read.format('csv')\.option('delimiter', '|')\.load(label_input_file)\.toDF('id', 'label', "condition1", "condition2")
df_label.show()df_label = df_label.filter((df_label['condition1'] == 0) & (df_label['condition2'] == 5))
df_label.show()

7

df_merge = df_merge_feature.join(df_label, on="id", how="inner")
df_merge.show()

8

from pyspark.sql.types import *
from pyspark.sql.functions import udfsubscore_index = "4"
def fc2(a, b):return "{} {}:{}".format(a, subscore_index, b)fc2 = udf(fc2, StringType())
df_merge = df_merge.withColumn('inst_feature', fc2("index_with_feature",'feature'))
df_merge.show()df_merge2 = df_merge[["id", "inst_feature", "label"]]
df_merge2.show()

9

# 写到csv
file="./output_dataframe"
df_merge2.write.csv(path=file, header=False, sep="\t", mode='overwrite')df_merge2.rdd.map(lambda x : str(x[2]) + "\t" + x[1] + " #" +x[0]).saveAsTextFile('./output_dataframe2')

[Spark]PySpark入门学习教程---例子RDD与DataFrame相关推荐

  1. [Spark]PySpark入门学习教程---介绍(1)

    一 安装指引 (91条消息) [Hadoop] mac搭建hadoop3.X 伪分布模式_小墨鱼的专栏-CSDN博客https://zengwenqi.blog.csdn.net/article/de ...

  2. [Spark]PySpark入门学习教程---RDD介绍(2)

    一 RDD pyspark.RDD        SparkRDD RDD指的是弹性分布式数据集(Resilient Distributed Dataset),它是spark计算的核心.尽管现在都使用 ...

  3. java中batch基础_详解Spring batch 入门学习教程(附源码)

    详解Spring batch 入门学习教程(附源码) 发布时间:2020-09-08 00:28:40 来源:脚本之家 阅读:99 作者:achuo Spring batch 是一个开源的批处理框架. ...

  4. 【OpenCV图像处理入门学习教程六】基于Python的网络爬虫与OpenCV扩展库中的人脸识别算法比较

    OpenCV图像处理入门学习教程系列,上一篇第五篇:基于背景差分法的视频目标运动侦测 一.网络爬虫简介(Python3) 网络爬虫,大家应该不陌生了.接下来援引一些Jack-Cui在专栏<Pyt ...

  5. CAD入门学习教程:用CAD软件计算面积时如何转换单位?

    在使用CAD建筑软件绘制图纸的过程中,经常会涉及到单位转换的问题,很多刚开始CAD入门学习的小伙伴可能不清楚如何更快速地使用CAD软件来进行CAD面积单位转换,虽然直接使用CAD计算面积无法调整单位, ...

  6. MyBatis入门学习教程

    MyBatis入门学习教程 1. MyBatis 原生使用入门篇 1.1 什么是MyBatis? 1.2 原生使用安装方法 1.3 使用示例 1.3.1 创建一个学习项目 1.3.1 添加依赖 1.3 ...

  7. MAYA 2022基础入门学习教程

    流派:电子学习| MP4 |视频:h264,1280×720 |音频:AAC,48.0 KHz 语言:英语+中英文字幕(根据原英文字幕机译更准确)|大小解压后:3.41 GB |时长:4.5小时 包含 ...

  8. 3dmax Vray建筑可视化入门学习教程

    面向初学者的3Ds Max Vray最佳Archviz可视化课程 从安装到最终图像的一切都将从头开始教授,不需要任何经验 大小解压后:3.25G 时长4h 6m 1280X720 MP4 语言:英语+ ...

  9. Blender 3.0基础入门学习教程 Introduction to Blender 3.0

    成为Blender通才,通过这个基于项目的循序渐进课程学习所有主题的基础知识. 你会学到什么 教程获取:Blender 3.0基础入门学习教程 Introduction to Blender 3.0- ...

最新文章

  1. RHEL6.3 DNS高级技术二 通过DNS主从区域复制实现DNS View负载均衡和冗余备份
  2. jQuery基础---filter()和find()
  3. 一篇很全面的freemarker 前端web教程
  4. rank() over,dense_rank(),row_number() 的区别
  5. Leetcode题库191.位1的个数(C实现)
  6. esri geometry-api-java的maven创建
  7. 2015-03-17 current note creation logic in my task
  8. caffe caffe.cpp 程序入口分析
  9. 深度学习中的信息论——交叉熵
  10. 震惊!原来leetcode竟然真的能中奖?
  11. 一文看尽Google I/O大会:AI打电话以假乱真,TPU 3.0正式发布
  12. (转载)python re模块详解 正则表达式
  13. Python使用datetime模块进行简单的日期换算与计算
  14. 代码开源许可证 区别 Apache MIT GPL
  15. Activity的几种启动模式介绍
  16. Midnight Commander强大的命令行文件管理器
  17. 【U8+】去掉填制凭证界面金额中的网格
  18. android页面监听扫描枪,Android监听扫描枪内容(二)
  19. VB对象的事件和方法
  20. 斯坦福大学自然语言处理第四课 语言模型(Language Modeling)笔记

热门文章

  1. 重庆大学计算机学院就读,唐远炎(计算机学院)老师 - 重庆大学 - 院校大全
  2. ffmpeg编译gb28181_国标GB28181协议视频推流平台EasyGBD在Linux下编译报“UINT64_C在此作用领域中尚未声明”错误...
  3. 教你用 Newprep 一键封装工具 封装XP克隆系统- 视频教程
  4. 大数据学习路线copy自淘宝
  5. SSH-Struts第四弹:Struts2学习过程中遇到的问题
  6. 转:MVC3系列:~Html.BeginForm与Ajax.BeginForm
  7. 深入探究VC —— 编译器cl.exe(1)
  8. 进一步:BSD信号和异常同时捕获
  9. 了解恶意软件和插件!
  10. 测试mysql连接服务器_实现服务器与数据库的连接