大数据培训新三板挂牌机构 股票代码:837906 | EN CN
异常解决方案—NameNode 宕机读写测试
异常解决方案—NameNode 宕机切换实验
异常解决方案—Data Node 配置
异常解决方案—Backup Node配置
异常解决方案—NameNode配置
异常解决方案—6.5.1异常情况分析
安装及配置
5NameNode安装及配置以及6BackupNode安装及配置
6.4.4虚拟机集群架设
6.4.3安装JDK
3.配置操作系统
2.创建虚拟机与安装操作系统
6.4构建实验环境
实验方案说明
故障切换机制
日志池(journal spool)机制
元数据操作情景分——BackupNode更新磁盘上的日志文件
元数据操作情景分——NameNode通过日志输出流......
元数据操作情景分——NameNode将日志写入日志文件
元数据操作情景分——NameNode更新内存镜像
元数据操作情景分——客户端执行命令流程
元数据操作情景分
Hadoop的Backup Node方案——运行机制分析(5)
Hadoop的Backup Node方案——运行机制分析(4)
Hadoop的Backup Node方案——运行机制分析(3)
Hadoop的Backup Node方案——运行机制分析(2)
Hadoop的Backup Node方案——运行机制分析(1)
Hadoop的Backup Node方案——系统架构
Hadoop的Backup Node方案—Backup Node 概述
元数据可靠性机制以及使用说明
Checkpoint 过程情景分析
元数据更新及日志写入情景分析
NameNode启动加载元数据情景分析
Hadoop的元数据备份机制的进行分析
元数据应用场景分析
Format情景分析
磁盘元数据文件
HDFS之代码分析——元数据结构
HDFS之内存元数据结构
什么是HDFS的元数据
Hadoop中DRDB方案和AvatarNode方案
Hadoop中常用各方案的对比
Hadoop的BackupNode方案
Hadoop的CheckpointNode方案
Hadoop的SecondaryNameNode方案
Hadoop的元数据备份方案
影响HDFS可用性的几个因素
什么是高可用性? 详细解析
HDFS系统架构简介
如何安装和配置Hadoop集群
如何在Windows下安装Hadoop
在MacOSX上安装与配置Hadoop
Linux下安装Hadoop的步骤
Hadoop的集群安全策略介绍
Hive的数据管理介绍
HBase的数据管理介绍
HDFS的数据管理介绍
Hadoop计算模型之 MapReduce 简介
Hadoop于分布式开发
Hadoop体系结构介绍
Hadoop的项目结构详解
一文读懂Hadoop

Checkpoint 过程情景分析

于2018-01-15由小牛君创建

分享到:


Checkpoint 将内存中最新的元数据以文件形式存储到各个备份目录之下,同时清除备份目录下原有的 fsiamge 文件和 edits 文件,这样可以定期的对 Fsiamge 文件和 Edits 文件进行合并,产生最新的 Fsiamge 文件,减少 NameNode 重启时的合并时间,同时又防止 Edits 的无限制增长。

Checkpoint 的功能可以由 Secondary NameNodeCheckpoint Node   BackupNode 来完成,下面将以 Backup Node 为例,结合代码对 Checkpoint 的过程进行分析,重点关注其中涉及到备份目录的部分。整个 Checkpoint 的过程由 Backup Node发起并结束,其中还通过 RPC 调用 NameNode 的部分接口,具体步骤描述如下。

1Backup  Node  RPC 调用 NameNode startCheckPoint 方法,NameNode 遍历当前的日志输出流,并将其重新定位到其对应的备份目录下的新文件 edits.new 上,并创建一个指向 Backup Node 的输出流,用于向其发送日志记录,Backup  Node 则会创建一个 journal  spool,用于接收Checkpoint 期间 NameNode 发送过来的日志记录。

2Backup Node 根据需要从 NameNode 下载最新的 Fsimage Edits 文件。

3Backup Node 将最新的 Fsimage Edits 文件加载到内存合并。

4Backup Node 将合并后的元数据保存到磁盘,并创建新 Edits

5Backup  Node 根据需要,通过 http 上传合并后的 FsimageName  Node接收 Fsimage,将其命名为 Fsimage.ckpt,依次保存在各个备份目录下。

6Backup Node RPC 调用  NameNode endCheckpoint 方法,NameNode将所有备份目录下的 edits.new 重命名为 edits,将 Fsimage.ckpt 重命名为 Fsimage,并重定向输出流到 edits

7Backup Node journal spool 的日志合并到内存,并销毁 journal spool。下面进行具体代码分析。Backup Node  Checkpoint doCheckpoint 方法完成。如下所示。

package org.apache.hadoop.hdfs.server.namenode;

Checkpointer.java

void doCheckpoint() throws IOException {

long startTime = FSNamesystem.now();

// RPC 调用 NameNode startCheckpoint 方法

NamenodeCommand cmd =

getNamenode().startCheckpoint(backupNode.getRegistration());

……

if(cpCmd.isImageObsolete()) {

……

// 根据需要下载最新的 Fsimage Edits

downloadCheckpoint(sig);

}

BackupStorage bnImage = getFSImage();

// 加载、合并 FSImage Edits

bnImage.loadCheckpoint(sig);

sig.validateStorageInfo(bnImage);

// 将内存中的元数据保存到磁盘

bnImage.saveCheckpoint();

// 根据需要上传最新的 FSImage

if(cpCmd.needToReturnImage())

uploadCheckpoint(sig);

// RPC 调用 NameNode endCheckpoint,结束 Checkpoint

getNamenode().endCheckpoint(backupNode.getRegistration(), sig);

// journal spool 中的日志合并到内存,销毁 journal spool

bnImage.convergeJournalSpool();

……

}

第一步Backup  Node  RPC 调用 NameNode startCheckpoint 方法,该方法完成以下几个任务:

1)首先对 Checkpoint 进行验证,判断是否允许进行 Checkpoint

2)其次,决定 BackupNode 是否需要下载 FSImage Edits

3)再次,决定 Backup  Node 是否需要将合并后的 FSImage 文件上传回NameNode

4)接下来,在所有 NameNode Edits 备份目录下创建 edits.new 文件,关闭原有的文件输出流,创建指向 edits.new 的文件输出流,在 Checkpoint过程中,所有的日志记录都将写入 edits.new 文件;

5)接下来,创建一个指向 Backup Node 的输出流,同时在 Backup Node 端创建 journal spool,用于接收  Checkpoint 期间 NameNode 发送过来的日志记录。

package org.apache.hadoop.hdfs.server.namenode;

NameNode.java

public NamenodeCommand startCheckpoint(NamenodeRegistration

registration)

throws IOException {

verifyRequest(registration);

if(!isRole(NamenodeRole.ACTIVE))

throw new IOException("Only an ACTIVE node can invoke

startCheckpoint.");

return namesystem.startCheckpoint(registration, setRegistration());

}

package org.apache.hadoop.hdfs.server.namenode;

FSNamesystem.java

synchronized NamenodeCommand startCheckpoint(

NamenodeRegistration bnReg, // backup node

NamenodeRegistration nnReg) // active name-node

throws IOException {

……

NamenodeCommand cmd = getFSImage().startCheckpoint(bnReg, nnReg);

// 日志记录写入 edits.new

getEditLog().logSync();

return cmd;

}

package org.apache.hadoop.hdfs.server.namenode;

FSImage.java

NamenodeCommand startCheckpoint(NamenodeRegistration bnReg, // backup node

NamenodeRegistration nnReg) // active name-node

throws IOException {

// 验证是否允许进行 Checkpoint

……

// 决定 Backup Node 是否需要下载 FSImage Edits

boolean isImgObsolete = true;

if(bnReg.getLayoutVersion() == this.getLayoutVersion()

&& bnReg.getCTime() == this.getCTime()

&& bnReg.getCheckpointTime() == this.checkpointTime)

isImgObsolete = false;

// 决定 Backup Node 是否上传合并后的 FSImage

boolean needToReturnImg = true;

// 如果没有配置备份目录,则 Backup Node 不需要上传

if(getNumStorageDirs(NameNodeDirType.IMAGE) == 0)

// do not return image if there are no image directories

needToReturnImg = false;

// 创建新的 edits.new,将备份目录的输出流重定向到 edits.new

CheckpointSignature sig = rollEditLog();

// 增加一个 EditLogBackupOutputStream editStreams,用于向 Backup Node

// 输出 HDFS 产生的日志记录

getEditLog().logJSpoolStart(bnReg, nnReg);

return new CheckpointCommand(sig, isImgObsolete, needToReturnImg);

}

package org.apache.hadoop.hdfs.server.namenode;

FSImage.java

CheckpointSignature rollEditLog() throws IOException {

getEditLog().rollEditLog();

ckptState = CheckpointStates.ROLLED_EDITS;

// If checkpoint fails this should be the most recent image, therefore

incrementCheckpointTime();

return new CheckpointSignature(this);

}

package org.apache.hadoop.hdfs.server.namenode;

FSEditLog.java

synchronized void rollEditLog() throws IOException {

waitForSyncToFinish();

Iterator<StorageDirectory> it =

fsimage.dirIterator(NameNodeDirType.EDITS);

if(!it.hasNext())

return;

// 验证备份目录下 edits.new 的一致性,要么都存在,要么都不存在,否则抛出异常

boolean alreadyExists = existsNew(it.next());

while(it.hasNext()) {

StorageDirectory sd = it.next();

if(alreadyExists != existsNew(sd))

throw new IOException(getEditNewFile(sd)

+ "should " + (alreadyExists ? "" : "not ") + "exist.");

}

// 如果 edits.new 存在,则不需要做任何处理

if(alreadyExists)

return;

// 检查之前访问失败的备份目录是否可用,如果可以,把它加入回来

fsimage.attemptRestoreRemovedStorage();

// 将输出流重定向到 edits.new

divertFileStreams(

Storage.STORAGE_DIR_CURRENT + "/" +

NameNodeFile.EDITS_NEW.getName());

}

FSEditLogdivertFileStreams方法完成重定向输出流的功能,具体步骤是:

1)检查当前输出流的路径与配置的存储备份目录是否一致;

2)关闭原来的输出流;

3)创建指向edits.new的输出流;

4)以新创建的输出流替代当前输出流。

在对存储备份目录进行遍历的过程中,同样设置了一个EditLogOutputStream数组errorStreams,将无法正常访问的备份目录对应的输出流加入到errorStreams,并在最后进行错误处理。

package org.apache.hadoop.hdfs.server.namenode;

FSEditLog.java

synchronized void divertFileStreams(String dest) throws IOException {

waitForSyncToFinish();

assert getNumEditStreams() >= getNumEditsDirs() :"Inconsistent number

of streams";

ArrayList<EditLogOutputStream> errorStreams = null;

EditStreamIterator itE =

(EditStreamIterator)getOutputStreamIterator(JournalType.FILE);

Iterator<StorageDirectory> itD =

fsimage.dirIterator(NameNodeDirType.EDITS);

while(itE.hasNext() && itD.hasNext()) {

EditLogOutputStream eStream = itE.next();

StorageDirectory sd = itD.next();

if(!eStream.getName().startsWith(sd.getRoot().getPath()))

throw new IOException("Inconsistent order of edit streams: " + eStream);

try {

// 关闭原来的 stream

closeStream(eStream);

// 创建一个新的 stream

eStream = new EditLogFileOutputStream(new File(sd.getRoot(), dest),

sizeOutputFlushBuffer);

eStream.create();

// 使用新的 stream 替换原来的 stream

itE.replace(eStream);

} catch (IOException e) {

FSNamesystem.LOG.warn("Error in editStream " + eStream.getName(), e);

if(errorStreams == null)

errorStreams = new ArrayList<EditLogOutputStream>(1);

errorStreams.add(eStream);

}

}

processIOError(errorStreams, true);

}

logJSpoolStart创建是指向BackupNodeEditLogBackupOutputStream输出流,它是EditLogOutputStream的子类,并且复写(override)了父类的相关接口。至此,可以总结一下:

对于NameNode来说,它的所有日志记录都通过一组EditLogOutputStream进行输出,而在具体实例化的时候,这一组EditLogOutputStream中包括多个EditLogFileOutputStream1EditLogBackupOutputStream

每个EditLogFileOutputStream将日志记录最终输出到对应的备份目录下的edits文件或edits.new文件;1EditLogBackupOutputStream对应的是注册上来的BackupNodeEditLogBackupOutputStream将日志通过网络发送到BackupNode。对于NameNode来说,通过统一的EditLogOutputStream的接口进行操作,无须关心具体的实例对象是什么类型,屏蔽了差异,便于扩展,而对于EditLogOutputStream的实例来说,对父类接口的调用,通过多态可以直接转到相应子类的实现。

logJSpoolStart最后调用logEdit方法,写入一条OP_JSPOOL_START的日志记录,其作用是通知Backup Node创建一个日志池(journal spool),该日志池可以用来缓存NameNode发送过来的日志记录,以供BackupNode后续处理。

logEdit只是将日志记录写入到输出流的缓存中,也就是bufCurrent中,此时还并未发送到Backup Node,真正的发送需要等到后续调用getEditLog().logSync()

package org.apache.hadoop.hdfs.server.namenode;

FSEditLog.java

synchronized void logJSpoolStart(NamenodeRegistration bnReg, // backup node

NamenodeRegistration nnReg) // active name-node

throws IOException {

// NameNode 上没有指向 checkpoint node 的输出流

if(bnReg.isRole(NamenodeRole.CHECKPOINT))

return;

if(editStreams == null)

editStreams = new ArrayList<EditLogOutputStream>();

EditLogOutputStream boStream = null;

// 查找是否有指向该 Backup Node 的输出流

for(EditLogOutputStream eStream : editStreams) {

if(eStream.getName().equals(bnReg.getAddress())) {

boStream = eStream; // already there

break;

}

}

//如果没有,则创建新的指向该 Backup Node 的输出流

if(boStream == null) {

boStream = new EditLogBackupOutputStream(bnReg, nnReg);

editStreams.add(boStream);

}

logEdit(OP_JSPOOL_START, (Writable[])null);

}

logEdit 方法遍历输出流,依次调用输出流的 write 方法,写入日志记录,由于EditLogOutputStream 的子类复写了 write 方法,因此,实际上调用的是各个子类复写的 write 方法。

package org.apache.hadoop.hdfs.server.namenode;

FSEditLog.java

synchronized void logEdit(byte op, Writable ... writables) {

if(getNumEditStreams() == 0)

throw new

java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);

ArrayList<EditLogOutputStream> errorStreams = null;

long start = FSNamesystem.now();

// 遍历所有的输出流

for(EditLogOutputStream eStream : editStreams) {

FSImage.LOG.debug("loggin edits into " + eStream.getName() + " stream");

if(!eStream.isOperationSupported(op))

continue;

try {

eStream.write(op, writables);

} catch (IOException ie) {

// 将访问异常的输出流加入到 errorStreams

FSImage.LOG.warn("logEdit: removing "+ eStream.getName(), ie);

if(errorStreams == null)

errorStreams = new ArrayList<EditLogOutputStream>(1);

errorStreams.add(eStream);

}

}

// 处理所有访问异常的输出流

processIOError(errorStreams, true);

recordTransaction(start);

}

EditLogBackupOutputStream 的子类复写了 write 方法,它将创建一条日志记录 JournalRecord,将其加入到 bufCurrent 缓存汇中。

package org.apache.hadoop.hdfs.server.namenode;

EditLogBackupOutputStream.java

void write(byte op, Writable ... writables) throws IOException {

bufCurrent.add(new JournalRecord(op, writables));

}

沿调用依次返回至FSNamesystem.java3917,调用NamenodeCommand cmd =getFSImage().startCheckpoint(bnReg,nnReg)结束后,将调用getEditLog().logSync(); logSync中也是遍历输出流,调用输出流的flush方法,在父类的flush方法中调用了flushAndSync方法,输出流的子类复写了flushAndSync方法,因此对父类接口flushAndSync的调用将直接转移至相应子类的实现,我们下面看下logSync中对EditLogBackupOutputStream的调用。

package org.apache.hadoop.hdfs.server.namenode;

FSEditLog.java

public void logSync() throws IOException {

……

try {

synchronized (this) {

……

}

……

// do the sync

long start = FSNamesystem.now();

// 遍历所有的输出流

for (int idx = 0; idx < streams.length; idx++) {

EditLogOutputStream eStream = streams[idx];

try {

eStream.flush();

} catch (IOException ie) {

// 输出流异常处理

……

if (errorStreams == null) {

errorStreams = new ArrayList<EditLogOutputStream>(1);

}

errorStreams.add(eStream);

}

}

} finally {

……