为什么要引入ML Pipeline

Spark MLlib是Spark的重要组成部分,也是最早推出的库之一,其基于RDD的API,算法比较丰富,比较稳定,也比较好用

划重点:
但是如果目标数据集结构复杂需要多次处理,或者是对新数据需要结合多个已经训练好的单个模型进行综合计算时,使用MLlib将会让程序结构复杂,甚至难于理解和实现。

为了改变这一局限性,从Spark1.2版本之后引入了ML Pipeline,经过多个版本的发展,Spark ML克服了MLlib在处理复杂机器学习问题的一些API库,以更加方便的构建复杂的机器学习工作流式应用,使整个机器学习过程变得更加易用、简洁、规范和高效。

Pipeline简介

ML提倡使用Pipeline,一般译为流水线,以便将多种算法更容易地组合成单个流水线或工作原理。
一个Pipeline在结构上会包含一个或多个Stage,每一个Stage都会完成一个任务,如数据处理、数据转化、模型训练、参数设置或数据预测等,其中两个主要的Stage为TransformerEstimator

Transformer主要是用来操作一个DataFrame数据并生成另外一个DataFrame数据,比如决策树模型、一个特征提取工具,都可以抽象为一个Transformer。
Estimator则主要是用来做模型拟合,用来生成一个Transformer。

这些Stage有序组成一个Pipeline。

Pipeline组件

Pipeline组件主要包括Transformer和Estimator

  • Transformer
    Transformer一般翻译为转换器,是一个Pipeline Stage,转换器包含特征变化学习模型
    从技术上来说,转换器通过方法transform(),在原始数据上增加一列或者多列来讲一个DataFrame转为另一个DataFrame。如:
    (1)一个特征转换器输入一个DataFrame,读取一个文本列,将其映射为新的特征向量列。输出一个新的带有特征向量列的DataFrame。
    (2)一个学习模型转换器输入一个DataFrame,读取包括特征向量的列,预测的每一个特征向量的标签。输出一个新的带有预测标签列的DataFrame。
  • Estimator
    Estimator可以被翻译为评估器或适配器,在Pipeline里通常是被用来操作DataFrame数据并生产一个Transformer,一个分类算法就是一个Estimator,因为它可以通过训练特征数据而得到一个分类模型。评估器用来拟合或者训练数据的学习算法或者任何算法。从技术上来说,评估器通过调用fit()方法,接受一个DataFrame产生一个模型,这个模型就是一个Transformer。比如,逻辑回归就是一个评估器,通过调用fit()来产生一个逻辑回归模型。

Pipeline原理

要构建一个Pipeline,首先需要定义Pipeline中的各个Stage,如指标提取和转换模型训练等。有了这些处理特定问题的Transformer和Estimator,我们就可以按照具体的处理逻辑来有序的组织Stages并创建一个Pipeline。
流水线由一系列有顺序的阶段指定,每个状态的运行时有顺序的,输入的DataFrame通过每个阶段进行改变。在转换器阶段,transform()方法被调用于DataFrame上。对于评估器阶段,fit()方法被调用来产生一个转换器,然后该转换器的transform()方法被调用在DataFrame上。下图简单说明了文档处理工作流的运行过程。

在上图中,第一行代表流水线处理的三个阶段。第一、二个阶段是转换器,第三个逻辑回归是评估器。底下一行代表流水线中的数据流,圆筒指DataFrame。流水线fit()方法被调用于原始的DataFrame中,里面包含原始的文档和标签。分词器的transform()方法将原始文档分为词语,添加新的词语列到DataFrame中。哈希处理的transform()方法将词语列转换为特征向量,添加新的向量列到DataFrame中。然后,因为逻辑回归是评估器,流水线先调用逻辑回归的fit()方法来产生逻辑回归模型。如果流水线还有其他更多阶段,在将DataFrame传入下一个阶段之前,流水线会先调用逻辑回归模型的transform()方法。
整个流水线是一个估计器。所以当流水线的fit()方法运行后,会产生一个流水线模型,流水线模型是转换器。流水线模型会在测试时被调用,下面的图示说明用法。

上面的图示中,流水线模型和原始流水线有同样数目的阶段,然而原始流水线中的估计器此时变为了转换器。当流水线模型的transform()方法被调用于测试数据集时,数据依次经过流水线的各个阶段。每个阶段的transform()方法更新数据集,并将之传到下个阶段。
流水线和流水线模型有助于确认训练数据和测试数据经过同样的特征处理流程
以上两图如果合并为一图,可用如下图形表达:

其中Pipeline及LogisticRegression都Estimator,Tokenizer,HashingTF,LogisticRegression Model为Transformer。

Pipeline实例

  • 使用Estimator, Transformer, and Param实例
    机器学习整个过程中,特征转换、特征选择、派生特征等工作,一般需要占据大部分时间,现在ML提供了很多Transformer,如OneHotEncoder、StringIndexer、PCA、Bucketizer、Word2vec等,利用这些函数可极大提高工作效率。
    以下通过实例说明如何使用ML库中Estimaor、Transformer和Param等。
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row//从(标识、特征)元组开始训练数据.
val training = spark.createDataFrame(Seq((1.0, Vectors.dense(0.0, 1.1, 0.1)),(0.0, Vectors.dense(2.0, 1.0, -1.0)),(0.0, Vectors.dense(2.0, 1.3, 1.0)),(1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")//创建一个LogisticRegression实例。 这个实例是一个估计器.
val lr = new LogisticRegression()
//打印参数,文档和任何默认值.
println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")//我们可以使用setter方法来设置参数.
lr.setMaxIter(10).setRegParam(0.01)//训练LogisticRegression模型,这里使用了存储在lr中的参数。.
val model1 = lr.fit(training)
//由于模型1是模型(即由估计器生成的转换器),
//我们可以查看它在fit()中使用的参数。
//打印参数(名称:值)对,其中名称是唯一的ID,
// LogisticRegression实例。
println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)//我们可以用ParamMap来指定参数,
//它支持几种指定参数的方法。
val paramMap = ParamMap(lr.maxIter -> 20).put(lr.maxIter, 30)  //指定1个参数。 这会覆盖原来的maxIter。.put(lr.regParam -> 0.1, lr.threshold -> 0.55)  // 指定多个参数。//也可以组合ParamMaps.
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability")  // 修改输出列名
val paramMapCombined = paramMap ++ paramMap2//现在使用paramMapCombined参数学习一个新的模型。
// paramMapCombined覆盖之前通过lr.set *方法设置的所有参数。
val model2 = lr.fit(training, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)// 准备测试数据
val test = spark.createDataFrame(Seq((1.0, Vectors.dense(-1.0, 1.5, 1.3)),(0.0, Vectors.dense(3.0, 2.0, -0.1)),(1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")//使用Transformer.transform()方法对测试数据进行预测
// LogisticRegression.transform将仅使用“特征”列
//请注意,model2.transform()输出一个“myProbability”列,而不是通常的。
// 我们先前通过lr.probabilityCol参数重新命名了'probability'列
model2.transform(test).select("features", "label", "myProbability", "prediction").collect().foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>println(s"($features, $label) -> prob=$prob, prediction=$prediction")}
  • ML使用Pipeline实例
    要构建一个Pipeline,首先我们需要定义Pipeline中各个Pipeline Stage,如分词,计算IF-IDF及训练逻辑回归模型等。这些Transformer和Estimator创建后,我们就可以按照处理流程组成PipelineStages,并创建一个Pipeline,如:
val pipeline = new Pipeline().setStages(Array(stage1,stage2,...))

然后,把训练集数据作为参数并调用Pipeline实例的fit()方法之后,将以流程的方式来处理原训练数据。

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row//准备训练文档,(id,内容,标签)
val training = spark.createDataFrame(Seq((0L, "a b c d e spark", 1.0),(1L, "b d", 0.0),(2L, "spark f g h", 1.0),(3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")//配置ML Pipeline,由三个stage组成,tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol(tokenizer.getOutputCol).setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.001)
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))//在训练数据集上使用Pipeline
val model = pipeline.fit(training)// Now we can optionally save the fitted pipeline to disk
//现在可以保存安装好的流水线到磁盘上
model.write.overwrite().save("/tmp/spark-logistic-regression-model")//现在可以保存未安装好的Pipeline保存到磁盘上
pipeline.write.overwrite().save("/tmp/unfit-lr-model")// 装载模型
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")//准备测试文档,不包含标签(id, text) .
val test = spark.createDataFrame(Seq((4L, "spark i j k"),(5L, "l m n"),(6L, "spark hadoop spark"),(7L, "apache hadoop")
)).toDF("id", "text")
//在测试文档上做出预测.
model.transform(test).select("id", "text", "probability", "prediction").collect().foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>println(s"($id, $text) --> prob=$prob, prediction=$prediction")}

后记

至少看三遍以上,才能有所顿悟,如果仅仅走马观花的话,不如不看。如果看的不是特别明白,可以试试把最后一个例子,不用pipeline实现一下,你会发现,在训练集和测试集上面得重复写两边一模一样的处理数据流程,并且每一步都得使用transform方法传入上一步处理后的DataFrame。写起来真的麻烦!

ML Pipeline原理相关推荐

  1. 诗人般的机器学习,ML工作原理大揭秘

    诗人般的机器学习,ML工作原理大揭秘 https://www.cnblogs.com/DicksonJYL/p/9698208.html 选自arXiv 作者:Cassie Kozyrkov 机器之心 ...

  2. 使用spark ml pipeline进行机器学习

    一.关于spark ml pipeline与机器学习 一个典型的机器学习构建包含若干个过程 1.源数据ETL 2.数据预处理 3.特征选取 4.模型训练与验证 以上四个步骤可以抽象为一个包括多个步骤的 ...

  3. 使用 Spark ML Pipeline 进行机器学习

    Spark ML Pipeline 的引入,是受到 scikit-learn 的启发,虽然 MLlib 已经足够简单实用,但如果目标数据集结构复杂,需要多次处理,或是在学习过程中,要使用多个转化器 ( ...

  4. RAxML下载与使用与ML建树原理

    >ML建树原理 1.选择模型:首先需要根据不同的分子序列特点和研究目的,选择合适的进化模型.常用的模型包括JC模型.K80模型.HKY模型.GTR模型等. 2.构建初始树:为了进行ML优化,需要 ...

  5. MLOps极致细节:18. Azure ML Pipeline(机器学习管道),Azure Container Instances (ACI)部署模型

    MLOps极致细节:18. Azure ML Pipeline(机器学习管道),Azure Container Instances (ACI)部署模型 在之前的章节中,我们已经完成了数据预处理,机器学 ...

  6. MLOps极致细节:16. Azure ML Pipeline(机器学习管道),Azure Compute Instance搭建与使用

    MLOps极致细节:16. Azure ML Pipeline(机器学习管道),Azure Compute Instance搭建与使用 这篇博客与下篇博客,我们将介绍Azure ML Pipeline ...

  7. MLOps极致细节:17. Azure ML Pipeline(机器学习管道),模型训练,打包和注册

    MLOps极致细节:17. Azure ML Pipeline(机器学习管道),模型训练,打包和注册 这两个章节中,我们将介绍Azure ML Pipeline的使用,并且结合MLFlow一起跟踪ML ...

  8. Rasa课程、Rasa培训、Rasa面试系列之: Rasa 3.0 ML Pipeline

    Rasa课程.Rasa培训.Rasa面试系列之: Rasa 3.0 ML Pipeline Rasa开源3.0将开始使用一个新的计算后台.从概念上讲,机器学习管道将类似于一个图,而不是一个组件序列.本 ...

  9. Redis Pipeline 原理及注意事项

    Pipeline介绍 Pipeline指的是管道技术,指的是客户端允许将多个请求依次发给服务器,过程中而不需要等待请求的回复,在最后再一并读取结果即可. 当client 使用pipelining 发送 ...

最新文章

  1. 菜鸡记录-王爽-汇编语言-实验十(编写子程序-显示字符串)
  2. Android--SoundPool
  3. iframe,window,滚动栏的一些问题
  4. 工业用微型计算机(23)-汇编语言基本结构
  5. Arduino--LCD1602(IIC)
  6. 海口只有阳光沙滩?错,人家还是“最佳智慧城市”
  7. [WPF系列]-Deep Zoom
  8. 计算机学校的奖项,2017年度国家科学技术奖各奖项公示
  9. axios 的简单使用
  10. 好心酸!三星可折叠屏手机Galaxy Fold下月也无法发货
  11. OpenShift Security (9) - 用 RHACS 扫描 Log4j 安全漏洞,屏蔽不安全镜像部署
  12. Linux下批量添加用户
  13. net core体系-web应用程序-4asp.net core2.0 项目实战(CMS)-第二章 入门篇-快速入门ASP.NET Core看这篇就够了...
  14. 【论文阅读笔记】《Simple, Accurate, and Robust Projector-Camera Calibration》
  15. 阿里高频面试题:如何快速判断元素是不是在集合里?
  16. [Cocos Creator] 本地文件加载系列六:本地龙骨动画dragonbones加载(web模式)
  17. MySQL 8.0如何配置my.cnf
  18. html旅游网站作品简介,旅游网站html模板
  19. SpringMVC基础入门
  20. 智慧校园人员定位系统解决方案

热门文章

  1. Mysql Hash索引和B-Tree索引区别(Comparison of B-Tree and Hash Indexes)
  2. React.js小书总结
  3. jQuery框架学习
  4. python 一些练习 (初学)
  5. 挑战练习题2.3动态规划 poj3046 Ant Counting dp
  6. vmware虚拟机Centos安装samba全过程
  7. 涨姿势 | 如何修复硬盘,以及如何避免硬盘损坏
  8. CA ARCserve Backup系列(1)—安装
  9. 自定义View----滑动刻度尺与流式布局 实例(四)
  10. 使用iScroll实现上、下滑动刷新和加载更多数据