
[Hadoop Series, Chapter 5] MapReduce 2.0 Programming Practice (Part 2)

2015-07-18 10:29


1. Running an Example

Assume the HDFS file system starts out empty; here I first clear out whatever was left from before.

Note that you cannot use

[frank@localhost bin]$ ./hdfs dfs -rm -r -f /home/*

to clear everything under /home on HDFS: the local shell expands /home/* to the files and folders under the local /home directory, and HDFS will not necessarily contain paths with those names.

So do this instead:

[frank@localhost bin]$ ./hdfs dfs -rm -r -f /home/

although this removes the /home directory itself as well.
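The difference is easy to see without Hadoop at all. This small demo (the throwaway paths are my own) shows that an unquoted glob is expanded by the local shell before the command ever runs, while a quoted one reaches the command literally, letting HDFS expand it against its own namespace:

```shell
# Set up a throwaway local directory with two files.
mkdir -p /tmp/glob_demo/home
touch /tmp/glob_demo/home/a.txt /tmp/glob_demo/home/b.txt
cd /tmp/glob_demo

# Unquoted: the local shell expands the pattern first.
echo home/*        # prints: home/a.txt home/b.txt

# Quoted: the literal pattern reaches the command unchanged,
# which is what you want for something like: hdfs dfs -rm -r '/path/*'
echo 'home/*'      # prints: home/*
```

So quoting the pattern is the way to delete only the contents of an HDFS directory while keeping the directory itself.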

First, create the folder /home/tmp:

[frank@localhost bin]$ ./hdfs dfs -mkdir /home
[frank@localhost bin]$ ./hdfs dfs -mkdir /home/tmp

In that folder, place test.txt, the file whose words we want to count.

test.txt contains:

i
have
a
book
you
do
not
have
one
so
i
am
better
than
you
ha
ha


[frank@localhost bin]$ ./hdfs dfs -put /home/frank/input/* /home/tmp

Use Hadoop's bundled wordcount example to see the result.

Note: the output directory must not already exist.

[frank@localhost bin]$ ./hadoop jar ../share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.4.4.jar wordcount /home/tmp /home/output
[frank@localhost bin]$ ./hadoop fs -cat /home/output/*
15/07/17 14:34:15 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
a	1
am	1
better	1
book	1
do	1
ha	2
have	2
i	2
not	1
one	1
so	1
than	1
you	2
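As a sanity check, the same counts can be reproduced locally with standard Unix tools, since test.txt holds one word per line (the temporary file path below is my own):

```shell
# Recreate test.txt from the article, one word per line.
printf '%s\n' i have a book you do not have one so i am better than you ha ha > /tmp/test.txt

# sort groups identical words together; uniq -c counts each run;
# awk reorders each line into "word<TAB>count", like wordcount's output.
sort /tmp/test.txt | uniq -c | awk '{print $2 "\t" $1}'
```

The listing matches the MapReduce output above: ha, have, i, and you appear twice, everything else once.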


2. Developing a MapReduce Program in Java with Eclipse

First, open Eclipse, create a new Java project, and add the required jar files: select the project, then right-click Properties -> Java Build Path -> Libraries -> Add External JARs.

Add the jar files found under these three folders:

\hadoop-2.6.0-cdh5.4.4\share\hadoop\mapreduce2\
\hadoop-2.6.0-cdh5.4.4\share\hadoop\common\
\hadoop-2.6.0-cdh5.4.4\share\hadoop\common\lib\

The paths may differ slightly between versions; adjust them for your own installation.

Rather than writing a new wordcount from scratch, I reuse the bundled source code; the CDH distribution ships it at

\hadoop-2.6.0-cdh5.4.4\src\hadoop-mapreduce-project\hadoop-mapreduce-examples\src\main\java\org\apache\hadoop\examples\WordCount.java

The code is reproduced here for analysis.

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}


Then right-click the project and choose Export -> Java -> JAR file.

Upload the generated wordcount.jar to the server and run the job with it:

[frank@localhost bin]$ ./hadoop jar /home/frank/wordcount.jar org.apache.hadoop.examples.WordCount /home/tmp /home/Output2

Inspect the output the same way as before; the result is identical. (Because the class keeps its original package declaration, the fully qualified class name must be given on the command line.)

3. A C++ MapReduce Program

The map program, mapper2.cpp:

// By dongxicheng,
// blog: http://dongxicheng.org/
// mapper.cpp
#include <iostream>
#include <string>
using namespace std;

int main() {
    string key;
    while (cin >> key) {
        cout << key << "\t" << "1" << endl;
    }
    return 0;
}


The reduce program, reducer.cpp:

// By dongxicheng,
// blog: http://dongxicheng.org/
// reducer.cpp
#include <iostream>
#include <string>
using namespace std;

int main() {
    string cur_key, last_key, value;
    // Guard against empty input; otherwise the trailing print below
    // would emit a bogus line.
    if (!(cin >> cur_key >> value)) return 0;
    last_key = cur_key;
    int n = 1;
    while (cin >> cur_key >> value) {
        if (last_key != cur_key) {
            cout << last_key << "\t" << n << endl;
            last_key = cur_key;
            n = 1;
        } else {
            n++;
        }
    }
    cout << last_key << "\t" << n << endl;
    return 0;
}


Compile:

[frank@localhost bin]$ g++ -o mapper /home/frank/sample/mapper2.cpp
[frank@localhost bin]$ g++ -o reducer /home/frank/sample/reducer.cpp


You can first test locally with a Linux pipe:

[frank@localhost sample]$ cat test.txt |./mapper |sort |./reducer
a	1
am	1
better	1
book	1
do	1
ha	2
have	2
i	2
not	1
one	1
so	1
than	1
you	2


Now write a script, run_cpp_mr.sh, to run the job on Hadoop:

#!/bin/bash
HADOOP_HOME=/home/frank/hadoop/hadoop-2.6.0-cdh5.4.4
INPUT_PATH=/home/tmp
OUTPUT_PATH=/home/output_cpp
echo "Clearing output path: $OUTPUT_PATH"
$HADOOP_HOME/bin/hadoop fs -rm -r -f $OUTPUT_PATH

${HADOOP_HOME}/bin/hadoop jar \
${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.4.4.jar \
    -D mapred.reduce.tasks=2 \
    -D mapreduce.iterator.no=100 \
    -files mapper,reducer \
    -input $INPUT_PATH \
    -output $OUTPUT_PATH \
    -mapper mapper \
    -reducer reducer


Here, HADOOP_HOME is your own Hadoop directory. Many of the paths need to be adapted to your setup, including the name of the hadoop-streaming-2.6.0-cdh5.4.4.jar file and the names of your executables. Also make sure each line-continuation backslash is preceded by a space, or adjacent arguments will run together into one word.

Run it and check the result:

[frank@localhost sample]$ ./run_cpp_mr.sh
[frank@localhost sample]$ /home/frank/hadoop/hadoop-2.6.0-cdh5.4.4/bin/hadoop fs -cat /home/output_cpp/*
15/07/17 15:51:50 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
a	1
book	1
do	1
ha	2
i	2
not	1
than	1
you	2
am	1
better	1
have	2
one	1
so	1
Finally, here is another mapper.cpp; work out for yourself how it differs from the first one.

// By dongxicheng,
// blog: http://dongxicheng.org/
// mapper.cpp
#include <iostream>
#include <string>
#include <sstream>
#include <vector>
#include <cstdlib>
using namespace std;

string charArrayToString(char *str) {
    stringstream ss(str);
    return ss.str();
}

vector<string>& split(const string &s, char delim, vector<string> &elems) {
    stringstream ss(s);
    string item;
    while (getline(ss, item, delim)) {
        elems.push_back(item);
    }
    return elems;
}

int main(int argc, char *argv[], char *env[]) {
    int reduce_task_no = -1;
    int iterator = -1;
    vector<string> pairs;
    for (int i = 0; env[i] != NULL; i++) {
        pairs.clear();
        split(charArrayToString(env[i]), '=', pairs);
        if (pairs.size() < 2) continue;
        if (pairs[0] == "mapreduce_job_reduces") // number of reduce tasks
            reduce_task_no = atoi(pairs[1].c_str());
        else if (pairs[0] == "mapreduce_iterator_no") // user-defined attribute
            iterator = atoi(pairs[1].c_str());
    }
    cerr << "mapreduce.job.reduces:" << reduce_task_no
         << ",mapreduce.iterator.no:" << iterator << endl;

    string key;
    while (cin >> key) {
        cout << key << "\t" << "1" << endl;
        // Define a counter named counter_no in group counter_group
        cerr << "reporter:counter:counter_group,counter_no,1\n";
        // Display status
        cerr << "reporter:status:processing......\n";
        // Print logs for testing
        cerr << "This is log, will be printed in the stderr file\n";
    }
    return 0;
}
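The key difference: this version reads job configuration from its environment. Hadoop Streaming exports configuration properties to each task as environment variables with dots replaced by underscores, which is why the code looks up mapreduce_job_reduces rather than mapreduce.job.reduces. That convention can be simulated locally in plain shell (the stand-in "mapper" below is my own, just to show the naming):

```shell
# Streaming turns -D mapreduce.job.reduces=2 into the task-side
# environment variable mapreduce_job_reduces=2 ('.' becomes '_').
# Simulate it with a stand-in mapper that echoes the values it sees:
mapreduce_job_reduces=2 mapreduce_iterator_no=100 sh -c '
    echo "mapreduce.job.reduces:$mapreduce_job_reduces"
    echo "mapreduce.iterator.no:$mapreduce_iterator_no"
'
```

This mirrors how the mapper above recovers the mapred.reduce.tasks and mapreduce.iterator.no values passed via -D in run_cpp_mr.sh.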


4. A Shell Program

This works essentially the same way as the C++ version, except no compilation is needed.

mapper2.sh:

#! /bin/bash
while read LINE; do
    for word in $LINE
    do
        echo "$word 1"
    done
done
reducer.sh:

#! /bin/bash
count=0
started=0
word=""
while read LINE; do
    newword=`echo $LINE | cut -d ' ' -f 1`
    if [ "$word" != "$newword" ]; then
        # plain echo would print a literal "\t"; -e expands it to a tab
        [ $started -ne 0 ] && echo -e "$word\t$count"
        word=$newword
        count=1
        started=1
    else
        count=$(( $count + 1 ))
    fi
done
echo -e "$word\t$count"
Now write the driver script run_shell_mr.sh:

#!/bin/bash
HADOOP_HOME=/home/frank/hadoop/hadoop-2.6.0-cdh5.4.4
INPUT_PATH=/home/tmp
OUTPUT_PATH=/home/output_cpp
echo "Clearing output path: $OUTPUT_PATH"
$HADOOP_HOME/bin/hadoop fs -rm -r -f $OUTPUT_PATH

${HADOOP_HOME}/bin/hadoop jar \
${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.4.4.jar \
    -files mapper.sh,reducer.sh \
    -input $INPUT_PATH \
    -output $OUTPUT_PATH \
    -mapper "sh mapper.sh" \
    -reducer "sh reducer.sh"


Test locally with a pipe:

[frank@localhost sample]$ cat test.txt |sh mapper2.sh |sort |sh reducer.sh
Then run the driver script to try it on Hadoop; the process is omitted here.

Likewise, here is mapper.sh, the version the driver script ships:

#! /bin/bash
while read LINE; do
    for word in $LINE
    do
        echo "$word 1"
        # In streaming, a counter is defined via
        #   reporter:counter:<group>,<counter>,<amount>
        # Here we define a counter named counter_no in group counter_group
        # and increase it by 1. Counters must be written to stderr.
        echo "reporter:counter:counter_group,counter_no,1" >&2
        echo "reporter:status:processing......" >&2
        echo "This is log for testing, will be printed in the stderr file" >&2
    done
done


5. A PHP Program

mapper.php

#!/usr/bin/php
<?php
// By dongxicheng,
// blog: http://dongxicheng.org/
// mapper.php
error_reporting(E_ALL ^ E_NOTICE);
$word2count = array();
// Read from STDIN (standard input)
while (($line = fgets(STDIN)) !== false) {
    // Strip leading/trailing whitespace
    $line = trim($line);
    // Split the line into words
    $words = preg_split('/\W/', $line, 0, PREG_SPLIT_NO_EMPTY);
    // Write the results to STDOUT (standard output)
    foreach ($words as $word) {
        // Emit [word, tab character, "1", end of line]
        echo $word, chr(9), "1", PHP_EOL;
    }
}
?>
reducer.php

#!/usr/bin/php
<?php
// By dongxicheng,
// blog: http://dongxicheng.org/
// reducer.php
error_reporting(E_ALL ^ E_NOTICE);
$word2count = array();
// Read from STDIN
while (($line = fgets(STDIN)) !== false) {
    // Strip surplus whitespace
    $line = trim($line);
    // Each line has the form (word "tab" count); store it as ($word, $count)
    list($word, $count) = explode(chr(9), $line);
    // Convert string -> int
    $count = intval($count);
    // Accumulate
    $word2count[$word] += $count;
}
// Write the results to STDOUT (standard output)
foreach ($word2count as $word => $count) {
    echo $word, chr(9), $count, PHP_EOL;
}
?>
The driver script, run_php_mr.sh:

#!/bin/bash
HADOOP_HOME=/home/frank/hadoop/hadoop-2.6.0-cdh5.4.4
INPUT_PATH=/home/tmp
OUTPUT_PATH=/home/output_cpp
echo "Clearing output path: $OUTPUT_PATH"
$HADOOP_HOME/bin/hadoop fs -rm -r -f $OUTPUT_PATH

${HADOOP_HOME}/bin/hadoop jar \
${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.4.4.jar \
    -files mapper.php,reducer.php \
    -input $INPUT_PATH \
    -output $OUTPUT_PATH \
    -mapper "php mapper.php" \
    -reducer "php reducer.php"


6. A Python Script

I plan to write this one myself; for now it is left as an exercise.