Spark常用RDD算子 - saveAsTextFile、saveAsObjectFile 可保存到本地文件或hdfs系统中
saveAsTextFile
函数原型
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
saveAsTextFile用于将RDD以文本文件的格式存储到文件系统中。
从源码中可以看到,saveAsTextFile函数是依赖于saveAsHadoopFile函数,由于saveAsHadoopFile函数接受PairRDD,所以在saveAsTextFile函数中利用rddToPairRDDFunctions函数转化为(NullWritable,Text)类型的RDD,然后通过saveAsHadoopFile函数实现相应的写操作。
源码分析
def saveAsTextFile(path: String): Unit = withScope { // https://issues.apache.org/jira/browse/SPARK-2075 // // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]` // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an // Ordering for `NullWritable`. That's why the compiler will generate different anonymous // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+. // // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate // same bytecodes for `saveAsTextFile`. val nullWritableClassTag = implicitly[ClassTag[NullWritable]] val textClassTag = implicitly[ClassTag[Text]] val r = this.mapPartitions { iter => val text = new Text() iter.map { x => text.set(x.toString) (NullWritable.get(), text) } } RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)}/** * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class * supporting the key and value types K and V in this RDD. */def saveAsHadoopFile( path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = new JobConf(self.context.hadoopConfiguration), codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope { // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). val hadoopConf = conf hadoopConf.setOutputKeyClass(keyClass) hadoopConf.setOutputValueClass(valueClass) // Doesn't work in Scala 2.9 due to what may be a generics bug // TODO: Should we uncomment this for Scala 2.10? // conf.setOutputFormat(outputFormatClass) hadoopConf.set("mapred.output.format.class", outputFormatClass.getName) for (c <- codec) { hadoopConf.setCompressMapOutput(true) hadoopConf.set("mapred.output.compress", "true") hadoopConf.setMapOutputCompressorClass(c) hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName) hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString) } // Use configured output committer if already set if (conf.getOutputCommitter == null) { hadoopConf.setOutputCommitter(classOf[FileOutputCommitter]) } FileOutputFormat.setOutputPath(hadoopConf, SparkHadoopWriter.createPathFromString(path, hadoopConf)) saveAsHadoopDataset(hadoopConf)}/** * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for * that storage system. The JobConf should set an OutputFormat and any output paths required * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop * MapReduce job. */def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope { // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). val hadoopConf = conf val wrappedConf = new SerializableConfiguration(hadoopConf) val outputFormatInstance = hadoopConf.getOutputFormat val keyClass = hadoopConf.getOutputKeyClass val valueClass = hadoopConf.getOutputValueClass if (outputFormatInstance == null) { throw new SparkException("Output format class not set") } if (keyClass == null) { throw new SparkException("Output key class not set") } if (valueClass == null) { throw new SparkException("Output value class not set") } SparkHadoopUtil.get.addCredentials(hadoopConf) logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " + valueClass.getSimpleName + ")") if (isOutputSpecValidationEnabled) { // FileOutputFormat ignores the filesystem parameter val ignoredFs = FileSystem.get(hadoopConf) hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf) } val writer = new SparkHadoopWriter(hadoopConf) writer.preSetup() val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => { val config = wrappedConf.value // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it // around by taking a mod. We expect that no task will be attempted 2 billion times. val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context) writer.setup(context.stageId, context.partitionId, taskAttemptId) writer.open() var recordsWritten = 0L Utils.tryWithSafeFinally { while (iter.hasNext) { val record = iter.next() writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef]) // Update bytes written metric every few records maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten) recordsWritten += 1 } } { writer.close() } writer.commit() bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) } outputMetrics.setRecordsWritten(recordsWritten) } self.context.runJob(self, writeToFile) writer.commitJob()}
saveAsObjectFile
函数原型
def saveAsObjectFile(path: String): Unit
saveAsObjectFile用于将RDD中的元素序列化成对象,存储到文件中。
从源码中可以看出,saveAsObjectFile函数是依赖于saveAsSequenceFile函数实现的,将RDD转化为类型为<NullWritable,BytesWritable>的PairRDD,然后通过saveAsSequenceFile函数实现。在spark的java版的api中没有实现saveAsSequenceFile函数,该函数类似于saveAsTextFile函数。
源码分析
def saveAsObjectFile(path: String): Unit = withScope { this.mapPartitions(iter => iter.grouped(10).map(_.toArray)) .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))) .saveAsSequenceFile(path)
}def saveAsSequenceFile( path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope { def anyToWritable[U <% Writable](u: U): Writable = u // TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and // valueWritableClass at the compile time. To implement that, we need to add type parameters to // SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a // breaking change. val convertKey = self.keyClass != keyWritableClass val convertValue = self.valueClass != valueWritableClass logInfo("Saving as sequence file of type (" + keyWritableClass.getSimpleName + "," + valueWritableClass.getSimpleName + ")" ) val format = classOf[SequenceFileOutputFormat[Writable, Writable]] val jobConf = new JobConf(self.context.hadoopConfiguration) if (!convertKey && !convertValue) { self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, jobConf, codec) } else if (!convertKey && convertValue) { self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile( path, keyWritableClass, valueWritableClass, format, jobConf, codec) } else if (convertKey && !convertValue) { self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile( path, keyWritableClass, valueWritableClass, format, jobConf, codec) } else if (convertKey && convertValue) { self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile( path, keyWritableClass, valueWritableClass, format, jobConf, codec) }
}
读和写代码示例,也可写入到hdfs中,只是路径不一样而已
写入:javaRDD.saveAsTextFile,javaRDD.saveAsObjectFile
读取:javaSparkContext.textFile,javaSparkContext.objectFile
SparkConf sparkConf = new SparkConf().setAppName("spark-test-1").setMaster("local");SparkContext sparkContext = SparkContext.getOrCreate(sparkConf);
sparkContext.setLogLevel("ERROR");JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);List<String> l1 = new ArrayList<>();
l1.add("dog");
l1.add("cat");
l1.add("gnu");
l1.add("salmon");
l1.add("rabbit");
l1.add("turkey");
l1.add("wolf");
l1.add("bear");
l1.add("bee");
JavaRDD<String> javaRDD = javaSparkContext.parallelize(l1, 3);//写入本地系统
javaRDD.saveAsTextFile("file:///C:/rdd");
javaRDD.saveAsObjectFile("file:///C:/rdd2");//写入hdfs系统
javaRDD.saveAsTextFile("hdfs://kncloud02:8020/user/rdd");
javaRDD.saveAsObjectFile("hdfs://kncloud02:8020/user/rdd2");//从本地系统中读出来
JavaRDD<String> rdd_1= javaSparkContext.textFile("file:///C:/rdd");
JavaRDD<Object> rdd_2= javaSparkContext.objectFile("file:///C:/rdd2");//从hdfs中读出来
JavaRDD<String> rdd_3= javaSparkContext.textFile("hdfs://kncloud02:8020/user/rdd");
JavaRDD<Object> rdd_2= javaSparkContext.objectFile("hdfs://kncloud02:8020/user/rdd2");
Spark常用RDD算子 - saveAsTextFile、saveAsObjectFile 可保存到本地文件或hdfs系统中相关推荐
- spark常用RDD算子 汇总(java和scala版本)
github: https://github.com/zhaikaishun/spark_tutorial spark RDD的算子挺多,有时候如何灵活的使用,该如何用一下子想不起来,这一段时间将s ...
- spark常用RDD算子 - take(),takeOrdered(),top(),first()
take(),takeOrdered(),top() 返回一个列表,first()返回一个值 take def take(num: Int): Array[T] take用于获取RDD中从0到num- ...
- Scala发邮件(带附件,无论是本地文件和hdfs文件或df或rdd)
@羲凡--只为了更好的活着 Scala发邮件(带附件,无论是本地文件和hdfs文件或df或rdd) 在有些spark任务执行完后需要通知我们该任务已经执行结束,发邮件到某个邮箱是最直接的方式.那如何用 ...
- python保存运行结果下次使用_将python运行结果保存至本地文件中的示例讲解
一.建立文件,保存数据 1.使用python中内置的open函数 打开txt文件 #mode 模式 #w 只能操作写入 r 只能读取 a 向文件追加 #w+ 可读可写 r+可读可写 a+可读可追加 # ...
- java通过url获取网页内容_Java语言通过URL读取网页数据并保存到本地文件(代码实例)...
本文主要向大家介绍了Java语言通过URL读取网页数据并保存到本地文件(代码实例),通过具体的内容向大家展示,希望对大家学习JAVA语言有所帮助. Java通过URL读取网页数据并保存到本地文件(代码 ...
- Jmeter把响应数据结果保存到本地文件
最近做一个性能压测,需要用接口获取大量的数据,并获取该接口返回的一个字段值,于是找了如何把响应数据结果保存到本地文件. 如图,某接口需要用如图所示字段入参. 步骤一:先用正则表达式获取该字段的内容 步 ...
- Python爬取起点小说并保存到本地文件夹和MongoDB数据库中
Python爬取起点小说并保存到本地MongoDB数据库中 工具:Python3.7 + Mongo4.0 + Pycharm """ 爬取起点小说<诡秘之主> ...
- 爬虫实战5:爬取全部穿越火线武器的图片以武器名称命名保存到本地文件
申明:资料来源于网络及书本,通过理解.实践.整理成学习笔记. 文章目录 穿越火线官网 完整代码 运行结果 穿越火线官网 完整代码 import requests# 循环33次,官网武器库展示有33页 ...
- log日志打印封装,并保存到本地文件
封装了本地日志,可以通过config 文件动态控制log的打印,方便上线前日志打印的检查,并且如果在测试环境下,日志等级为i以上的日志都会存文件,并且文件以日期命名,最大数量为5,可以配置. 代码如下 ...
最新文章
- 信息安全系统设计基础期末总结
- Pytorch中的5个非常有用的张量操作
- 利用 RDA5807的RSSI测量RF强度
- android 打开支付宝扫码页_Chrome 85正式版发布:新增标签页分组功,网页多了也不乱...
- 有用户反映小米手机充电变慢,官方回应:天气过热
- java 输出xml文件_java解析xml文件并输出
- Android源码网站
- 用python做爬虫看图软件-RandomPicture
- 从键盘上键入1~7的数字,输出对应星期以及英文缩写
- 气相色谱仪排除问题S级详情讲解【Chro】
- 【云计算学习教程】云计算是什么?它有哪些形式?
- 社会实践分组(c++)
- 互联网晚报 | 1月13日 星期四 | 恒驰5首车下线;抖音电商测试快递服务“音尊达”;中国移动10086 App月底停止运营...
- 基于安卓的校园订餐系统开发设计
- 深夜,想到今天学的linux内容,太值了
- 李有志——预谋着向生活发起冲锋...
- mpc 安全多方计算协议_一文揭秘跨链、密钥管理、合约隐私背后的技术, 你不可不知的安全多方计算(MPC)...
- 看看你的老祖宗是谁,姓氏血统图及各姓图腾
- python 同时赋值_python同时给多个变量赋值
- Redis Essentials 读书笔记 - 第一章: Getting Started (The Baby Steps)
热门文章
- 【行业应用】一文讲通电力数字化转型
- 【NLP】可交互的 Attention 可视化工具!我的Transformer可解释性有救了?
- 【NLP】一文搞懂NLP中的对抗训练
- 【资源】太赞了!程序员应该访问的最佳网站都在这里了!
- 还怕GPU资源不够用?多实例GPU MIG助攻资源利用难题
- AI入门:无门槛可以玩的神经网络
- 评价指标:目标检测的评价指标 - mAP
- 一文看尽8篇目标检测最新论文(EfficientDet/EdgeNet/ASFF/RoIMix/SCL/EFGRNet等)
- 【InfoQ大咖说直播回放】老司机聊程序员的职场道路选择
- LeetCode_559.N叉树的最大深度