PySpark ML(转换器)

在PySpark中包含了两种机器学习相关的包:MLlib和ML,二者的主要区别在于MLlib包的操作是基于RDD的,ML包的操作是基于DataFrame的。根据之前我们叙述过的DataFrame的性能要远远好于RDD,并且MLlib已经不再被维护了,所以在本专栏中我们将不会讲解MLlib。

ML简介

在ML包中主要包含了三个主要的抽象类:转换器、评估器、管道,本文先来介绍第一种抽象类——转换器。

转换器

在PySpark中,我们通常通过将一个新列附加到DataFrame来转换数据。

  • Binarizer()
  • 用处:根据指定的阈值将连续变量转换为对应的二进制值。
  • 使用方法示例:
from pyspark.ml.feature import Binarizer
df = spark.createDataFrame([(0.5, ), (1.0, ), (1.5, )], ['values'])
binarizer = Binarizer(threshold=0.7, inputCol="values", outputCol="features")
binarizer.transform(df).show()# 结果展示
+------+--------+
|values|features|
+------+--------+
|   0.5|     0.0|
|   1.0|     1.0|
|   1.5|     1.0|
+------+--------+
  • Bucketizer()
  • 用处:将连续变量离散化到指定的范围区间。
  • 使用方法示例:
from pyspark.ml.feature import Bucketizer
values = [(0.1, ), (0.4, ), (1.2, ), (1.5, ), (float("nan"), ),(float("nan"), )]
df = spark.createDataFrame(values, ["values"])
# splits 为分类区间
bucketizer = Bucketizer(splits=[-float("inf"), 0.5, 1.4,float("inf")],inputCol="values",outputCol="buckets")
bucketed = bucketizer.setHandleInvalid("keep").transform(df)
bucketed.show()# 结果展示
+------+-------+
|values|buckets|
+------+-------+
|   0.1|    0.0|
|   0.4|    0.0|
|   1.2|    1.0|
|   1.5|    2.0|
|   NaN|    3.0|
|   NaN|    3.0|
+------+-------+
  • ChiSqSelector()

  • 用处:使用卡方检验完成选择。

  • 使用方法示例:

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import ChiSqSelector
df = spark.createDataFrame([(Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0),(Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0),(Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0)],["features", "label"])
selector = ChiSqSelector(numTopFeatures=2, outputCol="selectedFeatures")
model = selector.fit(df)
model.transform(df).show()# 结果展示
+------------------+-----+----------------+
|          features|label|selectedFeatures|
+------------------+-----+----------------+
|[0.0,0.0,18.0,1.0]|  1.0|      [18.0,1.0]|
|[0.0,1.0,12.0,0.0]|  0.0|      [12.0,0.0]|
|[1.0,0.0,15.0,0.1]|  0.0|      [15.0,0.1]|
+------------------+-----+----------------+
  • CountVectorizer()

  • 用处:从数据集中学习某种模式,对数据进行标记

  • 使用方法示例:

from pyspark.ml.feature import CountVectorizer
df = spark.createDataFrame([(0, ["a", "b", "c"]),(1, ["a", "b", "b", "c", "a"])], ["label", "raw"])
cv = CountVectorizer(inputCol="raw", outputCol="vectors")
model = cv.fit(df)
model.transform(df).show(truncate=False)# 结果展示
+-----+---------------+-------------------------+
|label|raw            |vectors                  |
+-----+---------------+-------------------------+
|0    |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1    |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+-----+---------------+-------------------------+
  • ElementwiseProduct()

  • 用处:返回传入向量和参数scalingVec的乘积

  • 使用方法示例:

from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(Vectors.dense([2.0, 1.0, 3.0]), )], ["values"])
ep = ElementwiseProduct(scalingVec=Vectors.dense([1.0, 2.0, 3.0]),inputCol="values",outputCol="eprod")
ep.transform(df).show()
ep.setParams(scalingVec=Vectors.dense([2.0, 3.0, 5.0])).transform(df).show()# 结果展示
+-------------+-------------+
|       values|        eprod|
+-------------+-------------+
|[2.0,1.0,3.0]|[2.0,2.0,9.0]|
+-------------+-------------++-------------+--------------+
|       values|         eprod|
+-------------+--------------+
|[2.0,1.0,3.0]|[4.0,3.0,15.0]|
+-------------+--------------+
  • MaxAbsScaler()

  • 用处:将数据调整到[-1,1]范围内(不会移动数据的中心)

  • 使用方法示例:

from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(Vectors.dense([1.0]), ),(Vectors.dense([2.0]), )], ["a"])
maScaler = MaxAbsScaler(inputCol="a", outputCol="scaled")
model = maScaler.fit(df)
model.transform(df).show()# 结果展示
+-----+------+
|    a|scaled|
+-----+------+
|[1.0]| [0.5]|
|[2.0]| [1.0]|
+-----+------+
  • MinMaxScaler()

  • 用处:将数据缩放到[0,1]范围内(最大最小归一化)。

  • 使用方法示例:

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(Vectors.dense([0.0]), ),(Vectors.dense([2.0]), )], ["a"])
mmScaler = MinMaxScaler(inputCol="a", outputCol="scaled")
model = mmScaler.fit(df)
print(model.originalMin, model.originalMax)
model.transform(df).show()# 结果展示
[0.0] [2.0]
+-----+------+
|    a|scaled|
+-----+------+
|[0.0]| [0.0]|
|[2.0]| [1.0]|
+-----+------+
  • NGram()

  • 用处:返回NGram算法后的结果。

  • 使用方法示例:

from pyspark.ml.feature import NGram
from pyspark.sql import Row
df = spark.createDataFrame([Row(inputTokens=["a", "b", "c", "d", "e"])])
ngram = NGram(n=2, inputCol="inputTokens", outputCol="nGrams")
ngram.transform(df).show()# 结果展示
+---------------+--------------------+
|    inputTokens|              nGrams|
+---------------+--------------------+
|[a, b, c, d, e]|[a b, b c, c d, d e]|
+---------------+--------------------+
  • Normalizer()

  • 用处:使用p范数将数据缩放为单位范数(默认为L2)。

  • 使用方法示例:

from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors
svec = Vectors.sparse(4, {1: 4.0, 3: 3.0})
df = spark.createDataFrame([(Vectors.dense([3.0, -4.0]), svec)],["dense", "sparse"])
normalizer = Normalizer(p=2.0, inputCol="dense", outputCol="features")
normalizer.transform(df).show()# 结果展示
+----------+-------------------+----------+
|     dense|             sparse|  features|
+----------+-------------------+----------+
|[3.0,-4.0]|(4,[1,3],[4.0,3.0])|[0.6,-0.8]|
+----------+-------------------+----------+
  • OneHotEncoderEstimator()

  • 用处:将分类列编码为二进制向量列(独热编码)。

  • 使用方法示例:

from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(0.0, ), (1.0, ), (2.0, )], ["input"])
ohe = OneHotEncoderEstimator(inputCols=["input"], outputCols=["output"])
model = ohe.fit(df)
model.transform(df).show()# 结果展示
+-----+-------------+
|input|       output|
+-----+-------------+
|  0.0|(2,[0],[1.0])|
|  1.0|(2,[1],[1.0])|
|  2.0|    (2,[],[])|
+-----+-------------+
  • PCA()

  • 用处:使用主成分分析执行数据降维(PCA算法)。

  • 使用方法示例:

from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]), ),(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]), ),(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]), )]
df = spark.createDataFrame(data, ["features"])
pca = PCA(k=2, inputCol="features", outputCol="pca_features")
model = pca.fit(df)
model.transform(df).show(truncate=0)# 结果展示
+---------------------+----------------------------------------+
|features             |pca_features                            |
+---------------------+----------------------------------------+
|(5,[1,3],[1.0,7.0])  |[1.6485728230883807,-4.013282700516296] |
|[2.0,0.0,3.0,4.0,5.0]|[-4.645104331781534,-1.1167972663619026]|
|[4.0,0.0,0.0,6.0,7.0]|[-6.428880535676489,-5.337951427775355] |
+---------------------+----------------------------------------+
  • QuantileDiscretizer()

  • 用处:传入一个numBuckets参数,该方法通过计算数据的近似分位数来决定分隔应该是什么。

  • 使用方法示例:

from pyspark.ml.feature import QuantileDiscretizer
values = [(0.1, ), (0.4, ), (1.2, ), (1.5, ), (float("nan"), ),(float("nan"), )]
df = spark.createDataFrame(values, ["values"])
qds = QuantileDiscretizer(numBuckets=2,inputCol="values",outputCol="buckets",relativeError=0.01,handleInvalid="error")
bucketizer = qds.fit(df)
qds.setHandleInvalid("keep").fit(df).transform(df).show()# 结果展示
+------+-------+
|values|buckets|
+------+-------+
|   0.1|    0.0|
|   0.4|    1.0|
|   1.2|    1.0|
|   1.5|    1.0|
|   NaN|    2.0|
|   NaN|    2.0|
+------+-------+
  • RegexTokenizer()

  • 用处:使用正则表达式的字符串分词器。

  • 使用方法示例:

from pyspark.ml.feature import RegexTokenizer
df = spark.createDataFrame([("A B  c", )], ["text"])
reTokenizer = RegexTokenizer(inputCol="text", outputCol="words")
reTokenizer.transform(df).show()# 结果展示
+------+---------+
|  text|    words|
+------+---------+
|A B  c|[a, b, c]|
+------+---------+
  • StandardScaler()

  • 用处:数据标准化。

  • 使用方法示例:

from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(Vectors.dense([0.0]), ),(Vectors.dense([2.0]), )], ["a"])
standardScaler = StandardScaler(inputCol="a", outputCol="scaled")
model = standardScaler.fit(df)
print(model.mean, model.std)
model.transform(df).show()# 结果展示
[1.0] [1.4142135623730951]
+-----+-------------------+
|    a|             scaled|
+-----+-------------------+
|[0.0]|              [0.0]|
|[2.0]|[1.414213562373095]|
+-----+-------------------+
  • StopWordsRemover()

  • 用处:从标记文本中删除停用词。

  • 使用方法示例:

from pyspark.ml.feature import StopWordsRemover
df = spark.createDataFrame([(["a", "b", "c"], )], ["text"])
remover = StopWordsRemover(inputCol="text", outputCol="words", stopWords=["b"])
remover.transform(df).show()# 结果展示
+---------+------+
|     text| words|
+---------+------+
|[a, b, c]|[a, c]|
+---------+------+
  • Tokenizer()

  • 用处:将字符串转成小写,然后以空格为分隔符分词。

  • 使用方法示例:

from pyspark.ml.feature import Tokenizer
df = spark.createDataFrame([("ASD VA c", )], ["text"])
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokenizer.transform(df).show()# 结果展示
+--------+------------+
|    text|       words|
+--------+------------+
|ASD VA c|[asd, va, c]|
+--------+------------+
  • VectorSlicer()

  • 用处:给定一个索引列表,从特征向量中提取值(作用于特征向量,不管是密集的还是稀疏的)。

  • 使用方法示例:

from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(Vectors.dense([-2.0, 2.3, 0.0, 0.0, 1.0]), ),(Vectors.dense([0.0, 0.0, 0.0, 0.0, 0.0]), ),(Vectors.dense([0.6, -1.1, -3.0, 4.5, 3.3]), )],["features"])
vs = VectorSlicer(inputCol="features", outputCol="sliced", indices=[1, 4])
vs.transform(df).show(truncate=0)# 结果展示
+-----------------------+----------+
|features               |sliced    |
+-----------------------+----------+
|[-2.0,2.3,0.0,0.0,1.0] |[2.3,1.0] |
|[0.0,0.0,0.0,0.0,0.0]  |[0.0,0.0] |
|[0.6,-1.1,-3.0,4.5,3.3]|[-1.1,3.3]|
+-----------------------+----------+
  • VectorAssembler()

  • 用处:将多个数字(包括向量)列合并为一列向量。

  • 使用方法示例:

from pyspark.ml.feature import VectorAssembler
df = spark.createDataFrame([(1, 0, 3)], ["a", "b", "c"])
vecAssembler = VectorAssembler(inputCols=["a", "b", "c"], outputCol="features")
vecAssembler.transform(df).show()# 结果展示
+---+---+---+-------------+
|  a|  b|  c|     features|
+---+---+---+-------------+
|  1|  0|  3|[1.0,0.0,3.0]|
+---+---+---+-------------+
  • Word2Vec()

  • 用处:将一个句子(字符串)作为输入,将其转换为{string, vector}格式的映射。

  • 使用方法示例:

from pyspark.ml.feature import Word2Vec
sent = ("a b " * 100 + "a c " * 10).split(" ")
doc = spark.createDataFrame([(sent, ), (sent, )], ["sentence"])
word2Vec = Word2Vec(vectorSize=5,seed=42,inputCol="sentence",outputCol="model")
model = word2Vec.fit(doc)
model.getVectors().show()# 结果展示
+----+--------------------+
|word|              vector|
+----+--------------------+
|   a|[0.09461779892444...|
|   b|[1.15474212169647...|
|   c|[-0.3794820010662...|
+----+--------------------+

小结

在PySpark中转换器实现的功能类似于数据转换,这种生成新数据的数据预处理形式是我们在进行机器学习任务过程中重要的一部分呢,下一篇我们将会讲解第二种抽象类:估计器。

PySpark ML(转换器)相关推荐

  1. pyspark ml 中LogisticRegression的使用

    前置概念 ML包公开了三个主要的抽象类:转换器(transformer).评估器(estimator)和管道(pipeline). 转换器,通常通过将一个新列附加到DataFrame来转换数据,其常见 ...

  2. PySpark︱pyspark.ml 相关模型实践

    文章目录 1 pyspark.ml MLP模型实践 模型存储与加载 9 spark.ml模型评估 MulticlassClassificationEvaluator 1 pyspark.ml MLP模 ...

  3. 调用pyspark.ml库对信用卡逾期进行预测

    导包 from pyspark import SparkConf,SparkContext from pyspark.ml.classification import RandomForestClas ...

  4. pyspark入门---机器学习实战预测婴儿出生率(二)使用ML库

    机器学习实战预测婴儿出生率 1.加载数据 2.创建转换器 3.创建预测器 4.创建管道 5.训练模型 6.使用BinaryClassificationEvaluator对模型评估 7.模型保存与调用 ...

  5. ML Pipelines管道

    ML Pipelines管道 In this section, we introduce the concept of ML Pipelines. ML Pipelines provide a uni ...

  6. 【机器学习】在大数据上使用PySpark进行K-Means

    作者 | Angel Das 编译 | VK 来源 | Towards Data Science 如果你不熟悉K Means聚类,我建议你阅读下面的文章.本文主要研究数据并行和聚类,大数据上的K-Me ...

  7. dataframe 筛选_Spark.DataFrame与Spark.ML简介

    本文是PySpark销量预测系列第一篇,后面会陆续通过实战案例详细介绍PySpark销量预测流程,包含特征工程.特征筛选.超参搜索.预测算法. 在零售销量预测领域,销售小票数据动辄上千万条,这个量级在 ...

  8. pyspark sparksession_PySpark 处理数据和数据建模

    安装相关包 from pyspark.sql import SparkSession from pyspark.sql.functions import udf, when, count, count ...

  9. Python大数据处理库 PySpark实战 总结四

    Python大数据处理库 PySpark实战四 ETL 实战 实验数据来源 数据加载 观察资料 选择.筛选与聚合 机器学习实战 实验数据来源 数据加载 统计描述 清洗与变形 Pipeline 逻辑回归 ...

最新文章

  1. 什么是Singleton?
  2. Nacos 1.1.4 发布,业界率先支持 Istio MCP 协议
  3. 日常笔记-css\html篇
  4. 《学习OpenCV3(中文版)》图书目录
  5. apple watch自身不会让你更健康,而这些会
  6. docker kaniko push推送镜像至harbor报错:x509: certificate signed by unknown authority(命令中添加 --skip-tls-ver)
  7. MySQL数据库的设计和命令行模式下建立详细过程
  8. (4)回归决策树_预测波士顿房价
  9. Warez出品的精品动画,近25万倍的压缩,大小仅有64K的
  10. Zookeeper之数据同步原理
  11. 嵌入式开发有年龄限制吗_32岁入门晚不晚?来听听这位70后程序员的故事
  12. Java-JavaWeb—(4)MySQL数据库
  13. background 互联网图片_“background-image:url(data:image”data类型的Url格式简介
  14. 使用cmd命令行或运行框进行关机重启操作
  15. 【数据库】期末复习:SQL语句、关系代数的运算、范式的定义和判断、求最小函数依赖集、ER图转关系模式
  16. 国内外数字信号处理经典教材
  17. 2.6 自定义srv python
  18. 《沉默的大多数》的读后感作文3500字
  19. 用学过的python写一个计算三角形面积和周长
  20. 不仅仅程序员的格子衬衫!互联网公司时尚穿搭指南

热门文章

  1. 无人豆浆机?九阳,请开始你的表演
  2. 纹理表示(Texture)
  3. 独立站站群是什么?被玩烂的站群模式还能持续多久?站群玩法怎么玩
  4. 免费建立自己的企业邮箱
  5. 从零开始的微信小程序线上商城开发(一)
  6. 爱玩mc卡在更新java_爱玩mc火爆服务器ip与登陆机制
  7. C#网页爬虫抓取行政区划
  8. 聊聊时代中的那些坚持
  9. ipv6 “无状态地址分配”和 “有状态地址分配” 两种IPV6地址分配方式的区别说明
  10. 解决ip无法自动分配