I. The DAG Graph

0. Prepare and start the cluster

Spark version:

spark-1.5.2-bin-hadoop2.6

One master, three slaves:

192.168.239.129  master

192.168.239.130  slave

192.168.239.131  slave

192.168.239.144  slave

Configuration: /home/software/spark-1.5.2-bin-hadoop2.6/conf/spark-env.sh

192.168.239.129  master

SPARK_LOCAL_IP=192.168.239.129
export SCALA_HOME=/home/software/scala-2.11.4
export JAVA_HOME=/home/software/jdk1.8
export HADOOP_HOME=/home/software/hadoop-2.7.1
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_JAR=/home/software/spark-1.5.2-bin-hadoop2.6/lib/spark-assembly-1.5.2-hadoop2.6.0.jar

192.168.239.130  slave

SPARK_LOCAL_IP=192.168.239.130
export SCALA_HOME=/home/software/scala-2.11.4
export JAVA_HOME=/home/software/jdk1.8
export SPARK_JAR=/home/software/spark-1.5.2-bin-hadoop2.6/lib/spark-assembly-1.5.2-hadoop2.6.0.jar

192.168.239.131  slave

SPARK_LOCAL_IP=192.168.239.131
export SCALA_HOME=/home/software/scala-2.11.4
export JAVA_HOME=/home/software/jdk1.8
export SPARK_JAR=/home/software/spark-1.5.2-bin-hadoop2.6/lib/spark-assembly-1.5.2-hadoop2.6.0.jar

192.168.239.144  slave

SPARK_LOCAL_IP=192.168.239.144
export SCALA_HOME=/home/software/scala-2.11.4
export JAVA_HOME=/home/software/jdk1.8
export SPARK_JAR=/home/software/spark-1.5.2-bin-hadoop2.6/lib/spark-assembly-1.5.2-hadoop2.6.0.jar

Start up:

master: bin/spark-shell --master=spark://192.168.239.129:7077  # the master's IP address

slave:bin/start-slave.sh spark://192.168.239.129:7077

1. Prepare the code:

map does not cause a shuffle:

val rdd = sc.makeRDD(List(1,2,3,4,5,7,8))
rdd.map(x => x * 10)

join causes a shuffle:

val rdd1 = sc.makeRDD(List(("1","a"),("2","b"),("3","c"),("4","d")))
val rdd2 = sc.makeRDD(List(("1","A"),("2","B"),("3","C"),("4","D")))
rdd1.join(rdd2)

A combined example (word count):

val list = scala.io.Source.fromFile("hdfs:///spark/words.txt").getLines().toList
  .map { (_, 1) }
  .groupBy { _._1 }
  .mapValues { _.map { _._2 }.reduce { _ + _ } }

A utility for inspecting partitions:

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

object su {
  def debug[T: ClassTag](rdd: RDD[T]) = {
    rdd.mapPartitionsWithIndex((i: Int, iter: Iterator[T]) => {
      val m = scala.collection.mutable.Map[Int, List[T]]()
      var list = List[T]()
      while (iter.hasNext) {
        list = list :+ iter.next
      }
      m(i) = list
      m.iterator
    }).collect().foreach((x: Tuple2[Int, List[T]]) => {
      val i = x._1
      println(s"partition:[$i]")
      x._2.foreach { println }
    })
  }
}

2. map does not cause a shuffle

scala> val rdd = sc.makeRDD(List(1,2,3,4,5,7,8))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:21

scala> rdd.map(x=>x*10)
res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:24

scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

scala> import scala.reflect.ClassTag
import scala.reflect.ClassTag

scala> object su {
     |   def debug[T: ClassTag](rdd: RDD[T]) = {
     |     rdd.mapPartitionsWithIndex((i: Int, iter: Iterator[T]) => {
     |       val m = scala.collection.mutable.Map[Int, List[T]]()
     |       var list = List[T]()
     |       while (iter.hasNext) {
     |         list = list :+ iter.next
     |       }
     |       m(i) = list
     |       m.iterator
     |     }).collect().foreach((x: Tuple2[Int, List[T]]) => {
     |       val i = x._1
     |       println(s"partition:[$i]")
     |       x._2.foreach { println }
     |     })
     |   }
     | }
defined module su

scala> su.debug(res0)
18/05/05 20:20:08 INFO spark.SparkContext: Starting job: collect at <console>:35
18/05/05 20:20:08 INFO scheduler.DAGScheduler: Got job 0 (collect at <console>:35) with 6 output partitions
18/05/05 20:20:08 INFO scheduler.DAGScheduler: Final stage: ResultStage 0(collect at <console>:35)
18/05/05 20:20:08 INFO scheduler.DAGScheduler: Parents of final stage: List()
18/05/05 20:20:08 INFO scheduler.DAGScheduler: Missing parents: List()
18/05/05 20:20:08 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at mapPartitionsWithIndex at <console>:27), which has no missing parents
18/05/05 20:20:09 INFO storage.MemoryStore: ensureFreeSpace(2304) called with curMem=0, maxMem=555755765
18/05/05 20:20:09 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.3 KB, free 530.0 MB)
18/05/05 20:20:09 INFO storage.MemoryStore: ensureFreeSpace(1388) called with curMem=2304, maxMem=555755765
18/05/05 20:20:09 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1388.0 B, free 530.0 MB)
18/05/05 20:20:09 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.239.129:57789 (size: 1388.0 B, free: 530.0 MB)
18/05/05 20:20:09 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:861
18/05/05 20:20:09 INFO scheduler.DAGScheduler: Submitting 6 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at mapPartitionsWithIndex at <console>:27)
18/05/05 20:20:09 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 6 tasks
18/05/05 20:20:09 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 192.168.239.130, PROCESS_LOCAL, 2029 bytes)
18/05/05 20:20:09 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 192.168.239.144, PROCESS_LOCAL, 2029 bytes)
18/05/05 20:20:09 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 192.168.239.131, PROCESS_LOCAL, 2029 bytes)
18/05/05 20:20:09 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, 192.168.239.130, PROCESS_LOCAL, 2029 bytes)
18/05/05 20:20:09 INFO scheduler.TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4, 192.168.239.144, PROCESS_LOCAL, 2029 bytes)
18/05/05 20:20:09 INFO scheduler.TaskSetManager: Starting task 5.0 in stage 0.0 (TID 5, 192.168.239.131, PROCESS_LOCAL, 2033 bytes)
18/05/05 20:20:19 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.239.144:43403 (size: 1388.0 B, free: 534.5 MB)
18/05/05 20:20:19 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.239.130:53716 (size: 1388.0 B, free: 534.5 MB)
18/05/05 20:20:19 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.239.131:43057 (size: 1388.0 B, free: 534.5 MB)
18/05/05 20:20:28 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 18557 ms on 192.168.239.144 (1/6)
18/05/05 20:20:28 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 18570 ms on 192.168.239.144 (2/6)
18/05/05 20:20:29 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 20147 ms on 192.168.239.130 (3/6)
18/05/05 20:20:29 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 20089 ms on 192.168.239.130 (4/6)
18/05/05 20:20:30 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 20785 ms on 192.168.239.131 (5/6)
18/05/05 20:20:30 INFO scheduler.TaskSetManager: Finished task 5.0 in stage 0.0 (TID 5) in 20781 ms on 192.168.239.131 (6/6)
18/05/05 20:20:30 INFO scheduler.DAGScheduler: ResultStage 0 (collect at <console>:35) finished in 20.883 s
18/05/05 20:20:30 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/05/05 20:20:30 INFO scheduler.DAGScheduler: Job 0 finished: collect at <console>:35, took 22.654117 s
partition:[0]
10
partition:[1]
20
partition:[2]
30
partition:[3]
40
partition:[4]
50
partition:[5]
70
80

View the DAG graph (use the Firefox browser; the QQ browser and Google Chrome cannot display it: the QQ browser fails to load the page, and Chrome renders only two black dots):

3. join causes a shuffle

scala> val rdd1 = sc.makeRDD(List(("1","a"),("2","b"),("3","c"),("4","d")))
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[3] at makeRDD at <console>:23

scala> val rdd2 = sc.makeRDD(List(("1","A"),("2","B"),("3","C"),("4","D")))
rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[4] at makeRDD at <console>:23

scala> rdd1.join(rdd2)
res3: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[7] at join at <console>:28

scala> res3.collect
18/05/05 21:10:38 INFO spark.SparkContext: Starting job: collect at <console>:30
18/05/05 21:10:38 INFO scheduler.DAGScheduler: Registering RDD 3 (makeRDD at <console>:23)
18/05/05 21:10:38 INFO scheduler.DAGScheduler: Registering RDD 4 (makeRDD at <console>:23)
18/05/05 21:10:38 INFO scheduler.DAGScheduler: Got job 1 (collect at <console>:30) with 6 output partitions
18/05/05 21:10:38 INFO scheduler.DAGScheduler: Final stage: ResultStage 3(collect at <console>:30)
18/05/05 21:10:38 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 1, ShuffleMapStage 2)
18/05/05 21:10:38 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 1, ShuffleMapStage 2)
18/05/05 21:10:38 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 1 (ParallelCollectionRDD[3] at makeRDD at <console>:23), which has no missing parents
18/05/05 21:10:38 INFO storage.MemoryStore: ensureFreeSpace(1520) called with curMem=0, maxMem=555755765
18/05/05 21:10:38 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 1520.0 B, free 530.0 MB)
18/05/05 21:10:42 INFO storage.MemoryStore: ensureFreeSpace(982) called with curMem=1520, maxMem=555755765
18/05/05 21:10:42 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 982.0 B, free 530.0 MB)
18/05/05 21:10:42 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.239.129:57789 (size: 982.0 B, free: 530.0 MB)
18/05/05 21:10:42 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:861
18/05/05 21:10:42 INFO scheduler.DAGScheduler: Submitting 6 missing tasks from ShuffleMapStage 1 (ParallelCollectionRDD[3] at makeRDD at <console>:23)
18/05/05 21:10:42 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 6 tasks
18/05/05 21:10:42 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 2 (ParallelCollectionRDD[4] at makeRDD at <console>:23), which has no missing parents
18/05/05 21:10:43 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 6, 192.168.239.130, PROCESS_LOCAL, 2072 bytes)
18/05/05 21:10:43 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 7, 192.168.239.131, PROCESS_LOCAL, 2129 bytes)
18/05/05 21:10:43 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 8, 192.168.239.144, PROCESS_LOCAL, 2129 bytes)
18/05/05 21:10:43 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 1.0 (TID 9, 192.168.239.130, PROCESS_LOCAL, 2072 bytes)
18/05/05 21:10:43 INFO scheduler.TaskSetManager: Starting task 4.0 in stage 1.0 (TID 10, 192.168.239.131, PROCESS_LOCAL, 2129 bytes)
18/05/05 21:10:43 INFO scheduler.TaskSetManager: Starting task 5.0 in stage 1.0 (TID 11, 192.168.239.144, PROCESS_LOCAL, 2129 bytes)
18/05/05 21:10:43 INFO storage.MemoryStore: ensureFreeSpace(1520) called with curMem=2502, maxMem=555755765
18/05/05 21:10:43 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 1520.0 B, free 530.0 MB)
18/05/05 21:10:43 INFO storage.MemoryStore: ensureFreeSpace(984) called with curMem=4022, maxMem=555755765
18/05/05 21:10:43 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 984.0 B, free 530.0 MB)
18/05/05 21:10:43 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.239.129:57789 (size: 984.0 B, free: 530.0 MB)
18/05/05 21:10:43 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:861
18/05/05 21:10:43 INFO scheduler.DAGScheduler: Submitting 6 missing tasks from ShuffleMapStage 2 (ParallelCollectionRDD[4] at makeRDD at <console>:23)
18/05/05 21:10:43 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 6 tasks
18/05/05 21:10:43 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.239.144:43403 (size: 982.0 B, free: 534.5 MB)
18/05/05 21:10:45 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.239.131:43057 (size: 982.0 B, free: 534.5 MB)
18/05/05 21:10:46 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.239.130:53716 (size: 982.0 B, free: 534.5 MB)
18/05/05 21:10:48 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 12, 192.168.239.144, PROCESS_LOCAL, 2072 bytes)
18/05/05 21:10:48 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 2.0 (TID 13, 192.168.239.144, PROCESS_LOCAL, 2129 bytes)
18/05/05 21:10:48 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.239.144:43403 (size: 984.0 B, free: 534.5 MB)
18/05/05 21:10:48 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 2.0 (TID 14, 192.168.239.144, PROCESS_LOCAL, 2129 bytes)
18/05/05 21:10:48 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 2.0 (TID 15, 192.168.239.144, PROCESS_LOCAL, 2072 bytes)
18/05/05 21:10:48 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 12) in 624 ms on 192.168.239.144 (1/6)
18/05/05 21:10:48 INFO scheduler.TaskSetManager: Starting task 4.0 in stage 2.0 (TID 16, 192.168.239.144, PROCESS_LOCAL, 2129 bytes)
18/05/05 21:10:48 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 2.0 (TID 13) in 629 ms on 192.168.239.144 (2/6)
18/05/05 21:10:48 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 1.0 (TID 8) in 5949 ms on 192.168.239.144 (1/6)
18/05/05 21:10:48 INFO scheduler.TaskSetManager: Finished task 5.0 in stage 1.0 (TID 11) in 5962 ms on 192.168.239.144 (2/6)
18/05/05 21:10:48 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 2.0 (TID 14) in 464 ms on 192.168.239.144 (3/6)
18/05/05 21:10:48 INFO scheduler.TaskSetManager: Starting task 5.0 in stage 2.0 (TID 17, 192.168.239.144, PROCESS_LOCAL, 2129 bytes)
18/05/05 21:10:48 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 2.0 (TID 15) in 454 ms on 192.168.239.144 (4/6)
18/05/05 21:10:49 INFO scheduler.TaskSetManager: Finished task 5.0 in stage 2.0 (TID 17) in 30 ms on 192.168.239.144 (5/6)
18/05/05 21:10:49 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 2.0 (TID 16) in 94 ms on 192.168.239.144 (6/6)
18/05/05 21:10:49 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
18/05/05 21:10:49 INFO scheduler.DAGScheduler: ShuffleMapStage 2 (makeRDD at <console>:23) finished in 5.973 s
18/05/05 21:10:49 INFO scheduler.DAGScheduler: looking for newly runnable stages
18/05/05 21:10:49 INFO scheduler.DAGScheduler: running: Set(ShuffleMapStage 1)
18/05/05 21:10:49 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 3)
18/05/05 21:10:49 INFO scheduler.DAGScheduler: failed: Set()
18/05/05 21:10:50 INFO scheduler.DAGScheduler: Missing parents for ResultStage 3: List(ShuffleMapStage 1)
18/05/05 21:10:50 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 6) in 7303 ms on 192.168.239.130 (3/6)
18/05/05 21:10:50 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 1.0 (TID 9) in 7297 ms on 192.168.239.130 (4/6)
18/05/05 21:10:50 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 7) in 7687 ms on 192.168.239.131 (5/6)
18/05/05 21:10:50 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 1.0 (TID 10) in 7687 ms on 192.168.239.131 (6/6)
18/05/05 21:10:50 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
18/05/05 21:10:50 INFO scheduler.DAGScheduler: ShuffleMapStage 1 (makeRDD at <console>:23) finished in 7.695 s
18/05/05 21:10:50 INFO scheduler.DAGScheduler: looking for newly runnable stages
18/05/05 21:10:50 INFO scheduler.DAGScheduler: running: Set()
18/05/05 21:10:50 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 3)
18/05/05 21:10:50 INFO scheduler.DAGScheduler: failed: Set()
18/05/05 21:10:50 INFO scheduler.DAGScheduler: Missing parents for ResultStage 3: List()
18/05/05 21:10:50 INFO scheduler.DAGScheduler: Submitting ResultStage 3 (MapPartitionsRDD[7] at join at <console>:28), which is now runnable
18/05/05 21:10:50 INFO storage.MemoryStore: ensureFreeSpace(2816) called with curMem=5006, maxMem=555755765
18/05/05 21:10:50 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 2.8 KB, free 530.0 MB)
18/05/05 21:10:50 INFO storage.MemoryStore: ensureFreeSpace(1558) called with curMem=7822, maxMem=555755765
18/05/05 21:10:50 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 1558.0 B, free 530.0 MB)
18/05/05 21:10:50 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.239.129:57789 (size: 1558.0 B, free: 530.0 MB)
18/05/05 21:10:50 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:861
18/05/05 21:10:50 INFO scheduler.DAGScheduler: Submitting 6 missing tasks from ResultStage 3 (MapPartitionsRDD[7] at join at <console>:28)
18/05/05 21:10:50 INFO scheduler.TaskSchedulerImpl: Adding task set 3.0 with 6 tasks
18/05/05 21:10:50 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 3.0 (TID 18, 192.168.239.144, PROCESS_LOCAL, 1974 bytes)
18/05/05 21:10:50 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 3.0 (TID 19, 192.168.239.131, PROCESS_LOCAL, 1974 bytes)
18/05/05 21:10:50 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 3.0 (TID 20, 192.168.239.130, PROCESS_LOCAL, 1974 bytes)
18/05/05 21:10:50 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 3.0 (TID 21, 192.168.239.144, PROCESS_LOCAL, 1974 bytes)
18/05/05 21:10:50 INFO scheduler.TaskSetManager: Starting task 4.0 in stage 3.0 (TID 22, 192.168.239.131, PROCESS_LOCAL, 1974 bytes)
18/05/05 21:10:50 INFO scheduler.TaskSetManager: Starting task 5.0 in stage 3.0 (TID 23, 192.168.239.130, PROCESS_LOCAL, 1974 bytes)
18/05/05 21:10:50 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.239.131:43057 (size: 1558.0 B, free: 534.5 MB)
18/05/05 21:10:51 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 192.168.239.131:58335
18/05/05 21:10:51 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.239.130:53716 (size: 1558.0 B, free: 534.5 MB)
18/05/05 21:10:51 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.239.144:43403 (size: 1558.0 B, free: 534.5 MB)
18/05/05 21:10:51 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 196 bytes
18/05/05 21:10:52 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 192.168.239.144:49621
18/05/05 21:10:52 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 192.168.239.130:56518
18/05/05 21:10:53 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 1 to 192.168.239.131:58335
18/05/05 21:10:53 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 175 bytes
18/05/05 21:10:53 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 1 to 192.168.239.144:49621
18/05/05 21:10:53 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 1 to 192.168.239.130:56518
18/05/05 21:10:54 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 3.0 (TID 18) in 3432 ms on 192.168.239.144 (1/6)
18/05/05 21:10:54 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 3.0 (TID 21) in 3509 ms on 192.168.239.144 (2/6)
18/05/05 21:10:54 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 3.0 (TID 19) in 3737 ms on 192.168.239.131 (3/6)
18/05/05 21:10:54 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 3.0 (TID 22) in 3738 ms on 192.168.239.131 (4/6)
18/05/05 21:10:54 INFO scheduler.TaskSetManager: Finished task 5.0 in stage 3.0 (TID 23) in 3758 ms on 192.168.239.130 (5/6)
18/05/05 21:10:54 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 3.0 (TID 20) in 3982 ms on 192.168.239.130 (6/6)
18/05/05 21:10:54 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
18/05/05 21:10:54 INFO scheduler.DAGScheduler: ResultStage 3 (collect at <console>:30) finished in 4.009 s
18/05/05 21:10:54 INFO scheduler.DAGScheduler: Job 1 finished: collect at <console>:30, took 16.052410 s
res4: Array[(String, (String, String))] = Array((1,(a,A)), (2,(b,B)), (3,(c,C)), (4,(d,D)))

No.   Name             Description

1     PROCESS_LOCAL    the HDFS data and the worker are in the same process

2     NODE_LOCAL       the HDFS data and the worker are on the same server

3     NO_PREF          Spark cannot determine where the data is located

4     RACK_LOCAL       the HDFS data and the worker are in the same rack

5     ANY              the HDFS data and the worker are not in the same rack

II. Machine Learning

MLlib (scientific computing; it takes a solid grounding in mathematics)

1) Turn the real-world requirement into mathematical formulas that can be computed; this process is data modeling.

Requirement: how much would a 200 m² home in Haidian District cost?

2) Once a model exists, values can be plugged into the formula and computed (machine learning).

3) Find the line that lies closest to the real data (the sample data).

4) x = 200; x is called a feature value.

5) The blue points are the sample data; they will later be used to train the model.

III. Mathematical Modeling

Goal: let the computer do the computing.

Formula: the equation of a straight line.

x0 = 1 (without the x0 term, the line would pass through the origin).

In mathematics we reduce the number of variables; the fewer the variables, the easier the computation.

If values are substituted for all of these variables, the line naturally emerges.

Substitute endless candidate values to find the line closest to the blue points; the resulting formula is called the prediction function.

The function used to check which line is ultimately closest to the real values is called the cost (error) function.

1) h denotes the prediction function and x the feature values; m = 22 is the number of samples, and 2, i.e. (x0, x1), is the number of features.

2) y is the computed result (the actual sale price); m is the number of samples.

3) (predicted value - actual value): the smaller the result, the better (compare 100 with 0.00000000001 and 0.000000001); mathematically this leads to the mean squared error, which magnifies the gaps.

Take the m differences (h - y), square each one (to remove the effect of sign), add them all up, and divide the result by 2m.
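The cost computation just described can be sketched in Python (a minimal stand-alone illustration with made-up sample values; z0 and z1 stand for θ0 and θ1):

```python
# The cost (error) function: take the m differences (h - y), square each one
# to remove the sign, sum them, and divide by 2m.
# h = z0*x0 + z1*x1 with x0 fixed at 1.
def cost(z0, z1, x1_values, y_values):
    m = len(y_values)                  # number of samples
    total = 0.0
    for x1, y in zip(x1_values, y_values):
        h = z0 * 1.0 + z1 * x1         # prediction for this sample
        total += (h - y) ** 2          # squared difference
    return total / (2 * m)

print(cost(0.0, 2.0, [1.0, 2.0, 3.0], [2.0, 4.0, 6.0]))  # a perfect fit gives 0.0
```

A smaller cost means the candidate line is closer to the sample points.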

IV. Summary

1) The prediction function h: with the formula we can compute a predicted house price.

2) The cost (error) function verifies which h is closest to the actual sale price.

Once these two formulas are fixed, the computer can do the rest.

Data modeling is the process of turning a business requirement into mathematical formulas.

Evaluating the cost function produces many results (one per candidate line).

V. Implementation with Gradient Descent

1) Jump over many points: large numbers of points need not be computed, which speeds up the calculation.

2) Add a parameter a, the descent rate, which shrinks the height of each step.

Gradient descent thus finds the minimum.
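The idea can be illustrated with a minimal Python sketch (a made-up one-variable function, not the housing model):

```python
# Gradient descent on a one-variable example: minimize f(z) = (z - 3)^2.
# 'a' is the descent rate from the notes; the function and values are illustrative.
def gradient_descent(a=0.1, iters=100):
    z = 0.0                       # arbitrary starting point
    for _ in range(iters):
        grad = 2 * (z - 3)        # derivative of (z - 3)^2
        z = z - a * grad          # step downhill, scaled by the rate a
    return z

print(round(gradient_descent(), 4))  # approaches the minimum at z = 3
```

Each step moves opposite the slope, and the rate a keeps the steps small enough that the iteration settles at the bottom instead of overshooting.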

VI. Tools: Octave and MATLAB (for scientific computing)

They come with ready-made APIs that make it convenient to plot graphs, work with matrices and vectors, and generate special test data.

Matrix (multiple columns):

1 2

3 4

Vector (a single column):

1

2

3

For a computer, matrix and vector operations are easier to express and faster to compute than substituting into the formulas directly.

Tools:

1. MATLAB: the commercial edition requires payment.

R is a language with many scientific-computing features built in (alongside concurrency-oriented languages such as Go and Scala); Scala re-implements these mathematical formulas in the style of R, and Spark, building on RDDs, is gradually re-implementing these scientific-computing formulas with RDDs.

2. Octave: an open-source counterpart of MATLAB, with far fewer features than MATLAB.

(1) A vector is a special matrix: it has only a single column.

Matrix × vector: multiply each element of a matrix row by the corresponding vector element and sum the products.

1 4

2 5

3 6

Vector:

1

2

Result:

1*1+4*2=9

2*1+5*2=12

3*1+6*2=15

Matrix × vector yields a vector.

Convention: matrices are written in uppercase, vectors in lowercase.
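The worked example above can be verified with a short Python sketch:

```python
# Matrix-vector product: each result entry is the dot product of one matrix
# row with the vector, reproducing the 3x2 example above.
def mat_vec(matrix, vector):
    return [sum(row_elem * v for row_elem, v in zip(row, vector))
            for row in matrix]

M = [[1, 4], [2, 5], [3, 6]]
v = [1, 2]
print(mat_vec(M, v))  # [9, 12, 15]
```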

(2) Many commonly used functions are provided.

Reading a file:

cd c:        # change to drive C
cd txt       # enter the directory
ls           # list the files in the current directory
F = load("prices.txt")   # the path can also be given directly, e.g. c:/prices.txt

(3) Matrix operations

Matrices can be added, subtracted, and multiplied.

The earlier prediction function is exactly the per-row result of a matrix multiplication:

A=[θ0,θ1;θ0,θ1;θ0,θ1]       three rows of data

B=[X0;X1]

Row 1: θ0*X0 + θ1*X1

Row 2: θ0*X0 + θ1*X1

Row 3: θ0*X0 + θ1*X1

VII. Case Study

1. Turn it into matrix and vector operations and build the formula.

1) Read the file:

>> F = load("prices.txt")
F =

    825.0000      1.0000    135.0000      3.0000      2.0000
    997.5000      1.0000    133.0000      3.0000      2.0000
   1005.0000      1.0000    134.0000      3.0000      2.0000
    384.0000      1.0000     64.0000      3.0000      2.0000
    270.0000      1.0000     45.0000      2.0000      1.0000
    459.6000      1.0000     76.0000      1.0000      1.0000
    388.8000      1.0000     64.8000      1.0000      1.0000
    713.4000      1.0000    118.9000      1.0000      1.0000
    218.4000      1.0000     39.0000      1.0000      1.0000
   1145.5000      1.0000    145.0000      3.0000      2.0000
   1864.4000      1.0000    236.0000      4.0000      2.0000
    539.0000      1.0000     77.0000      2.0000      2.0000
    679.0000      1.0000     97.0000      3.0000      2.0000
    756.0000      1.0000    108.0000      3.0000      2.0000
    784.0000      1.0000    112.0000      3.0000      2.0000
    487.5000      1.0000     75.0000      1.0000      2.0000
    780.0000      1.0000    120.0000      2.0000      2.0000
    780.0000      1.0000    120.0000      2.0000      2.0000
    780.0000      1.0000    120.0000      3.0000      2.0000
    995.5000      1.0000    147.0000      3.0000      2.0000
   1072.5000      1.0000    165.0000      3.0000      2.0000
   1072.5000      1.0000    165.0000      3.0000      2.0000

2) m = the number of samples

>> m=size(F,1)
m =  22

3) x0: a column of m ones

>> x0=ones(m,1)
x0 =

   1
   1
   1
   1
   1
   1
   1
   1
   1
   1
   1
   1
   1
   1
   1
   1
   1
   1
   1
   1
   1
   1

4) x1: the third column read from the file

>> x1=F(:,3)
x1 =

   135.000
   133.000
   134.000
    64.000
    45.000
    76.000
    64.800
   118.900
    39.000
   145.000
   236.000
    77.000
    97.000
   108.000
   112.000
    75.000
   120.000
   120.000
   120.000
   147.000
   165.000
   165.000

5) X = [x0, x1]: merge x0 and x1 into one matrix

>> X=[x0,x1]
X =

   1.0000   135.0000
   1.0000   133.0000
   1.0000   134.0000
   1.0000    64.0000
   1.0000    45.0000
   1.0000    76.0000
   1.0000    64.8000
   1.0000   118.9000
   1.0000    39.0000
   1.0000   145.0000
   1.0000   236.0000
   1.0000    77.0000
   1.0000    97.0000
   1.0000   108.0000
   1.0000   112.0000
   1.0000    75.0000
   1.0000   120.0000
   1.0000   120.0000
   1.0000   120.0000
   1.0000   147.0000
   1.0000   165.0000
   1.0000   165.0000

6) y = F(:,1): the first column (the actual prices)

>> y=F(:,1)
y =

    825.00
    997.50
   1005.00
    384.00
    270.00
    459.60
    388.80
    713.40
    218.40
   1145.50
   1864.40
    539.00
    679.00
    756.00
    784.00
    487.50
    780.00
    780.00
    780.00
    995.50
   1072.50
   1072.50

7) z=[6;7]

z represents the parameters; here it is assigned 6 and 7, but its value keeps changing.

>> z=[6;7]
z =

   6
   7

8) H = X*z

9) H - y

10) r = (H-y).^2

11) sum(r)

12) sum(r)/2/m

Steps 8) through 12) combined: sum((X*z-y).^2)/2/m

>> sum((X*z-y).^2)/2/m
ans =  3445.9
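Steps 8) through 12) can also be traced in plain Python on a tiny made-up data set (illustrative values, not the prices.txt data):

```python
# Steps 8)-12): H = X*z, the residual H - y, the squared residuals r,
# their sum, and division by 2m. Data values are made up for illustration.
X = [[1.0, 2.0], [1.0, 3.0]]     # each row: [x0, x1]
y = [5.0, 7.0]                   # actual values
z = [1.0, 2.0]                   # candidate [theta0, theta1]

H = [row[0] * z[0] + row[1] * z[1] for row in X]  # step 8: predictions
r = [(h - yi) ** 2 for h, yi in zip(H, y)]        # steps 9-10: squared residuals
cost = sum(r) / (2 * len(y))                      # steps 11-12
print(H, cost)  # [5.0, 7.0] 0.0
```

Here the candidate z fits the two samples exactly, so the cost is 0.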

2. Use an algorithm to obtain the values of θ0 and θ1; this is where the earlier COST (error) function comes into play.

1. Theory:

Z, the return value = function name(parameters)

z0 is θ0, given the range -100 to 100

z1 is θ1, given the range -100 to 100

X is the matrix from before

y is the vector from before

function Z = costFunction(z0,z1,X,y)
  Z = zeros(length(z0),length(z1));   % initialize the result matrix with zeros
  m = length(y);                      % number of samples
  % try every combination of theta0 and theta1 with two nested loops
  for i=1:length(z0)
    for j=1:length(z1)
      % the cost formula; every combination is stored in Z(i,j)
      Z(i,j) = sum((X*[z0(i);z1(j)]-y).^2)/(2*m);
    end
  end
  Z = Z';   % the single quote transposes the matrix
end

2. Create a custom function:

(1) Save the code above with UTF-8 encoding (to prevent garbled characters);

(2) the file name must match the function name;

(3) change the file extension to .m (so that Octave can recognize it; the recognized file icon is shown in the figure below).

Verify that the function has been loaded:

call the function with deliberately wrong arguments.

If the error says the function name is undefined, as below, the function has not been loaded:

>> costFunction(x)
error: 'costFunction' undefined near line 1 column 1

If the error complains about the arguments instead, Octave has already recognized the function:

>> costFunction(x)
error: 'z1' undefined near line 2 column 30
error: called from costFunction at line 2 column 4
error: evaluating argument list element number 1
error: called from costFunction at line 2 column 4
error: evaluating argument list element number 2
error: called from costFunction at line 2 column 4

3. Call the custom function

z0 is θ0, given the range -100 to 100:

>> z0=[-100:1:100]   # without a trailing semicolon, every value from -100 to 100 in steps of 1 is printed
z0 =

 Columns 1 through 25:

  -100   -99   -98   -97   -96   -95   -94   -93   -92   -91   -90   -89   -88   -87   -86   -85   -84   -83   -82   -81   -80   -79   -78   -77   -76

 Columns 26 through 50:

   -75   -74   -73   -72   -71   -70   -69   -68   -67   -66   -65   -64   -63   -62   -61   -60   -59   -58   -57   -56   -55   -54   -53   -52   -51

 Columns 51 through 75:

   -50   -49   -48   -47   -46   -45   -44   -43   -42   -41   -40   -39   -38   -37   -36   -35   -34   -33   -32   -31   -30   -29   -28   -27   -26

 Columns 76 through 100:

   -25   -24   -23   -22   -21   -20   -19   -18   -17   -16   -15   -14   -13   -12   -11   -10    -9    -8    -7    -6    -5    -4    -3    -2    -1

 Columns 101 through 125:

     0     1     2     3     4     5     6     7     8     9    10    11    12    13    14    15    16    17    18    19    20    21    22    23    24

 Columns 126 through 150:

    25    26    27    28    29    30    31    32    33    34    35    36    37    38    39    40    41    42    43    44    45    46    47    48    49

 Columns 151 through 175:

    50    51    52    53    54    55    56    57    58    59    60    61    62    63    64    65    66    67    68    69    70    71    72    73    74

 Columns 176 through 200:

    75    76    77    78    79    80    81    82    83    84    85    86    87    88    89    90    91    92    93    94    95    96    97    98    99

 Column 201:

   100

>> z0=[-100:1:100];   # with a trailing semicolon, the values are not printed

z1 is θ1, given the range -100 to 100:

>> z1=[-100:1:100];

X is the matrix from before.

y is the vector from before.

Call the function:

>> J=costFunction(z0,z1,X,y)
J =

 Columns 1 through 10:

   8.6161e+007   8.6149e+007   8.6137e+007   8.6125e+007   8.6113e+007   8.6100e+007   8.6088e+007   8.6076e+007   8.6064e+007   8.6051e+007
   8.4569e+007   8.4557e+007   8.4545e+007   8.4533e+007   8.4521e+007   8.4508e+007   8.4496e+007   8.4484e+007   8.4472e+007   8.4460e+007
   8.2991e+007   8.2979e+007   8.2967e+007   8.2955e+007   8.2943e+007   8.2931e+007   8.2919e+007   8.2907e+007   8.2895e+007   8.2883e+007
   8.1429e+007   8.1417e+007   8.1405e+007   8.1393e+007   8.1381e+007   8.1369e+007   8.1357e+007   8.1346e+007   8.1334e+007   8.1322e+007
   7.9881e+007   7.9869e+007   7.9857e+007   7.9846e+007   7.9834e+007   7.9822e+007   7.9810e+007   7.9798e+007   7.9787e+007   7.9775e+007
   7.8348e+007   7.8336e+007   7.8325e+007   7.8313e+007   7.8301e+007   7.8290e+007   7.8278e+007   7.8266e+007   7.8255e+007   7.8243e+007
   7.6830e+007   7.6818e+007   7.6807e+007   7.6795e+007   7.6784e+007   7.6772e+007   7.6760e+007   7.6749e+007   7.6737e+007   7.6726e+007
   7.5326e+007   7.5315e+007   7.5304e+007   7.5292e+007   7.5281e+007   7.5269e+007   7.5258e+007   7.5246e+007   7.5235e+007   7.5224e+007
   7.3838e+007   7.3827e+007   7.3815e+007   7.3804e+007   7.3793e+007   7.3781e+007   7.3770e+007   7.3759e+007   7.3747e+007   7.3736e+007
   7.2364e+007   7.2353e+007   7.2342e+007   7.2331e+007   7.2320e+007   7.2308e+007   7.2297e+007   7.2286e+007   7.2275e+007   7.2264e+007
   7.0906e+007   7.0894e+007   7.0883e+007   7.0872e+007   7.0861e+007   7.0850e+007   7.0839e+007   7.0828e+007   7.0817e+007   7.0806e+007
   6.9462e+007   6.9451e+007   6.9440e+007   6.9429e+007   6.9418e+007   6.9407e+007   6.9396e+007   6.9385e+007   6.9374e+007   6.9363e+007
   6.8033e+007   6.8022e+007   6.8011e+007   6.8000e+007   6.7989e+007   6.7978e+007   6.7968e+007   6.7957e+007   6.7946e+007   6.7935e+007
   6.6618e+007   6.6608e+007   6.6597e+007   6.6586e+007   6.6576e+007   6.6565e+007   6.6554e+007   6.6543e+007   6.6533e+007   6.6522e+007
   6.5219e+007   6.5209e+007   6.5198e+007   6.5187e+007   6.5177e+007   6.5166e+007   6.5155e+007   6.5145e+007   6.5134e+007   6.5124e+007
   6.3835e+007   6.3824e+007   6.3814e+007   6.3803e+007   6.3793e+007   6.3782e+007   6.3772e+007   6.3761e+007   6.3751e+007   6.3740e+007
   6.2465e+007   6.2455e+007   6.2444e+007   6.2434e+007   6.2423e+007   6.2413e+007   6.2403e+007   6.2392e+007   6.2382e+007   6.2371e+007
   6.1110e+007   6.1100e+007   6.1090e+007   6.1079e+007   6.1069e+007   6.1059e+007   6.1049e+007   6.1038e+007   6.1028e+007   6.1018e+007
   5.9770e+007   5.9760e+007   5.9750e+007   5.9740e+007   5.9730e+007   5.9720e+007   5.9709e+007   5.9699e+007   5.9689e+007   5.9679e+007
   5.8445e+007   5.8435e+007   5.8425e+007   5.8415e+007   5.8405e+007   5.8395e+007   5.8385e+007   5.8375e+007   5.8365e+007   5.8355e+007
   5.7135e+007   5.7125e+007   5.7115e+007   5.7105e+007   5.7095e+007   5.7085e+007   5.7075e+007   5.7065e+007   5.7056e+007   5.7046e+007
   5.5840e+007   5.5830e+007   5.5820e+007   5.5810e+007   5.5800e+007   5.5791e+007   5.5781e+007   5.5771e+007   5.5761e+007   5.5751e+007

Generate a plot for a direct visual check:

plot3(J)     # generates a 3D plot; this is slow, so wait a moment
plot(J)      # generates a 2D plot

4. Define the gradient-descent function and call it

Pseudocode:

Loop many times, continually changing θ1 and watching J(θ1). If the value is 3000, loop again; still 3000, loop again; still 3000, and it no longer changes: then we can conclude the minimum has been found.

function [z,J_his] = descentFunction(z,X,y,a,iters)
  J_his = zeros(iters,1);
  m = length(y);      % number of samples
  n = length(z);
  t = zeros(n,1);
  for iter = 1:iters
    for i = 1:n
      % rate of change
      t(i) = (a/m)*(X*z-y)'*X(:,i);
    end;
    for i = 1:n
      z(i) = z(i) - t(i);
    end;
    J_his(iter) = sum((X*z-y).^2)/(2*m);
  end
end
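A rough Python equivalent of descentFunction, for illustration (pure Python stands in for Octave's matrix operations; the two-sample data set below is made up, not prices.txt):

```python
# Gradient descent for linear regression, mirroring descentFunction above:
# the step per parameter is t(i) = (a/m) * (X*z - y)' * X(:,i),
# then z(i) = z(i) - t(i), repeated for `iters` rounds.
def descent(z, X, y, a, iters):
    m, n = len(y), len(z)
    J_his = []
    for _ in range(iters):
        residual = [sum(X[r][c] * z[c] for c in range(n)) - y[r]
                    for r in range(m)]                          # X*z - y
        t = [(a / m) * sum(residual[r] * X[r][i] for r in range(m))
             for i in range(n)]                                 # rate of change
        z = [z[i] - t[i] for i in range(n)]                     # update thetas
        updated = [sum(X[r][c] * z[c] for c in range(n)) - y[r]
                   for r in range(m)]
        J_his.append(sum(e * e for e in updated) / (2 * m))     # cost history
    return z, J_his

z, J_his = descent([0.0, 0.0], [[1.0, 1.0], [1.0, 2.0]], [2.0, 3.0], 0.1, 5000)
print([round(v, 3) for v in z])  # converges toward [1.0, 1.0]
```

On this toy data the exact solution is θ0 = θ1 = 1, and the cost history J_his shrinks toward 0 as the parameters settle.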

After writing the code above into a .m file, call it as follows:

>> cd c:
>> dir
$360Section                Documents and Settings     PerfLogs                   Users                      grldr
$Recycle.Bin               Drivers                    Program Files              Windows                    mm.cfg
360SANDBOX                 InstallConfig.ini          Program Files (x86)        bootmgr                    offline_FtnInfo.txt
Boot                       Intel                      ProgramData                ckcore.txt                 pagefile.sys
Config.Msi                 MSOCache                   RECYCLER                   costFunction.m
DRMsoft                    OEMSF                      System Volume Information  descentFunction.m
>> descentFunction(1)
error: 'iters' undefined near line 2 column 16
error: called from descentFunction at line 2 column 8
error: evaluating argument list element number 1
error: called from descentFunction at line 2 column 8
>> z=[0;6]
z =

   0
   6

>> A = load("prices.txt")
A =

    825.0000      1.0000    135.0000      3.0000      2.0000
    997.5000      1.0000    133.0000      3.0000      2.0000
   1005.0000      1.0000    134.0000      3.0000      2.0000
    384.0000      1.0000     64.0000      3.0000      2.0000
    270.0000      1.0000     45.0000      2.0000      1.0000
    459.6000      1.0000     76.0000      1.0000      1.0000
    388.8000      1.0000     64.8000      1.0000      1.0000
    713.4000      1.0000    118.9000      1.0000      1.0000
    218.4000      1.0000     39.0000      1.0000      1.0000
   1145.5000      1.0000    145.0000      3.0000      2.0000
   1864.4000      1.0000    236.0000      4.0000      2.0000
    539.0000      1.0000     77.0000      2.0000      2.0000
    679.0000      1.0000     97.0000      3.0000      2.0000
    756.0000      1.0000    108.0000      3.0000      2.0000
    784.0000      1.0000    112.0000      3.0000      2.0000
    487.5000      1.0000     75.0000      1.0000      2.0000
    780.0000      1.0000    120.0000      2.0000      2.0000
    780.0000      1.0000    120.0000      2.0000      2.0000
    780.0000      1.0000    120.0000      3.0000      2.0000
    995.5000      1.0000    147.0000      3.0000      2.0000
   1072.5000      1.0000    165.0000      3.0000      2.0000
   1072.5000      1.0000    165.0000      3.0000      2.0000

>> X=A(:,2:3)
X =

   1.0000   135.0000
   1.0000   133.0000
   1.0000   134.0000
   1.0000    64.0000
   1.0000    45.0000
   1.0000    76.0000
   1.0000    64.8000
   1.0000   118.9000
   1.0000    39.0000
   1.0000   145.0000
   1.0000   236.0000
   1.0000    77.0000
   1.0000    97.0000
   1.0000   108.0000
   1.0000   112.0000
   1.0000    75.0000
   1.0000   120.0000
   1.0000   120.0000
   1.0000   120.0000
   1.0000   147.0000
   1.0000   165.0000
   1.0000   165.0000

>> y=A(:,1)
y =

    825.00
    997.50
   1005.00
    384.00
    270.00
    459.60
    388.80
    713.40
    218.40
   1145.50
   1864.40
    539.00
    679.00
    756.00
    784.00
    487.50
    780.00
    780.00
    780.00
    995.50
   1072.50
   1072.50

>> a=0.0001
a =   1.0000e-004
>> iters=1
iters =  1
>> [z,J]=descentFunction(z,X,y,a,iters)
z =

   0.0091700
   7.4036164

J =  4837.5
>>

Summary:

1)  Turn the requirement into mathematical formulas: a prediction (hypothesis) function and an error (cost) function.

2)  With the formulas in hand, the requirement is solved by substituting in the variables and computing.

3)  Substituting all the data and computing every candidate exhaustively takes too many computations and is slow.

4)  Gradient descent cuts the number of computations: it skips over many values, with steps growing denser toward the bottom. a is the gradient descent rate (the learning rate); here a=0.0001.

5)  The Octave tool makes this practical: vector and matrix operations are convenient to write, convenient to display, and fast to compute.
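The gradient descent loop that the Octave session runs can be sketched in plain Python. This is a minimal illustration, not the actual descentFunction.m: the function name descent and the hypothesis h(x) = z0 + z1*x are assumptions based on the session above, and the cost is J = (1/(2m)) * Σ(h(x)-y)².

```python
# Minimal batch gradient descent for one-feature linear regression,
# mirroring the Octave call descentFunction(z, X, y, a, iters).
# Hypothesis: h(x) = z0 + z1*x.  Cost: J = (1/(2m)) * sum((h(x) - y)^2).

def descent(z0, z1, xs, ys, a, iters):
    m = len(xs)
    for _ in range(iters):
        preds = [z0 + z1 * x for x in xs]
        # Partial derivatives of J with respect to z0 and z1.
        g0 = sum(p - y for p, y in zip(preds, ys)) / m
        g1 = sum((p - y) * x for p, y, x in zip(preds, ys, xs)) / m
        # Step downhill, scaled by the learning rate a.
        z0 -= a * g0
        z1 -= a * g1
    preds = [z0 + z1 * x for x in xs]
    J = sum((p - y) ** 2 for p, y in zip(preds, ys)) / (2 * m)
    return z0, z1, J
```

With the tiny learning rate a=0.0001 the steps are small, so many iterations are needed; the point is only that each pass moves z in the direction that lowers J, rather than evaluating every candidate value.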

八、Implementing the example with Spark

1.   Spark packages the algorithms as a function library

The linear regression function is LinearRegressionWithSGD.

For machine learning with Spark, the key is knowing which library function the requirement ultimately maps to.

The data is massive, so Spark cannot read it all into memory at once; instead it reads one record at a time.

1)  Each record is wrapped as a LabeledPoint object, built on RDDs.

2)  Vector: the values x0 and x1 are called features.

3)  y is the label.

4)  In machine learning, all values are of type Double.

5)  Feed the sample data to the matching library function (here, the linear regression algorithm) and train on it.

6)  The result of training is called the model.

7)  With the model, supply the feature values and it returns the prediction directly. x1=200, result:
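The per-record wrapping in step 1) can be sketched outside Spark in plain Python. parse_point is a hypothetical helper (not a Spark API), and the returned (label, features) pair stands in for what MLlib's LabeledPoint holds.

```python
# Sketch of step 1): turn one text record of the prices file into a
# (label, features) pair -- the shape that LabeledPoint(label, vector) wraps.
# parse_point is a hypothetical helper for illustration only.

def parse_point(line):
    # Columns of one record: price, x0 (constant 1), size, rooms, floors.
    cols = [float(c) for c in line.split()]
    label = cols[0]        # y: the price, used as the label
    features = cols[1:3]   # x0 and x1: the feature vector
    return label, features

label, features = parse_point("825.0\t1\t135.00\t3\t2")
```

Every value comes out as a float, matching the rule in step 4) that machine learning works in Double.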

2. Dense and sparse vectors

Take the vector (1.0,0.0,0.0,0.0,0.0,0.0,3.0).

Its dense representation is [1.0,0.0,0.0,0.0,0.0,0.0,3.0];

its sparse representation is (7,[0,6],[1.0,3.0]).

Here 7 is the number of elements; [0,6] are the positions of the non-zero entries (the 1st and the 7th); [1.0,3.0] are the values stored there, i.e. 1.0 at the 1st position and 3.0 at the 7th.

When a vector holds many zeros, the sparse form takes less storage space than the dense form.

The difference is in how zero-valued elements are handled; data in real projects typically contains a large number of zeros.
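The two encodings above can be sketched in plain Python. to_dense is a hypothetical helper that expands the sparse (size, indices, values) triple back into the dense list.

```python
# Expand a sparse vector (size, indices, values) into its dense form:
# start from all zeros, then place each stored value at its index.

def to_dense(size, indices, values):
    v = [0.0] * size
    for i, x in zip(indices, values):
        v[i] = x
    return v

# The sparse triple (7, [0, 6], [1.0, 3.0]) from the text above.
dense = to_dense(7, [0, 6], [1.0, 3.0])
```

The sparse triple stores 2 indices and 2 values instead of 7 slots; the saving grows with the proportion of zeros.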

3. Prepare the data

4. Use Spark to estimate the price of a house of a given size

scala> import org.apache.spark.mllib.regression.LinearRegressionModel
import org.apache.spark.mllib.regression.LinearRegressionModel

scala> import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LabeledPoint

scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors

scala> import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.regression.LinearRegressionWithSGD

scala> Vectors.dense(1,2,3)
res5: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0]

scala> Vectors.sparse(5,Array(0,1),Array(2,3))
res6: org.apache.spark.mllib.linalg.Vector = (5,[0,1],[2.0,3.0])

scala> val v = Vectors.dense(1,2,3);
v: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0]

scala> val y = 4
y: Int = 4

scala> val l = LabeledPoint(y, v)
l: org.apache.spark.mllib.regression.LabeledPoint = (4.0,[1.0,2.0,3.0])

scala> val rdd = sc.textFile("hdfs:///spark/prices.txt")
18/05/06 05:32:33 INFO storage.MemoryStore: ensureFreeSpace(216696) called with curMem=83219, maxMem=555755765
18/05/06 05:32:33 INFO storage.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 211.6 KB, free 529.7 MB)
18/05/06 05:32:33 INFO storage.MemoryStore: ensureFreeSpace(19939) called with curMem=299915, maxMem=555755765
18/05/06 05:32:33 INFO storage.MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 19.5 KB, free 529.7 MB)
18/05/06 05:32:33 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on 192.168.239.129:57789 (size: 19.5 KB, free: 530.0 MB)
18/05/06 05:32:33 INFO spark.SparkContext: Created broadcast 5 from textFile at <console>:27
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at textFile at <console>:27

scala> rdd.first
18/05/06 05:32:36 INFO mapred.FileInputFormat: Total input paths to process : 1
18/05/06 05:32:39 INFO spark.SparkContext: Starting job: first at <console>:30
18/05/06 05:32:39 INFO scheduler.DAGScheduler: Got job 2 (first at <console>:30) with 1 output partitions
18/05/06 05:32:39 INFO scheduler.DAGScheduler: Final stage: ResultStage 4(first at <console>:30)
18/05/06 05:32:39 INFO scheduler.DAGScheduler: Parents of final stage: List()
18/05/06 05:32:39 INFO scheduler.DAGScheduler: Missing parents: List()
18/05/06 05:32:39 INFO scheduler.DAGScheduler: Submitting ResultStage 4 (MapPartitionsRDD[11] at textFile at <console>:27), which has no missing parents
18/05/06 05:32:39 INFO storage.MemoryStore: ensureFreeSpace(3152) called with curMem=319854, maxMem=555755765
18/05/06 05:32:39 INFO storage.MemoryStore: Block broadcast_6 stored as values in memory (estimated size 3.1 KB, free 529.7 MB)
18/05/06 05:32:39 INFO storage.MemoryStore: ensureFreeSpace(1827) called with curMem=323006, maxMem=555755765
18/05/06 05:32:39 INFO storage.MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 1827.0 B, free 529.7 MB)
18/05/06 05:32:39 INFO storage.BlockManagerInfo: Added broadcast_6_piece0 in memory on 192.168.239.129:57789 (size: 1827.0 B, free: 530.0 MB)
18/05/06 05:32:39 INFO spark.SparkContext: Created broadcast 6 from broadcast at DAGScheduler.scala:861
18/05/06 05:32:39 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 4 (MapPartitionsRDD[11] at textFile at <console>:27)
18/05/06 05:32:39 INFO scheduler.TaskSchedulerImpl: Adding task set 4.0 with 1 tasks
18/05/06 05:32:39 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 4.0 (TID 24, 192.168.239.131, ANY, 2146 bytes)
18/05/06 05:32:46 INFO storage.BlockManagerInfo: Added broadcast_6_piece0 in memory on 192.168.239.131:43057 (size: 1827.0 B, free: 534.5 MB)
18/05/06 05:32:48 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on 192.168.239.131:43057 (size: 19.5 KB, free: 534.5 MB)
18/05/06 05:33:01 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 4.0 (TID 24) in 21657 ms on 192.168.239.131 (1/1)
18/05/06 05:33:01 INFO scheduler.DAGScheduler: ResultStage 4 (first at <console>:30) finished in 21.703 s
18/05/06 05:33:01 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
18/05/06 05:33:01 INFO scheduler.DAGScheduler: Job 2 finished: first at <console>:30, took 22.290016 s
res8: String = 825.0   1   135.00  3   2

scala> val rdd2 = rdd.map{ x=>x.split("\\t") }
rdd2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[12] at map at <console>:29

scala> rdd2.collect
18/05/06 05:35:56 INFO spark.SparkContext: Starting job: collect at <console>:32
18/05/06 05:35:56 INFO scheduler.DAGScheduler: Got job 3 (collect at <console>:32) with 2 output partitions
18/05/06 05:35:56 INFO scheduler.DAGScheduler: Final stage: ResultStage 5(collect at <console>:32)
18/05/06 05:35:56 INFO scheduler.DAGScheduler: Parents of final stage: List()
18/05/06 05:35:56 INFO scheduler.DAGScheduler: Missing parents: List()
18/05/06 05:35:56 INFO scheduler.DAGScheduler: Submitting ResultStage 5 (MapPartitionsRDD[12] at map at <console>:29), which has no missing parents
18/05/06 05:35:56 INFO storage.MemoryStore: ensureFreeSpace(3312) called with curMem=324833, maxMem=555755765
18/05/06 05:35:56 INFO storage.MemoryStore: Block broadcast_7 stored as values in memory (estimated size 3.2 KB, free 529.7 MB)
18/05/06 05:35:56 INFO storage.MemoryStore: ensureFreeSpace(1889) called with curMem=328145, maxMem=555755765
18/05/06 05:35:56 INFO storage.MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 1889.0 B, free 529.7 MB)
18/05/06 05:35:56 INFO storage.BlockManagerInfo: Added broadcast_7_piece0 in memory on 192.168.239.129:57789 (size: 1889.0 B, free: 530.0 MB)
18/05/06 05:35:56 INFO spark.SparkContext: Created broadcast 7 from broadcast at DAGScheduler.scala:861
18/05/06 05:35:56 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 5 (MapPartitionsRDD[12] at map at <console>:29)
18/05/06 05:35:56 INFO scheduler.TaskSchedulerImpl: Adding task set 5.0 with 2 tasks
18/05/06 05:35:56 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 5.0 (TID 25, 192.168.239.144, ANY, 2146 bytes)
18/05/06 05:35:56 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 5.0 (TID 26, 192.168.239.130, ANY, 2146 bytes)
18/05/06 05:36:07 INFO storage.BlockManagerInfo: Added broadcast_7_piece0 in memory on 192.168.239.130:53716 (size: 1889.0 B, free: 534.5 MB)
18/05/06 05:36:11 INFO storage.BlockManagerInfo: Added broadcast_7_piece0 in memory on 192.168.239.144:43403 (size: 1889.0 B, free: 534.5 MB)
18/05/06 05:36:13 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on 192.168.239.130:53716 (size: 19.5 KB, free: 534.5 MB)
18/05/06 05:36:19 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on 192.168.239.144:43403 (size: 19.5 KB, free: 534.5 MB)
18/05/06 05:36:40 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 5.0 (TID 26) in 43728 ms on 192.168.239.130 (1/2)
18/05/06 05:36:43 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 5.0 (TID 25) in 46563 ms on 192.168.239.144 (2/2)
18/05/06 05:36:43 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks have all completed, from pool
18/05/06 05:36:43 INFO scheduler.DAGScheduler: ResultStage 5 (collect at <console>:32) finished in 46.583 s
18/05/06 05:36:43 INFO scheduler.DAGScheduler: Job 3 finished: collect at <console>:32, took 46.599154 s
res9: Array[Array[String]] = Array(Array(825.0, 1, 135.00, 3, 2), Array(997.5, 1, 133.00, 3, 2), Array(1005.0, 1, 134.00, 3, 2), Array(384.00, 1, 64.00, 3, 2), Array(270.00, 1, 45.00, 2, 1), Array(459.6, 1, 76.00, 1, 1), Array(388.8, 1, 64.8, 1, 1), Array(713.4, 1, 118.9, 1, 1), Array(218.4, 1, 39.0, 1, 1), Array(1145.5, 1, 145.0, 3, 2), Array(1864.4, 1, 236.00, 4, 2), Array(539.0, 1, 77.00, 2, 2), Array(679.0, 1, 97.00, 3, 2), Array(756.00, 1, 108.00, 3, 2), Array(784.00, 1, 112.00, 3, 2), Array(487.5, 1, 75.0, 1, 2), Array(780.0, 1, 120.00, 2, 2), Array(780.0, 1, 120.00, 2, 2), Array(780.0, 1, 120.00, 3, 2), Array(995.5, 1, 147.0, 3, 2), Array(1072.5, 1, 165.00, 3, 2), Array(1072.5, 1, 165.00, 3, 2))

scala> val rdd3 = rdd2.map{ x=>LabeledPoint(x(0).toDouble, Vectors.dense(x(1).toDouble, x(2).toDouble)) }
rdd3: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[13] at map at <console>:31

scala> val model = LinearRegressionWithSGD.train(rdd3, 1000, 0.0001)
18/05/06 05:39:15 INFO spark.SparkContext: Starting job: first at GeneralizedLinearAlgorithm.scala:206
18/05/06 05:39:15 INFO scheduler.DAGScheduler: Got job 4 (first at GeneralizedLinearAlgorithm.scala:206) with 1 output partitions
18/05/06 05:39:15 INFO scheduler.DAGScheduler: Final stage: ResultStage 6(first at GeneralizedLinearAlgorithm.scala:206)
18/05/06 05:39:15 INFO scheduler.DAGScheduler: Parents of final stage: List()
18/05/06 05:39:15 INFO scheduler.DAGScheduler: Missing parents: List()
18/05/06 05:39:15 INFO scheduler.DAGScheduler: Submitting ResultStage 6 (MapPartitionsRDD[14] at map at GeneralizedLinearAlgorithm.scala:206), which has no missing parents
18/05/06 05:39:15 INFO storage.MemoryStore: ensureFreeSpace(3872) called with curMem=330034, maxMem=555755765
18/05/06 05:39:15 INFO storage.MemoryStore: Block broadcast_8 stored as values in memory (estimated size 3.8 KB, free 529.7 MB)
18/05/06 05:39:15 INFO storage.MemoryStore: ensureFreeSpace(2191) called with curMem=333906, maxMem=555755765
18/05/06 05:39:15 INFO storage.MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 2.1 KB, free 529.7 MB)
18/05/06 05:39:15 INFO storage.BlockManagerInfo: Added broadcast_8_piece0 in memory on 192.168.239.129:57789 (size: 2.1 KB, free: 530.0 MB)
18/05/06 05:39:15 INFO spark.SparkContext: Created broadcast 8 from broadcast at DAGScheduler.scala:861
18/05/06 05:39:15 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 6 (MapPartitionsRDD[14] at map at GeneralizedLinearAlgorithm.scala:206)
18/05/06 05:39:15 INFO scheduler.TaskSchedulerImpl: Adding task set 6.0 with 1 tasks
18/05/06 05:39:15 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 6.0 (TID 27, 192.168.239.144, ANY, 2146 bytes)
18/05/06 05:39:15 INFO storage.BlockManagerInfo: Added broadcast_8_piece0 in memory on 192.168.239.144:43403 (size: 2.1 KB, free: 534.5 MB)
18/05/06 05:39:16 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 6.0 (TID 27) in 1137 ms on 192.168.239.144 (1/1)
18/05/06 05:39:16 INFO scheduler.DAGScheduler: ResultStage 6 (first at GeneralizedLinearAlgorithm.scala:206) finished in 1.145 s
18/05/06 05:39:16 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool
18/05/06 05:39:16 INFO scheduler.DAGScheduler: Job 4 finished: first at GeneralizedLinearAlgorithm.scala:206, took 1.172036 s
18/05/06 05:39:16 WARN regression.LinearRegressionWithSGD: The input data is not directly cached, which may hurt performance if its parent RDDs are also uncached.
18/05/06 05:39:16 INFO spark.SparkContext: Starting job: count at GradientDescent.scala:195
18/05/06 05:39:16 INFO scheduler.DAGScheduler: Got job 5 (count at GradientDescent.scala:195) with 2 output partitions
18/05/06 05:39:16 INFO scheduler.DAGScheduler: Final stage: ResultStage 7(count at GradientDescent.scala:195)
18/05/06 05:39:16 INFO scheduler.DAGScheduler: Parents of final stage: List()
18/05/06 05:39:16 INFO scheduler.DAGScheduler: Missing parents: List()
18/05/06 05:39:16 INFO scheduler.DAGScheduler: Submitting ResultStage 7 (MapPartitionsRDD[15] at map at GeneralizedLinearAlgorithm.scala:292), which has no missing parents
18/05/06 05:39:16 INFO storage.MemoryStore: ensureFreeSpace(3584) called with curMem=336097, maxMem=555755765
18/05/06 05:39:16 INFO storage.MemoryStore: Block broadcast_9 stored as values in memory (estimated size 3.5 KB, free 529.7 MB)
18/05/06 05:39:17 INFO storage.MemoryStore: ensureFreeSpace(2030) called with curMem=339681, maxMem=555755765
18/05/06 05:39:17 INFO storage.MemoryStore: Block broadcast_9_piece0 stored as bytes in memory (estimated size 2030.0 B, free 529.7 MB)
18/05/06 05:39:17 INFO storage.BlockManagerInfo: Added broadcast_9_piece0 in memory on 192.168.239.129:57789 (size: 2030.0 B, free: 530.0 MB)
18/05/06 05:39:17 INFO spark.SparkContext: Created broadcast 9 from broadcast at DAGScheduler.scala:861
18/05/06 05:39:17 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 7 (MapPartitionsRDD[15] at map at GeneralizedLinearAlgorithm.scala:292)
18/05/06 05:39:17 INFO scheduler.TaskSchedulerImpl: Adding task set 7.0 with 2 tasks
18/05/06 05:39:17 INFO spark.ContextCleaner: Cleaned accumulator 5
18/05/06 05:39:17 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 7.0 (TID 28, 192.168.239.144, ANY, 2146 bytes)
18/05/06 05:39:17 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 7.0 (TID 29, 192.168.239.130, ANY, 2146 bytes)
18/05/06 05:39:18 INFO storage.BlockManagerInfo: Removed broadcast_8_piece0 on 192.168.239.129:57789 in memory (size: 2.1 KB, free: 530.0 MB)
18/05/06 05:39:18 INFO storage.BlockManagerInfo: Added broadcast_9_piece0 in memory on 192.168.239.144:43403 (size: 2030.0 B, free: 534.5 MB)
18/05/06 05:39:19 INFO storage.BlockManagerInfo: Added broadcast_9_piece0 in memory on 192.168.239.130:53716 (size: 2030.0 B, free: 534.5 MB)
18/05/06 05:39:19 INFO storage.BlockManagerInfo: Removed broadcast_8_piece0 on 192.168.239.144:43403 in memory (size: 2.1 KB, free: 534.5 MB)
18/05/06 05:39:19 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 7.0 (TID 28) in 1412 ms on 192.168.239.144 (1/2)
18/05/06 05:39:19 INFO spark.ContextCleaner: Cleaned accumulator 7
18/05/06 05:39:19 INFO storage.BlockManagerInfo: Removed broadcast_7_piece0 on 192.168.239.129:57789 in memory (size: 1889.0 B, free: 530.0 MB)
18/05/06 05:39:19 INFO storage.BlockManagerInfo: Removed broadcast_7_piece0 on 192.168.239.144:43403 in memory (size: 1889.0 B, free: 534.5 MB)
18/05/06 05:39:19 INFO storage.BlockManagerInfo: Removed broadcast_7_piece0 on 192.168.239.130:53716 in memory (size: 1889.0 B, free: 534.5 MB)
18/05/06 05:39:19 INFO spark.ContextCleaner: Cleaned accumulator 6
18/05/06 05:39:19 INFO storage.BlockManagerInfo: Removed broadcast_6_piece0 on 192.168.239.129:57789 in memory (size: 1827.0 B, free: 530.0 MB)
18/05/06 05:39:19 INFO storage.BlockManagerInfo: Removed broadcast_6_piece0 on 192.168.239.131:43057 in memory (size: 1827.0 B, free: 534.5 MB)
18/05/06 05:39:21 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 7.0 (TID 29) in 3087 ms on 192.168.239.130 (2/2)
18/05/06 05:39:21 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 7.0, whose tasks have all completed, from pool
18/05/06 05:39:21 INFO scheduler.DAGScheduler: ResultStage 7 (count at GradientDescent.scala:195) finished in 3.097 s
18/05/06 05:39:21 INFO scheduler.DAGScheduler: Job 5 finished: count at GradientDescent.scala:195, took 4.086191 s
18/05/06 05:39:26 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
18/05/06 05:39:26 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
18/05/06 05:39:26 INFO storage.MemoryStore: ensureFreeSpace(72) called with curMem=325468, maxMem=555755765
18/05/06 05:39:26 INFO storage.MemoryStore: Block broadcast_10 stored as values in memory (estimated size 72.0 B, free 529.7 MB)
18/05/06 05:39:26 INFO storage.MemoryStore: ensureFreeSpace(138) called with curMem=325540, maxMem=555755765
18/05/06 05:39:26 INFO storage.MemoryStore: Block broadcast_10_piece0 stored as bytes in memory (estimated size 138.0 B, free 529.7 MB)
18/05/06 05:39:26 INFO storage.BlockManagerInfo: Added broadcast_10_piece0 in memory on 192.168.239.129:57789 (size: 138.0 B, free: 530.0 MB)
18/05/06 05:39:26 INFO spark.SparkContext: Created broadcast 10 from broadcast at GradientDescent.scala:221
18/05/06 05:39:27 INFO spark.SparkContext: Starting job: treeAggregate at GradientDescent.scala:225
18/05/06 05:39:27 INFO scheduler.DAGScheduler: Got job 6 (treeAggregate at GradientDescent.scala:225) with 2 output partitions
18/05/06 05:39:27 INFO scheduler.DAGScheduler: Final stage: ResultStage 8(treeAggregate at GradientDescent.scala:225)
18/05/06 05:39:27 INFO scheduler.DAGScheduler: Parents of final stage: List()
18/05/06 05:39:27 INFO scheduler.DAGScheduler: Missing parents: List()
18/05/06 05:39:27 INFO scheduler.DAGScheduler: Submitting ResultStage 8 (MapPartitionsRDD[17] at treeAggregate at GradientDescent.scala:225), which has no missing parents
18/05/06 05:39:27 INFO storage.MemoryStore: ensureFreeSpace(5656) called with curMem=325678, maxMem=555755765
18/05/06 05:39:27 INFO storage.MemoryStore: Block broadcast_11 stored as values in memory (estimated size 5.5 KB, free 529.7 MB)
18/05/06 05:39:27 INFO storage.MemoryStore: ensureFreeSpace(2995) called with curMem=331334, maxMem=555755765
18/05/06 05:39:27 INFO storage.MemoryStore: Block broadcast_11_piece0 stored as bytes in memory (estimated size 2.9 KB, free 529.7 MB)
18/05/06 05:39:27 INFO storage.BlockManagerInfo: Added broadcast_11_piece0 in memory on 192.168.239.129:57789 (size: 2.9 KB, free: 530.0 MB)
18/05/06 05:39:27 INFO spark.SparkContext: Created broadcast 11 from broadcast at DAGScheduler.scala:861
18/05/06 05:39:27 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 8 (MapPartitionsRDD[17] at treeAggregate at GradientDescent.scala:225)
18/05/06 05:39:27 INFO scheduler.TaskSchedulerImpl: Adding task set 8.0 with 2 tasks
18/05/06 05:39:27 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 8.0 (TID 30, 192.168.239.131, ANY, 2255 bytes)
18/05/06 05:39:27 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 8.0 (TID 31, 192.168.239.144, ANY, 2255 bytes)
18/05/06 05:39:27 INFO storage.BlockManagerInfo: Added broadcast_11_piece0 in memory on 192.168.239.144:43403 (size: 2.9 KB, free: 534.5 MB)
18/05/06 05:39:28 INFO storage.BlockManagerInfo: Added broadcast_11_piece0 in memory on 192.168.239.131:43057 (size: 2.9 KB, free: 534.5 MB)
18/05/06 05:39:44 INFO storage.BlockManagerInfo: Added broadcast_10_piece0 in memory on 192.168.239.144:43403 (size: 138.0 B, free: 534.5 MB)
18/05/06 05:39:45 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 8.0 (TID 31) in 17678 ms on 192.168.239.144 (1/2)
18/05/06 05:39:47 INFO storage.BlockManagerInfo: Added broadcast_10_piece0 in memory on 192.168.239.131:43057 (size: 138.0 B, free: 534.5 MB)
18/05/06 05:39:48 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 8.0 (TID 30) in 21035 ms on 192.168.239.131 (2/2)
18/05/06 05:39:48 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks have all completed, from pool
18/05/06 05:39:48 INFO scheduler.DAGScheduler: ResultStage 8 (treeAggregate at GradientDescent.scala:225) finished in 21.034 s
18/05/06 05:39:48 INFO scheduler.DAGScheduler: Job 6 finished: treeAggregate at GradientDescent.scala:225, took 21.088037 s
18/05/06 05:39:48 INFO storage.MemoryStore: ensureFreeSpace(72) called with curMem=334329, maxMem=555755765
18/05/06 05:39:48 INFO storage.MemoryStore: Block broadcast_12 stored as values in memory (estimated size 72.0 B, free 529.7 MB)
18/05/06 05:39:48 INFO storage.MemoryStore: ensureFreeSpace(138) called with curMem=334401, maxMem=555755765
18/05/06 05:39:48 INFO storage.MemoryStore: Block broadcast_12_piece0 stored as bytes in memory (estimated size 138.0 B, free 529.7 MB)
18/05/06 05:39:48 INFO storage.BlockManagerInfo: Added broadcast_12_piece0 in memory on 192.168.239.129:57789 (size: 138.0 B, free: 530.0 MB)
18/05/06 05:39:48 INFO spark.SparkContext: Created broadcast 12 from broadcast at GradientDescent.scala:221
18/05/06 05:39:48 INFO spark.SparkContext: Starting job: treeAggregate at GradientDescent.scala:225
18/05/06 05:39:48 INFO scheduler.DAGScheduler: Got job 7 (treeAggregate at GradientDescent.scala:225) with 2 output partitions
18/05/06 05:39:48 INFO scheduler.DAGScheduler: Final stage: ResultStage 9(treeAggregate at GradientDescent.scala:225)
18/05/06 05:39:48 INFO scheduler.DAGScheduler: Parents of final stage: List()
18/05/06 05:39:48 INFO scheduler.DAGScheduler: Missing parents: List()
18/05/06 05:39:48 INFO scheduler.DAGScheduler: Submitting ResultStage 9 (MapPartitionsRDD[19] at treeAggregate at GradientDescent.scala:225), which has no missing parents
18/05/06 05:39:48 INFO storage.MemoryStore: ensureFreeSpace(5656) called with curMem=334539, maxMem=555755765
18/05/06 05:39:48 INFO storage.MemoryStore: Block broadcast_13 stored as values in memory (estimated size 5.5 KB, free 529.7 MB)
18/05/06 05:39:48 INFO storage.MemoryStore: ensureFreeSpace(2996) called with curMem=340195, maxMem=555755765
18/05/06 05:39:48 INFO storage.MemoryStore: Block broadcast_13_piece0 stored as bytes in memory (estimated size 2.9 KB, free 529.7 MB)
18/05/06 05:39:48 INFO storage.BlockManagerInfo: Added broadcast_13_piece0 in memory on 192.168.239.129:57789 (size: 2.9 KB, free: 530.0 MB)
18/05/06 05:39:48 INFO spark.SparkContext: Created broadcast 13 from broadcast at DAGScheduler.scala:861
18/05/06 05:39:48 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 9 (MapPartitionsRDD[19] at treeAggregate at GradientDescent.scala:225)
18/05/06 05:39:48 INFO scheduler.TaskSchedulerImpl: Adding task set 9.0 with 2 tasks
18/05/06 05:39:48 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 9.0 (TID 32, 192.168.239.130, ANY, 2255 bytes)
18/05/06 05:39:48 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 9.0 (TID 33, 192.168.239.144, ANY, 2255 bytes)
18/05/06 05:39:49 INFO storage.BlockManagerInfo: Added broadcast_13_piece0 in memory on 192.168.239.130:53716 (size: 2.9 KB, free: 534.5 MB)
18/05/06 05:39:49 INFO storage.BlockManagerInfo: Added broadcast_13_piece0 in memory on 192.168.239.144:43403 (size: 2.9 KB, free: 534.5 MB)
18/05/06 05:39:49 INFO storage.BlockManagerInfo: Added broadcast_12_piece0 in memory on 192.168.239.144:43403 (size: 138.0 B, free: 534.5 MB)
18/05/06 05:39:49 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 9.0 (TID 33) in 1028 ms on 192.168.239.144 (1/2)
18/05/06 05:39:57 INFO storage.BlockManagerInfo: Added broadcast_12_piece0 in memory on 192.168.239.130:53716 (size: 138.0 B, free: 534.5 MB)
18/05/06 05:39:57 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 9.0 (TID 32) in 9286 ms on 192.168.239.130 (2/2)
18/05/06 05:39:57 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks have all completed, from pool
18/05/06 05:39:57 INFO scheduler.DAGScheduler: ResultStage 9 (treeAggregate at GradientDescent.scala:225) finished in 9.342 s
18/05/06 05:39:57 INFO scheduler.DAGScheduler: Job 7 finished: treeAggregate at GradientDescent.scala:225, took 9.356678 s
18/05/06 05:39:58 INFO storage.MemoryStore: ensureFreeSpace(72) called with curMem=343191, maxMem=555755765
18/05/06 05:39:58 INFO storage.MemoryStore: Block broadcast_14 stored as values in memory (estimated size 72.0 B, free 529.7 MB)
18/05/06 05:39:58 INFO storage.MemoryStore: ensureFreeSpace(138) called with curMem=343263, maxMem=555755765
18/05/06 05:39:58 INFO storage.MemoryStore: Block broadcast_14_piece0 stored as bytes in memory (estimated size 138.0 B, free 529.7 MB)
18/05/06 05:39:58 INFO storage.BlockManagerInfo: Added broadcast_14_piece0 in memory on 192.168.239.129:57789 (size: 138.0 B, free: 530.0 MB)
18/05/06 05:39:58 INFO spark.SparkContext: Created broadcast 14 from broadcast at GradientDescent.scala:221
18/05/06 05:39:58 INFO spark.SparkContext: Starting job: treeAggregate at GradientDescent.scala:225
18/05/06 05:39:58 INFO scheduler.DAGScheduler: Got job 8 (treeAggregate at GradientDescent.scala:225) with 2 output partitions
18/05/06 05:39:58 INFO scheduler.DAGScheduler: Final stage: ResultStage 10(treeAggregate at GradientDescent.scala:225)
18/05/06 05:39:58 INFO scheduler.DAGScheduler: Parents of final stage: List()
18/05/06 05:39:58 INFO scheduler.DAGScheduler: Missing parents: List()
18/05/06 05:39:58 INFO scheduler.DAGScheduler: Submitting ResultStage 10 (MapPartitionsRDD[21] at treeAggregate at GradientDescent.scala:225), which has no missing parents
18/05/06 05:39:58 INFO storage.MemoryStore: ensureFreeSpace(5656) called with curMem=343401, maxMem=555755765
18/05/06 05:39:58 INFO storage.MemoryStore: Block broadcast_15 stored as values in memory (estimated size 5.5 KB, free 529.7 MB)
18/05/06 05:39:58 INFO storage.MemoryStore: ensureFreeSpace(2995) called with curMem=349057, maxMem=555755765
18/05/06 05:39:58 INFO storage.MemoryStore: Block broadcast_15_piece0 stored as bytes in memory (estimated size 2.9 KB, free 529.7 MB)
18/05/06 05:39:58 INFO storage.BlockManagerInfo: Added broadcast_15_piece0 in memory on 192.168.239.129:57789 (size: 2.9 KB, free: 530.0 MB)
18/05/06 05:39:58 INFO spark.SparkContext: Created broadcast 15 from broadcast at DAGScheduler.scala:861
18/05/06 05:39:58 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 10 (MapPartitionsRDD[21] at treeAggregate at GradientDescent.scala:225)
18/05/06 05:39:58 INFO scheduler.TaskSchedulerImpl: Adding task set 10.0 with 2 tasks
18/05/06 05:39:58 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 10.0 (TID 34, 192.168.239.130, ANY, 2255 bytes)
18/05/06 05:39:58 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 10.0 (TID 35, 192.168.239.144, ANY, 2255 bytes)
18/05/06 05:39:58 INFO storage.BlockManagerInfo: Added broadcast_15_piece0 in memory on 192.168.239.130:53716 (size: 2.9 KB, free: 534.5 MB)
18/05/06 05:39:58 INFO storage.BlockManagerInfo: Added broadcast_15_piece0 in memory on 192.168.239.144:43403 (size: 2.9 KB, free: 534.5 MB)
18/05/06 05:39:58 INFO storage.BlockManagerInfo: Added broadcast_14_piece0 in memory on 192.168.239.130:53716 (size: 138.0 B, free: 534.5 MB)
18/05/06 05:39:58 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 10.0 (TID 34) in 383 ms on 192.168.239.130 (1/2)
18/05/06 05:39:58 INFO storage.BlockManagerInfo: Added broadcast_14_piece0 in memory on 192.168.239.144:43403 (size: 138.0 B, free: 534.5 MB)
18/05/06 05:39:58 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 10.0 (TID 35) in 545 ms on 192.168.239.144 (2/2)
18/05/06 05:39:58 INFO scheduler.DAGScheduler: ResultStage 10 (treeAggregate at GradientDescent.scala:225) finished in 0.546 s
18/05/06 05:39:58 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 10.0, whose tasks have all completed, from pool
18/05/06 05:39:58 INFO scheduler.DAGScheduler: Job 8 finished: treeAggregate at GradientDescent.scala:225, took 0.556249 s
18/05/06 05:39:58 INFO storage.MemoryStore: ensureFreeSpace(72) called with curMem=352052, maxMem=555755765
18/05/06 05:39:58 INFO storage.MemoryStore: Block broadcast_16 stored as values in memory (estimated size 72.0 B, free 529.7 MB)
18/05/06 05:39:58 INFO storage.MemoryStore: ensureFreeSpace(138) called with curMem=352124, maxMem=555755765
18/05/06 05:39:58 INFO storage.MemoryStore: Block broadcast_16_piece0 stored as bytes in memory (estimated size 138.0 B, free 529.7 MB)
18/05/06 05:39:58 INFO storage.BlockManagerInfo: Added broadcast_16_piece0 in memory on 192.168.239.129:57789 (size: 138.0 B, free: 530.0 MB)
18/05/06 05:39:58 INFO spark.SparkContext: Created broadcast 16 from broadcast at GradientDescent.scala:221
18/05/06 05:39:58 INFO spark.SparkContext: Starting job: treeAggregate at GradientDescent.scala:225
18/05/06 05:39:58 INFO scheduler.DAGScheduler: Got job 9 (treeAggregate at GradientDescent.scala:225) with 2 output partitions
18/05/06 05:39:58 INFO scheduler.DAGScheduler: Final stage: ResultStage 11(treeAggregate at GradientDescent.scala:225)
18/05/06 05:39:58 INFO scheduler.DAGScheduler: Parents of final stage: List()
18/05/06 05:39:58 INFO scheduler.DAGScheduler: Missing parents: List()
18/05/06 05:39:58 INFO scheduler.DAGScheduler: Submitting ResultStage 11 (MapPartitionsRDD[23] at treeAggregate at GradientDescent.scala:225), which has no missing parents
18/05/06 05:39:58 INFO storage.MemoryStore: ensureFreeSpace(5656) called with curMem=352262, maxMem=555755765
18/05/06 05:39:58 INFO storage.MemoryStore: Block broadcast_17 stored as values in memory (estimated size 5.5 KB, free 529.7 MB)
18/05/06 05:39:58 INFO storage.MemoryStore: ensureFreeSpace(2997) called with curMem=357918, maxMem=555755765
18/05/06 05:39:58 INFO storage.MemoryStore: Block broadcast_17_piece0 stored as bytes in memory (estimated size 2.9 KB, free 529.7 MB)
18/05/06 05:39:58 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on 192.168.239.129:57789 (size: 2.9 KB, free: 530.0 MB)
18/05/06 05:39:58 INFO spark.SparkContext: Created broadcast 17 from broadcast at DAGScheduler.scala:861
18/05/06 05:39:58 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 11 (MapPartitionsRDD[23] at treeAggregate at GradientDescent.scala:225)
18/05/06 05:39:58 INFO scheduler.TaskSchedulerImpl: Adding task set 11.0 with 2 tasks
18/05/06 05:39:58 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 11.0 (TID 36, 192.168.239.144, ANY, 2255 bytes)
18/05/06 05:39:58 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 11.0 (TID 37, 192.168.239.131, ANY, 2255 bytes)
18/05/06 05:39:58 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on 192.168.239.144:43403 (size: 2.9 KB, free: 534.5 MB)
18/05/06 05:39:58 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on 192.168.239.131:43057 (size: 2.9 KB, free: 534.5 MB)
18/05/06 05:39:58 INFO storage.BlockManagerInfo: Added broadcast_16_piece0 in memory on 192.168.239.144:43403 (size: 138.0 B, free: 534.5 MB)
18/05/06 05:39:58 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 11.0 (TID 36) in 275 ms on 192.168.239.144 (1/2)
18/05/06 05:39:59 INFO storage.BlockManagerInfo: Added broadcast_16_piece0 in memory on 192.168.239.131:43057 (size: 138.0 B, free: 534.5 MB)
18/05/06 05:39:59 INFO scheduler.DAGScheduler: ResultStage 11 (treeAggregate at GradientDescent.scala:225) finished in 0.375 s
18/05/06 05:39:59 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 11.0 (TID 37) in 375 ms on 192.168.239.131 (2/2)
18/05/06 05:39:59 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 11.0, whose tasks have all completed, from pool
18/05/06 05:39:59 INFO scheduler.DAGScheduler: Job 9 finished: treeAggregate at GradientDescent.scala:225, took 0.406524 s
18/05/06 05:39:59 INFO storage.MemoryStore: ensureFreeSpace(72) called with curMem=360915, maxMem=555755765
18/05/06 05:39:59 INFO storage.MemoryStore: Block broadcast_18 stored as values in memory (estimated size 72.0 B, free 529.7 MB)
18/05/06 05:39:59 INFO storage.MemoryStore: ensureFreeSpace(138) called with curMem=360987, maxMem=555755765
18/05/06 05:39:59 INFO storage.MemoryStore: Block broadcast_18_piece0 stored as bytes in memory (estimated size 138.0 B, free 529.7 MB)
18/05/06 05:39:59 INFO storage.BlockManagerInfo: Added broadcast_18_piece0 in memory on 192.168.239.129:57789 (size: 138.0 B, free: 530.0 MB)
18/05/06 05:39:59 INFO spark.SparkContext: Created broadcast 18 from broadcast at GradientDescent.scala:221
18/05/06 05:39:59 INFO spark.SparkContext: Starting job: treeAggregate at GradientDescent.scala:225
18/05/06 05:39:59 INFO scheduler.DAGScheduler: Got job 10 (treeAggregate at GradientDescent.scala:225) with 2 output partitions
18/05/06 05:39:59 INFO scheduler.DAGScheduler: Final stage: ResultStage 12(treeAggregate at GradientDescent.scala:225)
18/05/06 05:39:59 INFO scheduler.DAGScheduler: Parents of final stage: List()
18/05/06 05:39:59 INFO scheduler.DAGScheduler: Missing parents: List()
18/05/06 05:39:59 INFO scheduler.DAGScheduler: Submitting ResultStage 12 (MapPartitionsRDD[25] at treeAggregate at GradientDescent.scala:225), which has no missing parents
18/05/06 05:39:59 INFO storage.MemoryStore: ensureFreeSpace(5656) called with curMem=361125, maxMem=555755765
18/05/06 05:39:59 INFO storage.MemoryStore: Block broadcast_19 stored as values in memory (estimated size 5.5 KB, free 529.7 MB)
18/05/06 05:39:59 INFO storage.MemoryStore: ensureFreeSpace(2996) called with curMem=366781, maxMem=555755765
18/05/06 05:39:59 INFO storage.MemoryStore: Block broadcast_19_piece0 stored as bytes in memory (estimated size 2.9 KB, free 529.7 MB)
18/05/06 05:39:59 INFO storage.BlockManagerInfo: Added broadcast_19_piece0 in memory on 192.168.239.129:57789 (size: 2.9 KB, free: 530.0 MB)
18/05/06 05:39:59 INFO spark.SparkContext: Created broadcast 19 from broadcast at DAGScheduler.scala:861
18/05/06 05:39:59 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 12 (MapPartitionsRDD[25] at treeAggregate at GradientDescent.scala:225)
18/05/06 05:39:59 INFO scheduler.TaskSchedulerImpl: Adding task set 12.0 with 2 tasks
18/05/06 05:39:59 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 12.0 (TID 38, 192.168.239.131, ANY, 2255 bytes)
18/05/06 05:39:59 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 12.0 (TID 39, 192.168.239.130, ANY, 2255 bytes)
18/05/06 05:39:59 INFO storage.BlockManagerInfo: Added broadcast_19_piece0 in memory on 192.168.239.131:43057 (size: 2.9 KB, free: 534.5 MB)
18/05/06 05:39:59 INFO storage.BlockManagerInfo: Added broadcast_19_piece0 in memory on 192.168.239.130:53716 (size: 2.9 KB, free: 534.5 MB)
18/05/06 05:39:59 INFO storage.BlockManagerInfo: Added broadcast_18_piece0 in memory on 192.168.239.131:43057 (size: 138.0 B, free: 534.5 MB)
18/05/06 05:39:59 INFO storage.BlockManagerInfo: Added broadcast_18_piece0 in memory on 192.168.239.130:53716 (size: 138.0 B, free: 534.5 MB)
18/05/06 05:39:59 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 12.0 (TID 39) in 594 ms on 192.168.239.130 (1/2)
18/05/06 05:39:59 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 12.0 (TID 38) in 719 ms on 192.168.239.131 (2/2)
18/05/06 05:39:59 INFO scheduler.DAGScheduler: ResultStage 12 (treeAggregate at GradientDescent.scala:225) finished in 0.717 s
18/05/06 05:39:59 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 12.0, whose tasks have all completed, from pool
18/05/06 05:39:59 INFO scheduler.DAGScheduler: Job 10 finished: treeAggregate at GradientDescent.scala:225, took 0.890308 s
18/05/06 05:39:59 INFO optimization.GradientDescent: GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses 361485.19500000007, 87684.419014825, 3489.5361094534996, 3279.8174503807445, 3275.7530596492475
18/05/06 05:39:59 WARN regression.LinearRegressionWithSGD: The input data was not directly cached, which may hurt performance if its parent RDDs are also uncached.
model: org.apache.spark.mllib.regression.LinearRegressionModel = org.apache.spark.mllib.regression.LinearRegressionModel: intercept = 0.0, numFeatures = 2

scala> model.weights
res10: org.apache.spark.mllib.linalg.Vector = [0.04801929334645109,6.942515869951296]

scala> model.predict(Vectors.dense(1,80))
res11: Double = 555.4492888894501

scala> model.predict(Vectors.dense(1,200))
res12: Double = 1388.5511932836057
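The two `predict` calls above are just a dot product: for a linear model, prediction = intercept + weights · features. A minimal plain-Scala sketch of that formula (no Spark needed; the intercept and weight values are copied from the model output above):

```scala
// Intercept and weights as reported by the trained model above
// (intercept = 0.0, weights = res10).
val intercept = 0.0
val weights = Array(0.04801929334645109, 6.942515869951296)

// The formula LinearRegressionModel.predict applies:
// dot(weights, features) + intercept.
def predict(features: Array[Double]): Double =
  intercept + weights.zip(features).map { case (w, x) => w * x }.sum

println(predict(Array(1.0, 80.0)))   // ≈ 555.449, matching res11 above
println(predict(Array(1.0, 200.0)))  // ≈ 1388.551, matching res12 above
```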

Summary: machine learning with Spark

1) It is very fast.

2) Spark's machine-learning library keeps growing richer.

3) The mathematical modeling process matters most, and it requires real mathematical knowledge.
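The shrinking stochastic losses in the log above come from gradient descent on the squared error. As an illustration of the kind of update `GradientDescent.runMiniBatchSGD` iterates, here is a full-batch, single-machine sketch on a tiny hypothetical dataset (labels follow y = 1 + 2x); it is not Spark's actual implementation, which samples mini-batches and sums gradients distributedly:

```scala
// Hypothetical two-point dataset: features are (bias, x), label y = 1 + 2*x.
val data = Array((1.0, 1.0) -> 3.0, (1.0, 2.0) -> 5.0)

var w = Array(0.0, 0.0)  // weights being learned
val stepSize = 0.1

for (_ <- 1 to 1000) {
  // Gradient of 1/2 * sum((w.x - y)^2); in Spark this sum over the data
  // is computed with treeAggregate, as seen in the log above.
  val grad = Array(0.0, 0.0)
  for (((x0, x1), y) <- data) {
    val err = w(0) * x0 + w(1) * x1 - y
    grad(0) += err * x0
    grad(1) += err * x1
  }
  w = Array(w(0) - stepSize * grad(0), w(1) - stepSize * grad(1))
}

println(w.mkString(", "))  // converges toward (1.0, 2.0)
```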
