TabletServerMain入口函数中,创建并启动了TabletServer,让我们来看看TabletServer具体做了些什么。

构造函数

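构造函数在初始化列表中初始化了几个内部成员变量(创建 TSTabletManager、TabletServerPathHandlers、MaintenanceManager 以及共享内存对象 TServerSharedObject),并在函数体中设置了入站 RPC 连接的上下文工厂: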
// yb/tserver/tablet_server.cc
// Constructs the tablet server. Only the owned sub-components are created here;
// Init() and Start() perform the actual bring-up.
//
// NOTE: C++ initializes members in declaration order (see tablet_server.h), not in the
// order written in this list, so keep the two in sync. fs_manager_ is not declared in
// TabletServer; it is presumably a base-class member set up by the base constructor
// (which runs first) — confirm before relying on it in new initializers.
TabletServer::TabletServer(const TabletServerOptions& opts)
    : RpcAndWebServerBase(
          "TabletServer", opts, "yb.tabletserver", server::CreateMemTrackerForServer()),
      fail_heartbeats_for_tests_(false),
      opts_(opts),
      // Manager for the tablets hosted by this server; shares the server's FsManager.
      tablet_manager_(new TSTabletManager(fs_manager_.get(), this, metric_registry())),
      path_handlers_(new TabletServerPathHandlers(this)),
      maintenance_manager_(new MaintenanceManager(MaintenanceManager::DEFAULT_OPTIONS)),
      master_config_index_(0),
      // Non-owning; assigned in RegisterServices().
      tablet_server_service_(nullptr),
      // CHECK_RESULT CHECK-fails if the shared-memory object cannot be created.
      shared_object_(CHECK_RESULT(TServerSharedObject::Create())) {
  // Inbound RPC connections use YBInboundConnectionContext; the factory receives
  // FLAGS_inbound_rpc_memory_limit and this server's mem tracker.
  SetConnectionContextFactory(rpc::CreateConnectionContextFactory<rpc::YBInboundConnectionContext>(
      FLAGS_inbound_rpc_memory_limit, mem_tracker()));

  LOG(INFO) << "yb::tserver::TabletServer created at " << this;
  LOG(INFO) << "yb::tserver::TSTabletManager created at " << tablet_manager_.get();
}

头文件中关于几个成员变量的定义:

// yb/tserver/tablet_server.h
// Excerpt of the TabletServer declaration: "..." marks omitted members. The members
// below are the ones the constructor initializes, listed in declaration order (the
// order in which C++ actually initializes them).
class TabletServer : public server::RpcAndWebServerBase, public TabletServerIf {
    ...
    ...
    ...
    // If true, all heartbeats will be seen as failed.
    // NOTE(review): stored as Atomic32, presumably so tests can toggle it while the
    // heartbeat thread reads it — confirm.
    Atomic32 fail_heartbeats_for_tests_;

    // The options passed at construction time, and will be updated if master config changes.
    TabletServerOptions opts_;

    // Manager for tablets which are available on this server.
    // Owned; created in the constructor, initialized in Init() and started in Start().
    gscoped_ptr<TSTabletManager> tablet_manager_;

    // Webserver path handlers
    // Registered with the web server in TabletServer::Init().
    gscoped_ptr<TabletServerPathHandlers> path_handlers_;

    // The maintenance manager for this tablet server
    // Its Init() (which creates the scheduler thread) is called from TabletServer::Start().
    std::shared_ptr<MaintenanceManager> maintenance_manager_;

    // Index at which master sent us the last config
    int master_config_index_;

    // An instance to tablet server service. This pointer is no longer valid after RpcAndWebServerBase
    // is shut down.
    // Non-owning: the object is created in RegisterServices() and handed to the RPC layer.
    TabletServiceImpl* tablet_server_service_;

    // Shared memory owned by the tablet server.
    // Init() publishes the bound RPC endpoint and a Postgres auth key through it.
    TServerSharedObject shared_object_;
    ...
    ...
    ...
    // (End of excerpt; the real declaration closes with "};".)
}

初始化

// yb/tserver/tablet_server.cc
// One-time initialization of the tablet server; must precede Start().
//
// Checks that the master addresses resolve, initializes the RPC/web server base,
// registers the web path handlers, builds the heartbeater (plus the metrics snapshotter
// when FLAGS_tserver_enable_metrics_snapshotter is set) and initializes the tablet
// manager. It then marks the server as initialized and, through shared memory, publishes
// the first bound RPC address and a random Postgres auth key. It also parses the
// pgsql proxy bind address. Returns the first error encountered.
Status TabletServer::Init() {
  CHECK(!initted_.load(std::memory_order_acquire));

  // Only name resolution is verified here, not connectivity: the TS and the master
  // may be started in either order, and the heartbeat thread keeps retrying until it
  // manages to connect.
  RETURN_NOT_OK(ValidateMasterAddressResolution());

  RETURN_NOT_OK(RpcAndWebServerBase::Init());
  RETURN_NOT_OK(path_handlers_->Register(web_server_.get()));

  log_prefix_ = Format("P $0: ", permanent_uuid());

  heartbeater_ = CreateHeartbeater(opts_, this);
  if (FLAGS_tserver_enable_metrics_snapshotter) {
    metrics_snapshotter_.reset(new MetricsSnapshotter(opts_, this));
  }

  RETURN_NOT_OK_PREPEND(tablet_manager_->Init(), "Could not init Tablet Manager");

  initted_.store(true, std::memory_order_release);

  // Advertise the first bound RPC address, if there is one.
  const auto bound_endpoints = rpc_server()->GetBoundAddresses();
  if (!bound_endpoints.empty()) {
    shared_object_->SetEndpoint(bound_endpoints.front());
  }

  // Mirrors kDefaultPort in src/yb/yql/pgwrapper/pg_wrapper.h.
  constexpr int kDefaultPgsqlPort = 5433;
  RETURN_NOT_OK(pgsql_proxy_bind_address_.ParseString(
      FLAGS_pgsql_proxy_bind_address, kDefaultPgsqlPort));
  shared_object_->SetPostgresAuthKey(RandomUniformInt<uint64_t>());

  return Status::OK();
}

RpcAndWebServerBase::Init()

// yb/server/server_base.cc
// Initializes state shared by RPC+web servers.
//
// Initializes OpenSSL, then opens the FsManager layout. A fresh layout is created and
// reopened when Open() reports NotFound, or when it fails while lock files are present.
// Finally delegates to RpcServerBase::Init(). Returns the first error encountered.
Status RpcAndWebServerBase::Init() {
  yb::enterprise::InitOpenSSL();

  Status open_status = fs_manager_->Open();
  const bool create_new_layout =
      open_status.IsNotFound() || (!open_status.ok() && fs_manager_->HasAnyLockFiles());
  if (create_new_layout) {
    LOG(INFO) << "Could not load existing FS layout: " << open_status.ToString();
    LOG(INFO) << "Creating new FS layout";
    RETURN_NOT_OK_PREPEND(fs_manager_->CreateInitialFileSystemLayout(true),
                          "Could not create new FS layout");
    open_status = fs_manager_->Open();
  }
  RETURN_NOT_OK_PREPEND(open_status, "Failed to load FS layout");

  // Test hook that pretends the RPC port is already in use.
  if (PREDICT_FALSE(FLAGS_TEST_simulate_port_conflict_error)) {
    return STATUS(NetworkError, "Simulated port conflict error");
  }

  return RpcServerBase::Init();
}

这个函数首先初始化 OpenSSL,然后尝试打开文件系统目录(FsManager::Open())。如果返回 NotFound,或打开失败且目录中存在锁文件,就创建一个新的文件系统布局并重新打开。最后调用 RpcServerBase::Init() 初始化 RPC 服务。

FsManager::Open()

// yb/fs/fs_manager.cc
// Opens an existing on-disk filesystem layout.
//
// Runs Init(), refuses to continue while any lock file is present, then reads the
// instance metadata from every canonicalized filesystem root. It verifies that all roots
// carry the same UUID; the first metadata read is kept in metadata_.
//
// Returns:
//   - Corruption if a lock file is present or the roots disagree on the UUID;
//   - IllegalState if no instance metadata could be loaded (no filesystem roots);
//   - any error from Init() or from reading an instance metadata file
//     (RpcAndWebServerBase::Init() treats NotFound as "no layout yet").
Status FsManager::Open() {
  RETURN_NOT_OK(Init());

  if (HasAnyLockFiles()) {
    return STATUS(Corruption, "Lock file is present, filesystem may be in inconsistent state");
  }

  for (const string& root : canonicalized_all_fs_roots_) {
    gscoped_ptr<InstanceMetadataPB> pb(new InstanceMetadataPB);
    RETURN_NOT_OK(pb_util::ReadPBContainerFromPath(env_, GetInstanceMetadataPath(root), pb.get()));
    if (!metadata_) {
      metadata_.reset(pb.release());
    } else if (pb->uuid() != metadata_->uuid()) {
      return STATUS(Corruption, Substitute(
          "Mismatched UUIDs across filesystem roots: $0 vs. $1",
          metadata_->uuid(), pb->uuid()));
    }
  }

  // With no roots the loop never runs and metadata_ stays null; the log line below
  // would then dereference it. Report the misconfiguration instead of crashing.
  if (!metadata_) {
    return STATUS(IllegalState, "No filesystem roots found, cannot load instance metadata");
  }

  // '\n' instead of std::endl: same text, without a pointless flush of the log stream.
  LOG(INFO) << "Opened local filesystem: " << JoinStrings(canonicalized_all_fs_roots_, ",")
            << '\n' << metadata_->DebugString();
  return Status::OK();
}

FsManager::Open() 先执行 Init(),若存在锁文件则返回 Corruption。随后它逐个读取各文件系统根目录(例如配置文件中 --fs_data_dirs=/mnt/ssd 指定的目录)下的实例元数据文件,并校验所有根目录记录的 UUID 是否一致。我们所有数据的元数据信息都将存储在这些目录下。

RpcServerBase::Init()

// src/yb/server/server_base.cc
// Initializes the RPC half of the server. May only be called once (CHECKed below).
//
// Steps, in order:
//   1. Register glog, tcmalloc and spin-lock-contention metrics.
//   2. Set the stack-trace signal.
//   3. Initialize the clock.
//   4. Build the Messenger and ProxyCache.
//   5. Initialize and bind the RPC server.
//   6. Register clock metrics and start metrics logging.
// The first failing step's status is returned (some with a context prefix).
// initialized_ is only set once every step has succeeded.
Status RpcServerBase::Init() {
  CHECK(!initialized_);

  // Diagnostic metrics, exported through this server's metric entity.
  glog_metrics_.reset(new ScopedGLogMetrics(metric_entity_));
  tcmalloc::RegisterMetrics(metric_entity_);
  RegisterSpinLockContentionMetrics(metric_entity_);

  InitSpinLockContentionProfiling();

  // NOTE(review): presumably makes SIGUSR2 the signal used to collect thread stack
  // traces — confirm in SetStackTraceSignal().
  RETURN_NOT_OK(SetStackTraceSignal(SIGUSR2));

  // Initialize the clock immediately. This checks that the clock is synchronized
  // so we're less likely to get into a partially initialized state on disk during startup
  // if we're having clock problems.
  RETURN_NOT_OK_PREPEND(clock_->Init(), "Cannot initialize clock");

  // Create the Messenger.
  // The builder is named after the server and uses the default connection context
  // factory charged to mem_tracker(); SetupMessengerBuilder() may configure it further.
  rpc::MessengerBuilder builder(name_);
  builder.UseDefaultConnectionContextFactory(mem_tracker());
  RETURN_NOT_OK(SetupMessengerBuilder(&builder));
  messenger_ = VERIFY_RESULT(builder.Build());
  // Proxy cache backed by the messenger just built.
  proxy_cache_ = std::make_unique<rpc::ProxyCache>(messenger_.get());

  // Attach the RPC server to the messenger, then bind it.
  RETURN_NOT_OK(rpc_server_->Init(messenger_.get()));
  RETURN_NOT_OK(rpc_server_->Bind());
  clock_->RegisterMetrics(metric_entity_);

  RETURN_NOT_OK_PREPEND(StartMetricsLogging(), "Could not enable metrics logging");

  initialized_ = true;
  return Status::OK();
}

RpcServerBase::Init() 注册了 glog、tcmalloc 等指标并初始化时钟,接着创建 Messenger 和 ProxyCache。随后它完成 RpcServer 的初始化和端口绑定,最后启动指标日志记录。

CreateHeartbeater

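创建心跳器(Heartbeater),并为它注册两个心跳数据提供者:TServerMetricsHeartbeatDataProvider 和 TabletSplitHeartbeatDataProvider。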

// yb/tserver/hearbeater_factory.cc
std::unique_ptr<Heartbeater> CreateHeartbeater(
    const TabletServerOptions& options, TabletServer* server) {
  std::vector<std::unique_ptr<HeartbeatDataProvider>> data_providers;
  data_providers.push_back(
      std::make_unique<TServerMetricsHeartbeatDataProvider>(server));
  data_providers.push_back(
      std::make_unique<TabletSplitHeartbeatDataProvider>(server));
  return std::make_unique<Heartbeater>(options, server, std::move(data_providers));
}

tablet_manager_->Init()

// yb/tserver/ts_tablet_manager.h
  // Load all tablet metadata blocks from disk, and open their respective tablets.
  // Upon return of this method all existing tablets are registered, but
  // the bootstrap is performed asynchronously.
  // Called from TabletServer::Init(), which prefixes a failure with
  // "Could not init Tablet Manager".
  CHECKED_STATUS Init();

如注释所述,Init() 从磁盘加载所有 tablet 的元数据块,并打开对应的 tablet。方法返回时,所有已存在的 tablet 都已注册,但 bootstrap 是异步执行的。

启动

// yb/tserver/tablet_server.cc
// Starts the tablet server; Init() must have succeeded first (CHECKed).
//
// Registers the RPC services and starts the RPC and web servers. If enabled, it creates
// a proxy for calling this tablet server locally. It then starts the tablet manager, the
// heartbeater, the metrics snapshotter (when one was created) and the maintenance manager.
// Finally it flushes the INFO log. Returns the first error encountered.
Status TabletServer::Start() {
  CHECK(initted_.load(std::memory_order_acquire));

  AutoInitServiceFlags();

  RETURN_NOT_OK(RegisterServices());
  RETURN_NOT_OK(RpcAndWebServerBase::Start());

  // If enabled, creates a proxy to call this tablet server locally.
  if (FLAGS_enable_direct_local_tablet_server_call) {
    proxy_ = std::make_shared<TabletServerServiceProxy>(proxy_cache_.get(), HostPort());
  }

  RETURN_NOT_OK(tablet_manager_->Start());

  RETURN_NOT_OK(heartbeater_->Start());

  // metrics_snapshotter_ only exists if the flag was set when Init() ran. Gflags can
  // be changed at runtime, so also check the pointer; this keeps a flag switched on
  // between Init() and Start() from dereferencing null.
  if (FLAGS_tserver_enable_metrics_snapshotter && metrics_snapshotter_) {
    RETURN_NOT_OK(metrics_snapshotter_->Start());
  }

  RETURN_NOT_OK(maintenance_manager_->Init());

  google::FlushLogFiles(google::INFO); // Flush the startup messages.

  return Status::OK();
}

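Start() 依次注册 RPC 服务、启动 RPC 与 Web 服务器,并(按需)创建本地调用代理。之后它启动 tablet manager、心跳线程、指标快照线程(若启用)以及 MaintenanceManager。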

RegisterServices()

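注册 TabletServer 相关的 RPC 服务实例:TabletServiceImpl、TabletServiceAdminImpl、ConsensusServiceImpl(高优先级)和 RemoteBootstrapServiceImpl。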

// yb/tserver/tablet_server.cc
// Creates the tablet server's RPC services and registers each one with the RPC layer,
// which receives ownership through unique_ptr. The services are:
//   - TabletServiceImpl (a non-owning pointer is kept in tablet_server_service_);
//   - TabletServiceAdminImpl;
//   - ConsensusServiceImpl (registered with high priority);
//   - RemoteBootstrapServiceImpl.
// Queue lengths come from the matching FLAGS_*_svc_queue_length flags.
// Returns the first registration error, if any.
Status TabletServer::RegisterServices() {
  auto tablet_service = std::make_unique<TabletServiceImpl>(this);
  tablet_server_service_ = tablet_service.get();
  LOG(INFO) << "yb::tserver::TabletServiceImpl created at " << tablet_server_service_;
  RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(
      FLAGS_tablet_server_svc_queue_length,
      std::unique_ptr<ServiceIf>(std::move(tablet_service))));

  std::unique_ptr<ServiceIf> admin_service = std::make_unique<TabletServiceAdminImpl>(this);
  LOG(INFO) << "yb::tserver::TabletServiceAdminImpl created at " << admin_service.get();
  RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(
      FLAGS_ts_admin_svc_queue_length, std::move(admin_service)));

  std::unique_ptr<ServiceIf> consensus_service =
      std::make_unique<ConsensusServiceImpl>(metric_entity(), tablet_manager_.get());
  LOG(INFO) << "yb::tserver::ConsensusServiceImpl created at " << consensus_service.get();
  RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(
      FLAGS_ts_consensus_svc_queue_length, std::move(consensus_service),
      rpc::ServicePriority::kHigh));

  std::unique_ptr<ServiceIf> remote_bootstrap_service =
      std::make_unique<RemoteBootstrapServiceImpl>(
          fs_manager_.get(), tablet_manager_.get(), metric_entity());
  LOG(INFO) << "yb::tserver::RemoteBootstrapServiceImpl created at "
            << remote_bootstrap_service.get();
  RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(
      FLAGS_ts_remote_bootstrap_svc_queue_length, std::move(remote_bootstrap_service)));

  return Status::OK();
}

RpcAndWebServerBase::Start()

启动 RPC 和 Web 服务器:先注册各类 Web 页面处理器并启动 Web 服务器,再调用 RpcServerBase::Start() 启动 RPC 服务。

// yb/server/server_base.cc
// Starts the web server, then the RPC server.
//
// Calls GenerateInstanceID() first. It then installs the web handlers: default, RPC-z,
// metrics JSON, tracing, and the /utilz "Utilities" page served by HandleDebugPage.
// After setting the page footer it starts the web server, and finally delegates to
// RpcServerBase::Start(). Returns the first error encountered.
Status RpcAndWebServerBase::Start() {
  GenerateInstanceID();

  auto* const web = web_server_.get();
  AddDefaultPathHandlers(web);
  AddRpczPathHandlers(messenger_.get(), web);
  RegisterMetricsJsonHandler(web, metric_registry_.get());
  TracingPathHandlers::RegisterHandlers(web);
  web->RegisterPathHandler("/utilz", "Utilities",
                           std::bind(&RpcAndWebServerBase::HandleDebugPage, this, _1, _2),
                           true, true, "fa fa-wrench");
  web->set_footer_html(FooterHtml());
  RETURN_NOT_OK(web->Start());

  return RpcServerBase::Start();
}

tablet_manager_->Start()

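启动异步客户端初始化(async_client_init_->Start())。如果 cleanup_split_tablets_interval_sec 大于 0,还会在 messenger 的调度器上启动定期清理已分裂 tablet 的任务。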

// yb/tserver/ts_tablet_manager.cc
// Starts the tablet manager's background work.
//
// Always starts async_client_init_. If FLAGS_cleanup_split_tablets_interval_sec is
// positive, it also starts tablets_cleaner_ on the messenger's scheduler with that
// period; otherwise it only logs that the cleanup is disabled. Always returns OK.
Status TSTabletManager::Start() {
  async_client_init_->Start();

  const auto cleanup_interval_sec = FLAGS_cleanup_split_tablets_interval_sec;
  if (cleanup_interval_sec <= 0) {
    LOG(INFO)
        << "Split tablets cleanup is disabled by cleanup_split_tablets_interval_sec flag set to 0";
    return Status::OK();
  }

  tablets_cleaner_->Start(&server_->messenger()->scheduler(), cleanup_interval_sec * 1s);
  LOG(INFO) << "Split tablets cleanup monitor started...";
  return Status::OK();
}

heartbeater_->Start()

启动心跳线程

// yb/tserver/heartbeater.cc
// Starts the heartbeater's background thread (thread_) and returns the status of
// that start call.
Status Heartbeater::Start() {
  return thread_->Start();
}

metrics_snapshotter_->Start()

启动指标快照(MetricsSnapshotter)线程,仅在开启 --tserver_enable_metrics_snapshotter 时执行。

// yb/tserver/metrics_snapshotter.cc

// Starts the snapshotter's background thread (thread_); returns the start status.
Status MetricsSnapshotter::Start() { return thread_->Start(); }

maintenance_manager_->Init()

启动MaintenanceManager线程

// yb/tablet/maintenance_manager.h

// The MaintenanceManager manages the scheduling of background operations such
// as flushes or compactions.  It runs these operations in the background, in a
// thread pool.  It uses information provided in MaintenanceOpStats objects to
// decide which operations, if any, to run.

// Creates the scheduler thread via Thread::Create("maintenance", "maintenance_scheduler", ...).
// The thread runs RunSchedulerThread(), and its handle is written to monitor_thread_.
// Returns the status of Thread::Create().
Status MaintenanceManager::Init() {
  return Thread::Create("maintenance", "maintenance_scheduler",
                        [this] { RunSchedulerThread(); }, &monitor_thread_);
}

整个 TabletServer 的 Init 和 Start 流程总共就这些内容。当然其中还牵扯到许多其他模块,后面会慢慢详细介绍。

画个图总结一下:[图:tablet]

Copyright © itrunner.cn 2020 all rights reserved, powered by Gitbook 该文章修订时间: 2024-08-01 14:07:50

results matching ""

    No results matching ""