

Reduces the elements of this RDD using the specified commutative and associative binary operator.


def reduce(f: JFunction2[T, T, T]): T



def reduce(f: (T, T) => T): T = withScope {
  val cleanF = sc.clean(f)
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  var jobResult: Option[T] = None
  val mergeResult = (index: Int, taskResult: Option[T]) => {
    if (taskResult.isDefined) {
      jobResult = jobResult match {
        case Some(value) => Some(f(value, taskResult.get))
        case None => taskResult
      }
    }
  }
  sc.runJob(this, reducePartition, mergeResult)
  // Get the final result out of our Option, or throw an exception if the RDD was empty
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}



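// Imports used by the Java examples in this section
import java.io.Serializable;
import java.util.*;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;

JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
// Sum all elements: each partition is reduced locally, then the partial sums are merged on the driver
Integer reduceRDD = javaRDD.reduce(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});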
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + reduceRDD);
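To make the two phases of reduce concrete, here is a small plain-Java simulation that runs without Spark. The class name ReduceSketch is hypothetical. Partitions are modelled as lists, and the slicing rule in `slice` is our understanding of how parallelize splits a local list. Each partition is reduced left to right, producing an empty Optional for an empty partition, just as reducePartition returns None. The "driver" then merges only the non-empty task results. If every partition is empty, it throws the same "empty collection" exception.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

// Plain-Java simulation of RDD.reduce (no Spark): partitions are lists, the "driver" is a loop.
final class ReduceSketch {
    // Slices a list the way parallelize slices a local collection (assumption):
    // partition i holds elements [i*n/numSlices, (i+1)*n/numSlices).
    static <T> List<List<T>> slice(List<T> data, int numSlices) {
        List<List<T>> parts = new ArrayList<>();
        long n = data.size();
        for (int i = 0; i < numSlices; i++) {
            int start = (int) (i * n / numSlices);
            int end = (int) ((i + 1) * n / numSlices);
            parts.add(new ArrayList<>(data.subList(start, end)));
        }
        return parts;
    }

    static <T> T reduce(List<List<T>> partitions, BinaryOperator<T> f) {
        Optional<T> jobResult = Optional.empty();
        for (List<T> partition : partitions) {
            // reducePartition: reduce left to right, or empty if the partition has no elements
            Optional<T> taskResult = partition.stream().reduce(f);
            // mergeResult: fold every non-empty task result into the driver-side value
            if (taskResult.isPresent()) {
                jobResult = jobResult.isPresent()
                        ? Optional.of(f.apply(jobResult.get(), taskResult.get()))
                        : taskResult;
            }
        }
        return jobResult.orElseThrow(() -> new UnsupportedOperationException("empty collection"));
    }

    public static void main(String[] args) {
        List<List<Integer>> parts = slice(Arrays.asList(5, 1, 1, 4, 4, 2, 2), 3);
        System.out.println(parts);                        // [[5, 1], [1, 4], [4, 2, 2]]
        System.out.println(reduce(parts, Integer::sum)); // 19
    }
}
```

The sum does not change if the same data is sliced into 10 partitions, some of them empty. This is why reduce requires a commutative and associative operator but no zero value.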



Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into a U and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.


def aggregate[U](zeroValue: U)(seqOp: JFunction2[U, T, U],  combOp: JFunction2[U, U, U]): U



def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
  val cleanSeqOp = sc.clean(seqOp)
  val cleanCombOp = sc.clean(combOp)
  val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
  val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
  sc.runJob(this, aggregatePartition, mergeResult)
  jobResult
}



JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,3);
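// zeroValue = 2: seqOp starts from it in every partition, and combOp starts from it again on the driver
Integer aggregateRDD = javaRDD.aggregate(2, new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2; // seqOp: add one element to the partition's running total
    }
}, new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2; // combOp: merge partition totals
    }
});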
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + aggregateRDD);
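The zero value in the example above is not neutral for addition, which makes its placement visible in the result. The following plain-Java simulation runs without Spark; the class name AggregateSketch is hypothetical. It mirrors the source: every task folds its partition starting from zeroValue, and the driver merges task results into a value that also starts from zeroValue. It assumes U is immutable. Spark instead ships a serialized copy of zeroValue to each task, so mutating seqOps are safe there.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

// Plain-Java simulation of RDD.aggregate (no Spark): shows where zeroValue is applied.
final class AggregateSketch {
    // Assumed parallelize slicing rule: partition i covers [i*n/numSlices, (i+1)*n/numSlices).
    static <T> List<List<T>> slice(List<T> data, int numSlices) {
        List<List<T>> parts = new ArrayList<>();
        long n = data.size();
        for (int i = 0; i < numSlices; i++) {
            parts.add(new ArrayList<>(data.subList((int) (i * n / numSlices), (int) ((i + 1) * n / numSlices))));
        }
        return parts;
    }

    static <T, U> U aggregate(List<List<T>> partitions, U zeroValue,
                              BiFunction<U, T, U> seqOp, BinaryOperator<U> combOp) {
        U jobResult = zeroValue;                      // driver-side starting value
        for (List<T> partition : partitions) {
            U acc = zeroValue;                        // every task restarts from zeroValue
            for (T t : partition) {
                acc = seqOp.apply(acc, t);            // seqOp folds elements into the partition result
            }
            jobResult = combOp.apply(jobResult, acc); // combOp merges each task result on the driver
        }
        return jobResult;
    }

    public static void main(String[] args) {
        List<List<Integer>> parts = slice(Arrays.asList(5, 1, 1, 4, 4, 2, 2), 3);
        // 19 (sum) + 3 * 2 (one zeroValue per partition) + 2 (driver-side zeroValue) = 27
        System.out.println(aggregate(parts, 2, (u, t) -> u + t, Integer::sum));
    }
}
```

With a single partition the same call gives 2 + (2 + 19) = 23. In other words, a non-neutral zero value is counted numPartitions + 1 times.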



Aggregate the elements of each partition, and then the results for all the partitions, using a given associative and commutative function and a neutral "zero value". The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.


def fold(zeroValue: T)(f: JFunction2[T, T, T]): T



def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
  val cleanOp = sc.clean(op)
  val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
  val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
  sc.runJob(this, foldPartition, mergeResult)
  jobResult
}



List<String> data = Arrays.asList("5", "1", "1", "3", "6", "2", "2");
JavaRDD<String> javaRDD = javaSparkContext.parallelize(data,5);
// Tag every element with the index of the partition that holds it
JavaRDD<String> partitionRDD = javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
    @Override
    public Iterator<String> call(Integer v1, Iterator<String> v2) throws Exception {
        LinkedList<String> linkedList = new LinkedList<String>();
        while (v2.hasNext()) {
            linkedList.add(v1 + "=" + v2.next());
        }
        return linkedList.iterator();
    }
}, false);
System.out.println(partitionRDD.collect());
// String concatenation is not commutative and "0" is not a neutral element,
// so the zero values and the driver-side merge order both show up in the result
String foldRDD = javaRDD.fold("0", new Function2<String, String, String>() {
    @Override
    public String call(String v1, String v2) throws Exception {
        return v1 + " - " + v2;
    }
});
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + foldRDD);
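In fold, mergeResult runs as each task finishes, so a non-commutative op can give different strings on different runs. The following plain-Java simulation runs without Spark; the class name FoldSketch and the explicit completionOrder parameter are hypothetical and stand in for task completion order. The partitions are the five slices we expect parallelize to produce for the example data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java simulation of RDD.fold (no Spark). The driver merges task results in the
// order the tasks finish, modelled here by an explicit completionOrder array.
final class FoldSketch {
    static <T> T fold(List<List<T>> partitions, T zeroValue, BinaryOperator<T> op, int[] completionOrder) {
        List<T> taskResults = new ArrayList<>();
        for (List<T> partition : partitions) {
            T acc = zeroValue;                   // iter.fold(zeroValue)(op) inside each task
            for (T t : partition) {
                acc = op.apply(acc, t);
            }
            taskResults.add(acc);
        }
        T jobResult = zeroValue;                 // zeroValue is used once more on the driver
        for (int index : completionOrder) {
            jobResult = op.apply(jobResult, taskResults.get(index));
        }
        return jobResult;
    }

    public static void main(String[] args) {
        List<List<String>> parts = Arrays.asList(
                Arrays.asList("5"), Arrays.asList("1"), Arrays.asList("1", "3"),
                Arrays.asList("6"), Arrays.asList("2", "2"));
        BinaryOperator<String> op = (v1, v2) -> v1 + " - " + v2;
        System.out.println(fold(parts, "0", op, new int[] {0, 1, 2, 3, 4}));
        System.out.println(fold(parts, "0", op, new int[] {4, 3, 2, 1, 0}));
    }
}
```

With a commutative, associative op and a neutral zero value, such as Integer::sum with 0, every completion order gives the same answer. That is the contract the Scaladoc asks for.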



Count the number of elements for each key, collecting the results to a local Map. Note that this method should only be used if the resulting map is expected to be small, as the whole thing is loaded into the driver's memory. To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which returns an RDD[(K, Long)] instead of a map.


def countByKey(): java.util.Map[K, Long]


def countByKey(): Map[K, Long] = self.withScope {
  self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
}

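Note: as the source above shows, countByKey collects the per-key counts (one entry per distinct key) into the driver's memory. With a very large number of distinct keys, the driver may run out of memory (OOM). In that case, use rdd.mapValues(_ => 1L).reduceByKey(_ + _) instead, which returns an RDD[(K, Long)] that stays distributed.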
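The pipeline countByKey delegates to can be simulated in plain Java, without Spark; the class name CountByKeySketch is hypothetical. Only the keys matter, because mapValues(_ => 1L) discards the values. Each partition first builds partial counts, which is reduceByKey's map-side combine. The partial counts are then summed per key into one map, which is the map collect().toMap places in driver memory. Its size grows with the number of distinct keys, not with the number of records.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Spark) of mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap.
final class CountByKeySketch {
    static <K> Map<K, Long> countByKey(List<List<K>> keyPartitions) {
        Map<K, Long> result = new HashMap<>();
        for (List<K> partition : keyPartitions) {
            // Map-side combine: one partial count per key per partition
            Map<K, Long> local = new HashMap<>();
            for (K key : partition) {
                local.merge(key, 1L, Long::sum);
            }
            // Sum the partial counts per key; this is the map that ends up on the driver
            local.forEach((key, count) -> result.merge(key, count, Long::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(countByKey(Arrays.asList(
                Arrays.asList("5"), Arrays.asList("1"), Arrays.asList("1", "3"),
                Arrays.asList("6"), Arrays.asList("2", "2"))));
    }
}
```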


List<String> data = Arrays.asList("5", "1", "1", "3", "6", "2", "2");
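JavaRDD<String> javaRDD = javaSparkContext.parallelize(data, 5);
// Tag every element with the index of the partition that holds it
JavaRDD<String> partitionRDD = javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
    @Override
    public Iterator<String> call(Integer v1, Iterator<String> v2) throws Exception {
        LinkedList<String> linkedList = new LinkedList<String>();
        while (v2.hasNext()) {
            linkedList.add(v1 + "=" + v2.next());
        }
        return linkedList.iterator();
    }
}, false);
System.out.println(partitionRDD.collect());
// Use every string as both the key and the value
JavaPairRDD<String, String> javaPairRDD = javaRDD.mapToPair(new PairFunction<String, String, String>() {
    @Override
    public Tuple2<String, String> call(String s) throws Exception {
        return new Tuple2<String, String>(s, s);
    }
});
// Count the pairs per key; the resulting Map<String, Long> is built in driver memory
System.out.println(javaPairRDD.countByKey());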



Applies a function f to all elements of this RDD.


def foreach(f: VoidFunction[T])



def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}


List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,3);
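javaRDD.foreach(new VoidFunction<Integer>() {
    @Override
    public void call(Integer integer) throws Exception {
        // Runs on the executors: in cluster mode this prints to the executors' stdout, not the driver console
        System.out.println(integer);
    }
});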



Applies a function f to each partition of this RDD.


def foreachPartition(f: VoidFunction[java.util.Iterator[T]])



def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}


List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
// Get the partition ID of each element
JavaRDD<String> partitionRDD = javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<String>>() {
    @Override
    public Iterator<String> call(Integer v1, Iterator<Integer> v2) throws Exception {
        LinkedList<String> linkedList = new LinkedList<String>();
        while (v2.hasNext()) {
            linkedList.add(v1 + "=" + v2.next());
        }
        return linkedList.iterator();
    }
}, false);
System.out.println(partitionRDD.collect());
// The function is called once per partition, with an iterator over that partition's elements
javaRDD.foreachPartition(new VoidFunction<Iterator<Integer>>() {
    @Override
    public void call(Iterator<Integer> integerIterator) throws Exception {
        System.out.println("___________begin_______________");
        while (integerIterator.hasNext())
            System.out.print(integerIterator.next() + "      ");
        System.out.println("\n___________end_________________");
    }
});
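The only difference between foreach and foreachPartition in the source is what the user function receives per call: one element, or a whole partition's iterator. The plain-Java simulation below runs without Spark; the class name ForeachSketch is hypothetical. It counts how often per-call setup would run, and "opening a resource" is a stand-in for something like a database connection.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java simulation (no Spark) contrasting foreach and foreachPartition.
final class ForeachSketch {
    // foreach: runJob(this, iter => iter.foreach(f)), so f sees one element at a time
    static <T> void foreach(List<List<T>> partitions, Consumer<T> f) {
        for (List<T> partition : partitions) {
            partition.forEach(f);
        }
    }

    // foreachPartition: runJob(this, iter => f(iter)), so f sees a whole partition
    static <T> void foreachPartition(List<List<T>> partitions, Consumer<Iterator<T>> f) {
        for (List<T> partition : partitions) {
            f.accept(partition.iterator());
        }
    }

    // Hypothetical setup placed inside foreach runs once per element
    static <T> int setupsWithForeach(List<List<T>> partitions) {
        int[] opened = {0};
        foreach(partitions, element -> opened[0]++);
        return opened[0];
    }

    // The same setup inside foreachPartition runs once per partition, then the iterator is drained
    static <T> int setupsWithForeachPartition(List<List<T>> partitions) {
        int[] opened = {0};
        foreachPartition(partitions, it -> {
            opened[0]++;
            while (it.hasNext()) {
                it.next();
            }
        });
        return opened[0];
    }

    public static void main(String[] args) {
        List<List<Integer>> parts = Arrays.asList(Arrays.asList(5, 1), Arrays.asList(1, 4), Arrays.asList(4, 2, 2));
        System.out.println(setupsWithForeach(parts));          // 7
        System.out.println(setupsWithForeachPartition(parts)); // 3
    }
}
```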



Return the list of values in the RDD for key `key`. This operation is done efficiently if the RDD has a known partitioner by only searching the partition that the key maps to.


def lookup(key: K): JList[V]



def lookup(key: K): Seq[V] = self.withScope {
  self.partitioner match {
    case Some(p) =>
      val index = p.getPartition(key)
      val process = (it: Iterator[(K, V)]) => {
        val buf = new ArrayBuffer[V]
        for (pair <- it if pair._1 == key) {
          buf += pair._2
        }
        buf
      } : Seq[V]
      val res = self.context.runJob(self, process, Array(index), false)
      res(0)
    case None =>
      self.filter(_._1 == key).map(_._2).collect()
  }
}



List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
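JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
    // Each task deserializes its own copy of this function, so i restarts at 0 in every partition
    int i = 0;
    @Override
    public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
        i++;
        return new Tuple2<Integer, Integer>(integer, i + integer);
    }
});
// javaPairRDD has no partitioner, so lookup falls back to filter + collect over every partition
System.out.println("lookup------------" + javaPairRDD.lookup(4));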
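The two branches of lookup can be compared in a plain-Java simulation that runs without Spark. The class and method names are hypothetical, and the int[] counter records how many partitions were scanned. getPartition follows HashPartitioner's rule: a non-negative key.hashCode() modulo the partition count, with null keys going to partition 0. With a known partitioner only that one partition is scanned. Without one, every partition is filtered.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Plain-Java simulation (no Spark) of lookup with and without a known partitioner.
final class LookupSketch {
    // HashPartitioner-style placement: non-negative hashCode mod numPartitions, null -> 0
    static int getPartition(Object key, int numPartitions) {
        if (key == null) return 0;
        int rawMod = key.hashCode() % numPartitions;
        return rawMod + (rawMod < 0 ? numPartitions : 0);
    }

    // Roughly the layout partitionBy(new HashPartitioner(numPartitions)) would produce
    static <K, V> List<List<Map.Entry<K, V>>> hashPartition(List<Map.Entry<K, V>> pairs, int numPartitions) {
        List<List<Map.Entry<K, V>>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) parts.add(new ArrayList<>());
        for (Map.Entry<K, V> pair : pairs) parts.get(getPartition(pair.getKey(), numPartitions)).add(pair);
        return parts;
    }

    // case Some(p): run a job on the single partition p.getPartition(key)
    static <K, V> List<V> lookupWithPartitioner(List<List<Map.Entry<K, V>>> parts, K key, int[] scanned) {
        return scan(parts.get(getPartition(key, parts.size())), key, scanned);
    }

    // case None: filter(_._1 == key).map(_._2).collect() touches every partition
    static <K, V> List<V> lookupWithoutPartitioner(List<List<Map.Entry<K, V>>> parts, K key, int[] scanned) {
        List<V> out = new ArrayList<>();
        for (List<Map.Entry<K, V>> part : parts) out.addAll(scan(part, key, scanned));
        return out;
    }

    private static <K, V> List<V> scan(List<Map.Entry<K, V>> part, K key, int[] scanned) {
        scanned[0]++;
        List<V> buf = new ArrayList<>();
        for (Map.Entry<K, V> pair : part) {
            if (Objects.equals(pair.getKey(), key)) buf.add(pair.getValue()); // null-safe like Scala ==
        }
        return buf;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> pairs = new ArrayList<>();
        pairs.add(new SimpleEntry<>(1, "a"));
        pairs.add(new SimpleEntry<>(4, "b"));
        pairs.add(new SimpleEntry<>(4, "d"));
        int[] scanned = {0};
        System.out.println(lookupWithPartitioner(hashPartition(pairs, 3), 4, scanned) + " after " + scanned[0] + " partition(s)");
    }
}
```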



Return this RDD sorted by the given key function.


def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T]



def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
  this.keyBy[K](f)
      .sortByKey(ascending, numPartitions)
      .values
}

/**
 * Creates tuples of the elements in this RDD by applying `f`.
 */
def keyBy[K](f: T => K): RDD[(K, T)] = withScope {
  val cleanedF = sc.clean(f)
  map(x => (cleanedF(x), x))
}



List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
final Random random = new Random(100);
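// Append a random suffix to every element, e.g. "5_42"
JavaRDD<String> javaRDD1 = javaRDD.map(new Function<Integer, String>() {
    @Override
    public String call(Integer v1) throws Exception {
        return v1.toString() + "_" + random.nextInt(100);
    }
});
// Sort by the suffix (ascending, 3 output partitions). The key is a String,
// so the order is lexicographic: "7" sorts after "42"
JavaRDD<String> resultRDD = javaRDD1.sortBy(new Function<String, Object>() {
    @Override
    public Object call(String v1) throws Exception {
        return v1.split("_")[1];
    }
}, true, 3);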
System.out.println("result--------------" + resultRDD.collect());
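Following the source, sortBy is keyBy(f), then sortByKey, then values. The plain-Java sketch below reproduces that pipeline without Spark; the class name SortBySketch is hypothetical. A local stable List.sort stands in for Spark's range-partitioned shuffle sort, so tie ordering may differ from Spark's. The usage in main shows why the key type matters for the example above: String keys sort lexicographically, while parsed Integer keys sort numerically.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java simulation (no Spark) of sortBy = keyBy(f).sortByKey(ascending).values.
final class SortBySketch {
    static <T, K extends Comparable<? super K>> List<T> sortBy(List<T> data, Function<T, K> f, boolean ascending) {
        List<Map.Entry<K, T>> keyed = new ArrayList<>();
        for (T x : data) {
            keyed.add(new SimpleEntry<>(f.apply(x), x));    // keyBy: (f(x), x)
        }
        Comparator<Map.Entry<K, T>> byKey = Map.Entry.comparingByKey();
        keyed.sort(ascending ? byKey : byKey.reversed());   // sortByKey
        List<T> out = new ArrayList<>();
        for (Map.Entry<K, T> e : keyed) {
            out.add(e.getValue());                          // values
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("5_42", "1_7", "4_10");
        System.out.println(sortBy(data, s -> s.split("_")[1], true));                   // [4_10, 5_42, 1_7]
        System.out.println(sortBy(data, s -> Integer.parseInt(s.split("_")[1]), true)); // [1_7, 4_10, 5_42]
    }
}
```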



Returns the first k (smallest) elements of this RDD according to the natural ordering of T
(or the supplied Comparator), sorted in that order.


def takeOrdered(num: Int): JList[T]
def takeOrdered(num: Int, comp: Comparator[T]): JList[T]



def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  if (num == 0) {
    Array.empty
  } else {
    val mapRDDs = mapPartitions { items =>
      // Priority keeps the largest elements, so let's reverse the ordering.
      val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
      queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
      Iterator.single(queue)
    }
    if (mapRDDs.partitions.length == 0) {
      Array.empty
    } else {
      mapRDDs.reduce { (queue1, queue2) =>
        queue1 ++= queue2
        queue1
      }.toArray.sorted(ord)
    }
  }
}



// Reverses the natural ordering, so takeOrdered returns the largest elements first
public static class TakeOrderedComparator implements Serializable, Comparator<Integer> {
    @Override
    public int compare(Integer o1, Integer o2) {
        return -o1.compareTo(o2);
    }
}
List<Integer> data = Arrays.asList(5, 1, 0, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
System.out.println("takeOrdered-----1-------------" + javaRDD.takeOrdered(2));
List<Integer> list = javaRDD.takeOrdered(2, new TakeOrderedComparator());
System.out.println("takeOrdered----2--------------" + list);
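The bounded-queue strategy in the source can be reproduced in plain Java, without Spark; the class name TakeOrderedSketch is hypothetical. Each partition keeps at most num elements in a heap whose head is the largest kept element under ord, playing the role of Spark's BoundedPriorityQueue with ord.reverse. The per-partition heaps are merged into one bounded heap, which is finally sorted by ord. java.util.PriorityQueue plus a manual size check stands in for BoundedPriorityQueue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java simulation (no Spark) of takeOrdered: bounded heap per partition,
// merged into one bounded heap, then sorted by the requested ordering.
final class TakeOrderedSketch {
    static <T> List<T> takeOrdered(List<List<T>> partitions, int num, Comparator<T> ord) {
        List<T> result = new ArrayList<>();
        if (num == 0) {
            return result;
        }
        // Ordered by ord.reversed(): the head is the largest kept element, the first to evict
        PriorityQueue<T> merged = new PriorityQueue<>(ord.reversed());
        for (List<T> partition : partitions) {
            PriorityQueue<T> queue = new PriorityQueue<>(ord.reversed()); // per-partition queue
            for (T item : partition) {
                offerBounded(queue, item, num, ord);
            }
            for (T item : queue) {
                offerBounded(merged, item, num, ord); // driver-side reduce: queue1 ++= queue2
            }
        }
        result.addAll(merged);
        result.sort(ord); // .toArray.sorted(ord)
        return result;
    }

    // Keep at most max elements: replace the current largest one if item is smaller under ord
    static <T> void offerBounded(PriorityQueue<T> queue, T item, int max, Comparator<T> ord) {
        if (queue.size() < max) {
            queue.add(item);
        } else if (ord.compare(item, queue.peek()) < 0) {
            queue.poll();
            queue.add(item);
        }
    }

    public static void main(String[] args) {
        List<List<Integer>> parts = Arrays.asList(Arrays.asList(5, 1), Arrays.asList(0, 4), Arrays.asList(4, 2, 2));
        System.out.println(takeOrdered(parts, 2, Comparator.<Integer>naturalOrder())); // [0, 1]
        System.out.println(takeOrdered(parts, 2, Comparator.<Integer>reverseOrder())); // [5, 4]
    }
}
```

No partition ever holds more than num elements in its queue. This is why takeOrdered stays cheap on the driver when num is small.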



Return a fixed-size sampled subset of this RDD in an array.


def takeSample(withReplacement: Boolean, num: Int): JList[T]
def takeSample(withReplacement: Boolean, num: Int, seed: Long): JList[T]

takeSample returns an array (a List in the Java API) of num elements sampled at random from the dataset.


def takeSample(
    withReplacement: Boolean,
    num: Int,
    seed: Long = Utils.random.nextLong): Array[T] = {
  val numStDev = 10.0
  if (num < 0) {
    throw new IllegalArgumentException("Negative number of elements requested")
  } else if (num == 0) {
    return new Array[T](0)
  }
  val initialCount = this.count()
  if (initialCount == 0) {
    return new Array[T](0)
  }
  val maxSampleSize = Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt
  if (num > maxSampleSize) {
    throw new IllegalArgumentException("Cannot support a sample size > Int.MaxValue - " +
      s"$numStDev * math.sqrt(Int.MaxValue)")
  }
  val rand = new Random(seed)
  if (!withReplacement && num >= initialCount) {
    return Utils.randomizeInPlace(this.collect(), rand)
  }
  val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
    withReplacement)
  var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
  // If the first sample didn't turn out large enough, keep trying to take samples;
  // this shouldn't happen often because we use a big multiplier for the initial size
  var numIters = 0
  while (samples.length < num) {
    logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
    samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
    numIters += 1
  }
  Utils.randomizeInPlace(samples, rand).take(num)
}

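As the source shows, takeSample is similar to sample and takes three parameters. The first, withReplacement, selects sampling with replacement (true) or without replacement (false). The second, num, is the number of elements to return; this is what distinguishes takeSample from sample, which takes a sampling fraction instead. The third, seed, seeds the random number generator. Internally, takeSample first runs count() to get the size of the RDD and computes a sampling fraction from num and that count. It then calls sample with that fraction and collect()s the result, sampling again if fewer than num elements came back. Finally, it shuffles the collected array and takes its first num elements. Note that when sampling without replacement and num is greater than or equal to the number of elements in the RDD, takeSample simply collects and shuffles the whole RDD. It therefore returns as many elements as the RDD contains, not num.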


List<Integer> data = Arrays.asList(5, 1, 0, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
System.out.println("takeSample-----1-------------" + javaRDD.takeSample(true,2));
System.out.println("takeSample-----2-------------" + javaRDD.takeSample(true,2,100));
System.out.println("takeSample-----3-------------" + javaRDD.takeSample(true,20,100));
System.out.println("takeSample-----4-------------" + javaRDD.takeSample(false,20,100));
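The control flow of takeSample can be traced in plain Java, without Spark; the class name TakeSampleSketch is hypothetical. The sketch follows the source's steps: argument checks, the shortcut for sampling without replacement when num is at least the count, sampling in a loop until there are enough elements, then a shuffle and take(num). Several pieces are simplified assumptions rather than Spark's code. The oversampling fraction (num + 10 * sqrt(num)) / count is not SamplingUtils.computeFractionForSampleSize. A per-element Bernoulli draw (without replacement) and a Knuth-style Poisson draw (with replacement) stand in for Spark's samplers. The maxSampleSize check is omitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Plain-Java sketch of takeSample's control flow (no Spark); fraction and samplers are simplified.
final class TakeSampleSketch {
    static <T> List<T> takeSample(List<T> data, boolean withReplacement, int num, long seed) {
        if (num < 0) throw new IllegalArgumentException("Negative number of elements requested");
        if (num == 0 || data.isEmpty()) return new ArrayList<>(); // Spark runs count() for this check
        Random rand = new Random(seed);
        if (!withReplacement && num >= data.size()) {
            // Shortcut: "collect" everything and shuffle; returns data.size() elements, not num
            List<T> all = new ArrayList<>(data);
            Collections.shuffle(all, rand);
            return all;
        }
        // Hypothetical oversampling fraction so that one pass usually yields at least num elements
        double fraction = (num + 10.0 * Math.sqrt(num)) / data.size();
        List<T> samples = sample(data, withReplacement, fraction, rand);
        while (samples.size() < num) {
            samples = sample(data, withReplacement, fraction, rand); // re-sample, like the while loop in the source
        }
        Collections.shuffle(samples, rand);
        return new ArrayList<>(samples.subList(0, num));
    }

    // One sampling pass: Bernoulli without replacement, Poisson copies with replacement
    static <T> List<T> sample(List<T> data, boolean withReplacement, double fraction, Random rand) {
        List<T> out = new ArrayList<>();
        for (T x : data) {
            if (withReplacement) {
                int copies = poisson(fraction, rand);
                for (int i = 0; i < copies; i++) out.add(x);
            } else if (rand.nextDouble() < fraction) {
                out.add(x);
            }
        }
        return out;
    }

    // Knuth's Poisson generator; adequate for the small means used here
    static int poisson(double mean, Random rand) {
        double limit = Math.exp(-mean);
        int k = 0;
        double p = 1.0;
        do {
            k++;
            p *= rand.nextDouble();
        } while (p > limit);
        return k - 1;
    }
}
```

As in the example above, takeSample(false, 20, ...) on 7 elements returns all 7 in shuffled order, while takeSample(true, 20, ...) returns 20 elements, possibly with repeats.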



Aggregates the elements of this RDD in a multi-level tree pattern.


def treeAggregate[U](zeroValue: U, seqOp: JFunction2[U, T, U], combOp: JFunction2[U, U, U], depth: Int): U
def treeAggregate[U](zeroValue: U, seqOp: JFunction2[U, T, U], combOp: JFunction2[U, U, U]): U



def treeAggregate[U: ClassTag](zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    depth: Int = 2): U = withScope {
  require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
  if (partitions.length == 0) {
    Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
  } else {
    val cleanSeqOp = context.clean(seqOp)
    val cleanCombOp = context.clean(combOp)
    val aggregatePartition =
      (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
    var numPartitions = partiallyAggregated.partitions.length
    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    // If creating an extra level doesn't help reduce
    // the wall-clock time, we stop tree aggregation.

    // Don't trigger TreeAggregation when it doesn't save wall-clock time
    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      val curNumPartitions = numPartitions
      partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
        (i, iter) => iter.map((i % curNumPartitions, _))
      }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
    }
    partiallyAggregated.reduce(cleanCombOp)
  }
}



List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,3);
// Convert every number to a String
JavaRDD<String> javaRDD1 = javaRDD.map(new Function<Integer, String>() {
    @Override
    public String call(Integer v1) throws Exception {
        return Integer.toString(v1);
    }
});
String result1 = javaRDD1.treeAggregate("0", new Function2<String, String, String>() {
    @Override
    public String call(String v1, String v2) throws Exception {
        // seqOp: runs inside each partition, starting from the zero value "0"
        System.out.println(v1 + "=seq=" + v2);
        return v1 + "=seq=" + v2;
    }
}, new Function2<String, String, String>() {
    @Override
    public String call(String v1, String v2) throws Exception {
        // combOp: merges partial results from different partitions
        System.out.println(v1 + "<=comb=>" + v2);
        return v1 + "<=comb=>" + v2;
    }
});
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + result1);
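How many extra levels treeAggregate builds depends only on the partition count and depth, so that decision can be restated in plain Java, without Spark. The class name TreeAggregateLevels is hypothetical. The code reuses the source's scale formula and wall-clock test and returns the partition count at each level. For the example above (3 partitions, default depth 2), the result is [3]: no extra level is added, and the partition results go straight to the final reduce(combOp). Per the source, that final reduce does not apply zeroValue again, unlike aggregate's driver-side merge.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java restatement (no Spark) of how treeAggregate chooses its extra reduceByKey levels.
final class TreeAggregateLevels {
    static List<Integer> levels(int numPartitions, int depth) {
        if (depth < 1) {
            throw new IllegalArgumentException("Depth must be greater than or equal to 1 but got " + depth + ".");
        }
        List<Integer> levels = new ArrayList<>();
        levels.add(numPartitions); // partial aggregates, one per input partition
        int scale = Math.max((int) Math.ceil(Math.pow(numPartitions, 1.0 / depth)), 2);
        // Add a level only while it saves wall-clock time (same test as the source)
        while (numPartitions > scale + Math.ceil((double) numPartitions / scale)) {
            numPartitions /= scale; // each extra level shuffles into numPartitions / scale partitions
            levels.add(numPartitions);
        }
        return levels;
    }

    public static void main(String[] args) {
        System.out.println(levels(3, 2));    // [3]: no extra level, plain reduce
        System.out.println(levels(1000, 2)); // [1000, 31]
        System.out.println(levels(500, 3));  // [500, 62, 7]
    }
}
```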



Reduces the elements of this RDD in a multi-level tree pattern.


def treeReduce(f: JFunction2[T, T, T], depth: Int): T
def treeReduce(f: JFunction2[T, T, T]): T



def treeReduce(f: (T, T) => T, depth: Int = 2): T = withScope {
  require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
  val cleanF = context.clean(f)
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  val partiallyReduced = mapPartitions(it => Iterator(reducePartition(it)))
  val op: (Option[T], Option[T]) => Option[T] = (c, x) => {
    if (c.isDefined && x.isDefined) {
      Some(cleanF(c.get, x.get))
    } else if (c.isDefined) {
      c
    } else if (x.isDefined) {
      x
    } else {
      None
    }
  }
  partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
    .getOrElse(throw new UnsupportedOperationException("empty collection"))
}



List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,5);
// Convert every number to a String
JavaRDD<String> javaRDD1 = javaRDD.map(new Function<Integer, String>() {
    @Override
    public String call(Integer v1) throws Exception {
        return Integer.toString(v1);
    }
});
String result = javaRDD1.treeReduce(new Function2<String, String, String>() {
    @Override
    public String call(String v1, String v2) throws Exception {
        System.out.println(v1 + "=" + v2);
        return v1 + "=" + v2;
    }
});
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + result);



Save this RDD as a text file, using string representations of elements.


def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit



def saveAsTextFile(path: String): Unit = withScope {
  // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
  // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
  // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
  // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
  // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
  //
  // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
  // same bytecodes for `saveAsTextFile`.
  val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
  val textClassTag = implicitly[ClassTag[Text]]
  val r = this.mapPartitions { iter =>
    // One Text object is reused for every record of the partition
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}
/**
 * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
 * supporting the key and value types K and V in this RDD.
 */
def saveAsHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    conf: JobConf = new JobConf(self.context.hadoopConfiguration),
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
  val hadoopConf = conf
  hadoopConf.setOutputKeyClass(keyClass)
  hadoopConf.setOutputValueClass(valueClass)
  // Doesn't work in Scala 2.9 due to what may be a generics bug
  // TODO: Should we uncomment this for Scala 2.10?
  // conf.setOutputFormat(outputFormatClass)
  hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
  for (c <- codec) {
    hadoopConf.setCompressMapOutput(true)
    hadoopConf.set("mapred.output.compress", "true")
    hadoopConf.setMapOutputCompressorClass(c)
    hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
    hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
  }
  // Use configured output committer if already set
  if (conf.getOutputCommitter == null) {
    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
  }
  FileOutputFormat.setOutputPath(hadoopConf,
    SparkHadoopWriter.createPathFromString(path, hadoopConf))
  saveAsHadoopDataset(hadoopConf)
}
/**
 * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
 * that storage system. The JobConf should set an OutputFormat and any output paths required
 * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
 * MapReduce job.
 */
def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
  // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
  val hadoopConf = conf
  val wrappedConf = new SerializableConfiguration(hadoopConf)
  val outputFormatInstance = hadoopConf.getOutputFormat
  val keyClass = hadoopConf.getOutputKeyClass
  val valueClass = hadoopConf.getOutputValueClass
  if (outputFormatInstance == null) {
    throw new SparkException("Output format class not set")
  }
  if (keyClass == null) {
    throw new SparkException("Output key class not set")
  }
  if (valueClass == null) {
    throw new SparkException("Output value class not set")
  }
  SparkHadoopUtil.get.addCredentials(hadoopConf)

  logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
    valueClass.getSimpleName + ")")

  if (isOutputSpecValidationEnabled) {
    // FileOutputFormat ignores the filesystem parameter
    val ignoredFs = FileSystem.get(hadoopConf)
    hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
  }

  val writer = new SparkHadoopWriter(hadoopConf)
  writer.preSetup()

  val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
    val config = wrappedConf.value
    // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
    // around by taking a mod. We expect that no task will be attempted 2 billion times.
    val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt

    val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)

    writer.setup(context.stageId, context.partitionId, taskAttemptId)
    writer.open()
    var recordsWritten = 0L

    Utils.tryWithSafeFinally {
      while (iter.hasNext) {
        val record = iter.next()
        writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])

        // Update bytes written metric every few records
        maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
        recordsWritten += 1
      }
    } {
      writer.close()
    }
    writer.commit()
    bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
    outputMetrics.setRecordsWritten(recordsWritten)
  }

  self.context.runJob(self, writeToFile)
  writer.commitJob()
}



List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,5);
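The following Spark-free sketch mimics, on the local file system, the directory layout saveAsTextFile produces; the class name SaveAsTextFileSketch and the paths used with it are hypothetical. It writes one part-NNNNN file per partition, with one element.toString() per line (TextOutputFormat skips the NullWritable key). It refuses an existing output directory, much as FileOutputFormat.checkOutputSpecs does when output-spec validation is enabled. It also writes a _SUCCESS marker, as the default FileOutputCommitter does on job commit. It is an illustration of the layout, not Spark's Hadoop-based writer.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Local, Spark-free sketch of the directory layout saveAsTextFile produces.
final class SaveAsTextFileSketch {
    static <T> void saveAsTextFile(List<List<T>> partitions, Path dir) throws IOException {
        // Like the output-spec check: an existing output directory is an error
        if (Files.exists(dir)) {
            throw new IOException("Output directory " + dir + " already exists");
        }
        Files.createDirectories(dir);
        for (int i = 0; i < partitions.size(); i++) {
            List<String> lines = new ArrayList<>();
            for (T x : partitions.get(i)) {
                lines.add(x.toString()); // text.set(x.toString); the NullWritable key is not written
            }
            // One file per partition; this sketch also writes a file for an empty partition
            Files.write(dir.resolve(String.format("part-%05d", i)), lines, StandardCharsets.UTF_8);
        }
        Files.createFile(dir.resolve("_SUCCESS")); // marker written when the job commits
    }
}
```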



Save this RDD as a SequenceFile of serialized objects.


def saveAsObjectFile(path: String): Unit



def saveAsObjectFile(path: String): Unit = withScope {
  this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
    .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
    .saveAsSequenceFile(path)
}

def saveAsSequenceFile(
    path: String,
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  def anyToWritable[U <% Writable](u: U): Writable = u

  // TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and
  // valueWritableClass at the compile time. To implement that, we need to add type parameters to
  // SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a
  // breaking change.
  val convertKey = self.keyClass != keyWritableClass
  val convertValue = self.valueClass != valueWritableClass

  logInfo("Saving as sequence file of type (" + keyWritableClass.getSimpleName + "," +
    valueWritableClass.getSimpleName + ")" )
  val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
  val jobConf = new JobConf(self.context.hadoopConfiguration)
  if (!convertKey && !convertValue) {
    self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  } else if (!convertKey && convertValue) {
    self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile(
      path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  } else if (convertKey && !convertValue) {
    self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile(
      path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  } else if (convertKey && convertValue) {
    self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(
      path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  }
}



List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,5);
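The record layout saveAsObjectFile builds before handing off to saveAsSequenceFile can be reproduced in plain Java, without Spark; the class name SaveAsObjectFileSketch is hypothetical. Each partition is cut into batches of at most 10 elements (iter.grouped(10)). Each batch is Java-serialized into one byte[], which becomes the BytesWritable value of a SequenceFile record whose key is NullWritable. Object[] stands in for Scala's Array[T]. With the 7-element, 5-partition example above, every non-empty partition fits in a single batch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (no Spark) of the values saveAsObjectFile writes:
// one Java-serialized batch of at most 10 elements per SequenceFile record.
final class SaveAsObjectFileSketch {
    // iter.grouped(10).map(_.toArray)
    static <T> List<Object[]> batches(List<T> partition, int batchSize) {
        List<Object[]> out = new ArrayList<>();
        for (int start = 0; start < partition.size(); start += batchSize) {
            int end = Math.min(start + batchSize, partition.size());
            out.add(partition.subList(start, end).toArray());
        }
        return out;
    }

    // Like Utils.serialize: ObjectOutputStream over an in-memory buffer
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    // What reading the records back has to undo for each value
    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }
}
```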

