在之前的文章中我们知道RocksDB每一次写入,都是先写WAL,然后写Memtable,这次我们就来分析下MemTable的实现.
MemTable
MemTable是一个内存数据结构,它保存了落盘到SST文件前的数据。同时服务于读和写——新的写入总是将数据插入到memtable,读取在查询SST文件前总是要查询memtable,因为memtable里面的数据总是更新的。一旦一个memtable被写满,它会变成不可修改的,并被一个新的memtable替换。一个后台线程会把这个memtable的内容落盘到一个SST文件,然后这个memtable就可以被销毁了。并且在flush的过程中,会完成数据的压缩。
实现方式
默认的memtable实现是基于skiplist的。除了默认的memtable实现,用户可以使用其他memtable实现,例如HashLinkList,HashSkipList或者Vector,以加快查询速度。
相关选项
- memtable_factory: memtable对象的工厂。通过声明一个工厂对象,用户可以改变底层memtable的实现,并提供事先声明的选项。
- write_buffer_size:一个memtable的大小
- db_write_buffer_size:多个列族的memtable的大小总和。这可以用来管理memtable使用的总内存数。
- max_write_buffer_number:内存中可以拥有刷盘到SST文件前的最大memtable数。
插入方式
- 并发插入:如果不支持对memtable进行并发插入,从多个线程过来的并发写会按顺序应用到memtable中。并发memtable插入是默认打开的,可以通过allow_concurrent_memtable_write选项来关闭,尽管只有skip-list的memtable支持这个功能。
- 带Hint插入
- 原地更新
MEMTABLE类型 | SKIPLIST | HASHSKIPLIST | HASHLINKLIST | VECTOR |
---|---|---|---|---|
最佳使用场景 | 通用 | 带特殊key前缀的范围查询 | 带特殊key前缀,并且每个前缀都只有很小数量的行 | 大量随机写压力 |
索引类型 | 二分搜索 | 哈希+二分搜索 | 哈希+线性搜索 | 线性搜索 |
是否支持全量db有序扫描? | 天然支持 | 非常耗费资源(拷贝以及排序一生成一个临时视图) | 同HashSkipList | 同HashSkipList |
额外内存 | 平均(每个节点有多个指针) | 高(哈希桶+非空桶的skiplist元数据+每个节点多个指针) | 稍低(哈希桶+每个节点的指针) | 低(vector尾部预分配的内存) |
Memtable落盘 | 快速,以及固定数量的额外内存 | 慢,并且大量临时内存使用 | 同HashSkipList | 同HashSkipList |
并发插入 | 支持 | 不支持 | 不支持 | 不支持 |
带Hint插入 | 支持(在没有并发插入的时候) | 不支持 | 不支持 | 不支持 |
Immutable Memtable
所有的写操作都是在memtable进行,当memtable空间不足时,会创建一块新的memtable来继续接收写操作,原先的内存将被标识为只读模式,等待被刷入sst。在任何时候,一个CF中,只存在一块active memtable和0+块immutable memtable。
源码分析
MemTable创建
首先从创建Memtable开始,Memtable的创建(ColumnFamilyData::CreateNewMemtable)是在创建ColumnFamily(VersionSet::CreateColumnFamily)的时候创建的.这里就是创建memtable,然后设置到ColumnFamilyData的mem_域中.
MemTable* ColumnFamilyData::ConstructNewMemtable(
const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
DCHECK_ONLY_NOTNULL(current());
return new MemTable(*internal_comparator_, ioptions_, mutable_cf_options,
write_buffer_, earliest_seq);
}
void ColumnFamilyData::CreateNewMemtable(
const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
if (mem_ != nullptr) {
delete mem_->Unref();
}
SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
mem_->Ref();
}
上面所提及的,RocksDB有多种MemTable的实现,那么它是如何来做的呢?
在相关选项中,我们提到了memtablefactory,RocksDB通过memtable_factory来根据用户的设置来创建不同的memtable.这里要注意的是核心的memtable实现是在MemTable这个类的table域中.
MemTable::MemTable(......)
: ......
table_(ioptions.memtable_factory->CreateMemTableRep(
comparator_, &allocator_, ioptions.prefix_extractor,
ioptions.info_log)),
// This is the base class for all factories that are used by RocksDB to create
// new MemTableRep objects
class MemTableRepFactory {
public:
virtual ~MemTableRepFactory() {}
virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&,
MemTableAllocator*,
const SliceTransform*,
Logger* logger) = 0;
virtual const char* Name() const = 0;
// Return true if the current MemTableRep supports concurrent inserts
// Default: false
virtual bool IsInsertConcurrentlySupported() const { return false; }
virtual bool IsInMemoryEraseSupported() const { return false; }
};
MemTable在构造函数中调用了CreateMemTableRep,而CreateMemTableRep是MemTableRepFactory类的一个虚函数,最终会去调用对应的实现的CreateMemTableRep方法。
ColumnFamilyOptions::ColumnFamilyOptions()
: .......
memtable_factory(std::shared_ptr<SkipListFactory>(new SkipListFactory)),
......
可以看到,在Option中指定使用SkipListFactory。这里我们就来看看SkipList的实现。
MemTableRep* SkipListFactory::CreateMemTableRep(
const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
const SliceTransform* transform, Logger* logger) {
if (concurrent_writes_) {
return new SkipListRep<InlineSkipList<const MemTableRep::KeyComparator&>>(
compare, allocator, transform, lookahead_);
} else {
return new SkipListRep<SingleWriterInlineSkipList<const MemTableRep::KeyComparator&>>(
compare, allocator, transform, lookahead_);
}
}
最终就是创建SkipListRep对象,在这个对象里面会创建SkipList:
class SkipListRep : public MemTableRep {
SkipListImpl skip_list_;
......
public:
explicit SkipListRep(const MemTableRep::KeyComparator& compare,
MemTableAllocator* allocator,
const SliceTransform* transform, const size_t lookahead)
: MemTableRep(allocator), skip_list_(compare, allocator), cmp_(compare),
transform_(transform), lookahead_(lookahead) {
}
这里SkipList就不继续向下分析实现了,我们只需要知道最终所有的memtable数据都是保存在SkipList中就可以了.
MemTable写入
Memtable的插入是通过WriteBatch然后遍历ColumnFamily来插入的,而最终则是会调用MemTable::Add这个函数。 让我们来看看这个函数.
数据格式化
RocksDB是一个KV存储,那么这个KV是如何存储的呢,这里是这样的,RocksDB会将KV打包成一个key传递给SkipList, 对应的KEY的结构是这样的.
void MemTable::Add(SequenceNumber s, ValueType type,
const Slice& key, /* user key */
const Slice& value, bool allow_concurrent) {
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
// value_size : varint32 of value.size()
// value bytes : char[value.size()]
.......
}
将我们要存储的数据格式化成上面这样的结构:
void MemTable::Add(SequenceNumber s, ValueType type,
const Slice& key, /* user key */
const Slice& value, bool allow_concurrent) {
......
uint32_t key_size = static_cast<uint32_t>(key.size());
uint32_t val_size = static_cast<uint32_t>(value.size());
uint32_t internal_key_size = key_size + 8;
const uint32_t encoded_len = VarintLength(internal_key_size) +
internal_key_size + VarintLength(val_size) +
val_size;
char* buf = nullptr;
KeyHandle handle = table_->Allocate(encoded_len, &buf);
char* p = EncodeVarint32(buf, internal_key_size);
memcpy(p, key.data(), key_size);
p += key_size;
uint64_t packed = PackSequenceAndType(s, type);
EncodeFixed64(p, packed);
p += 8;
p = EncodeVarint32(p, val_size);
memcpy(p, value.data(), val_size);
assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len);
......
}
并发插入
通过allow_concurrent判断是否支持并发插入,默认使用SkipList,所以是支持的。
void MemTable::Add(SequenceNumber s, ValueType type,
const Slice& key, /* user key */
const Slice& value, bool allow_concurrent) {
......
if (!allow_concurrent) {
......
} else {
table_->InsertConcurrently(handle);
......
}
UpdateFlushState();
}
class SkipListRep : public MemTableRep {
......
void InsertConcurrently(KeyHandle handle) override {
skip_list_.InsertConcurrently(static_cast<char*>(handle));
}
......
}
最终会调用InlineSkipList来对数据进行插入:
template <class Comparator>
void InlineSkipList<Comparator>::InsertConcurrently(const char* key) {
Node* x = reinterpret_cast<Node*>(const_cast<char*>(key)) - 1;
......
Node* prev[kMaxPossibleHeight + 1];
Node* next[kMaxPossibleHeight + 1];
prev[max_height] = head_;
next[max_height] = nullptr;
for (int i = max_height - 1; i >= 0; --i) {
FindLevelSplice(key, prev[i + 1], next[i + 1], i, &prev[i], &next[i]);
}
for (int i = 0; i < height; ++i) {
while (true) {
x->NoBarrier_SetNext(i, next[i]);
if (prev[i]->CASNext(i, next[i], x)) {
// success
break;
}
......
}
}
}
MemTable读取
首先在memtable中读取,或者在immutable memtable中读取。
Status DBImpl::GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value, bool* value_found) {
......
if (!skip_memtable) {
if (sv->mem->Get(lkey, value, &s, &merge_context)) {
done = true;
RecordTick(stats_, MEMTABLE_HIT);
} else if (sv->imm->Get(lkey, value, &s, &merge_context)) {
done = true;
RecordTick(stats_, MEMTABLE_HIT);
}
}
if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time);
sv->current->Get(read_options, lkey, value, &s, &merge_context,
value_found);
RecordTick(stats_, MEMTABLE_MISS);
}
......
}
最终则是会调用MemTable::Get这个函数。
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context, SequenceNumber* seq) {
......
table_->Get(key, &saver, SaveValue);
......
}
最终在SkipList中找到对应的数据
virtual void Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg,
const char* entry)) override {
SkipListRep::Iterator iter(&skip_list_);
Slice dummy_slice;
for (iter.Seek(dummy_slice, k.memtable_key().cdata());
iter.Valid() && callback_func(callback_args, iter.key());
iter.Next()) {
}
}
MemTable清理
有两种MemTable清理的情况:
- 当MemTable中的数据flush到sst中后,会对这些已flush的MemTable进行清理。
- 删除掉了数据,其对应的MemTable也应进行清理
flush
当MemTable中的数据flush到sst中后,会对这些已flush的MemTable进行清理。
首先是当选中要flush的MemTable时,将其加入待清理的队列。
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
......
cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
......
}
等到flush完成后,将其清理掉。
void DBImpl::BackgroundCallFlush(ColumnFamilyData* cfd) {
.....
BackgroundJobComplete(s, &job_context, &log_buffer);
......
}
void DBImpl::BackgroundJobComplete(
const Status& s, JobContext* job_context, LogBuffer* log_buffer) {
......
job_context->Clean();
......
}
void Clean() {
// free pending memtables
for (auto m : memtables_to_free) {
delete m;
}
......
}
Delete
数据被删除后,同样应对MemTable进行清理。
CHECKED_STATUS DeleteImpl(uint32_t column_family_id, const Slice& key,
ValueType delete_type) {
......
MemTable* mem = cf_mems_->GetMemTable();
if ((delete_type == ValueType::kTypeSingleDeletion ||
delete_type == ValueType::kTypeColumnFamilySingleDeletion) &&
mem->Erase(key)) {
return Status::OK();
}
......
}
最终会调用MemTable::Erase进行清理。
bool MemTable::Erase(const Slice& user_key) {
......
if (table_->Erase(buf, only_user_key_comparator)) {
// this is a bit ugly, but is the way to avoid locked instructions
// when incrementing an atomic
num_erased_.store(num_erased_.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
UpdateFlushState();
return true;
} else if (had_delete) { // Do nothing in case when we already had delete.
return true;
}
......
}
template<class Key, class Comparator, class NodeType>
bool SkipListBase<Key, Comparator, NodeType>::Erase(Key key, Comparator cmp) {
auto prev = static_cast<Node**>(alloca(sizeof(Node*) * kMaxHeight_));
auto node = FindLessThan(key, prev);
node = node->Next(0);
if (node == nullptr || cmp(key, node->key) != 0) {
return false;
}
for (int level = max_height_; --level >= 0;) {
if (prev[level]->NoBarrier_Next(level) == node) {
prev[level]->SetNext(level, node->Next(level));
}
}
prev_valid_ = false;
return true;
}