MLlib代表机器学习库。

MLlib

  • 数据准备:特征提取、变换、选择、分类特征的散列和一些自然语言处理方法
  • 机器学习算法:实现了一些流行和高级的回归、分类和聚类算法
  • 使用程序:统计方法,如描述性统计、卡方检验、线性代数(系数稠密矩阵和向量)和模型评估方法

加载和转换数据

import pyspark.sql.types as typ
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
labels = [('INFANT_ALIVE_AT_REPORT', typ.StringType()),('BIRTH_YEAR', typ.IntegerType()),('BIRTH_MONTH', typ.IntegerType()),('BIRTH_PLACE', typ.StringType()),('MOTHER_AGE_YEARS', typ.IntegerType()),('MOTHER_RACE_6CODE', typ.StringType()),('MOTHER_EDUCATION', typ.StringType()),('FATHER_COMBINED_AGE', typ.IntegerType()),('FATHER_EDUCATION', typ.StringType()),('MONTH_PRECARE_RECODE', typ.StringType()),('CIG_BEFORE', typ.IntegerType()),('CIG_1_TRI', typ.IntegerType()),('CIG_2_TRI', typ.IntegerType()),('CIG_3_TRI', typ.IntegerType()),('MOTHER_HEIGHT_IN', typ.IntegerType()),('MOTHER_BMI_RECODE', typ.IntegerType()),('MOTHER_PRE_WEIGHT', typ.IntegerType()),('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),('MOTHER_WEIGHT_GAIN', typ.IntegerType()),('DIABETES_PRE', typ.StringType()),('DIABETES_GEST', typ.StringType()),('HYP_TENS_PRE', typ.StringType()),('HYP_TENS_GEST', typ.StringType()),('PREV_BIRTH_PRETERM', typ.StringType()),('NO_RISK', typ.StringType()),('NO_INFECTIONS_REPORTED', typ.StringType()),('LABOR_IND', typ.StringType()),('LABOR_AUGM', typ.StringType()),('STEROIDS', typ.StringType()),('ANTIBIOTICS', typ.StringType()),('ANESTHESIA', typ.StringType()),('DELIV_METHOD_RECODE_COMB', typ.StringType()),('ATTENDANT_BIRTH', typ.StringType()),('APGAR_5', typ.IntegerType()),('APGAR_5_RECODE', typ.StringType()),('APGAR_10', typ.IntegerType()),('APGAR_10_RECODE', typ.StringType()),('INFANT_SEX', typ.StringType()),('OBSTETRIC_GESTATION_WEEKS', typ.IntegerType()),('INFANT_WEIGHT_GRAMS', typ.IntegerType()),('INFANT_ASSIST_VENTI', typ.StringType()),('INFANT_ASSIST_VENTI_6HRS', typ.StringType()),('INFANT_NICU_ADMISSION', typ.StringType()),('INFANT_SURFACANT', typ.StringType()),('INFANT_ANTIBIOTICS', typ.StringType()),('INFANT_SEIZURES', typ.StringType()),('INFANT_NO_ABNORMALITIES', typ.StringType()),('INFANT_ANCEPHALY', typ.StringType()),('INFANT_MENINGOMYELOCELE', typ.StringType()),('INFANT_LIMB_REDUCTION', typ.StringType()),('INFANT_DOWN_SYNDROME', typ.StringType()),('INFANT_SUSPECTED_CHROMOSOMAL_DISORDER', typ.StringType()),('INFANT_NO_CONGENITAL_ANOMALIES_CHECKED', typ.StringType()),('INFANT_BREASTFED', typ.StringType())
]schema = typ.StructType([typ.StructField(e[0], e[1], False) for e in labels])

加载数据。.read.csv()方法可以读取未压缩的或‘Gzipped’压缩的逗号分隔的值。参数header设置为true,代表第一行,并且使用shema来指定正确的数据类型:

births = spark.read.csv('file:///Program Files/Pyproject/pyspark/data/births_train.csv.gz',header=True,schema=schema)
# 首先定义重编码字典:
recode_dictionary = {'YNU':{'Y': 1,'N': 0,'U': 0}
}
selected_features = ['INFANT_ALIVE_AT_REPORT', 'BIRTH_PLACE', 'MOTHER_AGE_YEARS', 'FATHER_COMBINED_AGE', 'CIG_BEFORE', 'CIG_1_TRI', 'CIG_2_TRI', 'CIG_3_TRI', 'MOTHER_HEIGHT_IN', 'MOTHER_PRE_WEIGHT', 'MOTHER_DELIVERY_WEIGHT', 'MOTHER_WEIGHT_GAIN', 'DIABETES_PRE', 'DIABETES_GEST', 'HYP_TENS_PRE', 'HYP_TENS_GEST', 'PREV_BIRTH_PRETERM'
]births_trimmed = births.select(selected_features)

在数据集中大量的特征,它们的值是YSE/NO/Unknown;将YES编码为1,其它值设置为0.
还有个小问题,母亲吸烟的数量如何编码:因为0意味着母亲在怀孕前或怀孕期间没有吸烟;1~97之间代表的是母亲实际吸烟数量;98代表母亲实际吸烟数量是98或者更过;而99代表母亲实际吸烟数量未知,将位置状态设置为0,并重新编码

import pyspark.sql.functions as func
def recode(col, key):return recode_dictionary[key][col]
def correct_cig(feat):return func.when(func.col(feat) != 99, func.col(feat)).otherwise(0)
rec_integer = func.udf(recode, typ.IntegerType())

recode方法从recode_dictionary(给出键)查找正确的键,并返回更正的值。correct_cig方法检查如下,当feat特征值不等于99时,返回特征值;如果值等于99,则得到0.
不能直接在DataFrame上使用recode函数,需要转换为Spark可理解的UDF。rec_integer函数功能如下:
通过传递指定的recode函数及指定返回值的数据类型,可以使用它来重新编码Yes/No/Unknown特征。

births_transformed = births_trimmed.withColumn('CIG_BEFORE', correct_cig('CIG_BEFORE'))\
.withColumn('CIG_1_TRI', correct_cig('CIG_1_TRI'))\
.withColumn('CIG_2_TRI', correct_cig('CIG_2_TRI'))\
.withColumn('CIG_3_TRI', correct_cig('CIG_3_TRI'))

.withColumn()方法用列名作为其第一个参数,用转换作为第二个参数。

cols = [(col.name, col.dataType) for col in births_transformed.schema]
YNU_cols = []
for i, s in enumerate(cols):if s[1] == typ.StringType():dis = births.select(s[0]).distinct().rdd.map(lambda row: row[0]).collect()
if 'Y' in dis:YNU_cols.append(s[0])

首先,创建一个包含列名称和相应数据类型的元组(cols)列表。循环遍历这些列表,并计算所有字符串列的不同值;如果‘Y’在返回的列表中,将列名追加到YNU_cols列表。
DataFrame可以在选择特征时批量转换特征。

births.select(['INFANT_NICU_ADMISSION',rec_integer('INFANT_NICU_ADMISSION', func.lit('YNU')).alias('INFANT_NICU_ADMISSION_RECODE')
]).take(5)
[Row(INFANT_NICU_ADMISSION='Y', INFANT_NICU_ADMISSION_RECODE=1),Row(INFANT_NICU_ADMISSION='Y', INFANT_NICU_ADMISSION_RECODE=1),Row(INFANT_NICU_ADMISSION='U', INFANT_NICU_ADMISSION_RECODE=0),Row(INFANT_NICU_ADMISSION='N', INFANT_NICU_ADMISSION_RECODE=0),Row(INFANT_NICU_ADMISSION='U', INFANT_NICU_ADMISSION_RECODE=0)]

选择‘INFANT_NICU_ADMISSION’列,并且将该特征的名称传递给rec_integer方法。还将新转换的列的别名称为‘INFANT_NICU_ADMISSION_RECODE’。这样可以确保UDF按预期工作。

exprs_YNU = [rec_integer(x, func.lit('YNU')).alias(x) if x in YNU_cols else x for x in births_transformed.columns
]births_transformed = births_transformed.select(exprs_YNU)
births_transformed.select(YNU_cols[-5:]).show(5)
+------------------+
|PREV_BIRTH_PRETERM|
+------------------+
|                 0|
|                 0|
|                 0|
|                 1|
|                 0|
+------------------+
only showing top 5 rows

描述性统计

.colStats()是根据一个样本来计算描述性统计的。
该方法使用RDD的数据来计算MultivariateStatisticalSummary对象的描述性统计信息,并返回MultivariateStatisticalSummary对象,该对象包含如下描述性统计信息:

  • count:行数
  • max:列中最大值
  • mean:列的所有值的平均值
  • min:列中最小值
  • normL1:列中值的L1_Norm值
  • normL2:列中值的L2_Norm值
  • numNonzeros:列中非零值的数量
  • variance:列中值得方差值
import pyspark.mllib.stat as st
import numpy as np
numeric_cols = ['MOTHER_AGE_YEARS','FATHER_COMBINED_AGE','CIG_BEFORE','CIG_1_TRI','CIG_2_TRI','CIG_3_TRI','MOTHER_HEIGHT_IN','MOTHER_PRE_WEIGHT','MOTHER_DELIVERY_WEIGHT','MOTHER_WEIGHT_GAIN']numeric_rdd = births_transformed\.select(numeric_cols)\.rdd \.map(lambda row: [e for e in row])mllib_stats = st.Statistics.colStats(numeric_rdd)for col, m, v in zip(numeric_cols, mllib_stats.mean(), mllib_stats.variance()):print('{0}: \t{1:.2f} \t {2:.2f}'.format(col, m, np.sqrt(v)))
MOTHER_AGE_YEARS:    28.30    6.08
FATHER_COMBINED_AGE:    44.55    27.55
CIG_BEFORE:     1.43     5.18
CIG_1_TRI:  0.91     3.83
CIG_2_TRI:  0.70     3.31
CIG_3_TRI:  0.58     3.11
MOTHER_HEIGHT_IN:   65.12    6.45
MOTHER_PRE_WEIGHT:  214.50   210.21
MOTHER_DELIVERY_WEIGHT:     223.63   180.01
MOTHER_WEIGHT_GAIN:     30.74    26.23
categorical_cols = [e for e in births_transformed.columns if e not in numeric_cols]categorical_rdd = births_transformed\.select(categorical_cols)\.rdd \.map(lambda row: [e for e in row])
for i, col in enumerate(categorical_cols):agg = categorical_rdd.groupBy(lambda row: row[i]).map(lambda row: (row[0], len(row[1])))print(col, sorted(agg.collect(), key=lambda el: el[1], reverse=True))
corrs = st.Statistics.corr(numeric_rdd)
for i, el in enumerate(corrs > 0.5):corelated = [(numeric_cols[j], corrs[i][j])for j, e in enumerate(el)if e == 1.0 and j != i]if len(corelated) > 0:for e in corelated:print('{0}-to-{1} : {2:.2f}'.format(numeric_cols[i], e[0], e[1]))
CIG_BEFORE-to-CIG_1_TRI : 0.83
CIG_BEFORE-to-CIG_2_TRI : 0.72
CIG_BEFORE-to-CIG_3_TRI : 0.62
CIG_1_TRI-to-CIG_BEFORE : 0.83
CIG_1_TRI-to-CIG_2_TRI : 0.87
CIG_1_TRI-to-CIG_3_TRI : 0.76
CIG_2_TRI-to-CIG_BEFORE : 0.72
CIG_2_TRI-to-CIG_1_TRI : 0.87
CIG_2_TRI-to-CIG_3_TRI : 0.89
CIG_3_TRI-to-CIG_BEFORE : 0.62
CIG_3_TRI-to-CIG_1_TRI : 0.76
CIG_3_TRI-to-CIG_2_TRI : 0.89
MOTHER_PRE_WEIGHT-to-MOTHER_DELIVERY_WEIGHT : 0.54
MOTHER_PRE_WEIGHT-to-MOTHER_WEIGHT_GAIN : 0.65
MOTHER_DELIVERY_WEIGHT-to-MOTHER_PRE_WEIGHT : 0.54
MOTHER_DELIVERY_WEIGHT-to-MOTHER_WEIGHT_GAIN : 0.60
MOTHER_WEIGHT_GAIN-to-MOTHER_PRE_WEIGHT : 0.65
MOTHER_WEIGHT_GAIN-to-MOTHER_DELIVERY_WEIGHT : 0.60

重量特征是高度相关的,所以保留‘MOTHER_PRE_WEIGHT’:

featrues_to_keep = ['INFANT_ALIVE_AT_REPORT','BIRTH_PLACE','MOTHER_AGE_YEARS','FATHER_COMBINED_AGE','CIG_1_TRI','MOTHER_HEIGHT_IN','MOTHER_PRE_WEIGHT','DIABETES_PRE','DIABETES_GEST','HYP_TENS_PRE','HYP_TENS_GEST','PREV_BIRTH_PRETERM'
]
births_transformed = births_transformed.select([e for e in featrues_to_keep])

统计测试

使用MLlib的.chiSqTest()方法:

import pyspark.mllib.linalg as ln
for cat in categorical_cols[1:]:agg = births_transformed \.groupby('INFANT_ALIVE_AT_REPORT') \.pivot(cat) \.count()agg_rdd = agg.rdd \.map(lambda row: (row[1:])) \.flatMap(lambda row: [0 if e == None else e for e in row]) \.collect()row_length = len(agg.collect()[0]) - 1agg = ln.Matrices.dense(row_length, 2, agg_rdd)test = st.Statistics.chiSqTest(agg)print(cat, round(test.pValue, 4))
print(ln.Matrices.dense(3,2,  [1,2,3,4,5,6]))
DenseMatrix([[1., 4.],[2., 5.],[3., 6.]])
import pyspark.mllib.feature as ft
import pyspark.mllib.regression as reg
hashing = ft.HashingTF(7)
births_hashed = births_transformed.rdd \.map(lambda row :[list(hashing.transform(row[1]).toArrary())if col == 'BIRTH_PLACE'else row[i]for i, col in enumerate(featrues_to_keep)]) \.map(lambda row : [[e] if type(e) == int else e for e in row]) \.map(lambda row : [item for sublist in row for item in sublist]) \.map(lambda row : reg.LabeledPoint(row[0], ln.Vectors.dense(row[1:])))

创建哈希模型。因为特征值又七个级别,所以使用哈希处理中相同多的特征。将模型中’BIRTH_PLACE’特征转换为SparseVector。如果数据中又许多列,但是再一行中只有少数数据具有非零值,则着这种数据机构是首选。然后将所有特征结合在一起,最终创建LabeledPoint。

分隔培训和测试数据

将数据集分为两组:一组用于培训,另一组用于测试。RDD有一个方便的方法处理该情况:.random Split()。该方法的参数是随机分割数据集的比例列表。

births_train, births_test = births_hashed.randomSplit([0.6, 0.4])

预测婴儿生存几率

构建两个模型:线性分类器(linear classifier)————逻辑回归,和非线性分类器(non-linear-classifier)—————随机森林。对于前者,使用所有特征来处理,对于后者,使用chiSqSelector()方法选出前四个特征

MLlib中的逻辑回归

逻辑回归是从某种程度上是构建任何分类模型的基准。MLlib过去使用随机梯度下降(SGD)算法来提供逻辑回归模型评估。这个模型在Spark2.0中弃用,而使用LogisticRegressionWithLBFGS模型。
LogisticRegressionWithLBFGS模型使用Limited-memoryBroyden-Fletcher-Goldfarb-Shanno(BFGS)优化算法。这是一种接近于BFGS算法的拟牛顿方法。

from pyspark.mllib.classification import LogisticRegressionWithLBFGS
LR_Model = LogisticRegressionWithLBFGS.train(births_train, iterations=10)
from pyspark.mllib.tree import RandomForest
selector = ft.ChiSqSelector(4).fit(births_train)
top_Features_train = (births_train.map(lambda row: row.label) \.zip(selector.transform(births_train \.map(lambda row: row.features)))).map(lambda row: reg.LabeledPoint(row[0], row[1]))
top_Features_test = (births_test.map(lambda row: row.label) \.zip(selector \.transform(births_test.map(lambda row: row.features)))).map(lambda row: reg.LabeledPoint(row[0], row[]))
RF_model = RandomForest.trainClassifier(data=births_train,numClasses=2,categoricalFeaturesInfo={},numTrees=6,featureSubsetStrategy='all',seed=666)
  File "<ipython-input-46-46c21b63ab41>", line 1RF_model = RandomForest.trainClassifier(data=births_train,^
SyntaxError: invalid character in identifier
RF_results = (topFeatures_test.map(lambda row: row.label) \.zip(RF_moedel.predict(topFeatures_test \.map(lambda row: row.features))))
RF_evaluation = ev.BinaryClassificationMetrics(RF_results)
print('Area under PR: {0 : .2f}'.format(RF_evaluation.areaUnderPR))
print('Area under ROC : {0: .2f}'.format(RF_evalution.areaUnderROC))
model_evaluation.unpersist()

MLlib机器学习库相关推荐

  1. Spark机器学习库(MLlib)指南

    spark-1.6.1 机器学习库(MLlib)指南 MLlib是Spark的机器学习(ML)库.旨在简化机器学习的工程实践工作,并方便扩展到更大规模.MLlib由一些通用的学习算法和工具组成,包括分 ...

  2. 【Spark】实验6 Spark机器学习库MLlib编程实践

    Spark机器学习库MLlib编程实践 一.实验目的 通过实验掌握基本的MLLib编程方法: 掌握用MLLib解决一些常见的数据分析问题,包括数据导入.成分分析和分类和预测等. 二.实验平台 新工科智 ...

  3. Spark 机器学习库【MLlib】编程指南

    一.机器学习库 MLlib是Spark的机器学习库[ML].其目标是使实用的机器学习算法变得可扩展且容易使用.在较高级别,它提供了以下工具: 机器学习算法:常见的机器学习算法,例如分类,回归,聚类和协 ...

  4. Spark MLlib 机器学习

    本章导读 机器学习(machine learning, ML)是一门涉及概率论.统计学.逼近论.凸分析.算法复杂度理论等多领域的交叉学科.ML专注于研究计算机模拟或实现人类的学习行为,以获取新知识.新 ...

  5. 离线轻量级大数据平台Spark之MLib机器学习库概念学习

    Mlib机器学习库 1.1机器学习概念 机器学习有很多定义,倾向于下面这个定义.机器学习是对能通过经验自动改进的计算机算法的研究.机器学习依赖数据经验并评估和优化算法所运行出的模型.机器学习算法尝试根 ...

  6. 猿辅导 python_关于猿辅导机器学习项目ytk-learn和ytk-mp4j分布式机器学习库

    本文描述了猿辅导开源分布式机器学习库ytk-learn及分布式通信库ytk-mp4j的相关内容,可实现在多应用场景中使用.ytk-learn 是基于Java的高效分布式机器学习库, 简单易用,文档详细 ...

  7. 支持C/C++、Java、python、Matlab等语言的第三方机器学习库汇总

    C 通用机器学习 Recommender - 一个产品推荐的C语言库,利用了协同过滤. 计算机视觉 CCV - C-based/Cached/Core Computer Vision Library ...

  8. java机器学习库_6大最常用的Java机器学习库一览

    在 MLOSS.org 网站上,列出了 70 多个基于 Java 的开源机器学习项目,可能还有更多未列出的项目,存于大学里的服务器.GitHub 或 Bitbucket 中.我们将在本文中回顾 Jav ...

  9. Spark MLlib机器学习 | 算法综合实战(一)(史上最详细)

    ==========                         ========= 8.1.1 什么是机器学习 机器学习可以看做是一门人工智能的科学,该领域的主要研究对象是人工智能.机器学习利用 ...

最新文章

  1. android view爆炸效果,Android 显示view的粒子爆炸/绽放效果
  2. struct 与class 的区别?
  3. android 回退 activity,Android Activity要点(2)
  4. js获取网页当前页面及路径
  5. C语言使用fopen的两点注意事项
  6. 模拟扑克牌随机选择(Java)
  7. CVE-2017-4901 VMware虚拟机逃逸漏洞分析【Frida Windows实例】
  8. axios拦截器_Axios源码解析 —— 一个小而美的HttpClient
  9. php选择不同店发送不同邮件,php – WooCommerce电子邮件通知:不同城市的不同电子邮件收件人...
  10. Android修行手册-TextView常用属性篇
  11. 兴业银行紧急核查国美贷款
  12. 微信小程序请求函数的封装
  13. win10投影无法正常使用:我们正在确认此功能 解决方法
  14. VR系列--VR介绍
  15. 音乐服务器制作教程,分享硬盘中的音乐 DLNA服务搭建教程
  16. Word如何拆分单元格
  17. 【弹吉他必备的乐理知识】【2】节拍
  18. LC39 Combination Sum
  19. Linux ALSA声卡驱动之五:Machine 以及ALSA声卡的注册
  20. 2018苹果发布会新品 是如何成为众商家的追热点目标

热门文章

  1. pdfminer3k读取pdf文件
  2. 基于JAVAvue健身食谱系统计算机毕业设计源码+数据库+lw文档+系统+部署
  3. 音频服务器未响应怎么修复,win10音频服务未响应未修复教程?win10音频服务未响应未修复要怎么办?...
  4. golang 中的 int 和 int64 是相同的类型吗?
  5. C语言PAT刷题 - 1032 挖掘机技术哪家强
  6. PMP中挣值管理的概念和计算公式
  7. swift 16进制数-[UInt8]-转字符串-String-
  8. 即时电话社交app 陪我宣布获得6000万元A轮融资
  9. 自学python,庆幸知道这12个网站,省下了不少钱
  10. CSDN编程竞赛第六期题解