一、整体模块构成

canal有两种使用方式:1、独立部署 2、内嵌到应用中。 deployer模块主要用于独立部署canal server。关于这两种方式的区别,请参见server模块源码分析。

deployer模块源码目录结构如下所示,包括启动和停止脚本,服务的相关配置文件等。

deployer模块主要完成以下功能:

1、读取canal,properties配置文件

2、启动canal server,监听canal client的请求

3、启动canal instance,连接mysql数据库,伪装成slave,解析binlog

4、在canal的运行过程中,监听配置文件的变化

二、核心类

1、CanalLauncher类(canal独立版本启动的入口类)

这个类是模块中最关键的入口类,

public static void main(String[] args) {try {logger.info("## set default uncaught exception handler");setGlobalUncaughtExceptionHandler();logger.info("## load canal configurations");//1、读取canal.properties文件中配置,默认读取classpath下的canal.propertiesString conf = System.getProperty("canal.conf", "classpath:canal.properties");Properties properties = new Properties();if (conf.startsWith(CLASSPATH_URL_PREFIX)) {conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));} else {properties.load(new FileInputStream(conf));}//2、启动canal,首先将properties对象传递给CanalStarter ,然后调用其start方法启动final CanalStarter canalStater = new CanalStarter(properties);String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);if (StringUtils.isNotEmpty(managerAddress)) {String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);String adminPort = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, "11110");boolean autoRegister = BooleanUtils.toBoolean(CanalController.getProperty(properties,CanalConstants.CANAL_ADMIN_AUTO_REGISTER));String autoCluster = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_AUTO_CLUSTER);String registerIp = CanalController.getProperty(properties, CanalConstants.CANAL_REGISTER_IP);if (StringUtils.isEmpty(registerIp)) {registerIp = AddressUtils.getHostIp();}final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,user,passwd,registerIp,Integer.parseInt(adminPort),autoRegister,autoCluster);PlainCanal canalConfig = configClient.findServer(null);if (canalConfig == null) {throw new IllegalArgumentException("managerAddress:" + managerAddress+ " can't not found config for [" + registerIp + ":" + adminPort+ "]");}Properties managerProperties = canalConfig.getProperties();// merge localmanagerProperties.putAll(properties);int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,CanalConstants.CANAL_AUTO_SCAN_INTERVAL,"5"));executor.scheduleWithFixedDelay(new Runnable() {private PlainCanal lastCanalConfig;public void run() {try {if (lastCanalConfig == null) {lastCanalConfig = configClient.findServer(null);} else {PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());if (newCanalConfig != null) {// 远程配置canal.properties修改重新加载整个应用canalStater.stop();Properties managerProperties = newCanalConfig.getProperties();// merge localmanagerProperties.putAll(properties);canalStater.setProperties(managerProperties);//启动canal server,但这还不是最终具体实现的类canalStater.start();lastCanalConfig = newCanalConfig;}}} catch (Throwable e) {logger.error("scan failed", e);}}}, 0, scanIntervalInSecond, TimeUnit.SECONDS);canalStater.setProperties(managerProperties);} else {canalStater.setProperties(properties);}canalStater.start();runningLatch.await();executor.shutdownNow();} catch (Throwable e) {logger.error("## Something goes wrong when starting up the canal Server:", e);}
}

可以看到,CanalLauncher实际上只是负责读取canal.properties配置文件,然后构造CanalStarter 对象,并通过其start和stop方法来开启和停止canal。

2、CanalStarter类(Canal server 启动类)

这个类是模块中最关键的启动类

/**
* 启动方法
*
* @throws Throwable
*/
//同步操作,防止重复启动同一个实例
public synchronized void start() throws Throwable {
String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
if (!“tcp”.equalsIgnoreCase(serverMode)) {
ExtensionLoader loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);
canalMQProducer = loader
.getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);
//初始化Kafka
if (canalMQProducer != null) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(canalMQProducer.getClass().getClassLoader());
canalMQProducer.init(properties);
Thread.currentThread().setContextClassLoader(cl);
}
}

    if (canalMQProducer != null) {MQProperties mqProperties = canalMQProducer.getMqProperties();// disable nettySystem.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");if (mqProperties.isFlatMessage()) {// 设置为raw避免ByteString->Entry的二次解析System.setProperty("canal.instance.memory.rawEntry", "false");}}logger.info("## start the canal server.");//将属性传给CanalController类,这是canal调度控制器,真正的启动是这个类在做controller = new CanalController(properties);//启动controller.start();logger.info("## the canal server is running now ......");shutdownThread = new Thread(() -> {try {logger.info("## stop the canal server");controller.stop();CanalLauncher.runningLatch.countDown();} catch (Throwable e) {logger.warn("##something goes wrong when stopping canal Server:", e);} finally {logger.info("## canal server is down.");}});//关闭canal,通过添加JVM的钩子(  这个方法的意思就是在jvm中增加一个关闭的钩子,当jvm关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,jvm才会关闭。所以这些钩子可以在jvm关闭的时候进行内存清理、对象销毁、关闭连接等操作。),JVM停止前会回调run方法,其内部调用controller.stop()方法进行停止Runtime.getRuntime().addShutdownHook(shutdownThread);//启动kafkaif (canalMQProducer != null) {canalMQStarter = new CanalMQStarter(canalMQProducer);String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);canalMQStarter.start(destinations);controller.setCanalMQStarter(canalMQStarter);}// start canalAdminString port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);//CanalAdminWithNetty,基于netty网络服务的server实现if (canalAdmin == null && StringUtils.isNotEmpty(port)) {String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);CanalAdminController canalAdmin = new CanalAdminController(this);canalAdmin.setUser(user);canalAdmin.setPasswd(passwd);String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP);logger.debug("canal admin port:{}, canal admin user:{}, canal admin password: {}, canal ip:{}",port,user,passwd,ip);CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();canalAdminWithNetty.setCanalAdmin(canalAdmin);canalAdminWithNetty.setPort(Integer.parseInt(port));canalAdminWithNetty.setIp(ip);canalAdminWithNetty.start();this.canalAdmin = canalAdminWithNetty;}running = true;
}

可以看到,CanalStarter是负责初始化Kafka,启动kafka,启动基于Netty的server服务,调用CanalController的启动实例方法。

3、CanalController类(canal调度控制器类)

这个类是模块中canal调度控制器

public CanalController(final Properties properties){
managerClients = MigrateMap.makeComputingMap(this::getManagerClient);

    // 初始化全局参数设置globalInstanceConfig = initGlobalConfig(properties);//这里利用Google Guava框架的MapMaker创建Map实例并赋值给instanceConfigsinstanceConfigs = new MapMaker().makeMap();// 初始化instance configinitInstanceConfig(properties);// init socketChannelString socketChannel = getProperty(properties, CanalConstants.CANAL_SOCKETCHANNEL);if (StringUtils.isNotEmpty(socketChannel)) {System.setProperty(CanalConstants.CANAL_SOCKETCHANNEL, socketChannel);}// 兼容1.1.0版本的ak/sk参数名String accesskey = getProperty(properties, "canal.instance.rds.accesskey");String secretkey = getProperty(properties, "canal.instance.rds.secretkey");if (StringUtils.isNotEmpty(accesskey)) {System.setProperty(CanalConstants.CANAL_ALIYUN_ACCESSKEY, accesskey);}if (StringUtils.isNotEmpty(secretkey)) {System.setProperty(CanalConstants.CANAL_ALIYUN_SECRETKEY, secretkey);}// 准备canal serverip = getProperty(properties, CanalConstants.CANAL_IP);registerIp = getProperty(properties, CanalConstants.CANAL_REGISTER_IP);port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT, "11111"));adminPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, "11110"));//CanalServerWithEmbedded是嵌入式版本实现的canal serverembededCanalServer = CanalServerWithEmbedded.instance();embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGeneratorint metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112"));embededCanalServer.setMetricsPort(metricsPort);this.adminUser = getProperty(properties, CanalConstants.CANAL_ADMIN_USER);this.adminPasswd = getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);embededCanalServer.setUser(getProperty(properties, CanalConstants.CANAL_USER));embededCanalServer.setPasswd(getProperty(properties, CanalConstants.CANAL_PASSWD));String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {//设置CanalServerWithNettycanalServer = CanalServerWithNetty.instance();canalServer.setIp(ip);canalServer.setPort(port);}// 处理下ip为空,默认使用hostIp暴露到zk中if (StringUtils.isEmpty(ip) && StringUtils.isEmpty(registerIp)) {ip = registerIp = AddressUtils.getHostIp();}if (StringUtils.isEmpty(ip)) {ip = AddressUtils.getHostIp();}if (StringUtils.isEmpty(registerIp)) {registerIp = ip; // 兼容以前配置}//zookeeper的部分配置final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);if (StringUtils.isNotEmpty(zkServers)) {zkclientx = ZkClientx.getZkClient(zkServers);// 初始化系统目录zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);}// CanalInstance运行状态监控final ServerRunningData serverData = new ServerRunningData(registerIp + ":" + port);ServerRunningMonitors.setServerData(serverData);ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap((Function<String, ServerRunningMonitor>) destination -> {ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);runningMonitor.setDestination(destination);runningMonitor.setListener(new ServerRunningListener() {//触发现在轮到自己做为active,需要载入上一个active的上下文数据public void processActiveEnter() {try {MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));embededCanalServer.start(destination);if (canalMQStarter != null) {canalMQStarter.startDestination(destination);}} finally {MDC.remove(CanalConstants.MDC_DESTINATION);}}//触发一下当前active模式失败public void processActiveExit() {try {MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));if (canalMQStarter != null) {canalMQStarter.stopDestination(destination);}embededCanalServer.stop(destination);} finally {MDC.remove(CanalConstants.MDC_DESTINATION);}}//启动时回调做点事情public void processStart() {try {if (zkclientx != null) {final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,registerIp + ":" + port);initCid(path);zkclientx.subscribeStateChanges(new IZkStateListener() {public void handleStateChanged(KeeperState state) throws Exception {}public void handleNewSession() throws Exception {initCid(path);}@Overridepublic void handleSessionEstablishmentError(Throwable error) throws Exception {logger.error("failed to connect to zookeeper", error);}});}} finally {MDC.remove(CanalConstants.MDC_DESTINATION);}}//关闭时回调做点事情public void processStop() {try {MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));if (zkclientx != null) {final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,registerIp + ":" + port);releaseCid(path);}} finally {MDC.remove(CanalConstants.MDC_DESTINATION);}}});if (zkclientx != null) {runningMonitor.setZkClient(zkclientx);}// 触发创建一下cid节点runningMonitor.init();return runningMonitor;}));// 初始化monitor机制autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));if (autoScan) {defaultAction = new InstanceAction() {public void start(String destination) {InstanceConfig config = instanceConfigs.get(destination);if (config == null) {// 重新读取一下instance configconfig = parseInstanceConfig(properties, destination);instanceConfigs.put(destination, config);}if (!embededCanalServer.isStart(destination)) {// HA机制启动ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);if (!config.getLazy() && !runningMonitor.isStart()) {runningMonitor.start();}}logger.info("auto notify start {} successful.", destination);}public void stop(String destination) {// 此处的stop,代表强制退出,非HA机制,所以需要退出HA的monitor和配置信息InstanceConfig config = instanceConfigs.remove(destination);if (config != null) {embededCanalServer.stop(destination);ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);if (runningMonitor.isStart()) {runningMonitor.stop();}}logger.info("auto notify stop {} successful.", destination);}public void reload(String destination) {// 目前任何配置变化,直接重启,简单处理stop(destination);start(destination);logger.info("auto notify reload {} successful.", destination);}@Overridepublic void release(String destination) {// 此处的release,代表强制释放,主要针对HA机制释放运行,让给其他机器抢占InstanceConfig config = instanceConfigs.get(destination);if (config != null) {//ServerRunningMonitor 是针对server的running节点控制ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);if (runningMonitor.isStart()) {boolean release = runningMonitor.release();if (!release) {// 如果是单机模式,则直接清除配置instanceConfigs.remove(destination);// 停掉服务runningMonitor.stop();if (instanceConfigMonitors.containsKey(InstanceConfig.InstanceMode.MANAGER)) {ManagerInstanceConfigMonitor monitor = (ManagerInstanceConfigMonitor) instanceConfigMonitors.get(InstanceConfig.InstanceMode.MANAGER);Map<String, InstanceAction> instanceActions = monitor.getActions();if (instanceActions.containsKey(destination)) {// 清除内存中的autoScan cachemonitor.release(destination);}}}}}logger.info("auto notify release {} successful.", destination);}};//实例化配置instanceConfigMonitors = MigrateMap.makeComputingMap(mode -> {int scanInterval = Integer.valueOf(getProperty(properties,CanalConstants.CANAL_AUTO_SCAN_INTERVAL,"5"));//spring 模式if (mode.isSpring()) {SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();monitor.setScanIntervalInSecond(scanInterval);monitor.setDefaultAction(defaultAction);// 设置conf目录,默认是user.dir + conf目录组成String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR);if (StringUtils.isEmpty(rootDir)) {rootDir = "../conf";}if (StringUtils.equals("otter-canal", System.getProperty("appName"))) {monitor.setRootConf(rootDir);} else {// eclipse debug模式monitor.setRootConf("src/main/resources/");}return monitor;} else if (mode.isManager()) {//管理模式ManagerInstanceConfigMonitor monitor = new ManagerInstanceConfigMonitor();monitor.setScanIntervalInSecond(scanInterval);monitor.setDefaultAction(defaultAction);String managerAddress = getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);monitor.setConfigClient(getManagerClient(managerAddress));return monitor;} else {throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor");}});}
}

3.1.1配置解析相关代码

启动的相关配置类InstanceConfig

public class InstanceConfig {

private InstanceConfig globalConfig;
private InstanceMode   mode;
private Boolean        lazy;
private String         managerAddress;
private String         springXml;

这个类对应canal.properties文件类的

具体实例化方法:

private InstanceConfig initGlobalConfig(Properties properties) {
String adminManagerAddress = getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
InstanceConfig globalConfig = new InstanceConfig();
//读取canal.instance.global.mode
String modeStr = getProperty(properties, CanalConstants.getInstanceModeKey(CanalConstants.GLOBAL_NAME));
if (StringUtils.isNotEmpty(adminManagerAddress)) {
// 如果指定了manager地址,则强制适用manager
globalConfig.setMode(InstanceMode.MANAGER);
} else if (StringUtils.isNotEmpty(modeStr)) {
globalConfig.setMode(InstanceMode.valueOf(StringUtils.upperCase(modeStr)));
}
//读取canal.instance.global.lazy
String lazyStr = getProperty(properties, CanalConstants.getInstancLazyKey(CanalConstants.GLOBAL_NAME));
if (StringUtils.isNotEmpty(lazyStr)) {
globalConfig.setLazy(Boolean.valueOf(lazyStr));
}
//读取canal.instance.global.manager.address
String managerAddress = getProperty(properties,
CanalConstants.getInstanceManagerAddressKey(CanalConstants.GLOBAL_NAME));
if (StringUtils.isNotEmpty(managerAddress)) {
if (StringUtils.equals(managerAddress, “${canal.admin.manager}”)) {
managerAddress = adminManagerAddress;
}

        globalConfig.setManagerAddress(managerAddress);}//读取canal.instance.global.spring.xmlString springXml = getProperty(properties, CanalConstants.getInstancSpringXmlKey(CanalConstants.GLOBAL_NAME));if (StringUtils.isNotEmpty(springXml)) {globalConfig.setSpringXml(springXml);}//初始化instanceGeneratorinstanceGenerator = destination -> {InstanceConfig config = instanceConfigs.get(destination);if (config == null) {throw new CanalServerException("can't find destination:" + destination);}//基于manager生成对应的canal实例if (config.getMode().isManager()) {PlainCanalInstanceGenerator instanceGenerator = new PlainCanalInstanceGenerator(properties);instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress()));instanceGenerator.setSpringXml(config.getSpringXml());return instanceGenerator.generate(destination);} else if (config.getMode().isSpring()) {//基于Spring生成对应的canal实例SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator();instanceGenerator.setSpringXml(config.getSpringXml());return instanceGenerator.generate(destination);} else {throw new UnsupportedOperationException("unknow mode :" + config.getMode());}};return globalConfig;
}

其中canal.instance.global.mode用于确定canal instance的全局配置加载方式,其取值范围有2个:spring、manager。我们知道一个canal server中可以启动多个canal instance,每个instance都有各自的配置。instance的配置也可以放在本地,也可以放在远程配置中心里。我们可以自定义每个canal instance配置文件存储的位置,如果所有canal instance的配置都在本地或者远程,此时我们就可以通过canal.instance.global.mode这个配置项,来统一的指定配置文件的位置,避免为每个canal instance单独指定。

其中:

spring方式:

表示所有的canal instance的配置文件位于本地。此时,我们必须提供配置项canal.instance.global.spring.xml指定spring配置文件的路径。canal提供了多个spring配置文件:file-instance.xml、default-instance.xml、memory-instance.xml、local-instance.xml、group-instance.xml。这么多配置文件主要是为了支持canal instance不同的工作方式。

manager方式:

表示所有的canal instance的配置文件位于远程配置中心,此时我们必须提供配置项 canal.instance.global.manager.address来指定远程配置中心的地址。目前alibaba内部配置使用这种方式。开发者可以自己实现CanalConfigClient,连接各自的管理系统,完成接入。

3.1.2instanceConfigs字段

类型为Map<String, InstanceConfig>。前面提到初始化instanceGenerator后,当其generate方法被调用时,会尝试从instanceConfigs根据一个destination获取对应的InstanceConfig,现在分析instanceConfigs的相关初始化代码。

我们知道globalInstanceConfig定义全局的配置加载方式。如果需要把部分CanalInstance配置放于本地,另外一部分CanalIntance配置放于远程配置中心,则只通过全局方式配置,无法达到这个要求。虽然这种情况很少见,但是为了提供最大的灵活性,canal支持每个CanalIntance自己来定义自己的加载方式,来覆盖默认的全局配置加载方式。而每个destination对应的InstanceConfig配置就存放于instanceConfigs字段中。

举例来说:

//当前server上部署的instance列表
canal.destinations=instance1,instance2

//instance配置全局加载方式
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/file-instance.xml

//instance1覆盖全局加载方式
canal.instance.instance1.mode = manager
canal.instance.instance1.manager.address = 127.0.0.1:1099
canal.instance.instance1.lazy = tue
这段配置中,设置了instance的全局加载方式为spring,instance1覆盖了全局配置,使用manager方式加载配置。而instance2没有覆盖配置,因此默认使用spring加载方式。

具体的实例化配置方法

private void initInstanceConfig(Properties properties) {
//读取配置项canal.destinations
String destinationStr = getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
//以","分割canal.destinations,得到一个数组形式的destination
String[] destinations = StringUtils.split(destinationStr, CanalConstants.CANAL_DESTINATION_SPLIT);
//为每一个destination生成一个InstanceConfig实例
for (String destination : destinations) {
//将destination对应的InstanceConfig放入instanceConfigs中
InstanceConfig config = parseInstanceConfig(properties, destination);
InstanceConfig oldConfig = instanceConfigs.put(destination, config);

        if (oldConfig != null) {logger.warn("destination:{} old config:{} has replace by new config:{}", destination, oldConfig, config);}}
}

上面代码片段中,首先解析canal.destinations配置项,可以理解一个destination就对应要初始化一个canal instance。针对每个destination会创建各自的InstanceConfig,最终都会放到instanceConfigs这个Map中。

3.1.3 准备canal server

// 准备canal server
ip = getProperty(properties, CanalConstants.CANAL_IP);
registerIp = getProperty(properties, CanalConstants.CANAL_REGISTER_IP);
port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT, “11111”));
adminPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, “11110”));
embededCanalServer = CanalServerWithEmbedded.instance();
embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, “11112”));
embededCanalServer.setMetricsPort(metricsPort);

    this.adminUser = getProperty(properties, CanalConstants.CANAL_ADMIN_USER);this.adminPasswd = getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);embededCanalServer.setUser(getProperty(properties, CanalConstants.CANAL_USER));embededCanalServer.setPasswd(getProperty(properties, CanalConstants.CANAL_PASSWD));String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {canalServer = CanalServerWithNetty.instance();canalServer.setIp(ip);canalServer.setPort(port);}

ip:String,对应canal.properties文件中的canal.ip,canal server监听的ip。

port:int,对应canal.properties文件中的canal.port,canal server监听的端口

embededCanalServer:类型为CanalServerWithEmbedded

canalServer:类型为CanalServerWithNetty

CanalServerWithEmbedded 和 CanalServerWithNetty都实现了CanalServer接口,且都实现了单例模式,通过静态方法instance获取实例。

关于这两种类型的实现,canal官方文档有以下描述:

说白了,就是我们可以不必独立部署canal server。在应用直接使用CanalServerWithEmbedded直连mysql数据库。如果觉得自己的技术hold不住相关代码,就独立部署一个canal server,使用canal提供的客户端,连接canal server获取binlog解析后数据。而CanalServerWithNetty是在CanalServerWithEmbedded的基础上做的一层封装,用于与客户端通信。

在独立部署canal server时,Canal客户端发送的所有请求都交给CanalServerWithNetty处理解析,解析完成之后委派给了交给CanalServerWithEmbedded进行处理。因此CanalServerWithNetty就是一个马甲而已。CanalServerWithEmbedded才是核心。

因此,在上述代码中,我们看到,用于生成CanalInstance实例的instanceGenerator被设置到了CanalServerWithEmbedded中,而ip和port被设置到CanalServerWithNetty中。

3.1.4启动方法

public void start() throws Throwable {
logger.info("## start the canal server[{}({}):{}]", ip, registerIp, port);
// 创建整个canal的工作节点
/构建临时节点的路径:/otter/canal/destinations/{0}/running,其中占位符{0}会被destination替换。
在集群模式下,可能会有多个canal server共同处理同一个destination,
在某一时刻,只能由一个canal server进行处理,处理这个destination的canal server进入running状态,其他canal server进入standby状态。
/
final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + “:” + port);
initCid(path);
if (zkclientx != null) {
/对destination对应的running节点进行监听,一旦发生了变化,则说明可能其他处理相同destination的canal server可能出现了异常,
此时需要尝试自己进入running状态。
/
this.zkclientx.subscribeStateChanges(new IZkStateListener() {

            public void handleStateChanged(KeeperState state) throws Exception {}public void handleNewSession() throws Exception {initCid(path);}@Overridepublic void handleSessionEstablishmentError(Throwable error) throws Exception {logger.error("failed to connect to zookeeper", error);}});}// 优先启动embeded服务embededCanalServer.start();// 尝试启动一下非lazy状态的通道for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {final String destination = entry.getKey();InstanceConfig config = entry.getValue();// 创建destination的工作节点if (!embededCanalServer.isStart(destination)) {// HA机制启动ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);if (!config.getLazy() && !runningMonitor.isStart()) {runningMonitor.start();}}if (autoScan) {instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);}}if (autoScan) {instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {if (!monitor.isStart()) {monitor.start();}}}// 启动网络接口if (canalServer != null) {canalServer.start();}
}

HA机制启动机制

runningMonitor.start();

public synchronized void start() {
super.start();
try {
processStart();
if (zkClient != null) {
// 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
zkClient.subscribeDataChanges(path, dataListener);

            initRunning();} else {processActiveEnter();// 没有zk,直接启动}} catch (Exception e) {logger.error("start failed", e);// 没有正常启动,重置一下状态,避免干扰下一次startstop();}}

private void initRunning() {
if (!isStart()) {
return;
}
//构建临时节点的路径:/otter/canal/destinations/{0}/running,其中占位符{0}会被destination替换
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
// 序列化
//构建临时节点的数据,标记当前destination由哪一个canal server处理
byte[] bytes = JsonUtils.marshalToByte(serverData);
try {
mutex.set(false);
//尝试创建临时节点。如果节点已经存在,说明是其他的canal server已经启动了这个canal instance。
//此时会抛出ZkNodeExistsException,进入catch代码块。
zkClient.create(path, bytes, CreateMode.EPHEMERAL);
activeData = serverData;
//如果创建成功,触发一下事件,内部调用ServerRunningListener的processActiveEnter方法
processActiveEnter();// 触发一下事件
mutex.set(true);
release = false;
} catch (ZkNodeExistsException e) {
//创建节点失败,则根据path从zk中获取当前是哪一个canal server创建了当前canal instance的相关信息。
//第二个参数true,表示的是,如果这个path不存在,则返回null。
bytes = zkClient.readData(path, true);
if (bytes == null) {// 如果不存在节点,立即尝试一次
initRunning();
} else {
//如果的确存在,则将创建该canal instance实例信息存入activeData中。
activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
}
} catch (ZkNoNodeException e) {
zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点
initRunning();
}
}

可以看到,initRunning方法内部只有在尝试在zk中创建节点成功后,才会去调用listener的processActiveEnter方法来真正启动destination对应的canal instance,这是canal HA方式启动的核心。canal官方文档中介绍了CanalServer HA机制启动的流程,如下:

事实上,这个说明的前两步,都是在initRunning方法中实现的。从上面的代码中,我们可以看出,在HA机启动的情况下,initRunning方法不一定能走到processActiveEnter()方法,因为创建临时节点可能会出错。

此外,根据官方文档说明,如果出错,那么当前canal instance则进入standBy状态。也就是另外一个canal instance出现异常时,当前canal instance顶上去。

3.1.5autoScan机制相关代码

关于autoscan,官方文档有以下介绍:

autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
if (autoScan) {
defaultAction = new InstanceAction() {//…};

        instanceConfigMonitors = //....}

可以看到,autoScan是否需要自动扫描的开关,只有当autoScan为true时,才会初始化defaultAction字段和instanceConfigMonitors字段。其中:

其中:

defaultAction:其作用是如果配置发生了变更,默认应该采取什么样的操作。其实现了InstanceAction接口定义的三个抽象方法:start、stop和reload。当新增一个destination配置时,需要调用start方法来启动;当移除一个destination配置时,需要调用stop方法来停止;当某个destination配置发生变更时,需要调用reload方法来进行重启。instanceConfigMonitors:类型为Map<InstanceMode, InstanceConfigMonitor>。defaultAction字段只是定义了配置发生变化默认应该采取的操作,那么总该有一个类来监听配置是否发生了变化,这就是InstanceConfigMonitor的作用。官方文档中,只提到了对canal.conf.dir配置项指定的目录的监听,这指的是通过spring方式加载配置。显然的,通过manager方式加载配置,配置中心的内容也是可能发生变化的,也需要进行监听。此时可以理解为什么instanceConfigMonitors的类型是一个Map,key为InstanceMode,就是为了对这两种方式的配置加载方式都进行监听。

defaultAction字段初始化源码如下所示:

defaultAction = new InstanceAction() {

public void start(String destination) {InstanceConfig config = instanceConfigs.get(destination);if (config == null) {// 重新读取一下instance configconfig = parseInstanceConfig(properties, destination);instanceConfigs.put(destination, config);}if (!embededCanalServer.isStart(destination)) {// HA机制启动ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);if (!config.getLazy() && !runningMonitor.isStart()) {runningMonitor.start();}}
}public void stop(String destination) {// 此处的stop,代表强制退出,非HA机制,所以需要退出HA的monitor和配置信息InstanceConfig config = instanceConfigs.remove(destination);if (config != null) {embededCanalServer.stop(destination);ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);if (runningMonitor.isStart()) {runningMonitor.stop();}}
}public void reload(String destination) {// 目前任何配置变化,直接重启,简单处理stop(destination);start(destination);
}

};
instanceConfigMonitors字段初始化源码如下所示:

instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {
public InstanceConfigMonitor apply(InstanceMode mode) {
int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
if (mode.isSpring()) {//如果加载方式是spring,返回SpringInstanceConfigMonitor
SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
monitor.setScanIntervalInSecond(scanInterval);
monitor.setDefaultAction(defaultAction);
// 设置conf目录,默认是user.dir + conf目录组成
String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR);
if (StringUtils.isEmpty(rootDir)) {
rootDir = “…/conf”;
}
if (StringUtils.equals(“otter-canal”, System.getProperty(“appName”))) {
monitor.setRootConf(rootDir);
} else {
// eclipse debug模式
monitor.setRootConf(“src/main/resources/”);
}
return monitor;
} else if (mode.isManager()) {//如果加载方式是manager,返回ManagerInstanceConfigMonitor
return new ManagerInstanceConfigMonitor();
} else {
throw new UnsupportedOperationException(“unknow mode :” + mode + " for monitor");
}
}
});

3.1.6 启动实例方法

public void start() throws Throwable {
logger.info("## start the canal server[{}:{}]", ip, port);
// 创建整个canal的工作节点 :/otter/canal/cluster/{0}
final String path = ZookeeperPathUtils.getCanalClusterNode(ip + “:” + port);
initCid(path);
if (zkclientx != null) {
this.zkclientx.subscribeStateChanges(new IZkStateListener() {
public void handleStateChanged(KeeperState state) throws Exception {
}
public void handleNewSession() throws Exception {
initCid(path);
}
});
}
// 优先启动embeded服务
embededCanalServer.start();
//启动不是lazy模式的CanalInstance,通过迭代instanceConfigs,根据destination获取对应的ServerRunningMonitor,然后逐一启动
for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
final String destination = entry.getKey();
InstanceConfig config = entry.getValue();
// 如果destination对应的CanalInstance没有启动,则进行启动
if (!embededCanalServer.isStart(destination)) {
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
//如果不是lazy,lazy模式需要等到第一次有客户端请求才会启动
if (!config.getLazy() && !runningMonitor.isStart()) {
runningMonitor.start();
}
}
if (autoScan) {
instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
}
}
if (autoScan) {//启动配置文件自动检测机制
instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
if (!monitor.isStart()) {
monitor.start();//启动monitor
}
}
}
// 启动网络接口,监听客户端请求
canalServer.start();
}

4、总结

deployer模块的主要作用:

1、读取canal.properties,确定canal instance的配置加载方式

2、确定canal instance的启动方式:独立启动或者集群方式启动

3、监听canal instance的配置的变化,动态停止、启动或新增

4、启动canal server,监听客户端请求

2、Canal的deployer模块相关推荐

  1. Canal源码分析deployer模块

    canal有两种使用方式:1.独立部署 2.内嵌到应用中. deployer模块主要用于独立部署canal server.关于这两种方式的区别,请参见server模块源码分析.deployer模块源码 ...

  2. 「从零单排canal 04」 启动模块deployer源码解析

    本文将对canal的启动模块deployer进行分析. Deployer模块(绿色部分)在整个系统中的角色如下图所示,用来启动canal-server. 模块内的类如下: 为了能带着目的看源码,以几个 ...

  3. 如何基于Canal 和 Kafka,实现 MySQL 的 Binlog 近实时同步

    转载自 如何基于Canal 和 Kafka,实现 MySQL 的 Binlog 近实时同步 近段时间,业务系统架构基本完备,数据层面的建设比较薄弱,因为笔者目前工作重心在于搭建一个小型的数据平台.优先 ...

  4. centos7时间同步_基于 Canal 和 Kafka 实现 MySQL 的 Binlog 近实时同步

    点击蓝色"架构文摘"关注我哟 加个"星标",每天上午 09:25,干货推送! 作者:Throwable    掘金:https://juejin.im/post ...

  5. 超详细的canal使用总结

    超详细的canal使用总结 canal的介绍 ​ canal,译意为水道/管道/沟渠,从官网的介绍中可以知道,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费. ​ 这是一张官网 ...

  6. docker 安装mysql、canal、redis实现redis和mysql缓存一致性

    一.canal介绍 Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件. 目前,Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利 ...

  7. 【源码】otter工程结构

    最近在搞数据同步相关的内容,需要对otter的代码进行扩展,所以需要先熟悉一下otter的源码.首先我们整体来看下otter的工程结构.otter的工程结构比较复杂,需要花费一定的时间来理解各个部分的 ...

  8. 代码技巧——数据迁移方案【建议收藏】

    开发工作中,可能会遇到如"大表拆分"."跨库数据迁移"等场景,本文介绍互联网常见架构下的数据迁移方案及实现: 1. 数据迁移的业务场景 以下是需要数据迁移的场景 ...

  9. canel-1.1.5 canal.deployer安装

    简介 canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 工作原理 canal 模拟 MySQL slave 的交互协议,伪装 ...

最新文章

  1. 服务器Jmail配置问题
  2. oracle 增加ora容量_案例:Oracle报错ORA-01144 详解数据文件大小32GB的限制的原因
  3. 【Boost】boost库中thread多线程详解2——mutex与lock
  4. 剪映专业版PC端清理缓存与日志
  5. Mybatis判断int类型是否为空
  6. 用python内置函数算复杂度吗_Python减少代码量的两个内置函数
  7. C++递归以及内存值的传递
  8. 艾肯4nano声卡调试教程,效果演示
  9. 备战秋招之数电知识查漏补缺
  10. C语言华氏度转换摄氏度
  11. 算法实践:波兰表达式
  12. 信息安全等级测评内容
  13. 国际黄金产品中的期货黄金主力合约是什么
  14. 2022年计算机二级MS Office高级应用复习题及答案
  15. 滚动距离置零,恢复到初始位置
  16. 腾讯云 云点播 JAVASDK上传
  17. 【ESP32_8266_WiFi (一)】网络通信基础
  18. 通过数据:提升用户转化与留存全攻略
  19. 一个人赶着鸭子去每个村庄卖,每经过一个 村子卖去所赶鸭子的一半又一只。 这样他经过了 七个村子后还剩 两只鸭子,问问他出发时共赶多少只鸭子?经过每个村子卖出多少只鸭子?(递归实现)
  20. 3D游戏恶魔与牧师(动作分离)

热门文章

  1. unity的C#学习——标识符号、关键字与数据类型及其转换方式
  2. 孟婆汤传说!!!感人至极!
  3. 集装箱编号校验码规则及java程序的实现
  4. 安卓逆向 -- Xposed模块编写
  5. MEM/MBA英语基础(07)复合句-定语从句
  6. 信阳毛尖大山茶、高山茶、小山茶的辨别
  7. 2023年2月京东手机品牌销量数据查询(京东电商数据平台)
  8. python找列表中相邻数的个数_利用python求相邻数的方法示例
  9. Qt多人协作项目执行方案
  10. 当你压力大到快崩溃时,不要跟任何人说,也不要觉得委屈