1.0)meta 为一个接口,有多种实现方式,

if(!metaManager.isStart()) {metaManager.start();}

因为配置文件已经指定了实现模式,所以进入filemixedmetamanager模式

先看一下整个start方法

public voidstart() {super.start();Assert.notNull(dataDir);if(!dataDir.exists()) {try{FileUtils.forceMkdir(dataDir);} catch(IOException e) {throw newCanalMetaManagerException(e);}}if(!dataDir.canRead() || !dataDir.canWrite()) {throw newCanalMetaManagerException("dir["+ dataDir.getPath() + "] can not read/write");}dataFileCaches= MigrateMap.makeComputingMap(newFunction<String,File>() {publicFile apply(String destination) {returngetDataFile(destination);}});executor= Executors.newScheduledThreadPool(1);destinations= MigrateMap.makeComputingMap(newFunction<String,List<ClientIdentity>>() {publicList<ClientIdentity> apply(String destination) {returnloadClientIdentity(destination);}});cursors= MigrateMap.makeComputingMap(newFunction<ClientIdentity,Position>() {publicPosition apply(ClientIdentity clientIdentity) {Position position = loadCursor(clientIdentity.getDestination(),clientIdentity);if(position == null) {returnnullCursor;// 返回一个空对象标识,避免出现异常} else{returnposition;}}});updateCursorTasks= Collections.synchronizedSet(newHashSet<ClientIdentity>());// 启动定时工作任务executor.scheduleAtFixedRate(newRunnable() {public voidrun() {List<ClientIdentity> tasks = newArrayList<ClientIdentity>(updateCursorTasks);for(ClientIdentity clientIdentity : tasks) {MDC.put("destination",String.valueOf(clientIdentity.getDestination()));try{// 定时将内存中的最新值刷到file中,多次变更只刷一次if(logger.isInfoEnabled()) {LogPosition cursor = (LogPosition) getCursor(clientIdentity);logger.info("clientId:{} cursor:[{},{},{},{},{}] address[{}]", newObject[] {clientIdentity.getClientId(),cursor.getPostion().getJournalName(),cursor.getPostion().getPosition(),cursor.getPostion().getTimestamp(),cursor.getPostion().getServerId(),cursor.getPostion().getGtid(),cursor.getIdentity().getSourceAddress().toString() });}flushDataToFile(clientIdentity.getDestination());updateCursorTasks.remove(clientIdentity);} catch(Throwable e) {// ignorelogger.error("period update"+ clientIdentity.toString() + " curosr failed!",e);}}}},period,period,TimeUnit.MILLISECONDS);}

文件是否存在,是否可读可写,初始化类,启动定时任务,一秒后执行,一秒执行一次

定时任务里,对文件的处理

private voidflushDataToFile(String destination) {flushDataToFile(destination,dataFileCaches.get(destination));}private voidflushDataToFile(String destination,File dataFile) {FileMetaInstanceData data = newFileMetaInstanceData();if(destinations.containsKey(destination)) {synchronized(destination.intern()) { // 基于destination控制一下并发更新data.setDestination(destination);List<FileMetaClientIdentityData> clientDatas = Lists.newArrayList();List<ClientIdentity> clientIdentitys = destinations.get(destination);for(ClientIdentity clientIdentity : clientIdentitys) {FileMetaClientIdentityData clientData = newFileMetaClientIdentityData();clientData.setClientIdentity(clientIdentity);Position position = cursors.get(clientIdentity);if(position != null&& position != nullCursor) {clientData.setCursor((LogPosition) position);}clientDatas.add(clientData);}data.setClientDatas(clientDatas);}String json = JsonUtils.marshalToString(data);try{FileUtils.writeStringToFile(dataFile,json);} catch(IOException e) {throw newCanalMetaManagerException(e);}}
}
* 1. 先写内存,然后定时刷新数据到File
* 2. 数据采取overwrite模式(只保留最后一次),通过logger实施append模式(记录历史版本)

文件meta.dat里存放的是

client 已成功消费的最后binlog位点,时间,实例 的数据。目录和canal同级别,可配置,在canal.instance里配置

canal.conf.dir=../conf

meta.dat里的文件如下:

{"clientDatas":[{"clientIdentity":{"clientId":1001,"destination":"example","filter":""},"cursor":{"identity":{"slaveId":-1,"sourceAddress":{"address":"DESKTOP-B1R6VMO","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000009","position":6218,"serverId":1,"timestamp":1527665906000}}}],"destination":"example"}

1.1)alarm没有做任何处理,就log打印了一下

if(!alarmHandler.isStart()) {alarmHandler.start();}

1.2)store 初始化内存  new了一个数组 size为16384-1

if(!eventStore.isStart()) {eventStore.start();}
public voidstart() throwsCanalStoreException {super.start();if(Integer.bitCount(bufferSize) != 1) {throw newIllegalArgumentException("bufferSize must be a power of 2");}indexMask= bufferSize- 1;entries= newEvent[bufferSize];}

其中数组中的canalenty为google的序列化protrobuf

1.3)sink

if(!eventSink.isStart()) {eventSink.start();}
public voidstart() {super.start();Assert.notNull(eventStore);for(CanalEventDownStreamHandler handler : getHandlers()) {if(!handler.isStart()) {handler.start();}}
}

1.4)parse

if(!eventParser.isStart()) {beforeStartEventParser(eventParser);eventParser.start();afterStartEventParser(eventParser);}

before 做解析前处理,启动logposition和hacontroller

protected voidstartEventParserInternal(CanalEventParser eventParser, booleanisGroup) {if(eventParser instanceofAbstractEventParser) {AbstractEventParser abstractEventParser = (AbstractEventParser) eventParser;// 首先启动log position管理器CanalLogPositionManager logPositionManager = abstractEventParser.getLogPositionManager();if(!logPositionManager.isStart()) {logPositionManager.start();}}if(eventParser instanceofMysqlEventParser) {MysqlEventParser mysqlEventParser = (MysqlEventParser) eventParser;CanalHAController haController = mysqlEventParser.getHaController();if(haController instanceofHeartBeatHAController) {((HeartBeatHAController) haController).setCanalHASwitchable(mysqlEventParser);}if(!haController.isStart()) {haController.start();}}
}

1.4.2) 比较复杂的代码来了,和主库连接,发送dump指令,解析等

public voidstart() throwsCanalParseException {if(runningInfo== null) { // 第一次链接主库runningInfo= masterInfo;}super.start();}
public voidstart() throwsCanalParseException {if(enableTsdb) {if(tableMetaTSDB== null) {// 初始化tableMetaTSDB= TableMetaTSDBBuilder.build(destination,tsdbSpringXml);}}super.start();}

初始化缓存队列,线程,全局变量循环发送命令。

public voidstart() {super.start();MDC.put("destination",destination);// 配置transaction buffer// 初始化缓冲队列transactionBuffer.setBufferSize(transactionSize);// 设置buffer大小transactionBuffer.start();// 构造bin log parserbinlogParser= buildParser();// 初始化一下BinLogParserbinlogParser.start();// 启动工作线程parseThread= newThread(newRunnable() {public voidrun() {MDC.put("destination",String.valueOf(destination));ErosaConnection erosaConnection = null;while(running) {try{// 开始执行replication// 1. 构造Erosa连接erosaConnection = buildErosaConnection();// 2. 启动一个心跳线程startHeartBeat(erosaConnection);// 3. 执行dump前的准备工作preDump(erosaConnection);erosaConnection.connect();// 链接// 4. 获取最后的位置信息EntryPosition position = findStartPosition(erosaConnection);finalEntryPosition startPosition = position;if(startPosition == null) {throw newCanalParseException("can't find start position for "+ destination);}if(!processTableMeta(startPosition)) {throw newCanalParseException("can't find init table meta for "+ destination+ " with position : "+ startPosition);}logger.warn("find start position : {}",startPosition.toString());// 重新链接,因为在找position过程中可能有状态,需要断开后重建erosaConnection.reconnect();finalSinkFunction sinkHandler = newSinkFunction<EVENT>() {privateLogPosition lastPosition;public booleansink(EVENTevent) {try{CanalEntry.Entry entry = parseAndProfilingIfNecessary(event, false);if(!running) {return false;}if(entry != null) {exception= null;// 有正常数据流过,清空exceptiontransactionBuffer.add(entry);// 记录一下对应的positionsthis.lastPosition= buildLastPosition(entry);// 记录一下最后一次有数据的时间lastEntryTime= System.currentTimeMillis();}returnrunning;} catch(TableIdNotFoundException e) {throwe;} catch(Throwable e) {if(e.getCause() instanceofTableIdNotFoundException) {throw(TableIdNotFoundException) e.getCause();}// 记录一下,出错的位点信息processSinkError(e,this.lastPosition,startPosition.getJournalName(),startPosition.getPosition());throw newCanalParseException(e);// 继续抛出异常,让上层统一感知}}};// 4. 开始dump数据// 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据if(isGTIDMode()) {erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()),sinkHandler);} else{if(StringUtils.isEmpty(startPosition.getJournalName()) && startPosition.getTimestamp() != null) {erosaConnection.dump(startPosition.getTimestamp(),sinkHandler);} else{erosaConnection.dump(startPosition.getJournalName(),startPosition.getPosition(),sinkHandler);}}} catch(TableIdNotFoundException e) {exception= e;// 特殊处理TableIdNotFound异常,出现这样的异常,一种可能就是起始的position是一个事务当中,导致tablemap// Event时间没解析过needTransactionPosition.compareAndSet(false, true);logger.error(String.format("dump address %s has an error, retrying. caused by ",runningInfo.getAddress().toString()),e);} catch(Throwable e) {processDumpError(e);exception= e;if(!running) {if(!(e instanceofjava.nio.channels.ClosedByInterruptException || e.getCause() instanceofjava.nio.channels.ClosedByInterruptException)) {throw newCanalParseException(String.format("dump address %s has an error, retrying. ",runningInfo.getAddress().toString()),e);}} else{logger.error(String.format("dump address %s has an error, retrying. caused by ",runningInfo.getAddress().toString()),e);sendAlarm(destination,ExceptionUtils.getFullStackTrace(e));}} finally{// 重新置为中断状态Thread.interrupted();// 关闭一下链接afterDump(erosaConnection);try{if(erosaConnection != null) {erosaConnection.disconnect();}} catch(IOException e1) {if(!running) {throw newCanalParseException(String.format("disconnect address %s has an error, retrying. ",runningInfo.getAddress().toString()),e1);} else{logger.error("disconnect address {} has an error, retrying., caused by ",runningInfo.getAddress().toString(),e1);}}}// 出异常了,退出sink消费,释放一下状态eventSink.interrupt();transactionBuffer.reset();// 重置一下缓冲队列,重新记录数据binlogParser.reset();// 重新置位if(running) {// sleep一段时间再进行重试try{Thread.sleep(10000+ RandomUtils.nextInt(10000));} catch(InterruptedException e) {}}}MDC.remove("destination");}});parseThread.setUncaughtExceptionHandler(handler);parseThread.setName(String.format("destination = %s , address = %s , EventParser",destination,runningInfo== null? null: runningInfo.getAddress()));parseThread.start();}

解析循环里的5步操作

构造一个connection

// 1. 构造Erosa连接erosaConnection = buildErosaConnection();
privateMysqlConnection buildMysqlConnection(AuthenticationInfo runningInfo) {MysqlConnection connection = newMysqlConnection(runningInfo.getAddress(),runningInfo.getUsername(),runningInfo.getPassword(),connectionCharsetNumber,runningInfo.getDefaultDatabaseName());connection.getConnector().setReceiveBufferSize(receiveBufferSize);connection.getConnector().setSendBufferSize(sendBufferSize);connection.getConnector().setSoTimeout(defaultConnectionTimeoutInSeconds* 1000);connection.setCharset(connectionCharset);// 随机生成slaveIdif(this.slaveId<= 0) {this.slaveId= generateUniqueServerId();}connection.setSlaveId(this.slaveId);returnconnection;}

返回一个mysqlconnection实体bean(user,password,url scheme),并把验证信息set里

2、启动一个线程做心跳(默认3秒 把type设置为hearbeat放到enty里 ,采用timer定时任务,)

// 2. 启动一个心跳线程startHeartBeat(erosaConnection);
protected voidstartHeartBeat(ErosaConnection connection) {lastEntryTime= 0L;// 初始化if(timer== null) {// lazy初始化一下String name = String.format("destination = %s , address = %s , HeartBeatTimeTask",destination,runningInfo== null? null: runningInfo.getAddress().toString());synchronized(AbstractEventParser.class) {// synchronized (MysqlEventParser.class) {// why use MysqlEventParser.class, u know, MysqlEventParser is// the child class 4 AbstractEventParser,// do this is ...if(timer== null) {timer= newTimer(name, true);}}}if(heartBeatTimerTask== null) {// fixed issue #56,避免重复创建heartbeat线程heartBeatTimerTask= buildHeartBeatTimeTask(connection);Integer interval = detectingIntervalInSeconds;timer.schedule(heartBeatTimerTask,interval * 1000L,interval * 1000L);logger.info("start heart beat.... ");}
}

包装entry ,发送到sink ,其实里面为心跳数据

protectedTimerTask buildHeartBeatTimeTask(ErosaConnection connection) {return newTimerTask() {public voidrun() {try{if(exception== null|| lastEntryTime> 0) {// 如果未出现异常,或者有第一条正常数据longnow = System.currentTimeMillis();longinteval = (now - lastEntryTime) / 1000;if(inteval >= detectingIntervalInSeconds) {Header.Builder headerBuilder = Header.newBuilder();headerBuilder.setExecuteTime(now);Entry.Builder entryBuilder = Entry.newBuilder();entryBuilder.setHeader(headerBuilder.build());entryBuilder.setEntryType(EntryType.HEARTBEAT);Entry entry = entryBuilder.build();// 提交到sink中,目前不会提交到store中,会在sink中进行忽略consumeTheEventAndProfilingIfNecessary(Arrays.asList(entry));}}} catch(Throwable e) {logger.warn("heartBeat run failed ",e);}}};}

发送到sink

protected booleanconsumeTheEventAndProfilingIfNecessary(List<CanalEntry.Entry> entrys) throwsCanalSinkException,InterruptedException {longstartTs = -1;booleanenabled = getProfilingEnabled();if(enabled) {startTs = System.currentTimeMillis();}booleanresult = eventSink.sink(entrys,(runningInfo== null) ? null: runningInfo.getAddress(),destination);if(enabled) {this.processingInterval= System.currentTimeMillis() - startTs;}if(consumedEventCount.incrementAndGet() < 0) {consumedEventCount.set(0);}returnresult;}

3、执行前dump工作 数据库连接是否正常

// 3. 执行dump前的准备工作preDump(erosaConnection);
protected voidpreDump(ErosaConnection connection) {if(!(connection instanceofMysqlConnection)) {throw newCanalParseException("Unsupported connection type : "+ connection.getClass().getSimpleName());}if(binlogParser!= null&& binlogParserinstanceofLogEventConvert) {metaConnection= (MysqlConnection) connection.fork();try{metaConnection.connect();} catch(IOException e) {throw newCanalParseException(e);}if(supportBinlogFormats!= null&& supportBinlogFormats.length> 0) {BinlogFormat format = ((MysqlConnection) metaConnection).getBinlogFormat();booleanfound = false;for(BinlogFormat supportFormat : supportBinlogFormats) {if(supportFormat != null&& format == supportFormat) {found = true;break;}}if(!found) {throw newCanalParseException("Unsupported BinlogFormat "+ format);}}if(supportBinlogImages!= null&& supportBinlogImages.length> 0) {BinlogImage image = ((MysqlConnection) metaConnection).getBinlogImage();booleanfound = false;for(BinlogImage supportImage : supportBinlogImages) {if(supportImage != null&& image == supportImage) {found = true;break;}}if(!found) {throw newCanalParseException("Unsupported BinlogImage "+ image);}}if(tableMetaTSDB!= null&& tableMetaTSDBinstanceofDatabaseTableMeta) {((DatabaseTableMeta) tableMetaTSDB).setConnection(metaConnection);((DatabaseTableMeta) tableMetaTSDB).setFilter(eventFilter);((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter);}tableMetaCache= newTableMetaCache(metaConnection,tableMetaTSDB);((LogEventConvert) binlogParser).setTableMetaCache(tableMetaCache);}
}

判断数据库binlog模式和同步类型

binlog格式进行过滤,默认ROW模式。

binlog image进行过滤,默认是FULL,变更前后的数据,为minimal,变更后的值。

构造表结构源数据的缓存TableMetaCache


4、获取binlog位置信息 ,启动时候,只做初始化,然后就结束了(当数据流过才会有,加载配置,监听等)现在来解析一下findstartposition()方法

// 4. 获取最后的位置信息EntryPosition position = findStartPosition(erosaConnection);

进入方法:看下里面怎么判断最后位置的 gtid判断,

protectedEntryPosition findStartPosition(ErosaConnection connection) throwsIOException {if(isGTIDMode()) {// GTID模式下,CanalLogPositionManager里取最后的gtid,没有则取instanc配置中的LogPosition logPosition = getLogPositionManager().getLatestIndexBy(destination);if(logPosition != null) {returnlogPosition.getPostion();}if(StringUtils.isNotEmpty(masterPosition.getGtid())) {returnmasterPosition;}}EntryPosition startPosition = findStartPositionInternal(connection);if(needTransactionPosition.get()) {logger.warn("prepare to find last position : {}",startPosition.toString());Long preTransactionStartPosition = findTransactionBeginPosition(connection,startPosition);if(!preTransactionStartPosition.equals(startPosition.getPosition())) {logger.warn("find new start Transaction Position , old : {} , new : {}",startPosition.getPosition(),preTransactionStartPosition);startPosition.setPosition(preTransactionStartPosition);}needTransactionPosition.compareAndSet(true, false);}returnstartPosition;}

从缓存 中区 logpostion,若没有,去数据库执行,show maser status,然后是对binlog文件名字 时间戳的一些校验

protectedEntryPosition findStartPositionInternal(ErosaConnection connection) {MysqlConnection mysqlConnection = (MysqlConnection) connection;LogPosition logPosition = logPositionManager.getLatestIndexBy(destination);if(logPosition == null) {// 找不到历史成功记录EntryPosition entryPosition = null;if(masterInfo!= null&& mysqlConnection.getConnector().getAddress().equals(masterInfo.getAddress())) {entryPosition = masterPosition;} else if(standbyInfo!= null&& mysqlConnection.getConnector().getAddress().equals(standbyInfo.getAddress())) {entryPosition = standbyPosition;}if(entryPosition == null) {entryPosition = findEndPositionWithMasterIdAndTimestamp(mysqlConnection);// 默认从当前最后一个位置进行消费}// 判断一下是否需要按时间订阅if(StringUtils.isEmpty(entryPosition.getJournalName())) {// 如果没有指定binlogName,尝试按照timestamp进行查找if(entryPosition.getTimestamp() != null&& entryPosition.getTimestamp() > 0L) {logger.warn("prepare to find start position {}:{}:{}",newObject[] { "","",entryPosition.getTimestamp() });returnfindByStartTimeStamp(mysqlConnection,entryPosition.getTimestamp());} else{logger.warn("prepare to find start position just show master status");returnfindEndPositionWithMasterIdAndTimestamp(mysqlConnection);// 默认从当前最后一个位置进行消费}} else{if(entryPosition.getPosition() != null&& entryPosition.getPosition() > 0L) {// 如果指定binlogName + offest,直接返回entryPosition = findPositionWithMasterIdAndTimestamp(mysqlConnection,entryPosition);logger.warn("prepare to find start position {}:{}:{}",newObject[] { entryPosition.getJournalName(),entryPosition.getPosition(),entryPosition.getTimestamp() });returnentryPosition;} else{EntryPosition specificLogFilePosition = null;if(entryPosition.getTimestamp() != null&& entryPosition.getTimestamp() > 0L) {// 如果指定binlogName +// timestamp,但没有指定对应的offest,尝试根据时间找一下offestEntryPosition endPosition = findEndPosition(mysqlConnection);if(endPosition != null) {logger.warn("prepare to find start position {}:{}:{}",newObject[] { entryPosition.getJournalName(),"",entryPosition.getTimestamp() });specificLogFilePosition = findAsPerTimestampInSpecificLogFile(mysqlConnection,entryPosition.getTimestamp(),endPosition,entryPosition.getJournalName(),true);}}if(specificLogFilePosition == null) {// position不存在,从文件头开始entryPosition.setPosition(BINLOG_START_OFFEST);returnentryPosition;} else{returnspecificLogFilePosition;}}}} else{if(logPosition.getIdentity().getSourceAddress().equals(mysqlConnection.getConnector().getAddress())) {if(dumpErrorCountThreshold>= 0&& dumpErrorCount> dumpErrorCountThreshold) {// binlog定位位点失败,可能有两个原因:// 1. binlog位点被删除// 2.vip模式的mysql,发生了主备切换,判断一下serverId是否变化,针对这种模式可以发起一次基于时间戳查找合适的binlog位点booleancase2 = (standbyInfo== null|| standbyInfo.getAddress() == null)&& logPosition.getPostion().getServerId() != null&& !logPosition.getPostion().getServerId().equals(findServerId(mysqlConnection));if(case2) {longtimestamp = logPosition.getPostion().getTimestamp();longnewStartTimestamp = timestamp - fallbackIntervalInSeconds* 1000;logger.warn("prepare to find start position by last position {}:{}:{}", newObject[] { "","",logPosition.getPostion().getTimestamp() });EntryPosition findPosition = findByStartTimeStamp(mysqlConnection,newStartTimestamp);// 重新置为一下dumpErrorCount= 0;returnfindPosition;}}// 其余情况logger.warn("prepare to find start position just last position\n{}",JsonUtils.marshalToString(logPosition));returnlogPosition.getPostion();} else{// 针对切换的情况,考虑回退时间longnewStartTimestamp = logPosition.getPostion().getTimestamp() - fallbackIntervalInSeconds* 1000;logger.warn("prepare to find start position by switch {}:{}:{}", newObject[] { "","",logPosition.getPostion().getTimestamp() });returnfindByStartTimeStamp(mysqlConnection,newStartTimestamp);}}
}

解析以上代码:

(1)

publicLogPosition getLatestIndexBy(String destination) {LogPosition logPosition = primary.getLatestIndexBy(destination);if(logPosition != null) {returnlogPosition;}returnsecondary.getLatestIndexBy(destination);}
publicLogPosition getLatestIndexBy(String destination) {returnpositions.get(destination);}

从内存positons这个map里获取destination ,若获取不到进入下面这个方法

publicLogPosition getLatestIndexBy(String destination) {List<ClientIdentity> clientIdentities = metaManager.listAllSubscribeInfo(destination);LogPosition result = null;if(!CollectionUtils.isEmpty(clientIdentities)) {// 尝试找到一个最小的logPositionfor(ClientIdentity clientIdentity : clientIdentities) {LogPosition position = (LogPosition) metaManager.getCursor(clientIdentity);if(position == null) {continue;}if(result == null) {result = position;} else{result = CanalEventUtils.min(result,position);}}}returnresult;}

size = 1

判断配置文件中的主库信息是否与当前的数据库连接connection的地址一致,如果一致,如果一致,那么直接取properties文件中的master的位点信息,如果主库不一致,那么判断从库standby的connection地址,如果是从库,那么直接取从库的位点信息

若都不是:那么系统默认从当前时间开始消费。

protectedEntryPosition findEndPositionWithMasterIdAndTimestamp(MysqlConnection connection) {MysqlConnection mysqlConnection = (MysqlConnection) connection;finalEntryPosition endPosition = findEndPosition(mysqlConnection);if(tableMetaTSDB!= null) {longstartTimestamp = System.currentTimeMillis();returnfindAsPerTimestampInSpecificLogFile(mysqlConnection,startTimestamp,endPosition,endPosition.getJournalName(),true);} else{returnendPosition;}
}

然后去数据库执行show master status

获得最新的位点和时间戳

/**
 * 根据给定的时间戳,在指定的binlog中找到最接近于该时间戳(必须是小于时间戳)的一个事务起始位置。
 * 针对最后一个binlog会给定endPosition,避免无尽的查询
 */
privateEntryPosition findAsPerTimestampInSpecificLogFile(MysqlConnection mysqlConnection,finalLong startTimestamp,finalEntryPosition endPosition,finalString searchBinlogFile,finalBoolean justForPositionTimestamp) {finalLogPosition logPosition = newLogPosition();try{mysqlConnection.reconnect();// 开始遍历文件mysqlConnection.seek(searchBinlogFile,4L, newSinkFunction<LogEvent>() {privateLogPosition lastPosition;public booleansink(LogEvent event) {EntryPosition entryPosition = null;try{CanalEntry.Entry entry = parseAndProfilingIfNecessary(event, true);if(justForPositionTimestamp&& logPosition.getPostion() == null&& event.getWhen() > 0) {// 初始位点entryPosition = newEntryPosition(searchBinlogFile,event.getLogPos(),event.getWhen() * 1000,event.getServerId());logPosition.setPostion(entryPosition);}if(entry == null) {return true;}String logfilename = entry.getHeader().getLogfileName();Long logfileoffset = entry.getHeader().getLogfileOffset();Long logposTimestamp = entry.getHeader().getExecuteTime();Long serverId = entry.getHeader().getServerId();if(CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())|| CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {if(logger.isDebugEnabled()) {logger.debug("compare exit condition:{},{},{}, startTimestamp={}...", newObject[] {logfilename,logfileoffset,logposTimestamp,startTimestamp});}// 事务头和尾寻找第一条记录时间戳,如果最小的一条记录都不满足条件,可直接退出if(logposTimestamp >= startTimestamp) {return false;}}if(StringUtils.equals(endPosition.getJournalName(),logfilename)&& endPosition.getPosition() < logfileoffset) {return false;}// 记录一下上一个事务结束的位置,即下一个事务的position// position = current +// data.length,代表该事务的下一条offest,避免多余的事务重复if(CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {entryPosition = newEntryPosition(logfilename,logfileoffset,logposTimestamp,serverId);if(logger.isDebugEnabled()) {logger.debug("set {} to be pending start position before finding another proper one...",entryPosition);}logPosition.setPostion(entryPosition);} else if(CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())) {// 当前事务开始位点entryPosition = newEntryPosition(logfilename,logfileoffset,logposTimestamp,serverId);if(logger.isDebugEnabled()) {logger.debug("set {} to be pending start position before finding another proper one...",entryPosition);}logPosition.setPostion(entryPosition);}lastPosition= buildLastPosition(entry);} catch(Throwable e) {processSinkError(e,lastPosition,searchBinlogFile,4L);}returnrunning;}});} catch(IOException e) {logger.error("ERROR ## findAsPerTimestampInSpecificLogFile has an error",e);}if(logPosition.getPostion() != null) {returnlogPosition.getPostion();} else{return null;}
}

开始解析二进制,header body,转换,怎么转换的,下一篇开始

然后最终如下日志:

1.6)启动后parse解析(暂时不会走着)

afterStartEventParser(eventParser);
protected voidafterStartEventParser(CanalEventParser eventParser) {// 读取一下历史订阅的filter信息List<ClientIdentity> clientIdentitys = metaManager.listAllSubscribeInfo(destination);for(ClientIdentity clientIdentity : clientIdentitys) {subscribeChange(clientIdentity);}
}

返回true

没有做过滤

扫描配置项:(暂时不会走着)

接口类两个实现,目前manager里面为空实现

把action和des放到map里

public voidregister(String destination,InstanceAction action) {if(action != null) {actions.put(destination,action);} else{actions.put(destination,defaultAction);}
}

全局变量,定时扫描配置的任务 5秒

if(autoScan) {instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();for(InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {if(!monitor.isStart()) {monitor.start();}}
}

1.7 通信方面了,数据开始流转的地方

// 启动网络接口canalServer.start();

1.8netty 相关

public voidstart() {super.start();if(!embeddedServer.isStart()) {embeddedServer.start();}this.bootstrap= newServerBootstrap(newNioServerSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()));/** enable keep-alive mechanism, handle abnormal network connection* scenarios on OS level. the threshold parameters are depended on OS.* e.g. On Linux: net.ipv4.tcp_keepalive_time = 300* net.ipv4.tcp_keepalive_probes = 2 net.ipv4.tcp_keepalive_intvl = 30*/bootstrap.setOption("child.keepAlive", true);/** optional parameter.*/bootstrap.setOption("child.tcpNoDelay", true);// 构造对应的pipelinebootstrap.setPipelineFactory(newChannelPipelineFactory() {publicChannelPipeline getPipeline() throwsException {ChannelPipeline pipelines = Channels.pipeline();pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), newFixedHeaderFrameDecoder());// support to maintain child socket channel.pipelines.addLast(HandshakeInitializationHandler.class.getName(),newHandshakeInitializationHandler(childGroups));pipelines.addLast(ClientAuthenticationHandler.class.getName(),newClientAuthenticationHandler(embeddedServer));SessionHandler sessionHandler = newSessionHandler(embeddedServer);pipelines.addLast(SessionHandler.class.getName(),sessionHandler);returnpipelines;}});// 启动if(StringUtils.isNotEmpty(ip)) {this.serverChannel= bootstrap.bind(newInetSocketAddress(this.ip, this.port));} else{this.serverChannel= bootstrap.bind(newInetSocketAddress(this.port));}
}

这里采用的是netty 4.0 版本,

canal 源码解析(1)-启动篇(3)相关推荐

  1. Android源码解析(一)动画篇-- Animator属性动画系统

    Android源码解析-动画篇 Android源码解析(一)动画篇-- Animator属性动画系统 Android源码解析(二)动画篇-- ObjectAnimator Android在3.0版本中 ...

  2. UWA学堂上新|虚幻引擎源码解析——基础容器篇

    文章简介 文章主要介绍了虚幻引擎的基础容器的内部数据结构和实现原理,以及在实践中的应用,性能优化等方面.包括:TArray.TSparseArray.TSet.TMap等基础容器,TQueue.TTr ...

  3. Xxl-job源码解析-调度中心篇

    写在前面:本篇讲一下xxl-job调度中心对于任务的调度部分的源码逻辑分析,逻辑都在注释里面- 一. xxl-job简介 XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速.学习简单.轻 ...

  4. JDK源码解析之集合篇2--Collection

    为什么80%的码农都做不了架构师?>>>    源码解析仅个人记录,若有不正确,请留言修改 package java.util;import java.util.function.P ...

  5. Vue源码解析系列——数据驱动篇:patch的执行过程

    准备 vue版本号2.6.12,为方便分析,选择了runtime+compiler版本. 回顾 如果有感兴趣的同学可以看看我之前的源码分析文章,这里呈上链接:<Vue源码分析系列:目录> ...

  6. JStorm/Storm源码解读(二)--启动篇

    为了在解读分析时有个统一的思路,本文将从启动一个集群开始分析. (说明:为了测试方便,采用的是本地模式). 1.参数设置 Config conf = new Config(); //设置Topolog ...

  7. RocketMQ源码解析-Consumer启动(2)

    接着上文的Pull消费者启动继续讲. public void start() throws MQClientException {switch (this.serviceState) {case CR ...

  8. Tomcat源码解析:启动

    文章目录 1.Catalina 开启start 2.Server启动 3.Service启动 3.1 Engine启动 3.2 Host 3.2.1 Host的初始化 3.2.2 Host的启动 3. ...

  9. RocketMQ源码解析-Consumer启动(1)

    DefaultMQPullConsumer继承了ClientConfig类,作为主动拉获取消息的消费者实现接口的管理与相关属性的配置(与PushConsumer对应).相比生产者,消费者配置的属性要复 ...

  10. RocketMQ源码解析-Producer启动

    RocketMQ中生产者通过DefaultProducer来创建. protected final transient DefaultMQProducerImpl defaultMQProducerI ...

最新文章

  1. AAAI2020-图神经网络(GNN)过去、现在、应用和未来最新研究进展分享
  2. jira 8.2.1 安装 及一些常见的坑
  3. MongoDB 查询时间差问题修复
  4. Oracle 建立包 和 包体
  5. discuz 标签详解
  6. 腾讯牵手数十家合作伙伴发起“光合计划” 推动“百千万”三大目标落地
  7. POJ - 2485(最小生成树.prime)
  8. 鸿蒙系统正式面世,跨时代!“鸿蒙”系统正式面世!余承东:如有必要随时可替代安卓...
  9. 基于Android的数据采集系统,一种基于Android的新型用户数据采集发送系统
  10. C# Wke例子 -- WebUI登录窗口
  11. 《矩阵分析与应用》(第2版)———知识+Matlab2018a——2nd
  12. ArcMap制作城市道路网壁纸(OSM道路数据下载)
  13. 读计算机网络得学五笔吗,新手学五笔打字的步骤
  14. magento 为用户注册增加一个字段
  15. VB 对数据库access的模糊查询代码
  16. 潜在因子模型_如何使用潜在因子模型在图形数据库中构建推荐系统
  17. pyTorch入门(六)——实战Android Minist OpenCV手写数字识别(附源码地址)
  18. 测试用例(测试用例的编写、评审和管理)
  19. Python tell 和 seek用法
  20. 关于Python字典的拓展使用

热门文章

  1. PHP icbc工商银行开放平台聚合支付,二维码扫码支付API云收呗对接步骤,稳步发展
  2. GIS等级考试知识集锦
  3. 转置矩阵(Transpose of a matrix)
  4. 职业院校教师招聘结构化面试
  5. 彻底解决乱码问题(一):为何会出现乱码
  6. 他是JavaScript之父,搞砸Firefox!推出下一代浏览器,使用就给钱!
  7. Lorenz系统、简单的Rossler系统和Chua电路系统的混沌吸引子——MATLAB实现
  8. PET,CT图像融合笔记
  9. 更精确的冲刺计划更好的Sprint Burndown图表
  10. 软件工程复习 第三章 软件立项