2019独角兽企业重金招聘Python工程师标准>>>

在HRegionServer中两个量和replication相关,如下所示:

[java] view plain copy

  1. //Replication services. if no replication, this handler will be null
  2. protected ReplicationSourceService replicationSourceHandler;
  3. protected ReplicationSinkService replicationSinkHandler;

在ReplicationSourceService中只有一个方法getWALActionsListener,该方法返回WALActionsListener。ReplicationSinkService同样也是一个接口类,它有一个方法replicateLogEntries。在HRegionServer的如下代码段中会启动replicationservice。

[java] view plain copy

  1. if(this.replicationSourceHandler == this.replicationSinkHandler && this.replicationSourceHandler != null) {
  2. this.replicationSourceHandler.startReplicationService();
  3. }else {
  4. if(this.replicationSourceHandler != null) {
  5. this.replicationSourceHandler.startReplicationService();
  6. }
  7. if(this.replicationSinkHandler != null) {
  8. this.replicationSinkHandler.startReplicationService();
  9. }
  10. }

startReplicationService中做了三件事,分别是调用ReplicationSourceManager的init方法,初始化replicationSink,初始化调度线程池scheduleThreadPool;

startReplicationService方法中调用了ReplicationSourceManager的init方法,init中遍历replicationPeers中的peerid,并以该id为参数,调用addSource方法,在addSource中针对每一个peerid构造了一个对象ReplicationSource,ReplicationSource是个守护进程,这里初始化的时候并不是通过构造函数,而是通过getReplicationSource函数,在这个方法里先获得了一个ReplicationSource的接口,接着调用init初始化该接口,此外,getReplicationSource还有一个重要的作用是它实例化了replicationEndpoint(HBaseInterClusterReplicationEndpoint)。回到addSource这个方法,它返回前调用了ReplicationSource的startup方法,startup是个挺有意思的方法,代码如下:

ReplicationSource是个守护线程,在startUp中启动了自己。。。。这么说也就是replicationPeers中的每个peerid都表示了一个slave集群,而每个slave集群都有一个自己的ReplicationSource线程。现在的重点就落在了ReplicationSource这个守护线程的处理逻辑,可以从它的run方法入手分析。

run中有如下几个关键步骤,首先:

1、启动replicationEndpoint :Service.State state = replicationEndpoint.start().get();

2、构造walEntryFilter:this.walEntryFilter = new ChainWALEntryFilter(filters);

3、进入一个循环,循环持续运行至守护线程ReplicationSource终止:

while(isActive) {

获取log path;

调用openReader打开当前path的log reader(后文详解);

从reader中依次读取WAL.Entry并放入一个List<WAL.Entry>的数据结构中,方法调用如下:

readAllEntriesToReplicateOrNextFile(currentWALisBingWrittenTo, entries)

最后调用shipEdits将entries发送到远端集群;

}

发送WALEntry到从集群的逻辑在方法shipEdits中完成,ship方法接收一个List<WAL.Entry>类型的参数entries,在shipEdits中entries参数被包装进replicateContext中并发送到从集群,这部分的主要代码如下所示:

还记得前文中说到,replicationEndpoint在getReplicationSource中初始化为HBaseInterClusterReplicationEndpoint类型的变量。进入HBaseInterClusterReplicationEndpoint的replicate方法的实现,该方法首先从参数replicateContext中获得List<Entry> entries,关键的wal传递在下面这段代码中:

其中最后一句将Entry对象序列化之后由文首RegionServer中初始化的ReplicationSinkService发送到远端集群;

以上这些就是大概的replication时,wal跨集群传递的一些细节实现。接下来回过头详细解释上文留下的一个小辫子,就是围绕ReplicationSource的openReader方法的实现,分析这个调用的目的是理清wal的读逻辑是什么样的。

ReplicationSource的openReader以currentPath为参数,调用ReplicationWALReaderManager的openReader

ReplicationWALReaderManager的openReader通过WALFactory.createReader返回指定文件的reader;

看看WALFactory.createReader中的关键代码吧:

可见Reader是在这里构建的,我们以最常见的lrClass属于ProtobufLogReader.class为例来解释,首先初始化一个数据输入流FSDataInputStream,通过这个流打开文件fs(fs在输入参数中指定),根据isPbWal选择new不同的Reader实例,最后调用reader的init方法完成初始化工作。这里的Reader大多数是DefaultWALProvider.Reader类型的。

Reader创建已经分析完毕,那读实现是什么样的?

读的动作主要在readAllEntriesToReplicateOrNextFile中,该方法接收一个List<WAL.Entry>类型的参数entries,也就是说读到的各个log entry在entries中返回,下面一一分析readAllEntriesToReplicateOrNextFile中的主要逻辑。

1、this.repLogReader.seek();

2、WAL.Entry entry = this.repLogReader.readNextAndSetPosition();

3、进入循环

while(entry != null) {

//过滤掉已经消费掉的log entry

if (replicationEndpoint.canReplicateToSameCluster()

|| !entry.getKey().getClusterIds().contains(peerClusterId)) {

entry = walEntryFilter.filter(entry);  //过滤的逻辑在walEntryFilter中实现

entries.add(entry);

}

try {

entry = this.repLogReader.readNextAndSetPosition();

}

}

4、各种metrics处理;

WALEntryFilter的作用是在把wal entries发送到slave集群前过滤掉某些并不需要的发送WAL Entries,它有很多个实现类,所有的类都实现了filter方法,这些不同的WALEntryFilter可以通过ChainWALEntryFilter构成一条责任链。HLog文件读出的wal entries流经责任链,筛选出需要replicate的walEntry,这是典型的责任链模式的应用。

转载于:https://my.oschina.net/sniperLi/blog/910764

HBase Replication源码解析之HLog读取相关推荐

  1. HDFS源码解析:教你用HDFS客户端写数据

    摘要:终于开始了这个很感兴趣但是一直觉得困难重重的源码解析工作,也算是一个好的开端. 本文分享自华为云社区<hdfs源码解析之客户端写数据>,作者: dayu_dls. 在我们客户端写数据 ...

  2. 2022-10-24 ClickHouse 源码解析-查询引擎经典理论

    ClickHouse 源码解析: 综述 ClickHouse 源码解析: MergeTree Write-Path ClickHouse 源码解析: MergeTree Read-Path Click ...

  3. Flink 全网最全资源(视频、博客、PPT、入门、原理、实战、性能调优、源码解析、问答等持续更新)

    Flink 学习 https://github.com/zhisheng17/flink-learning 麻烦路过的各位亲给这个项目点个 star,太不易了,写了这么多,算是对我坚持下来的一种鼓励吧 ...

  4. Flink 全网最全资源(视频、博客、PPT、入门、实战、源码解析、问答等持续更新)...

    Flink 学习 github.com/zhisheng17/- 麻烦路过的各位亲给这个项目点个 star,太不易了,写了这么多,算是对我坚持下来的一种鼓励吧! 本项目结构 博客 1.Flink 从0 ...

  5. Flink 源码解析 —— 源码编译运行

    更新一篇知识星球里面的源码分析文章,去年写的,周末自己录了个视频,大家看下效果好吗?如果好的话,后面补录发在知识星球里面的其他源码解析文章. 前言 之前自己本地 clone 了 Flink 的源码,编 ...

  6. Redis源码解析——前言

    今天开启Redis源码的阅读之旅.对于一些没有接触过开源代码分析的同学来说,可能这是一件很麻烦的事.但是我总觉得做一件事,不管有多大多难,我们首先要在战略上蔑视它,但是要在战术上重视它.除了一些高大上 ...

  7. 彻底理解OkHttp - OkHttp 源码解析及OkHttp的设计思想

    OkHttp 现在统治了Android的网络请求领域,最常用的框架是:Retrofit+okhttp.OkHttp的实现原理和设计思想是必须要了解的,读懂和理解流行的框架也是程序员进阶的必经之路,代码 ...

  8. python处理回显_Python中getpass模块无回显输入源码解析

    本文主要讨论了python中getpass模块的相关内容,具体如下. getpass模块 昨天跟学弟吹牛b安利Python标准库官方文档的时候偶然发现了这个模块.仔细一看内容挺少的,只有两个主要api ...

  9. java treeset原理_Java集合 --- TreeSet底层实现和原理(源码解析)

    概述 文章的内容基于JDK1.7进行分析,之所以选用这个版本,是因为1.8的有些类做了改动,增加了阅读的难度,虽然是1.7,但是对于1.8做了重大改动的内容,文章也会进行说明. TreeSet实现了S ...

最新文章

  1. django-pure-pagination 组件使用
  2. maven工程中添加scala-library并非idea scala插件
  3. 物业公司工作流应用方案
  4. Win7修复“会话‘循环内核上下文记录器’已停止,原因是存在以下错误:0xC0000188”
  5. console_init_r()函数分析
  6. 企业邮件系统搭建-关于不能往yahoo,sina,hotmail地址发邮件的问题一
  7. javascript 面向对象 new 关键字 原型链 构造函数
  8. 区块链扫盲 | 认知升级,虚拟货币骗子是什么样子的?
  9. Matlab的循环语法
  10. 群晖5.2php核心设置_只需四步, 黑群晖5.2 NAS 最简明搭建教程
  11. mac终端Login Incorrect问题
  12. Fansblog  HDU-6608(费马小定理、威尔逊定理)
  13. python神经网络编程chap01
  14. pandas_数据处理分析基本
  15. seaborn seaborn色板的使用设置(二)
  16. 绿盟科技技术大会 TechWorld 2016完美谢幕
  17. 云存储Java客户端上传文件
  18. 客户很外行,怎么办?
  19. 深入理解vue中的slot与slot-scope (简单易懂)
  20. 不接受 996 就不可思议了?

热门文章

  1. div border-radius
  2. 当执行打印预览window.close无效
  3. MySQL高级 - 常用工具 - mysqlshow
  4. SpringSecurity 案例父工程创建
  5. RabbitMq queue异常导致consumer停止
  6. 类装载器ClassLoader
  7. MyBatis 源码解读-loadCustomLogImpl(settings)
  8. 使用JUnit 5 执行条件和并发测试
  9. 数据库事务原理详解-事务基本概念
  10. 基于Xml 的IOC 容器-将配置载入内存