欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。

欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-message-serialize-and-deserialize-2/


接上一篇:Kafka消息序列化和反序列化(上)。

有序列化就会有反序列化,反序列化的操作是在Kafka Consumer中完成的,使用起来只需要配置一下key.deserializer和value.deseriaizer。对应上面自定义的Company类型的Deserializer就需要实现org.apache.kafka.common.serialization.Deserializer接口,这个接口同样有三个方法:

  1. public void configure(Map<String, ?> configs, boolean isKey):用来配置当前类。
  2. public byte[] serialize(String topic, T data):用来执行反序列化。如果data为null建议处理的时候直接返回null而不是抛出一个异常。
  3. public void close():用来关闭当前序列化器。

下面就来看一下DemoSerializer对应的反序列化的DemoDeserializer,详细代码如下:

public class DemoDeserializer implements Deserializer<Company> {public void configure(Map<String, ?> configs, boolean isKey) {}public Company deserialize(String topic, byte[] data) {if (data == null) {return null;}if (data.length < 8) {throw new SerializationException("Size of data received by DemoDeserializer is shorter than expected!");}ByteBuffer buffer = ByteBuffer.wrap(data);int nameLen, addressLen;String name, address;nameLen = buffer.getInt();byte[] nameBytes = new byte[nameLen];buffer.get(nameBytes);addressLen = buffer.getInt();byte[] addressBytes = new byte[addressLen];buffer.get(addressBytes);try {name = new String(nameBytes, "UTF-8");address = new String(addressBytes, "UTF-8");} catch (UnsupportedEncodingException e) {throw new SerializationException("Error occur when deserializing!");}return new Company(name,address);}public void close() {}
}

有些读者可能对新版的Consumer不是很熟悉,这里顺带着举一个完整的消费示例,并以DemoDeserializer作为消息Value的反序列化器。

Properties properties = new Properties();
properties.put("bootstrap.servers", brokerList);
properties.put("group.id", consumerGroup);
properties.put("session.timeout.ms", 10000);
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "com.hidden.client.DemoDeserializer");
properties.put("client.id", "hidden-consumer-client-id-zzh-2");
KafkaConsumer<String, Company> consumer = new KafkaConsumer<String, Company>(properties);
consumer.subscribe(Arrays.asList(topic));
try {while (true) {ConsumerRecords<String, Company> records = consumer.poll(100);for (ConsumerRecord<String, Company> record : records) {String info = String.format("topic=%s, partition=%s, offset=%d, consumer=%s, country=%s",record.topic(), record.partition(), record.offset(), record.key(), record.value());System.out.println(info);}consumer.commitAsync(new OffsetCommitCallback() {public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {String error = String.format("Commit failed for offsets {}", offsets, exception);System.out.println(error);}}});}
} finally {consumer.close();
}

有些时候自定义的类型还可以和Avro、ProtoBuf等联合使用,而且这样更加的方便快捷,比如我们将前面Company的Serializer和Deserializer用Protostuff包装一下,由于篇幅限制,笔者这里只罗列出对应的serialize和deserialize方法,详细参考如下:

public byte[] serialize(String topic, Company data) {if (data == null) {return null;}Schema schema = (Schema) RuntimeSchema.getSchema(data.getClass());LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);byte[] protostuff = null;try {protostuff = ProtostuffIOUtil.toByteArray(data, schema, buffer);} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);} finally {buffer.clear();}return protostuff;
}public Company deserialize(String topic, byte[] data) {if (data == null) {return null;}Schema schema = RuntimeSchema.getSchema(Company.class);Company ans = new Company();ProtostuffIOUtil.mergeFrom(data, ans, schema);return ans;
}

如果Company的字段很多,我们使用Protostuff进一步封装一下的方式就显得简洁很多。不过这个不是最主要的,而最主要的是经过Protostuff包装之后,这个Serializer和Deserializer可以向前兼容(新加字段采用默认值)和向后兼容(忽略新加字段),这个特性Avro和Protobuf也都具备。

自定义的类型有一个不得不面对的问题就是Kafka Producer和Kafka Consumer之间的序列化和反序列化的兼容性,试想对于StringSerializer来说,Kafka Consumer可以顺其自然的采用StringDeserializer,不过对于Company这种专用类型,某个服务使用DemoSerializer进行了序列化之后,那么下游的消费者服务必须也要实现对应的DemoDeserializer。再者,如果上游的Company类型改变,下游也需要跟着重新实现一个新的DemoSerializer,这个后面所面临的难题可想而知。所以,如无特殊需要,笔者不建议使用自定义的序列化和反序列化器;如有业务需要,也要使用通用的Avro、Protobuf、Protostuff等序列化工具包装,尽可能的实现得更加通用且向前后兼容。

题外话,对于Kafka的“深耕者”Confluent来说,还有其自身的一套序列化和反序列化解决方案(io.confluent.kafka.serializer.KafkaAvroSerializer),GitHub上有相关资料,读者如有兴趣可以自行扩展学习。


参考资料

  1. protostuff序列化/反序列化

欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-message-serialize-and-deserialize-2/


欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。


Kafka消息序列化和反序列化(下)相关推荐

  1. Kafka消息序列化和反序列化(上)

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  2. SpringBoot 自定义Kafka消息序列化和反序列化

    1. 概述 Kafka传输自定义的DTO对象时,不能像平时一样使用StringSerializer和StringDeserializer.这种情况需要自己实现对应DTO的序列化器和反序列化器.假设现在 ...

  3. Kafka 消息序列化反序列化

    主要是记录下 SpringBoot 如何集成 Kafka,完成消息队列的使用,代码包括 Json 序列化消息,生产者,消费者,配置文件. 1.maven 依赖 <dependency>&l ...

  4. SpringBoot高级-消息-RabbitTemplate发送接受消息序列化机制

    引入了spring-boot-starter-amqp模块,他引入了spring-messaging模块,包括引入了spring-rabbit模块,怎么配置使用呢,<dependency> ...

  5. json 反序列化 父子类型_Jaskson精讲第7篇-类继承关系下的JSON序列化与反序列化JsonTypeInfo...

    Jackson是Spring Boot(SpringBoot)默认的JSON数据处理框架,但是其并不依赖于任何的Spring 库.有的小伙伴以为Jackson只能在Spring框架内使用,其实不是的, ...

  6. JavaSE——IO(下)(Properties类、序列化与反序列化)

    第3节 IO(下) 一..properties文件与Properties类 1.1 .properties文件介绍 .properties文件一种属性文件,以键值对 的格式存储内容,在Java中可以使 ...

  7. 深入Atlas系列:探究序列化与反序列化能力(下) - JavaScriptSerializer

    在ASP.NET AJAX中,客户端的序列化与反序列能力由Sys.Serialization.JavaScriptSerializer类的serialize和deserialize两个静态方法提供.在 ...

  8. 面试官:您能说说序列化和反序列化吗?是怎么实现的?什么场景下需要它?

    序列化和反序列化是Java中最基础的知识点,也是很容易被大家遗忘的,虽然天天使用它,但并不一定都能清楚的说明白.我相信很多小伙伴们掌握的也就几句概念.关键字(Serializable)而已,如果深究问 ...

  9. 10、Kafka 消息订阅系统

    1.Kafka 简介 Kafka 是一个高吞吐.分布式.基于发布订阅的消息系统,利用 Kafka 技术可在廉价 PCServer 上搭建起大规模消息系统. Kafka 和其他组件比较,具有消息持久化. ...

最新文章

  1. js中 let var const 的差异和使用场景
  2. [新闻]Ubuntu7.04于4月19日全球同步发布
  3. java中对象的生存期_Java中对象的生存周期
  4. [css] 你知道CSS的标准发布流程吗?
  5. java成员内部类_Java中的内部类(二)成员内部类
  6. Java 算法 约数个数
  7. 华为出售荣耀为不实消息,赵明曾在内部否认;迅雷前CEO陈磊涉嫌职务侵占罪被调查 ;Python 3.9发布|极客头条...
  8. 机器学习基础(三十) —— 线性回归、正则化(regularized)线性回归、局部加权线性回归(LWLR)
  9. 发布后500访问错误 —— dll引用错误
  10. 马哥【直播班】Python运维自动化与DevOps项目特训班学习记录
  11. 关于计算机知识的动画电影,动画概论总复习题目(附答案)
  12. flash游戏教程集锦~~
  13. Ask, acquire, and attack: data-free UAP generation using class impressions
  14. 【iOS开发必收藏】详解iOS应用程序内使用IAP/StoreKit付费、沙盒(SandBox)测试、创建测试账号流程!2012-6-25日更新iap恢复
  15. 景观生态学原理| 2 景观生态学的理论与核心
  16. Qt Creator老是提示红色信息In included file:unknown type name ‘b‘,怎么解决?
  17. 卷积神经网络——vgg16网络及其python实现
  18. 为什么说全球疫情的刺激,加快了AI视频智能分析技术的需求?
  19. U+V2深度隐藏PE制作技术初探
  20. 配置nginx代理实现https访问

热门文章

  1. 微型计算机组装实验报告虚拟,微型计算机组装与维护实训(附光盘)
  2. 计算机录入员考试题及答案,计算机录入员理论考题及答案.docx
  3. android 等待动画 库,android--AnimationDrawable实现等待动画效果
  4. C51_按键按下,流水灯亮起,数码管显示按下的次数
  5. 每月分享github上有意思的项目
  6. 你知道我们平时在CSS中写的%都是相对于谁吗?
  7. windows下oracle数据库自动备份脚本
  8. 51CTO学院四周年-成长之路
  9. Capybara 2.14.1 发布,Web 应用验收测试框架
  10. Bzoj4561 [JLoi2016]圆的异或并