《OpenShift 4.x HOL教程汇总》

Kafka Connect是一种可扩展的和可靠的连接Kafka框架与外部系统的框架。通过不同的Connector可以访问如数据库,键值存储,搜索索引和文件系统等不同的数据源。

本示例将配置Kafka Connect,从文件中读出数据然后发送到Topic上,最后再通过Consumer读出来。

  1. 创建内容如下的kafka-connect.yaml文件。
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnect
metadata:name: my-connect
spec:replicas: 1resources:requests:memory: 512Micpu: 200mlimits:memory: 2Gicpu: 500mreadinessProbe:initialDelaySeconds: 120timeoutSeconds: 5livenessProbe:initialDelaySeconds: 120timeoutSeconds: 5bootstrapServers: my-cluster-kafka-bootstrap:9092config:key.converter: org.apache.kafka.connect.storage.StringConvertervalue.converter: org.apache.kafka.connect.storage.StringConverterkey.converter.schemas.enable: falsevalue.converter.schemas.enable: false
  1. 执行命令创建Kafka Connect,然后查看相关资源的状态,可以看到有Deployment、ReplicaSet、Service和Pod资源。
$ oc apply -f kafka-connect.yaml -n kafka
$ oc get all -l strimzi.io/cluster=my-connect
NAME                                      READY   STATUS    RESTARTS   AGE
pod/my-connect-connect-75ddc48968-wd2bb   1/1     Running   1          8mNAME                             TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)    AGE
service/my-connect-connect-api   ClusterIP   172.30.49.47   <none>        8083/TCP   8mNAME                                 READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/my-connect-connect   1/1     1            1           8mNAME                                            DESIRED   CURRENT   READY   AGE
replicaset.apps/my-connect-connect-75ddc48968   1         1         1       8m
  1. 下载Kafka Connect访问的数据文件到data目录中。
$ curl -LO https://raw.githubusercontent.com/liuxiaoyu-git/OpenShift-HOL/master/uber.csv
$ mkdir data
$ mv uber.csv data/
  1. 为了演示,我们将数据文件传到容器内部。
$ oc project kafka
$  oc rsync data/ my-connect-connect-75ddc48968-wd2bb:/tmp
sending incremental file list
./
./uber.csv
  1. 进入运行Kafka Connect的Pod,确认数据文件,然后创建source-plugin.json文件。它使用了/tmp/uber.csv作为FileStreamSource类型的Connect数据源,并将数据发送到名为my-topic-2的Kafka Topic。
$ oc rsh my-connect-connect-75ddc48968-wd2bb
$ ls /tmp/uber.cvs
$ cat <<EOF >> /tmp/source-plugin.json
{"name": "source-test","config": {"connector.class": "FileStreamSource","tasks.max": "3","topic": "my-topic-2","file": "/tmp/uber.csv"}
}
EOF
$ exit
  1. 创建内容如下的my-topic2.yaml文件,其中定义了名为my-topic-2的KafkaTopic。
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaTopic
metadata:name: my-topic-2labels:strimzi.io/cluster: my-cluster
spec:partitions: 3replicas: 2config:retention.ms: 7200000segment.bytes: 1073741824
  1. 执行命令创建名为my-topic2的KafkaTopic,然后查看其状态。
$ oc apply -f my-topic2.yaml -n kafka
$ oc get kafkatopic my-topic2 -n kafka
NAME        PARTITIONS   REPLICATION FACTOR
my-topic2   3            2
  1. 创建内容如下的connector-consumer.yaml,
apiVersion: extensions/v1beta1
kind: Deployment
metadata:labels:app: connector-consumername: connector-consumer
spec:replicas: 1template:metadata:labels:app: connector-consumerspec:containers:- name: connector-consumerimage: strimzi/hello-world-consumer:latestresources:limits:cpu: "2"memory: 2Girequests:cpu: "1"memory: 1Gienv:- name: BOOTSTRAP_SERVERSvalue: my-cluster-kafka-bootstrap:9092- name: TOPICvalue: my-topic-2- name: GROUP_IDvalue: my-hello-world-consumer- name: LOG_LEVELvalue: "INFO"- name: MESSAGE_COUNTvalue: "1000"
  1. 执行命令,部署可从my-topic-2读取数据的Kafka Consumer,然后查看Pod运行情况。
$ oc apply -f connector-consumer.yaml -n kafka
$ oc get pod -l app=connector-consumer -n kafka
NAME                                  READY   STATUS    RESTARTS   AGE
connector-consumer-6c487d477d-rcl28   1/1     Running   0          49s
  1. 查看Kafka Consumer的Pod日志,确认当前没有持续从my-topic-2接收到数据。
$ oc logs $(oc get pod -n kafka -l app=connector-consumer -o=jsonpath='{.items[0].metadata.name}') -n kafka -f
[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:allow.auto.create.topics = trueauto.commit.interval.ms = 5000auto.offset.reset = earliestbootstrap.servers = [my-cluster-kafka-bootstrap:9092]
。。。
。。。
8521 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-my-hello-world-consumer-1, groupId=my-hello-world-consumer] Successfully joined group with generation 69
8524 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-my-hello-world-consumer-1, groupId=my-hello-world-consumer] Adding newly assigned partitions: my-topic-2-0
8535 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-my-hello-world-consumer-1, groupId=my-hello-world-consumer] Found no committed offset for partition my-topic-2-0
8561 [main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-my-hello-world-consumer-1, groupId=my-hello-world-consumer] Resetting offset for partition my-topic-2-0 to offset 0.
  1. 进入OpenShift控制台中的Administrator视图,在kafak项目中通过Workloads -> Pods找到名为my-connect-connect-xxxxxxxxx-xxxxx的Pod,然后在Terminal中执行以下命令,把uber.cvs文件发给Kakfa Connect处理。
% curl -X POST -H "Content-Type: application/json" --data @/tmp/source-plugin.json http://localhost:8083/connectors


12. 最后在(10)步的窗口确认有不断的新日志输出,这些是从my-topic-2接受到的数据,这些数据就用Kafka Connect读到的uber.csv文件内容。

。。。
2020-04-10 12:12:14 INFO  KafkaConsumerExample:46 -     value: 2014-08-01 01:55:00,40.7703,-73.9685,B02682
2020-04-10 12:12:14 INFO  KafkaConsumerExample:43 - Received message:
2020-04-10 12:12:14 INFO  KafkaConsumerExample:44 -     partition: 0
2020-04-10 12:12:14 INFO  KafkaConsumerExample:45 -     offset: 1169
2020-04-10 12:12:14 INFO  KafkaConsumerExample:46 -     value: 2014-08-01 01:55:00,40.7182,-73.9892,B02682
2020-04-10 12:12:14 INFO  KafkaConsumerExample:43 - Received message:
2020-04-10 12:12:14 INFO  KafkaConsumerExample:44 -     partition: 0
2020-04-10 12:12:14 INFO  KafkaConsumerExample:45 -     offset: 1170
2020-04-10 12:12:14 INFO  KafkaConsumerExample:46 -     value: 2014-08-01 01:56:00,40.73,-73.9807,B02598
2020-04-10 12:12:14 INFO  KafkaConsumerExample:43 - Received message:
2020-04-10 12:12:14 INFO  KafkaConsumerExample:44 -     partition: 0
2020-04-10 12:12:14 INFO  KafkaConsumerExample:45 -     offset: 1171
2020-04-10 12:12:14 INFO  KafkaConsumerExample:46 -     value: 2014-08-01 01:57:00,40.7466,-73.9814,B02598
。。。

OpenShift 4 之AMQ Streams(2) - 用Kafka Connect访问数据源相关推荐

  1. OpenShift 4 之AMQ Streams(3) - 用Kafka MirrorMaker在Kafka集群间复制数据

    <OpenShift 4.x HOL教程汇总> 文章目录 什么是MirrorMaker 配置MirrorMaker 确认Kafka环境 创建Source和Target 创建MirrorMa ...

  2. OpenShift 4 之AMQ Streams(1) - 多个Consumer从Partition接收数据

    <OpenShift 4.x HOL教程汇总> 文章目录 AMQ Streams是什么? 安装 AMQ Streams 环境 安装AMQ Streams Operator 创建Kafka集 ...

  3. OpenShift 4 之AMQ Streams(4) - 用Prometheus监控Kafka

    <OpenShift 4.x HOL教程汇总> 文章目录 配置Kafka和测试应用 配置Prometheus和Grafana 配置Kafka和测试应用 首先删除上一个实验使用的Kafka集 ...

  4. 深入理解Kafka Connect:转换器和序列化

    AI前线导读:Kafka Connect是一个简单但功能强大的工具,可用于Kafka和其他系统之间的集成.人们对Kafka Connect最常见的误解之一是它的转换器.这篇文章将告诉我们如何正确地使用 ...

  5. Kafka Connect简介

    一. Kafka Connect简介 Kafka是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析).为何集成其他系统和解耦应用,经常使用Producer来发送消息到Broker, ...

  6. Kafka Connect在MapR上

    在本周的白板演练中,MapR的高级产品营销经理Ankur Desai描述了Apache Kafka Connect和REST API如何简化和提高在处理来自包括旧数据库或数据仓库在内的各种数据源的流数 ...

  7. 1.3 Quick Start中 Step 7: Use Kafka Connect to import/export data官网剖析(博主推荐)

    不多说,直接上干货! 一切来源于官网 http://kafka.apache.org/documentation/ Step 7: Use Kafka Connect to import/export ...

  8. Kafka: Connect

    转自:http://www.cnblogs.com/f1194361820/p/6108025.html Kafka Connect 简介 Kafka Connect 是一个可以在Kafka与其他系统 ...

  9. SQL Server CDC配合Kafka Connect监听数据变化

    写在前面 好久没更新Blog了,从CRUD Boy转型大数据开发,拉宽了不少的知识面,从今年年初开始筹备.组建.招兵买马,到现在稳定开搞中,期间踏过无数的火坑,也许除了这篇还很写上三四篇. 进入主题, ...

最新文章

  1. 在 Kubuntu 21.04 中安装和升级 KDE Plasma 5.2
  2. java字面量和符号引用_java -- JVM的符号引用和直接引用
  3. 删除归档日志_ORA-00257:archiver error.Connect internal only, until freed的问题
  4. 《岸上的波妞》观后感
  5. 获取本机IP地址[JavaScript / Node.js]
  6. 接口测试用python怎么做_请问一下python怎么做接口测试工具?
  7. 机器学习-ID3决策树算法(附matlab/octave代码)
  8. c语言编程串级控制,组态王-串级控制
  9. PIE-engine APP 教程 ——太湖生态环境智能监测系统
  10. 三星刷入鸿蒙OS,全面狙击华为鸿蒙OS系统!谷歌霸气联手三星:发布新版鸿蒙OS系统...
  11. PAT L1-046 整除光棍 光棍当然不是单身汪呢
  12. 浏览器通过原生JS实现录音功能
  13. 去公司面试,记录下的hadoop最新面试题
  14. PMP证书容易考吗?
  15. 权威的国际敏捷认证Certified Scrum Master (CSM)
  16. python比java好学吗-java为什么比python快?
  17. Windows Server 2012计划任务定时执行bat文件
  18. python virtualenv简单使用
  19. 知识扫盲:南京是六朝古都,请问是哪六朝?
  20. Educational Codeforces Round 120 (Rated for Div. 2)

热门文章

  1. 支撑200并发_搞清楚系统到底怎样支撑高并发以及架构图的绘制(面试向)
  2. keil5函数 默认返回值_Python列表有什么内置函数可以使用,怎么使用这些函数
  3. linux权限案例,Linux下的权限体系
  4. ansi mysql_MySQL的ANSI和Unicode驱动程序之间的区别
  5. 设计灵感|色彩叠加!这样的海报层次感更强
  6. 年底圣诞促销,商家必备节日大促psd分层海报模板
  7. 万能广告促销海报,找不到灵感也不怕
  8. python能够处理图像的第三方库_基于图像预处理的几个库
  9. 同等学力计算机综合难吗,计算机在职研究生考试难度大吗?这种方式通过率高...
  10. WinXp系统中没有ChangeWindowMessageFilter这个函数