此文已由作者叶林宝授权网易云社区发布。

欢迎访问网易云社区,了解更多网易技术产品运营经验。

方案四:Sort on Cell Values

简述:

上述方案三, 当数据行数较多, 情况下, 在二次排序还是可能出现oom情况, 而且, 不同的field_index的数据可能shuffle到同一个分区,这样就加大了oom的概率。当field_index本身取值较多 情况下, 增加分区数是其中一种解决方法。但是field_index取值本身就少于分区数的情况下, 增加分区数对缓解oom就没任何作用了。 如果 当field_value相比field_index较为分散, 且值较多的情况下, 不妨换个思维, 按field_value分区。 具体算法如下:

算法:

(1)将df 转换为(field_value, field_index)

(2)对分区内的数据, 用sortByKey根据 field_value排序 (rangPartition排序)

(3)利用mapPartitions确定每个分区内的每个field_index共有多少数据(不同分区中的filed_value相对有序, 例如partiiton1 中的filed_value比partition2中的field_value小)

(4)利用第(3)步数据, 确定每个field_index中所需要的排名的数据在哪个分区以及分区内第几条数据。例如要输出field_index_6的13th位数据,假设第一个分区已经包含10条数据, 则目标数据在第二个分区的第3条数据

(5)转换(4)计算结果为标准输出格式

代码:

(1)

/*** 将数据源df转换为(field_value, field_index)格式的rdd* @param dataFrame* @return*/def getValueColumnPairs(dataFrame : DataFrame): RDD[(Double, Int)] ={dataFrame.rdd.flatMap{row: Row => row.toSeq.zipWithIndex.map{case (v, index) => (v.toString.toDouble, index)}}}

(3)

/*** 对按照field_value排序后的sortedValueColumnPairs, 计算出每个分区上, 每个field_index分别有多少数据* @param sortedValueColumnPairs* @param numOfColumns* @return*/def getColumnsFreqPerPartition(sortedValueColumnPairs: RDD[(Double, Int)],numOfColumns : Int): Array[(Int, Array[Long])] = {val zero = Array.fill[Long](numOfColumns)(0)    def aggregateColumnFrequencies (partitionIndex : Int, valueColumnPairs : Iterator[(Double, Int)]) = {val columnsFreq : Array[Long] = valueColumnPairs.aggregate(zero)((a : Array[Long], v : (Double, Int)) => {val (value, colIndex) = v          //increment the cell in the zero array corresponding to this columna(colIndex) = a(colIndex) + 1La},(a : Array[Long], b : Array[Long]) => {a.zip(b).map{ case(aVal, bVal) => aVal + bVal}})Iterator((partitionIndex, columnsFreq))}sortedValueColumnPairs.mapPartitionsWithIndex(aggregateColumnFrequencies).collect()}

举例说明:

假设对(1)中转换后的数据, 按照field_value排序后, 各个分区的数据如下所示

Partition 1: (1.5, 0) (1.75, 1) (2.0, 2) (5.25, 0)

Partition 2: (7.5, 1) (9.5, 2)

则(2)的输出结果为:

[(0, [2, 1, 1]), (1, [0, 1, 1])]

(4)

/*** 计算每个field_index所需排位数据在第几个分区的第几条数据* @param targetRanks 排位数组* @param partitionColumnsFreq 每个分区的每个field_index包含多少数据* @param numOfColumns field个数* @return*/def getRanksLocationsWithinEachPart(targetRanks : List[Long],partitionColumnsFreq : Array[(Int, Array[Long])],numOfColumns : Int) : Array[(Int, List[(Int, Long)])] = {    // 二维数组, 存储当前每个field_index, 遍历到到第几条数据val runningTotal = Array.fill[Long](numOfColumns)(0)    // The partition indices are not necessarily in sorted order, so we need// to sort the partitionsColumnsFreq array by the partition index (the// first value in the tuple).partitionColumnsFreq.sortBy(_._1).map {      // relevantIndexList 存储分区上, 满足排位数组的field_index在该分区的第几条数据case (partitionIndex, columnsFreq) => val relevantIndexList = new mutable.MutableList[(Int, Long)]()columnsFreq.zipWithIndex.foreach{ case (colCount, colIndex) =>          // 当天field_index(即colIndex), 遍历到第几条数据val runningTotalCol = runningTotal(colIndex)          //  当前field_index(即colIndex),排位数组中哪些排位位于当前分区val ranksHere: List[Long] = targetRanks.filter(rank =>runningTotalCol < rank && runningTotalCol + colCount >= rank)          // 计算出当前分区,当前field_index(即colIndex), 满足排位数组的field_value在当前分区的位置relevantIndexList ++= ranksHere.map(rank => (colIndex, rank - runningTotalCol))runningTotal(colIndex) += colCount}(partitionIndex, relevantIndexList.toList)}}

举个例子:

假如目标排位:targetRanks: [5]

各分区各feild_index数据量:partitionColumnsFreq: [(0, [2, 3]), (1, [4, 1]), (2, [5, 2])]

字段个数:numOfColumns: 2

输出结果: [(0, []), (1, [(0, 3)]), (2, [(1, 1)])]

(5)

/*** 过滤出每个field_index 所需排位的数值* @param sortedValueColumnPairs* @param ranksLocations (4)中计算出的满足排位数组要求的每个分区上,每个field_index在该分区的第几条数据* @return*/def findTargetRanksIteratively( sortedValueColumnPairs : RDD[(Double, Int)], ranksLocations : Array[(Int, List[(Int, Long)])]):RDD[(Int, Double)] = {sortedValueColumnPairs.mapPartitionsWithIndex((partitionIndex : Int, valueColumnPairs : Iterator[(Double, Int)]) => {        // 当前分区上, 满足排位数组的feild_index及其在该分区上的位置val targetsInThisPart: List[(Int, Long)] = ranksLocations(partitionIndex)._2        if (targetsInThisPart.nonEmpty) {          // map中的key为field_index, value为该feild_index在当前分区中的哪些位置上的数据满足排位数组要求val columnsRelativeIndex: Map[Int, List[Long]] = targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))val columnsInThisPart = targetsInThisPart.map(_._1).distinct          // 存储各个field_index, 在分区遍历了多少条数据val runningTotals : mutable.HashMap[Int, Long]= new mutable.HashMap()runningTotals ++= columnsInThisPart.map(columnIndex => (columnIndex, 0L)).toMap          // 遍历当前分区的数据源, 格式为(field_value, field_index), 过滤出满足排位数据要求的数据valueColumnPairs.filter{            case(value, colIndex) =>lazy val thisPairIsTheRankStatistic: Boolean = {                // 每遍历一条数据, runningTotals上对应的field_index 当前已遍历数据量+1val total = runningTotals(colIndex) + 1LrunningTotals.update(colIndex, total)columnsRelativeIndex(colIndex).contains(total)}(runningTotals contains colIndex) && thisPairIsTheRankStatistic}.map(_.swap)} else {Iterator.empty}})}

分析:

(1)这种方法代码可读性较差

(2)需要遍历两遍原始数据

(3)相比于方案三, 更加有效避免executor内oom

(4)当field_value分布较离散的情况下, 这种方案相比于前三种, 效率更高

(5)上述算法中, 有两个潜在的问题, 当field_value倾斜情况下(即某个范围的值特别多),算法效率严重依赖于算法描述中的步骤(2)是否能将所有的field_value均匀的分配到各个partition;另一个问题是,当某些field_value重复现象比较多时, 是否可以合并对这些field_value的计数,而不是在一个partition中的iterator中挨个遍历这些重复数据。

备注:上述内容(问题背景、解决算法)取自《High Performance Spark Best Practices for Scaling and Optimizing Apache Spark》(作者: Holden Karau and Rachel Warren)

免费体验云安全(易盾)内容安全、验证码等服务

更多网易技术、产品、运营经验分享请点击。

相关文章:
【推荐】 [翻译]pytest测试框架(一)
【推荐】 浅谈js拖拽
【推荐】 HBase最佳实践-集群规划

转载于:https://www.cnblogs.com/163yun/p/9881058.html

大数据算法:排位问题(2)相关推荐

  1. 大数据算法系列——布隆过滤器

    大数据算法系列--布隆过滤器 一.简介 Bloom filter介绍 Bloom Filter(BF)是一种空间效率很高的随机数据结构,它利用位数组很简洁地表示一个集合,并能判断一个元素是否属于这个集 ...

  2. 《大数据算法》一1.2 大数据算法

    本节书摘来华章计算机<大数据算法>一书中的第1章 ,第1.2节,王宏志 编著, 更多章节内容可以访问云栖社区"华章计算机"公众号查看. 1.2 大数据算法 这一节我们概 ...

  3. 基于PyTorch重写sklearn,《现代大数据算法》

    HyperLearn是一个基于PyTorch重写的机器学习工具包Scikit Learn,它的一些模块速度更快.需要内存更少,效率提高了一倍. 专为大数据而设计,HyperLearn可以使用50%以下 ...

  4. 大数据算法与分析技术国家工程实验室将建设

    国家发展改革委近日正式下发通知,同意由西安交通大学作为承担单位,国家电网公司全球能源互联网研究院作为联合共建单位,筹建"大数据算法与分析技术国家工程实验室". 国网信通部落实公司党 ...

  5. 大数据算法_【中科大】大数据算法(2020年春季)

    算法与理论是计算机科学的核心领域之一.随着大数据时代的来临,传统的算法理论已经不能很好地解决人工智能. 物联网.工业制造等领域所遇到的实际问题.本门课程主要介绍基于大数据的新型算法技术,如随机采样.数 ...

  6. 漫画趣解大数据算法建模:买瓜

    大数据开发如何转型算法? 算法建模主要做什么?调参为什么玄学? 如何通俗理解算法建模过程.. 夕阳下的村东头,有一人来买瓜. 1 引子(买瓜) 忙碌的一天刚刚结束,村里的小张就匆匆的骑上车,准备买个西 ...

  7. 郦旭东小可爱的大数据算法课程期末复习

    郦旭东小可爱的大数据算法课程期末复习 kmeans问题 kmeans原始问题和kmeans alg算法问题 kmeans问题 kmeans原始问题和kmeans alg算法问题 kmeans 原始问题 ...

  8. 大数据算法培养计划!

    立春节气已经过去了一个多月,但职场却迎来了真正的就业寒冬."旺季不旺" ,职场人期待在年后招聘季实现的跳槽.涨薪.转行,只能被暂时搁置. 根据智联招聘最新发布的调研数据来看:春节后 ...

  9. 大数据算法_看过来!2019“神气”大数据算法与应用赛决赛在即

    前方注意! 第二届智慧气象服务创新大赛 2019"神气"大数据算法与应用赛 决赛就要开始啦~ 快来看看都有哪些队伍来参赛吧! 第二届智慧气象服务创新大赛--2019"神气 ...

  10. 大数据算法识别高自杀风险人群?准确率高得吓人

    每年4月1日,追忆张国荣,几乎成了固定的"节目",之所以如此"执着"地纪念,很大程度上就在于张国荣的自杀身亡,令人扼腕. 据统计,全世界每年有约80万人自杀死亡 ...

最新文章

  1. 回归算法 - 线性回归求解 θ(最大似然估计求解)
  2. MySQL视图的创建、修改与删除
  3. [Android问答] 开发环境问题集锦
  4. oracle查看列数据类型
  5. wxWidgets:文件类和函数
  6. 生产可用:是时候来一个微前端架构了!
  7. CF1137C:Museums Tour(缩点、分层图)
  8. 【Pytorch神经网络实战案例】13 构建变分自编码神经网络模型生成Fashon-MNST模拟数据
  9. Python正则表达式中的贪婪和非贪婪
  10. 4问教你搞定java中的ThreadLocal
  11. stm32 非debug模式程序无法运行
  12. win10 mysql zip 安装教程_windows10+mysql8.0.11zip安装教程详解
  13. broadcom linux网卡驱动下载,Broadcom Bcm57xx芯片网卡驱动Linux下安装方法
  14. android4.0.3校准屏幕和隐藏statusbar
  15. linux中打zip命令,zip命令 - Linux命令大全 | linux教程
  16. 微信公众平台开发文档
  17. 烽火服务器安装系统,烽火服务器进入bios配置
  18. python识图 web_python+flask搭建CNN在线识别手写中文网站
  19. 史上最全的TCP/IP协议原理
  20. 系统时间与格林威治时间

热门文章

  1. FreeBSD没有安装Ports的解决办法
  2. C++ Time类重载运算符
  3. 在docker中安装RabbitMQ
  4. 单链表的逆序java_java 实现单链表的逆序
  5. java poi jar maven_使用maven引入Apache poi jar包
  6. Python 途虎养车全系车型轮毂--参数分析与实现
  7. 【工具类】页面静态化 --- Freemarker的使用
  8. fastadmin的基本用法 自动生成crud模块
  9. BZOJ 2038: [2009国家集训队]小Z的袜子(hose)
  10. Mysql增加、删除和修改列属性和约束,和一些有用的查询语句