leveldb的compaction实现
本文最后更新于 595 天前,其中的信息可能已经有所发展或是发生改变。

leveldb compaction

注:这篇文章本质应该算是缝合怪,只是自己稍微记录一下(你还能看到很多的TODO)。

项目结构

util文件夹

  • arena.h/arena.cc:内存分配
  • bloom.cc:布隆过滤器
  • cache.cc:实现LRU缓存,相同的key只会存储一次
  • coding.cc/coding.h:实现整形的变长编码
  • crc32c.h/crc32c.cc:crc32的实现
  • env.cc/env_posix.cc/env_windows.cc:环境相关的代码
  • hash.h/hash.cc:hash的实现
  • histogram.h/histogram.cc:直方图的实现(具体作用未知)
  • logging.h/logging.cc:打印日志
  • no_destructor.hNoDesctructor 类的实现
    • std::aligned_storage
    • 定位new:
  • random.h:随机数算法
  • testutil.h/testutil.cc:测试相关的函数

table文件夹

  • iterator.cc/iterator_wrapper.h:迭代器的接口定义
  • merger.h/merger.cc:多路 iterator 的实现
  • block_builder.h/block_builder.cc:负责 block 数据的创建
  • block.h/block.cc:负责 block_builder 创建的数据的解析,构建了一个迭代器来进行block内部数据的遍历访问和二分查找
  • filter_block.h/filter_block.cc
  • format.h/format.ccBlockHandle 以及 Footer 的实现
  • table.cc:负责 sstable 的创建
  • table_builder.cc:负责 sstable 数据的读取
  • two_level_iterator.h/two_level_iterator.cc

本文档只会记录那些和 lst-tree 相关度较高的模块,其他像是跳表,cache之类的不会在分析范围之内。

SSTable的结构,memtable的结构其实对于分析整个 compaction过程来说也不重要,知道其内部怎么存储kv就够了(就是下面Internal Key部分)

two_level_iterator

通过index_iterdata_iter来对数据进行遍历,主要注意以下几个方法

  • InitDataBlock:通过当前的index_iter来初始化data_iter,如果index_iter无效的话,data_iter也设为无效
  • SkipEmptyDataBlocksBackward(Forward):如果data_iter没有走到末尾的话,什么也不做,否则,一直前进index_iter直到能成功初始化 data_iter(即能取到值)
  • 其余的seek等方法都差不多,只不过相比单迭代器多了一步可能的移动index_iter的过程

two level iterator主要用在 Block数据的遍历上

Internal Key

img

User Key 是我们进行查询时使用到的key,Internal Key 则为查询时用到的key

在数据写入SSTable时实际写的就是Internal Key,会通过User Key升序,Sequence Number降序的方式来排列,保证User Key相同的情况下,越新的数据在越前面

LookUp Key是存储与memtable内部的结构

leveldb的各类文件

leveldb 总共有以下几种文件类型

enum FileType {
  kLogFile,
  kDBLockFile,
  kTableFile,
  kDescriptorFile,
  kCurrentFile,
  kTempFile,
  kInfoLogFile  // Either the current one, or an old one
};
  • kLogFile*.log文件,也就是WAL文件
  • kDBLockFileLOCK文件是锁文件,一个LevelDB数据库同时只允许被一个进程操作,一个进程打开一个数据库时,会对这个文件加锁,防止其它进程并发打开这个数据库
  • kTableFile:SSTABLE文件
  • kDescriptorFile:MANIFEST文件,以MANIFEST-开头。例如:MANIFEST-000002
  • kCurrentFileCURRENT文件,存放当前使用的MANIFEST文件
  • kTempFile:在使用新Manifest文件时,会创建一个dbtmp文件,然后rename成为真的,例如:000005.dbtmp
  • kInfoLogFile

sstable、log、manifest 的文件都是通过一个递增的整数来命名的,来自 VersionSetnext_file_number_

也就是不会出现 MANIFEST-000002000002.log 同时存在的情况

SSTable

Put过程

最外层调用的是Put(const WriteOptions& options, const Slice& key, const Slice& value),底层会将这个kv对封装到一个Writer加入写队列中,持有锁的那个线程会将写队列中的请求合并为一个大的WriteBatch再进行写入,当然这个WriteBatch的大小是有限制的。

WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  Writer* first = writers_.front();
  WriteBatch* result = first->batch;
  assert(result != nullptr);

  size_t size = WriteBatchInternal::ByteSize(first->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = 1 << 20;
  if (size <= (128 << 10)) {
    max_size = size + (128 << 10);
  }

  *last_writer = first;
  std::deque<Writer*>::iterator iter = writers_.begin();
  ++iter;  // Advance past "first"
  for (; iter != writers_.end(); ++iter) {
    Writer* w = *iter;
    if (w->sync && !first->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->batch != nullptr) {
      size += WriteBatchInternal::ByteSize(w->batch);
      if (size > max_size) {
        // Do not make batch too big
        break;
      }

      // Append to *result
      if (result == first->batch) {
        // Switch to temporary batch instead of disturbing caller's batch
        result = tmp_batch_;
        assert(WriteBatchInternal::Count(result) == 0);
        WriteBatchInternal::Append(result, first->batch);
      }
      WriteBatchInternal::Append(result, w->batch);
    }
    *last_writer = w;
  }
  return result;
}

在合并写入 SSTable 前,会在 MakeRoomForWrite 进行一系列判断,是不是需要进行 Compaction

  1. 如果allow_delay并且 level 0 的文件数已经大于等于config::kL0_SlowdownWritesTrigger = 8了,就等待一秒再进行下面的判断,这个延迟只会进行一次。
  2. 如果memtable的大小还没达到阈值,可以直接写入,判断结束。
  3. 如果memtable空间不够,并且imemtable还未写入,就继续等待。
  4. 如果 level 0 的文件数已经大于等于config::kL0_StopWritesTrigger = 12,此时就要等待后台线程进行compaction完成。
  5. 否则,创建新的log文件以及memtable,通知后台线程进行 compaction。

在完成一系列判断后,就可以将WriteBatch写入 memtable 中,当然在写入前需要先写日志。

Compaction

文件描述

文件的信息则是通过FileMetaData来记录,主要记录了文件大小,引用基数等信息,allowed_seeks用于进行 Compact,之后会介绍

struct FileMetaData {
  FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) {}

  int refs;
  int allowed_seeks;  // Seeks allowed until compaction
  uint64_t number;
  uint64_t file_size;    // File size in bytes
  InternalKey smallest;  // Smallest internal key served by table
  InternalKey largest;   // Largest internal key served by table
};

Version, VersionSet, VersionEdit

每次出现导致文件变动的操作后(比如添加,合并,移动,删除文件),都会产生一个新的VersionEdit文件来记录这些操作,VersionEdit的信息会下盘到 Manifest文件中,文件格式和 Wal 一致。

Version则是记录了某一时刻所用到的文件的情况,因为有 snapshot 的存在,一个版本可能只会用到一部分的文件

VersionEdit作用于Version上旧可以得到新的Version,调用的是LogAndApply

这里写图片描述

VersionSetversion组成一个双向循环链表,每个Version有引用计数,在生命周期结束后,会将自己从双向链表中摘除

这里写图片描述

VersionSet::LogAndApply主要做了几件事(TODO)

所有的 VersionEdit 信息都会存储在MANIFEST文件中,每次创建新的MANIFEST时,会将当前的文件信息通过WriteSnapshot写入MANIFEST中,之后每次版本有改动,就将VersionEdit追加写入。

重新打开数据库时,就通过MANIFEST文件恢复之前的状态

文件清理

RemoveObsoleteFiles函数主要负责了 compaction 后无用文件的清理

根据文件的类型有几种情况

kLogFile

keep = ((number >= versions_->LogNumber()) ||
        (number == versions_->PrevLogNumber()));

versions_->PrevLogNumber()存储了正在进行 compact 的表的日志,这个日志不能被删除

versions_->LogNumber()记录了正在使用的日志文件的编号,在 compact 完成后就会进行更新,小于它的日志文件都已经下盘了

虽然但是,这个PrevLogNumber似乎就没有被修改过,而且从逻辑上来看,这个PrevLogNumber似乎不需要?因为LogNumber的更新时机是在 memtable 下盘和清理文件之间,用LogNumber应该就够了?

kDescriptorFile

keep = (number >= versions_->ManifestFileNumber());

kTableFilekTempFile

keep = (live.find(number) != live.end());

这两类文件都通过一个文件存活的集合来判读文件是否可以被删除,只要文件被任意一个版本所引用,就不能被删除

void VersionSet::AddLiveFiles(std::set<uint64_t>* live) {
  for (Version* v = dummy_versions_.next_; v != &dummy_versions_;
       v = v->next_) {
    for (int level = 0; level < config::kNumLevels; level++) {
      const std::vector<FileMetaData*>& files = v->files_[level];
      for (size_t i = 0; i < files.size(); i++) {
        live->insert(files[i]->number);
      }
    }
  }
}

std::set<uint64_t> live = pending_outputs_;
versions_->AddLiveFiles(&live);

kCurrentFilekDBLockFilekInfoLogFile

不删除

Compaction

compaction主要分为两种,下盘 memtable 被称为 minor compaction,而进行文件的合并和删除,就被称为 major compaction。

compaction是在BackgroundCompaction中进行

并且一次compaction后,还要再进行一次判断来决定是否要进行再一次compaction

// Previous compaction may have produced too many files in a level,
// so reschedule another compaction if needed.

Minor Compaction

minor compaction 的优先级是最高的,BackgroundCompaction首先判断的就是是否需要 compact memtable

if (imm_ != nullptr) {
  CompactMemTable();
  return;
}

同时在之后能看到,在major compaction中,也会优先处理可能的minor compaction

选择文件放置的 level

在将 memtable 保存写入文件后,PickLevelForMemTableOutput会选择合适的level来放置新的文件

以下流程都针对config::kNumLevels为2的情况

  1. 如果和 level 0 的文件有重叠,直接放在 level 0
  2. 如果和 level 1 的文件有重叠,放在 level 0
  3. 如果和 level 2 的文件有重叠,且重叠文件的总大小大于 MaxGrandParentOverlapBytes ,放在 level 0(MaxGrandParentOverlapBytes是为了避免这次操作导致下层后续的compaction压力过大,后面也会出现)
  4. 如果和 level 2 的文件有重叠,放在 level 1
  5. 如果和 level 3 的文件有重叠,且重叠文件的总大小大于 MaxGrandParentOverlapBytes ,放在 level 2

后处理

// Replace immutable memtable with the generated Table
if (s.ok()) {
  edit.SetPrevLogNumber(0);

  // 只有在 minor compaction 中会设置 log number,因为 major compaction 不会创建新的log (😕不一定对)
  edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
  s = versions_->LogAndApply(&edit, &mutex_);
}

if (s.ok()) {
  // Commit to the new state
  imm_->Unref();
  imm_ = nullptr;
  has_imm_.store(false, std::memory_order_release);
  RemoveObsoleteFiles();
} else {
  RecordBackgroundError(s);
}

之后就是Version的更新,以及文件清理工作

Major compaction

触发 major compaction 的情况有三种,这里只考虑自动触发的两种情况,分别是

  1. 某一层的文件总大小超出了规定的阈值(level0考虑的是文件的数量)
  2. 某一个文件的seek次数过多

每次创建了新的VersionEdit并作用产生新的Version时,就会去检测情况一是否发生。

而情况二则是在查询次数过多时发生(TODO)

同时 size compaction 的优先级高于 seek compaction

Compaction结构体

Compaction记录了一次compact涉及到的文件,其成员变量如下

  int level_;
  uint64_t max_output_file_size_;
  Version* input_version_;
  VersionEdit edit_;

  // Each compaction reads inputs from "level_" and "level_+1"
  std::vector<FileMetaData*> inputs_[2];  // The two sets of inputs

  // State used to check for number of overlapping grandparent files
  // (parent == level_ + 1, grandparent == level_ + 2)
  std::vector<FileMetaData*> grandparents_;
  size_t grandparent_index_;  // Index in grandparent_starts_
  bool seen_key_;             // Some output key has been seen
  int64_t overlapped_bytes_;  // Bytes of overlap between current output
                              // and grandparent files

  // State for implementing IsBaseLevelForKey

  // level_ptrs_ holds indices into input_version_->levels_: our state
  // is that we are positioned at one of the file ranges for each
  // higher level than the ones involved in this compaction (i.e. for
  // all L >= level_ + 2).
  size_t level_ptrs_[config::kNumLevels];

PickCompaction会根据两种情况来计算需要进行 compaction 的文件

c = versions_->PickCompaction();

size compaction

size_compaction会在当前的compaction_score大于1时进行

if (size_compaction) {
    level = current_->compaction_level_;
    assert(level >= 0);
    assert(level + 1 < config::kNumLevels);
    c = new Compaction(options_, level);

    // Pick the first file that comes after compact_pointer_[level]
    for (size_t i = 0; i < current_->files_[level].size(); i++) {
      FileMetaData* f = current_->files_[level][i];
      if (compact_pointer_[level].empty() ||
          icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
        c->inputs_[0].push_back(f);
        break;
      }
    }
    if (c->inputs_[0].empty()) {
      // Wrap-around to the beginning of the key space
      c->inputs_[0].push_back(current_->files_[level][0]);
    }
  }

compact_pointer_ 是 string 类型,记录了该层上次 compact 时文件的 largest key。

每次compact都会从该层上一次compact的位置开始选择合适的文件(如果走到末尾了,就再从头开始),这样应该是保证compaction能覆盖到所有的文件

处理level0的文件

因为 level 0 的文件是有可能重叠的,所以如果 compact 的是第0层,需要将和选择的文件有重叠的其他文件都一起下盘

img

例如我们有两个 level 0 的文件,范围分别是 (a0, u1) 和 (k2, v4)

第一次我们选择了第一个文件下降到 level 1

但是因为存在重叠,当我们查询 u 时,就会从 (k2, v4) 里获取到旧的数据,就导致了读取错误

对应的做法就是当选出文件后,判断还有哪些文件有重叠,把这些文件都加入进来,这个例子对应的就是把两个文件都加进来。

代码上,先通过GetRange获取输入文件的 key range,然后根据 key range 得到一个所有有重叠文件列表(GetOverlappingInputs内部会循环的判断重叠)

  // Files in level 0 may overlap each other, so pick up all overlapping ones
  if (level == 0) {
    InternalKey smallest, largest;
    GetRange(c->inputs_[0], &smallest, &largest);
    // Note that the next call will discard the file we placed in
    // c->inputs_[0] earlier and replace it with an overlapping set
    // which will include the picked file.
    current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
    assert(!c->inputs_[0].empty());
  }

seek compaction

这里的部分非常简单,就是选择file_to_compact_这个文件,也就是 seek 次数过多的文件

接下来的部分是两种 compaction 共通的

SetupOtherInputs

这里的逻辑还是比较复杂的,分步进行解读

void AddBoundaryInputs(const InternalKeyComparator& icmp,
                       const std::vector<FileMetaData*>& level_files,
                       std::vector<FileMetaData*>* compaction_files);

AddBoundaryInputs(icmp_, current_->files_[level], &c->inputs_[0]);

这个函数在每次选择完某一个 level 的文件后都要进行一次,它的流程如下

  1. 找到 compaction_files 中最大的key u1,找到满足 l2 > u1 && user_key(u1) = user_key(l2) 的最小的文件,加入compaction列表
  2. 将最大的key设为 u2,再次进行搜索,直到无法找到为止
  3. 也就是将那些边界上 user key 相同的文件一起加入列表

如果存在两个block,b1 = (l1, u1)b2 = (l2, u2)满足k = user_key(u1) = user_key(l2),那么在查询 k 时会先查到b2而不是b1的数据,而实际b2保存的是更旧的数据,因此,要将这些边界重叠的文件一起移到下一层。

这个bug据说存在了三年才被官方重视

下图就是一个简单的例子,在查询 bb 时就会出现错误

img

current_->GetOverlappingInputs(level + 1, &smallest, &largest,
                                 &c->inputs_[1]);
AddBoundaryInputs(icmp_, current_->files_[level + 1], &c->inputs_[1]);

// Get entire range covered by compaction
InternalKey all_start, all_limit;
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);

回到选择文件上,首先是计算下一层与inputs_[0] 有重叠的所有文件,记录到inputs_[1],并计算 inputs_[1]的 key 值范围

  if (!c->inputs_[1].empty()) {
    std::vector<FileMetaData*> expanded0;
    current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
    AddBoundaryInputs(icmp_, current_->files_[level], &expanded0);
    const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
    const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
    const int64_t expanded0_size = TotalFileSize(expanded0);
    if (expanded0.size() > c->inputs_[0].size() &&
        inputs1_size + expanded0_size <
            ExpandedCompactionByteSizeLimit(options_)) {
      InternalKey new_start, new_limit;
      GetRange(expanded0, &new_start, &new_limit);
      std::vector<FileMetaData*> expanded1;
      current_->GetOverlappingInputs(level + 1, &new_start, &new_limit,
                                     &expanded1);
      AddBoundaryInputs(icmp_, current_->files_[level + 1], &expanded1);
      if (expanded1.size() == c->inputs_[1].size()) {
        smallest = new_start;
        largest = new_limit;
        c->inputs_[0] = expanded0;
        c->inputs_[1] = expanded1;
        GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
      }
    }
  }

之后我们希望尽可能compact更多的文件,基本的流程可以写为:

  1. 计算与inputs_[1]层范围有重叠的 level 层文件,记为expanded0
  2. 如果expanded0的总文件大小比inputs_[0]大,并且两层需要 compact 的总文件数量小于一定的阈值,继续判断,否则退出
  3. 计算与expanded0有重叠的 level + 1 层文件,记为expand1
  4. 如果expand1的数量和inputs_[1]相同,表明可以对 level 层进行一个扩充

也就是:在不增加 level + 1 层文件,同时不会导致 compact 的文件过大的前提下,尽量增加 level 层的文件数

这样就最终确定了需要参与 compact 的文件

为了避免这些文件合并到 level + 1 层后,跟 level + 2 层有重叠的文件太多,届时合并 level + 1 和 level + 2 层压力太大,因此我们还需要记录下 level + 2 层的文件,后续 compact 时用于提前结束的判断:

  // Compute the set of grandparent files that overlap this compaction
  // (parent == level+1; grandparent == level+2)
  if (level + 2 < config::kNumLevels) {
    //level + 2层overlap的文件记录到c->grandparents_
    current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
                                   &c->grandparents_);
  }

IsTrivialMove

选择完文件后,先进行判断能否通过移动文件来实现 compaction,如果满足条件就可以直接移动,调整manifest文件,产生新的Version即可

  1. level 层只有一个文件
  2. level +1 层没有文件
  3. 和 level + 2 层overlap的文件没有超过阈值(MaxGrandParentOverlapBytes
bool Compaction::IsTrivialMove() const {
  const VersionSet* vset = input_version_->vset_;
  // Avoid a move if there is lots of overlapping grandparent data.
  // Otherwise, the move could create a parent file that will require
  // a very expensive merge later on.
  return (num_input_files(0) == 1 && num_input_files(1) == 0 &&
          TotalFileSize(grandparents_) <=
              MaxGrandParentOverlapBytes(vset->options_));
}

DoCompaction

之后就可以真正的进行处理了,处理的过程如下所示,之后再一步一步分解来看

CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
if (!status.ok()) {
  RecordBackgroundError(status);
}
CleanupCompaction(compact);
c->ReleaseInputs();
RemoveObsoleteFiles();

CompactionState

CompactionStateCompaction的基础上,还记录了如smallest_snapshot等信息

struct DBImpl::CompactionState {
  // Files produced by compaction
  struct Output {
    uint64_t number;
    uint64_t file_size;
    InternalKey smallest, largest;
  };

  //...
  Compaction* const compaction;

  // Sequence numbers < smallest_snapshot are not significant since we
  // will never have to service a snapshot below smallest_snapshot.
  // Therefore if we have seen a sequence number S <= smallest_snapshot,
  // we can drop all entries for the same key with sequence numbers < S.
  SequenceNumber smallest_snapshot;

  std::vector<Output> outputs;

  // State kept for output being generated
  WritableFile* outfile;
  TableBuilder* builder;

  uint64_t total_bytes;
};

DoCompactionWork

首先设置 snapshot

if (snapshots_.empty()) {
  compact->smallest_snapshot = versions_->LastSequence();
} else {
  compact->smallest_snapshot = snapshots_.oldest()->sequence_number();
}

如果存在 snapshot,compact->smallest_snapshot被设为最小的 snapshot,否则则是最新的 sequence number,大于smallest_snapshot的肯定需要全部保留,因为可能被某个 snapshot 用到,而小于smallest_snapshot的只需要保留一条比smallest_snapshot小的最新的插入或者删除就够了,这些数据只会被smallest_snapshot用到。

进行Compaction操作

首先需要根据需要 compact 的文件创建一个MergingIterator

  const int space = (c->level() == 0 ? c->inputs_[0].size() + 1 : 2);
  Iterator** list = new Iterator*[space];
  int num = 0;
  for (int which = 0; which < 2; which++) {
    if (!c->inputs_[which].empty()) {
      if (c->level() + which == 0) {
        const std::vector<FileMetaData*>& files = c->inputs_[which];
        for (size_t i = 0; i < files.size(); i++) {
          list[num++] = table_cache_->NewIterator(options, files[i]->number,
                                                  files[i]->file_size);
        }
      } else {
        // Create concatenating iterator for the files from this level
        list[num++] = NewTwoLevelIterator(
            new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]),
            &GetFileIterator, table_cache_, options);
      }
    }
  }
  assert(num <= space);
  Iterator* result = NewMergingIterator(&icmp_, list, num);

因为 level0 的文件有重叠部分,所以不能使用TwoLevelIterator,转而使用普通的Table::Iterator

while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
    // Compact
}

整个 compact 流程就是不断的读取并且写入文件的过程,大致流程如下:

    if (has_imm_.load(std::memory_order_relaxed)) {
      const uint64_t imm_start = env_->NowMicros();
      mutex_.Lock();
      if (imm_ != nullptr) {
        CompactMemTable();
        // Wake up MakeRoomForWrite() if necessary.
        background_work_finished_signal_.SignalAll();
      }
      mutex_.Unlock();
      imm_micros += (env_->NowMicros() - imm_start);
    }

每次读取一个kv,会先判断是否需要做 minor compaction,因为 minor compaction 的优先级是最高的

    Slice key = input->key();
    if (compact->compaction->ShouldStopBefore(key) &&
        compact->builder != nullptr) {
      status = FinishCompactionOutputFile(compact, input);
      if (!status.ok()) {
        break;
      }
    }

判断是否需要提前终止(TODO. 待看)

bool drop = false;
if (!ParseInternalKey(key, &ikey)) {
  // Do not hide error keys
  current_user_key.clear();
  has_current_user_key = false;
  last_sequence_for_key = kMaxSequenceNumber;
} else {
  if (!has_current_user_key ||
      user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
          0) {
    // First occurrence of this user key
    current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
    has_current_user_key = true;
    last_sequence_for_key = kMaxSequenceNumber;
  }

  if (last_sequence_for_key <= compact->smallest_snapshot) {
    // Hidden by an newer entry for same user key
    drop = true;  // (A)
  } else if (ikey.type == kTypeDeletion &&
             ikey.sequence <= compact->smallest_snapshot &&
             compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
    // For this user key:
    // (1) there is no data in higher levels
    // (2) data in lower levels will have larger sequence numbers
    // (3) data in layers that are being compacted here and have
    //     smaller sequence numbers will be dropped in the next
    //     few iterations of this loop (by rule (A) above).
    // Therefore this deletion marker is obsolete and can be dropped.
    drop = true;
  }

  last_sequence_for_key = ikey.sequence;
}

iterator 返回的 key 全部有序,遍历过程可以清理掉一些 key。

由于多次Put/Delete,有些key会出现多次,在compact时丢弃。策略如下:

  1. rule(A):对于多次出现的 user key,我们只关心最后写入的值,如果上一次写入的值的 last_sequence_for_key已经小于compact->smallest_snapshot了,这条kv就不需要插入(具体看之前的 smallest_snapshot 部分)
    • 只有当前key值是一个新的值才会触发last_sequence_for_key = kMaxSequenceNumber,跳过rule(A)
  2. 如果是删除 key,并且满足下面的条件,就可以删除
    • 当前的 sequence number 小于compact->smallest_snapshot
    • 更高层没有这个值
  3. 因为高层的值更旧,如果高层有值,就表明仍有小于compact->smallest_snapshot的插入操作,所以这个删除操作还是需要保留,以保证查询不出错
// Close output file if it is big enough
if (compact->builder->FileSize() >=
    compact->compaction->MaxOutputFileSize()) {
  status = FinishCompactionOutputFile(compact, input);
  if (!status.ok()) {
    break;
  }
}

如果插入的文件大小过大,就创建一个新文件继续写入

if (status.ok()) {
  status = InstallCompactionResults(compact);
}

最终合并完成后,把所有 compaction 使用到的文件和合并后创建的文件记录到 VersionEdit 中并产生新的 Version

清理

void DBImpl::CleanupCompaction(CompactionState* compact) {
  mutex_.AssertHeld();
  if (compact->builder != nullptr) {
    // May happen if we get a shutdown call in the middle of compaction
    compact->builder->Abandon();
    delete compact->builder;
  } else {
    assert(compact->outfile == nullptr);
  }
  delete compact->outfile;
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    const CompactionState::Output& out = compact->outputs[i];
    pending_outputs_.erase(out.number);
  }
  delete compact;
}

void DBImpl::RemoveObsoleteFiles();

合并完成后,会将用到的文件从pending_outputs_中移除,然后调用RemoveObsoleteFiles删除文件

之前也看到,在合并过程中,也会去处理 minor compaction,而 minor compaction 也会通过RemoveObsoleteFiles进行清理,因此在合并前需要将用到的文件加入pending_outputs_中,防止被清理。

参考

  1. 深入浅出leveldb
  2. leveldb 笔记:Version
  3. leveldb笔记之19:major compaction
  4. leveldb - sstable格式
  5. RocksDB内部数据组织方式介绍及性能压测
  6. 庖丁解LevelDB之版本控制
  7. leveldb学习记录-compaction从memtable到sstable
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇