您的位置:首页 > 运维架构

mapreduce结构及运行机制

2018-01-04 00:00 337 查看
结构

一个完整的mapreduce程序在分布式运行时有三类实例进程:

MRAppMaster:负责整个程序的过程调度和状态协调。

MapTask:负责map阶段的数据处理流程。

ReduceTask:负责reduce阶段的整个数据处理流程。

运行机制及流程

mr启动时,先启动MRAppMaster进程,MRAppMaster启动后根据客户端job提交形成的任务分配策略(根据待处理数据及传递的参数形成一定的分片规则),计算出需要的maptask的实例数量,然后向集群申请机器启动相应数量的maptask进程。

maptask启动后,根据切片范围进行数据处理。(即从客户端指定的目录读取数据,逐行调用map方法,进行逻辑处理,并将map方法处理后的kv对收集到缓存,最后再将缓存中的kv对根据k分区,不断溢写到磁盘文件)

MRAppMaster监控到所有maptask处理完成后,根据客户端设定的reducetask的启动个数,启动相应数量的reducetask进程,并告知每个reducetask要处理的数据范围。

reducetask启动后,从若干maptask运行所在机器上取到map输出文件,在本地进行重组归并排序,然后按照相同的key的kv为一组,调用reduce方法进行逻辑运算并收集输出结果kv。最后,调用客户端指定的outputformat将结果输出到目标文件。

maptask并行数量的决定机制

maptask的数量是由客户端提交job时决定的。即根据待处理的数据以及job提交时指定的参数决定。

切片的基本逻辑是由FlieInputFormat实现类的getSplits()方法完成。源码:

/**
* Generate the list of files and make them into FileSplits.
* @param job the job context
* @throws IOException
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
Stopwatch sw = new Stopwatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);

// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
for (FileStatus file : files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file)
.getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize,
maxSize);

long bytesRemaining = length;
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length
- bytesRemaining);
splits.add(makeSplit(path, length - bytesRemaining,
splitSize, blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}

if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length
- bytesRemaining);
splits.add(makeSplit(path, length - bytesRemaining,
bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length,
blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
// Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
return splits;
}

public static final String SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize";
public static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize";

protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}

/**
* Get the lower bound on split size imposed by the format.
* @return the number of bytes of the minimal split for this format
*/
protected long getFormatMinSplitSize() {
return 1;
}

/**
* Get the minimum split size
* @param job the job
* @return the minimum number of bytes that can be in a split
*/
public static long getMinSplitSize(JobContext job) {
return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}

/**
* Get the maximum split size.
* @param context the job to look at.
* @return the maximum number of bytes a split can include
*/
public static long getMaxSplitSize(JobContext context) {
return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
}

通过源码可以看出:

在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由下面几个值来运算决定。

minSize:默认值1,可以通过参数mapreduce.input.fileinputformat.split.minsize指定

maxSize:默认值Long.MAX_VALUE,可以通过参数mapreduce.input.fileinputformat.split.maxsize指定

在都取默认值情况下切片大小=blocksize=128M。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Hadoop mapreduce