1.2、创建RDD

1)由一个已经存在的Scala集合创建。
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

2)由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等
val rdd2 = sc.textFile(“hdfs://mycluster/wordcount/input/2.txt”)

1.3、RDD编程API

1.3.1、Transformation

RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。

常用的Transformation:

转换 含义
map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K(Iterable,Iterable))类型的RDD
cartesian(otherDataset) 笛卡尔积
pipe(command, [envVars])
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)

1.3.2、Action

动作 含义
reduce(func) 通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回RDD的元素个数
first() 返回RDD的第一个元素(类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
takeOrdered(n, [ordering])
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path)
countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
foreach(func) 在数据集的每一个元素上,运行函数func进行更新。

1.4 练习Spark rdd的api

连接Spark-Shell:

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077

练习1

//通过并行化生成rdd
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
//对rdd1里的每一个元素乘2然后排序
val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)
//过滤出大于等于十的元素
val rdd3 = rdd2.filter(_ >= 10)
//将元素以数组的方式在客户端显示
rdd3.collect

练习2:

val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
//将rdd1里面的每一个元素先切分在压平
val rdd2 = rdd1.flatMap(_.split(' '))
rdd2.collect

练习3:

val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//求并集
val rdd3 = rdd1.union(rdd2)
//求交集
val rdd4 = rdd1.intersection(rdd2)
//去重
rdd3.distinct.collect
rdd4.collect

练习4:

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//求jion
val rdd3 = rdd1.join(rdd2)
rdd3.collect
//求并集
val rdd4 = rdd1 union rdd2
rdd4.collect
//按key进行分组
val rdd5 = rdd4.groupByKey
rdd5.collect

练习5:

val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//cogroup
val rdd3 = rdd1.cogroup(rdd2)
//注意cogroup与groupByKey的区别
rdd3.collect

练习6:

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
//reduce聚合
val rdd2 = rdd1.reduce(_ + _)
rdd2.collect

练习7:

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2),  ("shuke", 1)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd3 = rdd1.union(rdd2)
//按key进行聚合
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
//按value的降序排序
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
rdd5.collect

练习8:

mapPartitionsdef mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]该函数和map函数类似,只不过映射函数的参数由RDD中每一个元素变成了RDD中每一个分区的迭代器。如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效的多比如:将RDD中的所有元素通过JDBC连接写入数据库,如果使用map函数,可能要为每一个元素都创建一个collection,这样开销很大,如果使用mapPartitions,那么只需要针对每一个分区建立一个connection.参数preservesPartitioning表示是否保留父RDD的partitioner分区信息。
//rdd1有两个分区
scala> var rdd1 = sc.makeRDD(1 to 5,2)rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[63] at makeRDD at <console>:24
scala> rdd1.collectres27: Array[Int] = Array(1, 2, 3, 4, 5)
//rdd3将rdd1中每个分区中的数值累加(通过mapPartitions来实现)
scala> var rdd3 = rdd1.mapPartitions{ x => {|     var result = List[Int]()|     var i = 0|     while(x.hasNext) {|         i += x.next()|     }|     result.::(i).iterator| }}rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[64] at mapPartitions at <console>:26
//查看合并结果后的rdd3的值
scala> rdd3.collectres28: Array[Int] = Array(3, 12)
//查看rdd3的分区大小
scala> rdd3.partitions.size
res29: Int = 2

练习9:

mapPartitionsWithIndexdef mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]函数作用通mapPartitions,不过提供了两个参数,第一个参数为分区的索引
例如:
scala> var rdd1 = sc.makeRDD(1 to 25,4)rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[66] at makeRDD at <console>:24scala> var rdd2 = rdd1.mapPartitionsWithIndex{|  (x,iter) => {|      var result = List[String]()|      var i = 0|      while(iter.hasNext) {|          i += iter.next()|      }|      result.::(x + "|" + i).iterator|  }| }rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[67] at mapPartitionsWithIndex at <console>:26//获取结果值(从返回的结果中可以看到)
scala> rdd2.collectres30: Array[String] = Array(0|21, 1|57, 2|93, 3|154)再如:
scala> val func = (index:Int,iter:Iterator[(Int)])=> {| iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator| }
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
scala> rdd1.mapPartitionsWithIndex(func).collectres0: Array[String] = Array([partID:0,val:1], [partID:0,val:2], [partID:0,val:3], [partID:0,val:4],  [partID:1,val:5], [part], [partID:1,val:7], [partID:1,val:8], [partID:1,val:9])

练习8:
aggregate函数将每个分区里的元素进行聚合,然后用combine函数将每个分区的结果和初始值(zerorValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。
函数原型:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

aggregate
聚合,先在分区内进行聚合,然后再将每个分区的结果一起结果进行聚合scala> def func1(index:Int,iter:Iterator[(Int)]):Iterator[String] = {| iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator| }func1: (index: Int, iter: Iterator[Int])Iterator[String]//创建一个并行化的RDD,有两个分区
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[77] at parallelize at <console>:24
//通过下面的代码可以看到rdd1中内容再两个分区内的分布情况,通过下面的结果可以看出有两个分区,分别是partID:0和partID:1
scala> rdd1.mapPartitionsWithIndex(func1).collectres56: Array[String] = Array([partID:0,val:1], [partID:0,val:2], [partID:0,val:3], [partID:0,val:4], [partID:1,val:5], [partID:1,val:6], [partID:1,val:7], [partID:1,val:8], [partID:1,val:9])
//下面的执行步骤是:
//一:0和1取出最大值1,1和2取出最大值2,2和3取出最大值3,3和4取出最大值4===》第一个分区的最大值是4
//二:0和5取出最大值5,5和6取出最大值6,6和7取出最大值7,7和8取出最大值8,8和9取出最大值9====>第二个分区的最大值是9
//三:后面的执行逻辑是:_+_,就是说将两个分区的最大结果值求和,执行的结果是:(0) + 4+9=13
scala> rdd1.aggregate(0)(math.max(_,_),_+_)
res57: Int = 13//下面的执行步骤是:
//一:3和1取出最大值3,3和2取出最大值3,3和3取出最大值3,3和4取出最大值4===》第一个分区的最大值是4
//二:3和5取出最大值5,5和6取出最大值6,6和7取出最大值7,7和8取出最大值8,8和9取出最大值9====>第二个分区的最大值是9
//三:后面的执行逻辑是:_+_,就是说将两个分区的最大结果值求和,执行的结果是:(3)+4+9=16
scala> rdd1.aggregate(3)(math.max(_,_),_+_)
res62: Int = 16//下面的执行步骤是:
//一:5和1取出最大值5,5和2取出最大值5,5和3取出最大值5,5和4取出最大值5===》第一个分区的最大值是5
//二:5和5取出最大值5,5和6取出最大值6,6和7取出最大值7,7和8取出最大值8,8和9取出最大值9====>第二个分区的最大值是9
//三:后面的执行逻辑是:_+_,就是说将两个分区的最大结果值求和,执行的结果是:(5)+5+9=19
scala> rdd1.aggregate(5)(math.max(_,_),_+_)
res58: Int = 19再如:
//下面的执行步骤是:
//一:8和1取出最大值8,8和2取出最大值8,8和3取出最大值8,8和4取出最大值8===》第一个分区的最大值是8
//二:8和5取出最大值8,8和6取出最大值8,8和7取出最大值8,8和8取出最大值8,8和9取出最大值9====>第二个分区的最大值是9
//三:后面的执行逻辑是:_+_,就是说将两个分区的最大结果值求和,执行的结果是:(8)+8+9=25
scala> rdd1.aggregate(8)(math.max(_,_),_+_)
res58: Int = 19再如:
//下面的执行步骤是:
//一:10和1取出最大值10,10和2取出最大值10,10和3取出最大值10,10和4取出最大值10===》第一个分区的最大值是10
//二:10和5取出最大值10,10和6取出最大值10,10和7取出最大值10,10和8取出最大值10,10和9取出最大值10====>第二个分区的最大值是10
//三:后面的执行逻辑是:_+_,就是说将两个分区的最大结果值求和,执行的结果是:(10)+10+10=30
scala> rdd1.aggregate(10)(math.max(_,_),_+_)
res58: Int = 30================================================================================
下面是字符串的聚合
scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[79] at parallelize at <console>:24scala> def fun2(index:Int,iter:Iterator[(String)]):Iterator[String] = {| iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator| }
fun2: (index: Int, iter: Iterator[String])Iterator[String]
//通过下面的结果可以知道:"a","b","c"在partID:0中,"d","e","f"在partID:1中
scala> rdd2.mapPartitionsWithIndex(fun2).collect
res63: Array[String] = Array([partID:0,val:a], [partID:0,val:b], [partID:0,val:c], [partID:1,val:d], [partID:1,val:e], [partID:1,val:f])
//下面的运行顺序是:
//一、""和"a"相加得"a","a"和"b"相加得"ab","ab"和"c"相加得"abc",第一个分区得到的结果是:"abc"
//一、""和"d"相加得"d","d"和"e"相加得"de","ed"和"f"相加得"def",第一个分区得到的结果是:"def"
//三、由于是并行的计算,所以可能是第一个分区先执行完,此时的结果是:"" + "abc" + "def" ===》"abcdef";若是第二个分区先执行完,此时的结果是:"" + "def" + "abc" ===》"defabc"
scala> rdd2.aggregate("")(_+_,_+_)
res64: String = abcdef
scala> rdd2.aggregate("")(_+_,_+_)
res65: String = defabc//下面的运行顺序是:
//一、"="和"a"相加得"=a","=a"和"b"相加得"=ab","=ab"和"c"相加得"=abc",第一个分区得到的结果是:"=abc"
//一、"="和"d"相加得"=d","=d"和"e"相加得"=de","=ed"和"f"相加得"=def",第一个分区得到的结果是:"=def"
//三、由于是并行的计算,所以可能是第一个分区先执行完,此时的结果是:"=" + "=abc" + "=def" ===》"==abc=def";若是第二个分区先执行完,此时的结果是:"="+"=def" + "=abc" ===》"==def=abc"
//下面的结果中分别是:res68: String = ==def=abc 和 res69: String = ==abc=def,和上面的推算结果一致
scala> rdd2.aggregate("=")(_ + _, _ + _)
res68: String = ==def=abc
scala> rdd2.aggregate("=")(_ + _, _ + _)
res69: String = ==abc=defval rdd3 = sc.parallelize(List("12","23","345","4567"),2)
//通过下面可以知道有两个分区,并且每个分区中有不同的值
scala> rdd3.mapPartitionsWithIndex(fun2).collect
res70: Array[String] = Array([partID:0,val:12], [partID:0,val:23], [partID:1,val:345], [partID:1,val:4567])
//下面的运行步骤是(scala> "".length结果是res72: Int = 0),(scala>"12".length结果是res73:Int=2):
//一:"".length和"12".length求出最大值2,得到字符串是"2";"2".length和"23".length求出最大值2,得到的字符串是2;所以第一个分区计算出的结果是:"2"
//二:"".length和"345".length求出最大值3,得到字符串是"3";"3".length和"4567".length求出最大值4,得到的字符串是4;所以第一个分区计算出的结果是:"4"
//三:得到的结果最后执行x+y,由于是并行计算所以可能是"24"或者"42"
scala> rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
res75: String = 24
scala> rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
res76: String = 42//下面求最小值:
scala> val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[84] at parallelize at <console>:24
scala> rdd4.mapPartitionsWithIndex(fun2).collect
res79: Array[String] = Array([partID:0,val:12], [partID:0,val:23], [partID:1,val:345], [partID:1,val:])
//运行过程是:
//一:"".length和"12".length求出最小值0,得到字符串是"0";"0".length和"23".length求出最小值1,得到的字符串是0;所以第一个分区计算出的结果是:"0"
//二:"".length和"345".length求出最小值0,得到字符串是"0";"0".length和"".length求出最小值0,得到的字符串是0;所以第一个分区计算出的结果是:"0"
//三:得到的结果最后执行x+y,由于是并行计算所以可能是"01"或"10"
scala> rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res85: String = 10scala> rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res86: String = 01val rdd5 = sc.parallelize(List("12","23","","345"),2)
//运行过程是:
//一:"".length和"12".length求出最小值0,得到字符串是"0";"0".length和"23".length求出最小值1,得到的字符串是0;所以第一个分区计算出的结果是:"0"
//二:"".length和"".length求出最小值0,得到字符串是"0";"0".length和"345".length求出最小值1,得到的字符串是1;所以第一个分区计算出的结果是:"1"
//三:得到的结果最后执行x+y,由于是并行计算所以可能是"1"或
rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)再如案例:
scala> def seqOP(a:Int, b:Int) : Int = {| println("seqOp: " + a + "\t" + b)| math.min(a,b)| }
seqOP: (a: Int, b: Int)Intscala> def combOp(a:Int, b:Int): Int = {| println("combOp: " + a + "\t" + b)| a + b| }
combOp: (a: Int, b: Int)Intscala> val z = sc. parallelize ( List (1 ,2 ,3 ,4 ,5 ,6) , 2)
//这里要注意的是上面的z是Int类型的,所以下面要用于集合迭代的类型也是Int类型的。
scala> def fun2(index:Int,iter:Iterator[(Int)]):Iterator[String] = {| iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator| }
fun2: (index: Int, iter: Iterator[Int])Iterator[String]
//通过下面的方式显示出每个值所在的分区
scala> z.mapPartitionsWithIndex(fun2).collect
res94: Array[String] = Array([partID:0,val:1], [partID:0,val:2], [partID:0,val:3], [partID:1,val:4], [partID:1,val:5], [partID:1,val:6])
//下面的含义是:两个分区每个里面先单独执行seqOP,两个都执行完成之后,再执行comOp逻辑,所以下面的运行过程是:
//一、3和1执行seqOP的最小值是1,1和2执行seqOP间的最小值是1,1和3执行seqOP的最小值是1,第一个分区得到的结果是1
//二、3和4执行seqOP的最小值是3,3和5执行seqOP间的最小值是3,3和6执行seqOP的最小值是3,第一个分区得到的结果是3
//三、接着执行comOp逻辑,(3)和分区一种的1执行combOp得到的结果是:3+1=4,4接着和分区二中的3执行combOp得到的结果是4+3=7,所以最后的结果是:7
scala> z.aggregate(3)(seqOP, combOp)
combOp:3    1
combOp:4    3
res95: Int = 7//再次验证:
//一、2和1执行seqOP的最小值是1,1和2执行seqOP间的最小值是1,1和3执行seqOP的最小值是1,第一个分区得到的结果是1
//二、2和4执行seqOP的最小值是2,2和5执行seqOP间的最小值是2,2和6执行seqOP的最小值是2,第一个分区得到的结果是2
//三、接着执行comOp逻辑,(2)和分区一种的1执行combOp得到的结果是:2+1=3,3接着和分区二中的2执行combOp得到的结果是3 + 2=5,所以最后的结果是:5
scala> z.aggregate(2)(seqOP, combOp)
[Stage 105:>                                                        (0 + 0) / 2]combOp:2    1
combOp:3    2
res96: Int = 5 //下面的同样:
scala> def seqOp(a:String, b:String) : String = {| println("seqOp: " + a + "\t" + b)| math.min(a.length , b.length ).toString| }
seqOp: (a: String, b: String)Stringscala> def combOp(a:String, b:String) : String = {|  println("combOp: " + a + "\t" + b)| a + b| }
combOp: (a: String, b: String)Stringscala> val z = sc. parallelize ( List ("12" ,"23" ,"345" ,"4567") ,2)
scala> z. aggregate ("")(seqOp, combOp)
seqOp:  345
seqOp:  12
seqOp: 0    4567
seqOp: 0    23
combOp:     1
combOp: 1   1res25: String = 11

练习10:
aggregateByKey

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
def func2(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
}
scala> pairRDD.mapPartitionsWithIndex(func2).collect
res99: Array[String] = Array([partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)], [partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)])
//执行过程是:
//1、每个分区中的内容都按照key先进行分组,
//第一个分区分组后的结果是:(cat,(2,5))、(mouse,(4))
//第二个分区分组后的结果是:(cat,(12))、(dog,(12))、(mouse,(2))
//2、接着0,分别和每组中的结果比对,
//对于分区一:0和cat中的2比较,得到最大值2;2和cat中的5比较,得到的最大结果是5。同样mouse执行相同操作。所以最终得到的是:(cat,(5)),(mouse,(4))
//对于分区二:0和cat中的12比较,得到的最大值12。依次类推,最终得到的结果是:(cat,(12))、(dog,(12))、(mouse,(2))
//3、接着0和分区一和分区二中每个最大值相加,最终得到的结果是:
// (cat,(5)) + (cat,(12)) ⇒ (cat,(5 + 12)) ==> (cat,(17))
//(mouse,(4)) + (mouse,(2)) ⇒ (mouse,(4 + 2)) ==> (mouse,(6))
//(dog,(12)) ⇒ (dog,(12)) ==> (dog,(12))
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect//执行过程是:
//1、每个分区中的内容都按照key先进行分组,
//第一个分区分组后的结果是:(cat,(2,5))、(mouse,(4))
//第二个分区分组后的结果是:(cat,(12))、(dog,(12))、(mouse,(2))
//2、接着100,分别和每组中的结果比对,
//对于分区一:100和cat中的2比较,得到最大值100;100和cat中的5比较,得到的最大结果是100。同样mouse执行相同操作。所以最终得到的是:(cat,(100)),(mouse,(100))
//对于分区二:100和cat中的12比较,得到的最大值100。依次类推,最终得到的结果是:(cat,(100))、(dog,(100))、(mouse,(100))
//3、接着100和分区一和分区二中每个最大值相加,最终得到的结果是:
//(cat,(100)) + (cat,(100)) ⇒ (cat,(100 + 100)) ==> (cat,(200))
//(mouse,(100)) + (mouse,(100)) ⇒ (mouse,(100 + 100)) ==> (mouse,(200))
//(dog,(100)) + (dog,(100)) ⇒ (dog,(100)) ==> (dog,(100))
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect

练习11:
checkpoint (知识点可以查看:http://blog.csdn.net/tototuzuoquan/article/details/74838936)
为当前RDD设置检查点。该函数将会创建一个二进制的文件,并存储到checkpoint目录中,该目录是用SparkContext.setCheckpointDir()设置的。在checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移出。对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。
函数原型:
def checkpoint()
实例:

scala> val data = sc.parallelize(1 to 100000,15)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[94] at parallelize at <console>:24
scala> sc.setCheckpointDir("/iteblog")
17/07/07 19:17:22 WARN spark.SparkContext: Spark is not running in local mode, therefore the checkpoint directory must not be on the local filesystem. Directory '/iteblog' appears to be on the local filesystem.
scala> data.checkpoint
scala> data.count
res105: Long = 100000[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls /iteblog
Found 1 items
drwxr-xr-x   - root supergroup          0 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c
Found 1 items
drwxr-xr-x   - root supergroup          0 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94
Found 15 items
-rw-r--r--   3 root supergroup      71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00000
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00001
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00002
-rw-r--r--   3 root supergroup      71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00003
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00004
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00005
-rw-r--r--   3 root supergroup      71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00006
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00007
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00008
-rw-r--r--   3 root supergroup      71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00009
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00010
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00011
-rw-r--r--   3 root supergroup      71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00012
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00013
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00014
[root@hadoop2 hadoop-2.8.0]#
执行完count之后,会在/iteblog目录下产生出多个(数量和你分区个数有关)二进制的文件。
//设置检查点,将文件最终输出到下面的位置上
scala> sc.setCheckpointDir("hdfs://mycluster/wordcount/ck")scala> val rdd = sc.textFile("hdfs://mycluster/wordcount/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[100] at reduceByKey at <console>:24scala> rdd.checkpointscala> rdd.isCheckpointed
res108: Boolean = falsescala> rdd.count
res109: Long = 289                                                              scala> rdd.isCheckpointed
res110: Boolean = truescala> rdd.getCheckpointFile
res111: Option[String] = Some(hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100)scala> [root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100
Found 10 items
-rw-r--r--   3 root supergroup        147 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/_partitioner
-rw-r--r--   3 root supergroup        867 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00000
-rw-r--r--   3 root supergroup        721 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00001
-rw-r--r--   3 root supergroup       1091 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00002
-rw-r--r--   3 root supergroup       1030 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00003
-rw-r--r--   3 root supergroup        944 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00004
-rw-r--r--   3 root supergroup        810 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00005
-rw-r--r--   3 root supergroup        964 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00006
-rw-r--r--   3 root supergroup       1011 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00007
-rw-r--r--   3 root supergroup        974 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00008

练习12:
coalesce, repartition

coalesce:对RDD中的分区重新进行合并
函数原型:
def coalesce(numPartitions: Int, shuffle: Boolean = false)
    (implicit ord: Ordering[T] = null): RDD[T]
  返回一个新的RDD,且该RDD的分区个数等于numPartitions个数。如果shuffle设置为true,这回进行shuffle。

scala> var data = sc.parallelize(List(1,2,3,4))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[104] at parallelize at <console>:24scala> data.partitions.length
res115: Int = 6scala> val result = data.coalesce(2,false)
result: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[105] at coalesce at <console>:26scala> result.partitions.length
res116: Int = 2scala> result.toDebugString
res117: String =
(2) CoalescedRDD[105] at coalesce at <console>:26 []|  ParallelCollectionRDD[104] at parallelize at <console>:24 []scala> val result1 = data.coalesce(2,true)
result1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[109] at coalesce at <console>:26scala> result1.toDebugString
res118: String =
(2) MapPartitionsRDD[109] at coalesce at <console>:26 []|  CoalescedRDD[108] at coalesce at <console>:26 []|  ShuffledRDD[107] at coalesce at <console>:26 []+-(6) MapPartitionsRDD[106] at coalesce at <console>:26 []|  ParallelCollectionRDD[104] at parallelize at <console>:24 []scala>
从上面可以看出shuffle为false的时候并不进行shuffle操作;而为true的时候会进行shuffle操作。RDD.partitions.length可以获取相关RDD的分区数。再如下面的例子:
scala> val rdd1 = sc.parallelize(1 to 10,10)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[102] at parallelize at <console>:24scala> rdd1.collect
res112: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)scala> rdd1.partitions.length
res113: Int = 10scala> val rdd2 = rdd1.coalesce(2,false)
rdd2: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[103] at coalesce at <console>:26scala> rdd1.partitions.length
res114: Int = 10scala>

练习13:
collectAsMap
功能和collect函数类似,该函数用于Pair RDD,最终返回Map类型的结果
函数原型:
def collectAsMap(): Map[K, V]

scala> val rdd = sc.parallelize(List(("a",1),("b",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[111] at parallelize at <console>:24scala> rdd.collectAsMap
res119: scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)scala>从结果我们可以看出,如果RDD中同一个key中存在多个Value,那么后面的Value将会把前面的Value覆盖,最终得到的结果就是Key唯一,而且对应一个Value。

练习14:
combineByKey
使用用户设置好的聚合函数对每个Key中的Value进行组合(combine)。可以将输入类型为RDD[(K,V)]转成RDD[(K,C)]
函数原型:
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) : RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine:
Boolean = true, serializer: Serializer = null): RDD[(K, C)]
第一个和第二个函数都是基于第三个函数实现的,使用的是HashPartitioner,Serializer为null。而第三个函数我们可以指定分区,如果需要使用Serializer的话也可以指定。combineByKey函数比较重要,我们熟悉地诸如aggregateByKey、foldByKey、reduceByKey等函数都是基于函数实现的。默认情况在Map端进行组合操作。

scala> val data = sc.parallelize(List((1, "www"), (1, "iteblog"), (1, "com"), (2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good")))
data: org.apache.spark.rdd.RDD[(Int, String)] =ParallelCollectionRDD[15] at parallelize at <console>:12scala> val result = data.combineByKey(List(_), (x: List [String], y: String) => y :: x, (x: List[String], y: List[String]) => x ::: y)
result: org.apache.spark.rdd.RDD[(Int, List[String])] =ShuffledRDD[19] at combineByKey at <console>:14scala> result.collect
res20: Array[(Int, List[String])] = Array((1,List(www, iteblog, com)),(2,List(bbs, iteblog, com)), (3,List(good)))scala> val data = sc.parallelize(List(("iteblog", 1), ("bbs", 1), ("iteblog", 3)))
data: org.apache.spark.rdd.RDD[(String, Int)] =ParallelCollectionRDD[24] at parallelize at <console>:12scala> val result = data.combineByKey(x => x, (x: Int, y:Int) => x + y, (x:Int, y: Int) => x + y)
result: org.apache.spark.rdd.RDD[(String, Int)] =ShuffledRDD[25] at combineByKey at <console>:14scala> result.collect
res27: Array[(String, Int)] = Array((iteblog,4), (bbs,1))再如:
val rdd1 = sc.textFile("hdfs://mycluster/wordcount/input").flatMap(_.split(" ")).map((_, 1))
val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd2.collectval rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd3.collectval rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val rdd6 = rdd5.zip(rdd4)
val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n)

练习15
countByKey

scala> val rdd1 = sc.parallelize(List(("a",1),("b",2),("b",2),("c",2),("c",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24scala> rdd1.countByKey
res0: scala.collection.Map[String,Long] = Map(a -> 1, b -> 2, c -> 2)           scala> rdd1.countByValue
res1: scala.collection.Map[(String, Int),Long] = Map((b,2) -> 2, (a,1) -> 1, (c,2) -> 1, (c,1) -> 1)scala>

练习16:
filterByRange

scala> val rdd1 = sc.parallelize(List(("e",5),("c",3),("d",4),("c",2),("a",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24scala> val rdd2 = rdd1.filterByRange("b","d")
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[7] at filterByRange at <console>:26scala> rdd2.collect
res2: Array[(String, Int)] = Array((c,3), (d,4), (c,2))

练习17:
flatMapValues

scala> a.flatMapValues(_.split(" "))
res5: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[10] at flatMapValues at <console>:27scala> a.flatMapValues(_.split(" ")).collect
res6: Array[(String, String)] = Array((a,1), (a,2), (b,3), (b,4))

练习18:
foldByKey

scala> val rdd1 = sc.parallelize(List("dog","wolf","cat","bear"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[12] at parallelize at <console>:24scala> val rdd2 = rdd1.map(x => (x.length,x))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[13] at map at <console>:26scala> rdd2.collect
res7: Array[(Int, String)] = Array((3,dog), (4,wolf), (3,cat), (4,bear))scala> val rdd3 = rdd2.foldByKey("")(_+_)
rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[14] at foldByKey at <console>:28scala> rdd3.collect
res8: Array[(Int, String)] = Array((4,bearwolf), (3,dogcat))scala> rdd3.collect
res9: Array[(Int, String)] = Array((4,wolfbear), (3,catdog))scala> val rdd = sc.textFile("hdfs://mycluster/wordcount/input/2.txt").flatMap(_.split(" ")).map((_,1))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[18] at map at <console>:24scala> rdd.foldByKey(0)(_+_)
res10: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at foldByKey at <console>:27scala> rdd.foldByKey(0)(_+_).collect
res11: Array[(String, Int)] = Array((role,1), (Play,1), (fraud,1), (level,1), (business,2), (improve,1), (platforms,1), (order,1), (big,1), (with,1), (scientist,,1), (active,1), (valuable,1), (data,5), (information,1), (Cooperate,1), (Collecting,1), (framework,1), (E-commerce/payment,1), (acquired,1), (root,1), (accurate,1), (solutions,1), (analysis;Maintenance,1), (problems,1), (them,1), (Analyze,1), (models,1), (analysis,3), (realize,1), (actual,1), (weight,1), (compare,1), (risk,1), (anti-fraud,1), (key,1), (related,1), (base,1), (Support,1), (against,1), (automatic,1), (to,2), (platform,2), (company's,1), (in,2), (needs,,1), (provide,2), (implement,1), (affecting,1), (strategy,1), (of,1), (reports,1), (management,1), (detection,,1), (for,1), (work,,1), (cause,1), (an,1), (verify,1),...
scala>

foreachPartition
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
rdd1.foreachPartition(x => println(x.reduce(_ + _)))

练习19:
keyBy

scala> val rdd1 = sc.parallelize(List("dog","salmon","salmon","rat","elephant"),3)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[21] at parallelize at <console>:24scala> val rdd2 = rdd1.keyBy(_.length)
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[22] at keyBy at <console>:26scala> rdd2.collect
res12: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))scala>

练习20:
keys values

scala> val rdd1 = sc.parallelize(List("dog","tiger","lion","cat","panther","eagle"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at parallelize at <console>:24scala> val rdd2 = rdd1.map(x => (x.length,x))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[24] at map at <console>:26scala> rdd2.keys.collect
res13: Array[Int] = Array(3, 5, 4, 3, 7, 5)scala> rdd2.values.collect
res14: Array[String] = Array(dog, tiger, lion, cat, panther, eagle)

Spark rdd 介绍,和案例介绍相关推荐

  1. Spark Streaming 图片处理案例介绍

    Spark Streaming 图片处理案例介绍 本文首先介绍了流式处理框架的设计原理.Spark Streaming 的工作原理,然后通过一个基于 Spark Streaming 编写的读取.分析. ...

  2. 机器学习算法——KD树算法介绍以及案例介绍

    系列文章目录 机器学习的一些常见算法介绍[线性回归,岭回归,套索回归,弹性网络] 文章目录 一.KD算法简介 1.1.kd树简介 1.2.怎样将一个K维数据划分到左子树或右子树? 1.3.在哪个维度上 ...

  3. Spark RDD 论文详解(一)摘要和介绍

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  4. 什么是HADOOP、产生背景、在大数据、云计算中的位置和关系、国内外HADOOP应用案例介绍、就业方向、生态圈以及各组成部分的简介(学习资料中的文档材料)

    1. HADOOP背景介绍 1. 1.1 什么是HADOOP 1.        HADOOP是apache旗下的一套开源软件平台 2.        HADOOP提供的功能:利用服务器集群,根据用户 ...

  5. spark之2:原理介绍

    spark之2:原理介绍 @(SPARK)[spark, 大数据] 1.spark是一个基于内存计算的开源的集群计算系统,目的是让数据分析更加快速.因此运行spark的机器应该尽量的大内存,如96G以 ...

  6. java spark dataset_Spark 2.0介绍:Dataset介绍和使用

    <Spark 2.0技术预览:更容易.更快速.更智能>文章中简单地介绍了 Dataset介绍 Dataset是从Spark 1.6开始引入的一个新的抽象,当时还是处于alpha版本:然而在 ...

  7. RDD之一:总体介绍

    摘要 本文提出了分布式内存抽象的概念--弹性分布式数据集(RDD,Resilient Distributed Datasets),它具备像MapReduce等数据流模型的容错特性,并且允许开发人员在大 ...

  8. spark (3)Spark Standalone集群安装介绍

    (1)初学者对于spark的几个疑问 http://aperise.iteye.com/blog/2302481 (2)spark开发环境搭建 http://aperise.iteye.com/blo ...

  9. Spark最详细安装和介绍--JavaApi!!!!!!!

    Spark概述 Spark官方介绍: Spark是什么? Apache Spark是用于大规模数据处理的统一分析引擎 Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和 ...

最新文章

  1. Linux 命令行小技巧《叹号的用处》
  2. CVPR 2022 | 室外多模态3D目标检测(DeepFusion)
  3. android系统下替换so库等操作
  4. SAE去掉index.php实现自定义固定链接
  5. Java基础IO流(五)RandomAccessFile
  6. python try语句相关(try/except/else/finally)
  7. script async和defer
  8. OpenCV彩色目标跟踪
  9. 一步步入门搭建SpringSecurity OAuth2(密码模式)
  10. 解决谷歌浏览器自动填充表单
  11. 车标识别 深度学习车标识别 神经网络车标识别 cnn车标识别 目标检测 yolo识别
  12. c语言ax2十bx十c=0的根,c++c语言计算aX2+bX+c=0的根。程序编写
  13. 没有心的男人 2012-02-08 21:11:06
  14. OC_AddressBook_通讯录
  15. 我要你觉得,我不要我觉得--根据企业现状实施DevOps
  16. 【XSS漏洞-06】XSS漏洞利用案例(浏览器劫持、会话劫持、GetShell)—基于神器beEF
  17. 软件缺陷报告模板(微信缺陷报告案例)
  18. mybatis if条件判断
  19. 冰达ROS机器人使用-实现slam建模、自主导航、避障
  20. 使用python对单幅图像进行数据增并保存增强后的结果

热门文章

  1. 基于SVD矩阵分解的用户商品推荐(python实现)
  2. LeetCode 情侣牵手 (贪心)
  3. JavaScript实现combine With Repetitions结合重复算法(附完整源码)
  4. wxWidgets:工具栏概述
  5. boost::stacktrace模块实现终止处理程序的测试程序
  6. boost::qvm::deduce_vec相关的测试程序
  7. boost::hof::implicit用法的测试程序
  8. boost::hana::maximum.by用法的测试程序
  9. boost::function_types::is_function_pointer用法的测试程序
  10. GDCM:将文件封装在RawData中的测试程序