MySQL · RocksDB · Level Compact 分析

  1. 云栖社区>
  2. 阿里云数据库ApsaraDB>
  3. 博客>
  4. 正文

MySQL · RocksDB · Level Compact 分析

db匠 2018-10-22 09:00:17 浏览1375
展开阅读全文

综述

在RocksDB中,将MemTable刷新到磁盘之后,将会有很多sstable,而这些sstable则是可能包含了相同的key的不同时间的值,这样子就会导致两个问题:

  1. 浪费磁盘空间
  2. 读取内容将会非常慢.

而compact就是用来解决上面两个问题的,简单来说compact就是读取几个sstable然后合并为一个(或者多个)sstable. 而什么时候合并,合并的时候如何来挑选sstable,这个就是compcation strategy.一般来说compact strategy的目的都是为了更低的amplification:

  • 避免一次读请求读取太多的sstables.
    • 读放大
  • 避免一些临时数据(deleted/overwritten/expired)在磁盘上停留时间过长
  • 避免磁盘上临时空间过大
    • 空间放大
  • 避免compact相同的数据太多次
    • 写放大

而在RockDB中实现了多种compact strategy,不同的strategy有不同的侧重,这里我们只分析默认的strategy, 那就是leveled-N compaction.

在Leveled compaction中,所有的SSTables被分为很多levels(level0/1/2/3…).

  • 最新的SSTable(从memtable中刷新下来的)是属于Level0
    • 每一个SSTable都是有序的
    • 只有Level0的SSTable允许overlap
  • 除了level0之外其他的level的总的SSTable大小有一个最大的限制
    • 通过level_compaction_dynamic_level_bytes来计算
  • 在Level0,如果积攒够了足够的(level0_file_num_compaction_trigger)SSTable,则就会进行compact.
    • 一般来说会把全部的SSTables compact到下一个level(Level1).
    • 不会写一个很大的SSTable,
  • 一般来说百分之90的空间都是给最后一级level的.

源码

Compact运行的条件

先来看在RocksDB中是什么时候会引起compact.在RocksDB中所有的compact都是在后台线程中进行的,这个线程就是BGWorkCompaction.这个线程只有在两种情况下被调用,一个是 手动compact(RunManualCompaction),一个就是自动(MaybeScheduleFlushOrCompaction),我们主要来看自动的compact,而MaybeScheduleFlushOrCompaction这个函数我们在之前介绍flush的时候已经介绍过了,简单来说就是会在切换WAL(SwitchWAL)或者writebuffer满的时候(HandleWriteBufferFull)被调用.

我们来看在MaybeScheduleFlushOrCompaction中compact的调用.这里可以看到RocksDB中后台运行的compact会有一个限制(max_compactions).而我们可以看到这里还有一个变量 unscheduled_compactions_,这个变量表示需要被compact的columnfamily的队列长度.

  while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
         unscheduled_compactions_ > 0) {
    CompactionArg* ca = new CompactionArg;
    ca->db = this;
    ca->prepicked_compaction = nullptr;
    bg_compaction_scheduled_++;
    unscheduled_compactions_--;
    env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
                   &DBImpl::UnscheduleCallback);
  }

类似flush的逻辑,compact的时候RocksDB也有一个队列叫做DBImpl::compaction_queue_.

  std::deque<ColumnFamilyData*> compaction_queue_;

然后我们来看这个队列何时被更新,其中unscheduled_compactions_和队列的更新是同步的,因此只有compaction_queue_更新之后,调用compact后台线程才会进入compact处理.

void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
  if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) {
    AddToCompactionQueue(cfd);
    ++unscheduled_compactions_;
  }
}

上面的核心函数是NeedsCompaction,通过这个函数来判断是否有sst需要被compact,因此接下来我们就来详细分析这个函数.当满足下列几个条件之一就将会更新compact队列

  • 有超时的sst(ExpiredTtlFiles)
  • files_marked_for_compaction_或者bottommost_files_marked_for_compaction_都不为空
    • 后面会介绍这两个队列
  • 遍历所有的level的sst,然后判断是否需要compact
    • 最核心的条件(上面两个队列都是在这里更新的).
bool LevelCompactionPicker::NeedsCompaction(
    const VersionStorageInfo* vstorage) const {
  if (!vstorage->ExpiredTtlFiles().empty()) {
    return true;
  }
  if (!vstorage->BottommostFilesMarkedForCompaction().empty()) {
    return true;
  }
  if (!vstorage->FilesMarkedForCompaction().empty()) {
    return true;
  }
  for (int i = 0; i <= vstorage->MaxInputLevel(); i++) {
    if (vstorage->CompactionScore(i) >= 1) {
      return true;
    }
  }
  return false;
}

因此接下来我们来分析最核心的CompactionScore,这里将会涉及到两个变量,这两个变量分别保存了level以及每个level所对应的score(这里score越高表示compact优先级越高),而score小于1则表示不需要compact.

  std::vector<double> compaction_score_;
  std::vector<int> compaction_level_;

这两个vector是在VersionStorageInfo::ComputeCompactionScore中被更新,因此我们来看这个函数,这个函数中会对level-0和其他的level区别处理。 首先来看level-0的处理:

  1. 首先会计算level-0下所有文件的大小(total_size)以及文件个数(num_sorted_runs).
  2. 用文件个数除以level0_file_num_compaction_trigger来得到对应的score
  3. 如果当前不止一层level,那么将会从上面的score和(total_size/max_bytes_for_level_base)取最大值.

之所以要做第三步,主要还是为了防止level-0的文件size过大,那么当它需要compact的时候有可能会需要和level-1 compact,那么此时就有可能会有一个很大的compact.

if (level == 0) {
      int num_sorted_runs = 0;
      uint64_t total_size = 0;
      for (auto* f : files_[level]) {
        if (!f->being_compacted) {
          total_size += f->compensated_file_size;
          num_sorted_runs++;
        }
      }
.........................
      score = static_cast<double>(num_sorted_runs) /
                mutable_cf_options.level0_file_num_compaction_trigger;
        if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
          score = std::max(
              score, static_cast<double>(total_size) /
                     mutable_cf_options.max_bytes_for_level_base);
        }
      }

然后是非level-0的处理,这里也是计算level的文件大小然后再除以MaxBytesForLevel,然后得到当前level的score.

      uint64_t level_bytes_no_compacting = 0;
      for (auto f : files_[level]) {
        if (!f->being_compacted) {
          level_bytes_no_compacting += f->compensated_file_size;
        }
      }
      score = static_cast<double>(level_bytes_no_compacting) /
              MaxBytesForLevel(level);

上面我们看到有一个MaxBytesForLevel,这个函数的作用就是得到当前level的最大的文件大小.而这个函数实现也很简单.

uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const {
  // Note: the result for level zero is not really used since we set
  // the level-0 compaction threshold based on number of files.
  assert(level >= 0);
  assert(level < static_cast<int>(level_max_bytes_.size()));
  return level_max_bytes_[level];
}

可以看到核心就是level_max_bytes_这个数组,接下来我们就来看这个数组是在哪里被初始化的。level_max_bytes这个数组是在VersionStorageInfo::CalculateBaseBytes 这个函数中被初始化,这里RocksDB有一个option叫做level_compaction_dynamic_level_bytes,这个配置如果被设置,那么level_max_bytes将会这样 设置(这里我们只关注level):

  • 如果是level-1那么level-1的的文件大小限制为options.max_bytes_for_level_base.
  • 如果level大于1那么当前level-i的大小限制为(其中max_bytes这两个变量都是options中设置的)
    Target_Size(Ln+1) = Target_Size(Ln) * max_bytes_for_level_multiplier * max_bytes_for_level_multiplier_additional[n].
    

    举个例子,如果max_bytes_for_level_base=1024,max_bytes_for_level_multiplier=10,然后max_bytes_for_level_multiplier_additional未设置,那么L1, L2,L3的大小限制分别为1024,10240,102400.

下面是对应代码.

  if (!ioptions.level_compaction_dynamic_level_bytes) {
    base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1;

    // Calculate for static bytes base case
    for (int i = 0; i < ioptions.num_levels; ++i) {
      if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
        level_max_bytes_[i] = options.max_bytes_for_level_base;
      } else if (i > 1) {
        level_max_bytes_[i] = MultiplyCheckOverflow(
            MultiplyCheckOverflow(level_max_bytes_[i - 1],
                                  options.max_bytes_for_level_multiplier),
            options.MaxBytesMultiplerAdditional(i - 1));
      } else {
        level_max_bytes_[i] = options.max_bytes_for_level_base;
      }
    }
  }

然后我们来看如果设置了level_compaction_dynamic_level_bytes会如何来计算.如果设置了dynamic,那么就说明每次计算出来的每个level的最大值都是不一样的, 首先我们要知道调用CalculateBaseBytes是在每次创建version的时候。因此他是这样计算的.最大的level(num_levels -1 )的大小限制是不计入计算的,然后就是这样计算.

Target_Size(Ln-1) = Target_Size(Ln) / max_bytes_for_level_multiplier

举个例子,假设调用CalculateBaseBytes的时候,max_bytes_for_level_base是1G,然后num_levels = 6,然后当前最大的level的大小为256G,那么从L1-L6的大小是 0, 0, 0.276GB, 2.76GB, 27.6GB 和 276GB.

首先计算第一个非空的level.

    for (int i = 1; i < num_levels_; i++) {
      uint64_t total_size = 0;
      for (const auto& f : files_[i]) {
        total_size += f->fd.GetFileSize();
      }
      if (total_size > 0 && first_non_empty_level == -1) {
        first_non_empty_level = i;
      }
      if (total_size > max_level_size) {
        max_level_size = total_size;
      }
    }

得到最小的那个非0的level的size.

      uint64_t base_bytes_max = options.max_bytes_for_level_base;
      uint64_t base_bytes_min = static_cast<uint64_t>(
          base_bytes_max / options.max_bytes_for_level_multiplier);

      // Try whether we can make last level's target size to be max_level_size
      uint64_t cur_level_size = max_level_size;
      for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) {
        // Round up after dividing
        cur_level_size = static_cast<uint64_t>(
            cur_level_size / options.max_bytes_for_level_multiplier);
      }

找到base_level_size,一般来说也就是cur_level_size.

        // Find base level (where L0 data is compacted to).
        base_level_ = first_non_empty_level;
        while (base_level_ > 1 && cur_level_size > base_bytes_max) {
          --base_level_;
          cur_level_size = static_cast<uint64_t>(
              cur_level_size / options.max_bytes_for_level_multiplier);
        }
        if (cur_level_size > base_bytes_max) {
          // Even L1 will be too large
          assert(base_level_ == 1);
          base_level_size = base_bytes_max;
        } else {
          base_level_size = cur_level_size;
        }

然后给level_max_bytes_ 赋值

      uint64_t level_size = base_level_size;
      for (int i = base_level_; i < num_levels_; i++) {
        if (i > base_level_) {
          level_size = MultiplyCheckOverflow(
              level_size, options.max_bytes_for_level_multiplier);
        }
        // Don't set any level below base_bytes_max. Otherwise, the LSM can
        // assume an hourglass shape where L1+ sizes are smaller than L0. This
        // causes compaction scoring, which depends on level sizes, to favor L1+
        // at the expense of L0, which may fill up and stall.
        level_max_bytes_[i] = std::max(level_size, base_bytes_max);
      }
    }

Compact实现细节

分析完毕何时会触发Compact,那么我们接下来来分析如何Compact.其中Compact的所有操作都在DBImpl::BackgroundCompaction中进行,因此接下来我们来分析 这个函数. 首先是从compaction_queue_队列中读取第一个需要compact的column family.

    // cfd is referenced here
    auto cfd = PopFirstFromCompactionQueue();
    // We unreference here because the following code will take a Ref() on
    // this cfd if it is going to use it (Compaction class holds a
    // reference).
    // This will all happen under a mutex so we don't have to be afraid of
    // somebody else deleting it.
    if (cfd->Unref()) {
      delete cfd;
      // This was the last reference of the column family, so no need to
      // compact.
      return Status::OK();
    }

然后就是选取当前CF中所需要compact的内容.

      c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));

从上面可以看到PickCompaction这个函数,而这个函数会根据设置的不同的Compact策略调用不同的方法,这里我们只看默认的LevelCompact的对应函数.

Compaction* LevelCompactionBuilder::PickCompaction() {
  // Pick up the first file to start compaction. It may have been extended
  // to a clean cut.
  SetupInitialFiles();
  if (start_level_inputs_.empty()) {
    return nullptr;
  }
  assert(start_level_ >= 0 && output_level_ >= 0);

  // If it is a L0 -> base level compaction, we need to set up other L0
  // files if needed.
  if (!SetupOtherL0FilesIfNeeded()) {
    return nullptr;
  }

  // Pick files in the output level and expand more files in the start level
  // if needed.
  if (!SetupOtherInputsIfNeeded()) {
    return nullptr;
  }

  // Form a compaction object containing the files we picked.
  Compaction* c = GetCompaction();

  TEST_SYNC_POINT_CALLBACK("LevelCompactionPicker::PickCompaction:Return", c);

  return c;
}

这里PickCompaction分别调用了三个主要的函数.

  • SetupInitialFiles 这个函数主要用来初始化需要Compact的文件.
  • SetupOtherL0FilesIfNeeded 如果需要compact的话,那么还需要再设置对应的L0文件
  • SetupOtherInputsIfNeeded 选择对应的输出文件

先来看SetupInitialFiles,这个函数他会遍历所有的level,然后来选择对应需要compact的input和output.

这里可看到,他会从之前计算好的的compact信息中得到对应的score.

void LevelCompactionBuilder::SetupInitialFiles() {
  // Find the compactions by size on all levels.
  bool skipped_l0_to_base = false;
  for (int i = 0; i < compaction_picker_->NumberLevels() - 1; i++) {
    start_level_score_ = vstorage_->CompactionScore(i);
    start_level_ = vstorage_->CompactionScoreLevel(i);
    assert(i == 0 || start_level_score_ <= vstorage_->CompactionScore(i - 1));
................................................................
  }

只有当score大于一才有必要进行compact的处理(所有操作都在上面的循环中).这里可以看到如果是level0的话,那么output_level 则是vstorage_->base_level(),否则就是level+1. 这里base_level()可以认为就是level1或者是最小的非空的level(之前CalculateBaseBytes中计算).

if (start_level_score_ >= 1) {
      if (skipped_l0_to_base && start_level_ == vstorage_->base_level()) {
        // If L0->base_level compaction is pending, don't schedule further
        // compaction from base level. Otherwise L0->base_level compaction
        // may starve.
        continue;
      }
      output_level_ =
          (start_level_ == 0) ? vstorage_->base_level() : start_level_ + 1;
      if (PickFileToCompact()) {
        // found the compaction!
        if (start_level_ == 0) {
          // L0 score = `num L0 files` / `level0_file_num_compaction_trigger`
          compaction_reason_ = CompactionReason::kLevelL0FilesNum;
        } else {
          // L1+ score = `Level files size` / `MaxBytesForLevel`
          compaction_reason_ = CompactionReason::kLevelMaxLevelSize;
        }
        break;
      } else {
        // didn't find the compaction, clear the inputs
  ......................................................
        }
      }
    }

上面的代码中我们可以看到最终是通过PickFileToCompact来选择input以及output文件.因此我们接下来就来分这个函数.

首先是得到当前level(start_level_)的未compacted的最大大小的文件

  // Pick the largest file in this level that is not already
  // being compacted
  const std::vector<int>& file_size =
      vstorage_->FilesByCompactionPri(start_level_);
  const std::vector<FileMetaData*>& level_files =
      vstorage_->LevelFiles(start_level_);

紧接着就是这个函数最核心的功能了,它会开始遍历当前的输入level的所有待compact的文件,然后选择一些合适的文件然后compact到下一个level.

unsigned int cmp_idx;
  for (cmp_idx = vstorage_->NextCompactionIndex(start_level_);
       cmp_idx < file_size.size(); cmp_idx++) {
..........................................    
  }

然后我们来详细分析上面循环中所做的事情 首先选择好文件之后,将会扩展当前文件的key的范围,得到一个”clean cut”的范围, 这里”clean cut”是这个意思,假设我们有五个文件他们的key range分别为:

    f1[a1 a2] f2[a3 a4] f3[a4 a6] f4[a6 a7] f5[a8 a9]

如果我们第一次选择了f3,那么我们通过clean cut,则将还会选择f2,f4,因为他们都是连续的. 选择好之后,会再做一次判断,这次是判断是否正在compact的out_level的文件范围是否和我们选择好的文件的key有重合,如果有,则跳过这个文件. 这里之所以会有这个判断,主要原因还是因为compact是会并行的执行的.

int index = file_size[cmp_idx];
    auto* f = level_files[index];

    // do not pick a file to compact if it is being compacted
    // from n-1 level.
    if (f->being_compacted) {
      continue;
    }

    start_level_inputs_.files.push_back(f);
    start_level_inputs_.level = start_level_;
    if (!compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
                                                    &start_level_inputs_) ||
        compaction_picker_->FilesRangeOverlapWithCompaction(
            {start_level_inputs_}, output_level_)) {
      // A locked (pending compaction) input-level file was pulled in due to
      // user-key overlap.
      start_level_inputs_.clear();
      continue;
    }

选择好输入文件之后,接下来就是选择输出level中需要一起被compact的文件(output_level_inputs). 实现也是比较简单,就是从输出level的所有文件中找到是否有和上面选择好的input中有重合的文件,如果有,那么则需要一起进行compact.

    InternalKey smallest, largest;
    compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest);
    CompactionInputFiles output_level_inputs;
    output_level_inputs.level = output_level_;
    vstorage_->GetOverlappingInputs(output_level_, &smallest, &largest,
                                    &output_level_inputs.files);
    if (!output_level_inputs.empty() &&
        !compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
                                                    &output_level_inputs)) {
      start_level_inputs_.clear();
      continue;
    }
    base_index_ = index;
    break;

继续分析PickCompaction,我们知道在RocksDB中level-0会比较特殊,那是因为只有level-0中的文件是无序的,而在上面的操作中, 我们是假设在非level-0,因此接下来我们需要处理level-0的情况,这个函数就是SetupOtherL0FilesIfNeeded.

这里如果start_level_为0,也就是level-0的话,才会进行下面的处理,就是从level-0中得到所有的重合key的文件,然后加入到start_level_inputs中.

  if (start_level_ == 0 && output_level_ != 0) {
    // Two level 0 compaction won't run at the same time, so don't need to worry
    // about files on level 0 being compacted.
    assert(compaction_picker_->level0_compactions_in_progress()->empty());
    InternalKey smallest, largest;
    compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest);
    // Note that the next call will discard the file we placed in
    // c->inputs_[0] earlier and replace it with an overlapping set
    // which will include the picked file.
    start_level_inputs_.files.clear();
    vstorage_->GetOverlappingInputs(0, &smallest, &largest,
                                    &start_level_inputs_.files);

    // If we include more L0 files in the same compaction run it can
    // cause the 'smallest' and 'largest' key to get extended to a
    // larger range. So, re-invoke GetRange to get the new key range
    compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest);
    if (compaction_picker_->IsRangeInCompaction(
            vstorage_, &smallest, &largest, output_level_, &parent_index_)) {
      return false;
    }
  }

假设start_level_inputs被扩展了,那么对应的output也需要被扩展,因为非level0的其他的level的文件key都是不会overlap的. 那么此时就是会调用SetupOtherInputsIfNeeded.

  if (output_level_ != 0) {
    output_level_inputs_.level = output_level_;
    if (!compaction_picker_->SetupOtherInputs(
            cf_name_, mutable_cf_options_, vstorage_, &start_level_inputs_,
            &output_level_inputs_, &parent_index_, base_index_)) {
      return false;
    }

    compaction_inputs_.push_back(start_level_inputs_);
    if (!output_level_inputs_.empty()) {
      compaction_inputs_.push_back(output_level_inputs_);
    }

    // In some edge cases we could pick a compaction that will be compacting
    // a key range that overlap with another running compaction, and both
    // of them have the same output level. This could happen if
    // (1) we are running a non-exclusive manual compaction
    // (2) AddFile ingest a new file into the LSM tree
    // We need to disallow this from happening.
    if (compaction_picker_->FilesRangeOverlapWithCompaction(compaction_inputs_,
                                                            output_level_)) {
      // This compaction output could potentially conflict with the output
      // of a currently running compaction, we cannot run it.
      return false;
    }
    compaction_picker_->GetGrandparents(vstorage_, start_level_inputs_,
                                        output_level_inputs_, &grandparents_);
  }

最后就是构造一个compact然后返回.

  // Form a compaction object containing the files we picked.
  Compaction* c = GetCompaction();

  TEST_SYNC_POINT_CALLBACK("LevelCompactionPicker::PickCompaction:Return", c);

  return c;

最后再回到BackgroundCompaction中,这里就是在得到需要compact的文件之后,进行具体的compact. 这里我们可以看到核心的数据结构就是CompactionJob,每一次的compact都是一个job,最终对于文件的compact都是在 CompactionJob::run中实现.

CompactionJob compaction_job(
        job_context->job_id, c.get(), immutable_db_options_,
        env_options_for_compaction_, versions_.get(), &shutting_down_,
        preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
        GetDataDir(c->column_family_data(), c->output_path_id()), stats_,
        &mutex_, &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot,
        snapshot_checker, table_cache_, &event_logger_,
        c->mutable_cf_options()->paranoid_file_checks,
        c->mutable_cf_options()->report_bg_io_stats, dbname_,
        &compaction_job_stats);
    compaction_job.Prepare();

    mutex_.Unlock();
    compaction_job.Run();
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
    mutex_.Lock();

    status = compaction_job.Install(*c->mutable_cf_options());
    if (status.ok()) {
      InstallSuperVersionAndScheduleWork(
          c->column_family_data(), &job_context->superversion_context,
          *c->mutable_cf_options(), FlushReason::kAutoCompaction);
    }
    *made_progress = true;

在RocksDB中,Compact是会多线程并发的执行,而这里怎样并发,并发多少线程都是在CompactionJob中实现的,简单来说,当你的compact的文件range不重合的话,那么都是可以并发执行的。

我们先来看CompactionJob::Prepare函数,在这个函数中主要是做一些执行前的准备工作,首先是取得对应的compact的边界,这里每一个需要并发的compact都被抽象为一个sub compaction.因此在GenSubcompactionBoundaries会解析到对应的sub compaction以及边界.解析完毕之后,则将会把对应的信息全部加入sub_compact_states中。

void CompactionJob::Prepare() {
  ..........................
  if (c->ShouldFormSubcompactions()) {
    const uint64_t start_micros = env_->NowMicros();
    GenSubcompactionBoundaries();
    MeasureTime(stats_, SUBCOMPACTION_SETUP_TIME,
                env_->NowMicros() - start_micros);

    assert(sizes_.size() == boundaries_.size() + 1);

    for (size_t i = 0; i <= boundaries_.size(); i++) {
      Slice* start = i == 0 ? nullptr : &boundaries_[i - 1];
      Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
      compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]);
    }
    MeasureTime(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
                compact_->sub_compact_states.size());
  }
......................................
}

因此我们来详细分析GenSubcompactionBoundaries,这个函数比较长,我们来分开分析,首先是遍历所有的需要compact的level,然后取得每一个level的边界(也就是最大最小key)。

void CompactionJob::GenSubcompactionBoundaries() {
...........................
  // Add the starting and/or ending key of certain input files as a potential
  // boundary
  for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
    int lvl = c->level(lvl_idx);
    if (lvl >= start_lvl && lvl <= out_lvl) {
      const LevelFilesBrief* flevel = c->input_levels(lvl_idx);
      size_t num_files = flevel->num_files;
.....................
      if (lvl == 0) {
        // For level 0 add the starting and ending key of each file since the
        // files may have greatly differing key ranges (not range-partitioned)
        for (size_t i = 0; i < num_files; i++) {
          bounds.emplace_back(flevel->files[i].smallest_key);
          bounds.emplace_back(flevel->files[i].largest_key);
        }
      } else {
        // For all other levels add the smallest/largest key in the level to
        // encompass the range covered by that level
        bounds.emplace_back(flevel->files[0].smallest_key);
        bounds.emplace_back(flevel->files[num_files - 1].largest_key);
        if (lvl == out_lvl) {
          // For the last level include the starting keys of all files since
          // the last level is the largest and probably has the widest key
          // range. Since it's range partitioned, the ending key of one file
          // and the starting key of the next are very close (or identical).
          for (size_t i = 1; i < num_files; i++) {
            bounds.emplace_back(flevel->files[i].smallest_key);
          }
        }
      }
    }
  }
......................

然后则是对取得的bounds进行排序以及去重.

  std::sort(bounds.begin(), bounds.end(),
            [cfd_comparator](const Slice& a, const Slice& b) -> bool {
              return cfd_comparator->Compare(ExtractUserKey(a),
                                             ExtractUserKey(b)) < 0;
            });
  // Remove duplicated entries from bounds
  bounds.erase(
      std::unique(bounds.begin(), bounds.end(),
                  [cfd_comparator](const Slice& a, const Slice& b) -> bool {
                    return cfd_comparator->Compare(ExtractUserKey(a),
                                                   ExtractUserKey(b)) == 0;
                  }),
      bounds.end());

接近着就来计算理想情况下所需要的subcompactions的个数以及输出文件的个数.

  // Group the ranges into subcompactions
  const double min_file_fill_percent = 4.0 / 5;
  int base_level = v->storage_info()->base_level();
  uint64_t max_output_files = static_cast<uint64_t>(std::ceil(
      sum / min_file_fill_percent /
      MaxFileSizeForLevel(*(c->mutable_cf_options()), out_lvl,
          c->immutable_cf_options()->compaction_style, base_level,
          c->immutable_cf_options()->level_compaction_dynamic_level_bytes)));
  uint64_t subcompactions =
      std::min({static_cast<uint64_t>(ranges.size()),
                static_cast<uint64_t>(c->max_subcompactions()),
                max_output_files});

最后更新boundaries_,这里会根据根据文件的大小,通过平均的size,来吧所有的range分为几份,最终这些都会保存在boundaries_中.

  if (subcompactions > 1) {
    double mean = sum * 1.0 / subcompactions;
    // Greedily add ranges to the subcompaction until the sum of the ranges'
    // sizes becomes >= the expected mean size of a subcompaction
    sum = 0;
    for (size_t i = 0; i < ranges.size() - 1; i++) {
      sum += ranges[i].size;
      if (subcompactions == 1) {
        // If there's only one left to schedule then it goes to the end so no
        // need to put an end boundary
        continue;
      }
      if (sum >= mean) {
        boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit));
        sizes_.emplace_back(sum);
        subcompactions--;
        sum = 0;
      }
    }
    sizes_.emplace_back(sum + ranges.back().size);
  }

然后我们来看CompactJob::Run的实现,在这个函数中,就是会遍历所有的sub_compact,然后启动线程来进行对应的compact工作,最后等到所有的线程完成,然后退出.

 // Launch a thread for each of subcompactions 1...num_threads-1
  std::vector<port::Thread> thread_pool;
  thread_pool.reserve(num_threads - 1);
  for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
    thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this,
                             &compact_->sub_compact_states[i]);
  }

  // Always schedule the first subcompaction (whether or not there are also
  // others) in the current thread to be efficient with resources
  ProcessKeyValueCompaction(&compact_->sub_compact_states[0]);

  // Wait for all other threads (if there are any) to finish execution
  for (auto& thread : thread_pool) {
    thread.join();
  }

  if (output_directory_) {
    output_directory_->Fsync();
  }

最后我们可以看到最终compact工作是在CompactionJob::ProcessKeyValueCompaction是实现的,这个函数我们暂时就不分析了,我们只需要知道所有的compact工作都是在这个函数中执行的.

网友评论

登录后评论
0/500
评论
db匠
+ 关注
所属云栖号: 阿里云数据库ApsaraDB