Kafka Connector Usage Notes and Custom Connector Development
Kafka Connect is a tool for scalably and reliably streaming data between Kafka and other systems. It makes it simple to quickly define connectors that move large data sets into and out of Kafka. Kafka Connect can ingest data from databases or application servers into Kafka topics, making it available for low-latency stream processing. Export jobs can deliver data from Kafka topics into secondary storage and query systems, or into batch systems for offline analysis.
The core components of Kafka Connect:
Source: writes data from external systems into Kafka topics.
Sink: reads data out of Kafka and delivers it wherever it is needed, e.g. HDFS or HBase.
Connectors: the high-level abstraction that coordinates data streaming by managing tasks.
Tasks: the concrete implementation that writes data into Kafka or reads data out of it; both sources and sinks do their work in tasks.
Workers: the processes that run connectors and tasks.
Converters: translate data between Kafka Connect and the external system it sends data to or receives data from.
A converter turns bytes into Kafka Connect's internal data format, and serializes the internal format back into bytes. Converters are decoupled from connectors, so any converter can be reused with any connector: with the Avro converter, for example, the JDBC connector can write Avro data to Kafka, and the HDFS connector can read that same Avro data back out of Kafka.
Transforms: a lightweight tool for adjusting individual messages.
Kafka Connect deployment modes:
Kafka Connect has two deployment modes:
standalone: in standalone mode, all work is performed in a single process.
distributed: distributed mode is highly scalable and provides automatic fault tolerance. You can start many worker processes with the same group.id, and the live workers coordinate among themselves to schedule connectors and tasks. If you add a worker, or one dies, the remaining workers detect the change and rebalance the connectors and tasks.
Author: 张永清. Please credit the source when reposting: https://www.cnblogs.com/laoqing/p/11927958.html
In distributed mode, connectors are managed through the REST API.
Common connector management API operations:
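As a quick reference, these are the standard Kafka Connect REST endpoints (served on port 8083 by default):

```
GET    /connectors                    # list active connectors
POST   /connectors                    # create a connector (JSON body: name + config)
GET    /connectors/{name}             # details of one connector
GET    /connectors/{name}/config      # the connector's configuration
PUT    /connectors/{name}/config      # update the configuration
GET    /connectors/{name}/status      # current status of the connector and its tasks
GET    /connectors/{name}/tasks       # list the connector's tasks
POST   /connectors/{name}/restart     # restart the connector
PUT    /connectors/{name}/pause       # pause the connector
PUT    /connectors/{name}/resume      # resume a paused connector
DELETE /connectors/{name}             # delete the connector
```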
How to develop your own connector:
1. Add the Maven dependency.
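A minimal dependency sketch — only `connect-api` is required for connector development; the version shown is a placeholder and should match your Kafka version:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>2.3.0</version>
    <scope>provided</scope>
</dependency>
```

The `provided` scope is used because the Connect worker already ships these classes at runtime.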
2. Developing a custom Source
A custom source requires extending the SourceConnector and SourceTask abstract classes, implementing their abstract methods and overriding other methods as needed.
A. Implement a custom SourceConnector
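A minimal SourceConnector sketch, assuming `connect-api` on the classpath; the class names `MySourceConnector`/`MySourceTask` and the `topic` config key are hypothetical:

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MySourceConnector extends SourceConnector {
    private Map<String, String> configProps;

    @Override
    public String version() { return "1.0"; }

    @Override
    public void start(Map<String, String> props) {
        // Called once when the connector starts; keep the config for taskConfigs()
        this.configProps = props;
    }

    @Override
    public Class<? extends Task> taskClass() { return MySourceTask.class; }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Split the work across at most maxTasks tasks; here every task gets the same config
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            configs.add(new HashMap<>(configProps));
        }
        return configs;
    }

    @Override
    public void stop() { /* release any resources held by the connector */ }

    @Override
    public ConfigDef config() {
        // Declare the configuration keys the connector accepts (used for validation)
        return new ConfigDef()
            .define("topic", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Target topic");
    }
}
```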
B. Implement the Task for the Source
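A matching SourceTask sketch; `fetchNextValue()` is a hypothetical placeholder for reading from the external system:

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class MySourceTask extends SourceTask {
    private String topic;

    @Override
    public String version() { return "1.0"; }

    @Override
    public void start(Map<String, String> props) {
        // Receives one of the config maps produced by taskConfigs()
        topic = props.get("topic");
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // Called in a loop by the worker; read from the external system here.
        // sourcePartition/sourceOffset let Connect track progress for resuming.
        Map<String, Object> sourcePartition = Collections.singletonMap("source", "demo");
        Map<String, Object> sourceOffset = Collections.singletonMap("position", 0L);
        String value = fetchNextValue(); // hypothetical helper reading the source system
        if (value == null) {
            Thread.sleep(1000);          // back off when there is no new data
            return Collections.emptyList();
        }
        return Collections.singletonList(
            new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, value));
    }

    private String fetchNextValue() { return null; /* placeholder */ }

    @Override
    public void stop() { /* close connections to the source system */ }
}
```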
3. Developing a custom Sink
A custom sink requires extending the SinkConnector and SinkTask abstract classes, implementing their abstract methods and overriding other methods as needed.
A. Implement a custom SinkConnector
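A minimal SinkConnector sketch (class names hypothetical); it mirrors the source connector, except the framework itself handles which topics to consume via the `topics` setting:

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MySinkConnector extends SinkConnector {
    private Map<String, String> configProps;

    @Override
    public String version() { return "1.0"; }

    @Override
    public void start(Map<String, String> props) { this.configProps = props; }

    @Override
    public Class<? extends Task> taskClass() { return MySinkTask.class; }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // One identical config per task; the worker assigns topic partitions to tasks
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            configs.add(new HashMap<>(configProps));
        }
        return configs;
    }

    @Override
    public void stop() { }

    @Override
    public ConfigDef config() { return new ConfigDef(); }
}
```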
B. Implement the Task for the Sink
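A matching SinkTask sketch; the write logic in `put()` is a placeholder for whatever target system the sink feeds:

```java
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import java.util.Collection;
import java.util.Map;

public class MySinkTask extends SinkTask {
    @Override
    public String version() { return "1.0"; }

    @Override
    public void start(Map<String, String> props) {
        // Open connections to the target system (HDFS, HBase, ...)
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // Called with batches of records consumed from Kafka
        for (SinkRecord record : records) {
            // write record.value() to the target system; record.topic(),
            // record.kafkaPartition() and record.kafkaOffset() identify its origin
        }
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // Ensure everything handed to put() has been durably written
        // before Connect commits the consumer offsets
    }

    @Override
    public void stop() { /* close connections to the target system */ }
}
```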
Kafka Connect Configs
Well-implemented open-source connector projects:
https://github.com/debezium/debezium
https://github.com/confluentinc
https://docs.confluent.io/current/connect/managing/connectors.html
As an example, the following uses debezium-connector-mongodb from https://github.com/debezium/debezium to run a connector in standalone mode.
Download debezium-connector-mongodb-0.9.5.Final.jar from GitHub, place it in Kafka's libs directory, and put the MongoDB-related jars into libs as well.
Create a mongodb.properties configuration file in the config directory.
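A sketch of what mongodb.properties might contain — the keys are real Debezium MongoDB connector options, but every value below is a placeholder to adapt to your environment:

```properties
name=mongodb-source-connector
connector.class=io.debezium.connector.mongodb.MongoDbConnector
# replica-set-name/host:port; a single pair is enough
mongodb.hosts=rs0/127.0.0.1:27017
# logical name: used as the prefix of all topics this connector produces
mongodb.name=mongo-test
mongodb.user=debezium
mongodb.password=debezium
# only monitor these databases
database.whitelist=mydb
tasks.max=1
```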
The configuration options are explained below.
For details, see: https://debezium.io/documentation/reference/0.10/connectors/mongodb.html
https://docs.confluent.io/current/connect/debezium-connect-mongodb/mongodb_source_connector_config.html
Property | Default | Description
---|---|---
`snapshot.fetch.size` | | Specifies the maximum number of documents that should be read in one go from each collection while taking a snapshot. The connector will read the collection contents in multiple batches of this size.
`name` | | Unique name for the connector. Attempting to register again with the same name will fail. (This property is required by all Kafka Connect connectors.)
`connector.class` | | The name of the Java class for the connector. Always use a value of `io.debezium.connector.mongodb.MongoDbConnector`.
`mongodb.hosts` | | The comma-separated list of hostname and port pairs (in the form 'host' or 'host:port') of the MongoDB servers in the replica set. The list can contain a single hostname and port pair. If `mongodb.members.auto.discover` is set to `false`, the host and port pair should be prefixed with the replica set name (e.g., `rs0/localhost:27017`).
`mongodb.name` | | A unique name that identifies the connector and/or MongoDB replica set or sharded cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster.
`mongodb.user` | | Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.
`mongodb.password` | | Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.
`mongodb.ssl.enabled` | false | Connector will use SSL to connect to MongoDB instances.
`mongodb.ssl.invalid.hostname.allowed` | false | When SSL is enabled, this setting controls whether strict hostname checking is disabled during the connection phase.
`database.whitelist` | empty string | An optional comma-separated list of regular expressions that match database names to be monitored; any database name not included in the whitelist will be excluded from monitoring. By default all databases will be monitored. May not be used with `database.blacklist`.
`database.blacklist` | empty string | An optional comma-separated list of regular expressions that match database names to be excluded from monitoring; any database name not included in the blacklist will be monitored. May not be used with `database.whitelist`.
`collection.whitelist` | empty string | An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be monitored; any collection not included in the whitelist will be excluded from monitoring. Each identifier is of the form databaseName.collectionName. By default the connector will monitor all collections except those in the `local` and `admin` databases. May not be used with `collection.blacklist`.
`collection.blacklist` | empty string | An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be excluded from monitoring; any collection not included in the blacklist will be monitored. Each identifier is of the form databaseName.collectionName. May not be used with `collection.whitelist`.
`snapshot.mode` | initial | Specifies the criteria for running a snapshot (eg. initial sync) upon startup of the connector. The default is initial, and specifies the connector reads a snapshot when either no offset is found or if the oplog no longer contains the previous offset. The never option specifies that the connector should never use snapshots, instead the connector should proceed to tail the log.
`field.blacklist` | empty string | An optional comma-separated list of the fully-qualified names of fields that should be excluded from change event message values. Fully-qualified names for fields are of the form databaseName.collectionName.fieldName.nestedFieldName, where databaseName and collectionName may contain the wildcard (*) which matches any characters.
`field.renames` | empty string | An optional comma-separated list of the fully-qualified replacements of fields that should be used to rename fields in change event message values. Fully-qualified replacements for fields are of the form databaseName.collectionName.fieldName.nestedFieldName:newNestedFieldName, where databaseName and collectionName may contain the wildcard (*) which matches any characters, and the colon character (:) is used to determine the rename mapping of a field. The next field replacement is applied to the result of the previous field replacement in the list, so keep this in mind when renaming multiple fields that are in the same path.
`tasks.max` | 1 | The maximum number of tasks that should be created for this connector. The MongoDB connector will attempt to use a separate task for each replica set, so the default is acceptable when using the connector with a single MongoDB replica set. When using the connector with a MongoDB sharded cluster, we recommend specifying a value that is equal to or more than the number of shards in the cluster, so that the work for each replica set can be distributed by Kafka Connect.
`initial.sync.max.threads` | 1 | Positive integer value that specifies the maximum number of threads used to perform an initial sync of the collections in a replica set. Defaults to 1.
`tombstones.on.delete` | true | Controls whether a tombstone event should be generated after a delete event.
`snapshot.delay.ms` | | An interval in milliseconds that the connector should wait before taking a snapshot after starting up.
The following advanced configuration properties have good defaults that will work in most situations and therefore rarely need to be specified in the connector’s configuration.
Property | Default | Description
---|---|---
`sanitize.field.names` | | Whether field names will be sanitized to adhere to Avro naming requirements. See Avro naming for more details.
`max.queue.size` | 8192 | Positive integer value that specifies the maximum size of the blocking queue into which change events read from the database log are placed before they are written to Kafka. This queue can provide backpressure to the oplog reader when, for example, writes to Kafka are slower or if Kafka is not available. Events that appear in the queue are not included in the offsets periodically recorded by this connector. Defaults to 8192, and should always be larger than the maximum batch size specified in the `max.batch.size` property.
`max.batch.size` | 2048 | Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector. Defaults to 2048.
`poll.interval.ms` | 1000 | Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear. Defaults to 1000 milliseconds, or 1 second.
`connect.backoff.initial.delay.ms` | 1000 | Positive integer value that specifies the initial delay when trying to reconnect to a primary after the first failed connection attempt or when no primary is available. Defaults to 1 second (1000 ms).
`connect.backoff.max.delay.ms` | 120000 | Positive integer value that specifies the maximum delay when trying to reconnect to a primary after repeated failed connection attempts or when no primary is available. Defaults to 120 seconds (120,000 ms).
`connect.max.attempts` | 16 | Positive integer value that specifies the maximum number of failed connection attempts to a replica set primary before an exception occurs and the task is aborted. Defaults to 16, which with the defaults for `connect.backoff.initial.delay.ms` and `connect.backoff.max.delay.ms` results in just over 20 minutes of attempts before failing.
`mongodb.members.auto.discover` | true | Boolean value that specifies whether the addresses in 'mongodb.hosts' are seeds that should be used to discover all members of the cluster or replica set (`true`), or whether they should be used as-is (`false`).
`source.struct.version` | v2 | Schema version for the `source` struct in CDC events.
`heartbeat.interval.ms` | 0 | Controls how frequently heartbeat messages are sent. Set this parameter to `0` to not send heartbeat messages at all; disabled by default.
`heartbeat.topics.prefix` | __debezium-heartbeat | Controls the naming of the topic to which heartbeat messages are sent.
To run in standalone mode, configure connect-standalone.properties as follows:
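A sketch of a typical connect-standalone.properties — the keys are standard Kafka Connect worker options; paths and host names are placeholders:

```properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# false = plain JSON without a schema envelope (see "Common problems" below)
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# standalone mode stores offsets in a local file instead of a Kafka topic
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
# directory containing the connector jars (e.g. debezium-connector-mongodb)
plugin.path=/usr/local/kafka/libs
```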
In standalone mode, start Kafka Connect like this:
bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...] — several connectors can be started at once by listing each connector's configuration file as an argument.
For example: connect-standalone.sh config/connect-standalone.properties mongodb.properties
Deploying in distributed mode:
1. Edit connect-distributed.properties
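A sketch of a typical connect-distributed.properties; topic names and replication factors are placeholders to adapt to your cluster:

```properties
bootstrap.servers=localhost:9092
# all workers sharing this group.id form one Connect cluster
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# internal topics for configs, offsets and status (see next step)
config.storage.topic=connect-configs
config.storage.replication.factor=3
offset.storage.topic=connect-offsets
offset.storage.partitions=25
offset.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.partitions=5
status.storage.replication.factor=3
offset.flush.interval.ms=10000
plugin.path=/usr/local/kafka/libs
```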
2. Manually create the Kafka topics required by cluster mode
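The three internal topics can be created with the standard kafka-topics.sh tool; the partition and replication counts below are illustrative — adjust them to your cluster size (all three topics should be compacted):

```shell
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --topic connect-configs --partitions 1 --replication-factor 3 \
  --config cleanup.policy=compact
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --topic connect-offsets --partitions 50 --replication-factor 3 \
  --config cleanup.policy=compact
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --topic connect-status --partitions 10 --replication-factor 3 \
  --config cleanup.policy=compact
```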
- config.storage.topic: stores connector and task configurations; note that this topic must have a single partition and should be replicated.
- offset.storage.topic: stores offsets; this topic should have multiple partitions and replicas.
- status.storage.topic: stores status; this topic can have multiple partitions and replicas.
3. Start the workers
Start distributed mode with:
./bin/connect-distributed ./etc/kafka/connect-distributed.properties
4. Start a connector through the REST API
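In distributed mode, a connector is registered by POSTing its configuration as JSON; a sketch using the Debezium MongoDB connector from earlier (host names and values are placeholders):

```shell
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "mongodb-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "rs0/127.0.0.1:27017",
    "mongodb.name": "mongo-test",
    "tasks.max": "1"
  }
}'

# check the connector and task status afterwards
curl http://localhost:8083/connectors/mongodb-source-connector/status
```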
Common problems:
1. Various java.lang.ClassNotFoundException errors at startup.
When starting a connector you will often see one ClassNotFoundException after another, each naming a different missing class. This almost always means a jar is missing from the classpath or two jars conflict, so check for missing dependencies and resolve any version conflicts.
2. The key.converter.schemas.enable=false and value.converter.schemas.enable=false settings in connector.properties.
These options default to true in connect-standalone.properties, in which case every JSON message sent to the topic must carry a schema alongside the payload. For example:
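With schemas.enable=true, the JsonConverter expects every message to be wrapped in a schema/payload envelope; the field names below are an illustrative example:

```json
{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "id",   "type": "int64",  "optional": false },
      { "field": "name", "type": "string", "optional": true }
    ],
    "optional": false,
    "name": "example.user"
  },
  "payload": { "id": 1, "name": "zhangsan" }
}
```

With schemas.enable=false, the same message is just `{"id": 1, "name": "zhangsan"}`.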
To send plain JSON without the schema envelope, simply set key.converter.schemas.enable and value.converter.schemas.enable to false.