目录

  • 1-直接从MyCAT入口开始说起:MycatStartup
    • 1.1 初始化MycatStartup
      • 1.1.1 MycatStartup 启动入口
      • 1.2 ZkConfig-Zookeeper初始化
    • 1.3 ZKUtils 创建连接
      • SchemaszkToxmlLoader Schema从Zookeeper到XML文件初始化
    • 1.4 ServerzkToxmlLoader 进行server的文件从zk中加载
    • 1.5 RuleszkToxmlLoader进行rule规则文件从Zookeeper加载
    • 1.6 RuleFunctionCacheListener订阅规则文件

1-直接从MyCAT入口开始说起:MycatStartup

直接从MyCAT入口开始说起:MycatStartup

1.1 初始化MycatStartup

1.1.1 MycatStartup 启动入口

/*** @author mycat*/
public final class MycatStartup {private static final String dateFormat = "yyyy-MM-dd HH:mm:ss";private static final Logger LOGGER = LoggerFactory.getLogger(MycatStartup.class);public static void main(String[] args) {//use zk ?ZkConfig.getInstance().initZk();try {String home = SystemConfig.getHomePath();if (home == null) {System.out.println(SystemConfig.SYS_HOME + "  is not set.");System.exit(-1);}// initMycatServer server = MycatServer.getInstance();server.beforeStart();// startupserver.startup();System.out.println("MyCAT Server startup successfully. see logs in logs/mycat.log");} catch (Exception e) {SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);LOGGER.error(sdf.format(new Date()) + " startup error", e);System.exit(-1);}}
}

启动逻辑一共分为3步

  • 初始化zookeeper
  • 初始化Mycat
  • 启动Mycat

1.2 ZkConfig-Zookeeper初始化

初始化代码主要使用ZkConfig类型,这个类型为单例模式,初始化代码放在了静态代码块保证类型在加载时候初始化代码只被初始化一次,初始化代码如下:

static {ZKPROPERTIES = LoadMyidPropersites();}

通过调用LoadMyidPropersites()初始化方法来进行初始化我们来详细看下源码:(这里吐槽一下,这个方法名看起来真别扭)

/*** 加载myid配制文件信息* 方法描述* @return* @创建日期 2016年9月15日*/private static Properties LoadMyidPropersites() {Properties pros = new Properties();try (InputStream configIS = ZkConfig.class.getResourceAsStream(ZK_CONFIG_FILE_NAME)) {if (configIS == null) {return null;}pros.load(configIS);} catch (IOException e) {LOGGER.error("ZkConfig LoadMyidPropersites error:", e);throw new RuntimeException("can't find myid properties file : " + ZK_CONFIG_FILE_NAME);}// validateString zkURL = pros.getProperty(ZkParamCfg.ZK_CFG_URL.getKey());String myid = pros.getProperty(ZkParamCfg.ZK_CFG_MYID.getKey());String clusterId = pros.getProperty(ZkParamCfg.ZK_CFG_CLUSTERID.getKey());if (Strings.isNullOrEmpty(clusterId) ||Strings.isNullOrEmpty(zkURL) || Strings.isNullOrEmpty(myid)) {throw new RuntimeException("clusterId and zkURL and myid must not be null or empty!");}return pros;}

LoadMyidPropersites文件主要从myid.properties文件中读取配置信息转换为Properties属性对象
同时验证下zkURL(zk配制的url地址信息),myid(当前mycat节点的id),clusterId(集群id)是否存在

类型加载初始化之后开始执行Zookeeper的初始化:调用initZk()
方法

    public void initZk(){try {//配置loadZk 是否启用zookeeperif (Boolean.parseBoolean(ZKPROPERTIES.getProperty(ZkParamCfg.ZK_CFG_FLAG.getKey()))) {//如果Zookeeper配置启用就把Zookeeper中存储的配置持久化到本地ZktoXmlMain.loadZktoFile();}} catch (Exception e) {LOGGER.error("error:",e);}}

接下来看下同步Zookeeper配置到本地的代码
类型ZktoXmlMain类中的loadZktoFile方法
//创建zookeeper的连接,并开始初始化基本解析对象

/*** 将zk数据放到到本地* 方法描述* @throws Exception * @创建日期 2016年9月21日*/public static void loadZktoFile() throws Exception {// 得到集群名称clusterIdString custerName = ZkConfig.getInstance().getValue(ZkParamCfg.ZK_CFG_CLUSTERID);// 得到基本路径  根路径为 /mycat String basePath = ZookeeperPath.ZK_SEPARATOR.getKey() + ZookeeperPath.FLOW_ZK_PATH_BASE.getKey();//拼接到的完整根路径为 /mycat/{clusterName} ,这个路径作为当前连接的命名恐惧basePath = basePath + ZookeeperPath.ZK_SEPARATOR.getKey() + custerName;ZKLISTENER.setBasePath(basePath);// 获得zk的连接信息   配置参数zkURL 配置Zookeeper连接路径 然后创建连接CuratorFramework zkConn = buildConnection(ZkConfig.getInstance().getValue(ZkParamCfg.ZK_CFG_URL));// 获得公共的xml转换器对象 XmlProcessBase xmlProcess = new XmlProcessBase();// 加载以接收者 进行schema的文件从zk中加载new SchemaszkToxmlLoader(ZKLISTENER, zkConn, xmlProcess);// server加载进行server的文件从zk中加载new ServerzkToxmlLoader(ZKLISTENER, zkConn, xmlProcess);// rule文件加载 进行rule的文件从zk中加载,当前版本已经不再使用// new RuleszkToxmlLoader(zkListen, zkConn, xmlProcess);//当前版本处理rule文件方式是订阅rules目录来将配置变更刷新到RuleFunctionCacheListener监听器中来处理 ZKUtils.addChildPathCache(ZKUtils.getZKBasePath() + "rules", new RuleFunctionCacheListener());// 将序列配制信息加载new SequenceTopropertiesLoader(ZKLISTENER, zkConn, xmlProcess);// 进行ehcache转换new EcacheszkToxmlLoader(ZKLISTENER, zkConn, xmlProcess);// 将bindata目录的数据进行转换到本地文件ZKUtils.addChildPathCache(ZKUtils.getZKBasePath() + "bindata", new BinDataPathChildrenCacheListener());// ruledataZKUtils.addChildPathCache(ZKUtils.getZKBasePath() + "ruledata", new RuleDataPathChildrenCacheListener());// 初始化xml转换操作xmlProcess.initJaxbClass();// 通知所有人ZKLISTENER.notifly(ZkNofiflyCfg.ZK_NOTIFLY_LOAD_ALL.getKey());// 加载watchloadZkWatch(ZKLISTENER.getWatchPath(), zkConn, ZKLISTENER);// 创建临时节点createTempNode(ZKUtils.getZKBasePath() + "line", ZkConfig.getInstance().getValue(ZkParamCfg.ZK_CFG_MYID),zkConn, ZkConfig.getInstance().getValue(ZkParamCfg.MYCAT_SERVER_TYPE));// 接收zk发送过来的命令runCommandWatch(zkConn, ZKUtils.getZKBasePath() + ZKHandler.ZK_NODE_PATH);MigrateTaskWatch.start();}

1.3 ZKUtils 创建连接

先来看下buildConnection是如何获取连接的调用了ZKUtils的getConnection方法

    private static CuratorFramework buildConnection(String url) {return ZKUtils.getConnection();}

继续看ZKUtils的getConnection方法,这里直接返回了CuratorFramework对象

 public static CuratorFramework getConnection() {return curatorFramework;}

那真正创建连接的位置在哪里呢?这就要看下ZKUtils的初始化代码了:

static {//创建连接curatorFramework = createConnection();//创建关闭钩子当进程关闭的时候执行如下代码进行关闭连接Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {@Overridepublic void run() {if (curatorFramework != null)curatorFramework.close();watchMap.clear();}}));}

下面就来详细看下创建连接的代码

 private static CuratorFramework createConnection() {//获取连接Zookeeper的配置地址String url = ZkConfig.getInstance().getZkURL();
//创建Curator客户端对象,ExponentialBackoffRetry类型对象为重试策略,随着重试之间的睡眠时间增加而重试设定的次数,这里配置为指数退避重试
// 一共有3个参数这里只设置了前两个分别为baseSleepTimeMs :初始 sleep 时间 (毫秒) maxRetries : 最大重试次数,maxSleepMs 每次重试的最大睡眠时间(毫秒)(此参数不指定,默认是 Integer.MAX_VALUE)CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(url, new ExponentialBackoffRetry(100, 6));// start connection 开始启动连接curatorFramework.start();// wait 3 second to establish connecttry {//阻塞,直到与ZooKeeper的连接可用或已超过maxWaitTimecuratorFramework.blockUntilConnected(3, TimeUnit.SECONDS);//3秒内连接成功则返回连接失败则关闭连接抛出异常if (curatorFramework.getZookeeperClient().isConnected()) {return curatorFramework;}} catch (InterruptedException ignored) {Thread.currentThread().interrupt();}// fail situationcuratorFramework.close();throw new RuntimeException("failed to connect to zookeeper service : " + url);}

ExponentialBackoffRetry类型对象为重试策略,随着重试之间的睡眠时间增加而重试设定的次数,这里配置为指数退避重试
一共有3个参数这里只设置了前两个分别为

  • baseSleepTimeMs :初始 sleep 时间 (毫秒)
  • maxRetries : 最大重试次数,
  • maxSleepMs 每次重试的最大睡眠时间(毫秒)(此参数不指定,默认是 Integer.MAX_VALUE)

时间间隔 的公式计算:

时间间隔 = baseSleepTimeMs * Math.max(1, random.nextInt( 1<<(retryCount+1)) )

说明:
(1<<(retryCount+1) )的取值是 2,4,8,16。(retryCount>=0 )

随着重试次数的增加,计算出来的sleep 时间会越来越大。如果sleep 时间在 maxSleepMs 的范围内,那么就使用该 sleep 时间,否则使用;maxSleep

上面的参数全部写固定了无法配置不是很灵活呀

SchemaszkToxmlLoader Schema从Zookeeper到XML文件初始化

这个文件是MyCat最重要的配置文件,负责管理库,表,分片规则,DataNode ,DataSource。
SchemaszkToxmlLoader构造器如下:

    public SchemaszkToxmlLoader(ZookeeperProcessListen zookeeperListen, CuratorFramework curator,XmlProcessBase xmlParseBase) {//初始化成员变量curatorthis.setCurator(curator);//初始化Zookeeper监听器对象(进行zookeeper操作的监控器器父类信息)this.zookeeperListen = zookeeperListen;// 获得当前集群的名称String schemaPath = zookeeperListen.getBasePath();//这个完整路径为/mycat/{clusterName}/schemaschemaPath = schemaPath + ZookeeperPath.ZK_SEPARATOR.getKey() + ZookeeperPath.FOW_ZK_PATH_SCHEMA.getKey();//为成员变量赋值schema路径currZkPath = schemaPath;// 将当前自己注册为事件接收对象 当前类型监听路径schemaPath,当监听产生时候触发notiflyProcess() 方法 this.zookeeperListen.addListen(schemaPath, this);// 生成xml与类的转换信息 schema.xml与javabean之间的转化this.parseSchemaXmlService = new SchemasParseXmlImpl(xmlParseBase);}

SchemaszkToxmlLoader构造器主要做了一些初始化工作订阅Zookeeper的schemaPath路径下的节点的变更通知,然后初始化 schema.xml与javabean之间的转化对象schema.xml主要是用来存储一些表相关的信息

1.4 ServerzkToxmlLoader 进行server的文件从zk中加载

Server.xml保存了mycat需要的所有的系统配置信息
来看下这个类型的初始化:

  public ServerzkToxmlLoader(ZookeeperProcessListen zookeeperListen, CuratorFramework curator,XmlProcessBase xmlParseBase) {//初始化成员变量curatorthis.setCurator(curator);
//初始化Zookeeper监听器对象(进行zookeeper操作的监控器器父类信息)this.zookeeperListen = zookeeperListen;// 获得当前集群的名称String serverPath = zookeeperListen.getBasePath();//这个完整路径为/mycat/{clusterName}/serverserverPath = serverPath + ZookeeperPath.ZK_SEPARATOR.getKey() + ZookeeperPath.FLOW_ZK_PATH_SERVER.getKey();//为成员变量赋值serverPath路径currZkPath = serverPath;// 将当前自己注册为事件接收对象,当监听产生时候触发notiflyProcess() 方法 this.zookeeperListen.addListen(serverPath, this);// 生成xml与类的转换信息parseServerXMl = new ServerParseXmlImpl(xmlParseBase);}

1.5 RuleszkToxmlLoader进行rule规则文件从Zookeeper加载

rule.xml 里面就定义了我们对表进行拆分所涉及到的规则定义。我们可以灵活的对表使用不同的分片算法 这个类型虽然在当前版本不再使用来不过我们还是可以了解下
接下来我们看下同步配置的初始化代码

    public RuleszkToxmlLoader(ZookeeperProcessListen zookeeperListen, CuratorFramework curator,XmlProcessBase xmlParseBase) {//初始化成员变量curatorthis.setCurator(curator);//初始化Zookeeper监听器对象(进行zookeeper操作的监控器器父类信息)this.zookeeperListen = zookeeperListen;// 获得当前集群的名称String RulesPath = zookeeperListen.getBasePath();//这个完整路径为/mycat/{clusterName}/rulesRulesPath = RulesPath + ZookeeperPath.ZK_SEPARATOR.getKey() + ZookeeperPath.FLOW_ZK_PATH_RULE.getKey();//为成员变量//为成员变量赋值RulesPath路径 currZkPath = RulesPath;// 将当前自己注册为事件接收对象,当监听产生时候触发notiflyProcess() 方法zookeeperListen.addListen(RulesPath, this);// 生成xml与类的转换信息parseRulesXMl = new RuleParseXmlImpl(xmlParseBase);}

1.6 RuleFunctionCacheListener订阅规则文件

这里我们主要来了解规则路径变更
初始化代码:

  public RuleFunctionCacheListener() {//创建xml处理类XmlProcessBase xmlProcessBase = new XmlProcessBase();//规则xml处理类parseRulesXMl = new RuleParseXmlImpl(xmlProcessBase) ;try {//初始化用于xml解析的jax组件xmlProcessBase.initJaxbClass();} catch (JAXBException e) {LOGGER.error("error",e);}}

前面初始化过程中将当前对象添加到了规则路径变更订阅中我们可以看下订阅的代码:
订阅节点主要触发这个接口PathChildrenCacheListener
然后回调childEvent方法,接下来看下这个规则文件中的childEvent是如何处理的

@Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {ChildData data = event.getData();switch (event.getType()) {case CHILD_ADDED:addOrUpdate();break;case CHILD_UPDATED:addOrUpdate();break;default:break;}}

可以看到节点数据发生变更的时候都会触发addOrUpdate方法,接下来我们来看下这个更新方法

private void addOrUpdate(){Rules Rules = null;try {Rules = this.zktoRulesBean();} catch (Exception e) {LOGGER.error("error",e);}LOGGER.info("RuleszkToxmlLoader notiflyProcess zk to object  zk Rules Object  :" + Rules);// 将mapfile信息写入到文件 中writeMapFileAddFunction(Rules.getFunction());LOGGER.info("RuleszkToxmlLoader notiflyProcess write mapFile is success ");// 数配制信息写入文件String path = RuleszkToxmlLoader.class.getClassLoader().getResource(ZookeeperPath.ZK_LOCAL_WRITE_PATH.getKey()).getPath();path = new File(path).getPath() + File.separator;path = path + WRITEPATH;LOGGER.info("RuleszkToxmlLoader notiflyProcess zk to object writePath :" + path);//将读取到的规则文件内容持久化到xml文件中this.parseRulesXMl.parseToXmlWrite(Rules, path, "rule");LOGGER.info("RuleszkToxmlLoader notiflyProcess zk to object zk Rules      write :" + path + " is success");//连接处理配置不为空则刷新下配置if (MycatServer.getInstance().getProcessors() != null)ReloadConfig.reload();}

zktoRulesBean方法
这个方法主要是从Zookeeper的路径/mycat/rules/tableRule节点下获取表路由规则同步至内存
然后再从/mycat/rules/function节点下获取表路由规则函数

    private Rules zktoRulesBean() throws Exception {Rules Rules = new Rules();// tablerule信息String value=  new String( ZKUtils.getConnection().getData().forPath(ZKUtils.getZKBasePath()+"rules/tableRule"),"UTF-8") ;DataInf RulesZkData = new ZkDataImpl("tableRule",value);List<TableRule> tableRuleData = parseJsonTableRuleService.parseJsonToBean(RulesZkData.getDataValue());Rules.setTableRule(tableRuleData);// 得到function信息String fucValue=  new String( ZKUtils.getConnection().getData().forPath(ZKUtils.getZKBasePath()+"rules/function"),"UTF-8") ;DataInf functionZkData =new ZkDataImpl("function",fucValue) ;List<Function> functionList = parseJsonFunctionService.parseJsonToBean(functionZkData.getDataValue());Rules.setFunction(functionList);return Rules;}

writeMapFileAddFunction 将从Zookeeper中读取到的mapfile文件同步持久化到硬盘
通过mapFile配置一个分片关系映射,其格式为key-value,key为枚举,value为数据节点的索引。

private void writeMapFileAddFunction(List<Function> functionList) {List<Property> tempData = new ArrayList<>();List<Property> writeData = new ArrayList<>();//双层循环遍历所有函数的所有属性查询到与mapFile配置匹配的文件for (Function function : functionList) {List<Property> proList = function.getProperty();if (null != proList && !proList.isEmpty()) {// 进行数据遍历,查询总Zookeeper中获取到的mapFile配置for (Property property : proList) {// 如果为mapfile,则需要去读取数据信息,并存到json中if (ParseParamEnum.ZK_PATH_RULE_MAPFILE_NAME.getKey().equals(property.getName())) {tempData.add(property);}}// 通过mapfile的名称,找到对应的数据信息if (!tempData.isEmpty()) {for (Property property : tempData) {for (Property prozkdownload : proList) {// 根据mapfile的文件名去提取数据if (property.getValue().equals(prozkdownload.getName())) {writeData.add(prozkdownload);}}}}// 将对应的数据信息写入到磁盘中if (!writeData.isEmpty()) {for (Property writeMsg : writeData) {this.writeMapFile(writeMsg.getName(), writeMsg.getValue());}}// 将数据添加的集合中proList.removeAll(writeData);// 清空,以进行下一次的添加tempData.clear();writeData.clear();}}}

持久化数据

private void writeMapFile(String name, String value) {// 加载数据String path = RuleszkToxmlLoader.class.getClassLoader().getResource(ZookeeperPath.ZK_LOCAL_WRITE_PATH.getKey()).getPath();checkNotNull(path, "write Map file curr Path :" + path + " is null! must is not null");path = new File(path).getPath() + File.separator;path += name;// 进行数据写入try {Files.write(value.getBytes(), new File(path));} catch (IOException e1) {e1.printStackTrace();}}

技术咨询支持,可以扫描微信公众号进行回复咨询

[数据库中间件-Mycat 1.6.7.6-release源码解析系列]-1-直接从MyCAT入口开始说起相关推荐

  1. openGauss数据库源码解析系列文章——openGauss开发快速入门(二)

    在上一篇openGauss数据库源码解析系列文章--openGauss开发快速入门(上)中,我们介绍了openGauss的安装部署方法,本篇将具体介绍openGauss基本使用. 二. openGau ...

  2. openGauss数据库源码解析系列文章--openGauss简介(一)

    openGauss数据库是华为深度融合在数据库领域多年经验,结合企业级场景要求推出的新一代企业级开源数据库.此前,Gauss松鼠会已经发布了openGauss数据库核心技术系列文章,介绍了openGa ...

  3. ⭐openGauss数据库源码解析系列文章—— 角色管理⭐

    在前面介绍过"9.1 安全管理整体架构和代码概览.9.2 安全认证",本篇我们介绍第9章 安全管理源码解析中"9.3 角色管理"的相关精彩内容介绍. 9.3 角 ...

  4. ⭐openGauss数据库源码解析系列文章—— 对象权限管理⭐

    在前面文章中介绍过"9.3 角色管理整",本篇我们介绍第9章 安全管理源码解析中"9.4 对象权限管理"的相关精彩内容介绍. 9.4 对象权限管理 权限管理是安 ...

  5. openGauss数据库源码解析系列文章—— AI技术之“自调优”

    上一篇介绍了第七章执行器解析中"7.6 向量化引擎"及"7.7 小结"的相关内容,本篇我们开启第八章 AI技术中"8.1 概述"及" ...

  6. openGauss数据库源码解析系列文章——openGauss开发快速入门(一)

    作为openGauss数据库开发者,在基于开源社区的openGauss版本进行二次开发的过程中,需要完成软件包获取.源码了解.代码修改.编译发布等过程,同时还需要安装数据库以了解数据库的基本特点.验证 ...

  7. openGauss数据库源码解析系列文章—— SQL引擎源解析(一)

    本篇我们开启"SQL引擎源解析"中"6.1 概述"及"6.2 SQL解析"的精彩内容介绍. 第6章 SQL引擎源解析 SQL引擎作为数据库系 ...

  8. SpringBoot(1.5.6.RELEASE)源码解析(一)

    启动SpringBoot,需要在入口函数所在的类上添加@SpringBootApplication注解 1 @SpringBootApplication 2 public class Applicat ...

  9. SpringBoot(1.5.6.RELEASE)源码解析(三)

    请尊重作者劳动成果,转载请标明原文链接:http://www.cnblogs.com/dylan-java/p/7455699.html 上一篇分析了SpringApplication的初始化,接下来 ...

最新文章

  1. 视觉惯性SLAM:VINS-Mono
  2. mac电脑配置java环境变量_教你在Mac系统中配置JAVA环境变量的方法
  3. Java LinkedList类基本用法
  4. activiti 工作流_springboot+activiti+angular 集成activiti工作流实现,源码分享
  5. 你这飞机会爆炸吗?航空公司含泪甩卖49元机票,却被超模君挖出秘密!
  6. plupload怎么设置属性_腾达无线路由器怎么设置,这些是你要知道的
  7. jQuery Mobile入门必看
  8. Linux 下java jdk安装
  9. 删除网络信息服务器端,网络安全:手动清除gh0st远控服务端
  10. 面试官问:MySQL的自增 ID 用完了,怎么办?
  11. C/C++[codeup 2018]数列
  12. 网上银行显示本服务器只显示,使用企业网上银行时常见报错提示有哪些,怎么解决?...
  13. java基础-java概述,基本数据类型,基础语法
  14. 基于LMS及FxLMS算法的ANC主动降噪仿真分析
  15. 第四节 电路心法 基尔霍夫电压定律与电流定律
  16. 投影仪对焦应用镜头马达驱动芯片
  17. 自己动手搭建一个简单的网站
  18. 64g的u盘只有58g_为啥我买的64G U盘实际只有57G?聊聊存储市场的“不足量”现象...
  19. JDK11安装教程(手把手配置,也适用于其他jdk版本)
  20. c语言:做一个密码登录程序

热门文章

  1. Ask Ziggy:通过语音进行搜索 同时得到语音答复
  2. 阿里集团副总裁语嫣,拜托别把这名字给玷污了行吗
  3. 我的世界服务器公会系统指令,我的世界Factions 公会家族插件
  4. 游戏机马戏团用计算机怎么弹,疯狂马戏团推币机爆机位置和涨分技巧!新手都是这样错误操作...
  5. 网卡firmware
  6. Python LDA gensim 计算 perplexity
  7. 【测试理论】如何做好探索性测试—基础篇
  8. 只为途中与你相见——记大五台山之行
  9. laravel 实战 总结
  10. linux构建widi显示服务,教你四步轻松搞定WiDi 体验无线家庭影院