from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('IO').getOrCreate()

read txt

  • textFile : only text content
  • wholeTextFiles : file path and text content
  • 通用的三个参数:
    • path
    • minPartitions
    • use_unicode : default False
data = spark.sparkContext.textFile('./chapter6/dataFiles/shakespearePlays.txt', 2)data_list = data.collect()
data_list
["Love's Labour's Lost     ","A Midsummer Night's Dream",'Much Ado About Nothing','As You Like It']
data.count()
4
# 字符总长度
char_lenght = data.map(lambda x: len(x))char_lenght.sum()
86
whole_data = spark.sparkContext.wholeTextFiles('./chapter6/dataFiles/shakespearePlays.txt',2)whole_data_list = whole_data.collect()
whole_data_list
[('file:/home/wjh/PYspark/pyspark-recipes/code_mishra/chapter6/dataFiles/shakespearePlays.txt',"Love's Labour's Lost     \nA Midsummer Night's Dream\nMuch Ado About Nothing\nAs You Like It\n")]
whole_data.keys().collect()
['file:/home/wjh/PYspark/pyspark-recipes/code_mishra/chapter6/dataFiles/shakespearePlays.txt']
whole_data.values().collect()
["Love's Labour's Lost     \nA Midsummer Night's Dream\nMuch Ado About Nothing\nAs You Like It\n"]

write txt

data = spark.sparkContext.textFile('./chapter6/dataFiles/shakespearePlays.txt',4)
dataLineLength = data.map(lambda x : len(x))dataLineLength.saveAsTextFile('./chapter6/dataFiles/save_rdd')

read direction

# Reading a directory using textFile() function.>>> manyFilePlayData = sc.textFile('/home/pysparkbook/pysparkBookData/manyFiles',4)>>> manyFilePlayData.collect()# Reading a directory using wholeTextFiles() function.>>> manyFilePlayDataKeyValue = sc.wholeTextFiles('/home/pysparkbook/pysparkBookData/manyFiles',4)
>>> manyFilePlayDataKeyValue.collect()

read hdfs

# Reading  Data from HDFS.>>> filamentdata = sc.textFile('hdfs://localhost:9746/bookData/filamentData.csv',4)

save hdfs

>>> playData = sc.textFile('/home/muser/bData/shakespearePlays.txt',4)
>>> playDataLineLength = playData.map(lambda x : len(x))
>>> playDataLineLength.collect()#  Saving RDD to HDFS.>>> playDataLineLength.saveAsTextFile('hdfs://localhost:9746/savedData/')###  hadoop fs -cat /savedData/part-00000

Reading Data from Sequential File

What is a sequential file?
A sequential file is one whose contents is stored in some order. It must always be read starting from the beginning of the file. This is opposed to a direct access file, whose contents can be retrieved without reading the entire file.
  • seqenceFile():

    • path
    • keyClass : indicates the key class of data in the sequence file
    • valueClass datatype of the values
>>> simpleRDD = sc.sequenceFile('hdfs://localhost:9746/sequenceFileToRead')
>>> simpleRDD.collect()

Write Data to a Sequential File

Data = [('si1','Python'),('si3','Java'),('si1','Java'),('si2','Python'),('si3','Ruby'),('si4','C++'),('si5','C'),('si4','Python'),('si2','Java')]RDD = sc.parallelize(subjectsData, 4)RDD.saveAsSequenceFile('hdfs://localhost:9746/sequenceFiles')

read csv file

import csv
import  io
def parseCSV(csvRow) :data = io.StringIO(csvRow)dataReader =  csv.reader(data)return([x for x in dataReader])csvRow = "p,s,r,p"
parseCSV(csvRow)
[['p', 's', 'r', 'p']]

read json file

import jsondef jsonParse(dataLine):parsedDict = json.loads(dataLine)valueData = parsedDict.values()return(valueData)jsonData = '{"Time":"6AM",  "Temperature":15}'
jsonParsedData = jsonParse(jsonData)
print(jsonParsedData)
dict_values(['6AM', 15])
>>> tempData = sc.textFile("/home/pysparkbook//pysparkBookData/tempData.json",4)>>> tempData.take(4)# Creating paired RDD.
>>> tempDataParsed = tempData.map(jsonParse)
# write json format
def createJSON(data):dataDict = {}dataDict['Name'] = data[0]dataDict['Age'] = data[1]return(json.dumps(dataDict))nameAgeList = ['Arun',22]createJSON(nameAgeList)
'{"Name": "Arun", "Age": 22}'
>>> nameAgeData = [['Arun',22],
...                                  ['Bony',35],
...                                  ['Juna',29]]
>>> nameAgeRDD = sc.parallelize(nameAgeData,3)>>> nameAgeRDD.collect()>>> nameAgeJSON = nameAgeRDD.map(createJSON)
>>> nameAgeJSON.collect()
>>> nameAgeJSON.saveAsTextFile('/home/pysparkbook/jsonDir/')

Reading table data from HBase using PySpark

>>> hostName = 'localhost'>>> tableName = 'pysparkBookTable'>>> ourInputFormatClass='org.apache.hadoop.hbase.mapreduce.TableInputFormat'
>>> ourKeyClass='org.apache.hadoop.hbase.io.ImmutableBytesWritable'
>>> ourValueClass='org.apache.hadoop.hbase.client.Result'
>>> ourKeyConverter='org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter'
>>> ourValueConverter='org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter'
>>> configuration = {}
>>> configuration['hbase.mapreduce.inputtable'] = tableName
>>> configuration['hbase.zookeeper.quorum'] = hostNameNow it is time to call the function newAPIHadoopRDD() with its arguments. >>> tableRDDfromHBase = sc.newAPIHadoopRDD(
...                        inputFormatClass = ourInputFormatClass,
...                        keyClass = ourKeyClass,
...                        valueClass = ourValueClass,
...                        keyConverter = ourKeyConverter,
...                        valueConverter = ourValueConverter,
...                        conf = configuration
...                     )Let us see how our paired RDD  tableRDDfromHBase looks like. >>> tableRDDfromHBase.take(2)

PySpark-Recipes : I/O操作(txt, json, hdfs, csv...)相关推荐

  1. solr6.6 导入 文本(txt/json/xml/csv)文件

    参照:solr6.6 导入 pdf文件 重点就是三个配置文件 1.建立的data-config.xml 内容如下: <dataConfig><dataSource name=&quo ...

  2. python之文件操作、对.txt文本文件的操作(读、写、修改、复制、合并)、对json文本文件的操作、json字符串与字典的相互转换。

    注意:本篇所讲的文件操作,都是对后缀为.txt..json文件进行的操作. 1.json其实就是一个文件的格式,像.txt一样,json也是一个纯文本文件.与.txt不同的是,json常用来存放有键值 ...

  3. scrapy 保存mysql_scrapy爬虫事件以及数据保存为txt,json,mysql

    今天要爬取的网页是虎嗅网 我们将完成如下几个步骤: 创建一个新的Scrapy工程 定义你所需要要抽取的Item对象 编写一个spider来爬取某个网站并提取出所有的Item对象 编写一个Item Pi ...

  4. Atittit HDFS hadoop 大数据文件系统java使用总结 目录 1. 操作系统,进行操作 1 2. Hdfs 类似nfs ftp远程分布式文件服务 2 3. 启动hdfs服务start

    Atittit HDFS hadoop 大数据文件系统java使用总结 目录 1. 操作系统,进行操作 1 2. Hdfs 类似nfs ftp远程分布式文件服务 2 3. 启动hdfs服务start- ...

  5. 实验二 HDFS的Shell命令操作,和HDFS的API操作

    文章目录 实验目的 一. 实验原理 二.实验准备 实验内容 步骤 项目1 HDFS常见命令练习 列出HDFS当前用户家目录下的文件及文件夹: 列出HDFS文件下名为directory的文件夹中的文件: ...

  6. Python处理数据,并经其存储为文本数据(Txt、JSON、CSV、Excel)总结

    处理文本数据(Txt.JSON.CSV.Excel) Txt文件存储 JSON文件存储 CSV文件存储 Excel文件存储 Txt文件存储 将数据保存为txt文件格式是最为简单的. with open ...

  7. python中txt转成csv_Python-如何将JSON转换为CSV?

    我有一个要转换为CSV文件的JSON文件.如何使用Python执行此操作? 我试过了: import json import csv f = open('data.json') data = json ...

  8. python应用中调用spark_在python中使用pyspark读写Hive数据操作

    1.读Hive表数据 pyspark读取hive数据非常简单,因为它有专门的接口来读取,完全不需要像hbase那样,需要做很多配置,pyspark提供的操作hive的接口,使得程序可以直接使用SQL语 ...

  9. MySQL操作之JSON数据类型操作详解

    MySQL操作之JSON数据类型操作详解 这篇文章主要介绍了MySQL操作之JSON数据类型操作详解,内容较为详细,具有收藏价值,需要的朋友可以参考. 概述 mysql自5.7.8版本开始,就支持了j ...

  10. C# 学习 txt -- excel txt -- json

    在Unity3D 中使用,需要配置环境,\Assets\Plugins  Txt  -->  Excel using System.Collections; using System.Colle ...

最新文章

  1. php解析html类库simple_html_dom
  2. 数据切分——Atlas读写分离Mysql集群的搭建
  3. 目标检测第4步:显卡、GPU、CUDA、cuDNN的介绍及如何在Windows 10下安装cuDNN?
  4. Python中列表的del,remove和pop函数之间的区别
  5. vs设计窗口不见了_龙猫腕表评测:VS沛纳海320V2版本
  6. php采集url,PHP-如何采集这个url跳转内容呢
  7. 转:55个javascript处理网页技巧
  8. es大量数据导入效率优化
  9. Eclipse修改svn地址
  10. 你的名字比我生命更重要
  11. 主流强化学习算法论文综述:DQN、DDPG、TRPO、A3C、PPO、SAC、TD3
  12. 跨时钟域问题(三)异步FIFO的Verilog实现(格雷码)
  13. 【2023】上海交通大学计算机考研信息汇总
  14. 智能楼宇一卡通系统QY-LY04A
  15. 卸载 make install 编译安装的软件
  16. Win10如何批量修改文件名,实现向后加固定的数字,001.jpg——999.jpg
  17. 成都拓嘉辰丰:拼多多子账号建立的方法?
  18. 夜游神安卓模拟器安装
  19. 【图解HTTP】返回结果的HTTP状态码
  20. 【Multisim仿真】74LS194组成的8个led流水灯

热门文章

  1. MyBatis-Plus学习
  2. mysql 5.7.26 linux安装_mysql-5.7.28 在Linux下的安装教程图解
  3. C# WPF中DataGrid的数据绑定(Binding)
  4. Usage of #pragma
  5. 以太坊智能合约gas如何估计?
  6. L2-004. 这是二叉搜索树吗?
  7. 菜鸟学Linux 第048篇笔记 配置slave server
  8. 视频直播技术(三):低延时直播经验总结
  9. Aaron Swartz Rewriting Reddit中关于web.py的创建思路
  10. linux之sed使用