您的位置:首页 > 其它

第三章 第六节 数据流

2016-04-08 14:16 369 查看
读取文件解析

为了了解客户端与HDFS、namenode、datanode交互的数据流,仔细查看图3-2,

它展示了读取文件时的主要事件序列。



客户端通过调用FileSystem对象的open()方法来打开它想读取的文件,对于HDFS来说,它就是

一个DisributedFileSystem实例(图3-2的第一步)。DisributedFileSystem通过RPC调用namenode,

来探测文件开头几个block的位置(第二步)。对于每一个block,namenode返回所有有这个block拷贝

的datanode地址。同时,这些datanode根据它与客户端的亲近度排序(根据集群的网络拓扑结构;见

70页的“网络拓扑与HADOOP”)。如果客户端本身就是一个datanode(例如,MapReduce任务),

如果这个datanode保存了这个block的一个副本,客户端会从本地datanode读取数据。

DisributedFileSystem返回一个FSDataInputStream(支持随机读取)给客户端来读取数据。

FSDataInputStream包装了DFSInputStream,它管理datanode和namenode的I/0。

然后客户端调用stream的read()方法。DFSInputStream保存了开始几个block的datanode地址,

连接到最近的一个datanode。数据以流的形式从datanode返回到客户端,反复调用stream的read()

方法(第四步)。到达block的末尾时,DFSInputStream会关闭到datanode的连接,然后找到下一个

block对应的最近的datanode(第五步)。这些步骤对客户端来说是透明的,在它看来,它只是从一个

持续的流中读取数据。

block是按序读取的,随着客户端的读取,DFSInputStream不断打开新的到datanode的连接。同时它也

会调用namenode来检索下一批block对应的datanode的位置。当客户端读取完毕后,它会调用FSDataInputStream

的close()方法(第六步)。

在读取过程中,如果DFSInputStream与一个datanode交互时产生错误,它会尝试连接下一个最近的datanode

来获取对应的block。同意它把失败的datanode记录下来,在读取接下来的block时不会再尝试连接它们。DFSInputStream

对datanode传输过来的数据做校验和验证。如果发现一个坏的block,DFSInputStream尝试连接另一个datanode中

的block;同时把这个坏的block报告给namenode。

这个设计的一个重要方面是客户端直接连接datanode来检索数据,由namenode指示读取每一个block的最优datanode。

这个设计让HDFS支持客户端庞大的并发量。同时,namenode很少需要去处理block位置请求(它直接保存在内存中,

使它们效率很高)也不需要处理数据(在客户端增长时它会很快成为瓶颈)。

网络拓扑和HADOOP

在本地网络中,对于两个节点它们之间比较"close"是什么意思?在大数据处理这个上下文中,在节点之间传输数据的

速率是一个限制因素----带宽是一个稀缺资源。想法是使用带宽来衡量两个节点距离。

在实际中衡量节点之间的带宽是很困难的(它需要一个静态的集群,同时随着集群中节点的增多,节点对的数据呈指数

增长),HADOOP使用一个简单的方式,这个方式中网络被表示成一棵树,两个节点之间的距离就是它们到最近的共同祖先

的距离和。tree中的level不是预先设置的,但是一般都会使用level来表示data center,rack,node。在下面的场景中可用带宽

逐级递减:

同一个node
同一个rack不同node
同一个data center不同rack
不同data center

例如,假设data center d1中的rack r1上的node n1。这可以表示成/d1/r1/n1。使用这个记号法,这上上面四种场景的距离:

distance(/d1/r1/n1, /d1/r1/n1) = 0 (processes on the same node)
distance(/d1/r1/n1, /d1/r1/n2) = 2 (different nodes on the same rack)
distance(/d1/r1/n1, /d1/r2/n3) = 4 (nodes on different racks in the same data center)
distance(/d1/r1/n1, /d2/r3/n4) = 6 (nodes in different data centers)

如图3-3所示(擅长数据的读者可能会注意到这是一个距离标准的例子)。



最后,需要认识到HADOOP不可能魔法般的发现你的网络拓扑;它需要一些帮助(我们会在

286页“网络拓扑”中讲到如何配置拓扑)。默认的,它假设网络是平的----单层结构----或者说,所有

node在同一个datanode中的同一个rack上。对于小的集群来说 ,实际上也许就是这个样子,不

需要额外的配置。

写文件解析

接下来我们看文件如何写到HDFS。尽管太琐碎,了解数据流还是有益的,因为它阐明了HDFS

的一致性模型。

我们考虑一下创建一个新文件的情形,把数据写入,然后关闭文件。如图3-4所示:



客户端通过调用DistributedFileSystem的create()方法创建文件(第一步)。DistributedFileSystem

通过RPC调用namenode在文件系统的namespace中创建一个没有block关联的新文件(第二步)。

namenode执行各种验证也确保这个文件不存在及客户端有权限创建文件。如果这些验证通过,namenode

做一个这个新文件的记录;否则,文件创建失败,客户端抛出一个IOException。DistributedFileSystem返回

一个FSDataOutputStream给客户端来写数据。顾名思义,FSDataOutputStream包装了一个DFSOutputStream,

它控制datanode与namenode之间的通信。

随着客户端写数据(第三步),DFSOutputStream把它拆分成一个个数据包,然后写到一个内部的

叫data queue的队列中。DataStremer使用data queue,它负责向namenode申请新的block来存储副本,

namenode查找到一些合适的datanode。这些datanode形成了一个pipeline,这里我们假设复制因子为3,所以

pipeline有3个节点。DataStreamer把数据包传到pipeline的第一个datanode,datanode存储数据包后把它

传递给pipeline中的第二个datanode。同样,第二个datanode存储了数据包后把它传递给pipeline中的第三个

datanode(第四步)。

DFSOutputStream也维护了一个要被datanode接收的数据包的队列,叫做ack queue。只有当数据包被

pipeline中的所有datanode都接收后,这个数据包才能从ack queue中移除(第五步)。

当写数据datanode发生故障时,将执行以下操作(对客户端是透明的)。第一,关闭pipeline,ack queue中

的所有数据包会加到data queue中的前部,所以发生故障的node的下游datanode不会丢失任何数据包。在正常

datanode上的当前block会有一个新的标记,它与namenode通信,当有故障的datanode恢复后,会删除写的不

完整的block。发生故障的datanode会被从pipeline移出,一个新的pipeline会被构造出来。block剩下的数据会

写到pipeline中的datanode。namenode注意到这个block under-replicated,它会安排其它节点上的副本。剩下

的block和平时一样处理。

当一个block被写出的时候有多个datanode发生故障是有可能的,但可能性不大。如果设置了

dfs.namenode.replication.min(默认为1),这个设置会生效,这个block会在集群中异步复制直到它达到

了复制因子的目标(dfs.replication,默认是3)。

当客户端数据写完之后,它会调用stream的close()方法(第六步)。这个操作会把缓存中的数据包写到

datanode pipeline并在确认之后通知namenode去标记文件处理完成(第七步)。namenode已经知道了这个文件

由哪些block组成的(因为DataStreamer向它请求block的空间分配),所以在返回成功之前它只需要等待这些

block最低程序的拷贝。

副本的放置

namenode是如何选择datanode来存储副本的?这要在稳定性、写带宽及读带宽之间做平衡。例如,把所有

副本放到一个node上写带宽占用最低(因为pipeline在同一个节点上运行),但是这样会导致没有真正的冗余(如

果这个node故障,block中的数据就丢失了)。同时,off-rack读占用带宽要多。另一个极端情况,把所有副本放

在不同的data center可以最大化冗余,但是会消耗更多的带宽。即使在同一个data center(目前所有的HADOOP

集群的运行方式),仍然有各种放置策略。

HADOOP的默认策略是把第一个副本放在与客户端同一个node上(对于客户端在集群外运行的,则随机选择

一个node,系统会尽量不选择太满或太忙的node)。第二个副本放置在与第一个不同的rack上(off-rack),选择

一个随机的。第三个副本放置在与第二个相同的rack上的不同node上。其它副本随机放置在集群上,系统会尽量

避免把太多副本放在同一个rack上。

一旦副本位置选择完毕,pipeline就被创建出来,考虑到网络拓扑。对于复制因子是3的,pipeline可能如图

3-5所示:



总的来说,这个策略在可靠性、写带宽及集群内block的分布做了很好的平衡。

一致性模型

文件系统的一致性模型描述了文件读写的可见性。HDFS利用了一些POSIX来提高效率,所以一些操作可能

表现的与你期望的不一样。

创建一个文件之后,它就可以在文件系统命名空间看到了,如下:

Path p = new Path("p");
fs.create(p);
assertThat(fs.exists(p), is(true));
然而,写到文件的内容并不保证是可见的,即使stream已经flush。所以,文件的长度是0:

Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
assertThat(fs.getFileStatus(p).getLen(), is(0L));
一旦超过一个block的数据被写出,第一个block就可以看到了。对于接下来的block来说也是这样:通常被写的

block是不可见的。

HDFS提供了强制所有缓存都flush到datanode的方法,通过FSDataOutputStream.的hflush()方法。hflush成功

返回后,HDFS保证这时写到文件的数据已经到达所有的pipeline中的datanode并且对其它读者可见:

Path p = new Path("p");
FSDataOutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.hflush();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));
注意hflush()不保证datanode已经把数据写到硬盘,只保证它在datanode的内存中(所以如果data center断电,数据可能

会丢失)。如果要这个保证,可以使用hsync()代替。

hsync()的行为和系统调用POSIX的fsync()类似,都是提交缓存数据。例如,使用标准JAVA API写一个本地文件,我们

保证在stream执行flush及sync之后可以看到它的内容:

FileOutputStream out = new FileOutputStream(localFile);
out.write("content".getBytes("UTF-8"));
out.flush(); // flush to operating system
out.getFD().sync(); // sync to disk
assertThat(localFile.length(), is(((long) "content".length())));
关闭HDFS的一个文件也会默认执行hflush():

Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.close();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));


对应用设计的影响

这个一致性模型对你应用设计的方式有影响。不调用hflush()或hsync(),你需要准备在客户端或系统异常时选择数据。对于

很多应用来说这个是不能接受的,所以你应该在合适的时间点调用hflush(),例如在写完一定量的记录或是字节。尽管设计的

hflush()操作不是可以经常用的,它确实会有一些消耗(hsync()更多),所以在数据稳定性与吞吐量之间有一个平衡。构成

可接受的代价是应用可依赖及不同频率调用hflush()(或hsync())测试应用性能时选择一个合适的值。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: