kafka安装: docker 安装 kafka单节点_docker kafka 单节点_大大蚊子的博客-CSDN博客

mysql安装:   docker 安装mysql 8.x_大大蚊子的博客-CSDN博客

debezium代码国内:https://kgithub.com/debezium/debezium

debezium: https://debezium.io/

debezium-mysql使用:

kafka-connect: https://kafka.apache.org/documentation/#connect

环境要求:

  • Git 2.2.1 or later
  • JDK 17 or later, e.g. OpenJDK
  • Docker Engine or Docker Desktop 1.9 or later
  • Apache Maven 3.8.4 or later

1、docker Engine 安装,并开启2375(仅测试)端口

https://blog.csdn.net/a309450028a/article/details/128951378?spm=1001.2014.3001.5502

配置环境变量 DOCKER_HOME

2、下载源码导入maven项目  本文使用1.9.5.Final 版本代码

3、修改debezium-build-parent\debezium-connector-mysql 两个pom文件

a、修改阿里云maven无法下载对应版本的依赖

b、删除所有 <scope>test</scope>

c、排出冲突依赖版本

d、添加kafka-connect所需依赖

e、删除部分测试构建及docker启动配置(可选)

debezium-build-parent.pom

<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.jboss</groupId><artifactId>jboss-parent</artifactId><version>36</version></parent><groupId>io.debezium</groupId><artifactId>debezium-build-parent</artifactId><version>1.9.5.Final</version><name>Debezium Build Aggregator</name><description>Debezium is an open source change data capture platform</description><packaging>pom</packaging><url>https://debezium.io</url><scm><connection>scm:git:git@github.com:debezium/debezium.git</connection><developerConnection>scm:git:git@github.com:debezium/debezium.git</developerConnection><url>https://github.com/debezium/debezium</url><tag>v1.9.5.Final</tag></scm><issueManagement><system>jira</system><url>http://issues.jboss.org/browse/DBZ</url></issueManagement><licenses><license><name>Apache Software License 2.0</name><url>http://www.apache.org/licenses/LICENSE-2.0</url><distribution>repo</distribution></license></licenses><developers><developer><id>rhauch</id><name>Randall Hauch</name><organization>Red Hat</organization><organizationUrl>http://redhat.com</organizationUrl><timezone>-6</timezone></developer><developer><id>gunnarmorling</id><name>Gunnar Morling</name><organization>Red Hat</organization><organizationUrl>http://redhat.com</organizationUrl><timezone>+2</timezone></developer></developers><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><!-- Can't use the (generally preferable) release option yet, as theCassandra connector still needs to be built with Java 1.8 whichisn't aware of this option --><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><!-- Enforce JDK 11 for building (handled via JBoss parent POM)--><jdk.min.version>11</jdk.min.version><!-- Maven Plugins --><version.compiler.plugin>3.8.1</version.compiler.plugin><version.resources.plugin>3.1.0</version.resources.plugin><version.dependency.plugin>3.1.1</version.dependency.plugin><version.enforcer.plugin>3.0.0-M2</version.enforcer.plugin><version.jar.plugin>3.0.2</version.jar.plugin><version.source.plugin>3.1.0</version.source.plugin><version.assembly.plugin>3.1.1</version.assembly.plugin><version.war.plugin>2.5</version.war.plugin><version.google.formatter.plugin>0.4</version.google.formatter.plugin><version.docker.maven.plugin>0.42.0</version.docker.maven.plugin><version.staging.plugin>1.6.12</version.staging.plugin><version.protoc.maven.plugin>3.8.0</version.protoc.maven.plugin><version.javadoc.plugin>3.3.2</version.javadoc.plugin><version.code.formatter>2.16.0</version.code.formatter><version.surefire.plugin>3.0.0-M9</version.surefire.plugin><version.checkstyle.plugin>3.1.1</version.checkstyle.plugin><version.release.plugin>2.5.3</version.release.plugin><version.impsort>1.8.0</version.impsort><version.failsafe.plugin>${version.surefire.plugin}</version.failsafe.plugin><version.checkstyle>8.32</version.checkstyle><version.revapi.plugin>0.11.5</version.revapi.plugin><version.jandex>1.0.8</version.jandex><version.revapi-java.plugin>0.21.0</version.revapi-java.plugin><version.build-helper.plugin>3.3.0</version.build-helper.plugin><!-- Which Maven Central infra should be used --><release.endpoint>https://s01.oss.sonatype.org/</release.endpoint><!-- Kafka and it's dependencies MUST reflect what the Kafka version uses --><version.kafka>3.2.0</version.kafka><version.zookeeper>3.6.3</version.zookeeper><!-- Kafka uses 2.12.6 but we need to bump due to CVE-2020-36518 before Kafka fixes it --><!-- NOTE: These two versions are maintained separately due to decoupling jackson and databind for downstream --><version.jackson>2.13.2</version.jackson><version.jackson.databind>2.13.2.2</version.jackson.databind><version.org.slf4j>1.7.36</version.org.slf4j><version.netty>4.1.73.Final</version.netty><!-- Scala version used to build Kafka --><version.kafka.scala>2.13</version.kafka.scala><!-- ANTLR --><!-- Align with Antlr runtime version pulled in via Quarkus --><version.antlr>4.8</version.antlr><version.antlr4test.plugin>1.18</version.antlr4test.plugin><!-- Quarkus --><!-- Must be aligned with Apicurio version below --><quarkus.version>2.7.2.Final</quarkus.version><!-- Apicurio --><version.apicurio>2.1.5.Final</version.apicurio><!-- Database drivers, should align with databases --><version.postgresql.driver>42.3.5</version.postgresql.driver><version.mysql.driver>8.0.28</version.mysql.driver><version.mysql.binlog>0.25.6</version.mysql.binlog><version.mongo.driver>4.3.3</version.mongo.driver><version.sqlserver.driver>9.4.1.jre8</version.sqlserver.driver><version.oracle.driver>21.1.0.0</version.oracle.driver><version.db2.driver>11.5.0.0</version.db2.driver><version.cassandra.driver>4.14.0</version.cassandra.driver><!-- Databases, should align with database drivers --><version.mysql.server>5.7</version.mysql.server><version.mongo.server>3.6</version.mongo.server><version.cassandra3>3.11.12</version.cassandra3><version.cassandra4>4.0.2</version.cassandra4><!-- Required in protoc plug-in config, too; can't be in BOM therefore --><version.com.google.protobuf>3.19.2</version.com.google.protobuf></properties><modules><module>debezium-bom</module><module>debezium-parent</module><module>support/checkstyle</module><module>support/ide-configs</module><module>support/revapi</module><module>debezium-api</module><module>debezium-ddl-parser</module><module>debezium-assembly-descriptors</module><module>debezium-core</module><module>debezium-embedded</module><module>debezium-connector-mysql</module><!-- <module>debezium-connector-postgres</module><module>debezium-connector-mongodb</module><module>debezium-connector-sqlserver</module><module>debezium-connector-oracle</module> --><module>debezium-microbenchmark</module><module>debezium-microbenchmark-oracle</module><module>debezium-quarkus-outbox</module><module>debezium-scripting</module><module>debezium-server</module><module>debezium-testing</module><module>debezium-connect-rest-extension</module><module>debezium-schema-generator</module></modules><distributionManagement><repository><id>ossrh</id><name>Sonatype Staging Repository</name><url>${release.endpoint}/service/local/staging/deploy/maven2</url></repository><snapshotRepository><id>ossrh</id><name>OSS Sonatype Nexus Snapshots</name><url>https://oss.sonatype.org/content/repositories/snapshots</url></snapshotRepository></distributionManagement><repositories><repository><id>confluent</id><name>Confluent</name><url>https://packages.confluent.io/maven/</url><snapshots><enabled>true</enabled><updatePolicy>never</updatePolicy></snapshots></repository><repository><id>ossrh</id><name>OSS Sonatype Nexus</name><url>${release.endpoint}/content/groups/public/</url><releases><enabled>true</enabled><updatePolicy>never</updatePolicy></releases><snapshots><enabled>true</enabled><updatePolicy>never</updatePolicy></snapshots></repository></repositories><build><pluginManagement><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-enforcer-plugin</artifactId><version>${version.enforcer.plugin}</version></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>${version.compiler.plugin}</version></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-source-plugin</artifactId><version>${version.source.plugin}</version></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-javadoc-plugin</artifactId><version>${version.javadoc.plugin}</version></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-release-plugin</artifactId><version>${version.release.plugin}</version></plugin><plugin><groupId>org.sonatype.plugins</groupId><artifactId>nexus-staging-maven-plugin</artifactId><version>${version.staging.plugin}</version></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>${version.surefire.plugin}</version></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-checkstyle-plugin</artifactId><version>${version.checkstyle.plugin}</version></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-report-plugin</artifactId><version>${version.surefire.plugin}</version></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-resources-plugin</artifactId><version>${version.resources.plugin}</version></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-dependency-plugin</artifactId><version>${version.dependency.plugin}</version></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-gpg-plugin</artifactId><version>${version.gpg.plugin}</version></plugin><plugin><groupId>org.codehaus.mojo</groupId><artifactId>build-helper-maven-plugin</artifactId><version>${version.build-helper.plugin}</version></plugin></plugins></pluginManagement><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-release-plugin</artifactId><configuration><pushChanges>false</pushChanges><releaseProfiles>docs,assembly,release-sign-artifacts,release</releaseProfiles></configuration></plugin><plugin><groupId>org.sonatype.plugins</groupId><artifactId>nexus-staging-maven-plugin</artifactId><extensions>true</extensions><configuration><serverId>ossrh</serverId><nexusUrl>${release.endpoint}</nexusUrl><autoReleaseAfterClose>false</autoReleaseAfterClose></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>${maven.compiler.source}</source><target>${maven.compiler.target}</target><encoding>${project.build.sourceEncoding}</encoding></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-resources-plugin</artifactId><configuration><encoding>${project.build.sourceEncoding}</encoding></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-enforcer-plugin</artifactId><executions><execution><id>enforce-maven</id><goals><goal>enforce</goal></goals><configuration><rules><requireMavenVersion><version>[3.6.3,3.8.5),[3.8.6,)</version></requireMavenVersion></rules></configuration></execution></executions></plugin></plugins></build><profiles><profile><id>qa</id><activation><property><name>!quick</name></property></activation></profile><profile><id>quick</id><activation><activeByDefault>false</activeByDefault><property><name>quick</name></property></activation><properties><skipTests>true</skipTests></properties></profile><profile><id>assembly</id><properties><skipLongRunningTests>false</skipLongRunningTests></properties></profile><profile><id>release</id><properties><skipLongRunningTests>false</skipLongRunningTests></properties></profile><profile><id>performance</id><properties><skipLongRunningTests>false</skipLongRunningTests></properties></profile><profile><id>docs</id><activation><activeByDefault>false</activeByDefault><property><name>docs</name><value>true</value></property></activation><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-javadoc-plugin</artifactId><version>${version.javadoc.plugin}</version><configuration><show>private</show><nohelp>true</nohelp></configuration><executions><execution><id>attach-javadocs</id><goals><goal>jar</goal></goals></execution><execution><id>attach-test-javadocs</id><goals><goal>test-jar</goal></goals></execution></executions></plugin></plugins></build></profile><profile><id>release-sign-artifacts</id><activation><property><name>performRelease</name><value>true</value></property></activation><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-gpg-plugin</artifactId><executions><execution><id>sign-artifacts</id><phase>verify</phase><goals><goal>sign</goal></goals><configuration><gpgArguments><arg>--pinentry-mode</arg><arg>loopback</arg></gpgArguments></configuration></execution></executions></plugin></plugins></build></profile><profile><id>jdk11</id><activation><jdk>[11</jdk></activation><properties><modules.argline>--illegal-access=permit</modules.argline><maven.compiler.release>8</maven.compiler.release></properties></profile></profiles>
</project>

debezium-connector-mysql.pom

<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><groupId>io.debezium</groupId><artifactId>debezium-parent</artifactId><version>1.9.5.Final</version><relativePath>../debezium-parent/pom.xml</relativePath></parent><modelVersion>4.0.0</modelVersion><artifactId>debezium-connector-mysql</artifactId><name>Debezium Connector for MySQL</name><packaging>jar</packaging><dependencies><dependency><groupId>io.debezium</groupId><artifactId>debezium-core</artifactId></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-ddl-parser</artifactId></dependency><dependency><groupId>com.zendesk</groupId><artifactId>mysql-binlog-connector-java</artifactId></dependency><dependency><groupId>mil.nga</groupId><artifactId>wkb</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>connect-api</artifactId><scope>provided</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><scope>provided</scope></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><exclusions><exclusion><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId></exclusion></exclusions></dependency><!-- Testing --><dependency><groupId>io.debezium</groupId><artifactId>debezium-core</artifactId><type>test-jar</type></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>connect-runtime</artifactId></exclusion></exclusions><type>test-jar</type></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>connect-runtime</artifactId></exclusion></exclusions></dependency><!-- lzwlzwlzw --><dependency><groupId>org.apache.kafka</groupId><artifactId>connect-runtime</artifactId><version>3.2.0</version><exclusions><exclusion><groupId>org.eclipse.jetty</groupId><artifactId>jetty-servlet</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.eclipse.jetty</groupId><artifactId>jetty-servlet</artifactId><version>9.4.44.v20210927</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-connect-rest-extension</artifactId><version>1.9.5.Final</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.76</version></dependency> <dependency><groupId>org.jooq</groupId><artifactId>jooq-codegen</artifactId><version>3.9.6</version></dependency><dependency><groupId>org.jooq</groupId><artifactId>jooq-meta</artifactId><version>3.9.6</version></dependency><dependency><groupId>org.testcontainers</groupId><artifactId>r2dbc</artifactId><version>1.17.6</version><!--$NO-MVN-MAN-VER$--></dependency><dependency><groupId>com.atomikos</groupId><artifactId>transactions</artifactId><version>5.0.9</version></dependency><dependency><groupId>com.atomikos</groupId><artifactId>transactions-jta</artifactId><version>5.0.9</version></dependency><dependency><groupId>com.atomikos</groupId><artifactId>transactions-jdbc</artifactId><version>5.0.9</version></dependency><dependency><groupId>com.atomikos</groupId><artifactId>transactions-api</artifactId><version>5.0.9</version></dependency><dependency><groupId>com.atomikos</groupId><artifactId>atomikos-util</artifactId><version>5.0.9</version></dependency><dependency><groupId>org.codehaus.groovy</groupId><artifactId>com.springsource.org.codehaus.groovy</artifactId><version>1.8.8</version></dependency><!-- lzwlzwlzw --><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId></dependency><dependency><groupId>org.mockito</groupId><artifactId>mockito-core</artifactId></dependency><dependency><groupId>org.easytesting</groupId><artifactId>fest-assert</artifactId></dependency><dependency><groupId>io.confluent</groupId><artifactId>kafka-connect-avro-converter</artifactId></dependency><dependency><groupId>org.awaitility</groupId><artifactId>awaitility</artifactId></dependency><dependency><groupId>org.testcontainers</groupId><artifactId>testcontainers</artifactId></dependency><dependency><groupId>org.testcontainers</groupId><artifactId>mysql</artifactId></dependency><!-- Used for unit testing with Kafka --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_${version.kafka.scala}</artifactId></dependency></dependencies><properties><!-- Specify the properties for the various Docker containers.--><mysql.user>mysqluser</mysql.user><mysql.password>mysqlpw</mysql.password><mysql.replica.user>mysqlreplica</mysql.replica.user><mysql.replica.password>mysqlpw</mysql.replica.password><mysql.port>3306</mysql.port><mysql.percona.port>3306</mysql.percona.port><mysql.gtid.port>3306</mysql.gtid.port><mysql.gtid.replica.port>3306</mysql.gtid.replica.port><mysql.ssl.port>3306</mysql.ssl.port><mysql.replica.port>3306</mysql.replica.port> <!-- by default use primary as 'replica' --><mysql.init.timeout>60000</mysql.init.timeout> <!-- 60 seconds --><!--By default, we should use the docker image maintained by the MySQL team. This property is changed with different profiles.However, we run one container with GTIDs and one without.--><docker.filter>debezium/mysql-server-test-database</docker.filter><docker.skip>false</docker.skip><docker.initimage>rm -f /etc/localtime; ln -s /usr/share/zoneinfo/US/Samoa /etc/localtime</docker.initimage></properties><build><plugins><!-- Unlike surefire, the failsafe plugin ensures 'post-integration-test' phase always runs, evenwhen there are failed integration tests. We rely upon this to always shut down the Docker containerafter the integration tests (defined as '*IT.java') are run.--><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId></plugin><plugin><groupId>io.debezium</groupId><artifactId>debezium-schema-generator</artifactId><version>${project.version}</version><executions><execution><id>generate-connector-metadata</id><goals><goal>generate-api-spec</goal></goals><phase>prepare-package</phase></execution></executions></plugin></plugins><resources><!-- Apply the properties set in the POM to the resource files --><resource><filtering>true</filtering><directory>src/main/resources</directory><includes><include>*</include><include>**/*</include></includes></resource></resources><testResources><testResource><directory>src/test/resources</directory><filtering>false</filtering><includes><include>*</include><include>**/*</include></includes></testResource></testResources></build><!--Define several useful profiles--><profiles><!--  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~This assembly profile is used during official builds. In addition to compiling, and running the unit and integration tests like the non-assemblyprofiles, this profile creates additional (like the connector plugin archives),starts up all three Docker containers (normal MySQL, MySQL+GTIDs, and alt-MySQL)and runs the integration tests against each of them.To use, specify "-Passembly" on the Maven command line.~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ --><profile><id>assembly</id><activation><activeByDefault>false</activeByDefault></activation><properties><!-- Run multiple images at the same time, but use different ports for all MySQL servers --><docker.filter>debezium/mysql-server-test-database,debezium/mysql-server-gtids-test-database,debezium/mysql-server-gtids-test-database-replica,debezium/percona-server-test-database,debezium/mysql-server-test-database-ssl</docker.filter><mysql.port>4301</mysql.port><mysql.gtid.port>4302</mysql.gtid.port><mysql.gtid.replica.port>4303</mysql.gtid.replica.port><mysql.percona.port>4304</mysql.percona.port><mysql.ssl.port>4305</mysql.ssl.port></properties><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><dependencies><dependency><groupId>io.debezium</groupId><artifactId>debezium-assembly-descriptors</artifactId><version>${project.version}</version></dependency></dependencies><executions><execution><id>default</id><phase>package</phase><goals><goal>single</goal></goals><configuration><finalName>${project.artifactId}-${project.version}</finalName><attach>true</attach>  <!-- we want attach & deploy these to Maven --><descriptorRefs><descriptorRef>${assembly.descriptor}</descriptorRef></descriptorRefs><tarLongFileMode>posix</tarLongFileMode></configuration></execution></executions></plugin><!-- Override the failsafe plugin to run the integration tests for each set of databases. But make sure each database server is used only once ...--><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-failsafe-plugin</artifactId><version>${version.failsafe.plugin}</version><configuration><skipTests>${skipITs}</skipTests><enableAssertions>true</enableAssertions><systemPropertyVariables><!-- Make these available to the tests via system properties --><database.hostname>${docker.host.address}</database.hostname><database.user>${mysql.user}</database.user><database.password>${mysql.password}</database.password><database.replica.hostname>${docker.host.address}</database.replica.hostname><database.replica.user>${mysql.replica.user}</database.replica.user><database.replica.password>${mysql.replica.password}</database.replica.password><database.port>${mysql.port}</database.port><database.replica.port>${mysql.port}</database.replica.port><database.ssl.mode>disabled</database.ssl.mode><skipLongRunningTests>false</skipLongRunningTests></systemPropertyVariables><runOrder>alphabetical</runOrder></configuration><executions><!-- First run the integration tests with the non-GTID server alone --><execution><id>integration-test-mysql</id><goals><goal>integration-test</goal></goals><configuration><systemPropertyVariables><!-- same port for both, since we're only running one server --><database.port>${mysql.port}</database.port><database.replica.port>${mysql.port}</database.replica.port></systemPropertyVariables></configuration></execution><!-- Then run the integration tests with the GTID server + replica server --><execution><id>integration-test-mysql-gtids-with-replica</id><goals><goal>integration-test</goal></goals><configuration><systemPropertyVariables><database.port>${mysql.gtid.port}</database.port><database.replica.port>${mysql.gtid.replica.port}</database.replica.port></systemPropertyVariables></configuration></execution><!-- Then just Percona Server --><execution><id>integration-test-percona-server</id><goals><goal>integration-test</goal></goals><configuration><systemPropertyVariables><!-- same port for both, since we're only running one server --><database.port>${mysql.percona.port}</database.port><database.replica.port>${mysql.percona.port}</database.replica.port></systemPropertyVariables></configuration></execution><!-- SSL --><execution><id>integration-test-ssl</id><goals><goal>integration-test</goal></goals><configuration><systemPropertyVariables><database.ssl.mode>verify_ca</database.ssl.mode><database.ssl.truststore>${project.basedir}/src/test/resources/ssl/truststore</database.ssl.truststore><database.ssl.truststore.password>debezium</database.ssl.truststore.password><database.ssl.keystore>${project.basedir}/src/test/resources/ssl/keystore</database.ssl.keystore><database.ssl.keystore.password>debezium</database.ssl.keystore.password><!-- same port for both, since we're only running one server --><database.port>${mysql.ssl.port}</database.port><database.replica.port>${mysql.ssl.port}</database.replica.port></systemPropertyVariables></configuration></execution><execution><id>verify</id><goals><goal>verify</goal></goals></execution></executions></plugin></plugins></build></profile><!--  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~Do not perform any Docker-related functionalityTo use, specify "-DskipITs" on the Maven command line.~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ --><profile><id>skip-integration-tests</id><activation><activeByDefault>false</activeByDefault><property><name>skipITs</name></property></activation><properties><docker.skip>true</docker.skip></properties></profile><profile><id>quick</id><activation><activeByDefault>false</activeByDefault><property><name>quick</name></property></activation><properties><skipITs>true</skipITs><docker.skip>true</docker.skip></properties></profile><!--  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~Use the alternative Docker image for MySQL.To use, specify "-Dmysql-gtids" or -Pmysql-gtids on the Maven command line.~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ --><profile><id>mysql-gtids</id><activation><activeByDefault>false</activeByDefault><property><name>mysql-gtids</name></property></activation><properties><!-- Docker properties --><docker.filter>debezium/mysql-server-gtids-test-database</docker.filter><!-- Integration test properties --><database.port>${mysql.gtid.port}</database.port><database.replica.port>${mysql.gtid.port}</database.replica.port></properties></profile><!--  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~Use the Docker image for Percona Server.To use, specify "-Dpercona-server" or -Ppercona-server on the Maven command line.~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ --><profile><id>percona-server</id><activation><activeByDefault>false</activeByDefault><property><name>percona-server</name></property></activation><properties><!-- Docker properties --><docker.filter>debezium/percona-server-test-database</docker.filter><!-- Integration test properties --><database.port>${mysql.percona.port}</database.port><database.replica.port>${mysql.percona.port}</database.replica.port><docker.initimage>ln -s /usr/share/zoneinfo/Pacific/Pago_Pago /etc/localtime</docker.initimage></properties></profile><!--  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~Use the Docker image for a MySQL replica of another MySQL server configured to use GTIDs. To use, specify "-Dmysql-replica" or -Pmysql-replica on the Maven command line.~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ --><profile><id>mysql-replica</id><activation><activeByDefault>false</activeByDefault><property><name>mysql-replica</name></property></activation><properties><!-- Docker properties --><mysql.gtid.port>3306</mysql.gtid.port><mysql.gtid.replica.port>4306</mysql.gtid.replica.port><docker.filter>debezium/mysql-server-gtids-test-database,debezium/mysql-server-gtids-test-database-replica</docker.filter><!-- Integration test properties --><database.port>${mysql.gtid.port}</database.port><database.replica.port>${mysql.gtid.replica.port}</database.replica.port></properties></profile><!--  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~Use the alternative Docker image for MySQL.To use, specify "-Dmysql-ssl" or -Pmysql-ssl on the Maven command line.~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ --><profile><id>mysql-ssl</id><activation><activeByDefault>false</activeByDefault><property><name>mysql-ssl</name></property></activation><properties><!-- Docker properties --><docker.filter>debezium/mysql-server-test-database-ssl</docker.filter><!-- Integration test properties --><database.port>${mysql.ssl.port}</database.port><database.replica.port>${mysql.ssl.port}</database.replica.port><docker.initimage>ln -s /usr/share/zoneinfo/Pacific/Pago_Pago /etc/localtime</docker.initimage></properties></profile></profiles>
</project>

4、编译

mvn clean install -Dmaven.test.skip=true

5、准备connect-distributed.properties 启动配置文件

关键参数:

bootstrap.servers   kafka地址,注意带端口

group.id    节点ID 纯数字唯一即可

offset.storage.topic  作业点位topic名称,按需设置

config.storage.topic  作业配置topic名称,按需设置

status.storage.topic   作业状态topic名称,按需设置

plugin.path   插件路径,即debezium-connector-mysql打包路径

connect-distributed.properties 以下是我的配置样例

##
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
### This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
# to be used with the examples, and some settings may differ from those used in a production system, especially
# the `bootstrap.servers` and those specifying replication factors.# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=192.168.1.102:9092# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=1# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
offset.storage.topic=my-connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
config.storage.topic=my-connect-configs
config.storage.replication.factor=1# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=60000# List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS.
# Specify hostname as 0.0.0.0 to bind to all interfaces.
# Leave hostname empty to bind to default interface.
# Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084"
#listeners=HTTP://:8083# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
# If not set, it uses the value for "listeners" if configured.
rest.advertised.host.name=172.17.0.2
rest.advertised.port=8083
#rest.advertised.listener=# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=D:\\Users\\zheng\\git\\debezium\\debezium-connector-mysql\\target\\
rest.host.name=172.17.0.2
rest.port=8083
offset.flush.timeout.ms=5000
task.shutdown.graceful.timeout.ms=10000
connector.client.config.override.policy=All

6、启动

设置main方法

org.apache.kafka.connect.cli.ConnectDistributed

启动参数为配置文件路径

D:\\connect-distributed.properties

测试工具:

postman

接口一: POST 创建作业   查询serverid    SELECT@@server_id

http://192.168.1.5:8083/connectors{"name": "mysql_test_01", "config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "192.168.1.106", "database.port": "3306", "database.user": "root", "database.password": "123456", "database.server.id": "1", "database.server.name": "mysql_test", "database.include.list": "dbz_mysql", "database.history.kafka.bootstrap.servers": "192.168.1.102:9092", "database.history.kafka.topic": "dbhistory.fullfillment", "include.schema.changes": "true" }
}

接口二: get 获取作业列表   http://192.168.1.5:8083/connectors/

mysql 操作数据

kafka 消费对应数据

/opt/bitnami/kafka/bin/kafka-console-consumer.sh --property print.key=true --property print.timestamp=ture --bootstrap-server 192.168.1.102:9092 --topic mysql_test.dbz_mysql.aa  --from-beginning{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":true,"name":"mysql_test.dbz_mysql.aa.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":true,"name":"mysql_test.dbz_mysql.aa.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_test.dbz_mysql.aa.Envelope"},"payload":{"before":{"id":2},"after":{"id":3},"source":{"version":"1.9.5.Final","connector":"mysql","name":"mysql_test","ts_ms":1678548518000,"snapshot":"false","db":"dbz_mysql","sequence":null,"table":"aa","server_id":1,"gtid":null,"file":"binlog.000006","pos":675,"row":0,"thread":11,"query":null},"op":"u","ts_ms":1678548518053,"transaction":null}}

debezium-mysql 源码集成kafka-connector启动相关推荐

  1. mysql源码分析——InnoDB引擎启动分析

    一.InnoDB启动 在MySql中,InnoDB的启动流程其实是很重要的.一些更细节的问题,就藏在了这其中.在前面分析过整个数据库启动的流程,本篇就具体分析一下InnoDB引擎启动所做的各种动作.在 ...

  2. Debezium MySQL源连接器

    Debezium MySQL源连接器 该Debezium的MySQL连接是源连接器,可以得到现有数据的快照,记录所有的MySQL服务器/群集在数据库中的行级变化.第一次连接到MySQL服务器时,它将读 ...

  3. MySQL 源码 需要 什么基础_MySQL 基础之 源码 部署

    源码部署 1. 需要先卸载一些软件 centos7 中需要先卸载 mariadb-libs 软件包 # rpm -e --nodeps mariadb-libs 2. 安装依赖包 yum -y ins ...

  4. gcc编译器和mysql源码哪个难_源码编译mysql 5.5

    http://blog.csdn.net/aidenliu/article/details/6586610 源码编译mysql 5.5+ 安装过程全记录 分类: Mysql 2011-07-05 21 ...

  5. mysql windows编译_Windows平台下编译Mysql源码 | 学步园

    最近由于项目的关系,需要使用到Mysql数据库,而我的工作任务与数据库有很大的关系,所以,决定好好学学Mysql,同时,也把Mysql的源码下载了,希望能有利于对它的学习.这里记录一下windows平 ...

  6. Centos环境下mysql源码编译安装

    yum -y install gcc gcc-c++ ncurses-devel perl ##从mysql5.5起,mysql源码安装开始使用cmake了,设置源码编译配置脚本. wget --no ...

  7. ubuntu 环境下调试mysql源码_【转】Ubuntu 16.04下 Mysql 5.7.17源码编译与安装

    Ubuntu 16.04下 Mysql5.7.17源码编译与安装 系统环境 一. 系统安装条件 1.cmake MySQL使用cmake跨平台工具预编译源码,用于设置mysql的编译参数. sudo ...

  8. mysql源码安装配置_MySQL源码安装及配置

    目录 --cmake下载安装 http://cmake.org/download/ # wgethttp://cmake.org/files/v3.3/cmake-3.3.2.tar.gz # tar ...

  9. MySQL源码包编译安装

    +++++++++++++++++++++++++++++++++++++++++++ 标题:MySQL数据库实例部署 时间:2019年5月2日 内容:MySQL源码包进行编译,然后部署MySQL单实 ...

  10. 怎么查看MySQL 源码编译了什么_Mysql 源码编译教程贴

    题外话:这是一篇教程贴,不仅学的是mysql的编译,还是一些编译的知识.我也是一个菜鸟,写一些感悟和心得,有什么问题可以批评指正,谢谢! 如果只是为了安装请移到我的另一篇安装贴: Mysql安装贴 环 ...

最新文章

  1. 2022-2028年中国乳制品行业市场需求预测与投资战略规划分析报告
  2. 互斥量、读写锁长占时分析的利器——valgrind的DRD
  3. SAP 关于EWM的WT增强简介
  4. POJ 1160 Post Office
  5. 关于Angular中使用HTML的select和option标签的一些问题
  6. 什么是通用字符名称?_通用名称
  7. freeredius3.0 mysql_EDIUS视频采集卡 STROM 3G HD/HD SDI
  8. windows 快捷键整理
  9. java之跳转_java学习之五种跳转关于jsp的
  10. Java中提取字符串中的数字
  11. tomcat端口被占用的解决方式
  12. python中如何显示特殊的单位符号
  13. CPU使用率100%怎么办
  14. 计算机系统是日语,如何在计算机系统中添加日语输入法
  15. 国产工业机器人目前发展到了什么水平?
  16. 位运算符最详细的解释(java)
  17. 有关获取并保存微信头像到本地服务器
  18. 通过大白菜u盘启动工具备份/还原/重装/激活系统/修复引导 实操教程(上)
  19. 电蚊拍一天用30-50次?
  20. matlab合并两个矩阵。

热门文章

  1. 人工智能 : 第三篇”脑机接口“
  2. GridView宫格视图实践
  3. Java我的世界改字体颜色,我的世界:告示牌上面的字体其实可以染色?用了这么久竟然不知道...
  4. 原创|为什么我不建议你跳槽!
  5. E盘显示拒绝访问资料怎么恢复
  6. 【模拟面试】23届本科生拿下字节/京东/网易研发offer,到底有多强?
  7. 新年扯皮以及一些比较正经的东西
  8. 苹果cms百度php推送示例,苹果cms百度主动URL推送教程
  9. 暗黑三php是什么东西,暗黑3卡奈魔盒怎么得有什么用_暗黑3卡奈魔盒详细介绍
  10. pandas性能百倍提升之用字典索引或ndarray替换DataFrame索引以及内存占用分析