您的位置:首页 > 其它

(转)多个mapreduce工作相互依赖处理方法完整实例(JobControl)

2015-11-01 17:34 579 查看

多个mapreduce工作相互依赖处理方法完整实例(JobControl)

原文地址:http://mntms.iteye.com/blog/2096456?utm_source=tuicool&utm_medium=referral

处理复杂的要求的时候,有时一个mapreduce程序时完成不了的,往往需要多个mapreduce程序,这个时候就要牵扯到各个任务之间的依赖关系所谓依赖就是一个M/R Job 的处理结果是另外的M/R 的输入,以此类推,完成几个mapreduce程序,得到最后的结果,下面将直接贴出一个例子的全部代码,因为为了找一个完整的例子实在是太难了,今天找了半天才把这个问题解决。

代码描述,一共包括两个mapreduce作业。也就是两个map和两个reduce函数,第一个job处理后的输出是第二个job的输入,然后交由第二个job来做出最后的结果,代码里面的关键的地方已经有了注释

先是代码的主体部分:



上代码:

Java代码


/*

* anthor TMS

*/

package 依赖MR处理方法;

import java.io.IOException;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;

import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MODEL {

//第一个Job的map函数

public static class Map_First extends Mapper<Object, Text ,Text , IntWritable>{ private final static IntWritable one = new IntWritable(1);

private Text keys = new Text();

public void map(Object key,Text value, Context context ) throws IOException, InterruptedException {

String s = value.toString();

String[] allStr = Config.CatString(s);

keys.set(allStr[1]);

context.write(keys, one);

}

}

//第一个Job的reduce函数

public static class Reduce_First extends Reducer<Text, IntWritable, Text, IntWritable> {

private IntWritable result = new IntWritable();

public void reduce(Text key,Iterable<IntWritable>values, Context context) throws IOException, InterruptedException {

int sum = 0;

for(IntWritable value:values) {

sum += value.get();

}

result.set(sum);

context.write(key, result);

}

}

//第二个job的map函数

public static class Map_Second extends Mapper<Object, Text ,Text , IntWritable>{

private final static IntWritable one = new IntWritable(1);

private Text keys = new Text();

public void map(Object key,Text value, Context context ) throws IOException, InterruptedException {

String s = value.toString();

String[] allStr = Config.CatString(s);

keys.set(allStr[1]);

context.write(keys, one);

}

}

//第二个Job的reduce函数

public static class Reduce_Second extends Reducer<Text, IntWritable, Text, IntWritable> {

private IntWritable result = new IntWritable();

public void reduce(Text key,Iterable<IntWritable>values, Context context) throws IOException, InterruptedException {

int sum = 0;

for(IntWritable value:values) {

sum += value.get();

}

result.set(sum);

context.write(key, result);

}

}

//启动函数

public static void main(String[] args) throws IOException {

JobConf conf = new JobConf(MODEL.class);

//第一个job的配置

Job job1 = new Job(conf,"join1");

job1.setJarByClass(MODEL.class);

job1.setMapperClass(Map_First.class);

job1.setReducerClass(Reduce_First.class);

job1.setMapOutputKeyClass(Text.class);//map阶段的输出的key

job1.setMapOutputValueClass(IntWritable.class);//map阶段的输出的value

job1.setOutputKeyClass(Text.class);//reduce阶段的输出的key

job1.setOutputValueClass(IntWritable.class);//reduce阶段的输出的value

//加入控制容器

ControlledJob ctrljob1=new ControlledJob(conf);

ctrljob1.setJob(job1);

//job1的输入输出文件路径

FileInputFormat.addInputPath(job1, new Path(args[0]));

FileOutputFormat.setOutputPath(job1, new Path(args[1]));

//第二个job的配置

Job job2=new Job(conf,"Join2");

job2.setJarByClass(MODEL.class);

job2.setMapperClass(Map_Second.class);

job2.setReducerClass(Reduce_Second.class);

job2.setMapOutputKeyClass(Text.class);//map阶段的输出的key

job2.setMapOutputValueClass(IntWritable.class);//map阶段的输出的value

job2.setOutputKeyClass(Text.class);//reduce阶段的输出的key

job2.setOutputValueClass(IntWritable.class);//reduce阶段的输出的value

//作业2加入控制容器

ControlledJob ctrljob2=new ControlledJob(conf);

ctrljob2.setJob(job2);

//设置多个作业直接的依赖关系

//如下所写:

//意思为job2的启动,依赖于job1作业的完成

ctrljob2.addDependingJob(ctrljob1);

//输入路径是上一个作业的输出路径,因此这里填args[1],要和上面对应好

FileInputFormat.addInputPath(job2, new Path(args[1]));

//输出路径从新传入一个参数,这里需要注意,因为我们最后的输出文件一定要是没有出现过得

//因此我们在这里new Path(args[2])因为args[2]在上面没有用过,只要和上面不同就可以了

FileOutputFormat.setOutputPath(job2,new Path(args[2]) );

//主的控制容器,控制上面的总的两个子作业

JobControl jobCtrl=new JobControl("myctrl");

//添加到总的JobControl里,进行控制

jobCtrl.addJob(ctrljob1);

jobCtrl.addJob(ctrljob2);

//在线程启动,记住一定要有这个

Thread t=new Thread(jobCtrl);

t.start();

while(true){

if(jobCtrl.allFinished()){//如果作业成功完成,就打印成功作业的信息

System.out.println(jobCtrl.getSuccessfulJobList());

jobCtrl.stop();

break;

}

}

}

}

工程上右键run进行配置:先配置第一个栏目main里面的Project(项目名)和Main Class(主类名)



接下来是arguments如下所示:



最后点击Apply然后Run,运行成功之后,刷新DFS出现几个文件,如下分别为输入的原始数据文件,第一个mapreduce任务后输出的文件output和第二个mapreduce任务之后输出的文件output1



这里只有两个mapreduce任务,多个也是一样,主要的思想就是先写好每一个mapreduce任务的主体部分,也就是map和reduce函数,然后就是分别配置每一个mapreduce任务(这里要注意设置好输入和输出路径,很容易忘记!!!)此时将job任务加入到控制容器,每一个都要加,再就是使用addDependingJob()添加依赖关系,再用一个总的控制器控制每一个任务。最后用一个线程启动!!!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: