
MapReduce Programming in Practice (2), 2015-11-12

MapReduce Programming with Ruby

Hadoop Streaming uses standard UNIX streams as the interface between Hadoop and your application, so you can write MapReduce programs in any language that can read from standard input and write to standard output.

For instructions on setting up a Ruby environment, see this article:
http://www.linuxidc.com/Linux/2014-04/100242.htm
The maximum-temperature program rewritten in Ruby is shown below.

map.rb

#!/usr/local/rvm/bin/ruby

# Read records from standard input, one per line
STDIN.each_line do |line|
  val = line
  # Fixed-width NCDC record: year at offset 15, temperature at offset 87, quality code at offset 92
  year, temp, q = val[15, 4], val[87, 5], val[92, 1]
  # Emit "year<TAB>temperature" for valid readings only
  puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end

reduce.rb

#!/usr/local/rvm/bin/ruby

last_key, max_val = nil, 0

# Input arrives sorted by key, so all values for a given year are contiguous
STDIN.each_line do |line|
  key, val = line.split("\t")
  if last_key && last_key != key
    # The key changed: emit the maximum for the previous year and start a new group
    puts "#{last_key}\t#{max_val}"
    last_key, max_val = key, val.to_i
  else
    last_key, max_val = key, [max_val, val.to_i].max
  end
end

# Emit the final group
puts "#{last_key}\t#{max_val}" if last_key

The whole MapReduce flow can be simulated with a Unix pipeline, as follows:

cat temperature.txt | ./map.rb | sort | ./reduce.rb

Or: cat temperature.txt | ruby map.rb | sort | ruby reduce.rb

As you can see, the output is the same as that of the Java version.
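A plain sort happens to work here because the year is the line's prefix, but the real shuffle sorts and groups on the key only. A slightly more faithful simulation (assuming GNU sort and a bash-like shell for the $'\t' syntax) sorts on just the first tab-separated field:

cat temperature.txt | ./map.rb | sort -t $'\t' -k1,1 | ./reduce.rb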

Running on the Cluster

Running the Java MapReduce job:

In the program I hard-coded the HDFS input path as /hadooptemp/input/2 and the output path as /hadooptemp/output. The rough steps for running the Java MapReduce job are as follows (a combined command sketch follows the list):

(1) Upload the jar to the server: test.jar

(2) hadoop fs -mkdir -p /hadooptemp/input/2

(3) hadoop fs -put /home/hadoop/temperature.txt /hadooptemp/input/2

(4) Run: hadoop jar test.jar MaxTemperature

(5) View the results: hadoop fs -ls /hadooptemp/output, then hadoop fs -cat /hadooptemp/output/part-00000
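For reference, the whole sequence as one shell session looks roughly like this (a sketch, assuming test.jar is in the current directory; the -rm step simply clears any previous output, since Hadoop refuses to write to an existing output path):

hadoop fs -mkdir -p /hadooptemp/input/2
hadoop fs -put /home/hadoop/temperature.txt /hadooptemp/input/2
hadoop fs -rm -r /hadooptemp/output
hadoop jar test.jar MaxTemperature
hadoop fs -cat /hadooptemp/output/part-00000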

Running the Ruby MapReduce job (via Streaming):

hadoop jar /home/hadoop/hadoop-2.2.0/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
-input /hadooptemp/input/2 \
-output /hadooptemp/output \
-mapper "map.rb | sort | reduce.rb" \
-reducer reduce.rb

The reduce.rb in the mapper pipeline here effectively acts as a combiner.
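Note that the command above relies on map.rb and reduce.rb already being present at the same path on every node. A sketch of an alternative that ships the scripts with the job and uses Streaming's -combiner option instead of the pipeline trick (reusing the reducer as a combiner is safe here because taking a maximum is commutative and associative):

hadoop jar /home/hadoop/hadoop-2.2.0/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
-files map.rb,reduce.rb \
-input /hadooptemp/input/2 \
-output /hadooptemp/output \
-mapper map.rb \
-combiner reduce.rb \
-reducer reduce.rb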

WordCount MapReduce Demo

The code is as follows:

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("Word Count");

        // Input and output paths could also be taken from the command line:
        // FileInputFormat.setInputPaths(conf, new Path(args[0]));
        // FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        FileInputFormat.setInputPaths(conf, new Path("/hadooptemp/input/1"));
        FileOutputFormat.setOutputPath(conf, new Path("/hadooptemp/output"));

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class); // the reducer doubles as a combiner
        conf.setReducerClass(Reduce.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        JobClient.runJob(conf);
    }
}

class Map extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // Split each line into whitespace-separated words and emit (word, 1)
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}

class Reduce extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // Sum the counts for each word
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}
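To build and run this on the cluster, the steps mirror the temperature job. A rough sketch, where WordCount.java holds the code above, wordcount.jar is just a name chosen for illustration, and sample.txt is a local file containing the sample data shown below:

javac -classpath $(hadoop classpath) WordCount.java
jar cf wordcount.jar *.class
hadoop fs -mkdir -p /hadooptemp/input/1
hadoop fs -put sample.txt /hadooptemp/input/1
hadoop fs -rm -r /hadooptemp/output
hadoop jar wordcount.jar WordCount
hadoop fs -cat /hadooptemp/output/part-00000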

Sample data:

hello world

nihao

hello beijing
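For this input, the job should emit one count per word, sorted by key:

beijing	1
hello	2
nihao	1
world	1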