从下面分析可以看出,是先做了hash计算,然后使用hash join table来讲hash值相等的数据合并在一起。然后再使用udf计算距离,最后再filter出满足阈值的数据:

参考:https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala
  /*** Join two datasets to approximately find all pairs of rows whose distance are smaller than* the threshold. If the [[outputCol]] is missing, the method will transform the data; if the* [[outputCol]] exists, it will use the [[outputCol]]. This allows caching of the transformed* data when necessary.** @param datasetA One of the datasets to join.* @param datasetB Another dataset to join.* @param threshold The threshold for the distance of row pairs.* @param distCol Output column for storing the distance between each pair of rows.* @return A joined dataset containing pairs of rows. The original rows are in columns*         "datasetA" and "datasetB", and a column "distCol" is added to show the distance*         between each pair.*/def approxSimilarityJoin(datasetA: Dataset[_],datasetB: Dataset[_],threshold: Double,distCol: String): Dataset[_] = {val leftColName = "datasetA"val rightColName = "datasetB"val explodeCols = Seq("entry", "hashValue")val explodedA = processDataset(datasetA, leftColName, explodeCols)// If this is a self join, we need to recreate the inputCol of datasetB to avoid ambiguity.// TODO: Remove recreateCol logic once SPARK-17154 is resolved.val explodedB = if (datasetA != datasetB) {processDataset(datasetB, rightColName, explodeCols)} else {val recreatedB = recreateCol(datasetB, $(inputCol), s"${$(inputCol)}#${Random.nextString(5)}")processDataset(recreatedB, rightColName, explodeCols)}// Do a hash join on where the exploded hash values are equal.val joinedDataset = explodedA.join(explodedB, explodeCols).drop(explodeCols: _*).distinct()// Add a new column to store the distance of the two rows.val distUDF = udf((x: Vector, y: Vector) => keyDistance(x, y), DataTypes.DoubleType)val joinedDatasetWithDist = joinedDataset.select(col("*"),distUDF(col(s"$leftColName.${$(inputCol)}"), col(s"$rightColName.${$(inputCol)}")).as(distCol))// Filter the joined datasets where the distance are smaller than the threshold.joinedDatasetWithDist.filter(col(distCol) < threshold)}

补充:

sql join 算法 时间复杂度

2016年08月26日 12:04:34 stevewongbuaa 阅读数 2477

参考

stackoverflow

笔记

sql语句如下:

SELECT  T1.name, T2.date
FROM    T1, T2
WHERE T1.id=T2.id AND T1.color='red' AND T2.type='CAR'

假设T1有m行,T2有n行,那么,普通情况下,应该要遍历T1的每一行的id(m),然后在遍历T2(n)中找出T2.id = T1.id的行进行join。时间复杂度应该是O(m*n)

如果没有索引的话,engine会选择hash join或者merge join进行优化。

hash join是这样的:

  1. 选择被哈希的表,通常是小一点的表。让我们愉快地假定是T1更小吧。
  2. T1所有的记录都被遍历。如果记录符合color=’red’,这条记录就会进去哈希表,以id为key,以name为value。
  3. T2所有的记录被遍历。如果记录符合type=’CAR’,使用这条记录的id去搜索哈希表,所有命中的记录的name的值,都被返回,还带上了当前记录的date的值,这样就可以把两者join起来了。

时间复杂度O(n+m),实现hash表是O(n),hash表查找是O(m),直接将其相加。

merge join是这样的:

1.复制T1(id, name),根据id排序。
2.复制T2(id, date),根据id排序。
3.两个指针指向两个表的最小值。

    >1 2<2 32 43 5

4.在循环中比较指针,如果match,就返回记录。如果不match,指向较小值的指针指向下一个记录。

>1  2<  - 不match, 左指针小,左指针++ 2 3 2 4 3 5 1 2< - match, 返回记录,两个指针都++ >2 3 2 4 3 5 1 2 - match, 返回记录,两个指针都++ 2 3< 2 4 >3 5 1 2 - 左指针越界,查询结束。 2 3 2 4< 3 5 >

时间复杂度O(n*log(n)+m*log(m))。排序算法的复杂度分别是O(n*log(n))和O(m*log(m)),直接将两者相加。

在这种情况下,使查询更加复杂反而可以加快速度,因为更少的行需要经受join-level的测试?

当然了。

如果原来的query没有where语句,如

SELECT  T1.name, T2.date
FROM    T1, T2

是更简单的,但是会返回更多的结果并运行更长的时间。

  

hash函数的补充:

可以看到 hashFunction 涉及到indices 字段下表的计算。另外的distance计算使用了jaccard相似度。

from:https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala

/*** :: Experimental ::** Model produced by [[MinHashLSH]], where multiple hash functions are stored. Each hash function* is picked from the following family of hash functions, where a_i and b_i are randomly chosen* integers less than prime:*    `h_i(x) = ((x \cdot a_i + b_i) \mod prime)`** This hash family is approximately min-wise independent according to the reference.** Reference:* Tom Bohman, Colin Cooper, and Alan Frieze. "Min-wise independent linear permutations."* Electronic Journal of Combinatorics 7 (2000): R26.** @param randCoefficients Pairs of random coefficients. Each pair is used by one hash function.*/
@Experimental
@Since("2.1.0")
class MinHashLSHModel private[ml](override val uid: String,private[ml] val randCoefficients: Array[(Int, Int)])extends LSHModel[MinHashLSHModel] {/** @group setParam */@Since("2.4.0")override def setInputCol(value: String): this.type = super.set(inputCol, value)/** @group setParam */@Since("2.4.0")override def setOutputCol(value: String): this.type = super.set(outputCol, value)@Since("2.1.0")override protected[ml] def hashFunction(elems: Vector): Array[Vector] = {require(elems.numNonzeros > 0, "Must have at least 1 non zero entry.")val elemsList = elems.toSparse.indices.toListval hashValues = randCoefficients.map { case (a, b) =>elemsList.map { elem: Int =>((1L + elem) * a + b) % MinHashLSH.HASH_PRIME}.min.toDouble}// TODO: Output vectors of dimension numHashFunctions in SPARK-18450hashValues.map(Vectors.dense(_))}@Since("2.1.0")override protected[ml] def keyDistance(x: Vector, y: Vector): Double = {val xSet = x.toSparse.indices.toSetval ySet = y.toSparse.indices.toSetval intersectionSize = xSet.intersect(ySet).size.toDoubleval unionSize = xSet.size + ySet.size - intersectionSizeassert(unionSize > 0, "The union of two input sets must have at least 1 elements")1 - intersectionSize / unionSize}@Since("2.1.0")override protected[ml] def hashDistance(x: Seq[Vector], y: Seq[Vector]): Double = {// Since it's generated by hashing, it will be a pair of dense vectors.// TODO: This hashDistance function requires more discussion in SPARK-18454x.zip(y).map(vectorPair =>vectorPair._1.toArray.zip(vectorPair._2.toArray).count(pair => pair._1 != pair._2)).min}@Since("2.1.0")override def copy(extra: ParamMap): MinHashLSHModel = {val copied = new MinHashLSHModel(uid, randCoefficients).setParent(parent)copyValues(copied, extra)}@Since("2.1.0")override def write: MLWriter = new MinHashLSHModel.MinHashLSHModelWriter(this)
}

  

转载于:https://www.cnblogs.com/bonelee/p/11151729.html

minhash pyspark 源码分析——hash join table是关键相关推荐

  1. Thread源码分析之join方法

    2019独角兽企业重金招聘Python工程师标准>>> join方法示例1 源码 import java.util.concurrent.TimeUnit;public class ...

  2. 图片加载框架Picasso - 源码分析

    简书:图片加载框架Picasso - 源码分析 前一篇文章讲了Picasso的详细用法,Picasso 是一个强大的图片加载缓存框架,一个非常优秀的开源库,学习一个优秀的开源库,,我们不仅仅是学习它的 ...

  3. 【热修复】Andfix源码分析

    转载请标注来源:http://www.cnblogs.com/charles04/p/8471301.html Andfix源码分析 0.目录 背景介绍 源码分析 方案评价 总结与思考 参考文献 1. ...

  4. java join 源码_java并发:join源码分析

    join join join是Thread方法,它的作用是A线程中子线程B在运行之后调用了B.join(),A线程会阻塞直至B线程执行结束 join源码(只有继承Thread类才能使用) 基于open ...

  5. java并发:join源码分析

    join join join是Thread方法,它的作用是A线程中子线程B在运行之后调用了B.join(),A线程会阻塞直至B线程执行结束 join源码(只有继承Thread类才能使用) 基于open ...

  6. postgreSQL源码分析——索引的建立与使用——Hash索引(2)

    2021SC@SDUSC 目录 Hash索引创建 hashbuild函数 _hash_init函数 Hash索引的插入 hashinsert函数 _hash_doinsert函数 总结 Hash索引创 ...

  7. 多线程高并发编程(8) -- Fork/Join源码分析

    一.概念 Fork/Join就是将一个大任务分解(fork)成许多个独立的小任务,然后多线程并行去处理这些小任务,每个小任务处理完得到结果再进行合并(join)得到最终的结果. 流程:任务继承Recu ...

  8. 《源码分析转载收藏向—数据库内核月报》

    月报原地址: 数据库内核月报 现在记录一下,我可能需要参考的几篇文章吧,不然以后还得找: MySQL · 代码阅读 · MYSQL开源软件源码阅读小技巧 MySQL · 源码分析 · 聚合函数(Agg ...

  9. 《MySQL 8.0.22执行器源码分析(3.2)关于HashJoinIterator》

    在本文章之前,应该了解的概念: 连接的一些概念.NLJ.BNL.HashJoin算法. 目录 关于join连接 probe行保存概念 Hashjoin执行流程(十分重要) HashJoinIterat ...

最新文章

  1. 解密 Redis 助力双 11 背后电商秒杀系统
  2. 电脑服务器高配置和高性能,为高性能工作站服务 超算系统配置推荐
  3. Ctex软件介绍安装破解(是WinEdt_v6.0破解)
  4. PHP框架CodeIgniter之连接MS Sqlserver2014及URL Rewrite问题解决
  5. 【数理知识】二次型求导 矩阵求导
  6. SQL SERVER 使用 OPENRORWSET(BULK)函数将txt文件中的数据批量插入表中(2)
  7. ajax值上传不过去,ajax上传时参数提交不更新等相关问题
  8. log4net 记录日志到sqlserver
  9. 云计算时代,数据中心架构三层到大二层的演变
  10. php实习生很苦吗,实习生就该被欺负做又累又苦的活吗?!
  11. __attribute__((format(printf,m,n)))
  12. 蓝桥杯数字三角形java,蓝桥杯数字三角形(java)
  13. python字典默认排序_Python字典练习:设置默认获取排序,小,知识点,setdefaultgetsorted...
  14. webstorm软件使用汉化包
  15. cdn网络加速原理剖析
  16. 再谈本土EDA竞争力顺便聊聊DTCO在中国落地
  17. 【BZOJ4199】品酒大会(NOI2015)-后缀数组+并查集
  18. idea 的Igonre 设置
  19. 校园网路由器有线中继(针对802.1X)
  20. Redis缓存击穿、雪崩、穿透!(超详细)

热门文章

  1. pyqt5从子目录加载qrc文件_PyQt5快速上手基础篇10-QSettings用法
  2. 高中计算机会考优秀网,北京高三体育会考成绩255分为优秀
  3. Awk之if ,else if,else用法
  4. Php魔术函数学习与应用 __construct() __destruct() __get()等
  5. 仿站小工具8.0_安卓微信8.0版本可以升级了!新增4个实用功能,内附更新方式...
  6. java servletcontext_Java ServletContext对象用法解析
  7. oracle分页置顶,[置顶]       ibatis查询oracle分页
  8. 台式计算机怎么加一个硬盘,如何再安装一个台式计算机硬盘驱动器?如何在计算机安装中添加额外的硬盘...
  9. ajax返回304,jquery $ajax GET请求在IE浏览器兼容中遇到的304 cache请求的经验分享
  10. RSA加密算法详解以及RSA在laravel中的应用