1.启动

  启动HDFS

  启动spark的local模式./spark-shell

2.知识点

 textFile:

  def textFile(    path: String,    minPartitions: Int = defaultMinPartitions): RDD[String]

 Filter: 

  Return a new RDD containing only the elements that satisfy a predicate.

  def filter(f: T => Boolean): RDD[T],返回里面判断是true的RDD。

 map:

  Return a new RDD by applying a function to all elements of this RDD.
 def map[U: ClassTag](f: T => U): RDD[U],从T到U类型的一个数据转换函数,最终返回的RDD中的数据类型是f函数返回的数据类型

 flatMap:

    Return a new RDD by first applying a function to all elements of thisRDD, and then flattening the results.
    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]  从T到集合类型的数据类型转换,集合中的数据类型是U,最终返回的RDD数据类型是f函数返回的集合中的具体的类型数据。

3.编写基础的wordcount程序
 1 //读取文件
 2 val rdd=sc.textFile("wc/input/wc.input")
 3 //过滤数据
 4 val filterRdd=rdd.filter(len=>len.length>0)
 5 //数据转换
 6 val flatMapRdd=filterRdd.flatMap(line=>line.split(" ")
 7     .map(word=>(word,1)))
 8 //分组
 9 val groupByRdd=flatMapRdd.groupBy(tuple=>tuple._1)
10 //聚合
11 val wordCount=groupByRdd.map(tuple=>{
12     val word=tuple._1
13     val sum=tuple._2.toList.foldLeft(0)((a,b)=>a+b._2)
14     (word,sum)
15 })
16 //输出
17 wordCount.foreach(println)             //控制台上的输出
18 wordCount.saveAsTextFile("wc/output6") //HDFS上的输出

4.简化代码(链式编程)

 1 sc.textFile("wc/input/wc.input").
 2 //数据过滤
 3 filter(_.length>0).
 4 //数据转换
 5 flatMap(_.split(" ").map((_,1))).
 6 //分组
 7 groupByKey().
 8 //统计
 9 map(tuple=>(tuple._1,tuple._2.toList.sum)).
10 //输出
11 saveAsTextFile("wc/output7")

5.最优化程序

  reduceByKey存在combiner。

  groupBy在大数据量的情况下,会出现OOM

1 sc.textFile("wc/input/wc.input").
2 //数据过滤
3 filter(_.length>0).
4 //数据转换
5 flatMap(_.split(" ").map((_,1))).
6 //统计
7 reduceByKey(_+_).
8 //输出
9 saveAsTextFile("wc/output8")

6.显示结果

1 sc.textFile("wc/input/wc.input").
2 //数据过滤
3 filter(_.length>0).
4 //数据转换
5 flatMap(_.split(" ").map((_,1))).
6 //统计
7 reduceByKey(_+_).
8 collect()

7.排序(第二个数,从大到小)

 1 sc.textFile("wc/input/wc.input").
 2 //数据过滤
 3 filter(_.length>0).
 4 //数据转换
 5 flatMap(_.split(" ").map((_,1))).
 6 //统计
 7 reduceByKey(_+_).
 8 //排序
 9 sortBy(tuple=>tuple._2,ascending=false).
10 collect()

8.TopK(方式一)

 1 sc.textFile("wc/input/wc.input").
 2 //数据过滤
 3 filter(_.length>0).
 4 //数据转换
 5 flatMap(_.split(" ").map((_,1))).
 6 //统计
 7 reduceByKey(_+_).
 8 //排序
 9 sortBy(tuple=>tuple._2,ascending=false).
10 take(4)

9.TopK(方式二,自定义)

 1 sc.textFile("wc/input/wc.input").
 2 //数据过滤
 3 filter(_.length>0).
 4 //数据转换
 5 flatMap(_.split(" ").map((_,1))).
 6 //统计
 7 reduceByKey(_+_).
 8 //排序
 9 sortBy(tuple=>tuple._2,ascending=false).
10 top(3)(new scala.math.Ordering[(String,Int)](){
11     override def compare(x:(String,Int),y:(String,Int))={
12         val tmp=x._2.compare(y._2)
13         if(tmp!=0) tmp
14         else x._1.compare(x._1)
15     }
16     })

006 Spark中的wordcount以及TopK的程序编写相关推荐

  1. spark 中的RDD编程:基于Java api

    1.RDD介绍: RDD,弹性分布式数据集,即分布式的元素集合.在spark中,对所有数据的操作不外乎是创建RDD.转化已有的RDD以及调用RDD操作进行求值.在这一切的背后,Spark会自动将RDD ...

  2. scala打印服务器消息,Spark中使用Scala实现WordCount业务

    Spark中使用Scala实现WordCount业务 创建一个Project sbt选择1.0.4 Scala选择2.11.8 配置路径 Project Sources Dependencies 新建 ...

  3. Spark初步 从wordcount开始

    Spark初步-从wordcount开始 spark中自带的example,有一个wordcount例子,我们逐步分析wordcount代码,开始我们的spark之旅. 准备工作 把README.md ...

  4. spark中repartition, coalesce, partitionBy, repartitionAndSortWithinPartitions 四种重分区算子

    美图欣赏: 一.背景 spark中一共有四种重分区算子: 1.repartition 2.coalesce 3.partitionBy 4.repartitionAndSortWithinPartit ...

  5. Spark中mapToPair和flatMapToPair的区别【附示例源码及运行结果】

    本文重点介绍 Spark 中 [mapToPair]和[flatMapToPair]的区别,请继续看到尾部,后续有示例说明,会理解更加清晰. 函数原型 1.JavaPairRDD<K2,V2&g ...

  6. Spark中CheckPoint、Cache、Persist的用法、区别

    Spark中CheckPoint.Cache.Persist 大家好,我是一拳就能打爆A柱的猛男 这几天看到一套视频<尚硅谷2021迎新版大数据Spark从入门到精通>,其中有关于检查点( ...

  7. spark 获取广播变量_Spark流式程序中广播变量和累加器为何使用单例模式

    Spark中广播变量详解以及如何动态更新广播变量​mp.weixin.qq.com 1.广播变量是只读的,使用单例模式可以减少Spark流式程序中每次job生成执行,频繁创建广播变量带来的开销 2. ...

  8. Spark中的内存计算是什么?

    由于计算的融合只发生在 Stages 内部,而 Shuffle 是切割 Stages 的边界,因此一旦发生 Shuffle,内存计算的代码融合就会中断. 在 Spark 中,内存计算有两层含义: 第一 ...

  9. Java查询spark中生成的文件_java+spark-sql查询excel

    Spark官网下载Spark 下载Windows下Hadoop所需文件winutils.exe 同学们自己网上找找吧,这里就不上传了,其实该文件可有可无,报错也不影响Spark运行,强迫症可以下载,本 ...

最新文章

  1. K8S集群搭建:利用kubeadm构建K8S集群
  2. margin负值布局(一)
  3. (四)boost库之正则表达式regex
  4. javaweb简单的登录增删改查系统_利用python操作小程序云数据库实现简单的增删改查!
  5. github 创建文件夹
  6. NOI入门级:数据结构之线性表
  7. python描述器descriptor_Python 黑魔法 --- 描述器(descriptor)
  8. MTK 驱动(38)---MTK 待机问题分析
  9. 【8】测试用例设计-边界值法
  10. JUC与JVM并发编程学习笔记03
  11. Vue学习笔记(利用网易云API实现音乐播放器 实例)
  12. mobi 转 pdf mobi格式转pdf格式 ePub azw3
  13. 在 SSM 中使用 Ajax 进行数据传递
  14. 中奖率的三种常用算法
  15. 【Spring Data ElasticSearch】高级查询,聚合
  16. Day124.分布式事务:Seata、2PC两段式、代码补偿TCC、本地消息表、MQ事物消息
  17. 安卓期末大作业——猫咪社区(源码+任务书)
  18. css文件插入背景音乐,关注css背景音乐代码
  19. 关于开发微信公众号获取手机用户运动数据的功能实现思路
  20. 决策树(Decision Tree)算法原理总结(二)

热门文章

  1. MySQL太细碎了,我硬生生捋出了一条核心大主线!
  2. Spring Boot+JWT+Shiro+MyBatisPlus 实现 RESTful 快速开发后端脚手架
  3. ArrayList集合为什么不能使用foreach增加、删除、修改元素
  4. 太胖就会变秃,这回真的有依据了!日本学者发现肥胖会诱导毛囊衰竭 | Nature...
  5. 重大布局!北京大学,落子上海!
  6. CVPR 2021 Oral | Transformer再发力!华南理工和微信提出UP-DETR
  7. 研究生扩招,数据发现清华北大本科生毕生后几乎没人找工作!
  8. 600页!分享珍藏很久的《推荐系统学习手册》(附下载链接及入门经验)
  9. 模型的跨界:我拿Transformer去做目标检测,结果发现效果不错
  10. 中国式姥姥上热搜感动无数人:有妈妈在,我才敢生娃