Custom InputFormat
2015-11-03 21:15
239 查看
This project uses a directory as the input and turns each directory into one InputSplit, so that the key-value pair passed to the map method is the directory name (key) and the concatenated contents of the files in that directory (value).
Custom InputFormat:
Custom RecordReader:
Run the job:
(you have to set the job's InputFormatClass to the custom class)
Input directory:
output:
log:
Custom InputFormat:
import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class WholeDirInputFormat extends FileInputFormat<Text, Text> { @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } @Override public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { WholeDirRecordReader reader = new WholeDirRecordReader(); reader.initialize(split, context); return reader; } @Override public List<InputSplit> getSplits(JobContext job) throws IOException { List<InputSplit> splits = new ArrayList<>(); Path[] inputPaths = getInputPaths(job); for (Path path : inputPaths) { FileSystem fs = path.getFileSystem(job.getConfiguration()); FileStatus file = fs.getFileStatus(path); long length = file.getLen(); splits.add(new FileSplit(path, 0, length, null)); } return splits; } }
Custom RecordReader:
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class WholeDirRecordReader extends RecordReader<Text, Text> { private FileSplit fileSplit; private Configuration conf; private Text key = new Text(); private Text value = new Text(); private boolean processed = false; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.fileSplit = (FileSplit) split; this.conf = context.getConfiguration(); } @Override public Text getCurrentKey() throws IOException, InterruptedException { return key; } @Override public Text getCurrentValue() throws IOException, InterruptedException { return value; } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!processed) { Path dirPath = fileSplit.getPath(); key.set(dirPath.getName()); FileSystem fs = dirPath.getFileSystem(conf); FileStatus[] listStatus = fs.listStatus(dirPath); StringBuffer sb = new StringBuffer(); for (FileStatus fileStatus : listStatus) { long fileLen = fileStatus.getLen(); FSDataInputStream in = null; try { in = fs.open(fileStatus.getPath()); byte[] contents = new byte[(int) fileLen]; IOUtils.readFully(in, contents, 0, (int) fileLen); String fileContent = new String(contents, 0, (int) fileLen); sb.append(fileContent.replaceAll("\r\n", " ")); } finally { IOUtils.closeStream(in); } } value.set(sb.toString()); processed = true; return true; } return false; } @Override public float getProgress() throws IOException, InterruptedException { return processed ? 
1.0f : 0.0f; } @Override public void close() throws IOException { } }
Run the job:
(you have to set the job's InputFormatClass to the custom class)
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MainJob { public static class WordCountMapper extends Mapper<Text, Text, Text, Text> { @Override protected void map(Text key, Text value, Mapper<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { System.out.println("key-->" + key.toString() + ": value--->" + value.toString()); context.write(key, value); } } public static class WordCountReducer extends Reducer<Text, Text, Text, Text> { } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = new Job(conf, "WholeFileInputFormat"); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); job.setInputFormatClass(WholeDirInputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); Path inputPath = new Path("hdfs://192.168.19.128:9000/user/hadoop/tc"); FileSystem fs = FileSystem.get(inputPath.toUri(), conf); FileStatus[] status = fs.listStatus(inputPath); for (FileStatus fileStatus : status) { WholeDirInputFormat.addInputPath(job, fileStatus.getPath()); } Path counterOutputPath = new Path("hdfs://192.168.19.128:9000/user/hadoop/output"); FileOutputFormat.setOutputPath(job, counterOutputPath); boolean counterJobCompletion = job.waitForCompletion(true); if (counterJobCompletion) { } else { System.out.println("Counter Job error!"); System.exit(1); } } }
Input directory:
output:
log:
相关文章推荐
- InputFormat详解 -- RecordReader篇
- hadoop中mapreducer的数据输入(InputFormat)原理详解
- Hadoop-2.4.1学习之InputFormat及源代码分析
- hadoop InputFormat详解
- Hive学习笔记-分隔符处理
- Hadoop之MapReduce编程模型
- hadoop mapreduce 自定义InputFormat
- 简单解析mapreduce切片
- 从源码看Spark读取Hive表数据小文件和分块的问题
- Hadoop源码编译,MR InputFormat重写
- MapReuce笔记六之输入类InputFormat
- MapReduce之数据读取组件InputFormat原理解析
- MapReduce InputFormat之FileInputFormat
- select下拉联动(2) 更具级联性
- 在二维数组中查找数
- Linux Shell中的延时函数 sleep
- 线性表的链状存储结构的实现
- LeetCode 015 3Sum
- ARP数据包格式
- 1020月饼