大数据培训新三板挂牌机构 股票代码:837906 | EN CN
异常解决方案—NameNode 宕机读写测试
异常解决方案—NameNode 宕机切换实验
异常解决方案—Data Node 配置
异常解决方案—Backup Node配置
异常解决方案—NameNode配置
异常解决方案—6.5.1异常情况分析
安装及配置
5NameNode安装及配置以及6BackupNode安装及配置
6.4.4虚拟机集群架设
6.4.3安装JDK
3.配置操作系统
2.创建虚拟机与安装操作系统
6.4构建实验环境
实验方案说明
故障切换机制
日志池(journal spool)机制
元数据操作情景分——BackupNode更新磁盘上的日志文件
元数据操作情景分——NameNode通过日志输出流......
元数据操作情景分——NameNode将日志写入日志文件
元数据操作情景分——NameNode更新内存镜像
元数据操作情景分——客户端执行命令流程
元数据操作情景分
Hadoop的Backup Node方案——运行机制分析(5)
Hadoop的Backup Node方案——运行机制分析(4)
Hadoop的Backup Node方案——运行机制分析(3)
Hadoop的Backup Node方案——运行机制分析(2)
Hadoop的Backup Node方案——运行机制分析(1)
Hadoop的Backup Node方案——系统架构
Hadoop的Backup Node方案—Backup Node 概述
元数据可靠性机制以及使用说明
Checkpoint 过程情景分析
元数据更新及日志写入情景分析
NameNode启动加载元数据情景分析
Hadoop的元数据备份机制的进行分析
元数据应用场景分析
Format情景分析
磁盘元数据文件
HDFS之代码分析——元数据结构
HDFS之内存元数据结构
什么是HDFS的元数据
Hadoop中DRDB方案和AvatarNode方案
Hadoop中常用各方案的对比
Hadoop的BackupNode方案
Hadoop的CheckpointNode方案
Hadoop的SecondaryNameNode方案
Hadoop的元数据备份方案
影响HDFS可用性的几个因素
什么是高可用性? 详细解析
HDFS系统架构简介
如何安装和配置Hadoop集群
如何在Windows下安装Hadoop
在MacOSX上安装与配置Hadoop
Linux下安装Hadoop的步骤
Hadoop的集群安全策略介绍
Hive的数据管理介绍
HBase的数据管理介绍
HDFS的数据管理介绍
Hadoop计算模型之 MapReduce 简介
Hadoop于分布式开发
Hadoop体系结构介绍
Hadoop的项目结构详解
一文读懂Hadoop

Hadoop的Backup Node方案——运行机制分析(5)

于2018-01-18由小牛君创建

分享到:


5. Checkpoint 后台线程

注册成功后,BackupNode会启动一个后台线程,专门用于做CheckpointCheckpointNode和第2阶段的BackupNode都是通过该线程实现Checkpoint的,只是各自执行的步骤稍有差异。该线程是一个无限循环,每5分钟检查一次,只要当前时间距离上一次Checkpoint的时间超过5分钟,或者Edits的大小超过设定值,就会触发Checkpoint任务。

每次Checkpoint任务执行以下几个步骤。

1)在NameNode上进行Checkpoint的验证工作,如判断BackupNodeFSImage是否过期等,如果符合Checkpoint的条件,则在NameNode上进行相关的准备工作,如在日志保存目录下,创建新的Edits文件,将日志输出流重新定位,输出到新的日志文件等,最后将返回一个类(用于标识本次Checkpoint)。

2BackupNode根据验证结果进行相应的动作,如果允许Checkpoint,对于CheckpointNode则总是从NameNode下载新的FSImage文件和Edits文件,对于第2阶段的BackupNode,则是根据验证返回的结果判断本地的元数据是否过期,如果过期,也将从NameNode下载新的FSImage文件和Edits文件。

3)在内存中合并产生新的元数据,对于CheckpointNode来说,需要先从本地磁盘读取FSImage文件和Edits文件,再进行合并,而对于第2阶段的BackupNode,则直接在内存进行合并,因而效率更高。

4)将合并后的元数据输出到磁盘。

5)如果需要上传,则将合并后的元数据上传到NameNode,替换原来的元数据文件。具体实现见下面的代码分析。

BackupNode runCheckpointDaemon 方法,启动后台线程。

BackupNode.java

private void runCheckpointDaemon(Configuration conf) throws IOException {

checkpointManager = new Checkpointer(conf, this);

checkpointManager.start();

}

Checkpointer的主线程。

Checkpointer.java

public void run() {

// 1. 设置检查时间 periodMSec 5 分钟

……

// 2. 计算最后一次 check point 时间

……

while(shouldRun) {

try {

// 如果当前时间距离最后一次 check point 时间超过 periodMSec

// 或者 Edits 大小超过 check point 设定值,则进行 check point

if(shouldCheckpoint) {

doCheckpoint();

lastCheckpointTime = now;

}

} catch(IOException e) {

……

} catch(Throwable e) {

……

}

try {

// check point 结束后,休眠 periodMSec

Thread.sleep(periodMSec);

} catch(InterruptedException ie) {}

}

}

Checkpointer doCheckpoint 方法。

Checkpointer.java

void doCheckpoint() throws IOException {

long startTime = FSNamesystem.now();

// Namenode 验证 Checkpoint 的条件,如可以,做相关准备,

// 如创建新的 EditLog 文件,将日志输出流重定向到新的 EditLog 文件等

NamenodeCommand cmd =

getNamenode().startCheckpoint(backupNode.getRegistration());

CheckpointCommand cpCmd = null;

switch(cmd.getAction()) {

case NamenodeProtocol.ACT_SHUTDOWN:

shutdown();

throw new IOException("Name-node " + backupNode.nnRpcAddress

+ " requested shutdown.");

case NamenodeProtocol.ACT_CHECKPOINT:

cpCmd = (CheckpointCommand)cmd;

break;

default:

throw new IOException("Unsupported NamenodeCommand:

"+cmd.getAction());

}

// 获取本次 Checkpoint 的标识 sig

CheckpointSignature sig = cpCmd.getSignature();

assert FSConstants.LAYOUT_VERSION == sig.getLayoutVersion() :

"Signature should have current layout version. Expected: "

+ FSConstants.LAYOUT_VERSION + " actual "+ sig.getLayoutVersion();

assert !backupNode.isRole(NamenodeRole.CHECKPOINT) ||

cpCmd.isImageObsolete() : "checkpoint node should always download

image.";

backupNode.setCheckpointState(CheckpointStates.UPLOAD_START);

if(cpCmd.isImageObsolete()) {

// First reset storage on disk and memory state

backupNode.resetNamespace();

// 如果本地的元数据过期,则从 Namenode 下载最新的元数据

downloadCheckpoint(sig);

}

……

}

Checkpointer doCheckpoint 方法。

Checkpointer.java

void doCheckpoint() throws IOException {

……

BackupStorage bnImage = getFSImage();

// 加载 FSImageEdits 等元数据文件,并在内存进行合并

bnImage.loadCheckpoint(sig);

sig.validateStorageInfo(bnImage);

// 将最新的元数据保存到磁盘

bnImage.saveCheckpoint();

// 如果需要 upload,则将元数据 upload NameNode

if(cpCmd.needToReturnImage())

uploadCheckpoint(sig);

getNamenode().endCheckpoint(backupNode.getRegistration(), sig);

bnImage.convergeJournalSpool();

backupNode.setRegistration(); // keep registration up to date

if(backupNode.isRole(NamenodeRole.CHECKPOINT))

getFSImage().getEditLog().close();

LOG.info("Checkpoint completed in " +

(FSNamesystem.now() - startTime)/1000 + " seconds."+

" New Image Size: " + bnImage.getFsImageName().length());

}

NameNodestartCheckpoint方法,用来验证注册的BackupNode信息:LAYOUT_VERSIONRegistrationID,同时还验证NameNode自身是否处于ACTIVE状态。

NameNode.java

public NamenodeCommand startCheckpoint(NamenodeRegistration

registration)

throws IOException {

// 验证

verifyRequest(registration);

if(!isRole(NamenodeRole.ACTIVE))

throw new IOException("Only an ACTIVE node can invoke startCheckpoint.");

return namesystem.startCheckpoint(registration, setRegistration());

}

public void verifyRequest(NodeRegistration nodeReg) throws IOException {

verifyVersion(nodeReg.getVersion());

if

(!namesystem.getRegistrationID().equals(nodeReg.getRegistrationID()))

throw new UnregisteredNodeException(nodeReg);

}

public void verifyVersion(int version) throws IOException {

if (version != LAYOUT_VERSION)

throw new IncorrectVersionException(version, "data node");

}

FSNamesystem.java

synchronized NamenodeCommand startCheckpoint(

NamenodeRegistration bnReg, // backup node

NamenodeRegistration nnReg) // active name-node

throws IOException {

LOG.info("Start checkpoint for " + bnReg.getAddress());

NamenodeCommand cmd = getFSImage().startCheckpoint(bnReg, nnReg);

getEditLog().logSync();

return cmd;

}

NameNodestartCheckpoint方法将会判断BackupNode本地镜像与activename-node镜像,如果不兼容或者有更新,则关闭BackupNode;如果BackupNode本地镜像比activename-node镜像老,则从activename-node下载新的镜像;如果两者镜像相同,则使用本地镜像作为当前镜像。这样可以确保BackupNode的镜像与activename-node镜像一致,并且以activename-node镜像为准。判断的依据是NamespaceIDLayoutVersion

FSImage.java

NamenodeCommand startCheckpoint(NamenodeRegistration bnReg, // backup

node

NamenodeRegistration nnReg) // active name-node

throws IOException {

String msg = null;

// Verify that checkpoint is allowed

if(bnReg.getNamespaceID() != this.getNamespaceID())

msg = "Name node " + bnReg.getAddress()+

" has incompatible namespace id: " + bnReg.getNamespaceID()+

" expected: " + getNamespaceID();

else if(bnReg.isRole(NamenodeRole.ACTIVE))

msg = "Name node " + bnReg.getAddress() +

" role " + bnReg.getRole() + ": checkpoint is not allowed.";

else if(bnReg.getLayoutVersion() < this.getLayoutVersion() ||

(bnReg.getLayoutVersion() == this.getLayoutVersion() &&

bnReg.getCTime() > this.getCTime()) ||

(bnReg.getLayoutVersion() == this.getLayoutVersion() &&

bnReg.getCTime() == this.getCTime() &&

bnReg.getCheckpointTime() > this.checkpointTime))

// remote node has newer image age

msg = "Name node " + bnReg.getAddress() +

" has newer image layout version: LV = " +

bnReg.getLayoutVersion()+ " cTime = " + bnReg.getCTime() +

" checkpointTime = " + bnReg.getCheckpointTime() +

". Current version: LV = " + getLayoutVersion() +

" cTime = " + getCTime() + " checkpointTime = " + checkpointTime;

if(msg != null) {

LOG.error(msg);

// 如果不符合条件,则返回 ACT_SHUTDOWN,将关闭 Backup Node

return new NamenodeCommand(NamenodeProtocol.ACT_SHUTDOWN);

}

……

}

NameNode startCheckpoint 方法。

FSImage.java

NamenodeCommand startCheckpoint(NamenodeRegistration bnReg, // backup

node

NamenodeRegistration nnReg) // active name-node

throws IOException {

……

// 判断 Backup Node 上的 FSImage 是否过期

// 如果是,Backup Node 需要从 NameNode 下载最新的 FSImage

boolean isImgObsolete = true;

if(bnReg.getLayoutVersion() == this.getLayoutVersion()

&& bnReg.getCTime() == this.getCTime()

&& bnReg.getCheckpointTime() == this.checkpointTime)

isImgObsolete = false;

// 如果 NameNode 上配置了存储元数据的本地目录,则 needToReturnImg=true

// Backup Node 进行 Checkpoint 后,需要将元数据 upload NameNode

boolean needToReturnImg = true;

if(getNumStorageDirs(NameNodeDirType.IMAGE) == 0)

// do not return image if there are no image directories

needToReturnImg = false;

CheckpointSignature sig = rollEditLog();

// 创建 Backup Node edit output stream,它会将 NameNode 的日志记录以 stream

// 的方式发送到指定的 Backup Node。对于 Checkpointer 则不需要创建

getEditLog().logJSpoolStart(bnReg, nnReg);

// Backup Node 返回 startCheckpiont 的结果

return new CheckpointCommand(sig, isImgObsolete, needToReturnImg);

}

FSImagerollEditLog方法。

FSImage.java

CheckpointSignature rollEditLog() throws IOException {

getEditLog().rollEditLog();

ckptState = CheckpointStates.ROLLED_EDITS;

// 增长 checkpoint agecheckpointTime 表示 FSImage 的保存时间

incrementCheckpointTime();

return new CheckpointSignature(this);

}

public FSEditLog getEditLog() {

return editLog;

}

FSEditLogrollEditLog方法。

FSEditLog.java

synchronized void rollEditLog() throws IOException {

waitForSyncToFinish();

// 如果 NameNode 本地不保存 EditLogs,不需要任何操作,直接返回

Iterator<StorageDirectory> it =

fsimage.dirIterator(NameNodeDirType.EDITS);

if(!it.hasNext())

return;

// 所有存储目录下的 edits.new 文件必须一致,要不全部存在,要不全部不存在

boolean alreadyExists = existsNew(it.next());

while(it.hasNext()) {

StorageDirectory sd = it.next();

if(alreadyExists != existsNew(sd))

throw new IOException(getEditNewFile(sd)

+ "should " + (alreadyExists ? "" : "not ") + "exist.");

}

// 如果 edits.new 存在,则不需要任何操作,直接返回

if(alreadyExists)

return;

// 检查之前移除的存储目录,是否还能使用

fsimage.attemptRestoreRemovedStorage();

// 关闭之前的 EditLog 输出流,重定向到 edits.new

divertFileStreams(

Storage.STORAGE_DIR_CURRENT + "/" +

NameNodeFile.EDITS_NEW.getName());

}

FSEditLogdivertFileStreams方法。

FSEditLog.java

synchronized void divertFileStreams(String dest) throws IOException {

waitForSyncToFinish();

assert getNumEditStreams() >= getNumEditsDirs() :"Inconsistent number

of streams";

ArrayList<EditLogOutputStream> errorStreams = null;

EditStreamIterator itE =

(EditStreamIterator)getOutputStreamIterator(JournalType.FILE);

Iterator<StorageDirectory> itD =

fsimage.dirIterator(NameNodeDirType.EDITS);

while(itE.hasNext() && itD.hasNext()) {

EditLogOutputStream eStream = itE.next();

StorageDirectory sd = itD.next();

if(!eStream.getName().startsWith(sd.getRoot().getPath()))

throw new IOException("Inconsistent order of edit streams: " + eStream);

try {

// 关闭之前的 stream,并创建一个新的 stream