raft在tablet_peer初始化时被创建,伴随着tablet_peer的启动而启动。

tablet_peer

DocDB 中的数据复制是在 tablet 级别实现的,使用tablet-peers。每个表都被分片成一组tablets。

table

而每个tablet由一组tablet-peers组成,每个tablet-peers都存储属于该tablet的数据的一个副本。一个tablet有多少个tablet-peer和replication factor一样多,它们就形成了一个Raft group。tablet-peer 托管在不同的节点上,以允许节点故障时的数据冗余。请注意,tablet-peer 之间的数据复制是 高度一致的。

下图说明了属于一个tablet(tablet1)的三个tablet-peers。tablet-peers 托管在不同的 YB-TServer 上,并形成一个 Raft 组,用于领导者选举、故障检测和预写日志的复制。

table

而在Yugabyte中存在着两种类型的tablet:

  • 一种是元数据表sys.catalog的tablet;
  • 另一种是用户定义的表的tablet。

元数据表

Yugabyte中的Catalog Manager 追踪集群中用户定义的表和以及表对应的tablet。 所有的table和tablet信息以TableInfo / TabletInfo 对象的形式通过写入时复制(copy-on-write)的方式存储在内存及sys.catalog对应的磁盘中。sys.catalog这个系统表在Master启动时加载入内存。 为了保证元数据的强一致性,sys.catalog仅仅存在一个tablet中(catalog也是tablet结构,为了便于采用Raft协议进行复制)。

static const char* const kSysCatalogTabletId = "00000000000000000000000000000000";
static const char* const kSysCatalogTableId = "sys.catalog.uuid";
static const char* const kSysCatalogTableName = "sys.catalog";

元数据表对应的属性如上。 Catalog Manager 初始化时会去判断系统是第一次运行还是非第一次运行。

  1. 当系统第一次运行时,会去创建这个表,并写入磁盘。
  2. 当系统非第一次运行时,会根据磁盘上对应的目录找到系统表对应的文件,通过文件中的信息构建这张表。 对应的磁盘目录为数据目录(/home/data) + yb-data/master/tablet-meta
    [root@localhost tablet-meta]# pwd
    /home/data/yb-data/master/tablet-meta
    [root@localhost tablet-meta]# ls
    00000000000000000000000000000000
    [root@localhost tablet-meta]#
    

看看代码。 1 InitSysCatalogAsync函数首先去master配置中的磁盘尝试加载元数据。如果加载失败,那么应该是还未初始化过元数据,也就是服务第一次启动时,此时通过CreateNew去创建新的SysCatalogTable。

Status CatalogManager::InitSysCatalogAsync() {
  // Optimistically try to load data from disk.
  Status s = sys_catalog_->Load(master_->fs_manager());
    ......
    RETURN_NOT_OK_PREPEND(sys_catalog_->CreateNew(master_->fs_manager()),
        Substitute("Encountered errors during system catalog initialization:"
                   "\n\tError on Load: $0\n\tError on CreateNew: ", s.ToString()));
    ......
  }

  return s;
}

2 Load函数去加载SysCatalogTable。

Status SysCatalogTable::Load(FsManager* fs_manager) {
  ......
  // Load Metadata Information from disk
  auto metadata = VERIFY_RESULT(tablet::RaftGroupMetadata::Load(fs_manager, kSysCatalogTabletId));
  ......
  RETURN_NOT_OK(SetupTablet(metadata));
  return Status::OK();
}

3 若Load失败,则通过CreateNew创建这个表。

Status SysCatalogTable::CreateNew(FsManager *fs_manager) {
  ......
  auto metadata = VERIFY_RESULT(tablet::RaftGroupMetadata::CreateNew(......);
  ......
  return SetupTablet(metadata);
}

4 可以看到无论是CreateNew还是Load,最终都是会调用SetupTablet(metadata)通过元数据来来创建tablet,并使用OpenTablet完成其初始化并启动。

Status SysCatalogTable::SetupTablet(const scoped_refptr<tablet::RaftGroupMetadata>& metadata) {
  SetupTabletPeer(metadata);

  RETURN_NOT_OK(OpenTablet(metadata));

  return Status::OK();
}

5 通过SetupTabletPeer创建一个新的tablet peer,并将其保存至tabletpeer

void SysCatalogTable::SetupTabletPeer(const scoped_refptr<tablet::RaftGroupMetadata>& metadata) {
  InitLocalRaftPeerPB();
  auto tablet_peer = std::make_shared<tablet::TabletPeer>(......);

  std::atomic_store(&tablet_peer_, tablet_peer);
}

前面说过sys.catalog仅仅存在一个tablet中,所以tabletpeer存储的是这个tablet的一份副本。

6 最后在OpenTablet函数中去初始化并启动这个tabletpeer

Status SysCatalogTable::OpenTablet(const scoped_refptr<tablet::RaftGroupMetadata>& metadata) {
  ......
  RETURN_NOT_OK_PREPEND(
    tablet_peer()->InitTabletPeer(.....);
  RETURN_NOT_OK_PREPEND(tablet_peer()->Start(consensus_info),
  ......
}

用户表

用户表与系统表类似,也分为两种情形:

  1. 系统正常运行,收到用户发来的创建表请求。
  2. 系统重启,需要将已创建的表现从磁盘加载到内存中。 当表创建后,需要在一组副本上创建tablets。为了实现它,master需要选择副本,并与tablet结合,即将副本指定给某一个tablet。 我们发送创建tablet请求,选择了一组副本,且选择了leader,并不代表着这个tablet已经创建好了。接下来,tablet server如果接收到leader的tablet created心跳,我们就可认为这个tablet是running状态。如果超出了timeout时间(ASSIGNMENT-TIMEOUT-MSEC)没有接收到tablet created报告,master会替换掉这个tablet,步骤与上面的一样。 这个分配任务是被CatalogManagerBgTasks线程执行的。这个线程主要负责两个事:
  3. 创建表(分配新的tablet)
  4. 超时时再次分配(部分tablet超时了,需要替换)

创建表

首先来看看正常的创建表流程。 前面说过元数据表记录了所有的table和tablet信息,所以创建表请求,首先会在master上创建表信息以及分配表对应的tablet,然后将table和tablet信息存入内存及对应的元数据表中。 但此时,并不意味着tablet已经完全创建成功。tablet创建请求会下发到所有的tserver服务,每个tserver服务会为这个tablet生成tablet-peer,多个tablet_peer通过raft协议完成leader选举后,master收到leader的tablet created心跳,我们才认可这个tablet是running状态。如果超出了timeout时间(ASSIGNMENT-TIMEOUT-MSEC)没有接收到tablet created报告,master会替换掉这个tablet。

看看代码。 1 CreateTable函数中会对将要创建的表进行一系列验证,包括表结构是否准确、表名是否可以等,然后在内存中创建表信息:

Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
                                   CreateTableResponsePB* resp,
                                   rpc::RpcContext* rpc) {
  ......
      RETURN_NOT_OK(CreateTableInMemory(
        req, schema, partition_schema,
        !tablets_exist && !tablegroup_tablets_exist /* create_tablets */, namespace_id,
        namespace_name, partitions, &index_info, &tablets, resp, &table));
  ......
}

2 CreateTableInMemory通过CreateTableInfo函数在内存中创建TableInfo信息。然后通过CreateTabletsFromTable函数对每个Table创建对应的Tablet。

Status CatalogManager::CreateTableInMemory(......) {
  *table = CreateTableInfo(req, schema, partition_schema, namespace_id, namespace_name, index_info);
  .......
  if (create_tablets) {
    RETURN_NOT_OK(CreateTabletsFromTable(partitions, *table, tablets));
  }
  ......
}

3 CreateTabletsFromTable根据分配策略生成tablet信息,然后存入tabletmap,等待下发。

Status CatalogManager::CreateTabletsFromTable(...) {
  // Create the TabletInfo objects in state PREPARING.
  for (const Partition& partition : partitions) {
    PartitionPB partition_pb;
    partition.ToPB(&partition_pb);
    tablets->push_back(CreateTabletInfo(table.get(), partition_pb));
  }

  // Add the table/tablets to the in-memory map for the assignment.
  table->AddTablets(*tablets);
  auto tablet_map_checkout = tablet_map_.CheckOut();
  for (TabletInfo* tablet : *tablets) {
    InsertOrDie(tablet_map_checkout.get_ptr(), tablet->tablet_id(), tablet);
  }

  return Status::OK();
}

4 tabletmap中存入的tablet通过后台线程CatalogManagerBgTasks发送至tserver。

void CatalogManagerBgTasks::Run() {
  ......
      // Get list of tablets not yet running or already replaced.
      catalog_manager_->ExtractTabletsToProcess(&to_delete, &to_process);
  ......
  if (!to_process.empty()) {
    ......
    for (const auto& entries : to_process) {
      ......
      Status s = catalog_manager_->ProcessPendingAssignments(entries.second);
      ......
    }
  }
  ......
}

5 ExtractTabletsToProcess从tabletmap中取出状态不是running的tablet进行处理。

void CatalogManager::ExtractTabletsToProcess(......) {
  ......
  for (const TabletInfoMap::value_type& entry : *tablet_map_) {
    ......
    // Running tablets.
    if (tablet_lock->is_running()) {
      // TODO: handle last update > not responding timeout?
      continue;
    }

    // Tablets not yet assigned or with a report just received.
    (*tablets_to_process)[tablet->table()->id()].push_back(tablet);
  }
}

6 ProcessPendingAssignments函数通过SendCreateTabletRequests函数向tserver发送CreateTablet请求。

Status CatalogManager::ProcessPendingAssignments(const TabletInfos& tablets) {
  ......
  SendCreateTabletRequests(deferred.needs_create_rpc);
  return Status::OK();
}

7 Tserver收到CreateTablet请求,进行处理 。

void TabletServiceAdminImpl::CreateTablet(......) {
  ......
  auto status = DoCreateTablet(req, resp);
  ......
}

Status TabletServiceAdminImpl::DoCreateTablet(......) {
  ......
  status = ResultToStatus(server_->tablet_manager()->CreateNewTablet(......));
  ......
}

8 CreateNewTablet完成tablet的创建与启动。

Result<TabletPeerPtr> TSTabletManager::CreateNewTablet(
  ......
  TabletPeerPtr new_peer = VERIFY_RESULT(CreateAndRegisterTabletPeer(meta, NEW_PEER));
  ......
  RETURN_NOT_OK(
      open_tablet_pool_->SubmitFunc(std::bind(&TSTabletManager::OpenTablet, this, meta, deleter)));
  ......
}

9 CreateAndRegisterTabletPeer创建tabletpeer,并将其插入到内存tablet_map

// Create and register a new TabletPeer, given tablet metadata.
Result<TabletPeerPtr> TSTabletManager::CreateAndRegisterTabletPeer(
    const RaftGroupMetadataPtr& meta, RegisterTabletPeerMode mode) {
  TabletPeerPtr tablet_peer(new tablet::TabletPeer(......);
  RETURN_NOT_OK(RegisterTablet(meta->raft_group_id(), tablet_peer, mode));
  return tablet_peer;
}

Status TSTabletManager::RegisterTablet(......) {
  ......
  if (!InsertIfNotPresent(&tablet_map_, tablet_id, tablet_peer)) {
  ......
}

10 OpenTablet完成tablet_peer的初始化以及启动。

void TSTabletManager::OpenTablet(......) {
  ......
    auto s = tablet_peer->InitTabletPeer(......);
  ......
  s = tablet_peer->Start(bootstrap_info);
  ......
}

加载表

用户已创建的表,当服务重启时,必须对这部分数据重新加载到内存。 服务重启,对TSTabletManager进行初始化。

Status TSTabletManager::Init() {
  ......
  // Search for tablets in the metadata dir.
  vector<string> tablet_ids;
  RETURN_NOT_OK(fs_manager_->ListTabletIds(&tablet_ids));
  ......
  for (const string& tablet_id : tablet_ids) {
    RaftGroupMetadataPtr meta;
    RETURN_NOT_OK_PREPEND(OpenTabletMeta(tablet_id, &meta),
                          "Failed to open tablet metadata for tablet: " + tablet_id);
    .......
    RegisterDataAndWalDir(
        fs_manager_, meta->table_id(), meta->raft_group_id(), meta->data_root_dir(),
        meta->wal_root_dir());
    ......
    }
  }
  ......
  // Now submit the "Open" task for each.
  for (const RaftGroupMetadataPtr& meta : metas) {
    ......
    TabletPeerPtr tablet_peer = VERIFY_RESULT(CreateAndRegisterTabletPeer(meta, NEW_PEER));
    RETURN_NOT_OK(open_tablet_pool_->SubmitFunc(
        std::bind(&TSTabletManager::OpenTablet, this, meta, deleter)));
  }
  .......
}

初始化时,首先通过ListTabletIds函数获取集群中所有的tablet id。遍历id,通过OpenTabletMeta获取id对应的元数据信息,将元数据信息通过RegisterDataAndWalDir加载到内存对应的数据结构中。最后通过元数据信息创建tablet_peer,并提交OpenTablet任务 1 ListTabletIds 通过读取元数据目录获取所有tablet id.

Status FsManager::ListTabletIds(vector<string>* tablet_ids) {
  string dir = GetRaftGroupMetadataDir();
  vector<string> children;
  RETURN_NOT_OK_PREPEND(ListDir(dir, &children),
                        Substitute("Couldn't list tablets in metadata directory $0", dir));

  vector<string> tablets;
  for (const string& child : children) {
    if (!IsValidTabletId(child)) {
      continue;
    }
    tablet_ids->push_back(child);
  }
  return Status::OK();
}

对应的目录为指定的数据目录(/home/data) 加上 yb-data/tserver/tablet-meta/,子文件就是对应的tablet_id

[root@localhost ~]# cd /home/data/yb-data/tserver/tablet-meta/
[root@localhost tablet-meta]# ls
07f2cfde093948eabaf3954942dcb009  51dec1d3faec4a06aa3f7095808945ba  7d58317f08704e2083141eeb99b08836  a9a0badefc9c4587b48e7dffbc5c22d3  c6d709a0b54240deac24e0070c2c612f ......

2 OpenTabletMeta读取对应的子文件,获取元数据信息。

Status TSTabletManager::OpenTabletMeta(const string& tablet_id,
                                       RaftGroupMetadataPtr* metadata) {
  LOG(INFO) << "Loading metadata for tablet " << tablet_id;
  TRACE("Loading metadata...");
  auto load_result = RaftGroupMetadata::Load(fs_manager_, tablet_id);
  RETURN_NOT_OK_PREPEND(load_result,
                        Format("Failed to load tablet metadata for tablet id $0", tablet_id));
  TRACE("Metadata loaded");
  metadata->swap(*load_result);
  return Status::OK();
}

3 RegisterDataAndWalDir将对应的元数据信息写入内存。

void TSTabletManager::RegisterDataAndWalDir(......) {
  ......
  table_data_assignment_map_[table_id][data_root_iter] = tablet_id_set;
  ......
  table_wal_assignment_map_[table_id][wal_root_iter] = tablet_id_set;
  ......
}

主要对tabledata_assignment_map和tablewal_assignment_map两个内存结构做了填充。

4 CreateAndRegisterTabletPeer创建tabletpeer,并将其插入到内存tablet_map

// Create and register a new TabletPeer, given tablet metadata.
Result<TabletPeerPtr> TSTabletManager::CreateAndRegisterTabletPeer(
    const RaftGroupMetadataPtr& meta, RegisterTabletPeerMode mode) {
  TabletPeerPtr tablet_peer(new tablet::TabletPeer(......);
  RETURN_NOT_OK(RegisterTablet(meta->raft_group_id(), tablet_peer, mode));
  return tablet_peer;
}

Status TSTabletManager::RegisterTablet(......) {
  ......
  if (!InsertIfNotPresent(&tablet_map_, tablet_id, tablet_peer)) {
  ......
}

5 最后一步,依旧是OpenTablet完成tablet_peer的初始化以及启动。

void TSTabletManager::OpenTablet(......) {
  ......
    auto s = tablet_peer->InitTabletPeer(......);
  ......
  s = tablet_peer->Start(bootstrap_info);
  ......
}

raft启动

tablet创建时,选择raft作为其一致性协议。当一个 tablet 启动时发生的第一件事是使用Raft协议选择一个 tablet-peer 作为tablet leader。这个tablet leader现在负责处理面向用户的写入请求。它将用户发出的写入转换到 DocDB 的Rocksdb,并使用 Raft 在 tablet-peer 之间复制以实现强一致性。撇开Tablet leader,剩下的Raft 组的tablet-peers 被称为tablet follower。

raft 在 tablet_peer的初始化时被创建,伴随着tablet_peer的启动而启动。

Status TabletPeer::InitTabletPeer(
  ......
  consensus_ = RaftConsensus::Create(......);
  ......
}
Status TabletPeer::Start(const ConsensusBootstrapInfo& bootstrap_info) {
  ......
  RETURN_NOT_OK(consensus_->Start(bootstrap_info));
  ......
}

后面会对raft详细介绍.

Copyright © itrunner.cn 2020 all right reserved,powered by Gitbook该文章修订时间: 2022-08-28 07:44:16

results matching ""

    No results matching ""