Two simple MapReduce examples
2017-10-22 20:10
MapReduce development:
(Filter the router data, keeping only the fields we want. This job has only a map phase and no reduce.)
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Map extends Configured implements Tool {

    public static class Map1 extends Mapper<LongWritable, Text, NullWritable, Text> {
        enum Counters {
            LINESKIP, // counts lines skipped because they are malformed
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString(); // one line of source data
            try {
                String[] lineSplit = line.split(" ");
                String m   = lineSplit[0];
                String t   = lineSplit[1];
                String mac = lineSplit[4];
                // keep only fields 0, 1 and 4
                context.write(NullWritable.get(), new Text(m + " " + t + " " + mac)); // key \t value
            } catch (ArrayIndexOutOfBoundsException e) {
                // too few fields: drop the line and bump the counter
                context.getCounter(Counters.LINESKIP).increment(1);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "lala"); // job name
        // input and output paths are hardcoded for this example
        FileInputFormat.addInputPath(job, new Path("hdfs://192.168.146.130:9000/user/hadoop/input/mptrys"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.146.130:9000/mptryd"));
        job.setJarByClass(Map.class);
        job.setMapperClass(Map1.class);            // mapper class
        job.setNumReduceTasks(0);                  // map-only job: disable the default identity reducer
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class); // output key type
        job.setOutputValueClass(Text.class);       // output value type
        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;         // exit code 0 on success
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Map(), args);
        System.exit(res);
    }
}
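To make the filter concrete, here is a hypothetical run (the exact field layout of the router log is my assumption; the code only requires at least five space-separated fields, of which 0, 1 and 4 are kept). Given the input line

2017-10-22 20:10:05 192.168.1.100 80 14:cf:92:01:ab:cd

the mapper would emit

2017-10-22 20:10:05 14:cf:92:01:ab:cd

Because the output key is NullWritable, TextOutputFormat writes only the value. A line with fewer than five fields raises ArrayIndexOutOfBoundsException, is dropped, and increments the LINESKIP counter, which Hadoop prints with the other job counters on completion. Assuming the class is packaged into a jar (the jar name below is hypothetical), the job can be launched with

hadoop jar routefilter.jar Map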
Inverted index: the mapper swaps each line's two fields (emitting field 1 as the key and field 0 as the value), and the reducer then concatenates, for each key, all of the values grouped under it.
package trymr2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class test2 extends Configured implements Tool {

    public static class Map1 extends Mapper<LongWritable, Text, Text, Text> {
        enum Counters {
            LINESKIP, // counts lines skipped because they are malformed
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            try {
                String[] lineSplit = line.split(" ");
                String calf = lineSplit[0]; // field 0
                String calt = lineSplit[1]; // field 1
                // invert the pair: field 1 becomes the key, field 0 the value
                context.write(new Text(calt), new Text(calf));
            } catch (ArrayIndexOutOfBoundsException e) {
                context.getCounter(Counters.LINESKIP).increment(1);
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // concatenate every value seen for this key, separated by "|"
            StringBuilder out = new StringBuilder();
            for (Text val : values) {
                out.append(val.toString()).append("|");
            }
            context.write(key, new Text(out.toString()));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "lala1");
        FileInputFormat.addInputPath(job,
                new Path("hdfs://192.168.146.130:9000/user/hadoop/input/lilanmr2"));
        FileOutputFormat.setOutputPath(job,
                new Path("hdfs://192.168.146.130:9000/mptryd12"));
        job.setJarByClass(test2.class);
        job.setMapperClass(Map1.class);
        job.setReducerClass(Reduce.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1; // exit code 0 on success
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new test2(), args);
        System.exit(res);
    }
}
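A worked example with invented data (the source does not show the input, only that field 0 and field 1 are swapped). Suppose each input record is a "caller callee" pair:

13588888888 10086
13599999999 10086
13588888888 120

The mapper emits (10086, 13588888888), (10086, 13599999999) and (120, 13588888888); the shuffle groups the values by key, and the reducer concatenates each group, producing

10086	13588888888|13599999999|
120	13588888888|

Note that the code keeps a trailing "|" after the last value, and that MapReduce does not guarantee the order of values within a group.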