Cassandra启动过程详解

这里的分析从CassandraDaemon.java文件开始。

一、配置文件storage-config.xml的读取和log4j的配置文件log4j.property的设置。

配置文件的读取和解析都是在org.apache.cassandra.config.DatabaseDescriptor类中完成的,这个类的作用非常简单,就是读取配置文件中各个配置项所定义的值,经过简单的验证,符合条件就将其值赋给DatabaseDescriptor的私有静态常量。值得注意的是关于Keyspace的解析,按照ColumnFamily的配置信息构建成org.apache.cassandra.config.CFMetaData对象,最后把这些所有ColumnFamily放入Keyspace的HashMap对象org.apache.cassandra.config.KSMetaData中,每个Keyspace就是一个Table。这些信息都是作为基本的元信息,可以通过DatabaseDescriptor类直接获取。

二、Keyspace的初始化。

这里主要调用Table.open(tableName)方法创建每个Table的实例。创建Table的实例将完成:1)获取该Table的元信息TableMatedate。2)创建改Table下每个ColumnFamily的存储操作对象ColumnFamilyStore。3)启动定时程序,检查该ColumnFamily的Memtable设置的MemtableFlushAfterMinutes是否已经过期,过期立即写到磁盘。详细过程可参见我前面关于该方法的详细代码跟踪分析。

一个Keyspace对应一个Table,一个Table持有多个ColumnFamilyStore,而一个ColumnFamily对应一个ColumnFamilyStore。Table并没有直接持有ColumnFamily的引用而是持有ColumnFamilyStore,这是因为ColumnFamilyStore类中不仅定义了对ColumnFamily的各种操作而且它还持有ColumnFamily在各种状态下数据对象的引用,所以持有了ColumnFamilyStore就可以操作任何与ColumnFamily相关的数据了。

三、Commitlog日志文件的恢复。

这里调用CmmitLog.recover()方法主要完成这几个操作,发现是否有没有被写到磁盘的数据,恢复这个数据,构建新的日志文件。CommitLog日志文件的恢复策略是,在头文件中发现没有被序列化的最新的ColumnFamilyId,然后取出这个这个被序列化RowMutation对象的起始地址,反序列化成为RowMutation对象,后面的操作和新添一条数据的流程是一样的,如果这个RowMutation对象中的数据被成功写到磁盘中,那么会在CommitLog去掉已经被持久化的ColumnFamilyId。

四、检查数据文件是否需要压缩

调用CompactionManager.instance.checkAllColumnFamilies()检查CF对应的数据文件是否需要压缩。将相似大小的SStable放到一个bucket中,然后调用submitMinorIfNeeded(cfs)。

五、启动存储服务

这是启动过程最重要的一步,需要启动很多服务。具体步骤有:

5.1)创建StorageMetadata

调用方法SystemTable.initMetadata()创建StorageMetadata。元数据只创建一次,如果元数据已经存在,则直接返回。StorageMetadata将包含三个关键信息:本节点的Token、当前generation以及ClusterName。这三个信息被存在StorageService类的属性metadata中(metadata是StorageMetadata类型的对象),以便后面随时调用。

Cassandra判断如果是第一次启动,Cassandra将会创建三列分别存储这些信息并将它们存在在系统表的LocationInfoColumnFamily中,key是“L”。这里的Token判断用户是否指定,如果指定了则使用用户指定的,否则随机生成一个Token,但是这个Token有可能在后面被修改;generation=System.currentTimeMillis()/ 1000,ClusterName为读取配置文件得到的值。

如果不是第一次启动将会更新这三个值:读取数据文件中的Token信息,generation信息以及ClusterName信息后设置Token值和ClusterName的值,更新generation的值为max(当前时间秒数,old_generation+1)。这里有点要注意的是,如果在后续的过程中更改了配置文件中ClusterName的名字,这会跟数据文件中存储的信息不一致,最终会导致Cassandra无法启动。

5.2)创建所有目录

调用方法DatabaseDescriptor.createAllDirectories()创建所有的目录。包括数据文件目录data/以及日志文件目录commitlog/。同时还为keyspaces创建了数据文件目录的子目录data/system和data/keyspace,...(keyspace为用户定义的keyspace)。当然这个方法早在Table.open()已经调用过了,在这里再次调用可能是为了某些测试需要。

5.3)启动GCInspector.instance.start服务

主要是统计统计当前系统中资源的使用情况(主要就是内存使用和回收情况),将这个信息记录到日志文件中,这个可以作为系统的监控日志使用。

5.4)启动消息监听服务

这个消息监听服务就是监听整个集群中其它节点发送到本节点的所有消息,Cassandra会根据每个消息的类型,做出相应的反应。消息监听代码如下:

public void listen(InetAddress localEp) throws IOException {ServerSocketChannel serverChannel = ServerSocketChannel.open();final ServerSocket ss = serverChannel.socket();ss.setReuseAddress(true);ss.bind(new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()));socketThread = new SocketThread(ss, "ACCEPT-" + localEp);socketThread.start();
}
  1. 这里用到了nio里面的异步IO与连网的部分。监听端口默认为7000。创建一个线程SocketThread用于监听消息。

  2. 每接收到一个消息,就创建一个新的线程newIncomingTcpConnection(socket).start()进行消息响应;该线程run方法中主要是对消息进行魔数的验证,以及读取消息头部和消息体等内容,然后将消息内容反序列化的任务MessageDeserializationTask递交到相应的消息反序列化线程池messageDeserializerExecutor_

  3. MessageDeserializationTask反序列化消息内容后调用MessagingService.receive()处理消息

  4. receive()方法中创建MessageDeliveryTask任务对象,根据消息类型得到相应的stage的线程池对象,如果没有对应的线程池,则使用messageDeserializerExecutor_

  5. stage线程池执行MessageDeliveryTask任务,该任务主要是根据消息中的Verb,调用相应的VerbHandler.doVerb()方法来完成消息的处理。比如GossipDigestAckVerbHandler.doVerb()用来处理Gossip阶段的ACK消息。

5.5)启动StorageLoadBalancer.instance.startBroadcasting服务

调用方法loadTimer_.schedule(newLoadDisseminator(), 2 * Gossiper.intervalInMillis_,BROADCAST_INTERVAL),定时得到节点负载信息,2个Gossiper心跳后开始,间隔时间为60s。该任务得到节点数据总量(包括所有Data文件、FIlter文件以及Index文件),并将其更新到ApplicationState中,然后就可以通过这个state来和其它节点交换信息。这个load信息在数据的存储和新节点加入的时候,会有参考价值。

5.6)启动Gossiper服务

在启动Gossiper服务之前,将StorageService注册为观察者,一旦节点的某些状态发生变化,而这些状态是StorageService感兴趣的,StorageService的onChange方法就会触发。Gossiper服务就是一个定时程序,它会创建一个EndPointState对象。EndPointState对象持有HeartBeatState的引用和ApplicationState的一个引用集Map<String,ApplicationState> applicationState_ = newHashtable<String,ApplicationState>()。对于每个Application对象,EndPointState只保存一个最新的值,所以新值会覆盖旧值。

HeartBeatState对象记录了当前心跳的generation和version,这个generation和前面的StorageMetadata存储的generation是一致的,在节点每次启动的时候更新;而version是从0开始的,每次更新加1;每个节点有一个HeartBeatState对象与之关联。

ApplicationState的一个引用集Map<String,ApplicationState> applicationState_则是记录一些状态信息,比如前面startBroadcasting()过程中记录节点负载情况。ApplicationState对象包含state值和version值。比如表示节点负载的状态信息可能表示形式为(5.2,45),意思就是在version为45的时候负载为5.2;相似地,节点启动的状态信息可能表示形式为(bLpassF3XD8Kyks,56),前面的值表示启动的token,后面的56是version值。是需要注意的是创建ApplicationState对象时,version值加1。

还有一个结构需要注意,就是Gossiper中的Map<InetAddress,EndPointState>endPointStateMap__,它保存了它监听到的所有节点的EndPointState信息,包括它自己的。

Gossiper这个定时程序每隔一秒钟随机向定义的节点发送一个消息,而这个消息是保持集群中节点状态一致的唯一途径(具体过程后面详述)。

5.7)判断启动模式。

启动模式跟配置文件中的AutoBootstrap这一项相关。那这个配置项与Token和负载又有何关联性?其实表面上看起来这个配置项是当这个节点启动时是否自动加入集群。但是,当你设置成False时它是不是就不加入集群呢?显然不是,这还要看你有没有配置seeds,如果你配置了其它seed,那么它仍然会去加入集群。

那么到底有何区别,通过分析其启动代码发现,这个配置项不仅跟seed配置项有关而且和Cassandra是否是第一次启动也有关。Cassandra的启动规则大慨如下:

1)当AutoBootstrap设为FALSE

  • 第一次启动时Cassandra会设置系统表中key为Bootstrap,CF为STATUS_CF的Column为B的值为TRUE,以表示这是由系统自动设置的,其实这个只是作为一个标志来判断你以后的启动情况。标记启动方式,主要是防止用户再修改AutoBootstrap的启动模式。

  • 调用TokenMetadata.updateToken()更新token。

  • 加入ApplicationState对象到EPS的map<key,AS>中。key为MOVE,state为NORMAL:Token。

  • 设置模式为“Normal”。

2)当AutoBootstrap设为TRUE,第一次启动,Cassandra会判断当前节点配置在seeds,Cassandra的启动情况和1是一样的。

3)当AutoBootstrap设为TRUE,第一次启动,并且没有配置为seed,Cassandra将会有一个漫长的启动过程,当然这个时间的长短和你的当前的集群的数据量有很大的关系。

  • 设置模式为“Joining:getting load information”。

  • 等待90s(BROADCAST_INTERVAL+RING_DELAY)为了节点获得所有其他节点的负载信息。

  • 如果tokenMetadata已经包含了本节点ip,则抛出异常。

  • 设置模式为“Joining:getting bootstrap token”。

  • 如果在配置文件中指定了InitialToken,则返回这个InitialToken。否则调用getBalancedToken(metadata,load)。

  • getBalancedToken()方法首先调用getBootstrapSource()方法得到负载最大的节点的ip地址,(如果没有任何节点的负载信息,则抛出运行时异常;这个排序过程是先以落入该节点token范围内的正处于bootstrap的节点数目排序,数目越多优先级越低。如果落入其中的bootstrap节点数目相同,再以负载大小排序)。然后向这个节点发送消息,获取其一半key范围所对应的Token,这个Token是前半部分值(如果key的数目<3,则返回一个随机Token值)。

  • public static Token getBalancedToken(TokenMetadata metadata, Map<InetAddress, Double> load) {InetAddress maxEndpoint = getBootstrapSource(metadata, load);Token<?> t = getBootstrapTokenFrom(maxEndpoint);return t;
    }
  • 调用startBootstrap()方法。

  • private void startBootstrap(Token token) throws IOException {isBootstrapMode = true;  //设置isBootstrapMode。SystemTable.updateToken(token); //更新本节点的Token值。//加入状态信息。(MOVE, appstate(BOOT:Token))Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_BOOTSTRAPPING + Delimiter
    partitioner_.getTokenFactory().toString(token)));//设置模式为等待range重新分布setMode("Joining: sleeping " + RING_DELAY + " for pending range setup", true);try {Thread.sleep(RING_DELAY);} catch (InterruptedException e) {throw new AssertionError(e);}//设置模式为正在启动setMode("Bootstrapping", true);//开始启动new BootStrapper(FBUtilities.getLocalAddress(), token, tokenMetadata_).startBootstrap(); // handles
    }
  • 调用BootStrapper.startBootstrap()方法完成启动,发送STREAM_REQUEST消息,请求数据。
    public void startBootstrap() throws IOException {for (String table : DatabaseDescriptor.getNonSystemTables()) {Multimap<Range, InetAddress> rangesWithSourceTarget =                                getRangesWithSources(table);/* Send messages to respective folks to stream data over  to me*/for (Map.Entry<InetAddress, Collection<Range>> entry : getWorkMap(rangesWithSourceTarget).asMap().entrySet()) {InetAddress source = entry.getKey();StorageService.instance.addBootstrapSource(source, table);StreamIn.requestRanges(source, table, entry.getValue());}}}

    1)首先调用getRangesWithSources(table)得到该节点负责的ranges所对应的节点集合。

    2)对于上一步得到的节点集合,移除掉不存活的节点,然后将活着的节点加入到Multimap<InetAddress,Range> sources 集合中。通过循环对每一个活着的节点,将其加入到bootstrapSet中(即作为bootstrapsource),然后调用StreamIn.requestRanges()请求该节点对应范围内的数据。

    3)requestRanges(ip,tableName,ranges)方法构建流请求消息StreamRequestMessage,然后调用MessagingService.instance.sendOneWay(message,source)发送消息。

    4)流请求消息的序列化格式为

    streammetadata

    .length

    local

    ip

    table

    name

    ranges.size

    type

    RANGE

    lefttoken

    length+byte

    righttoken

    length+byte

    ...

    <—metadata.len-><————————metadata——<———————ranges————————>>

    5)流数据需要经历STREAM_REQUEST,STREAM_INITIATE, STREAM_INITIATE_DONE, STREAM_COMPLETE,STREAM_FINISHED..等阶段。在最后会调用finishBootstrapping()方法,其中设置启动标志,并在setToken()中设置系统表中token值,并调用updateToken()更新token环。最后加入状态信息<MOVENORMALToken>并设置modeNormal

    private void finishBootstrapping() {isBootstrapMode = false;SystemTable.setBootstrapped(true);setToken(getLocalToken());Gossiper.instance.addLocalApplicationState(MOVE_STATE,new ApplicationState(STATE_NORMAL + Delimiter + partitioner_.getTokenFactory().toString(getLocalToken())));setMode("Normal", false);}

参考资料

cassandra详解 http://www.ibm.com/developerworks/cn/opensource/os-cn-cassandraxu1/

分布式系统设计

Cassandra启动过程详解相关推荐

  1. Linux开启动过程详解

    Linux开启动过程详解 Linux启动过程 前言: Linux是一种自由和开放源代码的类UNIX操作系统.该操作系统的内核由林纳斯·托瓦兹在1991年10月5日首次发布.在加上用户空间的应用程序之后 ...

  2. centos7 启动流程图_Linux启动过程详解

    Linux启动过程详解 作者:江远航 一.启动流程图如下 图1 Linux启动流程图 BIOS ---> MBR ---> Kernel---> Init 二.Linux启动顺序 一 ...

  3. Delta3d框架学习--程序启动过程详解

    一个Delta3d程序启动过程详解 一.初始化一个dtGame::GameApplication的实例,dtGame::GameApplication* app = new dtGame::GameA ...

  4. 朱老师ARM裸机学习笔记(四):S5PV210启动过程详解

    常用器件特性 内存: SRAM 静态内存 特点就是容量小.价格高,优点是不需要软件初始化直接上电就能用 DRAM 动态内存 特点就是容量大.价格低,缺点就是上电后不能直接使用,需要软件初始化后才可以使 ...

  5. Spring启动过程详解

    Spring启动过程详解 前言 spring容器启动过程 AnnotationConfigApplicationContext 有参数构造方法 无参数构造 AnnotatedBeanDefinitio ...

  6. linux efi 启动原理,Linux(RHEL6)启动过程详解

    Linux(RHEL6)启动过程详解 Linux(红帽RHEL6)启动过程详解: RHEL的一个重要和强大的方面是它是开源的,并且系统的启动过程是用户可配置的.用户可以自由的配置启动过程的许多方面,包 ...

  7. 嵌入式linux的u-boot系统启动过程,【站友投递】U-boot启动过程详解

    [站友投递]U-boot启动过程详解 来源:互联网 作者:denny 时间:2009-03-18 Tag:点击: 一.U-BOOT的目录结构 u-boot目录下有18个子目录,分别存放管理不通的源程序 ...

  8. 家用计算机启动过程 装载主引导记录,计算机启动过程详解

    综述: 计算机启动时经过了哪些过程: 计算机接通电源后,第一步要进行加电自检,也就是POST(Power On Self Test),检查RAM.驱动器等:第二步BIOS会读取活动分区主引导记录的启动 ...

  9. SpringBoot启动过程详解

    Spring Boot通常有一个名为*Application的入口类,在入口类里有一个main方法,这个main方法其实就是一个标准的java应用的入口方法. 在main方法中使用SpringAppl ...

  10. rufus中gpt和mrb磁盘_UEFI/BIOS/MBR/GPT启动过程详解与常见系统启动问题

    做系统一大头疼事情就是UEFI/BIOS/MBR/GPT等等的选择.系统平台和硬件日新月异,基本上中文站的解释正确性参差不齐,出了问题也查不到正确解决方案,我也遇到过很多次系统启动的问题(这就是为什么 ...

最新文章

  1. 【效率】如何有效提问
  2. 移动端以刻度或尺度滑动方式选择年龄收入等
  3. pandas.Series.multiply()含义解释
  4. 【学习笔记】SAP CO成本估算相关
  5. python if条件判断和while循环 练习题
  6. 数论四之综合训练——Magic Pairs,Crime Management,Top Secret,组合数问题
  7. VS2005(c#)项目调试问题解决方案集锦
  8. u盘 连接服务器系统软件,u盘服务器系统
  9. 递归 dfs 记忆化搜索 动态规划
  10. MyEclipse或Eclipse导出JavaDoc中文乱码问题解决
  11. Pandas:时间序列数据基本操作和分组
  12. linux下解压rar文件
  13. JAVA中SSH框架
  14. IBM Websphere CEI Configuration
  15. js打印去掉页眉页脚
  16. 盛迈坤电商:拼多多推广数据多久会显示
  17. protobuf 微信小程序_Protobuf在微信小游戏开发中的使用技巧
  18. aText--问题四:Need a valid command-line; Edit the string resources accordingly
  19. CDEC 2019中国数字智能生态大会暨第十二届中国软件渠道大会 北京站 | 参会指南...
  20. 网络宣传策划无所不用其极啊

热门文章

  1. 版权符号模糊解决办法
  2. 费用型采购订单后台配置
  3. 贝叶斯信念网络简介以及算法整理笔记
  4. c语言程序答案PDF,C语言程序设计答案.pdf
  5. Kernel那些事儿之内存管理(6) --- 衣带渐宽终不悔(下)
  6. IP获取方法二:太平洋网络IP地址查询Web接口
  7. 【PP那些事儿】生产模式-面向订单生产
  8. 计算机实验报告双绞线制作,双绞线的制作实验报告.docx
  9. 没去Google I/O 2018大会?这里有你想知道的一切…
  10. 前端人脸识别框架tracking.js,解决ios浏览器调摄像头黑屏的问题,兼容pc、安卓、ios。