[以浪为码]Spark源码阅读03 - 序列化介绍 serializer
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u013054888/article/details/90237348
系列文章专栏目录:小浪阅读 Spark 源码
文章目录
- 序列器 Serializer
- 序列器的实现 `JavaSerializer`
- SerializerInstance 序列器实例
- 序列器的实现 KryoSerializer
- SerializerManager
- 实战
序列化模块为 Spark 的 RDD 与 shuffle 模块提供可拔插的序列器。在包 org.apache.spark.serializer
大致的类结构如下:
序列器 Serializer
org.apache.spark.serializer.Serializer
是一个序列器,准确来说是一个序列化器构建者。因为一些序列化库不是线程安全的,Serializer
被用于创建 org.apache.spark.serializer.SerializerInstance
去做实际的序列化工作并且保证SerializerInstance
在同一时间只有一个线程能访问。
Serializer 的实现类应该实现:
- 一个无参数的构造器或者有一个
SparkConf
的构造器,如果两者都被定义了,则后者被优先使用。 - Java 序列化接口。也就是说
Serializer
本身也能被序列化。
注意序列化器不需要在不同的 Spark 版本之间兼容。序列化的数据只是在一个 Spark app 中使用。
Serializer
类如下,具体看注释:
code 1
abstract class Serializer {/*** 反序列化时使用的类加载器,要保证子类优先使用该类加载器。*/@volatile protected var defaultClassLoader: Option[ClassLoader] = None/*** 设置 defaultClassLoader。** @return this Serializer object*/def setDefaultClassLoader(classLoader: ClassLoader): Serializer = {defaultClassLoader = Some(classLoader)this}/** 创建一个新的 [[SerializerInstance]]. */def newInstance(): SerializerInstance/*** 参数如果是 True, 则表示该序列化器支持重新定位他的序列化对象,否则则不行。* 如果支持,这表示在流中输出的被序列化的对象的字节可以进行排序。这相当于对象排序后再进行序列化。* 该属性现在被用于判断 shuffle 使用哪个 shuffleWriter。具体有机会再说。可以看博文 https://blog.csdn.net/qq_26222859/article/details/81454620**/private[spark] def supportsRelocationOfSerializedObjects: Boolean = false
}
下面看他的一个实现类 org.apache.spark.serializer.JavaSerializer
序列器的实现 JavaSerializer
现在来看看序列器的一个简单实现 org.apache.spark.serializer.JavaSerializer
,即使用原生 Java 序列化。
code 2
class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {private var counterReset = conf.getInt("spark.serializer.objectStreamReset", 100)private var extraDebugInfo = conf.getBoolean("spark.serializer.extraDebugInfo", true)protected def this() = this(new SparkConf()) // For deserialization onlyoverride def newInstance(): SerializerInstance = {val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)new JavaSerializerInstance(counterReset, extraDebugInfo, classLoader)}override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {out.writeInt(counterReset)out.writeBoolean(extraDebugInfo)}override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {counterReset = in.readInt()extraDebugInfo = in.readBoolean()}
}
代码很简单, newInstance
直接返回了JavaSerializerInstance
,且类加载器使用本线程 ContextClassLoader
缺省。注意他还实现了 Externalizable
以完全控制自身的序列化。
下面来看序列化实例 SerializerInstance
。
SerializerInstance 序列器实例
抽象类 SerializerInstance
的定义如下,他可以序列化/反序列化对象的与生成 序列化/反序列 流:
code 3
abstract class SerializerInstance {def serialize[T: ClassTag](t: T): ByteBufferdef deserialize[T: ClassTag](bytes: ByteBuffer): Tdef deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): Tdef serializeStream(s: OutputStream): SerializationStreamdef deserializeStream(s: InputStream): DeserializationStream
}
前面也说过 SerializerInstance
不是线程安全的。虽然他可以创建多个序列/反序列化流,不过他们只能在一个线程中使用。SerializationStream
与 DeserializationStream
是一个包装类,他得到输入流/输出流,提供往流中写入序列后的对象SerializationStream#writeObject
与读取反序列的对象DeserializationStream#readObject
的方法,以方便序列/反序列化过程。关于他们实现与细节,这里描述下。
SerializationStream
的实现 JavaSerializationStream
将传入的输入流传入ObjectOutputStream
中,在 writeObject
的时候直接写入 ObjectOutputStream
以完成序列化操作。并且在写入流的过程中,为了避免流在被写入过程中持有的对象过多而导致流膨胀(内存泄露),每写入 100 个对象,就 reset
一下 ObjectOutputStream
(解除引用,JVM 回收内存),这里的 100 即由 code2 中的counterReset
所配置,writeObject()
见 code4:
code 4
/*** Calling reset to avoid memory leak:* http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api* But only call it every 100th time to avoid bloated serialization streams (when* the stream 'resets' object class descriptions have to be re-written)*/def writeObject[T: ClassTag](t: T): SerializationStream = {try {objOut.writeObject(t)} catch {case e: NotSerializableException if extraDebugInfo =>throw SerializationDebugger.improveException(t, e)}counter += 1if (counterReset > 0 && counter >= counterReset) {objOut.reset()counter = 0}this}
DeserializationStream
的 实现类 JavaDeserializationStream
也使用相同的思路,将输出流传入 ObjectInputStream
来进行反序列化。且会在进行序列化之前判断要序列化的类是否可找到,找不到就抛出 ClassNotFoundException
。此外 DeserializationStream
还提供了将流转化为迭代器(还有 key-value 元素迭代器)的方法。
现在我们可以看 SerializerInstance 的 Java 实现了,见 code 5。很简单,序列/反序列化 对象的方法借助流来实现。
code 5
private[spark] class JavaSerializerInstance(counterReset: Int, extraDebugInfo: Boolean, defaultClassLoader: ClassLoader)extends SerializerInstance {// 序列化对象override def serialize[T: ClassTag](t: T): ByteBuffer = {val bos = new ByteBufferOutputStream()val out = serializeStream(bos)out.writeObject(t)out.close()bos.toByteBuffer}// 反序列化 byte 为对象override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {val bis = new ByteBufferInputStream(bytes)val in = deserializeStream(bis)in.readObject()}// 反序列化 byte 为对象,使用指定类加载器override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {val bis = new ByteBufferInputStream(bytes)val in = deserializeStream(bis, loader)in.readObject()}// 生成序列化流override def serializeStream(s: OutputStream): SerializationStream = {new JavaSerializationStream(s, counterReset, extraDebugInfo)}// 生成反序列化流override def deserializeStream(s: InputStream): DeserializationStream = {new JavaDeserializationStream(s, defaultClassLoader)}// 生成反序列化流, 并使用指定类加载器def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {new JavaDeserializationStream(s, loader)}
}
序列器的实现 KryoSerializer
当然上面描述的一切 Java 原生序列的实现都对应有 Kryo 的实现。
// 未完待续
SerializerManager
SerializerManager
是为各类组件配置序列化、压缩、加密功能的组件,他还自动为 shuffles 选择 Serializer
。
对于基本变量、基本变量数组、String 类型,默认选用 kryo
来序列化。
关键函数如下,可以只看注释:
/*** 为了 加密 与 解压 包装一个 input stream*/
def wrapStream(blockId: BlockId, s: InputStream): InputStream = {wrapForCompression(blockId, wrapForEncryption(s))
}/*** 为了 加密 与 解压 包装一个 output stream*/
def wrapStream(blockId: BlockId, s: OutputStream): OutputStream = {wrapForCompression(blockId, wrapForEncryption(s))
}/*** 如果 shuffle 加密激活了,为了 加密 而包装 input stream*/
def wrapForEncryption(s: InputStream): InputStream = {encryptionKey.map { key => CryptoStreamUtils.createCryptoInputStream(s, conf, key) }.getOrElse(s)
}/*** 如果 shuffle 加密激活了,为了 加密 而包装 output stream*/
def wrapForEncryption(s: OutputStream): OutputStream = {encryptionKey.map { key => CryptoStreamUtils.createCryptoOutputStream(s, conf, key) }.getOrElse(s)
}/*** 如果对于该 block 类型的块压缩激活了,为了 压缩 包装 output stream*/
def wrapForCompression(blockId: BlockId, s: OutputStream): OutputStream = {if (shouldCompress(blockId)) compressionCodec.compressedOutputStream(s) else s
}/*** 如果对于该 block 类型的块压缩激活了,为了 压缩 包装 input stream*/
def wrapForCompression(blockId: BlockId, s: InputStream): InputStream = {if (shouldCompress(blockId)) compressionCodec.compressedInputStream(s) else s
}/** 序列化到 stream 中. */
def dataSerializeStream[T: ClassTag](blockId: BlockId,outputStream: OutputStream,values: Iterator[T]): Unit = {val byteStream = new BufferedOutputStream(outputStream)val autoPick = !blockId.isInstanceOf[StreamBlockId]val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
}/** 序列化到 chunked byte buffer 中. */
def dataSerialize[T: ClassTag](blockId: BlockId,values: Iterator[T]): ChunkedByteBuffer = {dataSerializeWithExplicitClassTag(blockId, values, implicitly[ClassTag[T]])
}/** 序列化到 chunked byte buffer 中. */
def dataSerializeWithExplicitClassTag(blockId: BlockId,values: Iterator[_],classTag: ClassTag[_]): ChunkedByteBuffer = {val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)val byteStream = new BufferedOutputStream(bbos)val autoPick = !blockId.isInstanceOf[StreamBlockId]val ser = getSerializer(classTag, autoPick).newInstance()ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()bbos.toChunkedByteBuffer
}/*** 反序列 InputStream 到一个 value 的迭代器中并且当迭代器可以获取到,就暴露出去。*/
def dataDeserializeStream[T](blockId: BlockId,inputStream: InputStream)(classTag: ClassTag[T]): Iterator[T] = {val stream = new BufferedInputStream(inputStream)val autoPick = !blockId.isInstanceOf[StreamBlockId]getSerializer(classTag, autoPick).newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator.asInstanceOf[Iterator[T]]
}
实战
待续。。
End!!
[以浪为码]Spark源码阅读03 - 序列化介绍 serializer相关推荐
- spark的java源码,Spark源码包的编译
Spark源码包的编译和部署生成,其本质只有两种:Maven和SBT (Simple Build Tool), 只不过针对不同的场景而已: Maven编译 SBT编译 IntelliJ IDEA编译( ...
- Windows + IDEA + SBT 打造Spark源码阅读环境
Spark源码阅读环境的准备 Spark源码是有Scala语言写成的,目前,IDEA对Scala的支持要比eclipse要好,大多数人会选在在IDEA上完成Spark平台应用的开发.因此,Spark源 ...
- Spark源码阅读——DirectInputDStream
2019独角兽企业重金招聘Python工程师标准>>> Spark源码分析--DirectInputDStream 在Spark-Streaming中,对流的抽象是使用DStream ...
- Spark源码阅读——任务提交过程
2019独角兽企业重金招聘Python工程师标准>>> Spark 源码阅读--任务提交过程 当我们在使用spark编写mr作业是,最后都要涉及到调用reduce,foreach或者 ...
- Apache Spark源码阅读环境搭建
文章目录 1 下载源码 2 导入项目 3 新建文件 4 Debug JavaWordCount 4.1 搜索JavaWordCount 4.2 修改参数 4.3 Debug 遇到的报错 1 未设置Ma ...
- 3000门徒内部训练绝密视频(泄密版)第2课:Scala面向对象彻底精通及Spark源码阅读
Scala面向对象彻底精通及Spark源码阅读 不用写public class中的public class Person {private var myName = "flink" ...
- 3000门徒内部训练绝密视频(泄密版)第3课:Scala中函数式编程彻底精通及Spark源码阅读
Scala中函数式编程彻底精通及Spark源码阅读 函数可以不依赖于类,函数可以作为函数的参数,函数可以作为函数的返回值 =>表明对左面的参数进行右面的加工 函数赋值给变量需要在函数名后面加空格 ...
- 3000门徒内部训练绝密视频(泄密版)第5课:彻底精通Scala隐式转换和并发编程及Spark源码阅读
彻底精通Scala隐式转换和并发编程及Spark源码阅读 Akka ,Scala内部并发 隐式转换.隐式类.隐式参数 可以手动指定某种类型的对象或类转换成其他类型的对象或类.转换的原因是假设写好接口 ...
- spark源码阅读总纲
spark使用了这么长时间,对于driver.master.worker.BlockManage.RDD.DAGScheduler.TaskScheduler这些概念或多或少都了解一些,但是对于其任务 ...
最新文章
- 在一台机器上搭建多个redis实例
- 微软欢迎所有热爱开源软件的朋友们来投稿![征稿开放时间:2015年8月17日]
- 空间三维散点数据的线性拟合
- Runtime.getRuntime().exec()
- 模块化 组件化 工程化_软件工程中的模块和软件组件
- python 装饰器应用
- 高并发大流量专题---3、前端优化(减少HTTP请求次数)
- 使用Spring Boot构建独立的OAuth服务器(三)
- 项目交换通知——PM(李忠)
- 设计模式 - 建造者模式/生成器模式
- 老司机推荐企业用什么代理ip好
- 【AI SoC】全志R329 高算力低功耗,当下智能音箱的最优解?
- 如何创建Qt quick应用程序
- [leetcode]1438. 绝对差不超过限制的最长连续子数组
- 结合自己经历的一场机器人省赛浅谈如何学习单片机
- 【学习资料】中国开放大学-电大-《教育学》形考作业答案(2018).docx
- 图片压缩网址和工具---TinyPNG
- CCPC-Wannafly Comet OJ 夏季欢乐赛(2019)部分题解
- 1.Object.create() 是什么
- php 获取当前用户的IP
热门文章
- 节省 75%人工处理,行业龙头日丰 集团如何落地精益化生产
- 「从零单排canal 04」 启动模块deployer源码解析
- 计算机电脑水晶字制作图片,电脑技巧收藏家photoshop技巧Photoshop教程:制作透明水晶字(3)...
- 【东大自控笔记8】闭环零点与开环零极点之间的关系
- 2021年绥化一中高考成绩查询,2019年绥化市重点高中排名 绥化市高中人气排行榜...
- 2022年中级会计职称财务管理练习题及答案
- 齐博X1冰蓝后台模板
- 5G直播武汉医院施工现场!百万网友,在线监工
- 标你妹怎么导出html,标你妹啊:PSD文档自动生成标注的工具
- 高德地图去掉定位按钮_怎样修改百度地图店名怎么取消高德地图定位