006 Spark中的wordcount以及TopK的程序编写
1.启动
启动HDFS
启动spark的local模式./spark-shell
2.知识点
textFile:
def textFile( path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
Filter:
Return a new RDD containing only the elements that satisfy a predicate.
def filter(f: T => Boolean): RDD[T],返回里面判断是true的RDD。
map:
Return a new RDD by applying a function to all elements of this RDD.
def map[U: ClassTag](f: T => U): RDD[U],从T到U类型的一个数据转换函数,最终返回的RDD中的数据类型是f函数返回的数据类型
flatMap:
Return a new RDD by first applying a function to all elements of thisRDD, and then flattening the results.
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] 从T到集合类型的数据类型转换,集合中的数据类型是U,最终返回的RDD数据类型是f函数返回的集合中的具体的类型数据。 3.编写基础的wordcount程序
1 //读取文件 2 val rdd=sc.textFile("wc/input/wc.input") 3 //过滤数据 4 val filterRdd=rdd.filter(len=>len.length>0) 5 //数据转换 6 val flatMapRdd=filterRdd.flatMap(line=>line.split(" ") 7 .map(word=>(word,1))) 8 //分组 9 val groupByRdd=flatMapRdd.groupBy(tuple=>tuple._1) 10 //聚合 11 val wordCount=groupByRdd.map(tuple=>{ 12 val word=tuple._1 13 val sum=tuple._2.toList.foldLeft(0)((a,b)=>a+b._2) 14 (word,sum) 15 }) 16 //输出 17 wordCount.foreach(println) //控制台上的输出 18 wordCount.saveAsTextFile("wc/output6") //HDFS上的输出
4.简化代码(链式编程)
1 sc.textFile("wc/input/wc.input"). 2 //数据过滤 3 filter(_.length>0). 4 //数据转换 5 flatMap(_.split(" ").map((_,1))). 6 //分组 7 groupByKey(). 8 //统计 9 map(tuple=>(tuple._1,tuple._2.toList.sum)). 10 //输出 11 saveAsTextFile("wc/output7")
5.最优化程序
reduceByKey存在combiner。
groupBy在大数据量的情况下,会出现OOM
1 sc.textFile("wc/input/wc.input"). 2 //数据过滤 3 filter(_.length>0). 4 //数据转换 5 flatMap(_.split(" ").map((_,1))). 6 //统计 7 reduceByKey(_+_). 8 //输出 9 saveAsTextFile("wc/output8")
6.显示结果
1 sc.textFile("wc/input/wc.input"). 2 //数据过滤 3 filter(_.length>0). 4 //数据转换 5 flatMap(_.split(" ").map((_,1))). 6 //统计 7 reduceByKey(_+_). 8 collect()
7.排序(第二个数,从大到小)
1 sc.textFile("wc/input/wc.input"). 2 //数据过滤 3 filter(_.length>0). 4 //数据转换 5 flatMap(_.split(" ").map((_,1))). 6 //统计 7 reduceByKey(_+_). 8 //排序 9 sortBy(tuple=>tuple._2,ascending=false). 10 collect()
8.TopK(方式一)
1 sc.textFile("wc/input/wc.input"). 2 //数据过滤 3 filter(_.length>0). 4 //数据转换 5 flatMap(_.split(" ").map((_,1))). 6 //统计 7 reduceByKey(_+_). 8 //排序 9 sortBy(tuple=>tuple._2,ascending=false). 10 take(4)
9.TopK(方式二,自定义)
1 sc.textFile("wc/input/wc.input"). 2 //数据过滤 3 filter(_.length>0). 4 //数据转换 5 flatMap(_.split(" ").map((_,1))). 6 //统计 7 reduceByKey(_+_). 8 //排序 9 sortBy(tuple=>tuple._2,ascending=false). 10 top(3)(new scala.math.Ordering[(String,Int)](){ 11 override def compare(x:(String,Int),y:(String,Int))={ 12 val tmp=x._2.compare(y._2) 13 if(tmp!=0) tmp 14 else x._1.compare(x._1) 15 } 16 })
006 Spark中的wordcount以及TopK的程序编写相关推荐
- spark 中的RDD编程:基于Java api
1.RDD介绍: RDD,弹性分布式数据集,即分布式的元素集合.在spark中,对所有数据的操作不外乎是创建RDD.转化已有的RDD以及调用RDD操作进行求值.在这一切的背后,Spark会自动将RDD ...
- scala打印服务器消息,Spark中使用Scala实现WordCount业务
Spark中使用Scala实现WordCount业务 创建一个Project sbt选择1.0.4 Scala选择2.11.8 配置路径 Project Sources Dependencies 新建 ...
- Spark初步 从wordcount开始
Spark初步-从wordcount开始 spark中自带的example,有一个wordcount例子,我们逐步分析wordcount代码,开始我们的spark之旅. 准备工作 把README.md ...
- spark中repartition, coalesce, partitionBy, repartitionAndSortWithinPartitions 四种重分区算子
美图欣赏: 一.背景 spark中一共有四种重分区算子: 1.repartition 2.coalesce 3.partitionBy 4.repartitionAndSortWithinPartit ...
- Spark中mapToPair和flatMapToPair的区别【附示例源码及运行结果】
本文重点介绍 Spark 中 [mapToPair]和[flatMapToPair]的区别,请继续看到尾部,后续有示例说明,会理解更加清晰. 函数原型 1.JavaPairRDD<K2,V2&g ...
- Spark中CheckPoint、Cache、Persist的用法、区别
Spark中CheckPoint.Cache.Persist 大家好,我是一拳就能打爆A柱的猛男 这几天看到一套视频<尚硅谷2021迎新版大数据Spark从入门到精通>,其中有关于检查点( ...
- spark 获取广播变量_Spark流式程序中广播变量和累加器为何使用单例模式
Spark中广播变量详解以及如何动态更新广播变量mp.weixin.qq.com 1.广播变量是只读的,使用单例模式可以减少Spark流式程序中每次job生成执行,频繁创建广播变量带来的开销 2. ...
- Spark中的内存计算是什么?
由于计算的融合只发生在 Stages 内部,而 Shuffle 是切割 Stages 的边界,因此一旦发生 Shuffle,内存计算的代码融合就会中断. 在 Spark 中,内存计算有两层含义: 第一 ...
- Java查询spark中生成的文件_java+spark-sql查询excel
Spark官网下载Spark 下载Windows下Hadoop所需文件winutils.exe 同学们自己网上找找吧,这里就不上传了,其实该文件可有可无,报错也不影响Spark运行,强迫症可以下载,本 ...
最新文章
- K8S集群搭建:利用kubeadm构建K8S集群
- margin负值布局(一)
- (四)boost库之正则表达式regex
- javaweb简单的登录增删改查系统_利用python操作小程序云数据库实现简单的增删改查!
- github 创建文件夹
- NOI入门级:数据结构之线性表
- python描述器descriptor_Python 黑魔法 --- 描述器(descriptor)
- MTK 驱动(38)---MTK 待机问题分析
- 【8】测试用例设计-边界值法
- JUC与JVM并发编程学习笔记03
- Vue学习笔记(利用网易云API实现音乐播放器 实例)
- mobi 转 pdf mobi格式转pdf格式 ePub azw3
- 在 SSM 中使用 Ajax 进行数据传递
- 中奖率的三种常用算法
- 【Spring Data ElasticSearch】高级查询,聚合
- Day124.分布式事务:Seata、2PC两段式、代码补偿TCC、本地消息表、MQ事物消息
- 安卓期末大作业——猫咪社区(源码+任务书)
- css文件插入背景音乐,关注css背景音乐代码
- 关于开发微信公众号获取手机用户运动数据的功能实现思路
- 决策树(Decision Tree)算法原理总结(二)
热门文章
- MySQL太细碎了,我硬生生捋出了一条核心大主线!
- Spring Boot+JWT+Shiro+MyBatisPlus 实现 RESTful 快速开发后端脚手架
- ArrayList集合为什么不能使用foreach增加、删除、修改元素
- 太胖就会变秃,这回真的有依据了!日本学者发现肥胖会诱导毛囊衰竭 | Nature...
- 重大布局!北京大学,落子上海!
- CVPR 2021 Oral | Transformer再发力!华南理工和微信提出UP-DETR
- 研究生扩招,数据发现清华北大本科生毕生后几乎没人找工作!
- 600页!分享珍藏很久的《推荐系统学习手册》(附下载链接及入门经验)
- 模型的跨界:我拿Transformer去做目标检测,结果发现效果不错
- 中国式姥姥上热搜感动无数人:有妈妈在,我才敢生娃