在Spark中自定义Kryo序列化输入输出API(转)
原文链接:在Spark中自定义Kryo序列化输入输出API
在Spark中内置支持两种系列化格式:(1)、Java serialization;(2)、Kryo serialization。在默认情况下,Spark使用的是Java的ObjectOutputStream系列化框架,它支持所有继承java.io.Serializable的类系列化,虽然Java系列化非常灵活,但是它的性能不佳。然而我们可以使用Kryo 库来系列化,它相比Java serialization系列化高效,速度很快(通常比Java快10x),但是它不支持所有的系列化对象,而且要求用户注册类。
在Spark中,使用Kryo系列化比使用Java系列化更明智。在shuffling和caching大量数据的情况下,使用 Kryo系列化就变得非常重要。
虽然Kryo支持对RDD的cache和shuffle,但是在Spark中不是内置就显示提供使用Kryo将数据系列化到磁盘中的输入输出API,RDD中的saveAsObjectFile和SparkContext中的objectFile方法仅仅支持使用Java系列化。所以如果我们可以使用Kryo系列化将会变得很棒!
在这篇文章中,我将讨论如何自定义Kryo系列化输出输出相关API来将数据进行读写到磁盘中。
写数据
通常,我们使用rdd.saveAsObjectFile
API将已经系列化的对象写入到磁盘中。下面的代码将展示如何使用我们自定义的saveAsObjectFile
方法将已经使用kryo系列化的对象写入到磁盘中:
1
|
def saveAsObjectFile[T : ClassTag](rdd : RDD[T], path : String)
|
这个函数中参数rdd就是我们需要写的数据;path是数据保存的路径。
1
|
val kryoSerializer = new KryoSerializer(rdd.context.getConf)
|
KryoSerializer
是Spark内部提供的用于提供操作Kryo的类。在上述代码中,我们创建了KryoSerializer
对象,并从rdd.context.getConf
中获取传进来的缓存大小。
1
|
rdd.mapPartitions(iter = > iter.grouped( 10 )
|
2
|
.map( _ .toArray))
|
3
|
.map(splitArray = > {}
|
所有的objectFile 将会在HDFS上保存,我们对RDD中的每个分片进行遍历,然后将他们转换成Byte数组。
1
|
val kryo = kryoSerializer.newKryo()
|
对每个splitArray,我们首先创建了kryo实例,kryo是线程不安全的,所以我们在每个map操作中单独创建。当我们调用 kryoSerializer.newKryo()
来创建新的kryo实例,他也会调用我们自定义的KryoRegistrator。
1
|
//create output stream and plug it to the kryo output
|
2
|
val bao = new ByteArrayOutputStream()
|
3
|
val output = kryoSerializer.newKryoOutput()
|
4
|
output.setOutputStream(bao)
|
5
|
kryo.writeClassAndObject(output, splitArray)
|
6
|
output.close()
|
一旦我们拥有kryo实例,我们就可以创建kryo输出对象,然后我们将类信息和对象写入到那个输出对象中。
1
|
val byteWritable = new BytesWritable(bao.toByteArray)
|
2
|
(NullWritable.get(), byteWritable)
|
3
|
}).saveAsSequenceFile(path)
|
我们在创建byteWritable的时候,包装了bytearray,然后保存成Sequence文件。使用那些代码我们就可以将Kryo对象写入到磁盘中。完整代码如下:
01
|
/**
|
02
|
* User: 过往记忆
|
03
|
* Date: 15-04-24
|
04
|
* Time: 上午07:24
|
05
|
* bolg: http://www.iteblog.com
|
06
|
* 本文地址:http://www.iteblog.com/archives/1328
|
07
|
* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
|
08
|
* 过往记忆博客微信公共帐号:iteblog_hadoop
|
09
|
*/
|
10
|
11
|
def saveAsObjectFile[T : ClassTag](rdd : RDD[T], path : String) {
|
12
|
val kryoSerializer = new KryoSerializer(rdd.context.getConf)
|
13
|
14
|
rdd.mapPartitions(iter = > iter.grouped( 10 )
|
15
|
.map( _ .toArray))
|
16
|
.map(splitArray = > {
|
17
|
//initializes kyro and calls your registrator class
|
18
|
val kryo = kryoSerializer.newKryo()
|
19
|
20
|
//convert data to bytes
|
21
|
val bao = new ByteArrayOutputStream()
|
22
|
val output = kryoSerializer.newKryoOutput()
|
23
|
output.setOutputStream(bao)
|
24
|
kryo.writeClassAndObject(output, splitArray)
|
25
|
output.close()
|
26
|
27
|
// We are ignoring key field of sequence file
|
28
|
val byteWritable = new BytesWritable(bao.toByteArray)
|
29
|
(NullWritable.get(), byteWritable)
|
30
|
}).saveAsSequenceFile(path)
|
31
|
}
|
读数据
光有写没有读对我们来说仍然不完美。通常我们使用sparkContext中的objectFile API从磁盘中读取数据,这里我们使用自定义的objectFile API来读取Kryo对象文件。
01
|
def objectFile[T](sc : SparkContext, path : String, minPartitions : Int = 1 )
|
02
|
( implicit ct : ClassTag[T]) = {
|
03
|
val kryoSerializer = new KryoSerializer(sc.getConf)
|
04
|
sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable],
|
05
|
minPartitions)
|
06
|
.flatMap(x = > {
|
07
|
val kryo = kryoSerializer.newKryo()
|
08
|
val input = new Input()
|
09
|
input.setBuffer(x. _ 2 .getBytes)
|
10
|
val data = kryo.readClassAndObject(input)
|
11
|
val dataObject = data.asInstanceOf[Array[T]]
|
12
|
dataObject
|
13
|
})
|
14
|
}
|
上面的步骤和写的步骤很类似,只不过这里我们使用的是input,而不是output。我们从BytesWritable中读取bytes数据,然后使用readClassAndObject API反序列化数据。
如何使用
下面例子使用上面介绍的两个方法来系列化和反序列化Person对象:
01
|
/**
|
02
|
* User: 过往记忆
|
03
|
* Date: 15-04-24
|
04
|
* Time: 上午07:24
|
05
|
* bolg: http://www.iteblog.com
|
06
|
* 本文地址:http://www.iteblog.com/archives/1328
|
07
|
* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
|
08
|
* 过往记忆博客微信公共帐号:iteblog_hadoop
|
09
|
*/
|
10
|
11
|
// user defined class that need to serialized
|
12
|
class Person( val name : String)
|
13
|
14
|
def main(args : Array[String]) {
|
15
|
16
|
if (args.length < 1 ) {
|
17
|
println( "Please provide output path" )
|
18
|
return
|
19
|
}
|
20
|
val outputPath = args( 0 )
|
21
|
22
|
val conf = new SparkConf().setMaster( "local" ).setAppName( "kryoexample" )
|
23
|
conf.set( "spark.serializer" , "org.apache.spark.serializer.KryoSerializer" )
|
24
|
val sc = new SparkContext(conf)
|
25
|
26
|
//create some dummy data
|
27
|
val personList = 1 to 10000 map (value = > new Person(value + "" ))
|
28
|
val personRDD = sc.makeRDD(personList)
|
29
|
30
|
saveAsObjectFile(personRDD, outputPath)
|
31
|
val rdd = objectFile[Person](sc, outputPath)
|
32
|
println(rdd.map(person = > person.name).collect().toList)
|
33
|
}
|
转载于:https://www.cnblogs.com/gaopeng527/p/4962030.html
在Spark中自定义Kryo序列化输入输出API(转)相关推荐
- spark中自定义UDAF函数实现的两种方式---UserDefinedAggregateFunction和Aggregator
1.基于UserDefinedAggregateFunction实现平均数的计算 package com.bigdata.wb.sparkimport org.apache.spark.sql.Row ...
- 用c#实现通讯中自定义发送序列化数据,可一定程度上实现可编程发送的功能
C#实现串口发送序列化数据 如下图: 其中红色框内展现的为实现效果图,其中最前面的文本框是要发送的具体字节,可以手动修改,后面的按钮为单击可控制单次发送,同时双机要发送的文本框可修改按钮的标题,用于做 ...
- Spark中自定义排序
项目创建参考:http://blog.csdn.net/tototuzuoquan/article/details/74571374 package cn.toto.sparkimport org.a ...
- spark之kryo序列化及其使用
spark之kryo序列化 spark之kryo 序列化 Spark 中使用 Kryo序列化 中文切词案例: spark之kryo 序列化 1.定义:把对象转换为字节序列的过程称为对象的序列化. 把字 ...
- Spark 配置Kryo序列化机制注意细节
一.Spark 的序列化 序列化 Spark 是一个高性能.分布式的.基于内存计算的计算引擎,Spark 集群中包含多个节点,各节点之间要进行通信(比如数据传输,Spark 通过 RPC 进行节点间的 ...
- Spark中RDD、DataFrame和DataSet的区别与联系
一.RDD.DataFrame和DataSet的定义 在开始Spark RDD与DataFrame与Dataset之间的比较之前,先让我们看一下Spark中的RDD,DataFrame和Dataset ...
- 广播变量kyro_利用Kryo序列化库是你提升Spark性能要做的第一件事
本文基于Spark2.1.0版本 套用官文Tuning Spark中的一句话作为文章的标题: *Often, choose a serialization type will be the first ...
- spark 中的RDD编程:基于Java api
1.RDD介绍: RDD,弹性分布式数据集,即分布式的元素集合.在spark中,对所有数据的操作不外乎是创建RDD.转化已有的RDD以及调用RDD操作进行求值.在这一切的背后,Spark会自动将RDD ...
- 【Spark】Spark的Kryo序列化
1.美图 2.Spark序列化概述 在Spark的架构中,在网络中传递的或者缓存在内存.硬盘中的对象需要进行序列化操作,序列化的作用主要是利用时间换空间: 分发给Executor上的Task 需要缓存 ...
最新文章
- linux无法启动之-“/dev/xxx unexpected inconsistency, run fsck manually”的解决
- 简单线性回归预测实现
- JAVA中使用bos做视频上传_JAVA语言之搭建物流BOS项目骨架
- 论文笔记 Aggregated Residual Transformations for Deep Neural Networks
- random_state的值如何选_算法萌新如何学好动态规划(3)
- IntelliJ IDEA使用技巧——关于版本控制(上)
- php防丢包,记一次丢包网络故障
- sqlalchemy按月水平分表、python元类、动态映射表名automap_base\ 模型类
- 产品研发管理和研发项目管理
- Linux编译安装iozone,Fedora下NFS的配置与iozone测试
- 深度学习对抗样本的防御方法
- 北大计算机博士毕业难度,北京大学博士毕业要求
- springcloud搭建实战<十一>【config配置中心】
- LayUI 性别前端显示
- 让微软起死回生之作:CEO纳德拉18年新书《刷新》
- SSM整合案例分析(详解)
- JButton:按钮组件
- 【python办公自动化(19)】利用python发送邮件(每天向邮箱发送一条定时新闻)
- leetcode | 971. Flip Binary Tree To Match Preorder Traversal(DFS/preorder)
- 基于有源钳位三电平的有源电力滤波器(ANPC-APF)MATLAB仿真,包括自建的DSOGI锁相模块和PQ谐波检测模块。 可简单解释。
热门文章
- UBoot讲解和实践-----------讲解(一)
- 第七章 PX4-Pixhawk-Mavlink解析
- #1419 : 后缀数组四·重复旋律4 (重复次数最多的连续字串)
- xhtml代码 中<pre>元素简单介绍
- [精选代码笔记]Anagram, group-anagrams, two sum
- java properties 路径问题_Java 读取Properties文件时应注意的路径问题
- 稀疏数组与原始数组之间的转换
- 优酷开放SDK之setOnTimeOutListener
- jquery ajax error但状态是200,jQuery $ .ajaxError()在200上运行 - 好的
- LTE小区选择和重选