1.概述

请参考:https://www.jianshu.com/p/a70950bab06d

【Kafka】Kafka 使用传统的 avro API 自定义序列化类和反序列化类 比较麻烦,需要根据 schema 生成实体类,需要调用 avro 的 API 实现 对象到 byte[] 和 byte[] 到对象的转化,而那些方法看上去比较繁琐,幸运的是,Twitter 开源的类库 Bijection 对传统的 Avro API 进行了封装了和优化,让我们可以方便的实现以上操作。

2.依赖

Bijection 类库的依赖如下:

<dependency><groupId>com.twitter</groupId><artifactId>bijection-avro_2.11</artifactId><version>0.9.6</version>
</dependency>

在 maven 工程的 resources 目录下新建一个 schema 文件,名称为"user.json",因为我们不用 avro 生成实体类的方式,所以定义一个普通的 json 文件来描述 schema 即可,另外,在 json 文件中,也不需要"namespace": "packageName"这个限定生成实体类的包名的参数,本文使用的 json 文件内容如下:

{"type": "record","name": "User","fields": [{"name": "id", "type": "int"},{"name": "name",  "type": "string"},{"name": "age", "type": "int"}]
}

3.测试类

package com.kafka.avro.demo.bijection;import static org.junit.Assert.*;import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Collections;
import java.util.Properties;import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.junit.Test;public class BijectionProducerTest {@Testpublic void producerTest() throws Exception {String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath();FileReader fr = new FileReader(new File(schemaFilePath));BufferedReader br = new BufferedReader(fr);StringBuilder sb = new StringBuilder();String line;while ((line = br.readLine()) != null) {sb.append(line).append("\n");}String schemaStr = sb.toString();br.close();fr.close();Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");Schema.Parser parser = new Schema.Parser();Schema schema = parser.parse(schemaStr);Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);Producer<String, byte[]> producer = new KafkaProducer<>(props);for (int i = 0; i < 100; i++) {GenericData.Record avroRecord = new GenericData.Record(schema);avroRecord.put("id", i);avroRecord.put("name", "name" + i);avroRecord.put("age", 22);byte[] avroRecordBytes = recordInjection.apply(avroRecord);ProducerRecord<String, byte[]> record = new ProducerRecord<>("topic_lcc", avroRecordBytes);producer.send(record);Thread.sleep(1000);}producer.close();}@Testpublic void consumerTest() throws Exception {String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath();FileReader fr = new FileReader(new File(schemaFilePath));BufferedReader br = new BufferedReader(fr);StringBuilder sb = new StringBuilder();String line;while ((line = br.readLine()) != null) {sb.append(line).append("\n");}String schemaStr = sb.toString();br.close();fr.close();Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "dev3-yangyunhe-group001");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("topic_lcc"));Schema.Parser parser = new Schema.Parser();Schema schema = parser.parse(schemaStr);Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);try {while (true) {ConsumerRecords<String, byte[]> records = consumer.poll(1000);for (ConsumerRecord<String, byte[]> record : records) {GenericRecord genericRecord = recordInjection.invert(record.value()).get();System.out.println("value = [user.id = " + genericRecord.get("id") + ", " +"user.name = " + genericRecord.get("name") + ", " +"user.age = " + genericRecord.get("age") + "], " +"partition = " + record.partition() + ", " +"offset = " + record.offset());}}} finally {consumer.close();}}}

先运行 KafkaConsumer,没有输出
当运行 KakfaProducer 后,KakfaConsumer 控制台输出:

value = [user.id = 0, user.name = name0, user.age = 22], partition = 2, offset = 662
value = [user.id = 1, user.name = name1, user.age = 22], partition = 1, offset = 663
value = [user.id = 2, user.name = name2, user.age = 22], partition = 0, offset = 663
value = [user.id = 3, user.name = name3, user.age = 22], partition = 2, offset = 663
value = [user.id = 4, user.name = name4, user.age = 22], partition = 1, offset = 664

【Kafka】Kafka 使用 Twitter 的 Bijection 类库实现 avro 的序列化与反序列化相关推荐

  1. SpringBoot 自定义Kafka消息序列化和反序列化

    1. 概述 Kafka传输自定义的DTO对象时,不能像平时一样使用StringSerializer和StringDeserializer.这种情况需要自己实现对应DTO的序列化器和反序列化器.假设现在 ...

  2. Kafka消息序列化和反序列化(下)

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  3. Kafka消息序列化和反序列化(上)

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  4. kafka python框架_Python中如何使用Apache Avro——Apache的数据序列化系统

    了解如何创建和使用基于Apache Avro的数据,以实现更好,更有效的传输. 在这篇文章中,我将讨论Apache Avro,这是一种开源数据序列化系统,Spark,Kafka等工具正在使用该工具进行 ...

  5. [Kafka] Kafka基本架构

    [Kafka] Kafka基本架构 [Kafka] Kafka基本架构 [Kafka] Kafka基本架构 生产者Producer :生产信息: 消费者Consumer :订阅主题.消费信息: 代理B ...

  6. [Big Data - Kafka] kafka学习笔记:知识点整理

    一.为什么需要消息系统 1.解耦: 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余:消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险.许 ...

  7. linux上卸载kafka,kafka安装在linux上的安装

    kafka安装 第一关 java的安装 捞得嘛,不谈 第二关 zookeeper的安装及配置 1. 直接打开Apach zookeeper进行下载 Tips: source 是源文件,需要编译后才能继 ...

  8. 部署kafka kafka的service容器和zookeeper kafka客户端 Elasticsearch的客户端

    创建network docker network create -d overlay --attachable loc_net   docker stack up -c  kafka.yml kafk ...

  9. [kafka]kafka集群实践

    环境 ip hostname server_id 192.168.1.111 UAT04 2 192.168.1.112 UAT03 1 192.168.1.102 UAT05 3 配置hosts: ...

最新文章

  1. Android 性能优化——布局优化
  2. iOS开发中打电话发短信等功能的实现
  3. 【项目管理】项目管理计划
  4. AD 组策略应用与排错(1应用)
  5. 不可阻挡的PowerShell :Red Teamer告诉你如何突破简单的AppLocker策略
  6. leetcode(128)最长连续序列
  7. 您需要 TrustedInstaller 提供的权限才能对此文件进行更改
  8. 微信文章图片反防盗链解决方案
  9. QOne、QData开关机操作
  10. 重磅!中国芯片新锐50强榜单发布,上海20家、北京仅4家!(附:详细解读)...
  11. RK3588 实现温控风扇之获取cpu温度(一)
  12. 【转】怎样评价寒武纪的芯片1P 1M和MLU100?能够叫板英伟达吗?
  13. 《JS实现复制内容到剪贴板功能,可兼容所有PC浏览器,不兼容手机端》
  14. 22fall HKU港大CS 笔试+面试回忆
  15. Python数据类型 (字符串)
  16. H5新特性有哪些?怎么理解语义化
  17. 会员招募html5,会员招募活动策划方案
  18. [2018 NUIST 程序设计竞赛] P1553 抑郁的竹鼠
  19. 《How Tomcat Works》读书笔记(三)--Connector(连接器)
  20. 浅析容器运行时奥秘——OCI标准

热门文章

  1. 2.88万的五菱神车能赚钱吗?
  2. iPhone质量成迷?被吴彦祖一箭射穿,却还能开机
  3. 低价iPhone 12彻底没戏了?苹果严控渠道:给拼多多等电商供货罚款40万元/台
  4. 漂亮大气!小米武汉总部正式开园:从签约到建成使用仅用时两年
  5. 华为估值知多少?倪光南:位居世界第一应该没问题
  6. 阿里依然在“飙车”!第一财季净利润309.49亿元 同比增长54%
  7. 中国电信回应“变相涨价说”:对原畅享套餐进行的优化升级
  8. 一辆特斯拉Model S在比利时充电时起火 充电桩也被烧焦
  9. 黄轩成为QQ阅读新代言人 变身“队长”号召网友 “组队读书”
  10. oSIP开发者手册 (二)