一、实践Spark的共享变量

不使用广播变量时:

Spark的执行过程中,Spark的一个或者多个函数操作会作为一个Task分发到某个节点上的Executor中去执行,当函数用到程序中定义的变量,那么那么Spark会将这些变量创建一个副本,并与这些函数一起打包到相应的task中。那么当有很多个task都应用到同一个变量时,spark会多次复制并打包传输这些变量,但变量本身有可能是一个很大的字典或者集合,这就会引起大量的网络传输,也有可能会造成一个Executor的内存溢出。

广播变量的意义:

就是解决上述问题的。当我们将Spark程序中的某个变量声明为广播变量之后,Spark的Driver只会给每个Executor发送一份该变量的副本,而不是每个Task拥有一个该变量的副本

一个Executor中的多个Task共享该变量,减少了网络数据的传输,也减少了对Executor资源的占用。

  1. wordcount示例,读取example.txt文件,统计单词数目。

import org.apache.spark.SparkContext

import org.apache.spark.SparkConf

 

object BroadcastTest {

  def main(args: Array[String]) {

    val conf=new SparkConf()

      .setMaster("local")//本地启动

      .setAppName("BroadcastTest")

 

    //创建SparkContext对象

    val sc=new SparkContext(conf)

 

    val str = "hadoop"//定义一个将被作为广播变量的变量

    val broadCast = sc.broadcast(str)//将一个变量定义为广播变量

 

val textFile=sc.textFile("file:///home/hadoop/example.txt")//读取本地文件

//filter中通过value属性来获取广播变量的值

    val wordCount=textFile.filter(word=>word.contains(broadCast.value))

      .foreach(println)//打印输出包含广播变量的文本行

  }

}

(2)增加一个共享变量--累加器,可以实现分布式计数的功能,各个Task对其的修改将会在Driver中进行累加计算。

import org.apache.spark.SparkContext

import org.apache.spark.SparkConf

 

object BroadcastTest {

  def main(args: Array[String]) {

    val conf=new SparkConf()

      .setMaster("local")//本地启动

      .setAppName("AccumulatorTest")

 

    //创建SparkContext对象

    val sc=new SparkContext(conf)

    val str = "hadoop"//定义一个将被作为广播变量的变量

val broadCast = sc.broadcast(str)//将一个变量定义为广播变量

val accumulator = sc.accumulator(0)//累加器

 

val textFile=sc.textFile("file:///home/hadoop/example.txt")//读取本地文件

//filter中通过value属性来获取广播变量的值

val wordCount=textFile.filter(word=>word.contains(broadCast.value))

  .foreach{//在动作算子foreach中使用累加器

     x=>{accumulator.add(1)//当找到一个包含广播变量的文本行,累加器值加1

         println(accumulator)}}//打印累加器当前的值

  println(accumulator)//打印累加器最终的结果

}

}

//需要注意的是对累加器的操作必须包含在一个动作算子中或者在对累加器的操作之后必须有动作算子,否则对累加器操作不会立即执行,导致累加器的值为0。

二、实践基于pySpark的Kmeans实现

(1),python的安装

打开linux终端,python的安装可以通过sudo apt install python完成

  1. python编辑插件的安装

在新弹出的窗口的左上角的输入框中输入python,并在列出的选项中选择下图所示的python community edition选项,然后点击图中绿色install按钮进行在线安装

3)配置python项目环境

在安装完python插件之后,重新启动IDEA,选择创建python工程,并点击project SDK按钮(如下图所示),选择安装的python的SDK。

当python项目创建成功之后,还需要设置python和Spark的环境变量。点击run->edit configuration,在弹出的窗口中(如下图所示),点击绿色的加号按钮,然后选择python,此时窗口的内容如右侧显示。然后点击窗口中的enviroment variables按钮,在弹出的小窗口中设置python和Spark的环境变量。

在Spark和python环境变量设置完成之后,通过file->project struncture打开如下图所示的窗口。点击窗口左侧列表中的module选项,在窗口右侧的区域中,选择denpendces选项卡,就会看到如下图所示的右侧窗口的内容。然后点击最右侧的绿色加号按钮,在弹出的下拉菜单中选择第一个JARs or directories选项,并在弹出的窗口中,找到spark下python文件中Lib的路径(如下图所示)。这些操作完成之后,点击Ok,并在弹出的窗口中选择Jar Directory即可

  1. 新建python文件测试

代码及注释:

from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("PythonTest")//设置工作路径

sc = SparkContext(conf = conf)

data = ["hello","world","chine","good","spark","good"]

rdd = sc.parallelize(data) //把数据转换为rdd形式

result_Rdd = rdd.map(lambda word: (word,1)).reduceByKey(lambda a,b:a+b)

result_list = result_Rdd.collect()//把集合每个元素转换为key,1的形式并对相同的key对次数进行求和

for line in result_list: //遍历输出

print(line)

sc.stop()

(5).pySpark的Kmeans实现

由于与test.py配置一样,直接新建一个python文件即可,

# -*- coding: utf-8 -*-

import math

from pyspark import SparkConf, SparkContext

#计算一个点到各个聚类中心的距离,返回距离最近的聚类的序号或者ID

def closestCluster(p, centers):

closest_cluster_id = 0#距离最近的聚类的序号

closest_dist = float("+inf")//先定义一个最近节点

for i in range(len(centers)): //计算一个数据节点对于每一个聚类中心的距离

temp_dist=math.sqrt(math.pow((p[0]-centers[i][0]),2)+

math.pow((p[1]-centers[i][1]),2))

if temp_dist < closest_dist://函数,当聚类中心更新后小于阀值,返回聚类中心坐标。

closest_dist = temp_dist

closest_cluster_id = i

return closest_cluster_id

#主程序

if __name__ == "__main__":

conf = SparkConf().setAppName("kmeans")

sc = SparkContext(conf=conf)

#测试数据,共5个点

data=[(1, 2), (3, 4), (2, 6), (7, 2), (4,9)]

#利用parallelize函数创建rdd,并进行缓存,因为该RDD在后面的迭代计算中会反复用到

data_rdd = sc.parallelize(data).cache()

k=2#设置聚类个数为2

converge_dist = float(1)#设置停止迭代时,新旧聚类中心间的距离之和的上限

#随机选择2个聚类中心,参数False设定为取出样本后不放回

center_points_list = data_rdd.takeSample(False, k)

#每次迭代之和,新旧聚类中心间距离的和,初始设置为大于converge_dist

temp_dist=converge_dist+1

#开始迭代,当temp_dist小于等于converge_dist时,停止迭代

while temp_dist > converge_dist:

#通过map操作计算每个点距离最近的聚类中心点,返回的是一个rdd,rdd中的元素为

#(closest_cluster_id, (p,1))形式的元组。(p,1)中1的作用是便于后面统计一个聚类中点的个数

closest_rdd = data_rdd.map( lambda p: (closestCluster(p, center_points_list), (p, 1)))

#利用reduceByKey操作将每个聚类中的点的x和y分别相加。其中p1_c1[1]和p2_c2[1]的值#分别是1,将它们相加的作用是统计一个聚类中点或者成员的个数

point_stats_rdd = closest_rdd.reduceByKey( lambda p1_c1, p2_c2: ((p1_c1[0][0] + p2_c2[0][0],p1_c1[0][1] + p2_c2[0][1]), p1_c1[1] + p2_c2[1]))

#计算各个聚类新的中心,也就是将每个聚类前面相加的x和y的值,除以每个聚类点的个数

#其中st[0]是聚类的id,st[1][0][0]是汇总的x,st[1][0][1]是汇总的y,st[1][1]是聚类点的个数

new_center_points_list = point_stats_rdd.map( lambda st:(st[0], (st[1][0][0]/ st[1][1],st[1][0][1]/ st[1][1]))).collect()

#计算新的各个聚类中心与老的聚类中心间欧式距离的和

temp_dist=0

for (cluster_key, new_center_point) in new_center_points_list:

x_2=math.pow((new_center_point[0]-center_points_list[cluster_key][0]),2)

y_2=math.pow((new_center_point[1]-center_points_list[cluster_key][1]),2)

dist=math.sqrt(x_2+y_2)

temp_dist+=dist

#利用新得到的聚类中心的坐标更新各个聚类的中心

for (cluster_key, new_center_point) in new_center_points_list:

center_points_list[cluster_key] = new_center_point

#打印输出得到的聚类的中心点坐标

print("Final centers: " + str(center_points_list))

cluster_result_list=closest_rdd.collect()

for item in cluster_result_list:

print item[1][0], item[0]#输出各个点以及他们所属的聚类

sc.stop()

三.问题讨论:

  1. 安装python插件时,出现如下错误。显示资源被占用。

解决方案:

首先杀死进程,解放资源。

输入以下命令:ps -e | grep apt获得当前进程然后杀死进程

再输入:

sudo rm /var/cache/apt/archives/lock

sudo rm /var/lib/dpkg/lock-frontend

sudo apt install python

可以运行

2,问题:运行kmeans 程序时,在有中文注释的地方编译出错。

解决方案:     1,删掉中文注释

2,在文件头部加上这样的一句话 # -*- coding: utf-8 -*-

pySpark与Kmeans算法实现相关推荐

  1. 机器学习中的聚类算法(1):k-means算法

    一文详解激光点云的物体聚类:https://mp.weixin.qq.com/s/FmMJn2qjtylUMRGrD5telw 引言: Q:什么是聚类算法? 现在我们在做的深度学习当中,比如图像的识别 ...

  2. python实现K-means算法

    K-means算法流程: 随机选k个样本作为初始聚类中心 计算数据集中每个样本到k个聚类中心距离,并将其分配到距离最小的聚类中心 对于每个聚类,重新计算中心 回到2,至得到局部最优解 python代码 ...

  3. Python之机器学习K-means算法实现

    一.前言: 今天在宿舍弄了一个下午的代码,总算还好,把这个东西算是熟悉了,还不算是力竭,只算是知道了怎么回事.今天就给大家分享一下我的代码.代码可以运行,运行的Python环境是Python3.6以上 ...

  4. matlab 职坐标,机器学习入门之机器学习实战ByMatlab(四)二分K-means算法

    本文主要向大家介绍了机器学习入门之机器学习实战ByMatlab(四)二分K-means算法,通过具体的内容向大家展现,希望对大家学习机器学习入门有所帮助.前面我们在是实现K-means算法的时候,提到 ...

  5. 一文详尽系列之K-means算法

    点击上方"Datawhale",选择"星标"公众号 第一时间获取价值内容 K-means 是我们最常用的基于距离的聚类算法,其认为两个目标的距离越近,相似度越大 ...

  6. 标准K-means算法的缺陷、K-mean++初始化算法、初始化算法步骤、Kmeans++算法实现

    标准K-means算法的缺陷.K-mean++初始化算法.初始化算法步骤.Kmeans++算法实现 目录 标准K-means算法的缺陷.K-mean&

  7. Kmeans算法的过程是什么?Kmeans算法的缺陷主要有哪些?

    Kmeans算法的过程是什么?Kmeans算法的缺陷主要有哪些? 目录 Kmeans算法的过程是什么?Kmeans算法的缺陷主要有哪些?

  8. AI K-means算法对数据进行聚类分析-实验报告

    1. 问题描述及实验要求 K-means算法对data中数据进行聚类分析 (1)算法原理描述 (2)算法结构 (3)写出K-means具体功能函数(不能直接调用sklearn.cluster(Mean ...

  9. 「AI科技」机器学习算法之K-means算法原理及缺点改进思路

    https://www.toutiao.com/a6641916717624721933/ 2019-01-03 08:00:00 K-means算法是使用得最为广泛的一个算法,本文将介绍K-mean ...

最新文章

  1. iis6如何升级iis7_IIS修复IIS出现错误后如何完全卸载重装
  2. 从需求出发来看关系模型与非关系模型–关系模型与非关系模型概述
  3. javascript函数上的prototype属性的理解
  4. (JAVA)Arrays数组工具类
  5. 【Gym - 101061F】Fairness(dp,思维)
  6. c语言黑白棋程序设计报告,C语言课程设计黑白棋
  7. 今日头条iOS客户端启动速度优化
  8. 谷歌爆苹果 Image I/O 存重大漏洞,无辜用户躺枪
  9. 如何通过 Siri 播放视频?且看优酷技术接入实践
  10. svn上传时显示database is locked
  11. Java学习之InputStream中read()与read(byte[] b)
  12. 面试题27 二叉搜索树转换为双向链表
  13. c语言贪吃蛇(简易版本含完整代码)
  14. 【无标题】段码液晶驱动芯片VK1621S-1资料介绍
  15. BOS v2.0后台管理系统界面通用解决方案
  16. 不用镜像,也不下载安装包,windows下安装Ruby
  17. java 正则用法_Java正则用法
  18. 运用Ntop监控网络流量(视频Demo)
  19. 在否定句和疑问句使用have动词_26
  20. python中标点符号大全及名字_标点符号大全及名字

热门文章

  1. Linux中 mv(文件移动)
  2. 小米4手机指令大全以及使用技巧
  3. PandaOCR 翻译软件 好像有毒
  4. shiro实现密码加密
  5. Java中如何遍历HashMap呢?
  6. Vim基础用法,最常用、最实用的命令介绍(保姆级教程)
  7. JS正则匹配字符串中的数字/字母
  8. PTA Python习题 计算工资
  9. 智能客服机器人背后是思考与实践
  10. Java学习---nginx虚拟主机域名配置