大数据最难源码 hbase源码(二)

HBase RPC的详细介绍

1,HBase 的 RPC 相关的实现类:RpcServer(NettyRpcServer) + RpcClient(NettyRpcClient)
如果现在启动服务端(HMaster & HRegionServer):最终会有一个步骤,要启动 RPC 服务端
如果现在启动客户端(Connection --> Admin | HTable):要启动 RPC 客户端
1)主节点 HMaster 中,完成 RPC 请求处理的组件:MasterRpcServices
2)从节点 HRegionServer 中,完成 RPC 请求处理的组件:RsRpcService
2,如图
在这里插入图片描述
整体看 HBase 的启动流程:
集群的配置文件中,有四个配置文件:
masters ,backup-masters, hbase-site.xml , workers
执行 start-hbase.sh 启动脚本,内部通过远程命令启动 HMaster 和 HRegionServer,他们就是运行在硬件服务器之上的一个 JVM 进程,一般来说,
RegionServer 还会运行的是 HDFS 和 YARN 的从节点
HMaster 启动,启动 RpcServer 和 WebUI 服务
HMaster 启动,初始化 HBase 在 ZooKeeper 之上的 znode 布局(ZNodePaths)
先获取 ZK 客户端
创建必要的 Znode 节点
HMaster 启动,初始化 HBase 在 HDFS 之上的布局
hbase.root.dir = “”
存储数据的跟目录中,有大量的子目录:data,log, oldlogs, …
HMaster 启动,按照配置信息(不是按照选举算法),要么成为 Active Master,要么成为 Backup Master
HMaster 启动,初始化 Meta 表并等待 Meta 表上线
创建一个 MetaInitProcedure 的处理程序
HMaster 通过 ProceducerExecutor 来执行处理
第一,先创建表
第一创建在 HDFS 的相关目录
第二创建 meta 表
给这个 meta 表生成一个 region
HMaster 通过内部的 AssignManager 来完成 Region 的分配(指定这个 Region 由哪个 RegionServer 去管理)
会接着把 Meta region 的位置信息,写入到 ZK 中
HMaster 启动,启动自身的一系列服务(是在 成为 active master 之后做的)
HMaster 必要的基础服务都启动好了之后,会等待 RegionServer 的上线,然后达成一定条件之后,集群启动成功
HRegionServer 启动,启动 RpcServer 和 Webui
HRegionServer 启动,向 HMaster 注册
HRegionServer 启动,向 HMaster 通过心跳汇报负载
HRegionServer 启动,启动自身一系列的服务(是在 HRegionServer 向 HMaster 注册成功之后做的)

HBase 集群启动脚本

如图
启动集群的命令:start-hbase.sh
在这里插入图片描述
启动:HMaster
hbase-daemon.sh start master --> java org.apache.hadoop.hbase.master.HMaster

启动:HRegionServer
hbase-daemon.sh start regionserver --> java org.apache.hadoop.hbase.master.HRegionServer

HBase Master启动流程

1,HMaster 的启动流程分析:

HMaster.main(){
// 通过 HMasterCommandLine 来启动 HMaster
new HMasterCommandLine(HMaster.class).doMain(args){
HMasterCommandLine.run(){
// 启动 HMaster
HMasterCommandLine.startMaster(){
// 单机版本 HBase
if(LocalHBaseCluster.isLocal(conf)){
LocalHBaseCluster cluster = new LocalHBaseCluster(conf, ..., LocalHMaster.class, HRegionServer.class);
cluster.startup();
waitOnMasterThreads(cluster);
}else{
// 第一: 构建 HMaster 实例对象
HMaster master = HMaster.constructMaster(masterClass, conf){
// 构造 HMaster 实例
new HMaster(final Configuration conf){
// 调用 HRegionServer 带 conf 的构造方法
super(conf){
} 
// HMaster 通过这个组件来监听存储在 ZK 上的 meta 表的位置信息
this.metaRegionLocationCache = new MetaRegionLocationCache(this.zooKeeper){
threadFactory.newThread(() -> loadMetaLocationsFromZk(retryFactory.create(), ZNodeOpType.INIT)).start();
} 
// 获取 和 监听 backup master 的状态(上线,下线)
this.activeMasterManager = createActiveMasterManager(zooKeeper, serverName, this);
// 内部维护 ClusterId 和 FileSystem 实例对象
cachedClusterId = new CachedClusterId(this, conf);
}
} 
// 第二: HMaster 启动
master.start(){
// HMaster 是一个线程,所以执行 run()
HMaster.run(){
Threads.setDaemonThreadRunning(new Thread(() -> {
// 1):启动内嵌的 Jetty web 服务
int infoPort = putUpJettyServer();
// 2):启动 ActiveMasterManager
// (1)、执行 active 角色确认
// (2)、启动成为 active HMaster 之后要启动的基础服务
// 一般来说,backup master 只是一个热备,没有任何工作
// 配置 一台性能极好的 active ,相对差一点 backup
startActiveMasterManager(infoPort){
// 自己首先要成为 backup 的,所以先构造出来 backupZNode znode 路径
String backupZNode = ZNodePaths.joinZNode(....);
// 先自动成为 standby Master,注册写 对应的 znode 节点
if(!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode, serverName, infoPort)) {
LOG.warn("Failed create of " + backupZNode + " by " + serverName);
} 
// 如果自己不是 active 配置,则等待别的 active HMaster 启动好了再向下执行
if(conf.getBoolean(HConstants.MASTER_TYPE_BACKUP, HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
// 当前 backup master 等到有其他的 active HMaster 启动好了之后,才退出这个循环!
while(!activeMasterManager.hasActiveMaster()) {
LOG.debug("Waiting for master address and cluster state znode to be written.");
Threads.sleep(timeout);
}
} 
// 第一个重点:竞选成为 Active Master
if(activeMasterManager.blockUntilBecomingActiveMaster(timeout, status)) {
// 第二个重点:如果竞选成功,则启动 Active Master 应该运行的一系列服务
finishActiveMasterInitialization(status);
}
}
}), getName() + ":becomeActiveMaster");
}
}
}
}
}
}
}

说一下 HMaster 的启动流程:
通过 HMaster CommandLine 来启动
创建 HMaster 实例对象
调用父类 HRegionServer 构造
启动 Rpc Server
启动 ZooKeeper 客户端

创建 MetaRegionLocationCache 组件
创建 ActiveMasterManager 组件
构建一个 CacheClusterID 对象
FileSystem 实例对象
ClusterID 对象
启动 HMaster 线程
启动 ActiveMasterManager
先构建 backup Znode path
到 ZK 上去创建 HMaster 成为 backup HMaster 的 znode
分两种情况
如果当前 Server 是 Acitve 配置:则不执行 if
如果但钱 Srever 是 Backup 配置,则执行 if 进行等待,直到 集群中有 active HMaster 才退出阻塞
进入正常的 active 确认 / backup HMaster 的阻塞
如果是 Active 配置;通过 activeMasterManager.blockUntilBecomingActiveMaster(timeout, status) 来创建 active znode 节点,如果创
建成功,则退出这个方法。之后调用 finishActiveMasterInitialization(status); 完成active HMaster 的初始化
如果是 backup 配置,则阻塞在 activeMasterManager.blockUntilBecomingActiveMaster(timeout, status) 方法
当然,这个 backup Master 的内部有一个 MasterAddressTracker 的组件用来监听 Active HMaster 的状态。如果发现 Active
HMaster kill掉,则竞争上岗!(实际上的动作:去创建自己的 active znode , 只要创建成功就代表获取了成为 active 的资格 = 分布
式锁)
2,
HMaster 和 HRegionServer 都会启动的一些服务

HMaster.构造方法(){
super(conf){
// 创建 RPC 服务端
rpcServices = createRpcServices();
// Rpc 控制器工厂 和 重试调用 工厂实例
rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
 HMaster 选举成为 Active
核心要点总结:
1. 在配置文件中,会明确配置,谁是 active 谁是 backup,所以要启动的 HMaster 都会遵照这个配置启动
2. 不管是配置的 active 还是 backup HMaster 启动,都是首先创建在 ZooKeeper 上的一个 znode 节点代表自己上线
3. 如果是 backup HMaster ,则等待 active HMaster 上线
4. 如果是 active HMaster ,则尝试写一个 active znode 到 ZooKeeper 上,正常来说,都会成功的,则代表自己获得成为 active HMaster 的权限,正式切换状
态为 active,然后删除 backup znode 节点
5. 不管是 active 还是 backup HMaster 的内部都会启动一个 MasterAddressTracker 的组件来监听 HMaster 的状态
6. 如果 backup HMaster 监听到 ZooKeeper 上的 active HMaster znode 节点被删除了,则会尝试创建 active znode ,如果成功,则顶替成为新的 active
HMaster
总的来说:就是基于 ZooKeeper 分布式独占锁提供的 HA 方案
 finishActiveMasterInitialization(status)
该方法是在 HMaster 竞选成功成为 active HMaster 之后要执行的。目的就是启动一系列服务:
rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
// FileSystem dataFs walFs
initializeFileSystem();
// 初始化 ZK 链接
zooKeeper = new ZKWatcher(conf, ..., this, canCreateBaseZNode()){
// 获取到各种 znode 路径
this.znodePaths = new ZNodePaths(conf);
// 构建一个zk客户端实例,维持和 zk 的链接
this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier);
// 创建 base znode 节点 = /hbase
createBaseZNodes();
}
//
if(!this.masterless) {
// 监听 /hbase/master 节点
masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
masterAddressTracker.start();
// 监听 /hbase/running 节点
clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
clusterStatusTracker.start();
} 
// Rpc Server 启动
this.rpcServices.start(zooKeeper);
//Chore = 家务活 = 杂务事务 = 系统服务
this.choreService = new ChoreService(getName(), true);
this.executorService = new ExecutorService(getName());
//
putUpWebUI();
}
}


3,
finishActiveMasterInitialization(status)
该方法是在 HMaster 竞选成功成为 active HMaster 之后要执行的。目的就是启动一系列服务

finishActiveMasterInitialization(status){
//
this.masterActiveTime = System.currentTimeMillis();
// 为了提高内存利用率,降低 Full GC 而采用的一种 本地内存池的 策略: MSLAB 的技术 : 先一口气申请 2M 的内存给你用
initializeMemStoreChunkCreator();
// dataFs walFs rootDir
this.fileSystemManager = new MasterFileSystem(conf);
this.walManager = new MasterWalManager(this);
// HBase 集群启动的时候会存储 clusterID,当 hmaster 成为 active 的时候,会将这个 clusterID 信息写入到 zk 中
ClusterId clusterId = fileSystemManager.getClusterId();
ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
// 是 HMaster 用来维护 online RegionServer 的
this.serverManager = createServerManager(this){
setupClusterConnection(){
createClusterConnection(){
return ConnectionUtils.createShortCircuitConnection(conf, ...){
return new ShortCircuitingClusterConnection(conf, pool, user, serverName, admin, client){
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
this.metaCache = new MetaCache(this.metrics);
this.registry = ConnectionRegistryFactory.getRegistry(conf);
retrieveClusterId();
this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
}
}
}
}
} // 日志切割管理器
if(!conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false)) {
this.splitWALManager = new SplitWALManager(this);
} // 其实是在 master 的内部创建了一个本地表: master:store ,这个表的信息,存储在了 HDFS
masterRegion = MasterRegionFactory.create(this);
// ProcedureExecutor 这个是 HBase 的状态机!
createProcedureExecutor();
// AssignmentManager 专门负责 hbase 集群中关于 region 分配和负载均衡的
// 如果创建 region,如果分裂region,如果迁移region, 等,都是由 AssignmentManager
this.assignmentManager = createAssignmentManager(this);
this.assignmentManager.start();
//
this.assignmentManager.setupRIT(ritList);
// 追踪 RS 在线状态的 用来监控 /hbase/rs 节点的 子节点个数变化
this.regionServerTracker = new RegionServerTracker(zk, this, this.serverManager);
this.regionServerTracker.start(....);
//
this.tableStateManager = this.conf.getBoolean(key, true) ?
new MirroringTableStateManager(this) : new TableStateManager(this);
// 这个方法的内部,会创建很多的 Tracker, 用来分别追踪不同的 znode 节点
initializeZKBasedSystemTrackers();
// 状态
this.activeMaster = true;
//
Thread zombieDetector = new Thread(new MasterInitializationMonitor(this), ...);
zombieDetector.setDaemon(true);
zombieDetector.start();
// 创建 meta 表,生成 meta 表的一个 region 然后又 hmaster 中的 assignManager 完成 regoin 的分配,又某个特定的 RS 来执行管理
if("new Deploy"){
InitMetaProcedure temp = new InitMetaProcedure();
procedureExecutor.submitProcedure(temp);
} // 启动一系列的服务线程,总之很多,有很多,都是 choreService 家务活
startServiceThreads();
//
if(initMetaProc != null) {
initMetaProc.await();
} // 等待RegionServer 上线过来注册,直到满足特定的 RS 个数要求
waitForRegionServers(status);
// 让 meta 表上线,打开这个 meta 唯一的 region
if(!waitForMetaOnline()) {
return;
} 
this.assignmentManager.joinCluster();
//
this.tableStateManager.start();
//
this.assignmentManager.processOfflineRegions();
//
this.assignmentManager.wakeMetaLoadedEvent();
//
this.clusterStatusChore = new ClusterStatusChore(this, balancer);
getChoreService().scheduleChore(clusterStatusChore);
//
this.balancerChore = new BalancerChore(this);
getChoreService().scheduleChore(balancerChore);
//
if(regionNormalizerManager != null) {
getChoreService().scheduleChore(xxx.getRegionNormalizerChore());
}
this.catalogJanitorChore = new CatalogJanitor(this);
getChoreService().scheduleChore(catalogJanitorChore);
//
this.hbckChore = new HbckChore(this);
getChoreService().scheduleChore(hbckChore);
//
if(!waitForNamespaceOnline()) {
return;
} 
initClusterSchemaService();
//
status.markComplete("Initialization successful");
setInitialized(true);

HBase HRegionServer启动流程

HRegionServer.main(){
// TODO_MA 注释: 加载 hbase-default.xml 和 hbase-site.xml 配置文件
Configuration conf = HBaseConfiguration.create();
// 获取 HBase regionserver 启动类
Class<? extends HRegionServer> regionServerClass = conf.getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
new HRegionServerCommandLine(regionServerClass).doMain(args){
HRegionServerCommandLine.run(){
HRegionServerCommandLine.start(){
// 第一个大动作: 构建 HRegionServer
HRegionServer hrs = HRegionServer.constructRegionServer(regionServerClass, conf);
// 第二个大动作: 启动 HRegionServer
hrs.start(){
HRegionServer.run(){
// 启动第一件大事: 初始化一些基础的服务
preRegistrationInitialization(){
// 初始化 ZK 链接,获取 zk 客户端
initializeZooKeeper();
//
setupClusterConnection();
// 创建用来和 HMaster 联系的 NettyRpcClient
this.rpcClient = RpcClientFactory.createClient(......);
} // 启动第二件大事: 调用 reportForDuty 方法来执行 RS 想 HMaster 的注册
while(keepLooping()) {
// HRegionServer 上线汇报 = 注册
RegionServerStartupResponse w = reportForDuty(){
ServerName masterServerName = createRegionServerStatusStub(true);
RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
request.setUseThisHostnameInstead(useThisHostnameInstead);
request.setPort(port);
request.setServerStartCode(this.startcode);
request.setServerCurrentTime(now);
// 注册的 RPC 请求
result = rss.regionServerStartup(null, request.build());
}
if(w == null) {
this.sleeper.sleep(sleepTime);
}else{
// 启动第三件大事: 处理汇报/注册反馈!
handleReportForDutyResponse(w){
// 在 /hbase/rs 节点下,创建代表自己上线的 znode 节点
createMyEphemeralNode();
// 初始化文件系统相关
initializeFileSystem();
//
setupWALAndReplication();
// 启动相关服务
startServices();
//
startReplicationService();
} 
break;
}// 启动第四件大事: 维持心跳
while(!isStopped() && isHealthy()) {
if((now - lastMsg) >= msgInterval) {
tryRegionServerReport(lastMsg, now){
// 构建 RS 的 region 负载
ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
// 构建请求对象:包含 SeverName 和 ServerLoad
RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
request.setServer(ProtobufUtil.toServerName(this.serverName));
request.setLoad(sl);
// 发送心跳
rss.regionServerReport(null, request.build());
} 
lastMsg = System.currentTimeMillis();
}

HRegionServer和Hmaster的启动流程差不多一样
还有很多不足,一起努力吧
说再多不如做,加油

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>