可以通过配置对源数据库的指定表同步指定字段,无需修改代码

具体配置如下,可通过字段映射关系,自动生成对应sql执行,

canal:server: 172.16.4.62:11111# canal instance名字destination: exampleserver-mode: tcp# 消费的时间间隔(s)timeout: 2# canal 的用户名username:# canal 的密码password:sync:# canal监听的数据库名- database: testtableInfo:# canal监听的表名- tableName: t_user# 本服务数据库同步的表名targetTableName: t_em_user# 字段映射,key:value多个逗号隔开   key为监控数据库表的字段,value为从库对应表的字段   *代表同步所有字段并且两个库表字段相同fieldMapping: "*"# id映射,key:value多个逗号隔开   key为监控数据库表的字段,value为从库对应表的字段,一样可以只写一个id: USER_CODE
#        - tableName: t_org
#          targetTableName: t_em_org
#          fieldMapping: "*"
#          id: ORG_ID- tableName: t_orgtargetTableName: my_orgfieldMapping: ORG_ID:id,ORG_NAME:nameid: ORG_ID:id

部署方式

1.下载canal  执行脚本解压
链接:https://pan.baidu.com/s/1yOPrihHwG9MfkiuiBkOD2Q?pwd=e171
mkdir /opt/canal
cd /opt/canal
tar zxvf canal.deployer-1.1.6.tar.gz
2.修改配置文件
vi conf/example/instance.properties
- 基本连接配置修改如下
canal.instance.master.address=192.168.4.31:3306  #数据库地址
canal.instance.dbUsername=root #数据库账号
canal.instance.dbPassword=1234 #数据库密码
canal.instance.defaultDatabaseName = test  #数据库
canal.instance.connectionCharset = UTF-8 #数据库编码

监听指定的数据库和表

#canal.instance.filter.regex=.*\\..*  (默认为所有的库所有的表)
# 只监听icpdb_dev数据库的这两个表
canal.instance.filter.regex=test.t_user,test.t_orgmysql 数据解析关注的表,Perl正则表达式.多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal\\.test15.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)

检查mysql是否开启了bin_log日志

SHOW VARIABLES LIKE '%log_bin%'

没有开启则修改mysql配置

vi /etc/my.cnf

添加如下

log-bin=mysql-bin
binlog-format=ROW 

重启mysql

service mysqld restart;
进入canal的bin文件夹,启动,默认端口11111
./bin/startup.sh

java部署canal客户端

核心pom

<dependency><groupId>io.github.xizixuejie</groupId><artifactId>canal-spring-boot-starter</artifactId><version>0.0.2</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Mysql Connector --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>

添加BaseCanalListener统一消费binLog日志

import com.google.common.collect.Maps;
import com.netinfo.cannel.mapper.BaseMapper;
import io.xzxj.canal.core.listener.EntryListener;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang.ObjectUtils;
import org.apache.poi.ss.formula.functions.T;
import org.springframework.beans.factory.annotation.Autowired;import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;/*** @author yangguang* @date 2023年04月24日 14:57*/
@Data
@Slf4j
public class BaseCanalListener implements EntryListener<HashMap> {private BaseMapper baseMapper;/*** 数据库名字*/private String schemaName;/*** 表名字*/private String tableName;private String targetTableName;private String fieldMapping;private String id;private final String insertTemplate = "insert into {0} ({1}) values ({2})";private final String updateTemplate = "update {0} set {1} where {2}";private final String deleteTemplate = "delete from {0} where {1}";public void insert(HashMap t) {log.info("插入获取的数据\n"+t);String sql = getInsertSql(t);baseMapper.insert(sql);}public void update(HashMap before, HashMap after) {log.info("更新之前的数据\n"+before);log.info("更新之后的数据\n"+after);String sql = getUpdateSql(before,after);if(sql!=null)baseMapper.update(sql);}public void delete(HashMap t) {log.info("删除的数据\n"+t);String sql = getDeleteSql(t);baseMapper.delete(sql);}private String getDeleteSql(HashMap t) {//"delete from {0} where {2}";String[] params = new String[2];params[0] = targetTableName;params[1] = getWhereSql(t);return MessageFormat.format(deleteTemplate,params);}private String getUpdateSql(HashMap<String,String> before, HashMap after) {if(fieldMapping==null){fieldMapping = "*";}Map<String,String> updateFieldMap = Maps.newHashMap();Set<String> updateOriginField = before.keySet();if(!"*".equals(fieldMapping)){for (String kv : fieldMapping.split(",")) {String[] kvArr = kv.split(":");if(updateOriginField.contains(kvArr[0])){updateFieldMap.put(kvArr[1],toString(after.get(kvArr[0])));}}}else{for (Map.Entry<String, String> entry : before.entrySet()) {updateFieldMap.put(entry.getKey(),toString(after.get(entry.getKey())));}}if(updateFieldMap.isEmpty()){return null;}//updateTemplate = "update {0} set {1} where {2}";String[] params = new String[3];params[0] = targetTableName;StringBuilder sb = new StringBuilder();updateFieldMap.entrySet().forEach(entry->{sb.append(entry.getKey());sb.append("=");sb.append(entry.getValue());sb.append(",");});params[1] = sb.toString().substring(0,sb.length()-1);params[2] =getWhereSql(after);return MessageFormat.format(updateTemplate,params);}private String getWhereSql(HashMap<String,String> data) {StringBuilder sb = new StringBuilder();String originId,currentId;for (String idMapper : id.split(",")) {if(idMapper.contains(":")){String[] split = idMapper.split(":");originId=split[0];currentId=split[1];}else{originId=idMapper;currentId=idMapper;}sb.append(currentId+"="+data.get(originId));sb.append(" and ");}return sb.toString().substring(0,sb.length()-5);}private String getInsertSql(HashMap t) {//首先查询插入的字段if(fieldMapping==null){fieldMapping = "*";}List<String> originFieldList = Lists.newArrayList();List<String> newFieldList = Lists.newArrayList();List<String> newFieldValueList = Lists.newArrayList();if(!fieldMapping.equals("*")){//指定字段for (String kv : fieldMapping.split(",")) {String[] kvArr = kv.split(":");originFieldList.add(kvArr[0]);newFieldList.add(kvArr[1]);newFieldValueList.add(toString(t.get(kvArr[0])));}}else{for (Object o : t.keySet()) {originFieldList.add((String)o);newFieldList.add((String)o);newFieldValueList.add(toString(t.get(o)));}}String[] params = new String[3];params[0] = targetTableName;params[1] = String.join(",",newFieldList);params[2] = String.join(",",newFieldValueList);return MessageFormat.format(insertTemplate,params);}private String toString(Object obj){if(obj == null || obj.toString().length()==0){return null;}return "\""+ObjectUtils.toString(obj)+"\"";}
}

添加Mapper操作数据库

import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Update;
import org.springframework.stereotype.Repository;/*** @author yangguang* @date 2023年04月25日 9:48*/
@Repository
public interface BaseMapper {@Insert("${sql}")void insert(String sql);@Update("${sql}")void update(String sql);@Delete("${sql}")void delete(String sql);
}

添加CanalListenerBeanPostProcessor 读取配置文件,生成BaseCanalListener类


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.*;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import java.util.List;/*** @author yangguang* @date 2023年04月24日 15:11*/
@Component
@Slf4j
public class CanalListenerBeanPostProcessor implements BeanPostProcessor, BeanDefinitionRegistryPostProcessor, EnvironmentAware {private Environment environment;@Overridepublic Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {return bean;}@Overridepublic void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {int dbNum=0;while (true){String database = getConfigByKey("canal.sync["+dbNum+"].database");if(StringUtils.isEmpty(database)){break;}int tableNum=0;while (true){String tableName = getConfigByKey("canal.sync["+dbNum+"].tableInfo["+tableNum+"].tableName");String targetTableName = getConfigByKey("canal.sync["+dbNum+"].tableInfo["+tableNum+"].targetTableName");String fieldMapping = getConfigByKey("canal.sync["+dbNum+"].tableInfo["+tableNum+"].fieldMapping");String id = getConfigByKey("canal.sync["+dbNum+"].tableInfo["+tableNum+"].id");if(StringUtils.isEmpty(tableName)){break;}String beanName = database+tableName;//一个数据库一个表一个监听类GenericBeanDefinition beandefinition=new GenericBeanDefinition();beandefinition.setBeanClassName("com.netinfo.cannel.config.BaseCanalListener");beandefinition.getPropertyValues().add("schemaName",database);beandefinition.getPropertyValues().add("tableName",tableName);beandefinition.getPropertyValues().add("targetTableName",targetTableName);beandefinition.getPropertyValues().add("fieldMapping",fieldMapping);beandefinition.getPropertyValues().add("id",id);beandefinition.getPropertyValues().add("baseMapper",new RuntimeBeanReference("baseMapper"));registry.registerBeanDefinition(beanName,beandefinition);tableNum++;}dbNum++;}}@Overridepublic void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {}@Overridepublic void setEnvironment(Environment environment) {this.environment = environment;}private String getConfigByKey(String key) {try{return environment.getProperty(key);}catch (Exception e){return null;}}
}
到此客户端搭建完成,但是canal-spring-boot-starter 西子小姐姐封装的不支持Map实体,并且没有重连机制,所以需要修改源码进行覆盖一下

io.xzxj.canal.spring.autoconfigure.TcpClientAutoConfiguration 类

package io.xzxj.canal.spring.autoconfigure;import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import io.xzxj.canal.core.client.TcpCanalClient;
import io.xzxj.canal.core.factory.EntryColumnConvertFactory;
import io.xzxj.canal.core.handler.IMessageHandler;
import io.xzxj.canal.core.handler.RowDataHandler;
import io.xzxj.canal.core.handler.impl.AsyncMessageHandlerImpl;
import io.xzxj.canal.core.handler.impl.RowDataHandlerImpl;
import io.xzxj.canal.core.handler.impl.SyncMessageHandlerImpl;
import io.xzxj.canal.core.listener.EntryListener;
import io.xzxj.canal.spring.properties.CanalProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;import java.util.List;
import java.util.concurrent.ExecutorService;/*** @author xzxj* @date 2023/3/11 11:37*/
@Configuration
@EnableConfigurationProperties(CanalProperties.class)
@ConditionalOnProperty(value = "canal.server-mode", havingValue = "tcp")
@Import(value = {TcpCanalClient.class,ThreadPoolAutoConfiguration.class})
public class TcpClientAutoConfiguration {private final CanalProperties canalProperties;public TcpClientAutoConfiguration(CanalProperties canalProperties) {this.canalProperties = canalProperties;}@Beanpublic RowDataHandler<CanalEntry.RowData> rowDataHandler() {return new RowDataHandlerImpl(new EntryColumnConvertFactory());}@Bean@ConditionalOnProperty(value = "canal.async", havingValue = "true", matchIfMissing = true)public IMessageHandler<Message> asyncMessageHandler(List<EntryListener<?>> entryListenerList,RowDataHandler<CanalEntry.RowData> rowDataHandler,ExecutorService executorService) {return new AsyncMessageHandlerImpl(entryListenerList, rowDataHandler, executorService);}@Bean@ConditionalOnProperty(value = "canal.async", havingValue = "false")public IMessageHandler<Message> syncMessageHandler(List<EntryListener<?>> entryListenerList,RowDataHandler<CanalEntry.RowData> rowDataHandler) {return new SyncMessageHandlerImpl(entryListenerList, rowDataHandler);}}

io.xzxj.canal.core.client.AbstractCanalClient

package io.xzxj.canal.core.client;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import io.xzxj.canal.core.handler.IMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.TimeUnit;/*** @author xzxj* @date 2023/3/11 10:33*/
public abstract class AbstractCanalClient implements ICanalClient {private static final Logger log = LoggerFactory.getLogger(AbstractCanalClient.class);protected volatile boolean runStatus;private Thread thread;protected CanalConnector connector;protected IMessageHandler messageHandler;protected String filter = "";protected Integer batchSize = 1;protected Long timeout = 1L;protected TimeUnit unit = TimeUnit.SECONDS;@Overridepublic void init() {startConnect();}private void startConnect(){log.info("canal client init");this.connectCanal();thread = new Thread(this::handleListening);thread.setName("canal-client-thread");runStatus = true;thread.start();}private void connectCanal() {try {log.info("canal client connecting");connector.connect();this.subscribe();log.info("canal client connect success");} catch (CanalClientException e) {log.error("canal client connect error: {}", e.getMessage(), e);this.destroy();}}public void subscribe() {connector.subscribe(filter);}@Overridepublic void destroy() {try {log.info("canal client destroy");connector.unsubscribe();}catch (Exception e){log.error("canal client connect exception");}if (thread != null) {thread.interrupt();}runStatus = false;}private void sleep(int num){try {Thread.sleep(num);}catch (Exception e){}}
}
io.xzxj.canal.core.client.TcpCanalClient
package io.xzxj.canal.core.client;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import io.xzxj.canal.core.handler.IMessageHandler;
import io.xzxj.canal.spring.properties.CanalProperties;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;/*** @author xzxj* @date 2023/3/11 10:51*/
public class TcpCanalClient extends AbstractCanalClient {private static final Logger log = LoggerFactory.getLogger(TcpCanalClient.class);@Autowiredprivate CanalProperties canalProperties;@Autowiredprivate IMessageHandler messageHandler;private static TcpCanalClient runClient ;@PostConstructpublic void initClient(){String server = canalProperties.getServer();String[] array = server.split(":");TcpCanalClient.builder().hostname(array[0]).port(Integer.parseInt(array[1])).destination(canalProperties.getDestination()).username(canalProperties.getUsername()).password(canalProperties.getPassword()).messageHandler(messageHandler).batchSize(canalProperties.getBatchSize()).filter(canalProperties.getFilter()).timeout(canalProperties.getTimeout()).unit(canalProperties.getUnit()).build().init();runClient = this;}@Overridepublic void handleListening() {try {while (runStatus) {Message message = connector.getWithoutAck(batchSize, timeout, unit);log.debug("receive message={}", message);long batchId = message.getId();if (message.getId() != -1 && message.getEntries().size() != 0) {messageHandler.handleMessage(message);}connector.ack(batchId);}} catch (Exception e) {log.error("canal client exception", e);runClient.reConnect();}}private void reConnect() {log.error("canal服务端断开连接,10s后准备重连");try {Thread.sleep(10000);}catch (Exception e){}initClient();}public static Builder builder() {return new Builder();}public static class Builder {private String filter = StringUtils.EMPTY;private Integer batchSize = 1;private Long timeout = 1L;private TimeUnit unit = TimeUnit.SECONDS;private String hostname;private Integer port;private String destination;private String username;private String password;private IMessageHandler<?> messageHandler;private Builder() {}public Builder hostname(String hostname) {this.hostname = hostname;return this;}public Builder port(Integer port) {this.port = port;return this;}public Builder destination(String destination) {this.destination = destination;return this;}public Builder username(String username) {this.username = username;return this;}public Builder password(String password) {this.password = password;return this;}public Builder filter(String filter) {this.filter = filter;return this;}public Builder batchSize(Integer batchSize) {this.batchSize = batchSize;return this;}public Builder timeout(Long timeout) {this.timeout = timeout;return this;}public Builder unit(TimeUnit unit) {this.unit = unit;return this;}public Builder messageHandler(IMessageHandler<?> messageHandler) {this.messageHandler = messageHandler;return this;}public TcpCanalClient build() {CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(hostname, port), destination, username, password);TcpCanalClient tcpCanalClient = new TcpCanalClient();tcpCanalClient.connector = canalConnector;tcpCanalClient.messageHandler = messageHandler;tcpCanalClient.filter = this.filter;tcpCanalClient.unit = this.unit;tcpCanalClient.batchSize = this.batchSize;tcpCanalClient.timeout = this.timeout;return tcpCanalClient;}}}

io.xzxj.canal.core.factory.EntryColumnConvertFactory

package io.xzxj.canal.core.factory;import com.alibaba.otter.canal.protocol.CanalEntry;
import io.xzxj.canal.core.listener.EntryListener;
import io.xzxj.canal.core.util.TableFieldUtil;
import io.xzxj.canal.core.util.TableInfoUtil;
import org.apache.commons.lang3.StringUtils;import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;/*** @author xzxj* @date 2023/3/12 12:10*/
public class EntryColumnConvertFactory extends AbstractConvertFactory<List<CanalEntry.Column>> {@Override<R> R newInstance(Class<R> clazz, List<CanalEntry.Column> columnList) throws InstantiationException, IllegalAccessException, NoSuchFieldException {R object = clazz.newInstance();if(object instanceof HashMap){for (CanalEntry.Column column : columnList) {((Map)object).put(column.getName().toUpperCase(),column.getValue());}return object;}Map<String, String> fieldMap = TableFieldUtil.getFieldMap(object.getClass());for (CanalEntry.Column column : columnList) {String fieldName = fieldMap.get(column.getName());if (StringUtils.isNotEmpty(fieldName)) {TableFieldUtil.setFieldValue(object, fieldName, column.getValue());}}return object;}@Overridepublic <R> R newInstance(EntryListener<?> entryHandler, List<CanalEntry.Column> columnList, Set<String> updateColumn) throws InstantiationException, IllegalAccessException, NoSuchFieldException {Class<R> tableClass = TableInfoUtil.getTableClass(entryHandler);if (tableClass == null) {return null;}R r = tableClass.newInstance();if(r instanceof HashMap){for (CanalEntry.Column column : columnList) {if (!updateColumn.contains(column.getName())) {continue;}((Map)r).put(column.getName().toUpperCase(),column.getValue());}return r;}Map<String, String> columnNames = TableFieldUtil.getFieldMap(r.getClass());for (CanalEntry.Column column : columnList) {if (!updateColumn.contains(column.getName())) {continue;}String fieldName = columnNames.get(column.getName());if (StringUtils.isNotEmpty(fieldName)) {TableFieldUtil.setFieldValue(r, fieldName, column.getValue());}}return r;}}

io.xzxj.canal.core.util.TableInfoUtil

package io.xzxj.canal.core.util;import com.baomidou.mybatisplus.annotation.TableName;
import com.google.common.base.CaseFormat;
import com.netinfo.cannel.config.BaseCanalListener;
import io.xzxj.canal.core.annotation.CanalListener;
import io.xzxj.canal.core.listener.EntryListener;
import org.apache.commons.lang3.StringUtils;import javax.annotation.Nullable;
import javax.persistence.Table;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @author xzxj* @date 2023/3/11 16:19*/
public class TableInfoUtil {private static Map<Class<? extends EntryListener>, Class>CLASS_LISTENER_CACHE_MAP = new ConcurrentHashMap<>();@Nullablepublic static String getTableName(EntryListener<?> entryListener) {if(entryListener instanceof BaseCanalListener){BaseCanalListener listener = (BaseCanalListener)entryListener;return listener.getSchemaName()+"."+listener.getTableName();}CanalListener annotation = entryListener.getClass().getAnnotation(CanalListener.class);if (annotation == null) {return null;}StringBuilder fullName = new StringBuilder();if (StringUtils.isNotBlank(annotation.schemaName())) {fullName.append(annotation.schemaName()).append(".");}if (StringUtils.isNotBlank(annotation.tableName())) {fullName.append(annotation.tableName());}else {String tableName = findTableName(entryListener);fullName.append(tableName);}return fullName.toString();}@Nullableprivate static String findTableName(EntryListener<?> entryListener) {Class<Object> tableClass = getTableClass(entryListener);if (tableClass == null) {return null;}TableName tableName = tableClass.getAnnotation(TableName.class);if (tableName != null && StringUtils.isNotBlank(tableName.value())) {return tableName.value();}Table table = tableClass.getAnnotation(Table.class);if (table != null && StringUtils.isNotBlank(table.name())) {return table.name();}return CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, tableClass.getName());}/*** 找到EntryListener泛型中的数据库实体类** @param object* @param <T>* @return*/@Nullablepublic static <T> Class<T> getTableClass(EntryListener<?> object) {Class<? extends EntryListener> listenerClass = object.getClass();Class<T> tableClass = CLASS_LISTENER_CACHE_MAP.get(listenerClass);if (tableClass != null) {return tableClass;}Type[] interfacesTypes = listenerClass.getGenericInterfaces();for (Type type : interfacesTypes) {Class<?> c = (Class<?>) ((ParameterizedType) type).getRawType();if (c.equals(EntryListener.class)) {tableClass = (Class<T>) ((ParameterizedType) type).getActualTypeArguments()[0];CLASS_LISTENER_CACHE_MAP.putIfAbsent(listenerClass, tableClass);return tableClass;}}return null;}}

覆盖完后,就可以根据项目配置,自动进行表的同步了

基于canal根据配置实现数据库数据的同步相关推荐

  1. sqlserver2008基于发布/订阅功能实现主从数据库数据实时同步

    网上关于sqlserver基于发布/订阅实现数据同步的文章很多,大多介绍不详细,各种copy.为实现发布服务器.订阅服务器数据库实时同步,近期花了几天时间认真研究了一下,并实践验证通过,希望本文能帮助 ...

  2. 达梦DMHS异构数据库数据实时同步软件速知

    DMHS简介 DMHS(Heterogeneous database Synchronization for DM)是达梦数据库公司推出的一款异构数据库数据实时同步工具软件.同步源端支持ORACLE系 ...

  3. 基于Canal的MySQL=>ES数据同步方案

    文章目录 1.MySQL和ES的主要区别? 1.1 功能性 1.2 性能指标 1.3 在搜索业务上的区别 1.3.1 查询 1.3.2 检索 2.为什么要做数据同步 2.1 检索性能 2.2 写入性能 ...

  4. 基于Canal+kafka监听数据库变化的最佳实践

    1.前言 工作中,我们很多时候需要根据某些状态的变化更新另一个业务的逻辑,比如订单的生成,成交等,需要更新或者通知其他的业务.我们通常的操作通过业务埋点.接口的调用或者中间件完成. 但是状态变化的入口 ...

  5. EF框架中,在实体中手动更新字段,数据库数据未同步到程序中应该怎么解决呢?

    在一些技术不是很强的选手手中,设计数据库时,难免会未考虑到某些字段,只能到后期实现功能时,才能觉察出来数据库中或是少写字段,或是多加了无用的字段,故我们还不得不去数据库中做些手脚. 本文列举的是在as ...

  6. 数据库数据定期同步实现

    需求背景:有一个业务方需要定期跟数据库进行数据同步,就是需要定期往数据库中同步部分数据,而这些数据并不能被当前系统直接使用,需要做一些处理同步到系统所使用的数据库中,处理比较复杂,没办法进行实时的同步 ...

  7. 基于数据库数据增量同步_基于canal实现分布式数据同步

    应用场景 分布式架构中,数据同步常常是个大问题.例如,mysql中的数据,可能在ElasticSearch有一份索引,在redis有一份缓存,在Nginx有一份缓存,这时候只要你修改了mysql中的数 ...

  8. 基于数据库数据增量同步_基于 Flink SQL CDC 的实时数据同步方案

    简介:Flink 1.11 引入了 Flink SQL CDC,CDC 能给我们数据和业务间能带来什么变化?本文由 Apache Flink PMC,阿里巴巴技术专家伍翀 (云邪)分享,内容将从传统的 ...

  9. canal应用二:mysql数据实时同步到redis

    前言 在项目开发中,通常使用redis作为数据的缓存,那么经常遇到一个问题,修改MySQL的数据要怎么同步到Redis呢? 方式一:在系统的保存.删除接口同时对redis进行操作,但是存在一个缺点,就 ...

最新文章

  1. python+requests实现接口测试 - get与post请求使用
  2. onnxruntime c++ 工程实例
  3. 返回图片_Vue 图片压缩并上传至服务器
  4. 【Qt开发】QT对话框去掉帮助和关闭按钮 拦截QT关闭窗口的CloseEvent
  5. php计划任务 框架,计划任务的使用 ThinkCMF内容管理框架,做最简约的ThinkPHP开源软件...
  6. ASP.NET MVC Preview 2新特性
  7. 如何在Angular.JS中打开JSON / XML文件
  8. Netty工作笔记0032---零拷贝AIO内容梳理
  9. SAP 中 Webservice的发布和调用过程。
  10. winform界面嵌入dwg图纸_完美解决窗体中预览DWG图形(C#版)
  11. 使用QML编写舒尔特表小程序
  12. 绝地求生刺激战场辅助卧底外挂群,菜鸟一秒变高手
  13. 职称计算机考试有哪些题,职称计算机考试题库(-套有答案).doc
  14. 如何恢复磁盘中被删除的数据
  15. 天翼云技术B卷编程题
  16. 看看最新BTA大厂的Java程序员的招聘技术标准,Java篇
  17. macOS SwiftUI 指示器组件规范之 01 液位指示器Level Indicators
  18. 精益管理学会|什么是ECRS改善方法?
  19. MXNet的Faster R-CNN(基于区域提议网络的实时目标检测)《9》
  20. GPL和LGPL协议

热门文章

  1. 博迪投资学第10版课后答案
  2. 照明模型:光通量、辐照度、光源
  3. IOT-OS之RT-Thread(十三)--- 网络分层结构 + netdev/SAL原理
  4. C专家编程 第6章 运动的诗章:运行时数据结构 6.1 a.out及其传说
  5. php 顿号,一字一顿用逗号、顿号、破折号还是省略号?
  6. <table> | HTML表格标签的定义与用法
  7. 计算机网络利弊的作文英语作文,网络的利弊英语作文模板及范文赏析
  8. 【信息系统项目管理师】项目管理十大知识领域记忆敲出(整体范围进度)
  9. 岁月如沙容颜易逝,年关将至的感伤你有吗?
  10. R Z-score结果