于2018-01-15由小牛君创建
Checkpoint 将内存中最新的元数据以文件形式存储到各个备份目录之下,同时清除备份目录下原有的 fsiamge 文件和 edits 文件,这样可以定期的对 Fsiamge 文件和 Edits 文件进行合并,产生最新的 Fsiamge 文件,减少 NameNode 重启时的合并时间,同时又防止 Edits 的无限制增长。
Checkpoint 的功能可以由 Secondary NameNode、Checkpoint Node 或 BackupNode 来完成,下面将以 Backup Node 为例,结合代码对 Checkpoint 的过程进行分析,重点关注其中涉及到备份目录的部分。整个 Checkpoint 的过程由 Backup Node发起并结束,其中还通过 RPC 调用 NameNode 的部分接口,具体步骤描述如下。
(1)Backup Node RPC 调用 NameNode 的 startCheckPoint 方法,NameNode 遍历当前的日志输出流,并将其重新定位到其对应的备份目录下的新文件 edits.new 上,并创建一个指向 Backup Node 的输出流,用于向其发送日志记录,Backup Node 则会创建一个 journal spool,用于接收Checkpoint 期间 NameNode 发送过来的日志记录。
(2)Backup Node 根据需要从 NameNode 下载最新的 Fsimage 和 Edits 文件。
(3)Backup Node 将最新的 Fsimage 和 Edits 文件加载到内存合并。
(4)Backup Node 将合并后的元数据保存到磁盘,并创建新 Edits。
(5)Backup Node 根据需要,通过 http 上传合并后的 Fsimage,Name Node接收 Fsimage,将其命名为 Fsimage.ckpt,依次保存在各个备份目录下。
(6)Backup Node RPC 调用 NameNode 的 endCheckpoint 方法,NameNode将所有备份目录下的 edits.new 重命名为 edits,将 Fsimage.ckpt 重命名为 Fsimage,并重定向输出流到 edits。
(7)Backup Node 将 journal spool 的日志合并到内存,并销毁 journal spool。下面进行具体代码分析。Backup Node 的 Checkpoint 由 doCheckpoint 方法完成。如下所示。
package org.apache.hadoop.hdfs.server.namenode;
Checkpointer.java
void doCheckpoint() throws IOException {
long startTime = FSNamesystem.now();
// RPC 调用 NameNode 的 startCheckpoint 方法
NamenodeCommand cmd =
getNamenode().startCheckpoint(backupNode.getRegistration());
……
if(cpCmd.isImageObsolete()) {
……
// 根据需要下载最新的 Fsimage 和 Edits。
downloadCheckpoint(sig);
}
BackupStorage bnImage = getFSImage();
// 加载、合并 FSImage 和 Edits。
bnImage.loadCheckpoint(sig);
sig.validateStorageInfo(bnImage);
// 将内存中的元数据保存到磁盘
bnImage.saveCheckpoint();
// 根据需要上传最新的 FSImage
if(cpCmd.needToReturnImage())
uploadCheckpoint(sig);
// RPC 调用 NameNode 的 endCheckpoint,结束 Checkpoint
getNamenode().endCheckpoint(backupNode.getRegistration(), sig);
// 将 journal spool 中的日志合并到内存,销毁 journal spool
bnImage.convergeJournalSpool();
……
}
第一步Backup Node RPC 调用 NameNode 的 startCheckpoint 方法,该方法完成以下几个任务:
(1)首先对 Checkpoint 进行验证,判断是否允许进行 Checkpoint;
(2)其次,决定 BackupNode 是否需要下载 FSImage 和 Edits;
(3)再次,决定 Backup Node 是否需要将合并后的 FSImage 文件上传回NameNode;
(4)接下来,在所有 NameNode 的 Edits 备份目录下创建 edits.new 文件,关闭原有的文件输出流,创建指向 edits.new 的文件输出流,在 Checkpoint过程中,所有的日志记录都将写入 edits.new 文件;
(5)接下来,创建一个指向 Backup Node 的输出流,同时在 Backup Node 端创建 journal spool,用于接收 Checkpoint 期间 NameNode 发送过来的日志记录。
package org.apache.hadoop.hdfs.server.namenode;
NameNode.java
public NamenodeCommand startCheckpoint(NamenodeRegistration
registration)
throws IOException {
verifyRequest(registration);
if(!isRole(NamenodeRole.ACTIVE))
throw new IOException("Only an ACTIVE node can invoke
startCheckpoint.");
return namesystem.startCheckpoint(registration, setRegistration());
}
package org.apache.hadoop.hdfs.server.namenode;
FSNamesystem.java
synchronized NamenodeCommand startCheckpoint(
NamenodeRegistration bnReg, // backup node
NamenodeRegistration nnReg) // active name-node
throws IOException {
……
NamenodeCommand cmd = getFSImage().startCheckpoint(bnReg, nnReg);
// 日志记录写入 edits.new
getEditLog().logSync();
return cmd;
}
package org.apache.hadoop.hdfs.server.namenode;
FSImage.java
NamenodeCommand startCheckpoint(NamenodeRegistration bnReg, // backup node
NamenodeRegistration nnReg) // active name-node
throws IOException {
// 验证是否允许进行 Checkpoint
……
// 决定 Backup Node 是否需要下载 FSImage 和 Edits
boolean isImgObsolete = true;
if(bnReg.getLayoutVersion() == this.getLayoutVersion()
&& bnReg.getCTime() == this.getCTime()
&& bnReg.getCheckpointTime() == this.checkpointTime)
isImgObsolete = false;
// 决定 Backup Node 是否上传合并后的 FSImage
boolean needToReturnImg = true;
// 如果没有配置备份目录,则 Backup Node 不需要上传
if(getNumStorageDirs(NameNodeDirType.IMAGE) == 0)
// do not return image if there are no image directories
needToReturnImg = false;
// 创建新的 edits.new,将备份目录的输出流重定向到 edits.new
CheckpointSignature sig = rollEditLog();
// 增加一个 EditLogBackupOutputStream 到 editStreams,用于向 Backup Node
// 输出 HDFS 产生的日志记录
getEditLog().logJSpoolStart(bnReg, nnReg);
return new CheckpointCommand(sig, isImgObsolete, needToReturnImg);
}
package org.apache.hadoop.hdfs.server.namenode;
FSImage.java
CheckpointSignature rollEditLog() throws IOException {
getEditLog().rollEditLog();
ckptState = CheckpointStates.ROLLED_EDITS;
// If checkpoint fails this should be the most recent image, therefore
incrementCheckpointTime();
return new CheckpointSignature(this);
}
package org.apache.hadoop.hdfs.server.namenode;
FSEditLog.java
synchronized void rollEditLog() throws IOException {
waitForSyncToFinish();
Iterator<StorageDirectory> it =
fsimage.dirIterator(NameNodeDirType.EDITS);
if(!it.hasNext())
return;
// 验证备份目录下 edits.new 的一致性,要么都存在,要么都不存在,否则抛出异常
boolean alreadyExists = existsNew(it.next());
while(it.hasNext()) {
StorageDirectory sd = it.next();
if(alreadyExists != existsNew(sd))
throw new IOException(getEditNewFile(sd)
+ "should " + (alreadyExists ? "" : "not ") + "exist.");
}
// 如果 edits.new 存在,则不需要做任何处理
if(alreadyExists)
return;
// 检查之前访问失败的备份目录是否可用,如果可以,把它加入回来
fsimage.attemptRestoreRemovedStorage();
// 将输出流重定向到 edits.new
divertFileStreams(
Storage.STORAGE_DIR_CURRENT + "/" +
NameNodeFile.EDITS_NEW.getName());
}
类FSEditLog的divertFileStreams方法完成重定向输出流的功能,具体步骤是:
(1)检查当前输出流的路径与配置的存储备份目录是否一致;
(2)关闭原来的输出流;
(3)创建指向edits.new的输出流;
(4)以新创建的输出流替代当前输出流。
在对存储备份目录进行遍历的过程中,同样设置了一个EditLogOutputStream数组errorStreams,将无法正常访问的备份目录对应的输出流加入到errorStreams,并在最后进行错误处理。
package org.apache.hadoop.hdfs.server.namenode;
FSEditLog.java
synchronized void divertFileStreams(String dest) throws IOException {
waitForSyncToFinish();
assert getNumEditStreams() >= getNumEditsDirs() :"Inconsistent number
of streams";
ArrayList<EditLogOutputStream> errorStreams = null;
EditStreamIterator itE =
(EditStreamIterator)getOutputStreamIterator(JournalType.FILE);
Iterator<StorageDirectory> itD =
fsimage.dirIterator(NameNodeDirType.EDITS);
while(itE.hasNext() && itD.hasNext()) {
EditLogOutputStream eStream = itE.next();
StorageDirectory sd = itD.next();
if(!eStream.getName().startsWith(sd.getRoot().getPath()))
throw new IOException("Inconsistent order of edit streams: " + eStream);
try {
// 关闭原来的 stream
closeStream(eStream);
// 创建一个新的 stream
eStream = new EditLogFileOutputStream(new File(sd.getRoot(), dest),
sizeOutputFlushBuffer);
eStream.create();
// 使用新的 stream 替换原来的 stream
itE.replace(eStream);
} catch (IOException e) {
FSNamesystem.LOG.warn("Error in editStream " + eStream.getName(), e);
if(errorStreams == null)
errorStreams = new ArrayList<EditLogOutputStream>(1);
errorStreams.add(eStream);
}
}
processIOError(errorStreams, true);
}
logJSpoolStart创建是指向BackupNode的EditLogBackupOutputStream输出流,它是EditLogOutputStream的子类,并且复写(override)了父类的相关接口。至此,可以总结一下:
对于NameNode来说,它的所有日志记录都通过一组EditLogOutputStream进行输出,而在具体实例化的时候,这一组EditLogOutputStream中包括多个EditLogFileOutputStream和1个EditLogBackupOutputStream:
每个EditLogFileOutputStream将日志记录最终输出到对应的备份目录下的edits文件或edits.new文件;1个EditLogBackupOutputStream对应的是注册上来的BackupNode,EditLogBackupOutputStream将日志通过网络发送到BackupNode。对于NameNode来说,通过统一的EditLogOutputStream的接口进行操作,无须关心具体的实例对象是什么类型,屏蔽了差异,便于扩展,而对于EditLogOutputStream的实例来说,对父类接口的调用,通过多态可以直接转到相应子类的实现。
logJSpoolStart最后调用logEdit方法,写入一条OP_JSPOOL_START的日志记录,其作用是通知Backup Node创建一个日志池(journal spool),该日志池可以用来缓存NameNode发送过来的日志记录,以供BackupNode后续处理。
logEdit只是将日志记录写入到输出流的缓存中,也就是bufCurrent中,此时还并未发送到Backup Node,真正的发送需要等到后续调用getEditLog().logSync()。
package org.apache.hadoop.hdfs.server.namenode;
FSEditLog.java
synchronized void logJSpoolStart(NamenodeRegistration bnReg, // backup node
NamenodeRegistration nnReg) // active name-node
throws IOException {
// NameNode 上没有指向 checkpoint node 的输出流
if(bnReg.isRole(NamenodeRole.CHECKPOINT))
return;
if(editStreams == null)
editStreams = new ArrayList<EditLogOutputStream>();
EditLogOutputStream boStream = null;
// 查找是否有指向该 Backup Node 的输出流
for(EditLogOutputStream eStream : editStreams) {
if(eStream.getName().equals(bnReg.getAddress())) {
boStream = eStream; // already there
break;
}
}
//如果没有,则创建新的指向该 Backup Node 的输出流
if(boStream == null) {
boStream = new EditLogBackupOutputStream(bnReg, nnReg);
editStreams.add(boStream);
}
logEdit(OP_JSPOOL_START, (Writable[])null);
}
logEdit 方法遍历输出流,依次调用输出流的 write 方法,写入日志记录,由于EditLogOutputStream 的子类复写了 write 方法,因此,实际上调用的是各个子类复写的 write 方法。
package org.apache.hadoop.hdfs.server.namenode;
FSEditLog.java
synchronized void logEdit(byte op, Writable ... writables) {
if(getNumEditStreams() == 0)
throw new
java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
ArrayList<EditLogOutputStream> errorStreams = null;
long start = FSNamesystem.now();
// 遍历所有的输出流
for(EditLogOutputStream eStream : editStreams) {
FSImage.LOG.debug("loggin edits into " + eStream.getName() + " stream");
if(!eStream.isOperationSupported(op))
continue;
try {
eStream.write(op, writables);
} catch (IOException ie) {
// 将访问异常的输出流加入到 errorStreams
FSImage.LOG.warn("logEdit: removing "+ eStream.getName(), ie);
if(errorStreams == null)
errorStreams = new ArrayList<EditLogOutputStream>(1);
errorStreams.add(eStream);
}
}
// 处理所有访问异常的输出流
processIOError(errorStreams, true);
recordTransaction(start);
}
EditLogBackupOutputStream 的子类复写了 write 方法,它将创建一条日志记录 JournalRecord,将其加入到 bufCurrent 缓存汇中。
package org.apache.hadoop.hdfs.server.namenode;
EditLogBackupOutputStream.java
void write(byte op, Writable ... writables) throws IOException {
bufCurrent.add(new JournalRecord(op, writables));
}
沿调用依次返回至FSNamesystem.java3917,调用NamenodeCommand cmd =getFSImage().startCheckpoint(bnReg,nnReg)结束后,将调用getEditLog().logSync(); ,logSync中也是遍历输出流,调用输出流的flush方法,在父类的flush方法中调用了flushAndSync方法,输出流的子类复写了flushAndSync方法,因此对父类接口flushAndSync的调用将直接转移至相应子类的实现,我们下面看下logSync中对EditLogBackupOutputStream的调用。
package org.apache.hadoop.hdfs.server.namenode;
FSEditLog.java
public void logSync() throws IOException {
……
try {
synchronized (this) {
……
}
……
// do the sync
long start = FSNamesystem.now();
// 遍历所有的输出流
for (int idx = 0; idx < streams.length; idx++) {
EditLogOutputStream eStream = streams[idx];
try {
eStream.flush();
} catch (IOException ie) {
// 输出流异常处理
……
if (errorStreams == null) {
errorStreams = new ArrayList<EditLogOutputStream>(1);
}
errorStreams.add(eStream);
}
}
} finally {
……
在线咨询
免费热线
资料发放
技术答疑
关注微信