【Kafka】Kafka 使用 Twitter 的 Bijection 类库实现 avro 的序列化与反序列化
1.概述
请参考:https://www.jianshu.com/p/a70950bab06d
【Kafka】Kafka 使用传统的 avro API 自定义序列化类和反序列化类 比较麻烦,需要根据 schema 生成实体类,需要调用 avro 的 API 实现 对象到 byte[] 和 byte[] 到对象的转化,而那些方法看上去比较繁琐,幸运的是,Twitter 开源的类库 Bijection 对传统的 Avro API 进行了封装了和优化,让我们可以方便的实现以上操作。
2.依赖
Bijection 类库的依赖如下:
<dependency><groupId>com.twitter</groupId><artifactId>bijection-avro_2.11</artifactId><version>0.9.6</version>
</dependency>
在 maven 工程的 resources 目录下新建一个 schema 文件,名称为"user.json",因为我们不用 avro 生成实体类的方式,所以定义一个普通的 json 文件来描述 schema 即可,另外,在 json 文件中,也不需要"namespace": "packageName"这个限定生成实体类的包名的参数,本文使用的 json 文件内容如下:
{"type": "record","name": "User","fields": [{"name": "id", "type": "int"},{"name": "name", "type": "string"},{"name": "age", "type": "int"}]
}
3.测试类
package com.kafka.avro.demo.bijection;import static org.junit.Assert.*;import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Collections;
import java.util.Properties;import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.junit.Test;public class BijectionProducerTest {@Testpublic void producerTest() throws Exception {String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath();FileReader fr = new FileReader(new File(schemaFilePath));BufferedReader br = new BufferedReader(fr);StringBuilder sb = new StringBuilder();String line;while ((line = br.readLine()) != null) {sb.append(line).append("\n");}String schemaStr = sb.toString();br.close();fr.close();Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");Schema.Parser parser = new Schema.Parser();Schema schema = parser.parse(schemaStr);Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);Producer<String, byte[]> producer = new KafkaProducer<>(props);for (int i = 0; i < 100; i++) {GenericData.Record avroRecord = new GenericData.Record(schema);avroRecord.put("id", i);avroRecord.put("name", "name" + i);avroRecord.put("age", 22);byte[] avroRecordBytes = recordInjection.apply(avroRecord);ProducerRecord<String, byte[]> record = new ProducerRecord<>("topic_lcc", avroRecordBytes);producer.send(record);Thread.sleep(1000);}producer.close();}@Testpublic void consumerTest() throws Exception {String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath();FileReader fr = new FileReader(new File(schemaFilePath));BufferedReader br = new BufferedReader(fr);StringBuilder sb = new StringBuilder();String line;while ((line = br.readLine()) != null) {sb.append(line).append("\n");}String schemaStr = sb.toString();br.close();fr.close();Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "dev3-yangyunhe-group001");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("topic_lcc"));Schema.Parser parser = new Schema.Parser();Schema schema = parser.parse(schemaStr);Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);try {while (true) {ConsumerRecords<String, byte[]> records = consumer.poll(1000);for (ConsumerRecord<String, byte[]> record : records) {GenericRecord genericRecord = recordInjection.invert(record.value()).get();System.out.println("value = [user.id = " + genericRecord.get("id") + ", " +"user.name = " + genericRecord.get("name") + ", " +"user.age = " + genericRecord.get("age") + "], " +"partition = " + record.partition() + ", " +"offset = " + record.offset());}}} finally {consumer.close();}}}
先运行 KafkaConsumer,没有输出
当运行 KakfaProducer 后,KakfaConsumer 控制台输出:
value = [user.id = 0, user.name = name0, user.age = 22], partition = 2, offset = 662
value = [user.id = 1, user.name = name1, user.age = 22], partition = 1, offset = 663
value = [user.id = 2, user.name = name2, user.age = 22], partition = 0, offset = 663
value = [user.id = 3, user.name = name3, user.age = 22], partition = 2, offset = 663
value = [user.id = 4, user.name = name4, user.age = 22], partition = 1, offset = 664
【Kafka】Kafka 使用 Twitter 的 Bijection 类库实现 avro 的序列化与反序列化相关推荐
- SpringBoot 自定义Kafka消息序列化和反序列化
1. 概述 Kafka传输自定义的DTO对象时,不能像平时一样使用StringSerializer和StringDeserializer.这种情况需要自己实现对应DTO的序列化器和反序列化器.假设现在 ...
- Kafka消息序列化和反序列化(下)
欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...
- Kafka消息序列化和反序列化(上)
欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...
- kafka python框架_Python中如何使用Apache Avro——Apache的数据序列化系统
了解如何创建和使用基于Apache Avro的数据,以实现更好,更有效的传输. 在这篇文章中,我将讨论Apache Avro,这是一种开源数据序列化系统,Spark,Kafka等工具正在使用该工具进行 ...
- [Kafka] Kafka基本架构
[Kafka] Kafka基本架构 [Kafka] Kafka基本架构 [Kafka] Kafka基本架构 生产者Producer :生产信息: 消费者Consumer :订阅主题.消费信息: 代理B ...
- [Big Data - Kafka] kafka学习笔记:知识点整理
一.为什么需要消息系统 1.解耦: 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余:消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险.许 ...
- linux上卸载kafka,kafka安装在linux上的安装
kafka安装 第一关 java的安装 捞得嘛,不谈 第二关 zookeeper的安装及配置 1. 直接打开Apach zookeeper进行下载 Tips: source 是源文件,需要编译后才能继 ...
- 部署kafka kafka的service容器和zookeeper kafka客户端 Elasticsearch的客户端
创建network docker network create -d overlay --attachable loc_net docker stack up -c kafka.yml kafk ...
- [kafka]kafka集群实践
环境 ip hostname server_id 192.168.1.111 UAT04 2 192.168.1.112 UAT03 1 192.168.1.102 UAT05 3 配置hosts: ...
最新文章
- Android 性能优化——布局优化
- iOS开发中打电话发短信等功能的实现
- 【项目管理】项目管理计划
- AD 组策略应用与排错(1应用)
- 不可阻挡的PowerShell :Red Teamer告诉你如何突破简单的AppLocker策略
- leetcode(128)最长连续序列
- 您需要 TrustedInstaller 提供的权限才能对此文件进行更改
- 微信文章图片反防盗链解决方案
- QOne、QData开关机操作
- 重磅!中国芯片新锐50强榜单发布,上海20家、北京仅4家!(附:详细解读)...
- RK3588 实现温控风扇之获取cpu温度(一)
- 【转】怎样评价寒武纪的芯片1P 1M和MLU100?能够叫板英伟达吗?
- 《JS实现复制内容到剪贴板功能,可兼容所有PC浏览器,不兼容手机端》
- 22fall HKU港大CS 笔试+面试回忆
- Python数据类型 (字符串)
- H5新特性有哪些?怎么理解语义化
- 会员招募html5,会员招募活动策划方案
- [2018 NUIST 程序设计竞赛] P1553 抑郁的竹鼠
- 《How Tomcat Works》读书笔记(三)--Connector(连接器)
- 浅析容器运行时奥秘——OCI标准
热门文章
- 2.88万的五菱神车能赚钱吗?
- iPhone质量成迷?被吴彦祖一箭射穿,却还能开机
- 低价iPhone 12彻底没戏了?苹果严控渠道:给拼多多等电商供货罚款40万元/台
- 漂亮大气!小米武汉总部正式开园:从签约到建成使用仅用时两年
- 华为估值知多少?倪光南:位居世界第一应该没问题
- 阿里依然在“飙车”!第一财季净利润309.49亿元 同比增长54%
- 中国电信回应“变相涨价说”:对原畅享套餐进行的优化升级
- 一辆特斯拉Model S在比利时充电时起火 充电桩也被烧焦
- 黄轩成为QQ阅读新代言人 变身“队长”号召网友 “组队读书”
- oSIP开发者手册 (二)