本文将对canal的启动模块deployer进行分析。

Deployer模块(绿色部分)在整个系统中的角色如下图所示,用来启动canal-server.

模块内的类如下:

为了能带着目的看源码,以几个问题开头,带着问题来一起探索deployer模块的源码。

  • CanalServer启动过程中配置如何加载?
  • CanalServer启动过程中涉及哪些组件?
  • 集群模式的canalServer,是如何实现instance的HA呢?
  • 每个canalServer又是怎么获取admin上的配置变更呢?

1.入口类CanalLauncher

这个类是整个canal-server的入口类。负责配置加载和启动canal-server。

主流程如下:

  • 加载canal.properties的配置内容
  • 根据canal.admin.manager是否为空判断是否是admin控制,如果不是admin控制,就直接根据canal.properties的配置来了
  • 如果是admin控制,使用PlainCanalConfigClient获取远程配置 新开一个线程池每隔五秒用http请求去admin上拉配置进行merge(这里依赖了instance模块的相关配置拉取的工具方法) 用md5进行校验,如果canal-server配置有更新,那么就重启canal-server
  • 核心是用canalStarter.start()启动
  • 使用CountDownLatch保持主线程存活
  • 收到关闭信号,CDL-1,然后关闭配置更新线程池,优雅退出
  1 public static void main(String[] args) {2 3     try {4 5         //note:设置全局未捕获异常的处理6 7         setGlobalUncaughtExceptionHandler();8 9         /**10 11          * note:12 13          * 1.读取canal.properties的配置14 15          * 可以手动指定配置路径名称16 17          */18 19         String conf = System.getProperty("canal.conf", "classpath:canal.properties");20 21         Properties properties = new Properties();22 23         if (conf.startsWith(CLASSPATH_URL_PREFIX)) {24 25             conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);26 27             properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));28 29         } else {30 31             properties.load(new FileInputStream(conf));32 33         }34 35         final CanalStarter canalStater = new CanalStarter(properties);36 37         String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);38 39         /**40 41          * note:42 43          * 2.根据canal.admin.manager是否为空判断是否是admin控制,如果不是admin控制,就直接根据canal.properties的配置来了44 45          */46 47         if (StringUtils.isNotEmpty(managerAddress)) {48 49             String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);50 51             //省略一部分。。。。。。52           53 54             /**55 56              * note:57 58              * 2.1使用PlainCanalConfigClient获取远程配置59 60              */61 62             final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,63 64                     user,65 66                     passwd,67 68                     registerIp,69 70                     Integer.parseInt(adminPort),71 72                     autoRegister,73 74                     autoCluster);75 76             PlainCanal canalConfig = configClient.findServer(null);77 78             if (canalConfig == null) {79 80                 throw new IllegalArgumentException("managerAddress:" + managerAddress81 82                         + " can't not found config for [" + registerIp + ":" + adminPort83 84                         + "]");85 86             }87 88             Properties managerProperties = canalConfig.getProperties();89 90             // merge local91 92             managerProperties.putAll(properties);93 94             int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,95 96                     CanalConstants.CANAL_AUTO_SCAN_INTERVAL,97 98                     "5"));99
100             /**
101
102              * note:
103
104              * 2.2 新开一个线程池每隔五秒用http请求去admin上拉配置进行merge(这里依赖了instance模块的相关配置拉取的工具方法)
105
106              */
107
108             executor.scheduleWithFixedDelay(new Runnable() {
109
110                 private PlainCanal lastCanalConfig;
111
112                 public void run() {
113
114                     try {
115
116                         if (lastCanalConfig == null) {
117
118                             lastCanalConfig = configClient.findServer(null);
119
120                         } else {
121
122                             PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());
123
124                             /**
125
126                              * note:
127
128                              * 2.3 用md5进行校验,如果canal-server配置有更新,那么就重启canal-server
129
130                              */
131
132                             if (newCanalConfig != null) {
133
134                                 // 远程配置canal.properties修改重新加载整个应用
135
136                                 canalStater.stop();
137
138                                 Properties managerProperties = newCanalConfig.getProperties();
139
140                                 // merge local
141
142                                 managerProperties.putAll(properties);
143
144                                 canalStater.setProperties(managerProperties);
145
146                                 canalStater.start();
147
148                                 lastCanalConfig = newCanalConfig;
149
150                             }
151
152                         }
153
154                     } catch (Throwable e) {
155
156                         logger.error("scan failed", e);
157
158                     }
159
160                 }
161
162             }, 0, scanIntervalInSecond, TimeUnit.SECONDS);
163
164             canalStater.setProperties(managerProperties);
165
166         } else {
167
168             canalStater.setProperties(properties);
169
170         }
171
172         canalStater.start();
173
174         //note: 这样用CDL处理和while(true)有点类似
175
176         runningLatch.await();
177
178         executor.shutdownNow();
179
180     } catch (Throwable e) {
181
182         logger.error("## Something goes wrong when starting up the canal Server:", e);
183
184     }
185
186 }

2.启动类CanalStarter

从上面的入口类,我们可以看到canal-server真正的启动逻辑在CanalStarter类的start方法。

这里先对三个对象进行辨析:

  • CanalController:是canalServer真正的启动控制器
  • canalMQStarter:用来启动mqProducer。如果serverMode选择了mq,那么会用canalMQStarter来管理mqProducer,将canalServer抓取到的实时变更用mqProducer直接投递到mq
  • CanalAdminWithNetty:这个不是admin控制台,而是对本server启动一个netty服务,让admin控制台通过请求获取当前server的信息,比如运行状态、正在本server上运行的instance信息等

start方法主要逻辑如下:

  • 根据配置的serverMode,决定使用CanalMQProducer或者canalServerWithNetty
  • 启动CanalController
  • 注册shutdownHook
  • 如果CanalMQProducer不为空,启动canalMQStarter(内部使用CanalMQProducer将消息投递给mq)
  • 启动CanalAdminWithNetty做服务器
  1 public synchronized void start() throws Throwable {2 3     String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);4 5     /**6 7      * note8 9      * 1.如果canal.serverMode不是tcp,加载CanalMQProducer,并且启动CanalMQProducer10 11      * 回头可以深入研究下ExtensionLoader类的相关实现12 13      */14 15     if (!"tcp".equalsIgnoreCase(serverMode)) {16 17         ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);18 19         canalMQProducer = loader20 21                 .getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);22 23         if (canalMQProducer != null) {24 25             ClassLoader cl = Thread.currentThread().getContextClassLoader();26 27             Thread.currentThread().setContextClassLoader(canalMQProducer.getClass().getClassLoader());28 29             canalMQProducer.init(properties);30 31             Thread.currentThread().setContextClassLoader(cl);32 33         }34 35     }36 37     //note 如果启动了canalMQProducer,就不使用canalWithNetty(这里的netty是用在哪里的?)38 39     if (canalMQProducer != null) {40 41         MQProperties mqProperties = canalMQProducer.getMqProperties();42 43         // disable netty44 45         System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");46 47         if (mqProperties.isFlatMessage()) {48 49             // 设置为raw避免ByteString->Entry的二次解析50 51             System.setProperty("canal.instance.memory.rawEntry", "false");52 53         }54 55     }56 57     controller = new CanalController(properties);58 59     //note 2.启动canalController60 61     controller.start();62 63     //note 3.注册了一个shutdownHook,系统退出时执行相关逻辑64 65     shutdownThread = new Thread() {66 67         public void run() {68 69             try {70 71                 controller.stop();72 73                 //note 主线程退出74 75                 CanalLauncher.runningLatch.countDown();76 77             } catch (Throwable e) {78 79 80             } finally {81 82             }83 84         }85 86     };87 88     Runtime.getRuntime().addShutdownHook(shutdownThread);89 90     //note 4.启动canalMQStarter,集群版的话,没有预先配置destinations。91 92     if (canalMQProducer != null) {93 94         canalMQStarter = new CanalMQStarter(canalMQProducer);95 96         String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);97 98         canalMQStarter.start(destinations);99
100         controller.setCanalMQStarter(canalMQStarter);
101
102     }
103
104     // start canalAdmin
105
106     String port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);
107
108     //note 5.根据填写的canalAdmin的ip和port,启动canalAdmin,用netty做服务器
109
110     if (canalAdmin == null && StringUtils.isNotEmpty(port)) {
111
112         String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
113
114         String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
115
116         CanalAdminController canalAdmin = new CanalAdminController(this);
117
118         canalAdmin.setUser(user);
119
120         canalAdmin.setPasswd(passwd);
121
122         String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP);
123
124         CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();
125
126         canalAdminWithNetty.setCanalAdmin(canalAdmin);
127
128         canalAdminWithNetty.setPort(Integer.parseInt(port));
129
130         canalAdminWithNetty.setIp(ip);
131
132         canalAdminWithNetty.start();
133
134         this.canalAdmin = canalAdminWithNetty;
135
136     }
137
138     running = true;
139
140 }

3.CanalController

前面两个类都是比较清晰的,一个是入口类,一个是启动类,下面来看看核心逻辑所在的CanalController。

这里用了大量的匿名内部类实现接口,看起来有点头大,耐心慢慢剖析一下。

3.1 从构造器开始了解

整体初始化的顺序如下:

  • 构建PlainCanalConfigClient,用于用户远程配置的获取
  • 初始化全局配置,顺便把instance相关的全局配置初始化一下
  • 准备一下canal-server,核心在于embededCanalServer,如果有需要canalServerWithNetty,那就多包装一个(我们serverMode=mq是不需要这个netty的)
  • 初始化zkClient
  • 初始化ServerRunningMonitors,作为instance 运行节点控制
  • 初始化InstanceAction,完成monitor机制。(监控instance配置变化然后调用ServerRunningMonitor进行处理)

这里有几个机制要详细介绍一下。

3.1.1 CanalServer两种模式

canalServer支持两种模式,CanalServerWithEmbedded和CanalServerWithNetty。

在构造器中初始化代码部分如下:

 1 // 3.准备canal server2 3 //note: 核心在于embededCanalServer,如果有需要canalServerWithNetty,那就多包装一个(我们serverMode=mq4 5 // 是不需要这个netty的)6 7 ip = getProperty(properties, CanalConstants.CANAL_IP);8 9 //省略一部分。。。
10
11 embededCanalServer = CanalServerWithEmbedded.instance();
12
13 embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
14
15 int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112"));
16
17 //省略一部分。。。
18
19 String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
20
21 if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
22
23     canalServer = CanalServerWithNetty.instance();
24
25     canalServer.setIp(ip);
26
27     canalServer.setPort(port);
28
29 }

embededCanalServer:类型为CanalServerWithEmbedded

canalServer:类型为CanalServerWithNetty

二者有什么区别呢?

都实现了CanalServer接口,且都实现了单例模式,通过静态方法instance获取实例。

关于这两种类型的实现,canal官方文档有以下描述:

说白了,就是我们可以不必独立部署canal server。在应用直接使用CanalServerWithEmbedded直连mysql数据库进行订阅。

如果觉得自己的技术hold不住相关代码,就独立部署一个canal server,使用canal提供的客户端,连接canal server获取binlog解析后数据。而CanalServerWithNetty是在CanalServerWithEmbedded的基础上做的一层封装,用于与客户端通信。

在独立部署canal server时,Canal客户端发送的所有请求都交给CanalServerWithNetty处理解析,解析完成之后委派给了交给CanalServerWithEmbedded进行处理。因此CanalServerWithNetty就是一个马甲而已。CanalServerWithEmbedded才是核心。

因此,在构造器中,我们看到,

用于生成CanalInstance实例的instanceGenerator被设置到了CanalServerWithEmbedded中,

而ip和port被设置到CanalServerWithNetty中。

关于CanalServerWithNetty如何将客户端的请求委派给CanalServerWithEmbedded进行处理,我们将在server模块源码分析中进行讲解。

3.1.2 ServerRunningMonitor

在CanalController的构造器中,canal会为每一个destination创建一个Instance,每个Instance都会由一个ServerRunningMonitor来进行控制。而ServerRunningMonitor统一由ServerRunningMonitors进行管理。

ServerRunningMonitor是做什么的呢?

我们看下它的属性就了解了。它主要用来记录每个instance的运行状态数据的。

 1 /**2 3 * 针对server的running节点控制4 5 */6 7 public class ServerRunningMonitor extends AbstractCanalLifeCycle {8 9     private static final Logger        logger       = LoggerFactory.getLogger(ServerRunningMonitor.class);
10
11     private ZkClientx                  zkClient;
12
13     private String                     destination;
14
15     private IZkDataListener            dataListener;
16
17     private BooleanMutex               mutex        = new BooleanMutex(false);
18
19     private volatile boolean           release      = false;
20
21     // 当前服务节点状态信息
22
23     private ServerRunningData          serverData;
24
25     // 当前实际运行的节点状态信息
26
27     private volatile ServerRunningData activeData;
28
29     private ScheduledExecutorService   delayExector = Executors.newScheduledThreadPool(1);
30
31     private int                        delayTime    = 5;
32
33     private ServerRunningListener      listener;
34
35     public ServerRunningMonitor(ServerRunningData serverData){
36
37         this();
38
39         this.serverData = serverData;
40
41     }
42         //。。。。。
43
44 }

在创建ServerRunningMonitor对象时,首先根据ServerRunningData创建ServerRunningMonitor实例,之后设置了destination和ServerRunningListener。

ServerRunningListener是个接口,这里采用了匿名内部类的形式构建,实现了各个接口的方法。

主要为instance在当前server上的状态发生变化时调用。比如要在当前server上启动这个instance了,就调用相关启动方法,如果在这个server上关闭instance,就调用相关关闭方法。

具体的调用逻辑我们后面在启动过程中分析,这里大概知道下构造器中做了些什么就行了,主要就是一些启动、关闭的逻辑。

  1 new Function<String, ServerRunningMonitor>() {2 3     public ServerRunningMonitor apply(final String destination) {4 5         ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);6 7         runningMonitor.setDestination(destination);8 9         runningMonitor.setListener(new ServerRunningListener() {10 11             /**12 13              * note14 15              * 1.内部调用了embededCanalServer的start(destination)方法。16 17              * 这里很关键,说明每个destination对应的CanalInstance是通过embededCanalServer的start方法启动的,18 19              * 这样我们就能理解,为什么之前构造器中会把instanceGenerator设置到embededCanalServer中了。20 21              * embededCanalServer负责调用instanceGenerator生成CanalInstance实例,并负责其启动。22 23              *24 25              * 2.如果投递mq,还会直接调用canalMQStarter来启动一个destination26 27              */28 29             public void processActiveEnter() {30 31                //省略具体内容。。。32             }33 34             /**35 36              * note37 38              * 1.与开始顺序相反,如果有mqStarter,先停止mqStarter的destination39 40              * 2.停止embedeCanalServer的destination41 42              */43 44             public void processActiveExit() {45 46                 //省略具体内容。。。47 48             }49 50             /**51 52              * note53 54              * 在Canalinstance启动之前,destination注册到ZK上,创建节点55 56              * 路径为:/otter/canal/destinations/{0}/cluster/{1},其0会被destination替换,1会被ip:port替换。57 58              * 此方法会在processActiveEnter()之前被调用59 60              */61 62             public void processStart() {63 64                 //省略具体内容。。。65 66             }67 68             /**69 70              * note71 72              * 在Canalinstance停止前,把ZK上节点删除掉73 74              * 路径为:/otter/canal/destinations/{0}/cluster/{1},其0会被destination替换,1会被ip:port替换。75 76              * 此方法会在processActiveExit()之前被调用77 78              */79 80             public void processStop() {81 82                 //省略具体内容。。。83             }84 85         });86 87         if (zkclientx != null) {88 89             runningMonitor.setZkClient(zkclientx);90 91         }92 93         // 触发创建一下cid节点94 95         runningMonitor.init();96 97         return runningMonitor;98 99     }
100
101 }

3.2 canalController的start方法

具体运行逻辑如下:

  • 在zk的/otter/canal/cluster目录下根据ip:port创建server的临时节点,注册zk监听器
  • 先启动embededCanalServer(会启动对应的监控)
  • 根据配置的instance的destination,调用runningMonitor.start() 逐个启动instance
  • 如果cannalServer不为空,启动canServer (canalServerWithNetty)

这里需要注意,canalServer什么时候为空?

如果用户选择了serverMode为mq,那么就不会启动canalServerWithNetty,采用mqStarter来作为server,直接跟mq集群交互。canalServerWithNetty只有在serverMode为tcp时才启动,用来跟canal-client做交互。

所以如果以后想把embeddedCanal嵌入自己的应用,可以考虑参考mqStarter的写法。后面我们在server模块中会做详细解析。

  1 public void start() throws Throwable {2 3     // 创建整个canal的工作节点4 5     final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port);6 7     initCid(path);8 9     if (zkclientx != null) {10 11         this.zkclientx.subscribeStateChanges(new IZkStateListener() {12 13             public void handleStateChanged(KeeperState state) throws Exception {14 15             }16 17             public void handleNewSession() throws Exception {18 19                 initCid(path);20 21             }22 23             @Override24 25             public void handleSessionEstablishmentError(Throwable error) throws Exception{26 27                 logger.error("failed to connect to zookeeper", error);28 29             }30 31         });32 33     }34 35     // 先启动embeded服务36 37     embededCanalServer.start();38 39     // 尝试启动一下非lazy状态的通道40 41     for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {42 43         final String destination = entry.getKey();44 45         InstanceConfig config = entry.getValue();46 47         // 创建destination的工作节点48 49         if (!embededCanalServer.isStart(destination)) {50 51             // HA机制启动52 53             ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);54 55             if (!config.getLazy() && !runningMonitor.isStart()) {56 57                 runningMonitor.start();58 59             }60 61         }62 63         //note:为每个instance注册一个配置监视器64 65         if (autoScan) {66 67             instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);68 69         }70 71     }72 73     if (autoScan) {74 75         //note:启动线程定时去扫描配置76 77         instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();78 79         //note:这部分代码似乎没有用,目前只能是manager或者spring两种方式二选一80 81         for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {82 83             if (!monitor.isStart()) {84 85                 monitor.start();86 87             }88 89         }90 91     }92 93     // 启动网络接口94 95     if (canalServer != null) {96 97         canalServer.start();98 99     }
100
101 }

我们重点关注启动instance的过程,也就是ServerRunningMonitor的运行机制,也就是HA启动的关键。

入口在runningMonitor.start()。

  • 如果zkClient != null,就用zk进行HA启动
  • 否则,就直接processActiveEnter启动,这个我们前面已经分析过了
 1 public synchronized void start() {2 3     super.start();4 5     try {6 7         /**8 9          * note
10
11          * 内部会调用ServerRunningListener的processStart()方法
12
13          */
14
15         processStart();
16
17         if (zkClient != null) {
18
19             // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
20
21             String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
22
23             zkClient.subscribeDataChanges(path, dataListener);
24
25             initRunning();
26
27         } else {
28
29             /**
30
31              * note
32
33              * 内部直接调用ServerRunningListener的processActiveEnter()方法
34
35              */
36
37             processActiveEnter();// 没有zk,直接启动
38
39         }
40
41     } catch (Exception e) {
42
43         logger.error("start failed", e);
44
45         // 没有正常启动,重置一下状态,避免干扰下一次start
46
47         stop();
48
49     }
50
51 }

重点关注下HA启动方式,一般 我们都采用这种模式进行。

在集群模式下,可能会有多个canal server共同处理同一个destination,

在某一时刻,只能由一个canal server进行处理,处理这个destination的canal server进入running状态,其他canal server进入standby状态。

同时,通过监听对应的path节点,一旦发生变化,出现异常,可以立刻尝试自己进入running,保证了instace的 高可用!!

启动的重点还是在initRuning()。

利用zk来保证集群中有且只有 一个instance任务在运行。

  • 还构建一个临时节点的路径:/otter/canal/destinations/{0}/running
  • 尝试创建临时节点。
  • 如果节点已经存在,说明是其他的canal server已经启动了这个canal instance。此时会抛出ZkNodeExistsException,进入catch代码块。
  • 如果创建成功,就说明没有其他server启动这个instance,可以创建
 1 private void initRunning() {2     if (!isStart()) {3         return;4     }5 6 7     //note: 还是一样构建一个临时节点的路径:/otter/canal/destinations/{0}/running8     String path = ZookeeperPathUtils.getDestinationServerRunning(destination);9     // 序列化
10     byte[] bytes = JsonUtils.marshalToByte(serverData);
11     try {
12         mutex.set(false);
13         /**
14          * note:
15          * 尝试创建临时节点。如果节点已经存在,说明是其他的canal server已经启动了这个canal instance。
16          * 此时会抛出ZkNodeExistsException,进入catch代码块。
17          */
18         zkClient.create(path, bytes, CreateMode.EPHEMERAL);
19         /**
20          * note:
21          * 如果创建成功,就开始触发启动事件
22          */
23         activeData = serverData;
24         processActiveEnter();// 触发一下事件
25         mutex.set(true);
26         release = false;
27     } catch (ZkNodeExistsException e) {
28         /**
29          * note:
30          * 如果捕获异常,表示创建失败。
31          * 就根据临时节点路径查一下是哪个canal-sever创建了。
32          * 如果没有相关信息,马上重新尝试一下。
33          * 如果确实存在,就把相关信息保存下来
34          */
35         bytes = zkClient.readData(path, true);
36         if (bytes == null) {// 如果不存在节点,立即尝试一次
37             initRunning();
38         } else {
39             activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
40         }
41     } catch (ZkNoNodeException e) {
42         /**
43          * note:
44          * 如果是父节点不存在,那么就尝试创建一下父节点,然后再初始化。
45          */
46         zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点
47         initRunning();
48     }
49 }

那运行中的HA是如何实现的呢,我们回头看一下

zkClient.subscribeDataChanges(path, dataListener);
对destination对应的running节点进行监听,一旦发生了变化,则说明可能其他处理相同destination的canal server可能出现了异常,此时需要尝试自己进入running状态。

dataListener是在ServerRunningMonitor的构造方法中初始化的,

包括节点发生变化、节点被删两种变化情况以及相对应的处理逻辑,如下 :

 1 public ServerRunningMonitor(){2     // 创建父节点3     dataListener = new IZkDataListener() {4         /**5          * note:6          * 当注册节点发生变化时,会自动回调这个方法。7          * 我们回想一下使用过程中,什么时候可能 改变节点当状态呢?8          * 大概是在控制台中,对canal-server中正在运行的 instance做"停止"操作时,改变了isActive。9          * 可以 触发 HA。
10          */
11         public void handleDataChange(String dataPath, Object data) throws Exception {
12             MDC.put("destination", destination);
13             ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
14             if (!isMine(runningData.getAddress())) {
15                 mutex.set(false);
16             }
17
18             if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active
19                 releaseRunning();// 彻底释放mainstem
20             }
21
22             activeData = (ServerRunningData) runningData;
23         }
24
25
26         /**
27          * note:
28          * 如果其他canal instance出现异常,临时节点数据被删除时,会自动回调这个方法,此时当前canal instance要顶上去
29          */
30         public void handleDataDeleted(String dataPath) throws Exception {
31             MDC.put("destination", destination);
32             mutex.set(false);
33             if (!release && activeData != null && isMine(activeData.getAddress())) {
34                 // 如果上一次active的状态就是本机,则即时触发一下active抢占
35                 initRunning();
36             } else {
37                 // 否则就是等待delayTime,避免因网络异常或者zk异常,导致出现频繁的切换操作
38                 delayExector.schedule(new Runnable() {
39                     public void run() {
40                         initRunning();
41                     }
42                 }, delayTime, TimeUnit.SECONDS);
43             }
44         }
45     };
46 }

当注册节点发生变化时,会自动回调zkListener的handleDataChange方法。

我们回想一下使用过程中,什么时候可能 改变节点当状态呢?

就是在控制台中,对canal-server中正在运行的 instance做”停止”操作时,改变了isActive,可以 触发 HA。

如下图所示

4.admin的配置监控原理

我们现在采用admin做全局的配置控制。

那么每个canalServer是怎么监控配置的变化呢?

还记得上吗cananlController的start方法中对配置监视器的启动吗?

 1 if (autoScan) {2         //note:启动线程定时去扫描配置3         instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();4         //note:这部分代码似乎没有用,目前只能是manager或者spring两种方式二选一5         for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {6             if (!monitor.isStart()) {7                 monitor.start();8             }9         }
10     }

这个就是关键的配置监控。

我们来看deployer模块中的monitor包了。

4.1 InstanceAction

是一个接口,有四个方法,用来获取配置后,对具体instance采取动作。

 1 /**2 * config配置变化后的动作3 *4 */5 public interface InstanceAction {6 7 8     /**9      * 启动destination
10      */
11     void start(String destination);
12
13
14     /**
15      * 主动释放destination运行
16      */
17     void release(String destination);
18
19
20     /**
21      * 停止destination
22      */
23     void stop(String destination);
24
25
26     /**
27      * 重载destination,可能需要stop,start操作,或者只是更新下内存配置
28      */
29     void reload(String destination);
30 }

具体实现在canalController的构造器中实现了匿名类。

4.2 InstanceConfigMonitor

这个接口有两个实现,一个是基于spring的,一个基于manager(就是admin)。

我们看下基于manager配置的实现的ManagerInstanceConfigMonitor即可。

原理很简单。

  • 采用一个固定大小线程池,每隔5s,使用PlainCanalConfigClient去拉取instance配置
  • 然后通过defaultAction去start
  • 这个start在canalController的构造器的匿名类中实现,会使用instance对应的runningMonitor做HA启动。具体逻辑上一小节已经详细介绍过了。
 1 /**2 * 基于manager配置的实现3 *4 */5 public class ManagerInstanceConfigMonitor extends AbstractCanalLifeCycle implements InstanceConfigMonitor, CanalLifeCycle {6 7 8     private static final Logger         logger               = LoggerFactory.getLogger(ManagerInstanceConfigMonitor.class);9     private long                        scanIntervalInSecond = 5;
10     private InstanceAction              defaultAction        = null;
11     /**
12      * note:
13      * 每个instance对应的instanceAction,实际上我们看代码发现都是用的同一个defaultAction
14      */
15     private Map<String, InstanceAction> actions              = new MapMaker().makeMap();
16     /**
17      * note:
18      * 每个instance对应的远程配置
19      */
20     private Map<String, PlainCanal>     configs              = MigrateMap.makeComputingMap(new Function<String, PlainCanal>() {
21                                                                  public PlainCanal apply(String destination) {
22                                                                      return new PlainCanal();
23                                                                  }
24                                                              });
25     /**
26      * note:
27      * 一个固定大小线程池,每隔5s,使用PlainCanalConfigClient去拉取instance配置
28      */
29     private ScheduledExecutorService    executor             = Executors.newScheduledThreadPool(1,
30                                                                  new NamedThreadFactory("canal-instance-scan"));
31
32     private volatile boolean            isFirst              = true;
33     /**
34      * note:
35      * 拉取admin配置的client
36      */
37     private PlainCanalConfigClient      configClient;
38 //…
39 }

5.总结

deployer模块的主要作用:

1)读取canal.properties,确定canal instance的配置加载方式。如果使用了admin,那么还会定时拉取admin上的配置更新。

2)确定canal-server的启动方式:独立启动或者集群方式启动

3)利用zkClient监听canal instance在zookeeper上的状态变化,动态停止、启动或新增,实现了instance的HA

4)利用InstanceConfigMonitor,采用固定线程定时轮训admin,获取instance的最新配置

5)启动canal server,监听客户端请求

这里还有个非常有意思的问题没有展开说明,那就是CanalStarter里面的配置加载,通过ExtensionLoader类的相关实现,如何通过不同的类加载器,实现SPI,后面再分析吧。

「从零单排canal 04」 启动模块deployer源码解析相关推荐

  1. 「从零单排canal 02」canal集群版 + admin控制台 最新搭建姿势(基于1.1.4版本)

    canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据 订阅 和 消费.应该是阿里云DTS(Data Transfer Service)的开 ...

  2. 「从零单排canal 01」 canal 10分钟入门(基于1.1.4版本)

    1.简介 canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据 订阅 和 消费.应该是阿里云DTS(Data Transfer Servi ...

  3. hbase组合rowkey_「从零单排HBase 11」HBase二级索引解决方案

    HBase一个令人惋惜的地方,就是不支持二级索引.因此,社区有了很多补充方案来填补HBase的二级索引能力的缺陷. 今天,我们就来看看有哪些二级索引方案,通过对比各个方案的优缺点,并结合我们的具体场景 ...

  4. java hbase 删除数据结构_「从零单排HBase 09」Hbase的那些数据结构和算法

    在之前学习MySQL的时候,我们知道存储引擎常用的索引结构有B+树索引和哈希索引. 而对HBase的学习,也离不开索引结构的学习,它使用了一种LSM树((Log-Structured Merge-Tr ...

  5. 从零单排之golang:mutex使用及源码详解

    mutex(互斥锁)详解:互斥锁是一个值类型,实现了locker接口,所以使用的时候需要注意参数的传递,它的底层嵌套了linux的信号量(Semaphore),每次操作其实就是PV操作 type Mu ...

  6. spring 启动之全过程 源码解析

    主类代码 public class BeanLifeCycle {public static void main(String[] args) {System.out.println("现在 ...

  7. SpringBoot启动全流程源码解析(超详细版)

    我们在使用SpringBoot启动项目的时候,可能只需加一个注解,然后启动main,整个项目就运行了起来,但事实真的是所见即所得吗,还是SpringBoot在背后默默做了很多?本文会通过源码解析的方式 ...

  8. 我搭建了一个随机「毒鸡汤」语录网站附源码下载

    小伙伴们注意:公众号的推送机制不再按照时间前后推送了,微信公众号信息流乱序.君哥建议大家把科技毒瘤君公众号置顶(设为星标⭐),以便第一时间看到推送,非常感谢~,方法如下图: 1 演示效果 ★ 遇到喜欢 ...

  9. Flink源码解析 | 从Example出发:理解Flink启动流程

    从<Apache Flink本地部署>这篇文章中可以看到,我们启动集群都是通过脚本start-cluster.sh开始执行. 我们的源码解析之路就从flink的bash脚本入手. star ...

最新文章

  1. 2021-2027年中国智能制造行业市场前景预测研究报告
  2. Codeforces Round #742 (Div. 2) E. Non-Decreasing Dilemma (线段树维护区间连续问题)
  3. 使用 Telnet 仿冒任意邮件
  4. 比特币现金的第四次战役
  5. IntelliJ IDEA 不能识别 Java 项目
  6. [C/C++] C++声明和定义的区别
  7. String为null
  8. c# groupbox大小_【已解决】C#中使得控件随着WinForm窗体的大小改变而自动变化
  9. 工业以太网交换机的重要技术参数分析
  10. python爬虫淘宝视频_Python2爬虫:以抓取淘宝MM为例(实战)
  11. 如何在命令行下运行kettle的作业(job)和转换(transform)
  12. 安徽省湖泊河流ArcGIS地形图shp图层文件下载
  13. Java六种异常处理的陋习
  14. DetectoRS: Detecting Objects with Recursive Feature Pyramidand Switchable Atrous Convolution
  15. 圣诞邀请助力活动H5系统开发
  16. Word插入的表格如何调整长和宽
  17. 嵌入式Linux开发常用命令总结
  18. 另类神秘幽浮飞棍之迷已被解开
  19. 优秀的产品经理需要具备哪些能力?
  20. assaasasas

热门文章

  1. MacOS 安装 Adobe 系列软件错误解决
  2. linux 11121端口,Linux文本处理之awk
  3. 搭建hbase启动报的错
  4. 【QT】 QComboBox实现可下拉可编辑
  5. 贝叶斯网引论 by 张连文
  6. M0001 a和b平方和
  7. 文章合集Raspberry Pi/树莓派
  8. MySQL免安装版,安装成服务,开机自启动
  9. 数据库基本操作3()
  10. roast和roasting区别_你知道“吐槽”在英语里是 roast 吗?