debezium系列之:Kafka Connect

  • 一、Source和Sink
  • 二、Task和Worker
  • 三、Kafka Connect特性
  • 四、独立模式
    • 1.Source连接器的用法
    • 2.Sink连接器的用法
  • 五、REST API
  • 六、分布式模式
  • 七、总结

Kafka Connect是一个工具,Kafka Connect为kafka和外部数据系统之间移动数据提供了一种可靠且可伸缩的实现方式。Kafka Connect可以简单快捷的将数据从Kafka中导入或导出,数据范围涵盖关系型数据库、日志和度量数据、Hadoop和数据仓库、NoSQL数据存储、搜索索引等。

一、Source和Sink

Kafka有两个核心概念:Source和Sink,Source和Sink都被称为Connector连接器。

  • Source负责导入数据到Kafka
  • Sink负责从Kafka导出数据

二、Task和Worker

在Kafka Connect中还有两个重要的概念:Task和Worker。

  • Task是Kafka Connect数据模型的主角,每一个Connector都会协调一系列的Task去执行任务,Connector可以把一项工作分割成许多Task,然后把Task分发到各个Worker进程中去执行(分布式模式下),Task不保存自己的状态信息,而是交给特定的Kafka主题去保存。
  • Connector和Task都是逻辑工作单位,比须安排在进程中执行,而在Kafka Connector中,这些进程就是Worker。

三、Kafka Connect特性

  • 通用性:规范化其他数据系统与Kafka的集成,简化了连接器的开发、部署和管理
  • 支持独立模式(standalone)和分布式模式(distributed)
  • REST接口:使用REST API提交和管理Connector
  • 自动位移管理:自动管理位移提交,不需要开发人员干预,降低了开发成本
  • 分布式和可扩展性:Kafka Connect基于现有的组管理协议来实现扩展Kafka Connect集群
  • 流式计算/批处理的集成。

四、独立模式

Kafka中的connect-standalone.sh脚本用来实现以独立的模式运行Kafka Connector。在独立模式下所有的操作都是在一个进程中完成的。

  • 独立模式适合测试或功能验证的场景
  • 由于是单进程,所以独立模式无法充分利用Kafka自身所提供的负载均衡和高容错等特性。

在执行这个脚本时需要指定两个配置文件:

  • 一个是用于Worker进程运行的相关配置文件。
  • 另一个是指定Source连接器或Sink连接器的配置文件,可以同时指定多个连接器配置,每个连接器配置文件对应一个连接器
  • 因此,要保证连接器名称全局唯一,连接器名称通过name参数指定。

1.Source连接器的用法

先了解下Source连接器的用法:将文件source.txt中的内容通过Source连接器写入Kafka的主题Topic-connect。

首先修改用于Worker进程运行的配置文件($KAFKA_HOME/config/connect-standalone.properties),内容参考如下:

bootstrap.servers = localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
  • bootstrap.servers参数用来配置与Kafka集群连接的地址。
  • key.converter和value.converter参数指定Kafka消息中key和value的格式转化类。本例中使用JsonConverter来将每一条消息的key和value都转化成JSON格式。
  • key.converter.schemas.enable和value.converter.schemas.enable参数用来指定JSON消息中是否可以包含schema。
  • offset.storage.file.filename参数用于指定保存偏移量的文件路径。
  • offset.flush.interval.ms参数用于设定提交偏移量的频率。

接下来修改Source连接器的配置文件($KAFKA_HOME/config/connect-file-source.properties),内容参考如下:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/opt/kafka_2.11-2.0.0/source.txt
topic=topic-connect
  • name参数用来配置连接器的名称。
  • connector.class用来设置连接器类的全限定名称。Kafka Connect会在classpath中自动搜索这个类并加载。
  • 相关连接器主要有:kafka-connect-elasticsearch,kafka-connect-jdbc,kafka-connect-hdfs,kafka-connect-storage-cloud
  • task.max参数指定了Task的数量。
  • file参数指定该连接器数据源文件路径,指定了Kafka根目录下的source.txt文件,在启动连接器前需要先创建好。
  • topic参数设置连接器把数据导入哪个主题,如果该主题不存在,则连接器会自动创建,不过建议最好还是提前手工创建该主题。

比如对本例中的主题topic-connect而言,可以事先创建,详细信息如下:

bin/kafka-topic.sh --zookeeper localhost:2181/kafka --create --topic topic-connect --replication-factor 1 --partitions 1

启动Source连接器,示例如下:

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties

连接器启动后,向source.txt文件中输入两条句子:

echo "hello kafka connect" >> source.txt
echo "hello kafka connect" >> source.txt

之后可以观察主题topic-connect中是否包含这两条消息。既可以使用kafka-console-consumer.sh脚本,也可以使用kafka-dump-log.sh脚本来查看内容。

bin/kafka-dump-log.sh --files /tmp/kafka-logs/topic-connect-0/00000000000000000000.log --print-data-log

2.Sink连接器的用法

再来看一下Sink连接器的用法:将主题topic-connect中的内容通过Sink连接器写入文件sink.txt。对config/connect-standalone.properties文件做修改,参考如下:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
  • 将Kafka消息中的key和value的格式转化类指定为StringConverter

再配置Sink连接器的配置文件($KAFKA_HOME/config/connect-file-sink.properties),内容参考如下:

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/opt/kafka_2.11-2.0.0/sink.txt
topics=topic-connect

接下来启动Sink连接器

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties

往主题topic-connect中发送一条消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-connecthello kafka

就可以在sink.txt文件中看到这条消息

cat sink.txt
hello kafka

五、REST API

可以通过Kafka Connect提供的基于REST风格的API接口来管理连接器,默认端口号8083,可以通过Worker进程的配置文件中的rest.port参数来修改端口号。

Kafka Connect REST API接口如下所示:

REST API 释义
GET / 查看kafka集群版本信息
GET /connectors 查看当前活跃的连接器列表,显示连接器的名字
POST /connectors 根据指定配置,创建一个新的连接器
GET /connectors/{name} 查看置顶连接器的信息
GET /connectors/{name}/config 查看连接器的配置信息
GET /connectors/{name}/status 查看连接器的状态
POST /connectors/{name}/restart 重启指定的连接器
PUT /connectors/{name}/pause 暂停指定的连接器
GET /connectors/{name}/tasks 查看指定连接器正在运行的Task
POST /connectors/{name}/tasks 修改Task的配置
GET /connector/{name}/tasks/{taskId}/status 查看指定连接器中指定Task的状态
POST /connectors/{name}/tasks/{taskId}/restart 重启指定连接器中指定的Task
Delete /connectors/{name} 删除指定的连接器

六、分布式模式

与独立模式不同,分布式模式结合了Kafka提供的负载均衡和故障转移功能,能够自动在多个节点机器上平衡负载。分布式模式只能通过访问REST API来创建连接器。

在运行分布式模式的连接器前,同样要修改Worker进程的相关配置文件($KAFKA_HOME/config/connect-distributed.properties),内容参考如下:

bootstrap.servers=localhost1:9092,localhost2:9092,localhost3:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

启动分布式模式,运行脚本变成了对应的connect-distributed.sh,示例如下所示:

bin/connect-distributed.sh config/connect-distributed.properties

接下来调用POST /connectors接口来创建指定的连接器,示例如下:

curl -u debezium:************* 'http://debezium-001:8083/connectors' -X POST -i -H "Content-Type:application/json" -d
'
{"name":"mysql-optics-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","task.max":1,"database.hostname":"10.10.128.146","database.port":"3306","database.dbname":"unified_view_test","database.user":"insight_admin","database.password":"Kp1Kd8XkzMI8MgnzZfq","database.server.id":2022032008,"database.server.name":"debezium-optics-test","database.include.list":"unified_view_test","table.include.list":"unified_view_test.retail_order_detail","database.history.kafka.bootstrap.servers":"kafka-002:9092,kafka-003:9092,kafka-001:9092","database.history.kafka.topic":"history-debezium-mysql-optics-test"}
}
'

接下来就可以向distribute-source.txt文件中写入内容,然后订阅消费主题topic-distribute-source中的消息来验证是否成功。

使用完毕后,可以调用DELETE /connectors/{name}接口来删除对应的连接器。

curl -i -X DELETE http://localhost:8083/connectors/local-file-distribute-source

七、总结

基于Kafka Connect加载debezium插件的更多的内容可以参考博主以下几篇技术博客,更多关于Debezium的技术可以阅读博主的debezium专栏:

  • Debezium系列之:安装部署debezium详细步骤,并把debezium服务托管到systemctl
  • Debezium系列之:打通Debezium2.0以上版本的使用技术
  • Debezium系列之:安装部署debezium2.0以上版本的详细步骤
  • Debezium系列之:实现接入上千Mysql、Sqlserver、MongoDB、Postgresql数据库的Debezium集群从Debezium1.X版本升级到Debezium2.X版本
  • Debezium系列之:安装jmx导出器监控debezium指标
  • Debezium系列之:Debezium UI部署详细步骤
  • Debezium 专栏地址

debezium系列之:Kafka Connect相关推荐

  1. [实战系列]SelectDB Cloud Kafka Connect 最佳实践张家锋

    概述 企业正在经历其数据资产的爆炸式增长,这些数据包括批式或流式传输的结构化.半结构化以及非结构化数据,随着海量数据批量导入的场景的增多,企业对于 Data Pipeline 的需求也愈加复杂.新一代 ...

  2. Debezium系列之:使用Debezium接入PostgreSQL数据库数据到Kafka集群的详细技术文档

    Debezium系列之:使用Debezium接入PostgreSQL数据库数据到Kafka集群的详细技术文档 一.概述 二.连接器的工作原理 1.安全 2.快照 3.Ad hoc snapshots ...

  3. Debezium系列之:使用Debezium接入SQL Server数据库数据到Kafka集群的详细技术文档

    Debezium系列之:使用Debezium接入SQL Server数据库数据到Kafka集群的详细技术文档 一.Debezium概述 二.SQL Server 连接器的工作原理 1.Snapshot ...

  4. Debezium系列- kafka connet debug

    背景 使用Debezium采集MySQL Binlog 集成到Hive中,采坑(一)(二) 中对问题的猜想还没得到证实,又发现了 Debezium 采集 MySQL 时间转换时间错乱问题,关于时间错乱 ...

  5. Debezium系列之:实现不同表中的数据始终发往对应的kafka topic分区,支持根据表中任意字段分发数据到Kafka topic多个分区

    Debezium系列之:实现不同表中的数据始终发往对应的kafka topic分区,实现支持根据表中任意字段分发数据到Kafka topic多个分区 一.需求背景 二.ComputePartition ...

  6. Kafka系列之:详细介绍部署Kafka Connect分布式集群

    Kafka系列之:详细介绍部署Kafka Connect分布式集群 一.部署分布式Kafka集群详细步骤 二.Kafka Worker节点安装部署Kafka 三.修改connect-distribut ...

  7. Kafka Connect简介

    一. Kafka Connect简介 Kafka是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析).为何集成其他系统和解耦应用,经常使用Producer来发送消息到Broker, ...

  8. Debezium系列之:安装部署debezium2.0以上版本的详细步骤

    Debezium系列之:安装部署debezium2.0以上版本的详细步骤 一.相关技术博客 二.升级debezium2.0以上版本注意事项 三.安装jdk 四.修改kafka相关参数 五.启动kafk ...

  9. Debezium系列之:Debezium2.X之PostgreSQL数据库的Debezium连接器

    Debezium系列之:Debezium2.X之PostgreSQL数据库的Debezium连接器 一.概述 二.连接器的工作原理 1.安全 2.快照 3.临时快照 4.触发临时快照 5.增量快照 6 ...

最新文章

  1. python小游戏系列记忆宫殿,儿时的回忆
  2. centos7使用蓝牙_Nmon的使用和APP测试要点
  3. 决心开始写博,坚持!
  4. java字符数统计_【JAVA300例】51、统计输入的字符串中各种字符的字符数
  5. php 评论插件,Typecho评论增强插件:TeComment(2017.09.07更新)
  6. mac os x 安装 wireshark 的问题
  7. JAVA读、写EXCEL文件
  8. 纠正存储 dict 的元素前是计算 key 的 hash 值?
  9. centos安装mysql5.7.26_Centos安装mysql5.7.26
  10. AD19改变原理图图纸大小(A4改为A3
  11. windows2003安装网络打印机的问题(原创,转载请注明)
  12. SDxCentral 2015年NFV报告
  13. excel两个表格数据对比_Excel表格中数据比对和查找的几种技巧
  14. 燃烧的远征服务器排队小程序,请排队-在线排队叫号微信小程序
  15. 电视盒子系统是安卓还是yunOS,三招快速弄清
  16. 阻容降压电路:每个元器件计算选型
  17. Windows 2000驱动程序的设计
  18. P1551 亲戚 并查集
  19. DOS环境进入及基本命令DOS
  20. Mysql设计学生宿舍管理系统+考勤系统

热门文章

  1. Top10Servlet
  2. chrome控制台酷炫主题
  3. Android实现系统相册选择APP全局背景图片
  4. 如何用条码标签打印软件实现商品价签制定会员价...
  5. Cauchy积分公式的一个推广形式
  6. 【android】半角符号与全角符号的转换
  7. openwrt实现url过滤
  8. http状态码413,并提示Request Entity Too Large的解决办法
  9. Android Studio调用百度API(图片文字识别)
  10. Caffe: 贾扬清2015年讲座