canal mq数据同步

官网:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

*******************

简 介

tcp mode:客户端直连canal server

mq mode:canal server将binlog数据发送到mq,1.1.5支持kafka、rocketmq、rabbitmq

*******************

canal server mq配置

canal.properties

##################################################
#########         MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
#########            Kafka           #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

##################################################
#########           RocketMQ         #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =

##################################################
#########           RabbitMQ         #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =

example/instance.properties

# 静态topic:消息发送的分区为example
canal.mq.topic=example
# 动态topic:根据数据库、表动态设置发送的topic
canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
# 静态分区:数据发送的分区
canal.mq.partition=0
# 动态分区:根据数据库、表设置发送的分区
canal.mq.partitionsNum=3
canal.mq.partitionHash=test.table:id^name,.*\\..*
# 为不同的topic动态设置分区数
canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
# test.*:4:test开头的topic分区数为4
# mycanal:6:topic mycanal分区数为6

# 动态topic:canal.mq.dynamicTopic
test:test库中的所有数据都发送到test topic上
.*:数据库的数据都发到对应数据库名的topic上
mytest1.user:发送到mytest1_user topic上
mytest2\\..*:发送到mytest2_tableName topic上
topicName:.*:所有数据库的数据都发到topicName上
topicName:test\\..*:test下的所有表都发送到topicName上
test,test1\\.*:数据库test1中的表发送到test1_tableName topic上,数据库test中的所有表发送到test topic上,其余所有数据发送到canal.mq.topic指定的topic上

# 动态分区:
.*\\..*:id:hash字段为id
.*\\..*:id^name:hash字段为id、name
.*\\..*:$pk$:hash字段为主键(自动查找)
.*\\..*:根据tableName hash
partitionHash为空:发送到默认分区 0
test.test,test.test2:id:test.test根据表名test hash,test.test2根据id hash,其余发送到对应topic的0分区

CanalKafkaProducer:kafka消息发送类

@SuppressWarnings({ "rawtypes", "unchecked" })
@SPI("kafka")
public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQProducer {private static final Logger      logger              = LoggerFactory.getLogger(CanalKafkaProducer.class);private static final String      PREFIX_KAFKA_CONFIG = "kafka.";private Producer<String, byte[]> producer;@Overridepublic void init(Properties properties) {KafkaProducerConfig kafkaProducerConfig = new KafkaProducerConfig();this.mqProperties = kafkaProducerConfig;super.init(properties);// load propertiesthis.loadKafkaProperties(properties);Properties kafkaProperties = new Properties();kafkaProperties.putAll(kafkaProducerConfig.getKafkaProperties());kafkaProperties.put("max.in.flight.requests.per.connection", 1);kafkaProperties.put("key.serializer", StringSerializer.class);if (kafkaProducerConfig.isKerberosEnabled()) {File krb5File = new File(kafkaProducerConfig.getKrb5File());File jaasFile = new File(kafkaProducerConfig.getJaasFile());if (krb5File.exists() && jaasFile.exists()) {// 配置kerberos认证,需要使用绝对路径System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath());System.setProperty("java.security.auth.login.config", jaasFile.getAbsolutePath());System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");kafkaProperties.put("security.protocol", "SASL_PLAINTEXT");kafkaProperties.put("sasl.kerberos.service.name", "kafka");} else {String errorMsg = "ERROR # The kafka kerberos configuration file does not exist! 
please check it";logger.error(errorMsg);throw new RuntimeException(errorMsg);}}kafkaProperties.put("value.serializer", KafkaMessageSerializer.class);producer = new KafkaProducer<>(kafkaProperties);}private void loadKafkaProperties(Properties properties) {//加载相关配置KafkaProducerConfig kafkaProducerConfig = (KafkaProducerConfig) this.mqProperties;Map<String, Object> kafkaProperties = kafkaProducerConfig.getKafkaProperties();// 兼容下<=1.1.4的mq配置doMoreCompatibleConvert("canal.mq.servers", "kafka.bootstrap.servers", properties);doMoreCompatibleConvert("canal.mq.acks", "kafka.acks", properties);doMoreCompatibleConvert("canal.mq.compressionType", "kafka.compression.type", properties);doMoreCompatibleConvert("canal.mq.retries", "kafka.retries", properties);doMoreCompatibleConvert("canal.mq.batchSize", "kafka.batch.size", properties);doMoreCompatibleConvert("canal.mq.lingerMs", "kafka.linger.ms", properties);doMoreCompatibleConvert("canal.mq.maxRequestSize", "kafka.max.request.size", properties);doMoreCompatibleConvert("canal.mq.bufferMemory", "kafka.buffer.memory", properties);doMoreCompatibleConvert("canal.mq.kafka.kerberos.enable", "kafka.kerberos.enable", properties);doMoreCompatibleConvert("canal.mq.kafka.kerberos.krb5.file", "kafka.kerberos.krb5.file", properties);doMoreCompatibleConvert("canal.mq.kafka.kerberos.jaas.file", "kafka.kerberos.jaas.file", properties);

RocketMQConstants:rocketmq producer属性配置

/**
 * Property keys for the canal RocketMQ producer configuration.
 * Every key is the {@code "rocketmq"} root followed by a dotted suffix,
 * matching the entries in canal.properties.
 */
public class RocketMQConstants {

    public static final String ROOT                                  = "rocketmq";

    public static final String ROCKETMQ_PRODUCER_GROUP               = ROOT + ".producer.group";
    public static final String ROCKETMQ_ENABLE_MESSAGE_TRACE         = ROOT + ".enable.message.trace";
    public static final String ROCKETMQ_CUSTOMIZED_TRACE_TOPIC       = ROOT + ".customized.trace.topic";
    public static final String ROCKETMQ_NAMESPACE                    = ROOT + ".namespace";
    public static final String ROCKETMQ_NAMESRV_ADDR                 = ROOT + ".namesrv.addr";
    public static final String ROCKETMQ_RETRY_TIMES_WHEN_SEND_FAILED = ROOT + ".retry.times.when.send.failed";
    public static final String ROCKETMQ_VIP_CHANNEL_ENABLED          = ROOT + ".vip.channel.enabled";
    public static final String ROCKETMQ_TAG                          = ROOT + ".tag";
    public static final String ROCKETMQ_ACCESS_CHANNEL               = ROOT + ".access.channel";
    public static final String ROCKETMQ_BATCH_SIZE                   = ROOT + ".batch.size";
    public static final String ROCKETMQ_SUBSCRIBE_FILTER             = ROOT + ".subscribe.filter";
}

RabbitMQConstants:rabbitmq producer属性配置

/**
 * Property keys for the canal RabbitMQ producer/consumer configuration.
 * Every key is the {@code "rabbitmq"} root followed by a dotted suffix,
 * matching the entries in canal.properties / application.yml.
 */
public class RabbitMQConstants {

    public static final String ROOT                      = "rabbitmq";

    public static final String RABBITMQ_HOST             = ROOT + "." + "host";
    public static final String RABBITMQ_EXCHANGE         = ROOT + "." + "exchange";
    public static final String RABBITMQ_VIRTUAL_HOST     = ROOT + "." + "virtual.host";
    public static final String RABBITMQ_USERNAME         = ROOT + "." + "username";
    public static final String RABBITMQ_PASSWORD         = ROOT + "." + "password";
    // FIX: the suffix previously repeated the root ("rabbitmq.resource.ownerId"
    // appended to ROOT), producing "rabbitmq.rabbitmq.resource.ownerId", which
    // never matches the documented "rabbitmq.resource.ownerId" config entry.
    public static final String RABBITMQ_RESOURCE_OWNERID = ROOT + "." + "resource.ownerId";
    // "rabbitmq.deliveryMode" appears in canal.properties but had no constant;
    // added so all documented rabbitmq.* keys can be referenced symbolically.
    public static final String RABBITMQ_DELIVERY_MODE    = ROOT + "." + "deliveryMode";
}

*******************

canal adapter mq属性配置

application.yml:启动器配置

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

#  srcDataSources:
#    defaultDS:
#      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
#      username: root
#      password: 121212
canalAdapters:
- instance: example # canal instance Name or mq topic name
  groups:
  - groupId: g1
    outerAdapters:
    - name: logger

CanalKafkaConsumer:kafka. 开头的属性均可读取

@SPI("kafka")
public class CanalKafkaConsumer implements CanalMsgConsumer {private static final String      PREFIX_KAFKA_CONFIG = "kafka.";private KafkaConsumer<String, ?> kafkaConsumer;private boolean                  flatMessage         = true;private String                   topic;private Map<Integer, Long>       currentOffsets      = new ConcurrentHashMap<>();private Properties               kafkaProperties     = new Properties();@Overridepublic void init(Properties properties, String topic, String groupId) {this.topic = topic;Boolean flatMessage = (Boolean) properties.get(CanalConstants.CANAL_MQ_FLAT_MESSAGE);if (flatMessage != null) {this.flatMessage = flatMessage;}for (Map.Entry<Object, Object> entry : properties.entrySet()) {String k = (String) entry.getKey();Object v = entry.getValue();if (k.startsWith(PREFIX_KAFKA_CONFIG) && v != null) {kafkaProperties.put(k.substring(PREFIX_KAFKA_CONFIG.length()), v);}   //读取所有 kafka. 开头的属性}kafkaProperties.put("group.id", groupId);kafkaProperties.put("key.deserializer", StringDeserializer.class);kafkaProperties.put("client.id", UUID.randomUUID().toString().substring(0, 6));}@Overridepublic void connect() {if (this.flatMessage) {kafkaProperties.put("value.deserializer", StringDeserializer.class);this.kafkaConsumer = new KafkaConsumer<String, String>(kafkaProperties);} else {kafkaProperties.put("value.deserializer", KafkaMessageDeserializer.class);this.kafkaConsumer = new KafkaConsumer<String, Message>(kafkaProperties);}kafkaConsumer.subscribe(Collections.singletonList(topic));}

CanalRocketMQConsumer:rocketmq 消费相关属性

@SPI("rocketmq")
public class CanalRocketMQConsumer implements CanalMsgConsumer {private static final Logger                                logger               = LoggerFactory.getLogger(CanalRocketMQConsumer.class);private static final String                                CLOUD_ACCESS_CHANNEL = "cloud";private String                                             nameServer;private String                                             topic;private String                                             groupName;private DefaultMQPushConsumer                              rocketMQConsumer;private BlockingQueue<ConsumerBatchMessage<CommonMessage>> messageBlockingQueue;private int                                                batchSize            = -1;private long                                               batchProcessTimeout  = 60 * 1000;private boolean                                            flatMessage;private volatile ConsumerBatchMessage<CommonMessage>       lastGetBatchMessage  = null;private String                                             accessKey;private String                                             secretKey;private String                                             customizedTraceTopic;private boolean                                            enableMessageTrace   = false;private String                                             accessChannel;private String                                             namespace;private String                                             filter               = "*";@Overridepublic void init(Properties properties, String topic, String groupName) {this.topic = topic;this.groupName = groupName;this.flatMessage = (Boolean) properties.get(CanalConstants.CANAL_MQ_FLAT_MESSAGE);this.messageBlockingQueue = new LinkedBlockingQueue<>(1024);this.accessKey = properties.getProperty(CanalConstants.CANAL_ALIYUN_ACCESS_KEY);this.secretKey = properties.getProperty(CanalConstants.CANAL_ALIYUN_SECRET_KEY);String enableMessageTrace = 
properties.getProperty(RocketMQConstants.ROCKETMQ_ENABLE_MESSAGE_TRACE);if (StringUtils.isNotEmpty(enableMessageTrace)) {this.enableMessageTrace = Boolean.parseBoolean(enableMessageTrace);}this.customizedTraceTopic = properties.getProperty(RocketMQConstants.ROCKETMQ_CUSTOMIZED_TRACE_TOPIC);this.accessChannel = properties.getProperty(RocketMQConstants.ROCKETMQ_ACCESS_CHANNEL);this.namespace = properties.getProperty(RocketMQConstants.ROCKETMQ_NAMESPACE);this.nameServer = properties.getProperty(RocketMQConstants.ROCKETMQ_NAMESRV_ADDR);String batchSize = properties.getProperty(RocketMQConstants.ROCKETMQ_BATCH_SIZE);if (StringUtils.isNotEmpty(batchSize)) {this.batchSize = Integer.parseInt(batchSize);}String subscribeFilter = properties.getProperty(RocketMQConstants.ROCKETMQ_SUBSCRIBE_FILTER);if (StringUtils.isNotEmpty(subscribeFilter)) {this.filter = subscribeFilter;}}

CanalRabbitMQConsumer:rabbitmq 消费相关属性

@SPI("rabbitmq")
public class CanalRabbitMQConsumer implements CanalMsgConsumer {private static final Logger                                logger              = LoggerFactory.getLogger(CanalRabbitMQConsumer.class);// 链接地址private String                                             nameServer;// 主机名private String                                             vhost;private String                                             queueName;// 一些鉴权信息private String                                             accessKey;private String                                             secretKey;private Long                                               resourceOwnerId;private String                                             username;private String                                             password;private boolean                                            flatMessage;private Connection                                         connect;private Channel                                            channel;private long                                               batchProcessTimeout = 60 * 1000;private BlockingQueue<ConsumerBatchMessage<CommonMessage>> messageBlockingQueue;private volatile ConsumerBatchMessage<CommonMessage>       lastGetBatchMessage = null;@Overridepublic void init(Properties properties, String topic, String groupId) {this.nameServer = PropertiesUtils.getProperty(properties, "rabbitmq.host");this.vhost = PropertiesUtils.getProperty(properties, "rabbitmq.virtual.host");this.queueName = topic;this.accessKey = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_ALIYUN_ACCESS_KEY);this.secretKey = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_ALIYUN_SECRET_KEY);this.username = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_USERNAME);this.password = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_PASSWORD);Long resourceOwnerIdPro = (Long) properties.get(RabbitMQConstants.RABBITMQ_RESOURCE_OWNERID);

*******************

示例

创建 mysql实例

docker run -it -d --net fixed --ip 172.18.0.2 -p 3306:3306 --privileged=true \
--name mysql -e  MYSQL_ROOT_PASSWORD=123456 mysql

# 创建用户、并授权
mysql> create user canal identified with mysql_native_password by "123456";
Query OK, 0 rows affected (0.01 sec)

mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
Query OK, 0 rows affected (0.00 sec)

mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)

# 创建数据库、表
mysql> create database lihu;
Query OK, 1 row affected (0.35 sec)

mysql> use lihu;
Database changed

mysql> create table test(id int not null auto_increment primary key, value int not null);
Query OK, 0 rows affected (0.67 sec)

mysql> create table test2(id int not null auto_increment primary key, value int not null);
Query OK, 0 rows affected (0.27 sec)

创建 canal server

docker run -it -d --net fixed --ip 172.18.0.3 -p 11111:11111 --name canal-server \
-v /usr/canal/mq/conf:/home/admin/canal-server/conf  canal/canal-server

# canal.properties
canal.serverMode = rocketmq
rocketmq.namesrv.addr = 172.18.0.4:9876

# instance.properties
canal.instance.master.address=172.18.0.2:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456
canal.mq.topic=example
canal.mq.dynamicTopic=.*
canal.mq.partition=0
canal.mq.partitionsNum=3
canal.mq.partitionHash=.*\\..*

创建 rocketmq(namesrv、broker)

docker run -it -d --net fixed --ip 172.18.0.4 -p 9876:9876 \
-e JAVA_OPT="-server -Xms256m -Xmx256m -Xmn128m" \
--name namesrv apacherocketmq/rocketmq:4.9.0-alpine sh mqnamesrv

docker run -it -d --net fixed --ip 172.18.0.5 -p 10911:10911 -p 10909:10909 \
-e NAMESRV_ADDR="172.18.0.4:9876" \
-e JAVA_OPT="-server -Xms2g -Xmx2g -Xmn1g" \
-v /usr/rocketmq/test/broker.conf:/home/rocketmq/rocketmq-4.9.0/conf/broker.conf \
--name broker apacherocketmq/rocketmq:4.9.0-alpine \
sh mqbroker autoCreateTopicEnable=true -c /home/rocketmq/rocketmq-4.9.0/conf/broker.conf

创建 canal adapter

docker run -it -d --net fixed --ip 172.18.0.6 -p 8081:8081 \
-v /usr/canal/mq/adapter/conf/application.yml:/opt/canal-adapter/conf/application.yml \
-v /usr/canal/mq/adapter/conf/rdb:/opt/canal-adapter/conf/rdb \
--name canal-adapter slpcat/canal-adapter:v1.1.5-jdk8

****************

application.yml:启动器配置

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: rocketMQ #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    rocketmq.namespace:
    rocketmq.namesrv.addr: 172.18.0.4:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://172.18.0.2:3306/lihu?useUnicode=true
      username: root
      password: 123456
canalAdapters:
- instance: lihu # canal instance Name or mq topic name
  groups:
  - groupId: g1
    outerAdapters:
    - name: rdb
      key: mysql1
      properties:
        jdbc.driverClassName: com.mysql.jdbc.Driver
        jdbc.url: jdbc:mysql://172.18.0.7:3306/lihu?useUnicode=true
        jdbc.username: root
        jdbc.password: 123456

****************
rdb/mytest_user.yml:适配器配置

dataSourceKey: defaultDS
destination: lihu
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  mirrorDb: true
  database: lihu

创建 mysql2实例(目标数据库)

docker run -it -d --net fixed --ip 172.18.0.7 -p 3307:3306 --privileged=true \
--name mysql2 -e  MYSQL_ROOT_PASSWORD=123456 mysql

# 创建数据库(不需要创建表)
mysql> create database lihu;
Query OK, 1 row affected (0.00 sec)

*******************

使用测试

源数据库 mysql:插入数据

mysql> insert into test(id,value) values(1,2);
Query OK, 1 row affected (0.11 sec)

mysql> insert into test(id,value) values(2,2);
Query OK, 1 row affected (0.15 sec)

mysql> insert into test2(id,value) values(1,2);
Query OK, 1 row affected (0.03 sec)

mysql> insert into test2(id,value) values(2,2);
Query OK, 1 row affected (0.11 sec)

canal adapter 日志

2021-07-14 22:36:09.564 [pool-7-thread-1] DEBUG c.a.o.c.c.adapter.rdb.service.RdbMirrorDbSyncService - DDL: {"data":null,"database":"lihu","destination":"lihu","es":1626273340000,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"create table test(id int not null auto_increment primary key, value int not null)","table":"test","ts":1626273345726,"type":"CREATE"}
2021-07-14 22:42:00.785 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"2","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test","type":"INSERT"}
2021-07-14 22:44:48.353 [pool-7-thread-1] DEBUG c.a.o.c.c.adapter.rdb.service.RdbMirrorDbSyncService - DDL: {"data":null,"database":"lihu","destination":"lihu","es":1626273887000,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"create table test2(id int not null auto_increment primary key, value int not null)","table":"test2","ts":1626273888331,"type":"CREATE"}
2021-07-14 22:44:54.400 [pool-5-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"1","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test2","type":"INSERT"}
2021-07-14 22:44:57.226 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"2","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test2","type":"INSERT"}
2021-07-14 22:57:26.640 [pool-7-thread-1] DEBUG c.a.o.c.c.adapter.rdb.service.RdbMirrorDbSyncService - DDL: {"data":null,"database":"lihu","destination":"lihu","es":1626274646000,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"create table test3(id int not null auto_increment primary key, value int not null)","table":"test3","ts":1626274646540,"type":"CREATE"}
2021-07-14 22:57:34.374 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"2","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test3","type":"INSERT"}
2021-07-14 22:59:08.462 [pool-5-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"1","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test","type":"DELETE"}
2021-07-14 22:59:16.007 [pool-5-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"1","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test","type":"INSERT"}

目标数据库 mysql2:查看数据

mysql> select * from test;
+----+-------+
| id | value |
+----+-------+
|  1 |     2 |
|  2 |     2 |
+----+-------+
2 rows in set (0.00 sec)

mysql> select * from test2;
+----+-------+
| id | value |
+----+-------+
|  1 |     2 |
|  2 |     2 |
+----+-------+

mysql2 可以同步 mysql中的数据

canal mq数据同步相关推荐

  1. php阿里的同步工具canal,基于阿里的Canal实现数据同步

    一.开启同步数据库的binlog功能 (1)开启同步数据端的数据库服务(比如我的将一号虚拟机上的mysql数据库作为同步操作数据库) systemctl start mysql.service mys ...

  2. canal mysql 数据同步

    首先canal是什么呢? canal是阿里巴巴旗下的一款开源项目,纯Java开发.基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL:简单来说,canal 会将自己伪装成 ...

  3. canal mysql数据同步mysql

    前言 canal 数据实时同步,读取Mysql Binlog 日志, 首先需要开启Binlog日志 一.canal 是什么? canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量 ...

  4. 系统重构数据同步利器之Canal实战篇

    一.背景 二话不说,先上图 上图来自于官网(https://github.com/alibaba/canal),基本上涵盖了目前生产环境使用场景了,众所周知,Canal做数据同步已经是行业内标杆了.我 ...

  5. 大数据同步工具Canal

    目录 1 什么是canal 2 canal能做什么 3 如何搭建canal 3.1 首先有一个MySQL服务器 3.2 安装canal 4 Java客户端操作 5 总结 6 ClientAdapter ...

  6. 数据同步的解决方案Canal

    Canal实现数据同步的原理: 1.是根据模拟mysql slave的主从交互协议,伪装自己是mysql slave,向mysql master发送dump请求. 2.mysql master收到du ...

  7. liunx下通过Canal将MySQL数据同步到Elasticsearch

    liunx下通过Canal将MySQL数据同步到Elasticsearch 一.canal背景信息 Canal是Github中开源的ETL(Extract Transform Load)软件 cana ...

  8. Canal Mysql binlog 同步至 Hbase ES

    文章目录 一.Canal介绍 工作原理 canal 工作原理 二.下载 三.安装使用 Mysql准备 canal 安装 解压缩 canal-deployer 配置修改 启动 查看server日志 查看 ...

  9. 两个mysql数据同步

    使用同步canal两个mysql数据 使用同步canal两个mysql数据 使用同步canal两个mysql数据 在日常项目开发时候,经常遇到两个mysql库 中表数据需要同步的情况,基以此推荐使用c ...

最新文章

  1. php 实现对称加密算法,PHP实现简单的对称加密和解密方法
  2. 跟进table_cache参数
  3. mysql导出bacpac_数据库的迁移
  4. 每日一题之 MySQL
  5. JavaFX游戏(四连环)
  6. csvn(apache+svn)管理工具搭建
  7. echarts折线图怎么从y轴开始_基于echarts的双y轴实时更新折线图
  8. python stdin read_python 3:使用readahead从stdin管道读取字节
  9. android 双系统 一加5,[一加2][双ROM]一加手机2安装双系统教程
  10. java多线程设计wait、notify、notifyall、synchronized的使用机制
  11. bzoj 1624: [Usaco2008 Open] Clear And Present Danger 寻宝之路(floyd)
  12. 2.Netty的粘包、拆包(一)
  13. 用长按键重复输入 - Mac OS X Lion
  14. 手机工商银行显示服务器安装不了,工行网银助手无法安装怎么办?
  15. 过渡属性transition详解
  16. JAVA——从基础学起(五)类和对象
  17. SysTick系统滴答定时器
  18. 一个无需会员就可以看Netflix的安卓App
  19. python爬取拉钩python数据分析职位招聘信息
  20. 学习Linux有哪些工作方向?

热门文章

  1. mysql如何插入图片和视频_mysql中怎样插入图片
  2. C++中strcmp的用法
  3. 【Cxinny】数据结构与算法
  4. 农村没网络怎样安监控,家里没有wifi安哪种监控器
  5. 三个限免网站,助你白嫖正版付费软件/游戏
  6. 测试服务器网站并发,Nginx服务器10000 并发 优化测试(ab测试工具)
  7. “剑指Offer”数据结构篇:java实现
  8. Java学习笔记(视频:韩顺平老师)1.0
  9. 关于波特率与字节传输速率计算
  10. Android开发中WIFI和GPRS网络的切换