原文链接:在Spark中自定义Kryo序列化输入输出API

在Spark中内置支持两种系列化格式:(1)、Java serialization;(2)、Kryo serialization。在默认情况下,Spark使用的是Java的ObjectOutputStream系列化框架,它支持所有继承java.io.Serializable的类系列化,虽然Java系列化非常灵活,但是它的性能不佳。然而我们可以使用Kryo 库来系列化,它相比Java serialization系列化高效,速度很快(通常比Java快10x),但是它不支持所有的系列化对象,而且要求用户注册类。

  在Spark中,使用Kryo系列化比使用Java系列化更明智。在shuffling和caching大量数据的情况下,使用 Kryo系列化就变得非常重要。

  虽然Kryo支持对RDD的cache和shuffle,但是在Spark中不是内置就显示提供使用Kryo将数据系列化到磁盘中的输入输出API,RDD中的saveAsObjectFile和SparkContext中的objectFile方法仅仅支持使用Java系列化。所以如果我们可以使用Kryo系列化将会变得很棒!

  在这篇文章中,我将讨论如何自定义Kryo系列化输出输出相关API来将数据进行读写到磁盘中。

写数据

  通常,我们使用rdd.saveAsObjectFile API将已经系列化的对象写入到磁盘中。下面的代码将展示如何使用我们自定义的saveAsObjectFile方法将已经使用kryo系列化的对象写入到磁盘中:

1 def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String)

这个函数中参数rdd就是我们需要写的数据;path是数据保存的路径。

1 val kryoSerializer = new KryoSerializer(rdd.context.getConf)

  KryoSerializer是Spark内部提供的用于提供操作Kryo的类。在上述代码中,我们创建了KryoSerializer对象,并从rdd.context.getConf中获取传进来的缓存大小。

1 rdd.mapPartitions(iter => iter.grouped(10)
2       .map(_.toArray))
3       .map(splitArray => {}

所有的objectFile 将会在HDFS上保存,我们对RDD中的每个分片进行遍历,然后将他们转换成Byte数组。

1 val kryo = kryoSerializer.newKryo()

  对每个splitArray,我们首先创建了kryo实例,kryo是线程不安全的,所以我们在每个map操作中单独创建。当我们调用 kryoSerializer.newKryo()来创建新的kryo实例,他也会调用我们自定义的KryoRegistrator。

1 //create output stream and plug it to the kryo output
2 val bao = new ByteArrayOutputStream()
3 val output = kryoSerializer.newKryoOutput()
4 output.setOutputStream(bao)
5 kryo.writeClassAndObject(output, splitArray)
6 output.close()

一旦我们拥有kryo实例,我们就可以创建kryo输出对象,然后我们将类信息和对象写入到那个输出对象中。

1 val byteWritable = new BytesWritable(bao.toByteArray)
2       (NullWritable.get(), byteWritable)
3     }).saveAsSequenceFile(path)

  我们在创建byteWritable的时候,包装了bytearray,然后保存成Sequence文件。使用那些代码我们就可以将Kryo对象写入到磁盘中。完整代码如下:

01 /**
02  * User: 过往记忆
03  * Date: 15-04-24
04  * Time: 上午07:24
05  * bolg: http://www.iteblog.com
06  * 本文地址:http://www.iteblog.com/archives/1328
07  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
08  * 过往记忆博客微信公共帐号:iteblog_hadoop
09  */
10  
11   def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String) {
12     val kryoSerializer = new KryoSerializer(rdd.context.getConf)
13  
14     rdd.mapPartitions(iter => iter.grouped(10)
15       .map(_.toArray))
16       .map(splitArray => {
17       //initializes kyro and calls your registrator class
18       val kryo = kryoSerializer.newKryo()
19  
20       //convert data to bytes
21       val bao = new ByteArrayOutputStream()
22       val output = kryoSerializer.newKryoOutput()
23       output.setOutputStream(bao)
24       kryo.writeClassAndObject(output, splitArray)
25       output.close()
26  
27       // We are ignoring key field of sequence file
28       val byteWritable = new BytesWritable(bao.toByteArray)
29       (NullWritable.get(), byteWritable)
30     }).saveAsSequenceFile(path)
31   }

读数据

  光有写没有读对我们来说仍然不完美。通常我们使用sparkContext中的objectFile API从磁盘中读取数据,这里我们使用自定义的objectFile API来读取Kryo对象文件。

01 def objectFile[T](sc: SparkContext, path: String, minPartitions: Int = 1)
02     (implicit ct: ClassTag[T]) = {
03     val kryoSerializer = new KryoSerializer(sc.getConf)
04     sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable],
05        minPartitions)
06        .flatMap(x => {
07        val kryo = kryoSerializer.newKryo()
08        val input = new Input()
09        input.setBuffer(x._2.getBytes)
10        val data = kryo.readClassAndObject(input)
11        val dataObject = data.asInstanceOf[Array[T]]
12        dataObject
13     })
14   }

上面的步骤和写的步骤很类似,只不过这里我们使用的是input,而不是output。我们从BytesWritable中读取bytes数据,然后使用readClassAndObject API反序列化数据。

如何使用

  下面例子使用上面介绍的两个方法来系列化和反序列化Person对象:

查看源代码 打印帮助
01 /**
02  * User: 过往记忆
03  * Date: 15-04-24
04  * Time: 上午07:24
05  * bolg: http://www.iteblog.com
06  * 本文地址:http://www.iteblog.com/archives/1328
07  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
08  * 过往记忆博客微信公共帐号:iteblog_hadoop
09  */
10  
11 // user defined class that need to serialized
12   class Person(val name: String)
13  
14  def main(args: Array[String]) {
15  
16     if (args.length < 1) {
17       println("Please provide output path")
18       return
19     }
20     val outputPath = args(0)
21  
22     val conf = new SparkConf().setMaster("local").setAppName("kryoexample")
23     conf.set("spark.serializer""org.apache.spark.serializer.KryoSerializer")
24     val sc = new SparkContext(conf)
25  
26     //create some dummy data
27     val personList = 1 to 10000 map (value =new Person(value + ""))
28     val personRDD = sc.makeRDD(personList)
29  
30     saveAsObjectFile(personRDD, outputPath)
31     val rdd = objectFile[Person](sc, outputPath)
32     println(rdd.map(person => person.name).collect().toList)
33   }

转载于:https://www.cnblogs.com/gaopeng527/p/4962030.html

在Spark中自定义Kryo序列化输入输出API(转)相关推荐

  1. spark中自定义UDAF函数实现的两种方式---UserDefinedAggregateFunction和Aggregator

    1.基于UserDefinedAggregateFunction实现平均数的计算 package com.bigdata.wb.sparkimport org.apache.spark.sql.Row ...

  2. 用c#实现通讯中自定义发送序列化数据,可一定程度上实现可编程发送的功能

    C#实现串口发送序列化数据 如下图: 其中红色框内展现的为实现效果图,其中最前面的文本框是要发送的具体字节,可以手动修改,后面的按钮为单击可控制单次发送,同时双机要发送的文本框可修改按钮的标题,用于做 ...

  3. Spark中自定义排序

    项目创建参考:http://blog.csdn.net/tototuzuoquan/article/details/74571374 package cn.toto.sparkimport org.a ...

  4. spark之kryo序列化及其使用

    spark之kryo序列化 spark之kryo 序列化 Spark 中使用 Kryo序列化 中文切词案例: spark之kryo 序列化 1.定义:把对象转换为字节序列的过程称为对象的序列化. 把字 ...

  5. Spark 配置Kryo序列化机制注意细节

    一.Spark 的序列化 序列化 Spark 是一个高性能.分布式的.基于内存计算的计算引擎,Spark 集群中包含多个节点,各节点之间要进行通信(比如数据传输,Spark 通过 RPC 进行节点间的 ...

  6. Spark中RDD、DataFrame和DataSet的区别与联系

    一.RDD.DataFrame和DataSet的定义 在开始Spark RDD与DataFrame与Dataset之间的比较之前,先让我们看一下Spark中的RDD,DataFrame和Dataset ...

  7. 广播变量kyro_利用Kryo序列化库是你提升Spark性能要做的第一件事

    本文基于Spark2.1.0版本 套用官文Tuning Spark中的一句话作为文章的标题: *Often, choose a serialization type will be the first ...

  8. spark 中的RDD编程:基于Java api

    1.RDD介绍: RDD,弹性分布式数据集,即分布式的元素集合.在spark中,对所有数据的操作不外乎是创建RDD.转化已有的RDD以及调用RDD操作进行求值.在这一切的背后,Spark会自动将RDD ...

  9. 【Spark】Spark的Kryo序列化

    1.美图 2.Spark序列化概述 在Spark的架构中,在网络中传递的或者缓存在内存.硬盘中的对象需要进行序列化操作,序列化的作用主要是利用时间换空间: 分发给Executor上的Task 需要缓存 ...

最新文章

  1. linux无法启动之-“/dev/xxx unexpected inconsistency, run fsck manually”的解决
  2. 简单线性回归预测实现
  3. JAVA中使用bos做视频上传_JAVA语言之搭建物流BOS项目骨架
  4. 论文笔记 Aggregated Residual Transformations for Deep Neural Networks
  5. random_state的值如何选_算法萌新如何学好动态规划(3)
  6. IntelliJ IDEA使用技巧——关于版本控制(上)
  7. php防丢包,记一次丢包网络故障
  8. sqlalchemy按月水平分表、python元类、动态映射表名automap_base\ 模型类
  9. 产品研发管理和研发项目管理
  10. Linux编译安装iozone,Fedora下NFS的配置与iozone测试
  11. 深度学习对抗样本的防御方法
  12. 北大计算机博士毕业难度,北京大学博士毕业要求
  13. springcloud搭建实战<十一>【config配置中心】
  14. LayUI 性别前端显示
  15. 让微软起死回生之作:CEO纳德拉18年新书《刷新》
  16. SSM整合案例分析(详解)
  17. JButton:按钮组件
  18. 【python办公自动化(19)】利用python发送邮件(每天向邮箱发送一条定时新闻)
  19. leetcode | 971. Flip Binary Tree To Match Preorder Traversal(DFS/preorder)
  20. 基于有源钳位三电平的有源电力滤波器(ANPC-APF)MATLAB仿真,包括自建的DSOGI锁相模块和PQ谐波检测模块。 可简单解释。

热门文章

  1. UBoot讲解和实践-----------讲解(一)
  2. 第七章 PX4-Pixhawk-Mavlink解析
  3. #1419 : 后缀数组四·重复旋律4 (重复次数最多的连续字串)
  4. xhtml代码 中<pre>元素简单介绍
  5. [精选代码笔记]Anagram, group-anagrams, two sum
  6. java properties 路径问题_Java 读取Properties文件时应注意的路径问题
  7. 稀疏数组与原始数组之间的转换
  8. 优酷开放SDK之setOnTimeOutListener
  9. jquery ajax error但状态是200,jQuery $ .ajaxError()在200上运行 - 好的
  10. LTE小区选择和重选