
Practicing Hadoop and MapReduce with the Hortonworks Sandbox

2015-09-07 08:35
I have recently been taking Coursera's cloud computing specialization. The Cloud Computing Applications course requires submitting exercises that implement MapReduce jobs, and the Hortonworks Sandbox is a convenient virtual machine for simulating a Hadoop environment.

After the VM boots you can log in over SSH; the sandbox forwards the guest's port 22 to 127.0.0.1:2222 on the host.
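For example, with the sandbox's default root account (an assumption based on the stock images, which ship with the password "hadoop" and prompt you to change it on first login):

ssh root@127.0.0.1 -p 2222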

# Put tools.jar on the Hadoop classpath so the Java compiler is available
export HADOOP_CLASSPATH=$JAVA_HOME/lib/tools.jar
# Compile
hadoop com.sun.tools.javac.Main TopTitleStatistics.java -d build
# Package into a jar
jar -cvf TopTitleStatistics.jar -C build/ ./
# Run; the -D generic options are parsed by ToolRunner into the job Configuration
hadoop jar TopTitleStatistics.jar TopTitleStatistics -D stopwords=/mp2/misc/stopwords.txt -D delimiters=/mp2/misc/delimiters.txt -D N=5 /mp2/titles /mp2/C-output

# Inspect the output (TextOutputFormat writes one tab-separated pair per line)
hadoop fs -cat /mp2/C-output/part* | head -n 100

# Remove the output and build artifacts (the HDFS output directory must be
# deleted before the job can be rerun)
hadoop fs -rm -r /mp2/C-output
rm -rf ./build/* ./TopTitleStatistics.jar
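If the input data is not in HDFS yet, it can be staged first. A sketch with made-up local file names (use whatever the assignment actually ships with):

# Create the MP2 directory layout and upload the data (hypothetical file names)
hadoop fs -mkdir -p /mp2/titles /mp2/misc
hadoop fs -put titles.txt /mp2/titles/
hadoop fs -put stopwords.txt delimiters.txt /mp2/misc/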


Attached source code (TopTitles.java; the TopTitleStatistics class named in the commands above is compiled and run the same way):

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;
import java.util.StringTokenizer;
import java.util.TreeSet;

/*
 * TopTitles.java
 */

// >>> Don't Change
public class TopTitles extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new TopTitles(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        FileSystem fs = FileSystem.get(conf);
        Path tmpPath = new Path("/mp2/tmp");
        fs.delete(tmpPath, true);

        Job jobA = Job.getInstance(conf, "Title Count");
        jobA.setOutputKeyClass(Text.class);
        jobA.setOutputValueClass(IntWritable.class);

        jobA.setMapperClass(TitleCountMap.class);
        jobA.setReducerClass(TitleCountReduce.class);

        FileInputFormat.setInputPaths(jobA, new Path(args[0]));
        FileOutputFormat.setOutputPath(jobA, tmpPath);

        jobA.setJarByClass(TopTitles.class);
        jobA.waitForCompletion(true);

        Job jobB = Job.getInstance(conf, "Top Titles");
        jobB.setOutputKeyClass(Text.class);
        jobB.setOutputValueClass(IntWritable.class);

        jobB.setMapOutputKeyClass(NullWritable.class);
        jobB.setMapOutputValueClass(TextArrayWritable.class);

        jobB.setMapperClass(TopTitlesMap.class);
        jobB.setReducerClass(TopTitlesReduce.class);
        jobB.setNumReduceTasks(1);

        FileInputFormat.setInputPaths(jobB, tmpPath);
        FileOutputFormat.setOutputPath(jobB, new Path(args[1]));

        jobB.setInputFormatClass(KeyValueTextInputFormat.class);
        jobB.setOutputFormatClass(TextOutputFormat.class);

        jobB.setJarByClass(TopTitles.class);
        return jobB.waitForCompletion(true) ? 0 : 1;
    }

    public static String readHDFSFile(String path, Configuration conf) throws IOException {
        Path pt = new Path(path);
        FileSystem fs = FileSystem.get(pt.toUri(), conf);
        FSDataInputStream file = fs.open(pt);
        BufferedReader buffIn = new BufferedReader(new InputStreamReader(file));

        StringBuilder everything = new StringBuilder();
        String line;
        while ((line = buffIn.readLine()) != null) {
            everything.append(line);
            everything.append("\n");
        }
        return everything.toString();
    }

    public static class TextArrayWritable extends ArrayWritable {
        public TextArrayWritable() {
            super(Text.class);
        }

        public TextArrayWritable(String[] strings) {
            super(Text.class);
            Text[] texts = new Text[strings.length];
            for (int i = 0; i < strings.length; i++) {
                texts[i] = new Text(strings[i]);
            }
            set(texts);
        }
    }
// <<< Don't Change

    public static class TitleCountMap extends Mapper<Object, Text, Text, IntWritable> {
        List<String> stopWords;
        String delimiters;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();

            // The -D stopwords=... and -D delimiters=... options name HDFS files,
            // which are read once per mapper.
            String stopWordsPath = conf.get("stopwords");
            String delimitersPath = conf.get("delimiters");

            this.stopWords = Arrays.asList(readHDFSFile(stopWordsPath, conf).split("\n"));
            this.delimiters = readHDFSFile(delimitersPath, conf);
        }

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Tokenize each title on the configured delimiters and emit
            // (word, 1) for every non-stopword token.
            String line = value.toString();
            StringTokenizer st = new StringTokenizer(line, delimiters);
            while (st.hasMoreTokens()) {
                String word = st.nextToken().trim().toLowerCase();
                if (!stopWords.contains(word)) {
                    context.write(new Text(word), new IntWritable(1));
                }
            }
        }
    }

    public static class TitleCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Sum the per-word counts emitted by TitleCountMap.
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static class TopTitlesMap extends Mapper<Text, Text, NullWritable, TextArrayWritable> {
        Integer N;
        // A TreeSet keeps its elements sorted in ascending order, and Pair
        // sorts by count first (then title), so first() is always the
        // entry with the smallest count.
        TreeSet<Pair<Integer, String>> titleCountMap = new TreeSet<Pair<Integer, String>>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            this.N = conf.getInt("N", 10);
        }

        @Override
        public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String word = key.toString();
            Integer count = Integer.parseInt(value.toString());

            titleCountMap.add(new Pair<Integer, String>(count, word));
            // Evict the smallest entry whenever the set exceeds N (default 10),
            // so each mapper keeps at most N candidates.
            if (titleCountMap.size() > N) {
                titleCountMap.remove(titleCountMap.first());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // cleanup() runs once after the mapper has consumed its whole split;
            // only then are the local top-N candidates emitted.
            for (Pair<Integer, String> item : titleCountMap) {
                String[] strings = {item.second, item.first.toString()};

                TextArrayWritable val = new TextArrayWritable(strings);
                context.write(NullWritable.get(), val);
            }
        }
    }

    public static class TopTitlesReduce extends Reducer<NullWritable, TextArrayWritable, Text, IntWritable> {
        Integer N;
        TreeSet<Pair<Integer, String>> titleCountMap = new TreeSet<Pair<Integer, String>>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            this.N = conf.getInt("N", 10);
        }

        @Override
        public void reduce(NullWritable key, Iterable<TextArrayWritable> values, Context context) throws IOException, InterruptedException {
            // Every map output shares the single NullWritable key, so all of
            // the per-mapper candidates arrive at this one reduce call.
            for (TextArrayWritable val : values) {
                Text[] pair = (Text[]) val.toArray();
                String word = pair[0].toString();
                Integer count = Integer.parseInt(pair[1].toString());
                titleCountMap.add(new Pair<Integer, String>(count, word));
                // Evict inside the loop so the set never holds more than N
                // entries; checking only once after the loop would remove at
                // most one element.
                if (titleCountMap.size() > N) {
                    titleCountMap.remove(titleCountMap.first());
                }
            }
            for (Pair<Integer, String> item : titleCountMap) {
                Text word = new Text(item.second);
                IntWritable count = new IntWritable(item.first);
                context.write(word, count);
            }
        }
    }

}

// >>> Don't Change
class Pair<A extends Comparable<? super A>,
           B extends Comparable<? super B>>
        implements Comparable<Pair<A, B>> {

    public final A first;
    public final B second;

    public Pair(A first, B second) {
        this.first = first;
        this.second = second;
    }

    public static <A extends Comparable<? super A>,
                   B extends Comparable<? super B>>
    Pair<A, B> of(A first, B second) {
        return new Pair<A, B>(first, second);
    }

    @Override
    public int compareTo(Pair<A, B> o) {
        int cmp = o == null ? 1 : (this.first).compareTo(o.first);
        return cmp == 0 ? (this.second).compareTo(o.second) : cmp;
    }

    @Override
    public int hashCode() {
        return 31 * hashcode(first) + hashcode(second);
    }

    private static int hashcode(Object o) {
        return o == null ? 0 : o.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof Pair))
            return false;
        if (this == obj)
            return true;
        return equal(first, ((Pair<?, ?>) obj).first)
                && equal(second, ((Pair<?, ?>) obj).second);
    }

    private boolean equal(Object o1, Object o2) {
        return o1 == o2 || (o1 != null && o1.equals(o2));
    }

    @Override
    public String toString() {
        return "(" + first + ", " + second + ')';
    }
}
// <<< Don't Change
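Both TopTitlesMap and TopTitlesReduce rely on the same trick: a TreeSet keeps Pair elements sorted ascending by count (ties broken by title, via Pair.compareTo), so evicting first() whenever the set grows past N leaves exactly the N largest entries. A minimal standalone sketch of the pattern (plain Java, no Hadoop required; TopNDemo is a made-up name, and it assumes the Pair class above is compiled alongside it):

import java.util.TreeSet;

public class TopNDemo {
    public static void main(String[] args) {
        final int N = 2;
        TreeSet<Pair<Integer, String>> top = new TreeSet<Pair<Integer, String>>();

        String[] words = {"alpha", "beta", "gamma", "delta"};
        int[] counts = {3, 7, 1, 5};

        for (int i = 0; i < words.length; i++) {
            top.add(new Pair<Integer, String>(counts[i], words[i]));
            // first() is the current minimum, so evicting it whenever the
            // set exceeds N keeps only the N largest counts.
            if (top.size() > N) {
                top.remove(top.first());
            }
        }

        // Iteration is ascending: prints (5, delta) then (7, beta).
        for (Pair<Integer, String> p : top) {
            System.out.println(p);
        }
    }
}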