您的位置:首页 > Web前端 > Node.js

[hadoop源码阅读][8]-datanode-datanode

2012-07-14 23:08 525 查看
datanode的定义

public class DataNode extends Configured implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable, DataNodeMXBean


DataNode实现了两个通信接口,其中ClientDatanodeProtocol是用于和Client交互的,InterDatanodeProtocol,是DataNode间的通信接口。ipcServer是DataNode的一个成员变量,它启动了一个IPC服务,这样,DataNode就能提供ClientDatanodeProtocol和InterDatanodeProtocol的能力了。

datanode的启动

从main函数开始,一路往下,实际干活的是在函数startDataNode里.

datanode的运行

datanode的线程循环函数run,不过干活的主要是offerService

offerService这个方法是如何执行的:


1.检查心跳间隔是否超时,如是向namenode发送心跳报告,内容是dfs的容量、剩余的空间和DataXceiverServer的数量等,调用processCommand方法处理namenode返回的命令
2.通知namenode已经接收的块
3.检查块报告间隔是否超时,如是向namenode发送块报告,调用processCommand方法处理namenode返回的命令
4.如果没到下个发送心跳的时候,休眠


接下来看processCommand方法是如何处理命令的,这些操作在DatanodeProtocol中定义:

DNA_UNKNOWN = 0:未知操作
DNA_TRANSFER = 1:传输块到另一个datanode,创建DataTransfer来传输每个块,请求的类型是OP_WRITE_BLOCK,使用BlockSender来发送块和元数据文件,不对块进行校验
DNA_INVALIDATE = 2:不合法的块,将所有块删除
DNA_SHUTDOWN = 3:停止datanode,停止infoServer、DataXceiverServer、DataBlockScanner和处理线程,将存储目录解锁,DataBlockScanner结束可能需要等待1小时
DNA_REGISTER = 4:重新注册
DNA_FINALIZE = 5:完成升级,调用DataStorage的finalizeUpgrade方法完成升级
DNA_RECOVERBLOCK = 6:请求块恢复,创建线程来恢复块,每个线程服务一个块,对于每个块,调用recoverBlock来恢复块信息


//  Main loop for the DataNode. Runs until shutdown, forever calling remote NameNode functions.
public void offerService() throws Exception
{
while (shouldRun)
{
try
{
long startTime = now();
if (startTime - lastHeartbeat > heartBeatInterval)//定期发送心跳
{
DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration, data.getCapacity(), data.getDfsUsed(), data.getRemaining(), xmitsInProgress.get(), getXceiverCount());
if (!processCommand(cmds))
continue;
}

blockArray = receivedBlockList.toArray(new Block[numBlocks]);// receivedBlockList表明在这个DataNode成功创建的新的数据块,而delHints,是可以删除该数据块的节点
delHintArray = delHints.toArray(new String[numBlocks]);//在datanode.notifyNamenodeReceivedBlock函数中发生变化
namenode.blockReceived(dnRegistration, blockArray, delHintArray);//Block状态变化报告通过NameNode.blockReceived来报告。

if (startTime - lastBlockReport > blockReportInterval)// 向namenode报告系统中Block状态的变化
{
Block[] bReport = data.getBlockReport();
DatanodeCommand cmd = namenode.blockReport(dnRegistration, BlockListAsLongs.convertToArrayLongs(bReport));
processCommand(cmd);
}

if (blockScanner != null && blockScannerThread == null && upgradeManager.isUpgradeCompleted())// start block scanner
{
blockScannerThread = new Daemon(blockScanner);//启动blockScanner线程 进行block扫描
blockScannerThread.start();
}

// There is no work to do; sleep until hearbeat timer elapses,or work arrives, and then iterate again.
long waitTime = heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat);
synchronized (receivedBlockList)
{
if (waitTime > 0 && receivedBlockList.size() == 0)
{
try
{
receivedBlockList.wait(waitTime);
}
catch (InterruptedException ie)
{
}
delayBeforeBlockReceived();
}
} // synchronized
}
catch (RemoteException re)
{
}
catch (IOException e)
{
}
} // while (shouldRun)
} // offerService


类图





其他的

datanode一路粗略的看完,还有一些问题,我想关注的,留待后面继续,


能影响datanode各种功能的参数,

hdfs的权限控制



参考url

http://blog.jeoygin.org/2012/03/hdfs-source-analysis-datanode-startup-service.html

http://caibinbupt.iteye.com/blog/287870
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: