Hadoop源码分析——数据节点数据发送
2016-06-22 15:11
471 查看
客户端读数据
下图显示了在读取HDFS上的文件时,客户端、名字节点和数据节点间发生的一些事件以及事件的顺序。
客户端通过FileSystem.open()打开文件,对应的HDFS具体文件系统,DistributedFileSystem创建输出流FSDataInputStream,返回给客户端,客户端使用这个输入流读取数据。FSDataInputStream需要和具体的输入流结合,一起形成过滤器流(filtered stream)向外提供服务。
对HDFS来说,具体的输入流是DFSInputStream。在DFSInputStream的构造函数中,输出流实例通过ClientProtocol.getBlockLocations()远程接口调用名字节点,以确定文件开始部分数据块的保存位置,即上图的步骤2.对于文件中的每个块,名字节点返回保存着该块副本的数据节点地址。注意,这些数据节点根据它们与客户端的距离(利用了网络的拓扑信息),进行了简单的排序。
客户端调用FSDataInputStream.read()方法读取文件数据时,DFSInputStream对象会通过和数据节点间的“读数据”流接口,和最近的数据节点建立联系。客户端反复调用read()方法,数据会通过数据节点和客户端连接上的数据包返回客户端。当到达块的末端时,DFSInputStream会关闭和数据节点间的连接,并通过getBlockLocations()远程方法获得保存着下一个数据块的数据节点信息(严格说,在对象没有缓存该数据块的位置时,才会使用这个远程方法),即上图步骤5,然后继续寻找最佳数据节点,再次通过数据节点的读数据接口,获得数据,即上图的步骤6.
数据发送
读请求的应答包括应答头和多个应答数据包,主要实现在org.apache.hadoop.hdfs.server.datanode.BlockSender.java的sendBlock()方法和sendChunks方法中。
读应答的应答头包括数据校验的类型、校验块大小和一个可选的偏移量,与校验相关的信息已经在数据块发送器的构造函数中,从校验信息文件中获得,现在只需要将这些信息发送到客户端即可。应答头中偏移量是一个可选的参数,BlockSender不但被用于支持客户端读数据,也用于数据块复制中。数据块复制,由于是对整个数据块进行的操作,也就不需要提供偏移量。对于一般的客户端读取数据,偏移量是个必选参数。
BlockSender.sendBlock()的第一部分代码,就是用于发送上述应答头。
缓冲区大小pktSize的初值是DataNode.PKT_HEADER_LEN(包长度+偏移量+顺序号+最后应答标识)和SIZE_OF_INTEGER(length字段大小)的和。
sendBlock()支持两种数据发送方式:“零拷贝”和普通发送,我们以普通发送展开分析。
普通方式发送是
BUFFER_SIZE是系统的缓冲区大小配置,由配置项${io.file.buffer.size}指定,默认值是4096,即4KB字节
bytesPerChecksum,表示每bytesPerChecksum字节进行校验,默认值是512
checksumSize,是CRC32校验和大小,默认是4字节。
上述计算公式表示如果缓冲区的大小不是校验块的起始,需要多读取数据,供客户端进行校验。
比如说:BUFFER_SIZE是4096,是校验块的整数倍(512)
maxChunksPerPacket = (4098 + 512 - 1 ) / 512 = 9
pktSize = 25(pktSize) + ( 512 + 4 ) * 9 = 4669
“`
表示分配工作缓冲区大小为4669个字节,数据包可以发送9个校验块。
接下来,sendBlock()循环调用sendChunks()方法发送应答数据包。BlockSender.sendBlock()在所有应答数据包发送完毕以后,往客户端的输出流中写入0,以结束一次读操作。
BlockSender.sendChunks比较简单,实现分为两部分:计算并将应答包的头部,即上述包长度、偏移量、顺序号、最后应答包标识、数据长度五个字段写入缓冲区,并获取校验数据;接下来,sendChunks()将根据情况处理数据块数据并发送缓冲区。
下图显示了在读取HDFS上的文件时,客户端、名字节点和数据节点间发生的一些事件以及事件的顺序。
客户端通过FileSystem.open()打开文件,对应的HDFS具体文件系统,DistributedFileSystem创建输出流FSDataInputStream,返回给客户端,客户端使用这个输入流读取数据。FSDataInputStream需要和具体的输入流结合,一起形成过滤器流(filtered stream)向外提供服务。
对HDFS来说,具体的输入流是DFSInputStream。在DFSInputStream的构造函数中,输出流实例通过ClientProtocol.getBlockLocations()远程接口调用名字节点,以确定文件开始部分数据块的保存位置,即上图的步骤2.对于文件中的每个块,名字节点返回保存着该块副本的数据节点地址。注意,这些数据节点根据它们与客户端的距离(利用了网络的拓扑信息),进行了简单的排序。
客户端调用FSDataInputStream.read()方法读取文件数据时,DFSInputStream对象会通过和数据节点间的“读数据”流接口,和最近的数据节点建立联系。客户端反复调用read()方法,数据会通过数据节点和客户端连接上的数据包返回客户端。当到达块的末端时,DFSInputStream会关闭和数据节点间的连接,并通过getBlockLocations()远程方法获得保存着下一个数据块的数据节点信息(严格说,在对象没有缓存该数据块的位置时,才会使用这个远程方法),即上图步骤5,然后继续寻找最佳数据节点,再次通过数据节点的读数据接口,获得数据,即上图的步骤6.
数据发送
读请求的应答包括应答头和多个应答数据包,主要实现在org.apache.hadoop.hdfs.server.datanode.BlockSender.java的sendBlock()方法和sendChunks方法中。
读应答的应答头包括数据校验的类型、校验块大小和一个可选的偏移量,与校验相关的信息已经在数据块发送器的构造函数中,从校验信息文件中获得,现在只需要将这些信息发送到客户端即可。应答头中偏移量是一个可选的参数,BlockSender不但被用于支持客户端读数据,也用于数据块复制中。数据块复制,由于是对整个数据块进行的操作,也就不需要提供偏移量。对于一般的客户端读取数据,偏移量是个必选参数。
BlockSender.sendBlock()的第一部分代码,就是用于发送上述应答头。
try { checksum.writeHeader(out); if ( chunkOffsetOK ) { out.writeLong( offset ); } out.flush(); } catch (IOException e) { //socket error throw ioeToSocketException(e); }
BlockSender.sendBlock()的第二部分代码,是根据系统的缓冲区大小配置,计算一次能够发送多少校验块的数据,并分配工作缓冲区。 数据节点读应答的数据包包头包括: 1、packageLen(包长度):从数据长度字段开始的包长度,包括数据长度字段、校验数据和数据块数据的长度。(4字节) 2、offset(偏移量):应答数据位于数据块中的起始位置。(8字节) 3、seqno(顺序号):该数据包在应答中的顺序号。(8字节) 4、tail(最后应答包标识):该数据包是否是应答的最后一个应答包。(1字节) 5、length(数据长度):包中包含的实际数据长度。
缓冲区大小pktSize的初值是DataNode.PKT_HEADER_LEN(包长度+偏移量+顺序号+最后应答标识)和SIZE_OF_INTEGER(length字段大小)的和。
int pktSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
sendBlock()支持两种数据发送方式:“零拷贝”和普通发送,我们以普通发送展开分析。
if (transferToAllowed && !verifyChecksum && baseStream instanceof SocketOutputStream && blockIn instanceof FileInputStream) { FileChannel fileChannel = ((FileInputStream)blockIn).getChannel(); // blockInPosition also indicates sendChunks() uses transferTo. blockInPosition = fileChannel.position(); streamForSendChunks = baseStream; // assure a mininum buffer size. maxChunksPerPacket = (Math.max(BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO) + bytesPerChecksum - 1)/bytesPerChecksum; // packet buffer has to be able to do a normal transfer in the case // of recomputing checksum pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket; } else { maxChunksPerPacket = Math.max(1, (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum); pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket; } ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
普通方式发送是
maxChunksPerPacket = Math.max(1, (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum); pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
BUFFER_SIZE是系统的缓冲区大小配置,由配置项${io.file.buffer.size}指定,默认值是4096,即4KB字节
bytesPerChecksum,表示每bytesPerChecksum字节进行校验,默认值是512
checksumSize,是CRC32校验和大小,默认是4字节。
上述计算公式表示如果缓冲区的大小不是校验块的起始,需要多读取数据,供客户端进行校验。
比如说:BUFFER_SIZE是4096,是校验块的整数倍(512)
maxChunksPerPacket = (4096 + 512 - 1 ) / 512 = 8 pktSize = 25(pktSize) + ( 512 + 4 ) * 8 = 4153 ``` 表示分配工作缓冲区大小为4153个字节,数据包可以发送8个校验块。 再比如说:BUFFER_SIZE是4098,不是校验块的整数倍(512)
maxChunksPerPacket = (4098 + 512 - 1 ) / 512 = 9
pktSize = 25(pktSize) + ( 512 + 4 ) * 9 = 4669
“`
表示分配工作缓冲区大小为4669个字节,数据包可以发送9个校验块。
接下来,sendBlock()循环调用sendChunks()方法发送应答数据包。BlockSender.sendBlock()在所有应答数据包发送完毕以后,往客户端的输出流中写入0,以结束一次读操作。
while (endOffset > offset) { manageOsCache(); long len = sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks); offset += len; totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum* checksumSize); seqno++; } try { out.writeInt(0); // mark the end of block out.flush(); } catch (IOException e) { //socket error throw ioeToSocketException(e); }
BlockSender.sendChunks比较简单,实现分为两部分:计算并将应答包的头部,即上述包长度、偏移量、顺序号、最后应答包标识、数据长度五个字段写入缓冲区,并获取校验数据;接下来,sendChunks()将根据情况处理数据块数据并发送缓冲区。
// Sends multiple chunks in one packet with a single write(). int len = (int) Math.min(endOffset - offset, (((long) bytesPerChecksum) * ((long) maxChunks))); // truncate len so that any partial chunks will be sent as a final packet. // this is not necessary for correctness, but partial chunks are // ones that may be recomputed and sent via buffer copy, so try to minimize // those bytes if (len > bytesPerChecksum && len % bytesPerChecksum != 0) { len -= len % bytesPerChecksum; } if (len == 0) { return 0; } int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum; int packetLen = len + numChunks*checksumSize + 4; pkt.clear(); // write packet header pkt.putInt(packetLen); pkt.putLong(offset); pkt.putLong(seqno); pkt.put((byte)((offset + len >= endOffset) ? 1 : 0)); //why no ByteBuf.putBoolean()? pkt.putInt(len); int checksumOff = pkt.position(); int checksumLen = numChunks * checksumSize; byte[] buf = pkt.array(); if (checksumSize > 0 && checksumIn != null) { try { checksumIn.readFully(buf, checksumOff, checksumLen); } catch (IOException e) { LOG.warn(" Could not read or failed to veirfy checksum for data" + " at offset " + offset + " for block " + block + " got : " + StringUtils.stringifyException(e)); IOUtils.closeStream(checksumIn); checksumIn = null; if (corruptChecksumOk) { if (checksumOff < checksumLen) { // Just fill the array with zeros. Arrays.fill(buf, checksumOff, checksumLen, (byte) 0); } } else { throw e; } } } int dataOff = checksumOff + checksumLen; if (blockInPosition < 0) { //normal transfer IOUtils.readFully(blockIn, buf, dataOff, len); if (verifyChecksum) { int dOff = dataOff; int cOff = checksumOff; int dLeft = len; for (int i=0; i<numChunks; i++) { checksum.reset(); int dLen = Math.min(dLeft, bytesPerChecksum); checksum.update(buf, dOff, dLen); if (!checksum.compare(buf, cOff)) { throw new ChecksumException("Checksum failed at " + (offset + len - dLeft), len); } dLeft -= dLen; dOff += dLen; cOff += checksumSize; } } // only recompute checksum if we can't trust the meta data due to // concurrent writes if (memoizedBlock.hasBlockChanged(len)) { ChecksumUtil.updateChunkChecksum( buf, checksumOff, dataOff, len, checksum ); } try { out.write(buf, 0, dataOff + len); } catch (IOException e) { throw ioeToSocketException(e); } } else { try { //use transferTo(). Checks on out and blockIn are already done. SocketOutputStream sockOut = (SocketOutputStream) out; FileChannel fileChannel = ((FileInputStream) blockIn).getChannel(); if (memoizedBlock.hasBlockChanged(len)) { fileChannel.position(blockInPosition); IOUtils.readFileChannelFully( fileChannel, buf, dataOff, len ); ChecksumUtil.updateChunkChecksum( buf, checksumOff, dataOff, len, checksum ); sockOut.write(buf, 0, dataOff + len); } else { //first write the packet sockOut.write(buf, 0, dataOff); // no need to flush. since we know out is not a buffered stream. sockOut.transferToFully(fileChannel, blockInPosition, len); } blockInPosition += len; } catch (IOException e) { /* exception while writing to the client (well, with transferTo(), * it could also be while reading from the local file). */ throw ioeToSocketException(e); } } if (throttler != null) { // rebalancing so throttle throttler.throttle(packetLen); } return len; }
相关文章推荐
- 详解HDFS Short Circuit Local Reads
- Hadoop_2.1.0 MapReduce序列图
- 使用Hadoop搭建现代电信企业架构
- 单机版搭建Hadoop环境图文教程详解
- hadoop常见错误以及处理方法详解
- hadoop 单机安装配置教程
- hadoop的hdfs文件操作实现上传文件到hdfs
- hadoop实现grep示例分享
- Apache Hadoop版本详解
- linux下搭建hadoop环境步骤分享
- hadoop client与datanode的通信协议分析
- hadoop中一些常用的命令介绍
- Hadoop单机版和全分布式(集群)安装
- 用PHP和Shell写Hadoop的MapReduce程序
- hadoop map-reduce中的文件并发操作
- Hadoop1.2中配置伪分布式的实例
- hadoop上传文件功能实例代码
- java结合HADOOP集群文件上传下载
- Hadoop 2.x伪分布式环境搭建详细步骤
- Java访问Hadoop分布式文件系统HDFS的配置说明