您的位置:首页 > 其它

DFSClient技术内幕 (DFSClient介绍以及其初始化)

2014-02-15 11:51 274 查看
以下是本人研究源代码成果, 此文僅献给我和我的小伙伴们,不足之处,欢迎斧正-------------------------------------------------致谢道格等人!

注:hadoop版本0.20.2,有童鞋表示看代码头晕,所以本文采用纯文字描述,哥还特意为你们把字体调调颜色噢 ^ o ^
大家都知道,hadoop是最优秀的大数据处理框架之一,而本文研究的DFSClient是 hadoop内部实现中,较为核心的部分
DFSClient,顾名思义,分布式文件系统的客户端
hadoop的客户端包括:shell命令,java接口,pig 等。。
DFSClient在分布式文件系统中扮演的角色:
现实场景如下:我的数据文件分散存储在集群中的很多台机器上,现在我需要获取它,以使用shell命令操作为例
我输入获取数据的指令后,DFSClient通过RPC机制和NameNode(集群中的主机器)通信并获得文件的元数据信息(数据的存放位置,校验和,等等一些关键的信息),然后DFSClient再与DataNode通信(集群中的其它机器)通过数据I/O流来获取到我要的文件信息,
它在分布式文件系统内部实现中起到双方通信的作用,是用户与文件系统通信的桥梁,由此可见,掌握它,驾驭它,显得尤为迫切!

首先一起讨论下关于DFSClient的体系结构:

它位于org.apache.hadoop.hdfs包下:体系图如下

DFSClient

|-------LeaseChecker implements Runnable

|-------DNAddrPair

|-------BlockReader extends FSInputChecker ( FSInputChecker extends FSInputStream )
|-------DFSInputStream

|-------DFSDataInputStream extends DataInputStream implements Seekable(支持流中随机存储), PositionedReadable(定位读取)

|-------DFSOutputStream extends FSOutputSummer implements Syncable (FSOutputSummer extends OutputStream)

|--------Packet
|--------DataStreamer extends Daemon (extends Thread)

|--------ResponseProcessor extends Thread

LeaseChecker介绍:实现了Runnable接口

在HDFS中可能有多个客户端在同一时刻进行文件的写入操作,有时会出现多个客户端并发的写入一个文件的情况,所以采取一些措施来控制并发写入情况的发送,一般情况下会采用互斥锁的方法来进行控制,使得每一时刻只有一个获得锁的客户端才能执行,写入操作。但是互斥锁的机制在分布式系统中会有很多问题

问题一:每次执行写入时,客户端都需要向NameNode申请互斥锁,从而造成网络开销的增大

问题二:当某个客户端获得锁之后和NameNode失去了联系,此时会造成互斥锁无法释放,使得其他的客户端的操作会被终止

解决方案:HDFS使用Lease租约来解决互斥锁的问题

过程:当DFSClient需要对一个文件执行写入操作时,他首先需要向NameNode申请一个租约(有时间限制),在时间期限内客户端可以 对租约所管理的文件执行写入。一个文件只能被一个租约锁管理,所以只能有一个客户端对文件执行写入操作,在租约的有效时间 内,DFSClient客户端会一直持有写文件的权限,而不需要再向NameNode询问是否有写文件的权限。当客户端一直工作时,它会在 租约过期后向NameNode申请续约,入股在租约的有效期间内,客户端发生了异常,和NameNode失去了联系,当租约期满后,
NameNode会发现发生异常的客户端,此时NameNode会将新的租约赋给其它正常的客户端,当发生异常的客户端已经写入了一部 分数据时,HDFS为了分辨出这些无用的数据,会在客户端每次写入数据时增加版本号信息,异常的客户端的写入的数据的版本号 会很低,从而可以被安全删除掉。

LeaseChecker作用: 在DFSClient中有个LeaseChecker线程,该线程会周期性的检查租约是否过期,在租约快过期的时候会对租约进行续约,此外,在namenode包中有个LeaseManager租约管理器,该管理器会不断的检查它所管理的lease是否过期,如果lease已经过期,会将其删除

DNAddrPair介绍:封装了定位到的DataNode信息和DataNode所对应的IP信息

FSInputChecker 介绍:抽象类FSInputChecker继承自FSInputStream,加入了HDFS所需要的校验功能,hadoop会生成与原生文件所对应的校验和文件,并在读写文件的时候对文件进行校验,以确保数据的准确性

BlockReader介绍:BlockReader 继承自 FSInputChecker 继承自 FSInputStream,校验功能是在readChecksumChunk方法中实现,而readChecksumChunk私有方法是被read1私有方法内部调用,而且所有的read方法的都是通过间接地调用read1方法来实现对数据进行读取并做校验和验证的

DFSInputStream介绍:继承自FSInputStream,该类会创建到DataNode的Socket连接,然后使用Socket来读取DataNode上的数据信息

DFSDataInputStream介绍:继承自DataInputStream,DFSDataInputStream的功能都依靠包装的DFSInputStream来完成

DFSOutputStream介绍:继承自DFSOutputStream

Packet介绍:数据包,DFSOutputStream的内部类,DFSClient是通过一个个Packet来向DataNode写入数据的,一个Packet由多个数据chunk组成,每个chunk对应着一个校验和,当写入足够的chunk之后,packet会被添加到dataQueue中

DataStreamer 介绍: DataStreamer是真正写入数据的进程,在发送Packet之前,它会首先从Namenode中获得一个blockid和Block的位置信息,然后它会循环地从dataQueue中取得一个Packet,然后将该Packet真正写入到与DataNode所建立的socket中, 当将属于一个Block的所有Packet都发送给DataNode,并且返回了与每个Packet所对应的响应信息之后,DataStream
会关闭当前的数据Block

ResponseProcessor 介绍:响应处理器ResponseProcessor

至此,DFSClient讨论完毕

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------

DFSClient构造器群:



public DFSClient(Configuration conf) throws IOException {
this(NameNode.getAddress(conf), conf);
} public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf
) throws IOException {
this(nameNodeAddr, conf, null);
}

public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
FileSystem.Statistics stats)
throws IOException {
this(nameNodeAddr, null, conf, stats);
}

DFSClient为用户提供了简单,一致的标准访问接口下面,但其内部实现较为复杂,本人陪同大家一起一起去探索这个神奇的国度

我们需要构建一个DFSClient对象:

DFSClient提供了4种形式的构造器,构造方法的主要任务有两个:

a,读入配置项并初始化一些成员变量

b,建立和名字节点的IPC连接

详细过程分析:a过程被初始化对象如下:

1,配置对象configuration,

2,收集文件系统统计信息的对象,

3,socket连接的过期时间,

4,写入的数据包的大小

5,通过socket向dataNode写入数据的超期时间

6,创建socket连接的工厂类

7, 用户组信息

8, 最大块获取失败次数

9,客户端的名称(如果该任务是Map-reduces任务,则使用任务ID作为客户端名称)

9, 默认的块大小(64M),默认的块副本数

构造器代码如下:

/**
* Create a new DFSClient connected to the given nameNodeAddr or rpcNamenode.
* Exactly one of nameNodeAddr or rpcNamenode must be null.
*/
DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode,
Configuration conf, FileSystem.Statistics stats)
throws IOException {
this.conf = conf;
this.stats = stats;
this.socketTimeout = conf.getInt("dfs.socket.timeout",
HdfsConstants.READ_TIMEOUT);
this.datanodeWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
HdfsConstants.WRITE_TIMEOUT);
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
// dfs.write.packet.size is an internal config variable
this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);

try {
this.ugi = UnixUserGroupInformation.login(conf, true);
} catch (LoginException e) {
throw (IOException)(new IOException().initCause(e));
}

String taskId = conf.get("mapred.task.id");
if (taskId != null) {
//如果是MapReduce任务,则客户端名称为任务Id号,否则取随机号
this.clientName = "DFSClient_" + taskId;
} else {
this.clientName = "DFSClient_" + r.nextInt();
}
defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
defaultReplication = (short) conf.getInt("dfs.replication", 3);

if (nameNodeAddr != null && rpcNamenode == null) {
this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);

//非常关键的一步

通过RetryProxy的create方法来创建NameNode的RPC客户端ClientProtocol
this.namenode = createNamenode(this.rpcNamenode);
} else if (nameNodeAddr == null && rpcNamenode != null) {
//This case is used for testing.
this.namenode = this.rpcNamenode = rpcNamenode;
} else {
throw new IllegalArgumentException(
"Expecting exactly one of nameNodeAddr and rpcNamenode being null: "
+ "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode);
}
}

b过程 :

通过调用私有方法createNamenode建立与名字节点的连接,方法内部通过RetryProxy的create方法来创建NameNode的RPC客户端ClientProtocol

至此,DFSClient初始化完成

注:对于文件系统,本文的讨论中一直区分两种情况,namenode的远程方法不在本文讨论范围内

a,文件和目录相关事务(都使用远程接口客户端namenode,调用其同名远程方法)

b,数据块读写

读写数据过程请参考http://user.qzone.qq.com/578333569/infocenter#!app=2&via=QZ.HashRefresh&pos=1383552016
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: