groupBy

返回集合类为元组,元组的第一个元素为分组元素,第二个元素为Iterable类型,这种类型可以转List什么的

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

例子
一维数组

scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
//依据是否能被2整除进行分组,分组字段可以自定义
scala> a.groupBy(x=>{if(x%2==0) "even" else "odd"}).collect
res0: Array[(String, Iterable[Int])] = Array((even,CompactBuffer(2, 4, 6, 8)), (odd,CompactBuffer(1, 3, 5, 7, 9)))
//分组后的数据不能用map处理
scala> a.groupBy(x=>{if(x%2==0) "even" else "odd"}).map(_.toList).collect
<console>:27: error: value toList is not a member of (String, Iterable[Int])a.groupBy(x=>{if(x%2==0) "even" else "odd"}).map(_.toList).collect^
//分组后的数据可以用mapValues进一步处理
scala> a.groupBy(x=>{if(x%2==0) "even" else "odd"}).mapValues(_.toList).collect
res2: Array[(String, List[Int])] = Array((even,List(2, 4, 6, 8)), (odd,List(1, 3, 5, 7, 9)))scala> a.groupBy(x=>{x%2==0}).mapValues(_.toList).collect
res3: Array[(Boolean, List[Int])] = Array((false,List(1, 3, 5, 7, 9)), (true,List(2, 4, 6, 8)))
scala> val rdd1=sc.parallelize(List((1,"a"),(2,"b"),(1,"c"),(2,"d")),2)//依据元组的第一个元素进行分组,注意分组后value是个元组.(分组元素,迭代器) 如果要对该迭代器进一步处理,要用mapValues,如果用map,处理的就是整个元组,而不只是元组的第二个元素了
scala> rdd1.groupBy(_._1).collect
res55: Array[(Int, Iterable[(Int, String)])] = Array((2,CompactBuffer((2,b), (2,d))), (1,CompactBuffer((1,a), (1,c))))//处理groupBy后生成的迭代器.转为List
scala> rdd1.groupBy(_._1).mapValues(_.toList).collect
res56: Array[(Int, List[(Int, String)])] = Array((2,List((2,b), (2,d))), (1,List((1,a), (1,c))))scala> rdd1.groupBy(_._1).mapValues(_.toArray).collect
res57: Array[(Int, Array[(Int, String)])] = Array((2,Array((2,b), (2,d))), (1,Array((1,a), (1,c))))scala> rdd1.groupBy(_._1).mapValues(_.toSet).collect
res58: Array[(Int, scala.collection.immutable.Set[(Int, String)])] = Array((2,Set((2,b), (2,d))), (1,Set((1,a), (1,c))))

xxByKey

  • groupByKey是groupBy的特殊形式,就是key不用指定,因为kv对,k是已知的,所以参数不用传输k
  • reduceByKey 分组后还可以聚合,聚合函数可以自定义
  • aggregateByKey分组后还可以聚合,聚合函数可以自定义,关键还可以有初始值
scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)//依据list构建kv对
scala> val b = a.keyBy(_.length)
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[10] at keyBy at <console>:26scala> b.collect
res4: Array[(Int, String)] = Array((3,dog), (5,tiger), (4,lion), (3,cat), (6,spider), (5,eagle))
//依据kv对中k进行分组,形成新的kv对
scala> b.groupByKey.collect
res5: Array[(Int, Iterable[String])] = Array((4,CompactBuffer(lion)), (6,CompactBuffer(spider)), (3,CompactBuffer(dog, cat)), (5,CompactBuffer(tiger, eagle)))
//转化新的kv对中的v为List
scala> b.groupByKey.mapValues(_.toList).collect
res6: Array[(Int, List[String])] = Array((4,List(lion)), (6,List(spider)), (3,List(dog, cat)), (5,List(tiger, eagle)))
//转化新的kv对中的v为Array
scala> b.groupByKey.mapValues(_.toArray).collect
res7: Array[(Int, Array[String])] = Array((4,Array(lion)), (6,Array(spider)), (3,Array(dog, cat)), (5,Array(tiger, eagle)))//reduceByKey 分组后还可以聚合,聚合函数可以自定义
scala> b.reduceByKey(_+_).collect
res8: Array[(Int, String)] = Array((4,lion), (6,spider), (3,dogcat), (5,tigereagle))
//reduceByKey 分组后还可以聚合,聚合函数可以自定义
scala> b.reduceByKey(_+" "+_).collect
res9: Array[(Int, String)] = Array((4,lion), (6,spider), (3,dog cat), (5,tiger eagle))scala> b.aggregate
aggregate   aggregateByKey
//aggregateByKey分组后还可以聚合,聚合函数可以自定义,关键还可以有初始值
scala> b.aggregateByKey("|")(_+_).collect
<console>:29: error: not enough arguments for method aggregateByKey: (seqOp: (String, String) => String, combOp: (String, String) => String)(implicit evidence$3: scala.reflect.ClassTag[String])org.apache.spark.rdd.RDD[(Int, String)].
Unspecified value parameter combOp.b.aggregateByKey("|")(_+_).collect^
// 2个参数列表 不是3个
scala> b.aggregateByKey("|")(_+_)(_+_).collect
<console>:29: error: not enough arguments for method aggregateByKey: (seqOp: (String, String) => String, combOp: (String, String) => String)(implicit evidence$3: scala.reflect.ClassTag[String])org.apache.spark.rdd.RDD[(Int, String)].
Unspecified value parameter combOp.b.aggregateByKey("|")(_+_)(_+_).collect^
//2个参数列表,初始值,分区内聚合函数,分区间聚合函数
scala> b.aggregateByKey("|")(_+_,_+_).collect
res12: Array[(Int, String)] = Array((4,|lion), (6,|spider), (3,|dog|cat), (5,|tiger|eagle))

总结

  • groupByKey是groupBy的特殊形式,groupBy底层调用了groupByKey
  • 如果是kv对.选groupByKey, 也可以用groupBy ,可以用方法取元组的值
  • 如果是一维,选择groupBy.

rdd分组聚合算子xxByKey,xxBy相关推荐

  1. spark笔记之RDD常用的算子操作

    Spark Rdd的所有算子操作,请见<sparkRDD函数详解.docx> 启动spark-shell 进行测试: spark-shell --master spark://node1: ...

  2. spark之RDD的转换算子与行为算子的具体使用

    文章目录 1.Transform算子 1.1 map 1.2 flatmap 1.3 groupBy和groupBykey 1.4 filter 1.5 Mappartitions 1.6 mapVa ...

  3. mysql分组查询和子查询语句_6.MySQL分组聚合查询,子查询

    自己的MySQL阅读笔记,持续更新,直到看书结束. 数据库技术可以有效帮助一个组织或者企业科学.有效的管理数据,也是现在很多企业招聘数据分析师的必备要求之一. 大家如果看过MySQL的书,也可以看我的 ...

  4. 【原创】StreamInsight查询系列(六)——基本查询操作之分组聚合

    上篇博文介绍了StreamInsight基础查询操作中的用户自定义聚合部分.这篇文章将主要介绍如何在StreamInsight查询中使用分组聚合. 测试数据准备 为了方便测试查询,我们首先准备一个静态 ...

  5. pandas使用groupby函数进行分组聚合、使用agg函数指定聚合统计计算的数值变量、并自定义统计计算结果的名称(naming columns after aggregation)

    pandas使用groupby函数进行分组聚合.使用agg函数指定聚合统计计算的数值变量.并自定义统计计算结果的名称(naming columns after aggregation in dataf ...

  6. pandas使用groupby函数进行分组聚合并使用agg函数将每个分组特定变量对应的多个内容组合到一起输出(merging content within a specific column of g

    pandas使用groupby函数进行分组聚合并使用agg函数将每个分组特定变量对应的多个内容组合到一起输出(merging content within a specific column of g ...

  7. pandas使用groupby函数对dataframe进行分组统计、使用as_index参数设置分组聚合的结果中分组变量不是dataframe的索引(index)

    pandas使用groupby函数对dataframe进行分组统计.使用as_index参数设置分组聚合的结果中分组变量不是dataframe的索引(index) 目录

  8. pandas使用groupby函数按照多个分组变量进行分组聚合统计、使用agg函数计算分组的多个统计指标(grouping by multiple columns in dataframe)

    pandas使用groupby函数按照多个分组变量进行分组聚合统计.使用agg函数计算分组的多个统计指标(grouping by multiple columns in dataframe) 目录

  9. pandas使用groupby函数、agg函数获取每个分组聚合对应的标准差(std)实战:计算分组聚合单数据列的标准差(std)、计算分组聚合多数据列的标准差(std)

    pandas使用groupby函数.agg函数获取每个分组聚合对应的标准差(std)实战:计算分组聚合单数据列的标准差(std).计算分组聚合多数据列的标准差(std) 目录

最新文章

  1. 16 分频 32 分频是啥意思_Verilog中任意分频的实现
  2. C# WinForm 通过URL取得服务器上的某图片文件到本地
  3. 1048 采药 1049 装箱问题
  4. 数据仓库与数据集市建模
  5. Linux之cut:简化版的awk
  6. 集成产品开发(IPD)初探
  7. 第一个冲刺周期第二天补发
  8. VTK(二)---相机外参矩阵Tcw转VTK相机,构建VR虚拟相机
  9. 2008下mysql补丁_windows Server 2008 R2安装Mysql 8的打补丁顺序
  10. mysql如何查看使用的配置_mysql 配置文件如何查询?
  11. python--(点餐--元组)enumerate将索引与值一一对应、 模拟手机通信录、 模拟手机通信录--使用集合
  12. mysql8+maven+mybatis
  13. Error starting Tomcat context. Exception
  14. 英特尔服务器级cpu型号含义,新手必看 英特尔移动CPU命名规则解析
  15. The first interview for xiecheng
  16. 基于pygame 图片多边形坐标获取
  17. Material UI 带复选框表格获取选中值(索引)
  18. 强化学习实践六 :给Agent添加记忆功能
  19. 算法笔记-CDQ分治
  20. 【历史上的今天】6 月 18 日:京东诞生;网店平台 Etsy 成立;Facebook 发布 Libra 白皮书

热门文章

  1. maya刀剑神域 建模_王者玩家最想联动的动漫——刀剑与铠甲勇士,如果实现会联动谁?...
  2. 新手如何使用docker搭建web服务环境
  3. int 最大值_十行代码说清楚:leetcode 队列的最大值
  4. linux netstat java,Linux netstat介绍
  5. mysql.net连接器_关于mysql-connector-net在C#中的用法
  6. Error creating bean with name 解决办法
  7. 3. mysql的注解驱动的三种方式_上手spring boot项目(三)之spring boot整合mybatis进行增删改查的三种方式。...
  8. python numpy数组动态写入csv文件_python - 将NumPy数组转储到csv fi中
  9. javaweb开发后端常用技术_Java Web开发后端常用技术汇总
  10. layui table 复选框数据_Python操作三大数据库 Mysql