最近着手的一个项目需要在Spark环境下使用DBSCAN算法,遗憾的是Spark MLlib中并没有提供该算法。调研了一些相关的文章,有些方案是将样本点按照空间位置进行分区,并在每个空间分区中分别跑DBSCAN,但是这种方案容易遇到数据倾斜的问题,并且在分区的边界的结果很有可能是错误的。经过与一些小伙伴的交流,通过几天的探索尝试,最终在Spark上手工实现了分布式的DBSCAN算法,经过校验结果和Sklearn单机结果完全一致,并且性能也达到了工业级水平。通过该算法的实现,加深了对Spark的理解,用到了分批次广播和分区迭代计算等技巧,感觉自己还是棒棒哒,特意分享出来供有需要的小伙伴们参考。

一,总体思路

DBSCAN算法的分布式实现需要解决以下一些主要的问题。1,如何计算样本点中两两之间的距离?在单机环境下,计算样本点两两之间的距离比较简单,是一个双重遍历的过程。为了减少计算量,可以用空间索引如Rtree进行加速。在分布式环境,样本点分布在不同的分区,难以在不同的分区之间直接进行双重遍历。为了解决这个问题,我的方案是将样本点不同的分区分成多个批次拉到Driver端, 然后依次广播到各个excutor分别计算距离,将最终结果union,从而间接实现双重遍历。为了减少计算量,广播前对拉到Driver端的数据构建空间索引Rtree进行加速。2,如何构造临时聚类簇?这个问题不难,单机环境和分布式环境的实现差不多。都是通过group的方式统计每个样本点周边邻域半径R内的样本点数量,并记录它们的id,如果这些样本点数量超过minpoints则构造临时聚类簇,并维护核心点列表。3,如何合并相连的临时聚类簇得到聚类簇?这个是分布式实现中最最核心的问题。在单机环境下,标准做法是对每一个临时聚类簇,判断其中的样本点是否在核心点列表,如果是,则将该样本点所在的临时聚类簇与当前临时聚类簇合并。并在核心点列表中删除该样本点。重复此过程,直到当前临时聚类簇中所有的点都不在核心点列表。在分布式环境下,临时聚类簇分布在不同的分区,无法直接扫描全局核心点列表进行临时聚类簇的合并。我的方案是先在每一个分区内部对各个临时聚类簇进行合并,然后缩小分区数量重新分区,再在各个分区内部对每个临时聚类簇进行合并。不断重复这个过程,最终将所有的临时聚类簇都划分到一个分区,完成对全部临时聚类簇的合并。为了降低最后一个分区的存储压力,我采用了不同于标准的临时聚类簇的合并算法。对每个临时聚类簇只关注其中的核心点id,而不关注非核心点id,以减少存储压力。合并时将有共同核心点id的临时聚类簇合并。为了加快临时聚类的合并过程,分区时并非随机分区,而是以每个临时聚类簇的核心点id中的最小值min_core_id作为分区的Hash参数,具有共同核心点id的临时聚类簇有更大的概率被划分到同一个分区,从而加快了合并过程。

二,核心代码

import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder().appName("dbscan").getOrCreate()val sc = spark.sparkContextimport spark.implicits._

1,寻找核心点形成临时聚类簇。该步骤一般要采用空间索引 + 广播的方法,此处从略,假定已经得到了临时聚类簇。

//rdd_core的每一行代表一个临时聚类簇:(min_core_id, core_id_set)//core_id_set为临时聚类簇所有核心点的编号,min_core_id为这些编号中取值最小的编号var rdd_core = sc.parallelize(List((1L,Set(1L,2L)),(2L,Set(2L,3L,4L)),                                       (6L,Set(6L,8L,9L)),(4L,Set(4L,5L)),                                       (9L,Set(9L,10L,11L)),(15L,Set(15L,17L)),                                       (10L,Set(10L,11L,18L))))rdd_core.collect.foreach(println)

2,合并临时聚类簇得到聚类簇。

import scala.collection.mutable.ListBufferimport org.apache.spark.HashPartitioner//定义合并函数:将有共同核心点的临时聚类簇合并val mergeSets = (set_list: ListBuffer[Set[Long]]) =>{var result = ListBuffer[Set[Long]]()while (set_list.size>0){var cur_set = set_list.remove(0)var intersect_idxs = List.range(set_list.size-1,-1,-1).filter(i=>(cur_set&set_list(i)).size>0)while(intersect_idxs.size>0){for(idx        cur_set = cur_set|set_list(idx)      }for(idx        set_list.remove(idx)      }      intersect_idxs = List.range(set_list.size-1,-1,-1).filter(i=>(cur_set&set_list(i)).size>0)    }result = result:+cur_set  }result}///对rdd_core分区后在每个分区合并,不断将分区数量减少,最终合并到一个分区//如果数据规模十分大,难以合并到一个分区,也可以最终合并到多个分区,得到近似结果。//rdd: (min_core_id,core_id_set)def mergeRDD(rdd: org.apache.spark.rdd.RDD[(Long,Set[Long])], partition_cnt:Int):org.apache.spark.rdd.RDD[(Long,Set[Long])] = {  val rdd_merged =  rdd.partitionBy(new HashPartitioner(partition_cnt))    .mapPartitions(iter => {      val buffer = ListBuffer[Set[Long]]()for(t        val core_id_set:Set[Long] = t._2        buffer.append(core_id_set)      }      val merged_buffer = mergeSets(buffer)var result = List[(Long,Set[Long])]()for(core_id_set        val min_core_id = core_id_set.minresult = result:+(min_core_id,core_id_set)      }      result.iterator    })  rdd_merged}
//分区迭代计算,可以根据需要调整迭代次数和分区数量rdd_core = mergeRDD(rdd_core,8)rdd_core = mergeRDD(rdd_core,4)rdd_core = mergeRDD(rdd_core,1)rdd_core.collect.foreach(println)

三,完整范例

完整范例还包括临时聚类簇的生成,以及最终聚类信息的整理。鉴于该部分代码较为冗长,在当前文章中不展示全部代码,仅说明最终结果。范例的输入数据和《20分钟学会DBSCAN聚类算法》文中完全一致,共500个样本点。聚类结果输出如下:该结果中,聚类簇数量为2个。噪声点数量为500-242-241 = 17个,和调用sklearn中的结果完全一致。添加云哥的公众号,并后台回复关键字:"源码",获取完整范例代码。

点云谱聚类实现代码_Spark跑DBSCAN算法,工业级代码长啥样?相关推荐

  1. 谱聚类Python代码详解

    谱聚类算法步骤 整体来说,谱聚类算法要做的就是先求出相似性矩阵,然后对该矩阵归一化运算,之后求前个特征向量,最后运用K-means算法分类. 实际上,谱聚类要做的事情其实就是将高维度的数据,以特征向量 ...

  2. 机器学习(聚类十)——谱聚类及代码实现

    谱聚类是基于谱图理论基础上的一种聚类方法,与传统的聚类方法相比:具有在任意形状的样本空间上聚类并且收敛于全局最优解的优点.(但效率不高,实际工作中用的比较少) 谱聚类 通过对样本数据的拉普拉斯矩阵的特 ...

  3. 谱聚类python代码_Python 谱聚类算法从零开始

    谱聚类算法是一种常用的无监督机器学习算法,其性能优于其他聚类方法. 此外,谱聚类实现起来非常简单,并且可以通过标准线性代数方法有效地求解. 在谱聚类算法中,根据数据点之间的相似性而不是k-均值中的绝对 ...

  4. 转:完整的最简单的谱聚类python代码

    http://blog.csdn.net/waleking/article/details/7584084 针对karate_club数据集,做了谱聚类.由于是2-way clustering,比较简 ...

  5. 谱聚类(Spectral Clustering)1——算法原理

    文章目录 简介 1. 准备工作 1.1 邻接矩阵 1.2 度矩阵 1.3 拉普拉斯矩阵 1.3.1 非归一化拉普拉斯矩阵 1.3.2 归一化拉普拉斯矩阵 1.4 相似图 1.4.1 ϵ\epsilon ...

  6. 谱聚类python代码_python中的谱聚类图

    没有太多光谱聚类的经验,只是按照文档进行(结果请跳到最后!)以下内容: 代码:import numpy as np import networkx as nx from sklearn.cluster ...

  7. matlab ncut谱聚类,NCUT 归一化分割、谱聚类之代码调试问题

    1 相比于c++,matlab的效率较低,为了解决这个问题,大家在matlab中调用c++,也就是说matlab调用的一些函数,本身是由c++编写完成的,执行的时候也是在c++编译器中执行.实现这个功 ...

  8. MNIST图像谱聚类的案例实现

    Index 目录索引 写在前面 谱聚类简介 数据集 代码实现步骤详述 图像预处理 谱聚类操作 聚类后的图像对应还原操作 写在前面 最近要做一个基于无监督学习的传统图像分类,需要使用到聚类分析方法,但看 ...

  9. MATLAB 谱聚类

    k-means 可以说是思想最简单的聚类了,但是它在应对非凸数据时却显得手足无措,例如如下的数据分类: 各类之间虽然间隔较远,但是非凸,这时候就需要引入谱聚类了(以下为谱聚类效果). 本文参考 [1] ...

最新文章

  1. 转大神的中国剩余定理
  2. javascript2008
  3. bzoj3223 splay
  4. C++中public,protected,private访问范围和用法
  5. 从四个层面落地,成为受欢迎、可信赖、懂技术的产品经理
  6. 【笔记】docker核心概念和使用 docker命令
  7. 开源的全面胜利背后,那些被遗忘的人性问题
  8. 【极速下载】gradle各版本快速下载地址大全
  9. 牛客寒假算法集训营1 小a与军团模拟器(启发式合并)
  10. 微信小程序3天刷量开流量主
  11. fastposter 2.2.0 新版本发布 电商级海报生成器
  12. mysql设备未就绪_SQL Server 返回了错误 21(设备未就绪。) 解决方法
  13. java设备未就绪_java.io.IOException: 设备未就绪
  14. JAVA EE是什么?
  15. JS–for循环嵌套
  16. C++中的平方、开方、绝对值怎么计算
  17. 两端固定弦的自由振动 | 分离变量法(一)| 偏微分方程(十三)
  18. Scapy使用文档中文版
  19. dnf跨几服务器比较稳定,DNF1228跨区了能干嘛 1228跨区问题解决一览
  20. iOS启动优化-二进制重排与Clang插桩

热门文章

  1. java重载中this的作用_Java2:构造方法、方法重载和this关键字
  2. cv2.error: opencv(4.4.0)_【从零学习OpenCV 4】图像金字塔
  3. 服务器磁盘空间占用,大文件查找
  4. webService(简单小demo)
  5. cookie session token 之间的区别
  6. Chrome在302重定向的时候对原请求产生2次请求的问题说明
  7. Algorithm: 匈牙利算法
  8. 用户体验设计答疑对话(半吊子和纯外行
  9. ArcGIS的BLOB字段与Access数据库BLOB字段的交换
  10. win7如何删除mariadb