这篇Spark MLLib 机器学习库的简介翻译自Spark官方文档。感谢 @明风 的悉心的校对修改。

Spark 0.9.1 MLLib

机器学习库简介

  • 依赖
  • 二元分类
  • 线性回归
  • 聚类
  • 协同过滤
    • 隐性反馈 vs 显性反馈
  • 梯度下降基础算法
  • 用Scala调用MLLib
    • 二元分类
    • 线性回归
    • 聚类
    • 协同过滤
  • 用Java调用MLLib
  • 用Python调用MLLib
    • 二元分类
    • 线性回归
    • 聚类
    • 协同过滤

MLlib 是Spark对常用的机器学习算法的实现库,同时包括相关的测试和数据生成器。MLlib 目前支持四种常见的机器学习问题:二元分类,回归,聚类以及协同过滤,同时也包括一个底层的梯度下降优化基础算法。本指南将会简要介绍 MLlib 中所支持的功能,并给出相应的调用 MLlib 的例子。

依赖

MLlib 将会调用 jblas 线性代数库,这个库本身依赖于原生的 Fortran 程序。如果你的节点中没有这些库,你也许会需要安装 gfortran runtime library。如果程序没有办法自动检测到这些库,MLlib 将会抛出链接错误的异常。

如果想用 Python 调用 MLlib,你需要安装 NumPy 1.7 或者更新的版本。

二元分类

二元分类是一个监督学习问题。在这个问题中,我们希望将实体归类到两个独立的类别或标签的其中一个中,例如判断一个邮件是否是垃圾邮件。这个问题涉及在一组被打过标签的样例运行一个学习算法,例如一组由(数字)特征和(相关的)类别标签所代表的实体。这个算法将会返回一个训练好的模型,该模型能够对标签未知的新个体进行潜在标签预测。

MLlib 目前支持两个适用于二元分类的标准模型家族:线性支持向量机(SVMs) 和逻辑回归,同时也包括分别适用与这两个模型家族的 L1 和 L2 正则化 变体。这些训练算法都利用了一个底层的梯度下降基础算法(描述如下)。二元分类算法的输入值是一个正则项参数(regParam) 和多个与梯度下降相关的参数(stepSize, numIterations, miniBatchFraction) 。

目前可用的二元分类算法:

  • SVMWithSGD
  • LogisticRegressionWithSGD

线性回归

线性回归是另一个经典的监督学习问题。在这个问题中,每个个体都有一个与之相关联的实数标签(而在二元分类中个体的标签都是二元的),并且我们希望在给出用于表示这些实体的数值特征后,所预测出的标签值可以尽可能接近实际值。MLlib支持线性回归和与之相关的 L1 (lasso)和 L2 (ridge) 正则化的变体。MLlib中的回归算法也利用了底层的梯度下降基础算法(描述如下),输入参数与上述二元分类算法一致。

目前可用的线性回归算法:

  • LinearRegressionWithSGD
  • RidgeRegressionWithSGD
  • LassoWithSGD

聚类

聚类是一个非监督学习问题,在这个问题上,我们的目标是将一部分实体根据某种意义上的相似度和另一部分实体聚在一起。聚类通常被用于探索性的分析,或者作为层次化监督学习管道网(hierarchical supervised learning pipeline) 的一个组件(其中每一个类簇都会用与训练不同的分类器或者回归模型)。 MLlib 目前已经支持作为最被广泛使用的聚类算法之一的 k-means 聚类算法,根据事先定义的类簇个数,这个算法能对数据进行聚类。MLlib 的实现中包含一个 k-means++ 方法的并行化变体 kmeans||。 MLlib 里面的实现有如下的参数:

  • k 是所需的类簇的个数。
  • maxIterations 是最大的迭代次数。
  • initializationMode 这个参数决定了是用随机初始化还是通过 k-means|| 进行初始化。
  • runs 是跑 k-means 算法的次数(k-mean 算法不能保证能找出最优解,如果在给定的数据集上运行多次,算法将会返回最佳的结果)。
  • initializiationSteps 决定了 k-means|| 算法的步数。
  • epsilon 决定了判断 k-means 是否收敛的距离阀值。

目前可用的聚类算法:

  • KMeans

协同过滤

协同过滤常被应用于推荐系统。这些技术旨在补充用户-商品关联矩阵中所缺失的部分。MLlib当前支持基于模型的协同过滤,其中用户和商品通过一小组隐语义因子进行表达,并且这些因子也用于预测缺失的元素。为此,我们实现了交替最小二乘法(ALS) 来学习这些隐性语义因子。在 MLlib 中的实现有如下的参数:

numBlocks 是用于并行化计算的分块个数 (设置为-1为自动配置)。
rank 是模型中隐语义因子的个数。
iterations 是迭代的次数。
lambda 是ALS的正则化参数。
implicitPrefs 决定了是用显性反馈ALS的版本还是用适用隐性反馈数据集的版本。
alpha 是一个针对于隐性反馈 ALS 版本的参数,这个参数决定了偏好行为强度的基准。

隐性反馈 vs 显性反馈

基于矩阵分解的协同过滤的标准方法一般将用户商品矩阵中的元素作为用户对商品的显性偏好。

在许多的现实生活中的很多场景中,我们常常只能接触到隐性的反馈(例如游览,点击,购买,喜欢,分享等等)在 MLlib 中所用到的处理这种数据的方法来源于文献: Collaborative Filtering for Implicit Feedback Datasets。 本质上,这个方法将数据作为二元偏好值和偏好强度的一个结合,而不是对评分矩阵直接进行建模。因此,评价就不是与用户对商品的显性评分而是和所观察到的用户偏好强度关联了起来。然后,这个模型将尝试找到隐语义因子来预估一个用户对一个商品的偏好。

目前可用的协同过滤的算法:

  • ALS

梯度下降基础算法

梯度下降(及其随机的变种)是非常适用于大型分布式计算的一阶优化方案。梯度下降旨在通过向一个函数当前点(当前的参数值)的负梯度方向移动的方式迭代地找到这个函数的本地最优解。MLlib 以梯度下降作为一个底层的基础算法,在上面开发了各种机器学习算法。梯度下降算法有如下的参数:

  • gradient 这个类是用来计算要被优化的函数的随机梯度(如:相对于单一训练样本当前的参数值)。MLlib 包含常见损失函数 (hinge, logistic, least-squares) 的梯度类。梯度类将训练样本,其标签,以及当前的参数值作为输入值。
  • updater 是在梯度下降的每一次迭代中更新权重的类。MLlib 包含适用于无正则项,L1 正则项和 L2 正则项3种情况下的类。
  • stepSize 是一个表示梯度下降初始步长的数值。MLlib 中所有的更新器第 t 步的步长等于 stepSize / sqrt(t)。
  • numIterations 表示迭代的次数。
  • regParam 是在使用L1,L2 正则项时的正则化参数。
  • miniBatchFraction 是每一次迭代中用来计算梯度的数据百分比。

目前可用的梯度下降算法:

  • GradientDescent

用Scala调用MLLib

下面的代码段可以在spark-shell中运行。

二元分类

下面的代码段演示了如何导入一份样本数据集,使用算法对象中的静态方法在训练集上执行训练算法,在所得的模型上进行预测并计算训练误差。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
// Load and parse the data file
val data = sc.textFile("mllib/data/sample_svm_data.txt")
val parsedData = data.map { line =>
val parts = line.split(' ')
LabeledPoint(parts(0).toDouble, parts.tail.map(x => x.toDouble).toArray)
}
// Run training algorithm to build the model
val numIterations = 20
val model = SVMWithSGD.train(parsedData, numIterations)
// Evaluate model on training examples and compute training error
val labelAndPreds = parsedData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / parsedData.count
println("Training Error = " + trainErr)

默认情况下,这个SVMWithSGD.train()方法使用正则参数为 1.0 的 L2 正则项。如果我们想配置这个算法,我们可以通过直接新建一个新的对象,并调用setter的方法,进一步个性化设置SVMWithSGD。所有其他的 MLlib 算法也是通过这样的方法来支持个性化的设置。比如,下面的代码给出了一个正则参数为0.1的 L1 正则化SVM变体,并且让这个训练算法迭代200遍。

1
2
3
4
5
6
7

import org.apache.spark.mllib.optimization.L1Updater
val svmAlg = new SVMWithSGD()
svmAlg.optimizer.setNumIterations(200)
.setRegParam(0.1)
.setUpdater(new L1Updater)
val modelL1 = svmAlg.run(parsedData)

线性回归

下面这个例子演示了如何导入训练集数据,将其解析为带标签点的RDD。然后,使用LinearRegressionWithSGD 算法来建立一个简单的线性模型来预测标签的值。最后我们计算了均方差来评估预测值与实际值的吻合度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
// Load and parse the data
val data = sc.textFile("mllib/data/ridge-data/lpsa.data")
val parsedData = data.map { line =>
val parts = line.split(',')
LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x => x.toDouble).toArray)
}
// Building the model
val numIterations = 20
val model = LinearRegressionWithSGD.train(parsedData, numIterations)
// Evaluate model on training examples and compute training error
val valuesAndPreds = parsedData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val MSE = valuesAndPreds.map{ case(v, p) => math.pow((v - p), 2)}.reduce(_ + _)/valuesAndPreds.count
println("training Mean Squared Error = " + MSE)

类似的,你也可以使用 RidgeRegressionWithSGD 和 LassoWithSGD 这两个算法,并比较这些算法在训练集上的均方差。

聚类

在下面的例子中,在载入和解析数据之后,我们使用 KMeans 对象来将数据聚类到两个类簇当中。所需的类簇个数会被传递到算法中。然后我们将计算集内均方差总和 (WSSSE). 你可以通过增加类簇的个数k 来减小误差。 实际上,最优的类簇数通常是 1,因为这一点通常是WSSSE图中的 “低谷点”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14

import org.apache.spark.mllib.clustering.KMeans
// Load and parse the data
val data = sc.textFile("kmeans_data.txt")
val parsedData = data.map( _.split(' ').map(_.toDouble))
// Cluster the data into two classes using KMeans
val numIterations = 20
val numClusters = 2
val clusters = KMeans.train(parsedData, numClusters, numIterations)
// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(parsedData)
println("Within Set Sum of Squared Errors = " + WSSSE)

协同过滤

在下面的例子中,我们导入的训练集中,数据每一行由一个用户,一个商品和相应的评分组成。假设评分是显性的,在这种情况下我们使用默认的ALS.train()方法。我们通过计算预测出的评分的均方差来评估这个推荐模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
// Load and parse the data
val data = sc.textFile("mllib/data/als/test.data")
val ratings = data.map(_.split(',') match {
case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble)
})
// Build the recommendation model using ALS
val numIterations = 20
val model = ALS.train(ratings, 1, 20, 0.01)
// Evaluate the model on rating data
val usersProducts = ratings.map{ case Rating(user, product, rate) => (user, product)}
val predictions = model.predict(usersProducts).map{
case Rating(user, product, rate) => ((user, product), rate)
}
val ratesAndPreds = ratings.map{
case Rating(user, product, rate) => ((user, product), rate)
}.join(predictions)
val MSE = ratesAndPreds.map{
case ((user, product), (r1, r2)) => math.pow((r1- r2), 2)
}.reduce(_ + _)/ratesAndPreds.count
println("Mean Squared Error = " + MSE)

如果这个评分矩阵是通过其他的信息来源(如从其他的信号中提取出来的)所获得,你也可以使用trainImplicit的方法来得到更好的结果。

1
val model = ALS.trainImplicit(ratings, 1, 20, 0.01)

用Java调用MLLib

所有 MLlib 中的算法都是对Java友好的,因此你可以用在 Scala 中一样的方法来导入和调用这些算法。唯一要注意的是,这些算法的输入值是Scala RDD对象,而在 Spark Java API 中用了分离的JavaRDD类。你可以在你的 JavaRDD对象中调用.rdd()的方法来将Java RDD转化成Scala RDD。

用Python调用MLLib

下面的列子可以在 PySpark shell 中得到测试。

二元分类

下面的代码段表明了如何导入一份样本数据集,使用算法对象中的静态方法在训练集上执行训练算法,在所得的模型上进行预测并计算训练误差。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

from pyspark.mllib.classification import LogisticRegressionWithSGD
from numpy import array
# Load and parse the data
data = sc.textFile("mllib/data/sample_svm_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
model = LogisticRegressionWithSGD.train(parsedData)
# Build the model
labelsAndPreds = parsedData.map(lambda point: (int(point.item(0)),
model.predict(point.take(range(1, point.size)))))
# Evaluating the model on training data
trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

线性回归

下面这个例子给出了如何导入训练集数据,将其解析为带标签点的RDD。然后,这个例子使用了LinearRegressionWithSGD 算法来建立一个简单的线性模型来预测标签的值。我们在最后计算了均方差来评估预测值与实际值的吻合度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

from pyspark.mllib.regression import LinearRegressionWithSGD
from numpy import array
# Load and parse the data
data = sc.textFile("mllib/data/ridge-data/lpsa.data")
parsedData = data.map(lambda line: array([float(x) for x in line.replace(',', ' ').split(' ')]))
# Build the model
model = LinearRegressionWithSGD.train(parsedData)
# Evaluate the model on training data
valuesAndPreds = parsedData.map(lambda point: (point.item(0),
model.predict(point.take(range(1, point.size)))))
MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y)/valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))

类似的,你也可以使用 RidgeRegressionWithSGD 和 LassoWithSGD 这两个算法,并比较这些算法在训练集上的均方差。

聚类

在下面的例子中,在载入和解析数据之后,我们使用 KMeans对象来将数据聚类到两个类簇当中。所需的类簇个数被传递到算法中。然后我们将计算集内均方差总和(WSSSE). 你可以通过增加类簇的个数 k来减小误差。 实际上,最优的类簇数通常是 1,因为这一点通常是WSSSE图中的”低谷点”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

from pyspark.mllib.clustering import KMeans
from numpy import array
from math import sqrt
# Load and parse the data
data = sc.textFile("kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10,
runs=30, initialization_mode="random")
# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
center = clusters.centers[clusters.predict(point)]
return sqrt(sum([x**2 for x in (point - center)]))
WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))

协同过滤

在下面的例子中,我们导入的训练集中,数据每一行由一个用户,一个商品和相应的评分组成。假设评分是显性的,在这种情况下我们使用默认的>ALS.train()方法。我们通过计算预测出的评分的均方差来评估这个推荐模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

from pyspark.mllib.recommendation import ALS
from numpy import array
# Load and parse the data
data = sc.textFile("mllib/data/als/test.data")
ratings = data.map(lambda line: array([float(x) for x in line.split(',')]))
# Build the recommendation model using Alternating Least Squares
model = ALS.train(ratings, 1, 20)
# Evaluate the model on training data
testdata = ratings.map(lambda p: (int(p[0]), int(p[1])))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).reduce(lambda x, y: x + y)/ratesAndPreds.count()
print("Mean Squared Error = " + str(MSE))

如果这个评分矩阵是通过其他的信息来源(如从其他的信号中提取出来的)所获得,你也可以使用trainImplicit的方法来得到更好的结果。

1
2

# Build the recommendation model using Alternating Least Squares based on implicit ratings
model = ALS.trainImplicit(ratings, 1, 20)

Posted in 机器学习, 算法. Tagged with spark.

Spark 0.9.1 MLLib 机器学习库简介相关推荐

  1. 【Hadoop Summit Tokyo 2016】Hivemall: Apache Hive/Spark/Pig 的可扩展机器学习库

    本讲义出自 Makoto YUI与NTT Takashi Yamamuro在Hadoop Summit Tokyo 2016上的演讲,主要介绍了Hivemall的相关知识以及Hivemall在Spar ...

  2. MLlib机器学习库

    MLlib代表机器学习库. MLlib 数据准备:特征提取.变换.选择.分类特征的散列和一些自然语言处理方法 机器学习算法:实现了一些流行和高级的回归.分类和聚类算法 使用程序:统计方法,如描述性统计 ...

  3. mllib调参 spark_《Spark 官方文档》机器学习库(MLlib)指南

    我们推荐您使用spark.ml,因为基于DataFrames的API更加的通用而且灵活.不过我们也会继续支持spark.mllib包.用户可以放心使用,spark.mllib还会持续地增加新的功能.不 ...

  4. Spark机器学习库(MLlib)指南

    spark-1.6.1 机器学习库(MLlib)指南 MLlib是Spark的机器学习(ML)库.旨在简化机器学习的工程实践工作,并方便扩展到更大规模.MLlib由一些通用的学习算法和工具组成,包括分 ...

  5. 【Spark】实验6 Spark机器学习库MLlib编程实践

    Spark机器学习库MLlib编程实践 一.实验目的 通过实验掌握基本的MLLib编程方法: 掌握用MLLib解决一些常见的数据分析问题,包括数据导入.成分分析和分类和预测等. 二.实验平台 新工科智 ...

  6. Spark 机器学习库【MLlib】编程指南

    一.机器学习库 MLlib是Spark的机器学习库[ML].其目标是使实用的机器学习算法变得可扩展且容易使用.在较高级别,它提供了以下工具: 机器学习算法:常见的机器学习算法,例如分类,回归,聚类和协 ...

  7. Apache Spark 2.0预览: 机器学习模型持久化

    在即将发布的Apache Spark 2.0中将会提供机器学习模型持久化能力.机器学习模型持久化(机器学习模型的保存和加载)使得以下三类机器学习场景变得容易: \\ 数据科学家开发ML模型并移交给工程 ...

  8. Spark MLlib 机器学习

    本章导读 机器学习(machine learning, ML)是一门涉及概率论.统计学.逼近论.凸分析.算法复杂度理论等多领域的交叉学科.ML专注于研究计算机模拟或实现人类的学习行为,以获取新知识.新 ...

  9. Spark入门之九:机器学习简介

    机器学习概念 在维基百科上对机器学习提出以下几种定义: " 机器学习是一门人工智能的科学,该领域的主要研究对象是人工智能,特别是如何在经验学习中改善具体算法的性能 " . &quo ...

  10. Spark MLlib机器学习 | 算法综合实战(一)(史上最详细)

    ==========                         ========= 8.1.1 什么是机器学习 机器学习可以看做是一门人工智能的科学,该领域的主要研究对象是人工智能.机器学习利用 ...

最新文章

  1. 结合实例与代码谈数字图像处理都研究什么?
  2. rrdtool的完整例子
  3. window下利用navicat访问Linux下的mariadb数据库
  4. 如何对付费广告流量进行标记?
  5. 怎样cp文件夹时忽略指定的文件夹和文件
  6. c malloc 头文件_干货笔记 | C/C++笔试面试详细总结(二)
  7. C++ ActiveX开发的问题讨论
  8. java数据类型后缀_java基础知识---基本数据类型
  9. php正则如何使用 1,PHP正则表达式使用详解(1)
  10. 华为手机助手上架流程_2019年各大安卓应用商店上架经验,含流程,物料,方法,建议收藏...
  11. vue项目 拷到别的电脑应该怎吗再次重新运行
  12. js获取ck_JS获取CkEditor在线编辑的内容
  13. 用“企业架构”方法指导信息化规划
  14. K-均值聚类算法通俗讲解
  15. 《左耳听风》-ARTS-打卡记录-第十一周
  16. 交换机组合超级计算机,图解:世界上最快的超级计算机Roadrunner
  17. 股票实战--线性回归
  18. 条形码录入测试软件,条码管理:商品条码录入
  19. Testbench编写指南(2)文件的读写操作
  20. Dubbo 实现原理与源码解析系列 —— 精品合集

热门文章

  1. [LeetCode]题解(python):067-Add Binary
  2. Daily Scrum 10.23
  3. 【职场攻略】比你的工资更重要的十件事
  4. linux查看db2表空间大小,db2怎么列出当前数据库下全部表占用空间的大小
  5. wordpress常用插件
  6. ECharts项目小结~
  7. 数据库习题(填空题五)
  8. 计算机学院指导报告,重庆大学计算机学院论文指导讲座圆满结束
  9. python实现音乐播放器_【原创源码】用Python来实现一个简易的MP3播放器(采用酷我接口,包含接口分析)...
  10. out.print 嵌套html代码_代码规范之前端编写码规范