reduce


Official documentation:

Reduces the elements of this RDD using the specified commutative and associative binary operator.

Function prototype:

def reduce(f: JFunction2[T, T, T]): T

Applies the binary function f (which must be commutative and associative) pairwise to the elements of the RDD and returns the result.

Source code analysis:

def reduce(f: (T, T) => T): T = withScope {
  val cleanF = sc.clean(f)
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  var jobResult: Option[T] = None
  val mergeResult = (index: Int, taskResult: Option[T]) => {
    if (taskResult.isDefined) {
      jobResult = jobResult match {
        case Some(value) => Some(f(value, taskResult.get))
        case None => taskResult
      }
    }
  }
  sc.runJob(this, reducePartition, mergeResult)
  // Get the final result out of our Option, or throw an exception if the RDD was empty
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

As the source shows, reduce applies reduceLeft within each partition (folding the elements from left to right), and the per-partition results are then merged on the driver. With a very large number of partitions or an expensive user function, this final merge can put noticeable load on the driver.

Example:

JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
Integer reduceRDD = javaRDD.reduce(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + reduceRDD);

aggregate


Official documentation:

Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.

Function prototype:

def aggregate[U](zeroValue: U)(seqOp: JFunction2[U, T, U], combOp: JFunction2[U, U, U]): U

aggregate first combines the elements within each partition, then merges the per-partition results; the final return type does not have to match the element type of the RDD.

Source code analysis:

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
  val cleanSeqOp = sc.clean(seqOp)
  val cleanCombOp = sc.clean(combOp)
  val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
  val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
  sc.runJob(this, aggregatePartition, mergeResult)
  jobResult
}

As the source shows, aggregate runs Scala's collection aggregate on each partition, then uses combOp to merge the per-partition results on the driver.

Example:

JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,3);
Integer aggregateRDD = javaRDD.aggregate(2, new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
}, new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + aggregateRDD);

fold


Official documentation:

Aggregate the elements of each partition, and then the results for all the partitions, using a given associative and commutative function and a neutral "zero value". The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.

Function prototype:

def fold(zeroValue: T)(f: JFunction2[T, T, T]): T

fold is a simplified aggregate in which seqOp and combOp are the same function op.

Source code analysis:

def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
  val cleanOp = sc.clean(op)
  val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
  val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
  sc.runJob(this, foldPartition, mergeResult)
  jobResult
}

As the source shows, zeroValue is first assigned to jobResult; each partition is then folded with op starting from zeroValue, each task result is merged into jobResult with op, and jobResult is returned at the end.

Example:

List<String> data = Arrays.asList("5", "1", "1", "3", "6", "2", "2");
JavaRDD<String> javaRDD = javaSparkContext.parallelize(data,5);
JavaRDD<String> partitionRDD = javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
    @Override
    public Iterator<String> call(Integer v1, Iterator<String> v2) throws Exception {
        LinkedList<String> linkedList = new LinkedList<String>();
        while (v2.hasNext()) {
            linkedList.add(v1 + "=" + v2.next());
        }
        return linkedList.iterator();
    }
}, false);
System.out.println(partitionRDD.collect());
String foldRDD = javaRDD.fold("0", new Function2<String, String, String>() {
    @Override
    public String call(String v1, String v2) throws Exception {
        return v1 + " - " + v2;
    }
});
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + foldRDD);

countByKey


Official documentation:

Count the number of elements for each key, collecting the results to a local Map. Note that this method should only be used if the resulting map is expected to be small, as the whole thing is loaded into the driver's memory. To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which returns an RDD[T, Long] instead of a map.

Function prototype:

def countByKey(): java.util.Map[K, Long]

Source code analysis:

def countByKey(): Map[K, Long] = self.withScope {
  self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
}

As the source shows, countByKey first maps each value to 1L, then aggregates the counts with reduceByKey, and finally collects the result to the driver and converts it into a Map.
Note that this loads the entire result into the driver's memory, which can cause an OOM when the data is large. If the number of distinct keys is large, prefer rdd.mapValues(_ => 1L).reduceByKey(_ + _), which returns an RDD[(K, Long)] instead of a local map.

Example:

List<String> data = Arrays.asList("5", "1", "1", "3", "6", "2", "2");
JavaRDD<String> javaRDD = javaSparkContext.parallelize(data, 5);
JavaRDD<String> partitionRDD = javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
    @Override
    public Iterator<String> call(Integer v1, Iterator<String> v2) throws Exception {
        LinkedList<String> linkedList = new LinkedList<String>();
        while (v2.hasNext()) {
            linkedList.add(v1 + "=" + v2.next());
        }
        return linkedList.iterator();
    }
}, false);
System.out.println(partitionRDD.collect());
JavaPairRDD<String, String> javaPairRDD = javaRDD.mapToPair(new PairFunction<String, String, String>() {
    @Override
    public Tuple2<String, String> call(String s) throws Exception {
        return new Tuple2<String, String>(s, s);
    }
});
System.out.println(javaPairRDD.countByKey());
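
When the number of distinct keys is large, the alternative suggested in the documentation keeps the counts distributed instead of collecting them into a driver-side map. A minimal Java sketch of that approach (variable names are ours):

JavaPairRDD<String, Long> countsRDD = javaPairRDD
        .mapValues(new Function<String, Long>() {
            @Override
            public Long call(String v) throws Exception {
                return 1L;   // replace each value with a count of 1
            }
        })
        .reduceByKey(new Function2<Long, Long, Long>() {
            @Override
            public Long call(Long a, Long b) throws Exception {
                return a + b;   // sum the counts per key, still as an RDD
            }
        });
System.out.println(countsRDD.collect());   // collect here only because the sample data is tiny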

foreach


Official documentation:

Applies a function f to all elements of this RDD.

Function prototype:

def foreach(f: VoidFunction[T])

foreach traverses the RDD and applies the function f to each element.

Source code analysis:

def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

Example:

List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,3);
javaRDD.foreach(new VoidFunction<Integer>() {
    @Override
    public void call(Integer integer) throws Exception {
        System.out.println(integer);
    }
});
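
Keep in mind that f runs on the executors, so in cluster mode the println output above appears in the executor logs rather than on the driver; driver-visible side effects are usually routed through an accumulator. A minimal sketch, assuming the pre-2.0 Accumulator API (org.apache.spark.Accumulator), shown only as an illustration:

final Accumulator<Integer> count = javaSparkContext.accumulator(0);
javaRDD.foreach(new VoidFunction<Integer>() {
    @Override
    public void call(Integer integer) throws Exception {
        count.add(1);   // distributed counter, updated on the executors
    }
});
System.out.println("element count = " + count.value());   // 7 for the sample data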

foreachPartition


Official documentation:

Applies a function f to each partition of this RDD.

Function prototype:

def foreachPartition(f: VoidFunction[java.util.Iterator[T]])

foreachPartition is similar to foreach, except that f is applied once per partition (receiving the partition's iterator) rather than once per element.

Source code analysis:

def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}

Example:

List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
// get the partition ID of each element
JavaRDD<String> partitionRDD = javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<String>>() {
    @Override
    public Iterator<String> call(Integer v1, Iterator<Integer> v2) throws Exception {
        LinkedList<String> linkedList = new LinkedList<String>();
        while (v2.hasNext()) {
            linkedList.add(v1 + "=" + v2.next());
        }
        return linkedList.iterator();
    }
}, false);
System.out.println(partitionRDD.collect());
javaRDD.foreachPartition(new VoidFunction<Iterator<Integer>>() {
    @Override
    public void call(Iterator<Integer> integerIterator) throws Exception {
        System.out.println("___________begin_______________");
        while (integerIterator.hasNext())
            System.out.print(integerIterator.next() + "      ");
        System.out.println("\n___________end_________________");
    }
});
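
foreachPartition is typically chosen when there is a per-partition setup cost to amortize, such as opening a connection or batching writes. A minimal sketch of that pattern (the final println stands in for, e.g., a batched database write; nothing here is from the original post):

javaRDD.foreachPartition(new VoidFunction<Iterator<Integer>>() {
    @Override
    public void call(Iterator<Integer> it) throws Exception {
        LinkedList<Integer> batch = new LinkedList<Integer>();   // per-partition buffer, created once
        while (it.hasNext()) {
            batch.add(it.next());
        }
        System.out.println("flushing batch of " + batch.size() + " records");
    }
});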

lookup


Official documentation:

Return the list of values in the RDD for key `key`. This operation is done efficiently if the RDD has a known partitioner by only searching the partition that the key maps to.

Function prototype:

def lookup(key: K): JList[V]

lookup works on (K, V) RDDs: given a key K, it returns all values V in the RDD associated with that key.

Source code analysis:

def lookup(key: K): Seq[V] = self.withScope {
  self.partitioner match {
    case Some(p) =>
      val index = p.getPartition(key)
      val process = (it: Iterator[(K, V)]) => {
        val buf = new ArrayBuffer[V]
        for (pair <- it if pair._1 == key) {
          buf += pair._2
        }
        buf
      } : Seq[V]
      val res = self.context.runJob(self, process, Array(index), false)
      res(0)
    case None =>
      self.filter(_._1 == key).map(_._2).collect()
  }
}

As the source shows, when the RDD has a partitioner, lookup computes the partition for the key and runs a job only on that partition to collect the matching values; when there is no partitioner, it falls back to filtering the whole RDD on the key and collecting the values.

Example:

List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
    int i = 0;
    @Override
    public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
        i++;
        return new Tuple2<Integer, Integer>(integer, i + integer);
    }
});
System.out.println(javaPairRDD.collect());
System.out.println("lookup------------" + javaPairRDD.lookup(4));

sortBy


Official documentation:

Return this RDD sorted by the given key function.

Function prototype:

def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T]

sortBy sorts the elements of the RDD according to the key produced by the given function f.

Source code analysis:

def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
  this.keyBy[K](f)
      .sortByKey(ascending, numPartitions)
      .values
}

/**
 * Creates tuples of the elements in this RDD by applying `f`.
 */
def keyBy[K](f: T => K): RDD[(K, T)] = withScope {
  val cleanedF = sc.clean(f)
  map(x => (cleanedF(x), x))
}

As the source shows, sortBy is implemented on top of sortByKey. It takes three parameters: the first is the key function f over elements of type T, which keyBy uses in a map to turn each element x into a tuple (f(x), x); the second, ascending, is optional and controls the sort direction (default true, i.e. ascending); the third, numPartitions, is also optional and sets the number of partitions of the sorted RDD (default: the same as before sorting, partitions.length).

Example:

List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
final Random random = new Random(100);
// transform the RDD so that each element has two parts
JavaRDD<String> javaRDD1 = javaRDD.map(new Function<Integer, String>() {
    @Override
    public String call(Integer v1) throws Exception {
        return v1.toString() + "_" + random.nextInt(100);
    }
});
System.out.println(javaRDD1.collect());
// sort by the second part of each element
JavaRDD<String> resultRDD = javaRDD1.sortBy(new Function<String, Object>() {
    @Override
    public Object call(String v1) throws Exception {
        return v1.split("_")[1];
    }
}, false, 3);
System.out.println("result--------------" + resultRDD.collect());

takeOrdered


Official documentation:

Returns the first k (smallest) elements from this RDD using the natural ordering for T while maintain the order.

Function prototype:

def takeOrdered(num: Int): JList[T]
def takeOrdered(num: Int, comp: Comparator[T]): JList[T]

takeOrdered returns the first num elements of the RDD according to the default (ascending) ordering or a supplied comparator.

Source code analysis:

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  if (num == 0) {
    Array.empty
  } else {
    val mapRDDs = mapPartitions { items =>
      // Priority keeps the largest elements, so let's reverse the ordering.
      val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
      queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
      Iterator.single(queue)
    }
    if (mapRDDs.partitions.length == 0) {
      Array.empty
    } else {
      mapRDDs.reduce { (queue1, queue2) =>
        queue1 ++= queue2
        queue1
      }.toArray.sorted(ord)
    }
  }
}

As the source shows, takeOrdered uses mapPartitions to sort each partition locally, keeping only num elements per partition; note that the elements of mapRDDs are BoundedPriorityQueue instances. The queues are then merged with reduce, converted to an array, and sorted globally.

Example:

// note: the comparator must be serializable
public static class TakeOrderedComparator implements Serializable, Comparator<Integer> {
    @Override
    public int compare(Integer o1, Integer o2) {
        return -o1.compareTo(o2);
    }
}
List<Integer> data = Arrays.asList(5, 1, 0, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
System.out.println("takeOrdered-----1-------------" + javaRDD.takeOrdered(2));
List<Integer> list = javaRDD.takeOrdered(2, new TakeOrderedComparator());
System.out.println("takeOrdered----2--------------" + list);

takeSample


Official documentation:

Return a fixed-size sampled subset of this RDD in an array

Function prototype:

def takeSample(withReplacement: Boolean, num: Int): JList[T]
def takeSample(withReplacement: Boolean, num: Int, seed: Long): JList[T]

takeSample returns an array of num elements sampled at random from the dataset.

Source code analysis:

def takeSample(
    withReplacement: Boolean,
    num: Int,
    seed: Long = Utils.random.nextLong): Array[T] = {
  val numStDev = 10.0
  if (num < 0) {
    throw new IllegalArgumentException("Negative number of elements requested")
  } else if (num == 0) {
    return new Array[T](0)
  }
  val initialCount = this.count()
  if (initialCount == 0) {
    return new Array[T](0)
  }
  val maxSampleSize = Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt
  if (num > maxSampleSize) {
    throw new IllegalArgumentException("Cannot support a sample size > Int.MaxValue - " +
      s"$numStDev * math.sqrt(Int.MaxValue)")
  }
  val rand = new Random(seed)
  if (!withReplacement && num >= initialCount) {
    return Utils.randomizeInPlace(this.collect(), rand)
  }
  val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
    withReplacement)
  var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
  // If the first sample didn't turn out large enough, keep trying to take samples;
  // this shouldn't happen often because we use a big multiplier for the initial size
  var numIters = 0
  while (samples.length < num) {
    logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
    samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
    numIters += 1
  }
  Utils.randomizeInPlace(samples, rand).take(num)
}

As the source shows, takeSample is similar to sample and takes three parameters: withReplacement indicates whether sampling is done with replacement (true) or without (false); num is the number of sampled elements to return, which is also what distinguishes takeSample from sample; seed is the seed for the random number generator. Internally, takeSample computes fraction (the sampling ratio), calls sample with it, collects the sampled data, and finally takes num elements. Note that if num is larger than the number of elements in the RDD and sampling is without replacement, all elements of the RDD are returned (in randomized order).

Example:

List<Integer> data = Arrays.asList(5, 1, 0, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
System.out.println("takeSample-----1-------------" + javaRDD.takeSample(true,2));
System.out.println("takeSample-----2-------------" + javaRDD.takeSample(true,2,100));
// returns 20 elements (sampling with replacement)
System.out.println("takeSample-----3-------------" + javaRDD.takeSample(true, 20, 100));
// returns 7 elements (sampling without replacement, capped at the RDD size)
System.out.println("takeSample-----4-------------" + javaRDD.takeSample(false, 20, 100));

treeAggregate


Official documentation:

Aggregates the elements of this RDD in a multi-level tree pattern.

Function prototype:

def treeAggregate[U](zeroValue: U, seqOp: JFunction2[U, T, U], combOp: JFunction2[U, U, U], depth: Int): U
def treeAggregate[U](zeroValue: U, seqOp: JFunction2[U, T, U], combOp: JFunction2[U, U, U]): U

treeAggregate can be understood as a multi-level (tree-shaped) variant of aggregate.

Source code analysis:

def treeAggregate[U: ClassTag](zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    depth: Int = 2): U = withScope {
  require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
  if (partitions.length == 0) {
    Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
  } else {
    val cleanSeqOp = context.clean(seqOp)
    val cleanCombOp = context.clean(combOp)
    val aggregatePartition =
      (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
    var numPartitions = partiallyAggregated.partitions.length
    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    // If creating an extra level doesn't help reduce
    // the wall-clock time, we stop tree aggregation.
    // Don't trigger TreeAggregation when it doesn't save wall-clock time
    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      val curNumPartitions = numPartitions
      partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
        (i, iter) => iter.map((i % curNumPartitions, _))
      }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
    }
    partiallyAggregated.reduce(cleanCombOp)
  }
}

As the source shows, treeAggregate first aggregates each partition locally with Scala's aggregate. It then computes scale from the depth parameter; while there are still too many partitions, it keys each partial result by i % curNumPartitions, repartitions and merges by that key with combOp, and repeats until few partitions remain, finishing with a reduce. Tuning the depth therefore trades extra intermediate merge levels for a cheaper final reduce.

Example:

List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,3);
// transformation: convert the elements to strings
JavaRDD<String> javaRDD1 = javaRDD.map(new Function<Integer, String>() {
    @Override
    public String call(Integer v1) throws Exception {
        return Integer.toString(v1);
    }
});
String result1 = javaRDD1.treeAggregate("0", new Function2<String, String, String>() {
    @Override
    public String call(String v1, String v2) throws Exception {
        System.out.println(v1 + "=seq=" + v2);
        return v1 + "=seq=" + v2;
    }
}, new Function2<String, String, String>() {
    @Override
    public String call(String v1, String v2) throws Exception {
        System.out.println(v1 + "<=comb=>" + v2);
        return v1 + "<=comb=>" + v2;
    }
});
System.out.println(result1);
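
With many partitions, an explicit depth adds intermediate combine levels before the final reduce, as described in the analysis above. A minimal sketch using the four-argument overload listed in the prototypes (depth 3 is an arbitrary choice):

String result2 = javaRDD1.treeAggregate("0", new Function2<String, String, String>() {
    @Override
    public String call(String v1, String v2) throws Exception {
        return v1 + "=seq=" + v2;
    }
}, new Function2<String, String, String>() {
    @Override
    public String call(String v1, String v2) throws Exception {
        return v1 + "<=comb=>" + v2;
    }
}, 3);
System.out.println(result2);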

treeReduce


Official documentation:

Reduces the elements of this RDD in a multi-level tree pattern.

Function prototype:

def treeReduce(f: JFunction2[T, T, T], depth: Int): T
def treeReduce(f: JFunction2[T, T, T]): T

treeReduce is analogous to treeAggregate with seqOp and combOp being the same function f.

Source code analysis:

def treeReduce(f: (T, T) => T, depth: Int = 2): T = withScope {
  require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
  val cleanF = context.clean(f)
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  val partiallyReduced = mapPartitions(it => Iterator(reducePartition(it)))
  val op: (Option[T], Option[T]) => Option[T] = (c, x) => {
    if (c.isDefined && x.isDefined) {
      Some(cleanF(c.get, x.get))
    } else if (c.isDefined) {
      c
    } else if (x.isDefined) {
      x
    } else {
      None
    }
  }
  partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
    .getOrElse(throw new UnsupportedOperationException("empty collection"))
}

As the source shows, treeReduce first reduces each partition with Scala's reduceLeft, then combines the partial results with treeAggregate, using the same function for seqOp and combOp and an empty Option as the zero value. In practice, treeReduce can replace reduce when a single driver-side reduce is too expensive, since the depth controls how much merging happens at each level.

Example:

List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,5);
JavaRDD<String> javaRDD1 = javaRDD.map(new Function<Integer, String>() {
    @Override
    public String call(Integer v1) throws Exception {
        return Integer.toString(v1);
    }
});
String result = javaRDD1.treeReduce(new Function2<String, String, String>() {
    @Override
    public String call(String v1, String v2) throws Exception {
        System.out.println(v1 + "=" + v2);
        return v1 + "=" + v2;
    }
});
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + result);

saveAsTextFile


Official documentation:

Save this RDD as a text file, using string representations of elements.

Function prototype:

def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

saveAsTextFile stores the RDD in a file system as text files.

Source code analysis:

def saveAsTextFile(path: String): Unit = withScope {
  // https://issues.apache.org/jira/browse/SPARK-2075
  //
  // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
  // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
  // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
  // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
  // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
  //
  // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
  // same bytecodes for `saveAsTextFile`.
  val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
  val textClassTag = implicitly[ClassTag[Text]]
  val r = this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}

/**
 * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
 * supporting the key and value types K and V in this RDD.
 */
def saveAsHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    conf: JobConf = new JobConf(self.context.hadoopConfiguration),
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
  val hadoopConf = conf
  hadoopConf.setOutputKeyClass(keyClass)
  hadoopConf.setOutputValueClass(valueClass)
  // Doesn't work in Scala 2.9 due to what may be a generics bug
  // TODO: Should we uncomment this for Scala 2.10?
  // conf.setOutputFormat(outputFormatClass)
  hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
  for (c <- codec) {
    hadoopConf.setCompressMapOutput(true)
    hadoopConf.set("mapred.output.compress", "true")
    hadoopConf.setMapOutputCompressorClass(c)
    hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
    hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
  }
  // Use configured output committer if already set
  if (conf.getOutputCommitter == null) {
    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
  }
  FileOutputFormat.setOutputPath(hadoopConf,
    SparkHadoopWriter.createPathFromString(path, hadoopConf))
  saveAsHadoopDataset(hadoopConf)
}

/**
 * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
 * that storage system. The JobConf should set an OutputFormat and any output paths required
 * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
 * MapReduce job.
 */
def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
  // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
  val hadoopConf = conf
  val wrappedConf = new SerializableConfiguration(hadoopConf)
  val outputFormatInstance = hadoopConf.getOutputFormat
  val keyClass = hadoopConf.getOutputKeyClass
  val valueClass = hadoopConf.getOutputValueClass
  if (outputFormatInstance == null) {
    throw new SparkException("Output format class not set")
  }
  if (keyClass == null) {
    throw new SparkException("Output key class not set")
  }
  if (valueClass == null) {
    throw new SparkException("Output value class not set")
  }
  SparkHadoopUtil.get.addCredentials(hadoopConf)
  logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
    valueClass.getSimpleName + ")")
  if (isOutputSpecValidationEnabled) {
    // FileOutputFormat ignores the filesystem parameter
    val ignoredFs = FileSystem.get(hadoopConf)
    hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
  }
  val writer = new SparkHadoopWriter(hadoopConf)
  writer.preSetup()
  val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
    val config = wrappedConf.value
    // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
    // around by taking a mod. We expect that no task will be attempted 2 billion times.
    val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
    val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
    writer.setup(context.stageId, context.partitionId, taskAttemptId)
    writer.open()
    var recordsWritten = 0L
    Utils.tryWithSafeFinally {
      while (iter.hasNext) {
        val record = iter.next()
        writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
        // Update bytes written metric every few records
        maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
        recordsWritten += 1
      }
    } {
      writer.close()
    }
    writer.commit()
    bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
    outputMetrics.setRecordsWritten(recordsWritten)
  }
  self.context.runJob(self, writeToFile)
  writer.commitJob()
}

As the source shows, saveAsTextFile is built on saveAsHadoopFile. Since saveAsHadoopFile requires a pair RDD, saveAsTextFile first converts the RDD via rddToPairRDDFunctions into an RDD of (NullWritable, Text) pairs, then performs the actual write through saveAsHadoopFile.

Example:

List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,5);
javaRDD.saveAsTextFile("/user/tmp");
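
The second overload in the prototypes compresses the output with a Hadoop CompressionCodec. A minimal sketch (GzipCodec is org.apache.hadoop.io.compress.GzipCodec; the output path is only an illustration):

javaRDD.saveAsTextFile("/user/tmp_gz", GzipCodec.class);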

saveAsObjectFile


Official documentation:

Save this RDD as a SequenceFile of serialized objects.

Function prototype:

def saveAsObjectFile(path: String): Unit

saveAsObjectFile serializes the elements of the RDD into objects and stores them in a file.

Source code analysis:

def saveAsObjectFile(path: String): Unit = withScope {
  this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
    .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
    .saveAsSequenceFile(path)
}

def saveAsSequenceFile(
    path: String,
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  def anyToWritable[U <% Writable](u: U): Writable = u
  // TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and
  // valueWritableClass at the compile time. To implement that, we need to add type parameters to
  // SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a
  // breaking change.
  val convertKey = self.keyClass != keyWritableClass
  val convertValue = self.valueClass != valueWritableClass
  logInfo("Saving as sequence file of type (" + keyWritableClass.getSimpleName + "," +
    valueWritableClass.getSimpleName + ")" )
  val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
  val jobConf = new JobConf(self.context.hadoopConfiguration)
  if (!convertKey && !convertValue) {
    self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  } else if (!convertKey && convertValue) {
    self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile(
      path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  } else if (convertKey && !convertValue) {
    self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile(
      path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  } else if (convertKey && convertValue) {
    self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(
      path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  }
}

As the source shows, saveAsObjectFile is built on saveAsSequenceFile: the elements are grouped into batches of 10, each batch is serialized and wrapped as a (NullWritable, BytesWritable) pair, and the resulting pair RDD is written out as a SequenceFile.

Example:

List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,5);
javaRDD.saveAsObjectFile("/user/tmp");
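
The files written by saveAsObjectFile can be read back with objectFile on the context. A minimal sketch (objectFile is part of the standard JavaSparkContext API; the path matches the example above):

JavaRDD<Integer> loaded = javaSparkContext.objectFile("/user/tmp");
System.out.println(loaded.collect());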
