saveAsTextFile

Function prototype

def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

saveAsTextFile writes an RDD to a file system as text files.
As the source code shows, saveAsTextFile is built on top of saveAsHadoopFile. Because saveAsHadoopFile operates on a pair RDD, saveAsTextFile first converts the RDD into an RDD of (NullWritable, Text) pairs via rddToPairRDDFunctions, and then performs the actual write through saveAsHadoopFile.
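
For illustration, here is a minimal Java sketch of both overloads. The output paths, the three-element data set, and the choice of GzipCodec are assumptions made up for this example; any Hadoop CompressionCodec class can be passed to the second overload.

import java.util.Arrays;

import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SaveAsTextFileCodecDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("saveAsTextFile-codec").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<String> words = jsc.parallelize(Arrays.asList("dog", "cat", "gnu"), 2);

        // First overload: plain text, one part file per partition.
        words.saveAsTextFile("file:///tmp/rdd_plain");      // hypothetical path

        // Second overload: each part file is compressed with the given Hadoop codec,
        // so the output files carry the codec's extension (here .gz).
        words.saveAsTextFile("file:///tmp/rdd_gzip", GzipCodec.class);

        jsc.stop();
    }
}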

Source code analysis

def saveAsTextFile(path: String): Unit = withScope {
  // https://issues.apache.org/jira/browse/SPARK-2075
  //
  // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
  // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
  // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
  // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
  // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
  //
  // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
  // same bytecodes for `saveAsTextFile`.
  val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
  val textClassTag = implicitly[ClassTag[Text]]
  val r = this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}

/**
 * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
 * supporting the key and value types K and V in this RDD.
 */
def saveAsHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    conf: JobConf = new JobConf(self.context.hadoopConfiguration),
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
  val hadoopConf = conf
  hadoopConf.setOutputKeyClass(keyClass)
  hadoopConf.setOutputValueClass(valueClass)
  // Doesn't work in Scala 2.9 due to what may be a generics bug
  // TODO: Should we uncomment this for Scala 2.10?
  // conf.setOutputFormat(outputFormatClass)
  hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
  for (c <- codec) {
    hadoopConf.setCompressMapOutput(true)
    hadoopConf.set("mapred.output.compress", "true")
    hadoopConf.setMapOutputCompressorClass(c)
    hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
    hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
  }
  // Use configured output committer if already set
  if (conf.getOutputCommitter == null) {
    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
  }
  FileOutputFormat.setOutputPath(hadoopConf,
    SparkHadoopWriter.createPathFromString(path, hadoopConf))
  saveAsHadoopDataset(hadoopConf)
}

/**
 * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
 * that storage system. The JobConf should set an OutputFormat and any output paths required
 * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
 * MapReduce job.
 */
def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
  // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
  val hadoopConf = conf
  val wrappedConf = new SerializableConfiguration(hadoopConf)
  val outputFormatInstance = hadoopConf.getOutputFormat
  val keyClass = hadoopConf.getOutputKeyClass
  val valueClass = hadoopConf.getOutputValueClass
  if (outputFormatInstance == null) {
    throw new SparkException("Output format class not set")
  }
  if (keyClass == null) {
    throw new SparkException("Output key class not set")
  }
  if (valueClass == null) {
    throw new SparkException("Output value class not set")
  }
  SparkHadoopUtil.get.addCredentials(hadoopConf)
  logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
    valueClass.getSimpleName + ")")
  if (isOutputSpecValidationEnabled) {
    // FileOutputFormat ignores the filesystem parameter
    val ignoredFs = FileSystem.get(hadoopConf)
    hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
  }
  val writer = new SparkHadoopWriter(hadoopConf)
  writer.preSetup()
  val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
    val config = wrappedConf.value
    // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
    // around by taking a mod. We expect that no task will be attempted 2 billion times.
    val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
    val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
    writer.setup(context.stageId, context.partitionId, taskAttemptId)
    writer.open()
    var recordsWritten = 0L
    Utils.tryWithSafeFinally {
      while (iter.hasNext) {
        val record = iter.next()
        writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
        // Update bytes written metric every few records
        maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
        recordsWritten += 1
      }
    } {
      writer.close()
    }
    writer.commit()
    bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
    outputMetrics.setRecordsWritten(recordsWritten)
  }
  self.context.runJob(self, writeToFile)
  writer.commitJob()
}
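
One practical consequence of the validation step in saveAsHadoopDataset (isOutputSpecValidationEnabled / checkOutputSpecs) is that the target directory must not already exist; otherwise the job fails on the driver before any task runs. The sketch below is only an illustration of that behavior, with made-up local paths:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class OutputSpecValidationDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("output-spec-check").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<String> words = jsc.parallelize(Arrays.asList("dog", "cat", "gnu"), 3);

        // With 3 partitions this typically produces part-00000 .. part-00002
        // (plus a _SUCCESS marker) under the target directory.
        words.saveAsTextFile("file:///tmp/rdd_out");   // hypothetical path

        try {
            // checkOutputSpecs rejects an already existing output directory,
            // so saving to the same path a second time fails up front.
            words.saveAsTextFile("file:///tmp/rdd_out");
        } catch (Exception e) {
            System.out.println("Second save failed as expected: " + e.getMessage());
        }

        jsc.stop();
    }
}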

saveAsObjectFile

Function prototype

def saveAsObjectFile(path: String): Unit

saveAsObjectFile serializes the elements of an RDD into objects and stores them in a file.
As the source code shows, saveAsObjectFile is implemented on top of saveAsSequenceFile: the RDD is converted into a pair RDD of type (NullWritable, BytesWritable) and then written by saveAsSequenceFile. Note that the Java API of Spark does not provide saveAsSequenceFile; saveAsObjectFile itself is used much like saveAsTextFile.
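
Since saveAsSequenceFile is missing from the Java API, one way to get an equivalent result from Java is to build a pair RDD of Writable keys and values and write it through the more general saveAsHadoopFile with SequenceFileOutputFormat. This is only a sketch under that assumption; the class name, data, and output path are made up:

import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class SequenceFileFromJavaDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("sequence-file-java").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<String> words = jsc.parallelize(Arrays.asList("dog", "cat", "gnu"), 2);

        // Wrap keys and values in Hadoop Writable types inside the task,
        // so the records go straight to the writer without being shuffled.
        JavaPairRDD<Text, IntWritable> pairs = words.mapToPair(
            s -> new Tuple2<Text, IntWritable>(new Text(s), new IntWritable(s.length())));

        // SequenceFileOutputFormat (old mapred API) writes a Hadoop SequenceFile.
        pairs.saveAsHadoopFile("file:///tmp/rdd_seq",          // hypothetical path
            Text.class, IntWritable.class, SequenceFileOutputFormat.class);

        jsc.stop();
    }
}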

Source code analysis

def saveAsObjectFile(path: String): Unit = withScope {
  this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
    .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
    .saveAsSequenceFile(path)
}

def saveAsSequenceFile(
    path: String,
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  def anyToWritable[U <% Writable](u: U): Writable = u
  // TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and
  // valueWritableClass at the compile time. To implement that, we need to add type parameters to
  // SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a
  // breaking change.
  val convertKey = self.keyClass != keyWritableClass
  val convertValue = self.valueClass != valueWritableClass
  logInfo("Saving as sequence file of type (" + keyWritableClass.getSimpleName + "," +
    valueWritableClass.getSimpleName + ")" )
  val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
  val jobConf = new JobConf(self.context.hadoopConfiguration)
  if (!convertKey && !convertValue) {
    self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  } else if (!convertKey && convertValue) {
    self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile(
      path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  } else if (convertKey && !convertValue) {
    self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile(
      path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  } else if (convertKey && convertValue) {
    self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(
      path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  }
}
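
To make the grouped(10) detail above concrete: saveAsObjectFile serializes batches of up to 10 elements into one BytesWritable record, and objectFile deserializes those batches and flattens them back into individual elements. Below is a small round-trip sketch; the path and data are made up, and element order after reading back is not guaranteed:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ObjectFileRoundTripDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("objectFile-roundtrip").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<Integer> nums = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);

        // Each output record is (NullWritable, BytesWritable) holding up to 10
        // Java-serialized elements, stored in a SequenceFile.
        nums.saveAsObjectFile("file:///tmp/rdd_obj");          // hypothetical path

        // objectFile deserializes the batches and flattens them; Java serialization
        // is used, so the element class must be available on the classpath.
        JavaRDD<Integer> restored = jsc.objectFile("file:///tmp/rdd_obj");
        System.out.println(restored.collect());

        jsc.stop();
    }
}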

Read and write example. The same code can also write to HDFS; only the path differs.

Write: javaRDD.saveAsTextFile, javaRDD.saveAsObjectFile

Read: javaSparkContext.textFile, javaSparkContext.objectFile

SparkConf sparkConf = new SparkConf().setAppName("spark-test-1").setMaster("local");
SparkContext sparkContext = SparkContext.getOrCreate(sparkConf);
sparkContext.setLogLevel("ERROR");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);

List<String> l1 = new ArrayList<>();
l1.add("dog");
l1.add("cat");
l1.add("gnu");
l1.add("salmon");
l1.add("rabbit");
l1.add("turkey");
l1.add("wolf");
l1.add("bear");
l1.add("bee");
JavaRDD<String> javaRDD = javaSparkContext.parallelize(l1, 3);

// Write to the local file system
javaRDD.saveAsTextFile("file:///C:/rdd");
javaRDD.saveAsObjectFile("file:///C:/rdd2");

// Write to HDFS
javaRDD.saveAsTextFile("hdfs://kncloud02:8020/user/rdd");
javaRDD.saveAsObjectFile("hdfs://kncloud02:8020/user/rdd2");

// Read back from the local file system
JavaRDD<String> rdd_1 = javaSparkContext.textFile("file:///C:/rdd");
JavaRDD<Object> rdd_2 = javaSparkContext.objectFile("file:///C:/rdd2");

// Read back from HDFS
JavaRDD<String> rdd_3 = javaSparkContext.textFile("hdfs://kncloud02:8020/user/rdd");
JavaRDD<Object> rdd_4 = javaSparkContext.objectFile("hdfs://kncloud02:8020/user/rdd2");
