一、简介

BlockCache是HBase中的一个重要特性,相比于写数据时缓存为Memstore,读数据时的缓存则为BlockCache。
      LruBlockCache是HBase中BlockCache的默认实现,它采用严格的LRU算法来淘汰Block。

二、缓存级别

目前有三种缓存级别,定义在BlockPriority中,如下:

public enum BlockPriority {/*** Accessed a single time (used for scan-resistance)*/SINGLE,/*** Accessed multiple times*/MULTI,/*** Block from in-memory store*/MEMORY
}

1、SINGLE:主要用于scan等,避免大量的这种一次的访问导致缓存替换;

2、MULTI:多次缓存;

3、MEMORY:常驻缓存的,比如meta信息等。

三、缓存实现分析

LruBlockCache缓存的实现在方法cacheBlock()中,实现逻辑如下:

1、首先需要判断需要缓存的数据大小是否超过最大块大小,按照2%的频率记录warn类型log并返回;

2、从缓存map中根据cacheKey尝试获取已缓存数据块cb;

3、如果已经缓存过,比对下内容,如果内容不一样,抛出异常,否则记录warn类型log并返回;

4、获取当前缓存大小currentSize,获取可以接受的缓存大小currentAcceptableSize,计算硬性限制大小hardLimitSize;

5、如果当前大小超过硬性限制,当回收没在执行时,执行回收并返回,否则直接返回;

6、利用cacheKey、数据buf等构造Lru缓存数据块实例cb;

7、将cb放置入map缓存中;

8、元素个数原子性增1;

9、如果新大小超过当前可以接受的大小,且未执行回收过程中,执行内存回收。

详细代码如下,可自行阅读分析:

  // BlockCache implementation/*** Cache the block with the specified name and buffer.* <p>* It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)* this can happen, for which we compare the buffer contents.* @param cacheKey block's cache key* @param buf block buffer* @param inMemory if block is in-memory* @param cacheDataInL1*/@Overridepublic void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,final boolean cacheDataInL1) {// 首先需要判断需要缓存的数据大小是否超过最大块大小if (buf.heapSize() > maxBlockSize) {// If there are a lot of blocks that are too// big this can make the logs way too noisy.// So we log 2%if (stats.failInsert() % 50 == 0) {LOG.warn("Trying to cache too large a block "+ cacheKey.getHfileName() + " @ "+ cacheKey.getOffset()+ " is " + buf.heapSize()+ " which is larger than " + maxBlockSize);}return;}// 从缓存map中根据cacheKey尝试获取已缓存数据块LruCachedBlock cb = map.get(cacheKey);if (cb != null) {// 如果已经缓存过// compare the contents, if they are not equal, we are in big troubleif (compare(buf, cb.getBuffer()) != 0) {// 比对缓存内容,如果不相等,抛出异常,否则记录warn日志throw new RuntimeException("Cached block contents differ, which should not have happened."+ "cacheKey:" + cacheKey);}String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";LOG.warn(msg);return;}// 获取当前缓存大小long currentSize = size.get();// 获取可以接受的缓存大小long currentAcceptableSize = acceptableSize();// 计算硬性限制大小long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);if (currentSize >= hardLimitSize) {// 如果当前大小超过硬性限制,当回收没在执行时,执行回收并返回stats.failInsert();if (LOG.isTraceEnabled()) {LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize)+ " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "  too many."+ " the hard limit size is " + StringUtils.byteDesc(hardLimitSize) + ", failed to put cacheKey:"+ cacheKey + " into LruBlockCache.");}if (!evictionInProgress) {// 当回收没在执行时,执行回收并返回runEviction();}return;}// 利用cacheKey、数据buf等构造Lru缓存数据块实例cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);long newSize = updateSizeMetrics(cb, false);// 放置入map缓存中map.put(cacheKey, cb);// 元素个数原子性增1long val = elements.incrementAndGet();if (LOG.isTraceEnabled()) {long size = map.size();assertCounterSanity(size, val);}// 如果新大小超过当前可以接受的大小,且未执行回收过程中if (newSize > currentAcceptableSize && !evictionInProgress) {runEviction();// 执行内存回收}}

四、淘汰缓存实现分析

淘汰缓存的实现方式有两种:

1、第一种是在主线程中执行缓存淘汰;

2、第二种是在一个专门的淘汰线程中通过持有对外部类LruBlockCache的弱引用WeakReference来执行缓存淘汰。

应用那种方式,取决于构造函数中的boolean参数evictionThread,如下:

    if(evictionThread) {this.evictionThread = new EvictionThread(this);this.evictionThread.start(); // FindBugs SC_START_IN_CTOR} else {this.evictionThread = null;}

而在执行淘汰缓存的runEviction()方法中,有如下判断:

  /*** Multi-threaded call to run the eviction process.* 多线程调用以执行回收过程*/private void runEviction() {if(evictionThread == null) {// 如果未指定回收线程evict();} else {// 如果执行了回收线程evictionThread.evict();}}

而EvictionThread的evict()实现如下:

    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",justification="This is what we want")public void evict() {synchronized(this) {this.notifyAll();}}

通过synchronized获取EvictionThread线程的对象锁,然后主线程通过回收线程对象的notifyAll唤醒EvictionThread线程,那么这个线程是何时wait的呢?答案就在其run()方法中,notifyAll()之后,线程run()方法得以继续执行:

    @Overridepublic void run() {enteringRun = true;while (this.go) {synchronized(this) {try {this.wait(1000 * 10/*Don't wait for ever*/);} catch(InterruptedException e) {LOG.warn("Interrupted eviction thread ", e);Thread.currentThread().interrupt();}}LruBlockCache cache = this.cache.get();if (cache == null) break;cache.evict();}}

线程会wait10s,放弃对象锁,在notifyAll()后,继续执行后面的淘汰流程,即:

  /*** Eviction method.*/void evict() {// Ensure only one eviction at a time// 通过可重入互斥锁ReentrantLock确保同一时刻只有一个回收在执行if(!evictionLock.tryLock()) return;try {// 标志位,是否正在进行回收过程evictionInProgress = true;// 当前缓存大小long currentSize = this.size.get();// 计算应该释放的缓冲大小bytesToFreelong bytesToFree = currentSize - minSize();if (LOG.isTraceEnabled()) {LOG.trace("Block cache LRU eviction started; Attempting to free " +StringUtils.byteDesc(bytesToFree) + " of total=" +StringUtils.byteDesc(currentSize));}// 如果需要回收的大小小于等于0,直接返回if(bytesToFree <= 0) return;// Instantiate priority buckets// 实例化优先级队列:single、multi、memoryBlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize,singleSize());BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize,multiSize());BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize,memorySize());// Scan entire map putting into appropriate buckets// 扫描缓存,分别加入上述三个优先级队列for(LruCachedBlock cachedBlock : map.values()) {switch(cachedBlock.getPriority()) {case SINGLE: {bucketSingle.add(cachedBlock);break;}case MULTI: {bucketMulti.add(cachedBlock);break;}case MEMORY: {bucketMemory.add(cachedBlock);break;}}}long bytesFreed = 0;if (forceInMemory || memoryFactor > 0.999f) {// 如果memoryFactor或者InMemory缓存超过99.9%,long s = bucketSingle.totalSize();long m = bucketMulti.totalSize();if (bytesToFree > (s + m)) {// 如果需要回收的缓存超过则全部回收Single、Multi中的缓存大小和,则全部回收Single、Multi中的缓存,剩余的则从InMemory中回收// this means we need to evict blocks in memory bucket to make room,// so the single and multi buckets will be emptiedbytesFreed = bucketSingle.free(s);bytesFreed += bucketMulti.free(m);if (LOG.isTraceEnabled()) {LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +" from single and multi buckets");}// 剩余的则从InMemory中回收bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);if (LOG.isTraceEnabled()) {LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +" total from all three buckets ");}} else {// 否则,不需要从InMemory中回收,按照如下策略回收Single、Multi中的缓存:尝试让single-bucket和multi-bucket的比例为1:2// this means no need to evict block in memory bucket,// and we try best to make the ratio between single-bucket and// multi-bucket is 1:2long bytesRemain = s + m - bytesToFree;if (3 * s <= bytesRemain) {// single-bucket足够小,从multi-bucket中回收// single-bucket is small enough that no eviction happens for it// hence all eviction goes from multi-bucketbytesFreed = bucketMulti.free(bytesToFree);} else if (3 * m <= 2 * bytesRemain) {// multi-bucket足够下,从single-bucket中回收// multi-bucket is small enough that no eviction happens for it// hence all eviction goes from single-bucketbytesFreed = bucketSingle.free(bytesToFree);} else {// single-bucket和multi-bucket中都回收,且尽量满足回收后比例为1:2// both buckets need to evict some blocksbytesFreed = bucketSingle.free(s - bytesRemain / 3);if (bytesFreed < bytesToFree) {bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);}}}} else {// 否则,从三个队列中循环回收PriorityQueue<BlockBucket> bucketQueue =new PriorityQueue<BlockBucket>(3);bucketQueue.add(bucketSingle);bucketQueue.add(bucketMulti);bucketQueue.add(bucketMemory);int remainingBuckets = 3;BlockBucket bucket;while((bucket = bucketQueue.poll()) != null) {long overflow = bucket.overflow();if(overflow > 0) {long bucketBytesToFree = Math.min(overflow,(bytesToFree - bytesFreed) / remainingBuckets);bytesFreed += bucket.free(bucketBytesToFree);}remainingBuckets--;}}if (LOG.isTraceEnabled()) {long single = bucketSingle.totalSize();long multi = bucketMulti.totalSize();long memory = bucketMemory.totalSize();LOG.trace("Block cache LRU eviction completed; " +"freed=" + StringUtils.byteDesc(bytesFreed) + ", " +"total=" + StringUtils.byteDesc(this.size.get()) + ", " +"single=" + StringUtils.byteDesc(single) + ", " +"multi=" + StringUtils.byteDesc(multi) + ", " +"memory=" + StringUtils.byteDesc(memory));}} finally {// 重置标志位,释放锁等stats.evict();evictionInProgress = false;evictionLock.unlock();}}

逻辑比较清晰,如下:

1、通过可重入互斥锁ReentrantLock确保同一时刻只有一个回收在执行;

2、设置标志位evictionInProgress,是否正在进行回收过程为true;

3、获取当前缓存大小currentSize;

4、计算应该释放的缓冲大小bytesToFree:currentSize - minSize();

5、如果需要回收的大小小于等于0,直接返回;

6、实例化优先级队列:single、multi、memory;

7、扫描缓存,分别加入上述三个优先级队列;

8、如果forceInMemory或者InMemory缓存超过99.9%:

8.1、如果需要回收的缓存超过则全部回收Single、Multi中的缓存大小和,则全部回收Single、Multi中的缓存,剩余的则从InMemory中回收(this means we need to evict blocks in memory bucket to make room,so the single and multi buckets will be emptied):

8.2、否则,不需要从InMemory中回收,按照如下策略回收Single、Multi中的缓存:尝试让single-bucket和multi-bucket的比例为1:2:

8.2.1、 single-bucket足够小,从multi-bucket中回收;

8.2.2、 multi-bucket足够小,从single-bucket中回收;

8.2.3、single-bucket和multi-bucket中都回收,且尽量满足回收后比例为1:2;

9、否则,从三个队列中循环回收;

10、最后,重置标志位,释放锁等。

四、实例化

参见《HBase-1.2.4 Allow block cache to be external分析》最后。

HBase-1.2.4LruBlockCache实现分析(一)相关推荐

  1. HBase建表高级属性,hbase应用案例看行键设计,HBase和mapreduce结合,从Hbase中读取数据、分析,写入hdfs,从hdfs中读取数据写入Hbase,协处理器和二级索引

    1. Hbase高级应用 1.1建表高级属性 下面几个shell 命令在hbase操作中可以起到很到的作用,且主要体现在建表的过程中,看下面几个create 属性 1. BLOOMFILTER 默认是 ...

  2. HBase之Region Compact流程分析

    Compact(合并):是指在HBase中,HRegion上某一个列簇部分或者全部Store File合并.是由于数据不断的被写入,MemStore达到阀值则会把数据flush到Store File持 ...

  3. Flink/Hbase 异常 - 4.Sink 背压100% 与 hbase.util.RetryCounter.sleepUntilNextRetry 异常分析与排查

    一.引言 Flink 程序内有读取 hbase 的需求,近期任务启动后偶发 sink 端背压 100% 导致无数据写入下游且无明显 exception 报错,重启任务后有较大概率恢复服务,但也有可能继 ...

  4. HBase架构设计及原理分析

    我们从以上架构图可以得知HBase存储机制涉及到的组件: 一 ZooKeeper作用: #存储HBase元数据 #负责HMaster的选择和主备切换 #负责对HRegionServer进行监控:HRe ...

  5. Hbase查询速度快的原理分析

    因为Hbase属于NoSQL,非关系型数据库,所以会经常拿来和关系型数据库做对比.面试的时候也会问到为何Hbase的速度快或者为什么选择Hbase作为数据库存储.下面的文章是转发的,对于上述问题的回答 ...

  6. 如何保证 HBase 服务的高可用?看看这份 HBase 可用性分析与高可用实践吧!

    来源 | 阿丸笔记 责编 | Carol 头图 | CSDN 下载自视觉中国 HBase作为一个分布式存储的数据库,它是如何保证可用性的呢?对于分布式系统的CAP问题,它是如何权衡的呢? 最重要的是, ...

  7. HBase解决Region Server Compact过程占用大量网络出口带宽的问题

    为什么80%的码农都做不了架构师?>>>    HBase 0.92版本之后,RegionServer的Compact过程根据待合并的文件大小分为smallcompaction和la ...

  8. HBase 在人工智能场景的使用

    近几年来,人工智能逐渐火热起来,特别是和大数据一起结合使用.人工智能的主要场景又包括图像能力.语音能力.自然语言处理能力和用户画像能力等等.这些场景我们都需要处理海量的数据,处理完的数据一般都需要存储 ...

  9. 文档 hbase_0783-6.2.0-如何在Hue中集成HBase

    文档编写目的 Fayson在前面介绍了<0635-5.16.1-Hue集成HBase出现Api Error异常分析>和<0647-6.1.1-Hue集成HBase出现Api Erro ...

最新文章

  1. 中值定理符号怎么读_微分、微分中值定理、泰勒公式
  2. 【 Linux 】单台服务器上并发TCP连接数(转)
  3. UDP千兆以太网FPGA_verilog实现(五、以太网帧的结构)
  4. Android 绿豆通讯录【SQLite数据库---数据库(增删改查、展示数据)】
  5. 深度学习模型大合集:GitHub趋势榜第一,两天斩获2000星
  6. 面向消息的中间件 (Message-Oriented Middleware, MOM)
  7. 构建dubbo分布式平台-window安装zookeeper注册中心
  8. redhat配置caffe多核训练
  9. poj 2406 Power Strings kmp基础
  10. EasyUI icon 小图标库,应有尽有(5000多个)打包下载
  11. 《北京市工作居住证》办理攻略
  12. 关键词抽取工具-THUtag 个人使用心得
  13. echart 环形饼图设置中心固定信息
  14. 顺序表练习(三):对称矩阵的压缩储存
  15. Total Commander 显示文件包含文件名扩展
  16. 分部积分题型总结笔记(分部积分超强拓展)
  17. Mysql内查询时报错,错误代码: 1146
  18. 羊皮卷的故事-第六章
  19. 应用交付厂商F5与微软联合解决方案怎么样?有什么作用,效果如何?
  20. 红米Note-4G双卡移动版线刷兼救砖_解账户锁_纯净刷机包_教程

热门文章

  1. librtmp分析(接收数据包处理)
  2. ret2libc过地址随机化
  3. csv中包含多余换行符_Python3爬虫之猫眼电影TOP100(requests、lxml、Xpath、CSV)
  4. h5 一镜到底_这些一镜到底的H5还能怎么玩?
  5. 【动态规划】01背包问题
  6. 70. 爬楼梯 golang 斐波那契数列
  7. 7. 整数反转 golang
  8. 守护进程nohup的用法
  9. linux中的dup和fcntl的用法
  10. C++ 向上转型初步01