概述
在RocksDB中每一次数据的更新都会涉及到两个结构,一个是内存中的memtable(后续会刷新到磁盘成为SST),第二个是WAL(WriteAheadLog)。 本篇文章主要就是来介绍WAL.
WAL主要的功能是当RocksDB异常退出后,能够恢复出错前的内存中(memtable)数据,因此RocksDB默认是每次用户写都会刷新数据到WAL. 每次当当前WAL对应的内存数据(memtable)刷新到磁盘之后,都会新建一个WAL.
所有的WAL文件都是保存在WAL目录(options.wal_dir),为了保证数据的状态,所有的WAL文件的名字都是按照顺序的(log_number).
WAL文件格式
file
WAL文件由一堆变长的record组成,而每个record是由kBlockSize(32k)来分组,比如某一个record大于kBlockSize的话,他就会被切分为多个record(通过type来判断).
+-----+-------------+--+----+----------+------+-- ... ----+
File | r0 | r1 |P | r2 | r3 | r4 | |
+-----+-------------+--+----+----------+------+-- ... ----+
<--- kBlockSize ------>|<-- kBlockSize ------>|
rn = variable size records
P = Padding
record
record包括header和payload,格式如下:
+---------+-----------+-----------+--- ... ---+
|CRC (4B) | Size (2B) | Type (1B) | Payload |
+---------+-----------+-----------+--- ... ---+
CRC = 32bit hash computed over the payload using CRC
Size = Length of the payload data
Type = Type of record
(kZeroType, kFullType, kFirstType, kLastType, kMiddleType )
The type is used to group a bunch of records together to represent
blocks that are larger than kBlockSize
Payload = Byte stream as long as specified by the payload size
可以看到header包含4B的校验值、2B的Payload长度以及1B的记录类型。 对应源代码:
namespace rocksdb {
namespace log {
enum RecordType {
// Zero is reserved for preallocated files
kZeroType = 0,
kFullType = 1,
// For fragments
kFirstType = 2,
kMiddleType = 3,
kLastType = 4,
// For recycled log files
kRecyclableFullType = 5,
kRecyclableFirstType = 6,
kRecyclableMiddleType = 7,
kRecyclableLastType = 8,
};
static const int kMaxRecordType = kRecyclableLastType;
static const unsigned int kBlockSize = 32768;
// Header is checksum (4 bytes), type (1 byte), length (2 bytes).
static const int kHeaderSize = 4 + 1 + 2;
// Recyclable header is checksum (4 bytes), type (1 byte), log number
// (4 bytes), length (2 bytes).
static const int kRecyclableHeaderSize = 4 + 1 + 4 + 2;
} // namespace log
}
最后是WAL的payload的格式,也可以在源码中找到:
// WriteBatch::rep_ :=
// sequence: fixed64
// count: fixed32
// data: record[count]
// record :=
// kTypeValue varstring varstring
// kTypeDeletion varstring
// kTypeSingleDeletion varstring
// kTypeMerge varstring varstring
// kTypeColumnFamilyValue varint32 varstring varstring
// kTypeColumnFamilyDeletion varint32 varstring varstring
// kTypeColumnFamilySingleDeletion varint32 varstring varstring
// kTypeColumnFamilyMerge varint32 varstring varstring
// varstring :=
// len: varint32
// data: uint8[len]
上面的格式中可以看到有一个sequence的值,这个值主要用来表示WAL中操作的时序,这里要注意每次sequence的更新是按照WriteBatch来更新的.
Status DBImpl::WriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback) {
......
WriteBatchInternal::SetSequence(merged_batch, current_sequence);
......
}
WAL创建
Open
首先是当一个新的DB被打开的时候会创建一个WAL:
Status DB::Open(const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
......
s = NewWritableFile(impl->db_options_.env,
LogFileName(impl->db_options_.wal_dir, new_log_number),
&lfile, opt_env_options);
......
}
SwitchMemtable
最后一个情况是当一个CF(column family)被刷新到磁盘之后,也会创建新的WAL,这种情况下创建WAL是用过SwitchMemtable函数. 这个函数主要是用来切换memtable,也就是做flush之前的切换(生成新的memtable,然后把老的刷新到磁盘)。生成新的memtable,自然也要创建新的WAL。
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
......
if (creating_new_log) {
......
} else {
s = NewWritableFile(env_,
LogFileName(db_options_.wal_dir, new_log_number),
&lfile, opt_env_opt);
}
......
}
上面两种情况我们可以看到每次新建WAL都会有一个new_log_number,这个值就是对应的WAL的文件名前缀,可以看到每次生成新的log_number, 基本都会调用NewFileNumber函数.这里注意如果option设置了recycle_log_file_num的话,是有可能重用老的log_number的。我们先来看下NewFileNumber函数:
uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); }
可以看到函数实现很简单,就是每次log_number加一,因此一般来说WAL的文件格式都是类似0000001.log这样子.
WAL清理
WAL的删除只有当包含在此WAL中的所有的数据都已经被持久化为SST之后(也有可能会延迟删除,因为有时候需要master发送transcation Log到slave来回放). 先来看DBImpl::FIndObsoleteFiles函数,这个函数很长,我们只关注对应的WAL部分,这里逻辑很简单,就是遍历所有的WAL,然后找出log_number小于当前min_log_number的文件然后加入到对应的结构(log_delete_files).
void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
bool no_full_scan) {
......
if (!alive_log_files_.empty()) {
uint64_t min_log_number = versions_->MinLogNumber();
// find newly obsoleted log files
while (alive_log_files_.begin()->number < min_log_number) {
auto& earliest = *alive_log_files_.begin();
if (db_options_.recycle_log_file_num > log_recycle_files.size()) {
RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"adding log %" PRIu64 " to recycle list\n", earliest.number);
log_recycle_files.push_back(earliest.number);
} else {
job_context->log_delete_files.push_back(earliest.number);
}
total_log_size_.fetch_sub(static_cast<int64_t>(earliest.size));
alive_log_files_.pop_front();
// Current log should always stay alive since it can't have
// number < MinLogNumber().
assert(alive_log_files_.size());
}
while (!logs_.empty() && logs_.front().number < min_log_number) {
auto& log = logs_.front();
if (log.getting_synced) {
log_sync_cv_.Wait();
// logs_ could have changed while we were waiting.
continue;
}
logs_to_free_.push_back(log.ReleaseWriter());
logs_.pop_front();
}
// Current log cannot be obsolete.
assert(!logs_.empty());
}
......
}
这里可以看到有两个核心的数据结构alivelog_files和logs,他们的区别就是前一个表示有写入的WAL,而后一个则是包括了所有的WAL(比如open一个DB,而没有写入数据,此时也会生成WAL).
最终删除WAL的操作是在DBImpl::DeleteObsoleteFiles这个函数,而WAL删除不会单独触发,而是和temp/sst这类文件一起被删除的(PurgeObsoleteFiles).
WAL写入
前面介绍了WAL的创建和清理,为什么最后再来介绍WAL的写入呢?
事实上,YugabyteDB基于RocksDB构建了DocDB这一文档存储引擎。
而DocDB使用Raft共识协议进行复制。对分布式系统的更改(例如行更新)已经作为Raft日志的一部分进行了记录。RocksDB中不需要额外的WAL机制,只会增加开销。
所以DocDB通过禁用RocksDB WAL避免了这种双重日记,而是依靠Raft日志作为事实来源。它跟踪Raft的“序列ID”,直到从RocksDB内存表将数据刷新到SSTable文件为止。这确保了我们可以正确地垃圾收集Raft日志,以及在服务器崩溃或重新启动时从Raft WAL日志中重放最少数量的记录。
namespace yb {
void InitRocksDBWriteOptions(rocksdb::WriteOptions* write_options) {
// We disable the WAL in RocksDB because we already have the Raft log and we should
// replay it during recovery.
write_options->disableWAL = true;
write_options->sync = false;
}
} // namespace yb