raft
在tablet_peer初始化时被创建,伴随着tablet_peer的启动而启动。
tablet_peer
DocDB 中的数据复制是在 tablet 级别实现的,使用tablet-peers。每个表都被分片成一组tablets。
而每个tablet由一组tablet-peers组成,每个tablet-peers都存储属于该tablet的数据的一个副本。一个tablet有多少个tablet-peer和replication factor一样多,它们就形成了一个Raft group。tablet-peer 托管在不同的节点上,以允许节点故障时的数据冗余。请注意,tablet-peer 之间的数据复制是 高度一致的。
下图说明了属于一个tablet(tablet1)的三个tablet-peers。tablet-peers 托管在不同的 YB-TServer 上,并形成一个 Raft 组,用于领导者选举、故障检测和预写日志的复制。
而在Yugabyte中存在着两种类型的tablet:
- 一种是元数据表sys.catalog的tablet;
- 另一种是用户定义的表的tablet。
元数据表
Yugabyte中的Catalog Manager 追踪集群中用户定义的表和以及表对应的tablet。 所有的table和tablet信息以TableInfo / TabletInfo 对象的形式通过写入时复制(copy-on-write)的方式存储在内存及sys.catalog对应的磁盘中。sys.catalog这个系统表在Master启动时加载入内存。 为了保证元数据的强一致性,sys.catalog仅仅存在一个tablet中(catalog也是tablet结构,为了便于采用Raft协议进行复制)。
static const char* const kSysCatalogTabletId = "00000000000000000000000000000000";
static const char* const kSysCatalogTableId = "sys.catalog.uuid";
static const char* const kSysCatalogTableName = "sys.catalog";
元数据表对应的属性如上。 Catalog Manager 初始化时会去判断系统是第一次运行还是非第一次运行。
- 当系统第一次运行时,会去创建这个表,并写入磁盘。
- 当系统非第一次运行时,会根据磁盘上对应的目录找到系统表对应的文件,通过文件中的信息构建这张表。
对应的磁盘目录为数据目录(/home/data) + yb-data/master/tablet-meta
[root@localhost tablet-meta]# pwd /home/data/yb-data/master/tablet-meta [root@localhost tablet-meta]# ls 00000000000000000000000000000000 [root@localhost tablet-meta]#
看看代码。 1 InitSysCatalogAsync函数首先去master配置中的磁盘尝试加载元数据。如果加载失败,那么应该是还未初始化过元数据,也就是服务第一次启动时,此时通过CreateNew去创建新的SysCatalogTable。
Status CatalogManager::InitSysCatalogAsync() {
// Optimistically try to load data from disk.
Status s = sys_catalog_->Load(master_->fs_manager());
......
RETURN_NOT_OK_PREPEND(sys_catalog_->CreateNew(master_->fs_manager()),
Substitute("Encountered errors during system catalog initialization:"
"\n\tError on Load: $0\n\tError on CreateNew: ", s.ToString()));
......
}
return s;
}
2 Load函数去加载SysCatalogTable。
Status SysCatalogTable::Load(FsManager* fs_manager) {
......
// Load Metadata Information from disk
auto metadata = VERIFY_RESULT(tablet::RaftGroupMetadata::Load(fs_manager, kSysCatalogTabletId));
......
RETURN_NOT_OK(SetupTablet(metadata));
return Status::OK();
}
3 若Load失败,则通过CreateNew创建这个表。
Status SysCatalogTable::CreateNew(FsManager *fs_manager) {
......
auto metadata = VERIFY_RESULT(tablet::RaftGroupMetadata::CreateNew(......);
......
return SetupTablet(metadata);
}
4 可以看到无论是CreateNew还是Load,最终都是会调用SetupTablet(metadata)通过元数据来来创建tablet,并使用OpenTablet完成其初始化并启动。
Status SysCatalogTable::SetupTablet(const scoped_refptr<tablet::RaftGroupMetadata>& metadata) {
SetupTabletPeer(metadata);
RETURN_NOT_OK(OpenTablet(metadata));
return Status::OK();
}
5 通过SetupTabletPeer创建一个新的tablet peer,并将其保存至tabletpeer。
void SysCatalogTable::SetupTabletPeer(const scoped_refptr<tablet::RaftGroupMetadata>& metadata) {
InitLocalRaftPeerPB();
auto tablet_peer = std::make_shared<tablet::TabletPeer>(......);
std::atomic_store(&tablet_peer_, tablet_peer);
}
前面说过sys.catalog仅仅存在一个tablet中,所以tabletpeer存储的是这个tablet的一份副本。
6 最后在OpenTablet函数中去初始化并启动这个tabletpeer。
Status SysCatalogTable::OpenTablet(const scoped_refptr<tablet::RaftGroupMetadata>& metadata) {
......
RETURN_NOT_OK_PREPEND(
tablet_peer()->InitTabletPeer(.....);
RETURN_NOT_OK_PREPEND(tablet_peer()->Start(consensus_info),
......
}
用户表
用户表与系统表类似,也分为两种情形:
- 系统正常运行,收到用户发来的创建表请求。
- 系统重启,需要将已创建的表现从磁盘加载到内存中。 当表创建后,需要在一组副本上创建tablets。为了实现它,master需要选择副本,并与tablet结合,即将副本指定给某一个tablet。 我们发送创建tablet请求,选择了一组副本,且选择了leader,并不代表着这个tablet已经创建好了。接下来,tablet server如果接收到leader的tablet created心跳,我们就可认为这个tablet是running状态。如果超出了timeout时间(ASSIGNMENT-TIMEOUT-MSEC)没有接收到tablet created报告,master会替换掉这个tablet,步骤与上面的一样。 这个分配任务是被CatalogManagerBgTasks线程执行的。这个线程主要负责两个事:
- 创建表(分配新的tablet)
- 超时时再次分配(部分tablet超时了,需要替换)
创建表
首先来看看正常的创建表流程。 前面说过元数据表记录了所有的table和tablet信息,所以创建表请求,首先会在master上创建表信息以及分配表对应的tablet,然后将table和tablet信息存入内存及对应的元数据表中。 但此时,并不意味着tablet已经完全创建成功。tablet创建请求会下发到所有的tserver服务,每个tserver服务会为这个tablet生成tablet-peer,多个tablet_peer通过raft协议完成leader选举后,master收到leader的tablet created心跳,我们才认可这个tablet是running状态。如果超出了timeout时间(ASSIGNMENT-TIMEOUT-MSEC)没有接收到tablet created报告,master会替换掉这个tablet。
看看代码。 1 CreateTable函数中会对将要创建的表进行一系列验证,包括表结构是否准确、表名是否可以等,然后在内存中创建表信息:
Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
CreateTableResponsePB* resp,
rpc::RpcContext* rpc) {
......
RETURN_NOT_OK(CreateTableInMemory(
req, schema, partition_schema,
!tablets_exist && !tablegroup_tablets_exist /* create_tablets */, namespace_id,
namespace_name, partitions, &index_info, &tablets, resp, &table));
......
}
2 CreateTableInMemory通过CreateTableInfo函数在内存中创建TableInfo信息。然后通过CreateTabletsFromTable函数对每个Table创建对应的Tablet。
Status CatalogManager::CreateTableInMemory(......) {
*table = CreateTableInfo(req, schema, partition_schema, namespace_id, namespace_name, index_info);
.......
if (create_tablets) {
RETURN_NOT_OK(CreateTabletsFromTable(partitions, *table, tablets));
}
......
}
3 CreateTabletsFromTable根据分配策略生成tablet信息,然后存入tabletmap,等待下发。
Status CatalogManager::CreateTabletsFromTable(...) {
// Create the TabletInfo objects in state PREPARING.
for (const Partition& partition : partitions) {
PartitionPB partition_pb;
partition.ToPB(&partition_pb);
tablets->push_back(CreateTabletInfo(table.get(), partition_pb));
}
// Add the table/tablets to the in-memory map for the assignment.
table->AddTablets(*tablets);
auto tablet_map_checkout = tablet_map_.CheckOut();
for (TabletInfo* tablet : *tablets) {
InsertOrDie(tablet_map_checkout.get_ptr(), tablet->tablet_id(), tablet);
}
return Status::OK();
}
4 tabletmap中存入的tablet通过后台线程CatalogManagerBgTasks发送至tserver。
void CatalogManagerBgTasks::Run() {
......
// Get list of tablets not yet running or already replaced.
catalog_manager_->ExtractTabletsToProcess(&to_delete, &to_process);
......
if (!to_process.empty()) {
......
for (const auto& entries : to_process) {
......
Status s = catalog_manager_->ProcessPendingAssignments(entries.second);
......
}
}
......
}
5 ExtractTabletsToProcess从tabletmap中取出状态不是running的tablet进行处理。
void CatalogManager::ExtractTabletsToProcess(......) {
......
for (const TabletInfoMap::value_type& entry : *tablet_map_) {
......
// Running tablets.
if (tablet_lock->is_running()) {
// TODO: handle last update > not responding timeout?
continue;
}
// Tablets not yet assigned or with a report just received.
(*tablets_to_process)[tablet->table()->id()].push_back(tablet);
}
}
6 ProcessPendingAssignments函数通过SendCreateTabletRequests函数向tserver发送CreateTablet请求。
Status CatalogManager::ProcessPendingAssignments(const TabletInfos& tablets) {
......
SendCreateTabletRequests(deferred.needs_create_rpc);
return Status::OK();
}
7 Tserver收到CreateTablet请求,进行处理 。
void TabletServiceAdminImpl::CreateTablet(......) {
......
auto status = DoCreateTablet(req, resp);
......
}
Status TabletServiceAdminImpl::DoCreateTablet(......) {
......
status = ResultToStatus(server_->tablet_manager()->CreateNewTablet(......));
......
}
8 CreateNewTablet完成tablet的创建与启动。
Result<TabletPeerPtr> TSTabletManager::CreateNewTablet(
......
TabletPeerPtr new_peer = VERIFY_RESULT(CreateAndRegisterTabletPeer(meta, NEW_PEER));
......
RETURN_NOT_OK(
open_tablet_pool_->SubmitFunc(std::bind(&TSTabletManager::OpenTablet, this, meta, deleter)));
......
}
9 CreateAndRegisterTabletPeer创建tabletpeer,并将其插入到内存tablet_map。
// Create and register a new TabletPeer, given tablet metadata.
Result<TabletPeerPtr> TSTabletManager::CreateAndRegisterTabletPeer(
const RaftGroupMetadataPtr& meta, RegisterTabletPeerMode mode) {
TabletPeerPtr tablet_peer(new tablet::TabletPeer(......);
RETURN_NOT_OK(RegisterTablet(meta->raft_group_id(), tablet_peer, mode));
return tablet_peer;
}
Status TSTabletManager::RegisterTablet(......) {
......
if (!InsertIfNotPresent(&tablet_map_, tablet_id, tablet_peer)) {
......
}
10 OpenTablet完成tablet_peer的初始化以及启动。
void TSTabletManager::OpenTablet(......) {
......
auto s = tablet_peer->InitTabletPeer(......);
......
s = tablet_peer->Start(bootstrap_info);
......
}
加载表
用户已创建的表,当服务重启时,必须对这部分数据重新加载到内存。 服务重启,对TSTabletManager进行初始化。
Status TSTabletManager::Init() {
......
// Search for tablets in the metadata dir.
vector<string> tablet_ids;
RETURN_NOT_OK(fs_manager_->ListTabletIds(&tablet_ids));
......
for (const string& tablet_id : tablet_ids) {
RaftGroupMetadataPtr meta;
RETURN_NOT_OK_PREPEND(OpenTabletMeta(tablet_id, &meta),
"Failed to open tablet metadata for tablet: " + tablet_id);
.......
RegisterDataAndWalDir(
fs_manager_, meta->table_id(), meta->raft_group_id(), meta->data_root_dir(),
meta->wal_root_dir());
......
}
}
......
// Now submit the "Open" task for each.
for (const RaftGroupMetadataPtr& meta : metas) {
......
TabletPeerPtr tablet_peer = VERIFY_RESULT(CreateAndRegisterTabletPeer(meta, NEW_PEER));
RETURN_NOT_OK(open_tablet_pool_->SubmitFunc(
std::bind(&TSTabletManager::OpenTablet, this, meta, deleter)));
}
.......
}
初始化时,首先通过ListTabletIds函数获取集群中所有的tablet id。遍历id,通过OpenTabletMeta获取id对应的元数据信息,将元数据信息通过RegisterDataAndWalDir加载到内存对应的数据结构中。最后通过元数据信息创建tablet_peer,并提交OpenTablet任务 1 ListTabletIds 通过读取元数据目录获取所有tablet id.
Status FsManager::ListTabletIds(vector<string>* tablet_ids) {
string dir = GetRaftGroupMetadataDir();
vector<string> children;
RETURN_NOT_OK_PREPEND(ListDir(dir, &children),
Substitute("Couldn't list tablets in metadata directory $0", dir));
vector<string> tablets;
for (const string& child : children) {
if (!IsValidTabletId(child)) {
continue;
}
tablet_ids->push_back(child);
}
return Status::OK();
}
对应的目录为指定的数据目录(/home/data) 加上 yb-data/tserver/tablet-meta/,子文件就是对应的tablet_id
[root@localhost ~]# cd /home/data/yb-data/tserver/tablet-meta/
[root@localhost tablet-meta]# ls
07f2cfde093948eabaf3954942dcb009 51dec1d3faec4a06aa3f7095808945ba 7d58317f08704e2083141eeb99b08836 a9a0badefc9c4587b48e7dffbc5c22d3 c6d709a0b54240deac24e0070c2c612f ......
2 OpenTabletMeta读取对应的子文件,获取元数据信息。
Status TSTabletManager::OpenTabletMeta(const string& tablet_id,
RaftGroupMetadataPtr* metadata) {
LOG(INFO) << "Loading metadata for tablet " << tablet_id;
TRACE("Loading metadata...");
auto load_result = RaftGroupMetadata::Load(fs_manager_, tablet_id);
RETURN_NOT_OK_PREPEND(load_result,
Format("Failed to load tablet metadata for tablet id $0", tablet_id));
TRACE("Metadata loaded");
metadata->swap(*load_result);
return Status::OK();
}
3 RegisterDataAndWalDir将对应的元数据信息写入内存。
void TSTabletManager::RegisterDataAndWalDir(......) {
......
table_data_assignment_map_[table_id][data_root_iter] = tablet_id_set;
......
table_wal_assignment_map_[table_id][wal_root_iter] = tablet_id_set;
......
}
主要对tabledata_assignment_map和tablewal_assignment_map两个内存结构做了填充。
4 CreateAndRegisterTabletPeer创建tabletpeer,并将其插入到内存tablet_map。
// Create and register a new TabletPeer, given tablet metadata.
Result<TabletPeerPtr> TSTabletManager::CreateAndRegisterTabletPeer(
const RaftGroupMetadataPtr& meta, RegisterTabletPeerMode mode) {
TabletPeerPtr tablet_peer(new tablet::TabletPeer(......);
RETURN_NOT_OK(RegisterTablet(meta->raft_group_id(), tablet_peer, mode));
return tablet_peer;
}
Status TSTabletManager::RegisterTablet(......) {
......
if (!InsertIfNotPresent(&tablet_map_, tablet_id, tablet_peer)) {
......
}
5 最后一步,依旧是OpenTablet完成tablet_peer的初始化以及启动。
void TSTabletManager::OpenTablet(......) {
......
auto s = tablet_peer->InitTabletPeer(......);
......
s = tablet_peer->Start(bootstrap_info);
......
}
raft启动
tablet创建时,选择raft作为其一致性协议。当一个 tablet 启动时发生的第一件事是使用Raft协议选择一个 tablet-peer 作为tablet leader。这个tablet leader现在负责处理面向用户的写入请求。它将用户发出的写入转换到 DocDB 的Rocksdb,并使用 Raft 在 tablet-peer 之间复制以实现强一致性。撇开Tablet leader,剩下的Raft 组的tablet-peers 被称为tablet follower。
raft 在 tablet_peer的初始化时被创建,伴随着tablet_peer的启动而启动。
Status TabletPeer::InitTabletPeer(
......
consensus_ = RaftConsensus::Create(......);
......
}
Status TabletPeer::Start(const ConsensusBootstrapInfo& bootstrap_info) {
......
RETURN_NOT_OK(consensus_->Start(bootstrap_info));
......
}
后面会对raft详细介绍.