您的位置:首页 > 其它

MapReduce 初试

2015-06-24 14:27 239 查看
一、境遇

接触Hadoop已经有半年了,从Hadoop集群搭建到Hive、HBase、Sqoop相关组件的安装,甚至Spark on Hive、Phoenix、Kylin这些边缘的项目都有涉及。如果说部署,我自认为可以没有任何问题,但是如果说我对于这个系统已经掌握了,我却不敢这么讲,因为至少MapReduce我还没有熟悉,其工作机制也只是一知半解。关于MapReduce的运算,我差不多理解了,但是实际实现现在却只能靠找到的代码,真的是惭愧的很。
于是再也忍不住,一定要有点自己的东西,最起码,写的时候不用去找别人的博客,嗯,找自己的就行。

二、实验

1、实验过程
最开始实验的是最简单的去重MapReduce,在本地文件实验时没有任何问题,但把文件放到HDFS上就怎么也找不到了,究其原因,HDFS上的需要用Hadoop执行jar文件才可以
1)javac输出类到指定目录 dir

javac *.java -d dir

2)jar打包class文件

1,打包指定class文件到target.jar

jar cvf target.jar x1.class x2.class ... xn.class

2,打包指定路径dir下的所有class文件到target.jar
jar cvf target.jar -C dir .

3,打包class文件成可执行jar,程序入口Main函数

jar cvfe tarrget.jar Main -C dir .

Hadoop只需要普通jar即可,不用打包成可执行jar

3)执行jar,主类MapDuplicate

Hadoop jar target.jar MapDuplicate (params)

2、代码分析

1)import类
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
Configuration类:用来设定Hadoop的参数,如:IP、端口等

Path:用来设定输入输出路径

IntWritable:MapReduce用到的int类型
Text:MapReduce用到的string类型

Job:生成MapReduce任务的主类,任务参数也在此类中设定

Mapper:被继承的Map类

Reducer:被继承的Reduce类

FileInputFormat:输入文件格式
FileOutputFormat:输出文件格式(可改为其它IO类,如数据库)

GenericOptionsParser:解析命令行参数的类

2)代码结构
public class MapDuplicate {
public static class Map extends Mapper<...> { ... }
public static class Reduce extends Reducer<...> { ... }
public static void main(String[] args) throws Ex { ... }
}
2)Map类
public static class Map extends Mapper<Object,Text,Text,Text> {
private static Text line = new Text();

public void map(Object key,Text value,Context context)
throws IOException,InterruptedException {
line = value;
context.write(line,new Text(""));
}
}
Map类的主要作用是将数据进行统一处理,按照规则给出键值对,为Combine和Reduce等Reduce操作提供标准化数据。从代码上来讲,均继承Mapper类,并实现map函数

Mapper类继承的四个参数,前两个分别是输入数据键和值的类型,一般写Object,Text即可;后两个是输出数据键和值的类型,这两个类型必须和Reduce的输入数据键值类型一致。

所有的Java值类型在送到MapReduce任务前都要转化成对应的值类型:如:String->Text,int->IntWritable,long->LongWritable
Context是Java类与MapReduce任务交互的类,它把Map的键值对传给Combiner或者Reducer,也把Reducer的结果写到HDFS上

3)Reduce类
public static class Reduce extends Reducer<Text,Text,Text,Text> {
public void reduce(Text key,Iterable<Text> values,Context context)
throws IOException,InterruptedException {
context.write(key,new Text(""));
}
}
Reduce有两种操作,Combine和Reduce,都继承Reducer类。前者用于对数据进行预处理,将处理好的数据交给Reduce,可以看成是本地的Reduce,当不需要任何处理时,Combine可以直接用Reduce代替;后者用于对数据进行正式处理,将相同键值的数据合并,每一个Reduce函数过程只处理同一个键(key)的数据。

Reducer类继承的四个参数,前两个是输入数据键和值的类型,必须与Mapper类的输出类型一致(Combine也必须一致,而且Combine输出需要跟Reduce的输入一致,所以Combine输入输出类型必须是相同的);后两个是输出数据键和值的类型,即我们最终得到的结果

4)Main函数

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapred.job.tracker","XHadoop1:9010");
String[] ioArgs = new String[] {"duplicate_in","duplicate_out"};
String[] otherArgs = new GenericOptionsParser(conf,ioArgs).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: MapDuplicate <in> <out>");
System.exit(2);
}
Job job = new Job(conf,"MapDuplicate");
job.setJarByClass(MapDuplicate.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
首先,必须有Configuration类,通过这个类指定工作的机器

然后,接收参数的语句,这个不解释了

然后,需要有Job类,指定MapReduce处理用到的类,需要指定的有:Mapper类、Combiner类、Reducer类、输出数据键和值类型的类

然后,指定输入数据的路径

然后,等待任务结束并退出

三、总结

这个实验可以说是最简单的MapReduce,但是麻雀虽小五脏俱全。

从原理来讲,MapReduce有以下步骤:
HDFS(Block)->Split->Mapper->Partion->Spill->Sort->Combiner->Merge->Reducer->HDFS
1、HDFS输入数据被分成Split,被Mapper类读取,

2、Mapper读取数据后,将任务进行Partion(分配)

3、如果Map操作内存溢出,需要Spill(溢写)到磁盘上

4、Mapper进行Sort(排序)操作

5、排序之后进行Combine(合并key)操作,可以理解为本地模式Reduce

6、Combine的同时会进行溢出文件的Merge(合并)

7、所有任务完成后将数据交给Reducer进行处理,处理完成写入HDFS

8、从Map任务开始到Reduce任务开始的数据传输操作叫做Shuffle

从编程来讲,MapReduce有以下步骤:

1、编写Mapper类

2、编写Combiner类(可选)

3、编写Reducer类

4、调用过程:参数配置Configuration
指定任务类
指定输入输出格式
指定数据位置
开始任务

以上仅仅是浅层认识,仅供学习参考及备查。

本文出自 “从零开始” 博客,请务必保留此出处http://loverofvenus.blog.51cto.com/2766338/1665053
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: