leveldb compaction
注:这篇文章本质应该算是缝合怪,只是自己稍微记录一下(你还能看到很多的TODO)。
项目结构
util
文件夹
arena.h/arena.cc
:内存分配bloom.cc
:布隆过滤器cache.cc
:实现LRU缓存,相同的key只会存储一次coding.cc/coding.h
:实现整形的变长编码crc32c.h/crc32c.cc
:crc32的实现env.cc/env_posix.cc/env_windows.cc
:环境相关的代码hash.h/hash.cc
:hash的实现histogram.h/histogram.cc
:直方图的实现(具体作用未知)logging.h/logging.cc
:打印日志no_destructor.h
:NoDesctructor
类的实现std::aligned_storage
- 定位new:
random.h
:随机数算法testutil.h/testutil.cc
:测试相关的函数
table
文件夹
iterator.cc/iterator_wrapper.h
:迭代器的接口定义merger.h/merger.cc
:多路iterator
的实现block_builder.h/block_builder.cc
:负责block
数据的创建block.h/block.cc
:负责block_builder
创建的数据的解析,构建了一个迭代器来进行block内部数据的遍历访问和二分查找filter_block.h/filter_block.cc
:format.h/format.cc
:BlockHandle
以及Footer
的实现table.cc
:负责sstable
的创建table_builder.cc
:负责sstable
数据的读取two_level_iterator.h/two_level_iterator.cc
:
本文档只会记录那些和 lst-tree 相关度较高的模块,其他像是跳表,cache之类的不会在分析范围之内。
SSTable的结构,memtable的结构其实对于分析整个 compaction过程来说也不重要,知道其内部怎么存储kv就够了(就是下面Internal Key
部分)
two_level_iterator
通过index_iter
和data_iter
来对数据进行遍历,主要注意以下几个方法
InitDataBlock
:通过当前的index_iter
来初始化data_iter
,如果index_iter
无效的话,data_iter
也设为无效SkipEmptyDataBlocksBackward(Forward)
:如果data_iter
没有走到末尾的话,什么也不做,否则,一直前进index_iter
直到能成功初始化data_iter
(即能取到值)- 其余的
seek
等方法都差不多,只不过相比单迭代器多了一步可能的移动index_iter
的过程
two level iterator主要用在 Block数据的遍历上
Internal Key
User Key 是我们进行查询时使用到的key,Internal Key 则为查询时用到的key
在数据写入SSTable时实际写的就是Internal Key,会通过User Key升序,Sequence Number降序的方式来排列,保证User Key相同的情况下,越新的数据在越前面
LookUp Key是存储与memtable内部的结构
leveldb的各类文件
leveldb 总共有以下几种文件类型
enum FileType {
kLogFile,
kDBLockFile,
kTableFile,
kDescriptorFile,
kCurrentFile,
kTempFile,
kInfoLogFile // Either the current one, or an old one
};
kLogFile
:*.log
文件,也就是WAL文件kDBLockFile
:LOCK
文件是锁文件,一个LevelDB数据库同时只允许被一个进程操作,一个进程打开一个数据库时,会对这个文件加锁,防止其它进程并发打开这个数据库kTableFile
:SSTABLE文件kDescriptorFile
:MANIFEST文件,以MANIFEST-开头。例如:MANIFEST-000002
kCurrentFile
:CURRENT
文件,存放当前使用的MANIFEST文件kTempFile
:在使用新Manifest文件时,会创建一个dbtmp文件,然后rename成为真的,例如:000005.dbtmp
kInfoLogFile
sstable、log、manifest 的文件都是通过一个递增的整数来命名的,来自 VersionSet
的 next_file_number_
也就是不会出现 MANIFEST-000002
、000002.log
同时存在的情况
SSTable
Put过程
最外层调用的是Put(const WriteOptions& options, const Slice& key, const Slice& value)
,底层会将这个kv对封装到一个Writer
加入写队列中,持有锁的那个线程会将写队列中的请求合并为一个大的WriteBatch
再进行写入,当然这个WriteBatch
的大小是有限制的。
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
mutex_.AssertHeld();
assert(!writers_.empty());
Writer* first = writers_.front();
WriteBatch* result = first->batch;
assert(result != nullptr);
size_t size = WriteBatchInternal::ByteSize(first->batch);
// Allow the group to grow up to a maximum size, but if the
// original write is small, limit the growth so we do not slow
// down the small write too much.
size_t max_size = 1 << 20;
if (size <= (128 << 10)) {
max_size = size + (128 << 10);
}
*last_writer = first;
std::deque<Writer*>::iterator iter = writers_.begin();
++iter; // Advance past "first"
for (; iter != writers_.end(); ++iter) {
Writer* w = *iter;
if (w->sync && !first->sync) {
// Do not include a sync write into a batch handled by a non-sync write.
break;
}
if (w->batch != nullptr) {
size += WriteBatchInternal::ByteSize(w->batch);
if (size > max_size) {
// Do not make batch too big
break;
}
// Append to *result
if (result == first->batch) {
// Switch to temporary batch instead of disturbing caller's batch
result = tmp_batch_;
assert(WriteBatchInternal::Count(result) == 0);
WriteBatchInternal::Append(result, first->batch);
}
WriteBatchInternal::Append(result, w->batch);
}
*last_writer = w;
}
return result;
}
在合并写入 SSTable 前,会在 MakeRoomForWrite
进行一系列判断,是不是需要进行 Compaction
- 如果
allow_delay
并且 level 0 的文件数已经大于等于config::kL0_SlowdownWritesTrigger = 8
了,就等待一秒再进行下面的判断,这个延迟只会进行一次。 - 如果
memtable
的大小还没达到阈值,可以直接写入,判断结束。 - 如果
memtable
空间不够,并且imemtable
还未写入,就继续等待。 - 如果 level 0 的文件数已经大于等于
config::kL0_StopWritesTrigger = 12
,此时就要等待后台线程进行compaction完成。 - 否则,创建新的
log
文件以及memtable
,通知后台线程进行 compaction。
在完成一系列判断后,就可以将WriteBatch
写入 memtable 中,当然在写入前需要先写日志。
Compaction
文件描述
文件的信息则是通过FileMetaData
来记录,主要记录了文件大小,引用基数等信息,allowed_seeks
用于进行 Compact,之后会介绍
struct FileMetaData {
FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) {}
int refs;
int allowed_seeks; // Seeks allowed until compaction
uint64_t number;
uint64_t file_size; // File size in bytes
InternalKey smallest; // Smallest internal key served by table
InternalKey largest; // Largest internal key served by table
};
Version, VersionSet, VersionEdit
每次出现导致文件变动的操作后(比如添加,合并,移动,删除文件),都会产生一个新的VersionEdit
文件来记录这些操作,VersionEdit
的信息会下盘到 Manifest
文件中,文件格式和 Wal 一致。
Version
则是记录了某一时刻所用到的文件的情况,因为有 snapshot 的存在,一个版本可能只会用到一部分的文件
VersionEdit
作用于Version
上旧可以得到新的Version
,调用的是LogAndApply
VersionSet
是version
组成一个双向循环链表,每个Version
有引用计数,在生命周期结束后,会将自己从双向链表中摘除
VersionSet::LogAndApply
主要做了几件事(TODO)
所有的 VersionEdit
信息都会存储在MANIFEST
文件中,每次创建新的MANIFEST
时,会将当前的文件信息通过WriteSnapshot
写入MANIFEST
中,之后每次版本有改动,就将VersionEdit
追加写入。
重新打开数据库时,就通过MANIFEST
文件恢复之前的状态
文件清理
RemoveObsoleteFiles
函数主要负责了 compaction 后无用文件的清理
根据文件的类型有几种情况
kLogFile
keep = ((number >= versions_->LogNumber()) ||
(number == versions_->PrevLogNumber()));
versions_->PrevLogNumber()
存储了正在进行 compact 的表的日志,这个日志不能被删除
versions_->LogNumber()
记录了正在使用的日志文件的编号,在 compact 完成后就会进行更新,小于它的日志文件都已经下盘了
虽然但是,这个PrevLogNumber
似乎就没有被修改过,而且从逻辑上来看,这个PrevLogNumber
似乎不需要?因为LogNumber
的更新时机是在 memtable 下盘和清理文件之间,用LogNumber
应该就够了?
kDescriptorFile
keep = (number >= versions_->ManifestFileNumber());
kTableFile
和kTempFile
keep = (live.find(number) != live.end());
这两类文件都通过一个文件存活的集合来判读文件是否可以被删除,只要文件被任意一个版本所引用,就不能被删除
void VersionSet::AddLiveFiles(std::set<uint64_t>* live) {
for (Version* v = dummy_versions_.next_; v != &dummy_versions_;
v = v->next_) {
for (int level = 0; level < config::kNumLevels; level++) {
const std::vector<FileMetaData*>& files = v->files_[level];
for (size_t i = 0; i < files.size(); i++) {
live->insert(files[i]->number);
}
}
}
}
std::set<uint64_t> live = pending_outputs_;
versions_->AddLiveFiles(&live);
kCurrentFile
,kDBLockFile
,kInfoLogFile
不删除
Compaction
compaction主要分为两种,下盘 memtable 被称为 minor compaction,而进行文件的合并和删除,就被称为 major compaction。
compaction是在BackgroundCompaction
中进行
并且一次compaction后,还要再进行一次判断来决定是否要进行再一次compaction
// Previous compaction may have produced too many files in a level,
// so reschedule another compaction if needed.
Minor Compaction
minor compaction 的优先级是最高的,BackgroundCompaction
首先判断的就是是否需要 compact memtable
if (imm_ != nullptr) {
CompactMemTable();
return;
}
同时在之后能看到,在major compaction中,也会优先处理可能的minor compaction
选择文件放置的 level
在将 memtable 保存写入文件后,PickLevelForMemTableOutput
会选择合适的level来放置新的文件
以下流程都针对config::kNumLevels
为2的情况
- 如果和 level 0 的文件有重叠,直接放在 level 0
- 如果和 level 1 的文件有重叠,放在 level 0
- 如果和 level 2 的文件有重叠,且重叠文件的总大小大于
MaxGrandParentOverlapBytes
,放在 level 0(MaxGrandParentOverlapBytes
是为了避免这次操作导致下层后续的compaction压力过大,后面也会出现) - 如果和 level 2 的文件有重叠,放在 level 1
- 如果和 level 3 的文件有重叠,且重叠文件的总大小大于
MaxGrandParentOverlapBytes
,放在 level 2
后处理
// Replace immutable memtable with the generated Table
if (s.ok()) {
edit.SetPrevLogNumber(0);
// 只有在 minor compaction 中会设置 log number,因为 major compaction 不会创建新的log (😕不一定对)
edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
s = versions_->LogAndApply(&edit, &mutex_);
}
if (s.ok()) {
// Commit to the new state
imm_->Unref();
imm_ = nullptr;
has_imm_.store(false, std::memory_order_release);
RemoveObsoleteFiles();
} else {
RecordBackgroundError(s);
}
之后就是Version
的更新,以及文件清理工作
Major compaction
触发 major compaction 的情况有三种,这里只考虑自动触发的两种情况,分别是
- 某一层的文件总大小超出了规定的阈值(level0考虑的是文件的数量)
- 某一个文件的seek次数过多
每次创建了新的VersionEdit
并作用产生新的Version
时,就会去检测情况一是否发生。
而情况二则是在查询次数过多时发生(TODO)
同时 size compaction 的优先级高于 seek compaction
Compaction结构体
Compaction
记录了一次compact涉及到的文件,其成员变量如下
int level_;
uint64_t max_output_file_size_;
Version* input_version_;
VersionEdit edit_;
// Each compaction reads inputs from "level_" and "level_+1"
std::vector<FileMetaData*> inputs_[2]; // The two sets of inputs
// State used to check for number of overlapping grandparent files
// (parent == level_ + 1, grandparent == level_ + 2)
std::vector<FileMetaData*> grandparents_;
size_t grandparent_index_; // Index in grandparent_starts_
bool seen_key_; // Some output key has been seen
int64_t overlapped_bytes_; // Bytes of overlap between current output
// and grandparent files
// State for implementing IsBaseLevelForKey
// level_ptrs_ holds indices into input_version_->levels_: our state
// is that we are positioned at one of the file ranges for each
// higher level than the ones involved in this compaction (i.e. for
// all L >= level_ + 2).
size_t level_ptrs_[config::kNumLevels];
在PickCompaction
会根据两种情况来计算需要进行 compaction 的文件
c = versions_->PickCompaction();
size compaction
size_compaction
会在当前的compaction_score
大于1时进行
if (size_compaction) {
level = current_->compaction_level_;
assert(level >= 0);
assert(level + 1 < config::kNumLevels);
c = new Compaction(options_, level);
// Pick the first file that comes after compact_pointer_[level]
for (size_t i = 0; i < current_->files_[level].size(); i++) {
FileMetaData* f = current_->files_[level][i];
if (compact_pointer_[level].empty() ||
icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
c->inputs_[0].push_back(f);
break;
}
}
if (c->inputs_[0].empty()) {
// Wrap-around to the beginning of the key space
c->inputs_[0].push_back(current_->files_[level][0]);
}
}
compact_pointer_
是 string 类型,记录了该层上次 compact 时文件的 largest key。
每次compact都会从该层上一次compact的位置开始选择合适的文件(如果走到末尾了,就再从头开始),这样应该是保证compaction能覆盖到所有的文件
处理level0的文件
因为 level 0 的文件是有可能重叠的,所以如果 compact 的是第0层,需要将和选择的文件有重叠的其他文件都一起下盘
例如我们有两个 level 0 的文件,范围分别是 (a0, u1) 和 (k2, v4)
第一次我们选择了第一个文件下降到 level 1
但是因为存在重叠,当我们查询 u
时,就会从 (k2, v4)
里获取到旧的数据,就导致了读取错误
对应的做法就是当选出文件后,判断还有哪些文件有重叠,把这些文件都加入进来,这个例子对应的就是把两个文件都加进来。
代码上,先通过GetRange
获取输入文件的 key range,然后根据 key range 得到一个所有有重叠文件列表(GetOverlappingInputs
内部会循环的判断重叠)
// Files in level 0 may overlap each other, so pick up all overlapping ones
if (level == 0) {
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
// Note that the next call will discard the file we placed in
// c->inputs_[0] earlier and replace it with an overlapping set
// which will include the picked file.
current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
assert(!c->inputs_[0].empty());
}
seek compaction
这里的部分非常简单,就是选择file_to_compact_
这个文件,也就是 seek 次数过多的文件
接下来的部分是两种 compaction 共通的
SetupOtherInputs
这里的逻辑还是比较复杂的,分步进行解读
void AddBoundaryInputs(const InternalKeyComparator& icmp,
const std::vector<FileMetaData*>& level_files,
std::vector<FileMetaData*>* compaction_files);
AddBoundaryInputs(icmp_, current_->files_[level], &c->inputs_[0]);
这个函数在每次选择完某一个 level 的文件后都要进行一次,它的流程如下
- 找到
compaction_files
中最大的keyu1
,找到满足l2 > u1 && user_key(u1) = user_key(l2)
的最小的文件,加入compaction列表 - 将最大的key设为
u2
,再次进行搜索,直到无法找到为止 - 也就是将那些边界上 user key 相同的文件一起加入列表
如果存在两个block,b1 = (l1, u1)
和b2 = (l2, u2)
满足k = user_key(u1) = user_key(l2)
,那么在查询 k
时会先查到b2
而不是b1
的数据,而实际b2
保存的是更旧的数据,因此,要将这些边界重叠的文件一起移到下一层。
这个bug据说存在了三年才被官方重视
下图就是一个简单的例子,在查询 bb
时就会出现错误
current_->GetOverlappingInputs(level + 1, &smallest, &largest,
&c->inputs_[1]);
AddBoundaryInputs(icmp_, current_->files_[level + 1], &c->inputs_[1]);
// Get entire range covered by compaction
InternalKey all_start, all_limit;
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
回到选择文件上,首先是计算下一层与inputs_[0]
有重叠的所有文件,记录到inputs_[1]
,并计算 inputs_[1]
的 key 值范围
if (!c->inputs_[1].empty()) {
std::vector<FileMetaData*> expanded0;
current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
AddBoundaryInputs(icmp_, current_->files_[level], &expanded0);
const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
const int64_t expanded0_size = TotalFileSize(expanded0);
if (expanded0.size() > c->inputs_[0].size() &&
inputs1_size + expanded0_size <
ExpandedCompactionByteSizeLimit(options_)) {
InternalKey new_start, new_limit;
GetRange(expanded0, &new_start, &new_limit);
std::vector<FileMetaData*> expanded1;
current_->GetOverlappingInputs(level + 1, &new_start, &new_limit,
&expanded1);
AddBoundaryInputs(icmp_, current_->files_[level + 1], &expanded1);
if (expanded1.size() == c->inputs_[1].size()) {
smallest = new_start;
largest = new_limit;
c->inputs_[0] = expanded0;
c->inputs_[1] = expanded1;
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
}
}
}
之后我们希望尽可能compact更多的文件,基本的流程可以写为:
- 计算与
inputs_[1]
层范围有重叠的 level 层文件,记为expanded0
- 如果
expanded0
的总文件大小比inputs_[0]
大,并且两层需要 compact 的总文件数量小于一定的阈值,继续判断,否则退出 - 计算与
expanded0
有重叠的 level + 1 层文件,记为expand1
- 如果
expand1
的数量和inputs_[1]
相同,表明可以对 level 层进行一个扩充
也就是:在不增加 level + 1 层文件,同时不会导致 compact 的文件过大的前提下,尽量增加 level 层的文件数
这样就最终确定了需要参与 compact 的文件
为了避免这些文件合并到 level + 1 层后,跟 level + 2 层有重叠的文件太多,届时合并 level + 1 和 level + 2 层压力太大,因此我们还需要记录下 level + 2 层的文件,后续 compact 时用于提前结束的判断:
// Compute the set of grandparent files that overlap this compaction
// (parent == level+1; grandparent == level+2)
if (level + 2 < config::kNumLevels) {
//level + 2层overlap的文件记录到c->grandparents_
current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
&c->grandparents_);
}
IsTrivialMove
选择完文件后,先进行判断能否通过移动文件来实现 compaction,如果满足条件就可以直接移动,调整manifest文件,产生新的Version即可
- level 层只有一个文件
- level +1 层没有文件
- 和 level + 2 层overlap的文件没有超过阈值(
MaxGrandParentOverlapBytes
)
bool Compaction::IsTrivialMove() const {
const VersionSet* vset = input_version_->vset_;
// Avoid a move if there is lots of overlapping grandparent data.
// Otherwise, the move could create a parent file that will require
// a very expensive merge later on.
return (num_input_files(0) == 1 && num_input_files(1) == 0 &&
TotalFileSize(grandparents_) <=
MaxGrandParentOverlapBytes(vset->options_));
}
DoCompaction
之后就可以真正的进行处理了,处理的过程如下所示,之后再一步一步分解来看
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
if (!status.ok()) {
RecordBackgroundError(status);
}
CleanupCompaction(compact);
c->ReleaseInputs();
RemoveObsoleteFiles();
CompactionState
CompactionState
在Compaction
的基础上,还记录了如smallest_snapshot
等信息
struct DBImpl::CompactionState {
// Files produced by compaction
struct Output {
uint64_t number;
uint64_t file_size;
InternalKey smallest, largest;
};
//...
Compaction* const compaction;
// Sequence numbers < smallest_snapshot are not significant since we
// will never have to service a snapshot below smallest_snapshot.
// Therefore if we have seen a sequence number S <= smallest_snapshot,
// we can drop all entries for the same key with sequence numbers < S.
SequenceNumber smallest_snapshot;
std::vector<Output> outputs;
// State kept for output being generated
WritableFile* outfile;
TableBuilder* builder;
uint64_t total_bytes;
};
DoCompactionWork
首先设置 snapshot
if (snapshots_.empty()) {
compact->smallest_snapshot = versions_->LastSequence();
} else {
compact->smallest_snapshot = snapshots_.oldest()->sequence_number();
}
如果存在 snapshot,compact->smallest_snapshot
被设为最小的 snapshot,否则则是最新的 sequence number,大于smallest_snapshot
的肯定需要全部保留,因为可能被某个 snapshot 用到,而小于smallest_snapshot
的只需要保留一条比smallest_snapshot
小的最新的插入或者删除就够了,这些数据只会被smallest_snapshot
用到。
进行Compaction操作
首先需要根据需要 compact 的文件创建一个MergingIterator
const int space = (c->level() == 0 ? c->inputs_[0].size() + 1 : 2);
Iterator** list = new Iterator*[space];
int num = 0;
for (int which = 0; which < 2; which++) {
if (!c->inputs_[which].empty()) {
if (c->level() + which == 0) {
const std::vector<FileMetaData*>& files = c->inputs_[which];
for (size_t i = 0; i < files.size(); i++) {
list[num++] = table_cache_->NewIterator(options, files[i]->number,
files[i]->file_size);
}
} else {
// Create concatenating iterator for the files from this level
list[num++] = NewTwoLevelIterator(
new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]),
&GetFileIterator, table_cache_, options);
}
}
}
assert(num <= space);
Iterator* result = NewMergingIterator(&icmp_, list, num);
因为 level0 的文件有重叠部分,所以不能使用TwoLevelIterator
,转而使用普通的Table::Iterator
while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
// Compact
}
整个 compact 流程就是不断的读取并且写入文件的过程,大致流程如下:
if (has_imm_.load(std::memory_order_relaxed)) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
if (imm_ != nullptr) {
CompactMemTable();
// Wake up MakeRoomForWrite() if necessary.
background_work_finished_signal_.SignalAll();
}
mutex_.Unlock();
imm_micros += (env_->NowMicros() - imm_start);
}
每次读取一个kv,会先判断是否需要做 minor compaction,因为 minor compaction 的优先级是最高的
Slice key = input->key();
if (compact->compaction->ShouldStopBefore(key) &&
compact->builder != nullptr) {
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
}
判断是否需要提前终止(TODO. 待看)
bool drop = false;
if (!ParseInternalKey(key, &ikey)) {
// Do not hide error keys
current_user_key.clear();
has_current_user_key = false;
last_sequence_for_key = kMaxSequenceNumber;
} else {
if (!has_current_user_key ||
user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
0) {
// First occurrence of this user key
current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber;
}
if (last_sequence_for_key <= compact->smallest_snapshot) {
// Hidden by an newer entry for same user key
drop = true; // (A)
} else if (ikey.type == kTypeDeletion &&
ikey.sequence <= compact->smallest_snapshot &&
compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
// For this user key:
// (1) there is no data in higher levels
// (2) data in lower levels will have larger sequence numbers
// (3) data in layers that are being compacted here and have
// smaller sequence numbers will be dropped in the next
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
}
last_sequence_for_key = ikey.sequence;
}
iterator 返回的 key 全部有序,遍历过程可以清理掉一些 key。
由于多次Put/Delete,有些key会出现多次,在compact时丢弃。策略如下:
- rule(A):对于多次出现的 user key,我们只关心最后写入的值,如果上一次写入的值的
last_sequence_for_key
已经小于compact->smallest_snapshot
了,这条kv就不需要插入(具体看之前的 smallest_snapshot 部分),- 只有当前key值是一个新的值才会触发
last_sequence_for_key = kMaxSequenceNumber
,跳过rule(A)
- 只有当前key值是一个新的值才会触发
- 如果是删除 key,并且满足下面的条件,就可以删除
- 当前的 sequence number 小于
compact->smallest_snapshot
- 更高层没有这个值
- 当前的 sequence number 小于
- 因为高层的值更旧,如果高层有值,就表明仍有小于
compact->smallest_snapshot
的插入操作,所以这个删除操作还是需要保留,以保证查询不出错
// Close output file if it is big enough
if (compact->builder->FileSize() >=
compact->compaction->MaxOutputFileSize()) {
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
}
如果插入的文件大小过大,就创建一个新文件继续写入
if (status.ok()) {
status = InstallCompactionResults(compact);
}
最终合并完成后,把所有 compaction 使用到的文件和合并后创建的文件记录到 VersionEdit
中并产生新的 Version
清理
void DBImpl::CleanupCompaction(CompactionState* compact) {
mutex_.AssertHeld();
if (compact->builder != nullptr) {
// May happen if we get a shutdown call in the middle of compaction
compact->builder->Abandon();
delete compact->builder;
} else {
assert(compact->outfile == nullptr);
}
delete compact->outfile;
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
pending_outputs_.erase(out.number);
}
delete compact;
}
void DBImpl::RemoveObsoleteFiles();
合并完成后,会将用到的文件从pending_outputs_
中移除,然后调用RemoveObsoleteFiles
删除文件
之前也看到,在合并过程中,也会去处理 minor compaction,而 minor compaction 也会通过RemoveObsoleteFiles
进行清理,因此在合并前需要将用到的文件加入pending_outputs_
中,防止被清理。