[Hadoop Series, Chapter 5] MapReduce 2.0 Programming Practice (Part 2): Hands-On
2015-07-18 10:29
1. Running an Example
Assume the HDFS file system is empty; I first clear out whatever was there before. Note that you cannot clear everything under /home in HDFS with

```
[frank@localhost bin]$ ./hdfs dfs -rm -r -f /home/*
```

because the local shell expands /home/* to the files and directories under the local /home, which need not correspond to any paths in HDFS.

So do this instead (note that the /home directory itself is removed as well):

```
[frank@localhost bin]$ ./hdfs dfs -rm -r -f /home
```
First create the directory /home/input:

```
[frank@localhost bin]$ ./hdfs dfs -mkdir /home
[frank@localhost bin]$ ./hdfs dfs -mkdir /home/input
```

Then put test.txt, the file whose words we want to count, into the input directory.
test.txt contains:

```
i have a book you do not have one so i am better than you ha ha
```
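As a quick sanity check (a local aid only, not part of the Hadoop workflow), the expected word counts for this file can be computed with a few lines of Python:

```python
# Compute the expected word counts for test.txt locally,
# so the Hadoop output below can be verified against them.
from collections import Counter

text = "i have a book you do not have one so i am better than you ha ha"
counts = Counter(text.split())
for word in sorted(counts):
    print(word, counts[word])
```

This prints each word with its count (e.g. `ha 2`, `have 2`, `i 2`), which should match the wordcount output shown later.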
```
[frank@localhost bin]$ ./hdfs dfs -put /home/frank/input/* /home/input
```

Let's first try the wordcount example that ships with Hadoop. Note that the output directory must not already exist.
```
[frank@localhost bin]$ ./hadoop jar ../share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.4.4.jar wordcount /home/tmp /home/output
[frank@localhost bin]$ ./hadoop fs -cat /home/output/*
15/07/17 14:34:15 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
a 1
am 1
better 1
book 1
do 1
ha 2
have 2
i 2
not 1
one 1
so 1
than 1
you 2
```
2. Developing a MapReduce Program in Java under Eclipse
First, open Eclipse, create a new Java project, and add the required JARs: select the project, then right-click, Properties -> Java Build Path -> Libraries -> Add External JARs. The JARs to add are the ones under these three directories:

```
\hadoop-2.6.0-cdh5.4.4\share\hadoop\mapreduce2\
\hadoop-2.6.0-cdh5.4.4\share\hadoop\common\
\hadoop-2.6.0-cdh5.4.4\share\hadoop\common\lib\
```

The paths may differ between versions; adjust them as needed.
Rather than writing a new WordCount from scratch, I reuse the bundled source; the CDH distribution ships with source code, located at:

```
\hadoop-2.6.0-cdh5.4.4\src\hadoop-mapreduce-project\hadoop-mapreduce-examples\src\main\java\org\apache\hadoop\examples\WordCount.java
```

The code is reproduced here for reference:
```java
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```
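To make the map/shuffle/reduce flow in WordCount concrete, here is a small pure-Python simulation (illustrative only, not part of the original code): map emits (word, 1) pairs, the framework groups them by key, and reduce sums each group, mirroring TokenizerMapper and IntSumReducer.

```python
# Pure-Python simulation of the WordCount data flow:
# map emits (word, 1), shuffle groups pairs by key, reduce sums each group.
from collections import defaultdict

def wordcount(lines):
    # Map phase: emit a (word, 1) pair for every token.
    emitted = [(word, 1) for line in lines for word in line.split()]
    # Shuffle phase: group the emitted values by key.
    groups = defaultdict(list)
    for word, one in emitted:
        groups[word].append(one)
    # Reduce phase: sum the values collected for each key.
    return {word: sum(vals) for word, vals in groups.items()}

print(wordcount(["i have a book", "you do not have one"]))
```

The combiner set via setCombinerClass works because this reduce step (summing) is associative: partial sums computed on the map side can safely be summed again on the reduce side.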
Then right-click the project and choose Export -> Java -> JAR file. Upload the generated wordcount.jar to the server and run the job with it:

```
[frank@localhost bin]$ ./hadoop jar /home/frank/wordcount.jar WordCount /home/tmp /home/Output2
```

You can inspect the output the same way as before; the result is identical.
3. A C++ Version of the MapReduce Program
The map program, mapper2.cpp:

```cpp
// By dongxicheng,
// blog: http://dongxicheng.org/
// mapper.cpp
#include <iostream>
#include <string>
using namespace std;

int main() {
  string key;
  while (cin >> key) {
    cout << key << "\t" << "1" << endl;
  }
  return 0;
}
```
The reduce program, reducer.cpp:

```cpp
// By dongxicheng,
// blog: http://dongxicheng.org/
// reducer.cpp
#include <iostream>
#include <string>
using namespace std;

int main() {
  string cur_key, last_key, value;
  cin >> cur_key >> value;
  last_key = cur_key;
  int n = 1;
  while (cin >> cur_key) {
    cin >> value;
    if (last_key != cur_key) {
      cout << last_key << "\t" << n << endl;
      last_key = cur_key;
      n = 1;
    } else {
      n++;
    }
  }
  cout << last_key << "\t" << n << endl;
  return 0;
}
```
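The reducer works only because streaming sorts the mapper output by key before it reaches the reducer, so identical keys arrive consecutively. The same single-pass grouping logic is perhaps easier to read as a short Python sketch (an illustration, not part of the original post):

```python
# Same reduce logic as reducer.cpp: input lines are "word\t1" pairs,
# already sorted by word, so consecutive runs of equal words can be summed.
import sys
from itertools import groupby

def reduce_stream(lines):
    pairs = (line.rstrip("\n").split("\t") for line in lines)
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        yield word, sum(int(count) for _, count in group)

if __name__ == "__main__":
    for word, total in reduce_stream(sys.stdin):
        print(f"{word}\t{total}")
```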
Compile:

```
[frank@localhost bin]$ g++ -o mapper /home/frank/sample/mapper2.cpp
[frank@localhost bin]$ g++ -o reducer /home/frank/sample/reducer.cpp
```
You can first test locally with a Linux pipe:

```
[frank@localhost sample]$ cat test.txt | ./mapper | sort | ./reducer
a 1
am 1
better 1
book 1
do 1
ha 2
have 2
i 2
not 1
one 1
so 1
than 1
you 2
```
Next, write a script, run_cpp_mr.sh, to run the job on Hadoop:

```shell
#!/bin/bash
HADOOP_HOME=/home/frank/hadoop/hadoop-2.6.0-cdh5.4.4
INPUT_PATH=/home/tmp
OUTPUT_PATH=/home/output_cpp

echo "Clearing output path: $OUTPUT_PATH"
$HADOOP_HOME/bin/hadoop fs -rmr $OUTPUT_PATH

${HADOOP_HOME}/bin/hadoop jar \
  ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.4.4.jar \
  -D mapred.reduce.tasks=2 \
  -D mapreduce.iterator.no=100 \
  -files mapper,reducer \
  -input $INPUT_PATH \
  -output $OUTPUT_PATH \
  -mapper mapper \
  -reducer reducer
```

HADOOP_HOME is your own Hadoop directory. Many of the paths need to be adapted to your setup, including the name of the hadoop-streaming-2.6.0-cdh5.4.4.jar package and the names of your executables.
Run it and check the result:

```
[frank@localhost sample]$ ./run_cpp_mr.sh
[frank@localhost sample]$ /home/frank/hadoop/hadoop-2.6.0-cdh5.4.4/bin/hadoop fs -cat /home/output_cpp/*
15/07/17 15:51:50 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
a 1
book 1
do 1
ha 2
i 2
not 1
than 1
you 2
am 1
better 1
have 2
one 1
so 1
```

The output is not globally sorted because mapred.reduce.tasks=2 produced two part files, each sorted on its own, which the cat simply concatenates. Finally, here is another mapper.cpp; compare it with the one above and see what is different.
```cpp
// By dongxicheng,
// blog: http://dongxicheng.org/
// mapper.cpp
#include <iostream>
#include <string>
#include <sstream>
#include <vector>
#include <cstdlib>
using namespace std;

string charArrayToString(char *str) {
  stringstream ss(str);
  return ss.str();
}

vector<string>& split(const string &s, char delim, vector<string> &elems) {
  stringstream ss(s);
  string item;
  while (getline(ss, item, delim)) {
    elems.push_back(item);
  }
  return elems;
}

int main(int argc, char *argv[], char *env[]) {
  int reduce_task_no = -1;
  int iterator = -1;
  vector<string> pairs;
  // Streaming passes the job configuration to the task as environment
  // variables, with '.' replaced by '_'.
  for (int i = 0; env[i] != NULL; i++) {
    pairs.clear();
    split(charArrayToString(env[i]), '=', pairs);
    if (pairs.size() < 2)
      continue;
    if (pairs[0] == "mapreduce_job_reduces") // number of reduce tasks
      reduce_task_no = atoi(pairs[1].c_str());
    else if (pairs[0] == "mapreduce_iterator_no") // user-defined attribute
      iterator = atoi(pairs[1].c_str());
  }
  cerr << "mapreduce.job.reduces:" << reduce_task_no
       << ",mapreduce.iterator.no:" << iterator << endl;

  string key;
  while (cin >> key) {
    cout << key << "\t" << "1" << endl;
    // Increment the counter named counter_no in group counter_group
    cerr << "reporter:counter:counter_group,counter_no,1\n";
    // Display status
    cerr << "reporter:status:processing......\n";
    // Print logs for testing; they end up in the task's stderr file
    cerr << "This is log, will be printed in stderr file\n";
  }
  return 0;
}
```
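The same instrumentation carries over to any streaming language: the job configuration arrives as environment variables (dots replaced by underscores), and counters and status updates are plain reporter: lines written to stderr. A Python sketch of the same mapper, with names mirroring the C++ version above:

```python
# Python version of the instrumented streaming mapper:
# read job configuration from the environment, emit (word, 1) pairs on
# stdout, and report counters/status on stderr.
import os
import sys

def instrumented_map(lines, out=sys.stdout, err=sys.stderr):
    # Streaming exports mapreduce.job.reduces as mapreduce_job_reduces, etc.
    reduces = os.environ.get("mapreduce_job_reduces", "-1")
    iterator = os.environ.get("mapreduce_iterator_no", "-1")
    print(f"mapreduce.job.reduces:{reduces},mapreduce.iterator.no:{iterator}",
          file=err)
    for line in lines:
        for word in line.split():
            print(f"{word}\t1", file=out)
            # Counter named counter_no in group counter_group, incremented by 1
            print("reporter:counter:counter_group,counter_no,1", file=err)
            print("reporter:status:processing......", file=err)

if __name__ == "__main__":
    instrumented_map(sys.stdin)
```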
4. A Shell Version
This works basically the same way as the C++ version, except there is nothing to compile. mapper2.sh:

```shell
#!/bin/bash
while read LINE; do
  for word in $LINE; do
    echo "$word 1"
  done
done
```

reducer.sh:
```shell
#!/bin/bash
count=0
started=0
word=""
while read LINE; do
  newword=`echo $LINE | cut -d ' ' -f 1`
  if [ "$word" != "$newword" ]; then
    # -e so that \t is actually emitted as a tab
    [ $started -ne 0 ] && echo -e "$word\t$count"
    word=$newword
    count=1
    started=1
  else
    count=$(( $count + 1 ))
  fi
done
echo -e "$word\t$count"
```

Now write the driver script, run_shell_mr.sh:
```shell
#!/bin/bash
HADOOP_HOME=/home/frank/hadoop/hadoop-2.6.0-cdh5.4.4
INPUT_PATH=/home/tmp
OUTPUT_PATH=/home/output_cpp

echo "Clearing output path: $OUTPUT_PATH"
$HADOOP_HOME/bin/hadoop fs -rmr $OUTPUT_PATH

${HADOOP_HOME}/bin/hadoop jar \
  ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.4.4.jar \
  -files mapper.sh,reducer.sh \
  -input $INPUT_PATH \
  -output $OUTPUT_PATH \
  -mapper "sh mapper.sh" \
  -reducer "sh reducer.sh"
```
Test locally with a pipe first:

```
[frank@localhost sample]$ cat test.txt | sh mapper2.sh | sort | sh reducer.sh
```

Then you can run the script to try it on Hadoop directly; the process is omitted here.
Likewise, here is the instrumented mapper.sh:

```shell
#!/bin/bash
while read LINE; do
  for word in $LINE; do
    echo "$word 1"
    # In streaming, counters are defined by writing
    #   reporter:counter:<group>,<counter>,<amount>
    # to stderr. Define a counter named counter_no in group counter_group
    # and increase it by 1.
    echo "reporter:counter:counter_group,counter_no,1" >&2
    echo "reporter:status:processing......" >&2
    echo "This is log for testing, will be printed in stderr file" >&2
  done
done
```
5. A PHP Version
mapper.php:

```php
#!/usr/bin/php
<?php
// By dongxicheng,
// blog: http://dongxicheng.org/
// mapper.php
error_reporting(E_ALL ^ E_NOTICE);
$word2count = array();
// Read from STDIN (standard input)
while (($line = fgets(STDIN)) !== false) {
    // Strip surrounding whitespace
    $line = trim($line);
    // Split the line into words
    $words = preg_split('/\W/', $line, 0, PREG_SPLIT_NO_EMPTY);
    // Write the result to STDOUT (standard output)
    foreach ($words as $word) {
        // Emit [word, tab character, "1", end of line]
        echo $word, chr(9), "1", PHP_EOL;
    }
}
?>
```

reducer.php:
```php
#!/usr/bin/php
<?php
// By dongxicheng,
// blog: http://dongxicheng.org/
// reducer.php
error_reporting(E_ALL ^ E_NOTICE);
$word2count = array();
// Read from STDIN
while (($line = fgets(STDIN)) !== false) {
    // Strip surrounding whitespace
    $line = trim($line);
    // Each line has the format (word "tab" count); store into ($word, $count)
    list($word, $count) = explode(chr(9), $line);
    // Convert string -> int
    $count = intval($count);
    // Aggregate
    $word2count[$word] += $count;
}
// Write the results to STDOUT (standard output)
foreach ($word2count as $word => $count) {
    echo $word, chr(9), $count, PHP_EOL;
}
?>
```

Note that unlike the C++ and shell reducers, this one accumulates counts in an associative array rather than relying on the sorted input order. The driver script, run_php_mr.sh:

```shell
#!/bin/bash
HADOOP_HOME=/home/frank/hadoop/hadoop-2.6.0-cdh5.4.4
INPUT_PATH=/home/tmp
OUTPUT_PATH=/home/output_cpp

echo "Clearing output path: $OUTPUT_PATH"
$HADOOP_HOME/bin/hadoop fs -rmr $OUTPUT_PATH

${HADOOP_HOME}/bin/hadoop jar \
  ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.4.4.jar \
  -files mapper.php,reducer.php \
  -input $INPUT_PATH \
  -output $OUTPUT_PATH \
  -mapper "php mapper.php" \
  -reducer "php reducer.php"
```
6. A Python Script

I plan to write one myself; for now it is left as an exercise.