原文:https://cloud.tencent.com/developer/article/1336568

1. schema 注册表

无论是使用传统的Avro API自定义序列化类和反序列化类还是使用Twitter的Bijection类库实现Avro的序列化与反序列化,这两种方法都有一个缺点:在每条Kafka记录里都嵌入了schema,这会让记录的大小成倍地增加。但是不管怎样,在读取记录时仍然需要用到整个 schema,所以要先找到 schema。有没有什么方法可以让数据共用一个schema?

我们遵循通用的结构模式并使用"schema注册表"来达到目的。"schema注册表"的原理如下:


把所有写入数据需要用到的 schema 保存在注册表里,然后在记录里引用 schema 的 ID。负责读取数据的应用程序使用 ID 从注册表里拉取 schema 来反序列化记录。序列化器和反序列化器分别负责处理 schema 的注册和拉取。

schema注册表并不属于Kafka,现在已经有一些开源的schema 注册表实现。比如本文要讨论的Confluent Schema Registry。

2. 案例说明

现有 schema 文件 user.json,其中内容如下:

{"type": "record","name": "User","fields": [{"name": "id", "type": "int"},{"name": "name",  "type": "string"},{"name": "age", "type": "int"}]
}

需求:把这个 schema 中的内容注册到 Confluent Schema Registry 中,Kafka Producer 和 Kafka Consumer 通过识别 Confluent Schema Registry 中的 schema 内容来序列化和反序列化。

3. 实操步骤

(1) 启动 Confluent Schema Registry 服务
Confluent 下载地址:https://www.confluent.io/download/,我这里使用confluent-oss-4.1.1-2.11.tar.gz
下载好后上传到服务器,解压即可用
进入confluent-4.1.1/etc/schema-registry/目录下,修改schema-registry.properties文件,内容及注释如下:

# Confluent Schema Registry 服务的访问IP和端口
listeners=http://192.168.42.89:8081# Kafka集群所使用的zookeeper地址,如果不配置,会使用Confluent内置的Zookeeper地址(localhost:2181)
kafkastore.connection.url=192.168.42.89:2181/kafka-1.1.0-cluster# Kafka集群的地址(上一个参数和这个参数配置一个就可以了)
# kafkastore.bootstrap.servers=192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094# 存储 schema 的 topic
kafkastore.topic=_schemas# 其余保持默认即可

启动 Confluent Schema Registry

[root@confluent confluent-4.1.1]# bin/schema-registry-start etc/schema-registry/schema-registry.properties
# 省略一些内容......
[2018-06-22 16:10:26,442] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:45)

但是我的报错

[2020-09-10 17:01:23,523] ERROR Server died unexpectedly:  (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
io.confluent.common.config.ConfigException: No supported Kafka endpoints are configured. Either kafkastore.bootstrap.servers must have at least one endpoint matching kafkastore.security.protocol or broker endpoints loaded from ZooKeeper must have at least one endpoint matching kafkastore.security.protocol.at io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig.endpointsToBootstrapServers(SchemaRegistryConfig.java:614)at io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig.bootstrapBrokers(SchemaRegistryConfig.java:554)at io.confluent.kafka.schemaregistry.storage.KafkaStore.<init>(KafkaStore.java:101)at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.<init>(KafkaSchemaRegistry.java:139)at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:60)at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:42)at io.confluent.rest.Application.createServer(Application.java:157)at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:43)

这里有个Bug参考:https://github.com/confluentinc/schema-registry/issues/765

(2) 注册 User 的 schema 注册到对应的 topic 下

首先把原来的 schema 文件加上 “schema” 标记

{"schema": "{"type": "record","name": "User","fields": [{"name": "id", "type": "int"},{"name": "name",  "type": "string"},{"name": "age", "type": "int"}]}"
}

部分"需要转义:

{"schema": "{\"type\": \"record\",\"name\": \"User\",\"fields\": [{\"name\": \"id\", \"type\": \"int\"},{\"name\": \"name\",  \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"}]}"
}

注册 schema 的命令如下

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '' \
http://192.168.42.89:8081/subjects/dev3-yangyunhe-topic001-value/versions

说明:

  1. ''之间需要填写schema字符串
  2. 我用来测试的 topic 为 dev3-yangyunhe-topic001,而且我只对 Kafka 的 value 进行 avro 的序列化,所以注册的地址为http://192.168.42.89:8081/subjects/dev3-yangyunhe-topic001-value/versions
  3. http://192.168.42.89:8081需要根据自己的配置进行修改

把转义后 schema 填充到 --data ''的两个单引号中

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"id\", \"type\": \"int\"}, {\"name\": \"name\",  \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}"}' \
http://192.168.42.89:8081/subjects/dev3-yangyunhe-topic001-value/versions

注册成功会返回这个 schema 的 ID

{"id":102}

(3) 在 maven 工程中引入 Confluent Schema Registry 相关的 jar 包
这些 jar 包在 maven 仓库中下载不到,需要自己手动添加到集群中,confluent-4.1.1 解压后,其 share/java/目录下有 confluent 各个组件的 jar 包:

我们需要 confluent-common 目录下的common-config-4.1.1.jar、common-utils-4.1.1.jar和全部以jackson开头的 jar 包以及 kafka-serde-tools 目录下的kafka-schema-registry-client-4.1.1.jar和kafka-avro-serializer-4.1.1.jar,关于如何添加本地的 jar 包到 java 工程中,本文不再赘述。

(4) Kafka Producer 发送数据

package com.bonc.rdpe.kafka110.producer;import java.util.Properties;
import java.util.Random;import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;/*** @Title ConfluentProducer.java * @Description 使用Confluent实现的Schema Registry服务来发送Avro序列化后的对象* @Author YangYunhe* @Date 2018-06-25 10:49:19*/
public class ConfluentProducer {public static final String USER_SCHEMA = "{\"type\": \"record\", \"name\": \"User\", " + "\"fields\": [{\"name\": \"id\", \"type\": \"int\"}, " + "{\"name\": \"name\",  \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}";public static void main(String[] args) throws Exception {Properties props = new Properties();props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 使用Confluent实现的KafkaAvroSerializerprops.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");// 添加schema服务的地址,用于获取schemaprops.put("schema.registry.url", "http://192.168.42.89:8081");Producer<String, GenericRecord> producer = new KafkaProducer<>(props);Schema.Parser parser = new Schema.Parser();Schema schema = parser.parse(USER_SCHEMA);Random rand = new Random();int id = 0;while(id < 100) {id++;String name = "name" + id;int age = rand.nextInt(40) + 1;GenericRecord user = new GenericData.Record(schema);user.put("id", id);user.put("name", name);user.put("age", age);ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("dev3-yangyunhe-topic001", user);producer.send(record);Thread.sleep(1000);}producer.close();}}

(5) Kafka Consumer 消费数据

package com.bonc.rdpe.kafka110.consumer;import java.util.Collections;
import java.util.Properties;import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;/*** @Title ConfluentConsumer.java* @Description 使用Confluent实现的Schema Registry服务来消费Avro序列化后的对象* @Author YangYunhe* @Date 2018-06-25 11:42:21*/
public class ConfluentConsumer {public static void main(String[] args) throws Exception {Properties props = new Properties();props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");props.put("group.id", "dev3-yangyunhe-group001");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 使用Confluent实现的KafkaAvroDeserializerprops.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");// 添加schema服务的地址,用于获取schemaprops.put("schema.registry.url", "http://192.168.42.89:8081");KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("dev3-yangyunhe-topic001"));try {while (true) {ConsumerRecords<String, GenericRecord> records = consumer.poll(1000);for (ConsumerRecord<String, GenericRecord> record : records) {GenericRecord user = record.value();System.out.println("value = [user.id = " + user.get("id") + ", " + "user.name = "+ user.get("name") + ", " + "user.age = " + user.get("age") + "], "+ "partition = " + record.partition() + ", " + "offset = " + record.offset());}}} finally {consumer.close();}}
}

(6) 测试结果

Kafka Consumer 的控制台输出内容如下:

value = [user.id = 1, user.name = name1, user.age = 20], partition = 1, offset = 696
value = [user.id = 2, user.name = name2, user.age = 27], partition = 0, offset = 696
value = [user.id = 3, user.name = name3, user.age = 35], partition = 2, offset = 695
value = [user.id = 4, user.name = name4, user.age = 7], partition = 1, offset = 697
value = [user.id = 5, user.name = name5, user.age = 34], partition = 0, offset = 697......

【Kafka】Confluent Schema Registry相关推荐

  1. Kafka 的 Confluent Schema Registry安装与使用教程

    1 .Confluent Schema Registry 安装教程 Schema Registry的各个发现行版本的下载链接 上传到linux系统进行解压安装. 本教程使用外部以安装好的Kafka集群 ...

  2. kafka Confluent Schema Registry 简单实践

    解释及目的: 使用传统的Avro API自定义序列化类和反序列化类或者使用Twitter的Bijection类库实现Avro的序列化与反序列化,这两种方法都有一个缺点:在每条Kafka记录里都嵌入了s ...

  3. 【Kafka】Kafka 使用 Twitter 的 Bijection 类库实现 avro 的序列化与反序列化

    1.概述 请参考:https://www.jianshu.com/p/a70950bab06d [Kafka]Kafka 使用传统的 avro API 自定义序列化类和反序列化类 比较麻烦,需要根据 ...

  4. 【Kafka】第三篇-Kafka的集群及Canal介绍

    [上一章 [Kafka]第二篇-Kafka的核心概念及分区消费规则] 学习路线 Kafka集群架构 Kafka集群环境 1.kafka是一个压缩包,直接解压即可使用,所以我们就解压三个kafka: 2 ...

  5. 【kafka】Kafka 幂等 Producer

    1.概述 [Kafka]Kafka幂等性原理及实现剖析 [kafka]Kafka 事务性之幂等性实现 官网:Idempotent Producer 2.简介 Kafka提供了"至少一次&qu ...

  6. 【Kafka】Kafka 如果 动态 不停止的情况下 修改 消费组 offset

    文章目录 1.概述 2.方案1 3.方案2 1.概述 测试点:有人遇到这样的情况,他一个消费者正在消费一个环境的topic,然后他想启动另外一个消费组,但是是使用了同一个消费组,这个去更改 消费组of ...

  7. 【kafka】kafka获取消费组异常 EOFException: null KeeperErrorCode

    文章目录 1.场景1 1.1 概述 1.2 源码解析 1.2.1 sendRequest 1.2.2 receive 1.2.3 readCompletely 1.2 4 readFromReadab ...

  8. 【kafka】kafka kerberos Cannot locate KDC Unable to locate KDC for realm

    文章目录 1.概述 2.场景2 1.概述 本次是在 [kafka]kafka 消费 带有 kerberos认证的服务器 这个都搞通的情况下,搞这个. 机器的环境如下 12.12.12.24 kdc服务 ...

  9. 【kafka】kafka kerberos KeeperErrorCode = InvalidACL for /config/topics

    文章目录 1.概述 1.概述 环境搭建参考:[kafka]kafka 消费 带有 kerberos认证的服务器 在这个kerberos认证的环境下,我想测试一下,构建zk的客户端是否需要认证,然后代码 ...

最新文章

  1. Delphi 与 DirectX 之 DelphiX(89): TDIB.DrawAlphaMask();
  2. java命名规则_Java命名规则
  3. Swift:用UICollectionView整一个瀑布流
  4. Linux编程 8 (挂载mount,查看磁盘df du,搜索grep,压缩zgip,归档tar)
  5. rxjs pipe和filter组合的一个实际例子的单步调试
  6. kafka命令行操作
  7. 单链表逆序的多种方式
  8. Java并发编程之CountDownLatch闭锁
  9. 程序员社区骂战:不满政治正确,LLVM元老宣布退出
  10. 金志文机器人歌叫什么_MIR发布《移动机器人部署安全指南》白皮书
  11. fatjar: eclipse导出工具
  12. stm32例程_如何系统地入门学习stm32?
  13. Android实战开发小米主题下载工具
  14. jmeter进行http压力测试
  15. Windows下CURL编译 支持HTTPS
  16. 第四百九十一章 战利品
  17. GBase 8c开发接口
  18. 关于AI芯片功耗和应用的一些看法
  19. 前端面试必备ES6全方位总结
  20. SpringAMQP发送与接收消息

热门文章

  1. 年轻人原地过年,也不忘搞钱
  2. iPhone 12明天凌晨发布,有望推动苹果市值超过2.2万亿美元
  3. 百度景鲲:9月15日发布小度真无线智能耳机
  4. “苹果税”猛于虎惹众怒,库克:我们是不会让步的
  5. 华为Mate 40系列或采用双处理器方案:国行版仍为麒麟芯
  6. 罗永浩与银联合作直播,但因过程太流畅被网友调侃是录播
  7. 那些慢慢消失的手机功能,最怀念第一个!
  8. 拼多多上线“医药健康日” 将对最常用的OTC药品等进行补贴
  9. 什么?iPhone 11起售价不到5400元?
  10. 华为Mate 30 Pro前面板曝光:双曲面设计 几乎全是屏