文章目录

  • 一、自定义排序四种方式、实现序列化
  • 二、案例:自定义分区器

一、自定义排序四种方式、实现序列化

前面两种是样例类实现、普通类实现

第三种方式可以不实现序列化接口

用的最多的还是第四种方式,第四种方式不需要封装类,直接使用元组即可
但是第四种方式有一定局限,如果不是Int类型则不能使用负号进行排序

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Test/*** 自定义排序器*/
class _01_SortExamples {private val conf: SparkConf = new SparkConf().setMaster("local").setAppName("sort").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  // Spark2.x替换默认的序列化机制为Kryo,效率高.registerKryoClasses(Array(classOf[Student2], classOf[Student1]))       // 指定需要进行序列化的类private val sc: SparkContext = new SparkContext(conf)private val rdd: RDD[String] = sc.parallelize(Array("xiaoming 12 90", "xiaobai 13 89", "xiaohui 12 91", "xiaolv 12 100"))// 需求:// 对数据进行排序, 按照年龄升序排序, 如果年龄相同, 按照成绩降序排序/*** 自定义排序1:* 按照数据,设计一个样例类,实现Ordered特质,实现排序比较* 样例类默认已经实现了序列化了,此时不需要再额外的设置序列化*/@Test def sort1(): Unit = {val res: RDD[Student1] = rdd.map(line => {val infos: Array[String] = line.split(" ")Student1(infos(0), infos(1).toInt, infos(2).toInt)}).sortBy(x => x)res.foreach(println)}/*** 自定义排序2:* 按照数据,设计一个类,实现Ordered特质,实现排序比较* 这种方式,需要类实现Serializable特质* 最高再替换Spark的序列化机制,参见SparkConf中的设置*/@Test def sort2(): Unit = {val res: RDD[Student2] = rdd.map(line => {val infos: Array[String] = line.split(" ")new Student2(infos(0), infos(1).toInt, infos(2).toInt)}).sortBy(x => x)res.foreach(println)}/*** 自定义排序3* 将元素映射成元组,在sortBy的时候,转成对象进行比较*/@Test def sort3(): Unit = {val res: RDD[(String, Int, Int)] = rdd.map(line => {val infos: Array[String] = line.split(" ")(infos(0), infos(1).toInt, infos(2).toInt)}).sortBy(t => new Student3(t._1, t._2, t._3))res.foreach(println)}/*** 自定义排序4* 不需要借助类,直接使用元组即可* 将排序的若干依据直接写出即可*/@Test def sort4(): Unit = {val res: RDD[(String, Int, Int)] = rdd.map(line => {val infos: Array[String] = line.split(" ")(infos(0), infos(1).toInt, infos(2).toInt)}).sortBy(t => (t._2, -t._3))res.foreach(println)}}case class Student1(name: String, age: Int, score: Int) extends Ordered[Student1] {override def compare(that: Student1): Int = {if (this.age == that.age) {that.score - this.score} else {this.age - that.age}}
}class Student2(var name: String, var age: Int, var score: Int) extends Ordered[Student2] with Serializable {override def compare(that: Student2): Int = {if (this.age == that.age) {that.score - this.score} else {this.age - that.age}}override def toString: String = s"name = $name, age = $age, score = $score"
}class Student3(var name: String, var age: Int, var score: Int) extends Ordered[Student3] {override def compare(that: Student3): Int = {if (this.age == that.age) {that.score - this.score} else {this.age - that.age}}override def toString: String = s"name = $name, age = $age, score = $score"
}

二、案例:自定义分区器

数据中有不同的学科,将输出的⼀个学科⽣成⼀个⽂件

如果不定义自定义分区器,则会产生数据倾斜(可能有的文件没有数据,有的文件含有不同key的数据)

access.txt

20161123101523   http://java.learn.com/java/javaee.shtml
20161123101523  http://java.learn.com/java/javaee.shtml
20161123101523  http://ui.learn.com/ui/video.shtml
20161123101523  http://bigdata.learn.com/bigdata/teacher.shtml
20161123101523  http://android.learn.com/android/video.shtml
20161123101523  http://h5.learn.com/h5/teacher.shtml
20161123101523  http://h5.learn.com/h5/course.shtml
20161123101523  http://bigdata.learn.com/bigdata/teacher.shtml
20161123101523  http://java.learn.com/java/video.shtml
20161123101523  http://bigdata.learn.com/bigdata/teacher.shtml
20161123101523  http://ui.learn.com/ui/course.shtml
20161123101523  http://bigdata.learn.com/bigdata/teacher.shtml
20161123101523  http://h5.learn.com/h5/course.shtml
20161123101523  http://java.learn.com/java/video.shtml
20161123101523  http://ui.learn.com/ui/video.shtml
20161123101523  http://h5.learn.com/h5/course.shtml
20161123101523  http://h5.learn.com/h5/teacher.shtml
20161123101523  http://bigdata.learn.com/bigdata/teacher.shtml
20161123101523  http://bigdata.learn.com/bigdata/video.shtml
import java.net.URLimport org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}import scala.collection.mutable/*** 自定义分区器*/
object PartitionerTest {// 案例: 读取数据访问记录的文件 access.txt,统计每一个学科有多少访问量def main(args: Array[String]): Unit = {val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local").setAppName("partitioner"))// 读取文件,构建RDD// val rdd: RDD[String] = sc.textFile("C:\\Users\\luds\\Desktop\\SZ2002\\Day10-SparkCore\\数据\\access.txt", 5)val rdd: RDD[String] = sc.textFile("C:\\Users\\luds\\Desktop\\SZ2002\\Day10-SparkCore\\数据\\access.txt")// 转换,将每一行的数据,转换成(学科,1)形式的元组val res: RDD[(String, Int)] = rdd.map(line => {val url: URL = new URL(line.split("\t")(1))(url.getHost, 1)}).reduceByKey(_ + _)// 获取所有的学科val keys: Array[String] = res.keys.collect()res.partitionBy(new SubjectPartitioner(keys)).saveAsTextFile("C:\\Users\\luds\\Desktop\\out")}
}/*** 自定义的分区器,需要继承自Partitioner,重写其中的抽象方法* numPartitions: 返回的是一共有多少个分区* getPartition: 返回的是每一个数据需要在哪一个分区中*/
class SubjectPartitioner extends Partitioner {// 定义一个Map,用来记录每一个学科对应的分区号private val _partitionMap: mutable.Map[String, Int] = new mutable.HashMap[String, Int] {}// 传入所有的学科,为每一个学科分配分区def this(subjects: Array[String]) {this()// 遍历数组,将每一个学科存入到Map中,并分配一个分区IDvar index: Int = 0      // 作为分区的IDfor (elem <- subjects) {_partitionMap.put(elem, index)index += 1}}override def numPartitions: Int = _partitionMap.sizeoverride def getPartition(key: Any): Int = _partitionMap.getOrElse(key.toString, 0)
}

除了上面的方式外,我们观察到reduceByKey()方法中同样可以传入一个partitioner,使用该方法同样可以实现分区

总结:
1.分区主要⾯对KV结构数据,Spark内部提供了两个⽐较重要的分区器,Hash分区器和Range分区器
2.hash分区主要通过key的hashcode来对分区数求余,hash分区可能会导致数据倾斜问题,Range分区是通过⽔塘抽样的算法来将数据均匀的分配到各个分区中
3.⾃定义分区主要通过继承partitioner抽象类来实现,必须要实现两个⽅法:
numPartitions 和 getPartition(key: Any)

【大数据开发】SparkCore——自定义排序、实现序列化、自定义分区器相关推荐

  1. 大数据开发笔记(四):Hive分区详解

      ✨大数据开发笔记推荐: 大数据开发面试知识点总结_GoAI的博客-CSDN博客_大数据开发面试​本文详细介绍大数据hadoop生态圈各部分知识,包括不限于hdfs.yarn.mapreduce.h ...

  2. 大数据开发实战:Hive表DDL和DML

    1.Hive 表 DDL 1.1.创建表 Hive中创建表的完整语法如下: CREATE [EXTERNAL] TABLE [IF NOT EXISTS]  table_name [ (col_nam ...

  3. 大数据开发学习:进行大数据开发课程有哪些

    想要成为工资高.待遇好的大数据工程师,是需要专业的技能的.对于专业大数据技术的学习,需要学习的大数据开发课程有哪些呢? 大数据开发课程: 一.Hadoop 可以说,hadoop几乎已经是大数据代名词. ...

  4. 大数据开发实战教程目录

    大数据开发实战教程目录 一. 课程性质.目的和任务 本课程目的是让学生了解并掌握四个领域 (1)大数据系统的起源及系统特征 (2)大数据系统的架构设计及功能目标设计 (3)大数据系统程序开发.企业大数 ...

  5. 大数据开发面试知识点复习3

    文章目录 大数据开发复习课程 10.scala 10.1.scala介绍 10.2.scala解释器 10.3.scala的基本语法 10.3.1.声明变量 10.3.2.字符串 10.3.3.数据类 ...

  6. 【Spark】黑马-大数据开发2

    Scala+Spark-大数据开发复习课程 10.scala 10.1.scala介绍 10.2.scala解释器 10.3.scala的基本语法 10.3.1.声明变量 10.3.2.字符串 10. ...

  7. 大数据 - 大数据开发技术课程总结(未完)

    1.课程介绍 大数据开发课程主要从了解大数据概念.特征开始,再介绍大数据Java开发和Hadoop的环境配置,较为全面地讲解了HDFS分布式存储,MapReduce分布式计算框架,Spark平台开发和 ...

  8. 某某证券大数据开发工程师招聘笔试题

    某某证券大数据开发工程师招聘笔试题 一.基础知识(单选题,每题1分,共10分) 1.实现两个远程主机之间的文件复制是用 ( D ) . A . mv B . cp C . cut D. scp 2.一 ...

  9. 如何自学大数据开发?

    大数据技术怎么自学?大数据开发如何自学? 我们在学习大数据开发前需要先找到适合自己的方式方法,首先需要审视一下自身的情况,是否是以兴趣为出发点,对大数据是不是自己是真的感兴趣吗,目前对大数据的了解有多 ...

  10. 大数据开发:ElasticSearch 索引设置

    提起ElasticSearch,大家首先会联想到的往往是其特殊的索引机制,带来的快速查询性能优势.前面我们也对ElasticSearch的索引机制做了简单的介绍,今天的大数据开发分享,我们来讲讲Ela ...

最新文章

  1. javascript数字千分位格式化
  2. 流程控制--for序列
  3. Leecode 869. 重新排序得到 2 的幂——Leecode每日一题系列
  4. Robot framework 引入 Selenium2Library 类库:
  5. 有几种部署模式_来!PyFlink 作业的多种部署模式
  6. 万年历c语言设计报告,C语言实训题目设计报告 万年历
  7. SVG.js 颜色渐变使用
  8. jQuery dataTables四种数据来源[转]-原文地址:http://xqqing79.iteye.com/blog/1219425
  9. PAT A1002 A+B for Polynomials(25)
  10. js高程读书笔记(1-3章)
  11. jdk8官网下载地址
  12. 锐捷交换机配置保存到计算机,锐捷交换机配置命令总结中篇
  13. Struts2下载问题再探
  14. 网站服务器访问ip带宽限速,巧用IP带宽控制实现路由器限速
  15. 进程间的相互通讯 C++
  16. Android Databinding 与 RecycleView mvvm的运用
  17. ps排版html,排版教程,超详细适合初学者的排版教程(二)
  18. linux系统下sendmail的搭建
  19. 着急上市的喜马拉雅FM,进退两难
  20. 褪黑素缓释片 Melatonin Controlled-Release 2 mg 60 tablet

热门文章

  1. IDEA输入字母间距变大报红处理方法
  2. 永磁同步发电机仿真,带四个牵引电机仿真。内燃机车仿真模型
  3. 如何1分钟实现身份实名认证功能?
  4. 深度学习在工业界的应用案例(二)
  5. 认识并理顺元宇宙与产业互联网之间的关系,可以打开产业互联网的发展新症结
  6. 第十章 宠物商店 数据库建立插入信息
  7. Adrealm:区块链的“快慢之道”|金色财经独家专访
  8. Misc 图片中的图片
  9. 淘宝商品采集上架拼多多店铺(无货源数据采集接口,拼多多商品详情数据,淘宝商品详情数据,京东商品详情数据)接口代码对接教程
  10. Unity 图片定点缩放功能