TabletServerMain 入口函数中创建并启动了 TabletServer,下面我们来看看 TabletServer 具体做了些什么。
构造函数
构造函数中对几个内部成员变量进行了初始化
// yb/tserver/tablet_server.cc
// Constructs a tablet server from the given options.
//
// The base class is initialized first, then the data members listed in the initializer
// list below. The constructor body installs the inbound connection context factory and
// logs the addresses of this object and of its tablet manager.
//
// opts: the server options; a copy is kept in opts_.
TabletServer::TabletServer(const TabletServerOptions& opts)
: RpcAndWebServerBase(
"TabletServer", opts, "yb.tabletserver", server::CreateMemTrackerForServer()),
fail_heartbeats_for_tests_(false),
opts_(opts),
// The tablet manager receives the filesystem manager, this server and the metric registry.
tablet_manager_(new TSTabletManager(fs_manager_.get(), this, metric_registry())),
path_handlers_(new TabletServerPathHandlers(this)),
maintenance_manager_(new MaintenanceManager(MaintenanceManager::DEFAULT_OPTIONS)),
master_config_index_(0),
// Stays null until RegisterServices() creates the service instance.
tablet_server_service_(nullptr),
// NOTE(review): CHECK_RESULT presumably aborts when Create() fails -- confirm.
shared_object_(CHECK_RESULT(TServerSharedObject::Create())) {
// Inbound connections use YBInboundConnectionContext, created with
// FLAGS_inbound_rpc_memory_limit and this server's memory tracker.
SetConnectionContextFactory(rpc::CreateConnectionContextFactory<rpc::YBInboundConnectionContext>(
FLAGS_inbound_rpc_memory_limit, mem_tracker()));
LOG(INFO) << "yb::tserver::TabletServer created at " << this;
LOG(INFO) << "yb::tserver::TSTabletManager created at " << tablet_manager_.get();
}
头文件中关于几个成员变量的定义:
// yb/tserver/tablet_server.h
// Excerpt of the TabletServer class declaration showing the data members that the
// constructor initializes. Lines consisting of "..." stand for declarations omitted
// from this excerpt.
class TabletServer : public server::RpcAndWebServerBase, public TabletServerIf {
...
...
...
// If true, all heartbeats will be seen as failed.
Atomic32 fail_heartbeats_for_tests_;
// The options passed at construction time; updated if the master config changes.
TabletServerOptions opts_;
// Manager for tablets which are available on this server.
gscoped_ptr<TSTabletManager> tablet_manager_;
// Webserver path handlers.
gscoped_ptr<TabletServerPathHandlers> path_handlers_;
// The maintenance manager for this tablet server.
std::shared_ptr<MaintenanceManager> maintenance_manager_;
// Index at which the master sent us the last config.
int master_config_index_;
// Pointer to the tablet server service instance. This pointer is no longer valid after
// RpcAndWebServerBase is shut down.
TabletServiceImpl* tablet_server_service_;
// Shared memory owned by the tablet server.
TServerSharedObject shared_object_;
...
...
...
}
初始化
// yb/tserver/tablet_server.cc
// Initializes the tablet server. Must run before Start(), which CHECKs initted_.
//
// In order: validates that the master address resolves, initializes the RPC/web server
// base, registers the web path handlers, creates the heartbeater (and, when
// FLAGS_tserver_enable_metrics_snapshotter is set, the metrics snapshotter), initializes
// the tablet manager, then publishes the first bound RPC address and a random Postgres
// auth key through shared_object_.
//
// Returns Status::OK() on success, otherwise the first non-OK status produced by a step.
Status TabletServer::Init() {
// Init() is expected to run only once: the CHECK fires if initted_ is already set.
CHECK(!initted_.load(std::memory_order_acquire));
// Validate that the passed master address actually resolves.
// We don't validate that we can connect at this point -- it should
// be allowed to start the TS and the master in whichever order --
// our heartbeat thread will loop until successfully connecting.
RETURN_NOT_OK(ValidateMasterAddressResolution());
RETURN_NOT_OK(RpcAndWebServerBase::Init());
RETURN_NOT_OK(path_handlers_->Register(web_server_.get()));
log_prefix_ = Format("P $0: ", permanent_uuid());
heartbeater_ = CreateHeartbeater(opts_, this);
// The snapshotter only exists when the flag is set; Start() checks the same flag
// before using it.
if (FLAGS_tserver_enable_metrics_snapshotter) {
metrics_snapshotter_.reset(new MetricsSnapshotter(opts_, this));
}
RETURN_NOT_OK_PREPEND(tablet_manager_->Init(),
"Could not init Tablet Manager");
// NOTE: initted_ is set before the address parsing below, so if ParseString fails
// Init() returns an error while initted_ is already true.
initted_.store(true, std::memory_order_release);
// Publish the first bound RPC address, if any, through the shared object.
auto bound_addresses = rpc_server()->GetBoundAddresses();
if (!bound_addresses.empty()) {
shared_object_->SetEndpoint(bound_addresses.front());
}
// 5433 is kDefaultPort in src/yb/yql/pgwrapper/pg_wrapper.h.
RETURN_NOT_OK(pgsql_proxy_bind_address_.ParseString(FLAGS_pgsql_proxy_bind_address, 5433));
// Store a random 64-bit value as the Postgres auth key in the shared object.
shared_object_->SetPostgresAuthKey(RandomUniformInt<uint64_t>());
return Status::OK();
}
RpcAndWebServerBase::Init()
// yb/server/server_base.cc
// Opens the on-disk filesystem layout, creating a new one when the existing layout
// cannot be loaded, and then initializes the RPC server base.
//
// Returns Status::OK() on success. Otherwise returns the failure from creating or
// loading the layout (with an explanatory prefix), the simulated NetworkError when
// FLAGS_TEST_simulate_port_conflict_error is set, or the failure from
// RpcServerBase::Init().
Status RpcAndWebServerBase::Init() {
  yb::enterprise::InitOpenSSL();

  Status open_status = fs_manager_->Open();

  // A new layout is created when nothing was found, or when opening failed and
  // lock files are present.
  const bool needs_new_layout =
      open_status.IsNotFound() || (!open_status.ok() && fs_manager_->HasAnyLockFiles());
  if (needs_new_layout) {
    LOG(INFO) << "Could not load existing FS layout: " << open_status.ToString();
    LOG(INFO) << "Creating new FS layout";
    RETURN_NOT_OK_PREPEND(fs_manager_->CreateInitialFileSystemLayout(true),
                          "Could not create new FS layout");
    // Retry the open against the freshly created layout.
    open_status = fs_manager_->Open();
  }
  RETURN_NOT_OK_PREPEND(open_status, "Failed to load FS layout");

  // Test-only hook that fails initialization as if the port were already in use.
  if (PREDICT_FALSE(FLAGS_TEST_simulate_port_conflict_error)) {
    return STATUS(NetworkError, "Simulated port conflict error");
  }

  RETURN_NOT_OK(RpcServerBase::Init());
  return Status::OK();
}
在这个函数中,首先尝试打开元数据存储目录(如果目录布局不存在,则先创建新的文件系统布局再重新打开),然后初始化RpcServerBase服务。
FsManager::Open()
// yb/fs/fs_manager.cc
// Opens an existing filesystem layout.
//
// Reads the instance metadata stored under every canonicalized filesystem root. The
// first one read becomes metadata_; every later one must carry the same UUID.
//
// Returns Status::OK() on success, a Corruption status if a lock file is present or the
// roots disagree on the UUID, or the failure returned by Init() or by reading a
// metadata file.
Status FsManager::Open() {
  RETURN_NOT_OK(Init());

  if (HasAnyLockFiles()) {
    return STATUS(Corruption, "Lock file is present, filesystem may be in inconsistent state");
  }

  for (const string& fs_root : canonicalized_all_fs_roots_) {
    gscoped_ptr<InstanceMetadataPB> instance(new InstanceMetadataPB);
    RETURN_NOT_OK(pb_util::ReadPBContainerFromPath(
        env_, GetInstanceMetadataPath(fs_root), instance.get()));

    // The first metadata read is adopted as the reference copy.
    if (!metadata_) {
      metadata_.reset(instance.release());
      continue;
    }

    // Every other root has to agree with the reference UUID.
    if (instance->uuid() != metadata_->uuid()) {
      return STATUS(Corruption, Substitute(
          "Mismatched UUIDs across filesystem roots: $0 vs. $1",
          metadata_->uuid(), instance->uuid()));
    }
  }

  LOG(INFO) << "Opened local filesystem: " << JoinStrings(canonicalized_all_fs_roots_, ",")
            << std::endl << metadata_->DebugString();
  return Status::OK();
}
在FsManager::Open()函数中,验证了元数据存储目录,即配置文件中 --fs_data_dirs 选项(例如 --fs_data_dirs=/mnt/ssd)所指定的目录,我们所有数据的元数据信息都将存储在这个目录下。
RpcServerBase::Init()
// src/yb/server/server_base.cc
// Initializes the RPC server base. The CHECK below requires that it has not been
// initialized before.
//
// In order: registers process-level metrics (glog, tcmalloc, spinlock contention),
// installs the stack trace signal, initializes the clock, builds the Messenger and the
// proxy cache on top of it, initializes and binds the RPC server, registers the clock
// metrics and starts metrics logging.
//
// Returns Status::OK() on success, otherwise the first non-OK status produced by a step.
Status RpcServerBase::Init() {
CHECK(!initialized_);
glog_metrics_.reset(new ScopedGLogMetrics(metric_entity_));
tcmalloc::RegisterMetrics(metric_entity_);
RegisterSpinLockContentionMetrics(metric_entity_);
InitSpinLockContentionProfiling();
// SIGUSR2 is the signal passed to SetStackTraceSignal.
RETURN_NOT_OK(SetStackTraceSignal(SIGUSR2));
// Initialize the clock immediately. This checks that the clock is synchronized
// so we're less likely to get into a partially initialized state on disk during startup
// if we're having clock problems.
RETURN_NOT_OK_PREPEND(clock_->Init(), "Cannot initialize clock");
// Create the Messenger.
rpc::MessengerBuilder builder(name_);
builder.UseDefaultConnectionContextFactory(mem_tracker());
// SetupMessengerBuilder gets a chance to adjust the builder before the messenger is built.
RETURN_NOT_OK(SetupMessengerBuilder(&builder));
messenger_ = VERIFY_RESULT(builder.Build());
proxy_cache_ = std::make_unique<rpc::ProxyCache>(messenger_.get());
// The RPC server is initialized with the new messenger and then bound.
RETURN_NOT_OK(rpc_server_->Init(messenger_.get()));
RETURN_NOT_OK(rpc_server_->Bind());
clock_->RegisterMetrics(metric_entity_);
RETURN_NOT_OK_PREPEND(StartMetricsLogging(), "Could not enable metrics logging");
// Only reached when every step above succeeded.
initialized_ = true;
return Status::OK();
}
在RpcServerBase::Init()中完成了时钟和Messenger的初始化,以及RpcServer的初始化和端口绑定。
CreateHeartbeater
创建心跳
// yb/tserver/heartbeater_factory.cc
std::unique_ptr<Heartbeater> CreateHeartbeater(
const TabletServerOptions& options, TabletServer* server) {
std::vector<std::unique_ptr<HeartbeatDataProvider>> data_providers;
data_providers.push_back(
std::make_unique<TServerMetricsHeartbeatDataProvider>(server));
data_providers.push_back(
std::make_unique<TabletSplitHeartbeatDataProvider>(server));
return std::make_unique<Heartbeater>(options, server, std::move(data_providers));
}
tablet_manager_->Init()
// yb/tserver/ts_tablet_manager.h
// Load all tablet metadata blocks from disk, and open their respective tablets.
// Upon return of this method all existing tablets are registered, but
// the bootstrap is performed asynchronously.
//
// NOTE(review): since bootstrap continues after this returns, callers presumably must not
// assume the registered tablets are ready to serve yet -- confirm against the implementation.
CHECKED_STATUS Init();
如注释所述,加载磁盘中所有的元数据块,并打开对应的tablets
启动
// yb/tserver/tablet_server.cc
// Starts the tablet server. Init() must have completed first: the CHECK below fires
// if initted_ is not set.
//
// In order: registers the RPC services, starts the RPC and web servers, optionally
// creates a local proxy to this server, then starts the tablet manager, the heartbeater,
// the metrics snapshotter (when enabled) and the maintenance manager.
//
// Returns Status::OK() on success, otherwise the first non-OK status produced by a step.
Status TabletServer::Start() {
CHECK(initted_.load(std::memory_order_acquire));
AutoInitServiceFlags();
// Services are registered before the RPC and web servers are started.
RETURN_NOT_OK(RegisterServices());
RETURN_NOT_OK(RpcAndWebServerBase::Start());
// If enabled, creates a proxy to call this tablet server locally.
if (FLAGS_enable_direct_local_tablet_server_call) {
proxy_ = std::make_shared<TabletServerServiceProxy>(proxy_cache_.get(), HostPort());
}
RETURN_NOT_OK(tablet_manager_->Start());
RETURN_NOT_OK(heartbeater_->Start());
// metrics_snapshotter_ is only created by Init() when this same flag is set.
if (FLAGS_tserver_enable_metrics_snapshotter) {
RETURN_NOT_OK(metrics_snapshotter_->Start());
}
// MaintenanceManager::Init() creates the maintenance scheduler thread.
RETURN_NOT_OK(maintenance_manager_->Init());
google::FlushLogFiles(google::INFO); // Flush the startup messages.
return Status::OK();
}
启动tablet
服务
RegisterServices()
注册TabletServer
相关的服务实例
// yb/tserver/tablet_server.cc
// Creates the RPC services hosted by this tablet server and registers each one with the
// RPC server, using the queue length given by the corresponding flag.
//
// Registered, in order: TabletServiceImpl, TabletServiceAdminImpl, ConsensusServiceImpl
// (with rpc::ServicePriority::kHigh) and RemoteBootstrapServiceImpl.
//
// Returns Status::OK() on success, otherwise the first non-OK status returned by
// RegisterService.
Status TabletServer::RegisterServices() {
  // Keep a raw pointer to the tablet service in tablet_server_service_; ownership is
  // handed over to RegisterService below.
  auto tablet_service = std::make_unique<TabletServiceImpl>(this);
  tablet_server_service_ = tablet_service.get();
  LOG(INFO) << "yb::tserver::TabletServiceImpl created at " << tablet_server_service_;
  std::unique_ptr<ServiceIf> ts_service = std::move(tablet_service);
  RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(
      FLAGS_tablet_server_svc_queue_length, std::move(ts_service)));

  std::unique_ptr<ServiceIf> admin_service = std::make_unique<TabletServiceAdminImpl>(this);
  LOG(INFO) << "yb::tserver::TabletServiceAdminImpl created at " << admin_service.get();
  RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(
      FLAGS_ts_admin_svc_queue_length, std::move(admin_service)));

  std::unique_ptr<ServiceIf> consensus_service =
      std::make_unique<ConsensusServiceImpl>(metric_entity(), tablet_manager_.get());
  LOG(INFO) << "yb::tserver::ConsensusServiceImpl created at " << consensus_service.get();
  RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(
      FLAGS_ts_consensus_svc_queue_length, std::move(consensus_service),
      rpc::ServicePriority::kHigh));

  std::unique_ptr<ServiceIf> remote_bootstrap_service =
      std::make_unique<RemoteBootstrapServiceImpl>(
          fs_manager_.get(), tablet_manager_.get(), metric_entity());
  LOG(INFO) << "yb::tserver::RemoteBootstrapServiceImpl created at " <<
      remote_bootstrap_service.get();
  RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(
      FLAGS_ts_remote_bootstrap_svc_queue_length, std::move(remote_bootstrap_service)));

  return Status::OK();
}
RpcAndWebServerBase::Start()
启动rpc和webserver
// yb/server/server_base.cc
// Starts the web server and then the RPC server.
//
// Before the web server starts, this generates the instance ID and registers the web
// handlers: the default paths, the rpcz paths, the metrics JSON handler, the tracing
// handlers and the "/utilz" page served by HandleDebugPage. It also sets the footer HTML.
//
// Returns Status::OK() on success, otherwise the non-OK status from starting the web
// server or the RPC server base.
Status RpcAndWebServerBase::Start() {
GenerateInstanceID();
AddDefaultPathHandlers(web_server_.get());
AddRpczPathHandlers(messenger_.get(), web_server_.get());
RegisterMetricsJsonHandler(web_server_.get(), metric_registry_.get());
TracingPathHandlers::RegisterHandlers(web_server_.get());
// "/utilz" is registered under the title "Utilities" and bound to HandleDebugPage.
// NOTE(review): the meaning of the two boolean arguments is not visible here -- confirm
// against the RegisterPathHandler declaration.
web_server_->RegisterPathHandler("/utilz", "Utilities",
std::bind(&RpcAndWebServerBase::HandleDebugPage, this, _1, _2),
true, true, "fa fa-wrench");
web_server_->set_footer_html(FooterHtml());
// The web server is started first, then the RPC server base.
RETURN_NOT_OK(web_server_->Start());
RETURN_NOT_OK(RpcServerBase::Start());
return Status::OK();
}
tablet_manager_->Start()
启动异步客户端初始化(async_client_init_);如果 cleanup_split_tablets_interval_sec 大于0,还会启动分裂tablet的定期清理任务
// yb/tserver/ts_tablet_manager.cc
// Starts the tablet manager's background work.
//
// Always starts async_client_init_. The split tablets cleaner is started on the server
// messenger's scheduler only when FLAGS_cleanup_split_tablets_interval_sec is positive;
// otherwise a message is logged saying the cleanup is disabled.
//
// Always returns Status::OK().
Status TSTabletManager::Start() {
  async_client_init_->Start();

  // Cleanup disabled: log it and finish early.
  if (FLAGS_cleanup_split_tablets_interval_sec <= 0) {
    LOG(INFO)
        << "Split tablets cleanup is disabled by cleanup_split_tablets_interval_sec flag set to 0";
    return Status::OK();
  }

  // The flag value is interpreted as a number of seconds.
  auto& cleanup_scheduler = server_->messenger()->scheduler();
  tablets_cleaner_->Start(&cleanup_scheduler, FLAGS_cleanup_split_tablets_interval_sec * 1s);
  LOG(INFO) << "Split tablets cleanup monitor started...";
  return Status::OK();
}
heartbeater_->Start()
启动心跳线程
// yb/tserver/heartbeater.cc
// Starts the heartbeater by delegating to thread_->Start() and returns that call's status.
Status Heartbeater::Start() { return thread_->Start(); }
metrics_snapshotter_->Start()
启动指标快照(MetricsSnapshotter)线程
// yb/tserver/metrics_snapshotter.cc
// Starts the metrics snapshotter by delegating to thread_->Start() and returns that
// call's status.
Status MetricsSnapshotter::Start() {
return thread_->Start();
}
maintenance_manager_->Init()
启动MaintenanceManager的调度线程
// yb/tablet/maintenance_manager.h
// The MaintenanceManager manages the scheduling of background operations such
// as flushes or compactions. It runs these operations in the background, in a
// thread pool. It uses information provided in MaintenanceOpStats objects to
// decide which operations, if any, to run.
// Creates the maintenance scheduler thread (category "maintenance", name
// "maintenance_scheduler"), which runs RunSchedulerThread() on this object; the thread
// handle is stored in monitor_thread_.
//
// Returns the status of Thread::Create.
Status MaintenanceManager::Init() {
  return Thread::Create(
      "maintenance", "maintenance_scheduler",
      [this] { RunSchedulerThread(); }, &monitor_thread_);
}
整个TabletServer从Init到Start的流程大致就是这些内容,当然其中还牵涉到许多其他细节,后面会逐步详细介绍。
画个图总结下: