MapReduce编程实战(二)——20151112
2015-11-12 21:39
441 查看
MapReduce Ruby编程
Hadoop的Streaming使用UNIX标准流作为Hadoop和应用程序之间的接口,所以我们可以使用任何编程语言通过标准输入、输出来写MapReduce程序。
关于Ruby的环境安装,可以参照这篇文章:
http://www.linuxidc.com/Linux/2014-04/100242.htm
Ruby改写的查找最高气温的程序如下。
map.rb
#!/usr/local/rvm/bin/ruby
# Mapper: pull (year, temperature) out of fixed-width NCDC weather records
# read from STDIN; drop missing readings (+9999) and bad quality codes.
STDIN.each_line do |line|
  year    = line[15, 4]
  temp    = line[87, 5]
  quality = line[92, 1]
  puts "#{year}\t#{temp}" if temp != "+9999" && quality =~ /[01459]/
end
reduce.rb
#!/usr/local/rvm/bin/ruby
# Reducer: STDIN arrives sorted by key (year), so equal keys are adjacent;
# emit the maximum temperature observed for each year.
#
# Fix: max_val previously started at 0, so a year whose temperatures are
# all negative was wrongly reported as 0. It now starts from the first
# value seen for each key.
last_key, max_val = nil, nil
STDIN.each_line do |line|
  key, val = line.split("\t")
  if last_key && last_key != key
    # Key changed: flush the finished key, start tracking the new one.
    puts "#{last_key}\t#{max_val}"
    last_key, max_val = key, val.to_i
  else
    last_key = key
    max_val = max_val.nil? ? val.to_i : [max_val, val.to_i].max
  end
end
# Flush the final key (if any input was seen at all).
puts "#{last_key}\t#{max_val}" if last_key
使用Unix管道来模拟整个MapReduce过程,如下:
cat temperature.txt | ./map.rb | sort | ./reduce.rb
或者:cat temperature.txt | ruby map.rb | sort | ruby reduce.rb
可以看到,输出结果和Java是一样的。
在集群中运行
运行Java的MapReduce:
我在程序中写死了HDFS的输入路径为/hadooptemp/input/2,输出路径为/hadooptemp/output,运行Java的MapReduce的大致步骤如下:
(1)上传jar包到服务器:test.jar
(2)hadoop fs -mkdir -p /hadooptemp/input/2
(3)hadoop fs -put /home/hadoop/temperature.txt /hadooptemp/input/2
(4)运行:hadoop jar test.jar MaxTemperature
(5)查看输出结果:hadoop fs -ls /hadooptemp/output ; hadoop fs -cat /hadooptemp/output/part-00000
运行Ruby的MapReduce:
hadoop jar /home/hadoop/hadoop-2.2.0/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
-input /hadooptemp/input/2 \
-output /hadooptemp/output \
-mapper "map.rb | sort | reduce.rb" \
-reducer reduce.rb
这里的mapper部分中的reduce.rb,实际起到了combiner的作用。
WordCount MapReduce程序演示
代码如下:
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
/**
 * Word-count job driver (old {@code org.apache.hadoop.mapred} API):
 * configures the job and submits it via {@link JobClient#runJob}.
 * Input and output paths are hard-coded on HDFS.
 */
public class WordCount {
	public static void main(String[] args) throws Exception {
		JobConf conf = new JobConf(WordCount.class);
		conf.setJobName("Word Count"); // fixed typo: was "World Count"
		// FileInputFormat.setInputPaths(conf, new Path(args[0]));
		// FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		FileInputFormat.setInputPaths(conf, new Path("/hadooptemp/input/1"));
		FileOutputFormat.setOutputPath(conf, new Path("/hadooptemp/output"));
		conf.setMapperClass(Map.class);
		// Summing is associative and commutative, so the reducer
		// doubles as a map-side combiner.
		conf.setCombinerClass(Reduce.class);
		conf.setReducerClass(Reduce.class);
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(IntWritable.class);
		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);
		JobClient.runJob(conf);
	}
}
/**
 * Mapper: tokenizes each input line on whitespace and emits
 * (token, 1) for every token found.
 */
class Map extends MapReduceBase implements
		Mapper<LongWritable, Text, Text, IntWritable> {
	// Shared singleton for the constant count of 1.
	private static final IntWritable ONE = new IntWritable(1);
	// Reused output-key holder to avoid allocating a Text per token.
	private final Text token = new Text();

	public void map(LongWritable key, Text value,
			OutputCollector<Text, IntWritable> output, Reporter reporter)
			throws IOException {
		StringTokenizer tokens = new StringTokenizer(value.toString());
		while (tokens.hasMoreTokens()) {
			token.set(tokens.nextToken());
			output.collect(token, ONE);
		}
	}
}
/**
 * Reducer (also used as combiner): sums all the counts
 * received for one word and emits (word, total).
 */
class Reduce extends MapReduceBase implements
		Reducer<Text, IntWritable, Text, IntWritable> {
	public void reduce(Text key, Iterator<IntWritable> values,
			OutputCollector<Text, IntWritable> output, Reporter reporter)
			throws IOException {
		int total = 0;
		for (Iterator<IntWritable> it = values; it.hasNext(); ) {
			total += it.next().get();
		}
		output.collect(key, new IntWritable(total));
	}
}
示例数据:
hello world
nihao
hello beijing
Hadoop的Streaming使用UNIX标准流作为Hadoop和应用程序之间的接口,所以我们可以使用任何编程语言通过标准输入、输出来写MapReduce程序。
关于Ruby的环境安装,可以参照这篇文章:
http://www.linuxidc.com/Linux/2014-04/100242.htm
Ruby改写的查找最高气温的程序如下。
map.rb
#!/usr/local/rvm/bin/ruby
# Mapper: pull (year, temperature) out of fixed-width NCDC weather records
# read from STDIN; drop missing readings (+9999) and bad quality codes.
STDIN.each_line do |line|
  year    = line[15, 4]
  temp    = line[87, 5]
  quality = line[92, 1]
  puts "#{year}\t#{temp}" if temp != "+9999" && quality =~ /[01459]/
end
reduce.rb
#!/usr/local/rvm/bin/ruby
# Reducer: STDIN arrives sorted by key (year), so equal keys are adjacent;
# emit the maximum temperature observed for each year.
#
# Fix: max_val previously started at 0, so a year whose temperatures are
# all negative was wrongly reported as 0. It now starts from the first
# value seen for each key.
last_key, max_val = nil, nil
STDIN.each_line do |line|
  key, val = line.split("\t")
  if last_key && last_key != key
    # Key changed: flush the finished key, start tracking the new one.
    puts "#{last_key}\t#{max_val}"
    last_key, max_val = key, val.to_i
  else
    last_key = key
    max_val = max_val.nil? ? val.to_i : [max_val, val.to_i].max
  end
end
# Flush the final key (if any input was seen at all).
puts "#{last_key}\t#{max_val}" if last_key
使用Unix管道来模拟整个MapReduce过程,如下:
cat temperature.txt | ./map.rb | sort | ./reduce.rb
或者:cat temperature.txt | ruby map.rb | sort | ruby reduce.rb
可以看到,输出结果和Java是一样的。
在集群中运行
运行Java的MapReduce:
我在程序中写死了HDFS的输入路径为/hadooptemp/input/2,输出路径为/hadooptemp/output,运行Java的MapReduce的大致步骤如下:
(1)上传jar包到服务器:test.jar
(2)hadoop fs -mkdir -p /hadooptemp/input/2
(3)hadoop fs -put /home/hadoop/temperature.txt /hadooptemp/input/2
(4)运行:hadoop jar test.jar MaxTemperature
(5)查看输出结果:hadoop fs -ls /hadooptemp/output ; hadoop fs -cat /hadooptemp/output/part-00000
运行Ruby的MapReduce:
hadoop jar /home/hadoop/hadoop-2.2.0/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
-input /hadooptemp/input/2 \
-output /hadooptemp/output \
-mapper "map.rb | sort | reduce.rb" \
-reducer reduce.rb
这里的mapper部分中的reduce.rb,实际起到了combiner的作用。
WordCount MapReduce程序演示
代码如下:
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
/**
 * Word-count job driver (old {@code org.apache.hadoop.mapred} API):
 * configures the job and submits it via {@link JobClient#runJob}.
 * Input and output paths are hard-coded on HDFS.
 */
public class WordCount {
	public static void main(String[] args) throws Exception {
		JobConf conf = new JobConf(WordCount.class);
		conf.setJobName("Word Count"); // fixed typo: was "World Count"
		// FileInputFormat.setInputPaths(conf, new Path(args[0]));
		// FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		FileInputFormat.setInputPaths(conf, new Path("/hadooptemp/input/1"));
		FileOutputFormat.setOutputPath(conf, new Path("/hadooptemp/output"));
		conf.setMapperClass(Map.class);
		// Summing is associative and commutative, so the reducer
		// doubles as a map-side combiner.
		conf.setCombinerClass(Reduce.class);
		conf.setReducerClass(Reduce.class);
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(IntWritable.class);
		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);
		JobClient.runJob(conf);
	}
}
/**
 * Mapper: tokenizes each input line on whitespace and emits
 * (token, 1) for every token found.
 */
class Map extends MapReduceBase implements
		Mapper<LongWritable, Text, Text, IntWritable> {
	// Shared singleton for the constant count of 1.
	private static final IntWritable ONE = new IntWritable(1);
	// Reused output-key holder to avoid allocating a Text per token.
	private final Text token = new Text();

	public void map(LongWritable key, Text value,
			OutputCollector<Text, IntWritable> output, Reporter reporter)
			throws IOException {
		StringTokenizer tokens = new StringTokenizer(value.toString());
		while (tokens.hasMoreTokens()) {
			token.set(tokens.nextToken());
			output.collect(token, ONE);
		}
	}
}
/**
 * Reducer (also used as combiner): sums all the counts
 * received for one word and emits (word, total).
 */
class Reduce extends MapReduceBase implements
		Reducer<Text, IntWritable, Text, IntWritable> {
	public void reduce(Text key, Iterator<IntWritable> values,
			OutputCollector<Text, IntWritable> output, Reporter reporter)
			throws IOException {
		int total = 0;
		for (Iterator<IntWritable> it = values; it.hasNext(); ) {
			total += it.next().get();
		}
		output.collect(key, new IntWritable(total));
	}
}
示例数据:
hello world
nihao
hello beijing
相关文章推荐
- JAVA经典及细节总结
- C#WebBrowser控件使用教程与技巧收集
- MapReduce编程实战(一)——20151112
- IIS对ASP的解析问题
- eclipse 编辑器的使用
- java.util.ConcurrentModificationException 解决办法
- 【坑】当java的环境变量配置正确还是不能运行eclipse的情况
- Qt从零开始制作串口调试助手-(第七章、设置窗口标题(中文)和图标)-Creator_Ly
- python 文件、目录操作(新增、移动、删除等)
- phpStorm显示localhost:63342 和 502 Bad gateway解决方法
- java监听器的学习与应用
- phpStorm显示localhost:63342 和 502 Bad gateway解决方法
- RM报表 文本框 自动换行 相关代码
- webpy源码阅读
- ASP.NET 页生命周期概述
- Java中可变长参数的使用及注意事项
- python,学校成员类的例子,老师和学生(python class父类与子类之间的联系与逻辑)
- Erlang OTP学习(2):gen_event
- lua开发--web页面服务
- Prime C++ copy 构造函数