文件格式

Spark对文件的读取和保存方式都很简单,会根据文件的扩展名选择对应的处理方式

Spark支持的一些常见格式
格式名称 结构化 备注
文本文件 普通的文本文件,每行一条记录
JSON 半结构化 常见的基于文本的格式,半结构化,大多数库都要求每行一条记录
CSV 非常常见的基于文本的格式,通常在电子表格应用中使用
sequenceFiles 一种用于键值对数据常见的Hadoop文件格式
protocol buffers 一种快速、节约空间的跨语言格式
对象文件

用来将Spark作业中的数据存储下来以让共享的代码读取。改变类的时候会失效,因为他依赖于java序列化

文本文件

当我们将一个文本文件读取为RDD时,输入的每一行都会成为RDD的一个元素,也可以将多个完整的文本文件一次性读取为一个pair RDD,其中键是文件名,值是文件内容。

读取文本文件

只需要使用文件路径作为参数调用SparkContext中的textFile()函数,就可以读取一个文本文件

读取一个文本文件

val input = sc.textFile("file:///home/holden/repos/spark/README.md")

如果文件足够小,可以使用SparkContext,wholeTextFiles()方法,该方法会返回一个pair RDD,其中键是输入文件的文件名

val input = sc.wholeTextFiles("file:///home/holden/salesFiles")
val result = input.mapValues{y =>val nums = y.split(" ").map(x => x.toDouble)nums.sum / nums.size.toDouble
}

JSON

JSON是一种使用较广的半结构化数据格式,读取JSON数据的最简单的方法可以在所有支持的编程语言中使用。然后使用JSON解释器来对RDD中的值进行映射操作,Scala中也可以使用一个自定义Hadoop格式来操作JSON数据。

在scala中读取JSON

import com.fasterxml.jackson.moudle.scala.DefaultScalaMoudle
import com.fasterxml.jackson.moudle.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMappr
import com.fasterxml.jackson.databind.DeserializationFeature
...
case class Person(name:String,lovesPandas:Boolean //必须是顶级类
...
//将其解析为特定的case class。使用flatMap,通过在遇到问题时返回空列表(None)
//来处理错误,而在没有问题时返回包含一个元素的列表(Some(_))
val result =input.flatMap(record =>{try{Some(mapper.readValue(record,classOf[Person]))}catch{case e:Exception => None}})

在scala中保存为JSON

result.filter(p => p.lovesPandas).map(mapper.writeValueAsString(_)).saveAsTextFile(outputFile)

逗号分隔值(CSV)与制表符分割值(TSV)

读取CSV

读取CSV/TSV数据和读取JSON数据相似,都需要先把文件当做普通文本文件来读取数据,再对数据进行处理。由于格式标准的缺失,同一个库的不同版本有时也会用不同的方式处理输入数据。

如果你的CSV的所有数据字段均没有包含换行符,你也可以使用textFile()读取并解析数据

import Java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
...
val input = sc.textFile(inputFile)
val result = input.map{line => val reader = new CSVReader(new StringReader(line));reader.readNext();}

如果在字段中嵌有换行符,就需要完整读入每个文件,然后解析。

case class Person(name: String,favoriteAnimal:String)val input = sc.wholeTextFiles(inputFile)
val result = input.flatMap{case (_,txt) =>val reader = new CSVReader(new StringReader(txt));reader.readAll().map(x => Person(x(0),x(1)))
}

保存CSV

写出CSV/TSV数据很简单,可以通过重用输出编码器来加速

pandaLovers.map(persion => List(person.name,person.favoriteAnimal).toArray).mapPartitions{people => val stringWriter = new StringWriter();val csvWriter = new CSVWriter(stringWriter);csvWriter.writeAll(people.toList)Iterator(stringWriter.toString)}.saveAsTextFile(outFile)

上述的例子中只能在我们知道所有要输出的字段时使用,然而如果一些字段名是在运行时由用户输入决定的,就要使用别的方法了,最简单的方法是遍历所有的数据,提取不同的键,然后分别输出。

SequenceFile

SequenceFile是由没有相对关系结构的键值对文件组成常用的Hadoop格式,SequenceFile文件有同步标记,Spark可以用它来定位到文件中的某个点,然后再与记录的边界对其。这可以让Spark使用多个节点高效的并行读取SequenceFile文件。

读取SequenceFile

Spark有专门用来读取SequenceFile的接口。SparkContext中,可以调用sequenceFile(path,keyClass,valueClass,minPartitions)

val data = sc.sequenceFile(infile,classOf[Text],classOf[IntWritable]).map{case(x,y)=> (x.toString,y.get())}

保存SequenceFile

val data = sc.parallelize(List(("panda",3),("key",6),("Snail",2)))
data.saveAsSequenceFile(outputFile)

Spark-数据读取与保存(Scala版)相关推荐

  1. pandas 数据读取与保存

    pandas 数据读取与保存 一:读取表格数据 例:在一个text.xlsx文件中,有text1,text2,text3三张表格 sheetname 切换sheet表格 可以传入整形,表示从0开始的索 ...

  2. Flash数据读取和保存

    实现方法 Flash数据读取和保存的目的是在单片机的程序存储区开辟一块空间专门用来保存系统需要记忆的参数和数据,从而完全取代EEROM,达到降低成本和数据保密的目的.该实现方法主要分为四个部分: FL ...

  3. sparksql 保存点_Spark(十二)【SparkSql中数据读取和保存】

    一. 读取和保存说明 SparkSQL提供了通用的保存数据和数据加载的方式,还提供了专用的方式 读取:通用和专用 保存 保存有四种模式: 默认: error : 输出目录存在就报错 append: 向 ...

  4. TDMS数据 读取/转换/保存 为MATLAB/Python 可读取的通用数据格式的的方法

    TDMS格式是NI主推的高速测试测量采集系统中的一种二进制数据存储类型,适合存储海量才几级数据,兼有高速.方便和易存取等多种优点.做过实际测量项目的筒子们在NI的相关平台例如:CompactRIO/L ...

  5. Spark学习笔记:数据读取和保存

    spark所支持的文件格式 1.文本文件 在 Spark 中读写文本文件很容易. 当我们将一个文本文件读取为 RDD 时,输入的每一行 都会成为 RDD 的 一个元素. 也可以将多个完整的文本文件一次 ...

  6. python 对json数据读取及保存与读取,对dump,dumps,load,loads的理解

    一.对json文件的读取 data1={"programmers":[{ "firstName": "Brett", "lastN ...

  7. CGAL 点云数据读取与保存

    文章目录 一.简介 二.XYZ格式 三.LAS格式 四.OFF格式 五.PLY格式 参考资料 一.简介 点云中的点通常以纯文本格式存储(表示为"XYZ"格式),其中每个点由换行符分 ...

  8. spark学习-57-Spark下Scala版HBase下的根据权重获取最真实数据

    @[TOC] Scala版HBase下的根据权重获取最真实数据 和 java版HBase下的根据权重获取最真实数据 虽然处理流程相同,但是有很多细节需要注意 看这个之前先看 java版HBase下的根 ...

  9. spark SQL(三)数据源 Data Source----通用的数据 加载/保存功能

    Spark SQL 的数据源------通用的数据 加载/保存功能 Spark SQL支持通过DataFrame接口在各种数据源上进行操作.DataFrame可以使用关系变换进行操作,也可以用来创建临 ...

最新文章

  1. python 进程间通信效率_Python进程间通信 multiProcessing Queue队列实现详解
  2. 关于mysql数据库插入数据,不能插入中文和出现中文乱码问题
  3. java自定义标签 map_基于Spring MVC的自定义标签Tag
  4. 论文翻译 基于R-FCN的物体检测
  5. xmanager 3 远程连接REDHAT 4 桌面
  6. python 字典定义日志用法_python中字典(Dictionary)用法实例详解
  7. class 和 struct的区别
  8. [nRF51822] 13、浅谈nRF51822和NRF24LE1/NRF24LU1/NRF24L01经典2.4G模块无线通信配置与流程...
  9. SSM项目-山东医院-可行性配置-1
  10. sql语句查询结果合并union all用法_数据库技巧
  11. Appium脚本(2):元素检测
  12. java编程 停等协议_在应用层模拟实用停等协议
  13. eclipse汉化教程,保姆级教学,解决下载缓慢、安装不上等问题,随时中英互换,官网汉化包
  14. 数学分析教程(科大)——1.9笔记+习题
  15. (七)CMake变量及其设置
  16. MATALB APP DESIGNER 回调函数创建及StartupFcn函数
  17. 9.DNS和DNSmasq服务
  18. python提取发票信息发票识别_python 发票识别
  19. “啪”一炮就通!管道疏通神器终于诞生,马桶、下水道再也不怕堵!
  20. linux串口termios

热门文章

  1. IT职场人,切不要一辈子靠技术生存(转载)
  2. bson java_Java BSON使用
  3. java基于SSM框架的洗车店预约系统的设计与实现
  4. 关于binary diff / patch 差分补丁工具的那些事
  5. “智能+”为制造业转型升级赋能
  6. 域名租用的时候要注意什么?
  7. TP5验证码生成及验证
  8. Ubuntu安装驱动
  9. 编译原理学习笔记·语法分析(LL(1)分析法/算符优先分析法OPG)及例子详解
  10. 今日说“法”:上拉、下拉电阻那点事