您的位置:首页 > 运维架构

Hadoop源码分析——数据节点数据发送

2016-06-22 15:11 471 查看
客户端读数据

下图显示了在读取HDFS上的文件时,客户端、名字节点和数据节点间发生的一些事件以及事件的顺序。



客户端通过FileSystem.open()打开文件,对应的HDFS具体文件系统,DistributedFileSystem创建输出流FSDataInputStream,返回给客户端,客户端使用这个输入流读取数据。FSDataInputStream需要和具体的输入流结合,一起形成过滤器流(filtered stream)向外提供服务。

对HDFS来说,具体的输入流是DFSInputStream。在DFSInputStream的构造函数中,输出流实例通过ClientProtocol.getBlockLocations()远程接口调用名字节点,以确定文件开始部分数据块的保存位置,即上图的步骤2.对于文件中的每个块,名字节点返回保存着该块副本的数据节点地址。注意,这些数据节点根据它们与客户端的距离(利用了网络的拓扑信息),进行了简单的排序。

客户端调用FSDataInputStream.read()方法读取文件数据时,DFSInputStream对象会通过和数据节点间的“读数据”流接口,和最近的数据节点建立联系。客户端反复调用read()方法,数据会通过数据节点和客户端连接上的数据包返回客户端。当到达块的末端时,DFSInputStream会关闭和数据节点间的连接,并通过getBlockLocations()远程方法获得保存着下一个数据块的数据节点信息(严格说,在对象没有缓存该数据块的位置时,才会使用这个远程方法),即上图步骤5,然后继续寻找最佳数据节点,再次通过数据节点的读数据接口,获得数据,即上图的步骤6.

数据发送

读请求的应答包括应答头和多个应答数据包,主要实现在org.apache.hadoop.hdfs.server.datanode.BlockSender.java的sendBlock()方法和sendChunks方法中。

读应答的应答头包括数据校验的类型、校验块大小和一个可选的偏移量,与校验相关的信息已经在数据块发送器的构造函数中,从校验信息文件中获得,现在只需要将这些信息发送到客户端即可。应答头中偏移量是一个可选的参数,BlockSender不但被用于支持客户端读数据,也用于数据块复制中。数据块复制,由于是对整个数据块进行的操作,也就不需要提供偏移量。对于一般的客户端读取数据,偏移量是个必选参数。



BlockSender.sendBlock()的第一部分代码,就是用于发送上述应答头。

try {
checksum.writeHeader(out);
if ( chunkOffsetOK ) {
out.writeLong( offset );
}
out.flush();
} catch (IOException e) { //socket error
throw ioeToSocketException(e);
}


BlockSender.sendBlock()的第二部分代码,是根据系统的缓冲区大小配置,计算一次能够发送多少校验块的数据,并分配工作缓冲区。
数据节点读应答的数据包包头包括:
1、packageLen(包长度):从数据长度字段开始的包长度,包括数据长度字段、校验数据和数据块数据的长度。(4字节)
2、offset(偏移量):应答数据位于数据块中的起始位置。(8字节)
3、seqno(顺序号):该数据包在应答中的顺序号。(8字节)
4、tail(最后应答包标识):该数据包是否是应答的最后一个应答包。(1字节)
5、length(数据长度):包中包含的实际数据长度。




缓冲区大小pktSize的初值是DataNode.PKT_HEADER_LEN(包长度+偏移量+顺序号+最后应答标识)和SIZE_OF_INTEGER(length字段大小)的和。

int pktSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;


sendBlock()支持两种数据发送方式:“零拷贝”和普通发送,我们以普通发送展开分析。

if (transferToAllowed && !verifyChecksum &&
baseStream instanceof SocketOutputStream &&
blockIn instanceof FileInputStream) {

FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();

// blockInPosition also indicates sendChunks() uses transferTo.
blockInPosition = fileChannel.position();
streamForSendChunks = baseStream;

// assure a mininum buffer size.
maxChunksPerPacket = (Math.max(BUFFER_SIZE,
MIN_BUFFER_WITH_TRANSFERTO)
+ bytesPerChecksum - 1)/bytesPerChecksum;

// packet buffer has to be able to do a normal transfer in the case
// of recomputing checksum
pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
} else {
maxChunksPerPacket = Math.max(1,
(BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
}

ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);


普通方式发送是

maxChunksPerPacket = Math.max(1,
(BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;


BUFFER_SIZE是系统的缓冲区大小配置,由配置项${io.file.buffer.size}指定,默认值是4096,即4KB字节

bytesPerChecksum,表示每bytesPerChecksum字节进行校验,默认值是512

checksumSize,是CRC32校验和大小,默认是4字节。

上述计算公式表示如果缓冲区的大小不是校验块的起始,需要多读取数据,供客户端进行校验。

比如说:BUFFER_SIZE是4096,是校验块的整数倍(512)

maxChunksPerPacket = (4096 + 512 - 1 ) / 512 = 8
pktSize = 25(pktSize)  +  ( 512 + 4 ) * 8 = 4153
```
表示分配工作缓冲区大小为4153个字节,数据包可以发送8个校验块。

再比如说:BUFFER_SIZE是4098,不是校验块的整数倍(512)


maxChunksPerPacket = (4098 + 512 - 1 ) / 512 = 9

pktSize = 25(pktSize) + ( 512 + 4 ) * 9 = 4669

“`

表示分配工作缓冲区大小为4669个字节,数据包可以发送9个校验块。

接下来,sendBlock()循环调用sendChunks()方法发送应答数据包。BlockSender.sendBlock()在所有应答数据包发送完毕以后,往客户端的输出流中写入0,以结束一次读操作。

while (endOffset > offset) {
manageOsCache();
long len = sendChunks(pktBuf, maxChunksPerPacket,
streamForSendChunks);
offset += len;
totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
checksumSize);
seqno++;
}
try {
out.writeInt(0); // mark the end of block
out.flush();
} catch (IOException e) { //socket error
throw ioeToSocketException(e);
}


BlockSender.sendChunks比较简单,实现分为两部分:计算并将应答包的头部,即上述包长度、偏移量、顺序号、最后应答包标识、数据长度五个字段写入缓冲区,并获取校验数据;接下来,sendChunks()将根据情况处理数据块数据并发送缓冲区。

// Sends multiple chunks in one packet with a single write().

int len = (int) Math.min(endOffset - offset,
(((long) bytesPerChecksum) * ((long) maxChunks)));

// truncate len so that any partial chunks will be sent as a final packet.
// this is not necessary for correctness, but partial chunks are
// ones that may be recomputed and sent via buffer copy, so try to minimize
// those bytes
if (len > bytesPerChecksum && len % bytesPerChecksum != 0) {
len -= len % bytesPerChecksum;
}

if (len == 0) {
return 0;
}

int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
int packetLen = len + numChunks*checksumSize + 4;
pkt.clear();

// write packet header
pkt.putInt(packetLen);
pkt.putLong(offset);
pkt.putLong(seqno);
pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));
//why no ByteBuf.putBoolean()?
pkt.putInt(len);

int checksumOff = pkt.position();
int checksumLen = numChunks * checksumSize;
byte[] buf = pkt.array();

if (checksumSize > 0 && checksumIn != null) {
try {
checksumIn.readFully(buf, checksumOff, checksumLen);
} catch (IOException e) {
LOG.warn(" Could not read or failed to veirfy checksum for data" +
" at offset " + offset + " for block " + block + " got : "
+ StringUtils.stringifyException(e));
IOUtils.closeStream(checksumIn);
checksumIn = null;
if (corruptChecksumOk) {
if (checksumOff < checksumLen) {
// Just fill the array with zeros.
Arrays.fill(buf, checksumOff, checksumLen, (byte) 0);
}
} else {
throw e;
}
}
}

int dataOff = checksumOff + checksumLen;

if (blockInPosition < 0) {
//normal transfer
IOUtils.readFully(blockIn, buf, dataOff, len);

if (verifyChecksum) {
int dOff = dataOff;
int cOff = checksumOff;
int dLeft = len;

for (int i=0; i<numChunks; i++) {
checksum.reset();
int dLen = Math.min(dLeft, bytesPerChecksum);
checksum.update(buf, dOff, dLen);
if (!checksum.compare(buf, cOff)) {
throw new ChecksumException("Checksum failed at " +
(offset + len - dLeft), len);
}
dLeft -= dLen;
dOff += dLen;
cOff += checksumSize;
}
}

// only recompute checksum if we can't trust the meta data due to
// concurrent writes
if (memoizedBlock.hasBlockChanged(len)) {
ChecksumUtil.updateChunkChecksum(
buf, checksumOff, dataOff, len, checksum
);
}

try {
out.write(buf, 0, dataOff + len);
} catch (IOException e) {
throw ioeToSocketException(e);
}
} else {
try {
//use transferTo(). Checks on out and blockIn are already done.
SocketOutputStream sockOut = (SocketOutputStream) out;
FileChannel fileChannel = ((FileInputStream) blockIn).getChannel();

if (memoizedBlock.hasBlockChanged(len)) {
fileChannel.position(blockInPosition);
IOUtils.readFileChannelFully(
fileChannel,
buf,
dataOff,
len
);

ChecksumUtil.updateChunkChecksum(
buf, checksumOff, dataOff, len, checksum
);
sockOut.write(buf, 0, dataOff + len);
} else {
//first write the packet
sockOut.write(buf, 0, dataOff);
// no need to flush. since we know out is not a buffered stream.
sockOut.transferToFully(fileChannel, blockInPosition, len);
}

blockInPosition += len;

} catch (IOException e) {
/* exception while writing to the client (well, with transferTo(),
* it could also be while reading from the local file).
*/
throw ioeToSocketException(e);
}
}

if (throttler != null) { // rebalancing so throttle
throttler.throttle(packetLen);
}

return len;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息