package com.shujia.coreimport org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDDobject Demo09Union {def main(args: Array[String]): Unit = {//创建Spark Contextval conf: SparkConf = new SparkConf()conf.setAppName("Demo09Union")conf.setMaster("local")//控制Spark程序默认的并行度conf.set("spark.default.parallelism","3")val sc: SparkContext = new SparkContext(conf)/*** RDD的分区数量由什么决定?* 1.默认情况下子RDD的分区数等于父RDD的分区数* 2.如果是处理HDFS的数据 第一个RDD的分区数等于“切片”的数量* 3.如果是shuffle类算子 则可以手动指定分区数* 4.可以通过Spark的参数控制分区数量 spark.default.parallelism* shuffle类算子得到的RDD分区数判断优先级:* 在shuffle类算子中手动指定分区数 > spark.default.parallelism > 父RDD的数量** 什么时候需要修改分区数?* 需要调整任务的并行度时可以通过改变分区数实现*/val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")// getNumPartitions 返回RDD中的分区数量 它不是算子 只是RDD的一个属性println(s"stuRDD的分区数为:${stuRDD.getNumPartitions}")val sampleRDD1 = stuRDD.sample(withReplacement = false, fraction = 0.1)println(s"sampleRDD1的分区数为:${sampleRDD1.getNumPartitions}")val sampleRDD2 = stuRDD.sample(withReplacement = false, fraction = 0.1)println(s"sampleRDD2的分区数为:${sampleRDD2.getNumPartitions}")//RDD与RDD之间可以通过union进行合并/*** union 转换算子* 可以将两个RDD进行合并* 两个RDD的结构必须保存一致* 新RDD的分区数等于两个RDD的分区数量之和*/val unionRDD: RDD[String] = sampleRDD1.union(sampleRDD2)println(s"unionRDD的分区数为:${unionRDD.getNumPartitions}")unionRDD.foreach(println)//统计性别人数val groupRDD: RDD[(String, Iterable[String])] = unionRDD.groupBy(line => line.split(",")(3))println(s"groupRDD的分区数为:${groupRDD.getNumPartitions}")val genderCntRDD: RDD[String] = groupRDD.map(kv => s"${kv._1},${kv._2.size}")println(s"genderCntRDD的分区数为:${genderCntRDD.getNumPartitions}")genderCntRDD.foreach(println)//统计性别人数 手动给groupBy算子 指定分区数val groupRDD2: RDD[(String, Iterable[String])] = unionRDD.groupBy((line:String) => line.split(",")(3),8)println(s"groupRDD2的分区数为:${groupRDD2.getNumPartitions}")val genderCntRDD2: RDD[String] = groupRDD.map(kv => s"${kv._1},${kv._2.size}")println(s"genderCntRDD2的分区数为:${genderCntRDD2.getNumPartitions}")genderCntRDD2.foreach(println)//手动改变分区数val newGroupRDD2: RDD[(String, Iterable[String])] = groupRDD2//一般用于 减少分区数.coalesce(4)println(s"newGroupRDD2的分区数为:${newGroupRDD2.getNumPartitions}")val newGroupRDD3: RDD[(String, Iterable[String])] = groupRDD2//一般用于 减少分区数 如果需要增加分区则需要将shuffle参数设为true 等同于.coalesce(16,true)println(s"newGroupRDD3的分区数为:${newGroupRDD3.getNumPartitions}")val newGroupRDD4: RDD[(String, Iterable[String])] = newGroupRDD2//用于增加分区 需要产生shuffle.repartition(12)println(s"newGroupRDD4的分区数为:${newGroupRDD4.getNumPartitions}")while(true){}}
}

stuRDD的分区数为:2
sampleRDD1的分区数为:2
sampleRDD2的分区数为:2
unionRDD的分区数为:4

1500100018,骆怜雪,21,女,文科六班
1500100042,麻旭尧,24,男,文科四班
1500100050,方小蕾,22,女,文科五班
1500100057,经沛白,24,女,文科四班
1500100059,席昌燎,24,男,理科二班
1500100066,惠耘涛,22,男,文科三班
1500100077,钮元蝶,24,女,理科三班
1500100091,蔺水风,22,女,文科一班
1500100101,计又琴,23,女,文科一班
1500100106,仲怀莲,22,女,文科四班
1500100137,宣向山,22,女,理科四班
1500100154,容昊然,22,男,文科五班
1500100159,寇鸿朗,23,男,文科四班
1500100193,景寄文,21,女,文科四班
1500100196,汤浩博,21,男,文科三班
1500100199,陆慕易,24,女,文科三班
1500100223,殷醉波,21,女,文科二班
1500100239,余向南,21,女,文科四班
1500100253,管昌黎,23,男,文科四班
1500100254,阴鸿朗,24,男,理科三班
1500100262,黎盼烟,23,女,理科六班
1500100264,俞俊晖,24,男,文科三班
1500100272,蔡起运,23,男,文科四班
1500100289,荆瀚钰,24,男,理科四班
1500100291,毋高朗,21,男,文科二班
1500100292,易语梦,23,女,文科五班
1500100293,狄惜萍,23,女,理科四班
1500100300,吴采波,21,女,理科二班
1500100317,范盼菡,21,女,理科六班
1500100332,储惜蕊,23,女,理科二班
1500100334,倪曜曦,21,男,理科二班
1500100336,扶昆明,23,男,理科五班
1500100347,胥浩阔,22,男,文科五班
1500100362,元巧兰,21,女,理科五班
1500100363,郝海荣,21,男,理科一班
1500100364,段南蓉,22,女,理科一班
1500100369,扈海之,23,女,理科二班
1500100416,杜笑槐,22,女,理科四班
1500100417,沃运升,23,男,文科三班
1500100422,裘向南,24,女,理科二班
1500100425,穆南蓉,22,女,理科五班
1500100428,宗笑柳,21,女,理科三班
1500100435,殴紫萱,23,女,理科五班
1500100440,梅金鹏,22,男,文科六班
1500100456,鄂运凯,24,男,文科一班
1500100472,萧香巧,21,女,理科二班
1500100489,顾冷霜,22,女,理科二班
1500100495,戈越泽,24,男,理科六班

1500100503,强飞昂,24,男,理科五班
1500100514,乐震博,22,男,文科五班
1500100536,栾昊苍,23,男,文科一班
1500100540,全昆鹏,22,男,理科二班
1500100547,廖向南,22,女,理科五班
1500100548,荆觅雪,21,女,文科四班
1500100559,能千凡,21,女,理科六班
1500100562,怀若山,24,女,理科四班
1500100563,禄昆鹏,22,男,理科六班
1500100585,穆海超,21,男,文科一班
1500100589,全春冬,24,女,文科六班
1500100605,柏星鹏,22,男,理科一班
1500100614,邹昂然,23,男,文科六班
1500100619,水夜山,22,女,理科一班
1500100621,娄依云,21,女,文科三班
1500100622,蓬慕卉,21,女,文科五班
1500100641,卞恨蕊,21,女,文科二班
1500100651,丁从安,21,女,文科四班
1500100655,贺青易,22,女,文科六班
1500100659,祁昊天,22,男,文科二班
1500100660,黎白风,24,女,理科一班
1500100661,许华晖,21,男,理科三班
1500100668,侯从寒,23,女,文科一班
1500100669,余浩轩,24,男,理科一班
1500100672,傅宣朗,24,男,文科二班
1500100685,施昆颉,23,男,文科一班
1500100687,祝辰宇,23,男,理科四班
1500100689,隆涵阳,21,女,理科二班
1500100699,吕白风,23,女,文科二班
1500100721,钱泽雨,23,男,理科三班
1500100730,闻晓山,24,女,文科四班
1500100735,阙迎梅,22,女,文科五班
1500100745,危慕易,24,女,文科六班
1500100751,裘哲瀚,22,男,文科六班
1500100752,经山菡,21,女,文科三班
1500100761,桓鸿祯,21,男,文科二班
1500100773,傅元蝶,21,女,理科一班
1500100788,冯鸿朗,23,男,文科二班
1500100799,姚维运,22,男,文科二班
1500100807,魏笑卉,22,女,理科六班
1500100823,宓新曦,22,男,文科二班
1500100842,贡曼冬,21,女,理科一班
1500100864,柯凡雁,22,女,理科六班
1500100871,贝惜梦,24,女,文科一班
1500100895,蒙鑫鹏,22,男,理科三班
1500100917,桑智阳,22,男,理科四班
1500100936,习振锐,23,男,理科二班
1500100939,耿智杰,23,男,理科四班
1500100946,秋海白,23,女,文科一班
1500100961,李昂熙,24,男,文科四班
1500100978,郜昆卉,21,男,文科五班
1500100979,乐曜灿,24,男,文科六班
1500100983,左傲薇,22,女,理科四班
1500100987,双昆杰,24,男,文科四班
1500100004,葛德曜,24,男,理科三班
1500100041,傅景天,24,男,理科四班
1500100074,史鹏煊,22,男,理科六班
1500100084,应景平,22,男,理科三班
1500100090,富寄风,21,女,文科五班
1500100095,尹宛秋,23,女,文科三班
1500100100,宁昊磊,22,男,文科六班
1500100107,能昆明,23,男,理科五班
1500100109,从振强,24,男,文科三班
1500100118,蔺昆宇,21,男,文科一班
1500100130,宁怀莲,21,女,理科四班
1500100132,曾安寒,22,女,文科五班
1500100144,通冰夏,22,女,文科六班
1500100163,胥凡白,21,女,理科五班
1500100177,樊若翠,23,女,文科五班
1500100192,暨君昊,21,男,文科二班
1500100195,凌昆锐,21,男,文科六班
1500100204,屠昆纬,24,男,理科六班
1500100212,单旭鹏,23,男,理科二班
1500100215,费华晖,24,男,理科六班
1500100221,费智伟,23,男,理科六班
1500100222,赵嘉澍,22,男,文科六班
1500100224,裴昂熙,24,男,理科六班
1500100251,曹德昌,24,男,理科四班
1500100256,曹振锐,21,男,理科一班
1500100272,蔡起运,23,男,文科四班
1500100276,庾运鹏,24,男,文科一班
1500100278,钭寄容,24,女,文科三班
1500100287,凌觅波,23,女,理科一班
1500100348,路若云,22,女,文科四班
1500100369,扈海之,23,女,理科二班
1500100373,邢映冬,22,女,理科一班
1500100377,罗寻桃,24,女,文科六班
1500100384,郁鸿振,23,男,理科四班
1500100385,潘忆之,23,女,文科一班
1500100400,符寄风,23,女,文科二班
1500100401,宋景天,23,男,理科五班
1500100416,杜笑槐,22,女,理科四班
1500100418,蓟海昌,22,男,文科二班
1500100436,戎运凡,23,男,文科五班
1500100437,焦冰真,24,女,理科三班
1500100445,柏念瑶,23,女,理科一班
1500100463,苏若芹,22,女,文科一班
1500100474,和凌寒,21,女,文科四班
1500100493,訾俊晖,21,男,理科四班
1500100504,时傲旋,23,女,理科六班
1500100524,璩凡梦,23,女,文科四班
1500100541,赵秋灵,21,女,理科一班
1500100545,那俊晖,23,男,理科二班
1500100546,甄景逸,22,男,理科四班
1500100548,荆觅雪,21,女,文科四班
1500100574,汤鸿畅,21,男,文科四班
1500100575,张恨桃,24,女,理科五班
1500100580,程轩昂,22,男,文科六班
1500100599,伊昆卉,22,男,理科六班
1500100600,侯鸿晖,22,男,理科四班
1500100607,堵迎荷,22,女,理科二班
1500100630,窦小萍,24,女,理科二班
1500100639,殳迎彤,24,女,理科六班
1500100640,纪昌黎,23,男,文科一班
1500100642,宗巧兰,21,女,文科四班
1500100686,郜旭彬,23,男,文科三班
1500100726,任笑容,23,女,理科五班
1500100753,党景明,23,男,文科六班
1500100760,翁绮彤,24,女,文科五班
1500100762,聂德明,23,男,理科二班
1500100764,濮浩皛,23,男,文科二班
1500100771,蓟智杰,23,男,文科二班
1500100782,罗静珊,22,女,文科六班
1500100784,花运发,24,男,文科三班
1500100793,庾向南,23,女,文科二班
1500100797,殳碧春,22,女,理科六班
1500100806,满金鹏,22,男,理科五班
1500100815,牧晋鹏,22,男,文科六班
1500100824,柏小蕾,24,女,理科五班
1500100829,尤香薇,22,女,文科六班
1500100831,宦芷容,21,女,理科六班
1500100833,赖香巧,21,女,文科三班
1500100838,养惜梦,24,女,文科三班
1500100853,林鸿朗,24,男,理科四班
1500100855,彭旭鹏,24,男,理科四班
1500100883,叶海超,24,男,文科三班
1500100887,都若山,22,女,文科六班
1500100902,丰昊明,23,男,文科六班
1500100930,闻运凯,24,男,文科五班
1500100931,庄昆卉,21,男,理科二班
1500100934,隆高旻,21,男,理科五班
1500100936,习振锐,23,男,理科二班
1500100939,耿智杰,23,男,理科四班
1500100942,贡涵亮,21,男,理科六班
1500100948,马昊天,24,男,理科二班
1500100974,容鸿晖,21,男,文科五班
1500100976,卓芷梦,21,女,文科六班
1500100981,经鹏涛,23,男,文科六班

groupRDD的分区数为:3
genderCntRDD的分区数为:3
女,95
男,101
groupRDD2的分区数为:8
genderCntRDD2的分区数为:3
女,95
男,101
newGroupRDD2的分区数为:4
newGroupRDD3的分区数为:16
newGroupRDD4的分区数为:12

package com.shujia.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Demo10Join {def main(args: Array[String]): Unit = {//创建Spark Contextval conf: SparkConf = new SparkConf()conf.setAppName("Demo09Union")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)//读取学生数据及分数数据 并将每一行数据转化为样例类对象val stuRDD: RDD[Student] = sc.textFile("Spark/data/students.txt").map(line => {val splits: Array[String] = line.split(",")val id: String = splits(0)val name: String = splits(1)val age: Int = splits(2).toIntval gender: String = splits(3)val clazz: String = splits(4)Student(id, name, age, gender, clazz)})val scoreRDD: RDD[Score] = sc.textFile("Spark/data/score.txt").map(line => {val splits: Array[String] = line.split(",")val id: String = splits(0)val subId: String = splits(1)val score: Int = splits(2).toIntScore(id, subId, score)})/*** 将学生及分数数据关联 [id,name,clazz,subId,score]* 关联之前需要将RDD变成KV格式  一般将关联的字段作为Key Value自定*/val stuKVRDD: RDD[(String, String)] = stuRDD.map(stu => (stu.id, s"${stu.name},${stu.clazz}"))val scoreKVRDD: RDD[(String, String)] = scoreRDD.map(score => (score.id, s"${score.subId},${score.score}"))val joinRDD: RDD[(String, (String, String))] = stuKVRDD.join(scoreKVRDD)joinRDD.foreach(println)val stuKVRDD1: RDD[(String, Student)] = stuRDD.map(stu => (stu.id, stu))val scoreKVRDD1: RDD[(String,Score)] = scoreRDD.map(score => (score.id, score))val joinRDD1: RDD[(String, (Student,Score))] = stuKVRDD1.join(scoreKVRDD1)joinRDD1.map(kv=>{val id: String = kv._1val stuScoreTuple: (Student, Score) = kv._2val stu: Student = stuScoreTuple._1val score: Score = stuScoreTuple._2s"$id,${stu.name},${stu.clazz},${score.subId},${score.score}"}).foreach(println)//使用match 简写joinRDD1.map{case (id:String,(stu:Student,score:Score))=>s"$id,${stu.name},${stu.clazz},${score.subId},${score.score}"}.foreach(println)val leftJoinRDD: RDD[(String, (Student, Option[Score]))] = stuKVRDD1.leftOuterJoin(scoreKVRDD1)leftJoinRDD.map{case(id:String,(stu:Student,Some(score)))=>s"$id,${stu.name},${stu.clazz},${score.subId},${score.score}"case(id:String,(stu:Student,None))=>s"$id,${stu.name},${stu.clazz},null,null"}.foreach(println)}case class Student(id:String,name:String,age:Int,gender:String,clazz:String)case class Score(id:String,subId:String,score:Int)}

决定RDD分区数因素、关联相关推荐

  1. 2021年大数据Spark(十五):Spark Core的RDD常用算子

    目录 常用算子 基本算子 分区操作函数算子 重分区函数算子 1).增加分区函数 2).减少分区函数 3).调整分区函数 ​​​​​​​聚合函数算子 ​​​​​​​Scala集合中的聚合函数 ​​​​​ ...

  2. spark基础之RDD详解

    一 什么是RDD,有什么特点? RDD: Resilient Distributed Dataset,弹性分布式数据集. 特点: # 它是一种数据的集合 # 它可以被分区,每一个分区分布在不同的集群中 ...

  3. Spark算子与RDD基本转换

    map 将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素. 输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区. flatMap 属于Transformation算子 ...

  4. 理解RDD的Partition

    比如读一份本地文件成为RDD,应该分成多少个区? 如何分区? 就是分区的数量以及分区的方法 private val value: RDD[String] = sc.textFile("D:/ ...

  5. Spark RDD与Partion

    一.RDD的概述 1.1 什么是RDD? RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变.可分区.里面的元素 ...

  6. PySpark | RDD

    文章目录 一.RDD详解 1.为什么需要RDD? 2.什么是RDD? 3.RDD的5大特性 4.WordCount案例中的RDD 5.总结 二.RDD编程入门 1.程序入口 SparkContext对 ...

  7. RDD操作(超详细)

    1.RDD操作详解 1.1 基本转换 map map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD. 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应. 举例: //设置sp ...

  8. rdd数据存内存 数据量_超全spark性能优化总结

    Spark是大数据分析的利器,在工作中用到spark的地方也比较多,这篇总结是希望能将自己使用spark的一些调优经验分享出来. 一.常用参数说明 --driver-memory 4g : drive ...

  9. Spark的RDD操作之Join大全

    一.RDD的Join操作有哪些? (一)Join:Join类似于SQL的inner join操作,返回结果是前面和后面集合中配对成功的,过滤掉关联不上的.源代码如下: /** * Return an ...

最新文章

  1. 华为ax3怎么接光纤sc接口_视频监控工程中使用光纤光缆注意事项有哪些?
  2. 探测器反向偏压_科学网—《涨知识啦22》---MSM型光电探测器 - 寇建权的博文
  3. Vue父子组件之间的传值
  4. 2018 美团、腾讯、头条、蔚来 社招面试随谈
  5. linux驱动大小,为什么在Linux字符驱动程序读取调用中大小总是= 4096?
  6. 批量删除满足指定条件的 SAP CRM One Order 订单
  7. 三列布局-中间固定俩边自适应-和两边固定中间自适应布局
  8. python网络爬虫基础day01
  9. SQL 查找是否存在,别再 count 了,很耗费时间的!
  10. Nacos教程_1 简介和安装
  11. mac 版 idea 破解
  12. Hybird接口的理解
  13. Linear Mixde Model:线性混合模型简介
  14. vue随笔1-element-UI中checkbook多选框-复选框样式修改
  15. win7一点计算机就卡死,win7系统电脑经常卡住假死页面关不掉的解决方法
  16. java上传文件夹文件
  17. 计算机桌面有个方框,电脑屏幕的白色方框怎么清除
  18. html tr固定行高列宽,HTML表格固定格式:行高列宽
  19. link.sct解析
  20. Contest2574 - 高级语言程序实践--第6次作业--计信A2107-2113

热门文章

  1. [TypeScript]中字符和ASCII码转换
  2. MBA书籍推荐:打造商业思维,看这一本书就够了
  3. imovie导入媒体没有声音的解决办法
  4. linux下uboot内存测试,uboot中内存测试,内存检测方法
  5. 如何在C#中将 加载、编辑WPS表格?国产控件就能搞定
  6. Verilog中可综合及不可综合语句概述
  7. Linux系统备份系统还原
  8. 涂涂乐的详细实现之四--unity3d调用EmguCV实现图片识别
  9. kubectl describe pod 里边没有看到events问题解决【详细步骤】
  10. GDB基本命令(整合)