
【Hadoop】How to Implement an Inverted Index in Hadoop MapReduce

2016-09-08 10:28
1. Concept and Approach

An inverted index maps each word to the list of files it appears in, together with its frequency in each file. Here it is built with two chained MapReduce jobs:

- Job one (InverseIndexOne): the mapper obtains the file name from the FileSplit and emits the composite key "word-fileName" with a count of 1 for every word; the reducer sums these ones, so its output lines look like "word-fileName<TAB>count". The composite key is what lets the framework count per word *and* per file in a single pass.
- Job two (InverseIndexTwo): the mapper splits "word-fileName" back apart and emits word -> "fileName-count"; the reducer concatenates all postings for a word into a comma-separated list, producing the final index line "word<TAB>fileName-count,fileName-count,...".

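To make the data flow concrete, here is a small worked example (the file names and contents are made up for illustration). Suppose the input directory holds two files:

    a.txt: hello world hello
    b.txt: hello hadoop

Job one then produces (key and value separated by a tab):

    hadoop-b.txt    1
    hello-a.txt     2
    hello-b.txt     1
    world-a.txt     1

and job two turns that into the final inverted index:

    hadoop    b.txt-1
    hello     a.txt-2,b.txt-1
    world     a.txt-1

Keys are sorted by the framework, but the order of postings within one line depends on how the values arrive at the reducer, so it is not guaranteed.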
2. Code Example

InverseIndexOne

package com.ares.hadoop.mr.inverseindex;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

public class InverseIndexOne extends Configured implements Tool {

    private static final Logger LOGGER = Logger.getLogger(InverseIndexOne.class);

    enum Counter {
        LINESKIP
    }

    public static class InverseIndexOneMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {

        private String line;
        private final static char separatorA = ' ';
        private final static char separatorB = '-';
        private String fileName;

        private Text text = new Text();
        private final static LongWritable ONE = new LongWritable(1L);

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            try {
                line = value.toString();
                String[] fields = StringUtils.split(line, separatorA);

                // The input split identifies the file this line came from.
                FileSplit fileSplit = (FileSplit) context.getInputSplit();
                fileName = fileSplit.getPath().getName();

                // Emit "word-fileName" -> 1 for every word on the line.
                for (int i = 0; i < fields.length; i++) {
                    text.set(fields[i] + separatorB + fileName);
                    context.write(text, ONE);
                }
            } catch (Exception e) {
                // Skip malformed lines, but log and count them.
                LOGGER.error(e);
                context.getCounter(Counter.LINESKIP).increment(1);
            }
        }
    }

    public static class InverseIndexOneReducer
            extends Reducer<Text, LongWritable, Text, LongWritable> {

        private LongWritable result = new LongWritable();

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            // Sum all the 1s emitted for this "word-fileName" key.
            long count = 0;
            for (LongWritable value : values) {
                count += value.get();
            }
            result.set(count);
            context.write(key, result);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        String msg = "InverseIndexOne: TEST STARTED...";
        LOGGER.debug(msg);
        System.out.println(msg);

        Configuration conf = new Configuration();
        //FOR Eclipse JVM Debug
        //conf.set("mapreduce.job.jar", "flowsum.jar");
        Job job = Job.getInstance(conf);

        // JOB NAME
        job.setJobName("InverseIndexOne");

        // JOB MAPPER & REDUCER
        job.setJarByClass(InverseIndexOne.class);
        job.setMapperClass(InverseIndexOneMapper.class);
        job.setReducerClass(InverseIndexOneReducer.class);

        // JOB PARTITION
        //job.setPartitionerClass(FlowGroupPartition.class);

        // JOB REDUCE TASK NUMBER
        //job.setNumReduceTasks(5);

        // MAP & REDUCE OUTPUT TYPES
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // MAP OUTPUT TYPES
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // JOB INPUT & OUTPUT PATH (args[1] = input, args[2] = output)
        //FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.setInputPaths(job, args[1]);
        Path output = new Path(args[2]);
        // Optionally delete a pre-existing output directory first:
        //FileSystem fs = FileSystem.get(conf);
        //if (fs.exists(output)) {
        //    fs.delete(output, true);
        //}
        FileOutputFormat.setOutputPath(job, output);

        // VERBOSE OUTPUT
        if (job.waitForCompletion(true)) {
            msg = "InverseIndexOne: TEST SUCCEEDED...";
            LOGGER.debug(msg);
            System.out.println(msg);
            return 0;
        } else {
            msg = "InverseIndexOne: TEST FAILED...";
            LOGGER.debug(msg);
            System.out.println(msg);
            return 1;
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            String msg = "InverseIndexOne: ARGUMENTS ERROR";
            LOGGER.error(msg);
            System.out.println(msg);
            System.exit(-1);
        }

        int result = ToolRunner.run(new Configuration(), new InverseIndexOne(), args);
        System.exit(result);
    }
}
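
Assuming the classes are packaged into a jar (the name inverseindex.jar below is made up), job one can be launched like this; note that run() reads the input path from args[1] and the output path from args[2], so the first argument is only a placeholder:

    hadoop jar inverseindex.jar com.ares.hadoop.mr.inverseindex.InverseIndexOne x /index/input /index/step1

Because the reduce function is a plain sum, InverseIndexOneReducer could also be registered as a combiner with job.setCombinerClass(InverseIndexOneReducer.class) to shrink the shuffle, though the code above does not do this.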


InverseIndexTwo

package com.ares.hadoop.mr.inverseindex;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

public class InverseIndexTwo extends Configured implements Tool {

    private static final Logger LOGGER = Logger.getLogger(InverseIndexTwo.class);

    enum Counter {
        LINESKIP
    }

    public static class InverseIndexTwoMapper extends
            Mapper<LongWritable, Text, Text, Text> {

        private String line;
        private final static char separatorA = '\t';
        private final static char separatorB = '-';

        private Text textKey = new Text();
        private Text textValue = new Text();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            try {
                // Input lines are job one's output: "word-fileName<TAB>count".
                line = value.toString();
                String[] fields = StringUtils.split(line, separatorA);
                String[] wordAndFileName = StringUtils.split(fields[0], separatorB);
                long count = Long.parseLong(fields[1]);
                String word = wordAndFileName[0];
                String fileName = wordAndFileName[1];

                // Regroup by word alone: emit word -> "fileName-count".
                textKey.set(word);
                textValue.set(fileName + separatorB + count);
                context.write(textKey, textValue);
            } catch (Exception e) {
                // Skip malformed lines, but log and count them.
                LOGGER.error(e);
                context.getCounter(Counter.LINESKIP).increment(1);
            }
        }
    }

    public static class InverseIndexTwoReducer extends
            Reducer<Text, Text, Text, Text> {

        private Text textValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Join all "fileName-count" postings with commas. Starting with an
            // empty delimiter avoids a trailing separator without having to
            // detect the last element of the iterator.
            StringBuilder index = new StringBuilder();
            String delimiter = "";
            for (Text text : values) {
                index.append(delimiter + text.toString());
                delimiter = ",";
            }
            textValue.set(index.toString());
            context.write(key, textValue);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        String msg = "InverseIndexTwo: TEST STARTED...";
        LOGGER.debug(msg);
        System.out.println(msg);

        Configuration conf = new Configuration();
        //FOR Eclipse JVM Debug
        //conf.set("mapreduce.job.jar", "flowsum.jar");
        Job job = Job.getInstance(conf);

        // JOB NAME
        job.setJobName("InverseIndexTwo");

        // JOB MAPPER & REDUCER
        job.setJarByClass(InverseIndexTwo.class);
        job.setMapperClass(InverseIndexTwoMapper.class);
        job.setReducerClass(InverseIndexTwoReducer.class);

        // JOB PARTITION
        //job.setPartitionerClass(FlowGroupPartition.class);

        // JOB REDUCE TASK NUMBER
        //job.setNumReduceTasks(5);

        // MAP & REDUCE OUTPUT TYPES
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // MAP OUTPUT TYPES
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // JOB INPUT & OUTPUT PATH (args[1] = input, args[2] = output)
        //FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.setInputPaths(job, args[1]);
        Path output = new Path(args[2]);
        // Optionally delete a pre-existing output directory first:
        //FileSystem fs = FileSystem.get(conf);
        //if (fs.exists(output)) {
        //    fs.delete(output, true);
        //}
        FileOutputFormat.setOutputPath(job, output);

        // VERBOSE OUTPUT
        if (job.waitForCompletion(true)) {
            msg = "InverseIndexTwo: TEST SUCCEEDED...";
            LOGGER.debug(msg);
            System.out.println(msg);
            return 0;
        } else {
            msg = "InverseIndexTwo: TEST FAILED...";
            LOGGER.debug(msg);
            System.out.println(msg);
            return 1;
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            String msg = "InverseIndexTwo: ARGUMENTS ERROR";
            LOGGER.error(msg);
            System.out.println(msg);
            System.exit(-1);
        }

        int result = ToolRunner.run(new Configuration(), new InverseIndexTwo(), args);
        System.exit(result);
    }
}
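
The two jobs must be run one after the other, with job one's output directory fed to job two as input. As a minimal sketch (not part of the original code), a small driver could chain them like this, again passing a placeholder at index 0 because both run() methods read args[1] and args[2]:

package com.ares.hadoop.mr.inverseindex;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver that chains the two jobs: job one's output
// directory becomes job two's input.
public class InverseIndexDriver {
    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.out.println("Usage: InverseIndexDriver <input> <intermediate> <output>");
            System.exit(-1);
        }
        String input = args[0];
        String intermediate = args[1];
        String output = args[2];

        // Both run() implementations read args[1] and args[2],
        // so pass a placeholder at index 0.
        int rc = ToolRunner.run(new Configuration(), new InverseIndexOne(),
                new String[] { "unused", input, intermediate });
        if (rc == 0) {
            rc = ToolRunner.run(new Configuration(), new InverseIndexTwo(),
                    new String[] { "unused", intermediate, output });
        }
        System.exit(rc);
    }
}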


References:

How to check if processing the last item in an Iterator? http://stackoverflow.com/questions/9633991/how-to-check-if-processing-the-last-item-in-an-iterator