您的位置:首页 > 编程语言

MapReduce编程模式

2017-06-28 20:12 357 查看
MapReduce作业(job)是客户端执行的单位:它包括输入数据、MapReduce程序和配置信息。Hadoop通过把作业分成若干个小任务(task)来工作,其包括两种类型的任务:map任务和reduce任务。

 

有两种类型的节点控制着作业执行过程:jobtracker和多个tasktracker。Jobtracker通过调度任务在tasktracker上运行,来协调所有运行在系统上的作业。Tasktracker运行任务的同时,把进度报告传送给jobtracker,jobtracker则记录着每项任务的整体进展情况。如果其中一个任务失败,jobtracker可以重新调度任务到另外一个tasktracker。Hadoop把输入数据划分为等长的小数据发送给mapreduce,称为输入分片。Hadoop为每个分片创建一个map任务,由它来运行用户自定义的map函数来分析每个分片的记录。

 

Map任务的执行节点和输入数据的存储节点在同一个节点,Hadoop的性能达到最佳。Reduce任务并不具备数据本地读取的优势,一个单一的reduce任务的输入往往来自于所有mapper的输出。

一个简单的MapReduce程序

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

public static class WordCountMap extends
Mapper<LongWritable, Text, Text, IntWritable> {

private final IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer token = new StringTokenizer(line);
while (token.hasMoreTokens()) {
word.set(token.nextToken());
context.write(word, one);
}
}
}

public static class WordCountReduce extends
Reducer<Text, IntWritable, Text, IntWritable> {

public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJarByClass(WordCount.class);
job.setJobName("wordcount");

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

job.setMapperClass(WordCountMap.class);
job.setReducerClass(WordCountReduce.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.waitForCompletion(true);
}
}



小结:

客户端(client):编写mapreduce程序,配置作业,提交作业。
JobTracker:初始化作业,分配作业,与TaskTracker通信,协调整个作业的执行;
TaskTracker:保持与JobTracker的通信,在分配的数据片段上执行Map或Reduce任务,TaskTracker和JobTracker的不同有个很重要的方面,就是在执行任务时候TaskTracker可以有n多个,JobTracker则只会有一个
Hdfs:保存作业的数据、配置信息等等,最后的结果也是保存在hdfs上面
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: