15、Spark_RDD算子——AggregateByKey
一、SparkUtils工具类
import org.apache.spark.{SparkConf, SparkContext}object SparkUtils {/*** 默认的master url路径*/val DEFAULT_MASTER = "local[*]"/*** 默认master为local[*]的获取sparkContext*/def getSparkContext(appName:String):SparkContext = getSparkContext(appName, DEFAULT_MASTER)def getSparkContext(appName:String, master:String):SparkContext = new SparkContext(new SparkConf().setAppName(appName).setMaster(master))/*** 释放sparkContext*/def close(sc:SparkContext) = if(sc != null) sc.stop()
}
二、日志工具
import org.apache.log4j.{Level, Logger}trait LoggerTrait {Logger.getLogger("org.apache.spark").setLevel(Level.WARN)Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)Logger.getLogger("org.spark_project").setLevel(Level.WARN)}
三、Spark算子AggregateByKey
import cn.qphone.spark.common.LoggerTrait.LoggerTrait
import cn.qphone.spark.common.Utils.SparkUtils
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDDimport scala.collection.mutable.ArrayBufferobject Deom15_AggregateByKey extends LoggerTrait {def main(args: Array[String]): Unit = {//1.sparkcontext获取val sc = SparkUtils.getSparkContext("Deom15_AggregateByKey ")//2.数据abk2rbk(sc)gbk2rbk(sc)//6.释放资源SparkUtils.close(sc)}def abk2rbk(sc: SparkContext): Unit = {val list: List[String] = List("i am a big big boy","you are a abag girl")val listRDD: RDD[String] = sc.parallelize(list, 3)val mapRDD: RDD[(String, Int)] = listRDD.flatMap(_.split("\\s+")).map((_, 1))val cntRDD: RDD[(String, Int)] = mapRDD.aggregateByKey(0)(seqOp1, combOp1)cntRDD.foreach(println)}def seqOp1(sum: Int, num: Int): Int = sum + numdef combOp1(sum1: Int, sum2: Int): Int = sum1 + sum2def gbk2rbk(sc: SparkContext): Unit = {val stuList = List("令狐冲 华山派","岳不群 华山派","虚竹 逍遥派","乔峰 丐帮","黄蓉 桃花岛","杨过 古墓派","小龙女 古墓派","郭靖 丐帮")val stuRDD: RDD[String] = sc.parallelize(stuList, 3)// val stusRDD: RDD[(String, String)] = stuRDD.map(line => {// val index = line.lastIndexOf(" ")// val classname = line.substring(index + 1)// val info = line.substring(0, index)// (classname, info)// })val stusRDD: RDD[(String, String)] = stuRDD.mapPartitionsWithIndex {case (partitionId, iterator) => {val array = iterator.toArrayprintln(s"${partitionId},${array.mkString("[", ",", "]")}")array.map(line => {val index = line.lastIndexOf(" ")val classname = line.substring(index + 1)val info = line.substring(0, index)(classname, info)}).iterator}}//3.combineByKeystusRDD.aggregateByKey(ArrayBuffer[String]())(seqOp2,combOp2).foreach(println)}def seqOp2(ab: ArrayBuffer[String],str:String): ArrayBuffer[String] = {ab.append(str)ab}def combOp2(ab1: ArrayBuffer[String], ab2: ArrayBuffer[String]): ArrayBuffer[String] = ab1.++(ab2)
}
15、Spark_RDD算子——AggregateByKey相关推荐
- 14、Spark_RDD算子——CombineByKey_ReduceByKey转换
一.SparkUtils工具类 import org.apache.spark.{SparkConf, SparkContext}object SparkUtils {/*** 默认的master u ...
- spark算子_Spark常用算子
Spark的算子分类: 从大方向说,Spark算子大致可以分为以下两类: (1)Transformation变换/转换算子:这种变换并不触发提交作业,这种算子是延迟执行的,也就是说从一个RDD转换生成 ...
- spark算子_十、Spark之详解Action类算子
常用Action类算子列表 reduce(func): 通过func函数来对RDD中所有元素进行聚合运算,先运算分区内数据,再运算分区间数据. scala> val rdd1 = sc.make ...
- 【大数据开发】SparkCore——进阶算子、Action算子、查看分区数的三种方式
源代码中的大写V,指的是value rdd.getNumberPartitions获取分区数量 Transformation算⼦全都是RDD[U,T]类型的 Action算子的返回值一般情况下不会是R ...
- Spark_Transformation转换算子
RDD整体上分为Value类型.双Value类型和Key-Value类型 一.Value类型 1.map()映射 1)函数签名: def map[U: ClassTag](f: T => U): ...
- Spark的RDD行动算子
目录 基本概念 算子介绍 1. reduce 2. collect 3. count 4. first 5. take 6. takeOrdered 案例实操1-6 7. aggregate 8. f ...
- Spark算子总结版
Spark的算子的分类 从大方向来说,Spark 算子大致可以分为以下两类: 1)Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理. Transformat ...
- CANN AICPU算子耗时分析及优化探索
摘要:本文以GreaterEqual作为测试算子,该算子计算逻辑较为简单(output = input1 >= input2),旨在尽可能降低计算耗时,使得算子耗时尽可能以数据操作和算子调度作为 ...
- 第三课 大数据技术之Spark-RDD介绍和转换算子
第三课 大数据技术之Spark-RDD介绍和转换算子 文章目录 第三课 大数据技术之Spark-RDD介绍和转换算子 第一节 RDD相关介绍 1.1 什么是 RDD 1.2 核心属性 1.3 执行原理 ...
最新文章
- 一些关于找工作的书籍
- 使用CNN分类签名和文本图像
- win安装wordcloud报错解决方案
- Linux下的字符集问题
- 【 Grey Hack 】万金油脚本:在路由器上获取shell
- 系统架构设计之-任务调度系统的设计
- androidstudio 日历视图怎么显示农历_ipad自带的日历程序
- 随想录(docker应用)
- 流程图动画效果html,jQuery创意线条步骤流程图动画特效
- 一句话简单区分 SQL 连接
- python如何压缩pdf_PDF文件怎么压缩,一键压缩PDF文件
- C# 如何合并和拆分PDF文件
- keil5编译器出现Undefined symbol time (referred from xxx.o).
- OSPF协议邻居(Neighbor)与邻接(Adjacency)关系
- 震撼您心灵的四川雪山
- QTDialog去掉右上角问号保留X
- 豆豆趣事[2016年06月]
- HDU5761 Rower Bo
- PyTorch模型训练的几个加速技巧
- ZUCC_Linux系统管理_实验五 计划任务
热门文章
- SonarQube在Windows环境下下载安装,中文包下载安装,mysql配置,maven配置,idea配置,项目配置
- 提高企业WiFi速度的快速简便的方法—Vecloud微云
- 整合flink-cdc实现实时读postgrasql
- 计算机专业相关的职业技术证书有哪些,你知道吗?
- 全球与中国葡萄酒保鲜工具市场现状及未来发展趋势
- UTL_HTTP read_text和read_raw应用
- 读心或成现实,OpenBCI要将脑波传感技术用于VR中
- c语言编程学多久,丰城c语言编程学习,丰城学c语言编程的学校,丰城学c语言编程一般要多久才能学会...
- PHP常见三种设计模式:单例、工厂、观察者
- 转大数据开发,适合什么岗位?