一、SparkUtils工具类

import org.apache.spark.{SparkConf, SparkContext}object SparkUtils {/***  默认的master url路径*/val DEFAULT_MASTER = "local[*]"/*** 默认master为local[*]的获取sparkContext*/def getSparkContext(appName:String):SparkContext = getSparkContext(appName, DEFAULT_MASTER)def getSparkContext(appName:String, master:String):SparkContext = new SparkContext(new SparkConf().setAppName(appName).setMaster(master))/*** 释放sparkContext*/def close(sc:SparkContext) = if(sc != null) sc.stop()
}

二、日志工具

import org.apache.log4j.{Level, Logger}trait LoggerTrait {Logger.getLogger("org.apache.spark").setLevel(Level.WARN)Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)Logger.getLogger("org.spark_project").setLevel(Level.WARN)}

三、Spark算子AggregateByKey

import cn.qphone.spark.common.LoggerTrait.LoggerTrait
import cn.qphone.spark.common.Utils.SparkUtils
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDDimport scala.collection.mutable.ArrayBufferobject Deom15_AggregateByKey extends LoggerTrait {def main(args: Array[String]): Unit = {//1.sparkcontext获取val sc = SparkUtils.getSparkContext("Deom15_AggregateByKey ")//2.数据abk2rbk(sc)gbk2rbk(sc)//6.释放资源SparkUtils.close(sc)}def abk2rbk(sc: SparkContext): Unit = {val list: List[String] = List("i am a big big boy","you are a abag girl")val listRDD: RDD[String] = sc.parallelize(list, 3)val mapRDD: RDD[(String, Int)] = listRDD.flatMap(_.split("\\s+")).map((_, 1))val cntRDD: RDD[(String, Int)] = mapRDD.aggregateByKey(0)(seqOp1, combOp1)cntRDD.foreach(println)}def seqOp1(sum: Int, num: Int): Int = sum + numdef combOp1(sum1: Int, sum2: Int): Int = sum1 + sum2def gbk2rbk(sc: SparkContext): Unit = {val stuList = List("令狐冲 华山派","岳不群 华山派","虚竹 逍遥派","乔峰 丐帮","黄蓉 桃花岛","杨过 古墓派","小龙女 古墓派","郭靖 丐帮")val stuRDD: RDD[String] = sc.parallelize(stuList, 3)//    val stusRDD: RDD[(String, String)] = stuRDD.map(line => {//      val index = line.lastIndexOf(" ")//      val classname = line.substring(index + 1)//      val info = line.substring(0, index)//      (classname, info)//    })val stusRDD: RDD[(String, String)] = stuRDD.mapPartitionsWithIndex {case (partitionId, iterator) => {val array = iterator.toArrayprintln(s"${partitionId},${array.mkString("[", ",", "]")}")array.map(line => {val index = line.lastIndexOf(" ")val classname = line.substring(index + 1)val info = line.substring(0, index)(classname, info)}).iterator}}//3.combineByKeystusRDD.aggregateByKey(ArrayBuffer[String]())(seqOp2,combOp2).foreach(println)}def seqOp2(ab: ArrayBuffer[String],str:String): ArrayBuffer[String] = {ab.append(str)ab}def combOp2(ab1: ArrayBuffer[String], ab2: ArrayBuffer[String]): ArrayBuffer[String] = ab1.++(ab2)
}

15、Spark_RDD算子——AggregateByKey相关推荐

  1. 14、Spark_RDD算子——CombineByKey_ReduceByKey转换

    一.SparkUtils工具类 import org.apache.spark.{SparkConf, SparkContext}object SparkUtils {/*** 默认的master u ...

  2. spark算子_Spark常用算子

    Spark的算子分类: 从大方向说,Spark算子大致可以分为以下两类: (1)Transformation变换/转换算子:这种变换并不触发提交作业,这种算子是延迟执行的,也就是说从一个RDD转换生成 ...

  3. spark算子_十、Spark之详解Action类算子

    常用Action类算子列表 reduce(func): 通过func函数来对RDD中所有元素进行聚合运算,先运算分区内数据,再运算分区间数据. scala> val rdd1 = sc.make ...

  4. 【大数据开发】SparkCore——进阶算子、Action算子、查看分区数的三种方式

    源代码中的大写V,指的是value rdd.getNumberPartitions获取分区数量 Transformation算⼦全都是RDD[U,T]类型的 Action算子的返回值一般情况下不会是R ...

  5. Spark_Transformation转换算子

    RDD整体上分为Value类型.双Value类型和Key-Value类型 一.Value类型 1.map()映射 1)函数签名: def map[U: ClassTag](f: T => U): ...

  6. Spark的RDD行动算子

    目录 基本概念 算子介绍 1. reduce 2. collect 3. count 4. first 5. take 6. takeOrdered 案例实操1-6 7. aggregate 8. f ...

  7. Spark算子总结版

    Spark的算子的分类 从大方向来说,Spark 算子大致可以分为以下两类: 1)Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理. Transformat ...

  8. CANN AICPU算子耗时分析及优化探索

    摘要:本文以GreaterEqual作为测试算子,该算子计算逻辑较为简单(output = input1 >= input2),旨在尽可能降低计算耗时,使得算子耗时尽可能以数据操作和算子调度作为 ...

  9. 第三课 大数据技术之Spark-RDD介绍和转换算子

    第三课 大数据技术之Spark-RDD介绍和转换算子 文章目录 第三课 大数据技术之Spark-RDD介绍和转换算子 第一节 RDD相关介绍 1.1 什么是 RDD 1.2 核心属性 1.3 执行原理 ...

最新文章

  1. 一些关于找工作的书籍
  2. 使用CNN分类签名和文本图像
  3. win安装wordcloud报错解决方案
  4. Linux下的字符集问题
  5. 【 Grey Hack 】万金油脚本:在路由器上获取shell
  6. 系统架构设计之-任务调度系统的设计
  7. androidstudio 日历视图怎么显示农历_ipad自带的日历程序
  8. 随想录(docker应用)
  9. 流程图动画效果html,jQuery创意线条步骤流程图动画特效
  10. 一句话简单区分 SQL 连接
  11. python如何压缩pdf_PDF文件怎么压缩,一键压缩PDF文件
  12. C# 如何合并和拆分PDF文件
  13. keil5编译器出现Undefined symbol time (referred from xxx.o).
  14. OSPF协议邻居(Neighbor)与邻接(Adjacency)关系
  15. 震撼您心灵的四川雪山
  16. QTDialog去掉右上角问号保留X
  17. 豆豆趣事[2016年06月]
  18. HDU5761 Rower Bo
  19. PyTorch模型训练的几个加速技巧
  20. ZUCC_Linux系统管理_实验五 计划任务

热门文章

  1. SonarQube在Windows环境下下载安装,中文包下载安装,mysql配置,maven配置,idea配置,项目配置
  2. 提高企业WiFi速度的快速简便的方法—Vecloud微云
  3. 整合flink-cdc实现实时读postgrasql
  4. 计算机专业相关的职业技术证书有哪些,你知道吗?
  5. 全球与中国葡萄酒保鲜工具市场现状及未来发展趋势
  6. UTL_HTTP read_text和read_raw应用
  7. 读心或成现实,OpenBCI要将脑波传感技术用于VR中
  8. c语言编程学多久,丰城c语言编程学习,丰城学c语言编程的学校,丰城学c语言编程一般要多久才能学会...
  9. PHP常见三种设计模式:单例、工厂、观察者
  10. 转大数据开发,适合什么岗位?