HBase LRU源码解析
编程技术  /  houtizong 发布于 3年前   93
/**
 * Configurable constructor. Use this constructor if not using defaults.
 *
 * <p>Validates the factor arguments, creates the backing map and the
 * size/count bookkeeping, optionally starts the background eviction thread,
 * and schedules the periodic statistics task.
 *
 * @param maxSize maximum size of this cache, in bytes
 * @param blockSize expected average size of blocks, in bytes
 * @param evictionThread whether to run evictions in a bg thread or not
 * @param mapInitialSize initial size of backing ConcurrentHashMap
 * @param mapLoadFactor initial load factor of backing ConcurrentHashMap
 * @param mapConcurrencyLevel initial concurrency factor for backing CHM
 * @param minFactor percentage of total size that eviction will evict until
 * @param acceptableFactor percentage of total size that triggers eviction
 * @param singleFactor percentage of total size for single-access blocks
 * @param multiFactor percentage of total size for multiple-access blocks
 * @param memoryFactor percentage of total size for in-memory blocks
 * @throws IllegalArgumentException if the single/multi/memory factors do not
 *         sum to 1, if minFactor is not smaller than acceptableFactor, or if
 *         minFactor or acceptableFactor is not below 1.0
 */
public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
    int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
    float minFactor, float acceptableFactor,
    float singleFactor, float multiFactor, float memoryFactor) {
  // Note: this is an exact floating-point comparison against 1.
  if (singleFactor + multiFactor + memoryFactor != 1) {
    throw new IllegalArgumentException("Single, multi, and memory factors " +
        " should total 1.0");
  }
  if (minFactor >= acceptableFactor) {
    throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
  }
  if (minFactor >= 1.0f || acceptableFactor >= 1.0f) {
    throw new IllegalArgumentException("all factors must be < 1");
  }

  this.maxSize = maxSize;
  this.blockSize = blockSize;
  map = new ConcurrentHashMap<String,CachedBlock>(mapInitialSize,
      mapLoadFactor, mapConcurrencyLevel);
  this.minFactor = minFactor;
  this.acceptableFactor = acceptableFactor;
  this.singleFactor = singleFactor;
  this.multiFactor = multiFactor;
  this.memoryFactor = memoryFactor;
  this.stats = new CacheStats();
  this.count = new AtomicLong(0);
  this.elements = new AtomicLong(0);
  this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
  // The tracked size starts at the fixed overhead rather than at zero.
  this.size = new AtomicLong(this.overhead);

  if (evictionThread) {
    this.evictionThread = new EvictionThread(this);
    this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
  } else {
    // No background thread: evictions run on the calling thread instead.
    this.evictionThread = null;
  }

  // Run the statistics task every statThreadPeriod seconds.
  this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
      statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
}
public class CachedBlock implements HeapSize, Comparable<CachedBlock >
/**
 * Priority class assigned to a cached block.
 */
enum BlockPriority {
  /** Accessed a single time (used for scan-resistance). */
  SINGLE,

  /** Accessed multiple times. */
  MULTI,

  /** Block from in-memory store. */
  MEMORY
}
/**
 * Creates a cache entry for one block.
 *
 * @param blockName name under which the block is cached
 * @param buf buffer holding the block data
 * @param accessTime initial access time recorded for the block
 * @param inMemory whether the block starts with MEMORY priority
 */
public CachedBlock(String blockName, ByteBuffer buf, long accessTime,
    boolean inMemory) {
  this.blockName = blockName;
  this.buf = buf;
  this.accessTime = accessTime;
  // Accounted size: aligned name length + aligned buffer capacity + fixed overhead.
  this.size = ClassSize.align(blockName.length()) +
      ClassSize.align(buf.capacity()) + PER_BLOCK_OVERHEAD;
  // A newly cached block starts as MEMORY when inMemory is set, otherwise as SINGLE.
  if (inMemory) {
    this.priority = BlockPriority.MEMORY;
  } else {
    this.priority = BlockPriority.SINGLE;
  }
}
/** * Block has been accessed. Update its local access time. */ public void access(long accessTime) { this. accessTime = accessTime; // 当再次访问到时,假如此时CacheedBlock的BlockPriority的值是SINGLE,则把它变为MULTI if(this. priority == BlockPriority. SINGLE) { this. priority = BlockPriority. MULTI; } }
/**
 * Orders blocks by access time, in reverse: a block with a smaller access
 * time compares as greater.
 *
 * @param that the block to compare against
 * @return 1 if this block's access time is smaller, -1 if it is larger,
 *         0 if both are equal
 */
public int compareTo(CachedBlock that) {
  if (this.accessTime < that.accessTime) {
    return 1;
  }
  if (this.accessTime > that.accessTime) {
    return -1;
  }
  return 0;
}
private class BlockBucket implements Comparable<BlockBucket >
public void cacheBlock(String blockName, ByteBuffer buf, boolean inMemory ) { //private final ConcurrentHashMap<String,CachedBlock> map, 维护了缓存映射 CachedBlock cb = map.get(blockName); //如果这个block已经被缓存了,那么就抛出一个运行时异常 if(cb != null) { throw new RuntimeException("Cached an already cached block" ); } //初始化一个新的CachedBlock cb = new CachedBlock(blockName, buf, count.incrementAndGet(), inMemory); //得到最新的heapsize long newSize = size.addAndGet(cb.heapSize()); //将新增的block放到map中 map.put(blockName, cb); //elements记录了目前缓存的数目 elements.incrementAndGet(); //假如最新的heapsize大于了acceptableSize(见下面的方法),那么就需要进行evict动作 if(newSize > acceptableSize() && ! evictionInProgress) { runEviction(); } } //----------------------------- //假如没有特定的清理线程,那么就使用目前的线程来进行evict,这显然不是一个好主意,会造成阻塞,假如有清理线程,那么调用其evict方法 private void runEviction() { if(evictionThread == null) { evict(); } else { evictionThread.evict(); //事实上是触发了清理线程的notify } }
private static class EvictionThread extends Thread { private WeakReference<LruBlockCache> cache; public EvictionThread(LruBlockCache cache) { super( "LruBlockCache.EvictionThread" ); setDaemon( true); this. cache = new WeakReference<LruBlockCache>(cache); } @Override //这里使用了wait和notify机制,线程将一直等待,知道有notify消息过来说需要进行清理了 public void run() { while( true) { synchronized(this ) { try { this.wait(); } catch(InterruptedException e) {} } //这里cache使用了弱引用 LruBlockCache cache = this.cache .get(); if(cache == null) break; cache.evict(); } } public void evict() { synchronized( this) { this.notify(); // FindBugs NN_NAKED_NOTIFY } } }
void evict () { // Ensure only one eviction at a time if(!evictionLock.tryLock()) return; try { evictionInProgress = true; long currentSize = this.size .get(); //需要释放掉的heap大小 long bytesToFree = currentSize - minSize(); if (LOG.isDebugEnabled()) { LOG.debug("Block cache LRU eviction started; Attempting to free " + StringUtils. byteDesc(bytesToFree) + " of total=" + StringUtils. byteDesc(currentSize)); } if(bytesToFree <= 0) return; // Instantiate priority buckets //初始化三个桶,来存放single,multi,和memory,比例分别为25%,50%,25% BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize , singleSize()); BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize , multiSize()); BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize , memorySize()); // Scan entire map putting into appropriate buckets for(CachedBlock cachedBlock : map.values()) { switch(cachedBlock.getPriority()) { case SINGLE : { bucketSingle.add(cachedBlock); break; } case MULTI : { bucketMulti.add(cachedBlock); break; } case MEMORY : { bucketMemory.add(cachedBlock); break; } } } //接下来将三个桶放入PriorityQueue PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<BlockBucket>(3); //会调用到CachedBlockQueue的add方法,下面分析 bucketQueue.add(bucketSingle); bucketQueue.add(bucketMulti); bucketQueue.add(bucketMemory); int remainingBuckets = 3; long bytesFreed = 0; BlockBucket bucket; //溢出的多的那个桶,会越先被清理, 参看BlockBucket的compareTo方法 //这里也说明,三个桶本身没有优先级 while((bucket = bucketQueue.poll()) != null) { long overflow = bucket.overflow(); if(overflow > 0) { // 本次要释放掉的内存 long bucketBytesToFree = Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets); //free方法在下面解释 bytesFreed += bucket.free(bucketBytesToFree); } remainingBuckets--; } if (LOG.isDebugEnabled()) { long single = bucketSingle.totalSize(); long multi = bucketMulti.totalSize(); long memory = bucketMemory.totalSize(); LOG.debug("Block cache LRU eviction completed; " + "freed=" + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + StringUtils.byteDesc( 
this.size .get()) + ", " + "single=" + StringUtils.byteDesc(single) + ", " + "multi=" + StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory)); } } finally { stats.evict(); evictionInProgress = false; evictionLock.unlock(); } }
public void add(CachedBlock cb) { //如果当前的heapsize小于maxsize,直接加到queue中,这边的queue也是一个PriorityQueue if(heapSize < maxSize) { queue.add(cb); heapSize += cb.heapSize(); } else { // 否则先取出列表头 CachedBlock head = queue.peek(); //判断假如的cb是不是比head大,实际的意义就是看新加入的cb是不是比head新,参看CachedBlock的compareTo方法,假如新,则继续 if(cb.compareTo(head) > 0) { heapSize += cb.heapSize(); heapSize -= head.heapSize(); if(heapSize > maxSize ) { //取出head queue.poll(); } else { heapSize += head.heapSize(); } queue.add(cb); } } }
public long free(long toFree) { //这边的queue是CacheBlockQueue类型,这个get方法很重要,它对PriorityQueue做了反序,这样的话就把时间最早的放在队列头 LinkedList<CachedBlock> blocks = queue.get(); long freedBytes = 0; for(CachedBlock cb: blocks) { freedBytes += evictBlock(cb); if(freedBytes >= toFree) { return freedBytes; } } return freedBytes; }
//最后调用这个方法将block从map中移除: protected long evictBlock(CachedBlock block) { map.remove(block.getName()); size.addAndGet(-1 * block.heapSize()); elements.decrementAndGet(); stats.evicted(); return block.heapSize(); }
请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!
技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成
网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据基于大数据采集等爬虫技术获取,旨在助力知识分享;如有侵权请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];
文章归档
文章标签
友情链接