spark读取csv转化为rdd(python+scala+java三种代码汇总)
--------------------------------------------------------------------基本信息----------------------------------------------------------
编程语言 | 运行方式 | sc.textFile默认路径 |
Python | pyspark | hdfs:// |
Scala | spark-shell | hdfs:// |
Java | Intellij | file:// |
这里的默认路径的意思是说:
如果你只写了sc.textFile("/xxx")
那么就会默认是本地路径下面的根目录或者是hdfs下面的根目录。
所以最好是都统一写成sc.textFile("/hdfs://xxx")
------------------------------------------------------------------------------------Pyspark[1]--------------------------------------------------------------
>>> import pandas as pd
>>> from pyspark.sql import SparkSession
>>> from pyspark import SparkContext
>>> from pyspark.sql import SQLContext
>>> from pyspark.sql.types import *
>>> lines = sc.textFile("/rdd1.csv")
>>> header = lines.first()
>>> lines = lines.filter(lambda row:row != header)
>>> lines
PythonRDD[13] at RDD at PythonRDD.scala:53
>>> lines.collect()
['002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello']
>>> exit()
这里注意哈,这里的路径仅仅适用于local模式,如果是集群模式,必须是传递到hdfs上去。[2]
------------------------------------------------------Scala----------------------------------------------------------------------------------------------
scala> val lines = sc.textFile("/rdd1.csv")
lines: org.apache.spark.rdd.RDD[String] = /rdd1.csv MapPartitionsRDD[1] at textFile at <console>:24
scala> var header = lines.first()
header: String = 001,hello
scala> val lines2=lines.filter(_!=header)
lines2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:27
scala> lines2.collect()
res0: Array[String] = Array(002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,he...
scala> :q
------------------------------------------------------Java----------------------------------------------------------------------------------------
src/main/java/javaread.java如下:
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;
import java.util.*;
import org.apache.spark.SparkConf;import javax.swing.*;
import java.util.Iterator;
import java.util.Random;public class javaread {public static void main(String[] args){SparkConf conf = new SparkConf().setMaster("spark://Desktop:7077").setJars(new String[]{"/home/appleyuchi/桌面/spark_success/Spark数据倾斜处理/Java/sampling_salting/target/sampling-salting-1.0-SNAPSHOT.jar"}).setAppName("TestSpark");JavaSparkContext sc = new JavaSparkContext(conf);sc.setLogLevel("WARN");final JavaRDD<String>lines = sc.textFile("hdfs://Desktop:9000/rdd1.csv");final String header= lines.first();JavaRDD<String> lines2 = (JavaRDD<String>) lines.filter(new Function<String, Boolean>(){private static final long serialVersionUID = 1L;@Overridepublic Boolean call(String v1) throws Exception{return !v1.equals(header);}});// System.out.println(lines2.foreach(print));System.out.println(lines2.collect());}}pom.xml如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>sampling-salting</groupId><artifactId>sampling-salting</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.0.0</version></dependency></dependencies></project>
上述java代码是连接的真实集群,不是local模式,所以运行步骤和local模式不太一样:
①mvn package
②Ctrl+Shift+F10
--------------------------------------------------------------------------------------------------------------------------------------------------------
Reference:
[1]pyspark学习系列(二)读取CSV文件 为RDD或者DataFrame进行数据处理
[2]spark集群模式下textFile读取file本地文件报错解决
spark读取csv转化为rdd(python+scala+java三种代码汇总)相关推荐
- Spark 读取CSV文件为RDD
Spark 读取CSV文件为RDD 1 准备数据 在开始之前,假设我们在文件夹"c:/tmp/files"中有以下带有逗号分隔文件内容的 CSV 文件名,我使用这些文件来演示示例. ...
- Spark 读取csv文件quote配置无效
在进行数据清洗时,使用spark 读取csv文件时,遭遇到数据列中存在 \n的字符 原始数据: names "小红\n小明" 解析后数据: index names 1 小红 2 小 ...
- Python二叉树的三种深度优先遍历
Python二叉树的三种深度优先遍历 一.广度优先遍历和深度优先遍历 对二叉树进行遍历(traversal)是指依次对树中每个节点进行访问,在遍历的过程中实现需要的业务. 对树的遍历方式有广度优先遍历 ...
- 执行 Python 程序的三种方式及Python 的 IDE —— `PyCharm`
执行 Python 程序的三种方式 3.1. 解释器 python / python3 Python 的解释器 # 使用 python 2.x 解释器 $ python xxx.py# 使用 pyth ...
- python和c++哪个好-Scratch和Python与C++三种编程语言选哪个好
Scratch和Python与C++三种编程语言选哪个好? 目前少儿编程培训机构主要提供的有三种主流课程,分别是Scratch.Python和C++,面对这三种课程家长该如何选择呢,到底哪种课程适合孩 ...
- python定时爬虫三种方法
python定时爬虫三种方法 第一种 import timefrom scrapy import cmdlinedef doSth():# 把爬虫程序放在这个类里 zhilian_spider 是爬虫 ...
- python读取csv文件的方法-python读取csv文件指定行的2种方法详解
csv是Comma-Separated Values的缩写,是用文本文件形式储存的表格数据,比如如下的表格 就可以存储为csv文件,文件内容是: No.,Name,Age,Score 1,Apple, ...
- pandas读取csv写入mysql_使用python的pandas库读取csv文件保存至mysql数据库
第一:pandas.read_csv读取本地csv文件为数据框形式 data=pd.read_csv('G:\data_operation\python_book\chapter5\\sales.cs ...
- python怎么读取csv的一部分数据_python批量读取csv文件 如何用python将csv文件中的数据读取成数组...
如何用python把多个csv文件数据处理后汇总到新csv文件你看这月光多温柔,小编转头还能看见你,一切从未坍塌. 可以用pandas读取数据,首先把文件方同一个文件价里,然后对当前文件价的所有内容循 ...
最新文章
- 输入一个链表,反转链表后,输出新链表的表头(ACM格式)(美团面试题)
- 上海计算机应用基础自考上机,上海2010年自考计算机应用基础上机大纲
- Linux - chmod
- 【网络知识点】防火墙主备冗余技术
- I2C总线串行串行输入输出结构
- 嵌入式gdbserver远程调试【原创】
- can总线配置读入是什么意思_Simulink(常量amp;总线amp;示波器模块)+嵌入式(AURIX入门学习记录CAN通信配置)...
- 获得驱动器信息卷设备Ring3得到磁盘文件系统(NTFS WIN10)
- LeetCode 71. Simplify Path
- [Algorithm] Fibonacci Sequence - Anatomy of recursion and space complexity analysis
- SQL教程及学习 我选择了《SQL必知必会》
- springboot开源热门项目-bootdo修改支持多数据源
- html选择地区代码,jQuery中国区域选择器插件
- 权限提升-烂土豆dll劫持引号路径服务权限
- canvas将两张图片合并成一张图片并下载
- 安徽赛区-云巡未来-第十一届全国大学生电子商务“创新、创意及创业”挑战赛 赛后总结
- 玩转黑群晖(持续更新)
- 【Git】fatal Not a git repository or any of the parent direc
- 如何使用CC攻击中小型网站?
- python启动Android模拟器,从Python-Django启动Android模拟器