代码先行:

val conf = new SparkConf().setMaster("local").setAppName("CbkDemo")val sc = new SparkContext(conf)sc.setLogLevel("error")val rdd: RDD[(String, Double)] = sc.parallelize(Array(("George", 88.0), ("George", 95.0), ("George", 88.0),("KangKang", 93.0),("KangKang", 95.0), ("KangKang", 98.0),("limu", 98.0)))
//    val rdd2: RDD[(String, Double)] = rdd.coalesce(3)//求和/*** createCombiner: V => C ,这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作)* mergeValue: (C, V) => C,该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行)* mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行)*/val res = rdd.combineByKey(x => {println(s"$x******");x},(x: Double, y: Double) => {println(s"$x%%%%%%$y");x+y},(x: Double, y: Double) => {println(s"$x@@@@@@$y");x+y})res.foreach(println)

输出结果:

88.0******
88.0%%%%%%95.0
183.0%%%%%%88.0
93.0******
93.0%%%%%%95.0
188.0%%%%%%98.0
98.0******
(George,271.0)
(KangKang,286.0)
(limu,98.0)

图示:

那么怎么走第三部呢?

mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行)
val conf = new SparkConf().setMaster("local").setAppName("CbkDemo")val sc = new SparkContext(conf)sc.setLogLevel("error")val rdd: RDD[(String, Double)] = sc.parallelize(Array(("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("KangKang", 93.0),("KangKang", 95.0), ("KangKang", 98.0),("limu", 98.0)),3)
//    val rdd2: RDD[(String, Double)] = rdd.coalesce(3)//求和/*** createCombiner: V => C ,这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作)* mergeValue: (C, V) => C,该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行)* mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行)*/val res = rdd.combineByKey(x => {println(s"$x******");x},(x: Double, y: Double) => {println(s"$x%%%%%%$y");x+y},(x: Double, y: Double) => {println(s"$x@@@@@@$y");x+y})res.foreach(println)

结果展示:

88.0******
88.0%%%%%%95.0
183.0%%%%%%88.0
271.0%%%%%%88.0
359.0%%%%%%95.0
88.0******
88.0%%%%%%88.0
176.0%%%%%%95.0
271.0%%%%%%88.0
359.0%%%%%%88.0
95.0******
95.0%%%%%%88.0
93.0******
93.0%%%%%%95.0
188.0%%%%%%98.0
98.0******
454.0@@@@@@447.0
901.0@@@@@@183.0
(George,1084.0)
(limu,98.0)
(KangKang,286.0)

图示:

【总结】

方法的第一个操作在相同分区相同key的时候只操作一次,然后一直进行第二个操作,如果不同分区中有相同的key值则进行第三步操作,否则不执行第三步操作。【因为第二步操作已经把结果算出来了】

友情提示:之所以我们的输出第二步操作时没有输出最终结果,原因在于,为了返回值。我们把输出语句放在了前面,也就是说输出语句后,还有一步加的操作。


趁热打铁:

val conf = new SparkConf().setMaster("local").setAppName("CbkDemo")val sc = new SparkContext(conf)sc.setLogLevel("error")val rdd: RDD[(String, Double)] = sc.parallelize(Array(("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("George", 88.0), ("George", 95.0), ("George", 88.0),("KangKang", 93.0),("KangKang", 95.0), ("KangKang", 98.0),("limu", 98.0)),3)//求平均数val res: RDD[(String, (Int, Double))] = rdd.combineByKey(score => (1, score),(total: (Int, Double), newScore) => (total._1 + 1, total._2 + newScore),(total: (Int, Double), sum: (Int, Double)) => (total._1 + sum._1, total._2 + sum._2))val fin: RDD[(String, Double)] = res.map{case (name,(num,score)) => (name,score/num)}fin.foreach(println)

输出结果:

(George,90.33333333333333)
(limu,98.0)
(KangKang,95.33333333333333)

再此总结:【谁让它不是很好理解】

combineByKey

是针对不同partition进行操作的。它的第一个参数用于数据初始化(后面着重讲),第二个是针对一个partition的combine操作函数,第三个是在所有partition都combine完毕后,针对所有临时结果进行combine操作的函数。

友情补充:

关于数据初始化

之前有人说,初始化是对每个数据进行操作,这其实是错误的。应该是针对每个partition中,每个key下的第一个数据进行操作。这句话怎么理解呢?看代码:

val rdd1 = sc.parallelize(List(1,2,2,3,3,3,3,4,4,4,4,4), 2)

val rdd2 = rdd1.map((_, 1))

val rdd3 = rdd2.combineByKey(-_, (x:Int, y:Int) => x + y, (x:Int, y:Int) => x + y)

val rdd4 = rdd2.combineByKey(+_, (x:Int, y:Int) => x + y, (x:Int, y:Int) => x + y)

rdd2.collect

rdd3.collect

rdd4.collect

Array((1,1), (2,1), (2,1), (3,1), (3,1), (3,1), (3,1), (4,1), (4,1), (4,1), (4,1), (4,1))

Array((4,3), (2,0), (1,-1), (3,0))

Array((4,5), (2,2), (1,1), (3,4))

在上述代码中,(1,1), (2,1), (2,1), (3,1), (3,1), (3,1) 被划分到第一个partition,(3,1), (4,1), (4,1), (4,1), (4,1), (4,1) 被划分到第二个。于是有如下操作:

(1, 1):由于只有1个,所以在值取负的情况下,自然输出(1, -1)

(2, 1):由于有2个,第一个取负,第二个不变,因此combine后为(2, 0)

(3, 1):partition1中有3个,参照上述规则,combine后为(3, 1),partition2中有1个,因此combine后为(3, -1)。在第二次combine时,不会有初始化操作,因此直接相加,结果为(3, 0)

(4, 1):过程同上,结果为(4, 3)

简单的combineByKey算子【看完就懂系列】相关推荐

  1. 看完就懂系列—动态规划

    前言 动态规划的主要思想 将原问题分解为更简单的子问题(重要的事情默念三遍),通过解决子问题来解决原问题. 记忆化搜索(存储子问题的解,解决重叠子问题多次计算的问题). 动态规划的三要素: 最优子结构 ...

  2. 《看完就懂系列》15个方法教你玩转字符串

    这是大冰块2021年第2篇原创文章,和大冰块一起在前端领域努力吧!!! 作为JavaScript基础数据类型,字符串具有一些很强大的方法,在平时的工作中用到的地方也特别的多,很多时候因为不熟悉某个字符 ...

  3. 高铁、动车到底啥区别?看完彻底懂了(组图)

    摘自:网易新闻 (原标题:高铁.动车到底啥区别?看完彻底懂了(组图)) 高铁与动车的区别到底在哪里?磁悬浮列车又是什么鬼?今天给你讲讲清楚! 高铁.动车到底啥区别?看完彻底懂了 一.普通列车与高铁钢轨 ...

  4. android 7 plus,手机别瞎买,iPhone7plus相当于什么档次的安卓机?看完就懂了!

    手机别瞎买,iPhone7plus相当于什么档次的安卓机?看完就懂了! 作为此前一直比较受欢迎的苹果手机,一直是安卓手机的大力比拼的对象,目前已经有的安卓手机能够在拍照的性能上超过苹果手机了,虽然说在 ...

  5. 看完弄懂,明年至少加 5K

    看完弄懂,明年至少加 5K

  6. 网络通过猫传输到计算机,网络直接从光猫出来好还是接个路由器再接入电脑好?看完搞懂了...

    网络直接从光猫出来好还是接个路由器再接入电脑好?看完搞懂了 宽带网络现在是家家户户不可缺少的"硬件"之一,现在即便是老一辈的人家中安装宽带都成了必需品.有些偏好用电脑来上网的朋友可 ...

  7. 华为mate10pro以后能上鸿蒙吗,华为Mate10和Mate10 Pro差别一览 怎么选看完就懂

    华为Mate10和Mate10 Pro差别一览 怎么选看完就懂上周五华为正式发布了今年的两款重磅旗舰Mate10和Mate10 Pro.与上代产品不同,此次Mate10系列的两款产品无论是在外观还是一 ...

  8. 新手入门,数控刀具上的代码怎么认?看完就懂了!

    新手入门,数控刀具上的代码怎么认?看完就懂了! 按照不同的刀具类型对刀具分组: 类别组1 xxyyy(铣刀类): 110 球面铣刀 (圆柱型铣刀,其后的字母y代表铣刀直径,以下略同) 120 立铣刀 ...

  9. 为什么会显示有人正在使用计算机,微信“对方正在输入”为什么有时出现?有时不出现?看完才懂了.....

    原标题:微信"对方正在输入"为什么有时出现?有时不出现?看完才懂了.. 生活中有很多美好的事情 手机电量满格 您的快递正在派送 换季衣服里翻出毛爷爷 与喜欢的人聊天显示" ...

最新文章

  1. java 读取文件,内容方置Person 并写到另外地址
  2. [NHibernate]基本配置与测试
  3. vmware安装mac终于成功
  4. 基于matlab异步电机 s函数,建立电机状态方程的S 函数和仿真模)基于MATLAB的无刷双馈电机建模与仿真...
  5. WebSocket 实现 Web 端即时通信
  6. 团队作业7——Beta版本冲刺计划及安排
  7. JAVA提取字符串信息中的数字(包括整数,小数)工具类
  8. 理解WebSocket心跳及重连机制以及加强版的reconnectingwebsocket.js实现
  9. 整理C知识点--函数(重点)
  10. 电子游戏理论基础初探
  11. NDTTS-II变压器综合试验系统
  12. 芯片手册不需要全部看,抓住框架信息即可
  13. python案例小游戏
  14. Android虚拟机参数
  15. 天梯赛题目练习——高速公路超速处罚(附带测试点)
  16. 用Altium Designer打开Protel文件的方法
  17. 《MySQL数据库用户模块需求分析》
  18. 2020年浙江机器人公司十大品牌排名
  19. BI在连锁零售业应用
  20. 中国大学MOOC第六周作业孔融分梨

热门文章

  1. 中石油训练赛 - 招待(思维)
  2. centos php imap,centos 5.5 64 php 添加imap 模块记录
  3. python两数求和_Python两数相加实现
  4. Docker教程-安装
  5. 线段树求区间最大值RMQ(单点更新)
  6. SQLite3的提交事务代码
  7. 交换机网络嗅探方法之欺骗交换机缓存
  8. Mongodb中$inc自增,自减的操作
  9. 阿里的离职率真的是互联网大厂里最低的吗?
  10. [七]RabbitMQ-客户端源码之AMQPImpl+Method