Spark学习之路——8.Spark MLlib
MLlib的官网文档:
http://spark.apache.org/docs/latest/ml-guide.html
本节主要内容:
一、MLlib简述
二、基本数据类型
三、汇总统计
四、实例应用K-means算法
一、MLlib简述:
1.MLlib是什么?
MLlib是Spark的机器学习(ML)库。它的目标是让实用的机器学习变得可扩展和容易。在高层次上,它提供以下工具:
(1)ML算法:常用的学习算法,如分类、回归、聚类和协同过滤等
(2)特性分析:特征提取、变换、降维和选择
(3)管道:用于构造、评估和调优ML管道的工具
(4)持久性:保存和加载算法、模型和管道
(5)实用性:线性代数,统计,数据处理等
2.MLlib的现状:
随着Spark2.0版本,基于RDD的MLlib已经进入“维护模式”,现在Spark中主要用于机器学习的包是ML包,ML包是基于DataFrame的API(这个包将逐渐取代MLlib)。
关于ML包将会在后面总结。
二、Sparkmllib中的基本数据类型(DataType)
1.Local Vector(向量)
这里给出一个向量示例:
(1,6,0,0,0,0,7,1,0,0,1,0)
引入下面的包:
import org.apache.spark.mllib.linalg.{Vector,Vectors}
(1)Dense Vector(稠密向量)
稠密向量将原封不动的将上面的向量保存下来
val v0:Vector = Vectors.dense(1.0,6.0,0.0,0.0,0.0,0.0)
(2)Sparse Vector(稀疏向量)
而稀疏向量会记录这个向量的长度,向量内非0元素的索引(位置),向量内非0元素的值
val v1:Vector = Vectors.sparse(6, Array(0,1), Array(1.0, 6.0))
也可以使用下面的形式:长度 + 索引,值的序列
val v2:Vector = Vector.sparse(6,Seq((0,1.0),(1,6.0)))
注意:在大数据分析中,稀疏数据会非常常见,用稀疏向量或矩阵进行模型的训练效率会比稠密矩阵存储的效率更高,也更加节省时间。所以尽量使用稀疏矩阵来进行模型的训练。
2.Labeled point(带类别的向量)
本质上是Label + Vector(在使用MLlib算法时,数据必须是LabelPoint类型)
例如,给上面的向量加上标签变为labelpoint即为:
import org.apache.s.park.mllib.regression.LabeledPointval posi=LabeledPoint(1.0,Vectors.sparse(6, Array(0,1), Array(1.0, 6.0)))
3.Local matrix(本地矩阵)
引入:import org.apache.spark.mllib.linalg.{Matrix, Matrices}
来看一下官网给出的例子:
(1)稠密矩阵存储方式:
((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
(2)对于稀疏矩阵的存储方式:
((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
注意:这里有必要对稀疏矩阵的存储进行一定的说明
((9.0, 0.0),
(0.0, 8.0),
(0.0, 6.0))
在sparse中有3个数组作为参数:其实这些数组的数都是为了确定矩阵中的非0值的位置
①先看后两个
Array(0, 2, 1):指非0元素出现的行索引,对应Array(9,6,8),就是9在第0行,6在第二行...
②然后看第一个Array(0,1,3)
0是不变的,1指第一列出现了1个非0元素,3指第一列和第二列一共出现了3个非0元素。
这样计数的目的为了:通过第一列出现了1个非0元素,加上第二个列表中对行索引,就可以确定第三个列表中第一个值在矩阵中的位置,然后通过第一个列表的3-1就可以知道第二列有2个非0元素,加上第二个列表第2,3个元素对行索引的描述,确定第三个列表中6,8对应在矩阵中的位置,以此类推,就可以用这种方法来表示矩阵啦。
4.Distributed matrix(分布式矩阵)
分布式矩阵具有长类型的行和列索引以及双类型的值,分布存储在一个或多个rdd中。选择合适的格式来存储大型分布式矩阵是非常重要的。将分布式矩阵转换为不同的格式可能需要全局转移,这是非常昂贵的。目前已经实现了四种类型的分布式矩阵。
具体示例去官网查看。
在这里注明一下:
在计算矩阵奇异值、矩阵乘法等算法中,要求输入IndexedRowMatrix
BlockMatrix由于拆分比较方便,利于进行分布式矩阵计算,如块运算。
三、汇总统计:(关于汇总统计的内容在官网可以找到详细的例子,这里只做简单的介绍)
1.基础统计值
对RDD[Vector]进行基本的统计分析(mean,min,max,etc.)
2.相关性系数(Correlations)
研究变量之间线性相关程度的量。
现在Spark支持两种相关性系数:pearson相关系数和Spearman等级相关系数
适用条件:
Pearson 相关性:连续数据,正态分布, 线性关系,用pearson相关系数是最恰当 ;上述任一条件不满足,就用spearman相关系数
Spearman 相关性:两个定序测量数据 之间也用spearman相关系数,不能用 pearson相关系数
3.分层抽样(Stratified Sampling)
将数据根据不同的特征分成不同的组,然后按特定条件从不同的组中获取样本,并重新组成新的数组。Spark RDD api 中提供两种方式。
sampleByKey 和 sampleByKeyExact
两者的区别:
①sampleByKey 每次都通过给定的概率以一种类似于 掷硬币的方式来决定这个观察值是否被放入样本,因此一遍就可以过滤完所有数据,最后得到一个近似大小的样本,但往往不够准确。
②sampleByKeyExtra 会对全量数据做采样计算。对于每个类别,其都会产生 (fk⋅nk)个样本,其中 fk是键为k的样本类别采样的比例;nk是键k所拥有的样本数。 sampleByKeyExtra 采样的结果会更准确,有99.99%的置信度,但耗费的计算资源也更多。
4.假设检验(Hypothesis testing)
是数理统计学中根据一定假设条件由样本推断总体的一种方法
(1).卡方检验(Stratified Sampling):
统计样本的实际观测值与理论推断值之间的偏离程度,实际观测值与理论推断值之间的偏离程度就决定卡方值的大小。
(2).适配度检验(Goodness of fit)
这里采用pearson检验方法
介绍几个参数:
method:这里采用pearson方法。
Statistic: 检验统计量。
degrees of freedom:自由度。表示可自由变动的样本观测值的数目,
pValue:统计学根据显著性检验方法所得到的P值。一般以P < 0.05 为显著, P<0.01 为非常显著, 其含义是样本间的差异由抽样误差所致的概率小于0.05 或0.01。 一般来说,假设检验主要看P值就够了。
(3).独立性检验(Indenpendence)
卡方独立性检验是用来检验两个属性间是否独立。其中一个属性做为行,另外一个做为列,通过貌似相关的关系考察其是否真实存在相关性。如检验:性别和习惯用左右手没有关系。
5.核密度估计(Kernel density estimation)
Spark MLlib 提 供 了 一 个 工 具 类 KernelDensity 用于核密度估算,核密度 估算的意思是根据已知的样本估计未知的 密度,属于非参数检验方法之一。
原理:
核密度估计的原理是。观察某一事物的 已知分布,如果某一个数在观察中出现了 ,可认为这个数的概率密度很大,和这个数比较近的数的概率密度也会比较大,而那些离这个数远的数的概率密度会比较小。
四、实例应用K-means算法
对于k-means的介绍可以看往期博文:
https://blog.csdn.net/hehe_soft_engineer/article/details/101349943#t2
下面只对K-means算法的应用作简单介绍(给出测试数据、代码以及测试结果)
任务要求:使用K-Means算法分析给定数据的受到网络攻击类别(标签)
注意:这里只是提供一种思路,并不能保证正确。
1.训练数据(部分截图)
最后一列是训练数据的标签属性
2.代码(可能有些冗余)
代码设计过程:
1).观察数据可以看出来,首先要把第一、二、三列的数据变为整型类型(要通过广播变量的方式,在数据RDD的map操作内对变量进行修改,用以规避分块对数据一致性带来的影响)
2). 第一、二、三列进行对应映射,并将数据修改为可输入算法的RDD[array],其实此处的映射值应该差距大一些,不然分类不会太明显(这里的取值暂定为1)
3).将数据集转化为RDD[Vector]类型,作为K-means算法的输入
4). 通过train方法来传入参数用来调整模型
5) 输出聚类中心
6) 对源数据集进行分类预测并使用误差平方之和来评估数据模型
package MLlibtest.chapter01.algorithmsimport org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import scala.collection.mutable
object KDDCup_kmeans {def main(args: Array[String]) {val conf = new SparkConf().setAppName("KDDCup_kmeans").setMaster("local[*]")val sc = new SparkContext(conf)// 将带标签的10%训练数据载入val traindata = sc.textFile("F:\\2019秋季学期\\Spark内存计算\\spark课程资源\\5.sparkMLlib01\\sparkmllib_data\\kddcup\\kddcup.data_10_percent_corrected")//观察数据可以看出来,首先要把第一、二、三列的数据变为因子类型val linearrs=traindata.map(line=>line.split(","))//设置广播变量val empmap1=mutable.Map.empty[String,Int]val empmap2=mutable.Map.empty[String,Int]val empmap3=mutable.Map.empty[String,Int]val empmap4=mutable.Map.empty[String,Int]val col1map:Broadcast[mutable.Map[String,Int]]=sc.broadcast(empmap1)val col2map:Broadcast[mutable.Map[String,Int]]=sc.broadcast(empmap2)val col3map:Broadcast[mutable.Map[String,Int]]=sc.broadcast(empmap3)val col4map:Broadcast[mutable.Map[String,Int]]=sc.broadcast(empmap4)//计数可以通过累加器来计算,累加器在执行任务的节点上是不能读取的val map1count=mutable.Map("1"->0)val map2count=mutable.Map("2"->0)val map3count=mutable.Map("3"->0)val map4count=mutable.Map("4"->0)val col1count:Broadcast[mutable.Map[String,Int]]=sc.broadcast(map1count)val col2count:Broadcast[mutable.Map[String,Int]]=sc.broadcast(map2count)val col3count:Broadcast[mutable.Map[String,Int]]=sc.broadcast(map3count)val col4count:Broadcast[mutable.Map[String,Int]]=sc.broadcast(map4count)//第一、二、三、最后一列进行对应映射,并将数据修改为可输入算法的RDD[array]//其实此处的映射值应该差距大一些,不然分类不会太明显(这里的取值暂定为1)val colmap=linearrs.map{arr=>if(!col1map.value.contains(arr(1))){val count01=col1count.value.get("1").get+1col1count.value+=("1"->count01)//更新集合操作col1map.value+=(arr(1)->count01)arr(1)=count01.toString}else{arr(1)=col1count.value.get("1").get.toString}if(!col2map.value.contains(arr(2))){val count02=col2count.value.get("2").get+1col2count.value+=("2"->count02)//更新集合操作col2map.value+=(arr(2)->count02)arr(2)=count02.toString}else{arr(2)=col2count.value.get("2").get.toString}if(!col3map.value.contains(arr(3))){val count03=col3count.value.get("3").get+1col3count.value+=("3"->count03)//更新集合操作col3map.value+=(arr(3)->count03)arr(3)=count03.toString}else{arr(3)=col3count.value.get("3").get.toString}if(!col4map.value.contains(arr.last)){val count04=col4count.value.get("4").get+1col4count.value+=("4"->count04)//更新集合操作col4map.value+=(arr.last->count04)arr(arr.length-1)=count04.toString}else{arr(arr.length-1)=col4count.value.get("4").get.toString}arr.dropRight(1)}.cache()//转化为RDD[Vector]类型,作为K-means算法的输入val vecarrs=colmap.map {arr =>val doublearr=arr.map(_.toDouble)Vectors.dense(doublearr)}val numClusters1= 23println(numClusters1)val numIterations1 = 20//通过train方法来传入参数用来调整模型val cluster1=KMeans.train(vecarrs, numClusters1, numIterations1)//输出映射的集合println("第一列:***********************************************")col1map.value.keys.foreach { i =>print( "Key = " + i )println(" Value = " + col1map.value(i) )}println("第二列:***********************************************")col2map.value.keys.foreach { i =>print( "Key = " + i )println(" Value = " + col2map.value(i) )}println("第三列:***********************************************")col3map.value.keys.foreach { i =>print( "Key = " + i )println(" Value = " + col3map.value(i) )}println("第四列:***********************************************")col4map.value.keys.foreach { i =>print( "Key = " + i )println(" Value = " + col4map.value(i) )}//输出聚类中心val kmeansresult=cluster1.clusterCenters.foreach(center=>{println("聚类中心点为:"+center)})//对源数据集进行分类预测看分成几类val predic:mutable.Set[Int]=mutable.Set[Int]()val total=100000 //总次数val selectcol=vecarrs.take(total)for(data<-selectcol){val presult=cluster1.predict(data)predic.add(presult)}println("验证数据分类为:")println(predic)println("Spark MLlib K-means clustering test finished.")//cluster1.save(sc, "F:\\2019秋季学期\\Spark内存计算\\spark课程资源\\5.sparkMLlib01\\KMeansModel1")// Evaluate clustering by computing Within Set Sum of Squared Errors// 使用误差平方之和来评估数据模型val WSSSE = cluster1.computeCost(vecarrs)println(s"Within Set Sum of Squared Errors = $WSSSE")// Save and load model/*clusters.save(sc, "target/org/apache/spark/KMeansExample/KMeansModel")val sameModel = KMeansModel.load(sc, "target/org/apache/spark/KMeansExample/KMeansModel")*/// $example off$sc.stop()}
}
3.运行结果
1)数据处理
数据处理:
2)聚类中心
3)对数据集进行聚类验证:
4)对数据集求方差:
(感觉聚类效果不好,应该是无效的属性太多,没有进行主成分分析或者给某些属性的权值不太合适,导致偏差太大)
Spark学习之路——8.Spark MLlib相关推荐
- Spark学习之路一——Spark基础及环境搭建
Spark学习之路一--Spark基础及环境搭建 文章目录 一. Spark 概述 1.1 概述 1.2 优势特性 1.2.1 运行速度快 1.2.2 容易使用 1.2.3 通用性 1.2.4 运行模 ...
- 我的spark学习之路(三):利用spark做回归分析
spark的机器学习库(MLlib)下有简单的回归分析方法,今天只说最简单的线性回归,spark提供有两个回归分析库(mllib和ml),我学习的时候在网上也查了不少资料,有一个奇怪的现象是网上关于s ...
- Spark学习之路 (二)Spark2.3 HA集群的分布式安装
<2021年最新版大数据面试题全面开启更新> 欢迎关注github<大数据成神之路> 目录 一.下载Spark安装包 1.从官网下载 2.从微软的镜像站下载 3.从清华的镜像站 ...
- Spark学习之路 (二十二)SparkStreaming的官方文档
讨论QQ:1586558083 目录 一.简介 1.1 概述 1.2 一个小栗子 2.2 初始化StreamingContext 2.3 离散数据流 (DStreams) 2.4 输入DStream和 ...
- Spark学习之路 (五)Spark伪分布式安装
一.JDK的安装 JDK使用root用户安装 1.1 上传安装包并解压 [root@hadoop1 soft]# tar -zxvf jdk-8u73-linux-x64.tar.gz -C /usr ...
- Spark学习之路---Spark核心概念
Spark核心概念简介 一个Spark应用都由驱动器程序发起集群上的各种并发操作,一个驱动器程序一般包含多个执行器节点,驱动器程序通过一个SaprkContext对象访问saprk.RDD(弹性分布式 ...
- Spark学习之路 (二十三)SparkStreaming的官方文档
一.SparkCore.SparkSQL和SparkStreaming的类似之处 二.SparkStreaming的运行流程 2.1 图解说明 2.2 文字解说 1.我们在集群中的其中一台机器上提交我 ...
- Spark高手之路1—Spark简介
文章目录 Spark 概述 1. Spark 是什么 2. Spark与Hadoop比较 2.1 从时间节点上来看 2.2 从功能上来看 3. Spark Or Hadoop 4. Spark 4.1 ...
- Spark学习(一) -- Spark安装及简介
标签(空格分隔): Spark 学习中的知识点:函数式编程.泛型编程.面向对象.并行编程. 任何工具的产生都会涉及这几个问题: 现实问题是什么? 理论模型的提出. 工程实现. 思考: 数据规模达到一台 ...
最新文章
- 获取物料的103待检库存
- webpack - 收藏集 - 掘金
- YbtOJ#532-往事之树【广义SAM,线段树合并】
- devops 文化_DevOps之外的无责文化示例
- 读书笔记-你不知道的JS上-词法作用域
- 使用python实现栈和队列
- 多播报文的发送和接收
- x线计算机断层摄影机房面积应不小于,X射线计算机断层摄影放射防护要求GBZ165-2012.pdf...
- python 发邮件 抄送,Python调用outlook发送邮件,发送给多人、抄送给多人并带上附件...
- 拾叶集 - 江湖一剑客
- 光剑评注:其实,说了这么多废话,无非就是: 一切皆是映射。不管是嵌套 XML,还是 Lisp 嵌套括号,还是 XXX 的 Map 数据结构,一切都是树形结构——映射。...
- word,excel重难点问题解答
- python namedtuple默认值_python 使用 namedtuple
- Javascript——js常用的方法(一)...........
- 【HiFlow】腾讯云新一代自动化助手,我用它完成了企业疫情提示(无代码)
- 职业发展之大数据开发工程师理解
- “揾”钱,最紧要系稳
- 某校2019专硕编程题-逆序输出奇数
- Persecond for Mac(延时摄影视频制作工具)
- User: root is not allowed to impersonate anonymous (state=08S01,code=0)