1. Confluent Schema Registry Installation Tutorial

  1. Download the Schema Registry release of your choice (download links are available for each release version).
  2. Upload the package to a Linux system, then extract and install it.
  3. This tutorial uses an external, already-installed Kafka cluster rather than the one bundled with Confluent.
  4. Edit the confluent-5.3.1/etc/schema-registry/schema-registry.properties configuration file:
#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
## The address the socket server listens on.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
# Listen address and port of the Schema Registry server
listeners=http://0.0.0.0:8081

# Zookeeper connection string for the Zookeeper cluster used by your Kafka cluster
# (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# There are two ways to address the external cluster: 1) via ZooKeeper, or 2) via the Kafka brokers.
# This tutorial uses the ZooKeeper connection.
kafkastore.connection.url=henghe-042:2181

# Alternatively, Schema Registry can now operate without Zookeeper, handling all coordination via
# Kafka brokers. Use this setting to specify the bootstrap servers for your Kafka cluster and it
# will be used both for selecting the master schema registry instance and for storing the data for
# registered schemas.
# (Note that you cannot mix the two modes; use this mode only on new deployments or by shutting down
# all instances, switching to the new configuration, and then starting the schema registry
# instances again.)
#kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092

# The name of the topic to store schemas in
kafkastore.topic=_schemas

# If true, API requests that fail will include extra debugging information, including stack traces
debug=false
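
For reference, once the license header and comments are stripped away, the active settings in this tutorial's schema-registry.properties come down to these four lines:

listeners=http://0.0.0.0:8081
kafkastore.connection.url=henghe-042:2181
kafkastore.topic=_schemas
debug=false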
  5. Start the registry server:
../../bin/schema-registry-start -daemon ../../etc/schema-registry/schema-registry.properties
  6. The registry server's REST API:
# Register a new version of a schema under the subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"string\"}"}' \
  http://localhost:8081/subjects/Kafka-key/versions
{"id":1}

# Register a new version of a schema under the subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"string\"}"}' \
  http://localhost:8081/subjects/Kafka-value/versions
{"id":1}

# List all subjects
$ curl -X GET http://localhost:8081/subjects
["Kafka-value","Kafka-key"]

# List all schema versions registered under the subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions
[1]

# Fetch a schema by globally unique id 1
$ curl -X GET http://localhost:8081/schemas/ids/1
{"schema":"\"string\""}

# Fetch version 1 of the schema registered under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/1
{"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}

# Fetch the most recently registered schema under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/latest
{"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}

# Delete version 3 of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/3
3

# Delete all versions of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value
[1, 2, 3, 4, 5]

# Check whether a schema has been registered under subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"string\"}"}' \
  http://localhost:8081/subjects/Kafka-key
{"subject":"Kafka-key","version":1,"id":1,"schema":"\"string\""}

# Test compatibility of a schema with the latest schema under subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"string\"}"}' \
  http://localhost:8081/compatibility/subjects/Kafka-value/versions/latest
{"is_compatible":true}

# Get top level config
$ curl -X GET http://localhost:8081/config
{"compatibilityLevel":"BACKWARD"}

# Update compatibility requirements globally
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "NONE"}' \
  http://localhost:8081/config
{"compatibility":"NONE"}

# Update compatibility requirements under the subject "Kafka-value"
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "BACKWARD"}' \
  http://localhost:8081/config/Kafka-value
{"compatibility":"BACKWARD"}
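
One note that connects this section to the next: the subjects above were registered by hand with curl, but the Confluent serializer used in the usage tutorial registers schemas on its own. By default it names the subject after the topic (<topic>-value for record values), so after the producer below has written to test-topic, listing subjects should return something like the following (illustrative output, assuming the default subject naming strategy):

$ curl -X GET http://localhost:8081/subjects
["test-topic-value"]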

2. Confluent Schema Registry Usage Tutorial

  1. Create a Java project with the following POM dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.yss</groupId>
    <artifactId>Kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!-- This dependency is imported from the local repository; search online for how to install a jar into the local repository -->
        <!-- My jar was installed into the local repository automatically when compiling the source, so it is referenced directly -->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>5.3.2</version>
        </dependency>
    </dependencies>
</project>
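
If you would rather not install the jar into the local repository by hand, kafka-avro-serializer is also published in Confluent's own Maven repository (it is not on Maven Central), so adding a repositories section along these lines to the POM should let Maven resolve it directly:

    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>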
  2. Producer example:
package com.registry;

import java.util.Properties;
import java.util.Random;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * @description:
 * @author: wangshuai
 * @create: 2020-01-03 14:17
 **/
public class ConfluentProducer {

    public static final String USER_SCHEMA = "{\"type\": \"record\", \"name\": \"User\", " +
            "\"fields\": [{\"name\": \"id\", \"type\": \"int\"}, " +
            "{\"name\": \"name\",  \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.101.42:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Use Confluent's KafkaAvroSerializer implementation
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // Address of the Schema Registry service, used to register/fetch schemas
        props.put("schema.registry.url", "http://192.168.101.42:8081");

        Producer<String, GenericRecord> producer = new KafkaProducer<>(props);

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);

        Random rand = new Random();
        int id = 0;
        while (id < 100) {
            id++;
            String name = "name" + id;
            int age = rand.nextInt(40) + 1;
            GenericRecord user = new GenericData.Record(schema);
            user.put("id", id);
            user.put("name", name);
            user.put("age", age);
            ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("test-topic", user);
            producer.send(record);
            Thread.sleep(1000);
        }
        producer.close();
    }
}
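
A note on what KafkaAvroSerializer actually puts on the wire: it does not embed the full schema in each record. It writes a magic byte (0x0) and the 4-byte schema id assigned by the registry, followed by the Avro-encoded payload, so every record carries only 5 bytes of overhead. Below is a minimal sketch of pulling the id back out of a raw serialized value (the class and method names here are my own, for illustration only):

package com.registry;

import java.nio.ByteBuffer;

public class WireFormat {
    // Confluent wire format: [magic byte 0x0][4-byte big-endian schema id][Avro binary payload]
    public static int schemaIdOf(byte[] serializedValue) {
        ByteBuffer buf = ByteBuffer.wrap(serializedValue);
        byte magic = buf.get();
        if (magic != 0x0) {
            throw new IllegalArgumentException("Unexpected magic byte: " + magic);
        }
        // The returned id can be resolved back to a schema via GET /schemas/ids/{id}
        return buf.getInt();
    }
}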
  3. Consumer example:
package com.registry;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @description:
 * @author: wangshuai
 * @create: 2020-01-03 14:34
 **/
public class ConfluentConsumer {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.101.42:9092");
        props.put("group.id", "test1");
        // Disable auto-commit; together with earliest this re-consumes from the beginning on every run, for testing
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Use Confluent's KafkaAvroDeserializer implementation
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // Address of the Schema Registry service, used to fetch schemas
        props.put("schema.registry.url", "http://192.168.101.42:8081");

        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));

        try {
            while (true) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    GenericRecord user = record.value();
                    System.out.println("value = [user.id = " + user.get("id") + ", "
                            + "user.name = " + user.get("name") + ", "
                            + "user.age = " + user.get("age") + "], "
                            + "partition = " + record.partition() + ", "
                            + "offset = " + record.offset());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
  4. Example of the consumer output:
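Based on the println format in the consumer above, the console output looks roughly like this (illustrative values; the ages are random):

value = [user.id = 1, user.name = name1, user.age = 23], partition = 0, offset = 0
value = [user.id = 2, user.name = name2, user.age = 7], partition = 0, offset = 1
value = [user.id = 3, user.name = name3, user.age = 31], partition = 0, offset = 2
...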
