您的位置:首页 > 产品设计 > UI/UE

Nutch1.7源码再研究之---11 Fetch中的QueueFeeder线程代码分析

2014-10-16 00:00 337 查看
下面我们开始分析Fetch中的QueueFeeder线程部分。

--------------------------------------------------------------------------------------

先看初始化的代码:

private RecordReader<Text, CrawlDatum> reader;

private FetchItemQueues queues;

private int size;

private long timelimit = -1;

public QueueFeeder(RecordReader<Text, CrawlDatum> reader,

FetchItemQueues queues, int size) {

this.reader = reader; //Hadoop中的记录reader

this.queues = queues; //之前构造的FetchItemQueues

this.size = size; //命令行中设置的线程个数*一个参数。

this.setDaemon(true);

this.setName("QueueFeeder");

}

--------------------------------------------------------然后开始是线程的运行部分!

boolean hasMore = true;

int cnt = 0;

int timelimitcount = 0;

这是一些初始化局部变量,后续会使用。

-----------------------------------------------------------------------------------

接下来的是一个循环

while (hasMore) {

然后是校验是否超时

if (System.currentTimeMillis() >= timelimit && timelimit != -1) {

// enough .. lets' simply

// read all the entries from the input without processing

// them

try {

Text url = new Text();

CrawlDatum datum = new CrawlDatum();

hasMore = reader.next(url, datum);

timelimitcount++;

} catch (IOException e) {

LOG.error("QueueFeeder error reading input, record "

+ cnt, e);

return;

}

continue;

}

由于默认timeLimit值是-1,所以这段代码略过。

-------------------------------------------------------------------------------------------------

接下来是计算队列里可以再放多少item.

计算公式:

int feed = size - queues.getTotalSize();

这里的size=命令行里的线程个数 threadCount * queueDepthMuliplier

queueDepthMuliplier的值来自于:
int queueDepthMuliplier = getConf().getInt("fetcher.queue.depth.multiplier", 50);

配置文件中的配置项为:<property>
<name>fetcher.queue.depth.multiplier</name>
<value>50</value>
<description>(EXPERT)The fetcher buffers the incoming URLs into queues based on the [host|domain|IP]
(see param fetcher.queue.mode). The depth of the queue is the number of threads times the value of this parameter.
A large value requires more memory but can improve the performance of the fetch when the order of the URLS in the fetch list
is not optimal.
</description>
</property>

所以,这里计算int feed = size - queues.getTotalSize();

的意思就是:

队列里一次性最多存放size个item.如果超过了就停止填充,等待消费者消费。

-------------------------接下来的代码验证了这一点。

if (feed <= 0) {

// queues are full - spin-wait until they have some free

// space

try {

Thread.sleep(1000);

} catch (Exception e) {

};

continue;

}

实际上就是休眠1秒钟,然后再重新查询是否队列为满,满--->继续休眠;不满--->往下执行继续填充。

-----------------------------下面看看填充代码。

while (feed > 0 && hasMore) //尽量填充feed个Item...

{

try

{

Text url = new Text();

CrawlDatum datum = new CrawlDatum();

hasMore = reader.next(url, datum);//取出一个<url,datum>

if (hasMore) {//如果存在

queues.addFetchItem(url, datum);//加入到队列中

cnt++;//计数器自增

feed--;//可填充的个数自减

}

}

catch (IOException e)

{

LOG.error(

"QueueFeeder error reading input, record "

+ cnt, e);

return;

}

}

通过代码可知,如果没有记录了

hasMore = reader.next(url, datum);

就会为false,整个线程也就退出了。

至此,我们顺利的把QueueFeeder线程的代码分析完毕,难度不是很大!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Nutch Fetch QueueFeeder