hbase源码系列(十三)缓存机制MemStore与Block Cache

  1. 云栖社区>
  2. 博客>
  3. 正文

hbase源码系列(十三)缓存机制MemStore与Block Cache

岑玉海 2016-09-11 17:55:10 浏览1730
展开阅读全文

这一章讲hbase的缓存机制,这里面涉及的内容也是比较多,呵呵,我理解中的缓存是保存在内存中的特定的便于检索的数据结构就是缓存。

之前在讲put的时候,put是被添加到Store里面,这个Store是个接口,实现是在HStore里面,MemStore其实是它底下的小子。

那它和Region Server、Region是什么关系?

Region Server下面有若干个Region,每个Region下面有若干的列族,每个列族对应着一个HStore。

HStore里面有三个很重要的类,在这章的内容都会提到。

protected final MemStore memstore;
private final CacheConfig cacheConf;
final StoreEngine<?, ?, ?, ?> storeEngine;

MemStore是存储着两个有序的kv集合,kv进来先写到里面,超过阀值之后就会写入硬盘。

CacheConf是针对HFileBlock的缓存,专门用来缓存快,默认是在读的时候缓存块,也可以修改列族的参数,让它在写的时候也缓存,这个在数据模型定义的时候提到过。

StoreEngine是StoreFile的管理器,它管理着这个列族对应的所有StoreFiles。

1. MemStore

memstore比较有意思,我们先看它的add方法,这个是入口。

long add(final KeyValue kv) {
    this.lock.readLock().lock();
    try {
      KeyValue toAdd = maybeCloneWithAllocator(kv);
      return internalAdd(toAdd);
    } finally {
      this.lock.readLock().unlock();
    }
}

先把kv放到maybeCloneWithAllocator里面复制出来一个新的kv,然后再走internalAdd的方法,为啥要这么搞呢?

1.1 MemStoreLAB

先看maybeCloneWithAllocator,我们慢慢看,没关系。

private KeyValue maybeCloneWithAllocator(KeyValue kv) {
    if (allocator == null) {
      return kv;
    }
    int len = kv.getLength();
    //从allocator当中分配出来len长度的非堆空间
    Allocation alloc = allocator.allocateBytes(len);
    if (alloc == null) {
      // 太大了,allocator决定不给它分配
      return kv;
    }
    //用allocator生成的空间,new一个kv出来
    assert alloc != null && alloc.getData() != null;
    System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len);
    KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len);
    newKv.setMvccVersion(kv.getMvccVersion());
    return newKv;
 }
allocator是何许人也,它是一个MemStoreLAB,它是干啥的呀,这个让人很纠结呀?
public Allocation allocateBytes(int size) {
    // 如果申请的size比maxAlloc大,就不分了
    if (size > maxAlloc) {
      return null;
    }

    while (true) {
      Chunk c = getOrMakeChunk();

      // 给它分配个位置,返回数组的起始位置
      int allocOffset = c.alloc(size);
      if (allocOffset != -1) {
        // 用一个数据结构Allocation来描述这个,它主要包括两个信息,1:数组的引用,2:数据在数组当中的起始位置
        return new Allocation(c.data, allocOffset);
      }

      // 空间不足了,释放掉它
      tryRetireChunk(c);
    }
}
下面看看getOrMakeChunk看看是啥情况,挺疑惑的东西。
private Chunk getOrMakeChunk() {
    while (true) {
      // 当前的Chunk不为空,就取当前的
      Chunk c = curChunk.get();
      if (c != null) {
        return c;
      }
      // 这里还有个Chunk的Pool,默认是没有的,走的是new Chunk这条路径
      c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize);
      if (curChunk.compareAndSet(null, c)) {
        // curChunk是为空的话,就设置为c,然后加到chunkQueue里面
        c.init();
        this.chunkQueue.add(c);
        return c;
      } else if (chunkPool != null) {
        // 先放回去,待会儿再拿出来
        chunkPool.putbackChunk(c);
      }      
    }
}
Chunk是一个持有一个byte[]数组的数据结构,属性如下。
static class Chunk {
    /* 实际数据保存的地方,被不停地分配 */
    private byte[] data;
    private static final int UNINITIALIZED = -1;
    private static final int OOM = -2;
    /* 下一个chunk的起始位置,也是上一个chunk的结束位置 */
    private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);

    /** 分配给了多少个kv */
    private AtomicInteger allocCount = new AtomicInteger();

    /** Chunk的大小 */
    private final int size;

好吧,我们现在清楚了,它是给每个kv的数据又重新找了个地方混,从注释上面讲这个Chunk未初始化,没有被分配内存,所以开销小。不太理解这个东西,人家之前也是在byte数组里面混,只不顾挪了个窝了,莫非是为了减少内存碎片?尼玛,还真被我说中了,在我以前的资料里面有《调优》

不管怎么样吧,把多个小的kv写到一个连续的数组里面可能是好点好处吧,下面讲一下它的相关参数吧。

/** 可分配的最大值,超过这个值就不给它分配了,默认值是256K */
hbase.hregion.memstore.mslab.max.allocation 默认值是256  * 1024
/** 每个Chunk的大小,默认是2M */
hbase.hregion.memstore.mslab.chunksize 默认值是2048 * 1024

那我们继续讲讲这个MemStoreChunkPool吧,它默认是不被开启的,因为它的参数hbase.hregion.memstore.chunkpool.maxsize默认是0 (只允许输入0->1的数值),它是通过堆内存的最大值*比例来计算得出来的结果。

它可以承受的最大的Chunk的数量是这么计算的 MaxCount = MemStore内存限制 * Chunkpool.Maxsize / Chunksize。

MemStore的内存最大最小值分别是0.35 --> 0.4,这个在我之前的博客里面也有。

hbase.regionserver.global.memstore.upperLimit 
hbase.regionserver.global.memstore.lowerLimit

还有这个参数hbase.hregion.memstore.chunkpool.initialsize需要设置,默认又是0,输入0->1的数值,MaxCount乘以它就设置初始的Chunk大小。

没试过开启这个Pool效果是否会好,它是依附在MemStore里面的,它设置过大了,最直接的影响就是,另外两个集合的空间就小了。

1.2 有序集合

分配完Chunk之后,干的是这个函数,就是添加到一个有序集合当中kvset。

private long internalAdd(final KeyValue toAdd) {
    long s = heapSizeChange(toAdd, addToKVSet(toAdd));
    //把时间戳范围加到内部去
    timeRangeTracker.includeTimestamp(toAdd);
    this.size.addAndGet(s);
    return s;
}

MemStore里面有两个有序的集合,kvset和snapshot,KeyValueSkipListSet的内部实现是ConcurrentNavigableMap。

volatile KeyValueSkipListSet kvset;
volatile KeyValueSkipListSet snapshot;

它们的排序规则上一章已经说过了,排过序的在搜索的时候方便查找,这里为什么还有一个snapshot呢?snapshot是一个和它一样的东西,我们都知道MemStore是要flush到文件生成StoreFile的,那我不能写文件的时候让别人都没法读了吧,那怎么办,先把它拷贝到snapshot当中,这个时间很短,复制完了就可以访问kvset,实际flush的之后,我们flush掉snapshot当中的kv就可以啦。

2. CacheConfig

在看这个之前,先推荐看一下我的另外一篇文章《缓存机制以及可以利用SSD作为存储的BucketCache》,否则后面有很多概念,你看不懂的。

这里我们主要关注的是LruBlockCache和BucketCache,至于他们的使用,请参照上面的博客设置,这里不再介绍哦。

CacheConfig是一个HStore一个,属性是根据列族定制的,比如是否常驻内存,但是它内存用来缓存块的BlockCache是Region Server全局共享的的globalBlockCache,在new一个CacheConfig的时候,它会调用instantiateBlockCache方法返回一个BlockCache缓存Block的,如果已经存在globalBlockCache,就直接返回,没有才会重新实例化一个globalBlockCache。

这里还分堆上内存和直接分配的内存,堆上的内存的参数hfile.block.cache.size默认是0.25。

2.1 DoubleCache

直接分配的内存,要通过设置JVM参数-XX:MaxDirectMemorySize来设置,设置了这个之后我们还需要设置hbase.offheapcache.percentage(默认是0)来设置占直接分配内存的比例。

offHeapCacheSize =offheapcache.percentage * DirectMemorySize

这里我们还真不能设置它,因为如果设置了它的话,它会把new一个DoubleCache出来,它是LruBlockCache和SlabCache的合体,之前我提到的那篇文章里面说到SlabCache是一个只能存固定大小的Block大小的Cache,比较垃圾。

2.2 LruBlockCache

如果offHeapCacheSize <= 0,就走下面的逻辑,这里我就简单陈述一下了,代码没啥可贴的。

LruBlockCache和BucketCache的合作方式有两种,一种是BucketCache作为二级缓存使用,比如SSD,一种是在内存当中,它俩各占比列0.1和0.9,还是建议上SSD做二级缓存,其实也不贵。

不管如何,BlockCache这块的总大小是固定的,是由这个参数决定hfile.block.cache.size,默认它是0.25,所以LruBlockCache最大也就是0.25的最大堆内存。

在LruBlockCache当中还分了三种优先级的缓存块,分别是SINGLE、MULTI、MEMORY,比列分别是0.25、0.5、0.25,当快要满的时候,要把块剔除出内存的时候,就要遍历所有的块了,然后计算他们的分别占的比例,剔除的代码还挺有意思。

PriorityQueue<BlockBucket> bucketQueue =
        new PriorityQueue<BlockBucket>(3);

      bucketQueue.add(bucketSingle);
      bucketQueue.add(bucketMulti);
      bucketQueue.add(bucketMemory);

      int remainingBuckets = 3;
      long bytesFreed = 0;

      BlockBucket bucket;
      while((bucket = bucketQueue.poll()) != null) {
        long overflow = bucket.overflow();
        if(overflow > 0) {
          //把要释放的空间bytesToFree分给3个bucket,3个分完
          long bucketBytesToFree = Math.min(overflow,
            (bytesToFree - bytesFreed) / remainingBuckets);
          bytesFreed += bucket.free(bucketBytesToFree);
        }
        remainingBuckets--;
      }

搞了一个优先级队列,先从SINGLE的开刀、SINGLE不行了,再拿MULTI开刀,最后是MEMORY。bytesToFree是之前计算好的,要释放的大小=当前值-最小值。

在我们设置列族参数的时候,有一个InMemory的参数,如果设置了它就是MEMORY,如果没设置,就是SINGLE,SINGLE类型的一旦被访问过之后,立马变成高富帅的MULTI,但是没有希望变成MEMORY。

这里之前百度的一个哥么问我,Meta表的块会不会一直被保存在MEMORY当中呢,这块的代码写得让人有点儿郁闷的,它是按照列族的参数设置的,但是我怎么去找Meta表的列族设置啊,啊被我找到了,在代码里面写着的。

public static final HTableDescriptor META_TABLEDESC = new HTableDescriptor(
      TableName.META_TABLE_NAME,
      new HColumnDescriptor[] {
          new HColumnDescriptor(HConstants.CATALOG_FAMILY)
              // 保持10个版本是为了帮助调试
              .setMaxVersions(10)
              .setInMemory(true)
              .setBlocksize(8 * 1024)
              .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
              // 不使用BloomFilter
              .setBloomFilterType(BloomType.NONE)
});

可以看出来Meta表的块只有8K,常驻内存,不使用BloomFilter,允许集群间复制。

再吐槽一下hbase这个Lru算法吧,做得挺粗糙的,它记录了每个Block块的访问次数,但是它并没有按照这个来排序,就是简单的依赖哈希值来排序。

Tips:江湖传言一个Regionserver上有一个BlockCache和N个Memstore,它们的大小之和不能大于等于heapsize * 0.8,否则HBase不能正常启动,想想也是,hbase是内存大户,内存稍有不够就挂掉,大家要小心设置这个缓存的参数。

 2.3 BucketCache

原来这块的图在上面的那篇文章已经提到了,我就不再重复了,之前没看的请一定要看,那边有很详细的图解,我这里只是讲点我了解的实现。

我们可以从两个方法里面看LruBlockCache和BucketCache的关系,一个是getBlock,一个是evictBlock,先看evictBlock。

protected long evictBlock(CachedBlock block, boolean evictedByEvictionProcess) {
  //从map里面删除
    map.remove(block.getCacheKey());if (evictedByEvictionProcess && victimHandler != null) {
      boolean wait = getCurrentSize() < acceptableSize();
      boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
   //保存到victimHandler里面
      victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
          inMemory, wait);
    }
    return block.heapSize()
}

在把block剔除出内存之后,就把块加到victimHandler里面,这个victimHandler就是BucketCache,在CacheConfig实例化LruBlockCache之后就用setVictimCache方法传进去的。

看完这个我们再看getBlock。

public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) {
    CachedBlock cb = map.get(cacheKey);
    if(cb == null) {if (victimHandler != null)
        return victimHandler.getBlock(cacheKey, caching, repeat);
      return null;
    }
    return cb.getBuffer();
}

 先从map中取,如果找不到就从victimHandler中取得。

从上面两个方法,我们可以看出来BucketCache是LruBlockCache的二级缓存,它不要了才会存到BucketCache当中,取得时候也是,找不到了才想起人家来。

好,我们现在进入到BucketCache里面看看,它里面有几个重要的属性。

// Store/read block data
IOEngine ioEngine;
// 内存map
private ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> ramCache;
// 后备队列,质保存块的索引信息,比如offset, length
private ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap;

这里怎么又来了两个,一个内存的,一个后备队里的,这个是有区别的RAMQueueEntry当中直接保存了块的buffer数据,BucketEntry只是保存了起始位置和长度。

下面我们看看这个流程吧,还是老规矩,先看入口,再看出口,入口在哪里,前面的代码中提到了,入口在cacheBlockWithWait方法。

//已经有就不加啦
    if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey))
      return;
    //写入一级缓存
    RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
    ramCache.put(cacheKey, re);
    //用哈希值给计算出一个随机的队列来
    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
    //把实体也插入到写入队列
    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);

可以看得出来在这个方法当中,先把块写入到ramCache当中,然后再插入到一个随机的写入队列,写入线程有3个,每个写入线程持有一个写入队列,线程的数量由参数hbase.bucketcache.writer.threads控制。

我们看看这个WriterThread的run方法吧。

List<RAMQueueEntry> entries = new ArrayList<RAMQueueEntry>();
      try {
        while (cacheEnabled && writerEnabled) {
          try {
            //从inputQueue拿出来放到entries,然后再对entries操作
            entries.add(inputQueue.take());
            inputQueue.drainTo(entries);
          } catch (InterruptedException ie) {
            if (!cacheEnabled) break;
          }
          doDrain(entries);
        }

那我们要关注的就是doDrain的方法了,在这个方法里面,它主要干了4件事情。

1、把ramCache当中的实体给剔除出来转换成BucketEntry,并切入到ioEngine。

2、ioEngine同步,ioEngine包括3种(file,offheap,heap),第一种就是写入SSD,用的是FileChannel,后两种是写入到一个ByteBufferArray

3、把BucketEntry添加到backingMap

4、如果空间不足的话,调用freeSpace清理空间,清理空间的方法和LruBlockCache的方法类似。

这里面的Bucket它也不是一个具体的东西,它里面记住的也是起始位置,使用了多少次的这些参数,所以说它是一个逻辑上的,而不是物理上的分配的一块随机的地址。

final private static class Bucket {
    //基准起始位置
    private long baseOffset;
    //每个item分配的大小
    private int itemAllocationSize; 
    //对应的在bucketSizeInfos中的位置
    private int sizeIndex;
    //总容量
    private int itemCount;
    private int freeList[];
    //空闲的数量
    private int freeCount;
    //已经使用的数量
    private int usedCount;
}

我们是不是可以这么理解:就是当我们不需要某个块的时候我们不用去物理的删除它,只需要不断的重用它里面的空间就可以了,而不需要管怎么删除、释放等相关内容。

BucketSizeInfo是负责管理这些Bucket的,它管理着3个队列,同时它可以动态根据需求,new一些新的不同大小的Bucket出来,也可以把现有的Bucket变更它的大小,Bucket的大小最小是5K,最大是513K。

final class BucketSizeInfo {
    // Free bucket means it has space to allocate a block;
    // Completely free bucket means it has no block.
    private List<Bucket> bucketList, freeBuckets, completelyFreeBuckets;
    private int sizeIndex;
}

sizeIndex是啥意思?是在BucketSizeInfo的数组里面的位置,它的大小都是有固定的值的,不能多也不能少,这里就不详细介绍了。我们直接看WriteToCache这个方法吧,好验证一下之前的想法。

//序列化长度 = 数据长度 + 额外的序列化的长度16个字节
      int len = data.getSerializedLength();
      // This cacheable thing can't be serialized...
      if (len == 0) return null;
      //bucketAllocator给分配点空间
      long offset = bucketAllocator.allocateBlock(len);
      //生成一个实体
      BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime, inMemory);
      //设置Deserializer,具体的实现在HFileBlock当中
      bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
      try {
        if (data instanceof HFileBlock) {
          ByteBuffer sliceBuf = ((HFileBlock) data).getBufferReadOnlyWithHeader();
          sliceBuf.rewind();
          assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
          ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE);
          ((HFileBlock) data).serializeExtraInfo(extraInfoBuffer);
          //先写入数据信息,再写入头信息
          ioEngine.write(sliceBuf, offset);
          ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE);
        } else {
          //如果不是HFileBlock的话,把数据序列化到bb当中,然后写入到IOEngine
          ByteBuffer bb = ByteBuffer.allocate(len);
          data.serialize(bb);
          ioEngine.write(bb, offset);
        }
      } catch (IOException ioe) {
        // 出错了就释放掉这个这个块
        bucketAllocator.freeBlock(offset);
        throw ioe;
    }

这里我们看这一句就可以了ioEngine.write(sliceBuf, offset);  在写入ioEngine的时候是要传这个offset的,也正好验证了我之前的想法,所以BucketAllocator.allocateBlock的分配管理这块就很关键了。

关于怎么分配这块,还是留个能人讲吧,我是讲不好了。

网友评论

登录后评论
0/500
评论
岑玉海
+ 关注