参考文章:Spark序列化

spark之kryo 序列化

Spark序列化入门

1. 什么是序列化和序列化?

  • 序列化是什么
  1. 序列化的作用就是可以将对象的内容变成二进制, 存入文件中保存
  2. 反序列化指的是将保存下来的二进制对象数据恢复成对象
  • 序列化对对象的要求
  1. 对象必须实现 Serializable 接口
  2. 对象中的所有属性必须都要可以被序列化, 如果出现无法被序列化的属性, 则序列化失败
  • 限制
  1. 对象被序列化后, 生成的二进制文件中, 包含了很多环境信息, 如对象头, 对象中的属性字段等, 所以内容相对较大
  2. 因为数据量大, 所以序列化和反序列化的过程比较慢
  • 序列化的应用场景
  1. 持久化对象数据
  2. 网络中不能传输 Java 对象, 只能将其序列化后传输二进制数据

2. Java序列化

Java序列化算法

  • Serialization(序列化)是一种将对象以一连串的字节描述的过程;
  • 反序列化deserialization是一种将这些字节重建成一个对象的过程。
  • Java序列化API提供一种处理对象序列化的标准机制。

为什么要进行序列化?

Java中,一切都是对象,在分布式环境中经常需要将Object从这一端网络或设备传递到另一端。
这就需要有一种可以在两端传输数据的协议。
Java序列化机制就是为了解决这个问题而产生。

如何进行序列化?

一个对象能够序列化的前提是实现Serializable接口,Serializable接口没有方法,更像是个标记。
有了这个标记的Class就能被序列化机制处理。

序列化过程:

a、实现Serializable接口

b、写个程序将对象序列化并输出(ObjectOutputStream能把Object输出成Byte流。将Byte流暂时存储到serial.out文件里)

c、使用ObjectInputStream从持久的文件中读取Bytes重建对象

3. Spark序列化

大部分Spark程序都具有“内存计算”的天性,所以集群中的所有资源:CPU、网络带宽或者是内存都有可能成为Spark程序的瓶颈。
通常情况下,如果数据完全加载到内存那么网络带宽就会成为瓶颈,但是你仍然需要对程序进行优化,
例如采用序列化的方式保存RDD数据以便减少内存使用。
数据序列化不但能提高网络性能还能减少内存使用。

几乎所有的资料都显示kryo 序列化方式优于java自带的序列化方式,而且在spark2.*版本中都是默认采用kryo 序列化。

在spark2.0+版本的官方文档中提到:spark默认提供了两个序列化库:Java自身的序列化和Kryo序列化
官网的解释是:java序列化灵活,但是速度缓慢。Kryo序列化速度更快且更紧凑,但是支持的类型较少。
而且spark现在已经默认RDD在shuffle的时候对简单类型使用了Kryo序列化。

3.1 Spark通过两种方式来创建序列化器

a. Java序列化

在默认情况下,Spark采用Java的ObjectOutputStream序列化一个对象。
该方式适用于所有实现了java.io.Serializable的类。
通过继承java.io.Externalizable,你能进一步控制序列化的性能。
Java序列化非常灵活,但是速度较慢,在某些情况下序列化的结果也比较大。

b. Kryo序列化

Spark也能使用Kryo(版本2)序列化对象。
Kryo 是 Spark 引入的一个外部的序列化工具, 可以增快 RDD 的运行速度,
因为 Kryo 序列化后的对象更小, 序列化和反序列化的速度非常快
Kryo不但速度极快, 而且产生的结果更为紧凑(通常能提高10倍)。
Kryo的缺点是不支持所有类型, 为了更好的性能, 需要提前注册程序中所使用的类(class)。

3.2 如何使用kryo序列化

可以在创建SparkContext之前,通过调用System.setProperty("spark.serializer", "spark.KryoSerializer"),
将序列化方式切换成Kryo。
但是Kryo需要用户进行注册,这也是为什么Kryo不能成为Spark序列化默认方式的唯一原因,
但是建议对于任何“网络密集型”(network-intensive)的应用,
都采用这种方式进行序列化方式。
Kryo文档描述了很多便于注册的高级选项,例如添加用户自定义的序列化代码。

如果对象非常大,还需要增加属性spark.kryoserializer.buffer.mb的值。
该属性的默认值是32,但是该属性需要足够大以便能够容纳需要序列化的最大对象。

如果不注册相应的类,Kryo仍然可以工作,
但是需要为了每一个对象保存其对应的全类名(full class name),这非常浪费。

spark中已经包含了kryo库,使用kryo只需要注册即可。官网只提供了scala版本的,java版本的如下:

或者:System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

两者都可以用,但是测试好像没起到什么效果。于是需要手动注册:

上图是关于kryo 的一些配置,可以单独注册自己的一个类,如紫色框线部分;
也可以像红色框线部分一样,自定义一个接口实现类MyKryoRegistrator,在这个类里面将所需的类全部注册。具体操作如下图:
如果需要序列化的类太多,就在这里逐一列举即可,当然被注册的类要实现java.io.Serializable,即:class TestKryo implements Serializable

序列化的效果:

test程序验证效果:

3.5 测试结果:

三种不同情况下的的RDD大小:
默认不序列化:2017.0 KB
在MyKryoRegistrator中序列化: 960.2 KB
只序列化demo.TestKryo:1053.0 KB

4. Spark 序列化和反序列化的应用场景

a.Task 分发

Task 是一个对象, 想在网络中传输对象就必须要先序列化

b.RDD 缓存

val rdd1 = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
rdd1.cache
rdd1.collect
  • RDD 中处理的是对象, 例如说字符串, Person 对象等
  • 如果缓存 RDD 中的数据, 就需要缓存这些对象
  • 对象是不能存在文件中的, 必须要将对象序列化后, 将二进制数据存入文件

c.广播变量

广播变量会分发到不同的机器上, 这个过程中需要使用网络, 对象在网络中传输就必须先被序列化

d. Shuffle 过程

  • Shuffle 过程是由 Reducer 从 Mapper 中拉取数据, 这里面涉及到两个需要序列化对象的原因
  • RDD 中的数据对象需要在 Mapper 端落盘缓存, 等待拉取
  • Mapper 和 Reducer 要传输数据对象
  • Spark Streaming 的 Receiver

Spark Streaming 中获取数据的组件叫做 Receiver, 获取到的数据也是对象形式, 在获取到以后需要落盘暂存, 就需要对数据对象进行序列化

d. 算子引用外部对象

class userserializable(i: Int)rdd.map(i => new Unserializable(i)).collect.foreach(println)
  • 在 Map算子的函数中, 传入了一个 Unserializable 的对象
  • Map 算子的函数是会在整个集群中运行的, 那 Unserializable 对象就需要跟随 Map 算子的函数被传输到不同的节点上
  • 如果 Unserializable 不能被序列化, 则会报错

e. RDD 的序列化

  • RDD 的序列化

RDD 的序列化只能使用 Java 序列化器, 或者 Kryo 序列化器

  • 为什么?

RDD 中存放的是数据对象, 要保留所有的数据就必须要对对象的元信息进行保存, 例如对象头之类的 保存一整个对象, 内存占用和效率会比较低一些

  • Kryo序列化示例
  val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")conf.registerKryoClasses(Array(classOf[Person]))val sc = new SparkContext(conf)rdd.map(arr => Person(arr(0), arr(1), arr(2)))

f.DataFrame 和 Dataset 中的序列化

历史的问题

RDD 中无法感知数据的组成, 无法感知数据结构, 只能以对象的形式处理数据

DataFrame 和 Dataset 的特点:

  • DataFrame 和 Dataset 是为结构化数据优化的
  • 在 DataFrame 和 Dataset 中, 数据和数据的 Schema 是分开存储的
spark.read.csv("...").where($"name" =!= "").groupBy($"name").map(row: Row => row).show()
  • DataFrame 中没有数据对象这个概念, 所有的数据都以行的形式存在于 Row 对象中, Row 中记录了每行数据的结构, 包括列名, 类型等

  • Dataset 中上层可以提供有类型的 API, 用以操作数据, 但是在内部, 无论是什么类型的数据对象 Dataset 都使用一个叫做 InternalRow 的类型的对象存储数据
val dataset: Dataset[Person] = spark.read.csv(...).as[Person]

总结

  1. 当需要将对象缓存下来的时候, 或者在网络中传输的时候, 要把对象转成二进制, 在使用的时候再将二进制转为对象, 这个过程叫做序列化和反序列化
  2. 在 Spark 中有很多场景需要存储对象, 或者在网络中传输对象
    1. Task 分发的时候, 需要将任务序列化, 分发到不同的 Executor 中执行
    2. 缓存 RDD 的时候, 需要保存 RDD 中的数据
    3. 广播变量的时候, 需要将变量序列化, 在集群中广播
    4. RDD 的 Shuffle 过程中 Map 和 Reducer 之间需要交换数据
    5. 算子中如果引入了外部的变量, 这个外部的变量也需要被序列化
  3. RDD 因为不保留数据的元信息, 所以必须要序列化整个对象, 常见的方式是 Java 的序列化器, 和 Kyro 序列化器
  4. Dataset 和 DataFrame 中保留数据的元信息, 所以可以不再使用 Java 的序列化器和 Kyro 序列化器, 使用 Spark 特有的序列化协议, 生成 UnsafeInternalRow 用以保存数据, 这样不仅能减少数据量, 也能减少序列化和反序列化的开销, 其速度大概能达到 RDD 的序列化的 20 倍左右

Spark序列化简介相关推荐

  1. 学习笔记Spark(九)—— Spark MLlib应用(1)—— 机器学习简介、Spark MLlib简介

    一.机器学习简介 1.1.机器学习概念 机器学习就是让机器能像人一样有学习.理解.认识的能力. 机器学习是一门人工智能的科学,该领域的主要研究对象是人工智能,特别是如何在经验学习中改善具体算法的性能. ...

  2. 学习笔记Spark(六)—— Spark SQL应用(1)—— Spark SQL简介、环境配置

    一.Spark SQL简介 1.1.Spark SQL特性 Spark SQL是Spark Core之上的一个组件,它引入了一个称为SchemaRDD的新- 数据抽象,它为结构化和半结构化数据提供支持 ...

  3. dataframe 筛选_Spark.DataFrame与Spark.ML简介

    本文是PySpark销量预测系列第一篇,后面会陆续通过实战案例详细介绍PySpark销量预测流程,包含特征工程.特征筛选.超参搜索.预测算法. 在零售销量预测领域,销售小票数据动辄上千万条,这个量级在 ...

  4. Spark Streaming简介 (三十四)

    Spark Streaming简介 Spark Streaming 是 Spark 提供的对实时数据进行流式计算的组件.它是 Spark 核心 API 的一个扩展,具有吞吐量高.容错能力强的实时流数据 ...

  5. Spark MLlib简介

    1. Spark MLlib简介 1.1 传统ML ML利用数据或以往经验,以此优化计算机程序的性能标准. ML强调的3个关键词: 算法.经验.性能. 基于大数据的ML 1. 传统的ML算法,由于技术 ...

  6. spark 序列化_spark shell 配置 Kryo 序列化

    Spark 默认使用 Java Serialization 作为序列化方式,但是这种序列化方式一般会被认为性能和效率一般.因此 Spark 官方是推荐使用 Kryo 来代替默认的序列化方式的,为了便于 ...

  7. spark 序列化错误 集群提交时_Spark统一内存管理机制

    一.内存的分配 预留内存:300m 可用内存 = 系统内存 -  预留内存 可用内存 = 统一内存(60%) + 其他 (40%) 统一内存 = 存储内存(Storage)50%  + 执行内存(Ex ...

  8. sparkstreaming监听hdfs目录如何终止_四十六、Spark Streaming简介及入门

    1.什么是Spark Streaming Spark Streaming是基于Spark Core之间的实时计算框架,可以从很多数据源消费数据并对数据进行处理.它是Spark核心API的一个扩展与封装 ...

  9. Spark序列化入门

    什么是序列化和序列化? 序列化是什么 1. 序列化的作用就是可以将对象的内容变成二进制, 存入文件中保存 2. 反序列化指的是将保存下来的二进制对象数据恢复成对象 序列化对对象的要求 1. 对象必须实 ...

最新文章

  1. 【Prometheus】Exporter详解
  2. Win系统复制粘贴失效解决办法
  3. 计算机中丢失 MSVCR100.dll
  4. Mac zsh切换bash bash切换zsh
  5. JAVA_OA(五):SpringMVC接受传入页面的参数值
  6. 从刘备面试诸葛亮看信息系统项目管理师
  7. 线性代数-矩阵-转置 C和C++的实现
  8. Entityframework批量删除
  9. python知识笔记_[Python笔记]第一篇:基础知识
  10. 找不到ffmpeg.dll无法继续执行代码怎么办_2020年,小规模增值税3%减按1%征收,那么一般纳税人该怎么办?...
  11. 2017版张宇1000题【题目收集】
  12. VS,VAX一些快捷键记录
  13. 鸿蒙珠融入体内,逍遥至尊之诸天逍遥
  14. 悟空问答如何高效引流,这几个技巧让你上首页
  15. linux常用命令——ls
  16. NginxProxyManager实现unraid反向代理
  17. Model-Agnostic Meta-Learning for Fast Adaptation of Deep Networks阅读笔记
  18. oracle java培训机构
  19. IBM P570热插拔添加稳压模块
  20. android system ui修改,【教程】一些有关于SystemUI修改的教程

热门文章

  1. 【分隔结构】主谓分离
  2. 【虚幻引擎】UE4 FTabmanager实现多窗口
  3. 玩转Java8 Stream,代码效率飞升
  4. Slackel Live ,惊艳与惊讶并存
  5. VLAN应用篇系列:(8)Cisco交换机 PVLAN技术应用(高级的隔离技术)
  6. 卡巴斯基在线杀毒曝高危漏洞 用户可被攻击
  7. PS图层中如何快速找到想要的图层
  8. 华为云开发者联盟助力培养数字化人才,加速应用构建质效提升
  9. 高斯消元(整数型)模板 高斯消元(实数型)模板 高斯消元(异或型)
  10. 怎么使用代理IP上网 如何设置代理服务器