Hadoop-MultipleInputs实例<转>
2013-08-16 18:13
246 查看
MR程序:
package org.forward.example.hadoop.multipleinputs;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;
public class Example_1
{
    /**
     * Mapper for File_1.txt. Input: (byte offset, line text).
     * Each line is expected to be "ID&lt;TAB&gt;countryName"; emits
     * key = country name, value = the ID tagged with "file_1".
     */
    public static class MapA extends MapReduceBase implements Mapper<LongWritable, Text, Text, Capital_or_ID>
    {
        @Override
        public void map(LongWritable lineN, Text content, OutputCollector<Text, Capital_or_ID> collect, Reporter rp)
                throws IOException
        {
            String[] part = content.toString().split("\\t");
            // Silently skip malformed lines that do not have exactly two tab-separated fields.
            if (part.length == 2)
            {
                collect.collect(new Text(part[1]), new Capital_or_ID(part[0], "file_1"));
            }
        }
    }

    /**
     * Mapper for File_2.txt. Input: (byte offset, line text).
     * Each line is expected to be "countryName&lt;TAB&gt;capitalName"; emits
     * key = country name, value = the capital name tagged with "file_2".
     */
    public static class MapB extends MapReduceBase implements Mapper<LongWritable, Text, Text, Capital_or_ID>
    {
        @Override
        public void map(LongWritable lineN, Text content, OutputCollector<Text, Capital_or_ID> collect, Reporter rp)
                throws IOException
        {
            String[] part = content.toString().split("\\t");
            // Silently skip malformed lines that do not have exactly two tab-separated fields.
            if (part.length == 2)
            {
                collect.collect(new Text(part[0]), new Capital_or_ID(part[1], "file_2"));
            }
        }
    }

    /**
     * Joins the two tagged value streams per country and formats the result
     * as "ID=%s\tname=%s\tcapital=%s", e.g.
     *
     *   ID=1    name=China    capital=BeiJing
     *
     * A missing side of the join is rendered as the string "null".
     */
    public static class Reduce extends MapReduceBase implements Reducer<Text, Capital_or_ID, Text, Text>
    {
        @Override
        public void reduce(Text countryName, Iterator<Capital_or_ID> values, OutputCollector<Text, Text> collect,
                Reporter rp) throws IOException
        {
            String capitalName = null, ID = null;
            while (values.hasNext())
            {
                Capital_or_ID coi = values.next();
                // Constant-first equals avoids an NPE if a value's tag was never set.
                if ("file_1".equals(coi.getTag()))
                {
                    ID = coi.getValue();
                } else if ("file_2".equals(coi.getTag()))
                {
                    capitalName = coi.getValue();
                }
            }
            String result = String.format("ID=%s\tname=%s\tcapital=%s", ID, countryName, capitalName);
            collect.collect(countryName, new Text(result));
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args args[0] = input path for MapA (file_1),
     *             args[1] = input path for MapB (file_2),
     *             args[2] = output path
     * @throws IOException if job submission or execution fails
     */
    public static void main(String[] args) throws IOException
    {
        // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 3)
        {
            System.err.println("Usage: Example_1 <file_1> <file_2> <outPath>");
            System.exit(2);
        }
        String file_1 = args[0];
        String file_2 = args[1];
        String outPath = args[2];

        JobConf conf = new JobConf(Example_1.class);
        conf.setJobName("example-MultipleInputs");
        // Map output value type (Capital_or_ID) differs from the final output
        // value type (Text), so both must be declared explicitly.
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(Capital_or_ID.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setReducerClass(Reduce.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(conf, new Path(outPath));
        // Route each input file to its own mapper.
        MultipleInputs.addInputPath(conf, new Path(file_1), TextInputFormat.class, MapA.class);
        MultipleInputs.addInputPath(conf, new Path(file_2), TextInputFormat.class, MapB.class);
        JobClient.runJob(conf);
    }
}
用到的类:
package org.forward.example.hadoop.multipleinputs;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/**
* @author Jim
* 该类为自定义数据类型. 其目的是为了在多个输入文件多个Mapper的情况下,标记数据.从而Reduce可以辨认value的来源
*/
/**
 * Custom Writable that tags a value with its input source, so that the
 * reducer can tell which file a value came from when multiple input files
 * are processed by different mappers.
 */
public class Capital_or_ID implements Writable
{
    /** Values originating from the same source share the same tag. */
    private String tag = null;
    private String value = null;

    /** No-arg constructor required by Hadoop's reflective deserialization. */
    public Capital_or_ID()
    {
    }

    public Capital_or_ID(String value, String tag)
    {
        this.value = value;
        this.tag = tag;
    }

    public String getTag()
    {
        return tag;
    }

    public void setTag(String tag)
    {
        this.tag = tag;
    }

    public String getValue()
    {
        return value;
    }

    /** Setter added for symmetry with {@link #setTag(String)}. */
    public void setValue(String value)
    {
        this.value = value;
    }

    @Override
    public void readFields(DataInput in) throws IOException
    {
        // Must read fields in exactly the order write() emits them.
        tag = in.readUTF();
        value = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException
    {
        // NOTE: DataOutput.writeUTF throws NullPointerException on null,
        // so both fields must be set before this instance is serialized.
        out.writeUTF(tag);
        out.writeUTF(value);
    }

    /** Human-readable form, handy when debugging map output. */
    @Override
    public String toString()
    {
        return tag + ":" + value;
    }
}
package org.forward.example.hadoop.multipleinputs;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;
public class Example_1
{
    /**
     * Mapper for File_1.txt. Input: (byte offset, line text).
     * Each line is expected to be "ID&lt;TAB&gt;countryName"; emits
     * key = country name, value = the ID tagged with "file_1".
     */
    public static class MapA extends MapReduceBase implements Mapper<LongWritable, Text, Text, Capital_or_ID>
    {
        @Override
        public void map(LongWritable lineN, Text content, OutputCollector<Text, Capital_or_ID> collect, Reporter rp)
                throws IOException
        {
            String[] part = content.toString().split("\\t");
            // Silently skip malformed lines that do not have exactly two tab-separated fields.
            if (part.length == 2)
            {
                collect.collect(new Text(part[1]), new Capital_or_ID(part[0], "file_1"));
            }
        }
    }

    /**
     * Mapper for File_2.txt. Input: (byte offset, line text).
     * Each line is expected to be "countryName&lt;TAB&gt;capitalName"; emits
     * key = country name, value = the capital name tagged with "file_2".
     */
    public static class MapB extends MapReduceBase implements Mapper<LongWritable, Text, Text, Capital_or_ID>
    {
        @Override
        public void map(LongWritable lineN, Text content, OutputCollector<Text, Capital_or_ID> collect, Reporter rp)
                throws IOException
        {
            String[] part = content.toString().split("\\t");
            // Silently skip malformed lines that do not have exactly two tab-separated fields.
            if (part.length == 2)
            {
                collect.collect(new Text(part[0]), new Capital_or_ID(part[1], "file_2"));
            }
        }
    }

    /**
     * Joins the two tagged value streams per country and formats the result
     * as "ID=%s\tname=%s\tcapital=%s", e.g.
     *
     *   ID=1    name=China    capital=BeiJing
     *
     * A missing side of the join is rendered as the string "null".
     */
    public static class Reduce extends MapReduceBase implements Reducer<Text, Capital_or_ID, Text, Text>
    {
        @Override
        public void reduce(Text countryName, Iterator<Capital_or_ID> values, OutputCollector<Text, Text> collect,
                Reporter rp) throws IOException
        {
            String capitalName = null, ID = null;
            while (values.hasNext())
            {
                Capital_or_ID coi = values.next();
                // Constant-first equals avoids an NPE if a value's tag was never set.
                if ("file_1".equals(coi.getTag()))
                {
                    ID = coi.getValue();
                } else if ("file_2".equals(coi.getTag()))
                {
                    capitalName = coi.getValue();
                }
            }
            String result = String.format("ID=%s\tname=%s\tcapital=%s", ID, countryName, capitalName);
            collect.collect(countryName, new Text(result));
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args args[0] = input path for MapA (file_1),
     *             args[1] = input path for MapB (file_2),
     *             args[2] = output path
     * @throws IOException if job submission or execution fails
     */
    public static void main(String[] args) throws IOException
    {
        // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 3)
        {
            System.err.println("Usage: Example_1 <file_1> <file_2> <outPath>");
            System.exit(2);
        }
        String file_1 = args[0];
        String file_2 = args[1];
        String outPath = args[2];

        JobConf conf = new JobConf(Example_1.class);
        conf.setJobName("example-MultipleInputs");
        // Map output value type (Capital_or_ID) differs from the final output
        // value type (Text), so both must be declared explicitly.
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(Capital_or_ID.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setReducerClass(Reduce.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(conf, new Path(outPath));
        // Route each input file to its own mapper.
        MultipleInputs.addInputPath(conf, new Path(file_1), TextInputFormat.class, MapA.class);
        MultipleInputs.addInputPath(conf, new Path(file_2), TextInputFormat.class, MapB.class);
        JobClient.runJob(conf);
    }
}
用到的类:
package org.forward.example.hadoop.multipleinputs;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/**
* @author Jim
* 该类为自定义数据类型. 其目的是为了在多个输入文件多个Mapper的情况下,标记数据.从而Reduce可以辨认value的来源
*/
/**
 * Custom Writable that tags a value with its input source, so that the
 * reducer can tell which file a value came from when multiple input files
 * are processed by different mappers.
 */
public class Capital_or_ID implements Writable
{
    /** Values originating from the same source share the same tag. */
    private String tag = null;
    private String value = null;

    /** No-arg constructor required by Hadoop's reflective deserialization. */
    public Capital_or_ID()
    {
    }

    public Capital_or_ID(String value, String tag)
    {
        this.value = value;
        this.tag = tag;
    }

    public String getTag()
    {
        return tag;
    }

    public void setTag(String tag)
    {
        this.tag = tag;
    }

    public String getValue()
    {
        return value;
    }

    /** Setter added for symmetry with {@link #setTag(String)}. */
    public void setValue(String value)
    {
        this.value = value;
    }

    @Override
    public void readFields(DataInput in) throws IOException
    {
        // Must read fields in exactly the order write() emits them.
        tag = in.readUTF();
        value = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException
    {
        // NOTE: DataOutput.writeUTF throws NullPointerException on null,
        // so both fields must be set before this instance is serialized.
        out.writeUTF(tag);
        out.writeUTF(value);
    }

    /** Human-readable form, handy when debugging map output. */
    @Override
    public String toString()
    {
        return tag + ":" + value;
    }
}
相关文章推荐
- Hadoop Partition使用实例<转>
- Hadoop的Partitioner使用实例2<转>
- Android实战简易教程<五十九>(EventBus小实例-传值、控制其他页控件显示)
- NoSuchMethodException: org.apache.hadoop.io.ArrayWritable.<init>问题解决总结
- Android NDK学习 <三> Android.mk实例和NDK实用技巧
- 服务名、实例名和数据库名 <转载>
- [Hadoop源码解读](一)MapReduce篇之InputFormat<转>
- Hadoop新旧API区别2<转>
- Hadoop的MapFile, SetFile, ArrayFile 及 BloomMapFile文件格式<转>
- Hadoop-datanode-FSDataset解析<转>
- Hadoop平台优化综述(一) <转>
- 关于< tbody >的一个实例,很实用。
- 关于< tbody >的一个实例,很实用。
- Hadoop学习<一>--hadoop安装和环境变量设置
- Win7中使用Eclipse连接虚拟机中的Ubuntu中的Hadoop2.4<2>
- [HTML5-Canvas] HTML 5的<canvas>元素教程和实例
- Android实战简易教程<五十八>(AlarmManager类用法研究小实例)
- 11_5_2扩展上一题中的Person 类,重载>、<、>=和<=运算符,比较Person实例的Age属性
- Hadoop Writable深度复制及读取任意<key,value>序列文件
- button在底部的多个view切换<实例一正常切换>