版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u013054888/article/details/90237348

系列文章专栏目录:小浪阅读 Spark 源码

文章目录

  • 序列器 Serializer
    • 序列器的实现 `JavaSerializer`
      • SerializerInstance 序列器实例
    • 序列器的实现 KryoSerializer
  • SerializerManager
  • 实战

序列化模块为 Spark 的 RDD 与 shuffle 模块提供可拔插的序列器。在包 org.apache.spark.serializer 大致的类结构如下:

序列器 Serializer

org.apache.spark.serializer.Serializer 是一个序列器,准确来说是一个序列化器构建者。因为一些序列化库不是线程安全的,Serializer 被用于创建 org.apache.spark.serializer.SerializerInstance 去做实际的序列化工作并且保证SerializerInstance在同一时间只有一个线程能访问。

Serializer 的实现类应该实现:

  1. 一个无参数的构造器或者有一个 SparkConf 的构造器,如果两者都被定义了,则后者被优先使用。
  2. Java 序列化接口。也就是说 Serializer 本身也能被序列化。

注意序列化器不需要在不同的 Spark 版本之间兼容。序列化的数据只是在一个 Spark app 中使用。

Serializer 类如下,具体看注释:

code 1

abstract class Serializer {/*** 反序列化时使用的类加载器,要保证子类优先使用该类加载器。*/@volatile protected var defaultClassLoader: Option[ClassLoader] = None/*** 设置 defaultClassLoader。** @return this Serializer object*/def setDefaultClassLoader(classLoader: ClassLoader): Serializer = {defaultClassLoader = Some(classLoader)this}/** 创建一个新的 [[SerializerInstance]]. */def newInstance(): SerializerInstance/*** 参数如果是 True, 则表示该序列化器支持重新定位他的序列化对象,否则则不行。* 如果支持,这表示在流中输出的被序列化的对象的字节可以进行排序。这相当于对象排序后再进行序列化。* 该属性现在被用于判断 shuffle 使用哪个 shuffleWriter。具体有机会再说。可以看博文 https://blog.csdn.net/qq_26222859/article/details/81454620**/private[spark] def supportsRelocationOfSerializedObjects: Boolean = false
}

下面看他的一个实现类 org.apache.spark.serializer.JavaSerializer

序列器的实现 JavaSerializer

现在来看看序列器的一个简单实现 org.apache.spark.serializer.JavaSerializer,即使用原生 Java 序列化。

code 2

class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {private var counterReset = conf.getInt("spark.serializer.objectStreamReset", 100)private var extraDebugInfo = conf.getBoolean("spark.serializer.extraDebugInfo", true)protected def this() = this(new SparkConf())  // For deserialization onlyoverride def newInstance(): SerializerInstance = {val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)new JavaSerializerInstance(counterReset, extraDebugInfo, classLoader)}override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {out.writeInt(counterReset)out.writeBoolean(extraDebugInfo)}override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {counterReset = in.readInt()extraDebugInfo = in.readBoolean()}
}

代码很简单, newInstance 直接返回了JavaSerializerInstance,且类加载器使用本线程 ContextClassLoader 缺省。注意他还实现了 Externalizable 以完全控制自身的序列化。

下面来看序列化实例 SerializerInstance

SerializerInstance 序列器实例

抽象类 SerializerInstance 的定义如下,他可以序列化/反序列化对象的与生成 序列化/反序列 流:

code 3

abstract class SerializerInstance {def serialize[T: ClassTag](t: T): ByteBufferdef deserialize[T: ClassTag](bytes: ByteBuffer): Tdef deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): Tdef serializeStream(s: OutputStream): SerializationStreamdef deserializeStream(s: InputStream): DeserializationStream
}

前面也说过 SerializerInstance 不是线程安全的。虽然他可以创建多个序列/反序列化流,不过他们只能在一个线程中使用。SerializationStreamDeserializationStream 是一个包装类,他得到输入流/输出流,提供往流中写入序列后的对象SerializationStream#writeObject与读取反序列的对象DeserializationStream#readObject的方法,以方便序列/反序列化过程。关于他们实现与细节,这里描述下。

SerializationStream 的实现 JavaSerializationStream 将传入的输入流传入ObjectOutputStream 中,在 writeObject 的时候直接写入 ObjectOutputStream 以完成序列化操作。并且在写入流的过程中,为了避免流在被写入过程中持有的对象过多而导致流膨胀(内存泄露),每写入 100 个对象,就 reset 一下 ObjectOutputStream (解除引用,JVM 回收内存),这里的 100 即由 code2 中的counterReset 所配置,writeObject() 见 code4:

code 4

  /*** Calling reset to avoid memory leak:* http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api* But only call it every 100th time to avoid bloated serialization streams (when* the stream 'resets' object class descriptions have to be re-written)*/def writeObject[T: ClassTag](t: T): SerializationStream = {try {objOut.writeObject(t)} catch {case e: NotSerializableException if extraDebugInfo =>throw SerializationDebugger.improveException(t, e)}counter += 1if (counterReset > 0 && counter >= counterReset) {objOut.reset()counter = 0}this}

DeserializationStream 的 实现类 JavaDeserializationStream 也使用相同的思路,将输出流传入 ObjectInputStream 来进行反序列化。且会在进行序列化之前判断要序列化的类是否可找到,找不到就抛出 ClassNotFoundException。此外 DeserializationStream 还提供了将流转化为迭代器(还有 key-value 元素迭代器)的方法。

现在我们可以看 SerializerInstance 的 Java 实现了,见 code 5。很简单,序列/反序列化 对象的方法借助流来实现。

code 5

private[spark] class JavaSerializerInstance(counterReset: Int, extraDebugInfo: Boolean, defaultClassLoader: ClassLoader)extends SerializerInstance {// 序列化对象override def serialize[T: ClassTag](t: T): ByteBuffer = {val bos = new ByteBufferOutputStream()val out = serializeStream(bos)out.writeObject(t)out.close()bos.toByteBuffer}// 反序列化 byte 为对象override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {val bis = new ByteBufferInputStream(bytes)val in = deserializeStream(bis)in.readObject()}// 反序列化 byte 为对象,使用指定类加载器override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {val bis = new ByteBufferInputStream(bytes)val in = deserializeStream(bis, loader)in.readObject()}// 生成序列化流override def serializeStream(s: OutputStream): SerializationStream = {new JavaSerializationStream(s, counterReset, extraDebugInfo)}// 生成反序列化流override def deserializeStream(s: InputStream): DeserializationStream = {new JavaDeserializationStream(s, defaultClassLoader)}//  生成反序列化流, 并使用指定类加载器def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {new JavaDeserializationStream(s, loader)}
}

序列器的实现 KryoSerializer

当然上面描述的一切 Java 原生序列的实现都对应有 Kryo 的实现。
// 未完待续

SerializerManager

SerializerManager 是为各类组件配置序列化、压缩、加密功能的组件,他还自动为 shuffles 选择 Serializer

对于基本变量、基本变量数组、String 类型,默认选用 kryo 来序列化。

关键函数如下,可以只看注释:

/*** 为了 加密 与 解压 包装一个 input stream*/
def wrapStream(blockId: BlockId, s: InputStream): InputStream = {wrapForCompression(blockId, wrapForEncryption(s))
}/*** 为了 加密 与 解压 包装一个 output stream*/
def wrapStream(blockId: BlockId, s: OutputStream): OutputStream = {wrapForCompression(blockId, wrapForEncryption(s))
}/*** 如果 shuffle 加密激活了,为了 加密 而包装 input stream*/
def wrapForEncryption(s: InputStream): InputStream = {encryptionKey.map { key => CryptoStreamUtils.createCryptoInputStream(s, conf, key) }.getOrElse(s)
}/*** 如果 shuffle 加密激活了,为了 加密 而包装 output stream*/
def wrapForEncryption(s: OutputStream): OutputStream = {encryptionKey.map { key => CryptoStreamUtils.createCryptoOutputStream(s, conf, key) }.getOrElse(s)
}/*** 如果对于该 block 类型的块压缩激活了,为了 压缩 包装 output stream*/
def wrapForCompression(blockId: BlockId, s: OutputStream): OutputStream = {if (shouldCompress(blockId)) compressionCodec.compressedOutputStream(s) else s
}/*** 如果对于该 block 类型的块压缩激活了,为了 压缩 包装 input stream*/
def wrapForCompression(blockId: BlockId, s: InputStream): InputStream = {if (shouldCompress(blockId)) compressionCodec.compressedInputStream(s) else s
}/** 序列化到 stream 中. */
def dataSerializeStream[T: ClassTag](blockId: BlockId,outputStream: OutputStream,values: Iterator[T]): Unit = {val byteStream = new BufferedOutputStream(outputStream)val autoPick = !blockId.isInstanceOf[StreamBlockId]val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
}/** 序列化到 chunked byte buffer 中. */
def dataSerialize[T: ClassTag](blockId: BlockId,values: Iterator[T]): ChunkedByteBuffer = {dataSerializeWithExplicitClassTag(blockId, values, implicitly[ClassTag[T]])
}/** 序列化到 chunked byte buffer 中. */
def dataSerializeWithExplicitClassTag(blockId: BlockId,values: Iterator[_],classTag: ClassTag[_]): ChunkedByteBuffer = {val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)val byteStream = new BufferedOutputStream(bbos)val autoPick = !blockId.isInstanceOf[StreamBlockId]val ser = getSerializer(classTag, autoPick).newInstance()ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()bbos.toChunkedByteBuffer
}/*** 反序列 InputStream 到一个 value 的迭代器中并且当迭代器可以获取到,就暴露出去。*/
def dataDeserializeStream[T](blockId: BlockId,inputStream: InputStream)(classTag: ClassTag[T]): Iterator[T] = {val stream = new BufferedInputStream(inputStream)val autoPick = !blockId.isInstanceOf[StreamBlockId]getSerializer(classTag, autoPick).newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator.asInstanceOf[Iterator[T]]
}

实战

待续。。


End!!

[以浪为码]Spark源码阅读03 - 序列化介绍 serializer相关推荐

  1. spark的java源码,Spark源码包的编译

    Spark源码包的编译和部署生成,其本质只有两种:Maven和SBT (Simple Build Tool), 只不过针对不同的场景而已: Maven编译 SBT编译 IntelliJ IDEA编译( ...

  2. Windows + IDEA + SBT 打造Spark源码阅读环境

    Spark源码阅读环境的准备 Spark源码是有Scala语言写成的,目前,IDEA对Scala的支持要比eclipse要好,大多数人会选在在IDEA上完成Spark平台应用的开发.因此,Spark源 ...

  3. Spark源码阅读——DirectInputDStream

    2019独角兽企业重金招聘Python工程师标准>>> Spark源码分析--DirectInputDStream 在Spark-Streaming中,对流的抽象是使用DStream ...

  4. Spark源码阅读——任务提交过程

    2019独角兽企业重金招聘Python工程师标准>>> Spark 源码阅读--任务提交过程 当我们在使用spark编写mr作业是,最后都要涉及到调用reduce,foreach或者 ...

  5. Apache Spark源码阅读环境搭建

    文章目录 1 下载源码 2 导入项目 3 新建文件 4 Debug JavaWordCount 4.1 搜索JavaWordCount 4.2 修改参数 4.3 Debug 遇到的报错 1 未设置Ma ...

  6. 3000门徒内部训练绝密视频(泄密版)第2课:Scala面向对象彻底精通及Spark源码阅读

    Scala面向对象彻底精通及Spark源码阅读 不用写public class中的public class Person {private var myName = "flink" ...

  7. 3000门徒内部训练绝密视频(泄密版)第3课:Scala中函数式编程彻底精通及Spark源码阅读

    Scala中函数式编程彻底精通及Spark源码阅读 函数可以不依赖于类,函数可以作为函数的参数,函数可以作为函数的返回值 =>表明对左面的参数进行右面的加工 函数赋值给变量需要在函数名后面加空格 ...

  8. 3000门徒内部训练绝密视频(泄密版)第5课:彻底精通Scala隐式转换和并发编程及Spark源码阅读

    彻底精通Scala隐式转换和并发编程及Spark源码阅读 Akka ,Scala内部并发 隐式转换.隐式类.隐式参数 可以手动指定某种类型的对象或类转换成其他类型的对象或类.转换的原因是假设写好接口 ...

  9. spark源码阅读总纲

    spark使用了这么长时间,对于driver.master.worker.BlockManage.RDD.DAGScheduler.TaskScheduler这些概念或多或少都了解一些,但是对于其任务 ...

最新文章

  1. 在一台机器上搭建多个redis实例
  2. 微软欢迎所有热爱开源软件的朋友们来投稿![征稿开放时间:2015年8月17日]
  3. 空间三维散点数据的线性拟合
  4. Runtime.getRuntime().exec()
  5. 模块化 组件化 工程化_软件工程中的模块和软件组件
  6. python 装饰器应用
  7. 高并发大流量专题---3、前端优化(减少HTTP请求次数)
  8. 使用Spring Boot构建独立的OAuth服务器(三)
  9. 项目交换通知——PM(李忠)
  10. 设计模式 - 建造者模式/生成器模式
  11. 老司机推荐企业用什么代理ip好
  12. 【AI SoC】全志R329 高算力低功耗,当下智能音箱的最优解?
  13. 如何创建Qt quick应用程序
  14. [leetcode]1438. 绝对差不超过限制的最长连续子数组
  15. 结合自己经历的一场机器人省赛浅谈如何学习单片机
  16. 【学习资料】中国开放大学-电大-《教育学》形考作业答案(2018).docx
  17. 图片压缩网址和工具---TinyPNG
  18. CCPC-Wannafly Comet OJ 夏季欢乐赛(2019)部分题解
  19. 1.Object.create() 是什么
  20. php 获取当前用户的IP

热门文章

  1. 节省 75%人工处理,行业龙头日丰 集团如何落地精益化生产
  2. 「从零单排canal 04」 启动模块deployer源码解析
  3. 计算机电脑水晶字制作图片,电脑技巧收藏家photoshop技巧Photoshop教程:制作透明水晶字(3)...
  4. 【东大自控笔记8】闭环零点与开环零极点之间的关系
  5. 2021年绥化一中高考成绩查询,2019年绥化市重点高中排名 绥化市高中人气排行榜...
  6. 2022年中级会计职称财务管理练习题及答案
  7. 齐博X1冰蓝后台模板
  8. 5G直播武汉医院施工现场!百万网友,在线监工
  9. 标你妹怎么导出html,标你妹啊:PSD文档自动生成标注的工具
  10. 高德地图去掉定位按钮_怎样修改百度地图店名怎么取消高德地图定位