spark kryo java,在Spark中自定义Kryo序列化输入输出API
在
在
虽然Kryo支持对RDD的cache和shuffle,但是在Spark中不是内置就显示提供使用Kryo将数据系列化到磁盘中的输入输出API,RDD中的saveAsObjectFile和SparkContext中的objectFile方法仅仅支持使用Java系列化。所以如果我们可以使用Kryo系列化将会变得很棒!
在这篇文章中,我将讨论如何自定义Kryo系列化输出输出相关API来将数据进行读写到磁盘中。
写数据
通常,我们使用rdd.saveAsObjectFile API将已经系列化的对象写入到磁盘中。下面的代码将展示如何使用我们自定义的saveAsObjectFile方法将已经使用kryo系列化的对象写入到磁盘中: def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String)
这个函数中参数rdd就是我们需要写的数据;path是数据保存的路径。 val kryoSerializer = new KryoSerializer(rdd.context.getConf)
KryoSerializer是Spark内部提供的用于提供操作Kryo的类。在上述代码中,我们创建了KryoSerializer对象,并从rdd.context.getConf中获取传进来的缓存大小。 rdd.mapPartitions(iter => iter.grouped(10)
.map(_.toArray))
.map(splitArray => {}
所有的objectFile 将会在HDFS上保存,我们对RDD中的每个分片进行遍历,然后将他们转换成Byte数组。 val kryo = kryoSerializer.newKryo()
对每个splitArray,我们首先创建了kryo实例,kryo是线程不安全的,所以我们在每个map操作中单独创建。当我们调用kryoSerializer.newKryo()来创建新的kryo实例,他也会调用我们自定义的KryoRegistrator。 //create output stream and plug it to the kryo output
val bao = new ByteArrayOutputStream()
val output = kryoSerializer.newKryoOutput()
output.setOutputStream(bao)
kryo.writeClassAndObject(output, splitArray)
output.close()
一旦我们拥有kryo实例,我们就可以创建kryo输出对象,然后我们将类信息和对象写入到那个输出对象中。 val byteWritable = new BytesWritable(bao.toByteArray)
(NullWritable.get(), byteWritable)
}).saveAsSequenceFile(path)
我们在创建byteWritable的时候,包装了bytearray,然后保存成Sequence文件。使用那些代码我们就可以将Kryo对象写入到磁盘中。完整代码如下: /**
* User: 过往记忆
* Date: 15-04-24
* Time: 上午07:24
* bolg:
* 本文地址:/archives/1328
* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
* 过往记忆博客微信公共帐号:iteblog_hadoop
*/
def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String) {
val kryoSerializer = new KryoSerializer(rdd.context.getConf)
rdd.mapPartitions(iter => iter.grouped(10)
.map(_.toArray))
.map(splitArray => {
//initializes kyro and calls your registrator class
val kryo = kryoSerializer.newKryo()
//convert data to bytes
val bao = new ByteArrayOutputStream()
val output = kryoSerializer.newKryoOutput()
output.setOutputStream(bao)
kryo.writeClassAndObject(output, splitArray)
output.close()
// We are ignoring key field of sequence file
val byteWritable = new BytesWritable(bao.toByteArray)
(NullWritable.get(), byteWritable)
}).saveAsSequenceFile(path)
}
读
光有写没有读对我们来说仍然不完美。通常我们使用sparkContext中的objectFile API从磁盘中读取数据,这里我们使用自定义的objectFile API来读取Kryo对象文件。 def objectFile[T](sc: SparkContext, path: String, minPartitions: Int = 1)
(implicit ct: ClassTag[T]) = {
val kryoSerializer = new KryoSerializer(sc.getConf)
sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable],
minPartitions)
.flatMap(x => {
val kryo = kryoSerializer.newKryo()
val input = new Input()
input.setBuffer(x._2.getBytes)
val data = kryo.readClassAndObject(input)
val dataObject = data.asInstanceOf[Array[T]]
dataObject
})
}
上面的步骤和写的步骤很类似,只不过这里我们使用的是input,而不是output。我们从BytesWritable中读取bytes数据,然后使用readClassAndObject API反序列化数据。
如何使用
下面例子使用上面介绍的两个方法来系列化和反序列化Person对象: /**
* User: 过往记忆
* Date: 15-04-24
* Time: 上午07:24
* bolg:
* 本文地址:/archives/1328
* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
* 过往记忆博客微信公共帐号:iteblog_hadoop
*/
// user defined class that need to serialized
class Person(val name: String)
def main(args: Array[String]) {
if (args.length < 1) {
println("Please provide output path")
return
}
val outputPath = args(0)
val conf = new SparkConf().setMaster("local").setAppName("kryoexample")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
//create some dummy data
val personList = 1 to 10000 map (value => new Person(value + ""))
val personRDD = sc.makeRDD(personList)
saveAsObjectFile(personRDD, outputPath)
val rdd = objectFile[Person](sc, outputPath)
println(rdd.map(person => person.name).collect().toList)
}
spark kryo java,在Spark中自定义Kryo序列化输入输出API相关推荐
- 在Spark中自定义Kryo序列化输入输出API(转)
原文链接:在Spark中自定义Kryo序列化输入输出API 在Spark中内置支持两种系列化格式:(1).Java serialization:(2).Kryo serialization.在默认情况 ...
- java sessionstate_在Java Web开发中自定义Session
Session在存储安全性要求较高的会话信息方面是必不可少的,对于分布式Web应用自定义Session支持独立的状态服务器或集群是必须的.本文就来教大家如何在Java Web开发中自定义Session ...
- Java I/O中的对象序列化
Java I/O中的对象序列化 Java对象序列化将那些实现了Serializable接口的对象转换成一个字节序列,并能够以后将这个字节序列完全恢复为原来的对象.利用对象的序列化,可以实现轻量级持久性 ...
- java cxf soapheader_CXF 中自定义SOAPHeader
Interceptor是CXF架构中一个很有特色的模式.你可以在不对核心模块进行修改的情况下,动态添加很多功能.这对于CXF这个以处理消息为中心的服务框架来说是非常有用的,CXF通过在Intercep ...
- 用c#实现通讯中自定义发送序列化数据,可一定程度上实现可编程发送的功能
C#实现串口发送序列化数据 如下图: 其中红色框内展现的为实现效果图,其中最前面的文本框是要发送的具体字节,可以手动修改,后面的按钮为单击可控制单次发送,同时双机要发送的文本框可修改按钮的标题,用于做 ...
- java与c++中的对象序列化分析
有时候我们在开发项目的时候,对于数据的保存 我们通常是直接将数据保存到磁盘上面 ,但是这样操作起来非常的不方便 ,尤其是在大型的项目开发中. 对象的序列化 可以将对象以数据的形式存储到文件中:反之我 ...
- spark中自定义UDAF函数实现的两种方式---UserDefinedAggregateFunction和Aggregator
1.基于UserDefinedAggregateFunction实现平均数的计算 package com.bigdata.wb.sparkimport org.apache.spark.sql.Row ...
- 第10章 Spark(全面解读Spark架构体系)
概述 Spark简介 Spark诞生于2009年美国加州伯克利分校的AMP实验室,基于内存计算的大数据并行计算框架,可用于构建大型的.低延迟的数据分析应用程序. Spark最初的设计目标是使数据分析更 ...
- 大数据技术之Spark(一)——Spark概述
大数据技术之Spark(一)--Spark概述 文章目录 前言 一.Spark基础 1.1 Spark是什么 1.2 Spark VS Hadoop 1.3 Spark优势及特点 1.3.1 优秀的数 ...
- 自定义Redis序列化工具
为什么用户需要自己创建一个redis配置类? SpringBoot提供了对Redis的自动配置功能,在RedisAutoConfiguration类中默认为我们配置了客户端连接(Lettuce和Jed ...
最新文章
- CentOS远程监控
- 2001~2020大数据行业怎么样?面临哪些挑战?解决了什么问题?
- 安装ssr_网易《代号SSR》电脑版教程!
- 【Quartz】解密properties配置文件中的账号密码
- 移动端动画使用transform提升性能
- laravel实现发送邮件(腾讯企业邮箱)
- SpringBoot技术点细解
- NTC热敏电阻温度计算方法,Steinhart-Hart方程和B值法
- python批量查询(excel)数据
- css中子元素设置margin-top会影响到父元素
- PostgreSQL+PostGIS下载和离线安装
- 计算机文件夹操作教案,文件文件和文件夹教案
- ACM计算几何专项练习题目总结
- 数学与计算机学院校友会,福州大学数学与计算机科学学院厦门校友会成立
- Framer:开源原型设计工具,巨头们的心头好
- Node + 讯飞语音 定时播放天气预报音频
- 传播问卷调查数据不够?自己生成假数据!
- 剑网三怎么查看服务器角色信息,剑网3如何获取角色?以下这些获取方式请全部掌握!...
- Opencv报错004:cv::VideoCapture无法读取本地视频文件,报错:cv::CvCapture_Images::open CAP_IMAGES: Stop scanning. Can‘
- 前端练习--京东图片链接
热门文章
- 商用计算机品牌,请问什么牌子的笔记本比较好啊?要商用的
- spring cloud gateway转发服务报错。
- 25篇最新CV领域综述性论文速递!涵盖15个方向:目标检测/图像处理/姿态估计/医学影像/人脸识别等方向...
- ps排版html,排版教程,超详细适合初学者的排版教程(二)
- 阿里云大幅降低CDN价格网宿蓝汛跟不跟?
- win7计算机管理中看不到新加的硬盘,win7系统看不到第二块硬盘的解决方法.
- win7磁盘管理分区,改变页面文件卷,删除卷就由灰变黑了!
- 了解java集合框架
- component lists rendered with v-for should have explicit keys.
- Log-normal distribution对数正态分布