原文出自云台博客:http://yuntai.1kapp.com/?p=941
NameNode会接收两种情况的块报告,DataNode全部块报告与增量块报告。
4.1全量报告分析
目前全量报告以周期性进行报告,既然已经有启动时候的全量数据块报告,错误块报告,增量块报告(包括删除块报告),为什么还需要周期性全量块报告呢?比如某DataNode接受到数据块但是增量报告失败,那就需要周期性报告来解决了,或者NameNode给DN发送了删除块的命令,但是由于网络等异常,DN没收收到删除命令,这样DN再把这些数据块报告上来就是无效块,需要再次放入无效队列,下次心跳再命令DN删除;同时比如每次块报告会清理DatanodeDescriptor对象维护的块列表还有某个块的信息,但是DN节点再也没有报告上来,定时清除这些无效信息,有助于提高块列表的操作性能,从而提供NameNode的性能。同时我们可以考虑分析是否还有其他原因可能影响NameNode的性能。
为了提高HDFS启动速度,在Hadoop2.0版本中全量块报告分为了两种:启动时候块报告与非启动的时候块报告,即是否是第一次块报告。那么具体又是如何来提高启动速度的呢?在启动的时候,不计算哪些文件元数据需要删除,不计算无效快,这些处理都推迟到下一次块报告进行处理

对于第一次块报告,代码调用流程为:NameNodeRpcServer.blockReport()->BlockManager. processReport()->BlockManager.processFirstBlockReport().对Standby节点,如果报告的数据块所相关元数据日志从节点还没有加载完毕,则会将报告的块信息加入一个队列,当Standby节点加载元数据后,再处理该消息队列,第一次块报告处理详细代码如下,可以看到,为了提高报告速度,只有简单的几步进行块报告处理,仅有验证块是否损坏,然后直接判断块状态是否为FINALIZED状态,如果是,就直接建立块与DN节点的映射。
-
private void processFirstBlockReport(final DatanodeDescriptor node,
-
final BlockListAsLongs report) throws IOException {
-
if (report == null) return;
-
assert (namesystem.hasWriteLock());
-
assert (node.numBlocks() == 0);
-
BlockReportIterator itBR = report.getBlockReportIterator();
-
-
while(itBR.hasNext()) {
-
Block iblk = itBR.next();
-
ReplicaState reportedState = itBR.getCurrentReplicaState();
-
-
if (shouldPostponeBlocksFromFuture&&
-
namesystem.isGenStampInFuture(iblk.getGenerationStamp())) {
-
-
queueReportedBlock(node, iblk, reportedState,
-
QUEUE_REASON_FUTURE_GENSTAMP);
-
continue;
-
}
-
-
BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
-
-
if (storedBlock == null) continue;
-
-
-
BlockUCState ucState = storedBlock.getBlockUCState();
-
BlockToMarkCorrupt c = checkReplicaCorrupt(
-
iblk, reportedState, storedBlock, ucState, node);
-
if (c != null) {
-
-
-
if (shouldPostponeBlocksFromFuture) {
-
-
-
queueReportedBlock(node, iblk, reportedState,
-
QUEUE_REASON_CORRUPT_STATE);
-
} else {
-
-
markBlockAsCorrupt(c, node);
-
}
-
continue;
-
}
-
-
-
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
-
((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
-
node, iblk, reportedState);
-
-
}
-
-
if (reportedState == ReplicaState.FINALIZED) {
-
addStoredBlockImmediate(storedBlock, node);
-
}
-
}
-
}
而对于非第一次块报告,情况就要复杂一些了,对于报告的每个块信息,不仅会建立块与DN的映射,而且均会检查块是否损坏,块是是否无效,元数据是否已经无效应该删除,是否为UC状态的块等,该过程主要由方法processReport来完成
-
private void processReport(final DatanodeDescriptor node,
-
final BlockListAsLongs report) throws IOException {
-
-
-
-
-
Collection toAdd = new LinkedList();
-
Collection toRemove = new LinkedList();
-
Collection toInvalidate = new LinkedList();
-
Collection toCorrupt = new LinkedList();
-
Collection toUC = new LinkedList();
-
-
reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC);
-
-
-
for (StatefulBlockInfo b : toUC) {
-
addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
-
}
-
for (Block b : toRemove) {
-
removeStoredBlock(b, node);
-
}
-
for (BlockInfo b : toAdd) {
-
addStoredBlock(b, node, null, true);
-
}
-
for (Block b : toInvalidate) {
-
NameNode.stateChangeLog.info("BLOCK* processReport: block "
-
+ b + " on " + node + " size " + b.getNumBytes()
-
+ " does not belong to any file.");
-
addToInvalidates(b, node);
-
}
-
for (BlockToMarkCorrupt b : toCorrupt) {
-
markBlockAsCorrupt(b, node);
-
}
-
}
在reportDiff方法内,实现如下:
-
private void reportDiff(DatanodeDescriptor dn,
-
BlockListAsLongs newReport,
-
Collection toAdd,
-
Collection toRemove,
-
Collection toInvalidate,
-
Collection toCorrupt,
-
Collection toUC) {
-
-
-
BlockInfo delimiter = new BlockInfo(new Block(), 1);
-
boolean added = dn.addBlock(delimiter);
-
assert added : "Delimiting block cannot be present in the node";
-
int headIndex = 0;
-
int curIndex;
-
-
if (newReport == null)
-
newReport = new BlockListAsLongs();
-
-
BlockReportIterator itBR = newReport.getBlockReportIterator();
-
while(itBR.hasNext()) {
-
Block iblk = itBR.next();
-
ReplicaState iState = itBR.getCurrentReplicaState();
-
BlockInfo storedBlock = processReportedBlock(dn, iblk, iState,
-
toAdd, toInvalidate, toCorrupt, toUC);
-
-
if (storedBlock != null && (curIndex = storedBlock.findDatanode(dn)) >= 0) {
-
headIndex = dn.moveBlockToHead(storedBlock, curIndex, headIndex);
-
}
-
}
-
-
-
-
-
Iterator extends Block> it = new DatanodeDescriptor.BlockIterator(
-
delimiter.getNext(0), dn);
-
while(it.hasNext())
-
toRemove.add(it.next());
-
dn.removeBlock(delimiter);
-
}
4.2增量报告分析
相比于全量块报告方式,增量报告报告DN节点很短时间内已经接收完成,或者正在接受或者删除的块,而且为了提高文件上传的效率, DN节点应该尽快将接受到的块报告给NameNode,现在引入了RECEIVING_BLOCK这个一个块状态,有可能就是为了提高写入速度。

增量块报告流程图
对于增量块报告过程,已经收到的块报告与全量块报告类似,不同的主要就是增量方式块的数量相对比较少,增量块报告处理速度也会直接影响到文件上传的速度。同时也存在数据块已经被DN报告上来了,但是从节点还没有消化到相应的日志,所以依然会涉及到数据集合pendingDNMessages相应的处理。
正在接收的块与已经接收完的块,除了在数据块状态不一样外,其他基本相同,其接收块代码调用流程如下:NameNodeRpcServer.blockReceivedAndDeleted()->BlockManager.processIncrementalBlockReport()->BlockManager. addBlock()->BlockManager.processAndHandleReportedBlock()->BlockManager.processReportedBlock(),在方法processReportedBlock中,首先判断报告的块是否元数据已经从主节点读取到,如果没有加入消息列表
-
-
-
if (shouldPostponeBlocksFromFuture &&
-
namesystem.isGenStampInFuture(block.getGenerationStamp())) {
-
queueReportedBlock(dn, block, reportedState,
-
QUEUE_REASON_FUTURE_GENSTAMP);
-
return null;
-
}
然后从blocksMap中查询到数据块对于文件inode,判断文件是否存在;如果判断块属于损害块,冗余分数是否不够等情况,如果块一切正常,且状态为完成,将将其加入blocksMap等集合列表。具体代码如下:
-
-
BlockToMarkCorrupt c = checkReplicaCorrupt(
-
block, reportedState, storedBlock, ucState, dn);
-
if (c != null) {
-
if (shouldPostponeBlocksFromFuture) {
-
-
-
-
queueReportedBlock(dn, storedBlock, reportedState,
-
QUEUE_REASON_CORRUPT_STATE);
-
} else {
-
-
toCorrupt.add(c);
-
}
-
return storedBlock;
-
}
-
-
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
-
toUC.add(new StatefulBlockInfo(
-
(BlockInfoUnderConstruction)storedBlock, reportedState));
-
return storedBlock;
-
}
-
-
-
-
if (reportedState == ReplicaState.FINALIZED
-
&& storedBlock.findDatanode(dn) < 0) {
-
toAdd.add(storedBlock);
-
}
-
return storedBlock;
-