Nutch 1.3 学习笔记 5-1 FetchThread
2011-08-27 22:54
381 查看
Nutch 1.3 学习笔记 5-1 FetchThread
-----------------------------------
上一节看了Fetcher中主要几个类的实现,这一节会来分析一下其中用到的消费者FetcherThread,来看看它是干嘛的。
1. Fetcher的Mapp模型
Fetcher.java代码中可以看到,Fetcher继承自MapRunable,它是Mapper的抽象接口,实现这个接口的子类能够更好的对Map的流程进行控制,包括多线程与异步Maper。
得到FetchItem, 抽取其url,从这个url中分析出所使用的协议,调用相应的plugin来解析这个协议
得到相当url的robotRules,看是否符合抓取规则,如果不符合或者其delayTime大于我们配置的maxDelayTime,那就不抓取这个网页
对网页进行抓取,得到其抓取的Content和抓取状态,调用FetchItemQueues的finishFetchItem方法,表明当前url已经抓取完成
根据抓取协议的状态来进行下一步操作
如果状态为WOULDBLOCK,那就进行retry,把当前url放加FetchItemQueues中,进行重试
如果是MOVED或者TEMP_MOVED,这时这个网页可以被重定向了,对其重定向的内容进行解析,得到重定向的网址,这时要生成一个新的FetchItem,根据其QueueID放到相应的队列的inProgress集合中,然后再对这个重定向的网页进行抓取
如果状态是EXCEPTION,对当前url所属的FetchItemQueue进行检测,看其异常的网页数有没有超过最大异常网页数,如果大于,那就清空这个队列,认为这个队列中的所有网页都有问题。
如果状态是RETRY或者是BLOCKED,那就输出CrawlDatum,将其状态设置成STATUS_FETCH_RETRY,在下一轮进行重新抓取
如果状态是GONE,NOTFOUND,ACCESS_DENIED,ROBOTS_DENIED,那就输出CrawlDatum,设置其状态为STATUS_FETCH_GONE,可能在下一轮中就不进行抓取了,
如果状态是NOTMODIFIED,那就认为这个网页没有改变过,那就输出其CrawlDatum,将其状态设成成STATUS_FETCH_NOTMODIFIED.
如果所有状态都没有找到,那默认输出其CrawlDatum,将其状态设置成STATUS_FETCH_RETRY,在下一轮抓取中再重试
判断网页重定向的次数,如果超过最大重定向次数,就输出其CrawlDatum,将其状态设置成STATUS_FETCH_GONE
这里有一些细节没有说明,如网页被重定向以后如果操作,相应的协议是如果产生的,这个是通过插件产生的,具体插件是怎么调用的,这里就不说了,以后有机会会再分析一下。
判断抓取的content是否为空,如果不为空,那调用相应的解析插件来对其内容进行解析,然后就是设置当前url所对应的CrawlDatum的一些参数,如当前内容的MD5码,分数等信息
然后就是使用FetchOutputFormat输出当前url的CrawlDatum,Content和解析的结果ParseResult
下面分析一下FetcherOutputFormat中所使用到的ParseOutputFormat.RecordWriter
在生成相应的ParseOutputFormat的RecordWriter过程中,这个RecordWriter会再生成三个RecordWriter来写出parse_text(MapFile),parse_data(MapFile)和crawl_parse(SequenceFile),我们在segments下具体的segment中看到的三个这样的目录就是这个对象生成的,分别输出了网页的源代码;网页的解析数据,如网页title、外链接、元数据、状态等信息,这里会对外链接进行过滤、规格化,并且用插件计算每一个外链接的初始分数;另一个是网页解析后的CrawlDatum对象,这里会分析当前CrawlDatum中的metadata,从中生成两种新的CrawlDatum,还有就是它会对外链接生成相应的CrawlDatum,放入crawl_parse目录中,这里我还没有看明白。
在Fetch的时候,会输出另外五个目录
content: 这个目录只有在配置了要输出抓取内容时才会输出
crawl_fetch: 这个目录是输出抓取成功后的CrawlDatum信息,这里是对原来crawl_generate目录中的信息进行了一些修改,下面三个目录只有配置了解析参数后才会输出,如果后面调用bin/nutch parse命令
parse_text: 这个目录存放了抓取的网页内容,以提后面建立索引用
parse_data: 这里存入了网页解析后的一些数据,如网页title,外链接信息等
crawl_parse: 这里存储了一些新生成的CrawlDatum信息,如外链接等,以供下一次迭代抓取使用
多个消费者(FetcherThread)从共享队列中取得要抓取的FetchItem数据
对FetchItem所对应的url进行抓取,得到相应的抓取内容,对抓取的状态进行判断,回调相应的操作
对抓取的内容进行解析,产生网页的外链接,生成新的CrawlDatum抓取数据,产生解析后的数据
调用FetcherOutputFormat.Writer对象,把CrawlDatum,Content,ParseResult分别写入crawl_fetch,content,(parse_text,parse_data,crawl_parse)目录中
好了,Fetcher的分析也差不多了,可能有一些细节还没有分析到,下面有机会再补上吧。
-----------------------------------
上一节看了Fetcher中主要几个类的实现,这一节会来分析一下其中用到的消费者FetcherThread,来看看它是干嘛的。
1. Fetcher的Mapp模型
Fetcher.java代码中可以看到,Fetcher继承自MapRunable,它是Mapper的抽象接口,实现这个接口的子类能够更好的对Map的流程进行控制,包括多线程与异步Maper。
1.1 Fetcher的入口函数fetch(Path segment,int threads, boolean parsing)
下面是它的源代码,来分析一下:// 对配置进行检测,看一些必要的配置是否已经配置了,如http.agent.name等参数 checkConfiguration(); // 记录fetch的开始时间 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); long start = System.currentTimeMillis(); if (LOG.isInfoEnabled()) { LOG.info("Fetcher: starting at " + sdf.format(start)); LOG.info("Fetcher: segment: " + segment); } // 这里对抓取的时候进行限制,在FetchItemQueue中会用到这个参数 // set the actual time for the timelimit relative // to the beginning of the whole job and not of a specific task // otherwise it keeps trying again if a task fails long timelimit = getConf().getLong("fetcher.timelimit.mins", -1); if (timelimit != -1) { timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000); LOG.info("Fetcher Timelimit set for : " + timelimit); getConf().setLong("fetcher.timelimit", timelimit); } // 生成一个Nutch的Map-Reduce配置 JobConf job = new NutchJob(getConf()); job.setJobName("fetch " + segment); // 配置抓取线程数, job.setInt("fetcher.threads.fetch", threads); job.set(Nutch.SEGMENT_NAME_KEY, segment.getName()); // 配置是否对抓取的内容进行解析 job.setBoolean("fetcher.parse", parsing); // for politeness, don't permit parallel execution of a single task job.setSpeculativeExecution(false); // 配置输出的路径名 FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.GENERATE_DIR_NAME)); // 配置输入的文件格式,这里类继承自SequenceFileInputFormat // 它主要是覆盖了其getSplits方法,其作用是不对文件进行切分,以文件数量作为splits的依据 // 就是有几个文件,就有几个Map操作 job.setInputFormat(InputFormat.class); // 配置Map操作的类 job.setMapRunnerClass(Fetcher.class); // 配置输出路径 FileOutputFormat.setOutputPath(job, segment); // 这里配置输出文件方法,这个类在前面已经分析过 job.setOutputFormat(FetcherOutputFormat.class); // 配置输出<key,value>类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NutchWritable.class); JobClient.runJob(job);
1.2 Fetcher的run方法分析
这个是Map类的入口,用于启动抓取的生产者与消费者,下面是部分源代码:// 生成生产者,用于读取Generate出来的CrawlDatum,把它们放到共享队列中 feeder = new QueueFeeder(input, fetchQueues, threadCount * 50); //feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2); // the value of the time limit is either -1 or the time where it should finish long timelimit = getConf().getLong("fetcher.timelimit", -1); if (timelimit != -1) feeder.setTimeLimit(timelimit); feeder.start(); // set non-blocking & no-robots mode for HTTP protocol plugins. getConf().setBoolean(Protocol.CHECK_BLOCKING, false); getConf().setBoolean(Protocol.CHECK_ROBOTS, false); // 启动消费者线程 for (int i = 0; i < threadCount; i++) { // spawn threads new FetcherThread(getConf()).start(); } // select a timeout that avoids a task timeout long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/2; // 这里用一个循环来等待线程结束 do { // wait for threads to exit try { Thread.sleep(1000); } catch (InterruptedException e) {} // 这个函数是得到相前线程的抓取状态,如抓取了多少网页,多少网页抓取失败,抓取速度是多少 reportStatus(); LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get() + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize()); // 输出抓取队列中的信息 if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) { fetchQueues.dump(); } // 查看timelimit的值,这里只要返回的hitByTimeLimit不为0,checkTimelimit方法会清空抓取队列中的所有数据 // check timelimit if (!feeder.isAlive()) { int hitByTimeLimit = fetchQueues.checkTimelimit(); if (hitByTimeLimit != 0) reporter.incrCounter("FetcherStatus", "hitByTimeLimit", hitByTimeLimit); } // 查看抓取抓取线程是否超时,如果超时,就退出等待 // some requests seem to hang, despite all intentions if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) { if (LOG.isWarnEnabled()) { LOG.warn("Aborting with "+activeThreads+" hung threads."); } return; } } while (activeThreads.get() > 0); LOG.info("-activeThreads=" + activeThreads);
2. Fetcher.FetcherThread
2.1 这个类主要是用来从队列中得到FetchItem,下面来看一下其run方法,其大概做了几件事:
从抓取队列中得到一个FetchItem,如果返回为null,判断生产者是否还活着或者队列中是否还有数据, 如果队列中还有数据,那就等待,如果上面条件没有满足,就认为所有FetchItem都已经处理完了,退出当前抓取线程得到FetchItem, 抽取其url,从这个url中分析出所使用的协议,调用相应的plugin来解析这个协议
得到相当url的robotRules,看是否符合抓取规则,如果不符合或者其delayTime大于我们配置的maxDelayTime,那就不抓取这个网页
对网页进行抓取,得到其抓取的Content和抓取状态,调用FetchItemQueues的finishFetchItem方法,表明当前url已经抓取完成
根据抓取协议的状态来进行下一步操作
如果状态为WOULDBLOCK,那就进行retry,把当前url放加FetchItemQueues中,进行重试
如果是MOVED或者TEMP_MOVED,这时这个网页可以被重定向了,对其重定向的内容进行解析,得到重定向的网址,这时要生成一个新的FetchItem,根据其QueueID放到相应的队列的inProgress集合中,然后再对这个重定向的网页进行抓取
如果状态是EXCEPTION,对当前url所属的FetchItemQueue进行检测,看其异常的网页数有没有超过最大异常网页数,如果大于,那就清空这个队列,认为这个队列中的所有网页都有问题。
如果状态是RETRY或者是BLOCKED,那就输出CrawlDatum,将其状态设置成STATUS_FETCH_RETRY,在下一轮进行重新抓取
如果状态是GONE,NOTFOUND,ACCESS_DENIED,ROBOTS_DENIED,那就输出CrawlDatum,设置其状态为STATUS_FETCH_GONE,可能在下一轮中就不进行抓取了,
如果状态是NOTMODIFIED,那就认为这个网页没有改变过,那就输出其CrawlDatum,将其状态设成成STATUS_FETCH_NOTMODIFIED.
如果所有状态都没有找到,那默认输出其CrawlDatum,将其状态设置成STATUS_FETCH_RETRY,在下一轮抓取中再重试
判断网页重定向的次数,如果超过最大重定向次数,就输出其CrawlDatum,将其状态设置成STATUS_FETCH_GONE
这里有一些细节没有说明,如网页被重定向以后如果操作,相应的协议是如果产生的,这个是通过插件产生的,具体插件是怎么调用的,这里就不说了,以后有机会会再分析一下。
2.2 下面分析FetcherThread中的另外一个比较重要的方法,就是output
具体这个output大概做了如下几件事:判断抓取的content是否为空,如果不为空,那调用相应的解析插件来对其内容进行解析,然后就是设置当前url所对应的CrawlDatum的一些参数,如当前内容的MD5码,分数等信息
然后就是使用FetchOutputFormat输出当前url的CrawlDatum,Content和解析的结果ParseResult
下面分析一下FetcherOutputFormat中所使用到的ParseOutputFormat.RecordWriter
在生成相应的ParseOutputFormat的RecordWriter过程中,这个RecordWriter会再生成三个RecordWriter来写出parse_text(MapFile),parse_data(MapFile)和crawl_parse(SequenceFile),我们在segments下具体的segment中看到的三个这样的目录就是这个对象生成的,分别输出了网页的源代码;网页的解析数据,如网页title、外链接、元数据、状态等信息,这里会对外链接进行过滤、规格化,并且用插件计算每一个外链接的初始分数;另一个是网页解析后的CrawlDatum对象,这里会分析当前CrawlDatum中的metadata,从中生成两种新的CrawlDatum,还有就是它会对外链接生成相应的CrawlDatum,放入crawl_parse目录中,这里我还没有看明白。
3. 总结
有点晕了,这里的代码有点复杂,我们来整理一下思路。3.1 从目录生成的角度
从Generate后会在segments目录下生成一些要抓取的具体的segment,这里每一个segment下会有一个叫crawl_generate的目录,其中放着要抓取CrawlDatum信息在Fetch的时候,会输出另外五个目录
content: 这个目录只有在配置了要输出抓取内容时才会输出
crawl_fetch: 这个目录是输出抓取成功后的CrawlDatum信息,这里是对原来crawl_generate目录中的信息进行了一些修改,下面三个目录只有配置了解析参数后才会输出,如果后面调用bin/nutch parse命令
parse_text: 这个目录存放了抓取的网页内容,以提后面建立索引用
parse_data: 这里存入了网页解析后的一些数据,如网页title,外链接信息等
crawl_parse: 这里存储了一些新生成的CrawlDatum信息,如外链接等,以供下一次迭代抓取使用
3.2 从数据流的角度
Generate生成的CrawlDatum数据首先经过QueueFeeder生产者,放入共享队列多个消费者(FetcherThread)从共享队列中取得要抓取的FetchItem数据
对FetchItem所对应的url进行抓取,得到相应的抓取内容,对抓取的状态进行判断,回调相应的操作
对抓取的内容进行解析,产生网页的外链接,生成新的CrawlDatum抓取数据,产生解析后的数据
调用FetcherOutputFormat.Writer对象,把CrawlDatum,Content,ParseResult分别写入crawl_fetch,content,(parse_text,parse_data,crawl_parse)目录中
好了,Fetcher的分析也差不多了,可能有一些细节还没有分析到,下面有机会再补上吧。
相关文章推荐
- Apache Nutch 1.3 学习笔记五(FetchThread)
- Nutch 1.3 学习笔记 7 CrawlDb - updatedb
- Nutch 1.3 学习笔记 4 Generate
- Nutch 1.3 学习笔记 8 LinkDb
- Nutch 1.3 学习笔记 4 Generate
- Nutch 1.3 学习笔记
- Apache Nutch 1.3 学习笔记一
- Apache Nutch 1.3 学习笔记五(Fetcher流程)
- Nutch 1.3 学习笔记 4-1 SegmentReader分析
- Nutch 1.3 学习笔记 9 SolrIndexer
- Nutch 1.3 学习笔记1
- Apache Nutch 1.3 学习笔记二
- Apache Nutch 1.3 学习笔记七(CrawlDb - updatedb)
- Nutch 1.3 学习笔记 5 Fetcher
- Nutch 1.3 学习笔记3-1 Inject CrawlDB Reader
- Nutch 1.3 学习笔记2
- Nutch 1.3 学习笔记 外传 扩展Nutch插件实现自定义索引字段
- Apache Nutch 1.3 学习笔记三(Inject)
- Apache Nutch 1.3 学习笔记六(ParseSegment)
- Nutch 1.3 学习笔记 5-1 FetchThread