
An error caused by the Combiner's execution order: the job produces no results

2014-02-28 14:22
Yesterday I was writing the code for a Temporal Join. The algorithm splits the time axis into small intervals whose size equals the temporal threshold; every fixed number of consecutive intervals is assigned to the same key, and records in the last interval of each group are additionally sent to the next key. When I finished the code, I found the job produced no results. The console output looked like this:

smap1**6
rmap1**0
smap2** 6
rmap2** 0
smap1**16
rmap1**0
smap2** 16
rmap2** 0
smap1**26
rmap1**0
smap2** 26
rmap2** 0
smap1**36
rmap1**0
smap2** 36
rmap2** 0
smap1**46
rmap1**0
smap2** 46
rmap2** 0
smap1**49
rmap1**0
smap2** 49
rmap2** 0


One of the two maps is always empty, so the join can never be performed. At first I suspected my Reducer was wrong and that values were being assigned incorrectly, but the console output showed the assignments were fine. Next I wondered whether the map stage had failed to read one of the two input files, but on inspection both files were being read. At that point I even began to doubt whether the MapReduce framework could perform the operation I needed at all, yet my earlier programs written along the same logic all ran correctly.

Then I looked at my Job file and noticed I had configured a Combiner, reusing my Reducer class for the combine logic. I had assumed this was harmless: on a single machine, I expected all the map functions to run to completion first and the combine to run afterwards, so both files would already have been read by combine time. The console output tells a different story: the combine is executed as soon as a single map finishes, which is exactly why one of the two collections in each printout is empty.
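In fact, reusing TemporalJoinReducer as the Combiner is doubly wrong. A Combiner runs per map task on partial output, so no single invocation is guaranteed to see both the R and S records for a key; and a Combiner's output types must match the map output types (here LongWritable, FlickrValue), whereas this Reducer emits (Text, Text). (The type mismatch likely never surfaced as an exception only because one side was always empty, so the combiner never found a match to write.) A minimal sketch of what a type-correct pass-through combiner would have to look like, a hypothetical class just to illustrate the contract:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.macau.flickr.util.FlickrValue;

// Hypothetical: a combiner is a Reducer whose output types must equal the
// map output types, because its output is fed back into the shuffle.
public class TemporalJoinCombiner extends
        Reducer<LongWritable, FlickrValue, LongWritable, FlickrValue> {

    @Override
    protected void reduce(LongWritable key, Iterable<FlickrValue> values,
            Context context) throws IOException, InterruptedException {
        // Even a correctly typed combiner cannot fix this join: a combiner
        // only ever sees one map task's partial output, so it is never
        // guaranteed both R and S records for a key. Pass records through.
        for (FlickrValue value : values) {
            context.write(key, new FlickrValue(value));
        }
    }
}

For a join like this one, then, a combiner buys nothing; the combine step only makes sense for operations that can be applied to partial data.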

Here is the material I found on the subject:

At what point on the map side does the Combiner run? In fact, the Combiner function may execute either before or after the map's merge operation completes. The timing is governed by the configuration parameter min.num.spill.for.combine (default 3): if the map side produces at least min.num.spill.for.combine spill files, the Combiner runs before the merge that produces the final local output file; otherwise it runs after that merge. This way, when there are many spill files and a combine is needed, the amount of data written to local disk is reduced, along with the frequency of disk reads and writes, which helps optimize the job.
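If this timing ever needs tuning, the threshold can be set on the job Configuration. A minimal sketch, using the parameter name quoted above; note that Hadoop 2.x and later document this knob as mapreduce.map.combine.minspills:

Configuration conf = new Configuration();
// Run the Combiner during the final map-side merge only when at least
// this many spill files exist (the default is 3).
conf.setInt("min.num.spill.for.combine", 3);
// Equivalent key documented in Hadoop 2.x and later:
// conf.setInt("mapreduce.map.combine.minspills", 3);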

Merge takes three forms: 1) memory to memory, 2) memory to disk, 3) disk to disk. By default the first form is disabled, which is admittedly confusing. Once the amount of in-memory data reaches a threshold, the memory-to-disk merge starts. As on the map side, this is also a spill process, and if a Combiner is configured it is invoked here too, producing numerous spill files on disk. The second form of merge keeps running until no more map-side data arrives, after which the third form, the disk-to-disk merge, generates the final file.
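For reference, the thresholds that trigger the reduce-side memory-to-disk merge are configurable as well. A minimal sketch using the Hadoop 2.x key names (older releases use mapred.* equivalents); the values shown are the documented defaults:

Configuration conf = new Configuration();
// Begin merging buffered map outputs to disk once this many segments
// are held in memory (default 1000)...
conf.setInt("mapreduce.reduce.merge.inmem.threshold", 1000);
// ...or once the shuffle buffer reaches this fill fraction (default 0.66).
conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.66f);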

To summarize:

If a combine operation is configured, the combine function will run while the map phase is still in progress in order to save space; it does not necessarily wait until all the maps have finished.
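By contrast, here is the textbook case where reusing the reduce logic as the combiner is safe: the word-count summing reducer from the Hadoop examples, sketched from memory for comparison:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Safe to register as both combiner and reducer: summation is commutative
// and associative, and the input types (Text, IntWritable) equal the
// output types, so combining early never changes the final result.
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

My Job file, where the problematic combiner was set, is below: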

package org.macau.stjoin.basic.temporal;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.macau.flickr.job.TemporalSimilarityJoin;
import org.macau.flickr.util.FlickrSimilarityUtil;
import org.macau.flickr.util.FlickrValue;

/**
 * @author hadoop
 *
 * Driver that configures and runs the temporal basic similarity join.
 */
public class TemporalJoinJob {

    public static boolean TemporalSimilarityBasicJoin(Configuration conf) throws Exception {

        Job basicJob = new Job(conf, "Temporal Basic Similarity Join");
        basicJob.setJarByClass(TemporalSimilarityJoin.class);

        basicJob.setMapperClass(TemporalJoinMapper.class);

        // This line is the culprit: the Reducer is reused as the Combiner,
        // so the join logic runs on each map task's partial output, where
        // one of rMap/sMap is always empty.
        basicJob.setCombinerClass(TemporalJoinReducer.class);

        basicJob.setReducerClass(TemporalJoinReducer.class);

        basicJob.setMapOutputKeyClass(LongWritable.class);
        basicJob.setMapOutputValueClass(FlickrValue.class);

        // basicJob.setOutputKeyClass(Text.class);
        // basicJob.setOutputValueClass(Text.class);
        // basicJob.setNumReduceTasks(6);

        FileInputFormat.addInputPath(basicJob, new Path(FlickrSimilarityUtil.flickrInputPath));
        FileOutputFormat.setOutputPath(basicJob, new Path(FlickrSimilarityUtil.flickrOutputPath));

        return basicJob.waitForCompletion(true);
    }
}
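Given the diagnosis above, the immediate fix is presumably just to delete the setCombinerClass(TemporalJoinReducer.class) line: with no combiner, every record for a key reaches the reduce phase intact, and the reducer sees both the R and S sides.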


The map code is also attached:

package org.macau.stjoin.basic.temporal;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.macau.flickr.util.FlickrSimilarityUtil;
import org.macau.flickr.util.FlickrValue;

/**
 * The Mapper uses the temporal information.
 */
public class TemporalJoinMapper extends
        Mapper<Object, Text, LongWritable, FlickrValue> {

    private final LongWritable outputKey = new LongWritable();
    private final FlickrValue outputValue = new FlickrValue();

    public static String convertDateToString(Date date) {
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
        return df.format(date);
    }

    public static Date convertLongToDate(Long date) {
        return new Date(date);
    }

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        InputSplit inputSplit = context.getInputSplit();

        // R: 0; S: 1
        int tag;

        // The file name separates the two input sets.
        String fileName = ((FileSplit) inputSplit).getPath().getName();
        if (fileName.contains(FlickrSimilarityUtil.R_TAG)) {
            tag = 0;
        } else {
            tag = 1;
        }

        String[] fields = value.toString().split(":");
        long id = Long.parseLong(fields[0]);
        double lat = Double.parseDouble(fields[2]);
        double lon = Double.parseDouble(fields[3]);
        long timestamp = Long.parseLong(fields[4]);

        /* Convert the timestamp into an interval index: intervals are
         * threshold-sized, every ten consecutive intervals share a key,
         * and the raw timestamp is kept in the value for the later
         * refinement and distance comparison.
         */
        long timeInterval = timestamp / FlickrSimilarityUtil.TEMPORAL_THRESHOLD;

        outputValue.setTileNumber((int) timeInterval);
        outputValue.setId(id);
        outputValue.setLat(lat);
        outputValue.setLon(lon);
        outputValue.setTag(tag);
        outputValue.setTiles(fields[5]);
        outputValue.setTimestamp(timestamp);

        // The last interval of each group is also sent to the next key,
        // so that pairs straddling a group boundary are not missed.
        if (timeInterval % 10 == 9) {
            outputKey.set(timeInterval / 10 + 1);
            context.write(outputKey, outputValue);
        }

        outputKey.set(timeInterval / 10);
        context.write(outputKey, outputValue);
    }
}


The reducer code is also attached:

package org.macau.stjoin.basic.temporal;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.macau.flickr.util.FlickrSimilarityUtil;
import org.macau.flickr.util.FlickrValue;
import org.macau.stjoin.basic.temporal.TemporalComparator;

public class TemporalJoinReducer extends
        Reducer<LongWritable, FlickrValue, Text, Text> {

    private final Text text = new Text();
    private final ArrayList<FlickrValue> records = new ArrayList<FlickrValue>();
    private final long MS_OF_ONE_DAY = 86400000L;

    // Note: rMap and sMap are instance fields, and the clear() calls at the
    // bottom are commented out, so the buckets accumulate across reduce()
    // calls. That is why the printed sizes keep growing: 6, 16, 26, ...
    private final Map<Integer, ArrayList<FlickrValue>> rMap = new HashMap<Integer, ArrayList<FlickrValue>>();
    private final Map<Integer, ArrayList<FlickrValue>> sMap = new HashMap<Integer, ArrayList<FlickrValue>>();

    public void reduce(LongWritable key, Iterable<FlickrValue> values,
            Context context) throws IOException, InterruptedException {

        // Bucket the R and S records by tile (interval) number. Each value
        // is copied, because Hadoop reuses the same FlickrValue instance
        // across iterations.
        for (FlickrValue value : values) {

            records.add(new FlickrValue(value));

            if (value.getTag() == FlickrSimilarityUtil.R_tag) {
                if (rMap.containsKey(value.getTileNumber())) {
                    rMap.get(value.getTileNumber()).add(new FlickrValue(value));
                } else {
                    ArrayList<FlickrValue> list = new ArrayList<FlickrValue>();
                    list.add(new FlickrValue(value));
                    rMap.put(value.getTileNumber(), list);
                }
            } else {
                if (sMap.containsKey(value.getTileNumber())) {
                    sMap.get(value.getTileNumber()).add(new FlickrValue(value));
                } else {
                    ArrayList<FlickrValue> list = new ArrayList<FlickrValue>();
                    list.add(new FlickrValue(value));
                    sMap.put(value.getTileNumber(), list);
                }
            }
        }

        System.out.println("smap1**" + sMap.size());
        System.out.println("rmap1**" + rMap.size());

        // Sort every bucket by time.
        TemporalComparator comp = new TemporalComparator();
        for (Integer i : rMap.keySet()) {
            Collections.sort(rMap.get(i), comp);
        }
        for (Integer i : sMap.keySet()) {
            Collections.sort(sMap.get(i), comp);
        }

        System.out.println("smap2** " + sMap.size());
        System.out.println("rmap2** " + rMap.size());

        // Join each R bucket against the S bucket with the same tile number.
        for (Integer i : rMap.keySet()) {

            System.out.println("smap3**" + sMap.size());
            System.out.println("rmap3**" + rMap.size());

            if (sMap.containsKey(i)) {

                System.out.println("size" + rMap.get(i).size());

                for (int j = 0; j < rMap.get(i).size(); j++) {

                    FlickrValue value1 = rMap.get(i).get(j);

                    for (int k = 0; k < sMap.get(i).size(); k++) {
                        FlickrValue value2 = sMap.get(i).get(k);

                        if (FlickrSimilarityUtil.SpatialSimilarity(value1, value2)) {
                            long ridA = value1.getId();
                            long ridB = value2.getId();
                            if (ridA < ridB) {
                                long rid = ridA;
                                ridA = ridB;
                                ridB = rid;
                            }
                            text.set("" + ridA + "%" + ridB);
                            context.write(text, new Text(""));
                        }
                    }

                    // Also check the adjacent (next) bucket, for pairs that
                    // straddle a group boundary.
                    if (sMap.containsKey(i + 1)) {
                        for (int m = 0; m < sMap.get(i + 1).size(); m++) {
                            FlickrValue value3 = sMap.get(i + 1).get(m);

                            if (FlickrSimilarityUtil.SpatialSimilarity(value1, value3)) {
                                long ridA = value1.getId();
                                long ridB = value3.getId();
                                if (ridA < ridB) {
                                    long rid = ridA;
                                    ridA = ridB;
                                    ridB = rid;
                                }
                                text.set("" + ridA + "%" + ridB);
                                context.write(text, new Text(""));
                            }
                        }
                    }
                }
            }
        }

        // rMap.clear();
        // sMap.clear();
    }
}