一、高级分析和机器学习概览

1. 高级分析是指各种旨在发现数据规律,或根据数据做出预测和推荐等核心问题的技术。机器学习最佳的模型结构要根据要执行的任务制定,最常见的任务包括:

(1)监督学习,包括分类和回归,其目标是根据数据项的各种特征预测每个数据项的标签。

(2)推荐系统,根据行为向用户推荐产品。

(3)无监督学习,包括聚类,异常检测,以及主题建模,其目的是发现数据中的结构。

(4)图分析任务,如发现社交网络中的模式。

监督学习可能是最常见的机器学习任务,它的目标很简单:使用有标签的历史数据(通常被称为因变量)来训练模型,基于该模型和新数据点的各特征来预测该数据点的标签。一个例子是根据一个人的年龄(特征)预测他的收入(因变量)。它的训练过程一般是通过梯度下降这种优化算法实现的,训练算法从一个初始基本模型开始,并且在每次迭代期间会调整模型的各参数来逐渐提升模型准确度。

这一过程的结果是一个训练好的模型,可以利用它来对新数据进行预测。在训练和做出预测的过程中需要完成很多不同的任务,如在使用模型之前需要测试训练好的模型的效果,其基本原理很简单:基于历史数据进行训练,确保它在未训练过的数据上的泛化性,然后在新数据上预测

分类是监督学习的一种常见任务,它是训练一个算法来预测因变量的类别(属于离散的,有限的一组值)。最常见的情况是二元分类,模型预测一个给定的项属于两组中的哪一个。典型的例子是辨别垃圾邮件。使用一组已经确定为垃圾邮件或非垃圾邮件的历史电子邮件,训练一个模型来分析这些邮件其中的单词和各种属性,并对它们进行预测。一旦模型的性能让人满意,就使用该模型来对模型从未见过的邮件进行预测,自动判别它是否是垃圾邮件。

当分类的项多于两类时,称其为多分类问题。例如有四种不同类型的电子邮件(而不再是之前提到的两个类别):垃圾邮件,私人邮件,工作相关邮件和其他。当然还有很多分类任务的应用场景,包括:

(1)预测疾病。医生和医院可能有患者的行为和生理特征的历史数据集,他们可以用这份历史数据集来训练模型(在应用之前需要评估它的准确度和伦理问题),然后利用它来预测患者是否有心脏疾病。这可以是个二分类问题(健康或不健康),也可以是一个多分类问题(健康的心脏,或不同的疾病之一)。

(2)图像分类。有许多来自像苹果、谷歌、Facebook公司的应用,它们通过用户过往上传的人物图片训练出的一个分类模型,来预测给定一张图片中的人物。另一种常见应用场景是图片的分类或标注图片中的对象。

(3)预测客户流失。预测哪些客户可能不再使用这项服务,可以基于已经流失和未流失的客户数据集,训练出一个二元分类模型,并用它来预测当前客户是否有可能会流失。

(4)公司经常想预测网站的访问者会不会购买某产品。为了实现这种预测,它们可能会用到用户浏览模式或者用户位置等特征。

在分类问题中因变量是一系列离散的值,而在回归问题中要预测连续的变量(实数)。最简单的情况是想预测出某一个实数值而非一个类别,其余的过程大致相同,这就是为什么它们两个都是监督学习,要基于历史数据来预测从未见过的数据。回归有很多典型的应用场景:

(1)销售预测。商店想利用历史销售数据来预测商品销售情况。销售情况可能会和很多因素有关,也就是说可能会基于很多输入变量,但是一个简单的例子就是基于上周的销售数据来预测下周的数据。

(2)身高预测。根据父母的身高,要预测他们孩子的身高。

(3)预测节目的观众人数。像Netflix这样的流媒体公司要预测会有多少用户观看某一个节目。

2. 推荐系统是高级分析最直观的应用之一,通过研究用户对多种商品的显式偏好(通过评级)或隐式偏好(通过观察到的行为),基于用户之间的相似性或商品之间的相似性来推荐给用户他们可能喜欢的商品。通过查看这些相似性,推荐系统可以把相似用户喜欢的商品推荐给他,或者把相似用户已经购买的商品推荐给他。推荐系统是Spark任务中很常见的应用场景,并且Spark非常适合处理大数据推荐。下面是一些推荐系统的应用场景:

(1)电影推荐。Netflix虽然不一定使用Spark来处理大数据,但Netflix确实要为其用户做大规模推荐,它通过研究用户在Netflix上看过和不看什么电影的观影记录来做预测。此外,Netflix公司有可能考虑到用户之间打分模式的相似性。

(2)产品推荐。亚马逊将产品推荐作为提高销售额的主要手段之一。例如,根据购物车中的物品,亚马逊推荐与曾经加入过购物车的类似的其他物品。同样,每一个产品页面上,亚马逊展示出由其他用户购买的同类产品。

无监督学习是试图在一组给定的数据中寻找模式或发现隐层结构的方法。这不同于监督学习,因为没有因变量(标签)来做预测。一些使用无监督学习的应用场景包括:

(1)异常检测。鉴于一些常规事件经常发生,用户可能希望当非常规事件发生时给出预警。例如,安全管理人员可能希望当路上出现一个奇怪物体(损坏车辆,溜冰鞋或骑自行车的人)的时候收到通知。
(2)用户分类。给定一组用户的行为数据,可能要更好地了解某些用户与其他用户共享哪些特性。例如,游戏公司可能基于像在某游戏中花费的时间来聚类用户。该模型可能揭示休闲玩家与铁杆玩家完全不同的行为模式,并根据这种差异性,给每个玩家提供不同的建议或奖励。

(3)主题建模。给定一组文件,可以分析其中所含的词组来看看它们之间是否有某种潜在关系。例如,提供一些关于数据分析的网页,主题建模算法可以基于一个主题中比较常见的词,将这些网页标记成机器学习主题、SQL主题、流处理主题的页面等等。

虽然相比于分类和回归,图分析不是很常用,但它是一个强大的工具。从根本上讲,图分析是在研究给定顶点(对象)和边(表示这些对象之间的关系)的结构。例如,顶点可能代表了人与产品,边可能代表购买行为。通过观察顶点和边的属性,可以更好地理解它们和图形整体结构之间的关联。由于图表达的是关系,任何可以抽象成关系的数据都可以作为图分析的一个应用场景,这包括:

(1)欺诈预测。Capital One公司采用Spark的图分析功能更好的了解欺诈网络,通过使用历史欺诈信息(如电话号码,地址或名字),他们发现欺诈信贷的请求或交易。举例来说,一个欺诈电话号码的两跳范围内(电话拨出方和接听方构成一个边连接,从某电话号码A拨出给B,B再拨出给C,则B和C都处于该电话号码的两跳范围内)的任何用户帐户可能会被认为是可疑的。

(2)异常检测。通过观察个体之间的网络连接方式,可以标记出异常值,以便进行手动分析。例如,如果在数据中通常每个顶点都有10条边,而给定的一个顶点只有一条边相连,这可能是值得研究的奇怪现象。

(3)分类。给定一个网络中顶点的属性信息,就可以根据其他点与该点的连接情况来对其他点进行分类。例如,如果某个人被标记为社会网络中的一个有影响力者,可以将其他具有类似网络结构的人归类为有影响力者。

(4)推荐。谷歌的原始网页推荐算法PageRank是以分析网站关系来对网页重要性排名的图算法。例如,有很多链接的网页比没有链接的网页更重要。

3. 高级分析机器学习过程的流程步骤如下所示:

(1)搜集与预测任务相关的数据。

(2)清理和检查数据以更好地理解它。

(3)执行特征工程以使数据以适合的形式为算法使用(例如,将数据转换为数值向量)。

(4)使用该数据的一部分作为训练集训练一个或多个模型,生成一些候选模型。

(5)利用从未被用作训练的数据子集来实际客观地评价结果,从而评估比较模型的效果,这可以更好地了解模型到底怎么样。

(6)利用上述过程的结果和使用模型进行预测、检测异常、或解决更通用的业务难题。

收集合适的数据后将需要清理和检查它,这通常是所谓探索性数据分析或EDA的一部分。EDA一般是指采用交互式查询和可视化的方法,以便更好地了解数据的分布、数据相关性和其他细节。在这个过程中,可能会发现需要删除一些未标注的数据或者可能错误标注的数据。结构化API中的许多Spark函数都提供一种简单的方式来清洗和报告数据。

收集和清理数据集之后,需要将它转换成适合于机器学习算法一种形式,这通常是数值特征。特征工程可以造就也可以毁掉一个机器学习应用,所以这是要认真对待的一个步骤。特征工程的过程包括各种任务,诸如正则化数据,增加变量来表示其它变量的相互作用,操纵类别变量,并将它们转换为适当的格式以输入到我们的机器学习模型。在MLlib中,所有的变量通常必须作为浮点型向量输入(不管他们实际上代表什么)。

现在拥有了历史信息的数据集(例如,垃圾邮件或不是垃圾邮件),也有一个具体的任务(例如,分类垃圾邮件)。下一步将要训练一个模型来根据输入预测正确的输出。在训练过程中,模型的内部参数将根据模型对输入数据的分类效果发生变化。举例来说,要检测垃圾邮件,算法可能会发现某些词的出现更能用来识别垃圾邮件,因此模型中与这些词相关的权重会更高。最后,训练之后的模型发现,某些单词在分类邮件方面比其他词有更大的影响力(因为它们与垃圾邮件有一些相关性)。

训练过程的输出就是所说的模型,然后模型可被用于理解数据或做未来的预测。为了做出预测,需要给模型输入数据,它会基于对这些输入数据的数学运算来产生输出。以分类为例,给定邮件的属性,它会通过比较训练过的历史垃圾邮件和非垃圾邮件来预测该邮件是否为垃圾邮件。当然,应该将数据集分割成多个部分,并且只使用其中一部分进行训练,这是在机器学习过程中的一个重要步骤,因为当建立一个高级分析模型,需要能泛化到它以前没有见过的数据。

将数据集分成多个部分,能够客观地对其中一部分数据(一组未用来训练模型的数据)对训练后模型的有效性进行测试,目标是看模型是否了解有关此数据的本质,或者它是否只注意到特定于训练集的内容(又称为过拟合),这就是它为什么把被称为测试集。在训练模型的过程中,还可能采取另一个单独的数据子集,并将其视为另一种类型的测试集(称为验证集),以便尝试不同的超参数(影响训练过程的参数),并在没有过拟合到测试集的情况下比较相同模型的不同变化

配置适当的训练集、验证集和测试集对成功运用机器学习解决问题是非常重要的,如果不正确地分割这些数据集,那么就很容易导致过拟合,即模型对新数据的泛化能力弱

4. Mllib是基于Spark的一个软件包,它提供各种API接口用于收集和清理数据、特征工程和特征选择、训练和微调大型有监督和无监督机器学习模型、并在生产中使用这些模型。MLlib实际上由两个利用不同核心数据结构的包组成:

(1)org.apache.spark.ml软件包使用DataFrame的接口,还提供了用于构建机器学习流程的高层次接口,它将有助于标准化执行上述步骤。

(2)较低级别的软件包org.apache.spark.mllib,包括Spark低级别的RDD API接口。它处于维护模式,也就是说处于更新bug状态而不会再有新功能。

基于单机有许多用于执行机器学习任务的工具,虽然有几个很好的工具(如TensorFlow)可供选择,但这些基于单机的工具要么无法训练海量数据,或者处理时间太长。这意味着单机工具通常是MLlib的补充,当涉及特别需要扩展性的问题时,就要利用Spark的能力。利用Spark的可扩展能力,有两个关键的应用场景:

(1)希望利用Spark进行数据预处理和特征生成,以减少从大量数据中生成训练和测试集所需的时间,然后再利用单机机器学习库对这些给定的数据集进行训练

(2)当输入数据或模型变得太难或不方便在单机上处理时,可以使用Spark。

一个重要提示是,虽然利用Spark可以使训练和数据预处理工作变得很简单,但仍需要牢记一些复杂性,尤其是在部署训练好的模型时。例如,Spark并没有提供内置的方式来支持模型中的低延迟预测,因此可能希望将处理好的模型导出到另一个系统或自定义程序来做到这一点,MLlib通常支持导出模型到其他工具。

5. 在MLlib有几个基本的“结构”类型:转换器(transformer),估计器(estimator),评估器(evaluator),和流水线(pipeline。通过结构化的表示,在定义端到端的机器学习流水线时会考虑这些类型,它们将提供共同的语言来定义哪些属于流水线的哪一部分。下图说明了使用Spark开发机器学习模型时需遵循的总体开发流程:

转换器(transformer)是将原始数据以某种方式进行转换的函数。它可能会(从另外两个变量)创建一个新的变量,对某一列进行归一化,或仅仅将一个Integer类型值变为Double类型值输入进模型。转换器的一个例子是将字符串类型变量转换为MLlib能够使用的数值型值,它主要用于数据预处理和特征工程阶段,以DataFrame作为输入并生成一个新的DataFrame作为输出,如下图所示:

估计器(Estimator)可能用来做两件事其中之一。第一,估计器可以作为数据初始化的转换器,例如为了对数值数据进行归一化,需要基于某列中的当前值来初始化转换器,这需要传递两次数据,第一次传递生成初始化值,第二次实际在数据上应用生成函数。第二,基于数据训练模型的算法也称为估计器。

评估器(Evaluator)允许根据某种效果评价指标(如受试者工作特征曲线-ROC曲线)来评价给定模型的表现如何。在使用评估器从测试的模型中选择最佳模型之后,就可以使用该模型进行预测。从较高的层次上,可以一个个地指定转换器、估计器和评估器,但是也可以把步骤指定为pipeline中的stage,这种流水线类似于scikit-learn中的流水线概念。

除了构建pipeline的结构类型外,还有几种低级别的数据类型可能在MLlib 中使用,Vector是最常见的。每当将一组特征数据传递到机器学习模型中时,必须将其组织成Double类型的向量,此向量可以是稀疏的(其中大多数元素为零)也可以是稠密的(其中有许多非重复值)。向量是以不同的方式创建的,要创建稠密向量需要指定数组中的所有值,要创建稀疏向量可以指定向量的大小、索引和非零元素的值。稀疏向量是大多数值为零时情况下的最好表达形式,因为这是一种更易压缩的表示形式。下面是如何手动创建向量的示例:

from pyspark.ml.linalg import Vectors
denseVec = Vectors.dense(1.0, 2.0, 3.0)
size = 3
idx = [1, 2] # locations of non-zero elements in vector
values = [2.0, 3.0]
sparseVec = Vectors.sparse(size, idx, values)

6. 前面已经概述了可能会遇到的核心概念,下面就来创建一个简单的pipeline来演示每个组件部分,这里将使用一个规模较小的合成数据集,在进一步讨论之前先读取数据:

#in Python
df = spark.read.json("/data/simple-ml")
df.orderBy("value2").show()

这是数据样本:

+-----+----+------+------------------+
|color| lab|value1| value2|
+-----+----+------+------------------+
|green|good| 1|14。386294994851129|
。。。
| red| bad| 16|14。386294994851129|
|green|good| 12|14。386294994851129|
+-----+----+------+------------------+

此数据集包含以下标签:好或坏的分类lab、代表颜色的分类color,和两个数值变量value1和value2。虽然数据是合成的,但可以想象成这个数据集代表公司客户的健康状况,其中color列代表由客服代表所做的某种类别的健康评级,lab列代表真正的客户健康状况,另外两个值是一些数值型的行为度量(例如在线时间和花费开销)。假设想要训练一个分类模型,希望根据这些值预测一个二元变量(标签)。

除了JSON,还有一些特定的数据格式用于监督学习,包括LIBSVM。这些格式具有真正的值标签和稀疏输入数据,Spark可以使用其DataSource API读取和写入这些格式。下面是一个如何使用DataSource API的例子:

spark.read.format("libsvm").load("/data/sample_libsvm_data.txt")

7. 如前所述,转换器帮助以某种方式操纵当前的列数据,操纵这些列通常是为了构建特征(会将它们输入到模型中)。转换器可以用来减少特征的数量、添加更多特征、操作当前的列、或简单地帮助纠正数据格式,转换器会向DataFrame 添加新列。当使用Mllib时,所有Spark机器学习算法的输入是由Double类型(表示标签)和Vector[Double]类型(表示特征)组成。当前的例子数据不符合这一要求,因此需要将其转换为正确的格式。

要在上面例子做到这一点,要指定一个Rformula,它是指定机器学习转换的声明式语言,用来根据数据集自动生成特征(feature)和标签(label),RFormula支持R语言运算符的一个有限子集,它对于实际应用中的简单模型和操作有很好的支持。基本RFormula操作符是:

(1)~:目标(标签)和项(特征)的分隔符号。

(2)+:合并项,“+ 0”表示删除空格。

(3)-:删除项,“ -1”表示删除空格,和“+ 0”起相同作用。

(4)::交互(数值乘法,或类别二值化)。

(5).:除了目标列的全部列。

为了应用具有此语法的转换,需要导入相关的类,然后需要定义公式。在该例子中,希望使用所有列(.符号),在value1和color列之间添加交互,在value2和color列之间之间添加交互,将这些特征视为新特性:

from pyspark.ml.feature import RFormula
supervised = RFormula(formula="lab ~ . + color: value1 + color: value2")

此时以声明方式指定了如何将数据更改为模型训练要使用的格式。接下来的步骤是,将RFormula转换器应用到数据上,让它发现每个列的可能值。不是所有的转换器都有这个要求,但因为RFormula会自动处理分类变量,它需要确定哪些列是用于分类的、哪些不是、还有分类的列的具体值,出于这个原因要调用fit方法。之后,它返回“训练好”的转换器,可以使用它来转换数据。现在说明了这些细节所以继续准备DataFrame:

# in Python
fittedRF = supervised.fit(df)
preparedDF = fittedRF.transform(df)
preparedDF.show()

这是训练和转换过程的输出:

+-----+----+------+------------------+--------------------+-----+
|color| lab|value1| value2| features|label |
+-----+----+------+------------------+--------------------+-----+
|green|good| 1|14。386294994851129|(10,[1,2,3,5,8],[…1。0|
…| red| bad| 2|14。386294994851129|(10,[0,2,3,4,7],[…0。0|
+-----+----+------+------------------+--------------------+-----+

在输出中可以看到转换结果,就是名为features的列,它包含以前的原始数据。后面发生的事情其实很简单,RFormula在调用fit函数时检查数据,并输出一个根据指定公式转换数据的对象(称为RformulaModel),这个“训练好”的转换器在类型签名中始终称为Model。当使用这个转换器时,Spark会自动将分类变量转换为Double类型,这样就可以将它输入到一个(尚未指定的)机器学习模型中,特别是它为每个可能的color类别分配一个数值,为color和value1/value2 之间的交互变量创建特征,并将它们全部放到一个向量中。然后调用该对象上的transform函数,以便将输入数据转换为想要的输出数据。

至此预处理了数据,并添加了一些特征,现在是时候在这个数据集上实际训练模型了。为了做到这一点,首先需要准备一个测试集进行评估。有一个好的测试集可能是最重要的事情,能确保训练的模型以一种可靠的方式在实际环境中使用。创建不具有代表性的测试集,或不使用测试集进行超参数优化,一定会导致模型在实际场景下的应用时表现欠佳。现在创建一个基于随机拆分的简单测试集:

# in Python
train,test = preparedDF.randomSplit([0.7,0.3])

8. 现在已经将数据转换为正确的格式,也创建了有用的特征,终于到了使用模型的时候了。在本例子中将使用一种称为逻辑回归(logistic regression)的分类算法,创建一个LogisticRegression的实例,使用默认配置和超参数,然后设置标签列和特征列。这里设置的label和features列,实际上是Spark MLlib估计器默认使用的标签,在后面的内容里会省略它们:

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="label",featuresCol="features")

实际去训练这个模型之前,先检查一下参数,这也是提示每个特定模型所有可用参数选项的好方法:

# in Python
print lr.explainParams()

虽然输出太大无法在此处重现,但它显示了对Spark实现逻辑回归的所有参数的解释,explainParams方法在MLlib可用的所有算法中都有实现。在实例化未经过训练的算法后,该把数据输入进去了,在本例中它将返回LogisticRegressionModel:

# in Python
fittedLR = lr.fit(train)

这段代码将启动一个Spark job来训练模型。与以前的普通transformation懒执行过程相反,机器学习模型的拟合是即刻(eager)实现的。完成后可以使用模型进行预测,从逻辑上说这就是从特征到标签的转换,可以使用transform()方法进行预测,例如可以transform训练集,看看模型对训练数据会输出什么标签,并将其与真实标签进行比较,这又是可以操纵的另一个DataFrame,用下面的代码段来执行该预测:

fittedLR.transform(train).select("label", "prediction").show()

结果是:

+-----+----------+
|label|prediction|
+-----+----------+
| 0。0| 0。0|
…
| 0。0| 0。0|
+-----+----------+

下一步将是手动评估此模型,并计算性能指标,如真正率(true positive rate)、假负率(false negative rate)等。然后,可以尝试一组不同的参数以查看这些性能指标是否更好, Spark可以帮助避免手动尝试不同的模型和性能指标,它允许将Spark job指定为包含所有转换的声明式流水线,以及允许调整超参数。

超参数(Hyperparameter是影响训练过程的配置参数,诸如模型结构和正则化(regularization),它们在训练开始之前被设置,例如逻辑回归有一个超参数(正则化参数),它决定了数据在训练阶段执行什么程度的正则化(正则化是一种避免模型过拟合的技术)。后面将会看到,以不同的超参数值(例如不同的正则化参数值)来启动pipeline job,以便比较同一模型不同版本的效果。

9.如果执行大量的转换,则编写所有步骤并跟踪DataFrame的过程最终会非常繁琐,这就是Spark包含Pipeline概念的原因所在。pipeline允许设置相关转换的数据流,以估计器结束,估计器可以根据用户指定自动调整,最后得到一个优化后的模型,即可以直接使用,如下图所示:

请注意,转换器或模型的实例在不同pipeline上不会被重用,在创建另一个pipeline之前,始终要创建模型的新实例。为了确保不会过拟合,这里将创建一个测试集并根据验证集调整超参数(注意这里基于原始数据集创建此验证集,而不是前面提到的preparedDF):

# in Python
train, test = df.randomSplit([0.7, 0.3])

配置完成后,需要创建pipeline中的基本stage,一个stage仅仅代表转换器或估计器。在该例子中将有两个估计器,创建RFomula将首先分析数据以了解输入特征的类型,然后转换它们以创建新的特征,随后创建LogisticRegression对象,它是通过训练来生成模型的算法:

# in Python
rForm = RFormula()
lr = LogisticRegression().setLabelCol("label").setFeaturesCol("features")

现在,不是去手动使用转换然后调整模型,而是只需在整个pipeline的stage创建它们,如下面的代码片段所示:

from pyspark.ml import Pipeline
stages = [rForm, lr]
pipeline = Pipeline().setStages(stages)

10. 现在已经设定好了逻辑pipeline,下一步是训练。在这个例子中不只训练一个模型,将通过指定不同的超参数组合来让Spark训练多个不同的模型。然后将使用评估器选择最佳模型,并将它作出的预测与验证数据进行比较。可以测试整个pipeline上各种不同的超参数组合,甚至当使用RFormula操作原始数据的时候也是如此。下面代码演示了如何执行此操作:

from pyspark.ml.tuning import ParamGridBuilder
params = ParamGridBuilder()\.addGrid(rForm.formula, ["lab ~ . + color:value1","lab ~ . + color:value1 + color:value2"])\.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\.addGrid(lr.regParam, [0.1, 2.0])\.build()

在当前的参数设置中,有三个超参数与默认值不同:

(1)两个不同版本的Rformula。

(2)对于ElasticNet参数有三个不同的选择。

(3)对于正则化参数有两种不同的选择。

这给了总共12种不同的参数组合,这意味着将训练12个不同版本的逻辑回归模型。现在已建立了各种参数组合,是指定评估过程的时候了。评估器将自动和客观地把多个模型基于同一评估指标进行比较,在这个例子中使用BinaryClassificationEvaluator,它有许多潜在的评估指标,并将使用areaUnderROC,它是受试者工作特征曲线的总面积,这是评估分类任务效果的常用评估指标

from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()\.setMetricName("areaUnderROC")\.setRawPredictionCol("prediction")\.setLabelCol("label")

现在有了指定数据应该如何转换的pipeline,将对逻辑回归模型执行模型选择,尝试不同的超参数,利用areaUnderROC指标比较性能来评估其效果。机器学习中在验证集(而非测试集)上对比超参数效果可以很好地避免过拟合问题,出于这个原因不用之前创建的测试集来调整这些参数。Spark提供了自动调整超参数的两种选择,可以使用TrainValidationSplit简单地将数据任意随机分成两个不同的组,或使用CrossValidator通过将数据集分割成不重叠的、随机分配的k个折叠来执行K-折叠交叉验证(K-fold cross-validation):

from pyspark.ml.tuning import TrainValidationSplit
tvs = TrainValidationSplit()\.setTrainRatio(0.75)\.setEstimatorParamMaps(params)\.setEstimator(pipeline)\.setEvaluator(evaluator)

下面运行构建的pipeline,运行该pipeline将对每一个模型版本都进行测试。注意tvsFitted的类型是TrainValidationSplitModel,当拟合一个模型会输出一个“model”类型:

# in Python
tvsFitted = tvs.fit(train)

可以在测试集上评估它的效果:

evaluator.evaluate(tvsFitted.transform(test)) // 0.9166666666666667

也可以查看模型的训练信息总结。要做到这一点要从pipeline中提取它,将它转换为适当的类型并打印结果。下面是查看结果的方法:

// in Scala
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.classification.LogisticRegressionModel
val trainedPipeline = tvsFitted.bestModel.asInstanceOf[PipelineModel]
val TrainedLR = trainedPipeline.stages(1).asInstanceOf[LogisticRegressionModel]
val summaryLR = TrainedLR.summary
summaryLR.objectiveHistory // 0.6751425885789243,0.5543659647777687,0.473776...

此处显示了算法在训练中每次迭代的训练效果。可以观察到算法正逼近最佳模型,通常是在开始时评价指标会有较大的变化,但随着时间的推移,这些值应该变得越来越小。现在训练好了这个模型,可以将它保存到磁盘上,以便在以后的预测中使用它:

tvsFitted.write.overwrite().save("/tmp/modelLocation")

在写出模型之后,可以将其加载到另一个Spark程序中进行预测,为此需要使用特定算法的“model”版本来从磁盘上来加载保存的模型。例如如果使用CrossValidator,则必须在持久化版本中读取CrossValidatorModel,如果要手动使用LogisticRegression,则必须使用LogisticRegressionModel。在当前例子中使用TrainValidationSplit,它输出TrainValidationSplitModel:

// in Scala
import org.apache.spark.ml.tuning.TrainValidationSplitModel
val model = TrainValidationSplitModel.load("/tmp/modelLocation")
model.transform(test)

11. 在Spark中有几种不同的部署模式,用于实际应用机器学习模型,下图展示了常见的工作流程:

(1)离线训练机器学习(ML) 模型,然后向其提供离线数据。离线数据指的是存储下来用于分析的数据,而不是需要快速得到应答的实时数据。Spark很适合这种部署方式。

(2)离线训练模型,然后把训练结果放到一个数据库(通常是一个KV存储)中。这非常适用于推荐系统,但不适用于像分类或回归这样的应用,因为这些应用不是为某一用户查询返回一个值,而是必须基于输入计算输出值。

(3)离线训练模型,持久化模型到磁盘上用于之后提供服务。如果使用Spark作为服务模块,这将不是一个低延迟的解决方案,因为Spark job启动很耗时,即使不是在集群上运行。此外它不能很好的并行化,所以很可能不得不使用负载均衡器,集成多个模型副本提供服务,并集成一些自己的REST API。这个问题有一些有趣的潜在解决方案,但当前还没有标准化的方法。

(4)手动(或通过其他一些软件)将分布式模型转换成可以在一台机器上更快运行的模型。当不对Spark中的原始数据做很多操作时这种方法很不错,但可能很难维护和更新。当前也有几种解决方案,例如MLlib可以将某些模型导出为PMML(一种通用的模型交换格式)文件。

(5)在线训练算法并在线使用。可以和Structured Streaming一起使用,但是对于某些模型来说会很复杂。

二、预处理和特征工程

12. 每一个数据科学家都知道数据分析中最大的挑战(以及最耗时的工作)就是数据预处理,这并不是说它是一个特别复杂的编程过程,而是它需要特别了解对应数据,并了解对应模型需要什么才能利用这些数据。下面的列表说明了MLlib中每个高级分析任务的输入数据结构要求

(1)在大多数分类和回归算法中,用Double类型的一列数据代表标签,Vector类型(可能稠密也可能稀疏)的一列数据代表特征。

(2)在推荐系统中,数据由一个用户列、一个项目(观看的电影或购买的书)列、一个评分列组成。

(3)在无监督学习时,需要使用Vector类型(可能稠密也可能稀疏)的一列数据表示特征。

(4)图分析中,需要DataFrame数据类型表示顶点和边。

以这些格式表示数据的最好方式是使用转换器(Transformer)。转换器是以DataFrame作为输入输出的函数。处理之前需要从几个不同的采样数据集中读取数据,其中的每一个都有后面例子中需要处理的属性:

# in Python
sales = spark.read.format("csv")\.option("header", "true")\.option("inferSchema", "true")\.load("/data/retail-data/by-day/*.csv")\.coalesce(5)\.where("Description IS NOT NULL")
fakeIntDF = spark.read.parquet("/data/simple-ml-integers")
simpleDF = spark.read.json("/data/simple-ml")
scaleDF = spark.read.parquet("/data/simple-ml-scaling")

除了真实的销售数据之外,还将使用几个简单的合成数据集,FakeIntDF,simpleDF和scaleDF的行数都很少,这将有助于专注于正在执行的具体数据操作,而不是数据集之间的差异性。因为得多次使用这些销售数据,所以将对其进行缓存,以便可以从内存中高效地读取它,而不是每次需要时再从磁盘上读取它:

sales.cache()
sales.show()+---------+---------+--------------------+--------+-------------------+---------
|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPr..
---+--------- +---------+---------+--------------------+--------+----------------
| 580538| 23084| RABBIT NIGHT LIGHT| 48|2011-12-05 08:38:00| 1……
| 580539| 22375|AIRLINE BAG VINTA...| 4|2011-12-05 08:39:00| 4…
+---------+---------+--------------------+--------+-------------------+---------

值得注意的是,这里过滤掉了null 值。当前MLlib并不总是能很好地处理null值,这也是常见的报错的原因,也是需要在调试时首先考虑的

13. 转换器(Transformer)是以某种方式转换原始数据的函数,这可能是从两个其他变量创建一个新的交互变量、规范化一列、或简单地将其转换为Double类型以便输入到模型中。转换器主要用于数据预处理或特征生成。Spark的转换器只包括一种transform()方法,这是因为它不会根据输入数据进行更改。下图是一个简单的示例,左侧是一个要被转换的输入DataFrame ,右侧是输出DataFrame,它增加了一个代表转换输出的新列:

分词器(Tokenizer)是转换器的一个例子,它根据给定的字符对一个字符串进行分词,这不会从数据中学习到任何东西,它只是应用了一个函数。下面是一个代码片断,它显示了如何构建一个分词器来处理输入列,如何转换数据,然后从该转换中输出:

// in Scala
import org.apache.spark.ml.feature.Tokenizer
val tkn = new Tokenizer().setInputCol("Description")
tkn.transform(sales.select("Description")).show(false)+-----------------------------------+------------------------------------------+
|Description |tok_7de4dfc81ab7__output |
+-----------------------------------+------------------------------------------+
|RABBIT NIGHT LIGHT |[rabbit,night,light] |
|DOUGHNUT LIP GLOSS |[doughnut,lip,gloss] |
…
|AIRLINE BAG VINTAGE WORLD CHAMPION |[airline,bag,vintage,world,champion] |
|AIRLINE BAG VINTAGE JET SET BROWN |[airline,bag,vintage,jet,set,brown] |
+----------------------------------+------------------------------------------+

14. 预处理的另一个工具是估计器(Estimator)。如果要执行的转换必须使用与输入列有关的数据或信息进行初始化(通常传递输入列本身来派生),那就有必要估计器。例如,如果要将列中的值缩放为以0为均值和具有单位方差的值,则需要处理整个数据,以便计算用于将数据归一化为以0为均值和具有单位方差的值。

从效果上来看,估计器是可以根据特定输入数据配置的转换器,简单来说可以盲目应用转换(转换器类型)、或者根据数据执行转换(估计器类型)。下图是使用对特定输入数据集拟合的估计器的一个简单示例,它生成随后转换器需要的输入数据,再通过转换器以追加新列:

此类估计器的一个示例是StandardScaler,根据该列中的值范围对输入列进行缩放,使其在每个维度中的数据都保持平均值为0方差为1的形式,为此它必须首先处理一遍数据来创建转换器。下面是显示整个过程的示例代码片断以及输出:

// in Scala
import org.apache.spark.ml.feature.StandardScaler
val ss = new StandardScaler().setInputCol("features")
ss.fit(scaleDF).transform(scaleDF).show(false)+---+--------------+------------------------------------------------------------+
|id |features |stdScal_d66fbeac10ea__output |
+---+--------------+------------------------------------------------------------+
|0 |[1.0,0.1,-1.0]|[1.1952286093343936,0.02337622911060922,-0.5976143046671968]|
...
|1 |[3.0,10.1,3.0]|[3.5856858280031805,2.3609991401715313,1.7928429140015902] |
+---+--------------+------------------------------------------------------------+

所有的转换器都要求至少指定输入和输出的列名,这可以通过setInputCol和setOutputCol来设置。虽然有一些默认值,但最好手动指定它们以使其清晰明了。除了输入和输出列,所有的转换器都有不同的可调参数(必须使用set()方法设置)。在Python中还有另一个方法,就是将这些值设置为对象构造函数的关键字参数。估计器要求将转换器首先拟合到给定数据集,然后在结果对象上调用transform()。

15. Rformula是遇到“常规”格式的数据时最易用的转换器。Spark从R语言借用这个转换器概念,可以简单地以声明式地方法为数据指定一组转换。有了这个转换器,值可以是数字或类别变量,不需要从字符串中提取值或以任何方式操作它们。通过执行独热编码(one-hot encoding)的操作,RFormula将自动处理代表类别的输入(字符串形式)。简而言之,独热编码将一组值转换为一组二进制列,它指定一个数据点是否具有每个特定值。使用RFormula,数值列将被强制转换为Double类型而不会被热编码。如果标签列是String类型,则首先会使用StringIndexer将其转换为Double类型。

将数值列转换为Double类型而不是独热编码形式的策略具有一些重要含义。如果是具有数值型的类别变量,则它们将仅仅转换为Double类型,这将隐式地指定次序。确保输入类型进行预期的转换非常重要,如果处理没有顺序关系的类别变量,则应将它们强制转换为String类型。RFormula允许在声明式语言指定转换,目前支持有限的R运算符子集。在实际工作中,对于简单的转换来说它非常适合。基本运算符为:

(1)~:目标(标签)和项(特征)的分隔符号

(2)+:合并项,“+ 0”表示删除空格。

(3)-:删除项,“ -1”表示删除空格,和“+ 0”起相同作用。

(4)::交互(数值乘法,或类别二值化)。

(5).:除了目标列的全部列。

RFormula默认使用label和features的名字来标记它输出的标签和特征集合(对于监督学习),后面所述的模型例子在默认情况下也使用这些列名,这样就可以很容易地将转换成DataFrame的结果传递到模型中进行训练。下面这个例子中希望使用所有可用的变量(.),然后在value1和color之间与value2和color之间指定交互,作为附加特征生成:

from pyspark.ml.feature import RFormulasupervised = RFormula(formula="lab ~ . + color:value1 + color:value2")
supervised.fit(simpleDF).transform(simpleDF).show()+-----+----+------+------------------+--------------------+-----+
|color| lab|value1| value2| features|label|
+-----+----+------+------------------+--------------------+-----+
|green|good| 1|14.386294994851129|(10,[1,2,3,5,8],[...| 1.0|
| blue| bad| 8|14.386294994851129|(10,[2,3,6,9],[8....| 0.0|
...
| red| bad| 1| 38.97187133755819|(10,[0,2,3,4,7],[...| 0.0|
| red| bad| 2|14.386294994851129|(10,[0,2,3,4,7],[...| 0.0|
+-----+----+------+------------------+--------------------+-----+

使用SQL Transformer允许利用Spark的大量SQL相关操作库,就像MLlib转换一样。在SQL中使用的任何SELECT语句都是可行的转换,唯一需要更改的是不要使用表名,只需使用关键字THIS。如果要将某些DataFrame操作写为预处理步骤,或者在超参数调整期间尝试不同的SQL表达式,则可能需要使用SQLTransformer。要注意的是,此转换的输出将作为列追加到输出DataFrame。用户可能希望使用SQLTransformer来表示最原始格式数据上的所有操作,以便为不同的操作创建不同的转换器,这提供了构建和测试不同pipeline的好处,只要换一个转换器就可以。下面是使用SQLTransformer的基本一个例子:

from pyspark.ml.feature import SQLTransformerbasicTransformation = SQLTransformer()\.setStatement("""SELECT sum(Quantity), count(*), CustomerIDFROM __THIS__GROUP BY CustomerID""")basicTransformation.transform(sales).show()-------------+--------+----------+
|sum(Quantity)|count(1)|CustomerID|
+-------------+--------+----------+
| 119| 62| 14452.0|
...
| 138| 18| 15776.0|
+-------------+--------+----------+

VectorAssembler是在几乎每条pipeline中都使用的工具,它将所有特征组合成一个大的向量,然后将其传递到估计器它通常在机器学习流水线的最后一步使用,并将多个Boolean、Double、Vector类型的列作为输入。如果要使用各种转换器执行一些操作,并需要收集所有这些结果,则这种方法就很有用。下面代码段的输出明确说明了它是如何工作的:

from pyspark.ml.feature import VectorAssembler
va = VectorAssembler().setInputCols(["int1", "int2", "int3"])
va.transform(fakeIntDF).show()+----+----+----+--------------------------------------------+
|int1|int2|int3|VectorAssembler_403ab93eacd5585ddd2d__output|
+----+----+----+--------------------------------------------+
| 1| 2| 3| [1.0,2.0,3.0]|
| 4| 5| 6| [4.0,5.0,6.0]|
| 7| 8| 9| [7.0,8.0,9.0]|
+----+----+----+--------------------------------------------+

16. 连续型特征(Continuous Feature)是指从正无穷大到负无穷大的连续型数值,有两个常用于处理连续型特征的转换器。首先,可以通过一个称为分桶(bucketing)的过程将连续特征转换为类别特征,也可以根据不同的要求来缩放和归一化特征。这些转换器仅用于Double类型,因此请确保已将任何其他数值类型转换为Double类型:

# in Python
contDF = spark.range(20).selectExpr("cast(id as double)")

实现分桶(Bucketing)的最直接方法是使用Bucketizer,这将把一个给定的连续型特征分解到指定的桶中,需要指定如何根据Double类型值的数组或列表创建桶,例如假设有一个代表一个人体重的列,并且希望根据这些信息预测一些值。在某些情况下,创建三个分别代表“超重”、“平均值”和“偏轻”的桶可能更简单。要设置桶就需要设置它的边界,例如在contDF数据集上设置5.0、10.0、250.0的拆分值就会失败,因为没有涵盖所有可能的输入范围。指定桶时,传入拆分的值必须满足三个要求:

(1)拆分数组中的最小值必须小于DataFrame中的最小值。

(2)拆分数组中的最大值必须大于DataFrame中的最大值。

(3)需要在拆分数组中指定至少三个值,这将创建两个桶

使用Scala.Double.NegativeInfinity代表最小值、Scala.Double.PositiveInfinity代表最大值可以覆盖所有可能的范围。在Python中,用float("-inf")、float("inf")。为了处理null或NaN值,必须指定handleInvalid参数,可以保留这些值(设置keep)、报错(设置error或null)、跳过(设置skip)那些行。下面是分桶的例子:

from pyspark.ml.feature import Bucketizer
bucketBorders = [-1.0, 5.0, 10.0, 250.0, 600.0]
bucketer = Bucketizer().setSplits(bucketBorders).setInputCol("id")
bucketer.transform(contDF).show()+----+---------------------------------------+
| id|Bucketizer_4cb1be19f4179cc2545d__output|
+----+---------------------------------------+
| 0.0| 0.0|
...
|10.0| 2.0|
|11.0| 2.0|
...

除了基于硬编码值进行分桶外,另一个选项是基于数据的百分比进行拆分,这是通过QuantileDiscretizer完成的,它将值放入用户指定的存储桶中,并由近似分位数的值确定拆分,例如数据中的90分位数表示90%的数据小于该值。通过使用setRelativeError 设置近似分位数计算的相对误差,可以控制桶的拆分方式。Spark这样做是通过允许指定数据上的桶数,进而据此相应地拆分数据。下面是一个示例:

from pyspark.ml.feature import QuantileDiscretizer
bucketer = QuantileDiscretizer().setNumBuckets(5).setInputCol("id")
fittedBucketer = bucketer.fit(contDF)
fittedBucketer.transform(contDF).show()+----+----------------------------------------+
| id|quantileDiscretizer_cd87d1a1fb8e__output|
+----+----------------------------------------+
| 0.0| 0.0|
...
| 6.0| 1.0|
| 7.0| 2.0|
...
|14.0| 3.0|
|15.0| 4.0|
...
+----+----------------------------------------+

刚才介绍的分桶技术是将数据分桶最常用的方法,但是当前Spark中也有许多其他的方法。从数据流的角度来看,所有这些过程都是相同的,即把连续型数据划分到桶中进而得到类别变量,区别在于取得这些桶的算法。MLlib也有更高级的技术例如局部敏感哈希(LSH)来完成近似分桶的任务。

17. 前面已经明白如何利用分桶技术来创建连续型变量的分组。另一个常见的任务是缩放(Scaling)和归一化(Normalization)连续型数据,虽然并不总是必要的,但这样做通常会带来更好的效果。当数据集包含基于不同单位度量的多个列时,就有必要这样做。例如假设有一个包含两列的DataFrame,一列是重量(以盎司为单位),一列是高度(以英尺为单位)。如果没有缩放或归一化数据,算法将对高度变化不太敏感,因为英尺的高度值比盎司的重量值小得多,这就是一个应该缩放数据的情况。

归一化的例子可能涉及转换数据,使得每个点的值都表示其与该列的平均值的距离,例如想知道某个体的身高与平均身高差了多少,许多算法会假设输入数据是归一化后的。根本目标是希望不同单位的数据具有相同的度量,以便可以很容易地以合理的方式比较。在MLlib中它总是基于Vector类型的列,MLlib将查看给定列中的所有行(Vector类型),然后将这些向量中的每个维度视为一种特殊的列,之后它将分别对每个维度应用缩放或归一化函数。一个简单的例子可以是一列中的下列向量:

1,2
3,4

当应用缩放(而非归一化)函数时,3和1将根据另外两个值进行调整,而2和4也将进行调整,这通常被称为逐分量比较(component-wise comparisons)。

StandardScaler将一组特征值归一化成平均值为0而标准偏差为1的一组新值。withStd标志表示将数据缩放到单位标准差,而withMean标志(默认为false)表示将使数据在缩放之前进行中心化(变量减去它的均值)。稀疏向量中心化非常耗时,因为一般会将它们转化为稠密向量,所以谨慎中心化数据。下面是一个使用StandardScaler的例子:

from pyspark.ml.feature import StandardScaler
sScaler = StandardScaler().setInputCol("features")
sScaler.fit(scaleDF).transform(scaleDF).show()+---+--------------+-----------------------------------------+
| id| features|MinMaxScaler_460cbafafbe6b9ab7c62__output|
+---+--------------+-----------------------------------------+
| 0|[1.0,0.1,-1.0]| [5.0,5.0,5.0]|
...
| 1|[3.0,10.1,3.0]| [10.0,10.0,10.0]|
+---+--------------+-----------------------------------------+

18. MinMaxScaler将向量中的值基于给定的最小值到最大值按比例缩放。如果指定的最小值为0且最大值为1,则所有值都将介于0和1之间:

from pyspark.ml.feature import MinMaxScaler
minMax = MinMaxScaler().setMin(5).setMax(10).setInputCol("features")
fittedminMax = minMax.fit(scaleDF)
fittedminMax.transform(scaleDF).show()+---+--------------+----------------------------------------------------------+
|id |features |MaxAbsScaler_402587e1d9b6f268b927__output |
+---+--------------+----------------------------------------------------------+
|0 |[1.0,0.1,-1.0]|[0.3333333333333333,0.009900990099009901,-0.3333333333333]|
...
|1 |[3.0,10.1,3.0]|[1.0,1.0,1.0] |
+---+--------------+----------------------------------------------------------+

最大绝对值缩放(MaxAbsScaler)将每个值除以该特征的最大绝对值来缩放数据。因此,所有的值最终都会在-1和1之间。该转换器不会平移数据也不会中心化数据:

from pyspark.ml.feature import MaxAbsScaler
maScaler = MaxAbsScaler().setInputCol("features")
fittedmaScaler = maScaler.fit(scaleDF)
fittedmaScaler.transform(scaleDF).show()+---+--------------+-----------------------------------------------+
| id| features|ElementwiseProduct_42b29ea5a55903e9fea6__output|
+---+--------------+-----------------------------------------------+
| 0|[1.0,0.1,-1.0]| [10.0,1.5,-20.0]|
...
| 1|[3.0,10.1,3.0]| [30.0,151.5,60.0]|
+---+--------------+-----------------------------------------------+

ElementwiseProduct允许用一个缩放向量对某向量中的每个值以不同的尺度进行缩放。例如,给定下面的向量和行"1,0.1,-1" ,输出将是"10,1.5,-20"。当然缩放向量的维度必须与要进行缩放的向量的维度匹配:

from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
scaleUpVec = Vectors.dense(10.0, 15.0, 20.0)
scalingUp = ElementwiseProduct()\.setScalingVec(scaleUpVec)\.setInputCol("features")
scalingUp.transform(scaleDF).show()+---+--------------+-----------------------------------------------+
| id| features|ElementwiseProduct_42b29ea5a55903e9fea6__output|
+---+--------------+-----------------------------------------------+
| 0|[1.0,0.1,-1.0]| [10.0,1.5,-20.0]|
...
| 1|[3.0,10.1,3.0]| [30.0,151.5,60.0]|
+---+--------------+-----------------------------------------------+

Normalizer允许使用某个幂范数来缩放多维向量,Normalizer的作用范围是每一行,使每一个行向量的范数变换为一个单位范数。它通过参数“p”设置是几范数。例如可以使用p =1表示曼哈顿范数(或曼哈顿距离),p = 2表示欧几里德范数等。如果是一阶范数,将每一行的规整为一阶范数为1的向量,一阶范数即所有值绝对值之和。例如下面例子中第一行[1.0,0.1,-1.0]的一阶范数(曼哈顿范数)是1.0+0.1+1.0=2.1,所以会得到[1.0/2.1,0.1/2.1,-1.0/2.1]=[0.47619047619047...]:

from pyspark.ml.feature import Normalizer
manhattanDistance = Normalizer().setP(1).setInputCol("features")
manhattanDistance.transform(scaleDF).show()+---+--------------+-------------------------------+
| id| features|normalizer_1bf2cd17ed33__output|
+---+--------------+-------------------------------+
| 0|[1.0,0.1,-1.0]| [0.47619047619047...|
| 1| [2.0,1.1,1.0]| [0.48780487804878...|
| 0|[1.0,0.1,-1.0]| [0.47619047619047...|
| 1| [2.0,1.1,1.0]| [0.48780487804878...|
| 1|[3.0,10.1,3.0]| [0.18633540372670...|
+---+--------------+-------------------------------+

19. 处理类型特征(Categorical Feature)最常见的任务是索引(Indexing)。索引将列中的一个类别变量转换为数值,进而可以嵌入到机器学习算法,通常建议在预处理时重新索引每个类别变量,这是为了保证一致性。因为从长远来看,这有助于模型的维护,因为代码可能会随着时间的推移而发生变化。

实现索引的最简单方法是通过StringIndexer,它将字符串映射到不同的数字id。Spark的StringIndexer还会创建附加到DataFrame的元数据,用来指定哪些输入字符串对应于哪些输出数值,这样以后就可以从索引的数值反向推出原始的类别输入

from pyspark.ml.feature import StringIndexer
lblIndxr = StringIndexer().setInputCol("lab").setOutputCol("labelInd")
idxRes = lblIndxr.fit(simpleDF).transform(simpleDF)
idxRes.show()+-----+----+------+------------------+--------+
|color| lab|value1| value2|labelInd|
+-----+----+------+------------------+--------+
|green|good| 1|14.386294994851129| 1.0|
...
| red| bad| 2|14.386294994851129| 0.0|
+-----+----+------+------------------+--------+

如上所示,可以将StringIndexer应用于非字符串的列,这种情况下这些非字符串列将在被索引之前转换为字符串

要记住StringIndexer是一个必须符合输入数据的估计器,它必须看到所有输入之后进而确定输入到ID的映射关系。如果在输入"a"、"b"和"c"上训练StringIndexer,然后再输入"d" 的话,则默认情况下会返回错误。另一个可能是,如果输入值不是训练过程中看到的值,则跳过该值对应的行,对于该示例输入"d"值将导致该行完全跳过。可以在训练索引器或流水线之前或之后设置此选项。

20. 当检查机器学习的结果时,用户希望将索引的数值类型映射回原始的类别值。由于MLlib分类模型使用索引值进行预测,因此这种转换帮助将模型预测值(索引值)转换回原始代表的类别,可以使用IndexToString()来执行此处理,注意到不必指定输出的是String类型,因为MLlib会维护此元数据信息,可以选择指定转换哪列:

from pyspark.ml.feature import IndexToString
labelReverse = IndexToString().setInputCol("labelInd")
labelReverse.transform(idxRes).show()+-----+----+------+------------------+--------+--------------------------------+
|color| lab|value1| value2|labelInd|IndexToString_415...2a0d__output|
+-----+----+------+------------------+--------+--------------------------------+
|green|good| 1|14.386294994851129| 1.0| good|
...
| red| bad| 2|14.386294994851129| 0.0| bad|
+-----+----+------+------------------+--------+--------------------------------+

VectorIndexer是处理在数据集向量内部具有类别变量的有用工具。此工具将自动查找输入向量内部的类别特征,并将它们转换为具有从0开始的类别索引。例如在下面示例的DataFrame中,向量中的第一列是具有两个不同类别的类别变量,而其余变量是连续的,通过在VectorIndexer中设置maxCategories为2,可以指示Spark使用两个或更少的值代表向量中的某一列,并将其转换为一个类别变量;当知道最多有多少个类别时,可以指定它,并且它将自动对类别值进行相应的索引,Spark会根据这个参数改变数据值,所以有大量重复值的连续变量就会被意外地转换为类别变量:

from pyspark.ml.feature import VectorIndexer
from pyspark.ml.linalg import Vectors
idxIn = spark.createDataFrame([(Vectors.dense(1, 2, 3),1),(Vectors.dense(2, 5, 6),2),(Vectors.dense(1, 8, 9),3)
]).toDF("features", "label")
indxr = VectorIndexer()\.setInputCol("features")\.setOutputCol("idxed")\.setMaxCategories(2)
indxr.fit(idxIn).transform(idxIn).show()+-------------+-----+-------------+
| features|label| idxed|
+-------------+-----+-------------+
|[1.0,2.0,3.0]| 1|[0.0,2.0,3.0]|
|[2.0,5.0,6.0]| 2|[1.0,5.0,6.0]|
|[1.0,8.0,9.0]| 3|[0.0,8.0,9.0]|
+-------------+-----+-------------+

21. 索引类别变量只是整个工作的一部分,独热编码(One-Hot Encoding)是在类别变量索引之后执行的非常常见的数据转换。这是因为索引并不总是能够以(对下游处理模型来说)正确的方式表示类别变量。例如当对"color"列进行索引时,会注意到某些颜色的值(或索引号)比其他的高(例如蓝色是1,绿色是2)。

这是不正确的,因为它给出的这种数学表达(即机器学习算法的输入)似乎暗示了绿色>蓝色,这在颜色类别的情况下是没有意义的。为了避免这种情况出现,可以使用OneHotEncoder,它将不同的类别值转换为向量中的一个布尔值元素(1或0 。当对颜色类别进行编码时,我们可以看到这些颜色类别不再被排序,从而使下游模型(如线性模型) 更容易处理:

from pyspark.ml.feature import OneHotEncoder, StringIndexer
lblIndxr = StringIndexer().setInputCol("color").setOutputCol("colorInd")
colorLab = lblIndxr.fit(simpleDF).transform(simpleDF.select("color"))
ohe = OneHotEncoder().setInputCol("colorInd")
ohe.transform(colorLab).show()+-----+--------+------------------------------------------+
|color|colorInd|OneHotEncoder_46b5ad1ef147bb355612__output|
+-----+--------+------------------------------------------+
|green| 1.0| (2,[1],[1.0])|
| blue| 2.0| (2,[],[])|
...
| red| 0.0| (2,[0],[1.0])|
| red| 0.0| (2,[0],[1.0])|
+-----+--------+------------------------------------------+

22. 文本类型数据总是棘手的输入,因为它往往需要大量的操作才能映射到一个机器学习模型可用的格式。通常会看到两种文本:自由格式文本和字符串类别变量。分词是将任意格式的文本转变成一个“符号”(token)列表或者一个单词列表的过程。要做到这一点最简单的方法是使用Tokenizer类,该转换器把由空格分开的包含若干单词的字符串转换成单词数组。例如可能想将Description属性转换成一个token列表:

from pyspark.ml.feature import Tokenizer
tkn = Tokenizer().setInputCol("Description").setOutputCol("DescOut")
tokenized = tkn.transform(sales.select("Description"))
tokenized.show(20, False)+-----------------------------------+------------------------------------------+
|Description DescOut |
+-----------------------------------+------------------------------------------+
|RABBIT NIGHT LIGHT |[rabbit, night, light] |
|DOUGHNUT LIP GLOSS |[doughnut, lip, gloss] |
...
|AIRLINE BAG VINTAGE WORLD CHAMPION |[airline, bag, vintage, world, champion] |
|AIRLINE BAG VINTAGE JET SET BROWN |[airline, bag, vintage, jet, set, brown] |
+-----------------------------------+------------------------------------------+

不仅可以基于空格符创建Tokenizer,也可以用RegexTokenizer指定的正则表达式来分词。正则表达式的格式应符合Java正则表达式(RegEx)语法:

from pyspark.ml.feature import RegexTokenizer
rt = RegexTokenizer()\.setInputCol("Description")\.setOutputCol("DescOut")\.setPattern(" ")\.setToLowercase(True)
rt.transform(sales.select("Description")).show(20, False)+-----------------------------------+------------------------------------------+
|Description DescOut |
+-----------------------------------+------------------------------------------+
|RABBIT NIGHT LIGHT |[rabbit, night, light] |
|DOUGHNUT LIP GLOSS |[doughnut, lip, gloss] |
...
|AIRLINE BAG VINTAGE WORLD CHAMPION |[airline, bag, vintage, world, champion] |
|AIRLINE BAG VINTAGE JET SET BROWN |[airline, bag, vintage, jet, set, brown] |
+-----------------------------------+------------------------------------------+

23. 分词后的一个常见任务是过滤停用词(stop word,这些常用词在许多分析中没有什么意义,因此应被删除。英语中经常出现的停用词包括“the”,“and”,“but”。Spark通过调用以下方法查看默认的停用词列表,如有需要可以不区分大小写:

from pyspark.ml.feature import StopWordsRemover
englishStopWords = StopWordsRemover.loadDefaultStopWords("english")
stops = StopWordsRemover()\.setStopWords(englishStopWords)\.setInputCol("DescOut")
stops.transform(tokenized).show()

以下输出说明了其运行原理:

+--------------------+--------------------+------------------------------------+
| Description| DescOut|StopWordsRemover_4ab18...6ed__output|
+--------------------+--------------------+------------------------------------+
...
|SET OF 4 KNICK KN...|[set, of, 4, knic...| [set, 4, knick, k...|
...
+--------------------+--------------------+------------------------------------+

注意在输出列中单词of被删除了,因为它是一个及其常见的单词,与任何下游操作都不相关,只会给数据集增加噪音。

字符串分词和过滤停用词之后,会得到可作为特征的一个词集合,观察这些单词的组合(通常是关联词)很有趣。单词组合在技术上称为n-gram,即长度为n的单词序列,长度为1的n-gram称为unigram,长度为2的单词序列称为bigram,而长度为3的单词序列称为trigram(长度再大的就叫做four-gram,five-gram等)。创建n-gram要考虑单词的次序,将一个含有三个单词的句子转换为bigram将得到两个bigram。创建n-gram的目的是为了更好地捕获句子结构和更多的信息,而不是逐个单词查看。来创建一些n-gram来说明这个概念,例如"Big Data Processing Made Simple"的bigram包括:

(1)“Big Data”;

(2) “Data Processing”;

(3) “Processing Made”;

(4) “Made Simple”。

它的trigram是:

(1) “Big Data Processing”;

(2) “Data Processing Made”;

(3) “Processing Made Simple”。

使用n-gram,可以查看经常共同出现的单词序列,并将它们用作机器学习算法的输入,这比逐个单词处理可以创建更好的特征(以空格分隔):

from pyspark.ml.feature import NGram
unigram = NGram().setInputCol("DescOut").setN(1)
bigram = NGram().setInputCol("DescOut").setN(2)
unigram.transform(tokenized.select("DescOut")).show(False)
bigram.transform(tokenized.select("DescOut")).show(False)

unigram的结果为:

+-----------------------------------------+-------------------------------------
DescOut |ngram_104c4da6a01b__output ...
+-----------------------------------------+-------------------------------------
|[rabbit, night, light] |[rabbit, night, light] ...
|[doughnut, lip, gloss] |[doughnut, lip, gloss] ...
...
|[airline, bag, vintage, world, champion] |[airline, bag, vintage, world, cha...
|[airline, bag, vintage, jet, set, brown] |[airline, bag, vintage, jet, set, ...
+-----------------------------------------+-------------------------------------

bigram的结果是:

+------------------------------------------+------------------------------------
DescOut |ngram_6e68fb3a642a__output ...
+------------------------------------------+------------------------------------
|[rabbit, night, light] |[rabbit night, night light] ...
|[doughnut, lip, gloss] |[doughnut lip, lip gloss] ...
...
|[airline, bag, vintage, world, champion] |[airline bag, bag vintage, vintag...
|[airline, bag, vintage, jet, set, brown] |[airline bag, bag vintage, vintag...
+------------------------------------------+------------------------------------

24. 一旦有了词特征,就可以开始对单词和单词组合进行计数,以便在模型中使用。最简单的方法就是统计文档中每个单词的二值计数,即存在该单词还是不存在该单词,这种简单方法可以规范化文档大小和出现次数,并获得可以用于文档分类的数值特征。此外,还可以使用CountVectorizer来对单词进行计数,或使用TF–IDF转换来对单词重新赋值权重。一个CountVectorizer对分词之后的数据进行操作,并执行以下两项操作:

(1)在fit(拟合)过程中,在全部文档中查找一个单词集合,然后计算在所有文档中这些单词的出现次数。

(2)然后在转换过程中计算DataFrame列每行中给定单词的出现次数,并输出包含在该行中的单词的向量。

在概念上,此转换器将每行看作文档(document),把每个单词看作项(term),把所有项的集合看作词库(vocabulary)。这些都是可调的参数,可以设置最小项频率参数(minTF)来决定词库中是否包含某项(这样可以有效地删除低频单词);可以设置词库中的项需要至少出现在多少个文档中(用于从词库中删除低频单词的另一种方法),最后设置总的最大单词量(vocabSize)。最后默认情况下CountVectorizer将输出文档中各项的计数,若要只返回word是否存在于文档中,可以使用setBinary(true)。下面是使用CountVectorizer的示例:

from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer()\.setInputCol("DescOut")\.setOutputCol("countVec")\.setVocabSize(500)\.setMinTF(1)\.setMinDF(2)
fittedCV = cv.fit(tokenized)
fittedCV.transform(tokenized).show(False)

虽然输出看起来有点复杂,但实际上它只是一个稀疏向量,包含总的词汇量、词库中某单词的索引、以及该单词的计数:

+---------------------------------+--------------------------------------------+
DescOut |countVec |
+---------------------------------+--------------------------------------------+
|[rabbit,night,light] |(500,[150,185,212],[1.0,1.0,1.0]) |
|[doughnut,lip,gloss] |(500,[462,463,492],[1.0,1.0,1.0]) |
...
|[airline,bag,vintage,world,...|(500,[2,6,328],[1.0,1.0,1.0]) |
|[airline,bag,vintage,jet,s...|(500,[0,2,6,328,405],[1.0,1.0,1.0,1.0,1.0]) |
+---------------------------------+--------------------------------------------+

25. 另一种将文本转换为数值表示的方法是使用词频-逆文档频率(TF–IDF)。最简单的情况是,TF–ID度量一个单词在每个文档中出现的频率,并根据该单词出现过的文档数进行加权,结果是在较少文档中出现的单词比在许多文档中出现的单词权重更大。在实践中,像"the"这样的词的权重非常低,因为它在所有文档中都经常出现,而一个更专业的词如"streaming"将出现在较少的文档中,从而有更高的权重值。在某种程度上,TF–IDF帮助查找有类似主题的文档。来看看一个例子,首先将检查数据中包含"red"一词的一些文档:

tfIdfIn = tokenized\.where("array_contains(DescOut, 'red')")\.select("DescOut")\.limit(10)
tfIdfIn.show(10, False)+---------------------------------------+
DescOut |
+---------------------------------------+
|[gingham,heart,,doorstop,red] |
…|[red,retrospot,oven,glove] |
|[red,retrospot,plate] |
+---------------------------------------+

可以在这些文档中看到一些重复出现的单词,但这些词至少提供了一个粗略的主题表示形式。现在输入到TF–IDF中,为此将对每个单词进行哈希运算并将其转换为数值表示形式,然后根据逆文档频率对词库中的每个单词进行加权。哈希是与CountVectorizer类似的过程,但是不可逆,也就是说从输出索引无法反过来获得输入词(多个单词可能映射到相同的输出索引):

from pyspark.ml.feature import HashingTF, IDF
tf = HashingTF()\.setInputCol("DescOut")\.setOutputCol("TFOut")\.setNumFeatures(10000)
idf = IDF()\.setInputCol("TFOut")\.setOutputCol("IDFOut")\.setMinDocFreq(2)idf.fit(tf.transform(tfIdfIn)).transform(tf.transform(tfIdfIn)).show(10, False)

因为输出太大无法在此展示。要注意"red"已经对应到了一个数值,并且每个文档中都要包含该值。而且,代表“red”的项的权重非常低,因为它出现在每个文档中。输出格式是稀疏的Vector,以如下形式输入到机器学习模型中:

(10000,[2591,4291,4456],[1.0116009116784799,0.0,0.0])

此向量使用三个不同的值表示:总的词汇量、文档中出现的每个单词的哈希值、以及这些单词的权重。这类似于CountVectorizer的输出。

Word2Vec是一个基于深度学习的工具,它用于计算一组单词的向量表示形式使用它的目标是希望在某个向量空间中相似的词彼此接近,这样就可以对单词本身进行归纳。该模型易于训练和使用,在一些自然语言处理应用中被证明很有效,包括实体识别、消除歧义、解析、标注和机器翻译。

Word2Vec是基于语义来捕获单词之间的关系的。例如,如果v~king,v~queen,v~man,和v~women代表这四个词的向量,那么通常会得到一个这样的表示:v ~ king − v ~ man + v ~ women ~=v ~ queen。为此,Word2Vec使用一种名为“skipgrams”的技术将单词组成的句子转换为向量表示(可选为特定大小)。它通过构建一个词汇表来做到这一点,然后对于每个句子它删除一个token,并训练模型来预测“n-gram”特征中缺少的token。Word2Vec在连续的、无格式约束的(以token形式表示的)文本中最有效。下面是文档中的一个简单示例:

from pyspark.ml.feature import Word2Vec
# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([("Hi I heard about Spark".split(" "), ),("I wish Java could use case classes".split(" "), ),("Logistic regression models are neat".split(" "), )
], ["text"])
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text",outputCol="result")
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)
for row in result.collect():text, vector = rowprint("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

输出结果为:

Text:[Hi,I,heard,about,Spark] =>
Vector:[-0.008142343163490296,0.02051363289356232,0.03255096450448036]
Text:[I,wish,Java,could,use,case,classes] =>
Vector:[0.043090314205203734,0.035048123182994974,0.023512658663094044]
Text:[Logistic,regression,models,are,neat] =>
Vector:[0.038572299480438235,-0.03250147425569594,-0.01552378609776497]

26. 尽管几乎每个ML的转换器都在某种程度上操纵特征空间,但下面的算法和工具是自动扩展输入特征向量或将它们降维的常用方法。主成分分析(PCA是一种数学方法,用于找到数据中最重要的成分(主成分) 。它通过创建新的特征集(“成分”)来更改数据的特征表示形式,每个新特征都是原始特征的组合。PCA的作用在于它可以创建一组更小的更有意义的特征集合输入到模型中,从而减少运行时间成本

如果有一个特别大的输入数据集,并且希望减少特征数,则需要使用PCA。这种情况经常出现在文本分析中,其整个特征空间很大,许多特征都无关紧要。使用PCA可以找到最重要的特征组合,只把它们输入到机器学习模型中。PCA使用参数k指定要创建的输出特征的数量,这通常应该比输入向量的尺寸小得多。选择正确的k值很难,需要查阅其他资料来确定。下面来训练k=2的PCA:

from pyspark.ml.feature import PCA
pca = PCA().setInputCol("features").setK(2)
pca.fit(scaleDF).transform(scaleDF).show(20, False)+---+--------------+------------------------------------------+
|id |features |pca_7c5c4aa7674e__output |
+---+--------------+------------------------------------------+
|0 |[1.0,0.1,-1.0]|[0.0713719499248418,-0.4526654888147822] |
...
|1 |[3.0,10.1,3.0]|[-10.872398139848944,0.030962697060150646]|
+---+--------------+------------------------------------------+

多项式扩展(Polynomial Expansion)基于所有输入列生成交互变量。通过多项式扩展,可以指定各种程度的交互作用(Interaction)。例如对于一个二阶多项式,Spark把特征向量中的每个值乘以所有其他值,然后将结果存储为特征,比如如果有两个输入特征使用二阶多项式(2x2),将得到4个输出特征;如果有三个输入特征,将得到9个输出特征(3x3);如果使用三阶多项式将得到27个输出特性(3x3x3)等等。

当希望看到特定特征之间的交互作用,但不确定要考虑哪些特征之间的交互作用时,此转换非常有用。但是多项式扩展会扩大特征空间,从而导致高计算成本和过拟合效果,所以慎用它,特别是在高维的情况下。这里有一个二阶多项式的例子:

from pyspark.ml.feature import PolynomialExpansion
pe = PolynomialExpansion().setInputCol("features").setDegree(2)
pe.transform(scaleDF).show()+---+--------------+-----------------------------------------------------------+
|id |features |poly_9b2e603812cb__output |
+---+--------------+-----------------------------------------------------------+
|0 |[1.0,0.1,-1.0]|[1.0,1.0,0.1,0.1,0.010000000000000002,-1.0,-1.0,-0.1,1.0] |
...
|1 |[3.0,10.1,3.0]|[3.0,9.0,10.1,30.299999999999997,102.00999999999999,3.0... |
+---+--------------+-----------------------------------------------------------+

27. 通常会拥有大量可选的特征,并希望选择一个较小的子集用于训练。例如许多特征之间可能是紧密相关的(所以只需选用一个),或者使用太多的特征可能导致过拟合,此过程称为特征选择。一旦训练了一个模型,有许多方法来评估特征的重要性,但另一种选择是做一些粗略的预过滤。Spark提供一些简单的工具来做特征选择,例如ChiSqSelector。

ChiSqSelector利用统计测试来确定与试图预测的标签无关的特征,并删除不相关的特征它经常与类别数据一起使用,以减少输入到模型中的特征数量,也可以降低文本数据的维数(以频率或计数的形式)。由于这种方法是基于卡方测试(Chi-Square test)的,有几种不同的方法可以选择最佳特征,这些方法是:

(1)numTopFea tures,它基于p-value排序;

(2)percentile,它采用输入特征的比例(而不是仅top N个特征);

(3)fpr,它设置截断p-value。

这里使用前面创建过的CountVectorizer输出来演示这一点:

from pyspark.ml.feature import ChiSqSelector, Tokenizer
tkn = Tokenizer().setInputCol("Description").setOutputCol("DescOut")
tokenized = tkn\.transform(sales.select("Description", "CustomerId"))\.where("CustomerId IS NOT NULL")
prechi = fittedCV.transform(tokenized)\.where("CustomerId IS NOT NULL")
chisq = ChiSqSelector()\.setFeaturesCol("countVec")\.setLabelCol("CustomerId")\.setNumTopFeatures(2)
chisq.fit(prechi).transform(prechi)\.drop("customerId", "Description", "DescOut").show()

28. 还有一些关于转换器和估计器的高级用法,此处讨论两个最常见的:持久化转换器、编写自定义转换器。

(1)一旦使用了Estimator来配置Transformer,将其写入磁盘并在需要时加载它(如在另一个SparkSession中使用)是很有用的,上面接触持久化整条pipeline时看到了这一点。要单独保存转换器,需要使用内置转换器(或标准转换器)上的write方法并指定位置:

# in Python
fittedPCA = pca.fit(scaleDF)
fittedPCA.write().overwrite().save("/tmp/fittedPCA")

之后就可以加载该转换器:

from pyspark.ml.feature import PCAModel
loadedPCA = PCAModel.load("/tmp/fittedPCA")
loadedPCA.transform(scaleDF).show()

(2)如果要将自己的某些业务逻辑编码并放入ML Pipeline中,或传递给超参数搜索等,则编写自定义转换器可能很有用处。通常应尽量使用内置模块(例如SQLTransformer),因为它们经过了优化可以高效运行。但有时没有合适的内置模块,就需要自定义一个转换器。下面创建一个简单的分词器(tokenizer)来演示:

//in Scala
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable,Identifiable}
import org.apache.spark.sql.types.{ArrayType, StringType, DataType}
import org.apache.spark.ml.param.{IntParam, ParamValidators}class MyTokenizer(override val uid: String)extends UnaryTransformer[String, Seq[String],MyTokenizer] with DefaultParamsWritable {def this() = this(Identifiable.randomUID("myTokenizer"))val maxWords: IntParam = new IntParam(this, "maxWords","The max number of words to return.",ParamValidators.gtEq(0))def setMaxWords(value: Int): this.type = set(maxWords, value)def getMaxWords: Integer = $(maxWords)override protected def createTransformFunc: String => Seq[String] = (inputString: String) => {inputString.split("\\s").take($(maxWords))}override protected def validateInputType(inputType: DataType): Unit = {require(inputType == StringType, s"Bad input type: $inputType. Requires String.")}override protected def outputDataType: DataType = new ArrayType(StringType,true)
}// 这使得可以通过该对象读回数据
object MyTokenizer extends DefaultParamsReadable[MyTokenizer]val myT = new MyTokenizer().setInputCol("someCol").setMaxWords(2)
myT.transform(Seq("hello world. This text won't show.").toDF("someCol")).show()

三、分类

29. 分类(Classification)是在给定一组输入特征的情况下预测标签、类别、类或离散变量的任务。与其他机器学习任务例如回归相比,关键区别在于分类任务的输出标签是一组可能值的有限集合(例如三个类别)。在现实世界中还有几个可以应用分类的应用场景:

(1)预测信贷风险。融资公司在向公司或个人提供贷款之前,可能会查看很多信息。最后决定是否提供贷款,是否提供贷款是一个二进制分类问题。

(2)新闻分类。分类算法还可以被训练来预测新闻文章的主题(体育、政治、商业等) 。

(3)对用户行为分类。根据从传感器(如手机的重力感应器或智能手表)采集的数据,可以预测用户的行为活动,而输出将是一组有限类别的集合(如行走、睡眠、站立或运动)。

在继续介绍分类计数之前,先回顾一下几种不同的分类类型:

(1)最简单的分类类型是二元分类(Binary Classification),只会预测出两个标签。例如欺诈分析,某一交易归类为欺诈行为或是非欺诈行为;或者垃圾邮件分类,一个给定的电子邮件归类为垃圾邮件或是非垃圾邮件。

(2)输出多于两个标签的分类任务是多分类(Multiclass Classification),预测结果是从多于两个的候选标签集合中选出来的一个标签。例如,Facebook从一张照片中预测是哪个人,气象学家预测天气(雨天,晴天,多云等)。注意始终有一组有限的候选类别来预测,而不是无限制的类别候选,有时候多分类也称为多项式分类(multinomial classification)。

(3)最后一种分类任务是多标签分类(Multilabel Classification),一个给定的输入可对应多个标签。例如根据书的内容预测书的主题,虽然这可以是一个多分类问题,但更适合于多标签分类,因为一本书可以涉及多个主题,划分到多个分类中。多标签分类的另一个例子是识别图像中的多个物体,预测输出物体的个数不一定是固定的,不同图像可以检测出不同数量的物体。

30. Spark有一些可以直接用的二分类和多分类模型

(1)Logistic回归(Logistic regression);

(2)决策树(Decision trees);

(3)随机森林(Random forests);

(4)梯度提升决策树(Gradient-boosted trees)。

Spark本身不支持多标签分类,为了训练出多标签分类模型,用户必须训练每一个标签模型然后人工地将它们结合起来。当人工创建好模型后,可以使用评估多种模型效果的内置工具。选择模型时,模型的可扩展性是一个重要的考虑因素,总的来说Spark支持训练大规模机器学习模型(如果数据量不算巨大不需要分布式集群,基于单节点的训练还有其他很多优秀的工具)。下图简单列出了各个模型的可扩展性能,据此可以找到适合特定任务的最佳模型(如果可扩展性是核心考虑因素的话):

可以看到,几乎所有这些模型都可以适应规模庞大的输入数据集,无限制训练实例的原因是因为,这些训练采用了随机梯度下降和L-BFGS等方法进行训练,这些方法专门针对大数据集进行了优化,并避免了可能存在的限制训练实例数量的若干问题。下面开始先加载数据再来详细看看这些分类模型:

bInput = spark.read.format("parquet").load("/data/binary-classification")\.selectExpr("features", "cast(label as double) as label")

31. 逻辑回归(Logistic Regression)是最常见的分类方法之一。它是一种线性模型,为输入的每个特征赋以权重之后将它们组合在一起,从而获得该输入属于特定类的概率。这些权重很有用,因为它们很好地表示了特征重要性。如果某特征的权重很大,则说明该特征的变化对结果有显著影响(假定执行了标准化),而较小的权重意味着该特征不是那么重要。

模型的超参数决定了模型本身的基本结构配置。逻辑回归包含以下超参数:

(1)family。可以设置为“multinomial”(两个或更多个不同的标签,对应多分类)或“binary”(仅两个不同的标签,对应二分类)。

(2)elasticNetParam。从0到1的浮点值,该参数依照弹性网络正则化(elastic net regularization)的方法将L1正则化和L2正则化混合(即两者的线性组合)。对L1或L2的选择取决于特定用例,其指导原则如下:L1正则化(值1)将在模型中产生稀疏性,因为(对输出的影响不大的)某些特征权重将变为零,因此它可以作为一个简单的特征选择方法。而L2正则化(值0)不会造成稀疏,因为特定特征的相应权重只会趋于零而不会等于零。弹性网络给出了两者之间最好的结合,即可以选择一个介于0和1之间的值,以指定L1和L2正则化的混合。在大多数情况下,应该通过多次测试来调整这个值。

(3)fitIntercept。可以是true或false,此超参数决定是否适应截距。通常情况下,如果没有对训练数据执行标准化,则需要添加截距。

(4)regParam。大于等于0的值,确定在目标函数中正则化项的权重,它的选择和数据集的噪音情况和数据维度有关,最好尝试多个值(如0、0.01、0.1、1)。

(5)standardization。可以是true或false,设置它决定在将输入数据传递到模型之前是否要对其进行标准化。

训练参数用于指定我们如何执行训练,下面是逻辑回归的训练参数:

(1)maxIter。迭代次数,更改此参数可能不会对结果造成很大的影响,所以它不应该是要调整的首个参数。默认值是100。

(2)tol。此值指定一个用于停止迭代的阈值,达到该阈值说明模型已经优化的足够好了。指定该参数后,算法可能在达到maxIter指定的次数之前停止迭代,默认值为1.0E-6。这也不应该是要调整的第一个参数。

(3)weightCol。权重列的名称,用于赋予某些行更大的权重。如果有衡量每个训练样本重要性的方法,并对这些样本赋予了不同的训练权重值,这就是个很有用的工具。例如在10000个样例中知道哪些样本的标签比其他样本的标签更精确,就可以赋予那些更有用的样本以更大的权值。

下面这些参数指定模型如何实际进行预测而又不影响训练:

(1)threshold。一个0~1的Double值。此参数是预测时的概率阈值,可以根据需要调整此参数以平衡误报(false positive)和漏报(false negative)。例如,如果误报的成本高昂,可能希望使该预测阈值非常高。

(2)thresholds。该参数允许在进行多分类的时候指定每个类的阈值数组,它和之前的threshold类似。

下面是一个使用LogisticRegression(逻辑回归)模型的简单示例,没有指定任何参数而是使用默认值,并且数据符合正确的列命名。在实际当中,一般也不需要设置许多参数:

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression()
print lr.explainParams() #查看所有的参数
lrModel = lr.fit(bInput)

一旦模型被训练好了,就可以观察系数和截距项来获取有关模型的信息。系数对应于各特征的权重(每个特征权重乘以各特征来计算预测值),而截距项是斜线截距的值(如果在指定模型时选择了适当的截距参数fitIntercept)。查看系数有助于检查构建的模型,也有助于理解各特征如何影响预测:

# in Python
print lrModel.coefficients
print lrModel.intercept

对于多项式分类模型(上面是针对二分类模型的),lrModel.coefficientMatrix和lrModel.interceptVector可以用来得到系数和截距值,它们会返回Matrix类型和Vector类型的值。

逻辑回归提供了一个模型摘要(Model Summary),给出了最终训练模型的相关信息,它类似于在许多R语言机器学习包中看到的摘要。利用二分类摘要,可以得到关于模型本身的各种信息,包括ROC曲线下的面积、f值、准确率、召回率、ROC曲线。要注意,对于曲线下面积不考虑实例权重,因此如果想了解考虑权重的情况,则必须手动处理。可以使用以下API 查看摘要:

# in Python
summary = lrModel.summary
print summary.areaUnderROC
summary.roc.show()
summary.pr.show()

模型到达最终结果状态的速度会显示在目标历史(objective history)中,可以查看模型摘要的目标历史记录来获得:

summary.objectiveHistory

它是一个Double类型的数组,包含了每次训练迭代时模型到底表现如何,此信息有助于帮助用户了解是否已经进行足够次数的迭代训练或是否需要调整其他参数。

32. 决策树(Decision Tree)是一种更友好和更易于理解的分类方法,因为它类似于人类经常使用的简单决策模型。例如预测某人是否会买冰激凌,一个很好的特征可能是这个人是否喜欢冰淇淋,在伪代码中如果person.likes(“ice_cream”)返回true,他们会买冰淇淋,否则不会。决策树模型是基于所有输入构建一棵树形结构,在预测时通过判断各种可能的分支来给出预测结果,这使它被常常做为一个最先被试用的模型,因为它易于推理,易于检查,并且对数据的结构只需要很少的假设。简而言之,它并不是试图训练系数来拟合函数,而是简单地创造了一棵树来进行预测。该模型还支持多分类,并输出各种预测结果和它们的概率。

虽然决策树模型简单,但它有一些缺点,它可能会非常快地就会出现过拟合的情况,意思是在无约束的条件下决策树会基于每个训练样例,从根节点开始创建一条判断路径,这意味着它会对模型训练集中的所有信息进行编码,这很糟糕,因为这样模型就不能泛化到新的数据(可能会得到很差的预测准确度)。当然也有一些方法通过限制分支结构(例如限制高度)来避免过拟合的问题。

有许多不同的方法来配置和训练决策树,下面是Spark实现支持的超参数:

(1)maxDepth。因为正在训练生成一棵树结构,所以指定最大深度以避免过拟合数据集是很有帮助的。默认值为5。

(2)maxBins。在决策树中,连续特征(continuous features)被转换为类别特征(categorical features),maxBins确定应基于连续特征创建多少个槽(bin,相当于类别特征个数),更多的槽提供更细的粒度级别。该值必须大于等于2,并且也需要大于等于数据集中任何类别特征中的类别数。默认值为32。

(3)impurity。建立一个“树”时,在模型应该分支时需要进行配置。不纯度(impurity)表示是否应该在某叶子结点拆分的度量(信息增益)。该参数可以设置为"entropy" 或"gini" (默认值),这是两个常用的不纯度度量。

(4)minInfoGain 。此参数确定可用于分割的最小信息增益。较大的值可以防止过拟合,这通常需要通过测试决策树模型不同变体来确定。默认值是0。

(5)minInstancePerNode。此参数确定需要在一个节点结束训练的实例最小数目。可以把这看成是控制最大深度的另一种方式,可以通过限制深度来防止过拟合,或者可以指定在一个叶子节点上的最少训练样本来防止过拟合。如果它不满足会修剪(prune)树,直到满足要求。较大的值可以防止过拟合。默认值为1,但可以是大于1的任何数值。

还有一些针对如何执行训练的训练参数可以配置,checkpointInterval为设置检查点(checkpoint)的间隔,checkpoint是一种在训练过程中保存模型的方法,此方法可以保证当集群节点因某种原因崩溃时,不影响整个训练过程,将该值设置为10表示模型每10次迭代都会保存checkpoint,将此设置为-1以关闭checkpoint,需要将此参数与checkpointDir(检查点目录)和useNodeIdCache = true一起设置。

决策树只有一个预测参数:thresholds,它的定义和逻辑回归的同名参数相同。下面是一个使用决策树分类器的示例:

from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier()
print dt.explainParams()
dtModel = dt.fit(bInput)

33. 随机森林和梯度提升树是决策树的扩展,其实不必在所有数据上只训练一棵树,而是在不同的数据子集上训练多个树。这样做的思路是,多棵决策树成为某一特定领域的专家,而其他决策树则可以成为其他特定领域的专家。通过结合这些不同的专家,就可以达到超过任何单独方法的“群众智慧”。此外,结合这些方法使用有助于防止过度拟合。

随机森林(Random Forest)和梯度提升树(Gradient-Boosted Trees)是组合决策树的两种截然不同的方法。在随机森林中只训练大量的树,然后平均它们的结果做出预测;而梯度提升树对每棵树进行加权预测(因为一些树对某些类别的预测能力比其他树更强)。它们的参数大致相同,但是梯度提升树目前仅支持二分类任务。

随机森林和梯度提升树具有与决策树模型相同的模型超参数。此外它们各自还添加了几个自己的超参数:

(1)仅适合随机森林的参数有numTrees,即用于训练的树的总数;featureSubsetStrategy确定拆分时应考虑多少特征,它可以是"auto"、"all"、"sqrt"、"log2"或数字"n"。当输入为"n"时,模型将在训练过程中使用n个特征数,当n在范围1~特征数量之间时,模型将在训练期间使用n个特征。这里没有一个适合所有情况的解决方案,所以在pipeline任务中需要尝试不同的值。

(2)仅适合梯度提升树(GBT)的参数有lossType,即梯度提升树在训练过程中优化的损失函数,目前只支持logistic loss损失;maxIter即迭代次数,改变这个值可能不会改变结果太多,所以它不应该是首要调整的参数,默认值是100;以及stepSize,代表算法的学习速度。较大的步长(step size)意味着在两次迭代训练之间较大的变化,该参数可以帮助优化过程,是在训练中应该多次测试的参数,默认值为0.1,可以是从0到1的任何数值。

这两者的模型只有一个训练参数为checkpointInterval,与决策树的定义相同。两者的预测参数thresholds也与决策树相同。下面是使用这些分类方法的代码示例:

from pyspark.ml.classification import RandomForestClassifier
rfClassifier = RandomForestClassifier()
print rfClassifier.explainParams()
trainedModel = rfClassifier.fit(bInput)from pyspark.ml.classification import GBTClassifier
gbtClassifier = GBTClassifier()
print gbtClassifier.explainParams()
trainedModel = gbtClassifier.fit(bInput)

34. 朴素贝叶斯(Naive Bayes)分类是基于贝叶斯定理的分类方法。该模型背后的核心假设是,数据的所有特征都是相互独立的。当然严格的独立性是有点不现实,但即使不符合这条假设仍然可以生成有用的模型。朴素贝叶斯分类通常用于文本或文档分类,尽管它也可以用作更通用的分类任务。有两种不同的模型类型:多元贝努利模型(multivariate Bernoulli model),其中指示器变量代表文档中的一个单词是否存在;多项式模型(multinomial model),其中使用所有单词计数。

使用朴素贝叶斯一个重要的注意事项是,所有的输入特征值必须为非负数。下面的模型超参数是为确定模型的基本结构而指定的:

(1)modelType。可设置为“bernoulli”或“multinomial”。

(2)weightCol。允许对不同的数据点赋值不同的权值。

smoothing这个训练参数指定使用加法平滑(additive smoothing)时的正则化量,该设置有助于平滑分类数据,并通过改变某些类的预期概率来避免过拟合问题,默认值是1。像所有其他模型一样,朴素贝叶斯也使用相同的预测参数thresholds。下面是一个使用朴素贝叶斯分类的示例:

from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes()
print nb.explainParams()
trainedModel = nb.fit(bInput.where("label != 0"))

在该示例的样例数据集中具有负值的特征,具有负特征的行对应于带有标签"0"的行,因此只要通过标签过滤它们,而不是进一步处理它们。

35. Evaluator允许指定模型的终止标准。当只有一个评估器时,它的作用并不太大。但是当在pipeline中使用它时,可以自动尝试模型和转换器的各种参数,尝试所有参数的组合,以查看哪些性能最好。评估器在这条pipeline和参数网格(Parameter Grid)上下文中最有用。

对于分类,有两个评估器对应两列:一个模型预测的标签和一个真正的标签,对于二分类使用BinaryClassificationEvaluator,它支持优化两个不同的指标“areaUnderROC”和“areaUnderPR”。对于多分类,需要使用MulticlassClassificationEvaluator,它支持优化"f1"、"weightedPrecision"、"weightedRecall"和"accuracy"。

MLlib还包含一些工具,能同时评估多个分类指标。但这些metric类暂时还没有从底层RDD框架中移植到基于DataFrame的机器学习包中,因此在Spark2.2中仍然需要创建一个RDD来使用它们,将来此功能很可能会移植到DataFrame上,可以使用三种不同的分类指标:二分类指标、多分类指标和多标签分类指标。

所有这些评测标准都遵循相同的近似样式,将生成的输出值与真实值进行比较,模型将计算所有相关的metric标准,然后可以查询每个指标的值:

from pyspark.mllib.evaluation import BinaryClassificationMetrics
out = model.transform(bInput)\.select("prediction", "label")\.rdd.map(lambda x: (float(x[0]), float(x[1])))
metrics = BinaryClassificationMetrics(out)

这样做之后,可以使用与逻辑回归类似的API,在此metric指标对象上查看典型的metric指标:

# in Python
print metrics.areaUnderPR
print metrics.areaUnderROC
print "Receiver Operating Characteristic"
metrics.roc.toDF().show()

有一些MLlib模型不支持多分类,在这些情况下,用户可以利用一个one-vs-rest分类器来执行多分类,它是在给定一个二分类器的情况下,执行多分类任务。这背后的思想是对于每一个希望预测的类,one-vs-rest分类器将把该分类转化为一个二分类问题,将目标类做为一类,把其余的其它类别做为另外一类。这样,分类预测就变成了一个二分类问题(是此类或不是此类)。

One-vs-rest是作为估计器实现的。对于基分类器,它创建k个分类器实例,为k个类别中的每个分类创建一个对应的二分类问题,训练类i的分类器预测标签是否为i,将类i与所有其他类区分开来。预测是通过对每个二分类器进行评估来完成的,将最有可能的分类器的索引输出为标签。多层感知器(Multilayer Perceptron)是基于(可配置层数和层宽度的)神经网络的分类器,后面的部分会详细讨论。

四、回归

36. 回归(Regression)是分类的逻辑延伸。回归不是仅仅根据一组特质值预测单个离散值,而是预测实数值(表示为连续变量)。回归可能比分类更难,因为从数学角度来看有无限数量的可能输出值,此外目标是优化预测值和真实值之间的一些误差度量,而不是准确率。除此之外,回归和分类是很类似的,因此将看到许多分类和回归有一些相同的基础概念。下面是一系列使用回归的应用场景:

(1)预测电影收视率。如果有电影和电影观众的信息,比如有多少人看过预告片或者在社交媒体上分享过,可以预测有多少人会真去看这部电影。

(2)预测公司营收。鉴于目前的增长轨迹、市场和季节性,可以预测一个公司在未来将获得多少收入。

(3)预测作物产量。如果有关于作物生长区域的信息以及全年的天气情况信息,可以预测某一特定区域位置的农作物总产量。

下面是当前Spark 2.2支持的回归模型,将来的版本将会有所扩增:

(1)线性回归(Linear regression);

(2)广义线性回归(Generalized linear regression);

(3)保序回归(Isotonic regression);

(4)决策树(Decision trees);

(5)随机森林(Random forest);

(6)梯度提升树(Gradient-boosted trees);

(7)生存分析(Survival regression)。

MLlib中的回归模型全部都适合处理大型数据集。下图是一个模型扩展性参考表,可以为特定任务选择最佳模型(如果可扩展性是要考虑的核心因素):

首先读入一些数据,它将在后面例子部分也用到:

# in Python
df = spark.read.load("/data/regression")

37. 线性回归(Linear Regression)假定输入特征的线性组合(每个特征乘以权重的总和)将得到(有一定高斯误差的)输出结果。这个(高斯误差)线性的假设并不总是正确的,但它确实是一个简单的可解释模型,而且一般不会产生过拟合。像逻辑回归一样,Spark实现了ElasticNet正则化,允许结合使用L1和L2正则化。线性回归与逻辑回归具有相同的模型超参数、训练参数,下面是对读入数据集使用线性回归的简短示例:

from pyspark.ml.regression import LinearRegression
lr = LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
print lr.explainParams()
lrModel = lr.fit(df)

正如逻辑回归可以得到模型的详细训练信息,代码字体(code font)方法可以快速访问这些度量标准,它包含了一些传统的衡量回归模型效果的指标,可以帮助看到模型是如何拟合曲线的。summary方法返回具有多个字段的摘要对象,接下来一个个来分析:

残差(residuals)是输入进模型中每个特征的权重;目标历史(objective history)记录了每次迭代训练的情况;均方根误差(root mean squared error)是衡量曲线对数据的拟合程度的度量,通过检查每个预测值与实际值之间的距离来确定;R方(R-squared)变量是由模型捕获的预测变量方差与总方差比例的度量。

下面是线性回归模型摘要的一些属性:

# in Python
summary = lrModel.summary
summary.residuals.show()
print summary.totalIterations
print summary.objectiveHistory
print summary.rootMeanSquaredError
print summary.r2

38. 标准线性回归实际上是称为广义线性回归的算法系列的一部分。对这种算法Spark中有两种的实现,一个是针对处理非常大的特征集(如前面的简单线性回归)进行了优化,而另一个则更为通用,包括对更多算法的支持,目前并不支持处理大量的特性值。

线性回归的广义形式使得可以更细粒度地控制使用各种回归模型,例如它可以选择不同的噪声分布,包括高斯分布(线性回归)、二项式分布(逻辑回归)、泊松分布(泊松回归)和伽玛分布(伽玛回归)。广义模型还支持设置链接函数(link function),指定线性预测器与分布函数均值之间的关系。下表列出了每种分布类型的可用链接函数:

这些配置是用于确定模型本身的基本结构而指定的,除了fitIntercept和regParam之外,广义线性回归还包括几个其他超参数:

(1)family。指定在模型中使用的误差分布。支持的选项包括Poisson(泊松分布),binomial(二项式分布),gamma(伽马分布),Caussian(高斯分布)和tweedie(tweedie分布)。

(2)link。链接函数的名称,指定线性预测器与分布函数平均值之间的关系。支持的选项是cloglog、probit、logit、reverse、sqrt、identity和log(默认是identity)

(3)solver。指定的优化算法。当前唯一支持的优化算法是irls (迭代重加权最小二乘法)。

(4)variancePower。Tweedie分布方差函数中的幂,它刻画了分布的方差和平均值之间的关系,仅适用于Tweedie 分布。支持的值为0和[1,无穷大),默认值为0。

(5)linkPower。Tweedie分布的乘幂链接函数索引。

广义线性回归的训练参数与逻辑回归的训练参数相同,此外添加了一个预测参数:linkPredictionCol,用于指定一个列名,为每个预测保存链接函数的输出。下面是一个使用GeneralizedLinearRegression的示例:

from pyspark.ml.regression import GeneralizedLinearRegression
glr = GeneralizedLinearRegression()\.setFamily("gaussian")\.setLink("identity")\.setMaxIter(10)\.setRegParam(0.3)\.setLinkPredictionCol("linkOut")
print glr.explainParams()
glrModel = glr.fit(df)

与简单线性模型一样,Spark也为广义线性模型提供训练摘要,它可以帮助确保模型适合该训练集的数据。需注意的是,这并不会提供更适合的运行算法,但它可以提供更多的信息。此信息包括许多不同的metric标准,用于分析算法的适应性,包括一些最常见的度量方法:

(1)R squared,R方指标,用户评估拟合度。

(2)The residuals,标签与预测值之间的差异。

39. 在回归中使用的决策树(Decision Tree)与用于分类的决策树很相似,主要区别是,用于回归分析的决策树不是在每个叶子节点上输出离散的标签,而是一个连续的数值,但是可解释性和模型结构仍然适用。简而言之,回归决策树只是创建一个树来预测数值输出,而不是试图训练系数来建模一个函数,这点很重要,因为与广义线性回归不同,可以预测输入数据中的非线性函数,这也造成了过拟合的风险,因此在调整和评估这些模型时需要小心。

用于回归的决策树模型超参数与用于分类的决策树超参数相同,只是在impurity参数上稍有变化,impurity指示模型是否应在某叶子节点分割(根据信息增益),回归树当前支持的唯一度量方法是"variance"。分类树和回归树也采用相同的训练参数,下面是使用决策树进行回归任务的例子:

from pyspark.ml.regression import DecisionTreeRegressor
dtr = DecisionTreeRegressor()
print dtr.explainParams()
dtrModel = dtr.fit(df)

随机森林(Random Forest)和梯度提升树(Gradient-Boosted Tree)模型可应用于分类和回归,它们与决策树具有相同的基本概念,不是训练一棵树而是训练很多树来做回归分析。在随机森林模型中,大量的非相关树被训练然后平均;使用梯度提升树时,每棵树进行加权预测(一些树对某些类的预测能力比其他的强)。随机森林和梯度提升树回归具有与相应分类模型相同的模型超参数和训练参数,只有纯度度量impurity不同,但是它与DecisionTreeRegressor的impurity一样设置。这里有一个如何使用这两个模型进行回归的例子:

from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
rf = RandomForestRegressor()
print rf.explainParams()
rfModel = rf.fit(df)
gbt = GBTRegressor()
print gbt.explainParams()
gbtModel = gbt.fit(df)

统计学家一般在对照实验中使用生存分析来了解个体的存活率。Spark实现了加速失效时间模型(生存分析变体),而不是描述实际生存时间来对生存时间进行记录,Spark实现这种生存分析变体,是因为更著名的Cox Proportional Hazard模型是半参数化的,也不能很好地扩展到大型数据集上。相比之下,加速失效时间模型是因为每个实例(行)都独立地贡献结果,这与Cox生存模型有不同的假设,因此这两个模型互相补充而不能取代其一。

生存回归对输入的要求与其他回归模型的要求非常相似,然而有一处不同,它引入了一个censor变量列。在科学分析中,测试对象会检查某个体何时退出,因为它们在实验结束时的状态可能是未知的。这一点很重要,因为不能假设在研究的某个中间点个体会输出结果。

保序回归(Isotonic Regression)是另一种特殊的回归模型,具有一些特别的要求。本质上,保序回归指定了一个总是单调递增的分段线性函数,它不会下降,这意味着如果数据在训练目标图中总是向上和向右,这是一个合适的模型。如果它随着输入会有波动变化,那么这是不合适的。下图说明了保序回归的情况:

40. 回归具有与分类相同的核心模型优化功能,可以指定一个Evaluator,选择一个metric标准来优化,然后训练pipeline在这一部分上执行参数调整。用于回归任务的评估器称为RegressionEvaluator,支持许多常见的回归度量标准。与分类评估器一样,RegressionEvaluator需要两项输入,一个表示预测值,另一个表示真实标签的值。支持的优化度量值包括均方根误差("rmse")、均方误差("mse")、r2度量("r2")、和平均绝对误差("mae")。要使用RegressionEvaluator,需要建立pipeline,指定想要测试的参数然后运行它,Spark将自动选择最佳的模型并将其返回:

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
glr = GeneralizedLinearRegression().setFamily("gaussian").setLink("identity")
pipeline = Pipeline().setStages([glr])
params = ParamGridBuilder().addGrid(glr.regParam, [0, 0.5, 1]).build()
evaluator = RegressionEvaluator()\.setMetricName("rmse")\.setPredictionCol("prediction")\.setLabelCol("label")
cv = CrossValidator()\.setEstimator(pipeline)\.setEvaluator(evaluator)\.setEstimatorParamMaps(params)\.setNumFolds(2) # 至少应该是3,但是这里数据太少所以设置成为2
model = cv.fit(df)

评估器允许根据一个特定的metric指标值来评估和调整模型,但也可以通过RegressionMetrics对象获得多个回归度量指标。与前面的分类指标一样,RegressionMetrics也是处理在RDD上的(预测值,标签值)对。下面看看如何检查之前训练好的模型的结果:

from pyspark.mllib.evaluation import RegressionMetrics
out = model.transform(df)\.select("prediction", "label").rdd.map(lambda x: (float(x[0]), float(x[1])))
metrics = RegressionMetrics(out)
print "MSE: " + str(metrics.meanSquaredError)
print "RMSE: " + str(metrics.rootMeanSquaredError)
print "R-squared: " + str(metrics.r2)
print "MAE: " + str(metrics.meanAbsoluteError)
print "Explained variance: " + str(metrics.explainedVariance)

五、推荐系统

41. 在Spark中有一个经常被使用的推荐算法,交替最小二乘法(ALS)。该算法利用一种叫做协同过滤(collaborative filtering)的方法,这种方法仅使用用户过去的采购数据做推荐,它并不需要或使用用户或项目的其他任何特征,它还支持多种ALS变体(例如,显式或隐式反馈)。除了ALS,Spark还提供了在购物篮分析中用于发现关联规则的频繁模式挖掘(Frequent Pattern Mining)技术。最后,Spark的RDD API提供了低级的矩阵分解运算。

ALS为每个用户和物品建立k维的特征向量,从而可以通过用户和物品向量的点积来估算该用户对该物品的评分值,所以只需要用户-物品键值对的评分数据作为输入数据集,其中有三列:用户id列、物品(如电影)id列和评分列。评分可以是显式的,即想要直接预测的数值等级;或隐式的,在这种情况下每个分数表示用户和物品之间的交互强度(例如,访问特定页的次数),它衡量用户对该物品的偏好程度。根据输入的DataFrame,模型将产生特征向量,可以使用它来预测用户对尚未评分的物品的评级。

在实践中需要注意的一个问题是,该算法偏好推荐那些非常常见的或者有大量信息的物品,如果正在推出一种还没有用户熟悉的新产品,该算法就不会推荐它。此外如果新用户加入平台,他们可能在训练集中没有任何打分记录,算法也将不知道推荐他们些什么,这些称为冷启动(Cold Start)问题。

在ALS中,这些配置用于确定模型的结构以及具体的协同过滤问题:

(1)rank。rank(秩)确定了用户和物品特征向量的维度,这通常是通过实验来调整,一个重要权衡是过高的秩导致过拟合,而过低的秩导致不能做出最好的预测,默认值为10。

(2)alpha。在基于隐式反馈(用户行为)的数据上进行训练时,alpha设置偏好的基线置信度(BaselineConfidence),这个值越大则越认为用户和他没有评分的物品之间没有关联。默认值1.0,需要通过实验调参。

(3)regParam。控制正则化参数来防止过拟合,需要测试不同的值来找到针对当前问题的最优值。默认值为0.1。

(4)implicitPrefs。此布尔值指定是在隐式数据(true)还是显式数据(false)上进行训练,应根据模型输入的数据来设置此值。如果数据是基于对产品的被动交互(如通过单击或网页访问),则应设置为隐式。与此相反,如果数据是显式分级的(例如,用户给这家餐厅4/5的星级),则应设置为显式。默认值是显式。

(5)nonnegative。如果设置为true,则将非负约束置于最小二乘问题上,并且只返回非负特征向量,这可以提高某些应用程序的性能。默认值为false。

交替最小二乘法的训练参数与其他模型中的训练参数不同,这是因为将会对数据在整个集群中的分布方式进行更低级的控制,分布在集群中的数据组称为数据块(block),确定在每个块中放置的数据对训练时间有很大影响(不是最终结果)。通常的经验法则是每个block大概分配一百万到五百万个评分值,如果每个block的数据量少于这个数字,则太多的block可能会影响性能

(1)numUserBlocks。用于确定将用户数据拆分成多少个数据块,默认值为10。

(2)numItemBlocks。用于确定将物品数据拆分为多少个数据块,默认值为10。

(3)maxIter。训练的迭代次数,改变它可能不会太影响结果,所以这不应该是调参的首要参数,默认值为10。在检查训练历史记录之后,如果发现迭代多少次之后误差曲线还没有平缓,可以考虑增大该值

(4)checkpointInterval。设置checkpoint可以在训练过程中保存模型状态,以便更快地从节点故障中恢复。可以使用SparkContext。setCheckpointDir设置检查点目录。

(5)seed。指定随机种子帮助复现实验结果。

预测参数用来指定如何用训练好的模型进行预测。在例子中有一个参数,就是冷启动策略(通过coldStartStrategy设置),该参数用来设置模型应给未出现过在训练集中的用户或物品推荐什么。当在实际生产中使用模型时,冷启动的挑战就会凸显出来,模型对新用户和物品没有评分的历史记录无法做出推荐。当在Spark的CrossValidator 或TrainValidationSplit 中使用简单随机拆分时,也可能发生这种情况,遇到不在训练集中的用户和物品非常常见。

默认情况下,Spark在遇到实际模型中不存在的用户和物品时,预测值将使用默认值NaN,但是这在训练过程中是不可取的,因为它会破坏Evaluator正确评估模型的能力,这使得无法合理地选择模型。用户可以将coldStartStrategy参数设置为drop,以便把在DataFrame中预测为NaN值的行删除,排除NaN数据后将通过非NaN数据计算评估指标。下面示例将使用MovieLens电影评分数据集,这个数据集具有与电影推荐相关的信息,首先使用此数据集来训练模型:

from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
ratings = spark.read.text("/data/sample_movielens_ratings.txt")\.rdd.toDF()\.selectExpr("split(value , '::') as col")\.selectExpr("cast(col[0] as int) as userId","cast(col[1] as int) as movieId","cast(col[2] as float) as rating","cast(col[3] as long) as timestamp")
training, test = ratings.randomSplit([0.8, 0.2])
als = ALS()\.setMaxIter(5)\.setRegParam(0.01)\.setUserCol("userId")\.setItemCol("movieId")\.setRatingCol("rating")
print als.explainParams()
alsModel = als.fit(training)
predictions = alsModel.transform(test)

现在可以针对每个用户或影片输出评分最高的k个推荐。模型的recommendForAllUsers方法返回对应某个userId的DataFrame,包含推荐电影的数组,以及每个影片的评分。recommendForAllItems返回对应某个movieId的DataFrame以及最有可能给该影片打高分的前几个用户:

#in Python
alsModel.recommendForAllUsers(10)\.selectExpr("userId", "explode(recommendations)").show()
alsModel.recommendForAllItems(10)\.selectExpr("movieId", "explode(recommendations)").show()

42. 在涉及到冷启动策略时,可以在使用ALS时设置自动化的模型评估器。有一件事可能不太明显,就是这个推荐问题其实只是一种回归问题,由于正在预测给定用户的评分值,希望通过优化措施以减少预测的用户评分和真实值之间的差别,可以使用RegressionEvaluator来执行,将其放在pipeline中以自动化训练过程。执行Evaluator操作时,还应将冷启动策略设置为drop而不是NaN然后在生产系统中实际进行预测时将其切换回NaN

from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator()\.setMetricName("rmse")\.setLabelCol("rating")\.setPredictionCol("prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = %f" % rmse)

推荐结果可以使用标准的回归metric指标和一些特定于推荐的metric指标来衡量。毫无疑问,在评估推荐结果的问题上,比单纯的基于回归的评估更为复杂,这些指标对于评估最终模型特别有用。可以继续使用回归的metric指标来评估推荐结果,简单地查看每个用户和项目的实际评级与预测值的接近程度

from pyspark.mllib.evaluation import RegressionMetrics
regComparison = predictions.select("rating", "prediction")\.rdd.map(lambda x: (x(0), x(1)))
metrics = RegressionMetrics(regComparison)

还有另一个工具就是排名指标RankingMetric将推荐结果与给定用户的实际评分值(或实际偏好)进行比较,它不关注具体的评分值,而是关注算法是否能够把已经打分的物品再次推荐给用户。这需要做一些数据准备工作,首先需要为给定的用户收集一组高评分的电影,在该例子中将使用一个相当低的阈值,即电影评分在2.5以上的电影,选择一个合适的阈值属于商业决策,如下所示:

from pyspark.mllib.evaluation import RankingMetrics, RegressionMetrics
from pyspark.sql.functions import col, expr
perUserActual = predictions\.where("rating > 2.5")\.groupBy("userId")\.agg(expr("collect_set(movieId) as movies"))

现在有一个用户的集合,连同他们对电影的真实打分,下面将为每个用户依据算法取得前10个推荐电影,再观察前10个推荐是否出现在排名前10的电影集合中,如果训练了一个很好的模型,它将正确地推荐用户真正喜欢的电影,否则它还没有能学习到用户的喜好:

perUserPredictions = predictions\.orderBy(col("userId"), expr("prediction DESC"))\.groupBy("userId")\.agg(expr("collect_list(movieId) as movies"))

现在有两个DataFrame、一个预测值,另一个是真实的打分最高的电影集合,可以将它们传递到RankingMetrics对象中,此对象接受这些组合的RDD作为参数,如以下join操作和RDD transformation的例子所示:

# in Python
perUserActualvPred = perUserActual.join(perUserPredictions, ["userId"]).rdd\.map(lambda row: (row[1], row[2][:15]))
ranks = RankingMetrics(perUserActualvPred)

现在,可以根据这个排名来度量推荐算法。例如,可以根据平均精度值指标来查看算法的准确度如何,还可以查看某排名位置之前的准确度,如查看在哪个排名位置开始正向预测开始大量失败:

# in Python
ranks.meanAveragePrecision
ranks.precisionAt(5)

除了ALS之外,MLlib提供的另一个推荐系统工具是频繁模式挖掘(Frequent Pattern Mining),有时称为购物篮分析(market basket analysis),根据原始数据发现关联规则。例如鉴于大量的交易,可能会发现购买热狗的用户几乎总是购买热狗面包。这种技术可以应用于推荐中,尤其是当人们在购物时。Spark实现了频繁模式挖掘的FP-growth算法。

六、无监督学习

43. 一般来说,无监督学习的使用频率低于有监督学习,因为它的最终结果通常难以评估好坏。这些挑战随着数据规模变大可能会加剧,例如高维空间中的聚类可能因为高维的属性而生成奇怪的聚类结果,这被称为维度灾难(curse of dimensionality)。维度灾难体现了这样一个事实:随着特征空间随着维度扩展,它变得越来越稀疏,这意味着随着维数的增加,填充此空间以获得具有统计意义的结果所需的数据会迅速增加。此外,高维度会带来更多噪音,这可能会使模型在噪音数据中训练从而导致奇怪的结果。

下面是一些可能的应用场景,这些模式可能会揭示数据中并不明显的主题、异常或分组:

(1)查找数据中的异常。如果数据集中的大多数值都集中在一个较大的组中,而其外只有几个小组,则这些小组可能需要进一步研究。

(2)主题建模。通过检查大量的文本,可以找到在这些不同文档中存在的共同主题。

就像前面其他模型一样,模型扩展性限制非常重要:

先加载一些数值数据:

from pyspark.ml.feature import VectorAssembler
va = VectorAssembler()\.setInputCols(["Quantity", "UnitPrice"])\.setOutputCol("features")sales = va.transform(spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/data/retail-data/by-day/*.csv").limit(50).coalesce(1).where("Description IS NOT NULL"))sales.cache()

44. k-means是最受欢迎的聚类算法之一。在此算法中,用户在数据集中随机选择数量为k的数据点作为处理聚类的聚类中心,未分配的点基于它们与这些聚类中心的相似度(以欧氏距离计算),然后被“分配”到离它们最近的聚类中。分配之后,再根据被分配到一个聚类的数据点再计算聚类的新中心(称为质心),并重复该过程,直到到达有限的迭代次数或直到收敛(即质心位置停止改变)

这种方法并不一定合理,例如如果实际的一个聚类可能被错误地拆分为两个,仅仅是因为开始的时候选择了两个初始点,因此以不同方式选择初始点来多次执行k-means会取得比较好的效果。选择正确的聚类个数k是保证kmeans算法有效的一个关键,这也是一个难点。Spark k-means的模型超参数就是k,训练参数有以下几个:

(1)initMode。初始化模式是确定质心初始位置的算法,支持的选项为random(随机初始化)和k-means| |(默认值)。后者是k-means||的并行化变体,它的思想不是简单地选择随机初始化位置,而是选择具有良好分布的聚类中心,以产生更好的聚类。

(2)initSteps。k-means||模式初始化所需要的步数。必须大于0,默认值是2

(3)maxIter。迭代次数,改变这个值可能不会对最终结果改变很大,所以这不应该是你首先考虑调整的参数。默认20。

(4)tol。该阈值指定质心改变小到该程度后,就认为模型已经优化的足够了,可以在迭代maxIter次之前停止运行。默认值为0.0001。

此算法对这些参数通常是具有鲁棒性的,主要是一个均衡问题,运行更多的初始化步骤和迭代次数可能会生成更好的聚类,代价则是更长的训练时间

from pyspark.ml.clustering import KMeans
km = KMeans().setK(5)
print km.explainParams()
kmModel = km.fit(sales)

k-means包括可用于评估模型的摘要类,该类为k-means运行成功提供了一些常用的衡量标准,k-means摘要包括有关创建聚类的信息以及聚类的相对大小(示例数)。还可以使用computeCost计算类内平方误差和,这可以帮助衡量聚类内数据点与每个聚类中心点相距有多近。k-means的隐式目标是,基于给定的聚类数量k来最小化聚类内平方误差的和:

#in Python
summary = kmModel.summary
print summary.clusterSizes # 中心点的数量
kmModel.computeCost(sales)
centers = kmModel.clusterCenters()
print("Cluster Centers: ")
for center in centers:print(center)

二分k-means是k-means的变体,关键区别在于它不是通过自下而上(bottom-up)地聚类,而是自上而下(top-down)的聚类方法。它首先创建一个组,然后将该组再拆分成较小的组,以此类推,直到满足用户指定的k个组。这通常是比k-means更快捷的方法,将得到与k-means不同的结果。模型超参数依然和k-means一样为k,训练参数为以下两个:

(1)minDivisibleClusterSize。指定一个可分聚类中的最少的数据点数(如果大于等于1.0)或数据点的最小比例(如果小于1.0),当聚类中数据点数小于该值时,聚类就不可再分割了。默认值为1.0,这意味着每个聚类中必须至少有一个点。

(2)maxIter。迭代次数,改变迭代次数可能不会对聚类的最终结果造成太大影响,所以这不应该是调参的首要选项。默认是20。

该模型中的大多数参数都应进行调参以找到最佳结果,没有适用于所有数据集的规则。代码示例如下所示:

from pyspark.ml.clustering import BisectingKMeans
bkm = BisectingKMeans().setK(5).setMaxIter(5)
print bkm.explainParams()
bkmModel = bkm.fit(sales)

二分k-means包括一个摘要类,可以使用它来评估模型。它和k-means摘要类大部分相同,包括有关创建的聚类信息以及聚类的相对大小(数据点数量):

summary = bkmModel.summary
print summary.clusterSizes # 中心点的数量
kmModel.computeCost(sales)
centers = kmModel.clusterCenters()
print("Cluster Centers: ")
for center in centers:print(center)

45. 高斯混合模型(Gaussian mixture models,GMM)是另一种流行的聚类算法,它的假设不同于二分k-means和k-means算法,这两种算法尝试通过降低数据点和聚类中心之间的距离平方和来对数据进行分组,而高斯混合模型假设每个聚类中的数据点符合高斯分布,这意味着数据点在聚类边缘(根据高斯分布)的可能性较小,而数据点在中心附近的概率更高。每个高斯聚类的均值和标准差各不相同,可以是任意大小(因此可能是各不相同的椭圆形)。在训练过程中仍然需要用户指定聚类的k值。

一种简单理解高斯混合模型的方法是,它们就像k-means的软聚类版本(软聚类soft clustering即每个数据点可以划分到多个聚类中),而k-means创建硬聚类(即每个点仅在一个聚类中),高斯混合模型GMM依照概率而不是硬性边界进行聚类。高斯混合的模型超参数也为k,训练参数为以下几个:

(1)maxIter。迭代次数,改变这个值可能不会对最终聚类结果有太大影响,所以它不应该是首先考虑调整的参数。默认为100。

(2)tol。指定一个阈值来代表将模型优化到什么程度就够了,越小的阈值将需要更多的迭代次数作为代价(不会超过maxIter),也可以得到更高的准确度。默认值为0.01。

与k-means模型一样,这些训练参数一般不会受到聚类数量k的影响,代码示例如下所示:

from pyspark.ml.clustering import GaussianMixture
gmm = GaussianMixture().setK(5)
print gmm.explainParams()
model = gmm.fit(sales)

与其他聚类算法一样,高斯混合模型包括一个摘要类来帮助模型评估,这包括创建的聚类信息,如高斯混合的权重、均值和协方差,这可以帮助进一步了解数据的隐藏信息:

# in Python
summary = model.summary
print model.weights
model.gaussiansDF.show()
summary.cluster.show()
summary.clusterSizes
summary.probability.show()

46. 隐含狄利克雷分布(LDA)是一种通常用于对文本文档执行主题建模的分层聚类模型。LDA试图从与这些主题相关联的一系列文档和关键字中提取高层次的主题,然后它将每个文档解释为多个输入主题的组合。可以使用两种实现方案:在线LDA(online LDA)和EM算法,一般来说当有更多的输入样本时,在线LDA表现得更好;当有较大的输入词库时,EM优化器效果更好。此方法还可以扩展到成百上千个主题。

要把文本数据输入进LDA中,首先要将其转化为数值格式,这可以通过CountVectorizer来实现。它的模型超参数为:

(1)k。用于指定从数据中提取的主题数量。默认值是10,并且必须是整数。

(2)docConcentration。文档分布的浓度(Concentration)参数向量(通常称为"alpha"),它是狄利克雷分布的参数,越大的值意味着越平滑(正则化程度更高)。如果未经用户设置,则docConcentration自动取值。如果设置为单值向量[alpha],则alpha被重复复制到长度为k的向量中,docConcentration向量的长度必须为k。

(3)topicConcentration。主题分布的浓度参数向量,通常命名为"beta"或"eta",它是一个对称狄利克雷分布的参数。如果用户未设置,则topicConcentration将被自动设置。

训练参数有以下几个:

(1)maxIter。最大迭代次数,改变这个值可能对结果不会有多大影响,所以这不应该是首先考虑调整的参数。默认值20。

(2)optimizer。指定是使用EM还是在线训练方法来优化LDA 模型。默认为online。

(3)learningDecay。学习率,即指数衰减率。应该介于(0.5、1.0]之间以保证渐近收敛。默认值为0.51,仅适用于在线优化程序。

(4)learningOffset。一个正数值的学习参数,在前几次迭代中会递减。较大的值使前期迭代次数较少。默认值为1024.0,仅适用于在线优化程序。

(5)optimizeDocConcentration。指示docConcentration(文档主题分布的狄利克雷参数)是否在训练过程中进行优化。默认值为true,但仅适用于在线优化程序。

(6)subsamplingRate。在微型批量梯度下降的每次迭代中采样的样本比例,范围是(0、1]。默认值为0.5,仅适用于在线优化程序。

(7)seed。该模型还支持指定一个随机种子,用于实验重现。

(8)checkpointInterval。和在分类算法中看到的一样,是用于设置检查点的参数。

LDA支持的预测参数为topicDistributionCol,即把每个文档的主题混合分布输出作为一列保存起来。代码示例如下所示:

from pyspark.ml.feature import Tokenizer, CountVectorizer
tkn = Tokenizer().setInputCol("Description").setOutputCol("DescOut")
tokenized = tkn.transform(sales.drop("features"))
cv = CountVectorizer()\.setInputCol("DescOut")\.setOutputCol("features")\.setVocabSize(500)\.setMinTF(0)\.setMinDF(0)\.setBinary(True)
cvFitted = cv.fit(tokenized)
prepped = cvFitted.transform(tokenized)from pyspark.ml.clustering import LDA
lda = LDA().setK(10).setMaxIter(5)
print lda.explainParams()
model = lda.fit(prepped)

训练完模型后,将看到一些排名靠前的主题,返回单词的索引。必须使用训练的CountVectorizerModel来找到这些单词的真实语义,例如训练后发现的前3个相关主题是hot、home、和brown:

# in Python
model.describeTopics(3).show()
cvFitted.vocabulary

这种方法会产生所用词的详细信息以及特定单词的强调,这些有助于更好地理解潜在的主题。使用类似API其实有更多的评估方法,如对数似然(log likelihood)和困惑度(perplexity),这些工具的目标是帮助用户根据数据的分布情况优化主题的数量,应该将这些指标应用到一个保留集(holdoutset)上,以减少模型的整体困惑度。另一种选择是以提高保留集的对数似然为目标的优化,可以通过将数据集传递到这两个函数来计算相应的指标值:model.logLikelihood和model.logPerplexity。

七、图分析

47. 是由节点或顶点(任意对象)和边(定义这些节点之间的关系)组成的数据结构,图分析是分析这些关系的过程,比如这个图可能是朋友圈,每个顶点或节点代表一个人,每一条边代表人与人之间的关系,下图给出了一个图结构的示例:

此关系图为无向图,因为边没有指定起点和终点,即没有方向。除此之外还有有向图,有向图中的边指定起始顶点和结束顶点,如下图所示边是有方向的:

图中的边和顶点也可以有与之关联的数据。在朋友关系图的例子中,边的权重可能代表不同的朋友之间的亲密关系,例如普通朋友之间的边权重较低,而结婚了的两人之间的边权重较大。可以通过查看节点之间的通信频率给边添加权重,每个顶点也可能有数据,如人名。图结构是刻画关系数据和许多问题的一种很自然的数据结构,而Spark提供了几种用来图分析的有用方法。可能的一些商业应用场景包括检测信用卡欺诈、模式发现、确定引文网络中的论文重要性(例如哪些论文引用量最高),以及网页排名,就像Google著名的PageRank算法。

Spark一直提供一个基于RDD的库来执行图处理:GraphX。它提供了一个非常低级的接口,非常强大,但就像RDD一样不易使用或优化。GraphX仍然是Spark的核心部分,而且仍然可以看到一些功能扩展。GraphX API提供很全面的的记录,因为它自创建以来没有什么变化。然而,社区最近基于Spark创建了一个新一代的图分析库:GraphFrame。GraphFrame扩展了GraphX ,提供DataFrame API和对Spark不同语言的支持,以便Python用户可以使用该工具利用其可扩展性,这里将集中讨论GraphFrame。

GraphFrame当前是一个Spark外部包,在启动Spark application时需要进行导入操作,但未来可能会合并到Spark的核心模块中。在大多数情况下GraphX和GraphFrame之间的性能差别不大(除了GraphFrame的用户体验改进),使用GraphFrame还有一些微量开销,但在多数情况下它会在适当的时候调用GraphX。对大多数人来说,用户体验提升的收益远远大于这个轻微的性能开销

这里将采用湾区共享单车门户(Bay Area Bike Share portal)开放的共享单车数据来做分析。首先需要指向合适的包,如果从命令行执行此操作,需要运行如下代码:

./bin/spark-shell --packages graphframes: graphframes: 0.5.0-spark2.2-s_2.11#in Python
bikeStations = spark.read.option("header","true")\.csv("/data/bike-data/201508_station_data.csv")
tripData = spark.read.option("header","true")\.csv("/data/bike-data/201508_trip_data.csv")

48. 进行图分析的第一步是构建图。要做到这一点需要定义顶点和边,这是具有一些命名列的DataFrame。在当前例子中创建一个有向图,具有从源地址指向目的地址的边,即在共享单车行程数据集中,包含从行程的开始位置到行程的结束位置。为了定义图,这里基于GraphFrames库中列的命名约定,在顶点表中,将车站名标识符定义为id(该示例中是字符串类型);在边表中,将每条边的源顶点ID标记为src,目的地顶点ID为dst:

# in Python
stationVertices = bikeStations.withColumnRenamed("name", "id").distinct()
tripEdges = tripData\.withColumnRenamed("Start Station", "src")\.withColumnRenamed("End Station", "dst")

接下来可以基于这些顶点DataFrame和边DataFrame创建一个GraphFrame对象来表示图。这里将利用缓存,因为在后面的查询中还要频繁访问此数据:

from graphframes import GraphFrame
stationGraph = GraphFrame(stationVertices, tripEdges)
stationGraph.cache()

现在,可以看到这个图的基本信息,也可以查询原始DataFrame以确保看到的是预期的结果:

# in Python
print "Total Number of Stations: " + str(stationGraph.vertices.count())
print "Total Number of Trips in Graph: " + str(stationGraph.edges.count())
print "Total Number of Trips in Original Data: " + str(tripData.count())

49. 操作图的最基本的方法是查询,诸如执行行程计数和根据某目的地进行过滤操作等,GraphFrame提供类似操作DataFrame的对于顶点和边的简单访问操作。要注意除了ID、源顶点和目的地顶点之外,这里的图还保留了数据中的其他列,因此也可以在需要时查询它们:

from pyspark.sql.functions import desc
stationGraph.edges.groupBy("src", "dst").count().orderBy(desc("count")).show(10)+--------------------+--------------------+-----+
| src| dst|count|
+--------------------+--------------------+-----+
|San Francisco Cal。。。| Townsend at 7th| 3748|
|Harry Bridges Pla。。。|Embarcadero at Sa。。。| 3145|
。。。
| Townsend at 7th|San Francisco Cal。。。| 2192|
|Temporary Transba。。。|San Francisco Cal。。。| 2184|
+--------------------+--------------------+-----+

还可以通过DataFrame表达式进行过滤操作。在这个例子中,想看看一个特定的站点和往返于该站的次数:

# in Python
stationGraph.edges\.where("src = 'Townsend at 7th' OR dst = 'Townsend at 7th'")\.groupBy("src", "dst").count()\.orderBy(desc("count"))\.show(10)+--------------------+--------------------+-----+
| src| dst|count|
+--------------------+--------------------+-----+
|San Francisco Cal。。。| Townsend at 7th| 3748|
| Townsend at 7th|San Francisco Cal。。。| 2734|
。。。
| Steuart at Market| Townsend at 7th| 746|
| Townsend at 7th|Temporary Transba。。。| 740|
+--------------------+--------------------+-----+

子图就是一个大图中的小图,可以使用下面的查询功能创建子图:

# in Python
townAnd7thEdges = stationGraph.edges\.where("src = 'Townsend at 7th' OR dst = 'Townsend at 7th'")
subgraph = GraphFrame(stationGraph.vertices, townAnd7thEdges)

50. 在模式发现(Motif Finding)中,Motif是图的结构化模式的一种表现形式。当指定一个motif时,查询的是数据中的模式而不是实际的数据。在GraphFrame中,采用具体领域语言(类似于Neo4J's的Cypher语言)来指定查询,此语言允许指定顶点和边的组合,并给它们指定名称。例如如果要指定给定顶点a通过边ab连接到另一个顶点b,就要指定(a)-[ab]-> (b),小括号或中括号内的名称不表示值,而是结果DataFrame中命名对应顶点和边的列名。如果不打算查询结果值,则可以省略名称(例如(a)-[]-> ())。

来对共享单车数据执行查询,找到在三个站点之间形成“三角”模式的所有行程,可以用下面的表达来声明motif,使用find()方法在GraphFrame中查找该模式,(a)表示起始站点,[ab]表示从(a)到下一站(b)的边。对于站点(b)到站点(c),以及从站点(c)到站点(a),重复此操作:

# in Python
motifs = stationGraph.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[ca]->(a)")

此查询的可视化表达如下所示:

运行此查询所得到的DataFrame 包含顶点a、b和c的嵌套字段以及各自的边,然后可以采用DataFrame查询它们,例如给定一辆自行车从a站到b站,再到c站,再到车站a的最短行程是哪条?下面代码将时间戳解析为Spark时间戳,然后进行比较以确保它是同一个自行车,从一个站到另一个站以及每个行程的开始时间都是正确的:

from pyspark.sql.functions import expr
motifs.selectExpr("*","to_timestamp(ab.`Start Date`, 'MM/dd/yyyy HH:mm') as abStart","to_timestamp(bc.`Start Date`, 'MM/dd/yyyy HH:mm') as bcStart","to_timestamp(ca.`Start Date`, 'MM/dd/yyyy HH:mm') as caStart")\.where("ca.`Bike #` = bc.`Bike #`").where("ab.`Bike #` = bc.`Bike #`")\.where("a.id != b.id").where("b.id != c.id")\.where("abStart < bcStart").where("bcStart < caStart")\.orderBy(expr("cast(caStart as long) - cast(abStart as long)"))\.selectExpr("a.id", "b.id", "c.id", "ab.`Start Date`", "ca.`End Date`")\.limit(1).show(1, False)

可以看到最快的路程是大约20分钟。需要注意,必须过滤这个例子中通过模式发现返回的三角行程。通常查询中使用的不同顶点ID不会强制匹配不同的顶点,所以如果需要不同的顶点,则应执行此类型的过滤。GraphFrames最强大的功能之一是,可以将模式发现与DataFarme查询合并到结果表中,以进一步缩小、排序或聚合找到的模式。

51. 广受欢迎的图算法之一是PageRank。PageRank通过计算网页链接的数量和质量来确定网站的重要性,它根本的假设是,重要的网站可能会被更多其他网站所链接。PageRank在网页排名之外的领域也有广泛应用,这里可以将改方法应用到共享单车数据集中,并得到重要的单车站(特别是那些有很多共享单车行经的车站),在这个例子中重要的单车站将被分配更大的排名值:

from pyspark.sql.functions import desc
ranks = stationGraph.pageRank(resetProbability=0.15, maxIter=10)
ranks.vertices.orderBy(desc("pagerank")).select("id", "pagerank").show(10)+--------------------+------------------+
| id| pagerank|
+--------------------+------------------+
|San Jose Diridon 。。。| 4。051504835989922|
|San Francisco Cal。。。|3。3511832964279518|
。。。
| Townsend at 7th| 1。568456580534273|
|Embarcadero at Sa。。。|1。5414242087749768|
+--------------------+------------------+

GraphFrame中的大多数算法都是接受参数的方法(例如这个PageRank示例中的resetProbability),大多数算法返回的是一个新的GraphFrame。算法的结果存储在GraphFrame的顶点和边上,或者是返回一个DataFrame。对于PageRank算法返回一个GraphFrame,可以从新的PageRank列中提取每个顶点的PageRank值。有趣的是,看到样例数据中Caltrain站的排名相当高。这是因为很多单车行程都与此站相关,人们要么从家到Caltrain站来上班,要么从Caltrain站回家。

52. 下面例子的图是有向图,这是由于单车行程是有向的,从一个站点开始并在另一个站点结束。一个常见的任务是计算进出某一站点的次数,为了测量进出车站的行程数,将分别使用一种名为“入度”和“出度”的metric,如下图所示:

入度和出度在社交网络中很有用,因为某些用户可能拥有比出连接(即他们关注的人)更多的入连接(即关注他们的人)。使用以下查询,可以在社交网络中找到更具影响力的人,GraphFrame提供了一种查询图顶点入度和出度的简单方法:

# in Python
inDeg = stationGraph.inDegrees
inDeg.orderBy(desc("inDegree")).show(5, False)

按照车站入度由高到低排序得到的查询结果如下所示:

+----------------------------------------+--------+
|id |inDegree|
+----------------------------------------+--------+
|San Francisco Caltrain (Townsend at 4th)|34810 |
|San Francisco Caltrain 2 (330 Townsend) |22523 |
|Harry Bridges Plaza (Ferry Building) |17810 |
|2nd at Townsend |15463 |
|Townsend at 7th |15422 |
+----------------------------------------+--------+

可以用相同的方式查询出度:

outDeg = stationGraph.outDegrees
outDeg.orderBy(desc("outDegree")).show(5, False)+---------------------------------------------+---------+
|id |outDegree|
+---------------------------------------------+---------+
|San Francisco Caltrain (Townsend at 4th) |26304 |
|San Francisco Caltrain 2 (330 Townsend) |21758 |
|Harry Bridges Plaza (Ferry Building) |17255 |
|Temporary Transbay Terminal (Howard at Beale)|14436 |
|Embarcadero at Sansome |14158 |
+---------------------------------------------+---------+

这两个值的比值是一个有趣的指标,更高的比值告诉了在哪里有大量的行程结束(但很少有行程从这里开始),而较低的值告诉了行程通常开始于哪里(但很少有行程在这里结束):

# in Python
degreeRatio = inDeg.join(outDeg, "id")\.selectExpr("id", "double(inDegree)/double(outDegree) as degreeRatio")
degreeRatio.orderBy(desc("degreeRatio")).show(10, False)
degreeRatio.orderBy("degreeRatio").show(10, False)

查询结果为:

+----------------------------------------+------------------+
|id |degreeRatio |
+----------------------------------------+------------------+
|Redwood City Medical Center |1。5333333333333334|
|San Mateo County Center |1。4724409448818898|
。。。
|Embarcadero at Vallejo |1。2201707365495336|
|Market at Sansome |1。2173913043478262|
+----------------------------------------+------------------+
+-------------------------------+------------------+
|id |degreeRatio |
+-------------------------------+------------------+
|Grant Avenue at Columbus Avenue|0。5180520570948782|
|2nd at Folsom |0。5909488686085761|
。。。
|San Francisco City Hall |0。7928849902534113|
|Palo Alto Caltrain Station |0。8064516129032258|
+-------------------------------+------------------+

53. 广度优先搜索(BFS)将搜索图以了解如何根据图中的边连接两组节点。在数据集上可能需要广度优先搜索来查找到不同站点间的最短路径,但该算法也适用于通过SQL 表达式指定的节点集合。可以指定maxPathLength来设定最大的边数量,还可以指定edgeFilter来筛选不符合要求的边,比如非业务时间内的行程。该例子中将选择两个接近的站点,因此不会运行太长时间,但是当在具有远距离连接的稀疏图上时,可以执行一些有趣的图遍历。关注一下这些车站(特别是其他城市的车站),看看是否可以获得遥远车站的连接:

# in Python
stationGraph.bfs(fromExpr="id = 'Townsend at 7th'",toExpr="id = 'Spear at Folsom'", maxPathLength=2).show(10)+--------------------+--------------------+--------------------+
| from| e0| to|
+--------------------+--------------------+--------------------+
|[65,Townsend at 7。。。|[913371,663,8/31/。。。|[49,Spear at Fols。。。|
|[65,Townsend at 7。。。|[913265,658,8/31/。。。|[49,Spear at Fols。。。|
。。。
|[65,Townsend at 7。。。|[903375,850,8/24/。。。|[49,Spear at Fols。。。|
|[65,Townsend at 7。。。|[899944,910,8/21/。。。|[49,Spear at Fols。。。|
+--------------------+--------------------+--------------------+

54. 连通分量(connected component)是一个(无向的)子图,它与自身子图有连接但不连接到其它子图,如下图中所示:

连通分量算法与这里的单车行程问题不太相关,因为该算法假定为无向图,但仍然可以运行该算法,只需要假设边数据没有方向性。事实上,如果看看共享单车位置地图,还是可以得到两个截然不同的连通分量:

要注意的是,运行连通分量算法需要设置一个checkpoint目录,它将在每次迭代后存储job的状态。这样如果job崩溃就可以继续重新启动。它可能是目前GraphFrame中运行代价最大的算法之一,因此预计会运行很长时间,因此在本机上运行此算法可能需要做数据采样,就像下面的代码示例中所示(采样可以避免由于GC问题而导致application崩溃):

# in Python
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")minGraph = GraphFrame(stationVertices, tripEdges.sample(False, 0.1))
cc = minGraph.connectedComponents()

从这个查询中得到两个连通分量,但不一定是所期望的,这里的示例可能没有使用所有正确的数据或信息,因此可能需要更多的计算资源来进一步查询:

# in Python
cc.where("component != 0").show()+----------+------------------+---------+-----------+---------+------------+-----
|station_id| id| lat| long|dockcount| landmark|in。。。
+----------+------------------+---------+-----------+---------+------------+-----
| 47| Post at Kearney|37。788975|-122。403452| 19|San Franc。。。| 。。。
| 46|Washington at K。。。|37。795425|-122。404767| 15|San Franc。。。| 。。。
+----------+------------------+---------+-----------+---------+------------+-----

GraphFrame包括另一种作用于有向图的算法,即强连通分量(strongly connected components),它考虑到了方向性。强连通分量是一个子图,子图中所有顶点对之间都具有连接路径,如下所示:

# in Python
scc = minGraph.stronglyConnectedComponents(maxIter=3)
scc.groupBy("component").count().show()

以上只列举了GraphFrame支持的的一部分算法和功能,GraphFrame库还包括其它一些功能,例如通过消息传递接口(message-passing interface ,MPI)编写自己的算法、三角形计数以及与GraphX之间的转换等。

八、深度学习

55. 深度学习能够解决一些以前机器学习方法很难处理的问题,特别是处理非结构化数据(如图像、音频和文本)。由于深度学习仍然是一个新的领域,许多最新的Spark工具都是在外部库中实现的。

要定义深度学习,首先应该给出神经网络的概念。神经网络是具有权重和激活函数的一个节点网络,这些节点被组织到若干个堆叠在一起的层上,每个层的部分节点或全部节点连接到网络上的前一层。通过这种层结构,这些简单的函数可以学会识别输入中复杂的信号:一层识别简单线条,下一层识别圆圈和正方形,再下一层识别复杂的纹理,最后一层识别完整的对象。目标是通过调整与每个连接相关的权重和网络中每个节点的值来训练网络,使某些输入与某些输出相关联,下图显示了一个简单的神经网络:

深度学习或深层神经网络将许多这些层堆叠在一起,组成各种不同的体系结构。近年来由于规模更大的数据集(如ImageNet目标识别数据集)、性能强大的硬件、以及新的训练算法等的出现,可以训练规模更大的神经网络,获得的效果比以前的许多机器学习方法都更好。由于增加了更多的数据,传统的机器学习技术通常不能获得稳定的表现效果,可能已经达到了它们的上限。

深度学习可以从大量的数据和信息中受益,其输入数据的规模通常比其他机器学习方法的数据集要大几个数量级深度神经网络现在已经成为计算机视觉、语音处理和一些自然语言处理任务的标准方法,相比于以前的手工模型,它们能够学习到更好的特征,而且也还活跃在机器学习的其他领域。Spark在大数据处理和并行计算能力方面的优势使其成为深度学习非常合适的框架。

在大多数情况下,在Spark中使用深度学习可用来主要做以下三个任务:

(1)推断(Inference)。使用深度学习最简单方法是采用训练好的模型,然后用Spark并行地将其应用于大型数据集上。例如,可以使用像ImageNet这样训练好的标准图像分类模型,并将其应用于自己的图像上来识别熊猫、花卉或汽车,可以在偏好的深度学习框架上提取模型,然后使用Spark并行地使用它。使用PySpark,可以简单地在map函数中调用TensorFlow或PyTorch等框架来实现分布式推断,但还有一些库可以做进一步优化,而不仅仅是在map函数中调用这些库。

(2)特征化(Featurization)与迁移学习(Transfer Learning)。比前面所讲更复杂的是将现有模型用作特征提取器(featurizer),而不再是用它来获得最终的输出。许多深度学习模型因为接受过端到端任务的训练,在较低层次学习到了有用的特征表示。例如,对ImageNet数据集进行训练的分类器还将学习所有自然图像中存在的低级特征(如边缘和纹理)。

然后,可以使用这些特征为原始数据集未包括的新问题来学习模型,此方法称为迁移学习,通常涉及训练好的模型的最后几层,并使用感兴趣的数据对其进行再训练。如果没有大量的训练数据,迁移学习也特别有用:从头开始训练一个完整的网络需要一个像ImageNet这样的包含成千上万张图像的数据集,以避免过拟合,但在许多商业环境中没有这么多数据集。相比之下,迁移学习即使只有几千张图像也可以运行,因为它更新的参数较少

(3)模型训练(Model training)。Spark还可以用来从头训练一个新的深度学习模型。这里有两种常用的方法,首先可以使用Spark集群在多个服务器上并行训练某个单一模型,通过服务器之间的通信进行更新;或者一些库允许用户并行地训练相似模型的多个实例,以尝试各种模型结构和超参数,从而加速模型搜索和优化的过程。

在这两种情况下,使用Spark的深度学习库可以很方便地将数据从RDD和DataFrame传递给深度学习算法。最后,即使不希望并行训练模型,也可以使用这些库从集群中提取数据,并根据TensorFlow等框架的原始数据格式将其导出到单机训练脚本中。

在这三种情况下,深度学习代码通常作为更大应用程序的一部分运行,其中包括ETL步骤来分析输入数据、来自各种源的I/O以及可能的批处理或流式推断。对于应用程序的其他部分,可以简单地使用DataFrame、RDD和MLlib API。Spark的优点之一在于很容易将这些步骤合并到一个并行工作流中。

56. Spark的MLlib当前提供对一个深度学习算法的原生支持:多层感知机分类器(multilayer perceptron classifier)即ml.classification.MultilayerPerceptronClassifier类。该类仅限于训练层数相对浅的网络,该网络包含具有sigmoid激活函数的完全连接层、和具有softmax激活函数的输出层。当在现有的基于深度学习的特征提取器上使用迁移学习时,此类对于训练这个分类模型的最后几层非常有用。例如,它可以添加到后面描述的几个深度学习库之上,以快速执行Keras和TensorFlow模型的迁移学习:

(1)TensorFrame是一个用于推断和面向迁移学习的库,它便于在Spark DataFrame和TensorFlow之间传递数据。它支持Python和Scala接口,提供一个简单而优化的接口来支持数据从TensorFlow和Spark之间的传递,在Spark DataFrame上使用TensorFrame来应用模型通常比直接调用TensorFlow模型的一个Python map函数更有效,因为启动时间更少,数据传输更快。TensorFrame在处理推断问题上特别有用,无论是在流处理和批处理情形下,还是在迁移学习中,可以应用一个训练好的模型来处理原始数据,提取它的特征,然后使用MultilayerPerceptronClassifier来学习最后几层,甚至在数据上应用简单的逻辑回归或随机森林分类器。

(2)BigDL是由英特尔公司针对Spark开发的分布式深度学习框架,它旨在支持大模型的分布式训练以及这些模型的快速应用。BigDL相比于其他库的一个主要优点是,它主要优化使用CPU的训练而不是GPU训练,从而使其在基于CPU的集群如Hadoop上运行效率更高。BigDL提供高级API可以构建神经网络,并在默认情况下自动分布式化所有操作。它还可以训练由Keras DL库所描述的模型。

(3)TensorFlowOnSpark是一个广泛使用的库,可以用并行方式在Spark集群上训练TensorFlow模型。TensorFlow包括一些基础配置来进行分布式训练,但它仍然需要依靠集群管理器如yarn来管理硬件和数据通信,它不与集群管理器之外的分布式I/O层一起使用。TensorFlowOnSpark在Spark job中启动TensorFlow的分布式模式,并自动将数据从RDD或DataFrame传递给TensorFlow task。TensorFlowOnSpark 最初是由Yahoo!开发的,在其他大型组织中也有实际的生产使用。该项目还与Spark的ML Pipelines API进行了集成。

(4)DeepLearning4j是基于Java和Scala的一个开源分布式深度学习项目,支持单节点和分布式训练。相比于Python深度学习框架,其优点之一是它主要为JVM设计,使不希望在开发过程中添加Python 库的人感觉更方便。它包括各种各样的训练算法,并支持CPU也支持GPU。

(5)Deep Learning Pipelines(深度学习流水线)是Databricks公司的一个开源软件包,它将深度学习功能集成到Spark的ML Pipelines API中。目前TensorFlow和TensorFlow支持下的Keras深度学习引擎已经支持,它侧重于两个目标:

a. 将这些框架并入标准的Spark API(如ML Pipelines和SparkSQL),使之易于使用;

b. 默认情况下将计算分布式化。

例如,Deep Learning Pipelines提供一个DeepImageFeaturizer类,在ML Pipeline API中充当转换器,允许使用几行代码构建迁移学习pipeline(例如,通过在顶部添加感知器或逻辑回归分类程序)。同样,该库还支持使用MLlib的网格搜索和交叉验证API,对多个模型参数进行并行网格搜索。最后,用户可以在SparkSQL的UDF中使用机器学习模型,使其可供使用SQL或流数据的分析人员使用。下表总结了各种深度学习库及其支持的主要应用场景:

57. Deep Learning Pipelines通过将流行的深度学习框架和ML Pipeline与SparkSQL集成在一起,为可扩展的深度学习提供了高级API。建立在Spark的ML Pipelines上的Deep Learning Pipelines是为了训练,而基于DataFrame和SQL的pipeline是为了部署模型。在Spark中处理图像数据的一个挑战是,很难将图像数据转换成一个DataFrame。Deep Learning Pipelines包括一些工具函数,便于分布式加载和解码图像数据,在2.3.X版本及以上支持:

from sparkdl import readImages
img_dir = '/data/deep-learning-images/'
image_df = readImages(img_dir)

结果DataFrame中包含路径以及图像和它的一些元数据:

image_df.printSchema()root
|--filePath:string (nullable = false)
|--image:struct (nullable = true)
| |--mode:string (nullable = false)
| |--height:integer (nullable = false)
| |--width:integer (nullable = false)
| |--nChannels:integer (nullable = false)
| |--data:binary (nullable= false)

现在有了数据,可以开始执行一些简单的迁移学习。可以利用别人创建的模型,修改它以更好地匹配这里的目标,首先将加载每类花的数据,并创建训练集和测试集:

from sparkdl import readImages
from pyspark.sql.functions import lit
tulips_df = readImages(img_dir + "/tulips").withColumn("label", lit(1))
daisy_df = readImages(img_dir + "/daisy").withColumn("label", lit(0))
tulips_train, tulips_test = tulips_df.randomSplit([0.6, 0.4])
daisy_train, daisy_test = daisy_df.randomSplit([0.6, 0.4])
train_df = tulips_train.unionAll(daisy_train)
test_df = tulips_test.unionAll(daisy_test)

下一步将用一个名为DeepImageFeaturizer的转换器,可以利用它的一个名为Inception的预训练模型,它是一个成功用于图像模式识别的神经网络模型。使用这个训练好的模型可以很好地识别图像中的各种常见对象和动物,它是Keras库支持的标准预训练模型之一。然而,这个特殊的神经网络并没有被训练来识别雏菊和玫瑰,因此将使用迁移学习使其适合处理当前的问题:区分不同的花卉类型。

注意可以使用ML Pipeline概念,并将它与Deep Learning Pipelines一起使用:DeepImageFeaturizer只是一个ML转换器,此外仅添加了一个逻辑回归模型来做分类,也可以用其他分类模型。下面的代码演示如何添加此模型(这可能需要一段时间才能完成,因为它是一个相当耗费资源的过程):

from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from sparkdl import DeepImageFeaturizer
featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features",modelName="InceptionV3")
lr = LogisticRegression(maxIter=1, regParam=0.05, elasticNetParam=0.3,labelCol="label")
p = Pipeline(stages=[featurizer, lr])
p_model = p.fit(train_df)

一旦训练好了模型,就可以使用前面预处理和特征工程部分中使用的分类评估器,指定要评估的指标,然后进行评估:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
tested_df = p_model.transform(test_df)
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(tested_df.select("prediction", "label"))))

对于这里的DataFrame 例子,可以检查在以前的训练中出错的行和图像:

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import expr
# a simple UDF to convert the value to a double
def _p1(v):return float(v.array[1])
p1 = udf(_p1, DoubleType())
df = tested_df.withColumn("p_1", p1(tested_df.probability))
wrong_df = df.orderBy(expr("abs(p_1 - label)"), ascending=False)
wrong_df.select("filePath", "p_1", "label").limit(10).show()

58. Spark的DataFrame结构可以将深度学习模型应用于大规模数据集,Deep Learning Pipelines提供了一套转换器用于处理大规模的TensorFlow数据流图和TensorFlow支持的Keras模型。此外,其它流行的图像模型也可以应用,不需要TensorFlow或Keras代码,由TensorFrames库支持的转换器可以有效地处理Spark任务中模型分布和数据分布。

有许多标准的深度学习模型用来处理图像数据,如果自己的任务与模型适合解决的问题非常相似(例如使用ImageNet类进行对象识别),则只需指定模型名称即可使用转换器DeepImagePredictor。Deep Learning Pipelines支持Keras中的各种标准模型,下面是使用DeepImagePredictor 的示例:

from sparkdl import readImages, DeepImagePredictor
image_df = readImages(img_dir)
predictor = DeepImagePredictor(inputCol="image",outputCol="predicted_labels",modelName="InceptionV3",decodePredictions=True,topK=10)
predictions_df = predictor.transform(image_df)

使用该基本模型后,predicted_labels列很大概率是"daisy"(雏菊)。从概率值的差异可以看出,神经网络可以识别出两种花型,这个迁移学习示例显示,它能够从基本模型开始正确地学习出雏菊(daisy)和郁金香(tulip)之间的差异:

df = p_model.transform(image_df)

Deep Learning Pipelines还允许使用Spark以分布式方式应用Keras模型,它将加载一个Keras模型并将其应用于DataFrame列。Deep Learning Pipelies通过与TensorFlow的集成,可创建使用TensorFlow操作图像的自定义转换器,例如可以创建一个转换器来更改图像的大小或修改颜色,为此需要使用TFImageTransformer类。

另一种选择是将模型集成到SQL函数,这样熟悉SQL的用户能够使用深度学习模型。使用此函数后,生成的UDF函数将生成特定模型的输出并写出到一列,例如可以使用registerKeraImageUDF类将Inception v3应用于各种图像:

from keras.applications import InceptionV3
from sparkdl.udf.keras_image_model import registerKerasImageUDF
from keras.applications import InceptionV3
registerKerasImageUDF("my_keras_inception_udf", InceptionV3(weights="imagenet"))

这样,深度学习的力量可以被所有Spark用户所享用,而不仅仅是构建模型的专家。

Spark高级分析与机器学习笔记相关推荐

  1. 基于spark mllib_Spark高级分析指南 | 机器学习和分析流程详解(下)

    - 点击上方"中国统计网"订阅我吧!- 我们在Spark高级分析指南 | 机器学习和分析流程详解(上)快速介绍了一下不同的高级分析应用和用力,从推荐到回归.但这只是实际高级分析过程 ...

  2. 在 Apache Spark 中利用 HyperLogLog 函数实现高级分析

    在 Apache Spark 中利用 HyperLogLog 函数实现高级分析 预聚合是高性能分析中的常用技术,例如,每小时100亿条的网站访问数据可以通过对常用的查询纬度进行聚合,被降低到1000万 ...

  3. 机器学习笔记——支持向量机SMO算法完整版代码分析

    机器学习笔记--支持向量机SMO算法完整版代码分析 代码大体分析 外循环 参数类 内循环 KKT条件判断 eCache参数 完整SMO代码 添加核函数代码 代码参考书籍:<机器学习实战> ...

  4. 机器学习笔记5-Tensorflow高级API之tf.estimator

    前言 本文接着上一篇继续来聊Tensorflow的接口,上一篇中用较低层的接口实现了线性模型,本篇中将用更高级的API--tf.estimator来改写线性模型. 还记得之前的文章<机器学习笔记 ...

  5. 机器学习笔记 - 独立成分分析(ICA)

    1.概述 独立分量分析 (ICA) 是一种机器学习方法,其中将多元信号分解为不同的非高斯信号.它侧重于独立来源.由于混频处理未知,所以常用ICA作为黑盒.与寻求最大化数据点方差的主成分分析不同. 噪声 ...

  6. 机器学习笔记 - YOLO家族简介

    一.背景概述 目标检测是计算机视觉中最重要的课题之一.大多数计算机视觉问题都涉及检测视觉对象类别,如行人.汽车.公共汽车.人脸等.这一领域不仅限于学术界,而且在视频监控.医疗保健.车载传感和自动驾驶. ...

  7. 【机器学习笔记】可解释机器学习-学习笔记 Interpretable Machine Learning (Deep Learning)

    [机器学习笔记]可解释机器学习-学习笔记 Interpretable Machine Learning (Deep Learning) 目录 [机器学习笔记]可解释机器学习-学习笔记 Interpre ...

  8. Spark大数据分布式机器学习处理实战

    前言 Spark是一种大规模.快速计算的集群平台,本公众号试图通过学习Spark官网的实战演练笔记提升笔者实操能力以及展现Spark的精彩之处.有关框架介绍和环境配置可以参考以下内容: 大数据处理框架 ...

  9. Spark快速大数据分析——读书笔记

    --8.16开始整理 Spark快速大数据分析 推荐序: 一套大数据解决方案通常包含多个组件,从存储.计算和网络硬件层,到数据处理引擎,再到利用改良的统计和计算算法.数据可视化来获得商业洞见的分析层, ...

最新文章

  1. C语言中常用的数学公式
  2. Linux查看进程和进程管理
  3. oracle+字段+virtual,Oracle 11g新特性之--虚拟列(Virtual Column)
  4. OpenCV中LUT函数的使用
  5. 与JBoss Fuse,Jenkins和Nexus的持续集成
  6. 有这样的开发,产品经理跪着帮你擦汗!
  7. 华为户外模式怎么设置_华为FreeLace Pro降噪器效果怎么样?降噪开启和设置教程!...
  8. 找回华为云删除的通讯录_找回小米手机误删照片只要10秒!人人都知道的方法,你怎能不知道...
  9. eslint / prettier 检查格式配置、husky / lint-staged 强制校验、tslint 配置
  10. 利用netstat查看http为短连接还是长连接?
  11. 2019第十届蓝桥杯国赛C组C/C++第I题 胖子走迷宫
  12. 计算机电子科技生产质量标准,SJT9527-1993微型数字电子计算机质量分等标准.pdf...
  13. android基础的博客,【复习】Android基础
  14. Ubuntu查看Cuda是否全部安装成功
  15. 计算机网络原理ospf协议配置思考题,动态路由协议之OSPF理论篇(下)(含虚链路的实验)...
  16. 2022-5-17-验证外星语词典
  17. LeetCode 1278. 分割回文串 III
  18. 【远程编辑工具UE】超好用的工具UltraEdit(UE)远程连接Linux的方法,以及FTP Component Failuer连接失败的解决方法
  19. Visual Studio 2019 和 qt 5.15.1 下 opengl 的运用 - Lighting - 01 - Colors
  20. Alipay(支付宝)接口相关

热门文章

  1. C#使用消息队列(MSMQ)
  2. Oracle 19C 安装指引
  3. 前端图片加载闪烁问题
  4. rust guessing game
  5. Charles工具使用-pc端
  6. 笨办法学python2.0 习题1-10
  7. 电商营销策略介绍,电商营销手段有哪些
  8. HTML5与CSS3基础教程笔记
  9. datagridview 显示红色叉_显示器促销日常数码店招首页装修PSD模板分层psd素材
  10. html整体垂直居中,实现HTML元素垂直居中的六种方法