
Hadoop 1.x Shuffle Source Code Analysis, Part 2

2015-05-13 07:59
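The copyOutput method of MapOutputCopier (an inner class of ReduceCopier, which is itself an inner class of ReduceTask) is the most important step of the Shuffle. It fetches data from a remote host over HTTP: it creates a temporary file name, reads the data over HTTP, and then saves it either to the in-memory file system or to the local file system. The method it uses to read the remote file is getMapOutput.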
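The choice between the in-memory file system and the local file system can be illustrated with a small standalone sketch. It is not Hadoop's code: the class ShuffleDestinationSketch, its byte budget and its methods are hypothetical, and it models only the two criteria named in the comment inside getMapOutput (the decompressed size must be below 25% of the in-memory budget, and there must still be space left). When the budget is full, this sketch simply falls back to disk; Hadoop itself may handle that case differently, which this article does not show.

```java
// Hypothetical sketch, not Hadoop's ShuffleRamManager. It models only the two
// criteria from the getMapOutput comment: (1) the decompressed size is below
// 25% of the in-memory budget, (2) the budget still has room for it.
public class ShuffleDestinationSketch {

  public enum Destination { MEMORY, LOCAL_DISK }

  // Largest fraction of the budget a single map output may take (25%).
  static final double MAX_SINGLE_SEGMENT_FRACTION = 0.25;

  private final long capacityBytes; // total in-memory budget (assumed value)
  private long usedBytes = 0;       // bytes reserved by in-memory outputs

  public ShuffleDestinationSketch(long capacityBytes) {
    this.capacityBytes = capacityBytes;
  }

  // Criterion 1: a single output must stay under 25% of the budget.
  public boolean canFitInMemory(long decompressedLength) {
    return decompressedLength < capacityBytes * MAX_SINGLE_SEGMENT_FRACTION;
  }

  // Criterion 2: reserve space if any is left; synchronized because several
  // copier threads could share one budget.
  public synchronized boolean tryReserve(long size) {
    if (usedBytes + size > capacityBytes) {
      return false;
    }
    usedBytes += size;
    return true;
  }

  // Memory if both criteria hold; otherwise this sketch falls back to disk.
  public Destination choose(long decompressedLength) {
    if (canFitInMemory(decompressedLength) && tryReserve(decompressedLength)) {
      return Destination.MEMORY;
    }
    return Destination.LOCAL_DISK;
  }

  public static void main(String[] args) {
    ShuffleDestinationSketch budget = new ShuffleDestinationSketch(100);
    long[] sizes = {20, 30, 24, 24, 24, 24};
    for (long size : sizes) {
      System.out.println(size + " bytes -> " + budget.choose(size));
    }
  }
}
```

With a 100-byte budget the per-output threshold is 25 bytes, so main prints the destinations MEMORY, LOCAL_DISK, MEMORY, MEMORY, MEMORY, LOCAL_DISK. The 30-byte output fails the 25% rule, and the last 24-byte output no longer fits because 92 bytes are already reserved.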

The getMapOutput method is shown below:

    private MapOutput getMapOutput(MapOutputLocation mapOutputLoc,
                                   Path filename, int reduce)
        throws IOException, InterruptedException {
      // Open the HTTP connection to the map output location
      URL url = mapOutputLoc.getOutputLocation();
      HttpURLConnection connection = (HttpURLConnection)url.openConnection();
      // Set up the (secure) connection and obtain the input stream
      InputStream input = setupSecureConnection(mapOutputLoc, connection);

      // Make sure the server answered with HTTP 200 OK
      int rc = connection.getResponseCode();
      if (rc != HttpURLConnection.HTTP_OK) {
        throw new IOException(
            "Got invalid response code " + rc + " from " + url +
            ": " + connection.getResponseMessage());
      }

      // Read the map task attempt ID from the response header
      TaskAttemptID mapId = null;
      try {
        mapId =
          TaskAttemptID.forName(connection.getHeaderField(FROM_MAP_TASK));
      } catch (IllegalArgumentException ia) {
        LOG.warn("Invalid map id ", ia);
        return null;
      }

      // Check that the data comes from the expected map attempt
      TaskAttemptID expectedMapId = mapOutputLoc.getTaskAttemptId();
      if (!mapId.equals(expectedMapId)) {
        LOG.warn("data from wrong map:" + mapId +
            " arrived to reduce task " + reduce +
            ", where as expected map output should be from " + expectedMapId);
        return null;
      }
      // Read the decompressed (raw) length and the transferred length, which
      // is the compressed size when the map output is compressed
      long decompressedLength =
        Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));
      long compressedLength =
        Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));

      if (compressedLength < 0 || decompressedLength < 0) {
        LOG.warn(getName() + " invalid lengths in map output header: id: " +
            mapId + " compressed len: " + compressedLength +
            ", decompressed len: " + decompressedLength);
        return null;
      }
      int forReduce =
        (int)Integer.parseInt(connection.getHeaderField(FOR_REDUCE_TASK));

      if (forReduce != reduce) {
        LOG.warn("data for the wrong reduce: " + forReduce +
            " with compressed len: " + compressedLength +
            ", decompressed len: " + decompressedLength +
            " arrived to reduce task " + reduce);
        return null;
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("header: " + mapId + ", compressed len: " + compressedLength +
            ", decompressed len: " + decompressedLength);
      }

      //We will put a file in memory if it meets certain criteria:
      //1. The size of the (decompressed) file should be less than 25% of
      //    the total inmem fs
      //2. There is space available in the inmem fs

      // Check if this map-output can be saved in-memory
      boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength);

      // Shuffle
      MapOutput mapOutput = null;
      if (shuffleInMemory) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Shuffling " + decompressedLength + " bytes (" +
              compressedLength + " raw bytes) " +
              "into RAM from " + mapOutputLoc.getTaskAttemptId());
        }
        // Shuffle the map output into memory
        mapOutput = shuffleInMemory(mapOutputLoc, connection, input,
                                    (int)decompressedLength,
                                    (int)compressedLength);
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Shuffling " + decompressedLength + " bytes (" +
              compressedLength + " raw bytes) " +
              "into Local-FS from " + mapOutputLoc.getTaskAttemptId());
        }
        // Shuffle the map output to the local file system
        mapOutput = shuffleToDisk(mapOutputLoc, input, filename,
            compressedLength);
      }
      mapOutput.decompressedSize = decompressedLength;
      return mapOutput;
    }
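The header checks that getMapOutput performs on each response can be summarized in a standalone sketch. Several details are assumptions: the header name strings are placeholders (the real values of FROM_MAP_TASK and the other constants are not shown in this article), the map ID is compared as a plain string instead of a TaskAttemptID, and, unlike the original, a missing or non-numeric length is reported as a rejection instead of letting NumberFormatException propagate.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch mirroring the order of the header checks in getMapOutput.
// Header names are placeholders, not Hadoop's real header strings.
public class MapOutputHeaderCheck {

  public static final String FROM_MAP_TASK = "from-map-task";
  public static final String RAW_MAP_OUTPUT_LENGTH = "raw-map-output-length";
  public static final String MAP_OUTPUT_LENGTH = "map-output-length";
  public static final String FOR_REDUCE_TASK = "for-reduce-task";

  // Returns a rejection reason, or Optional.empty() if the data may be shuffled.
  public static Optional<String> rejectionReason(Map<String, String> headers,
                                                 String expectedMapId, int reduce) {
    // 1. The data must come from the map attempt that was requested.
    String mapId = headers.get(FROM_MAP_TASK);
    if (!expectedMapId.equals(mapId)) {
      return Optional.of("data from wrong map: " + mapId);
    }
    long decompressedLength;
    long compressedLength;
    int forReduce;
    try {
      // 2. Both lengths and the target partition must be present and numeric
      //    (parseLong/parseInt throw NumberFormatException on null, too).
      decompressedLength = Long.parseLong(headers.get(RAW_MAP_OUTPUT_LENGTH));
      compressedLength = Long.parseLong(headers.get(MAP_OUTPUT_LENGTH));
      forReduce = Integer.parseInt(headers.get(FOR_REDUCE_TASK));
    } catch (NumberFormatException e) {
      return Optional.of("malformed header: " + e.getMessage());
    }
    // 3. Negative lengths are invalid.
    if (compressedLength < 0 || decompressedLength < 0) {
      return Optional.of("invalid lengths: compressed=" + compressedLength
          + ", decompressed=" + decompressedLength);
    }
    // 4. The data must be the partition meant for this reduce task.
    if (forReduce != reduce) {
      return Optional.of("data for the wrong reduce: " + forReduce);
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    // Example header values; the attempt ID is made up for illustration.
    Map<String, String> headers = Map.of(
        FROM_MAP_TASK, "attempt_201505130759_0001_m_000001_0",
        RAW_MAP_OUTPUT_LENGTH, "1024",
        MAP_OUTPUT_LENGTH, "300",
        FOR_REDUCE_TASK, "3");
    String expected = "attempt_201505130759_0001_m_000001_0";
    System.out.println(rejectionReason(headers, expected, 3)); // Optional.empty
    System.out.println(rejectionReason(headers, expected, 4)); // Optional[data for the wrong reduce: 3]
  }
}
```

Returning the reason instead of null makes it easy to report why a fetch was rejected; getMapOutput itself logs a warning and returns null in each of these cases.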