"OpenShift 4.x HOL Tutorial Series"

Table of Contents

  • Scenario
  • Deploying the Environment
    • Installing the MySQL Environment
    • Installing the Kafka Operator
    • Creating the Kafka Cluster
    • Creating the Kafka Connect S2I
    • Installing the Debezium MySQL Connector Plugin
    • Configuring the Debezium Connector for the example-mysql Database
    • Testing
  • References

Scenario

Deploying the Environment

Run the following command to create a project:

$ oc new-project debezium-cdc

Installing the MySQL Environment

  1. In the first Terminal, run the following command to deploy MySQL.
$ oc new-app docker.io/debezium/example-mysql:0.5 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw
  2. Enter the Pod that is running MySQL.
$ MYSQL_POD=$(oc get pod -l deployment=example-mysql-1 -o jsonpath={.items[0].metadata.name})
$ oc rsh $MYSQL_POD
  3. Log in to MySQL as mysqluser/mysqlpw, then query the data in the customers table.
$ mysql -u mysqluser -p inventory
Enter password:
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
...
mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+

Installing the Kafka Operator

In the OperatorHub of the OpenShift console's Administrator view, find the Operator named Strimzi or AMQ Streams (Strimzi is the community edition of Kafka; AMQ Streams is the Red Hat product. Note: version 1.5.2 of the AMQ Streams Operator was found to throw an error when creating a Kafka Connect S2I), then install it into the debezium-cdc project.
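If you prefer the CLI to the console, the same installation can be expressed as OLM objects. The sketch below is a minimal, unverified example; the channel name and catalog source are assumptions and depend on the catalogs available in your cluster (check with `oc get packagemanifest strimzi-kafka-operator -o yaml`).

```yaml
# Hypothetical OLM objects for installing the Strimzi operator into the
# debezium-cdc project from the CLI; channel and source names are assumptions.
apiVersion: operators.coreos.com/v1
kind: OperatorGroup
metadata:
  name: debezium-cdc-og
  namespace: debezium-cdc
spec:
  targetNamespaces:
    - debezium-cdc
---
apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:
  name: strimzi-kafka-operator
  namespace: debezium-cdc
spec:
  channel: stable                  # assumption: verify against the packagemanifest
  name: strimzi-kafka-operator
  source: community-operators
  sourceNamespace: openshift-marketplace
```

Apply it with `oc apply -f subscription.yaml` and wait for the operator Pod to become ready.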

Creating the Kafka Cluster

  1. After the Operator is installed, switch to the Topology page of the Developer view. Right-click the canvas and choose From Catalog in the Add to Project menu.
  2. On the Developer Catalog page select Kafka, then click Create in the slide-out panel.
  3. On the Create Kafka page accept the defaults and click Create. After it completes, the Kafka cluster appears in the Topology view.
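The console form is just a front end for a Kafka custom resource. A sketch of the default CR it creates is shown below; the API version and listener syntax correspond to Strimzi releases of that era, so verify them against your installed operator before using this directly.

```yaml
# Approximate default Kafka CR created by the console form (Strimzi v1beta1 era).
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
  namespace: debezium-cdc
spec:
  kafka:
    replicas: 3
    listeners:
      plain: {}        # 9092, plaintext
      tls: {}          # 9093, TLS
    storage:
      type: ephemeral  # fine for a lab; use persistent-claim in real deployments
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
```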

Creating the Kafka Connect S2I

  1. Right-click the canvas and choose From Catalog in the Add to Project menu, select Kafka Connect S2I on the Developer Catalog page, click Create in the slide-out panel, and finally click Create on the Create KafkaConnectS2I page.

  2. In Terminal 2, run the following commands to create a Route.

$ oc expose svc/my-connect-cluster-connect-api
route.route.openshift.io/my-connect-cluster-connect-api exposed
$ CONNECT_API=$(oc get route my-connect-cluster-connect-api -o jsonpath='{ .spec.host }')
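As with the Kafka cluster, the console creates a KafkaConnectS2I custom resource behind the scenes. A rough sketch of its shape follows; the bootstrap address and storage-topic names are assumptions based on Strimzi defaults, not values confirmed by this lab.

```yaml
# Approximate KafkaConnectS2I CR created by the console (values are assumptions).
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnectS2I
metadata:
  name: my-connect-cluster
  namespace: debezium-cdc
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
```

The S2I variant matters for the next section: it exposes a BuildConfig, so connector plugins can be layered into the Connect image with `oc start-build`.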

Installing the Debezium MySQL Connector Plugin

  1. Create a directory in Terminal 2.
$ mkdir $HOME/plugins
$ cd $HOME/plugins
  2. Download the Debezium connector for MySQL.
$ wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.1.2.Final/debezium-connector-mysql-1.1.2.Final-plugin.tar.gz
$ tar -xzf debezium-connector-mysql-1.1.2.Final-plugin.tar.gz
  3. Run the following commands to install the plugin.
$ oc get buildconfigs
NAME                         TYPE     FROM     LATEST
my-connect-cluster-connect   Source   Binary   0
$ oc start-build my-connect-cluster-connect --from-dir $HOME/plugins/
Uploading directory "/home/xiaoyliu-redhat.com/plugins" as binary input for the build ...
.
Uploading finished
build.build.openshift.io/my-connect-cluster-connect-1 started

Configuring the Debezium Connector for the example-mysql Database

  1. In Terminal 2, create a myconnect.json file with the following content, which holds the configuration for example-mysql.
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "example-mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "my-cluster-kafka-brokers:9092",
    "database.history.kafka.topic": "dbhistory.inventory"
  }
}
  2. Submit the example-mysql configuration to the Connect API; on success it returns "HTTP/1.1 201 Created".
$ curl -i -X POST -k -H "Accept:application/json" -H "Content-Type:application/json" -d @myconnect.json http://${CONNECT_API}/connectors
HTTP/1.1 201 Created
date: Sat, 18 Jul 2020 16:24:42 GMT
location: http://my-connect-cluster-connect-api-erfin-debezium.apps.cluster-beijing-959a.beijing-959a.example.opentlc.com/connectors/inventory-connector
content-type: application/json
content-length: 518
server: Jetty(9.4.24.v20191120)
set-cookie: 0aa0a42e8fae37202ee998a0dda19849=06d80b8ed8fef97d71edebc861644699; path=/; HttpOnly
{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"example-mysql","database.port":"3306","database.user":"debezium","database.password":"dbz","database.server.id":"184054","database.server.name":"dbserver1","database.whitelist":"inventory","database.history.kafka.bootstrap.servers":"my-cluster-kafka-brokers:9092","database.history.kafka.topic":"dbhistory.inventory","name":"inventory-connector"},"tasks":[],"type":"source"}
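Before POSTing, it can be worth sanity-checking the connector config for the fields this lab relies on. The sketch below is our own illustrative check, not an exhaustive statement of what the Debezium MySQL connector requires; REQUIRED_KEYS simply mirrors the fields used in myconnect.json above.

```python
import json

# Illustrative set of fields this lab's myconnect.json provides; consult the
# Debezium MySQL connector documentation for the authoritative list.
REQUIRED_KEYS = {
    "connector.class",
    "database.hostname",
    "database.port",
    "database.user",
    "database.password",
    "database.server.id",
    "database.server.name",
    "database.history.kafka.bootstrap.servers",
    "database.history.kafka.topic",
}

def missing_keys(doc: dict) -> list:
    """Return the required config keys absent from doc['config']."""
    return sorted(REQUIRED_KEYS - doc.get("config", {}).keys())

# The same document as myconnect.json, inlined for the example.
doc = json.loads("""
{"name": "inventory-connector",
 "config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "1",
            "database.hostname": "example-mysql",
            "database.port": "3306",
            "database.user": "debezium",
            "database.password": "dbz",
            "database.server.id": "184054",
            "database.server.name": "dbserver1",
            "database.whitelist": "inventory",
            "database.history.kafka.bootstrap.servers": "my-cluster-kafka-brokers:9092",
            "database.history.kafka.topic": "dbhistory.inventory"}}
""")

missing = missing_keys(doc)
print(missing)  # -> []
```

An empty list means every field the lab uses is present; anything it prints is a key to add before submitting the file with curl.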

Testing

  1. In Terminal 2, deploy an application that receives the captured MySQL change data from Kafka.
$ oc new-app quay.io/efeluzy/quarkus-kafka-consumer:latest -e mp.messaging.incoming.mytopic-subscriber.topic=dbserver1.inventory.customers
  2. In Terminal 2, run the following command to access the test application; the window then blocks, waiting for events.
$ curl http://$(oc get route quarkus-kafka-consumer -o jsonpath='{ .spec.host }')/stream
  3. In the MySQL session in Terminal 1, run an update statement to change the row with primary key 1003 in the customers table.
mysql> update customers set first_name='erfin' where id = 1003;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0
  4. In Terminal 2, confirm that the test application receives from Kafka the change data Debezium captured from MySQL (note the "before" and "after" fields in the "payload" section, and that the record is typed as Debezium's "dbserver1.inventory.customers.Envelope").
data: Kafka Offset=4; message={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.cus
tomers.Envelope"},"payload":{"before":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"after":{"id":1003,"first_name":"erfin","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"1.1.2.Final","connector":"mysql","name":"dbserver1","ts_ms":1595073286000,"snapshot":"false","db":"inventory","table":"customers","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":364,"row":0,"thread":9,"query":null},"op":"u","ts_ms":1595073286806,"transaction":null}}
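A consumer of these events usually only cares about the "payload" portion of the envelope. The sketch below shows one way to pull out the before/after rows and the operation type; the event string is a trimmed version of the message printed above, not additional output from the lab.

```python
import json

# Trimmed "payload" portion of the change event printed by the test app above.
event = json.loads("""
{"payload": {
   "before": {"id": 1003, "first_name": "Edward", "last_name": "Walker", "email": "ed@walker.com"},
   "after":  {"id": 1003, "first_name": "erfin",  "last_name": "Walker", "email": "ed@walker.com"},
   "op": "u",
   "ts_ms": 1595073286806}}
""")

payload = event["payload"]
# Debezium op codes: c = create, u = update, d = delete, r = snapshot read.
op = {"c": "create", "u": "update", "d": "delete", "r": "read"}[payload["op"]]
before, after = payload["before"], payload["after"]
# Collect only the columns whose values actually changed.
changed = {k: (before[k], after[k]) for k in after if before and before[k] != after[k]}
print(op, changed)  # update {'first_name': ('Edward', 'erfin')}
```

For an update event this yields exactly the column changed by the SQL statement in step 3; create events carry before=null and delete events carry after=null, which is why the comprehension guards on `before`.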

References

Installing Debezium on OpenShift
Tutorial: Streaming CDC using Debezium on Openshift 4
Change data capture with Debezium: A simple how-to, Part 1
