官网地址:Apache Kafka

概念

Kafka Connect 是一款可扩展并且可靠地在 Apache Kafka 和其他系统之间进行数据传输的工具。 可以很简单的定义 connectors(连接器) 将大量数据迁入、迁出Kafka。

逻辑图

Kafka Connect 特性如下:

  • Kafka 连接器的通用框架:Kafka Connect 标准化了其他数据系统与Kafka的集成,从而简化了连接器的开发,部署和管理
  • 支持分布式模式和单机模式部署
  • Rest API:通过简单的Rest API管理连接器
  • 偏移量管理:针对Source和Sink都有相应的偏移量(Offset)管理方案,程序员无须关心Offset 的提交
  • 分布式模式可扩展的,支持故障转移

Connectors

连接器,分为两种 Source(从源数据库拉取pull数据写入Kafka),Sink(从Kafka消费数据写入push目标数据)

连接器其实并不参与实际的数据copy,连接器负责管理Task。连接器中定义了对应Task的类型,对外提供配置选项(用户创建连接器时需要提供对应的配置信息)。并且连接器还可以决定启动多少个Task线程。

用户可以通过Rest API 启停连接器,查看连接器状态

Confluent 已经提供了许多成熟的连接器,官网地址:Confluent Connector Portfolio

Task

实际进行数据传输的单元,和连接器一样同样分为 Source和Sink

Task的配置和状态存储在Kafka的Topic中,config.storage.topicstatus.storage.topic。我们可以随时启动,停止任务,以提供弹性、可扩展的数据管道

Worker

刚刚我们讲的Connectors 和Task 属于逻辑单元,而Worker 是实际运行逻辑单元的进程,Worker 分为两种模式,单机模式和分布式模式

单机模式:比较简单,但是功能也受限,只有一些特殊的场景会使用到,例如收集主机的日志,通常来说更多的是使用分布式模式

分布式模式:为Kafka Connect提供了可扩展和故障转移。相同group.id的Worker,会自动组成集群。当新增Worker,或者有Worker挂掉时,集群会自动协调分配所有的Connector 和 Task(这个过程称为Rebalance)

当使用Worker集群时,创建连接器,或者连接器Task数量变动时,都会触发Rebalance 以保证集群各个Worker节点负载均衡。但是当Task 进入Fail状态的时候并不会触发 Rebalance,只能通过Rest Api 对Task进行重启。

Converters

Kafka Connect 通过 Converter 将数据在Kafka(字节数组)与Task(Object)之间进行转换

默认支持以下Converter

  • AvroConverter io.confluent.connect.avro.AvroConverter: 需要使用 Schema Registry
  • ProtobufConverter io.confluent.connect.protobuf.ProtobufConverter: 需要使用 Schema Registry
  • JsonSchemaConverter io.confluent.connect.json.JsonSchemaConverter: 需要使用 Schema Registry
  • JsonConverter org.apache.kafka.connect.json.JsonConverter (无需 Schema Registry): 转换为json结构
  • StringConverter org.apache.kafka.connect.storage.StringConverter: 简单的字符串格式
  • ByteArrayConverter org.apache.kafka.connect.converters.ByteArrayConverter: 不做任何转换

Converters 与 Connector 是解耦的,下图展示了在Kafka Connect中,Converter 在何时进行数据转换

Transforms

连接器可以通过配置Transform 实现对单个消息(对应代码中的Record)的转换和修改,可以配置多个Transform 组成一个。例如让所有消息的topic加一个前缀、sink无法消费source 写入的数据格式,这些场景都可以使用Transform 解决

Transform 如果配置在Source 则在Task之后执行,如果配置在Sink 则在Task之前执行。

快速上手

使用步骤:

1. 启动zookeeper

bin/zookeeper-server-start.sh  config/zookeeper.properties

2.启动kafka

bin/kafka-server-start.sh config/server.properties

3.准备数据库信息MySQL

新建一个test_user表

CREATE TABLE `test_user` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`name` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ;

并创建之后随便造几条模拟测试数据。

准备连接器,这里我是自己写了一个简单的连接器

kafka(15) Kafka Connect相关推荐

  1. 【kafka】kafka 脚本 kafka-run-class.sh 使用介绍 jmx监控 查看jmx信息

    文章目录 1.概述 2.Consumer Offset Checker 3.Dump Log Segment 4.导出Zookeeper中Group相关的偏移量 5.通过JMX获取metrics信息 ...

  2. Linux上安装Kafka和Kafka的使用

    如有侵权,私信立删 修改时间:2020年3月30日 作者:pp_x 邮箱:pp_x12138@163.com 文章目录 Kafka集群搭建 虚拟机中搭建 Kafka 集群 准备工作 Zookeeper ...

  3. 【Kafka】Kafka 增量 Rebalancing: Support and Policies

    1.概述 翻译:Incremental Cooperative Rebalancing: Support and Policies 2.介绍 在Apache Kafka中,分布式应用进程之间的再平衡得 ...

  4. 【kafka】kafka 生态系统 Ecosystem

    1.概述 官网:kafka 生态系统 Ecosystem 下面是我们被告知在主发行版之外与Kafka集成的工具列表.我们还没有全部尝试过,所以它们可能不起作用! 当然,Clients在这里是单独列出的 ...

  5. 【Kafka】Kafka The valid options based on currently configured listeners are PLAINTEXT,SSL

    1.背景 背景已经忘记了: [lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0_author_scram]$ bin/kafka-server-start.sh config ...

  6. 【Kafka】Kafka 配置 SASL_SSL jks鉴权验证方式

    文章目录 1.概述 2.配置server 3.配置zk 4.配置消费者 consumer.properties 5.生成 SSL certificates 6.zookeeper_jaas.conf ...

  7. kafka实战教程(python操作kafka),kafka配置文件详解

    全栈工程师开发手册 (作者:栾鹏) 架构系列文章 应用往Kafka写数据的原因有很多:用户行为分析.日志存储.异步通信等.多样化的使用场景带来了多样化的需求:消息是否能丢失?是否容忍重复?消息的吞吐量 ...

  8. 【kafka】Kafka 可视化工具Kafka Eagle安装和使用

    一.背景 Kafka产线环境需要管理的Topic和Consumser越来越多,使用命令行工具进行管理会非常繁杂.因此,大数据平台上需要一套Kafka的管理监控系统,Kafka-Eagle. Kafka ...

  9. 【Kafka】Kafka简单总结

    1.Kafka架构图 2.Kafka的机器数量 Kafka机器数量=2*(峰值生产速度*副本数/100)+1 3.Kafka的日志保存时间 默认保存7天 4.Kafka的硬盘大小 每天的数据量 * 7 ...

最新文章

  1. C++中的union(联合体,共用体,数据变量可以共享内存,以节省内存空间)
  2. Sublime Text 3(中文)在Windows下的配置、安装、运行
  3. 『设计模式』开发设计的七大原则,我做人还是挺有原则,那些代码呢?
  4. 计算机修改文字试题,计算机文字处理试题.doc
  5. 正则表达式符号特殊详解_常用正则表达式_Java中正则表达式的使用
  6. Python基础学习总结(六)
  7. RocketMQ(十二)消息堆积与消费延迟
  8. SpringCloud面试题及答案
  9. 如何在yml中加上git用户名和密码的验证_使用Apollo升级一下yml文件管理和发布
  10. 吴恩达CNN卷积神经网络第2周作业ResNets
  11. (1)-使用json所要用到的jar包下载
  12. C语言:运行中获取宏名字的技巧
  13. Axure RP 9基础教程(1)——界面及基本操作(看完就会画)
  14. win7下cmd乱码
  15. 下行文格式图片_写信封的正确格式图片 看完这些你就懂了
  16. 【网络协议】IPV4协议介绍
  17. 高德地图标识大全_连地震都查得一清二楚!高德地图新功能体验
  18. 在教育孩子上少一点功利心,就会快乐?
  19. GitHub界面各个页签作用
  20. 辅助分类器遇上Domain Adaptation:连续性与不确定性

热门文章

  1. Android 10开启调试模式
  2. 第六章——总线系统(2)
  3. c语言转汇编编译器,【转】C语言内嵌汇编(asm)
  4. 大一期末程序课程设计 C/C++实现简单学生学籍管理系统
  5. 关于.Net MAUI
  6. weiphp2.0:关于OneThink后台添加密码重置的功能
  7. Linux chmod命令用法
  8. 气象卫星_全国自动站资料文件名规定(1)
  9. 中标麒麟v7服务器宕机问题分析
  10. MySQL中的char和varcharmysql中varchar能存多少汉字、数字,以及varchar(100)和varchar(10)的区别