HBase LRU Source Code Analysis

Programming / posted by houtizong 3 years ago
Let's start with the LruBlockCache constructor. The key is to understand what each parameter does:
  /**
   * Configurable constructor.  Use this constructor if not using defaults.
   * @param maxSize maximum size of this cache, in bytes
   * @param blockSize expected average size of blocks, in bytes
   * @param evictionThread whether to run evictions in a bg thread or not
   * @param mapInitialSize initial size of backing ConcurrentHashMap
   * @param mapLoadFactor initial load factor of backing ConcurrentHashMap
   * @param mapConcurrencyLevel initial concurrency factor for backing CHM
   * @param minFactor percentage of total size that eviction will evict until
   * @param acceptableFactor percentage of total size that triggers eviction
   * @param singleFactor percentage of total size for single-access blocks
   * @param multiFactor percentage of total size for multiple-access blocks
   * @param memoryFactor percentage of total size for in-memory blocks
   */
  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
      int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
      float minFactor, float acceptableFactor,
      float singleFactor, float multiFactor, float memoryFactor) {
    if(singleFactor + multiFactor + memoryFactor != 1) {
      throw new IllegalArgumentException("Single, multi, and memory factors " +
          " should total 1.0");
    }
    if(minFactor >= acceptableFactor) {
      throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
    }
    if(minFactor >= 1.0f || acceptableFactor >= 1.0f) {
      throw new IllegalArgumentException("all factors must be < 1");
    }
    this.maxSize = maxSize;
    this.blockSize = blockSize;
    map = new ConcurrentHashMap<String,CachedBlock>(mapInitialSize,
        mapLoadFactor, mapConcurrencyLevel);
    this.minFactor = minFactor;
    this.acceptableFactor = acceptableFactor;
    this.singleFactor = singleFactor;
    this.multiFactor = multiFactor;
    this.memoryFactor = memoryFactor;
    this.stats = new CacheStats();
    this.count = new AtomicLong(0);
    this.elements = new AtomicLong(0);
    this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
    this.size = new AtomicLong(this.overhead);
    if(evictionThread) {
      this.evictionThread = new EvictionThread(this);
      this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
    } else {
      this.evictionThread = null;
    }
    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
        statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
  }
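This minimal sketch makes the three factor checks, and the thresholds they control, concrete. It is not HBase code. The class CacheThresholds and the example values are hypothetical. The sketch assumes that acceptableSize() is maxSize × acceptableFactor, that minSize() is maxSize × minFactor, and that each priority bucket's share is maxSize × factor × minFactor.

```java
// Hypothetical helper that mirrors the constructor's checks; not HBase code.
final class CacheThresholds {
    final long maxSize;
    final float minFactor;
    final float acceptableFactor;
    final float singleFactor;
    final float multiFactor;
    final float memoryFactor;

    CacheThresholds(long maxSize, float minFactor, float acceptableFactor,
                    float singleFactor, float multiFactor, float memoryFactor) {
        // Same three rules as the LruBlockCache constructor. This sketch compares the
        // sum with a small tolerance instead of "!= 1" to allow for float rounding.
        if (Math.abs(singleFactor + multiFactor + memoryFactor - 1.0f) > 1e-6f) {
            throw new IllegalArgumentException("Single, multi, and memory factors should total 1.0");
        }
        if (minFactor >= acceptableFactor) {
            throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
        }
        if (minFactor >= 1.0f || acceptableFactor >= 1.0f) {
            throw new IllegalArgumentException("all factors must be < 1");
        }
        this.maxSize = maxSize;
        this.minFactor = minFactor;
        this.acceptableFactor = acceptableFactor;
        this.singleFactor = singleFactor;
        this.multiFactor = multiFactor;
        this.memoryFactor = memoryFactor;
    }

    // Assumed formula: eviction is triggered once the cache grows past this size.
    long acceptableSize() {
        return (long) Math.floor(maxSize * (double) acceptableFactor);
    }

    // Assumed formula: an eviction frees memory until the cache is back at this size.
    long minSize() {
        return (long) Math.floor(maxSize * (double) minFactor);
    }

    // Assumed formulas: each priority bucket gets its factor's share of minSize.
    long singleSize() { return share(singleFactor); }
    long multiSize()  { return share(multiFactor); }
    long memorySize() { return share(memoryFactor); }

    private long share(float factor) {
        return (long) Math.floor(maxSize * (double) factor * minFactor);
    }
}
```

Take maxSize = 1000, minFactor = 0.75, acceptableFactor = 0.85, and single/multi/memory = 0.25/0.5/0.25. With these example values, eviction would start once the cache exceeds 850 bytes and would free it back down to 750 bytes. The gap between the two thresholds keeps every insert from triggering a new eviction.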


Next, we need to look at a few related classes:

public class CachedBlock implements HeapSize, Comparable<CachedBlock>


This class represents one entry in LruBlockCache. It contains a very important enum:
  static enum BlockPriority {
    /**
     * Accessed a single time (used for scan-resistance)
     */
    SINGLE,
    /**
     * Accessed multiple times
     */
    MULTI,
    /**
     * Block from in-memory store
     */
    MEMORY
  };

The constructor and the access() method show how this priority is assigned and then updated:
  public CachedBlock(String blockName, ByteBuffer buf, long accessTime,
      boolean inMemory) {
    this.blockName = blockName;
    this.buf = buf;
    this.accessTime = accessTime;
    this.size = ClassSize.align(blockName.length()) +
        ClassSize.align(buf.capacity()) + PER_BLOCK_OVERHEAD;
    // When a block is cached for the first time with inMemory == false (the default),
    // its BlockPriority is set to SINGLE; otherwise it is set to MEMORY.
    if(inMemory) {
      this.priority = BlockPriority.MEMORY;
    } else {
      this.priority = BlockPriority.SINGLE;
    }
  }


  /**
   * Block has been accessed.  Update its local access time.
   */
  public void access(long accessTime) {
    this.accessTime = accessTime;
    // On a later access, a block whose BlockPriority is SINGLE is promoted to MULTI
    if(this.priority == BlockPriority.SINGLE) {
      this.priority = BlockPriority.MULTI;
    }
  }
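A stripped-down entry class can reproduce the SINGLE to MULTI promotion. This is a sketch rather than HBase's CachedBlock. The names PriorityDemo and Entry are hypothetical, and the class keeps only the priority logic of the constructor and access() shown above.

```java
// Stripped-down model of CachedBlock's priority handling (hypothetical names).
final class PriorityDemo {
    enum BlockPriority { SINGLE, MULTI, MEMORY }

    static final class Entry {
        long accessTime;
        BlockPriority priority;

        Entry(long accessTime, boolean inMemory) {
            this.accessTime = accessTime;
            // In-memory blocks start at MEMORY; everything else starts at SINGLE.
            this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.SINGLE;
        }

        void access(long accessTime) {
            this.accessTime = accessTime;
            // A later access promotes SINGLE to MULTI; MULTI and MEMORY stay as they are.
            if (priority == BlockPriority.SINGLE) {
                priority = BlockPriority.MULTI;
            }
        }
    }

    public static void main(String[] args) {
        Entry scanned = new Entry(1, false); // read once, e.g. by a scan
        Entry hot = new Entry(2, false);
        Entry pinned = new Entry(3, true);
        hot.access(4);
        pinned.access(5);
        System.out.println(scanned.priority + " " + hot.priority + " " + pinned.priority);
        // prints "SINGLE MULTI MEMORY"
    }
}
```

This is what the enum comment means by scan resistance. Blocks that a large scan touches only once remain SINGLE, so they are grouped separately from the frequently read MULTI blocks.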


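Because this is an LRU implementation, the class also implements compareTo() from Comparable, which orders blocks by access time. Note the direction: a block with an older (smaller) accessTime compares as greater.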
  public int compareTo(CachedBlock that) {
    if(this.accessTime == that.accessTime) return 0;
    return this.accessTime < that.accessTime ? 1 : -1;
  }
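The comparator's direction is easy to misread. A quick check with java.util.PriorityQueue helps, because that queue always polls its smallest element. The check shows that with this ordering, the queue head is the most recently accessed block. Block is a hypothetical stand-in for CachedBlock that keeps only a name and accessTime.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-in for CachedBlock, keeping only the ordering logic.
final class Block implements Comparable<Block> {
    final String name;
    final long accessTime;

    Block(String name, long accessTime) {
        this.name = name;
        this.accessTime = accessTime;
    }

    @Override
    public int compareTo(Block that) {
        // Same rule as CachedBlock.compareTo: older (smaller accessTime) compares as greater.
        // Long.compare(that.accessTime, this.accessTime) expresses the same ordering.
        if (this.accessTime == that.accessTime) return 0;
        return this.accessTime < that.accessTime ? 1 : -1;
    }

    // Drains a PriorityQueue built from the blocks and returns names in poll order.
    static List<String> pollOrder(List<Block> blocks) {
        PriorityQueue<Block> queue = new PriorityQueue<>(blocks);
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().name);
        }
        return order;
    }
}
```

Blocks with access times 1, 3 and 2 therefore come out newest first. Because of this ordering, the eviction code further down has to reverse the queue before it frees anything.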

Because it implements the HeapSize interface, it can report how much heap the entry occupies (the size computed in the constructor).
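The size the constructor stores, and HeapSize later reports, is an estimate built from aligned sizes. The sketch below illustrates that arithmetic. It assumes ClassSize.align rounds up to a multiple of 8 bytes. It also uses a made-up PER_BLOCK_OVERHEAD of 64, because the real constant is an HBase-internal estimate. The class HeapSizeDemo is hypothetical.

```java
// Sketch of the CachedBlock size estimate. Assumptions: 8-byte alignment and a
// made-up PER_BLOCK_OVERHEAD; HBase derives the real constant internally.
final class HeapSizeDemo {
    static final long PER_BLOCK_OVERHEAD = 64; // hypothetical value for illustration

    // Rounds num up to the next multiple of 8.
    static long align(long num) {
        return ((num + 7) >> 3) << 3;
    }

    // Mirrors: align(blockName.length()) + align(buf.capacity()) + PER_BLOCK_OVERHEAD
    static long heapSize(String blockName, int bufferCapacity) {
        return align(blockName.length()) + align(bufferCapacity) + PER_BLOCK_OVERHEAD;
    }
}
```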

Another key class is an inner class of LruBlockCache:
private class BlockBucket implements Comparable<BlockBucket>

This class sorts all blocks into priority buckets, with one bucket for each BlockPriority.
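The article does not show BlockBucket's body. The sketch below assumes it works as in the HBase source: a bucket tracks totalSize against its share bucketSize, overflow() returns totalSize - bucketSize, and compareTo() orders buckets by overflow. Bucket is a hypothetical stand-in that tracks sizes only. Its pollOrder method shows the order in which a PriorityQueue hands the buckets out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical, simplified BlockBucket: it tracks sizes, not the blocks themselves.
final class Bucket implements Comparable<Bucket> {
    final String name;
    final long bucketSize; // this priority's share of the cache
    long totalSize;        // bytes of blocks assigned to this bucket

    Bucket(String name, long bucketSize) {
        this.name = name;
        this.bucketSize = bucketSize;
    }

    void add(long blockHeapSize) {
        totalSize += blockHeapSize;
    }

    // How far the bucket is over its share; zero or negative means within budget.
    long overflow() {
        return totalSize - bucketSize;
    }

    @Override
    public int compareTo(Bucket that) {
        // Assumed ordering: smaller overflow compares as smaller.
        return Long.compare(this.overflow(), that.overflow());
    }

    // Returns bucket names in the order a PriorityQueue would poll them.
    static List<String> pollOrder(Bucket... buckets) {
        PriorityQueue<Bucket> queue = new PriorityQueue<>(Arrays.asList(buckets));
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().name);
        }
        return order;
    }
}
```

Under this assumption the least-overflowing bucket is polled first. That order matters in evict(), where each bucket's unused share of bytesToFree passes to the buckets after it.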

Now we can look at how a new block is added to the cache:
  public void cacheBlock(String blockName, ByteBuffer buf, boolean inMemory) {
    // map (private final ConcurrentHashMap<String,CachedBlock>) holds the cache mapping
    CachedBlock cb = map.get(blockName);
    // If this block is already cached, throw a runtime exception
    if(cb != null) {
      throw new RuntimeException("Cached an already cached block");
    }
    // Create a new CachedBlock; accessTime is a logical counter from count, not wall-clock time
    cb = new CachedBlock(blockName, buf, count.incrementAndGet(), inMemory);
    // Get the updated heap size
    long newSize = size.addAndGet(cb.heapSize());
    // Put the new block into the map
    map.put(blockName, cb);
    // elements tracks the number of cached blocks
    elements.incrementAndGet();
    // If the new heap size exceeds acceptableSize() (maxSize * acceptableFactor)
    // and no eviction is already running, trigger an eviction
    if(newSize > acceptableSize() && !evictionInProgress) {
      runEviction();
    }
  }

  //-----------------------------
  // Without a dedicated eviction thread, the calling thread runs evict() itself, which
  // blocks the caller and is clearly not ideal; with one, its evict() method is called
  private void runEviction() {
    if(evictionThread == null) {
      evict();
    } else {
      evictionThread.evict(); // actually just calls notify() to wake the eviction thread
    }
  }
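cacheBlock() checks for a duplicate with map.get() and inserts with map.put() in two separate steps. Two threads caching the same block name at the same moment could therefore both pass the check. The sketch below is an illustration only, not HBase's code. It does the check and the insert atomically with ConcurrentHashMap.putIfAbsent and keeps the acceptableSize trigger. MiniCache, its fields, and the evictionRequests counter (standing in for runEviction()) are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical cache that models only the insert path of cacheBlock().
// Values are block heap sizes instead of CachedBlock objects.
final class MiniCache {
    private final ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();
    private final AtomicLong size = new AtomicLong(0);
    private final AtomicLong elements = new AtomicLong(0);
    private final long acceptableSize;
    // Counts how often an eviction would have been requested (stands in for runEviction()).
    final AtomicLong evictionRequests = new AtomicLong(0);

    MiniCache(long acceptableSize) {
        this.acceptableSize = acceptableSize;
    }

    void cacheBlock(String blockName, long heapSize) {
        // putIfAbsent checks for a duplicate and inserts in one atomic step,
        // so two threads cannot both cache the same name.
        if (map.putIfAbsent(blockName, heapSize) != null) {
            throw new IllegalStateException("Cached an already cached block: " + blockName);
        }
        long newSize = size.addAndGet(heapSize);
        elements.incrementAndGet();
        // Same trigger as cacheBlock(); the original also skips it while evictionInProgress is set.
        if (newSize > acceptableSize) {
            evictionRequests.incrementAndGet();
        }
    }

    long size() { return size.get(); }
    long elements() { return elements.get(); }
}
```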


The eviction thread is worth a closer look. It is already started when LruBlockCache is constructed:
  private static class EvictionThread extends Thread {
    private WeakReference<LruBlockCache> cache;

    public EvictionThread(LruBlockCache cache) {
      super("LruBlockCache.EvictionThread");
      setDaemon(true);
      this.cache = new WeakReference<LruBlockCache>(cache);
    }

    // wait/notify: the thread keeps waiting until a notify() signals that eviction is needed
    @Override
    public void run() {
      while(true) {
        synchronized(this) {
          try {
            this.wait();
          } catch(InterruptedException e) {}
        }
        // cache is held through a weak reference; once the cache is collected, the thread exits
        LruBlockCache cache = this.cache.get();
        if(cache == null) break;
        cache.evict();
      }
    }

    public void evict() {
      synchronized(this) {
        this.notify(); // FindBugs NN_NAKED_NOTIFY
      }
    }
  }
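The wait()/notify() pair above has no condition flag. A notify() sent while the thread is not waiting is simply lost, for example while it is still inside cache.evict() or before it first reaches wait(). A common alternative guards the wait with a pending flag. The sketch below is not HBase's code. EvictionWorker, its Runnable task, and the demo() helper are hypothetical. The weak reference and daemon setup follow the original, but the sketch waits on a private monitor rather than on the Thread object.

```java
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical worker: a daemon thread that runs a task each time it is signalled.
final class EvictionWorker extends Thread {
    private final WeakReference<Runnable> taskRef; // weak, like EvictionThread's cache field
    private final Object signal = new Object();    // private monitor instead of the Thread itself
    private boolean pending = false;               // guarded by signal

    EvictionWorker(Runnable task) {
        super("EvictionWorker");
        setDaemon(true);
        this.taskRef = new WeakReference<>(task);
    }

    @Override
    public void run() {
        while (true) {
            synchronized (signal) {
                // Waiting in a loop on the flag covers spurious wakeups and requests
                // that arrived while this thread was busy or had not started waiting.
                while (!pending) {
                    try {
                        signal.wait();
                    } catch (InterruptedException e) {
                        return; // treat interruption as a request to stop
                    }
                }
                pending = false;
            }
            Runnable task = taskRef.get();
            if (task == null) {
                return; // the owner has been garbage-collected
            }
            task.run();
        }
    }

    // Called by the cache instead of a bare notify(); the flag records the request.
    void requestEviction() {
        synchronized (signal) {
            pending = true;
            signal.notify();
        }
    }

    // Demo: signal before the thread starts, then wait up to 5 s for the task to run.
    static boolean demo() {
        CountDownLatch ran = new CountDownLatch(1);
        Runnable task = ran::countDown;
        EvictionWorker worker = new EvictionWorker(task);
        worker.requestEviction(); // would be lost with a bare wait()/notify()
        worker.start();
        try {
            return ran.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            Reference.reachabilityFence(task); // keep the weakly referenced task alive
        }
    }
}
```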


Now the evict() method itself:
  void evict() {
    // Ensure only one eviction at a time
    if(!evictionLock.tryLock()) return;
    try {
      evictionInProgress = true;
      long currentSize = this.size.get();
      // Amount of heap that needs to be freed
      long bytesToFree = currentSize - minSize();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Block cache LRU eviction started; Attempting to free " +
          StringUtils.byteDesc(bytesToFree) + " of total=" +
          StringUtils.byteDesc(currentSize));
      }
      if(bytesToFree <= 0) return;
      // Instantiate priority buckets
      // One bucket each for single, multi and memory blocks; with the default
      // factors their shares are 25%, 50% and 25%
      BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize,
          singleSize());
      BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize,
          multiSize());
      BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize,
          memorySize());
      // Scan entire map putting into appropriate buckets
      // (BlockBucket.add() hands each block to CachedBlockQueue.add(), analyzed below)
      for(CachedBlock cachedBlock : map.values()) {
        switch(cachedBlock.getPriority()) {
          case SINGLE: {
            bucketSingle.add(cachedBlock);
            break;
          }
          case MULTI: {
            bucketMulti.add(cachedBlock);
            break;
          }
          case MEMORY: {
            bucketMemory.add(cachedBlock);
            break;
          }
        }
      }
      // Put the three buckets into a PriorityQueue, ordered by BlockBucket.compareTo()
      PriorityQueue<BlockBucket> bucketQueue =
        new PriorityQueue<BlockBucket>(3);
      bucketQueue.add(bucketSingle);
      bucketQueue.add(bucketMulti);
      bucketQueue.add(bucketMemory);
      int remainingBuckets = 3;
      long bytesFreed = 0;
      BlockBucket bucket;
      // BlockBucket.compareTo orders buckets by overflow, so they come out in ascending
      // order of overflow: the least-overflowing bucket is handled first, any share it
      // cannot free is carried over to the remaining buckets, and the bucket that
      // overflows the most ends up freeing the most. The buckets have no fixed priority.
      while((bucket = bucketQueue.poll()) != null) {
        long overflow = bucket.overflow();
        if(overflow > 0) {
          // Bytes to free from this bucket in this round
          long bucketBytesToFree = Math.min(overflow,
            (bytesToFree - bytesFreed) / remainingBuckets);
          // free() is explained below
          bytesFreed += bucket.free(bucketBytesToFree);
        }
        remainingBuckets--;
      }
      if (LOG.isDebugEnabled()) {
        long single = bucketSingle.totalSize();
        long multi = bucketMulti.totalSize();
        long memory = bucketMemory.totalSize();
        LOG.debug("Block cache LRU eviction completed; " +
          "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
          "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
          "single=" + StringUtils.byteDesc(single) + ", " +
          "multi=" + StringUtils.byteDesc(multi) + ", " +
          "memory=" + StringUtils.byteDesc(memory));
      }
    } finally {
      stats.evict();
      evictionInProgress = false;
      evictionLock.unlock();
    }
  }
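The loop at the end of evict() splits bytesToFree across the buckets. The sketch below replays that arithmetic on bare numbers to show it in isolation. Each bucket is reduced to its overflow. Buckets are visited in ascending order of overflow, and each frees min(overflow, remaining / remainingBuckets). The sketch assumes a bucket frees exactly the amount requested; the real free() stops at block boundaries, so it may free slightly more. FreeSplit is a hypothetical name.

```java
import java.util.Arrays;

// Hypothetical replay of the bucket loop in evict(), using plain numbers.
final class FreeSplit {
    // overflows[i] = totalSize - bucketSize of bucket i; returns bytes freed per bucket (same indexes).
    static long[] split(long bytesToFree, long[] overflows) {
        Integer[] order = new Integer[overflows.length];
        for (int i = 0; i < order.length; i++) order[i] = i;
        // Visit buckets from the smallest overflow to the largest, like the PriorityQueue in evict().
        Arrays.sort(order, (a, b) -> Long.compare(overflows[a], overflows[b]));

        long[] freed = new long[overflows.length];
        long bytesFreed = 0;
        int remainingBuckets = overflows.length;
        for (int i : order) {
            long overflow = overflows[i];
            if (overflow > 0) {
                long bucketBytesToFree = Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets);
                freed[i] = bucketBytesToFree; // assumption: free() frees exactly what was asked
                bytesFreed += bucketBytesToFree;
            }
            remainingBuckets--;
        }
        return freed;
    }
}
```

Take bytesToFree = 90 with single/multi/memory overflows of -20, 100 and 10. In this case, the split is 0, 80 and 10. The memory bucket gives up only its 10 bytes of overflow, so the multi bucket covers the rest. Visiting the buckets largest-first with the same formula would free only 30 + 10 = 40 bytes.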


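BlockBucket.add() passes each block on to CachedBlockQueue.add(). In it, maxSize is the queue's own limit (the bytesToFree the bucket was created with), not the cache size. heapSize is the total size of the blocks the queue currently holds.

  public void add(CachedBlock cb) {
    // If the current heapSize is below maxSize, add cb directly; queue is also a PriorityQueue
    if(heapSize < maxSize) {
      queue.add(cb);
      heapSize += cb.heapSize();
    } else {
      // Otherwise look at the head of the queue: the most recently accessed block kept so far
      CachedBlock head = queue.peek();
      // cb.compareTo(head) > 0 means cb is OLDER than head (see CachedBlock.compareTo);
      // only then is cb a better eviction candidate than head
      if(cb.compareTo(head) > 0) {
        heapSize += cb.heapSize();
        heapSize -= head.heapSize();
        if(heapSize > maxSize) {
          // maxSize is still covered without head: drop head
          queue.poll();
        } else {
          // Dropping head would fall below maxSize: keep it
          heapSize += head.heapSize();
        }
        queue.add(cb);
      }
    }
  }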
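In effect, add() keeps a bounded set: only the oldest blocks whose combined size just covers maxSize stay in the queue. The self-contained sketch below follows the same logic. OldestBlocks and its nested Block are hypothetical simplifications of CachedBlockQueue and CachedBlock. The sketch also assumes maxSize is positive, since peek() must not return null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical simplification of CachedBlockQueue: keeps the oldest blocks whose
// combined size just covers maxSize bytes.
final class OldestBlocks {
    static final class Block implements Comparable<Block> {
        final String name;
        final long accessTime;
        final long heapSize;

        Block(String name, long accessTime, long heapSize) {
            this.name = name;
            this.accessTime = accessTime;
            this.heapSize = heapSize;
        }

        @Override
        public int compareTo(Block that) {
            // As in CachedBlock: older blocks compare as greater, so the queue head is the newest.
            return Long.compare(that.accessTime, this.accessTime);
        }
    }

    private final PriorityQueue<Block> queue = new PriorityQueue<>();
    private final long maxSize;
    private long heapSize = 0;

    OldestBlocks(long maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive"); // peek() below needs a non-empty queue
        }
        this.maxSize = maxSize;
    }

    void add(Block cb) {
        if (heapSize < maxSize) {
            queue.add(cb); // target not reached yet: keep everything
            heapSize += cb.heapSize;
            return;
        }
        Block head = queue.peek(); // newest block kept so far
        if (cb.compareTo(head) > 0) { // cb is older than head
            heapSize += cb.heapSize - head.heapSize;
            if (heapSize > maxSize) {
                queue.poll(); // maxSize is still covered without head: drop it
            } else {
                heapSize += head.heapSize; // dropping head would leave a gap: keep it
            }
            queue.add(cb);
        }
    }

    long heapSize() {
        return heapSize;
    }

    // Names of the kept blocks, oldest first.
    List<String> namesOldestFirst() {
        PriorityQueue<Block> copy = new PriorityQueue<>(queue);
        List<String> names = new ArrayList<>();
        while (!copy.isEmpty()) {
            names.add(0, copy.poll().name); // poll() yields newest first, so prepend
        }
        return names;
    }
}
```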


BlockBucket.free() then evicts the collected blocks, oldest first:

  public long free(long toFree) {
    // queue is a CachedBlockQueue. Its get() method matters here: it reverses the
    // PriorityQueue order so that the oldest block comes first
    LinkedList<CachedBlock> blocks = queue.get();
    long freedBytes = 0;
    for(CachedBlock cb: blocks) {
      freedBytes += evictBlock(cb);
      if(freedBytes >= toFree) {
        return freedBytes;
      }
    }
    return freedBytes;
  }
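The article does not show CachedBlockQueue.get(). The sketch below assumes it works as the comment describes: it polls the PriorityQueue (newest first) and pushes each block onto the front of a LinkedList, so the list comes out oldest first. free() then walks that list until enough bytes are released. FreeDemo, Block and the evicted list (standing in for evictBlock()) are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;

final class FreeDemo {
    static final class Block implements Comparable<Block> {
        final String name;
        final long accessTime;
        final long heapSize;

        Block(String name, long accessTime, long heapSize) {
            this.name = name;
            this.accessTime = accessTime;
            this.heapSize = heapSize;
        }

        @Override
        public int compareTo(Block that) {
            // Older blocks compare as greater, so poll() returns the newest block first.
            return Long.compare(that.accessTime, this.accessTime);
        }
    }

    final List<String> evicted = new ArrayList<>(); // stands in for evictBlock()

    // Like the assumed CachedBlockQueue.get(): drain newest-first, insert at the front => oldest first.
    static LinkedList<Block> oldestFirst(PriorityQueue<Block> queue) {
        LinkedList<Block> blocks = new LinkedList<>();
        while (!queue.isEmpty()) {
            blocks.addFirst(queue.poll());
        }
        return blocks;
    }

    // Like BlockBucket.free(): evict the oldest blocks until at least toFree bytes are released.
    long free(PriorityQueue<Block> queue, long toFree) {
        long freedBytes = 0;
        for (Block cb : oldestFirst(queue)) {
            evicted.add(cb.name);
            freedBytes += cb.heapSize;
            if (freedBytes >= toFree) {
                return freedBytes;
            }
        }
        return freedBytes;
    }
}
```

free() stops only after the block that crosses the target, so it can release slightly more than toFree. In the test case it frees 60 bytes for a request of 50.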


Finally, evictBlock() removes the block from the map and updates the counters:

  protected long evictBlock(CachedBlock block) {
    map.remove(block.getName());
    size.addAndGet(-1 * block.heapSize());
    elements.decrementAndGet();
    stats.evicted();
    return block.heapSize();
  }
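evictBlock() adjusts size and elements whether or not map.remove() actually found the block. If the same block were ever evicted through two paths, the counters would be decremented twice. The defensive variant sketched below is not taken from HBase. It bases the bookkeeping on the value ConcurrentHashMap.remove() returns. The class Bookkeeping and its String-to-Long map of heap sizes are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical bookkeeping for a cache whose map values are the blocks' heap sizes.
final class Bookkeeping {
    final ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();
    final AtomicLong size = new AtomicLong(0);
    final AtomicLong elements = new AtomicLong(0);

    void put(String name, long heapSize) {
        if (map.putIfAbsent(name, heapSize) == null) {
            size.addAndGet(heapSize);
            elements.incrementAndGet();
        }
    }

    // Counters change only if this call actually removed the entry.
    long evictBlock(String name) {
        Long removed = map.remove(name);
        if (removed == null) {
            return 0; // already gone: nothing freed, counters untouched
        }
        size.addAndGet(-removed);
        elements.decrementAndGet();
        return removed;
    }
}
```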
