
MapReduce file splitting: FileInputFormat

2013-01-05 23:44
Before the map phase, the files to be processed are split up and then read according to a defined format. This work happens in InputFormat.

InputFormat is an abstract class with two key abstract methods:

1. public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;

determines how the input is divided into splits;

2. public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;

reads the data of a split in the specified format.

Subclasses must implement both methods:

FileInputFormat:

FileInputFormat's configuration parameters (a driver sketch follows this list):

1. mapred.input.pathFilter.class: the input-file filter; only files accepted by the filter are included in the input.

public static void setInputPathFilter(Job job,
                                      Class<? extends PathFilter> filter) {
  job.getConfiguration().setClass("mapred.input.pathFilter.class", filter,
                                  PathFilter.class);
}

2. mapred.min.split.size: the minimum split size;

public static void setMinInputSplitSize(Job job,
                                        long size) {
  job.getConfiguration().setLong("mapred.min.split.size", size);
}

3. mapred.max.split.size: the maximum split size;

public static void setMaxInputSplitSize(Job job,
                                        long size) {
  job.getConfiguration().setLong("mapred.max.split.size", size);
}

4. mapred.input.dir: the input paths, separated by commas. Internally, paths are appended as:

conf.set("mapred.input.dir", dirs == null ? dirStr : dirs + "," + dirStr);
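
Putting the four parameters together, here is a minimal driver sketch that sets them through FileInputFormat's static helpers; the paths and sizes are made-up values for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitConfigDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "split-config-demo");
    // mapred.input.dir: a comma-separated list, built one path at a time
    FileInputFormat.addInputPath(job, new Path("/data/in1"));      // hypothetical path
    FileInputFormat.addInputPath(job, new Path("/data/in2"));      // hypothetical path
    // mapred.min.split.size and mapred.max.split.size, in bytes
    FileInputFormat.setMinInputSplitSize(job, 32L * 1024 * 1024);  // 32 MB
    FileInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024); // 128 MB
    // mapred.input.pathFilter.class would be set via
    // FileInputFormat.setInputPathFilter(job, MyFilter.class);
  }
}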

FileInputFormat implements InputFormat's getSplits() method, dividing the input files into InputSplits:

/**
 * Generate the list of files and make them into FileSplits.
 */
public List<InputSplit> getSplits(JobContext job) throws IOException {
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  for (FileStatus file: listStatus(job)) {
    Path path = file.getPath();
    FileSystem fs = path.getFileSystem(job.getConfiguration());
    long length = file.getLen();
    BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
    if ((length != 0) && isSplitable(job, path)) {
      long blockSize = file.getBlockSize();
      long splitSize = computeSplitSize(blockSize, minSize, maxSize);

      long bytesRemaining = length;
      while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
        int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
        splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
                                 blkLocations[blkIndex].getHosts()));
        bytesRemaining -= splitSize;
      }

      if (bytesRemaining != 0) {
        splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
                                 blkLocations[blkLocations.length-1].getHosts()));
      }
    } else if (length != 0) {
      splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
    } else {
      // Create empty hosts array for zero length files
      splits.add(new FileSplit(path, 0, length, new String[0]));
    }
  }
  LOG.debug("Total # of splits: " + splits.size());
  return splits;
}
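
Note the SPLIT_SLOP factor (1.1 in this version): the loop only cuts a full split while more than 1.1 × splitSize remains, so the final split may be up to 10% larger than splitSize rather than leaving a tiny tail. A standalone sketch of the loop with made-up sizes:

public class SplitLoopDemo {
  private static final double SPLIT_SLOP = 1.1;   // same slack factor as FileInputFormat

  public static void main(String[] args) {
    long length = 200L * 1024 * 1024;    // hypothetical 200 MB file
    long splitSize = 64L * 1024 * 1024;  // computeSplitSize() result with default min/max
    long bytesRemaining = length;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      System.out.println("split at " + (length - bytesRemaining) + ", length " + splitSize);
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      // tail split: the final 8 MB in this example
      System.out.println("split at " + (length - bytesRemaining) + ", length " + bytesRemaining);
    }
    // output: three 64 MB splits followed by one 8 MB split
  }
}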

The split size is derived from maxSize, blockSize, and minSize:



protected long computeSplitSize(long blockSize, long minSize,
                                long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}
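
In other words, splitSize is blockSize clamped to the range [minSize, maxSize]: with the defaults (minSize = 1, maxSize = Long.MAX_VALUE) a split equals one block, while lowering maxSize yields more, smaller splits. A few sample evaluations with illustrative values:

public class ComputeSplitSizeDemo {
  // same formula as FileInputFormat.computeSplitSize()
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) {
    long block = 64L * 1024 * 1024;  // 64 MB HDFS block
    System.out.println(computeSplitSize(block, 1L, Long.MAX_VALUE));                 // 64 MB: block size wins
    System.out.println(computeSplitSize(block, 1L, 32L * 1024 * 1024));              // 32 MB: maxSize caps it
    System.out.println(computeSplitSize(block, 128L * 1024 * 1024, Long.MAX_VALUE)); // 128 MB: minSize raises it
  }
}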


The other method is protected List<FileStatus> listStatus(JobContext job) throws IOException.

It expands the input paths into the individual files to process (descending one level into matched directories); job carries the Configuration holding the parameters described above.

/** List input directories.
 * Subclasses may override to, e.g., select only files matching a regular
 * expression.
 *
 * @param job the job to list input paths for
 * @return array of FileStatus objects
 * @throws IOException if zero items.
 */
protected List<FileStatus> listStatus(JobContext job) throws IOException {
  List<FileStatus> result = new ArrayList<FileStatus>();
  Path[] dirs = getInputPaths(job);
  if (dirs.length == 0) {
    throw new IOException("No input paths specified in job");
  }

  List<IOException> errors = new ArrayList<IOException>();

  // creates a MultiPathFilter with the hiddenFileFilter and the
  // user provided one (if any).
  List<PathFilter> filters = new ArrayList<PathFilter>();
  filters.add(hiddenFileFilter);
  PathFilter jobFilter = getInputPathFilter(job);
  if (jobFilter != null) {
    filters.add(jobFilter);
  }
  PathFilter inputFilter = new MultiPathFilter(filters);

  for (int i=0; i < dirs.length; ++i) {
    Path p = dirs[i];
    FileSystem fs = p.getFileSystem(job.getConfiguration());
    FileStatus[] matches = fs.globStatus(p, inputFilter);
    if (matches == null) {
      errors.add(new IOException("Input path does not exist: " + p));
    } else if (matches.length == 0) {
      errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
    } else {
      for (FileStatus globStat: matches) {
        if (globStat.isDir()) {
          for(FileStatus stat: fs.listStatus(globStat.getPath(),
                                             inputFilter)) {
            result.add(stat);
          }
        } else {
          result.add(globStat);
        }
      }
    }
  }

  if (!errors.isEmpty()) {
    throw new InvalidInputException(errors);
  }
  LOG.info("Total input paths to process : " + result.size());
  return result;
}
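
As the Javadoc above suggests, the usual hook for narrowing the input is a user-supplied PathFilter, registered with setInputPathFilter(). Here is a sketch of a filter that keeps only *.log files; the class name and paths are hypothetical, and note that the filter also sees directory paths, so a directory input would need a more permissive accept():

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class LogFileFilter implements PathFilter {
  @Override
  public boolean accept(Path path) {
    // keep only files whose name ends in ".log"
    return path.getName().endsWith(".log");
  }
}

// Hypothetical wiring in the driver, using a glob so the filter
// is applied to files rather than to the input directory itself:
//   FileInputFormat.setInputPaths(job, new Path("/data/in/*"));
//   FileInputFormat.setInputPathFilter(job, LogFileFilter.class);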



After splitting, the data is read by a RecordReader. FileInputFormat does not supply a RecordReader itself; that is left to its subclasses, for example:

SequenceFileInputFormat, for key/value files stored in binary form;

TextInputFormat, for plain text files.

Their createRecordReader() methods return SequenceFileRecordReader and LineRecordReader instances, respectively.

Hadoop's default InputFormat is TextInputFormat, which overrides FileInputFormat's createRecordReader() and isSplitable() methods. The reader it uses is LineRecordReader, which treats a carriage return (CR = 13) or a line feed (LF = 10) as the line delimiter.