数据格式为:

http://python.cn/wei
http://python.cn/wei
http://java.cn/zhang
http://java.cn/zhang
package spark
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object GroupTeacher{def main(args: Array[String]): Unit = {val topN = 3val conf = new SparkConf().setAppName("Teacher").setMaster("local[4]")val sc = new SparkContext(conf)val lines: RDD[String] = sc.textFile("D:\\code\\ip\\teacher.log")val sbjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => {val subject = line.split("/")(2).replace(".cn","")val teacher = line.split("/")(3)((subject, teacher), 1)})val reduced: RDD[((String, String), Int)] = sbjectTeacherAndOne.reduceByKey(_+_)//聚合,将学科和老师联合当做keyval grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1)//按学科进行分组//scala的集合排序是在内存中进行的,但是内存有可能不够用,可以替换使用RDD的sortedval sorted = grouped.mapValues(_.toList.sortBy(_._2).reverse.take(topN))//经过分组后,一个分区内可能有多个学科的数据,每台机器上都计算一个学科的数据所以可以调用scala的方法val r: Array[(String, List[((String, String), Int)])] = sorted.collect()println(r.toBuffer)sc.stop()}
}

优化:在每一台机器上进行排序

package spark
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object GroupTeacher2 {def main(args: Array[String]): Unit = {val topN = 3val subjects = Array("python", "java", "php")val conf = new SparkConf().setAppName("GroupFavTeacher2").setMaster("local[4]")val sc = new SparkContext(conf)sc.setCheckpointDir("D:\\code\\ip\\ck")val lines: RDD[String] = sc.textFile("D:\\code\\ip\\teacher.log")val sbjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => {val subject = line.split("/")(2).replace(".cn","")val teacher = line.split("/")(3)((subject, teacher), 1)})val reduced: RDD[((String, String), Int)] = sbjectTeacherAndOne.reduceByKey(_+_)//val cached = reduced.cache()//cache到内存(标记为Cache的RDD以后被反复使用,才使用cache)reduced.checkpoint()//scala的集合排序是在内存中进行的,但是内存有可能不够用,可以调用RDD的sortby方法,内存+磁盘进行排序for (sb <- subjects) {val filtered: RDD[((String, String), Int)] = reduced.filter(_._1._1 == sb)//过滤一个学科的数据val favTeacher = filtered.sortBy(_._2, false).take(topN)//调用RDD的sortBy方法,(take是一个action,会触发任务提交)println(favTeacher.toBuffer)}sc.stop()}
}

优化:加上了分区器parpartitioner,避免数据倾斜

package cn.edu360.day3
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}import scala.collection.mutable
object GroupTeacher3 {def main(args: Array[String]): Unit = {val topN = 3val conf = new SparkConf().setAppName("GroupTeacher").setMaster("local[4]")val sc = new SparkContext(conf)val lines: RDD[String] = sc.textFile("D:\\code\\ip\\teacher.log")val sbjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => {val subject = line.split("/")(2).replace(".cn","")val teacher = line.split("/")(3)((subject, teacher), 1)})val reduced: RDD[((String, String), Int)] = sbjectTeacherAndOne.reduceByKey(_+_)val subjects: Array[String] = reduced.map(_._1._1).distinct().collect()val sbPatitioner = new SubjectParitioner(subjects);//自定义一个分区器,并且按照指定的分区器进行分区val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(sbPatitioner)//partitionBy按照指定的分区规则进行分区val sorted: RDD[((String, String), Int)] = partitioned.mapPartitions(it => {it.toList.sortBy(_._2).reverse.take(topN).iterator//将迭代器转换成list,然后排序,在转换成迭代器返回})val r: Array[((String, String), Int)] = sorted.collect()println(r.toBuffer)sc.stop()}
}//自定义分区器
class SubjectParitioner(sbs: Array[String]) extends Partitioner {val rules = new mutable.HashMap[String, Int]()var i = 0for(sb <- sbs) {//rules(sb) = irules.put(sb, i)i += 1}override def numPartitions: Int = sbs.length//返回分区的数量(下一个RDD有多少分区)//根据传入的key计算分区标号//key是一个元组(String, String)override def getPartition(key: Any): Int = {val subject = key.asInstanceOf[(String, String)]._1//获取学科名称rules(subject)//根据规则计算分区编号}
}

Spark Group相关推荐

  1. 手机、桌面和浏览器应用程序开发的差异

    使用 Flex 可以针对以下部署环境开发应用程序: 浏览器 将应用程序部署为 SWF 文件,以便在运行于浏览器内的 Flash Player 中使用. 桌面 针对桌面计算机(例如 Windows 计算 ...

  2. Zeppelin解释器的REST API接口

    2019独角兽企业重金招聘Python工程师标准>>> Zeppelin解释器的REST API接口 概览 Apache Zeppelin 提供了多个REST APIs用于远程功能交 ...

  3. Cloudera Manager拓展SPARK2-2.3.0.cloudera3-1.cdh5.6.0.p0.1-el6.parcel

    一.准备工作 Centos6.5的系统 Cloudera Manager 版本5.6 Spark2.3依赖的jdk1.8 网上的资料提供的下载地址下载不了,只能基于原有的SPARK2-2.3.0.cl ...

  4. spark多个kafka source采用同一个group id导致的消费堆积延迟

    Kafka版本0.10.0 spark版本 2.1 Spark streaming在同一个application中多个kafka source当使用一个group id的时候订阅不同topic会存在消 ...

  5. 解决数据倾斜一:RDD执行reduceByKey或则Spark SQL中使用group by语句导致的数据倾斜

    一:概述 有的时候,我们可能会遇到大数据计算中一个最棘手的问题--数据倾斜,此时Spark作业的性能会比期望差很多.数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的 ...

  6. 关于hive on spark的distribute by和group by使用以及小文件合并问题

    欢迎关注交流微信公众号:小满锅 问题导言 最近在使用hive时,发现一些任务的因为使用mapreduce的缘故,跑的太慢了,才几十个G的数据就经常跑一个多小时,于是有了切换spark的想法. 但是刚刚 ...

  7. 广告推荐算法(group auc)评价指标及Spark实现代码

    我们曾经有这样的疑惑,那就是训练样本,AUC得到提升.当将新模型放到线上后,却发现实际效果却没有老模型好,这时候很多人就开始疑惑了. ​ 在机器学习算法中,很多情况我们都是把auc当成最常用的一个评价 ...

  8. spark踩坑记录 (一) group by

    mysql 与 spark sql 语法大致相通.但是会有一些坑存在.比如 group by · 需求场景: 在视屏表中,取每个作者最新的一条数据. 即, 筛选出 所有  user_id对应的  最新 ...

  9. Spark:group by和聚合函数使用

    groupBy分组和使用agg聚合函数demo: df.show +----+-----+---+ |YEAR|MONTH|NUM| +----+-----+---+ |2017| 1| 10| |2 ...

最新文章

  1. 一个测试员的工作与学习
  2. 堪称经典!这部由苏联最杰出数学家编写的数学教材,为何能大受推崇?
  3. 大量删除的表、查询卡顿的表,重建索引
  4. 【原】让两个DIV高度一样的Javascript函数
  5. example 排序_个性化推荐系统源代码之基于 WideDeep模型的在线排序
  6. VB 按指定编码格式写入文本文件
  7. 电脑声音太小如何增强_如何录制电脑上播放的声音,背景音乐
  8. R语言ETL工程:集合运算(intersect/union/setdiff)
  9. 编码神器 Sublime Text 包管理工具及扩展大全
  10. 学习git reset 、 git checkout、git revert
  11. axure数据报表元件库_axure图表元件库 axure教程:如何制作axure组件库
  12. Ubuntu在物理机系统安装和teamviewer安装过程问题和解决的记录
  13. 二项式定理与二项分布、多项式定理与多项分布
  14. 第一个C语言项目——图书管理系统
  15. 《数字图像处理》(武汉大学)笔记第四章
  16. h5 HTML5 浏览器 录制视频
  17. 设计模式的原则和常用的模式
  18. win10麦克风说话没声音_电脑录屏,真的没那么麻烦
  19. chapter8——消抖技术
  20. react高德地图定位--显示城市名字

热门文章

  1. python开发人员看什么书_python初学者看什么书
  2. Python检查字符串重叠部分并进行拼接
  3. Python爬虫扩展库scrapy选择器用法入门(一)
  4. Python使用tkinter的Treeview组件实现表格功能
  5. java迷宫_java实现迷宫算法--转
  6. python对csv数据提取某列的某些行_python pandas获取csv指定行 列的操作方法
  7. 时间戳服务器显示invalid,signtool签名时间戳失败的解决方法
  8. 可逆矩阵的特征值和原来矩阵_线性代数——相似矩阵的可逆变换矩阵P是否唯一...
  9. 策略设计模式_设计模式之策略者模式
  10. oracle11g 端口,navicate 连接 oracle11g精简版监听不到端口和用户密码错误问题