简单的combineByKey算子【看完就懂系列】
代码先行:
val conf = new SparkConf().setMaster("local").setAppName("CbkDemo")val sc = new SparkContext(conf)sc.setLogLevel("error")val rdd: RDD[(String, Double)] = sc.parallelize(Array(("George", 88.0), ("George", 95.0), ("George", 88.0),("KangKang", 93.0),("KangKang", 95.0), ("KangKang", 98.0),("limu", 98.0)))
// val rdd2: RDD[(String, Double)] = rdd.coalesce(3)//求和/*** createCombiner: V => C ,这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作)* mergeValue: (C, V) => C,该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行)* mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行)*/val res = rdd.combineByKey(x => {println(s"$x******");x},(x: Double, y: Double) => {println(s"$x%%%%%%$y");x+y},(x: Double, y: Double) => {println(s"$x@@@@@@$y");x+y})res.foreach(println)
输出结果:
88.0******
88.0%%%%%%95.0
183.0%%%%%%88.0
93.0******
93.0%%%%%%95.0
188.0%%%%%%98.0
98.0******
(George,271.0)
(KangKang,286.0)
(limu,98.0)
图示:
那么怎么走第三部呢?
mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行)
val conf = new SparkConf().setMaster("local").setAppName("CbkDemo")val sc = new SparkContext(conf)sc.setLogLevel("error")val rdd: RDD[(String, Double)] = sc.parallelize(Array(("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("KangKang", 93.0),("KangKang", 95.0), ("KangKang", 98.0),("limu", 98.0)),3)
// val rdd2: RDD[(String, Double)] = rdd.coalesce(3)//求和/*** createCombiner: V => C ,这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作)* mergeValue: (C, V) => C,该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行)* mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行)*/val res = rdd.combineByKey(x => {println(s"$x******");x},(x: Double, y: Double) => {println(s"$x%%%%%%$y");x+y},(x: Double, y: Double) => {println(s"$x@@@@@@$y");x+y})res.foreach(println)
结果展示:
88.0******
88.0%%%%%%95.0
183.0%%%%%%88.0
271.0%%%%%%88.0
359.0%%%%%%95.0
88.0******
88.0%%%%%%88.0
176.0%%%%%%95.0
271.0%%%%%%88.0
359.0%%%%%%88.0
95.0******
95.0%%%%%%88.0
93.0******
93.0%%%%%%95.0
188.0%%%%%%98.0
98.0******
454.0@@@@@@447.0
901.0@@@@@@183.0
(George,1084.0)
(limu,98.0)
(KangKang,286.0)
图示:
【总结】
方法的第一个操作在相同分区相同key的时候只操作一次,然后一直进行第二个操作,如果不同分区中有相同的key值则进行第三步操作,否则不执行第三步操作。【因为第二步操作已经把结果算出来了】
友情提示:之所以我们的输出第二步操作时没有输出最终结果,原因在于,为了返回值。我们把输出语句放在了前面,也就是说输出语句后,还有一步加的操作。
趁热打铁:
val conf = new SparkConf().setMaster("local").setAppName("CbkDemo")val sc = new SparkContext(conf)sc.setLogLevel("error")val rdd: RDD[(String, Double)] = sc.parallelize(Array(("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("KangKang", 93.0),("KangKang", 95.0), ("KangKang", 98.0),("limu", 98.0)),3)//求平均数val res: RDD[(String, (Int, Double))] = rdd.combineByKey(score => (1, score),(total: (Int, Double), newScore) => (total._1 + 1, total._2 + newScore),(total: (Int, Double), sum: (Int, Double)) => (total._1 + sum._1, total._2 + sum._2))val fin: RDD[(String, Double)] = res.map{case (name,(num,score)) => (name,score/num)}fin.foreach(println)
输出结果:
(George,90.33333333333333)
(limu,98.0)
(KangKang,95.33333333333333)
再此总结:【谁让它不是很好理解】
combineByKey
是针对不同partition进行操作的。它的第一个参数用于数据初始化(后面着重讲),第二个是针对一个partition的combine操作函数,第三个是在所有partition都combine完毕后,针对所有临时结果进行combine操作的函数。
友情补充:
关于数据初始化
之前有人说,初始化是对每个数据进行操作,这其实是错误的。应该是针对每个partition中,每个key下的第一个数据进行操作。这句话怎么理解呢?看代码:
val rdd1 = sc.parallelize(List(1,2,2,3,3,3,3,4,4,4,4,4), 2)
val rdd2 = rdd1.map((_, 1))
val rdd3 = rdd2.combineByKey(-_, (x:Int, y:Int) => x + y, (x:Int, y:Int) => x + y)
val rdd4 = rdd2.combineByKey(+_, (x:Int, y:Int) => x + y, (x:Int, y:Int) => x + y)
rdd2.collect
rdd3.collect
rdd4.collect
Array((1,1), (2,1), (2,1), (3,1), (3,1), (3,1), (3,1), (4,1), (4,1), (4,1), (4,1), (4,1))
Array((4,3), (2,0), (1,-1), (3,0))
Array((4,5), (2,2), (1,1), (3,4))
在上述代码中,(1,1), (2,1), (2,1), (3,1), (3,1), (3,1) 被划分到第一个partition,(3,1), (4,1), (4,1), (4,1), (4,1), (4,1) 被划分到第二个。于是有如下操作:
(1, 1):由于只有1个,所以在值取负的情况下,自然输出(1, -1)
(2, 1):由于有2个,第一个取负,第二个不变,因此combine后为(2, 0)
(3, 1):partition1中有3个,参照上述规则,combine后为(3, 1),partition2中有1个,因此combine后为(3, -1)。在第二次combine时,不会有初始化操作,因此直接相加,结果为(3, 0)
(4, 1):过程同上,结果为(4, 3)
简单的combineByKey算子【看完就懂系列】相关推荐
- 看完就懂系列—动态规划
前言 动态规划的主要思想 将原问题分解为更简单的子问题(重要的事情默念三遍),通过解决子问题来解决原问题. 记忆化搜索(存储子问题的解,解决重叠子问题多次计算的问题). 动态规划的三要素: 最优子结构 ...
- 《看完就懂系列》15个方法教你玩转字符串
这是大冰块2021年第2篇原创文章,和大冰块一起在前端领域努力吧!!! 作为JavaScript基础数据类型,字符串具有一些很强大的方法,在平时的工作中用到的地方也特别的多,很多时候因为不熟悉某个字符 ...
- 高铁、动车到底啥区别?看完彻底懂了(组图)
摘自:网易新闻 (原标题:高铁.动车到底啥区别?看完彻底懂了(组图)) 高铁与动车的区别到底在哪里?磁悬浮列车又是什么鬼?今天给你讲讲清楚! 高铁.动车到底啥区别?看完彻底懂了 一.普通列车与高铁钢轨 ...
- android 7 plus,手机别瞎买,iPhone7plus相当于什么档次的安卓机?看完就懂了!
手机别瞎买,iPhone7plus相当于什么档次的安卓机?看完就懂了! 作为此前一直比较受欢迎的苹果手机,一直是安卓手机的大力比拼的对象,目前已经有的安卓手机能够在拍照的性能上超过苹果手机了,虽然说在 ...
- 看完弄懂,明年至少加 5K
看完弄懂,明年至少加 5K
- 网络通过猫传输到计算机,网络直接从光猫出来好还是接个路由器再接入电脑好?看完搞懂了...
网络直接从光猫出来好还是接个路由器再接入电脑好?看完搞懂了 宽带网络现在是家家户户不可缺少的"硬件"之一,现在即便是老一辈的人家中安装宽带都成了必需品.有些偏好用电脑来上网的朋友可 ...
- 华为mate10pro以后能上鸿蒙吗,华为Mate10和Mate10 Pro差别一览 怎么选看完就懂
华为Mate10和Mate10 Pro差别一览 怎么选看完就懂上周五华为正式发布了今年的两款重磅旗舰Mate10和Mate10 Pro.与上代产品不同,此次Mate10系列的两款产品无论是在外观还是一 ...
- 新手入门,数控刀具上的代码怎么认?看完就懂了!
新手入门,数控刀具上的代码怎么认?看完就懂了! 按照不同的刀具类型对刀具分组: 类别组1 xxyyy(铣刀类): 110 球面铣刀 (圆柱型铣刀,其后的字母y代表铣刀直径,以下略同) 120 立铣刀 ...
- 为什么会显示有人正在使用计算机,微信“对方正在输入”为什么有时出现?有时不出现?看完才懂了.....
原标题:微信"对方正在输入"为什么有时出现?有时不出现?看完才懂了.. 生活中有很多美好的事情 手机电量满格 您的快递正在派送 换季衣服里翻出毛爷爷 与喜欢的人聊天显示" ...
最新文章
- java 读取文件,内容方置Person 并写到另外地址
- [NHibernate]基本配置与测试
- vmware安装mac终于成功
- 基于matlab异步电机 s函数,建立电机状态方程的S 函数和仿真模)基于MATLAB的无刷双馈电机建模与仿真...
- WebSocket 实现 Web 端即时通信
- 团队作业7——Beta版本冲刺计划及安排
- JAVA提取字符串信息中的数字(包括整数,小数)工具类
- 理解WebSocket心跳及重连机制以及加强版的reconnectingwebsocket.js实现
- 整理C知识点--函数(重点)
- 电子游戏理论基础初探
- NDTTS-II变压器综合试验系统
- 芯片手册不需要全部看,抓住框架信息即可
- python案例小游戏
- Android虚拟机参数
- 天梯赛题目练习——高速公路超速处罚(附带测试点)
- 用Altium Designer打开Protel文件的方法
- 《MySQL数据库用户模块需求分析》
- 2020年浙江机器人公司十大品牌排名
- BI在连锁零售业应用
- 中国大学MOOC第六周作业孔融分梨