Spark算子--Scala版本 educoder
第1关:转换算子之map和distinct算子
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object EduCoder1 {def main(args: Array[String]): Unit = {val conf =new SparkConf().setAppName("educoder1").setMaster("local")val sc=new SparkContext(conf)val rdd = sc.parallelize(List("dog","an","cat","an","cat"))/********** Begin **********///第一步:通过获取rdd中每个元素的长度创建新的rdd1val rdd1=rdd.map(x=>x.length)//第二步:通过zip把rdd1和rdd组合创建rdd2val rdd2=rdd.zip(rdd1)//第三步:去重val rdd3=rdd2.distinct()//第四步:输出结果rdd3.foreach(println) /********** End **********/sc.stop()}}
第2关:转换算子之flatMap和filter算子
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object EduCoder2 {def main(args: Array[String]): Unit = {val conf =new SparkConf().setAppName("educoder2").setMaster("local")val sc=new SparkContext(conf)val rdd=sc.textFile("file:///root/step3_fils")/********** Begin **********///第一步:对所给数据创建的rdd切割分词val rdd1=rdd.flatMap(t=>t.split(","))//第二步:每个单词计数为1val rdd2= rdd1.map(t=>(t,1))//第三步:对相同单词个数进行累加val rdd3=rdd2.reduceByKey(_+_)//第四步:过滤出单词个数大于一个的val rdd4= rdd3.filter(t=>t._2>1)//第五步:输出结果rdd4.foreach(println)/********** End **********/sc.stop()}}
第3关:转换算子之reduceBykey和mapValues算子
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object EduCoder3 {def main(args: Array[String]): Unit = {val conf =new SparkConf().setAppName("educoder3").setMaster("local")val sc=new SparkContext(conf)/********** Begin **********///通过给定数据通过序列化方式创建rddval rdd = sc.parallelize(List(("spark",(10,50)),("hadoop",(5,40)),("hadoop",(10,25)),("spark",(40,25))))//求出一天收入总和以及出售本数val rdd2 = rdd.reduceByKey((x,y) => ((x._1*x._2)+(y._1*y._2), x._1+y._1))//求出每本平均售价val rdd3 = rdd2.mapValues(x => x._1 / x._2)//输出结果rdd3.foreach(println)/********** End **********/sc.stop}
}
第4关:转化算子之groupByKey和sortByKey
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object EduCoder4 {def main(args: Array[String]): Unit = {val conf =new SparkConf().setAppName("educoder4").setMaster("local")val sc=new SparkContext(conf)val rdd = sc.parallelize(List(("Bob","spark"),("Lily","hadoop"),("Candy","hive"),("Bob","hbase"),("Bob","hive")))/********** Begin **********/val rdd1= rdd.groupByKey()//求出每个人的书籍本数val rdd2= rdd1.mapValues(t=>t.toList.size)//根据姓名排序val rdd3= rdd2.sortByKey()//打印结果rdd3.foreach(println)/********** End **********/sc.stop()}}
第5关:常见行动算子
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object EduCoder5 {def main(args: Array[String]): Unit = {val conf =new SparkConf().setAppName("educoder5").setMaster("local")val sc=new SparkContext(conf)val rdd = sc.parallelize(List("dog","sun","an","cat"))/********** Begin **********///返回所给rdd的元素个数并输出val r1=rdd.count()println(r1)//返回rdd的前三个元素并输出val rdd1=rdd.take(3)rdd1.foreach(println)//累加rdd的所有元素并输出结果val r2=rdd.reduce(_+_)println(r2)//收集所有元素并且输出rdd.collect().foreach(println)/********** End **********/sc.stop()}}
第6关:算子的综合使用案例
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object EduCoder {
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setAppName("educoder").setMaster("local")
val sc=new SparkContext(conf)
val line=sc.textFile("file:///root/step1_fils")
/********** Begin **********/
//根据需求,去除城市字段
val rdd1 = line.map(t => {
val arr = t.split(",")
((arr(0), arr(1), arr(2), arr(3)), arr(5))
})
//按key分组,key是除城市字段和版本号字段``以外的所有字段,value是版本号
val rdd2=rdd1.groupByKey()
//过滤版本号重复的``(例:(v2.0,v2.0))以及版本号只有一个的(例(v1.0))
val rdd3=rdd2.mapValues(t=>t.toList.distinct).filter(t=>t._2.length>1)
//拆分重新组合 例:(key,(v2.0,v2.5,v3.0))拆分成(key,(v2.0,v2.5),((key,(v2.5,v3.0)))
val rdd4= rdd3.mapValues(t => {
val tai = t.tail
t.zip(tai)
})
//按需求整理输出格式(例:(2017-08-14,Lily,Facebook,360 Shop,v1.2,v2.0))
val rdd5= rdd4.flatMap(t => {
t._2.map(tp => {
(t._1._1, t._1._2, t._1._3, t._1._4, tp._1, tp._2)
})
})
//执行foreach操作,打印出结果
rdd5.foreach(println)
/********** End **********/
sc.stop()
}
}
Spark算子--Scala版本 educoder相关推荐
- 头歌educoder Spark算子--Scala版本 实训答案
第1关:转换算子之map和distinct算子 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkCon ...
- Educoder中Spark算子--java版本
第1关:QueueStream 编程要求 在右侧编辑器补充代码,完成以下需求: 将时间戳转换成规定格式的时间形式(格式为:yyyy-MM-dd HH:mm:ss ) 提取数据中的起始URL(切割符为空 ...
- 曹洁 Spark编程Scala版本课后习题答案
第二章课后习题 2-1 选D D项错在元组可以定义多个元素,通过(x,x,x,-) 2-2 选A A错在 元组的访问方式是 x._1来访问元组中的第一个元素.注意是从下标1开始的,数组是从下标0开始的 ...
- spark 算子例子_Spark性能调优方法
公众号后台回复关键词:pyspark,获取本项目github地址. Spark程序可以快如闪电⚡️,也可以慢如蜗牛?. 它的性能取决于用户使用它的方式. 一般来说,如果有可能,用户应当尽可能多地使用S ...
- spark(scala) shell 里面输入多行代码
spark(scala版本) shell 里面输入多行代码 但如果出现这种情况(如下图所示) 建议大家还是在linux系统中,利用脚本实现代码的运行.具体步骤如下: 进入Linux系统终端,输入相关代 ...
- 查看spark版本linux,如何查看spark版本和scala版本
如何查看spark版本和scala版本 发布时间:2018-11-21 05:48, 浏览次数:2445 , 标签: spark scala 1.进入命令行状态 windows 电脑 方法一: 在系统 ...
- 大数据Spark入门案例5–统计广告点击数量排行Top3(scala版本)
大数据Spark入门案例5–统计每广告点击数量排行Top3(scala版本) 1 数据准备 链接:https://pan.baidu.com/s/1afzmL-hNsAJl1_gx_dH2ag 提取码 ...
- spark用scala读取hive表数据(不同版本区别)
spark用scala读取hive表数据 spark1.6写法: val conf = new SparkConf() val sc = new SparkContext(conf) ...
- spark 算子使用类变量_Scala与Spark是天生的一对?
在Spark诞生之初,就有人诟病为什么AMP实验室选了一个如此小众的语言--Scala,很多人还将原因归结为学院派的高冷,但后来事实证明,选择Scala是非常正确的,Scala很多特性与Spark本身 ...
最新文章
- 除了iframe还有什么方法加载第三方网页_IE9常见问题的解决方法
- HNUOJ 13341
- RF - 完整用例展示
- 西门子ddc_铁门关西门子两通电动阀VVF42.25-10C+SKD60西
- ASP.NET Core使用功能开关控制路由访问(续)
- 川渝严重高温伏旱根源:全球气候变暖导致灾情频发
- 伪元素选择器使用场景-字体图标(CSS3)
- 从“如何设计用户超过1亿的应用”说起----数据库调优实战
- flash 与javascrip的t交互和注意事项
- 越南也自研5G设备,有这么容易?
- 树莓派CM4_5G扩展板
- miniGui交叉编译
- 航班动态查询接口 支持最新航班查询
- 网页实现从数据库读取数据并简单分页
- Freeswitch 常用命令
- HttpRequest 介绍
- APP被苹果APPStore拒绝的各种原因
- 电脑无线5g网卡发现不了网件R7000的Wifi 5g网络
- 求生之路2 服务器 修改难度,《求生之路2》服务器指令及难度参数设置难度篇.pdf...
- windows重装系统之后,开机显示“An operating system wasn't found,Try disconnecting any drives that...”(亲身遇到+解决方法)
热门文章
- ThreadLocal分析学习
- AngularJS 资源和开发教程
- 请问重定向与请求转发有什么区别?
- python判断ip地址是否合法_Python课堂:判定IP地址合法性的三种方法
- linux buffer 刷到磁盘,Linux下的磁盘缓存 linux page buffer cache深入理解
- java api apk_java-如何在APK文件中保护此API
- mysql 联合索引 range_MySQL 联合索引使用情况
- 下发布可执行文件_IOS APP 发布过程中涉及相关概念
- MySql学习10-----存储过程和函数
- python3爬虫-1