首家大数据培训挂牌机构 股票代码:837906 | EN CN
异常解决方案—NameNode 宕机读写测试
异常解决方案—NameNode 宕机切换实验
异常解决方案—Data Node 配置
异常解决方案—Backup Node配置
异常解决方案—NameNode配置
异常解决方案—6.5.1异常情况分析
安装及配置
5NameNode安装及配置以及6BackupNode安装及配置
6.4.4虚拟机集群架设
6.4.3安装JDK
3.配置操作系统
2.创建虚拟机与安装操作系统
6.4构建实验环境
实验方案说明
故障切换机制
日志池(journal spool)机制
元数据操作情景分——BackupNode更新磁盘上的日志文件
元数据操作情景分——NameNode通过日志输出流......
元数据操作情景分——NameNode将日志写入日志文件
元数据操作情景分——NameNode更新内存镜像
元数据操作情景分——客户端执行命令流程
元数据操作情景分
Hadoop的Backup Node方案——运行机制分析(5)
Hadoop的Backup Node方案——运行机制分析(4)
Hadoop的Backup Node方案——运行机制分析(3)
Hadoop的Backup Node方案——运行机制分析(2)
Hadoop的Backup Node方案——运行机制分析(1)
Hadoop的Backup Node方案——系统架构
Hadoop的Backup Node方案—Backup Node 概述
元数据可靠性机制以及使用说明
Checkpoint 过程情景分析
元数据更新及日志写入情景分析
NameNode启动加载元数据情景分析
Hadoop的元数据备份机制的进行分析
元数据应用场景分析
Format情景分析
磁盘元数据文件
HDFS之代码分析——元数据结构
HDFS之内存元数据结构
什么是HDFS的元数据
Hadoop中DRDB方案和AvatarNode方案
Hadoop中常用各方案的对比
Hadoop的BackupNode方案
Hadoop的CheckpointNode方案
Hadoop的SecondaryNameNode方案
Hadoop的元数据备份方案
影响HDFS可用性的几个因素
什么是高可用性? 详细解析
HDFS系统架构简介
如何安装和配置Hadoop集群
如何在Windows下安装Hadoop
在MacOSX上安装与配置Hadoop
Linux下安装Hadoop的步骤
Hadoop的集群安全策略介绍
Hive的数据管理介绍
HBase的数据管理介绍
HDFS的数据管理介绍
Hadoop计算模型之 MapReduce 简介
Hadoop于分布式开发
Hadoop体系结构介绍
Hadoop的项目结构详解
一文读懂Hadoop

元数据更新及日志写入情景分析

于2018-01-15由小牛君创建

分享到:


本节以客户端 Mkdir 操作为例,分析元数据的数据流过程,并对其中涉及到备份目录部分进行重点分析。Mkdir 操作由客户端发起,具体实现是调用 DFSClient.java 中的 mkdirs 方法,mkdirs 又通过 RPC 远程调用 NameNode 所实现的 Mkdirs 接口。如下所示。

package org.apache.hadoop.hdfs;

DFSClient.java

public boolean mkdirs(String src, FsPermission permission, boolean

createParent)

throws IOException, UnresolvedLinkException {

……

try {

return namenode.mkdirs(src, masked, createParent);

} catch(RemoteException re) {

……

}

}

NameNode 的 mkdirs 方法调用了类 FSNamesystem 的 mkdirs 方法。如下所示。

package org.apache.hadoop.hdfs.server.namenode

NameNode.java

public boolean mkdirs(String src, FsPermission masked, boolean

createParent)

throws IOException {

……

return namesystem.mkdirs(src,

new

PermissionStatus(UserGroupInformation.getCurrentUser().getShor

tUserName(),

null, masked), createParent);

}

类 FSNamesystem 的 mkdirs 方法首先调用 mkdirsInternal,创建目录,然后调用 FSEditlog 的 logSync,将日志记录写入 Edits 文件。如下所示。

package org.apache.hadoop.hdfs.server.namenode;

FSNamesystem.java

public boolean mkdirs(String src, PermissionStatus permissions, boolean

createParent)

throws IOException, UnresolvedLinkException {

boolean status = mkdirsInternal(src, permissions, createParent);

getEditLog().logSync();

……

return status;

}

mkdirsInternal 方法调用 FSDirectory 的 mkdirs 方法来创建目录,并将日志记录写入相应的输出流。如下所示。

package org.apache.hadoop.hdfs.server.namenode;

FSNamesystem.java

private synchronized boolean mkdirsInternal(String src, PermissionStatus

permissions,

boolean createParent) throws IOException, UnresolvedLinkException {

……

if (!dir.mkdirs(src, permissions, false, now())) {

throw new IOException("Invalid directory name: " + src);

}

return true;

}

FSDirectory的mkdirs方法通过synchronized关键字对rootDir进行锁定,保证元数据操作原子性,mkdirs在创建完目录后,调用FSEditlog的logMkdir方法,将mkdir的日志记录写入日志输出流中,此时日志记录还并未写入磁盘文件。如下所示。

package org.apache.hadoop.hdfs.server.namenode;

FSDirectory.java

boolean mkdirs(String src, PermissionStatus permissions, boolean

inheritPermission, long now) throws FileAlreadyExistsException,

QuotaExceededException, UnresolvedLinkException {

……

synchronized(rootDir) {

……

// create directories beginning from the first null index

for(; i < inodes.length; i++) {

// 创建目录

……

// 将日志记录写入输出流

fsImage.getEditLog().logMkDir(cur, inodes[i]);

……

}

}

return true;

}

package org.apache.hadoop.hdfs.server.namenode;

FSEditLog.java

public void logMkDir(String path, INode newNode) {

// 创建要写入的日志信息

……

// 将信息写入输出流

logEdit(OP_MKDIR, new ArrayWritable(DeprecatedUTF8.class, info),

newNode.getPermissionStatus());

}

每个Edit备份目录下的Edits文件对应一个EditLogFileOutputStream,它是EditLogOutputStream的子类,editStreams是所有EditLogFileOutputStream的集合,logEdit方法会将Mkdir的日志记录依次写入editStreams中的每个EditLogFileOutputStream,对于无法正常访问的EditLogFileOutputStream,将把它加入到errorStreams中,在最后统一处理。同样的道理,在将日志记录写入EditLogOutputStream的时候发生阻塞,将导致整个Mkdir操作阻塞。如下所示。

package org.apache.hadoop.hdfs.server.namenode;

FSEditLog.java

synchronized void logEdit(byte op, Writable ... writables) {

if(getNumEditStreams() == 0)

throw new

java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);

ArrayList<EditLogOutputStream> errorStreams = null;

long start = FSNamesystem.now();

for(EditLogOutputStream eStream : editStreams) {

FSImage.LOG.debug("loggin edits into " + eStream.getName() + "

stream");

if(!eStream.isOperationSupported(op))

continue;

try {

eStream.write(op, writables);

} catch (IOException ie) {

FSImage.LOG.warn("logEdit: removing "+ eStream.getName(), ie);

if(errorStreams == null)

errorStreams = new ArrayList<EditLogOutputStream>(1);

errorStreams.add(eStream);

}

}

processIOError(errorStreams, true);

recordTransaction(start);

}

所有的元数据操作最终都会调用logEdit方法写入日志记录,类FSEditLog中将每个写日志记录的动作称为一次事务(Transaction),用一个递增的变量txid来唯一标识此次事务,该ID保存在调用logEdit方法的线程的变量中,当该线程后续调用logSync时,使用该ID来确定自己之前写入的日志事务,并且通过与当前正在Sync的事务ID进行比较,确保flush到磁盘文件的日志记录的顺序性。

package org.apache.hadoop.hdfs.server.namenode;

FSEditLog.java

private void recordTransaction(long start) {

// get a new transactionId

txid++;

// record the transactionId when new data was written to the edits log

TransactionId id = myTransactionId.get();

id.txid = txid;

// update statistics

long end = FSNamesystem.now();

numTransactions++;

totalTimeTransactions += (end-start);

if (metrics != null)

metrics.transactions.inc((end-start));

}

}

EditLogFileOutputStream中设计了双缓冲bufCurrent和bufReady,其中bufCurrent用于接收日志流的输入,而bufReady则用于将日志流输出到磁盘文件,双缓冲的设计可以使得日志的写入和输出可以并行完成,提高效率。EditLogFileOutputStream的write方法会将数据写入bufCurrent缓冲中,但此时并不写入磁盘。如下所示。

package org.apache.hadoop.hdfs.server.namenode;

EditLogFileOutputStream.java

void write(byte op, Writable... writables) throws IOException {

write(op);

for (Writable w : writables) {

w.write(bufCurrent);

}

}

Mkdir日志记录写入EditLogFileOutputStream后,并未写入到磁盘的Edits文件,沿调用返回至类FSNamesystem的mkdirs方法后,将调用getEditLog().logSync(),最终将日志从EditLogOutputStream写入磁盘文件。类FSEditLog的logSync方法分为3个主要步骤。

第1步调用线程使用synchronized(this),获取对象锁,然后判断能否进行sync,判断的条件是:当前线程的事务ID(mytxid)小于正在处理的事务ID(synctxid),或者当前没有进行sync(isSyncRunning)。如果条件不满足,则说明正在处理之前的日志记录,那么线程调用wait,释放对象锁,进入等待状态,直到其他线程完成sync后,调用notifyAll通知其结束等待,wait还设置了超时时间,防止无限期地等待。当条件满足后,如果当前线程的事务ID(mytxid)小于正在处理的事务ID(synctxid),则说明该线程写入的日志记录已经处理完毕(多个线程都是调用logEdit方法,将日志记录写入bufCurrent,随后的任何一次logSync调用都将会把bufCurrent中所有的日志记录flush到磁盘),因此无需再处理直接返回。否则,向下执行Sync,主要是设置3个标志:

syncStart= txid;

isSyncRunning = true; 

sync = true;

其中syncStart表示正在进行Sync的事务ID,isSyncRunning=true表示当前正在做Sync,sync=true表示Sync确实在进行。接下来交换bufCurrent和bufReady,这样原来的bufReady将作为bufCurrent,接受新的日志写入,而原来的bufCurrent将作为bufReady,其内容将在第2步中被flush至磁盘,退出synchronized代码段,释放对象锁。

第2步具体执行Sync,在第2步中并没有加锁,这样日志的写入和日志的Sync可以并行进行。由于在第1步将isSyncRunning已经置为true,因此即便此时对象锁已经释放,也可以保证只有一个线程进入第2步。第2步中,同样采用了一个EditLogOutputStream数组errorStreams,将无法使用正常方法的Stream加入到该数组中,并在最后进行处理。Sync的操作主要是调用各个EditLogOutputStream中的flush操作完成的,flush方法将bufReady中的内容追加到该EditLogOutputStream对应的磁盘文件上。

第3步。将synctxid置为当前处理完毕的事务ID,表示当前进度,然后置isSyncRunning为false,表示Sync结束,然后调用notifyAll唤醒所有在该对象中等待的线程,当然,最后只有一个获得了对象锁的线程才能继续执行下去。整个logSync方法采用了try-finally的结构,第3步处于finally部分,这样可以确保进入第2步的线程一定可以执行到第3步,防止在前面出现异常退出,而不能释放对象锁,最终造成死锁。代码如下所示。

package org.apache.hadoop.hdfs.server.namenode;

FSEditLog.java

public void logSync() throws IOException {

……

boolean sync = false;

try {

// 第 1 步

synchronized (this) {

……

}

while (mytxid > synctxid && isSyncRunning) {

try {

wait(1000);

} catch (InterruptedException ie) {

}

}

if (mytxid <= synctxid) {

numTransactionsBatchedInSync++;

if (metrics != null)

metrics.transactionsBatchedInSync.inc();

return;

}

// now, this thread will do the sync

syncStart = txid;

isSyncRunning = true;

sync = true;

for(EditLogOutputStream eStream : editStreams) {

eStream.setReadyToFlush();

}

streams = editStreams.toArray(new

EditLogOutputStream[editStreams.size()]);

}

// 第 2 步

long start = FSNamesystem.now();

for (int idx = 0; idx < streams.length; idx++) {

EditLogOutputStream eStream = streams[idx];

try {

eStream.flush();

} catch (IOException ie) {

if (errorStreams == null) {

errorStreams = new ArrayList<EditLogOutputStream>(1);

}

errorStreams.add(eStream);

}

}

long elapsed = FSNamesystem.now() - start;

processIOError(errorStreams, true);

if (metrics != null) // Metrics non-null only when used inside name node

metrics.syncs.inc(elapsed);

} finally {

// 第 3 步

synchronized (this) {

synctxid = syncStart;

if (sync) {

isSyncRunning = false;

}

this.notifyAll();

}

}

}