Practicing Hadoop and MapReduce with the Hortonworks Sandbox
2015-09-07 08:35
I've recently been taking the Coursera Cloud Computing specialization. The Cloud Applications course requires submitting hands-on MapReduce programming assignments, and the Hortonworks Sandbox virtual machine is a convenient way to run a local Hadoop environment.
Once the VM is up, you can log in over SSH at 127.0.0.1, port 2222.
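For reference, a minimal login command (assuming the sandbox's default root account; older sandbox images shipped with the password "hadoop", but check your image's documentation):

ssh root@127.0.0.1 -p 2222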
# Add the Hadoop tools jar to the classpath
export HADOOP_CLASSPATH=$JAVA_HOME/lib/tools.jar
# Compile
hadoop com.sun.tools.javac.Main TopTitleStatistics.java -d build
# Package the jar
jar -cvf TopTitleStatistics.jar -C build/ ./
# Run the job
hadoop jar TopTitleStatistics.jar TopTitleStatistics -D stopwords=/mp2/misc/stopwords.txt -D delimiters=/mp2/misc/delimiters.txt -D N=5 /mp2/titles /mp2/C-output
# Inspect the output
hadoop fs -cat /mp2/C-output/part* | head -n 100
# Remove the output and build artifacts (the output directory must be deleted before re-running)
hadoop fs -rm -r /mp2/C-output
rm -rf ./build/* ./TopTitleStatistics.jar
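A note on how the -D options above reach the job: because the job is launched through ToolRunner (as in the source below), the generic -D name=value options are parsed into the job's Configuration before run() is called. Here is a minimal sketch illustrating that, using a hypothetical ShowConf class:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical minimal Tool: ToolRunner parses the "-D name=value" generic
// options into the Configuration before run() is invoked, which is exactly
// how the job below reads stopwords, delimiters, and N.
public class ShowConf extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();            // already holds the -D options
        System.out.println(conf.get("stopwords")); // /mp2/misc/stopwords.txt
        System.out.println(conf.getInt("N", 10));  // 5 for the command above
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new ShowConf(), args));
    }
}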
The full source code is attached below.
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;
import java.util.StringTokenizer;
import java.util.TreeSet;

/*
 * TopTitles.java
 */
// >>> Don't Change
public class TopTitles extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new TopTitles(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        FileSystem fs = FileSystem.get(conf);
        Path tmpPath = new Path("/mp2/tmp");
        fs.delete(tmpPath, true);

        // Job A: count occurrences of each title word.
        Job jobA = Job.getInstance(conf, "Title Count");
        jobA.setOutputKeyClass(Text.class);
        jobA.setOutputValueClass(IntWritable.class);
        jobA.setMapperClass(TitleCountMap.class);
        jobA.setReducerClass(TitleCountReduce.class);
        FileInputFormat.setInputPaths(jobA, new Path(args[0]));
        FileOutputFormat.setOutputPath(jobA, tmpPath);
        jobA.setJarByClass(TopTitles.class);
        jobA.waitForCompletion(true);

        // Job B: select the global top N from job A's counts.
        Job jobB = Job.getInstance(conf, "Top Titles");
        jobB.setOutputKeyClass(Text.class);
        jobB.setOutputValueClass(IntWritable.class);
        jobB.setMapOutputKeyClass(NullWritable.class);
        jobB.setMapOutputValueClass(TextArrayWritable.class);
        jobB.setMapperClass(TopTitlesMap.class);
        jobB.setReducerClass(TopTitlesReduce.class);
        jobB.setNumReduceTasks(1);
        FileInputFormat.setInputPaths(jobB, tmpPath);
        FileOutputFormat.setOutputPath(jobB, new Path(args[1]));
        jobB.setInputFormatClass(KeyValueTextInputFormat.class);
        jobB.setOutputFormatClass(TextOutputFormat.class);
        jobB.setJarByClass(TopTitles.class);
        return jobB.waitForCompletion(true) ? 0 : 1;
    }

    public static String readHDFSFile(String path, Configuration conf) throws IOException {
        Path pt = new Path(path);
        FileSystem fs = FileSystem.get(pt.toUri(), conf);
        FSDataInputStream file = fs.open(pt);
        BufferedReader buffIn = new BufferedReader(new InputStreamReader(file));
        StringBuilder everything = new StringBuilder();
        String line;
        while ((line = buffIn.readLine()) != null) {
            everything.append(line);
            everything.append("\n");
        }
        return everything.toString();
    }

    public static class TextArrayWritable extends ArrayWritable {
        public TextArrayWritable() {
            super(Text.class);
        }

        public TextArrayWritable(String[] strings) {
            super(Text.class);
            Text[] texts = new Text[strings.length];
            for (int i = 0; i < strings.length; i++) {
                texts[i] = new Text(strings[i]);
            }
            set(texts);
        }
    }
// <<< Don't Change

    public static class TitleCountMap extends Mapper<Object, Text, Text, IntWritable> {
        List<String> stopWords;
        String delimiters;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            String stopWordsPath = conf.get("stopwords");
            String delimitersPath = conf.get("delimiters");
            this.stopWords = Arrays.asList(readHDFSFile(stopWordsPath, conf).split("\n"));
            this.delimiters = readHDFSFile(delimitersPath, conf);
        }

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer st = new StringTokenizer(line, delimiters);
            while (st.hasMoreTokens()) {
                String word = st.nextToken().trim().toLowerCase();
                if (!stopWords.contains(word)) {
                    context.write(new Text(word), new IntWritable(1));
                }
            }
        }
    }

    public static class TitleCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static class TopTitlesMap extends Mapper<Text, Text, NullWritable, TextArrayWritable> {
        Integer N;
        // Items in a TreeSet are kept in ascending order, sorted by the
        // (count, word) key automatically, so first() is always the smallest.
        TreeSet<Pair<Integer, String>> titleCountMap = new TreeSet<Pair<Integer, String>>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            this.N = conf.getInt("N", 10);
        }

        @Override
        public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String word = key.toString();
            Integer count = Integer.parseInt(value.toString());
            titleCountMap.add(new Pair<Integer, String>(count, word));
            if (titleCountMap.size() > N) {
                // Keep no more than N items (default 10) by evicting the smallest count.
                titleCountMap.remove(titleCountMap.first());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // cleanup() runs once after the mapper has consumed all of its input,
            // so this emits the mapper's local top N in a single batch.
            for (Pair<Integer, String> item : titleCountMap) {
                String[] strings = {item.second, item.first.toString()};
                TextArrayWritable val = new TextArrayWritable(strings);
                context.write(NullWritable.get(), val);
            }
        }
    }

    public static class TopTitlesReduce extends Reducer<NullWritable, TextArrayWritable, Text, IntWritable> {
        Integer N;
        TreeSet<Pair<Integer, String>> titleCountMap = new TreeSet<Pair<Integer, String>>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            this.N = conf.getInt("N", 10);
        }

        @Override
        public void reduce(NullWritable key, Iterable<TextArrayWritable> values, Context context) throws IOException, InterruptedException {
            // Because the mapper's output key is NullWritable, every mapper's
            // local top-N list arrives at this single reducer.
            for (TextArrayWritable val : values) {
                Text[] pair = (Text[]) val.toArray();
                String word = pair[0].toString();
                Integer count = Integer.parseInt(pair[1].toString());
                titleCountMap.add(new Pair<Integer, String>(count, word));
            }
            // Trim back down to the global top N. This must be a while loop,
            // not an if: several mappers may each have contributed up to N entries.
            while (titleCountMap.size() > N) {
                titleCountMap.remove(titleCountMap.first());
            }
            for (Pair<Integer, String> item : titleCountMap) {
                Text word = new Text(item.second);
                IntWritable count = new IntWritable(item.first);
                context.write(word, count);
            }
        }
    }
}

// >>> Don't Change
class Pair<A extends Comparable<? super A>, B extends Comparable<? super B>> implements Comparable<Pair<A, B>> {
    public final A first;
    public final B second;

    public Pair(A first, B second) {
        this.first = first;
        this.second = second;
    }

    public static <A extends Comparable<? super A>, B extends Comparable<? super B>> Pair<A, B> of(A first, B second) {
        return new Pair<A, B>(first, second);
    }

    @Override
    public int compareTo(Pair<A, B> o) {
        int cmp = o == null ? 1 : (this.first).compareTo(o.first);
        return cmp == 0 ? (this.second).compareTo(o.second) : cmp;
    }

    @Override
    public int hashCode() {
        return 31 * hashcode(first) + hashcode(second);
    }

    private static int hashcode(Object o) {
        return o == null ? 0 : o.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof Pair))
            return false;
        if (this == obj)
            return true;
        return equal(first, ((Pair<?, ?>) obj).first)
                && equal(second, ((Pair<?, ?>) obj).second);
    }

    private boolean equal(Object o1, Object o2) {
        return o1 == o2 || (o1 != null && o1.equals(o2));
    }

    @Override
    public String toString() {
        return "(" + first + ", " + second + ')';
    }
}
// <<< Don't Change
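The heart of both TopTitlesMap and TopTitlesReduce is the same top-N pattern: a TreeSet ordered by (count, word) keeps its smallest element at first(), so evicting first() whenever the set grows beyond N retains the N largest counts. A hypothetical standalone demo of just that pattern, reusing the Pair class from the listing above:

import java.util.TreeSet;

// Hypothetical demo class; Pair is the helper from the listing above.
public class TopNDemo {
    public static void main(String[] args) {
        int n = 3;
        TreeSet<Pair<Integer, String>> top = new TreeSet<Pair<Integer, String>>();
        String[] words = {"alpha", "beta", "gamma", "delta", "epsilon"};
        int[] counts = {5, 2, 9, 7, 1};
        for (int i = 0; i < words.length; i++) {
            top.add(new Pair<Integer, String>(counts[i], words[i]));
            while (top.size() > n) {
                top.remove(top.first()); // evict the current minimum count
            }
        }
        System.out.println(top); // prints [(5, alpha), (7, delta), (9, gamma)]
    }
}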