
HDPCD-Java Review Notes (7) - Lab

2017-10-12 23:52
Java Lab Booklet

Lab: Distributed Grep


You will search the text of the U.S. Constitution for words matching a search string passed in on the command line.

Split each line of text into words, using a space as the separator and backslash as the escape character:

String[] words = StringUtils.split(value.toString(), '\\', ' ');
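Note the argument order in this overload of org.apache.hadoop.util.StringUtils.split: the string to split, then the escape character, then the separator. A minimal standalone sketch of its behavior (the class name and sample sentence are illustrative only):

import org.apache.hadoop.util.StringUtils;

public class SplitDemo {
    public static void main(String[] args) {
        // Split on spaces; a backslash would escape a space so it stays inside the token.
        String[] words = StringUtils.split("We the People of the United States", '\\', ' ');
        for (String word : words) {
            System.out.println(word); // one word per line: We, the, People, ...
        }
    }
}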
Code:

package grep;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Grep extends Configured implements Tool {

    public static class GrepMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {

        private String searchStr = null;
        private Text outputKey = new Text();
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line on spaces, treating backslash as the escape character.
            String[] words = StringUtils.split(value.toString(), '\\', ' ');
            for (String word : words) {
                // Emit (word, 1) for every word that contains the search string.
                if (word.contains(searchStr)) {
                    outputKey.set(word);
                    context.write(outputKey, ONE);
                }
            }
        }

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            // Read the search string that the driver placed in the job configuration.
            searchStr = context.getConfiguration().get("searchString");
            super.setup(context);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "GrepJob");
        Configuration configuration = job.getConfiguration();
        // args[2] is the search string; hand it to the mappers via the configuration.
        configuration.set("searchString", args[2]);
        job.setJarByClass(getClass());

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        // Delete any previous output so the job does not fail on an existing directory.
        out.getFileSystem(configuration).delete(out, true);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setMapperClass(GrepMapper.class);
        // IntSumReducer just sums the 1s; summing is associative and commutative,
        // so it is safe to reuse as a combiner.
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int result = ToolRunner.run(new Configuration(), new Grep(), args);
        System.exit(result);
    }
}
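Assuming the compiled job is packaged into a jar (the jar name below is hypothetical), a typical invocation would be:

hadoop jar grep.jar grep.Grep <input> <output> <searchString>

The first two arguments are the input and output paths; the driver copies the third into the job configuration under the key searchString, where each mapper's setup() picks it up. The result is a set of word<TAB>count lines, one for every distinct word that contains the search string.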

Using Inverted Index Mapping in MapReduce

The resulting file contains the same information as the input, but with the index inverted: instead of each URL listing its words, each word lists the URLs on which it appears.

The first field of each incoming line is the web page URL, which becomes the output value. For each word that follows the URL, output a key/value pair containing the word as the key and the URL as the value.

Example of the transformation. Given two input lines:

http://www.baidu.com,news,blog
http://blog.163.com,blog,tech

the job produces:

blog	http://www.baidu.com,http://blog.163.com
news	http://www.baidu.com
tech	http://blog.163.com
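To see the inversion independent of Hadoop, here is a minimal in-memory sketch of the same transformation (the class name InMemoryInverter is hypothetical and not part of the lab):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InMemoryInverter {
    public static void main(String[] args) {
        String[] lines = {
            "http://www.baidu.com,news,blog",
            "http://blog.163.com,blog,tech"
        };
        // word -> list of URLs, mirroring what the mapper/reducer pair computes
        Map<String, List<String>> index = new LinkedHashMap<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            for (int i = 1; i < fields.length; i++) {
                // Field 0 is the URL; every later field is a word to invert.
                index.computeIfAbsent(fields[i], k -> new ArrayList<>())
                     .add(fields[0]);
            }
        }
        // Print in the same key<TAB>value layout the job produces.
        for (Map.Entry<String, List<String>> e : index.entrySet()) {
            System.out.println(e.getKey() + "\t" + String.join(",", e.getValue()));
        }
    }
}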

Code:

package inverted;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class IndexInverterJob extends Configured implements Tool {

    public static class IndexInverterMapper extends
            Mapper<LongWritable, Text, Text, Text> {

        private Text outputKey = new Text();
        private Text outputValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each line is "url,word1,word2,..."; field 0 is the URL.
            String[] words = StringUtils.split(value.toString(), ',');
            for (int i = 1; i < words.length; i++) {
                // Invert the pairing: the word becomes the key, the URL the value.
                outputKey.set(words[i]);
                outputValue.set(words[0]);
                context.write(outputKey, outputValue);
            }
        }
    }

    public static class IndexInverterReducer extends
            Reducer<Text, Text, Text, Text> {

        private Text outputValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Join all URLs for this word into a single comma-separated list.
            StringBuilder builder = new StringBuilder();
            for (Text value : values) {
                builder.append(value.toString()).append(",");
            }
            // Drop the trailing comma.
            builder.deleteCharAt(builder.length() - 1);
            outputValue.set(builder.toString());
            context.write(key, outputValue);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = super.getConf();
        Job job = Job.getInstance(conf, "IndexInverterJob");

        job.setJarByClass(IndexInverterJob.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        // Delete any previous output so the job does not fail on an existing directory.
        out.getFileSystem(conf).delete(out, true);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setMapperClass(IndexInverterMapper.class);
        job.setReducerClass(IndexInverterReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) {
        int result;
        try {
            result = ToolRunner.run(new Configuration(),
                    new IndexInverterJob(), args);
            System.exit(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
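Assuming the job is packaged into a jar (again, the jar name is hypothetical), a typical invocation would be:

hadoop jar inverted.jar inverted.IndexInverterJob <input> <output>

TextOutputFormat separates each key and value with a tab, so the output lines have the form word<TAB>url1,url2,... as in the example above.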
Tags: hdp