本文基于Spark2.1.0版本

套用官文Tuning Spark中的一句话作为文章的标题:

*Often, choose a serialization type will be the first thing you should tune to optimize a Spark application. *

在Spark的架构中,在网络中传递的或者缓存在内存、硬盘中的对象需要进行序列化操作,序列化的作用主要是利用时间换空间:

分发给Executor上的Task

需要缓存的RDD(前提是使用序列化方式缓存)

广播变量

Shuffle过程中的数据缓存

使用receiver方式接收的流数据缓存

算子函数中使用的外部变量

上面的六种数据,通过Java序列化(默认的序列化方式)形成一个二进制字节数组,大大减少了数据在内存、硬盘中占用的空间,减少了网络数据传输的开销,并且可以精确的推测内存使用情况,降低GC频率。

其好处很多,但是缺陷也很明显:

把数据序列化为字节数组、把字节数组反序列化为对象的操作,是会消耗CPU、延长作业时间的,从而降低了Spark的性能。

至少默认的Java序列化方式在这方面是不尽如人意的。Java序列化很灵活但性能较差,同时序列化后占用的字节数也较多。

所以官方也推荐尽量使用Kryo的序列化库(版本2)。官文介绍,Kryo序列化机制比Java序列化机制性能提高10倍左右,Spark之所以没有默认使用Kryo作为序列化类库,是因为它不支持所有对象的序列化,同时Kryo需要用户在使用前注册需要序列化的类型,不够方便。

由于 Spark2.1.0默认对Task使用Java序列化(该序列化方式不允许修改,源码如下),

/**

* Helper method to create a SparkEnv for a driver or an executor.

*/

private def create(

conf: SparkConf,

executorId: String,

bindAddress: String,

advertiseAddress: String,

port: Int,

isLocal: Boolean,

numUsableCores: Int,

ioEncryptionKey: Option[Array[Byte]],

listenerBus: LiveListenerBus = null,

mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER

...

val serializer = instantiateClassFromConf[Serializer](

"spark.serializer", "org.apache.spark.serializer.JavaSerializer")

logDebug(s"Using serializer: ${serializer.getClass}")

val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

val closureSerializer = new JavaSerializer(conf) --Task闭包函数使用Java序列化库

所以本文主要针对下面这五种数据类型:

需要缓存的RDD(前提是使用序列化方式缓存)

广播变量

Shuffle过程中的数据缓存

使用receiver方式接收的流数据缓存

算子函数中使用的外部变量

其实从Spark 2.0.0版本开始,简单类型、简单类型数组、字符串类型的Shuffling RDDs 已经默认使用Kryo序列化方式了。

下面,我给出具体的流程,来切换到Kryo序列化库。

先介绍几个相关的配置:

Property Name

Default

Meaning

spark.serializer

org.apache.spark.serializer.JavaSerializer

Class to use for serializing objects that will be sent over the network or need to be cached in serialized form. The default of Java serialization works with any Serializable Java object but is quite slow, so we recommend using org.apache.spark.serializer.KryoSerializer and configuring Kryo serialization when speed is necessary.

spark.kryoserializer.buffer

64k

Initial size of Kryo's serialization buffer. Note that there will be one buffer per core on each worker. This buffer will grow up to spark.kryoserializer.buffer.max if needed.

spark.kryoserializer.buffer.max

64m

Maximum allowable size of Kryo serialization buffer. This must be larger than any object you attempt to serialize. Increase this if you get a "buffer limit exceeded" exception inside Kryo.

spark.kryo.classesToRegister

(none)

If you use Kryo serialization, give a comma-separated list of custom class names to register with Kryo. See the tuning guide for more details.

spark.kryo.referenceTracking

true

Whether to track references to the same object when serializing data with Kryo, which is necessary if your object graphs have loops and useful for efficiency if they contain multiple copies of the same object. Can be disabled to improve performance if you know this is not the case.

spark.kryo.registrationRequired

false

Whether to require registration with Kryo. If set to 'true', Kryo will throw an exception if an unregistered class is serialized. If set to false (the default), Kryo will write unregistered class names along with each object. Writing class names can cause significant performance overhead, so enabling this option can enforce strictly that a user has not omitted classes from registration.

spark.kryo.registrator

(none)

If you use Kryo serialization, give a comma-separated list of classes that register your custom classes with Kryo. This property is useful if you need to register your classes in a custom way, e.g. to specify a custom field serializer. Otherwise spark.kryo.classesToRegister is simpler. It should be set to classes that extend KryoRegistrator. See the tuning guide for more details.

spark.kryo.unsafe

false

Whether to use unsafe based Kryo serializer. Can be substantially faster by using Unsafe Based IO.

配置说明:(当使用Kryo序列化库时)

spark.kryo.classesToRegister:向Kryo注册自定义的的类型,类名间用逗号分隔

spark.kryo.referenceTracking:跟踪对同一个对象的引用情况,这对发现有循环引用或同一对象有多个副本的情况是很有用的。设置为false可以提高性能

spark.kryo.registrationRequired:是否需要在Kryo登记注册?如果为true,则序列化一个未注册的类时会抛出异常

spark.kryo.registrator:为Kryo设置这个类去注册你自定义的类。最后,如果你不注册需要序列化的自定义类型,Kryo也能工作,不过每一个对象实例的序列化结果都会包含一份完整的类名,这有点浪费空间

spark.kryo.unsafe:如果想更加提升性能,可以使用Kryo unsafe方式

spark.kryoserializer.buffer:每个Executor中的每个core对应着一个序列化buffer。如果你的对象很大,可能需要增大该配置项。其值不能超过spark.kryoserializer.buffer.max

spark.kryoserializer.buffer.max:允许使用序列化buffer的最大值

spark.serializer:序列化时用的类,需要申明为org.apache.spark.serializer.KryoSerializer。这个设置不仅控制各个worker节点之间的混洗数据序列化格式,同时还控制RDD存到磁盘上的序列化格式及广播变量的序列化格式。

更多的Kryo配置及使用细节,参考文末的链接

主要的使用过程就三步:

设置序列化使用的库

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); //使用Kryo序列化库

在该库中注册用户定义的类型

conf.set("spark.kryo.registrator", toKryoRegistrator.class.getName()); //在Kryo序列化库中注册自定义的类集合

在自定义类中实现KryoRegistrator接口的registerClasses方法

public static class toKryoRegistrator implements KryoRegistrator {

public void registerClasses(Kryo kryo) {

kryo.register(tmp1.class, new FieldSerializer(kryo, tmp1.class)); //在Kryo序列化库中注册自定义的类

kryo.register(tmp2.class, new FieldSerializer(kryo, tmp2.class)); //在Kryo序列化库中注册自定义的类

}

}

具体的源码如下(关键点见源码中的注释):

import java.util.Arrays;

import java.util.Iterator;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.function.FlatMapFunction;

import org.apache.spark.serializer.KryoRegistrator;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import com.esotericsoftware.kryo.Kryo;

import com.esotericsoftware.kryo.serializers.FieldSerializer;

import org.apache.spark.storage.StorageLevel;

import java.util.regex.Pattern;

import java.io.IOException;

import java.io.InputStream;

import java.io.FileInputStream;

import java.io.BufferedReader;

import java.io.InputStreamReader;

import org.apache.spark.broadcast.Broadcast;

public final class javakryoserializer {

private static final Pattern SPACE = Pattern.compile(" ");

// This is our custom class we will configure Kyro to serialize

static class tmp1 implements java.io.Serializable {

public int total_;

public int num_;

}

static class tmp2 implements java.io.Serializable {

public tmp2 (String ss)

{

s = ss;

}

public String s;

}

public static class toKryoRegistrator implements KryoRegistrator {

public void registerClasses(Kryo kryo) {

kryo.register(tmp1.class, new FieldSerializer(kryo, tmp1.class)); //在Kryo序列化库中注册自定义的类

kryo.register(tmp2.class, new FieldSerializer(kryo, tmp2.class)); //在Kryo序列化库中注册自定义的类

}

}

public static void readToBuffer(StringBuffer buffer, String filePath) throws IOException {

InputStream is = new FileInputStream(filePath);

String line; // 用来保存每行读取的内容

BufferedReader reader = new BufferedReader(new InputStreamReader(is));

line = reader.readLine(); // 读取第一行

while (line != null) { // 如果 line 为空说明读完了

buffer.append(line); // 将读到的内容添加到 buffer 中

buffer.append("\n"); // 添加换行符

line = reader.readLine(); // 读取下一行

}

reader.close();

is.close();

}

public static void main(String[] args) throws Exception {

SparkConf conf = new SparkConf().setMaster("local").setAppName("basicavgwithkyro");

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); //使用Kryo序列化库,如果要使用Java序列化库,需要把该行屏蔽掉

conf.set("spark.kryo.registrator", toKryoRegistrator.class.getName()); //在Kryo序列化库中注册自定义的类集合,如果要使用Java序列化库,需要把该行屏蔽掉

JavaSparkContext sc = new JavaSparkContext(conf);

StringBuffer sb = new StringBuffer();

javakryoserializer.readToBuffer(sb, args[0]);

final Broadcast stringBV = sc.broadcast(new tmp2(sb.toString()));

JavaRDD rdd1 = sc.textFile(args[1]);

JavaRDD rdd2 = rdd1.flatMap(new FlatMapFunction() {

@Override

public Iterator call(String s) {

return Arrays.asList(SPACE.split(s)).iterator();

}

});

JavaRDD rdd3 = rdd2.map(new Function() {

@Override

public Integer call(String s) {

String length = stringBV.value().s; //只是为了使用广播变量stringBV,没有实际的意义

String tmp = length; //只是为了使用广播变量stringBV,没有实际的意义

return s.length();

}

});

JavaRDD rdd4 = rdd3.map(new Function() {

@Override

public tmp1 call(Integer x) {

tmp1 a = new tmp1(); //只是为了将rdd4中的元素类型转换为tmp1类型的对象,没有实际的意义

a.total_ += x;

a.num_ += 1;

return a;

}

});

rdd4.persist(StorageLevel.MEMORY_ONLY_SER()); //将rdd4以序列化的形式缓存在内存中,因为其元素是tmp1对象,所以使用Kryo的序列化方式缓存

System.out.println("the count is " + rdd4.count());

while (true) {} //调试命令,只是用来将程序挂住,方便在Driver 4040的WEB UI中观察rdd的storage情况

//sc.stop();

}

}

上述源码,涉及了闭包中使用的广播变量stringBV(是tmp2类的对象),以及对rdd4(元素是tmp1类的对象)的持久化,由于RDD的持久化占用的内存看起来比较直观,所以主要对比rdd4使用两种序列化库的区别。

使用默认的Java序列化库的情况:缓存后的 rdd4占用内存空间137.7MB

应用程序执行时的信息

4040端口 Driver WEB UI

使用Kryo序列化库的情况:缓存后的 rdd4占用内存空间38.5MB

应用程序执行时的信息

4040端口 Driver WEB UI

可以看出,使用了Kryo序列化库后,rdd4在内存中占用的空间从137.7MB降低到38.5MB,比使用Java序列化库节省了4倍左右的空间(如果使用其他更适合压缩的对象类型,应该能达到官方的所说的提升10倍的压缩比)

当然,如果想进一步的节省内存、硬盘的空间,减少网络传输的数据量,可以配合的使用Spark支持的压缩方式(目前默认是lz4),广播变量、shuffle过程中的数据都默认使用压缩功能。(注意,RDD默认是不压缩的)

Property Name

Default

Meaning

spark.io.compression.codec

lz4

The codec used to compress internal data such as RDD partitions, broadcast variables and shuffle outputs. By default, Spark provides three codecs: lz4, lzf, and snappy.

spark.broadcast.compress

true

Whether to compress broadcast variables before sending them. Generally a good idea.

spark.shuffle.compress

true

Whether to compress map output files. Generally a good idea.

spark.shuffle.spill.compress

true

Whether to compress data spilled during shuffles.

spark.rdd.compress

false

Whether to compress serialized RDD partitions (e.g. for StorageLevel.MEMORY_ONLY_SER in Java and Scala or StorageLevel.MEMORY_ONLY in Python). Can save substantial space at the cost of some extra CPU time.

RDD持久化操作时使用压缩机制(注意,只有序列化后的RDD才能使用压缩机制)

SparkConf 增加下面的配置

conf.set("spark.rdd.compress", "true");

效果很显著吧!rdd4持久化后在内存中占用的空间降低到1MB左右!

应用程序执行的信息

4040端口 Driver WEB UI

使用压缩机制,也会增加额外的开销,也会影响到性能,这点需要注意。

广播变量kyro_利用Kryo序列化库是你提升Spark性能要做的第一件事相关推荐

  1. spark 广播变量大数据_Spark基础知识(三)--- Spark的广播变量和累加器

    在spark程序中,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本.这些变量会被复制到每台机器上,并且这些 ...

  2. Spark广播变量Broadcast

    注意:原文出处https://www.jianshu.com/p/3bd18acd2f7f Broadcast 顾名思义,broadcast 就是将数据从一个节点发送到其他各个节点上去.这样的场景很多 ...

  3. 寺库商用区块链落地,联盟链为每件奢侈品发“×××”

    2018年6月,整个世界奢侈品行业都把目光集中到了两件事情上:在欧洲,历峰集团宣布,旗下的Yoox Net-a-porter Group SpA (YNAP.MI)(下称YNAP)在完成私有化后,正式 ...

  4. spark之kryo序列化及其使用

    spark之kryo序列化 spark之kryo 序列化 Spark 中使用 Kryo序列化 中文切词案例: spark之kryo 序列化 1.定义:把对象转换为字节序列的过程称为对象的序列化. 把字 ...

  5. spark中的广播变量broadcast

    Spark中的Broadcast处理 首先先来看一看broadcast的使用代码: val values = List[Int](1,2,3) val broadcastValues = sparkC ...

  6. spark 获取广播变量_Spark流式程序中广播变量和累加器为何使用单例模式

    Spark中广播变量详解以及如何动态更新广播变量​mp.weixin.qq.com 1.广播变量是只读的,使用单例模式可以减少Spark流式程序中每次job生成执行,频繁创建广播变量带来的开销 2. ...

  7. Spark 广播变量和累加器

    Spark 的一个核心功能是创建两种特殊类型的变量:广播变量和累加器 广播变量(groadcast varible)为只读变量,它有运行SparkContext的驱动程序创建后发送给参与计算的节点.对 ...

  8. SparkCore:RDD累加器和广播变量(最详细的介绍)!!!!!!

    RDD累加器和广播变量 在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,在每个任务上都生成一个副本.但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控 ...

  9. 普通话考试是从题库里抽吗_你必须知道这6件事儿,才能去参加普通话考试

    专注于发布全国普通话水平测试报考信息,精选普通话易错字词读音.报考指南.考试技巧.考试流程.考试题库.命题说话范文.成绩查询等. 究竟是哪六件事呢? 第一件事:我为什么要考普通话水平测试? 普通话水平 ...

最新文章

  1. 大规模业务服务器开发总结
  2. cass插件_南方CASS专题系列,全套教程+视频讲解+插件汇总,全部打包速带走
  3. python实现复制文件功能
  4. Android之帧动画与补间动画的使用
  5. 实例——在编程过程中进行单元测试
  6. [云炬创业基础笔记] 第四章测试13
  7. c语言的运算答案,C语言运算符与表达式的练习题答案.doc
  8. sharing-jdbc实现读写分离及分库分表
  9. 零基础转行学习python是否还在纠结?这里告诉你答案!
  10. 无法远程连接Oracle
  11. 正弦 sin 余弦 cos
  12. 清华毕业生最爱去华为;应届生称因拒绝加班,被申通快递辞退;PrestoSQL被迫更名 | 极客头条...
  13. 5G 是未来的唯一选择?| 畅言
  14. ANSI C和Glib C区别(二)
  15. 汇编语言:基本指令详解
  16. AndroidStudio Screen Capture 按钮点击提示:Unexpected error while obtain screenshot from device:EOF
  17. Mariadb 安装 启动 及错误 1067 问题解决
  18. python井字棋_python实现井字棋游戏
  19. win10 linux efi分区大小,EFI模式 win10+Ubuntu16.04双系统
  20. uniapp 自定义头部 支持微信、百度、头条小程序

热门文章

  1. 安卓opengl基本使用
  2. Valens VS3000系列 HDBaseT2.0延长芯片
  3. c语言停车场管理系统源代码,C语言课程设计 停车场管理系统 源代码 原创 不得随意转载...
  4. 【小程序】__wxConfig is not defined 报错解决
  5. 手机云存储空间已满用免费不限速阿里云盘替代
  6. vuex系列--浅析Vuex 的设计思想
  7. BCM芯片三层组播原理及调试
  8. SQL在数据分析中的应用案例(一)
  9. 无公网IP如何外网异地登录访问电商进销存系统?
  10. spring-banner