您的位置:首页 > 数据库

nutch中的Indexer类建立索引时,会将原来的index索引删除,然后重新建立索引,如何实现增量索引?

2012-08-02 16:53 423 查看
    如何实现nutch增量索引,首先应该在Indexer建索引的时候,只对本次抓取的数据进行索引(以前的数据已经被索引过,所以没有必要再次进行索引),建立的索引假设在indexes文件夹下;然后在将本次建立的索引和以前建立的索引进行去重操作,去除对同一个url的重复的Document;最后将本次索引和以前建立的索引进行合并操作。下面将对上面三个步骤进行详细的说明:

(1)Indexer类的index()函数负责创建一个job,进行索引操作。index函数调用IndexerMapReduce类的静态函数initMRJob()初始化该job,initMRJob负责将需要建立索引的数据通过FileInputFormat.addInputPath(job, p1)操作,加载到job的输入路径中,所以可以通过修改iniMRJob()函数来控制建立索引的输入数据,即只将本次爬取的数据作为输入来避免重复建立索引。代码如下:

/**
* 修改方法:该方法可以根据爬取的深度depth来确定需要索引的segment文件
* @param crawlDb 链接数据库路径
* @param linkDb  外部链接信息数据库路径
* @param segments  爬取结果数据路径
* @param job  工作配置信息类
* @param depth  本次爬取深度
*/
public static void initMRJob(Path crawlDb, Path linkDb,
Collection<Path> segments,
JobConf job,int depth) {
LOG.info("IndexerMapReduce: crawldb: " + crawlDb);//写日志信息
LOG.info("IndexerMapReduce: linkdb: " + linkDb);//写日记信息

//	    System.err.println("size="+segments.size());
/**
* 下面的for循环,主要工作是取出爬取数据路径下所有的文件夹,然后将每个文件路径通过addInputPath添加到job中
* 只有添加的路径中的数据才会被索引,因为从文件segments中取出爬取结果文件夹的顺序是先入先出(即越早爬取的数据
* 会越早被取出),所以我们根据爬取深度将后面最新爬取的depth个结果数据文件夹添加到job中进行索引。
*/
int count=segments.size()-depth,count2=1;//
for (final Path segment : segments) {

System.out.print("IndexerMapReduce: segment="+segment.toString());

if((count2++)>count){//将最新的depth个结果文件夹添加的job中
LOG.info("IndexerMapReduces: adding segment: " + segment);
Path p1=new Path(segment, CrawlDatum.FETCH_DIR_NAME);
FileInputFormat.addInputPath(job, p1);
Path p2= new Path(segment, CrawlDatum.PARSE_DIR_NAME);
FileInputFormat.addInputPath(job,p2);
Path p3=new Path(segment, ParseData.DIR_NAME);
FileInputFormat.addInputPath(job, p3);
Path p4=new Path(segment, ParseText.DIR_NAME);
FileInputFormat.addInputPath(job, p4);
System.out.println("该文件被选中");
}else{//过滤掉老的结果文件夹
LOG.info("IndexerMapReduces: the segment is too old to add: " + segment);
System.out.println("该文件被忽略");
}

}

/**
* 下面是初始化job的其他工作,这里不再介绍
*/
FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
//	    System.err.println("CrawlDb="+new Path(crawlDb, CrawlDb.CURRENT_NAME));
FileInputFormat.addInputPath(job, new Path(linkDb, LinkDb.CURRENT_NAME));
//	    System.err.println("LinkDb="+new Path(linkDb, LinkDb.CURRENT_NAME));
job.setInputFormat(SequenceFileInputFormat.class);

job.setMapperClass(IndexerMapReduce.class);
job.setReducerClass(IndexerMapReduce.class);

job.setOutputFormat(IndexerOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(NutchWritable.class);
job.setOutputValueClass(NutchWritable.class);
}

备注:indexer在创建索引钱会重新创建临时索引存放路径indexes文件夹,所以以前的索引会被删除掉,indexes下的索引是本次刚得到的索引。

(2)新索引建立完成,需要通过DeleteDuplicates类进行去重操作,调用方法为dedup.dedup(new Path[] { indexes}),他会将indexes文件夹下(indexes文件夹下存放多个索引,每个索引一个文件夹)所有的索引添加到job中,然后经过多步操作完成去重。所以首先应该将以前建立的最终索引剪切到该文件夹,这样DeleteDuplicates就会完成去重操作,这一步的工作关键是如何将以前的索引index移动到indexes下,通过分析hadoop的HDFS文件系统得到moveFromLocalFile()函数,可以移动文件系统中的文件夹,所以完成index文件夹移动的代码如下:

if(indexes != null) {

if (fs.exists(index)) {
/**
* 将index剪切到indexes目录下
*/
fs.moveFromLocalFile(index, indexes);
}
}

(3)IndexMerger负责将索引进行合并操作,操作的代码如下:

fstats = fs.listStatus(indexes, HadoopFSUtil.getPassDirectoriesFilter(fs));
merger.merge(HadoopFSUtil.getPaths(fstats), index, tmpDir);

解释:首先他获得indexes目录下的所有索引(以索引文件夹的形式存在),然后由merge()函数对所有的索引进行合并操作。因为以前的索引已经被移动到了indexes文件夹下,所以这样就不需要再改动IndexMerger的代码,就可以实现新老索引的合并操作,最终他会在结果路径下创建新的最终索引index,然后将合并后的结果放在index中。

 

 

 

 

 

 

 

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐