canal mq数据同步
canal mq数据同步
官网:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
*******************
简 介
tcp mode:客户端直连canal server
mq mode:canal server将binlog数据发送到mq,1.1.5支持kafka、rocketmq、rabbitmq
*******************
canal server mq配置
canal.properties
##################################################
######### MQ Properties #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =

##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =
example/instance.properties
# 静态topic:消息发送的topic为example
canal.mq.topic=example

# 动态topic:根据数据库、表动态设置发送的topic
canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*

# 静态分区:数据发送的分区
canal.mq.partition=0

# 动态分区:根据数据库、表设置发送的分区
canal.mq.partitionsNum=3
canal.mq.partitionHash=test.table:id^name,.*\\..*

# 为不同的topic动态设置分区数
# test.*:4 表示以test开头的topic分区数为4
# mycanal:6 表示topic mycanal的分区数为6
canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6

# 动态topic:canal.mq.dynamicTopic
test:test库中的所有数据都发送到test topic上
.*:数据库的数据都发到对应数据库名的topic上
mytest1.user:发送到mytest1_user topic上
mytest2\\..*:发送到mytest2_tableName topic上
topicName:.*:所有数据库的数据都发到topicName上
topicName:test\\..*:test下的所有表都发送到topicName上
test,test1\\.*:数据库test1中的表发送到test1_tableName topic上,数据库test中的所有表发送到test topic上,其余所有数据发送到canal.mq.topic指定的topic上

# 动态分区:
.*\\..*:id:hash字段为id
.*\\..*:id^name:hash字段为id、name
.*\\..*:$pk$:hash字段为主键(自动查找)
.*\\..*:根据tableName hash
partitionHash为空:发送到默认分区 0
test.test,test.test2:id:test.test根据表名test hash,test.test2根据id hash,其余发送到对应topic的0分区
CanalKafkaProducer:kafka消息发送类
@SuppressWarnings({ "rawtypes", "unchecked" })
@SPI("kafka")
public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQProducer {private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);private static final String PREFIX_KAFKA_CONFIG = "kafka.";private Producer<String, byte[]> producer;@Overridepublic void init(Properties properties) {KafkaProducerConfig kafkaProducerConfig = new KafkaProducerConfig();this.mqProperties = kafkaProducerConfig;super.init(properties);// load propertiesthis.loadKafkaProperties(properties);Properties kafkaProperties = new Properties();kafkaProperties.putAll(kafkaProducerConfig.getKafkaProperties());kafkaProperties.put("max.in.flight.requests.per.connection", 1);kafkaProperties.put("key.serializer", StringSerializer.class);if (kafkaProducerConfig.isKerberosEnabled()) {File krb5File = new File(kafkaProducerConfig.getKrb5File());File jaasFile = new File(kafkaProducerConfig.getJaasFile());if (krb5File.exists() && jaasFile.exists()) {// 配置kerberos认证,需要使用绝对路径System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath());System.setProperty("java.security.auth.login.config", jaasFile.getAbsolutePath());System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");kafkaProperties.put("security.protocol", "SASL_PLAINTEXT");kafkaProperties.put("sasl.kerberos.service.name", "kafka");} else {String errorMsg = "ERROR # The kafka kerberos configuration file does not exist! 
please check it";logger.error(errorMsg);throw new RuntimeException(errorMsg);}}kafkaProperties.put("value.serializer", KafkaMessageSerializer.class);producer = new KafkaProducer<>(kafkaProperties);}private void loadKafkaProperties(Properties properties) {//加载相关配置KafkaProducerConfig kafkaProducerConfig = (KafkaProducerConfig) this.mqProperties;Map<String, Object> kafkaProperties = kafkaProducerConfig.getKafkaProperties();// 兼容下<=1.1.4的mq配置doMoreCompatibleConvert("canal.mq.servers", "kafka.bootstrap.servers", properties);doMoreCompatibleConvert("canal.mq.acks", "kafka.acks", properties);doMoreCompatibleConvert("canal.mq.compressionType", "kafka.compression.type", properties);doMoreCompatibleConvert("canal.mq.retries", "kafka.retries", properties);doMoreCompatibleConvert("canal.mq.batchSize", "kafka.batch.size", properties);doMoreCompatibleConvert("canal.mq.lingerMs", "kafka.linger.ms", properties);doMoreCompatibleConvert("canal.mq.maxRequestSize", "kafka.max.request.size", properties);doMoreCompatibleConvert("canal.mq.bufferMemory", "kafka.buffer.memory", properties);doMoreCompatibleConvert("canal.mq.kafka.kerberos.enable", "kafka.kerberos.enable", properties);doMoreCompatibleConvert("canal.mq.kafka.kerberos.krb5.file", "kafka.kerberos.krb5.file", properties);doMoreCompatibleConvert("canal.mq.kafka.kerberos.jaas.file", "kafka.kerberos.jaas.file", properties);
RocketMQConstants:rocketmq producer属性配置
/**
 * Property keys used to configure the RocketMQ producer and consumer.
 *
 * <p>Every key is the {@link #ROOT} namespace followed by a dot and the
 * option name, e.g. {@code rocketmq.namesrv.addr}. All values are
 * compile-time constants.
 */
public class RocketMQConstants {

    /** Namespace shared by all RocketMQ property keys. */
    public static final String ROOT = "rocketmq";

    /** {@link #ROOT} plus the separating dot; every key below starts with it. */
    private static final String PREFIX = ROOT + ".";

    // Producer / shared connection options.
    public static final String ROCKETMQ_PRODUCER_GROUP               = PREFIX + "producer.group";
    public static final String ROCKETMQ_ENABLE_MESSAGE_TRACE         = PREFIX + "enable.message.trace";
    public static final String ROCKETMQ_CUSTOMIZED_TRACE_TOPIC       = PREFIX + "customized.trace.topic";
    public static final String ROCKETMQ_NAMESPACE                    = PREFIX + "namespace";
    public static final String ROCKETMQ_NAMESRV_ADDR                 = PREFIX + "namesrv.addr";
    public static final String ROCKETMQ_RETRY_TIMES_WHEN_SEND_FAILED = PREFIX + "retry.times.when.send.failed";
    public static final String ROCKETMQ_VIP_CHANNEL_ENABLED          = PREFIX + "vip.channel.enabled";
    public static final String ROCKETMQ_TAG                          = PREFIX + "tag";

    // Options read on the consumer side.
    public static final String ROCKETMQ_ACCESS_CHANNEL               = PREFIX + "access.channel";
    public static final String ROCKETMQ_BATCH_SIZE                   = PREFIX + "batch.size";
    public static final String ROCKETMQ_SUBSCRIBE_FILTER             = PREFIX + "subscribe.filter";
}
RabbitMQConstants:rabbitmq producer属性配置
/**
 * Property keys used to configure the RabbitMQ producer and consumer.
 *
 * <p>Every key is the {@link #ROOT} namespace followed by a dot and the
 * option name, e.g. {@code rabbitmq.host}.
 */
public class RabbitMQConstants {

    /** Namespace shared by all RabbitMQ property keys. */
    public static final String ROOT = "rabbitmq";

    public static final String RABBITMQ_HOST         = ROOT + "." + "host";
    public static final String RABBITMQ_EXCHANGE     = ROOT + "." + "exchange";
    public static final String RABBITMQ_VIRTUAL_HOST = ROOT + "." + "virtual.host";
    public static final String RABBITMQ_USERNAME     = ROOT + "." + "username";
    public static final String RABBITMQ_PASSWORD     = ROOT + "." + "password";

    /**
     * Key of the resource owner id.
     *
     * <p>The suffix must not repeat the namespace: the key previously resolved
     * to {@code rabbitmq.rabbitmq.resource.ownerId}, which does not match the
     * {@code rabbitmq.resource.ownerId} entry used in the adapter
     * configuration, so the configured value was never found.
     */
    public static final String RABBITMQ_RESOURCE_OWNERID = ROOT + "." + "resource.ownerId";
}
*******************
canal adapter mq属性配置
application.yml:启动器配置
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

#  srcDataSources:
#    defaultDS:
#      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
#      username: root
#      password: 121212
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
CanalKafkaConsumer:kafka. 开头的属性均可读取
@SPI("kafka")
public class CanalKafkaConsumer implements CanalMsgConsumer {private static final String PREFIX_KAFKA_CONFIG = "kafka.";private KafkaConsumer<String, ?> kafkaConsumer;private boolean flatMessage = true;private String topic;private Map<Integer, Long> currentOffsets = new ConcurrentHashMap<>();private Properties kafkaProperties = new Properties();@Overridepublic void init(Properties properties, String topic, String groupId) {this.topic = topic;Boolean flatMessage = (Boolean) properties.get(CanalConstants.CANAL_MQ_FLAT_MESSAGE);if (flatMessage != null) {this.flatMessage = flatMessage;}for (Map.Entry<Object, Object> entry : properties.entrySet()) {String k = (String) entry.getKey();Object v = entry.getValue();if (k.startsWith(PREFIX_KAFKA_CONFIG) && v != null) {kafkaProperties.put(k.substring(PREFIX_KAFKA_CONFIG.length()), v);} //读取所有 kafka. 开头的属性}kafkaProperties.put("group.id", groupId);kafkaProperties.put("key.deserializer", StringDeserializer.class);kafkaProperties.put("client.id", UUID.randomUUID().toString().substring(0, 6));}@Overridepublic void connect() {if (this.flatMessage) {kafkaProperties.put("value.deserializer", StringDeserializer.class);this.kafkaConsumer = new KafkaConsumer<String, String>(kafkaProperties);} else {kafkaProperties.put("value.deserializer", KafkaMessageDeserializer.class);this.kafkaConsumer = new KafkaConsumer<String, Message>(kafkaProperties);}kafkaConsumer.subscribe(Collections.singletonList(topic));}
CanalRocketMQConsumer:rocketmq 消费相关属性
@SPI("rocketmq")
public class CanalRocketMQConsumer implements CanalMsgConsumer {private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQConsumer.class);private static final String CLOUD_ACCESS_CHANNEL = "cloud";private String nameServer;private String topic;private String groupName;private DefaultMQPushConsumer rocketMQConsumer;private BlockingQueue<ConsumerBatchMessage<CommonMessage>> messageBlockingQueue;private int batchSize = -1;private long batchProcessTimeout = 60 * 1000;private boolean flatMessage;private volatile ConsumerBatchMessage<CommonMessage> lastGetBatchMessage = null;private String accessKey;private String secretKey;private String customizedTraceTopic;private boolean enableMessageTrace = false;private String accessChannel;private String namespace;private String filter = "*";@Overridepublic void init(Properties properties, String topic, String groupName) {this.topic = topic;this.groupName = groupName;this.flatMessage = (Boolean) properties.get(CanalConstants.CANAL_MQ_FLAT_MESSAGE);this.messageBlockingQueue = new LinkedBlockingQueue<>(1024);this.accessKey = properties.getProperty(CanalConstants.CANAL_ALIYUN_ACCESS_KEY);this.secretKey = properties.getProperty(CanalConstants.CANAL_ALIYUN_SECRET_KEY);String enableMessageTrace = properties.getProperty(RocketMQConstants.ROCKETMQ_ENABLE_MESSAGE_TRACE);if (StringUtils.isNotEmpty(enableMessageTrace)) {this.enableMessageTrace = Boolean.parseBoolean(enableMessageTrace);}this.customizedTraceTopic = properties.getProperty(RocketMQConstants.ROCKETMQ_CUSTOMIZED_TRACE_TOPIC);this.accessChannel = properties.getProperty(RocketMQConstants.ROCKETMQ_ACCESS_CHANNEL);this.namespace = properties.getProperty(RocketMQConstants.ROCKETMQ_NAMESPACE);this.nameServer = properties.getProperty(RocketMQConstants.ROCKETMQ_NAMESRV_ADDR);String batchSize = properties.getProperty(RocketMQConstants.ROCKETMQ_BATCH_SIZE);if (StringUtils.isNotEmpty(batchSize)) {this.batchSize = Integer.parseInt(batchSize);}String subscribeFilter = 
properties.getProperty(RocketMQConstants.ROCKETMQ_SUBSCRIBE_FILTER);if (StringUtils.isNotEmpty(subscribeFilter)) {this.filter = subscribeFilter;}}
CanalRabbitMQConsumer:rabbitmq 消费相关属性
@SPI("rabbitmq")
public class CanalRabbitMQConsumer implements CanalMsgConsumer {private static final Logger logger = LoggerFactory.getLogger(CanalRabbitMQConsumer.class);// 链接地址private String nameServer;// 主机名private String vhost;private String queueName;// 一些鉴权信息private String accessKey;private String secretKey;private Long resourceOwnerId;private String username;private String password;private boolean flatMessage;private Connection connect;private Channel channel;private long batchProcessTimeout = 60 * 1000;private BlockingQueue<ConsumerBatchMessage<CommonMessage>> messageBlockingQueue;private volatile ConsumerBatchMessage<CommonMessage> lastGetBatchMessage = null;@Overridepublic void init(Properties properties, String topic, String groupId) {this.nameServer = PropertiesUtils.getProperty(properties, "rabbitmq.host");this.vhost = PropertiesUtils.getProperty(properties, "rabbitmq.virtual.host");this.queueName = topic;this.accessKey = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_ALIYUN_ACCESS_KEY);this.secretKey = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_ALIYUN_SECRET_KEY);this.username = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_USERNAME);this.password = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_PASSWORD);Long resourceOwnerIdPro = (Long) properties.get(RabbitMQConstants.RABBITMQ_RESOURCE_OWNERID);
*******************
示例
创建 mysql实例
docker run -it -d --net fixed --ip 172.18.0.2 -p 3306:3306 --privileged=true \
--name mysql -e MYSQL_ROOT_PASSWORD=123456 mysql

# 创建用户、并授权
mysql> create user canal identified with mysql_native_password by "123456";
Query OK, 0 rows affected (0.01 sec)

mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
Query OK, 0 rows affected (0.00 sec)

mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)

# 创建数据库、表
mysql> create database lihu;
Query OK, 1 row affected (0.35 sec)

mysql> use lihu;
Database changed

mysql> create table test(id int not null auto_increment primary key, value int not null);
Query OK, 0 rows affected (0.67 sec)

mysql> create table test2(id int not null auto_increment primary key, value int not null);
Query OK, 0 rows affected (0.27 sec)
创建 canal server
docker run -it -d --net fixed --ip 172.18.0.3 -p 11111:11111 --name canal-server \
-v /usr/canal/mq/conf:/home/admin/canal-server/conf canal/canal-server

# canal.properties
canal.serverMode = rocketmq
rocketmq.namesrv.addr = 172.18.0.4:9876

# instance.properties
canal.instance.master.address=172.18.0.2:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456

canal.mq.topic=example
canal.mq.dynamicTopic=.*

canal.mq.partition=0
canal.mq.partitionsNum=3
canal.mq.partitionHash=.*\\..*
创建 rocketmq(namesrv、broker)
docker run -it -d --net fixed --ip 172.18.0.4 -p 9876:9876 \
-e JAVA_OPT="-server -Xms256m -Xmx256m -Xmn128m" \
--name namesrv apacherocketmq/rocketmq:4.9.0-alpine sh mqnamesrv

docker run -it -d --net fixed --ip 172.18.0.5 -p 10911:10911 -p 10909:10909 \
-e NAMESRV_ADDR="172.18.0.4:9876" \
-e JAVA_OPT="-server -Xms2g -Xmx2g -Xmn1g" \
-v /usr/rocketmq/test/broker.conf:/home/rocketmq/rocketmq-4.9.0/conf/broker.conf \
--name broker apacherocketmq/rocketmq:4.9.0-alpine \
sh mqbroker autoCreateTopicEnable=true -c /home/rocketmq/rocketmq-4.9.0/conf/broker.conf
创建 canal adapter
docker run -it -d --net fixed --ip 172.18.0.6 -p 8081:8081 \
-v /usr/canal/mq/adapter/conf/application.yml:/opt/canal-adapter/conf/application.yml \
-v /usr/canal/mq/adapter/conf/rdb:/opt/canal-adapter/conf/rdb \
--name canal-adapter slpcat/canal-adapter:v1.1.5-jdk8

****************
application.yml:启动器配置

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: rocketMQ #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    rocketmq.namespace:
    rocketmq.namesrv.addr: 172.18.0.4:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://172.18.0.2:3306/lihu?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: lihu # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://172.18.0.7:3306/lihu?useUnicode=true
          jdbc.username: root
          jdbc.password: 123456

****************
rdb/mytest_user.yml:适配器配置

dataSourceKey: defaultDS
destination: lihu
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  mirrorDb: true
  database: lihu
创建 mysql2实例(目标数据库)
docker run -it -d --net fixed --ip 172.18.0.7 -p 3307:3306 --privileged=true \
--name mysql2 -e MYSQL_ROOT_PASSWORD=123456 mysql

# 创建数据库(不需要创建表)
mysql> create database lihu;
Query OK, 1 row affected (0.00 sec)
*******************
使用测试
源数据库 mysql:插入数据
mysql> insert into test(id,value) values(1,2);
Query OK, 1 row affected (0.11 sec)

mysql> insert into test(id,value) values(2,2);
Query OK, 1 row affected (0.15 sec)

mysql> insert into test2(id,value) values(1,2);
Query OK, 1 row affected (0.03 sec)

mysql> insert into test2(id,value) values(2,2);
Query OK, 1 row affected (0.11 sec)
canal adapter 日志
2021-07-14 22:36:09.564 [pool-7-thread-1] DEBUG c.a.o.c.c.adapter.rdb.service.RdbMirrorDbSyncService - DDL: {"data":null,"database":"lihu","destination":"lihu","es":1626273340000,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"create table test(id int not null auto_increment primary key, value int not null)","table":"test","ts":1626273345726,"type":"CREATE"}
2021-07-14 22:42:00.785 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"2","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test","type":"INSERT"}
2021-07-14 22:44:48.353 [pool-7-thread-1] DEBUG c.a.o.c.c.adapter.rdb.service.RdbMirrorDbSyncService - DDL: {"data":null,"database":"lihu","destination":"lihu","es":1626273887000,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"create table test2(id int not null auto_increment primary key, value int not null)","table":"test2","ts":1626273888331,"type":"CREATE"}
2021-07-14 22:44:54.400 [pool-5-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"1","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test2","type":"INSERT"}
2021-07-14 22:44:57.226 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"2","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test2","type":"INSERT"}
2021-07-14 22:57:26.640 [pool-7-thread-1] DEBUG c.a.o.c.c.adapter.rdb.service.RdbMirrorDbSyncService - DDL: {"data":null,"database":"lihu","destination":"lihu","es":1626274646000,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"create table test3(id int not null auto_increment primary key, value int not null)","table":"test3","ts":1626274646540,"type":"CREATE"}
2021-07-14 22:57:34.374 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"2","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test3","type":"INSERT"}
2021-07-14 22:59:08.462 [pool-5-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"1","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test","type":"DELETE"}
2021-07-14 22:59:16.007 [pool-5-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"1","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test","type":"INSERT"}
目标数据库 mysql2:查看数据
mysql> select * from test;
+----+-------+
| id | value |
+----+-------+
| 1 | 2 |
| 2 | 2 |
+----+-------+
2 rows in set (0.00 sec)

mysql> select * from test2;
+----+-------+
| id | value |
+----+-------+
| 1 | 2 |
| 2 | 2 |
+----+-------+
mysql2 可以同步 mysql中的数据
canal mq数据同步相关推荐
- php阿里的同步工具canal,基于阿里的Canal实现数据同步
一.开启同步数据库的binlog功能 (1)开启同步数据端的数据库服务(比如我的将一号虚拟机上的mysql数据库作为同步操作数据库) systemctl start mysql.service mys ...
- canal mysql 数据同步
首先canal是什么呢? canal是阿里巴巴旗下的一款开源项目,纯Java开发.基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL:简单来说,canal 会将自己伪装成 ...
- canal mysql数据同步mysql
前言 canal 数据实时同步,读取Mysql Binlog 日志, 首先需要开启Binlog日志 一.canal 是什么? canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量 ...
- 系统重构数据同步利器之Canal实战篇
一.背景 二话不说,先上图 上图来自于官网(https://github.com/alibaba/canal),基本上涵盖了目前生产环境使用场景了,众所周知,Canal做数据同步已经是行业内标杆了.我 ...
- 大数据同步工具Canal
目录 1 什么是canal 2 canal能做什么 3 如何搭建canal 3.1 首先有一个MySQL服务器 3.2 安装canal 4 Java客户端操作 5 总结 6 ClientAdapter ...
- 数据同步的解决方案Canal
Canal实现数据同步的原理: 1.是根据模拟mysql slave的主从交互协议,伪装自己是mysql slave,向mysql master发送dump请求. 2.mysql master收到du ...
- liunx下通过Canal将MySQL数据同步到Elasticsearch
liunx下通过Canal将MySQL数据同步到Elasticsearch 一.canal背景信息 Canal是Github中开源的ETL(Extract Transform Load)软件 cana ...
- Canal Mysql binlog 同步至 Hbase ES
文章目录 一.Canal介绍 工作原理 canal 工作原理 二.下载 三.安装使用 Mysql准备 canal 安装 解压缩 canal-deployer 配置修改 启动 查看server日志 查看 ...
- 两个mysql数据同步
使用同步canal两个mysql数据 使用同步canal两个mysql数据 使用同步canal两个mysql数据 在日常项目开发时候,经常遇到两个mysql库 中表数据需要同步的情况,基以此推荐使用c ...
最新文章
- php 实现对称加密算法,PHP实现简单的对称加密和解密方法
- 跟进table_cache参数
- mysql导出bacpac_数据库的迁移
- 每日一题之 MySQL
- JavaFX游戏(四连环)
- csvn(apache+svn)管理工具搭建
- echarts折线图怎么从y轴开始_基于echarts的双y轴实时更新折线图
- python stdin read_python 3:使用readahead从stdin管道读取字节
- android 双系统 一加5,[一加2][双ROM]一加手机2安装双系统教程
- java多线程设计wait、notify、notifyall、synchronized的使用机制
- bzoj 1624: [Usaco2008 Open] Clear And Present Danger 寻宝之路(floyd)
- 2.Netty的粘包、拆包(一)
- 用长按键重复输入 - Mac OS X Lion
- 手机工商银行显示服务器安装不了,工行网银助手无法安装怎么办?
- 过渡属性transition详解
- JAVA——从基础学起(五)类和对象
- SysTick系统滴答定时器
- 一个无需会员就可以看Netflix的安卓App
- python爬取拉钩python数据分析职位招聘信息
- 学习Linux有哪些工作方向?