虽然Kryo支持对RDD的cache和shuffle,但是在Spark中不是内置就显示提供使用Kryo将数据系列化到磁盘中的输入输出API,RDD中的saveAsObjectFile和SparkContext中的objectFile方法仅仅支持使用Java系列化。所以如果我们可以使用Kryo系列化将会变得很棒!

在这篇文章中,我将讨论如何自定义Kryo系列化输出输出相关API来将数据进行读写到磁盘中。

写数据

通常,我们使用rdd.saveAsObjectFile API将已经系列化的对象写入到磁盘中。下面的代码将展示如何使用我们自定义的saveAsObjectFile方法将已经使用kryo系列化的对象写入到磁盘中: def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String)

这个函数中参数rdd就是我们需要写的数据;path是数据保存的路径。 val kryoSerializer = new KryoSerializer(rdd.context.getConf)

KryoSerializer是Spark内部提供的用于提供操作Kryo的类。在上述代码中,我们创建了KryoSerializer对象,并从rdd.context.getConf中获取传进来的缓存大小。 rdd.mapPartitions(iter => iter.grouped(10)

.map(_.toArray))

.map(splitArray => {}

所有的objectFile 将会在HDFS上保存,我们对RDD中的每个分片进行遍历,然后将他们转换成Byte数组。 val kryo = kryoSerializer.newKryo()

对每个splitArray,我们首先创建了kryo实例,kryo是线程不安全的,所以我们在每个map操作中单独创建。当我们调用kryoSerializer.newKryo()来创建新的kryo实例,他也会调用我们自定义的KryoRegistrator。 //create output stream and plug it to the kryo output

val bao = new ByteArrayOutputStream()

val output = kryoSerializer.newKryoOutput()

output.setOutputStream(bao)

kryo.writeClassAndObject(output, splitArray)

output.close()

一旦我们拥有kryo实例,我们就可以创建kryo输出对象,然后我们将类信息和对象写入到那个输出对象中。 val byteWritable = new BytesWritable(bao.toByteArray)

(NullWritable.get(), byteWritable)

}).saveAsSequenceFile(path)

我们在创建byteWritable的时候,包装了bytearray,然后保存成Sequence文件。使用那些代码我们就可以将Kryo对象写入到磁盘中。完整代码如下: /**

* User: 过往记忆

* Date: 15-04-24

* Time: 上午07:24

* bolg:

* 本文地址:/archives/1328

* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货

* 过往记忆博客微信公共帐号:iteblog_hadoop

*/

def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String) {

val kryoSerializer = new KryoSerializer(rdd.context.getConf)

rdd.mapPartitions(iter => iter.grouped(10)

.map(_.toArray))

.map(splitArray => {

//initializes kyro and calls your registrator class

val kryo = kryoSerializer.newKryo()

//convert data to bytes

val bao = new ByteArrayOutputStream()

val output = kryoSerializer.newKryoOutput()

output.setOutputStream(bao)

kryo.writeClassAndObject(output, splitArray)

output.close()

// We are ignoring key field of sequence file

val byteWritable = new BytesWritable(bao.toByteArray)

(NullWritable.get(), byteWritable)

}).saveAsSequenceFile(path)

}

光有写没有读对我们来说仍然不完美。通常我们使用sparkContext中的objectFile API从磁盘中读取数据,这里我们使用自定义的objectFile API来读取Kryo对象文件。 def objectFile[T](sc: SparkContext, path: String, minPartitions: Int = 1)

(implicit ct: ClassTag[T]) = {

val kryoSerializer = new KryoSerializer(sc.getConf)

sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable],

minPartitions)

.flatMap(x => {

val kryo = kryoSerializer.newKryo()

val input = new Input()

input.setBuffer(x._2.getBytes)

val data = kryo.readClassAndObject(input)

val dataObject = data.asInstanceOf[Array[T]]

dataObject

})

}

上面的步骤和写的步骤很类似,只不过这里我们使用的是input,而不是output。我们从BytesWritable中读取bytes数据,然后使用readClassAndObject API反序列化数据。

如何使用

下面例子使用上面介绍的两个方法来系列化和反序列化Person对象: /**

* User: 过往记忆

* Date: 15-04-24

* Time: 上午07:24

* bolg:

* 本文地址:/archives/1328

* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货

* 过往记忆博客微信公共帐号:iteblog_hadoop

*/

// user defined class that need to serialized

class Person(val name: String)

def main(args: Array[String]) {

if (args.length < 1) {

println("Please provide output path")

return

}

val outputPath = args(0)

val conf = new SparkConf().setMaster("local").setAppName("kryoexample")

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val sc = new SparkContext(conf)

//create some dummy data

val personList = 1 to 10000 map (value => new Person(value + ""))

val personRDD = sc.makeRDD(personList)

saveAsObjectFile(personRDD, outputPath)

val rdd = objectFile[Person](sc, outputPath)

println(rdd.map(person => person.name).collect().toList)

}

spark kryo java,在Spark中自定义Kryo序列化输入输出API相关推荐

  1. 在Spark中自定义Kryo序列化输入输出API(转)

    原文链接:在Spark中自定义Kryo序列化输入输出API 在Spark中内置支持两种系列化格式:(1).Java serialization:(2).Kryo serialization.在默认情况 ...

  2. java sessionstate_在Java Web开发中自定义Session

    Session在存储安全性要求较高的会话信息方面是必不可少的,对于分布式Web应用自定义Session支持独立的状态服务器或集群是必须的.本文就来教大家如何在Java Web开发中自定义Session ...

  3. Java I/O中的对象序列化

    Java I/O中的对象序列化 Java对象序列化将那些实现了Serializable接口的对象转换成一个字节序列,并能够以后将这个字节序列完全恢复为原来的对象.利用对象的序列化,可以实现轻量级持久性 ...

  4. java cxf soapheader_CXF 中自定义SOAPHeader

    Interceptor是CXF架构中一个很有特色的模式.你可以在不对核心模块进行修改的情况下,动态添加很多功能.这对于CXF这个以处理消息为中心的服务框架来说是非常有用的,CXF通过在Intercep ...

  5. 用c#实现通讯中自定义发送序列化数据,可一定程度上实现可编程发送的功能

    C#实现串口发送序列化数据 如下图: 其中红色框内展现的为实现效果图,其中最前面的文本框是要发送的具体字节,可以手动修改,后面的按钮为单击可控制单次发送,同时双机要发送的文本框可修改按钮的标题,用于做 ...

  6. java与c++中的对象序列化分析

    有时候我们在开发项目的时候,对于数据的保存 我们通常是直接将数据保存到磁盘上面 ,但是这样操作起来非常的不方便 ,尤其是在大型的项目开发中.  对象的序列化 可以将对象以数据的形式存储到文件中:反之我 ...

  7. spark中自定义UDAF函数实现的两种方式---UserDefinedAggregateFunction和Aggregator

    1.基于UserDefinedAggregateFunction实现平均数的计算 package com.bigdata.wb.sparkimport org.apache.spark.sql.Row ...

  8. 第10章 Spark(全面解读Spark架构体系)

    概述 Spark简介 Spark诞生于2009年美国加州伯克利分校的AMP实验室,基于内存计算的大数据并行计算框架,可用于构建大型的.低延迟的数据分析应用程序. Spark最初的设计目标是使数据分析更 ...

  9. 大数据技术之Spark(一)——Spark概述

    大数据技术之Spark(一)--Spark概述 文章目录 前言 一.Spark基础 1.1 Spark是什么 1.2 Spark VS Hadoop 1.3 Spark优势及特点 1.3.1 优秀的数 ...

  10. 自定义Redis序列化工具

    为什么用户需要自己创建一个redis配置类? SpringBoot提供了对Redis的自动配置功能,在RedisAutoConfiguration类中默认为我们配置了客户端连接(Lettuce和Jed ...

最新文章

  1. CentOS远程监控
  2. 2001~2020大数据行业怎么样?面临哪些挑战?解决了什么问题?
  3. 安装ssr_网易《代号SSR》电脑版教程!
  4. 【Quartz】解密properties配置文件中的账号密码
  5. 移动端动画使用transform提升性能
  6. laravel实现发送邮件(腾讯企业邮箱)
  7. SpringBoot技术点细解
  8. NTC热敏电阻温度计算方法,Steinhart-Hart方程和B值法
  9. python批量查询(excel)数据
  10. css中子元素设置margin-top会影响到父元素
  11. PostgreSQL+PostGIS下载和离线安装
  12. 计算机文件夹操作教案,文件文件和文件夹教案
  13. ACM计算几何专项练习题目总结
  14. 数学与计算机学院校友会,福州大学数学与计算机科学学院厦门校友会成立
  15. Framer:开源原型设计工具,巨头们的心头好
  16. Node + 讯飞语音 定时播放天气预报音频
  17. 传播问卷调查数据不够?自己生成假数据!
  18. 剑网三怎么查看服务器角色信息,剑网3如何获取角色?以下这些获取方式请全部掌握!...
  19. Opencv报错004:cv::VideoCapture无法读取本地视频文件,报错:cv::CvCapture_Images::open CAP_IMAGES: Stop scanning. Can‘
  20. 前端练习--京东图片链接

热门文章

  1. 商用计算机品牌,请问什么牌子的笔记本比较好啊?要商用的
  2. spring cloud gateway转发服务报错。
  3. 25篇最新CV领域综述性论文速递!涵盖15个方向:目标检测/图像处理/姿态估计/医学影像/人脸识别等方向...
  4. ps排版html,排版教程,超详细适合初学者的排版教程(二)
  5. 阿里云大幅降低CDN价格网宿蓝汛跟不跟?
  6. win7计算机管理中看不到新加的硬盘,win7系统看不到第二块硬盘的解决方法.
  7. win7磁盘管理分区,改变页面文件卷,删除卷就由灰变黑了!
  8. 了解java集合框架
  9. component lists rendered with v-for should have explicit keys.
  10. Log-normal distribution对数正态分布