MapReduce 编程及Job运行模式
2017-12-08 09:00
351 查看
Hadoop提供了三种形式的编程⽅式: 1.Java(最原始的⽅式) 2.Hadoop Streaming(⽀持多语⾔) 3.Hadoop Pipes(⽀持C/C++) 其中Java编程是所有编程接⼜的基础,不同的编程接⼜的内部引擎是⼀样 的。效率是不同的。 MapReduce编程主要需要⾃⼰来编写Mapper程序和Reducer程序。 以统计单词数量为列: Yarn集群⽅式运⾏ ①.编写Mapper程序 package com.firewolf;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text ; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.util.StringUtils;
/** * Mapper程序:4个泛型:输⼊key类型,输⼊value类型,输出key类型,输 出value类型 * @author liuxing * */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text,
Text, LongWritable>.Context context) throws IOException, InterruptedException { String content = value.toString(); //切分输⼊串
//切分输⼊串 String[] words = StringUtils.split(content, ' '); //每⼀个统计次数,并且输出 for(String word : words){ context.write(new Text(word), new LongWritable(1l)); } } } ②.编写Reducer程序 package com.firewolf;
import java.io.IOException; import java.util.Iterator;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
/** * Reducer程序:4个泛型:输⼊key类型,输⼊value类型,输出key类型,输 出value类型 * @author liuxing * */ public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ @Override protected void reduce(Text text, Iterable<LongWritable> counts,
Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { //取出每⼀个的数量,然后加起来,得到每⼀个word出现的次数 Iterator<LongWritable> iterator = counts.iterator(); long count = 0l; while(iterator.hasNext()){
LongWritable next = iterator.next();
long l = next.get(); count += l; } context.write(text, new LongWritable(count)); } }
③.编写主类,描述Job信息。 package com.firewolf;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class JobRunner { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //设置Mapper类 job.setMapperClass(WordCountMapper.class); //设置Reducer类
job.setReducerClass(WordCountReducer.class); //设置当前Jar信息,通过当前类设置,这样hadoop在启动的时候能加 载相关类 job.setJarByClass(JobRunner.class); //设置Map的Key类型 job.setMapOutputKeyClass(Text.class); //设置Map的Value类型 job.setMapOutputValueClass(LongWritable.class);
//设置Reducer的key类型(也可以设置所有的Key类型,包括) job.setOutputKeyClass(Text.class);
job.setOutputKeyClass(Text.class); //设置Reducer的输出值类型 job.setOutputValueClass(LongWritable.class); //设置输⼊位置 FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop:9000/wc/data")); //设置输出位置 FileOutputFormat.setOutputPath(job,
new Path("hdfs://hadoop:9000/wc/ouput")); boolean b = job.waitForCompletion(true); System.exit(b == true ? 0:1); } } ④.打jar包wcount.jar ⑤.执⾏jar包(命令中可以指定主类) hadoop jar wcount.jar ⑥.执⾏完毕后,查看⽂件: hadoop fs -cat /wc/ouput/part-r-00000 结果:
上⾯的这种⽅式,不⽅便调试,可以在本地Eclipse中运⾏ 本地Eclipse运⾏
本地Eclipse运⾏ ①.只⽤在编译路径中加⼊Hadoop的所有Jar包就可以了(注意需要有yarn的 jar包,上⾯的⽅式可以没有yarn Jar包的情况下编写代码,因为不需要资源 调度) ②.修改下输⼊输出路径到本地就好了
③.如果在windows下⾯报⽤户⾮法异常,可以配置运⾏参数:DHADOOP_USER_HOME=hadoop,配置hadoop登录的⽤户名(就是 Hadoop安装的机器名) 本地Eclipse提交集群作业 ①.拷贝配置⽂件到Eclipse⽬录下。 ②.修改主类如下:
③.打包jar包。 ⽬录如下:
④.启动测试 运⾏模式总结: 1.jar包在服务器运⾏
1.jar包在服务器运⾏ 在eclipse上开发好MR程序(Linux和Eclipse都可以),然后打成Jar包, 上传服务器, 执⾏命令hadoop jar xxx.jar XXX类 这种⽅式会将和这个Job提交到yarn集群上去运⾏。 2.使⽤Eclipse本地job(主要⽤于调试) 在Eclipse中开发mr,并且直接运⾏ 3.在Eclipse⾥⾯启动,但是在yarn集群上运⾏。 a.需要把⼏个配置⽂件拷贝到项⽬中, b.需要把项⽬打包放置在当前项⽬ c.需要指定mapreduce.job.jar属性为b中打包好的.jar
是否运⾏在本地取决于mapred-site.xml中的mapreduce.framework.name属 性,如果是yarn就运⾏在yarn集群上,如果是local,就运⾏在本地
import java.io.IOException;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text ; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.util.StringUtils;
/** * Mapper程序:4个泛型:输⼊key类型,输⼊value类型,输出key类型,输 出value类型 * @author liuxing * */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text,
Text, LongWritable>.Context context) throws IOException, InterruptedException { String content = value.toString(); //切分输⼊串
//切分输⼊串 String[] words = StringUtils.split(content, ' '); //每⼀个统计次数,并且输出 for(String word : words){ context.write(new Text(word), new LongWritable(1l)); } } } ②.编写Reducer程序 package com.firewolf;
import java.io.IOException; import java.util.Iterator;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
/** * Reducer程序:4个泛型:输⼊key类型,输⼊value类型,输出key类型,输 出value类型 * @author liuxing * */ public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ @Override protected void reduce(Text text, Iterable<LongWritable> counts,
Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { //取出每⼀个的数量,然后加起来,得到每⼀个word出现的次数 Iterator<LongWritable> iterator = counts.iterator(); long count = 0l; while(iterator.hasNext()){
LongWritable next = iterator.next();
long l = next.get(); count += l; } context.write(text, new LongWritable(count)); } }
③.编写主类,描述Job信息。 package com.firewolf;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class JobRunner { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //设置Mapper类 job.setMapperClass(WordCountMapper.class); //设置Reducer类
job.setReducerClass(WordCountReducer.class); //设置当前Jar信息,通过当前类设置,这样hadoop在启动的时候能加 载相关类 job.setJarByClass(JobRunner.class); //设置Map的Key类型 job.setMapOutputKeyClass(Text.class); //设置Map的Value类型 job.setMapOutputValueClass(LongWritable.class);
//设置Reducer的key类型(也可以设置所有的Key类型,包括) job.setOutputKeyClass(Text.class);
job.setOutputKeyClass(Text.class); //设置Reducer的输出值类型 job.setOutputValueClass(LongWritable.class); //设置输⼊位置 FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop:9000/wc/data")); //设置输出位置 FileOutputFormat.setOutputPath(job,
new Path("hdfs://hadoop:9000/wc/ouput")); boolean b = job.waitForCompletion(true); System.exit(b == true ? 0:1); } } ④.打jar包wcount.jar ⑤.执⾏jar包(命令中可以指定主类) hadoop jar wcount.jar ⑥.执⾏完毕后,查看⽂件: hadoop fs -cat /wc/ouput/part-r-00000 结果:
上⾯的这种⽅式,不⽅便调试,可以在本地Eclipse中运⾏ 本地Eclipse运⾏
本地Eclipse运⾏ ①.只⽤在编译路径中加⼊Hadoop的所有Jar包就可以了(注意需要有yarn的 jar包,上⾯的⽅式可以没有yarn Jar包的情况下编写代码,因为不需要资源 调度) ②.修改下输⼊输出路径到本地就好了
③.如果在windows下⾯报⽤户⾮法异常,可以配置运⾏参数:DHADOOP_USER_HOME=hadoop,配置hadoop登录的⽤户名(就是 Hadoop安装的机器名) 本地Eclipse提交集群作业 ①.拷贝配置⽂件到Eclipse⽬录下。 ②.修改主类如下:
③.打包jar包。 ⽬录如下:
④.启动测试 运⾏模式总结: 1.jar包在服务器运⾏
1.jar包在服务器运⾏ 在eclipse上开发好MR程序(Linux和Eclipse都可以),然后打成Jar包, 上传服务器, 执⾏命令hadoop jar xxx.jar XXX类 这种⽅式会将和这个Job提交到yarn集群上去运⾏。 2.使⽤Eclipse本地job(主要⽤于调试) 在Eclipse中开发mr,并且直接运⾏ 3.在Eclipse⾥⾯启动,但是在yarn集群上运⾏。 a.需要把⼏个配置⽂件拷贝到项⽬中, b.需要把项⽬打包放置在当前项⽬ c.需要指定mapreduce.job.jar属性为b中打包好的.jar
是否运⾏在本地取决于mapred-site.xml中的mapreduce.framework.name属 性,如果是yarn就运⾏在yarn集群上,如果是local,就运⾏在本地
相关文章推荐
- hadoop mapreduce的job的几种运行模式
- MapReduce 编程 系列四 MapReduce例子程序运行
- Hadoop集群_WordCount运行详解--MapReduce编程模型
- hadoop中mapreduce程序的几种提交运行模式
- hadoop运行到mapreduce.job: Running job后停止运行
- [转]Hadoop集群_WordCount运行详解--MapReduce编程模型
- Hadoop之Mapreduce------>3种运行模式
- spark on yarn(Job的运行流程,可以对比mapreduce的yarn运行)
- 分布式编程模式MapReduce应用
- MapReduce:计算Job运行时的CPU与内存平均利用率
- Hadoop-MapReduce Job本地运行流程
- MapReduce程序的3种集群提交运行模式详解---基于Windows与Linux两种开发环境
- mr(mapreduce)几种提交运行模式
- mapreduce.job运行报错
- MapReduce程序的几种提交运行模式
- Arduino 编程技巧 睡眠模式的运用 2个AA电池运行一年的秘密(未完待续)
- Hadoop运行任务时一直卡在: INFO mapreduce.Job: Running job
- MapReduce:计算Job运行时的CPU与内存平均利用率
- 一脸懵逼学习MapReduce的原理和编程(Map局部处理,Reduce汇总)和MapReduce几种运行方式