第1关:转换算子之map和distinct算子


import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object EduCoder1 {def main(args: Array[String]): Unit = {val conf =new SparkConf().setAppName("educoder1").setMaster("local")val sc=new SparkContext(conf)val rdd = sc.parallelize(List("dog","an","cat","an","cat"))/********** Begin **********///第一步:通过获取rdd中每个元素的长度创建新的rdd1val rdd1=rdd.map(x=>x.length)//第二步:通过zip把rdd1和rdd组合创建rdd2val rdd2=rdd.zip(rdd1)//第三步:去重val rdd3=rdd2.distinct()//第四步:输出结果rdd3.foreach(println)  /********** End **********/sc.stop()}}

第2关:转换算子之flatMap和filter算子

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object EduCoder2 {def main(args: Array[String]): Unit = {val conf =new SparkConf().setAppName("educoder2").setMaster("local")val sc=new SparkContext(conf)val rdd=sc.textFile("file:///root/step3_fils")/********** Begin **********///第一步:对所给数据创建的rdd切割分词val rdd1=rdd.flatMap(t=>t.split(","))//第二步:每个单词计数为1val rdd2= rdd1.map(t=>(t,1))//第三步:对相同单词个数进行累加val rdd3=rdd2.reduceByKey(_+_)//第四步:过滤出单词个数大于一个的val rdd4= rdd3.filter(t=>t._2>1)//第五步:输出结果rdd4.foreach(println)/********** End **********/sc.stop()}}

第3关:转换算子之reduceBykey和mapValues算子


import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object EduCoder3 {def main(args: Array[String]): Unit = {val conf =new SparkConf().setAppName("educoder3").setMaster("local")val sc=new SparkContext(conf)/********** Begin **********///通过给定数据通过序列化方式创建rddval rdd = sc.parallelize(List(("spark",(10,50)),("hadoop",(5,40)),("hadoop",(10,25)),("spark",(40,25))))//求出一天收入总和以及出售本数val rdd2 = rdd.reduceByKey((x,y) => ((x._1*x._2)+(y._1*y._2), x._1+y._1))//求出每本平均售价val rdd3 = rdd2.mapValues(x => x._1 / x._2)//输出结果rdd3.foreach(println)/********** End **********/sc.stop}
}

第4关:转化算子之groupByKey和sortByKey


import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object EduCoder4 {def main(args: Array[String]): Unit = {val conf =new SparkConf().setAppName("educoder4").setMaster("local")val sc=new SparkContext(conf)val rdd = sc.parallelize(List(("Bob","spark"),("Lily","hadoop"),("Candy","hive"),("Bob","hbase"),("Bob","hive")))/********** Begin **********/val rdd1= rdd.groupByKey()//求出每个人的书籍本数val rdd2= rdd1.mapValues(t=>t.toList.size)//根据姓名排序val rdd3= rdd2.sortByKey()//打印结果rdd3.foreach(println)/********** End **********/sc.stop()}}

第5关:常见行动算子


import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object EduCoder5 {def main(args: Array[String]): Unit = {val conf =new SparkConf().setAppName("educoder5").setMaster("local")val sc=new SparkContext(conf)val rdd = sc.parallelize(List("dog","sun","an","cat"))/********** Begin **********///返回所给rdd的元素个数并输出val r1=rdd.count()println(r1)//返回rdd的前三个元素并输出val rdd1=rdd.take(3)rdd1.foreach(println)//累加rdd的所有元素并输出结果val r2=rdd.reduce(_+_)println(r2)//收集所有元素并且输出rdd.collect().foreach(println)/********** End **********/sc.stop()}}

第6关:算子的综合使用案例

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object EduCoder {
def main(args: Array[String]): Unit = {
val conf =new  SparkConf().setAppName("educoder").setMaster("local")
val sc=new SparkContext(conf)
val line=sc.textFile("file:///root/step1_fils")
/********** Begin **********/
//根据需求,去除城市字段
val rdd1 = line.map(t => {
val arr = t.split(",")
((arr(0), arr(1), arr(2), arr(3)), arr(5))
})
//按key分组,key是除城市字段和版本号字段``以外的所有字段,value是版本号
val rdd2=rdd1.groupByKey()
//过滤版本号重复的``(例:(v2.0,v2.0))以及版本号只有一个的(例(v1.0))
val rdd3=rdd2.mapValues(t=>t.toList.distinct).filter(t=>t._2.length>1)
//拆分重新组合 例:(key,(v2.0,v2.5,v3.0))拆分成(key,(v2.0,v2.5),((key,(v2.5,v3.0)))
val rdd4= rdd3.mapValues(t => {
val tai = t.tail
t.zip(tai)
})
//按需求整理输出格式(例:(2017-08-14,Lily,Facebook,360  Shop,v1.2,v2.0))
val rdd5= rdd4.flatMap(t => {
t._2.map(tp => {
(t._1._1, t._1._2, t._1._3, t._1._4, tp._1, tp._2)
})
})
//执行foreach操作,打印出结果
rdd5.foreach(println)
/********** End **********/
sc.stop()
}
}

Spark算子--Scala版本 educoder相关推荐

  1. 头歌educoder Spark算子--Scala版本 实训答案

    第1关:转换算子之map和distinct算子 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkCon ...

  2. Educoder中Spark算子--java版本

    第1关:QueueStream 编程要求 在右侧编辑器补充代码,完成以下需求: 将时间戳转换成规定格式的时间形式(格式为:yyyy-MM-dd HH:mm:ss ) 提取数据中的起始URL(切割符为空 ...

  3. 曹洁 Spark编程Scala版本课后习题答案

    第二章课后习题 2-1 选D D项错在元组可以定义多个元素,通过(x,x,x,-) 2-2 选A A错在 元组的访问方式是 x._1来访问元组中的第一个元素.注意是从下标1开始的,数组是从下标0开始的 ...

  4. spark 算子例子_Spark性能调优方法

    公众号后台回复关键词:pyspark,获取本项目github地址. Spark程序可以快如闪电⚡️,也可以慢如蜗牛?. 它的性能取决于用户使用它的方式. 一般来说,如果有可能,用户应当尽可能多地使用S ...

  5. spark(scala) shell 里面输入多行代码

    spark(scala版本) shell 里面输入多行代码 但如果出现这种情况(如下图所示) 建议大家还是在linux系统中,利用脚本实现代码的运行.具体步骤如下: 进入Linux系统终端,输入相关代 ...

  6. 查看spark版本linux,如何查看spark版本和scala版本

    如何查看spark版本和scala版本 发布时间:2018-11-21 05:48, 浏览次数:2445 , 标签: spark scala 1.进入命令行状态 windows 电脑 方法一: 在系统 ...

  7. 大数据Spark入门案例5–统计广告点击数量排行Top3(scala版本)

    大数据Spark入门案例5–统计每广告点击数量排行Top3(scala版本) 1 数据准备 链接:https://pan.baidu.com/s/1afzmL-hNsAJl1_gx_dH2ag 提取码 ...

  8. spark用scala读取hive表数据(不同版本区别)

    spark用scala读取hive表数据 spark1.6写法: val conf = new SparkConf()      val sc = new SparkContext(conf)     ...

  9. spark 算子使用类变量_Scala与Spark是天生的一对?

    在Spark诞生之初,就有人诟病为什么AMP实验室选了一个如此小众的语言--Scala,很多人还将原因归结为学院派的高冷,但后来事实证明,选择Scala是非常正确的,Scala很多特性与Spark本身 ...

最新文章

  1. 除了iframe还有什么方法加载第三方网页_IE9常见问题的解决方法
  2. HNUOJ 13341
  3. RF - 完整用例展示
  4. 西门子ddc_铁门关西门子两通电动阀VVF42.25-10C+SKD60西
  5. ASP.NET Core使用功能开关控制路由访问(续)
  6. 川渝严重高温伏旱根源:全球气候变暖导致灾情频发
  7. 伪元素选择器使用场景-字体图标(CSS3)
  8. 从“如何设计用户超过1亿的应用”说起----数据库调优实战
  9. flash 与javascrip的t交互和注意事项
  10. 越南也自研5G设备,有这么容易?
  11. 树莓派CM4_5G扩展板
  12. miniGui交叉编译
  13. 航班动态查询接口 支持最新航班查询
  14. 网页实现从数据库读取数据并简单分页
  15. Freeswitch 常用命令
  16. HttpRequest 介绍
  17. APP被苹果APPStore拒绝的各种原因
  18. 电脑无线5g网卡发现不了网件R7000的Wifi 5g网络
  19. 求生之路2 服务器 修改难度,《求生之路2》服务器指令及难度参数设置难度篇.pdf...
  20. windows重装系统之后,开机显示“An operating system wasn't found,Try disconnecting any drives that...”(亲身遇到+解决方法)

热门文章

  1. ThreadLocal分析学习
  2. AngularJS 资源和开发教程
  3. 请问重定向与请求转发有什么区别?
  4. python判断ip地址是否合法_Python课堂:判定IP地址合法性的三种方法
  5. linux buffer 刷到磁盘,Linux下的磁盘缓存 linux page buffer cache深入理解
  6. java api apk_java-如何在APK文件中保护此API
  7. mysql 联合索引 range_MySQL 联合索引使用情况
  8. 下发布可执行文件_IOS APP 发布过程中涉及相关概念
  9. MySql学习10-----存储过程和函数
  10. python3爬虫-1