
MapReduce Jobs with Multiple Data Sources (Part 2) -- Replicated Join Based on DistributedCache

Joining multiple data sources on the reduce side is not very efficient: all of the data is first shuffled across the network, and most of it is then thrown away during the combine/join step on the reduce side. The join becomes much more efficient if the unnecessary records are eliminated on the map side.

The main obstacle to performing the join in the map phase is that a mapper processing a record must be able to access all of the data from the other table; only then can a map-side join work correctly.

This is where Hadoop's DistributedCache comes in. Looking closely, most two-table joins involve one large table and one small one. The small table can be replicated to every node that runs a map task, so each mapper has access to all of its data.
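In essence the mechanism consists of two calls, both shown in full in the code below: the driver registers the small file in the cache, and every map task reads its local copy in setup().

// Driver side: ship the small table to every task node
DistributedCache.addCacheFile(new URI(pathCache + "#" + file), conf);
// Mapper side, in setup(): locate the local copies of the cached files on this node
Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);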

Drawback: if the small table is still sizable, the map tasks can run out of memory (OOM).

A concrete example follows.
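For illustration, suppose (hypothetical sample data) the cached small table Customers.txt and the large order file look like this; every line is "key,rest" split on the first comma, matching the RecordReader below:

Customers.txt (small table, shipped through the distributed cache):
1,Stephanie,555-1111
2,Edward,555-2222

Orders.txt (large table, the job's input):
1,B,88.25
2,C,32.00
3,D,25.02

With the mapper below, order lines whose customer id appears in the cache are emitted as "id <TAB> order-fields,customer-fields" (e.g. "1	B,88.25,Stephanie,555-1111"); order 3 is dropped because there is no customer 3, i.e. the result is an inner join driven by the small table.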

A custom InputFormat comes first. It is optional; the built-in KeyValueTextInputFormat works as well (a sketch of that alternative follows the class below).

DisCahceInputFormat.java

package com.hadoop.data.join.disCache;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class DisCahceInputFormat extends FileInputFormat<Text, Text> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // never split the input file, so each file is read by a single mapper
        return false;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit inputsplit,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new objPosRecordReader();
    }
   public static class objPosRecordReader extends RecordReader<Text,Text>{  
 
       public LineReader in;  
       public Text lineKey;  
       public Text lineValue;  
       public StringTokenizer token=null;  
         
       public Text line;  
       
        @Override
        public void close() throws IOException {
            in.close();
        }
 
       @Override  
       public Text getCurrentKey() throws IOException, InterruptedException {  
           return lineKey;  
       }  
 
       @Override  
       public Text getCurrentValue() throws IOException,  
               InterruptedException {  
           return lineValue;  
       }  
 
       @Override  
       public float getProgress() throws IOException, InterruptedException {  
           return 0;  
       }  
 
        /**
         * Initialize the reader: open the split's file and prepare the key/value holders.
         */
       @Override  
       public void initialize(InputSplit input, TaskAttemptContext context)  
               throws IOException, InterruptedException {  
           FileSplit split=(FileSplit)input;  
           Configuration job=context.getConfiguration();  
           Path file=split.getPath();  
           FileSystem fs=file.getFileSystem(job);  
             
           FSDataInputStream filein=fs.open(file);  
           in=new LineReader(filein,job);  
           line=new Text();  
           lineKey=new Text();  
           lineValue=new Text();  
       }  
 
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            int linesize = in.readLine(line);
            if (linesize == 0)
                return false;
            System.out.println("line = " + line);
            // split each line into key and value on the first comma
            String[] pieces = line.toString().split(",", 2);
            lineKey.set(pieces[0]);
            lineValue.set(pieces[1]);

            return true;
        }
   }  

}
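As mentioned above, the custom format can be swapped for the built-in one. A minimal sketch, assuming a Hadoop release that ships KeyValueTextInputFormat for the new mapreduce API; the separator property name is an assumption that differs across versions, so verify it against your release:

// In the driver, set the separator on conf before constructing the Job,
// then use KeyValueTextInputFormat instead of DisCahceInputFormat:
conf.set("key.value.separator.in.input.line", ",");
// newer releases use "mapreduce.input.keyvaluelinerecordreader.key.value.separator"
job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat.class);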
The Mapper class

JoinMapper.java

package com.hadoop.data.join.disCache;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Hashtable;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class JoinMapper extends Mapper<Text, Text, Text, Text> {
    private Hashtable<String, String> joinData = new Hashtable<String, String>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        System.out.println("here setup begin initialize");
        try {
            // get the local paths of the files placed in the distributed cache
            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
            if (cacheFiles != null && cacheFiles.length > 0) {
                String line;
                String[] tokens;
                BufferedReader joinReader = new BufferedReader(new FileReader(
                        cacheFiles[0].toString()));
                try {
                    // load the whole small table into an in-memory hash table
                    while ((line = joinReader.readLine()) != null) {
                        tokens = line.split(",", 2);
                        joinData.put(tokens[0], tokens[1]);
                    }
                    System.out.println("cache map is " + joinData);
                } finally {
                    joinReader.close();
                }
            }
        } catch (IOException e) {
            System.err.println("reading distributedcache: " + e);
        }
    }

    @Override
    public void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        System.out.println("let's see the joinData is " + joinData);
        // look the key up in the cached small table; records without a match are dropped
        String joinValue = joinData.get(key.toString());

        if (StringUtils.isNotEmpty(joinValue)) {
            context.write(key, new Text(value.toString() + "," + joinValue));
        }
    }

}
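Because the driver below registers the cache file with a "#Customers.txt" fragment, setup() can be written slightly more simply. This is only a sketch and assumes the driver also calls DistributedCache.createSymlink(conf), which asks the framework to create a link named Customers.txt in each task's working directory:

// Driver side, next to addCacheFile(...):
DistributedCache.createSymlink(conf);
// Mapper side, in setup(): open the cached file by its symlink name
BufferedReader joinReader = new BufferedReader(new FileReader("Customers.txt"));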
The driver class

DataDriver.java

package com.hadoop.data.join.disCache;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DataDriver {

    public void run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        if (args.length != 3) {
            System.err.println("Usage: DataDriver <cache file path> <input path> <output path>");
            System.exit(-1);
        }
        String pathCache = args[0];
        String pathIn = args[1];
        String pathOut = args[2];
        FileSystem hdfs = FileSystem.get(conf);

        // remove the output directory if it already exists
        if (hdfs.exists(new Path(pathOut))) {
            hdfs.delete(new Path(pathOut), true);
        }

        // register the small table as a distributed cache file
        String file = "Customers.txt";
        URI cache = URI.create(pathCache + "#" + file);
        DistributedCache.addCacheFile(cache, conf);

        Job job = new Job(conf, "Data join base on distributedCache");
        job.setJarByClass(DataDriver.class);
        job.setMapperClass(JoinMapper.class);

        // map-only job: the join happens entirely on the map side
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(DisCahceInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(pathIn));
        FileOutputFormat.setOutputPath(job, new Path(pathOut));
        System.out.println("here main we begin");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        DataDriver dr = new DataDriver();
        dr.run(args);
    }

}
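A hypothetical invocation, assuming the classes are packaged into a jar named datajoin.jar and the input files already sit in HDFS (first argument: the cached small table, second: the large input, third: the output directory):

hadoop jar datajoin.jar com.hadoop.data.join.disCache.DataDriver /user/hadoop/Customers.txt /user/hadoop/orders /user/hadoop/join_out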