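Because my project needed to sort a large amount of data to compute the top k, I started looking into parallel computing frameworks and came across Spark. Spark is written in Scala, so to understand Spark I spent some time cramming the Scala language.
Scala is a very concise functional language. What interested me most is its built-in support for parallel computation. And because it compiles to class files for the Java virtual machine, it has a natural affinity with Java: Scala code and Java code can call each other freely.
My original plan was to use Spark to implement fast sorting of large data sets (to get the top k), but after studying Spark closely I found this difficult, so I shelved that approach for now. Instead I came up with a different solution: use Scala's parallel features (a by-product of studying Spark) to implement a fast sorting module for large data sets, add it to the system, and call it from Java code.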
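Since the module is meant to be called from Java, one way to expose it is through a top-level Scala `object`: the compiler emits static forwarder methods for such an object, so Java code can call it like a static method. The sketch below assumes a hypothetical `TopKFacade.topK` entry point that takes and returns `java.util.List[Integer]`. For brevity it uses a plain library sort as a stand-in for the heap-sort module, and it converts collections by hand so it does not depend on a particular Scala version's converter API.

```scala
import scala.collection.mutable

// Hypothetical Java-facing facade: TopKFacade and topK are illustrative names, not from the post.
// A top-level object gets static forwarders, so Java can call TopKFacade.topK(list, 10).
object TopKFacade {
  /** Returns the k largest values of `data`, largest first, as a java.util.List. */
  def topK(data: java.util.List[Integer], k: Int): java.util.List[Integer] = {
    // copy the Java list into a Scala buffer without version-specific converters
    val buf = mutable.ArrayBuffer[Int]()
    var i = 0
    while (i < data.size()) {
      buf += data.get(i).intValue()
      i += 1
    }
    // stand-in for the heap-sort module: full sort, descending, then keep k
    val top = buf.sorted(Ordering[Int].reverse).take(k)
    val out = new java.util.ArrayList[Integer](top.size)
    top.foreach(v => out.add(Integer.valueOf(v)))
    out
  }
}
```

From Java, this would be called as `TopKFacade.topK(list, 10)`.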
The code below is the core sorting algorithm of this module.
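The overall flow is:

In the `top_mutable_par` method, the data to be sorted is split into segments (roughly one per CPU core). Scala's parallel collections then call `sort_mutableRange` (the method behind the class's `sort_range`) on every segment in parallel, so each segment is heap-sorted independently. Finally, `reduce` merges all the per-segment results pairwise with `merge2SeqNM`.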
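The split / sort-in-parallel / reduce flow above can be sketched in a self-contained way as follows. This is not the author's implementation: it assumes `Int` data, descending order, and hypothetical names (`ParTopKSketch`, `segmentTopK`, `mergeTopK`), and it swaps in standard-library `Future`s for parallel collections and a bounded min-heap per segment for the in-place heap sort of each segment.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Sketch only: split -> per-segment top k in parallel -> reduce by merging.
object ParTopKSketch {
  /** Top k of data[from, until), largest first, using a min-heap of at most k elements. */
  def segmentTopK(data: Array[Int], from: Int, until: Int, k: Int): Vector[Int] = {
    // reversed ordering turns PriorityQueue into a min-heap: head is the smallest kept value
    val heap = mutable.PriorityQueue.empty[Int](Ordering[Int].reverse)
    var i = from
    while (i < until) {
      if (heap.size < k) heap.enqueue(data(i))
      else if (data(i) > heap.head) { heap.dequeue(); heap.enqueue(data(i)) }
      i += 1
    }
    heap.toVector.sorted(Ordering[Int].reverse)
  }

  /** Merges two descending lists and keeps the best k. */
  def mergeTopK(a: Vector[Int], b: Vector[Int], k: Int): Vector[Int] = {
    val out = Vector.newBuilder[Int]
    var (i, j, n) = (0, 0, 0)
    while (n < k && (i < a.length || j < b.length)) {
      if (j >= b.length || (i < a.length && a(i) >= b(j))) { out += a(i); i += 1 }
      else { out += b(j); j += 1 }
      n += 1
    }
    out.result()
  }

  def topK(data: Array[Int], k: Int, segments: Int = Runtime.getRuntime.availableProcessors()): Vector[Int] = {
    if (data.isEmpty || k <= 0) return Vector.empty
    val p = math.max(1, segments)
    // one segment per core by default, like top_mutable_par
    val step = (data.length + p - 1) / p
    val futures = (0 until data.length by step).map { from =>
      Future(segmentTopK(data, from, math.min(data.length, from + step), k))
    }
    val parts = Await.result(Future.sequence(futures), Duration.Inf)
    parts.reduce(mergeTopK(_, _, k))
  }
}
```

Each segment's heap never holds more than k elements, so with n elements over p segments the per-segment cost is O((n/p) log k), and the reduce step only ever merges lists of at most k elements.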

```scala
import scala.collection.mutable

/**
 * Parallel heap sort (top k).
 * Targets Scala 2.11/2.12, where `.par` is part of the standard library. On Scala 2.13+
 * add the scala-parallel-collections module and
 * `import scala.collection.parallel.CollectionConverters._`.
 *
 * @author guyadong
 * @tparam A element type
 * @tparam B sort-key type, extracted from each element by `f`
 * @tparam S input collection type
 */
class HeapSort[A, B, S <: Iterable[A]](f: A => B)(implicit ord: Ordering[B]) {
  /**
   * Sorts `l` and returns the sorted Seq.
   * @param l    collection to sort
   * @param desc descending (true, default) or ascending order
   */
  def sort(l: S, desc: Boolean = true) = HeapSort.sort(f)(l, 0, desc)

  /**
   * Sorts `l` and returns at most the first `top` results.
   * @param l    collection to sort
   * @param top  maximum number of results to return
   * @param desc descending (true, default) or ascending order
   */
  def top(l: S, top: Int, desc: Boolean = true) = HeapSort.sort(f)(l, top, desc)

  /** Sorts the mutable collection `l` in place and returns the sorted Seq. */
  def sort_m[M <: mutable.Seq[A]](l: M, desc: Boolean = true) = HeapSort.sort_mutable(f)(l, 0, desc)

  /** Sorts the mutable collection `l` and returns at most the first `top` results. */
  def top_m[M <: mutable.Seq[A]](l: M, top: Int, desc: Boolean = true) = HeapSort.sort_mutable(f)(l, top, desc)

  /** Sorts the mutable collection `l` in parallel and returns at most the first `top` results. */
  def top_m_par[M <: mutable.Seq[A]](l: M, top: Int, desc: Boolean = true) = HeapSort.top_mutable_par(f)(l, top, desc)

  /**
   * Sorts the range [from, until) of `seq` in place and returns the (start, end) range that
   * holds the top results. The best element is at the end, so read the range in reverse.
   */
  def sort_range[M <: mutable.Seq[A]](seq: M, top: Int, desc: Boolean = true)(from: Int = 0, until: Int = seq.length) =
    HeapSort.sort_mutableRange(f)(seq, top, desc)(from, until)

  /**
   * Merges two already sorted ranges of `seq`, merging `src` into `dst`.
   * `src` and `dst` are (start, end) pairs; the result occupies `dst`.
   */
  def merge2Seq(seq: mutable.Seq[A], src: (Int, Int), dst: (Int, Int), desc: Boolean = true) =
    HeapSort.merge2Seq(f)(seq, src, dst, desc)

  /** Same as merge2Seq, but buffers a copy of `dst` instead of using a queue. */
  def merge2Seq2(seq: mutable.Seq[A], src: (Int, Int), dst: (Int, Int), desc: Boolean = true) =
    HeapSort.merge2Seq2(f)(seq, src, dst, desc)

  /** Same as merge2Seq, but allocates no new memory while merging. */
  def merge2SeqNM(seq: mutable.Seq[A], src: (Int, Int), dst: (Int, Int), desc: Boolean = true) =
    HeapSort.merge2SeqNM(f)(seq, src, dst, desc)
}

object HeapSort {
  def sort[A, B, S <: Iterable[A]](f: A => B)(iterator: S, top: Int = 0, desc: Boolean = true)(implicit ord: Ordering[B]) = {
    val bf = iterator.toBuffer
    sort_mutable(f)(bf, top, desc)
  }

  def sort_mutable[A, B, S <: mutable.Seq[A]](f: A => B)(seq: S, top: Int = 0, desc: Boolean = true)(implicit ord: Ordering[B]) = {
    sort_mutableRange(f)(seq, top, desc)() // sorted results end up at the tail
    val result = if (top < seq.length && top > 0) seq.takeRight(top) else seq
    result.reverse
  }

  private def sort_mutableRange[A, B, S <: mutable.Seq[A]](f: A => B)(seq: S, top: Int = 0, desc: Boolean = true)(from: Int = 0, until: Int = seq.length)(implicit ord: Ordering[B]) = {
    buildHeapRange(f)(seq, desc)(from, until) // build the heap
    val sublen = until - from
    val toplen = if (top <= 0 || top >= sublen) sublen else top
    var i = until - 1
    var continue = true
    // move the root to the tail, shrink the heap and re-heapify, toplen times
    while (continue) {
      swap(seq, from, i)
      if (i > (until - toplen)) {
        heapify(f)(seq, from, i, desc, from)
        i -= 1
      } else continue = false
    }
    (i, until)
  }

  private def buildHeapRange[A, B](f: A => B)(seq: mutable.Seq[A], desc: Boolean)(from: Int, until: Int)(implicit ord: Ordering[B]) = {
    var i = from + ((until - from) >>> 1) - 1
    while (i >= from) {
      heapify(f)(seq, i, until, desc, from)
      i -= 1
    }
  }

  def cmp1_gt[A, B](f: A => B)(l: A, r: A)(implicit ord: Ordering[B]) = ord.gt(f(l), f(r))
  def cmp1_lt[A, B](f: A => B)(l: A, r: A)(implicit ord: Ordering[B]) = ord.lt(f(l), f(r))
  def cmp_gt[A, B](f: A => B, seq: mutable.Seq[A])(l: Int, r: Int)(implicit ord: Ordering[B]) = cmp1_gt(f)(seq(l), seq(r))
  def cmp_lt[A, B](f: A => B, seq: mutable.Seq[A])(l: Int, r: Int)(implicit ord: Ordering[B]) = cmp1_lt(f)(seq(l), seq(r))

  // sift seq(startpos) down inside the heap [off, max); index `off` is the heap root
  private def heapify[A, B](f: A => B)(seq: mutable.Seq[A], startpos: Int, max: Int, desc: Boolean, off: Int)(implicit ord: Ordering[B]): Unit = {
    def gt = (l: Int, r: Int) => cmp_gt(f, seq)(l, r)
    def lt = (l: Int, r: Int) => cmp_lt(f, seq)(l, r)
    val cmp = if (desc) gt else lt
    var largest = 0
    var idx = startpos
    var right = 0
    var left = 0
    do {
      right = off + ((idx - off + 1) << 1)
      left = right - 1
      largest = if (left < max && cmp(left, idx)) left else idx
      if (right < max && cmp(right, largest)) largest = right
      if (largest != idx) {
        swap(seq, largest, idx)
        idx = largest
      } else return
    } while (true)
  }

  private def swap[A](seq: mutable.Seq[A], i: Int, j: Int) = {
    val temp = seq(i)
    seq(i) = seq(j)
    seq(j) = temp
  }

  private def swap3[A](seq: mutable.Seq[A], i: Int, j: Int, k: Int) = {
    val temp = seq(i)
    seq(i) = seq(j)
    seq(j) = seq(k)
    seq(k) = temp
  }

  // copy `length` elements from src(srcPos...) to dest(destPos...), front to back
  private def _duplicateSeq[A](src: collection.Seq[A], srcPos: Int, dest: mutable.Seq[A], destPos: Int, length: Int): mutable.Seq[A] = {
    var i = 0
    while (i < length) {
      dest(destPos + i) = src(srcPos + i)
      i += 1
    }
    dest
  }

  def merge2Seq[A, B](f: A => B)(seq: mutable.Seq[A], src: (Int, Int), dst: (Int, Int), desc: Boolean)(implicit ord: Ordering[B]): (Int, Int) = {
    // nothing to do if the worst element of dst already beats the best element of src
    if (!(if (desc) cmp_gt(f, seq)(dst._1, src._2 - 1) else cmp_lt(f, seq)(dst._1, src._2 - 1))) {
      if (if (desc) cmp_gt(f, seq)(src._1, dst._2 - 1) else cmp_lt(f, seq)(src._1, dst._2 - 1)) {
        // all of src beats all of dst: shift dst down, copy src's best into dst's tail
        val (srclen, dstlen) = (src._2 - src._1, dst._2 - dst._1)
        val cplen = math.min(srclen, dstlen)
        _duplicateSeq(seq, dst._1 + cplen, seq, dst._1, dstlen - cplen)
        _duplicateSeq(seq, src._2 - cplen, seq, dst._2 - cplen, cplen)
      } else {
        // general case: merge from the best end down, queueing displaced dst elements
        val q = mutable.Queue[A]()
        def gt = (r: Int) => cmp1_gt(f)(seq(r), q.head)
        def lt = (r: Int) => cmp1_lt(f)(seq(r), q.head)
        val cmpdst = if (desc) gt else lt
        var (topsrc, idx) = (src._2 - 1, dst._2 - 1)
        while (idx >= dst._1) {
          q.enqueue(seq(idx))
          if (topsrc >= src._1 && cmpdst(topsrc)) { // never read past the start of src
            seq(idx) = seq(topsrc)
            topsrc -= 1
          } else
            seq(idx) = q.dequeue()
          idx -= 1
        }
      }
    }
    dst
  }

  def merge2Seq2[A, B](f: A => B)(seq: mutable.Seq[A], src: (Int, Int), dst: (Int, Int), desc: Boolean)(implicit ord: Ordering[B]): (Int, Int) = {
    if (!(if (desc) cmp_gt(f, seq)(dst._1, src._2 - 1) else cmp_lt(f, seq)(dst._1, src._2 - 1))) {
      if (if (desc) cmp_gt(f, seq)(src._1, dst._2 - 1) else cmp_lt(f, seq)(src._1, dst._2 - 1)) {
        val (srclen, dstlen) = (src._2 - src._1, dst._2 - dst._1)
        val cplen = math.min(srclen, dstlen)
        _duplicateSeq(seq, dst._1 + cplen, seq, dst._1, dstlen - cplen)
        _duplicateSeq(seq, src._2 - cplen, seq, dst._2 - cplen, cplen)
      } else {
        val q = seq.slice(dst._1, dst._2) // copy of dst
        def gt = (l: Int, r: Int) => cmp1_gt(f)(seq(l), q(r))
        def lt = (l: Int, r: Int) => cmp1_lt(f)(seq(l), q(r))
        val cmpdst = if (desc) gt else lt
        var (topdst, topsrc, idx) = (q.length - 1, src._2 - 1, dst._2 - 1)
        while (idx >= dst._1 && topsrc >= src._1) {
          if (cmpdst(topsrc, topdst)) {
            seq(idx) = seq(topsrc)
            topsrc -= 1
          } else {
            seq(idx) = q(topdst)
            topdst -= 1
          }
          idx -= 1
        }
        // src exhausted: fill the rest of dst from the copy
        if (idx >= dst._1)
          _duplicateSeq(q, topdst - (idx - dst._1), seq, dst._1, idx - dst._1 + 1)
      }
    }
    dst
  }

  def merge2SeqNM[A, B](f: A => B)(seq: mutable.Seq[A], src: (Int, Int), dst: (Int, Int), desc: Boolean)(implicit ord: Ordering[B]): (Int, Int) = {
    if (!(if (desc) cmp_gt(f, seq)(dst._1, src._2 - 1) else cmp_lt(f, seq)(dst._1, src._2 - 1))) {
      if (if (desc) cmp_gt(f, seq)(src._1, dst._2 - 1) else cmp_lt(f, seq)(src._1, dst._2 - 1)) {
        val (srclen, dstlen) = (src._2 - src._1, dst._2 - dst._1)
        val cplen = math.min(srclen, dstlen)
        _duplicateSeq(seq, dst._1 + cplen, seq, dst._1, dstlen - cplen)
        _duplicateSeq(seq, src._2 - cplen, seq, dst._2 - cplen, cplen)
      } else {
        // displaced dst elements are kept as a circular queue inside seq itself
        var (idx, qbf, qbt, qh) = (dst._2 - 1, dst._2 - 1, dst._2 - 1, dst._2 - 1)
        var st = src._2 - 1
        var swapst: () => Unit = () => {}
        var swapqh: () => Unit = () => {}
        def gt = (l: Int) => cmp_gt(f, seq)(l, qh)
        def lt = (l: Int) => cmp_lt(f, seq)(l, qh)
        val cmpdst = if (desc) gt else lt
        def swaptop(top: Int): Unit = {
          val temp = seq(idx)
          seq(idx) = seq(top)
          seq(top) = temp
        }
        def getql(): Int = qbf + (qh - qbf + 1) % (qbt - qbf + 1)
        def nextqh(): Int = qbt - (qbt - qh + 1) % (qbt - qbf + 1)
        // shift seq(from..to) by `step` positions (+1 or -1)
        def moveStep(from: Int, to: Int, step: Int): Unit = {
          var i = if (step > 0) to else from
          def upf() = i >= from
          def dnt() = i <= to
          val (s, c) = if (step > 0) (-1, upf _) else (1, dnt _)
          while (c()) {
            seq(i + step) = seq(i)
            i += s
          }
        }
        def swapLeft(from: Int, to: Int): Unit = {
          val tmp = seq(from - 1)
          moveStep(from, to, -1)
          seq(to) = tmp
        }
        def swapRight(from: Int, to: Int): Unit = {
          val tmp = seq(to + 1)
          moveStep(from, to, 1)
          seq(from) = tmp
        }
        def swapStTail(): Unit = {
          swaptop(st)
          val ql = getql()
          if (ql > qbf) {
            if (qh - qbf > qbt - ql) {
              swap(seq, st, qbt)
              swapRight(ql, qbt - 1)
              qbf = st
            } else {
              swapLeft(qbf, qh)
              qbf = st
              qh = nextqh()
            }
          } else {
            qbf = st
          }
        }
        def swapStHead(): Unit = {
          swaptop(st)
          swapst = () => swapStTail()
          swapqh = () => swapQhEnable()
          qh = st
          qbf = st
          qbt = st
        }
        def swapQhDisable(): Unit = {
          qbf -= 1
          qbt -= 1
          qh -= 1
        }
        def swapQhEnable(): Unit = {
          swaptop(qh)
          qh = nextqh()
        }
        swapst = () => swapStHead()
        swapqh = () => swapQhDisable()
        while (idx >= dst._1 && st >= src._1) {
          if (cmpdst(st)) {
            swapst()
            st -= 1
          } else swapqh()
          idx -= 1
        }
        if (idx >= dst._1) {
          val ql = getql()
          _duplicateSeq(seq, ql, seq, dst._1, qbt - ql + 1)
          _duplicateSeq(seq, qbf, seq, dst._1 + qbt - ql + 1, ql - qbf)
        }
      }
    }
    dst
  }

  private val processors = Runtime.getRuntime.availableProcessors() // number of CPU cores

  def top_mutable_par[A, B, M <: mutable.Seq[A]](f: A => B)(seq: M, top: Int, desc: Boolean = true)(implicit ord: Ordering[B]) = {
    // split the data into segments according to the number of CPU cores
    val step = (seq.length + processors - 1) / processors
    // sort every segment in parallel, in the requested order
    val rangs = for (i <- (0 until (seq.length + step - 1) / step).par) yield {
      sort_mutableRange(f)(seq, top, desc)(i * step, math.min(seq.length, (i + 1) * step))
    }
    // always merge the shorter range into the longer one
    def merge = (left: (Int, Int), right: (Int, Int)) =>
      if ((right._2 - right._1) > (left._2 - left._1)) merge2SeqNM(f)(seq, left, right, desc)
      else merge2SeqNM(f)(seq, right, left, desc)
    // merge the sorted segments with reduce
    val r = rangs.reduce(merge(_, _))
    // return the result (stored in reverse order)
    seq.slice(r._1, r._2).reverse
  }

  def main(args: Array[String]): Unit = {
    // test code
    val m = new HeapSort[Int, Int, mutable.Buffer[Int]]((w: Int) => w)
    // top 5 of a small sample, descending
    println(m.top(mutable.Buffer(7, 11, 9, 17, 15, 21, 8, 30, 14, 0, 12, 15, 55, 2, 3, 18, 22, 23, 4), 5))
    val rnd = new java.util.Random()
    val l = Array.fill(40)(rnd.nextInt(100))
    for (i <- 0 to 0) {
      println("============== time " + i + " ==============")
      val s = l.toBuffer[Int]
      println(s)
      val t1 = System.currentTimeMillis
      val r1 = m.sort_range(s, 10, true)(0, 5)
      val r2 = m.sort_range(s, 10, true)(5, 40)
      val t2 = System.currentTimeMillis
      printf("sort time cost:%f seconds(%d mills) used\n", (t2 - t1) / 1000D, t2 - t1)
      for (j <- r1._1 until r1._2) print(s(j).toString + ",")
      println(r1)
      for (j <- r2._1 until r2._2) print(s(j).toString + ",")
      println(r2)
      // merge r1 into r2, then print the merged range best first
      m.merge2Seq2(s, r1, r2)
      for (j <- (r2._1 until r2._2).reverse) print(s(j).toString + ",")
      println(r2)
    }
  }
}
```
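To make the core of `sort_mutableRange` easier to follow, here is a stripped-down sketch of the same partial heap sort, assuming `Int` elements and descending order only (the `PartialHeapSortSketch` object and its method names are illustrative, not part of the original module). It builds a max-heap over `[from, until)` bottom-up, as `buildHeapRange` does, using the same child-index formula relative to `from`, then swaps the root to the tail k times, re-heapifying the shrinking heap after each swap.

```scala
// Illustrative sketch mirroring sort_mutableRange; Int elements, descending order only.
object PartialHeapSortSketch {
  private def swap(a: Array[Int], i: Int, j: Int): Unit = { val t = a(i); a(i) = a(j); a(j) = t }

  /** Sift a(pos) down inside the heap a[from, max), where index `from` is the root. */
  private def siftDown(a: Array[Int], pos: Int, max: Int, from: Int): Unit = {
    var idx = pos
    var done = false
    while (!done) {
      val left = from + 2 * (idx - from) + 1 // same as heapify's right - 1
      val right = left + 1
      var largest = idx
      if (left < max && a(left) > a(largest)) largest = left
      if (right < max && a(right) > a(largest)) largest = right
      if (largest != idx) { swap(a, largest, idx); idx = largest } else done = true
    }
  }

  /** Leaves the k largest of a[from, until) in a[until - k, until), ascending; returns that range. */
  def topRange(a: Array[Int], k: Int, from: Int, until: Int): (Int, Int) = {
    val len = until - from
    val kk = if (k <= 0 || k >= len) len else k
    // build the heap bottom-up
    var i = from + len / 2 - 1
    while (i >= from) { siftDown(a, i, until, from); i -= 1 }
    // move the current maximum to the tail kk times
    var end = until - 1
    while (end >= until - kk) {
      swap(a, from, end)
      siftDown(a, from, end, from)
      end -= 1
    }
    (until - kk, until)
  }
}
```

As in the original, the k best elements end up at the tail in ascending order, so callers read the returned range in reverse. Building the heap costs O(n) and the k extractions O(k log n).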
