一、大数据框架及Spark介绍

1.1 大数据框架

大数据(Big Data)是指无法在一定时间内用常规软件工具对其内容进行抓取、管理和处理的数据集合。大数据技术,是指从各种各样类型的数据中,快速获得有价值信息的能力。

自2003年Google公布了3篇大数据奠基性论文,为大数据存储及分布式处理的核心问题提供了思路:非结构化文件分布式存储(GFS)、分布式计算(MapReduce)及结构化数据存储(BigTable),并奠定了现代大数据技术的理论基础,而后大数据技术便快速发展,诞生了很多日新月异的技术。

归纳现有大数据框架解决的核心问题及相关技术主要为:

  • 分布式存储的问题:有GFS,HDFS等,使得大量的数据能横跨成百上千台机器;
  • 大数据计算的问题:有MapReduce、Spark批处理、Flink流处理等,可以分配计算任务给各个计算节点(机器);
  • 结构化数据存储及查询的问题:有Hbase、Bigtable等,可以快速获取/存储结构化的键值数据;
  • 大数据挖掘的问题:有Hadoop的mahout,spark的ml等,可以使用分布式机器学习算法挖掘信息;

1.2 Spark的介绍

Spark是一个分布式内存批计算处理框架,Spark集群由Driver, Cluster Manager(Standalone,Yarn 或 Mesos),以及Worker Node组成。对于每个Spark应用程序,Worker Node上存在一个Executor进程,Executor进程中包括多个Task线程。

在执行具体的程序时,Spark会将程序拆解成一个任务DAG(有向无环图),再根据DAG决定程序各步骤执行的方法。该程序先分别从textFile和HadoopFile读取文件,经过一些列操作后再进行join,最终得到处理结果。

PySpark是Spark的Python API,通过Pyspark可以方便地使用 Python编写 Spark 应用程序, 其支持 了Spark 的大部分功能,例如 Spark SQL、DataFrame、Streaming、MLLIB(ML)和 Spark Core。

二、PySpark分布式机器学习

2.1 PySpark机器学习库

Pyspark中支持两个机器学习库:mllib及ml,区别在于ml主要操作的是DataFrame,而mllib操作的是RDD,即二者面向的数据集不一样。相比于mllib在RDD提供的基础操作,ml在DataFrame上的抽象级别更高,数据和操作耦合度更低。

注:mllib在后面的版本中可能被废弃,本文示例使用的是ml库。

pyspark.ml训练机器学习库有三个主要的抽象类:Transformer、Estimator、Pipeline。

  • Transformer主要对应feature子模块,实现了算法训练前的一系列的特征预处理工作,例如MinMaxScaler、word2vec、onehotencoder等,对应操作为transform;
# 举例:特征加工
from pyspark.ml.feature import VectorAssembler
featuresCreator = VectorAssembler(inputCols=[col[0] for col in labels[2:]] + [encoder.getOutputCol()],outputCol='features'
)
  • Estimator对应各种机器学习算法,主要为分类、回归、聚类和推荐算法4大类,具体可选算法大多在sklearn中均有对应,对应操作为fit;
# 举例:分类模型
from pyspark.ml.classification import LogisticRegressionlogistic = LogisticRegression(featuresCol=featuresCreator.getOutputCol(),labelCol='INFANT_ALIVE_AT_REPORT')
  • Pipeline可将一些列转换和训练过程串联形成流水线。
# 举例:创建流水线
from pyspark.ml import Pipelinepipeline = Pipeline(stages=[encoder, featuresCreator, logistic]) # 特征编码,特征加工,载入LR模型
# 拟合模型
train, test = data.randomSplit([0.7,0.3],seed=123)
model = pipeline.fit(train)

2.2 PySpark分布式机器学习原理

在分布式训练中,用于训练模型的工作负载会在多个微型处理器之间进行拆分和共享,这些处理器称为工作器节点,通过这些工作器节点并行工作以加速模型训练。 分布式训练可用于传统的 ML 模型,但更适用于计算和时间密集型任务,如用于训练深度神经网络。分布式训练有两种主要类型:数据并行及模型并行,主要代表有Spark ML,Parameter Server和TensorFlow。

spark的分布式训练的实现为数据并行:按行对数据进行分区,从而可以对数百万甚至数十亿个实例进行分布式训练。 以其核心的梯度下降算法为例:
1、首先对数据划分至各计算节点;
2、把当前的模型参数广播到各个计算节点(当模型参数量较大时会比较耗带宽资源);
3、各计算节点进行数据抽样得到mini batch的数据,分别计算梯度,再通过treeAggregate操作汇总梯度,得到最终梯度gradientSum;
4、利用gradientSum更新模型权重(这里采用的阻断式的梯度下降方式,当各节点有数据倾斜时,每轮的时间起决于最慢的节点。这是Spark并行训练效率较低的主要原因)。

PySpark项目实战

注:单纯拿Pyspark练练手,可无需配置Pyspark集群,直接本地配置下单机Pyspark,也可以使用线上spark集群(如: community.cloud.databricks.com)。

本项目通过PySpark实现机器学习建模全流程:数据的载入,数据分析,特征加工,二分类模型训练及评估。

#!/usr/bin/env python
# coding: utf-8#  初始化SparkSession
from pyspark.sql import SparkSessionspark = SparkSession.builder.appName("Python Spark RF example").config("spark.some.config.option", "some-value").getOrCreate()# 加载数据
df = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("./data.csv",header=True)from pyspark.sql.functions import *
# 数据基本信息分析df.dtypes # Return df column names and data types
df.show()  #Display the content of df
df.head()  #Return first n rows
df.first()  #Return first row
df.take(2)  #Return the first n rows
df.schema   # Return the schema of df
df.columns # Return the columns of df
df.count()  #Count the number of rows in df
df.distinct().count()  #Count the number of distinct rows in df
df.printSchema()  #Print the schema of df
df.explain()  #Print the (logical and physical)  plans
df.describe().show()  #Compute summary statistics df.groupBy('Survived').agg(avg("Age"),avg("Fare")).show()  # 聚合分析
df.select(df.Sex, df.Survived==1).show()  # 带条件查询
df.sort("Age", ascending=False).collect() # 排序df = df.dropDuplicates()   # 删除重复值df = df.na.fill(value=0)  # 缺失填充值
df = df.na.drop()        # 或者删除缺失值df = df.withColumn('isMale', when(df['Sex']=='male',1).otherwise(0)) # 新增列:性别0 1
df = df.drop('_c0','Name','Sex') # 删除姓名、性别、索引列# 设定特征/标签列
from pyspark.ml.feature import VectorAssembler
ignore=['Survived']
vectorAssembler = VectorAssembler(inputCols=[x for x in df.columns  if x not in ignore], outputCol = 'features')
new_df = vectorAssembler.transform(df)
new_df = new_df.select(['features', 'Survived'])# 划分测试集训练集
train, test = new_df.randomSplit([0.75, 0.25], seed = 12345)# 模型训练
from pyspark.ml.classification import LogisticRegressionlr = LogisticRegression(featuresCol = 'features', labelCol='Survived')
lr_model = lr.fit(test)# 模型评估
from pyspark.ml.evaluation import BinaryClassificationEvaluatorpredictions = lr_model.transform(test)
auc = BinaryClassificationEvaluator().setLabelCol('Survived')
print('AUC of the model:' + str(auc.evaluate(predictions)))
print('features weights', lr_model.coefficientMatrix)

文章首发于算法进阶,公众号阅读原文可访问GitHub项目源码

分布式机器学习原理及实战(Pyspark)相关推荐

  1. [机器学习入门] 深度学习简介,GPU计算的原理,分布式机器学习原理

    深度学习简介 深度学习的概念源于人工神经网络的研究.含多隐层的多层感知器就是一种深度学习结构.深度学习通过组合低层特征形成更加抽象的高层表示属性类别或特征,以发现数据的分布式特征表示. 深度学习采用的 ...

  2. 京东架构师最新出品《分布式缓存原理到实战剖析手册》,限时开源

    分布式缓存所带来的诸多问题 当下互联网领域的软件系统,毫不夸张地说已经成为分布式系统的天下. 随着现代应用对速度的要求越来越高,对缓存机制的使用也越来越常见.越来越频繁.分布式的语境给系统的开发与设计 ...

  3. 【分布式】Pytorch分布式训练原理和实战

    [分布式]基于Horovod的Pytorch分布式训练原理和实战 并行方法: 1. 模型并行 2. 数据并行 3. 两者之间的联系 更新方法: 1. 同步更新 2. 异步更新 分布式算法: 1. Pa ...

  4. 分布式IM原理与实战: 从0到1打造即时通讯云

    目录 作者介绍 小册介绍 你会学到什么? 适宜人群 作者介绍 逆水:通信专家.多年系统设计开发及项目管理经验.目前就职某在线教育公司. 小册介绍 一套长连接系统涉及的技术与知识面非常多,相对于比较成熟 ...

  5. 分布式事务原理及实战seata(转自微信公众号 终码一生 )

    什么是分布式事务? _____________________________________________________________________________ 分布式对应的是单体架构, ...

  6. 分布式技术原理与实战45讲--06 加餐1:如何准备一线互联网公司面试?

    本课时我们来讲讲如何准备一线互联网公司面试. 互联网技术面试的特点 互联网公司的技术面试有一些侧重点,国内互联网公司和外企的侧重点又有不同.BAT 互联网公司看重项目能力,重点考察语言深度和项目能力, ...

  7. 分布式技术原理与实战45讲--05 第05讲:共识问题:区块链如何确认记账权?

    本课时我们主要讲解"共识问题:区块链如何确认记账权?" 区块链可以说是最近几年最热的技术领域之一,区块链起源于中本聪的比特币,作为比特币的底层技术,本质上是一个去中心化的数据库,其 ...

  8. 分布式事务开山之作——《深入理解分布式事务:原理与实战》草图曝光!!

    大家好,我是冰河~~ 今天,咱们就暂时不聊[精通高并发系列]了,今天插播一下分布式事务,为啥?因为冰河联合猫大人共同创作的分布式事务领域的开山之作--<深入理解分布式事务:原理与实战>一书 ...

  9. 可能要用心学高并发核心编程,限流原理与实战,分布式令牌桶限流

    实战:分布式令牌桶限流 本节介绍的分布式令牌桶限流通过Lua+Java结合完成,首先在Lua脚本中完成限流的计算,然后在Java代码中进行组织和调用. 分布式令牌桶限流Lua脚本 分布式令牌桶限流Lu ...

  10. 来!带你深入理解分布式事务:原理与实战!

    随着互联网的不断发展,互联网企业的业务在飞速变化,推动着系统架构也在不断地发生变化.总体来说,系统架构大致经历了 单体应用架构→垂直应用架构→分布式架构→SOA架构→微服务架构的演变. 如今微服务技术 ...

最新文章

  1. 一款零注解侵入的 API 文档生成工具,你用过吗?
  2. html导入.md文件并渲染,vue 导入.md文件(markdown转HTML)
  3. 人造神经元成功操纵植物,让捕蝇草强行闭合,脑机接口新思路打开丨Nature子刊...
  4. iOS开发UI篇 -- UISearchBar 属性、方法详解及应用(自定义搜索框样式)
  5. python中的类属性和类方法_python面向对象之类属性和类方法的使用和实例
  6. 手动实现一个速度仪表盘
  7. 面向.Net程序员的前端优化
  8. C++ 智能指针 :内存泄漏、 RAII、智能指针、auto_ptr、unique_ptr、shared_ptr、weak_ptr、定制删除器deleter
  9. 如何科学高效的学习Web前端开发?
  10. 计算机论文搜索技巧【一】
  11. Linux之父和Redis之父,Redis之父:10x程序员应该具备哪些素质
  12. mysql limit (1-1)10_110,当您知道只有1个结果时,是否将'LIMIT 1'添加到MySQL查询中使它们更快?...
  13. linux alias命令
  14. iphone在jsp显示时间会NAN解决办法
  15. 计算机控制系统模型,控制系统数学模型及其类型-电脑自学网
  16. C#实现Word批量转换Pdf
  17. 计算机无限开机,电脑开机无限循环重启
  18. funcode之c++版弹弹堂(第一个设计实验)
  19. windows 性能监控--Perfmon主要指标
  20. opencv显示图片只显示部分(或认为图片太大)

热门文章

  1. java基于ssm三大框架的小区物业收费报修管理系统
  2. LVDS,接口,时序讲解,非常好的文章
  3. 【STM32+cubemx】0013 HAL库开发:SPI总线访问气压计BMP280/BME280
  4. ANSYS Products 2020中文版
  5. 2020年秋季学期Python教材推荐与选用参考
  6. 单尺度Retinex
  7. mybatisplus代码生成器
  8. python实现聊天工具_python开发简单的聊天工具
  9. SPSS学习(二)作图
  10. maxscale连接mysql_MaxScale实现mysql读写分离,负载均衡