机器学习实践:Spark MLlib库介绍与使用

1、实验描述

  • MLlib ( Machine Learning Library )是 Spark 的一个机器学习库。它能够较容易地解决一些实际的大规模机器学习问题。本实验旨在学习 Spark 的机器学习库—— MLlib 的相关知识,了解 MLlib 与 ML 之间的区别和联系,掌握 MLlib 中的几个基本数据类型

  • 实验时长:90分钟

  • 主要步骤:

    • 学习Mllib的基本数据类型

    • 学习Mllib的基本算法库

    • 利用Mllib算法库中的协同推荐算法为用户推荐电影

2、实验环境

  • 虚拟机数量:1

  • 系统版本:CentOS 7.5

  • Spark版本:Apache Spark 2.1.1

3、相关技能

  • linux常用命令
  • Spark shell
  • Scala语言编程
  • MLlib 库的使用
  • 协同过滤算法
  • Spark SQL应用

4、相关知识点

  • 数据类型:本地向量、标签点、本地矩阵
  • 稠密向量、稀疏向量
  • MLlib 库的主要内容
  • MLlib 的数据类型和算法种类
  • 协同过滤算法
  • Spark SQL

5、效果图

  • 利用ALS模型对测试数据做预测,看到这个误差值是在可以接受的范围内的。详情请看实验步骤

图 1

6、实验步骤

6.1MLlib 的数据类型

6.1.1本地向量

6.1.1.1本地向量(Local Vector)的索引是整型的、从0开始的。而它的值为 Double 类型,存储于单个机器内。 MLlib 支持两种本地向量:稠密向量稀疏向量

6.1.1.2本地向量的基类是 Vector 类,DenseVector 和 SparseVector 类是其两种不同的实现。官方文档推荐大家使用 Vector 类中已实现的工厂方法来创建本地向量

6.1.1.3安装spark(这里我们只需要单机模式的spark的环境即可),解压tgz下的spark安装包

[zkpk@master ~]$ cd tgz/spark
[zkpk@master spark]$ tar -zxvf spark-2.1.1-bin-hadoop-2.7 -C ~/
[zkpk@master spark]$ cd

6.1.1.4打开spark shell终端

[zkpk@master ~]$ cd spark-2.1.1-bin-hadoop2.7/
[zkpk@master spark-2.1.1-bin-hadoop2.7]$ bin/spark-shell

6.1.1.5进入spark shell命令行后,导入spark mllib中的本地向量相关包

scala>import org.apache.spark.mllib.linalg.{Vector, Vectors}

6.1.1.6利用dense方法创建稠密向量(1.0, 0.0, 3.0)

scala>val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)

6.1.1.7通过指定非零实体对应的索引和值,来创建稀疏向量 (1.0, 0.0, 3.0)

scala>val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))  // 3个参数:第一个参数:vector大小;第二个indices:数组,用于保存vector中非零元素,且索引是递增的;第三个:vector非零值组成的数组,且与indices对应

6.1.1.8通过指定非零实体,来创建稀疏向量 (1.0, 0.0, 3.0)

scala>val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))

图 2

6.1.1.9向量 (1.0, 0.0, 3.0) 写成稠密形式就是 [1.0, 0.0, 3.0],而写成稀疏形式则是(3, [0, 2], [1.0, 3.0]),后者的第一个 3 是指向量的大小。稀疏和稠密的界限没有严格意义上的标准,通常需要依据具体的问题来决定

6.1.1.10在Spark中,如果没有引入org.apache.spark.mllib.linalg.Vector 包的话,Vector 会被认为是scala.collection.immutable.Vector 中定义的那个。因此,应当引入前者以显式地使用 MLlib 中的向量

6.1.2标签点

6.1.2.1标签点(Labeled Point)是一个本地向量,也分稀疏或者稠密,并且是一个带有标签的本地向量。

6.1.2.2在 MLlib 中,标签点常用于监督学习类算法。标签(Label)是用 Double 类型存放的,因此标签点可以用于回归或者分类算法中。如果是二维分类,标签则必须是 0 或 1 之间的一种。而如果是多个维度的分类,标签应当是从 0 开始的数字,代表各个分类的索引。

6.1.2.3标签点是由一个名为 LabeledPoint的 Case Class 样例类定义的

6.1.2.4在spark-shell中创建标签点, 引入标签点相关的包

scala>import org.apache.spark.mllib.linalg.Vectors
scala>import org.apache.spark.mllib.regression.LabeledPoint

6.1.2.5创建一个带有正面标签和稠密特征向量的标签点

scala>val lpd= LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))

6.1.2.6创建一个带有负面标签和稀疏特征向量的标签点

scala>val lps= LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

图 3

6.1.3本地矩阵

6.1.3.1本地矩阵(Local matrix)的索引也是从0开始的,并且是整型。值也为 Double 类型,存储于单个机器内。 Mllib 支持两种本地矩阵:稠密矩阵和稀疏矩阵

6.1.3.2稠密矩阵的实体值以列为主序的形式,存放于单个 Double 型数组内。稀疏矩阵的非零实体以列为主序的形式,存放于压缩稀疏列(Compressed Sparse Column, CSC)中

6.1.3.3本地矩阵的基类是 Matrix 类,在 Spark 中有其两种实现,分别是 DenseMatrix 和 SparseMatrix 。官方文档中推荐使用已在 Matrices 类中实现的工厂方法来创建本地矩阵。需要注意的是,MLlib 中的本地矩阵是列主序的(column-major)

6.1.3.4在spark shell中创建本地矩阵,首先导入矩阵相关包

scala>import org.apache.spark.mllib.linalg.{Matrix, Matrices}

6.1.3.5创建稠密矩阵((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))

scala>val md: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0)) // 列主序的;如此例,第一列的值有3个,分别是1.0、3.0、5.0;另外三个值是第二列的值

6.1.3.6创建稀疏矩阵 ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))

scala>val ms: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))

图 4

6.2MLlib算法

6.2.1MLlib 的算法涵盖了基本统计、分类和回归和协同过滤等 9 个大类,而更加新颖和高效的算法也在不断地推陈出新。在 Spark中,主要的算法有:

6.2.1.1基本统计算法

  • 汇总统计

  • 相关统计

  • 分层抽样

  • 假设检验

  • 随机数据生成

  • 核密度估计

6.2.1.2分类和回归

6.2.1.3协同过滤

6.2.1.4聚类

6.2.1.5降维

6.2.1.6特征提取和转化

6.2.1.7频繁项挖掘

6.2.1.8评价指标

6.2.1.9PMML导出

6.2.2由于篇幅有限且算法内容较复杂,这里就不一一讲解,本实验介绍使用一种交替最小二乘法的协同过滤算法

6.3实例:利用算法来推荐电影

6.3.1利用**:quit**命令退出spark shell命令行,解压experiment/mllib目录下的数据集文件

scala>:quit
[zkpk@master spark-2.1.1-bin-hadoop2.7]$ cd ~/experiment/mllib
[zkpk@master mllib]$ unzip ml-1m.zip
[zkpk@master mllib]$ cd ml-1m

6.3.2解压完成之后有三个dat文件,分别是movies.dat、ratings.data、users.dat

6.3.3我们查看下用户信息users.dat文件的前10条,每行的数据格式为`用户ID::性别::年龄::工作::邮编`

[zkpk@master ml-1m]$ head -10 users.dat

图 5

6.3.3.1性别部分:M代表男性,F代表女性;

6.3.3.2年龄部分:1表示18岁以下,18表示18-24岁,25表示25-34岁,35表示35-44岁,45表示45-49岁,50表示50-55岁,56表示56岁及以上

6.3.3.3职业部分:职业编号分布在0-20,每一种编号分别表示一种职业,若想查看详情,打开README文件,找到对users.dat的描述的内容

6.3.4我们查看下评论信息ratings.dat文件的前10条,其格式为`用户ID::电影ID::评论星级::时间戳`

[zkpk@master ml-1m]$ head -10 ratings.dat

图 6

6.3.4.1电影ID部分:电影ID的范围在1-3952之间,每一个ID代表一部电影

6.3.4.2评论星级部分:共五星用1-5的数字表示

6.3.4.3时间戳部分:评论发布的时间戳

6.3.5我们查看下电影信息movies.dat文件的前10条,其格式为`电影ID::电影标题::电影流派`

[zkpk@master ml-1m]$ head -10 movies.dat

图 7

6.3.5.1电影标题部分:电影的发布名称和发布年份等

6.3.5.2电影流派部分:共18个流派,一部电影可能属于一个或者多个流派,各流派如下:Action(行动)、Adventure(冒险)、Animation(动画)、Children’s(儿童)、Comedy(喜剧)、Crime(犯罪)、Documentary(纪录片)、Drama(戏剧)、Fantasy(幻想)、Film-Noir(黑色电影)、Horror(恐怖)、Musical(音乐)、Mystery(神秘)、Romance(浪漫)、Sci-Fi(科幻)、Thriller(惊悚)、War(战争)、Western(西方)

6.3.6重新进入Spark shell

[zkpk@master ml-1m]$ cd ~/spark-2.1.1-bin-hadoop2.7/
[zkpk@master spark-2.1.1-bin-hadoop2.7]$ bin/spark-shell

6.3.7开始编写实验代码

6.3.7.1引入本节实验要使用的相关spark包

scala>import spark.implicits._
scala>import org.apache.spark.rdd._
scala>import org.apache.spark.sql._
scala>import org.apache.spark.mllib.recommendation.Rating
scala>import org.apache.spark.mllib.recommendation.ALS
scala>import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

6.3.7.2首先对电影数据建立一个 Case Class,它对应了 movies.dat 文件中的部分字段(其中 Genres(流派)字段不是我们所需要的,因此在 Case Class 中没有设置对应的成员变量)

scala>case class Movie(movieId: Int, title: String)

图 8

6.3.7.3同样的,我们为用户数据建立一个 Case Class,它对应了 users.dat 文件中的部分字段,(本实验只使用了 用户ID 作为运算时的输入)

scala>case class User(userId: Int, gender: String, age: Int, occupation: Int, zipCode: String)

图 9

6.3.7.4我们不需定义ratings.dat数据的Case Class,因为评价类型的数据在spark机器学习框架中已经有org.apache.spark.mllib.recommendation.Rating类来做映射,在一开始的导包中我们已经导入了该包

6.3.7.5定义三个解析数据函数。这些函数将用于解析数据集中的每一行内容,去掉分隔符 ::,然后把数据映射到 Case Class 的对应成员中,因此,针对三种数据,分别编写对应解析函数

// 解析 movies.dat 文件的函数;参数为此文件中的一行内容
scala> def parseMovieData(data: String): Movie = {val dataField = data.split("::")assert(dataField.size == 3)Movie(dataField(0).toInt, dataField(1))
}// 解析 users.dat 文件的函数;参数为此文件中的一行内容
scala> def parseUserData(data: String): User = {val dataField = data.split("::")assert(dataField.size == 5)User(dataField(0).toInt, dataField(1), dataField(2).toInt, dataField(3).toInt, dataField(4))
}// 解析 ratings.dat 文件的函数,这里用到的 Rating 类是在org.apache.spark.mllib.recommendation.Rating 包中定义的
scala> def parseRatingData(data: String): Rating = {val dataField = data.split("::")Rating(dataField(0).toInt, dataField(1).toInt, dataField(2).toDouble)
}

图 10

6.3.8定义好三种数据对应的解析函数后,下面我们将他们导入到 RDD 中

scala>val moviesData = sc.textFile("/home/zkpk/experiment/mllib/ml-1m/movies.dat").map(parseMovieData).cache() //cache()将RDD缓存起来
scala>val usersData = sc.textFile("/home/zkpk/experiment/mllib/ml-1m/users.dat").map(parseUserData).cache()
scala>val ratingsData = sc.textFile("/home/zkpk/experiment/mllib/ml-1m/ratings.dat").map(parseRatingData).cache()

图 11

6.3.9通过一些方法来查看数据的某些特征

6.3.9.1看一下评价数据到底有多少个

scala>val amountOfRatings = ratingsData.count()

图 12

6.3.9.2看一下多有少个用户评价了电影

scala>val amountOfUsers = ratingsData.map(_.user).distinct().count() //ratingsData是一个RDD,其元素类型是Rating;_.user中_代表RDD中的一个个Rating对象,然后取出此对象的user属性;再去重;计数

图 13

6.3.10将这些 RDD 转化为 DataFrame ,用于后续的操作

scala>val moviesDF = moviesData.toDF()
scala>val usersDF = usersData.toDF()
scala>val ratingsDF = ratingsData.toDF()

图 14

6.3.11将这几个 DataFrame 注册为临时表

scala>moviesDF.registerTempTable("movies")
scala>usersDF.registerTempTable("users")
scala>ratingsDF.registerTempTable("ratings")

图 15

6.3.12利用一个 SQL 查询,获取id是1680的用户的评价高于 4.5 分的电影有哪些

scala>val highRatingMovies = spark.sql("SELECT ratings.user, ratings.product, ratings.rating, movies.title FROM ratings JOIN movies ON movies.movieId=ratings.product  WHERE ratings.user=1680 and ratings.rating > 4.5 ORDER BY ratings.rating DESC ")

6.3.13检索完成后,调用 show() 函数查看前20条结果,结果格式为:用户ID,电影ID,用户评分,电影标题(每次实验结果可能略有不同)

scala>highRatingMovies.show()

图 16

6.3.14开始机器学习,将已有的数据划分为两部分:训练集、测试集

6.3.15继续在spark shell中编程,将评分数据集按照训练集70%,测试集30%来随机切分,250L是随机种子数

scala>val tempPartitions = ratingsData.randomSplit(Array(0.7, 0.3), 250L)

6.3.15.1这里用到了 randomSplit 函数,两个参数分别为划分比例、产生随机数的种子值

6.3.16然后我们将划分的结果存放在两个不同的变量中:

scala>val trainingSetOfRatingsData = tempPartitions(0).cache()   // 训练集
scala>val testSetOfRatingData = tempPartitions(1).cache()       // 测试集

6.3.17将训练集用于 ALS 算法

scala>val recomModel = new ALS().setRank(20).setIterations(10).run(trainingSetOfRatingsData)

6.3.17.1MLlib 中的 ALS 算法需要设置三个参数,分别是特征矩阵的秩数量 setRank、迭代次数 setIterations 和训练用的训练数据集。其中特征矩阵的秩设置为 20,表明生成的 userFeatures 和 itemFeatures 的个数是20

6.3.18生成推荐结果,通过 MatrixFactorizationModel 类的 recommendProducts 方法,我们可以对某个用户生成推荐结果(每次实验结果可能略有不同)

scala>val recomResult = recomModel.recommendProducts(1234, 10)   // 1234是userid

图 17

6.3.18.11234 是我们随意指定的一个用户 ID,做实验时也可以修改成其他用户的 ID 。10 是产生推荐的数量

6.3.19也可以利用mkString展现更直观的推荐结果

scala>println(recomResult.mkString("\n"))

图 18

6.3.19.1输出数据的格式是 Rating 类的格式:用户ID、电影ID和该用户对电影可能的评分。

6.3.20但仅看到电影的 ID,并不知道具体是什么电影。所以还需要按照电影的 ID找到对应的电影标题:

scala>val movieTitles = moviesDF.rdd.map(array => (array(0), array(1))).collectAsMap()   // 0、1分别是movieId、title
scala>val recomResultWithTitle = recomResult.map(rating => (movieTitles(rating.product), rating.rating))

6.3.21然后再把结果输出,就可以看到推荐的电影名称了:

scala>println(recomResultWithTitle.mkString("\n"))

图 19

6.3.22模型评价,评价的方式便是将上边得到的推荐结果(评分预测值)与测试集中实际结果相比较。利用 MatrixFactorizationModel 类的 predict 函数来得到预测的评分值

scala>val predictResultOfTestSet = recomModel.predict(testSetOfRatingData.map { case Rating(user, product, rating) => (user, product) })

6.3.23得到对测试集的预测评分结果后,用 map 、 join算子将它与测试集的原始数据组合成 **((用户ID, 电影ID), (测试集原有评分, 预测评分))**的数据结构。这个格式是 Key-Value 形式的,Key 为 (user, product)。我们是要把这里的测试集原有评分与预测时得到的评分相比较,二者的联系就是 user 和 product 相同。第一个 map 可以将 (用户ID, 电影ID) 与 测试集原有评分 组合成 KV 格式;第二个 map 操作,可以将 (用户ID, 电影ID) 与 预测评分 组合成 KV 格式

// 测试集
scala>val formatResultOfTestSet = testSetOfRatingData.map {case Rating(user, product, rating) => ((user, product), rating)
}// 预测结果
scala>val formatResultOfPredictionResult = predictResultOfTestSet.map {case Rating(user, product, rating) => ((user, product), rating)
}

6.3.24然后利用join算子将二者连接起来,Key 相同的不同值就整合在一起形成了 ((用户ID, 电影ID), (测试集原有评分, 预测评分)) 的格式。

scala>val finalResultForComparison = formatResultOfPredictionResult.join(formatResultOfTestSet)

6.3.25最后利用这个结果来计算预测评分和实际评分之间的平均绝对误差。平均绝对误差(Mean Absolute Error)是所有单个观测值与算术平均值偏差的绝对值的平均。与平均误差相比,平均绝对误差由于被绝对值化,不会出现正负相抵消的情况,所以平均绝对误差能更好地反映预测值误差的实际情况。我们直接取finalResultForComparison 结果中 ratingOfTest 和 ratingOfPrediction两个值,先算误差,再取绝对值。最后对所有的绝对值计算平均数

scala>val MAE = finalResultForComparison.map {case ((user, product), (ratingOfTest, ratingOfPrediction)) => {val deviation= (ratingOfTest - ratingOfPrediction)Math.abs(deviation)}
}.mean()

6.3.26查看终端结果(每次实验结果可能略有不同),可以看到这个误差值是在可以接受的范围内的

图 20

7、总结

算术平均值偏差的绝对值的平均。与平均误差相比,平均绝对误差由于被绝对值化,不会出现正负相抵消的情况,所以平均绝对误差能更好地反映预测值误差的实际情况。我们直接取finalResultForComparison 结果中 ratingOfTest 和 ratingOfPrediction两个值,先算误差,再取绝对值。最后对所有的绝对值计算平均数

机器学习实践:Spark MLlib库介绍与使用-3相关推荐

  1. 分布式机器学习之——Spark MLlib并行训练原理

    这里是 王喆的机器学习笔记 的第二十五篇文章.接下来的几篇文章希望与大家一同讨论一下机器学习模型的分布式训练的问题.这个问题在推荐.广告.搜索领域尤为突出,因为在互联网场景下,动辄TB甚至PB级的数据 ...

  2. Python之数据挖掘实践--scikit learn库介绍和下载、实践、采坑

    文章目录 前言 A sklearn库是什么? A1 依赖库介绍 1.Numpy库 2.Scipy库 3. matplotlib A2 下载安装 B 实践过程 B1 主成分分析(PCA) B2 实现Km ...

  3. 《大数据机器学习实践探索》 ---- 大数据机器学习:spark mlib 库【简介 与 架构初探】

    文章大纲 简介 架构 Pipelines 案例 实现过程 参考文献 简介 机器学习强调三个关键词:算法.经验.性能,其处理过程如上图所示.在数据的基础上,通过算法构建出模型并对模型进行评估.评估的性能 ...

  4. C++最佳实践之常用库介绍

    C++的常用库包括:algorithm.chrono.iostream.future.memory.map.queue.unordered_map.regex.set.string.sstream.s ...

  5. 机器学习实践:足球比赛聚类分析--11

    机器学习实践:足球比赛聚类分析 1.实验描述 本实验利用K-Means聚类分析算法对足球比赛结果进行分析,该算法通过Sprak Mllib库来调用,我们将学习K-Means算法的K值选取,聚类原理等内 ...

  6. Spark RDD算子介绍

    Spark学习笔记总结 01. Spark基础 1. 介绍 Spark可以用于批处理.交互式查询(Spark SQL).实时流处理(Spark Streaming).机器学习(Spark MLlib) ...

  7. Python机器学习、深度学习库总结(内含大量示例,建议收藏)

    Python机器学习.深度学习库总结(内含大量示例,建议收藏) 前言 python常用机器学习及深度学习库介绍 总结与分类 python 常用机器学习及深度学习库总结 分类 更多 前言 目前,随着人工 ...

  8. Python机器学习、深度学习库总结

    Python机器学习.深度学习库总结(内含大量示例,建议收藏) 前言 python常用机器学习及深度学习库介绍 总结与分类 python 常用机器学习及深度学习库总结 分类 更多 前言 为了大家能够对 ...

  9. 盘点27个机器学习、深度学习库最频繁使用的 Python 工具包(内含大量示例,建议收藏)

    目前,随着人工智能的大热,吸引了诸多行业对于人工智能的关注,同时也迎来了一波又一波的人工智能学习的热潮,虽然人工智能背后的原理并不能通过短短一文给予详细介绍,但是像所有学科一样,我们并不需要从头开始& ...

  10. mllib调参 spark_从Spark MLlib到美图机器学习框架实践

    MLlib 是 Apache Spark 的可扩展机器学习库,旨在简化机器学习的工程实践工作,并方便扩展到更大规模的数据集. 机器学习简介 在深入介绍 Spark MLlib 之前先了解机器学习,根据 ...

最新文章

  1. 1.2 Spyder的基本使用
  2. 开源框架完美组合之Spring.NET + NHibernate + ASP.NET MVC + jQuery + easyUI 中英文双语言小型企业网站Demo项目分析
  3. [蓝桥] 算法提高 队列操作
  4. 【NLP-语义匹配】详解深度语义匹配模型DSSM
  5. [mybatis]Configuration XML_databaseidProvider
  6. 基于JAVA的在线图书销售系统
  7. maven 依赖版本管理— dependencyManagement
  8. MySql Cluster 集成安装,Centos,坑点集锦
  9. 物联卡与SIM卡相比优势在哪
  10. elasticsearch中文分词
  11. 单片机CC2530学习笔记
  12. keil注册机激活的方法
  13. 文件搜索工具终极大PK挑战赛
  14. 阿里云域名购买和域名解析教程
  15. HTML表格边框空隙
  16. 图像处理空间域、变换域、时域和频域的含义理解:spatial VS transforms domain
  17. JAVA的stream流操作详细解析
  18. 移动apn接入点哪个快_电信和联通以及移动物联卡哪个较好
  19. **蒙特卡洛计算定积分VC++**
  20. 从NCBI 上下载 gbff 文件并得到 CDS 信息

热门文章

  1. 清华大学计算机系高考选科要求,清华大学新高考选课要求-清华大学新高考选考科目...
  2. 【YBT2022寒假Day1 B】方格填写(插头DP)
  3. 开源的物理引擎_开源物理引擎
  4. 从零开始搭建java物联网平台_【攻略】从零开始搭建物联网系统
  5. 微商怎么引流客源,谈谈我这些年引流的经验
  6. 深入浅出Spring Boot 2.x——第一章Spring Boot来临
  7. 微信小程序超市购物+后台管理系统|前后分离VUE
  8. 非线性光纤光学_多模光纤中的非线性光学
  9. 非线性光纤光学_片上光学频率梳:可产生光子微波,应用于卫星通信和5G网络!...
  10. html中css鼠标手势样式,CSS鼠标手势