在之前的文章中我们知道RocksDB每一次写入,都是先写WAL,然后写Memtable,这次我们就来分析下MemTable的实现.

MemTable

MemTable是一个内存数据结构,它保存了落盘到SST文件前的数据。同时服务于读和写——新的写入总是将数据插入到memtable,读取在查询SST文件前总是要查询memtable,因为memtable里面的数据总是更新的。一旦一个memtable被写满,它会变成不可修改的,并被一个新的memtable替换。一个后台线程会把这个memtable的内容落盘到一个SST文件,然后这个memtable就可以被销毁了。并且在flush的过程中,会完成数据的压缩。

实现方式

默认的memtable实现是基于skiplist的。除了默认的memtable实现,用户可以使用其他memtable实现,例如HashLinkList,HashSkipList或者Vector,以加快查询速度。

相关选项

  • memtable_factory: memtable对象的工厂。通过声明一个工厂对象,用户可以改变底层memtable的实现,并提供事先声明的选项。
  • write_buffer_size:一个memtable的大小
  • db_write_buffer_size:多个列族的memtable的大小总和。这可以用来管理memtable使用的总内存数。
  • max_write_buffer_number:内存中可以拥有刷盘到SST文件前的最大memtable数。

插入方式

  • 并发插入:如果不支持对memtable进行并发插入,从多个线程过来的并发写会按顺序应用到memtable中。并发memtable插入是默认打开的,可以通过allow_concurrent_memtable_write选项来关闭,尽管只有skip-list的memtable支持这个功能。
  • 带Hint插入
  • 原地更新
MEMTABLE类型 SKIPLIST HASHSKIPLIST HASHLINKLIST VECTOR
最佳使用场景 通用 带特殊key前缀的范围查询 带特殊key前缀,并且每个前缀都只有很小数量的行 大量随机写压力
索引类型 二分搜索 哈希+二分搜索 哈希+线性搜索 线性搜索
是否支持全量db有序扫描? 天然支持 非常耗费资源(拷贝以及排序一生成一个临时视图) 同HashSkipList 同HashSkipList
额外内存 平均(每个节点有多个指针) 高(哈希桶+非空桶的skiplist元数据+每个节点多个指针) 稍低(哈希桶+每个节点的指针) 低(vector尾部预分配的内存)
Memtable落盘 快速,以及固定数量的额外内存 慢,并且大量临时内存使用 同HashSkipList 同HashSkipList
并发插入 支持 不支持 不支持 不支持
带Hint插入 支持(在没有并发插入的时候) 不支持 不支持 不支持

Immutable Memtable

所有的写操作都是在memtable进行,当memtable空间不足时,会创建一块新的memtable来继续接收写操作,原先的内存将被标识为只读模式,等待被刷入sst。在任何时候,一个CF中,只存在一块active memtable和0+块immutable memtable。

源码分析

MemTable创建

首先从创建Memtable开始,Memtable的创建(ColumnFamilyData::CreateNewMemtable)是在创建ColumnFamily(VersionSet::CreateColumnFamily)的时候创建的.这里就是创建memtable,然后设置到ColumnFamilyData的mem_域中.

MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  DCHECK_ONLY_NOTNULL(current());
  return new MemTable(*internal_comparator_, ioptions_, mutable_cf_options,
                      write_buffer_, earliest_seq);
}

void ColumnFamilyData::CreateNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
  mem_->Ref();
}

上面所提及的,RocksDB有多种MemTable的实现,那么它是如何来做的呢?

在相关选项中,我们提到了memtablefactory,RocksDB通过memtable_factory来根据用户的设置来创建不同的memtable.这里要注意的是核心的memtable实现是在MemTable这个类的table域中.

MemTable::MemTable(......)
    : ......
      table_(ioptions.memtable_factory->CreateMemTableRep(
          comparator_, &allocator_, ioptions.prefix_extractor,
          ioptions.info_log)),

// This is the base class for all factories that are used by RocksDB to create
// new MemTableRep objects
class MemTableRepFactory {
 public:
  virtual ~MemTableRepFactory() {}
  virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&,
                                         MemTableAllocator*,
                                         const SliceTransform*,
                                         Logger* logger) = 0;
  virtual const char* Name() const = 0;

  // Return true if the current MemTableRep supports concurrent inserts
  // Default: false
  virtual bool IsInsertConcurrentlySupported() const { return false; }

  virtual bool IsInMemoryEraseSupported() const { return false; }
};

MemTable在构造函数中调用了CreateMemTableRep,而CreateMemTableRep是MemTableRepFactory类的一个虚函数,最终会去调用对应的实现的CreateMemTableRep方法。

ColumnFamilyOptions::ColumnFamilyOptions()
  : .......
  memtable_factory(std::shared_ptr<SkipListFactory>(new SkipListFactory)),
  ......

可以看到,在Option中指定使用SkipListFactory。这里我们就来看看SkipList的实现。

MemTableRep* SkipListFactory::CreateMemTableRep(
    const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
    const SliceTransform* transform, Logger* logger) {
  if (concurrent_writes_) {
    return new SkipListRep<InlineSkipList<const MemTableRep::KeyComparator&>>(
        compare, allocator, transform, lookahead_);
  } else {
    return new SkipListRep<SingleWriterInlineSkipList<const MemTableRep::KeyComparator&>>(
        compare, allocator, transform, lookahead_);
  }
}

最终就是创建SkipListRep对象,在这个对象里面会创建SkipList:

class SkipListRep : public MemTableRep {
  SkipListImpl skip_list_;
  ......
 public:
  explicit SkipListRep(const MemTableRep::KeyComparator& compare,
                       MemTableAllocator* allocator,
                       const SliceTransform* transform, const size_t lookahead)
    : MemTableRep(allocator), skip_list_(compare, allocator), cmp_(compare),
      transform_(transform), lookahead_(lookahead) {
  }

这里SkipList就不继续向下分析实现了,我们只需要知道最终所有的memtable数据都是保存在SkipList中就可以了.

MemTable写入

Memtable的插入是通过WriteBatch然后遍历ColumnFamily来插入的,而最终则是会调用MemTable::Add这个函数。 让我们来看看这个函数.

数据格式化

RocksDB是一个KV存储,那么这个KV是如何存储的呢,这里是这样的,RocksDB会将KV打包成一个key传递给SkipList, 对应的KEY的结构是这样的.

void MemTable::Add(SequenceNumber s, ValueType type,
                   const Slice& key, /* user key */
                   const Slice& value, bool allow_concurrent) {
  // Format of an entry is concatenation of:
  //  key_size     : varint32 of internal_key.size()
  //  key bytes    : char[internal_key.size()]
  //  value_size   : varint32 of value.size()
  //  value bytes  : char[value.size()]
  .......
}

将我们要存储的数据格式化成上面这样的结构:

void MemTable::Add(SequenceNumber s, ValueType type,
                   const Slice& key, /* user key */
                   const Slice& value, bool allow_concurrent) {
  ......
    uint32_t key_size = static_cast<uint32_t>(key.size());
  uint32_t val_size = static_cast<uint32_t>(value.size());
  uint32_t internal_key_size = key_size + 8;
  const uint32_t encoded_len = VarintLength(internal_key_size) +
                               internal_key_size + VarintLength(val_size) +
                               val_size;
  char* buf = nullptr;
  KeyHandle handle = table_->Allocate(encoded_len, &buf);

  char* p = EncodeVarint32(buf, internal_key_size);
  memcpy(p, key.data(), key_size);
  p += key_size;
  uint64_t packed = PackSequenceAndType(s, type);
  EncodeFixed64(p, packed);
  p += 8;
  p = EncodeVarint32(p, val_size);
  memcpy(p, value.data(), val_size);
  assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len);
  ......
}

并发插入

通过allow_concurrent判断是否支持并发插入,默认使用SkipList,所以是支持的。

void MemTable::Add(SequenceNumber s, ValueType type,
                   const Slice& key, /* user key */
                   const Slice& value, bool allow_concurrent) {
  ......
  if (!allow_concurrent) {
    ......
   } else {
    table_->InsertConcurrently(handle);
    ......
  }
  UpdateFlushState();
}

class SkipListRep : public MemTableRep {
  ......
    void InsertConcurrently(KeyHandle handle) override {
    skip_list_.InsertConcurrently(static_cast<char*>(handle));
  }
  ......
}

最终会调用InlineSkipList来对数据进行插入:

template <class Comparator>
void InlineSkipList<Comparator>::InsertConcurrently(const char* key) {
  Node* x = reinterpret_cast<Node*>(const_cast<char*>(key)) - 1;
  ......
  Node* prev[kMaxPossibleHeight + 1];
  Node* next[kMaxPossibleHeight + 1];
  prev[max_height] = head_;
  next[max_height] = nullptr;
  for (int i = max_height - 1; i >= 0; --i) {
    FindLevelSplice(key, prev[i + 1], next[i + 1], i, &prev[i], &next[i]);
  }
  for (int i = 0; i < height; ++i) {
    while (true) {
      x->NoBarrier_SetNext(i, next[i]);
      if (prev[i]->CASNext(i, next[i], x)) {
        // success
        break;
      }
      ......
    }
  }
}

MemTable读取

首先在memtable中读取,或者在immutable memtable中读取。

Status DBImpl::GetImpl(const ReadOptions& read_options,
                       ColumnFamilyHandle* column_family, const Slice& key,
                       std::string* value, bool* value_found) {
  ......
  if (!skip_memtable) {
    if (sv->mem->Get(lkey, value, &s, &merge_context)) {
      done = true;
      RecordTick(stats_, MEMTABLE_HIT);
    } else if (sv->imm->Get(lkey, value, &s, &merge_context)) {
      done = true;
      RecordTick(stats_, MEMTABLE_HIT);
    }
  }
  if (!done) {
    PERF_TIMER_GUARD(get_from_output_files_time);
    sv->current->Get(read_options, lkey, value, &s, &merge_context,
                           value_found);
    RecordTick(stats_, MEMTABLE_MISS);
  }
  ......
}

最终则是会调用MemTable::Get这个函数。

bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
                   MergeContext* merge_context, SequenceNumber* seq) {
  ......
  table_->Get(key, &saver, SaveValue);
  ......
}

最终在SkipList中找到对应的数据

  virtual void Get(const LookupKey& k, void* callback_args,
                   bool (*callback_func)(void* arg,
                                         const char* entry)) override {
    SkipListRep::Iterator iter(&skip_list_);
    Slice dummy_slice;
    for (iter.Seek(dummy_slice, k.memtable_key().cdata());
         iter.Valid() && callback_func(callback_args, iter.key());
         iter.Next()) {
    }
  }

MemTable清理

有两种MemTable清理的情况:

  1. 当MemTable中的数据flush到sst中后,会对这些已flush的MemTable进行清理。
  2. 删除掉了数据,其对应的MemTable也应进行清理

flush

当MemTable中的数据flush到sst中后,会对这些已flush的MemTable进行清理。

首先是当选中要flush的MemTable时,将其加入待清理的队列。

Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
  ......
  cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
  ......
}

等到flush完成后,将其清理掉。

void DBImpl::BackgroundCallFlush(ColumnFamilyData* cfd) {
  .....
  BackgroundJobComplete(s, &job_context, &log_buffer);
  ......
}

void DBImpl::BackgroundJobComplete(
    const Status& s, JobContext* job_context, LogBuffer* log_buffer) {
    ......
    job_context->Clean();
    ......
}

  void Clean() {
    // free pending memtables
    for (auto m : memtables_to_free) {
      delete m;
    }
    ......
  }

Delete

数据被删除后,同样应对MemTable进行清理。

CHECKED_STATUS DeleteImpl(uint32_t column_family_id, const Slice& key,
                            ValueType delete_type) {
  ......
    MemTable* mem = cf_mems_->GetMemTable();
    if ((delete_type == ValueType::kTypeSingleDeletion ||
         delete_type == ValueType::kTypeColumnFamilySingleDeletion) &&
        mem->Erase(key)) {
      return Status::OK();
    }
  ......
}

最终会调用MemTable::Erase进行清理。

bool MemTable::Erase(const Slice& user_key) {
  ......
  if (table_->Erase(buf, only_user_key_comparator)) {
    // this is a bit ugly, but is the way to avoid locked instructions
    // when incrementing an atomic
    num_erased_.store(num_erased_.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);

    UpdateFlushState();
    return true;
  } else if (had_delete) { // Do nothing in case when we already had delete.
    return true;
  }
  ......
}

template<class Key, class Comparator, class NodeType>
bool SkipListBase<Key, Comparator, NodeType>::Erase(Key key, Comparator cmp) {
  auto prev = static_cast<Node**>(alloca(sizeof(Node*) * kMaxHeight_));
  auto node = FindLessThan(key, prev);
  node = node->Next(0);
  if (node == nullptr || cmp(key, node->key) != 0) {
    return false;
  }

  for (int level = max_height_; --level >= 0;) {
    if (prev[level]->NoBarrier_Next(level) == node) {
      prev[level]->SetNext(level, node->Next(level));
    }
  }

  prev_valid_ = false;

  return true;
}

Copyright © itrunner.cn 2020 all right reserved,powered by Gitbook该文章修订时间: 2024-08-01 14:07:49

results matching ""

    No results matching ""