除了可以对本地文件系统进行读写以外,Spark还支持很多其他常见的文件格式(如文本文件、JSON、SequenceFile等)和文件系统(如HDFS、Amazon S3等)和数据库(如MySQL、HBase、Hive等)。数据库的读写我们将在Spark SQL部分介绍,因此,这里只介绍文件系统的读写和不同文件格式的读写。

请进入Linux系统,打开“终端”,进入Shell命令提示符状态,然后,在“/usr/local/spark/mycode”目录下,新建一个wordcount子目录(如果已经存在就不用创建),并在“/usr/local/spark/mycode/wordcount”目录下新建一个包含了一些语句的文本文件word.txt(你可以在文本文件中随意输入一些单词,用空格隔开)。

首先,请登录Linux系统(要注意记住登录采用的用户名,本教程统一采用hadoop用户名进行登录),打开“终端”(可以在Linux系统中使用Ctrl+Alt+T组合键开启终端),进入shell命令提示符状态,然后执行以下命令进入pyspark:

cd /usr/local/spark

./bin/pyspark

....#这里省略启动过程显示的一大堆信息

>>>

启动进入pyspark需要一点时间,在进入pyspark后,我们可能还需要到Linux文件系统中对相关目录下的文件进行编辑和操作(比如要查看spark程序执行过程生成的文件),这个无法在pyspark中完成,因此,这里再打开第二个终端,用来在Linux系统的Shell命令提示符下操作。

文件系统的数据读写

下面分别介绍本地文件系统的数据读写和分布式文件系统HDFS的数据读写。

本地文件系统的数据读写

首先,请在第二个终端窗口下操作,用下面命令到达“/usr/local/spark/mycode/wordcount”目录,查看一下上面已经建好的word.txt的内容:

cd /usr/local/spark/mycode/wordcount

cat word.txt

cat命令会把word.txt文件的内容全部显示到屏幕上。

现有让我们切换回到第一个终端,也就是spark-shell,然后输入下面命令:

>>> textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")

上面代码中,sc.textFile()中的这个textFile是sc的一个方法名称,这个方法用来加载文件数据。这两个textFile不是一个东西,不要混淆。实际上,val后面的是变量textFile,你完全可以换个变量名称,比如,val lines = sc.textFile(“file:///usr/local/spark/mycode/wordcount/word.txt”)。这里使用相同名称,就是有意强调二者的区别。

注意,要加载本地文件,必须采用“file:///”开头的这种格式。执行上上面这条命令以后,并不会马上显示结果,因为,Spark采用惰性机制,只有遇到“行动”类型的操作,才会从头到尾执行所有操作。所以,下面我们执行一条“行动”类型的语句,就可以看到结果:

>>> textFile.first()

first()是一个“行动”(Action)类型的操作,会启动真正的计算过程,从文件中加载数据到变量textFile中,并取出第一行文本。屏幕上会显示很多反馈信息,这里不再给出,你可以从这些结果信息中,找到word.txt文件中的第一行的内容。

正因为Spark采用了惰性机制,在执行转换操作的时候,即使我们输入了错误的语句,pyspark也不会马上报错,而是等到执行“行动”类型的语句时启动真正的计算,那个时候“转换”操作语句中的错误就会显示出来,比如:

textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word123.txt")

上面我们使用了一个根本就不存在的word123.txt,执行上面语句时,pyspark根本不会报错,因为,没有遇到“行动”类型的first()操作之前,这个加载操作时不会真正执行的。然后,我们执行一个“行动”类型的操作first(),如下:

>>> textFile.first()

执行上面语句后,你会发现,会返回错误信息,其中有四个醒目的中文文字“拒绝连接”,因为,这个word123.txt文件根本就不存在。

好了,现在我们可以练习一下如何把textFile变量中的内容再次写回到另外一个文本文件wordback.txt中:

>>> textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")

>>> textFile.saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback.txt")

saveAsTextFile()是一个“行动”(Action)类型的操作,所以,马上会执行真正的计算过程,从word.txt中加载数据到变量textFile中,然后,又把textFile中的数据写回到writeback.txt中。现在我们到/usr/local/spark/mycode/wordcount/目录看一下,会发现,确实多了一个writeback.txt,但是,和我们预期的不一样,它不是一个文件,而是一个文件夹(writeback.txt作为文件夹名称当然是没有问题的,虽然不符合我们平时的习惯)。现在让我们切换到Linux Shell命令提示符窗口中,执行下面命令:

cd /usr/local/spark/mycode/wordcount/writeback.txt/

ls

执行结果中可以看到,writeback.txt这个目录下面包含两个文件:

part-00000

_SUCCESS

也就是说,该目录下包含两个文件,我们可以使用cat命令查看一下part-00000文件(注意:part-后面是五个零):

cat part-00000

显示结果,是和上面word.txt中的内容一样的。

现在的问题是,我们如果想再次把数据加载在RDD中,应该使用哪个文件呢?答案很简单,只要使用writeback.txt这个目录即可,如下:

>>> textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/writeback.txt")

分布式文件系统HDFS的数据读写

为了能够读取HDFS中的文件,请首先启动Hadoop中的HDFS组件。注意,之前我们在“Spark安装”这章内容已经介绍了如何安装Hadoop和Spark,所以,这里我们可以使用以下命令直接启动Hadoop中的HDFS组件(由于用不到MapReduce组件,所以,不需要启动MapReduce或者YARN)。请到第二个终端窗口,使用Linux Shell命令提示符状态,然后输入下面命令:

cd /usr/local/hadoop

./sbin/start-dfs.sh

启动结束后,HDFS开始进入可用状态。如果你在HDFS文件系统中,还没有为当前Linux登录用户创建目录(本教程统一使用用户名hadoop登录Linux系统),请使用下面命令创建:

./bin/hdfs dfs -mkdir -p /user/hadoop

也就是说,HDFS文件系统为Linux登录用户开辟的默认目录是“/user/用户名”(注意:是user,不是usr),本教程统一使用用户名hadoop登录Linux系统,所以,上面创建了“/user/hadoop”目录,再次强调,这个目录是在HDFS文件系统中,不在本地文件系统中。创建好以后,下面我们使用命令查看一下HDFS文件系统中的目录和文件:

./bin/hdfs dfs -ls .

上面命令中,最后一个点号“.”,表示要查看Linux当前登录用户hadoop在HDFS文件系统中与hadoop对应的目录下的文件,也就是查看HDFS文件系统中“/user/hadoop/”目录下的文件,所以,下面两条命令是等价的:

./bin/hdfs dfs -ls .

./bin/hdfs dfs -ls /user/hadoop

如果要查看HDFS文件系统根目录下的内容,需要使用下面命令:

./bin/hdfs dfs -ls /

下面,我们把本地文件系统中的“/usr/local/spark/mycode/wordcount/word.txt”上传到分布式文件系统HDFS中(放到hadoop用户目录下):

./bin/hdfs dfs -put /usr/local/spark/mycode/wordcount/word.txt .

然后,用命令查看一下HDFS的hadoop用户目录下是否多了word.txt文件,可以使用下面命令列出hadoop目录下的内容:

./bin/hdfs dfs -ls .

可以看到,确实多了一个word.txt文件,我们使用cat命令查看一个HDFS中的word.txt文件的内容,命令如下:

./bin/hdfs dfs -cat ./word.txt

上面命令执行后,就会看到HDFS中word.txt的内容了。

现在,让我们切换回到pyspark窗口,编写语句从HDFS中加载word.txt文件,并显示第一行文本内容:

>>> val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")

>>> textFile.first()

执行上面语句后,就可以看到HDFS文件系统中(不是本地文件系统)的word.txt的第一行内容了。

需要注意的是,sc.textFile(“hdfs://localhost:9000/user/hadoop/word.txt”)中,“hdfs://localhost:9000/”是前面介绍Hadoop安装内容时确定下来的端口地址9000。实际上,也可以省略不写,如下三条语句都是等价的:

>>> val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")

>>> val textFile = sc.textFile("/user/hadoop/word.txt")

>>> val textFile = sc.textFile("word.txt")

下面,我们再把textFile的内容写回到HDFS文件系统中(写到hadoop用户目录下):

>>> val textFile = sc.textFile("word.txt")

>>> textFile.saveAsTextFile("writeback.txt")

执行上面命令后,文本内容会被写入到HDFS文件系统的“/user/hadoop/writeback.txt”目录下,我们可以切换到Linux Shell命令提示符窗口查看一下:

./bin/hdfs dfs -ls .

执行上述命令后,在执行结果中,可以看到有个writeback.txt目录,下面我们查看该目录下有什么文件:

./bin/hdfs dfs -ls ./writeback.txt

执行结果中,可以看到存在两个文件:part-00000和_SUCCESS。我们使用下面命令输出part-00000文件的内容(注意:part-00000里面有五个零):

./bin/hdfs dfs -cat ./writeback.txt/part-00000

执行结果中,就可以看到和word.txt文件中一样的文本内容。

当需要再次把writeback.txt中的内容加载到RDD中时,只需要加载writeback.txt目录即可,不需要使用part-00000文件,如下所示:

>>> textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/writeback.txt")

不同文件格式的读写

文本文件

实际上,我们在上面演示的都是文本文件的读写,因此,这里不再赘述,只是简单再总结一下。

把本地文件系统中的文本文件加载到RDD中的语句如下:

>>> rdd = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")

当我们给textFile()函数传递一个“包含完整路径的文件名”时,就会把这个文件加载到RDD中。如果我们给textFile()函数传递的不是文件名,而是一个目录,则该目录下的所有文件内容都会被读取到RDD中。

关于把RDD中的数据保存到文本文件,可以采用如下语句:

>>> rdd.saveAsTextFile("file:///usr/local/spark/mycode/wordcount/outputFile")

正像上面我们已经介绍的那样,我们在saveAsTextFile()函数的参数中给出的是目录,不是文件名,RDD中的数据会被保存到给定的目录下。

JSON

JSON(JavaScript Object Notation) 是一种轻量级的数据交换格式。

Spark提供了一个JSON样例数据文件,存放在“/usr/local/spark/examples/src/main/resources/people.json”中。people.json文件的内容如下:

{"name":"Michael"}

{"name":"Andy", "age":30}

{"name":"Justin", "age":19}

我们下面可以对这个样例数据文件进行解析。

下面请在Linux系统的Shell命令提示符下操作,请进入“/usr/local/spark/mycode”目录,并新建一个json子目录,代码如下:

cd /usr/local/spark/mycode

mkdir json

cd json

在编写解析程序之前,我们首先来看一下把本地文件系统中的people.json文件加载到RDD中以后,数据是什么形式,请在spark-shell中执行如下操作:

>>> jsonStr = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.json")

>>> jsonStr.foreach(print)

{"name":"Michael"}

{"name":"Andy", "age":30}

{"name":"Justin", "age":19}

从上面执行结果可以看出,people.json文件加载到RDD中以后,在RDD中存在三个字符串。我们下面要做的事情,就是把这三个JSON格式的字符串解析出来,比如说,第一个字符串{“name”:”Michael”},经过解析后,解析得到key是”name”,value是”Michael”。

现在我们编写程序完成对上面字符串的解析工作。

Scala中有一个自带的JSON库——scala.util.parsing.json.JSON,可以实现对JSON数据的解析。JSON.parseFull(jsonString:String)函数,以一个JSON字符串作为输入并进行解析,如果解析成功则返回一个Some(map: Map[String, Any]),如果解析失败则返回None。

因此,我们可以使用模式匹配来处理解析结果

请执行以下命令:

cd /usr/local/spark/mycode/json

vim testjson.py

在testjson.py代码文件中输入以下内容:

from pyspark import SparkContext

import json

sc = SparkContext('local','JSONAPP')

inputFile = "file:///home/dblab/people.json"

jsonStrs = sc.textFile(inputFile)

result = jsonStrs.map(lambda s : json.loads(s))

result.foreach(print)

接下请执行如下代码:

python3 testjson.py

执行后可以在屏幕上的大量输出信息中找到如下结果:

执行后可以在屏幕上的大量输出信息中找到如下结果:

python写spark_Spark2.1.0+入门:文件数据读写(Python版)相关推荐

  1. python写自动化运行脚本_bat文件一键运行python自动化脚本

    ThinkPHP3.2.3 安装教程 本文以  Windows  平台为例 安装前准备:Windows操作系统的电脑,php编程环境(配置好了Apache.MySql.php).推荐wampserve ...

  2. python写入数据的一种措施_Python 文件数据读写的具体实现

    文件数据读写 读写文件,本质上是请求操作系统打开一个文件对象,然后,通过操作系统提供的接口从这个文件对象中读取数据(读文件),或者把数据写入这个文件对象(写文件). 文件读取 使用 Python 内置 ...

  3. python是一种面向对象的高级语言_为什么入门大数据选择Python而不是Java?

    马云说:"未来最大的资源就是数据,不参与大数据十年后一定会后悔."毕竟出自wuli马大大之口,今年二月份我开始了学习大数据的道路,直到现在对大数据的学习脉络和方法也渐渐清晰.今天我 ...

  4. python怎么导入csv文件数据-机器学习Python实践——数据导入(CSV)

    一,CSV 逗号分隔值(逗号分隔值,CSV,有时也称为字符分隔值,因为分隔字符也可以不是逗号),其文件以纯文本形式存储表格数据(数字和文本).字幕:纯意味着该文件的英文一个字符序列,不含必须像二进制数 ...

  5. Pytorch1.1.0 入门 自定义op(python)

    因为需求,需要调研tensorRT与ONNX关于自定义层的方法.经过之前的调研,首先,关于onnx,开发者手册中的介绍有限,在已知的demo中没有关于onnx自定义层的,详情见TensorRT 5.1 ...

  6. 为什么都建议学java而不是python-为什么入门大数据选择Python而不是Java?

    马云说:"未来最大的资源就是数据,不参与大数据十年后一定会后悔."毕竟出自wuli马大大之口,今年二月份我开始了学习大数据的道路,直到现在对大数据的学习脉络和方法也渐渐清晰.今天我 ...

  7. python和java区别大吗-为什么入门大数据选择Python而不是Java?

    马云说:"未来最大的资源就是数据,不参与大数据十年后一定会后悔."毕竟出自wuli马大大之口,今年二月份我开始了学习大数据的道路,直到现在对大数据的学习脉络和方法也渐渐清晰.今天我 ...

  8. 翰林学院python_为什么入门大数据选择Python而不是Java?

    马云说:"未来最大的资源就是数据,不参与大数据十年后一定会后悔."毕竟出自wuli马大大之口,今年二月份我开始了学习大数据的道路,直到现在对大数据的学习脉络和方法也渐渐清晰.今天我 ...

  9. python读取c盘中的csv文件-python读取当前目录下的CSV文件数据

    在处理数据的时候,经常会碰到CSV类型的文件,下面将介绍如何读取当前目录下的CSV文件,步骤如下 1.获取当前目录所有的CSV文件名称: #创建一个空列表,存储当前目录下的CSV文件全称 file_n ...

最新文章

  1. 连接linux系统软件,Windows系统上有哪些不错的终端软件可以远程连接Linux?
  2. Nginx容器日志收集方案fluentd+elasticsearch+kilbana
  3. 【leetcode】Jump Game I, II 跳跃游戏一和二
  4. mysql安装图形化管理界面phpMyAdmin
  5. 不用中间变量交换a和b的值?
  6. 【问链财经-区块链基础知识系列】 第二十一课 区块链应用于大宗商品供应链金融
  7. VTK:地形抽取用法实战
  8. 外挂学习之路(12)--- 用CE搜索字符串和搜索字符数组的区别
  9. ffmpeg for iOS,并调试iFrameExtractor demo
  10. Linux Tomcat8 启动堆内存溢出
  11. Hadoop不适合哪些场景 哪些场景适合?
  12. (Ajax)axios源码简析(三)——请求与取消请求
  13. 命令行运行python找不到sqlite_ubuntu python3.6 找不到_sqlite3
  14. Mac安装Qt出现错误Could not resolve SDK Path for ‘macosx‘
  15. oracle学习札记72
  16. 阿里笔试 3.14 T1
  17. 腾讯下载的视频怎么转换成mp4格式
  18. 倾斜摄影技术发展与应用前景
  19. python中mysqldb模块_python之MySQLdb模块
  20. 运用fiddler工具深度配置证书抓苹果IOS微信小程序或app数据请求

热门文章

  1. 智能录音笔的工作原理
  2. SAS与固定收益证券有关的函数(1)
  3. SEO学习笔记六(SEO实战密码读书笔记)
  4. 经验 | 嵌入式工程师如何写简历?
  5. 运用R语言(ggplot2包)绘制箱式图
  6. 推荐几个比较有意思的js效果插件
  7. 狡兔三窟,这是第二窟;)
  8. 如何在cmd查看文件内容的MD5值
  9. javascript为数字添加千分符
  10. HTML基础总结第一节