
Custom InputFormat

2015-11-03 21:15
This project uses directories as input and turns each directory into a single InputSplit, so that each key-value pair passed to the map method is a directory name (key) together with the concatenated contents of the files in that directory (value).
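For example, assuming a hypothetical layout such as the one below (the paths are only illustrative), each first-level subdirectory of the input path becomes one split, and therefore one call to map():

/user/hadoop/tc/dir1/a.txt
/user/hadoop/tc/dir1/b.txt
/user/hadoop/tc/dir2/c.txt

split 1 -> map(key = "dir1", value = contents of a.txt and b.txt concatenated)
split 2 -> map(key = "dir2", value = contents of c.txt)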

Custom InputFormat:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeDirInputFormat extends FileInputFormat<Text, Text> {

    // Each directory is handed to a single mapper, so a split must never be divided further.
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        WholeDirRecordReader reader = new WholeDirRecordReader();
        reader.initialize(split, context);
        return reader;
    }

    // One InputSplit per input path; each path added in the driver is a directory.
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        List<InputSplit> splits = new ArrayList<>();
        Path[] inputPaths = getInputPaths(job);
        for (Path path : inputPaths) {
            FileSystem fs = path.getFileSystem(job.getConfiguration());
            FileStatus file = fs.getFileStatus(path);
            long length = file.getLen(); // for a directory this is usually 0; the reader ignores it
            splits.add(new FileSplit(path, 0, length, null)); // null hosts: no locality hints
        }
        return splits;
    }
}
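Since fs.getFileStatus() on a directory typically reports a length of 0, the split length above is mostly meaningless. If a more informative length is wanted (for progress reporting, for instance), getSplits() could sum the bytes under each directory with getContentSummary(). This is an optional refinement, not part of the original code; a minimal sketch:

@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
    List<InputSplit> splits = new ArrayList<>();
    for (Path path : getInputPaths(job)) {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        // ContentSummary.getLength() totals the bytes of every file under the directory
        long length = fs.getContentSummary(path).getLength();
        splits.add(new FileSplit(path, 0, length, null));
    }
    return splits;
}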


Custom RecordReader:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeDirRecordReader extends RecordReader<Text, Text> {

    private FileSplit fileSplit;
    private Configuration conf;
    private Text key = new Text();
    private Text value = new Text();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    // The whole directory is emitted as a single record: key = directory name,
    // value = contents of every file in the directory concatenated together.
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            Path dirPath = fileSplit.getPath();
            key.set(dirPath.getName());
            FileSystem fs = dirPath.getFileSystem(conf);
            FileStatus[] listStatus = fs.listStatus(dirPath);
            StringBuilder sb = new StringBuilder();
            for (FileStatus fileStatus : listStatus) {
                long fileLen = fileStatus.getLen();
                FSDataInputStream in = null;
                try {
                    in = fs.open(fileStatus.getPath());
                    // Read the whole file into memory and fold Windows line breaks into spaces.
                    byte[] contents = new byte[(int) fileLen];
                    IOUtils.readFully(in, contents, 0, (int) fileLen);
                    String fileContent = new String(contents, 0, (int) fileLen);
                    sb.append(fileContent.replaceAll("\r\n", " "));
                } finally {
                    IOUtils.closeStream(in);
                }
            }
            value.set(sb.toString());
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // nothing to close; streams are closed inside nextKeyValue()
    }
}
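Note that the reader above buffers an entire directory in memory, so it is only appropriate for directories of small files, and it assumes every entry under the directory is a regular file. A hardened version of the inner loop might skip non-file entries and decode with an explicit charset. This is a sketch of an optional change, not part of the original code (FileStatus.isFile() is available in Hadoop 2.x):

for (FileStatus fileStatus : listStatus) {
    if (!fileStatus.isFile()) {
        continue; // skip subdirectories and other non-file entries
    }
    long fileLen = fileStatus.getLen();
    FSDataInputStream in = null;
    try {
        in = fs.open(fileStatus.getPath());
        byte[] contents = new byte[(int) fileLen];
        IOUtils.readFully(in, contents, 0, (int) fileLen);
        // decode with an explicit charset instead of the platform default
        sb.append(new String(contents, java.nio.charset.StandardCharsets.UTF_8).replaceAll("\r\n", " "));
    } finally {
        IOUtils.closeStream(in);
    }
}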


Run the job:

(the job's InputFormatClass has to be set to the custom class)

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MainJob {

    // Identity mapper: logs each directory record and passes it through unchanged.
    public static class WordCountMapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("key-->" + key.toString() + ": value--->" + value.toString());
            context.write(key, value);
        }
    }

    // Identity reducer: the inherited reduce() simply forwards every (key, value) pair.
    public static class WordCountReducer extends Reducer<Text, Text, Text, Text> {
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "WholeFileInputFormat");
        job.setJarByClass(MainJob.class); // lets the cluster locate the jar that contains these classes
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setInputFormatClass(WholeDirInputFormat.class); // the custom InputFormat defined above
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Add every subdirectory of the input path; each one becomes its own InputSplit.
        Path inputPath = new Path("hdfs://192.168.19.128:9000/user/hadoop/tc");
        FileSystem fs = FileSystem.get(inputPath.toUri(), conf);
        for (FileStatus fileStatus : fs.listStatus(inputPath)) {
            WholeDirInputFormat.addInputPath(job, fileStatus.getPath());
        }

        Path outputPath = new Path("hdfs://192.168.19.128:9000/user/hadoop/output");
        FileOutputFormat.setOutputPath(job, outputPath);

        if (!job.waitForCompletion(true)) {
            System.out.println("Job failed!");
            System.exit(1);
        }
    }
}
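With the default TextOutputFormat and a single reducer, the output file (part-r-00000) should contain one line per input directory: the directory name, a tab, and that directory's concatenated file contents. For the hypothetical dir1/dir2 layout sketched earlier, it would look roughly like:

dir1    <contents of a.txt and b.txt>
dir2    <contents of c.txt>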


Input directory, output, and log: (screenshots)

Tags: InputFormat