PySpark-Recipes : I/O操作(txt, json, hdfs, csv...)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('IO').getOrCreate()
read txt
- textFile : only text content
- wholeTextFiles : file path and text content
- 通用的三个参数:
- path
- minPartitions
- use_unicode : default False
data = spark.sparkContext.textFile('./chapter6/dataFiles/shakespearePlays.txt', 2)data_list = data.collect()
data_list
["Love's Labour's Lost ","A Midsummer Night's Dream",'Much Ado About Nothing','As You Like It']
data.count()
4
# 字符总长度
char_lenght = data.map(lambda x: len(x))char_lenght.sum()
86
whole_data = spark.sparkContext.wholeTextFiles('./chapter6/dataFiles/shakespearePlays.txt',2)whole_data_list = whole_data.collect()
whole_data_list
[('file:/home/wjh/PYspark/pyspark-recipes/code_mishra/chapter6/dataFiles/shakespearePlays.txt',"Love's Labour's Lost \nA Midsummer Night's Dream\nMuch Ado About Nothing\nAs You Like It\n")]
whole_data.keys().collect()
['file:/home/wjh/PYspark/pyspark-recipes/code_mishra/chapter6/dataFiles/shakespearePlays.txt']
whole_data.values().collect()
["Love's Labour's Lost \nA Midsummer Night's Dream\nMuch Ado About Nothing\nAs You Like It\n"]
write txt
data = spark.sparkContext.textFile('./chapter6/dataFiles/shakespearePlays.txt',4)
dataLineLength = data.map(lambda x : len(x))dataLineLength.saveAsTextFile('./chapter6/dataFiles/save_rdd')
read direction
# Reading a directory using textFile() function.>>> manyFilePlayData = sc.textFile('/home/pysparkbook/pysparkBookData/manyFiles',4)>>> manyFilePlayData.collect()# Reading a directory using wholeTextFiles() function.>>> manyFilePlayDataKeyValue = sc.wholeTextFiles('/home/pysparkbook/pysparkBookData/manyFiles',4)
>>> manyFilePlayDataKeyValue.collect()
read hdfs
# Reading Data from HDFS.>>> filamentdata = sc.textFile('hdfs://localhost:9746/bookData/filamentData.csv',4)
save hdfs
>>> playData = sc.textFile('/home/muser/bData/shakespearePlays.txt',4)
>>> playDataLineLength = playData.map(lambda x : len(x))
>>> playDataLineLength.collect()# Saving RDD to HDFS.>>> playDataLineLength.saveAsTextFile('hdfs://localhost:9746/savedData/')### hadoop fs -cat /savedData/part-00000
Reading Data from Sequential File
What is a sequential file?
A sequential file is one whose contents is stored in some order. It must always be read starting from the beginning of the file. This is opposed to a direct access file, whose contents can be retrieved without reading the entire file.
- seqenceFile():
- path
- keyClass : indicates the key class of data in the sequence file
- valueClass datatype of the values
>>> simpleRDD = sc.sequenceFile('hdfs://localhost:9746/sequenceFileToRead')
>>> simpleRDD.collect()
Write Data to a Sequential File
Data = [('si1','Python'),('si3','Java'),('si1','Java'),('si2','Python'),('si3','Ruby'),('si4','C++'),('si5','C'),('si4','Python'),('si2','Java')]RDD = sc.parallelize(subjectsData, 4)RDD.saveAsSequenceFile('hdfs://localhost:9746/sequenceFiles')
read csv file
import csv
import io
def parseCSV(csvRow) :data = io.StringIO(csvRow)dataReader = csv.reader(data)return([x for x in dataReader])csvRow = "p,s,r,p"
parseCSV(csvRow)
[['p', 's', 'r', 'p']]
read json file
import jsondef jsonParse(dataLine):parsedDict = json.loads(dataLine)valueData = parsedDict.values()return(valueData)jsonData = '{"Time":"6AM", "Temperature":15}'
jsonParsedData = jsonParse(jsonData)
print(jsonParsedData)
dict_values(['6AM', 15])
>>> tempData = sc.textFile("/home/pysparkbook//pysparkBookData/tempData.json",4)>>> tempData.take(4)# Creating paired RDD.
>>> tempDataParsed = tempData.map(jsonParse)
# write json format
def createJSON(data):dataDict = {}dataDict['Name'] = data[0]dataDict['Age'] = data[1]return(json.dumps(dataDict))nameAgeList = ['Arun',22]createJSON(nameAgeList)
'{"Name": "Arun", "Age": 22}'
>>> nameAgeData = [['Arun',22],
... ['Bony',35],
... ['Juna',29]]
>>> nameAgeRDD = sc.parallelize(nameAgeData,3)>>> nameAgeRDD.collect()>>> nameAgeJSON = nameAgeRDD.map(createJSON)
>>> nameAgeJSON.collect()
>>> nameAgeJSON.saveAsTextFile('/home/pysparkbook/jsonDir/')
Reading table data from HBase using PySpark
>>> hostName = 'localhost'>>> tableName = 'pysparkBookTable'>>> ourInputFormatClass='org.apache.hadoop.hbase.mapreduce.TableInputFormat'
>>> ourKeyClass='org.apache.hadoop.hbase.io.ImmutableBytesWritable'
>>> ourValueClass='org.apache.hadoop.hbase.client.Result'
>>> ourKeyConverter='org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter'
>>> ourValueConverter='org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter'
>>> configuration = {}
>>> configuration['hbase.mapreduce.inputtable'] = tableName
>>> configuration['hbase.zookeeper.quorum'] = hostNameNow it is time to call the function newAPIHadoopRDD() with its arguments. >>> tableRDDfromHBase = sc.newAPIHadoopRDD(
... inputFormatClass = ourInputFormatClass,
... keyClass = ourKeyClass,
... valueClass = ourValueClass,
... keyConverter = ourKeyConverter,
... valueConverter = ourValueConverter,
... conf = configuration
... )Let us see how our paired RDD tableRDDfromHBase looks like. >>> tableRDDfromHBase.take(2)
PySpark-Recipes : I/O操作(txt, json, hdfs, csv...)相关推荐
- solr6.6 导入 文本(txt/json/xml/csv)文件
参照:solr6.6 导入 pdf文件 重点就是三个配置文件 1.建立的data-config.xml 内容如下: <dataConfig><dataSource name=&quo ...
- python之文件操作、对.txt文本文件的操作(读、写、修改、复制、合并)、对json文本文件的操作、json字符串与字典的相互转换。
注意:本篇所讲的文件操作,都是对后缀为.txt..json文件进行的操作. 1.json其实就是一个文件的格式,像.txt一样,json也是一个纯文本文件.与.txt不同的是,json常用来存放有键值 ...
- scrapy 保存mysql_scrapy爬虫事件以及数据保存为txt,json,mysql
今天要爬取的网页是虎嗅网 我们将完成如下几个步骤: 创建一个新的Scrapy工程 定义你所需要要抽取的Item对象 编写一个spider来爬取某个网站并提取出所有的Item对象 编写一个Item Pi ...
- Atittit HDFS hadoop 大数据文件系统java使用总结 目录 1. 操作系统,进行操作 1 2. Hdfs 类似nfs ftp远程分布式文件服务 2 3. 启动hdfs服务start
Atittit HDFS hadoop 大数据文件系统java使用总结 目录 1. 操作系统,进行操作 1 2. Hdfs 类似nfs ftp远程分布式文件服务 2 3. 启动hdfs服务start- ...
- 实验二 HDFS的Shell命令操作,和HDFS的API操作
文章目录 实验目的 一. 实验原理 二.实验准备 实验内容 步骤 项目1 HDFS常见命令练习 列出HDFS当前用户家目录下的文件及文件夹: 列出HDFS文件下名为directory的文件夹中的文件: ...
- Python处理数据,并经其存储为文本数据(Txt、JSON、CSV、Excel)总结
处理文本数据(Txt.JSON.CSV.Excel) Txt文件存储 JSON文件存储 CSV文件存储 Excel文件存储 Txt文件存储 将数据保存为txt文件格式是最为简单的. with open ...
- python中txt转成csv_Python-如何将JSON转换为CSV?
我有一个要转换为CSV文件的JSON文件.如何使用Python执行此操作? 我试过了: import json import csv f = open('data.json') data = json ...
- python应用中调用spark_在python中使用pyspark读写Hive数据操作
1.读Hive表数据 pyspark读取hive数据非常简单,因为它有专门的接口来读取,完全不需要像hbase那样,需要做很多配置,pyspark提供的操作hive的接口,使得程序可以直接使用SQL语 ...
- MySQL操作之JSON数据类型操作详解
MySQL操作之JSON数据类型操作详解 这篇文章主要介绍了MySQL操作之JSON数据类型操作详解,内容较为详细,具有收藏价值,需要的朋友可以参考. 概述 mysql自5.7.8版本开始,就支持了j ...
- C# 学习 txt -- excel txt -- json
在Unity3D 中使用,需要配置环境,\Assets\Plugins Txt --> Excel using System.Collections; using System.Colle ...
最新文章
- php解析html类库simple_html_dom
- 数据切分——Atlas读写分离Mysql集群的搭建
- 目标检测第4步:显卡、GPU、CUDA、cuDNN的介绍及如何在Windows 10下安装cuDNN?
- Python中列表的del,remove和pop函数之间的区别
- vs设计窗口不见了_龙猫腕表评测:VS沛纳海320V2版本
- php采集url,PHP-如何采集这个url跳转内容呢
- 转:55个javascript处理网页技巧
- es大量数据导入效率优化
- Eclipse修改svn地址
- 你的名字比我生命更重要
- 主流强化学习算法论文综述:DQN、DDPG、TRPO、A3C、PPO、SAC、TD3
- 跨时钟域问题(三)异步FIFO的Verilog实现(格雷码)
- 【2023】上海交通大学计算机考研信息汇总
- 智能楼宇一卡通系统QY-LY04A
- 卸载 make install 编译安装的软件
- Win10如何批量修改文件名,实现向后加固定的数字,001.jpg——999.jpg
- 成都拓嘉辰丰:拼多多子账号建立的方法?
- 夜游神安卓模拟器安装
- 【图解HTTP】返回结果的HTTP状态码
- 【Multisim仿真】74LS194组成的8个led流水灯
热门文章
- MyBatis-Plus学习
- mysql 5.7.26 linux安装_mysql-5.7.28 在Linux下的安装教程图解
- C# WPF中DataGrid的数据绑定(Binding)
- Usage of #pragma
- 以太坊智能合约gas如何估计?
- L2-004. 这是二叉搜索树吗?
- 菜鸟学Linux 第048篇笔记 配置slave server
- 视频直播技术(三):低延时直播经验总结
- Aaron Swartz Rewriting Reddit中关于web.py的创建思路
- linux之sed使用