概述
首先我们知道在RocksDB中,最终数据的持久化都是保存在SST中,而SST则是由Memtable刷新到磁盘生成的,因此这次我们就主要来分析在RocksDB中何时以及如何来Flush内存数据(memtable)到SST.
简单来说在RocksDB中,每一个ColumnFamily都有自己的Memtable,当Memtable超过固定大小之后(或者WAL文件超过限制),它将会被设置为immutable,然后会有后台的线程启动来刷新这个immutable memtable到磁盘(SST).
相关设置
- write_buffer_size 表示每个columnfamily的memtable的大小限制
- db_write_buffer_size 总的memtable的大小限制(所有的ColumnFamily).
- max_write_buffer_number 最大的memtable的个数
- min_write_buffer_number_to_merge 表示最小的可以被flush的memtable的个数
Flush Memtable的触发条件
在下面这几种条件下RocksDB会flush memtable到磁盘.
- 当某一个memtable的大小超过write_buffer_size.
- 当总的memtable的大小超过db_write_buffer_size.
- 当WAL文件的大小超过max_total_wal_size之后 最后一个条件的原因是,当WAL文件大小太大之后,我们需要清理WAL,因此此时我们需要将此WAL对应的数据都刷新到磁盘,也是刷新Memtable.
源码
flushqueue
首先在全局的DBImpl中包含了一个flushqueue的队列,这个队列将会保存所有的将要被flush到磁盘的ColumnFamily.只有当当前的ColumnFamily满足flush条件(cfd->imm()->IsFlushPending())才会将此CF加入到flush队列.
class DBImpl {
................................
// A column family is inserted into flush_queue_ when it satisfies condition
// cfd->imm()->IsFlushPending()
std::deque<ColumnFamilyData*> flush_queue_;
...................
};
那么什么时候将cfd加入到flushqueue中呢?
SchedulePendingFlush
在SchedulePendingFlush函数中,最终会将对应的ColumnFamily加入到flush queue中.
void DBImpl::SchedulePendingFlush(ColumnFamilyData* cfd) {
if (!cfd->pending_flush() && cfd->imm()->IsFlushPending()) {
for (auto listener : db_options_.listeners) {
listener->OnFlushScheduled(this);
}
if (db_options_.priority_thread_pool_for_compactions_and_flushes &&
FLAGS_use_priority_thread_pool_for_flushes) {
++bg_flush_scheduled_;
cfd->Ref();
cfd->set_pending_flush(true);
SubmitCompactionOrFlushTask(std::make_unique<FlushTask>(this, cfd));
} else {
AddToFlushQueue(cfd);
++unscheduled_flushes_;
}
}
}
IsFlushPending
首先我们来看IsFlushPending的实现.这个函数的意思就是至少有一个memtable需要被flush.而MemTableList这个类则是保存了所有的immutable memtables.
// Returns true if there is at least one memtable on which flush has
// not yet started.
bool MemTableList::IsFlushPending() const {
if ((flush_requested_ && num_flush_not_started_ >= 1) ||
(num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
assert(imm_flush_needed.load(std::memory_order_relaxed));
return true;
}
return false;
}
上面这几个变量的含义在注释中比较清楚, 而minwrite_buffer_number_to_merge就是min_write_buffer_number_to_merge.
// the number of elements that still need flushing
int num_flush_not_started_;
// committing in progress
bool commit_in_progress_;
// Requested a flush of all memtables to storage
bool flush_requested_;
SubmitCompactionOrFlushTask
可以看到,rocksdb本来的处理是直接将cfd加入到flush_queue中,但yugabyte在这里对rocksdb做了修改,增加了一个新的优先级线程池来处理。
void DBImpl::SubmitCompactionOrFlushTask(std::unique_ptr<ThreadPoolTask> task) {
mutex_.AssertHeld();
if (task->Type() == BgTaskType::kCompaction) {
compaction_tasks_.insert(down_cast<CompactionTask*>(task.get()));
}
auto status = db_options_.priority_thread_pool_for_compactions_and_flushes->Submit(task->Priority(), &task);
if (!status.ok()) {
task->AbortedUnlocked();
}
}
这个线程池依赖于FLAGS_use_priority_thread_pool_for_flushes
,默认为false,所以默认最终还是通过flush_queue_
来处理。
AddToFlushQueue
最终通过AddToFlushQueue将cfd加入到了flushqueue中。
void DBImpl::AddToFlushQueue(ColumnFamilyData* cfd) {
assert(!cfd->pending_flush());
cfd->Ref();
flush_queue_.push_back(cfd);
cfd->set_pending_flush(true);
}
flush_queue_
中有值了,那什么时候将其中的值flush呢?继续往下走。
BackgroundFlush
刷新MemTable到磁盘是一个后台线程来做的,这个后台线程叫做BGWorkFlush,最终这个函数会调用BackgroundFlush函数,而BackgroundFlush主要功能是在flushqueue中找到一个ColumnFamily然后刷新它的memtable到磁盘.
先来看看BackgroundFlush
Result<FileNumbersHolder> DBImpl::BackgroundFlush(
.......
if (cfd == nullptr) {
while (!flush_queue_.empty()) {
// This cfd is already referenced
auto first_cfd = PopFirstFromFlushQueue();
if (first_cfd->IsDropped() || !first_cfd->imm()->IsFlushPending()) {
// can't flush this CF, try next one
if (first_cfd->Unref()) {
delete first_cfd;
}
continue;
}
// found a flush!
cfd = first_cfd;
break;
}
}
......
auto result = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
job_context, log_buffer);
......
}
通过PopFirstFromFlushQueue
函数我们从flush_queue
中取出cfd,最终调用FlushMemTableToOutputFile
来刷新Memtable到磁盘,等到最后我们来分析这个函数.
BackgroundCallFlush
然后是调用BackgroundFlush
的BackgroundCallFlush
函数:
void DBImpl::BackgroundCallFlush(ColumnFamilyData* cfd) {
.......
Status s;
{
auto file_number_holder = BackgroundFlush(&made_progress, &job_context, &log_buffer, cfd);
s = yb::ResultToStatus(file_number_holder);
WaitAfterBackgroundError(s, "flush", &log_buffer);
}
......
}
BGWorkFlush
最后是后台线程BGWorkFlush
:
void DBImpl::BGWorkFlush(void* db) {
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
TEST_SYNC_POINT("DBImpl::BGWorkFlush");
reinterpret_cast<DBImpl*>(db)->BackgroundCallFlush(nullptr /* cfd */);
TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
}
那什么时候去启动这个后台进程呢,继续.
MaybeScheduleFlushOrCompaction
BGWorkFlush
线程的启动在MaybeScheduleFlushOrCompaction
函数中进行:
void DBImpl::MaybeScheduleFlushOrCompaction() {
......
while (unscheduled_flushes_ > 0 &&
bg_flush_scheduled_ < db_options_.max_background_flushes) {
unscheduled_flushes_--;
bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
}
size_t bg_compactions_allowed = BGCompactionsAllowed();
// special case -- if max_background_flushes == 0, then schedule flush on a
// compaction thread
if (db_options_.max_background_flushes == 0) {
while (unscheduled_flushes_ > 0 &&
bg_flush_scheduled_ + bg_compaction_scheduled_ + compaction_tasks_.size() <
bg_compactions_allowed) {
unscheduled_flushes_--;
bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::LOW, this);
}
}
......
}
SwitchMemtable
在RocksDB中,每一次MaybeScheduleFlushOrCompaction
执行之前都会有一个SwitchMemtable
函数,这个函数用来将现在的memtable改变为immutable,然后再新建一个memtable,也就是说理论上来说每一次内存的memtable被刷新到磁盘之前肯定会调用这个函数.
而在实现中,每一次调用SwitchMemtable之后,都会调用对应immutable memtable的FlushRequested函数来设置对应memtable的flushrequeseted, 并且会调用上面的SchedulePendingFlush来将对应的ColumnFamily加入到flushqueue队列中.因此这里我们就通过这几个函数的调用栈来分析RocksDB中何时会触发flush操作.
在Yugabyte中会有三个地方会调用SwitchMemtable,分别是:
- DbImpl:: WriteImpl
- DBImpl::FlushMemTable
- DBImpl::ScheduleFlushes
接下来我们就来一个个分析这几个函数
WriteImpl
先来看WriteImpl.这个函数里有两个地方调用了SwitchMemtable.
total_log_size() > max_total_wal_size, 也就是判断是否WAL的大小是否已经超过了设置的wal大小(max_total_wal_size)
Status DBImpl::WriteImpl(const WriteOptions& write_options, WriteBatch* my_batch, WriteCallback* callback) { ...... if (UNLIKELY(!single_column_family_mode_ && alive_log_files_.begin()->getting_flushed == false && total_log_size() > max_total_wal_size)) { uint64_t flush_column_family_if_log_file = alive_log_files_.begin()->number; alive_log_files_.begin()->getting_flushed = true; RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "Flushing all column families with data in WAL number %" PRIu64 ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64, flush_column_family_if_log_file, total_log_size(), max_total_wal_size); // no need to refcount because drop is happening in write thread, so can't // happen while we're in the write thread for (auto cfd : *versions_->GetColumnFamilySet()) { if (cfd->IsDropped()) { continue; } if (cfd->GetLogNumber() <= flush_column_family_if_log_file) { status = SwitchMemtable(cfd, &context); if (!status.ok()) { break; } cfd->imm()->FlushRequested(); SchedulePendingFlush(cfd); } } MaybeScheduleFlushOrCompaction(); } ...... }
writebuffer.ShouldFlush()->buffer_size() > 0 && memory_usage() >= buffer_size(); 也就是判断memtable所使用的内存是否已经超过限制.
Status DBImpl::WriteImpl(const WriteOptions& write_options, WriteBatch* my_batch, WriteCallback* callback) { ...... } else if (UNLIKELY(write_buffer_.ShouldFlush())) { RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "Flushing column family with largest mem table size. Write buffer is " "using %" PRIu64 " bytes out of a total of %" PRIu64 ".", write_buffer_.memory_usage(), write_buffer_.buffer_size()); ....... if (largest_cfd != nullptr) { status = SwitchMemtable(largest_cfd, &context); if (status.ok()) { largest_cfd->imm()->FlushRequested(); SchedulePendingFlush(largest_cfd); MaybeScheduleFlushOrCompaction(); } } } ...... }
FlushMemTable
然后是FlushMemTable,这个函数用来强制刷新刷新memtable到磁盘,比如用户直接调用Flush接口.可以看到和上面的集中情况基本一致,switchmemtable->flushrequested->maybescheduleflushorcompaction.
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
const FlushOptions& flush_options) {
Status s;
{
......
// SwitchMemtable() will release and reacquire mutex
// during execution
s = SwitchMemtable(cfd, &context);
write_thread_.ExitUnbatched(&w);
cfd->imm()->FlushRequested();
// schedule flush
SchedulePendingFlush(cfd);
MaybeScheduleFlushOrCompaction();
}
if (s.ok() && flush_options.wait) {
// Wait until the compaction completes
s = WaitForFlushMemTable(cfd);
}
return s;
}
ScheduleFlushes
最后是ScheduleFlushes
.
Status DBImpl::ScheduleFlushes(WriteContext* context) {
ColumnFamilyData* cfd;
while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
auto status = SwitchMemtable(cfd, context);
if (cfd->Unref()) {
delete cfd;
}
if (!status.ok()) {
return status;
}
}
return Status::OK();
}
在ScheduleFlushes函数里通用调用了SwitchMemtable,可是我们会发现它并没有调用MaybeScheduleFlushOrCompaction,也就是它并没有启动一个后台进程去执行flush,那么它是如何去执行flush的呢?
值得注意的是,SwitchMemtable函数本身能设置flush的时间,当我们的memtable没有通过上面两个函数进行flush时,会去进行判断是否达到设置的超时时间。当达到超时时间是,也会强制去刷新。
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
......
cfd->mem()->SetFlushStartTime(std::chrono::steady_clock::now());
......
}
Result<bool> Tablet::IntentsDbFlushFilter(const rocksdb::MemTable& memtable) {
......
// Force flush of regular DB if we were not able to flush for too long.
auto timeout = std::chrono::milliseconds(FLAGS_intents_flush_max_delay_ms);
if (flush_intention != rocksdb::FlushAbility::kAlreadyFlushing &&
(shutdown_requested_.load(std::memory_order_acquire) ||
std::chrono::steady_clock::now() > memtable.FlushStartTime() + timeout)) {
VLOG_WITH_PREFIX(2) << __func__ << ", force flush";
rocksdb::FlushOptions options;
options.wait = false;
RETURN_NOT_OK(regular_db_->Flush(options));
}
......
}
所以最后这种情况与前面两种情况的一个最大的区别就是前面两种情况的出现都是需要立即调用flush线程来刷新memtable到磁盘,而最后这种情况则是没那么紧急的情况,可以等到后面超时时间再调用flush线程来刷新内容到磁盘.
什么状况下会用到这种情况呢?
Status DBImpl::WriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback) {
......
if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
status = ScheduleFlushes(&context);
}
......
}
class MemTableInserter : public WriteBatch::Handler {
void CheckMemtableFull() {
if (flush_scheduler_ != nullptr) {
auto* cfd = cf_mems_->current();
assert(cfd != nullptr);
if (cfd->mem()->ShouldScheduleFlush() &&
cfd->mem()->MarkFlushScheduled()) {
// MarkFlushScheduled only returns true if we are the one that
// should take action, so no need to dedup further
flush_scheduler_->ScheduleFlush(cfd);
}
}
}
}
也就是当ShouldScheduleFlush()和MarkFlushScheduled()为true时,向 flushscheduler写入cfd,当flushscheduler不为空的时候,去执行ScheduleFlushes
// This method heuristically determines if the memtable should continue to
// host more data.
bool ShouldScheduleFlush() const {
return flush_state_.load(std::memory_order_relaxed) == FlushState::kRequested;
}
// Returns true if a flush should be scheduled and the caller should
// be the one to schedule it
bool MarkFlushScheduled() {
auto before = FlushState::kRequested;
return flush_state_.compare_exchange_strong(before,
FlushState::kScheduled,
std::memory_order_relaxed,
std::memory_order_relaxed);
}
所以最终也就是判断flushstate是不是FlushState::kRequested,那什么时候flushstate会被设置为FLUSH_REQUESTED呢?
void MemTable::UpdateFlushState() {
auto state = flush_state_.load(std::memory_order_relaxed);
if (state == FlushState::kNotRequested && ShouldFlushNow()) {
// ignore CAS failure, because that means somebody else requested
// a flush
flush_state_.compare_exchange_strong(state, FlushState::kRequested,
std::memory_order_relaxed,
std::memory_order_relaxed);
}
}
bool MemTable::ShouldFlushNow() const {
......
if (allocated_memory > moptions_.write_buffer_size +
kArenaBlockSize * kAllowOverAllocationRatio) {
return true;
}
......
return arena_.AllocatedAndUnused() < kArenaBlockSize / 4;
}
也就是最终的触发条件为当前MemTable的内存使用是否超过了write_buffer_size。
FlushMemTableToOutputFile
触发机制介绍完了,最后还是来看看前面说到的FlushMemTableToOutputFile.
Result<FileNumbersHolder> DBImpl::FlushMemTableToOutputFile(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
bool* made_progress, JobContext* job_context, LogBuffer* log_buffer) {
......
FlushJob flush_job(
dbname_, cfd, db_options_, mutable_cf_options, env_options_,
versions_.get(), &mutex_, &shutting_down_, snapshot_seqs,
earliest_write_conflict_snapshot, mem_table_flush_filter, pending_outputs_.get(),
job_context, log_buffer, directories_.GetDbDir(), directories_.GetDataDir(0U),
GetCompressionFlush(*cfd->ioptions()), stats_, &event_logger_);
FileMetaData file_meta;
auto file_number_holder = flush_job.Run(&file_meta);
......
}
Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
FileMetaData* file_meta) {
...........................................
// This will release and re-acquire the mutex.
Status s = WriteLevel0Table();
........................................................
}
在RocksDB中刷新是通过FlushJob这个类来实现的,整个实现还是比较简单.最终这里是调用WriteLevel0Table来刷新内容到磁盘。这里就不继续向后分析了.
总结
WAL的大小超过了设置的wal大小或者memtable所使用的内存超过限制时,会去触发flush机制。当不满足这两张情况时,为避免cfd长时间不能flush, 系统设置一个超时时间。当超过这个超时时间时,也会去触发flush