Storm 中的 tuple可以包含任何类型的对象。由于Storm 是一个分布式系统,所以在不同的任务之间传递消息时Storm必须知道怎样序列化、反序列化消息对象。

  Storm 使用 Kryo库对对象进行序列化。Kryo 是一个灵活、快速的序列化库。Storm 默认支持基础类型、string、byte arrays、ArrayList、HashMap、HashSet 以及 Clojure 的集合类型的序列化。如果需要在tuple中使用其他的对象类型,就需要注册一个自定义的序列化器。

原文和作者一起讨论:http://www.cnblogs.com/intsmaze/p/7044042.html

微信:intsmaze

避免微信回复重复咨询问题,技术咨询请博客留言。

自定义序列化

  TORM使用Kryo来序列化。要实现自定义序列化器,我们需要使用Kryo注册新的序列化器。添加自定义序列化器是通过拓扑配置的topology.kryo.register属性完成的。它需要一个注册的列表,每个注册项可以采取两种形式:

1:类名注册,在这种情况下,Storm将使用Kryo的FieldsSerializer来序列化该类。这可能是也可能不是该类最好的选择,更多的细节可以查看Kryo文档。

2:实现了com.esotericsoftware.kryo.Serializer接口的类名注册的映射。

Storm为拓扑配置里的注册序列化提供了帮助。Config类中有一个名为registerSerialization的方法,可以把注册添加到配置中。

public void registerSerialization(Class klass);
public void registerSerialization(Class klass, Class<? extends Serializer> serializerClass);

java序列化

  一个拓扑中不同的任务传递消息时Storm发现了一个没有注册序列化器的类型,它会使用 Java 序列化器来代替,如果这个对象无法被Java序列化器序列化,Storm 就会抛出异常。

  注意,Java 自身的序列化机制非常耗费资源,而且不管在 CPU 的性能上还是在序列化对象的大小上都没有优势。强烈建议读者在生产环境中运行topology 的时候注册一个自定义的序列化器。

  可以通过将 Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION 配置为 false 的方式来将禁止序列化器回退到Java的序列化机制。

Config.setFallBackOnJavaSerialization(conf,false);这个时候如果storm使用java序列化就会抛出异常告诉开发人员去注册一个kryo序列化。

实现storm序列化

创建传输的对象。

package cn.intsmaze.serializable.bean;
public class Person {private int age;private Studnet studnet;private ArrayList arrayList=new ArrayList();private LinkedList linkedList=new LinkedList();public Person() {}public Person(int age,Studnet s) {this.age = age;this.studnet=s;arrayList.add("ArrayList中的"+s.getName());linkedList.add("linkedList中的"+s.getName());}@Overridepublic String toString() {return "Person [age=" + age + ", studnet=" + studnet + ", arrayList="+ arrayList + ", linkedList=" + linkedList + "]";}get(),set()......
}package cn.intsmaze.serializable.bean;
public class Studnet {private String name;public Studnet() {}public Studnet(String name) {this.name = name;}@Overridepublic String toString() {return "Studnet [name=" + name + "]";} get(),set()......
}

spout和bolt的实现,spout每次会创建一个person对象将该对象发送到bolt,bolt类接收到该对象将该对象打印出来。

package cn.intsmaze.serializable;
import cn.intsmaze.serializable.bean.Person;
import cn.intsmaze.serializable.bean.Studnet;
public class SpoutBean extends BaseRichSpout {SpoutOutputCollector collector;public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {this.collector = collector;}public void nextTuple() {Studnet s=new Studnet("xiaoxi");collector.emit(new Values(new Person(100,s)));Utils.sleep(500);}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("person"));}
} package cn.intsmaze.serializable;
import cn.intsmaze.serializable.bean.Person;
public class BoltBean extends BaseBasicBolt {public void prepare(Map stormConf, TopologyContext context) {super.prepare(stormConf, context);}public void execute(Tuple input, BasicOutputCollector collector) {Person person = (Person)input.getValueByField("person");System.out.println("接收到spout节点传来的数据:"+person);}public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}

场景一:

使用public void registerSerialization(Class klass);

package cn.intsmaze.serializable;
import java.util.LinkedList;
import cn.intsmaze.serializable.bean.Person;
import cn.intsmaze.serializable.bean.Studnet;
public class TopologyBean {
public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new SpoutBean(), 1);builder.setBolt("bolt", new BoltBean(), 1).shuffleGrouping("spout");Config conf = new Config();conf.registerSerialization(Person.class);conf.registerSerialization(Studnet.class);//注释掉后,但Studnet没实现java序列化,则会报错。有两种方法,一种注册该类,一种实现java序列化。conf.registerSerialization(LinkedList.class);//这里如果注释掉,则会使用java序列化方式,如果我们取消掉禁止使用java序列化方法,则会提示注册LinkedList类报错。conf.setNumWorkers(2);// Config.setFallBackOnJavaSerialization(conf, false);//禁止使用java语言自己的序列化StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());}
}

  第11行,我们注册person类使用Kryo序列化,person对象除了有基本类型int字段外,还有arraylist,linkedlist类型以及自定义的student类型。arraylist类型是storm默认已经提供了支持。

  这里如果我们不对linkedlist类型和自定义类型student进行注册则该拓扑在运行时则会报无法序列化student类型异常。

这个时候有两种办法解决:

  一种就是使student实现java的public class Studnet implements Serializable接口,则该拓扑会成功运行。因为storm如果发现传输的对象如果没有注册为Kryo,则就会使用java的序列化对象,而linkedlist默认已经实现了该接口,所以才会出现前面报student对象无法序列化,然后使得student实现java的序列化接口即可。

  第二种方案就是,我们对student类进行注册conf.registerSerialization(Studnet.class);。

虽然linkedlist不注册,会默认使用java的序列化,但是出于效率的考虑,我们将其注册为Kryo。

  提示:因为有些集合类型,storm没有提供序列化支持,但是实现了java序列化接口,所以如果我们不加以控制,会使用java序列化而拖累整个系统。所以推荐使用

Config.setFallBackOnJavaSerialization(conf, false);禁止使用java语言自己的序列化来可以在本地模式时及时发现报错信息,将问题尽早解决。

场景二:

  我们使用kryo序列化,但是有时候我们并不希望传输对象的所有字段,而只是传输对象的某些字段,从而进一步提高消息的传递速率,这个时候我们可以使用kryo的自定义序列化机制来指定传输的值。

package cn.intsmaze.serializable.bean;
import java.util.ArrayList;
import java.util.LinkedList;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
public class PersonSerializable extends Serializer<Person>{@Overridepublic Person read(Kryo kryo, Input input, Class<Person> arg2) {System.out.println("序列化");Person person=new Person();person.setAge(input.readInt());person.setArrayList(kryo.readObject(input, ArrayList.class));person.setLinkedList(kryo.readObject(input, LinkedList.class));//该类型,storm默认不支持,所以要在topology中注册该类型,如果不注册,则会使用java序列化。 person.setStudnet(kryo.readObject(input, Studnet.class));//该类型,storm默认不支持,所以要在topology中注册该类型,如果不注册,且java序列化没有实现,则会报错。return person;}@Overridepublic void write(Kryo kryo, Output output, Person person) {System.out.println("反序列化");output.writeInt(person.getAge());kryo.writeObject(output, person.getArrayList());kryo.writeObject(output, person.getLinkedList());kryo.writeObject(output, person.getStudnet());}
}

package cn.intsmaze.serializable;
import java.util.LinkedList;
import cn.intsmaze.serializable.bean.Person;
import cn.intsmaze.serializable.bean.PersonSerializable;
import cn.intsmaze.serializable.bean.Studnet;
public class TopologyBean {public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new SpoutBean(), 1);builder.setBolt("bolt", new BoltBean(), 1).shuffleGrouping("spout");Config conf = new Config();conf.registerSerialization(Studnet.class);conf.registerSerialization(LinkedList.class);conf.registerSerialization(Person.class, PersonSerializable.class);conf.setNumWorkers(2);     StormSubmitter.submitTopologyWithProgressBar(args[0], conf,builder.createTopology());}
}

  因为PersonSerializable类中指定了要传输person对象的int,studne,ArrayList,LinkedList 类型。

如果我们注释掉第12行 conf.registerSerialization(Studnet.class);且Studnet类没有实现java的序列化,则拓扑的任务间传递消息进行序列化时就会报无法序列化该类的错误,感兴趣的同学可以试试注释掉该行,看看storm会报什么异常。

第13行,我们必须注册对LinkedList序列化,storm默认支持了对ArrayList类的序列化,但没有提供对LinkedList序列化,需要我们手动注册,如果不注册,因为LinkedList实现了java的序列化接口,所以会使用java序列化,则不会报错。

  强烈建议,在开发中就算注册了kyro序列化方式,也要设置该conf.setFallBackOnJavaSerialization(false)方法来禁止使用java序列化方式,因为实际开发中,核心架构搭建好了,会让团队成员直接在现成架构上编写,他们不需要了解storm的一些机制,但是这也带来问题,一种场景就是,开发人员对传输对象增加了一个LinkedList字段,但是他没有注册序列化类,storm就会对LinkedList使用java序列化,就会拖累系统的性能,所以在架构的时候,通过设置禁止java序列化方法,就可以在测试中及时发现问题所在。

补充:上面的所有一切,在本地运行以及部署到集群时,work数量设置为1时,都不会生效的。因为同一个对象公有一个内存,不会涉及网络传输的,也就不需要序列化和反序列化。

生产场景回顾:

  本人intsmaze生产上遇见的问题:storm工程中对传输对象使用了conf.registerSerialization(Person.class, PersonSerializable.class);方式来指定序列化该对象的某些字段。初级程序员在storm工程上开发时,因为业务需要对传输对象增加了一个字段,但是没有在PersonSerializable中序列化和反序列化该对象。恰巧的时,初级工程师本地模式和准生产测试时,topology的work的数量都为1,导致对象在bolt和bolt节点传输时并没有走序列化方式,结果测试一切正常,但是上生产后,因为work数量是10个,立马在后一个bolt中报大量的空指针异常,造成很严重的生产问题。

Strom序列化机制相关推荐

  1. java序列化算法透析_Java序列化机制与原理的深入分析

    Java序列化算法透析 Serialization(序列化)是一种将对象以一连串的字节描述的过程:反序列化deserialization是一种将这些字节重建成一个对象的过程.Java序列化API提供一 ...

  2. Spark 配置Kryo序列化机制注意细节

    一.Spark 的序列化 序列化 Spark 是一个高性能.分布式的.基于内存计算的计算引擎,Spark 集群中包含多个节点,各节点之间要进行通信(比如数据传输,Spark 通过 RPC 进行节点间的 ...

  3. SpringBoot高级-缓存-RedisTemplate序列化机制

    前面我们就搭建测试好了redis环境,接下来我们就来整合redis来做缓存,我们需要引入redis的starter,这个starter我们直接去官方文档去搜索就行了,我们来找到所有的starter跟r ...

  4. java对象的序列化机制详解

    Java对象的序列化机制 Java对象的序列化,是将内存中的java对象转化为二进制的字节流,然后保存到磁盘中或者在网络上.这就是序列化对象,反序列化顾名思义就是将对象的二进制字节流恢复成原来的对象. ...

  5. java 序列化 protobuf_java序列化机制之protobuf(快速高效跨语言)

    我们之前曾讲过java自带的一种序列化机制,但是这种机制效率太低,有很多缺点.因此也涌现出了很多优秀的系列化框架,比如说protobuf.protostuff.thrift.hession.kryo. ...

  6. 对象序列化机制的理解

    1.对象的序列化机制: 对象序列化机制允许把内存中的Java对象转换成平台无关的二进制流,从而允许把这种二进制流持久地保存在磁盘上,或通过网络将这种二进制流传输到另一个网络节点. //当其它程序获取了 ...

  7. 一篇文章搞定java序列化机制

    序列化与反序列化 序列化:将对象的状态信息转换为可以存储或传输的数据形式(比如二进制)的过程. 反序列化:与序列化相对,把序列化转换成的可以存储或传输的数据形式转化为对象的状态信息的过程. java序 ...

  8. Java序列化机制原理,java面试题,java基础笔试题,BAT

    写在最前面,我总结出了很多互联网公司的面试题及答案,并整理成了文档,以及各种学习的进阶学习资料,免费分享给大家.扫码加微信好友进[程序员面试学习交流群],免费领取.也欢迎各位一起在群里探讨技术. Ja ...

  9. 反序列化对象列表发生异常_Apache Thrift系列详解:序列化机制

    前言 Thrift支持二进制,压缩格式,以及json格式数据的序列化和反序列化.开发人员可以更加灵活的选择协议的具体形式.协议是可自由扩展的,新版本的协议,完全兼容老的版本! 正文 数据交换格式简介 ...

最新文章

  1. Blender三维插图设计视频教程 3D Characters and Illustrations in Blender 2.9
  2. 简析 .NET Core 构成体系
  3. 技术网站 --人人都是产品经理
  4. windows server 2003上安装mysql的问题
  5. Linux命令:tail
  6. 初识--AVSpeechSynthesizer
  7. Elastic job,任务状态:分片待调整
  8. spyder python调试_使用spyder编译器单步调试python
  9. 思维-CF-739A
  10. 直角三角形的边角关系_三角形的边角关系巩固练习
  11. C++模板(关键字template,typename)介绍
  12. 【Spring-AOP】自动代理类AnnotationAwareAspectJAutoProxyCreator
  13. maven仓库的优先级,profile的优先级
  14. 使用vue+element开发一个谷歌插件
  15. c语言 at指令gprs,gprs AT 指令
  16. 前端disable和readonly的区别?
  17. 一、首页第一个首页栏制作【仿淘票票系统前后端完全制作(除支付外)】
  18. 《Java学习笔记(第8版)》学习指导
  19. ISCA文献翻译:第三篇Efficient Digital Neurons for Large Scale Cortical Architectures
  20. setTimeout()方法

热门文章

  1. DevOps与持续交付实践
  2. 2010.2--netscreen ssg 140 恢复出厂设置的方法
  3. SilverLight学习笔记--Silverlight中WebRequest通讯
  4. java中日期的数据类型是啥_用于存储日期和时间的最合适的SQL和Java数据类型
  5. 计算机仿真的过程,计算机仿真的过程与方法.doc
  6. 收藏! Linux 服务器必备的安全设置
  7. 4道经典指针笔试题讲解 ~
  8. poj2155(二维树状数组)
  9. Himmelblau函数优化实战
  10. 下行物理信道rs_信道估计与均衡