Nutch学习笔记1 ---------Inject
2011-10-23 11:34
232 查看
1. Inject 功能介绍
在Nutch中Inject是用来把文本格式的url列表注入到抓取数据库中,一般是用来引导系统的初始化。其中文本格式的URL每一列包含一个url。
同时inject里面保留了两个元数据。
nutch.score : 允许设置特定url的分数
nutch.fetchInterval : 表示特定url的抓取间隔,单位为毫秒。
2. Inject 命令行执行
可以用如下命令来查看数据库内容
3. Inject 源代码分析
org.apache.nutch.crawl.Injector ,使用hadoop提供的ToolRunner来运行其实例。最终的入口函数是void
inject(Path crawlDb, Path urlDir).
共涉及到了两个MapReduce。
1).主要功能是吧文件格式的输入转化成<url,CrawlDatum>格式的输出,这里的CrawlDatum是Nutch对于单个抓取url对象的一个抽象,其中有很多url相关信息。
2).主要功能是把上面新生成的输出与旧的CrawlDB数据进行合并,生成一个新的CrawlDb.
3.1 第一个MR Job任务源码分析
3.1.1 对InjectMapper源码分析
这个类主要用于对url进行解析,过滤和规格化
3.2 第二个MR任务源码分析
第二个MR任务主要是对CrawlDatum进行合并,源代码如下:
继续看InjectReduce的工作,源码如下:
最后看CrawlDb.install方法。源码部分:
4. 总结
Inject主要是从文本文件中注入新的url,使其与老的crawlDb中的url进行合并,然后把老的CrawlDb目录删除。同时把新生成的CrawlDb临时目录重命名为DrawlDb目录名
具体流程如下:
url_dir-->MapReduce1(Inject new urls)-->MapReduce2(merge new urls with old crawlDb)-->install new CrwalDb-->cleanup
主要参考 lemo专栏
在Nutch中Inject是用来把文本格式的url列表注入到抓取数据库中,一般是用来引导系统的初始化。其中文本格式的URL每一列包含一个url。
同时inject里面保留了两个元数据。
nutch.score : 允许设置特定url的分数
nutch.fetchInterval : 表示特定url的抓取间隔,单位为毫秒。
e.g. http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000 \t userType=open_source
2. Inject 命令行执行
bin/nutch inject <url_dir> <crawl_db>
可以用如下命令来查看数据库内容
bin/nutch readdb <crawl_db> -stats -sort输出如下
rawlDb statistics start: db/crawldb Statistics for CrawlDb: db/crawldb TOTAL urls: 1 retry 0: 1 min score: 1.0 avg score: 1.0 max score: 1.0 status 1 (db_unfetched): 1 www.baidu.com : 1 CrawlDb statistics: done
3. Inject 源代码分析
org.apache.nutch.crawl.Injector ,使用hadoop提供的ToolRunner来运行其实例。最终的入口函数是void
inject(Path crawlDb, Path urlDir).
共涉及到了两个MapReduce。
1).主要功能是吧文件格式的输入转化成<url,CrawlDatum>格式的输出,这里的CrawlDatum是Nutch对于单个抓取url对象的一个抽象,其中有很多url相关信息。
2).主要功能是把上面新生成的输出与旧的CrawlDB数据进行合并,生成一个新的CrawlDb.
3.1 第一个MR Job任务源码分析
JobConf sortJob = new NutchJob(getConf()); //获得一个Nutch Job配置 sortJob.setJobName("inject " + urlDir); //设置Job名称 FileInputFormat.addInputPath(sortJob, urlDir); //设置InputFormat,这里为FileInputFormat,这里要注意的是可以调用多次
addInputPath方法。效果是会有多个输入源 sortJob.setMapperClass(InjectMapper.class); //设置Mapper方法,主要用于解析,过滤和规格化url文本。将其转化成<url,CrawlDatum>格式
FileOutputFormat.setOutputPath(sortJob, tempDir);//定义了一个输出路径,tempDir=mapred.temp.dir/inject-temp-Random sortJob.setOutputFormat(SequenceFileOutputFormat.class);//配置了输出格式SequenceFileOutputFormat,这是Mp的一种二进制输出结构 sortJob.setOutputKeyClass(Text.class);//配置了<key,value>类型,这里为<Text,CrawlDatum> sortJob.setOutputValueClass(CrawlDatum.class); sortJob.setLong("injector.current.time", System.currentTimeMillis()); JobClient.runJob(sortJob);//提交任务到JobTracker,让其运行任务。
3.1.1 对InjectMapper源码分析
这个类主要用于对url进行解析,过滤和规格化
public void map(WritableComparable key, Text value, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException { String url = value.toString(); // 将value转化成string型 if (url != null && url.trim().startsWith("#")) { //过滤以#开头的文本 /* Ignore line that start with # */ return; } // if tabs : metadata that could be stored // must be name=value and separated by \t float customScore = -1f; int customInterval = interval; Map<String,String> metadata = new TreeMap<String,String>();//设置一个保存metabata的Map容器 if (url.indexOf("\t")!=-1){ String[] splits = url.split("\t"); //对一行文本进行切分 url = splits[0]; for (int s=1;s<splits.length;s++){ // find separation between name and value int indexEquals = splits[s].indexOf("="); if (indexEquals==-1) { // skip anything without a = continue; } String metaname = splits[s].substring(0, indexEquals); //得到元数据的名字 String metavalue = splits[s].substring(indexEquals+1); //得到元数据的值 if (metaname.equals(nutchScoreMDName)) { //判定是不是保留的元数据 try { customScore = Float.parseFloat(metavalue);} catch (NumberFormatException nfe){} } else if (metaname.equals(nutchFetchIntervalMDName)) { try { customInterval = Integer.parseInt(metavalue);} catch (NumberFormatException nfe){} } else metadata.put(metaname,metavalue); //如果这个元数据不是保留的原数据,就放到容器中 } } try { url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);//对url进行规格化,这里调用的是plugings中的插件 url = filters.filter(url); // filter the url } catch (Exception e) { if (LOG.isWarnEnabled()) { LOG.warn("Skipping " +url+":"+e); } url = null; } if (url != null) { // if it passes value.set(url); // collect it
//这里生成一个CrawlDatum对象,设置一些url初始化数据 CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_INJECTED, customInterval); datum.setFetchTime(curTime); //设置当前url的抓取时间 // now add the metadata Iterator<String> keysIter = metadata.keySet().iterator(); while (keysIter.hasNext()){ //配置其元数据 String keymd = keysIter.next(); String valuemd = metadata.get(keymd); datum.getMetaData().put(new Text(keymd), new Text(valuemd)); } if (customScore != -1) datum.setScore(customScore);//设置初始化分数 else datum.setScore(scoreInjected); try { scfilters.injectedScore(value, datum);//这里对url的分数进行初始化 } catch (ScoringFilterException e) { if (LOG.isWarnEnabled()) { LOG.warn("Cannot filter injected score for url " + url + ", using default (" + e.getMessage() + ")"); } }
//Map收集相应的数据,类型为<Text,CrawlDatum> output.collect(value, datum); } } }
3.2 第二个MR任务源码分析
第二个MR任务主要是对CrawlDatum进行合并,源代码如下:
//合并已经存在的crawldb
JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb); //配置相应的Job
FileInputFormat.addInputPath(mergeJob, tempDir); 配置输入文本数据,就是上面的第一个MR任务的输出 mergeJob.setReducerClass(InjectReducer.class);//这里配置了Reduce的抽象类,这里回覆盖上面createJob设置的Reduce类。 JobClient.runJob(mergeJob); //提交运行任务 CrawlDb.install(mergeJob, crawlDb);//把上面新生成的目录重命名为crawlDb的标准文件夹名,然后删除老的目录 // clean up FileSystem fs = FileSystem.get(getConf()); fs.delete(tempDir, true); //把第一个MR任务的输出目录删除下面是createJob的源代码说明:
public static JobConf createJob(Configuration config, Path crawlDb) throws IOException {
//生车新的crawlDb文件名 Path newCrawlDb = new Path(crawlDb,Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); JobConf job = new NutchJob(config); //生成相应的Job配置抽象 job.setJobName("crawldb " + crawlDb);
Path current = new Path(crawlDb, CURRENT_NAME); if (FileSystem.get(job).exists(current)) { //如果存在老的CrawlDb目录,将其加入到InputPath中,和上面
的tempDir一起进行合并 FileInputFormat.addInputPath(job, current); }// NOTE:有没有注意到这里如果有老的CrawlDb目录的话,那它的文件格式是MapFileOutputFormat,而下面对其读取用了SequenceFileInputFormat来读,因为这两个类底层都是调用了SequenceFile的Reader与Writer来读写的,所以可以通用。
job.setInputFormat(SequenceFileInputFormat.class);//设子CrawlDb目录文件的格式为
SequenceFileInputFormat job.setMapperClass(CrawlDbFilter.class);//设在相应的Map操作,主要是过滤和规格化url job.setReducerClass(CrawlDbReducer.class);//设置相应的Reduce操作,主要是对相应的url进行聚合 FileOutputFormat.setOutputPath(job, newCrawlDb);//设置新的输出路径 job.setOutputFormat(MapFileOutputFormat.class);//设置输出格式MapFileOutputFormat job.setOutputKeyClass(Text.class);//这里设置了输出类型<Text,CrawlDatum> job.setOutputValueClass(CrawlDatum.class); return job; }
继续看InjectReduce的工作,源码如下:
public void reduce(Text key, Iterator<CrawlDatum> values, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException { boolean oldSet = false;
//把相同url聚合後的结果进行处理,这里循环主要是判定新注入的url与老的url有没有相同的,
//如果有相同的话就不设置其状态,支持collect出去 while (values.hasNext()) { CrawlDatum val = values.next(); if (val.getStatus() == CrawlDatum.STATUS_INJECTED) { injected.set(val); injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED); } else { old.set(val); oldSet = true; } } CrawlDatum res = null; if (oldSet) res = old; // don't overwrite existing value else res = injected; output.collect(key, res); } }
最后看CrawlDb.install方法。源码部分:
public static void install(JobConf job, Path crawlDb) throws IOException { Path newCrawlDb = FileOutputFormat.getOutputPath(job); //得到第二个MR任务的输出目录 FileSystem fs = new JobClient(job).getFs(); Path old = new Path(crawlDb, "old"); Path current = new Path(crawlDb, CURRENT_NAME); //得到CrawlDb的正规目录名,也就是没有老的CrawlDB if (fs.exists(current)) {
//如果有老的CrawlDb目录,就把老的目录名重命名为old这个名字 if (fs.exists(old)) fs.delete(old, true);//这里判定old这个目录是不是已经存在,如果存在就删除 fs.rename(current, old); } fs.mkdirs(crawlDb); fs.rename(newCrawlDb, current);//把第二个MR任务的输出目录重命名为Current目录,也就是正规目录
if (fs.exists(old)) fs.delete(old, true);//删除重命名後的老的CrawlDb目录 Path lock = new Path(crawlDb, LOCK_NAME); LockUtil.removeLockFile(fs, lock);//目录解锁 }
4. 总结
Inject主要是从文本文件中注入新的url,使其与老的crawlDb中的url进行合并,然后把老的CrawlDb目录删除。同时把新生成的CrawlDb临时目录重命名为DrawlDb目录名
具体流程如下:
url_dir-->MapReduce1(Inject new urls)-->MapReduce2(merge new urls with old crawlDb)-->install new CrwalDb-->cleanup
主要参考 lemo专栏
相关文章推荐
- Nutch 1.3 学习笔记3-1 Inject CrawlDB Reader
- *Nutch 1.3 学习笔记3-1 Inject CrawlDB Reader
- Nutch 1.3 学习笔记 - Inject
- Nutch 1.3 学习笔记3 - Inject
- *** Nutch 1.3 学习笔记3 - Inject
- nutch 1.3 学习笔记3-1 Inject CrawlDB Reader
- Apache Nutch 1.3 学习笔记三(Inject)
- Apache Nutch 1.3 学习笔记三(Inject CrawlDB Reader)
- Nutch 1.3 学习笔记3-1 Inject CrawlDB Reader
- Nutch 1.3 学习笔记3 - Inject
- Nutch学习笔记5---Nutch 1.7 解析流程图
- Nutch 1.3 学习笔记 11-1 页面评分机制 OPIC
- Nutch 1.3 学习笔记1
- nutch-1.7-学习笔记(1)-org.apache.nutch.crawl.Injector.java-Configuration
- nutch-1.7-学习笔记(1)-org.apache.nutch.crawl.Injector.java-CrawlDatum
- nutch-1.7-学习笔记(1)-org.apache.nutch.crawl.Injector.java-Filesystem.get()
- Nutch 1.3 学习笔记1
- Nutch学习笔记2: Nutch-2.2.1脚本分析
- Nutch 1.3 学习笔记 10-3 插件机制分析
- Nutch学习笔记4-Nutch 1.7 的 索引篇 ElasticSearch