本文主要包含以下几部分:

  • 1.背景

  • 2.Spark支持的数据类型

    • 2.1 Local Vector(本地向量)

    • 2.2 Labeled point(带标签的点)

    • 2.3 Local Matrix(本地矩阵)

    • 2.4 Distributed Matrix(分布式矩阵)

    • 2.4.4 BlockMatrix

  • 3.相似度计算原理探索

    • 3.1 相似度计算

    • 3.2 公式拆解

    • 3.3 矩阵并行

    • 3.4 阅读源码

  • 4.Spark实现Item相似度计算

1.背景

之前小编在计算两两用户的item重合度,根据item重合度去评估两个用户之间的相似度,根据条件进行过滤之后大概有3000个用户,但每个用户对应的item量参差不齐,有上百万的,有几千的,这样在去构建笛卡尔积的时候,进行item数据关联,得到的用户集就会特别大,spark运行的时候就会很慢,而且会出现很严重的数据倾斜。这个时候了解到了spark支持的数据类型,看到了CoordinateMatrix,然后深究其原理,便看到了这篇文章,经过整理形成了此文。

本文出自「xingoo」在原文的基础上加以小编自己的理解形成的学习笔记,希望对读者有帮助。原文链接:Spark MLlib 之 大规模数据集的相似度计算原理探索

2.Spark支持的数据类型

官方文档地址:https://spark.apache.org/docs/latest/mllib-data-types.html

2.1 Local Vector(本地向量)

本地向量是从0开始的下标和double类型的数据组成,存储在本地机器上,所以称为Local Vector。它支持两种形式:

  • Dense (密集的向量)

  • Sparse (稀疏的向量)

比如一个向量[1.0,0.0,3.0],用Dense表示为:[1.0,0.0,3.0],用Sparse表示为:(3,[0,2],[1.0,3.0]),其中3为向量的长度,[0,2]表示元素[1.0,3.0]的位置,可见sparse形式下0.0是不存储的。

import org.apache.spark.mllib.linalg.Vectorsval denseVector = Vectors.dense(1.0,0.0,3.0)
val sparseVector1 = Vectors.sparse(3,Array(0,2),Array(1.0,3.0))
val sparseVector2 = Vectors.sparse(3,Seq((0,1.0),(2,3.0)))println(s"DenseVector is : $denseVector")
println(s"DenseVector to Sparse is : ${denseVector.toSparse}")println(s"sparseVector1 is : $sparseVector1")
println(s"sparseVector1 to Dense is : ${sparseVector1.toDense}")println(s"sparseVector2 is : $sparseVector2")
println(s"sparseVector2 to Dense is : ${sparseVector2.toDense}")

输出为:

DenseVector is : [1.0,0.0,3.0]
DenseVector to Sparse is : (3,[0,2],[1.0,3.0])sparseVector1 is : (3,[0,2],[1.0,3.0])
sparseVector1 to Dense is : [1.0,0.0,3.0]sparseVector2 is : (3,[0,2],[1.0,3.0])
sparseVector2 to Dense is : [1.0,0.0,3.0]

2.2 Labeled point(带标签的点)

labeled point由本地向量组成,既可以是dense向量,也可以是sparse向量。在mllib中常用于监督类算法,使用double类型来保存该类型的数据,因为也可以用于回归和分类算法。例如二分类,label可以是0(负例)或1(正例),对于多分类,label可以是0,1,2...

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPointval pos = LabeledPoint(1.0, Vectors.dense(1.0,0.0,3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

sparse data

稀疏数据存储是非常普遍的现象,mllib支持读取libsvm格式的数据,其数据格式如下:

label index1:value1,index2:value2 ...

其读取方式包括:

import org.apache.spark.mllib.util.MLUtils// method 1
spark.read.format("libsvm") .load("libsvm data path")// method 2
MLUtils.loadLibSVMFile(spark.sparkContext, "libsvm data path")

2.3 Local Matrix(本地矩阵)

local matrix由行下标,列索引和double类型的值组成,存储在本地机器上,mllib支持密集矩阵和稀疏矩阵,其存储是按照列进行存储的。

例如下面的为密集矩阵:

通过数组存储的形式为:[1.0, 3.0, 5.0, 2.0, 4.0, 6.0],矩阵大小为[3,2]

// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
val denseMatrix = Matrices.dense(3,2, Array(1.0,3.0,5.0,2.0,4.0,6.0))
println(s"denseMatrix is : $denseMatrix")// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
val sparseMatrix = Matrices.sparse(3,2, Array(0,1,3),Array(0,2,1),Array(9,6,8))
println(s"sparseMatrix is : $sparseMatrix")

注:稀疏矩阵解释,首先指定矩阵是3行2列,Array(0, 1, 3)是指,第0个非零元素在第一列,第一第二个非零元素在第二列。

Array(0, 2, 1)是指,第一个非零元素在第0行,第二个非零元素在第2行,第三个非零元素在第1行。

此处设计比较好,假设100个元素分两列,不需要把每个元素所在列都标出来,只需要记录3个数字即可。Array(9, 6, 8)表示按顺序存储非零元素.

Array(0,1,3)比较难理解,可以参考以下文章:

  • https://www.cnblogs.com/lyy-blog/p/9288701.html

  • https://www.tuicool.com/articles/A3emmqis

2.4 Distributed Matrix(分布式矩阵)

一个分布式矩阵由下标和double类型的数据组成,不过分布式的矩阵的下标不是int类型,而是long类型,数据保存在一个或多个rdd中,选择一个正确的格式去存储分布式矩阵是非常重要的。分布式矩阵转换成不同的格式需要一个全局的shuffle(global shuffle),而全局shuffle的代价会非常高。到目前为止,Spark MLlib中已经实现了三种分布式矩阵。

最基本的分布式矩阵是RowMatrix,它是一个行式的分布式矩阵,没有行索引。比如一系列特征向量的集合。RowMatrix由一个RDD代表所有的行,每一行是一个本地向量。假设一个RowMatrix的列数不是特别巨大,那么一个简单的本地向量能够与driver进行联系,并且数据可以在单个节点上保存或使用。IndexedRowMatrix与RowMatrix类似但是有行索引,行索引可以用来区分行并且进行连接等操作。CoordinateMatrix是一个以协同列表(coordinate list)格式存储数据的分布式矩阵,数据以RDD形式存储。

注意:因为我们需要缓存矩阵的大小,所以分布式矩阵的RDDs格式是需要确定的,使用非确定RDDs的话会报错。

2.4.1 Row Matrix

RowMatrix它是一个行式的分布式矩阵,没有行索引。比如一系列特征向量的集合。RowMatrix由一个RDD代表所有的行,每一行是一个本地向量。因为每一行代表一个本地向量,所以它的列数被限制在Integer.max的范围内,在实际应用中不会太大。

一个RowMatrix可以由一个RDD[Vector]的实例创建。因此我们可以计算统计信息或者进行分解。QR分解(QR decomposition)是A=QR,其中Q是一个矩阵,R是一个上三角矩阵。对sigular value decomposition(SVD)和principal component analysis(PCA),可以去参考降维的部分。

// Row Matrix
println("Row Matrix ...")
val arr = Array(Vectors.dense(1,0),Vectors.dense(0,1))
val rows = spark.sparkContext.parallelize(arr)
val mat: RowMatrix = new RowMatrix(rows)
val m = mat.numRows()
val n = mat.numCols()
val qrResult = mat.tallSkinnyQR(true)
println(s"m is: $m,n is $n,\nqrResult is :")
qrResult.Q.rows.foreach(println)
println()
qrResult.R.rowIter.foreach(println)

输出为:

Row Matrix ...
m is: 2,n is 2,
qrResult is :
[1.0,0.0]
[0.0,1.0][1.0,0.0]
[0.0,1.0]

2.4.2 IndexedRowMatrix

IndexedRowMatrix与RowMatrix类似,但是它有行索引。由一个行索引RDD表示,索引每一行由一个long型行索引和一个本地向量组成。

一个IndexedRowMatrix可以由RDD[IndexedRow]的实例来生成,IndexedRow是一个(Long, Vector)的封装。去掉行索引,IndexedRowMatrix能够转换成RowMatrix。

// IndexedRowMatrix
println("Indexed Row Matrix ...")
val arr2 = Array(IndexedRow(0,Vectors.dense(1,0)),IndexedRow(1,Vectors.dense(0,1)))
val rows2: RDD[IndexedRow] = spark.sparkContext.parallelize(arr2)
val mat2 = new IndexedRowMatrix(rows2)
val m2 = mat2.numRows()
val n2 = mat2.numCols()
// 去掉行索引,转换成RowMatrix
val qrResult2 = mat2.toRowMatrix().tallSkinnyQR(true)
println(s"m2 is: $m2,n2 is $n2,\nqrResult2 is :")
qrResult2.Q.rows.foreach(println)
println()
qrResult2.R.rowIter.foreach(println)

输出为:

Indexed Row Matrix ...
m2 is: 2,n2 is 2,
qrResult2 is :
[1.0,0.0]
[0.0,1.0][1.0,0.0]
[0.0,1.0]

2.4.3 CoordinateMatrix

CoordinateMatrix是一个分布式矩阵,其实体集合是一个RDD,每一个是一个三元组(i:Long, j:Long, value:Double)。其中i是行索引,j是列索引,value是实体的值。当矩阵的维度很大并且是稀疏矩阵时,才使用CoordinateMatrix。

一个CoordinateMatrix可以通过一个RDD[MatrixEntry]的实例来创建,MatrixEntry是一个(Long, Long, Double)的封装。CoordinateMatrix可以通过调用toIndexedRowMatrix转换成一个IndexedRowMatrix。CoordinateMatrix的其他降维方法暂时还不支持(Spark-1.6.2)。

// CoordinateMatrix
println("Coordinate Matrix ...")
val arr3 = Array(MatrixEntry(0,0,1),MatrixEntry(1,1,1)
)
val entries = spark.sparkContext.parallelize(arr3)
val mat3 = new CoordinateMatrix(entries)
val m3 = mat.numRows()
val n3 = mat.numCols()
val qrResult3 = mat3.toIndexedRowMatrix().toRowMatrix().tallSkinnyQR(true)
println(s"m3 is: $m3,n3 is $n3,\nqrResult3 is :")
qrResult3.Q.rows.foreach(println)
println()
qrResult3.R.rowIter.foreach(println)

输出为:

Coordinate Matrix ...
m3 is: 2,n3 is 2,
rowMat3 is :
[1.0,0.0]
[0.0,1.0][1.0,0.0]
[0.0,1.0]

2.4.4 BlockMatrix

一个BlockMatrix是一个分布式的矩阵,由一个MatrixBlocks的RDD组成。MatrixBlock是一个三元组((Int, Int), Matrix),其中(Int, Int)是block的索引,Matrix是一个在指定位置上的维度为rowsPerBlock * colsPerBlock的子矩阵。BlockMatrix支持与另一个BlockMatrix对象的add和multiply操作。BlockMatrix提供了一个帮助方法validate,这个方法可以用于检测该`BlockMatrix·是否正确。

可以通过IndexedRowMatrix或者CoordinateMatrix调用toBlockMatrix快速得到BlockMatrix对象。默认情况下toBlockMatrix方法会得到一个1024 x 1024的BlockMatrix。使用时可以通过手动传递维度值来设置维度,toBlockMatrix(rowsPerBlock, colsPerBlock)。

// BlockMatrix
println("Block Matrix ...")
val arr4 = Array(MatrixEntry(0,0,1),MatrixEntry(1,1,1)
)
val entries4: RDD[MatrixEntry] = spark.sparkContext.parallelize(arr4)
val coordMat: CoordinateMatrix = new CoordinateMatrix(entries4)
val matA: BlockMatrix = coordMat.toBlockMatrix().cache()
// 检测BlockMatrix格式是否正确,错误的话会抛出异常,正确的话无其他影响
matA.validate()
matA.blocks.foreach(println)
val m4 = matA.numRowBlocks
val n4 = matA.numColBlocks
println(s"m4 is: $m4,n4 is $n4")// 计算A^T * A.
val ata = matA.transpose.multiply(matA)
ata.blocks.foreach(println)

输出为:

Block Matrix ...
((0,0),2 x 2 CSCMatrix
(0,0) 1.0
(1,1) 1.0)
m4 is: 1,n4 is 1
((0,0),1.0  0.0
0.0  1.0  )

3.相似度计算原理探索

无论是ICF基于物品的协同过滤、UCF基于用户的协同过滤、基于内容的推荐,最基本的环节都是计算相似度。如果样本特征维度很高或者<user, item, score>的维度很大,都会导致无法直接计算。设想一下100w*100w的二维矩阵,计算相似度怎么算?

在spark中RowMatrix提供了一种并行计算相似度的思路,下面就来看看其中的奥妙吧!

3.1 相似度计算

相似度有很多种,每一种适合的场景都不太一样。比如:

  • 欧氏距离,在几何中最简单的计算方法

  • 夹角余弦,通过方向计算相似度,通常在用户对商品评分、NLP等场景使用

  • 杰卡德距离,在不考虑每一样的具体值时使用

  • 皮尔森系数,与夹角余弦类似,但是可以去中心化。比如评分时,有人倾向于打高分,有人倾向于打低分,他们的最后效果在皮尔森中是一样的

  • 曼哈顿距离,一般在路径规划、地图类中常用,比如A*算法中使用曼哈顿来作为每一步代价值的一部分(F=G+H, G是从当前点移动到下一个点的距离,H是距离目标点的距离,这个H就可以用曼哈顿距离表示)

上面两个向量(x1,y1)和(x2,y2)计算夹角的余弦值就是两个向量方向的相似度,其公式为:

其中,表示的模,即每一项的平方和再开方。

3.2 公式拆解

那么如果向量不只是两维,而是n维呢?比如有两个向量:

第一个向量:

第二个向量:

他们的相似度计算方法套用上面的公式为:

通过上面的公式就可以发现,夹角余弦可以拆解成每一项与另一项对应位置的乘积x1∗y1,再除以每个向量自己的

就可以了。

3.3 矩阵并行

画个图看看,首先创建下面的矩阵:

注意,矩阵里面都是一列代表一个向量....上面是创建矩阵时的三元组,如果在spark中想要创建matrix,可以这样:

val df = spark.createDataFrame(Seq((0, 0, 1.0),(1, 0, 1.0),(2, 0, 1.0),(3, 0, 1.0),(0, 1, 2.0),(1, 1, 2.0),(2, 1, 1.0),(3, 1, 1.0),(0, 2, 3.0),(1, 2, 3.0),(2, 2, 3.0),(0, 3, 1.0),(1, 3, 1.0),(3, 3, 4.0)))val matrix = new CoordinateMatrix(df.map(row => MatrixEntry(row.getAs[Integer](0).toLong, row.getAs[Integer](1).toLong, row.getAs[Double](2))).toJavaRDD)

然后计算每一个向量的normL2,即平方和开根号。

以第一个和第二个向量计算为例,第一个向量为(1,1,1,1),第二个向量为(2,2,1,1),每一项除以对应的normL2,得到后面的两个向量:

两个向量最终的相似度为0.94。

那么在Spark如何快速并行处理呢?通过上面的例子,可以看到两个向量的相似度,需要把每一维度乘积后相加,但是一个向量一般都是跨RDD保存的,所以可以先计算所有向量的第一维,得出结果

最后对做一次reduceByKey累加结果即可.....

3.4 阅读源码

首先创建dataframe形成matrix:

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}import org.apache.spark.sql.SparkSessionobject MatrixSimTest {  def main(args: Array[String]): Unit = {    // 创建dataframe,转换成matrix    val spark = SparkSession.builder().master("local[*]").appName("sim").getOrCreate()    spark.sparkContext.setLogLevel("WARN")    import spark.implicits._    val df = spark.createDataFrame(Seq(      (0, 0, 1.0),      (1, 0, 1.0),      (2, 0, 1.0),      (3, 0, 1.0),      (0, 1, 2.0),      (1, 1, 2.0),      (2, 1, 1.0),      (3, 1, 1.0),      (0, 2, 3.0),      (1, 2, 3.0),      (2, 2, 3.0),      (0, 3, 1.0),      (1, 3, 1.0),      (3, 3, 4.0)    ))    val matrix = new CoordinateMatrix(df.map(row => MatrixEntry(row.getAs[Integer](0).toLong, row.getAs[Integer](1).toLong, row.getAs[Double](2))).toJavaRDD)    // 调用sim方法    val x = matrix.toRowMatrix().columnSimilarities()    // 得到相似度结果    x.entries.collect().foreach(println)  }}

得到的结果为:

MatrixEntry(0,3,0.7071067811865476)
MatrixEntry(0,2,0.8660254037844386)
MatrixEntry(2,3,0.2721655269759087)
MatrixEntry(0,1,0.9486832980505139)
MatrixEntry(1,2,0.9128709291752768)
MatrixEntry(1,3,0.596284793999944)

直接进入columnSimilarities方法看看是怎么个流程吧!

def columnSimilarities(): CoordinateMatrix = {columnSimilarities(0.0)
}

内部调用了带阈值的相似度方法,这里的阈值是指相似度小于该值时,输出结果时,会自动过滤掉。

def columnSimilarities(threshold: Double): CoordinateMatrix = {//检查参数...val gamma = if (threshold < 1e-6) {Double.PositiveInfinity} else {10 * math.log(numCols()) / threshold}columnSimilaritiesDIMSUM(computeColumnSummaryStatistics().normL2.toArray, gamma)
}

这里的gamma用于采样,具体的做法咱们来继续看源码。然后看一下computeColumnSummaryStatistics().normL2.toArray这个方法:

def computeColumnSummaryStatistics(): MultivariateStatisticalSummary = {val summary = rows.treeAggregate(new MultivariateOnlineSummarizer)((aggregator, data) => aggregator.add(data),(aggregator1, aggregator2) => aggregator1.merge(aggregator2))updateNumRows(summary.count)summary
}

之前有介绍这个treeAggregate是一种带“预reduce”的map-reduce,返回的summary,里面帮我们统计了每一个向量的很多指标,比如

currMean    为 每一个向量的平均值
currM2      为 每个向量的每一维的平方和
currL1      为 每个向量的绝对值的和
currMax     为 每个向量的最大值
currMin     为 每个向量的最小值
nnz         为 每个向量的非0个数

这里我们只需要currM2,它是每个向量的平方和。summary调用的normL2方法:

override def normL2: Vector = {require(totalWeightSum > 0, s"Nothing has been added to this summarizer.")val realMagnitude = Array.ofDim[Double](n)var i = 0val len = currM2.lengthwhile (i < len) {realMagnitude(i) = math.sqrt(currM2(i))i += 1}Vectors.dense(realMagnitude)
}

上面这步就是对平方和开个根号,这样就求出来了每个向量的分母部分。下面就是最关键的地方了:

private[mllib] def columnSimilaritiesDIMSUM(colMags: Array[Double],gamma: Double): CoordinateMatrix = {// 一些参数校验// 对gamma进行开方val sg = math.sqrt(gamma) // sqrt(gamma) used many times// 这里把前面算的平方根的值设置一个默认值,因为如果为0,除0会报异常,所以设置为1val colMagsCorrected = colMags.map(x => if (x == 0) 1.0 else x)// 把抽样概率数组 和 平方根数组进行广播val sc = rows.contextval pBV = sc.broadcast(colMagsCorrected.map(c => sg / c))val qBV = sc.broadcast(colMagsCorrected.map(c => math.min(sg, c)))// 遍历每一行,计算每个向量该维的乘积,形成三元组val sims = rows.mapPartitionsWithIndex { (indx, iter) =>val p = pBV.valueval q = qBV.value// 获得随机值val rand = new XORShiftRandom(indx)val scaled = new Array[Double](p.size)iter.flatMap { row =>row match {case SparseVector(size, indices, values) =>// 如果是稀疏向量,遍历向量的每一维,除以平方根val nnz = indices.sizevar k = 0while (k < nnz) {scaled(k) = values(k) / q(indices(k))k += 1}// 遍历向量数组,计算每一个数值与其他数值的乘机。// 比如向量(1, 2, 0 ,1)// 得到的结果为 (0,1,value)(0,3,value)(2,3,value)Iterator.tabulate (nnz) { k =>val buf = new ListBuffer[((Int, Int), Double)]()val i = indices(k)val iVal = scaled(k)// 判断当前列是否符合采样范围,如果小于采样值,就忽略if (iVal != 0 && rand.nextDouble() < p(i)) {var l = k + 1while (l < nnz) {val j = indices(l)val jVal = scaled(l)if (jVal != 0 && rand.nextDouble() < p(j)) {// 计算每一维与其他维的值buf += (((i, j), iVal * jVal))}l += 1}}buf}.flattencase DenseVector(values) =>// 跟稀疏同理val n = values.sizevar i = 0while (i < n) {scaled(i) = values(i) / q(i)i += 1}Iterator.tabulate (n) { i =>val buf = new ListBuffer[((Int, Int), Double)]()val iVal = scaled(i)if (iVal != 0 && rand.nextDouble() < p(i)) {var j = i + 1while (j < n) {val jVal = scaled(j)if (jVal != 0 && rand.nextDouble() < p(j)) {buf += (((i, j), iVal * jVal))}j += 1}}buf}.flatten}}// 最后再执行一个reduceBykey,累加所有的值,就是i和j的相似度}.reduceByKey(_ + _).map { case ((i, j), sim) =>MatrixEntry(i.toLong, j.toLong, sim)}new CoordinateMatrix(sims, numCols(), numCols())}

这样把所有向量的平方和广播后,每一行都可以在不同的节点并行处理了。

总结来说,Spark提供的这个计算相似度的方法有两点优势:

  • 通过拆解公式,使得每一行独立计算,加快速度

  • 提供采样方案,以采样方式抽样固定的特征维度计算相似度

不过杰卡德目前并不能使用这种方法来计算,因为杰卡德中间有一项需要对向量求dot,这种方式就不适合了;如果杰卡德想要快速计算,可以去参考LSH局部敏感哈希算法,这里就不详细说明了。

4.Spark实现Item相似度计算

这里使用的数据集是MovieLens,计算Item的相似度,为用户推荐部分没有实现,不过也比较简单,感兴趣的用户可以自己试着实现一下看看。

// 加载数据 (userid, itemid, score) => (string, long, double)
val dataPath = "data/ml-100k/ua.base"
val dataTemp: RDD[(String, (Long, Double))] = spark.sparkContext.textFile(dataPath).map(_.split("\t")).map(l => (l(0), (l(1).toLong, l(2).toDouble)))// 理论为上userid 可能为 设备id等字符串,所以进行编码
val userIndex: RDD[(String, Long)] = dataTemp.map(_._1).distinct().zipWithIndex()// (userid index, itemid, score)
val data:RDD[(Long, Long, Double)] = dataTemp.leftOuterJoin(userIndex).filter(_._2._2.nonEmpty).map(l => (l._2._2.get, l._2._1._1, l._2._1._2)).persist(StorageLevel.MEMORY_AND_DISK)
println(s"使用的数据条数为:${data.count()}")
data.take(3).foreach(l => println(l))val matrix = data.map(_ match { case (uuid, spuid, rate) => MatrixEntry(uuid, spuid, rate) })// new CoordinateMatrix(matrix) 除了传入一个rdd之外
// 还有另外两个参数,rows 和 cols,如果不传的话默认是i,j中的最大值
val topicSims: CoordinateMatrix = new CoordinateMatrix(matrix)// toRowMatrix() 调用的是 toIndexedRowMatrix().toRowMatrix()
val itemSim: CoordinateMatrix = topicSims.toRowMatrix().columnSimilarities()val itemSimRDD = itemSim.entries.union(itemSim.entries.map(m => MatrixEntry(m.j, m.i, m.value)))println("生成计算结果 ...")
itemSimRDD.map(f => (f.i.toLong, f.j.toLong, f.value)).take(10).foreach(l => println(l))

Over!

Spark中如何使用矩阵运算间接实现i2i相关推荐

  1. Spark中内存模型管理

    一.概述 Spark 作为一个基于内存的分布式计算引擎,其内存管理模块在整个系统中扮演着非常重要的角色.理解 Spark 内存管理的基本原理,有助于更好地开发 Spark 应用程序和进行性能调优.本文 ...

  2. Spark中Task数量的分析

    本文主要说一下Spark中Task相关概念.RDD计算时Task的数量.Spark Streaming计算时Task的数量. Task作为Spark作业执行的最小单位,Task的数量及运行快慢间接决定 ...

  3. Spark中的内存计算是什么?

    由于计算的融合只发生在 Stages 内部,而 Shuffle 是切割 Stages 的边界,因此一旦发生 Shuffle,内存计算的代码融合就会中断. 在 Spark 中,内存计算有两层含义: 第一 ...

  4. Java查询spark中生成的文件_java+spark-sql查询excel

    Spark官网下载Spark 下载Windows下Hadoop所需文件winutils.exe 同学们自己网上找找吧,这里就不上传了,其实该文件可有可无,报错也不影响Spark运行,强迫症可以下载,本 ...

  5. Spark中Task,Partition,RDD、节点数、Executor数、core数目(线程池)、mem数

    Spark中Task,Partition,RDD.节点数.Executor数.core数目的关系和Application,Driver,Job,Task,Stage理解 from:https://bl ...

  6. Spark中常用的算法

    Spark中常用的算法: 3.2.1 分类算法 分类算法属于监督式学习,使用类标签已知的样本建立一个分类函数或分类模型,应用分类模型,能把数据库中的类标签未知的数据进行归类.分类在数据挖掘中是一项重要 ...

  7. 解决spark中遇到的数据倾斜问题

    一. 数据倾斜的现象 多数task执行速度较快,少数task执行时间非常长,或者等待很长时间后提示你内存不足,执行失败. 二. 数据倾斜的原因 常见于各种shuffle操作,例如reduceByKey ...

  8. 理解spark中的job、stage、task

    什么是Spark? Spark是处理大数据常用的计算引擎.Spark是一个用来实现快速而通用的集群计算的平台.扩展了广泛使用的MapReduce计算模型,而且高效地支持更多的计算模式,包括交互式查询和 ...

  9. spark中local模式与cluster模式使用场景_Spark 知识点 ( 架构 RDD Task )

    1. Spark介绍 Apache Spark是一个围绕速度.易用性和复杂分析构建的大数据处理框架,最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一 ...

最新文章

  1. 恢复Opera11.50地址栏的下拉列表按钮
  2. 数据库MYSQL学习系列三
  3. 数据库常用语句(日常填充)
  4. Boost:在向量中打印值
  5. 自动化设计模式Page Object
  6. js实现算法--割字符串
  7. SQL中跨服务器查询
  8. 【编程大系】Java资源汇总
  9. python桌面翻译_Python实现桌面翻译工具【新手必学】
  10. 输出一个数的二进制序列中1的个数(三种方法)
  11. 融云开发案例核心代码分享
  12. textbox点击后弹出系统键盘导致背景重置为白色的问题
  13. Expert 诊断优化系列------------------你的CPU高么?
  14. Ajax动态滚动加载数据
  15. Linux下SVN客户端基本使用教程
  16. QT自定义QmessageBox对话框
  17. Java接口组装一台计算机编写各组件厂商分别实现CPU,EMS,HardDisk接口
  18. Go语言 —— 前景
  19. C++ 知识补给(二)
  20. QlikView学习笔记

热门文章

  1. 第10章两个独立样本的t检验
  2. vue 实现第三方QQ登录
  3. 失落的帝国:盛大业务大收缩
  4. 三极管类型及工作状态判断
  5. 应公司需要,开发了一个CPU卡的发卡工具
  6. SwiftUI iOS 完整项目之基于CoreData构建购物计划App(教程含源码App Store上线app)
  7. Unity打包的PC项目生成一个EXE文件
  8. 原来华为手机的拨号键盘除了打电话,还有这些隐藏功能,涨知识了
  9. 【机械仿真】曲柄摇杆机构运动仿真含Matlab源码
  10. 中国LED植物照明行业盈利能力与进出口前景预测报告2021-2026年