
[Hadoop Code Notes] Hadoop Job Submission: Child Starts the Reduce Task

2014-01-28 16:28
I. Overview

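The previous post described how the TaskTracker starts a separate Java process to run a map task. Continuing from the post before that: while it runs, the TaskRunner thread builds a command of the form java -D** Child address port taskid and uses it to start a separate Java process. In Child's main function, the Task to execute is obtained from the TaskTracker through TaskUmbilicalProtocol, and the Task's run method is called to execute it. For a ReduceTask, the run method uses Java reflection to construct the Reducer and the Reducer.Context, then calls the run method of the constructed Reducer to perform the reduce operation. Unlike a map task, a reduce task must first copy the map outputs from the tasktrackers where the maps ran to the tasktracker where the reducer runs.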

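A reduce task takes as its input its own partition of the output of several map tasks across the cluster. Map tasks may finish at different times, so the reduce task starts copying a map's output as soon as that map completes. This is the copy phase of the reduce task; in practice, several MapOutputCopier threads are started to copy all of the map outputs. When copying is complete, the reduce task enters the sort phase, in which LocalFSMerger or InMemFSMergeThread merges the map outputs while preserving their sort order. (That is, several already-sorted files are merged, as in the merge step of merge sort.) In the reduce phase, the reduce function is called for each key in the sorted output, and the output of this phase is written directly to the file system, usually HDFS. (With HDFS, because the tasktracker node is also a DataNode, the first block replica is written to the local disk, i.e. data locality.)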
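The sort phase described above is a merge of runs that are already sorted, not a sort from scratch. A minimal sketch of that idea, independent of Hadoop: a k-way merge driven by a priority queue. The class and method names (KWayMergeSketch, merge) are hypothetical, and plain strings stand in for the raw serialized segments that Hadoop's Merger actually works on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration: merges several individually sorted runs into one sorted list.
class KWayMergeSketch {

    // One heap entry: the current head of a run plus an iterator over the rest of that run.
    private static final class Head implements Comparable<Head> {
        final String key;
        final Iterator<String> rest;

        Head(String key, Iterator<String> rest) {
            this.key = key;
            this.rest = rest;
        }

        @Override
        public int compareTo(Head other) {
            return key.compareTo(other.key);
        }
    }

    static List<String> merge(List<List<String>> sortedRuns) {
        PriorityQueue<Head> heap = new PriorityQueue<>();
        // Seed the heap with the first element of every non-empty run.
        for (List<String> run : sortedRuns) {
            Iterator<String> it = run.iterator();
            if (it.hasNext()) {
                heap.add(new Head(it.next(), it));
            }
        }
        List<String> merged = new ArrayList<>();
        // Repeatedly emit the smallest head and refill from the run it came from.
        while (!heap.isEmpty()) {
            Head smallest = heap.poll();
            merged.add(smallest.key);
            if (smallest.rest.hasNext()) {
                heap.add(new Head(smallest.rest.next(), smallest.rest));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<String>> runs = Arrays.asList(
            Arrays.asList("a", "d", "g"),
            Arrays.asList("b", "e"),
            Arrays.asList("c", "f", "h"));
        System.out.println(merge(runs));
    }
}
```

Because only one element per run sits in the heap at any time, memory use depends on the number of runs rather than on their total size, which is what makes this approach suitable for merging files.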

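When a map task completes, it notifies its parent tasktracker of the status update, and the tasktracker in turn notifies the jobtracker; both steps go through the heartbeat mechanism. The jobtracker therefore knows the mapping between map outputs and tasktrackers. A thread in the reducer periodically calls getMapCompletionEvents to learn the map output locations (in the code below, this call goes over RPC to the TaskTracker).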

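II. Flow Description

1. In ReduceTask, a ReduceCopier object is constructed and its fetchOutputs method is called.

2. The fetchOutputs method of ReduceCopier constructs several independent threads. They cooperate with one another, and each carries out its own task.

2.1 The GetMapEventsThread thread queries the TaskTracker over RPC. For each completion event it obtains the address of the server where the map task ran, that is, the location of the MapTask output, builds a URL, and adds it to mapLocations for the copier threads to pick up.

2.2 Several MapOutputCopier threads are constructed and started. They copy the map outputs from the remote servers to the local node over HTTP; an output that fits in memory is kept in memory, otherwise it is saved to a local file.

2.3 LocalFSMerger merges the map outputs on disk.

2.4 InMemFSMergeThread merges the map outputs in memory.

3. A raw key/value iterator is built from the copied map outputs and serves as the input to reduce.

4. The runNewReducer method constructs a Reducer instance and its execution context from the configured Reducer class, then calls the reducer's run method to execute the user-defined reduce operation.

5. In the Reducer's run method, each key and the collection of values for that key (of type Iterable<VALUEIN>) are taken from the context, and the reducer's reduce method is called to process them.

6. The Reducer's reduce method is the user-defined method that processes the data, and it is the only method the user has to define.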
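Steps 5 and 6 split the work between the framework's run loop and the user's reduce method. The sketch below imitates that split in plain Java so the control flow is easy to follow. It is not the Hadoop Reducer API: ReduceLoopSketch, ReduceFn and run are hypothetical names, and a list of map entries stands in for the context's key/value iterator.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the run/reduce split; not the Hadoop Reducer API.
class ReduceLoopSketch {

    // The user-defined part: how one key and all of its values are combined.
    interface ReduceFn {
        int reduce(String key, Iterable<Integer> values);
    }

    // The framework part: walk the key-sorted input, group the values of equal
    // adjacent keys, and invoke reduce exactly once per distinct key.
    static Map<String, Integer> run(List<Map.Entry<String, Integer>> sortedInput, ReduceFn fn) {
        Map<String, Integer> output = new LinkedHashMap<>();
        int i = 0;
        while (i < sortedInput.size()) {
            String key = sortedInput.get(i).getKey();
            List<Integer> values = new ArrayList<>();
            while (i < sortedInput.size() && sortedInput.get(i).getKey().equals(key)) {
                values.add(sortedInput.get(i).getValue());
                i++;
            }
            output.put(key, fn.reduce(key, values));
        }
        return output;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = new ArrayList<>();
        input.add(new AbstractMap.SimpleEntry<>("apple", 1));
        input.add(new AbstractMap.SimpleEntry<>("apple", 1));
        input.add(new AbstractMap.SimpleEntry<>("pear", 1));
        // A word-count style reduce: sum the values of each key.
        Map<String, Integer> counts = run(input, (key, values) -> {
            int sum = 0;
            for (int v : values) {
                sum += v;
            }
            return sum;
        });
        System.out.println(counts);
    }
}
```

The grouping only works because the input is already sorted by key, which is exactly what the copy and merge phases guarantee before reduce starts.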

public boolean fetchOutputs() throws IOException {
int totalFailures = 0;
int            numInFlight = 0, numCopied = 0;
DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
final Progress copyPhase =
reduceTask.getProgress().phase();
LocalFSMerger localFSMergerThread = null;
InMemFSMergeThread inMemFSMergeThread = null;
GetMapEventsThread getMapEventsThread = null;

for (int i = 0; i < numMaps; i++) {
copyPhase.addPhase();       // add sub-phase per file
}

// 1) Construct numCopiers MapOutputCopier threads according to the configuration (5 by default); these threads carry out the actual copying.
copiers = new ArrayList<MapOutputCopier>(numCopiers);

// start all the copying threads
for (int i=0; i < numCopiers; i++) {
MapOutputCopier copier = new MapOutputCopier(conf, reporter);
copiers.add(copier);

copier.start();
}

// 2) start the on-disk merge thread (see its run method later in this post)
localFSMergerThread = new LocalFSMerger((LocalFileSystem)localFileSys);
// 3) start the in-memory merge thread (see its run method later in this post)
inMemFSMergeThread = new InMemFSMergeThread();
localFSMergerThread.start();
inMemFSMergeThread.start();

// 4) start the map events thread, which fetches map completion events
getMapEventsThread = new GetMapEventsThread();
getMapEventsThread.start();

// start the clock for bandwidth measurement
long startTime = System.currentTimeMillis();
long currentTime = startTime;
long lastProgressTime = startTime;
long lastOutputTime = 0;

// loop until we get all required outputs
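// 5) While fewer outputs have been copied than there are maps, copying is not finished and the loop keeps running. At intervals it logs how many map outputs are still needed and how many copies are in progress.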
while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {

currentTime = System.currentTimeMillis();
boolean logNow = false;
if (currentTime - lastOutputTime > MIN_LOG_TIME) {
lastOutputTime = currentTime;
logNow = true;
}
if (logNow) {
LOG.info(reduceTask.getTaskID() + " Need another "
+ (numMaps - copiedMapOutputs.size()) + " map output(s) "
+ "where " + numInFlight + " is already in progress");
}

// Put the hash entries for the failed fetches.
Iterator<MapOutputLocation> locItr = retryFetches.iterator();

while (locItr.hasNext()) {
MapOutputLocation loc = locItr.next();
List<MapOutputLocation> locList =
mapLocations.get(loc.getHost());

// Check if the list exists. Map output location mapping is cleared
// once the jobtracker restarts and is rebuilt from scratch.
// Note that map-output-location mapping will be recreated and hence
// we continue with the hope that we might find some locations
// from the rebuild map.
if (locList != null) {
// Add to the beginning of the list so that this map is
//tried again before the others and we can hasten the
//re-execution of this map should there be a problem
locList.add(0, loc);
}
}

if (retryFetches.size() > 0) {
LOG.info(reduceTask.getTaskID() + ": " +
"Got " + retryFetches.size() +
" map-outputs from previous failures");
}
// clear the "failed" fetches hashmap
retryFetches.clear();

// now walk through the cache and schedule what we can
int numScheduled = 0;
int numDups = 0;

synchronized (scheduledCopies) {

// Randomize the map output locations to prevent
// all reduce-tasks swamping the same tasktracker
List<String> hostList = new ArrayList<String>();
hostList.addAll(mapLocations.keySet());

Collections.shuffle(hostList, this.random);

Iterator<String> hostsItr = hostList.iterator();

while (hostsItr.hasNext()) {

String host = hostsItr.next();

List<MapOutputLocation> knownOutputsByLoc =
mapLocations.get(host);

// Check if the list exists. Map output location mapping is
// cleared once the jobtracker restarts and is rebuilt from
// scratch.
// Note that map-output-location mapping will be recreated and
// hence we continue with the hope that we might find some
// locations from the rebuild map and add then for fetching.
if (knownOutputsByLoc == null || knownOutputsByLoc.size() == 0) {
continue;
}

//Identify duplicate hosts here
if (uniqueHosts.contains(host)) {
numDups += knownOutputsByLoc.size();
continue;
}

Long penaltyEnd = penaltyBox.get(host);
boolean penalized = false;

if (penaltyEnd != null) {
if (currentTime < penaltyEnd.longValue()) {
penalized = true;
} else {
penaltyBox.remove(host);
}
}

if (penalized)
continue;

synchronized (knownOutputsByLoc) {

locItr = knownOutputsByLoc.iterator();

while (locItr.hasNext()) {

MapOutputLocation loc = locItr.next();

// Do not schedule fetches from OBSOLETE maps
if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
locItr.remove();
continue;
}

uniqueHosts.add(host);
scheduledCopies.add(loc);
locItr.remove();  // remove from knownOutputs
numInFlight++; numScheduled++;

break; //we have a map from this host
}
}
}
scheduledCopies.notifyAll();
}

if (numScheduled > 0 || logNow) {
LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled +
" outputs (" + penaltyBox.size() +
" slow hosts and" + numDups + " dup hosts)");
}

if (penaltyBox.size() > 0 && logNow) {
LOG.info("Penalized(slow) Hosts: ");
for (String host : penaltyBox.keySet()) {
LOG.info(host + " Will be considered after: " +
((penaltyBox.get(host) - currentTime)/1000) + " seconds.");
}
}

// if we have no copies in flight and we can't schedule anything
// new, just wait for a bit
try {
if (numInFlight == 0 && numScheduled == 0) {
// we should indicate progress as we don't want TT to think
// we're stuck and kill us
reporter.progress();
Thread.sleep(5000);
}
} catch (InterruptedException e) { } // IGNORE

while (numInFlight > 0 && mergeThrowable == null) {
LOG.debug(reduceTask.getTaskID() + " numInFlight = " +
numInFlight);
//the call to getCopyResult will either
//1) return immediately with a null or a valid CopyResult object,
//                 or
//2) if the numInFlight is above maxInFlight, return with a
//   CopyResult object after getting a notification from a
//   fetcher thread,
//So, when getCopyResult returns null, we can be sure that
//we aren't busy enough and we should go and get more mapcompletion
//events from the tasktracker
CopyResult cr = getCopyResult(numInFlight);

if (cr == null) {
break;
}

if (cr.getSuccess()) {  // a successful copy
numCopied++;
lastProgressTime = System.currentTimeMillis();
reduceShuffleBytes.increment(cr.getSize());

long secsSinceStart =
(System.currentTimeMillis()-startTime)/1000+1;
float mbs = ((float)reduceShuffleBytes.getCounter())/(1024*1024);
float transferRate = mbs/secsSinceStart;

copyPhase.startNextPhase();
copyPhase.setStatus("copy (" + numCopied + " of " + numMaps
+ " at " +
mbpsFormat.format(transferRate) +  " MB/s)");

// Note successful fetch for this mapId to invalidate
// (possibly) old fetch-failures
fetchFailedMaps.remove(cr.getLocation().getTaskId());
} else if (cr.isObsolete()) {
//ignore
LOG.info(reduceTask.getTaskID() +
" Ignoring obsolete copy result for Map Task: " +
cr.getLocation().getTaskAttemptId() + " from host: " +
cr.getHost());
} else {
retryFetches.add(cr.getLocation());

// note the failed-fetch
TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
TaskID mapId = cr.getLocation().getTaskId();

totalFailures++;
Integer noFailedFetches =
mapTaskToFailedFetchesMap.get(mapTaskId);
noFailedFetches =
(noFailedFetches == null) ? 1 : (noFailedFetches + 1);
mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
LOG.info("Task " + getTaskID() + ": Failed fetch #" +
noFailedFetches + " from " + mapTaskId);

// did the fetch fail too many times?
// using a hybrid technique for notifying the jobtracker.
//   a. the first notification is sent after max-retries
//   b. subsequent notifications are sent after 2 retries.
if ((noFailedFetches >= maxFetchRetriesPerMap)
&& ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
synchronized (ReduceTask.this) {
taskStatus.addFetchFailedMap(mapTaskId);
LOG.info("Failed to fetch map-output from " + mapTaskId +
" even after MAX_FETCH_RETRIES_PER_MAP retries... "
+ " reporting to the JobTracker");
}
}
// note unique failed-fetch maps
if (noFailedFetches == maxFetchRetriesPerMap) {
fetchFailedMaps.add(mapId);

// did we have too many unique failed-fetch maps?
// and did we fail on too many fetch attempts?
// and did we progress enough
//     or did we wait for too long without any progress?

// check if the reducer is healthy
boolean reducerHealthy =
(((float)totalFailures / (totalFailures + numCopied))
< MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);

// check if the reducer has progressed enough
boolean reducerProgressedEnough =
(((float)numCopied / numMaps)
>= MIN_REQUIRED_PROGRESS_PERCENT);

// check if the reducer is stalled for a long time
// duration for which the reducer is stalled
int stallDuration =
(int)(System.currentTimeMillis() - lastProgressTime);
// duration for which the reducer ran with progress
int shuffleProgressDuration =
(int)(lastProgressTime - startTime);
// min time the reducer should run without getting killed
int minShuffleRunDuration =
(shuffleProgressDuration > maxMapRuntime)
? shuffleProgressDuration
: maxMapRuntime;
boolean reducerStalled =
(((float)stallDuration / minShuffleRunDuration)
>= MAX_ALLOWED_STALL_TIME_PERCENT);

// kill if not healthy and has insufficient progress
if ((fetchFailedMaps.size() >= maxFailedUniqueFetches ||
fetchFailedMaps.size() == (numMaps - copiedMapOutputs.size()))
&& !reducerHealthy
&& (!reducerProgressedEnough || reducerStalled)) {
LOG.fatal("Shuffle failed with too many fetch failures " +
"and insufficient progress!" +
"Killing task " + getTaskID() + ".");
umbilical.shuffleError(getTaskID(),
"Exceeded MAX_FAILED_UNIQUE_FETCHES;"
+ " bailing-out.");
}
}

// back off exponentially until num_retries <= max_retries
// back off by max_backoff/2 on subsequent failed attempts
currentTime = System.currentTimeMillis();
int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap
? BACKOFF_INIT
* (1 << (noFailedFetches - 1))
: (this.maxBackoff * 1000 / 2);
penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
LOG.warn(reduceTask.getTaskID() + " adding host " +
cr.getHost() + " to penalty box, next contact in " +
(currentBackOff/1000) + " seconds");
}
uniqueHosts.remove(cr.getHost());
numInFlight--;
}
}

// all done, inform the copiers to exit
exitGetMapEvents= true;
try {
getMapEventsThread.join();
LOG.info("getMapsEventsThread joined.");
} catch (Throwable t) {
LOG.info("getMapsEventsThread threw an exception: " +
StringUtils.stringifyException(t));
}

synchronized (copiers) {
synchronized (scheduledCopies) {
for (MapOutputCopier copier : copiers) {
copier.interrupt();
}
copiers.clear();
}
}

// copiers are done, exit and notify the waiting merge threads
synchronized (mapOutputFilesOnDisk) {
exitLocalFSMerge = true;
mapOutputFilesOnDisk.notify();
}

ramManager.close();

//Do a merge of in-memory files (if there are any)
if (mergeThrowable == null) {
try {
// Wait for the on-disk merge to complete
localFSMergerThread.join();
LOG.info("Interleaved on-disk merge complete: " +
mapOutputFilesOnDisk.size() + " files left.");

//wait for an ongoing merge (if it is in flight) to complete
inMemFSMergeThread.join();
LOG.info("In-memory merge complete: " +
mapOutputsFilesInMemory.size() + " files left.");
} catch (Throwable t) {
LOG.warn(reduceTask.getTaskID() +
" Final merge of the inmemory files threw an exception: " +
StringUtils.stringifyException(t));
// check if the last merge generated an error
if (mergeThrowable != null) {
mergeThrowable = t;
}
return false;
}
}
return mergeThrowable == null && copiedMapOutputs.size() == numMaps;
}
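The failure branch of fetchOutputs applies two small rules that are easy to miss in the long listing: how long a host stays in the penalty box, and when a failed map is reported to the JobTracker. The sketch below isolates both. FetchRetrySketch and its method names are hypothetical, the constants in main (6 retries, 4000 ms, 300 s) are assumed example values rather than values taken from the listing, and long arithmetic is used here where the original uses int.

```java
// Hypothetical helper mirroring the two retry rules in fetchOutputs.
class FetchRetrySketch {

    // Penalty in milliseconds for a host after its n-th failed fetch (n >= 1):
    // doubles with every failure up to maxRetries, then stays at half of maxBackoff.
    static long backoffMillis(int failedFetches, int maxRetries,
                              int backoffInitMillis, int maxBackoffSeconds) {
        if (failedFetches <= maxRetries) {
            return (long) backoffInitMillis * (1L << (failedFetches - 1));
        }
        return maxBackoffSeconds * 1000L / 2;
    }

    // Report the map as failed the first time maxRetries is reached,
    // and then again after every 2 further failures.
    static boolean shouldReport(int failedFetches, int maxRetries) {
        return failedFetches >= maxRetries
            && (failedFetches - maxRetries) % 2 == 0;
    }

    public static void main(String[] args) {
        int maxRetries = 6;           // assumed example value
        int backoffInitMillis = 4000; // assumed example value
        int maxBackoffSeconds = 300;  // assumed example value
        for (int n = 1; n <= 9; n++) {
            System.out.println("failure #" + n
                + ": backoff=" + backoffMillis(n, maxRetries, backoffInitMillis, maxBackoffSeconds) + " ms"
                + ", report=" + shouldReport(n, maxRetries));
        }
    }
}
```

Doubling the penalty keeps a briefly unavailable host from being hammered, while the every-second-failure reporting limits how often the same map is flagged to the JobTracker.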


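4. The run method of the MapOutputCopier thread. It takes entries from scheduledCopies (a List<MapOutputLocation>) and calls copyOutput to perform the copy, fetching the map output from the remote server to the local node over HTTP; an output that fits in memory is kept in memory, otherwise it is saved to a local file.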

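public void run() {
// Abridged excerpt: metrics, error classification and result reporting are omitted.
while (true) {
try {
MapOutputLocation loc = null;
long size = -1;
// Block until fetchOutputs schedules a location for this copier.
synchronized (scheduledCopies) {
while (scheduledCopies.isEmpty()) {
scheduledCopies.wait();
}
loc = scheduledCopies.remove(0);
}

start(loc);
size = copyOutput(loc);
} catch (InterruptedException e) {
break; // fetchOutputs interrupts the copiers once all outputs are fetched
} catch (IOException e) {
// (abridged) a failed copy is handed back so that fetchOutputs can retry it
}
}

// Return the decompressor to the pool once the copier is done.
if (decompressor != null) {
CodecPool.returnDecompressor(decompressor);
}
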
}
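The hand-off between fetchOutputs and the copier threads is a classic wait/notify work queue: the scheduler adds locations and calls notifyAll, workers block in wait until something arrives, and an interrupt ends a worker. A minimal standalone sketch of that pattern follows. CopyQueueSketch and its members are hypothetical names, strings stand in for MapOutputLocation, and the "copy" is just appending to a list.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical illustration of the scheduledCopies hand-off; not Hadoop code.
class CopyQueueSketch {
    private final List<String> scheduled = new LinkedList<>();
    private final List<String> copied = new ArrayList<>();

    // Scheduler side: publish work and wake every waiting worker.
    void schedule(String location) {
        synchronized (scheduled) {
            scheduled.add(location);
            scheduled.notifyAll();
        }
    }

    // Worker side: block until work arrives; an interrupt ends the loop.
    Thread startWorker() {
        Thread worker = new Thread(() -> {
            while (true) {
                String location = null;
                try {
                    synchronized (scheduled) {
                        while (scheduled.isEmpty()) {
                            scheduled.wait();
                        }
                        location = scheduled.remove(0);
                    }
                } catch (InterruptedException e) {
                    break; // shutdown requested
                }
                synchronized (copied) {
                    copied.add(location); // stand-in for the actual copy
                    copied.notifyAll();
                }
            }
        });
        worker.start();
        return worker;
    }

    // Block until at least the given number of items has been processed.
    List<String> awaitCopied(int count) throws InterruptedException {
        synchronized (copied) {
            while (copied.size() < count) {
                copied.wait();
            }
            return new ArrayList<>(copied);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CopyQueueSketch queue = new CopyQueueSketch();
        Thread worker = queue.startWorker();
        queue.schedule("host-a/map_0");
        queue.schedule("host-b/map_1");
        System.out.println(queue.awaitCopied(2));
        worker.interrupt(); // same shutdown signal fetchOutputs sends to its copiers
        worker.join();
    }
}
```

The inner while loop around wait matters: a woken thread re-checks the condition, so spurious wakeups or another worker taking the item first do not cause a remove on an empty list.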


5. The copyOutput method of the MapOutputCopier thread. It copies the map output from the tasktracker where the map ran to the tasktracker where the reduce task runs.

private long copyOutput(MapOutputLocation loc
) throws IOException, InterruptedException {
// Check the copy records to see whether this output has already been copied or has become obsolete.
if (copiedMapOutputs.contains(loc.getTaskId()) ||
obsoleteMapIds.contains(loc.getTaskAttemptId())) {
return CopyResult.OBSOLETE;
}
TaskAttemptID reduceId = reduceTask.getTaskID();
Path filename = new Path("/" + TaskTracker.getIntermediateOutputDir(
reduceId.getJobID().toString(),
reduceId.toString())
+ "/map_" +
loc.getTaskId().getId() + ".out");

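// Temporary file that the map output is copied into.
Path tmpMapOutput = new Path(filename+"-"+id);

// Copy the map output; the third argument is the reduce id expected by getMapOutput.
MapOutput mapOutput = getMapOutput(loc, tmpMapOutput,
reduceId.getTaskID().getId());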
if (mapOutput == null) {
throw new IOException("Failed to fetch map-output for " +
loc.getTaskAttemptId() + " from " +
loc.getHost());
}
// The size of the map-output
long bytes = mapOutput.compressedSize;

synchronized (ReduceTask.this) {
if (copiedMapOutputs.contains(loc.getTaskId())) {
mapOutput.discard();
return CopyResult.OBSOLETE;
}
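// Special case: discard empty map-outputs
if (bytes == 0) {
try {
mapOutput.discard();
} catch (IOException ioe) {
LOG.info("Couldn't discard output of " + loc.getTaskId());
}

// Note that we successfully copied the map-output
noteCopiedMapOutput(loc.getTaskId());

return bytes;
}

// Process the map output: if it is held in memory, add it to mapOutputsFilesInMemory (a Collections.synchronizedList(new LinkedList<MapOutput>())); otherwise it is in a temporary file, which is renamed to the final output file.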
if (mapOutput.inMemory) {
// Save it in the synchronized list of map-outputs
mapOutputsFilesInMemory.add(mapOutput);
} else {

tmpMapOutput = mapOutput.file;
filename = new Path(tmpMapOutput.getParent(), filename.getName());
if (!localFileSys.rename(tmpMapOutput, filename)) {
localFileSys.delete(tmpMapOutput, true);
bytes = -1;
throw new IOException("Failed to rename map output " +
tmpMapOutput + " to " + filename);
}

synchronized (mapOutputFilesOnDisk) {
addToMapOutputFilesOnDisk(localFileSys.getFileStatus(filename));
}
}

// Note that we successfully copied the map-output
noteCopiedMapOutput(loc.getTaskId());
}

return bytes;
}


5. The getMapOutput method of ReduceCopier.MapOutputCopier, which actually performs the copy: it fetches the map output from the remote server to the local node over HTTP.

private MapOutput getMapOutput(MapOutputLocation mapOutputLoc,
Path filename, int reduce)
throws IOException, InterruptedException {
// Open a connection to the remote server address.
URLConnection connection =
mapOutputLoc.getOutputLocation().openConnection();
InputStream input = getInputStream(connection, STALLED_COPY_TIMEOUT,
DEFAULT_READ_TIMEOUT);

// Read the map id from the HTTP response header.
TaskAttemptID mapId = null;
mapId = TaskAttemptID.forName(connection.getHeaderField(FROM_MAP_TASK));

TaskAttemptID expectedMapId = mapOutputLoc.getTaskAttemptId();
if (!mapId.equals(expectedMapId)) {
LOG.warn("data from wrong map:" + mapId +
" arrived to reduce task " + reduce +
", where as expected map output should be from " + expectedMapId);
return null;
}

long decompressedLength =
Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));
long compressedLength =
Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));

if (compressedLength < 0 || decompressedLength < 0) {
LOG.warn(getName() + " invalid lengths in map output header: id: " +
mapId + " compressed len: " + compressedLength +
", decompressed len: " + decompressedLength);
return null;
}
int forReduce =
(int)Integer.parseInt(connection.getHeaderField(FOR_REDUCE_TASK));

if (forReduce != reduce) {
LOG.warn("data for the wrong reduce: " + forReduce +
" with compressed len: " + compressedLength +
", decompressed len: " + decompressedLength +
" arrived to reduce task " + reduce);
return null;
}
LOG.info("header: " + mapId + ", compressed len: " + compressedLength +
", decompressed len: " + decompressedLength);

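// Check whether the map output fits in memory to decide whether to shuffle in memory or to disk. The two paths build the MapOutput with different constructors, so its inMemory property differs as well.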
boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength);

// Shuffle
MapOutput mapOutput = null;
if (shuffleInMemory) {
LOG.info("Shuffling " + decompressedLength + " bytes (" +
compressedLength + " raw bytes) " +
"into RAM from " + mapOutputLoc.getTaskAttemptId());

mapOutput = shuffleInMemory(mapOutputLoc, connection, input,
(int)decompressedLength,
(int)compressedLength);
} else {
LOG.info("Shuffling " + decompressedLength + " bytes (" +
compressedLength + " raw bytes) " +
"into Local-FS from " + mapOutputLoc.getTaskAttemptId());

mapOutput = shuffleToDisk(mapOutputLoc, input, filename,
compressedLength);
}

return mapOutput;
}
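The listing only shows the call ramManager.canFitInMemory(decompressedLength), not the rule behind it. The sketch below shows one plausible shape for such a memory budget, purely as an assumption: a single output qualifies for memory only if it is smaller than a fixed fraction of the total shuffle buffer, and reservations are tracked against the total. RamBudgetSketch, tryReserve and the 0.25 fraction are hypothetical; unlike this sketch, a real manager may block until memory is freed instead of returning false.

```java
// Hypothetical memory budget for in-memory shuffling; the rule and the
// numbers are assumptions for illustration, not the ramManager source.
class RamBudgetSketch {
    private final long maxBytes;              // total shuffle buffer
    private final long maxSingleSegmentBytes; // cap for one map output
    private long reservedBytes;

    RamBudgetSketch(long maxBytes, double singleSegmentFraction) {
        this.maxBytes = maxBytes;
        this.maxSingleSegmentBytes = (long) (maxBytes * singleSegmentFraction);
    }

    // Decide memory versus disk from the decompressed size alone.
    boolean canFitInMemory(long requestedBytes) {
        return requestedBytes < Integer.MAX_VALUE
            && requestedBytes < maxSingleSegmentBytes;
    }

    // Simplification: refuse instead of waiting when the budget is exhausted.
    synchronized boolean tryReserve(long bytes) {
        if (reservedBytes + bytes > maxBytes) {
            return false;
        }
        reservedBytes += bytes;
        return true;
    }

    // Give memory back, for example after a failed or merged shuffle.
    synchronized void unreserve(long bytes) {
        reservedBytes -= bytes;
    }

    public static void main(String[] args) {
        RamBudgetSketch budget = new RamBudgetSketch(1000, 0.25);
        System.out.println("200 bytes in memory? " + budget.canFitInMemory(200));
        System.out.println("400 bytes in memory? " + budget.canFitInMemory(400));
    }
}
```

Capping the size of a single segment keeps one large map output from monopolizing the buffer, which would otherwise force every other copier onto the disk path.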


6. The shuffleInMemory method of ReduceTask.ReduceCopier.MapOutputCopier. The previous method calls it when the map output fits in memory.

private MapOutput shuffleInMemory(MapOutputLocation mapOutputLoc,
URLConnection connection,
InputStream input,
int mapOutputLength,
int compressedLength)
throws IOException, InterruptedException {

// Checksummed input stream for reading the MapReduce intermediate file format (IFile).
IFileInputStream checksumIn =
new IFileInputStream(input,compressedLength);

input = checksumIn;

// If the map output is compressed, wrap the stream in the codec's decompressing input stream.
if (codec != null) {
decompressor.reset();
input = codec.createInputStream(input, decompressor);
}

// Copy the map output into an in-memory buffer.
byte[] shuffleData = new byte[mapOutputLength];
MapOutput mapOutput =
new MapOutput(mapOutputLoc.getTaskId(),
mapOutputLoc.getTaskAttemptId(), shuffleData, compressedLength);

int bytesRead = 0;
try {
int n = input.read(shuffleData, 0, shuffleData.length);
while (n > 0) {
bytesRead += n;
shuffleClientMetrics.inputBytes(n);

// indicate we're making progress
reporter.progress();
n = input.read(shuffleData, bytesRead,
(shuffleData.length-bytesRead));
}

LOG.info("Read " + bytesRead + " bytes from map-output for " +
mapOutputLoc.getTaskAttemptId());

input.close();
} catch (IOException ioe) {
LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(),
ioe);

// Inform the ram-manager
ramManager.closeInMemoryFile(mapOutputLength);
ramManager.unreserve(mapOutputLength);

// Discard the map-output
try {
mapOutput.discard();
} catch (IOException ignored) {
LOG.info("Failed to discard map-output from " +
mapOutputLoc.getTaskAttemptId(), ignored);
}
mapOutput = null;

// Close the streams
IOUtils.cleanup(LOG, input);

// Re-throw
throw ioe;
}

// Close the in-memory file
ramManager.closeInMemoryFile(mapOutputLength);

// Sanity check
if (bytesRead != mapOutputLength) {
// Inform the ram-manager
ramManager.unreserve(mapOutputLength);

// Discard the map-output
try {
mapOutput.discard();
} catch (IOException ignored) {
// IGNORED because we are cleaning up
LOG.info("Failed to discard map-output from " +
mapOutputLoc.getTaskAttemptId(), ignored);
}
mapOutput = null;

throw new IOException("Incomplete map output received for " +
mapOutputLoc.getTaskAttemptId() + " from " +
mapOutputLoc.getOutputLocation() + " (" +
bytesRead + " instead of " +
mapOutputLength + ")"
);
}

// TODO: Remove this after a 'fix' for HADOOP-3647
if (mapOutputLength > 0) {
DataInputBuffer dib = new DataInputBuffer();
dib.reset(shuffleData, 0, shuffleData.length);
LOG.info("Rec #1 from " + mapOutputLoc.getTaskAttemptId() + " -> (" +
WritableUtils.readVInt(dib) + ", " +
WritableUtils.readVInt(dib) + ") from " +
mapOutputLoc.getHost());
}

return mapOutput;
}
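The core of shuffleInMemory is a read loop: a single read call may return fewer bytes than requested, so the code keeps reading until the buffer is full or the stream ends, and then checks that the byte count matches the announced length. A minimal standalone sketch of that loop follows. ReadFullySketch, readExactly and chunked are hypothetical names; chunked exists only to simulate a network stream that delivers a few bytes per call.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical illustration of the "read until full, then verify" loop.
class ReadFullySketch {

    // Reads exactly expectedLength bytes or fails if the stream ends early.
    static byte[] readExactly(InputStream input, int expectedLength) throws IOException {
        byte[] data = new byte[expectedLength];
        int bytesRead = 0;
        while (bytesRead < expectedLength) {
            int n = input.read(data, bytesRead, expectedLength - bytesRead);
            if (n < 0) {
                break; // end of stream before the buffer was filled
            }
            bytesRead += n;
        }
        // Sanity check, as in the listing: a short read is an error.
        if (bytesRead != expectedLength) {
            throw new IOException("Incomplete output received (" + bytesRead
                + " instead of " + expectedLength + ")");
        }
        return data;
    }

    // Test helper: a stream that returns at most maxChunk bytes per read call.
    static InputStream chunked(byte[] source, int maxChunk) {
        return new ByteArrayInputStream(source) {
            @Override
            public synchronized int read(byte[] b, int off, int len) {
                return super.read(b, off, Math.min(len, maxChunk));
            }
        };
    }

    public static void main(String[] args) throws IOException {
        byte[] source = "0123456789".getBytes("US-ASCII");
        byte[] copy = readExactly(chunked(source, 3), source.length);
        System.out.println(new String(copy, "US-ASCII"));
    }
}
```

The length check is what turns a silently truncated transfer into a failed fetch that the scheduler can retry.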


7. The shuffleToDisk method of ReduceTask.ReduceCopier.MapOutputCopier copies the map output to the local disk. It is called when the map output does not fit in memory.

private MapOutput shuffleToDisk(MapOutputLocation mapOutputLoc,
InputStream input,
Path filename,
long mapOutputLength)
throws IOException {
// Find out a suitable location for the output on local-filesystem
Path localFilename =
lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(),
mapOutputLength, conf);

MapOutput mapOutput =
new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(),
conf, localFileSys.makeQualified(localFilename),
mapOutputLength);

// Copy data to local-disk
OutputStream output = null;
long bytesRead = 0;
try {
output = rfs.create(localFilename);

byte[] buf = new byte[64 * 1024];
int n = input.read(buf, 0, buf.length);
while (n > 0) {
bytesRead += n;
shuffleClientMetrics.inputBytes(n);
output.write(buf, 0, n);

// indicate we're making progress
reporter.progress();
n = input.read(buf, 0, buf.length);
}

LOG.info("Read " + bytesRead + " bytes from map-output for " +
mapOutputLoc.getTaskAttemptId());

output.close();
input.close();
} catch (IOException ioe) {
LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(),
ioe);

// Discard the map-output
try {
mapOutput.discard();
} catch (IOException ignored) {
LOG.info("Failed to discard map-output from " +
mapOutputLoc.getTaskAttemptId(), ignored);
}
mapOutput = null;

// Close the streams
IOUtils.cleanup(LOG, input, output);

// Re-throw
throw ioe;
}

// Sanity check
if (bytesRead != mapOutputLength) {
try {
mapOutput.discard();
} catch (Exception ioe) {
// IGNORED because we are cleaning up
LOG.info("Failed to discard map-output from " +
mapOutputLoc.getTaskAttemptId(), ioe);
} catch (Throwable t) {
String msg = getTaskID() + " : Failed in shuffle to disk :"
+ StringUtils.stringifyException(t);
reportFatalError(getTaskID(), t, msg);
}
mapOutput = null;

throw new IOException("Incomplete map output received for " +
mapOutputLoc.getTaskAttemptId() + " from " +
mapOutputLoc.getOutputLocation() + " (" +
bytesRead + " instead of " +
mapOutputLength + ")"
);
}

return mapOutput;

}


8. The run method of the LocalFSMerger thread. It merges the local on-disk copies of the map outputs.

public void run() {
try {
LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
while(!exitLocalFSMerge){
// mapOutputFilesOnDisk is a TreeSet<FileStatus>(mapOutputFileComparator) holding the map outputs stored on the local disk.
// (Abridged: the full source first waits on mapOutputFilesOnDisk until enough files have accumulated or exitLocalFSMerge is set.)
List<Path> mapFiles = new ArrayList<Path>();
long approxOutputSize = 0;
int bytesPerSum =
reduceTask.getConf().getInt("io.bytes.per.checksum", 512);
LOG.info(reduceTask.getTaskID() + "We have  " +
mapOutputFilesOnDisk.size() + " map outputs on disk. " +
"Triggering merge of " + ioSortFactor + " files");
// 1. Prepare the list of files to be merged. This list is prepared
// using a list of map output files on disk. Currently we merge
// io.sort.factor files into 1.
// Take io.sort.factor files from the set and collect them in mapFiles.
synchronized (mapOutputFilesOnDisk) {
for (int i = 0; i < ioSortFactor; ++i) {
FileStatus filestatus = mapOutputFilesOnDisk.first();
mapOutputFilesOnDisk.remove(filestatus);
mapFiles.add(filestatus.getPath());
approxOutputSize += filestatus.getLen();
}
}

// add the checksum length
approxOutputSize += ChecksumFileSystem
.getChecksumLength(approxOutputSize,
bytesPerSum);

// 2. Merge the files in the list.
Path outputPath =
lDirAlloc.getLocalPathForWrite(mapFiles.get(0).toString(),
approxOutputSize, conf)
.suffix(".merged");
Writer writer =
new Writer(conf,rfs, outputPath,
conf.getMapOutputKeyClass(),
conf.getMapOutputValueClass(),
codec, null);
RawKeyValueIterator iter  = null;
Path tmpDir = new Path(reduceTask.getTaskID().toString());
try {
iter = Merger.merge(conf, rfs,
conf.getMapOutputKeyClass(),
conf.getMapOutputValueClass(),
codec, mapFiles.toArray(new Path[mapFiles.size()]),
true, ioSortFactor, tmpDir,
conf.getOutputKeyComparator(), reporter,
spilledRecordsCounter, null);

Merger.writeFile(iter, writer, reporter, conf);
writer.close();
} catch (Exception e) {
localFileSys.delete(outputPath, true);
throw new IOException (StringUtils.stringifyException(e));
}

synchronized (mapOutputFilesOnDisk) {
addToMapOutputFilesOnDisk(localFileSys.getFileStatus(outputPath));
}
LOG.info(reduceTask.getTaskID() +
" Finished merging " + mapFiles.size() +
" map output files on disk of total-size " +
approxOutputSize + "." +
" Local output file is " + outputPath + " of size " +
localFileSys.getFileStatus(outputPath).getLen());
}
} catch (Throwable t) {
LOG.warn(reduceTask.getTaskID()
+ " Merging of the local FS files threw an exception: "
+ StringUtils.stringifyException(t));
if (mergeThrowable == null) {
mergeThrowable = t;
}
}
}
}
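The first step of each on-disk merge round is to take io.sort.factor files off the sorted set while holding its lock. The sketch below shows that selection in isolation. It assumes the set is ordered by file length, smallest first, so each round merges the smallest files; that ordering is an assumption here, since mapOutputFileComparator is not shown in the listing. MergeBatchSketch, OnDiskFile and takeSmallest are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

// Hypothetical illustration of picking one merge batch from the on-disk set.
class MergeBatchSketch {

    // Stand-in for FileStatus: just a path and a length.
    static final class OnDiskFile {
        final String path;
        final long length;

        OnDiskFile(String path, long length) {
            this.path = path;
            this.length = length;
        }
    }

    // Assumed ordering: by length, with the path as a tie-breaker so that
    // two files of equal size are both kept in the set.
    static TreeSet<OnDiskFile> newFileSet() {
        Comparator<OnDiskFile> bySize = (a, b) -> {
            int byLength = Long.compare(a.length, b.length);
            return byLength != 0 ? byLength : a.path.compareTo(b.path);
        };
        return new TreeSet<>(bySize);
    }

    // Remove and return up to `factor` of the smallest files.
    static List<OnDiskFile> takeSmallest(TreeSet<OnDiskFile> files, int factor) {
        List<OnDiskFile> batch = new ArrayList<>();
        synchronized (files) {
            for (int i = 0; i < factor && !files.isEmpty(); i++) {
                batch.add(files.pollFirst());
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        TreeSet<OnDiskFile> files = newFileSet();
        files.add(new OnDiskFile("map_0.out", 50));
        files.add(new OnDiskFile("map_1.out", 10));
        files.add(new OnDiskFile("map_2.out", 30));
        for (OnDiskFile f : takeSmallest(files, 2)) {
            System.out.println(f.path + " (" + f.length + " bytes)");
        }
    }
}
```

Merging the smallest files first keeps the amount of data rewritten per round low, and the merged result goes back into the same set to take part in later rounds, as addToMapOutputFilesOnDisk does in the listing.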


9. The run method of the InMemFSMergeThread thread.

public void run() {
LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
try {
boolean exit = false;
do {
exit = ramManager.waitForDataToMerge();
if (!exit) {
doInMemMerge();
}
} while (!exit);
} catch (Throwable t) {
LOG.warn(reduceTask.getTaskID() +
" Merge of the inmemory files threw an exception: "
+ StringUtils.stringifyException(t));
ReduceCopier.this.mergeThrowable = t;
}
}


10. The doInMemMerge method of the InMemFSMergeThread thread.

private void doInMemMerge() throws IOException{
if (mapOutputsFilesInMemory.size() == 0) {
return;
}

TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;

List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K,V>>();
long mergeOutputSize = createInMemorySegments(inMemorySegments, 0);
int noInMemorySegments = inMemorySegments.size();

Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
reduceTask.getTaskID(), mergeOutputSize);

Writer writer =
new Writer(conf, rfs, outputPath,
conf.getMapOutputKeyClass(),
conf.getMapOutputValueClass(),
codec, null);

RawKeyValueIterator rIter = null;
try {
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");

rIter = Merger.merge(conf, rfs,
(Class<K>)conf.getMapOutputKeyClass(),
(Class<V>)conf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceTask.getTaskID().toString()),
conf.getOutputKeyComparator(), reporter,
spilledRecordsCounter, null);

if (combinerRunner == null) {
Merger.writeFile(rIter, writer, reporter, conf);
} else {
combineCollector.setWriter(writer);
combinerRunner.combine(rIter, combineCollector);
}
writer.close();

LOG.info(reduceTask.getTaskID() +
" Merge of the " + noInMemorySegments +
" files in-memory complete." +
" Local file is " + outputPath + " of size " +
localFileSys.getFileStatus(outputPath).getLen());
} catch (Exception e) {
//make sure that we delete the ondisk file that we created
//earlier when we invoked cloneFileAttributes
localFileSys.delete(outputPath, true);
throw (IOException)new IOException
("Intermediate merge failed").initCause(e);
}

// Note the output of the merge
FileStatus status = localFileSys.getFileStatus(outputPath);
synchronized (mapOutputFilesOnDisk) {
addToMapOutputFilesOnDisk(status);
}
}
}


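11. The run method of the ReduceCopier.GetMapEventsThread thread. It queries the TaskTracker over RPC; for each completion event it obtains the address of the server where the map task ran, that is, the location of the MapTask output, builds a URL, and adds it to mapLocations for the copier threads to pick up.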

public void run() {

LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());

do {
try {
int numNewMaps = getMapCompletionEvents();
if (numNewMaps > 0) {
LOG.info(reduceTask.getTaskID() + ": " +
"Got " + numNewMaps + " new map-outputs");
}
Thread.sleep(SLEEP_TIME);
}
catch (InterruptedException e) {
LOG.warn(reduceTask.getTaskID() +
" GetMapEventsThread returning after an " +
" interrupted exception");
return;
}
catch (Throwable t) {
LOG.warn(reduceTask.getTaskID() +
" GetMapEventsThread Ignoring exception : " +
StringUtils.stringifyException(t));
}
} while (!exitGetMapEvents);

LOG.info("GetMapEventsThread exiting");

}


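12. The getMapCompletionEvents method of the ReduceCopier.GetMapEventsThread thread. It queries the TaskTracker over RPC; for each completion event it obtains the address of the server where the map task ran, builds a URL, and adds it to mapLocations.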

private int getMapCompletionEvents() throws IOException {

int numNewMaps = 0;

// RPC call to the TaskTracker's getMapCompletionEvents method; it returns a MapTaskCompletionEventsUpdate, from which the TaskCompletionEvents are obtained.
MapTaskCompletionEventsUpdate update =
umbilical.getMapCompletionEvents(reduceTask.getJobID(),
fromEventId.get(),
MAX_EVENTS_TO_FETCH,
reduceTask.getTaskID());
TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();

// Check if the reset is required.
// Since there is no ordering of the task completion events at the
// reducer, the only option to sync with the new jobtracker is to reset
// the events index
if (update.shouldReset()) {
fromEventId.set(0);
obsoleteMapIds.clear(); // clear the obsolete map
mapLocations.clear(); // clear the map locations mapping
}

// Update the last seen event ID
fromEventId.set(fromEventId.get() + events.length);

// Process the TaskCompletionEvents:
// 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
// 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
//    fetching from those maps.
// 3. Remove TIPFAILED maps from neededOutputs since we don't need their
//    outputs at all.
// For each completion event, get the address of the server where the map task ran, build a URL, and add it to mapLocations for the copier threads.
for (TaskCompletionEvent event : events) {
switch (event.getTaskStatus()) {
case SUCCEEDED:
{
URI u = URI.create(event.getTaskTrackerHttp());
String host = u.getHost();
TaskAttemptID taskId = event.getTaskAttemptId();
int duration = event.getTaskRunTime();
if (duration > maxMapRuntime) {
maxMapRuntime = duration;
// adjust max-fetch-retries based on max-map-run-time
maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP,
getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1));
}
URL mapOutputLocation = new URL(event.getTaskTrackerHttp() +
"/mapOutput?job=" + taskId.getJobID() +
"&map=" + taskId +
"&reduce=" + getPartition());
List<MapOutputLocation> loc = mapLocations.get(host);
if (loc == null) {
loc = Collections.synchronizedList
(new LinkedList<MapOutputLocation>());
mapLocations.put(host, loc);
}
loc.add(new MapOutputLocation(taskId, host, mapOutputLocation));
numNewMaps ++;
}
break;
case FAILED:
case KILLED:
case OBSOLETE:
{
obsoleteMapIds.add(event.getTaskAttemptId());
LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
" map-task: '" + event.getTaskAttemptId() + "'");
}
break;
case TIPFAILED:
{
copiedMapOutputs.add(event.getTaskAttemptId().getTaskID());
LOG.info("Ignoring output of failed map TIP: '" +
event.getTaskAttemptId() + "'");
}
break;
}
}
return numNewMaps;
}
}
}
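The SUCCEEDED branch above does two things: it builds the HTTP URL a copier will fetch, and it files that location under the host name so the scheduler can limit itself to one fetch per host. A minimal standalone sketch of both steps follows. MapLocationSketch and its methods are hypothetical names, strings stand in for MapOutputLocation, and the job id, attempt id and tracker address used in main are made-up example values.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of building map-output URLs and grouping them by host.
class MapLocationSketch {

    // Same URL shape as in the listing: <tracker>/mapOutput?job=..&map=..&reduce=..
    static String mapOutputUrl(String taskTrackerHttp, String jobId,
                               String mapAttemptId, int partition) {
        return taskTrackerHttp + "/mapOutput?job=" + jobId
            + "&map=" + mapAttemptId
            + "&reduce=" + partition;
    }

    // File the URL under the tracker's host name, creating the list on first use.
    static void addLocation(Map<String, List<String>> byHost, String taskTrackerHttp,
                            String jobId, String mapAttemptId, int partition) {
        String host = URI.create(taskTrackerHttp).getHost();
        byHost.computeIfAbsent(host, h -> new ArrayList<>())
              .add(mapOutputUrl(taskTrackerHttp, jobId, mapAttemptId, partition));
    }

    public static void main(String[] args) {
        Map<String, List<String>> byHost = new HashMap<>();
        // Made-up identifiers, for illustration only.
        addLocation(byHost, "http://node1:50060", "job_1", "attempt_1_m_0", 3);
        addLocation(byHost, "http://node1:50060", "job_1", "attempt_1_m_1", 3);
        addLocation(byHost, "http://node2:50060", "job_1", "attempt_1_m_2", 3);
        for (Map.Entry<String, List<String>> e : byHost.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

The reduce parameter is this reducer's partition number, so every reducer asks each tasktracker only for its own slice of a map's output.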


13. The createKVIterator method of ReduceTask.ReduceCopier builds a RawKeyValueIterator from the copied map outputs, which serves as the input to reduce.

private RawKeyValueIterator createKVIterator(
JobConf job, FileSystem fs, Reporter reporter) throws IOException {

// merge config params
Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
boolean keepInputs = job.getKeepFailedTaskFiles();
final Path tmpDir = new Path(getTaskID().toString());
final RawComparator<K> comparator =
(RawComparator<K>)job.getOutputKeyComparator();

// segments required to vacate memory
List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
long inMemToDiskBytes = 0;
if (mapOutputsFilesInMemory.size() > 0) {
TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
inMemToDiskBytes = createInMemorySegments(memDiskSegments,
maxInMemReduce);
final int numMemDiskSegments = memDiskSegments.size();
if (numMemDiskSegments > 0 &&
ioSortFactor > mapOutputFilesOnDisk.size()) {
// must spill to disk, but can't retain in-mem for intermediate merge
final Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
reduceTask.getTaskID(), inMemToDiskBytes);
final RawKeyValueIterator rIter = Merger.merge(job, fs,
keyClass, valueClass, memDiskSegments, numMemDiskSegments,
tmpDir, comparator, reporter, spilledRecordsCounter, null);
final Writer writer = new Writer(job, fs, outputPath,
keyClass, valueClass, codec, null);
try {
Merger.writeFile(rIter, writer, reporter, job);
addToMapOutputFilesOnDisk(fs.getFileStatus(outputPath));
} catch (Exception e) {
if (null != outputPath) {
fs.delete(outputPath, true);
}
throw new IOException("Final merge failed", e);
} finally {
if (null != writer) {
writer.close();
}
}
LOG.info("Merged " + numMemDiskSegments + " segments, " +
inMemToDiskBytes + " bytes to disk to satisfy " +
"reduce memory limit");
inMemToDiskBytes = 0;
memDiskSegments.clear();
} else if (inMemToDiskBytes != 0) {
LOG.info("Keeping " + numMemDiskSegments + " segments, " +
inMemToDiskBytes + " bytes in memory for " +
"intermediate, on-disk merge");
}
}

// segments on disk
List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
long onDiskBytes = inMemToDiskBytes;
Path[] onDisk = getMapFiles(fs, false);
for (Path file : onDisk) {
onDiskBytes += fs.getFileStatus(file).getLen();
diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs));
}
LOG.info("Merging " + onDisk.length + " files, " +
onDiskBytes + " bytes from disk");
Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
public int compare(Segment<K, V> o1, Segment<K, V> o2) {
if (o1.getLength() == o2.getLength()) {
return 0;
}
return o1.getLength() < o2.getLength() ? -1 : 1;
}
});

// build final list of segments from merged backed by disk + in-mem
List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();
long inMemBytes = createInMemorySegments(finalSegments, 0);
LOG.info("Merging " + finalSegments.size() + " segments, " +
inMemBytes + " bytes from memory into reduce");
if (0 != onDiskBytes) {
final int numInMemSegments = memDiskSegments.size();
diskSegments.addAll(0, memDiskSegments);
memDiskSegments.clear();
RawKeyValueIterator diskMerge = Merger.merge(
job, fs, keyClass, valueClass, diskSegments,
ioSortFactor, numInMemSegments, tmpDir, comparator,
reporter, false, spilledRecordsCounter, null);
diskSegments.clear();
if (0 == finalSegments.size()) {
return diskMerge;
}
finalSegments.add(new Segment<K,V>(
new RawKVIteratorReader(diskMerge, onDiskBytes), true));
}
return Merger.merge(job, fs, keyClass, valueClass,
finalSegments, finalSegments.size(), tmpDir,
comparator, reporter, spilledRecordsCounter, null);
}
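
The idea behind the final Merger.merge call, combining several already sorted segments into one sorted stream, can be sketched with a priority queue. This is a simplified illustration under stated assumptions: KWayMergeSketch and Cursor are hypothetical names, the segments are in-memory lists of strings, and none of Hadoop's multi-pass merging, spilling or raw-byte comparison is modelled.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

class KWayMergeSketch {
    // Hypothetical cursor over one sorted segment; 'head' is its smallest unread key.
    static final class Cursor {
        final Iterator<String> it;
        String head;
        Cursor(Iterator<String> it) {
            this.it = it;
            this.head = it.next();
        }
        // Moves to the next key; returns false when the segment is exhausted.
        boolean advance() {
            if (it.hasNext()) {
                head = it.next();
                return true;
            }
            return false;
        }
    }

    // Merges sorted segments into one sorted list.
    static List<String> merge(List<List<String>> segments) {
        PriorityQueue<Cursor> heap =
                new PriorityQueue<>(Comparator.comparing((Cursor c) -> c.head));
        for (List<String> seg : segments) {
            if (!seg.isEmpty()) {
                heap.add(new Cursor(seg.iterator()));
            }
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            // The cursor with the smallest head supplies the next output key.
            Cursor c = heap.poll();
            out.add(c.head);
            if (c.advance()) {
                heap.add(c); // re-insert with its new head
            }
        }
        return out;
    }
}
```

Because each segment is already sorted, only the head of every segment has to be compared at each step, so the merge never needs to hold or re-sort the full data set.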


14. The runNewReducer method of ReduceTask. It constructs the reducer and its execution context from the configuration, then calls the reducer's run method.

@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewReducer(JobConf job,
final TaskUmbilicalProtocol umbilical,
final TaskReporter reporter,
RawKeyValueIterator rIter,
RawComparator<INKEY> comparator,
Class<INKEY> keyClass,
Class<INVALUE> valueClass
) throws IOException,InterruptedException,
ClassNotFoundException {
//1. Build the TaskAttemptContext
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
//2. Instantiate the configured Reducer class via reflection
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
//3. Build the RecordWriter
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output =
(org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>)
outputFormat.getRecordWriter(taskContext);
job.setBoolean("mapred.skip.on", isSkipping());

//4. Build the Context in which the Reducer runs
org.apache.hadoop.mapreduce.Reducer.Context
reducerContext = createReduceContext(reducer, job, getTaskID(),
rIter, reduceInputValueCounter,
output, committer,
reporter, comparator, keyClass,
valueClass);
reducer.run(reducerContext);
output.close(reducerContext);
}
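
Step 2 of the listing relies on reflection: the class to instantiate is known only from the job configuration. The sketch below shows the same pattern with plain Java reflection. It is an illustration under assumptions: SimpleReducer, UpperCaseReducer, the configuration key "reducer.class" and the Map used as configuration are all hypothetical and are not part of Hadoop.

```java
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;

class ReflectiveReducerSketch {
    // Hypothetical, much simplified reducer contract.
    interface SimpleReducer {
        void run(List<String> input, List<String> output);
    }

    // Example implementation that is selected by name at run time.
    static class UpperCaseReducer implements SimpleReducer {
        public void run(List<String> input, List<String> output) {
            for (String s : input) {
                output.add(s.toUpperCase(Locale.ROOT));
            }
        }
    }

    // Looks up the class name in the configuration and instantiates it.
    static SimpleReducer newReducer(Map<String, String> conf) {
        String className = conf.get("reducer.class"); // hypothetical key
        try {
            Class<? extends SimpleReducer> cls =
                    Class.forName(className).asSubclass(SimpleReducer.class);
            Constructor<? extends SimpleReducer> ctor = cls.getDeclaredConstructor();
            ctor.setAccessible(true); // allow non-public no-arg constructors
            return ctor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create reducer " + className, e);
        }
    }

    // Mirrors the order in the listing: build the reducer, then call run.
    static List<String> runReducer(Map<String, String> conf, List<String> input) {
        SimpleReducer reducer = newReducer(conf);
        List<String> output = new ArrayList<>();
        reducer.run(input, output);
        return output;
    }
}
```

The framework code never names the concrete reducer class, so users can plug in their own implementation purely through configuration.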


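15. The run method of the Reducer class. It takes each key from the context together with the collection of values for that key (of type Iterable<VALUEIN>) and calls the reducer's reduce method on them.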

public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
}
cleanup(context);
}
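
The loop in run depends on the context turning a key-sorted record stream into one key plus all of its values per iteration. A minimal sketch of that grouping follows. The names GroupingContextSketch and GroupedInput are hypothetical, and the values of a key are buffered in a list for simplicity, whereas the real context reads them lazily from the merged iterator.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupingContextSketch {
    // Hypothetical context over records that are already sorted by key.
    static final class GroupedInput {
        private final List<Map.Entry<String, Integer>> records;
        private int pos = 0;
        private String currentKey;
        private List<Integer> currentValues;

        GroupedInput(List<Map.Entry<String, Integer>> sortedRecords) {
            this.records = sortedRecords;
        }

        // Advances to the next distinct key and collects all of its values.
        boolean nextKey() {
            if (pos >= records.size()) {
                return false;
            }
            currentKey = records.get(pos).getKey();
            currentValues = new ArrayList<>();
            while (pos < records.size()
                    && records.get(pos).getKey().equals(currentKey)) {
                currentValues.add(records.get(pos).getValue());
                pos++;
            }
            return true;
        }

        String getCurrentKey() { return currentKey; }
        Iterable<Integer> getValues() { return currentValues; }
    }

    // Same shape as Reducer.run: one reduce step per distinct key.
    static Map<String, Integer> sumByKey(List<Map.Entry<String, Integer>> sorted) {
        GroupedInput ctx = new GroupedInput(sorted);
        Map<String, Integer> out = new LinkedHashMap<>();
        while (ctx.nextKey()) {
            int sum = 0;
            for (int v : ctx.getValues()) {
                sum += v;
            }
            out.put(ctx.getCurrentKey(), sum);
        }
        return out;
    }
}
```

The grouping only works because equal keys are adjacent in the input, which is why the merge phase has to finish producing a sorted stream before reduce starts.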


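16. The reduce method of the Reducer class. Users normally override it to implement their reduce logic; the default implementation shown here writes every value through unchanged.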

@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
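
How overriding reduce changes the result can be shown with a miniature of the same contract. This is a sketch under assumptions: MiniReducer and MaxReducer are hypothetical classes, the input is an already grouped sorted map, and output is collected as tab-separated strings instead of going through a Context.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;

class MiniReducerSketch {
    // Hypothetical miniature of the Reducer contract.
    static class MiniReducer {
        // Default behaviour: identity, every value is written out unchanged.
        protected void reduce(String key, Iterable<Integer> values, List<String> out) {
            for (Integer v : values) {
                out.add(key + "\t" + v);
            }
        }

        // Fixed driver loop: calls reduce once per key, in key order.
        public final List<String> run(SortedMap<String, List<Integer>> grouped) {
            List<String> out = new ArrayList<>();
            for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
                reduce(e.getKey(), e.getValue(), out);
            }
            return out;
        }
    }

    // User-defined logic: keep only the largest value of each key.
    static class MaxReducer extends MiniReducer {
        @Override
        protected void reduce(String key, Iterable<Integer> values, List<String> out) {
            int max = Integer.MIN_VALUE;
            for (Integer v : values) {
                max = Math.max(max, v);
            }
            out.add(key + "\t" + max);
        }
    }
}
```

The driver loop stays the same in both cases; only the overridden reduce differs, which matches the division of labour between the run and reduce methods described above.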


End.
