
Heritrix In Depth: Dissecting the Frontier (the Link Factory)

2009-07-30 16:54
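The Frontier is one of the most central parts of Heritrix, and also the most complex one. Its main job is to supply URLs to the threads that process links. It also handles the follow-up scheduling after a link has been processed. For efficiency, it uses Berkeley DB internally. This section dissects its inner workings in detail.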

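The official Heritrix documentation contains a Frontier example. It is very simple, but it does explain the basic principles of a Frontier implementation. We will not discuss it here; interested readers can consult that documentation. Its three core methods, however, deserve a mention: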

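(1) next(int timeout): supplies a link to a processing thread. Every Heritrix processing thread (ToeThread) obtains its links by calling this method.

(2) schedule(CandidateURI caURI): schedules a link for processing.

(3) finished(CrawlURI cURI): wraps up a link after it has been processed.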
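The contract formed by these three methods can be illustrated with a small in-memory sketch. It is a hypothetical simplification, not Heritrix's API. SimpleFrontier and InMemoryFrontier are invented names, and plain Strings stand in for CandidateURI/CrawlURI. There is no politeness, no persistence and no per-host queueing.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, heavily simplified stand-in for the Frontier contract.
interface SimpleFrontier {
    String next();                 // hand a URI to a worker thread (null if none)
    void schedule(String uri);     // offer a newly discovered URI
    void finished(String uri);     // report that a worker is done with a URI
}

class InMemoryFrontier implements SimpleFrontier {
    private final Deque<String> pending = new ArrayDeque<>();
    private final Set<String> seen = new HashSet<>();
    private final Set<String> inProcess = new HashSet<>();
    private int finishedCount = 0;

    @Override
    public synchronized void schedule(String uri) {
        // Only URIs never seen before are queued (duplicate filtering).
        if (seen.add(uri)) {
            pending.addLast(uri);
        }
    }

    @Override
    public synchronized String next() {
        String uri = pending.pollFirst();
        if (uri != null) {
            inProcess.add(uri);    // remember what is currently being worked on
        }
        return uri;
    }

    @Override
    public synchronized void finished(String uri) {
        // Post-processing hook: this sketch only updates bookkeeping.
        if (inProcess.remove(uri)) {
            finishedCount++;
        }
    }

    synchronized int pendingCount() { return pending.size(); }
    synchronized int finishedCount() { return finishedCount; }
}
```

Marking a URI as in process in next() and clearing it in finished() follows the division of labor described above. next() hands out work, and finished() does the bookkeeping afterwards.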

The overall structure consists of the following classes:



BdbMultipleWorkQueues:

It is a thin wrapper around Berkeley DB. Internally it holds a single Berkeley DB Database that stores all pending links.





```java
package org.archive.crawler.frontier;

public class BdbMultipleWorkQueues {
    // (excerpt: other fields and methods omitted)

    // Database holding all pending URLs
    private Database pendingUrisDB = null;

    // Get a link by its key
    public CrawlURI get(DatabaseEntry headKey)
    throws DatabaseException {
        DatabaseEntry result = new DatabaseEntry();

        // From Linda Lee of sleepycat:
        // "You want to check the status returned from Cursor.getSearchKeyRange
        // to make sure that you have OperationStatus.SUCCESS. In that case,
        // you have found a valid data record, and result.getData()
        // (called by internally by the binding code, in this case) will be
        // non-null. The other possible status return is
        // OperationStatus.NOTFOUND, in which case no data record matched
        // the criteria. "
        // Look up the link that corresponds to the key
        OperationStatus status = getNextNearestItem(headKey, result);
        CrawlURI retVal = null;
        if (status != OperationStatus.SUCCESS) {
            LOGGER.severe("See '1219854 NPE je-2.0 "
                    + "entryToObject'. OperationStatus "
                    + " was not SUCCESS: "
                    + status
                    + ", headKey "
                    + BdbWorkQueue.getPrefixClassKey(headKey.getData()));
            return null;
        }
        try {
            retVal = (CrawlURI)crawlUriBinding.entryToObject(result);
        } catch (RuntimeExceptionWrapper rw) {
            LOGGER.log(
                Level.SEVERE,
                "expected object missing in queue " +
                BdbWorkQueue.getPrefixClassKey(headKey.getData()),
                rw);
            return null;
        }
        retVal.setHolderKey(headKey);
        return retVal; // return the link
    }

    // Get a link from the pending list
    protected OperationStatus getNextNearestItem(DatabaseEntry headKey,
            DatabaseEntry result) throws DatabaseException {
        Cursor cursor = null;
        OperationStatus status;
        try {
            // open a cursor
            cursor = this.pendingUrisDB.openCursor(null, null);
            // get cap; headKey at this point should always point to
            // a queue-beginning cap entry (zero-length value)
            status = cursor.getSearchKey(headKey, result, null);
            if (status != OperationStatus.SUCCESS || result.getData().length > 0) {
                // cap missing
                throw new DatabaseException("bdb queue cap missing");
            }
            // get next item (real first item of queue)
            status = cursor.getNext(headKey, result, null);
        } finally {
            if (cursor != null) {
                cursor.close();
            }
        }
        return status;
    }

    /**
     * Put the given CrawlURI in at the appropriate place.
     * (Adds the URL to the database.)
     *
     * @param curi
     * @param overwriteIfPresent
     * @throws DatabaseException
     */
    public void put(CrawlURI curi, boolean overwriteIfPresent)
    throws DatabaseException {
        DatabaseEntry insertKey = (DatabaseEntry)curi.getHolderKey();
        if (insertKey == null) {
            insertKey = calculateInsertKey(curi);
            curi.setHolderKey(insertKey);
        }
        DatabaseEntry value = new DatabaseEntry();
        crawlUriBinding.objectToEntry(curi, value);
        // Output tally on avg. size if level is FINE or greater.
        if (LOGGER.isLoggable(Level.FINE)) {
            tallyAverageEntrySize(curi, value);
        }
        OperationStatus status;
        if (overwriteIfPresent) {
            // add, overwriting any existing entry with the same key
            status = pendingUrisDB.put(null, insertKey, value);
        } else {
            status = pendingUrisDB.putNoOverwrite(null, insertKey, value);
        }
        if (status != OperationStatus.SUCCESS) {
            LOGGER.severe("failed; " + status + " " + curi);
        }
    }
}
```
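The cap-entry lookup in getNextNearestItem() can be modeled with java.util.TreeMap standing in for the Berkeley DB B-tree. Cursor.getSearchKey becomes an exact get() on the cap key, and Cursor.getNext becomes higherEntry(). This is a hypothetical sketch. The class name is invented, and so is the key layout (class key, a separator, then a zero-padded sequence number); it is not Heritrix's calculateInsertKey() format.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory analogue of BdbMultipleWorkQueues: one sorted map holds
// the entries of all queues. Each queue starts with a "cap" entry whose value is
// empty; its items follow it in key order. Key format is an assumption.
class SortedQueueStore {
    private final TreeMap<String, String> db = new TreeMap<>();

    static String capKey(String classKey) {
        return classKey + '\u0000';
    }

    static String itemKey(String classKey, long seq) {
        // zero padding keeps lexicographic order equal to numeric order
        return capKey(classKey) + String.format("%016d", seq);
    }

    void createQueue(String classKey) {
        db.put(capKey(classKey), "");  // zero-length value marks the cap
    }

    /** Mirrors put(): returns false when the key exists and overwriting is off. */
    boolean put(String key, String uri, boolean overwriteIfPresent) {
        if (!overwriteIfPresent && db.containsKey(key)) {
            return false;  // analogous to KEYEXIST from putNoOverwrite
        }
        db.put(key, uri);
        return true;
    }

    /** Mirrors getNextNearestItem(): seek the cap, then step to the next entry. */
    Map.Entry<String, String> firstItem(String classKey) {
        String cap = capKey(classKey);
        String capValue = db.get(cap);            // like cursor.getSearchKey(...)
        if (capValue == null || !capValue.isEmpty()) {
            throw new IllegalStateException("queue cap missing");
        }
        Map.Entry<String, String> next = db.higherEntry(cap);  // like cursor.getNext(...)
        // The following entry may already belong to another queue.
        if (next == null || !next.getKey().startsWith(cap)) {
            return null;
        }
        return next;
    }
}
```

Every queue lives in one sorted structure. So the entry after a queue's last item is the next queue's cap, which is why the sketch checks the key prefix before returning.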

BdbWorkQueue:

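Represents one link queue; all links in the queue share the same key prefix (the queue's class key). It obtains a link from the pending-links database by calling BdbMultipleWorkQueues.get().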



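```java
package org.archive.crawler.frontier;

public class BdbWorkQueue extends WorkQueue
implements Comparable, Serializable {
    // (excerpt: other fields and methods omitted)

    // Fetch one URL
    protected CrawlURI peekItem(final WorkQueueFrontier frontier)
    throws IOException {
        // Key point: obtain pendingUris (the shared BdbMultipleWorkQueues)
        // from the BdbFrontier
        final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier)
            .getWorkQueues();
        DatabaseEntry key = new DatabaseEntry(origin);
        CrawlURI curi = null;
        int tries = 1;
        while (true) {
            try {
                // fetch the link
                curi = queues.get(key);
            } catch (DatabaseException e) {
                LOGGER.log(Level.SEVERE, "peekItem failure; retrying", e);
            }
            // stop on success, or give up after a few attempts
            // (a limit of 3 attempts is an assumption)
            if (curi != null || tries >= 3) {
                break;
            }
            tries++;
            key.setData(origin); // reset the key to the queue origin before retrying
        }
        return curi;
    }
}
```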
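A BdbWorkQueue stores only its key prefix ("origin"). The URIs themselves live in the shared pendingUris database. That idea can be sketched with several queue views over one TreeMap. PrefixQueueView and its key format are hypothetical. For brevity it finds its first item with ceilingEntry() instead of a cap entry.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: a queue object keeps only its key prefix and a count;
// all items of all queues sit in one shared sorted map.
class PrefixQueueView {
    private final TreeMap<String, String> shared;
    private final String origin;   // classKey plus a separator (assumed format)
    private long count = 0;
    private long nextSeq = 0;

    PrefixQueueView(TreeMap<String, String> shared, String classKey) {
        this.shared = shared;
        this.origin = classKey + '\u0000';
    }

    void enqueue(String uri) {
        shared.put(origin + String.format("%016d", nextSeq++), uri);
        count++;
    }

    // First entry at or after the origin, if it still belongs to this queue.
    private Map.Entry<String, String> head() {
        Map.Entry<String, String> e = shared.ceilingEntry(origin);
        return (e != null && e.getKey().startsWith(origin)) ? e : null;
    }

    /** Like peekItem(): look at the head URI without removing it. */
    String peek() {
        Map.Entry<String, String> e = head();
        return (e == null) ? null : e.getValue();
    }

    String dequeue() {
        Map.Entry<String, String> e = head();
        if (e == null) {
            return null;
        }
        shared.remove(e.getKey());
        count--;
        return e.getValue();
    }

    long getCount() { return count; }
}
```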

WorkQueueFrontier:

It implements the three core methods; next() and schedule() are excerpted below.



```java
public CrawlURI next()
throws InterruptedException, EndedException {
    while (true) { // loop left only by explicit return or exception
        long now = System.currentTimeMillis();

        // Do common checks for pause, terminate, bandwidth-hold
        preNext(now);

        synchronized(readyClassQueues) {
            int activationsNeeded = targetSizeForReadyQueues() - readyClassQueues.size();
            while(activationsNeeded > 0 && !inactiveQueues.isEmpty()) {
                activateInactiveQueue();
                activationsNeeded--;
            }
        }

        WorkQueue readyQ = null;
        Object key = readyClassQueues.poll(DEFAULT_WAIT, TimeUnit.MILLISECONDS);
        if (key != null) {
            readyQ = (WorkQueue)this.allQueues.get(key);
        }
        if (readyQ != null) {
            while(true) { // loop left by explicit return or break on empty
                CrawlURI curi = null;
                synchronized(readyQ) {
                    // Take one URL; ultimately it comes out of the
                    // pendingUris of the BdbFrontier subclass
                    curi = readyQ.peek(this);
                    if (curi != null) {
                        // check if curi belongs in different queue
                        String currentQueueKey = getClassKey(curi);
                        if (currentQueueKey.equals(curi.getClassKey())) {
                            // curi was in right queue, emit
                            noteAboutToEmit(curi, readyQ);
                            // add the queue to the in-process queues
                            inProcessQueues.add(readyQ);
                            return curi; // hand the link out
                        }
                        // URI's assigned queue has changed since it
                        // was queued (eg because its IP has become
                        // known). Requeue to new queue.
                        curi.setClassKey(currentQueueKey);
                        readyQ.dequeue(this); // remove it from this queue
                        decrementQueuedCount(1);
                        curi.setHolderKey(null);
                        // curi will be requeued to true queue after lock
                        // on readyQ is released, to prevent deadlock
                    } else {
                        // readyQ is empty and ready: it's exhausted
                        // release held status, allowing any subsequent
                        // enqueues to again put queue in ready
                        readyQ.clearHeld();
                        break;
                    }
                }
                if (curi != null) {
                    // complete the requeuing begun earlier
                    sendToQueue(curi);
                }
            }
        } else {
            // ReadyQ key wasn't in all queues: unexpected
            if (key != null) {
                logger.severe("Key " + key + " in readyClassQueues but not allQueues");
            }
        }

        if (shouldTerminate) {
            // skip subsequent steps if already on last legs
            throw new EndedException("shouldTerminate is true");
        }

        if (inProcessQueues.size() == 0) {
            // Nothing was ready or in progress or imminent to wake; ensure
            // any piled-up pending-scheduled URIs are considered
            this.alreadyIncluded.requestFlush();
        }
    }
}
```
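The ready-queue flow of next() can be sketched as follows. It tops up the ready queues from the inactive ones, polls a ready class key with a timeout, takes that queue's head URI and marks the queue as in process. ReadyQueueScheduler is a hypothetical, much-simplified model. It omits snoozing, requeueing to a different class key and termination. It also returns null where the real method keeps looping. Its finished() is an assumed simplification of what happens when a queue's URI completes.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the control flow in WorkQueueFrontier.next().
class ReadyQueueScheduler {
    private final Map<String, Deque<String>> allQueues = new HashMap<>();
    private final Deque<String> inactiveQueues = new ArrayDeque<>();
    private final LinkedBlockingQueue<String> readyClassQueues = new LinkedBlockingQueue<>();
    private final Set<String> inProcessQueues = new HashSet<>();
    private final int targetReadySize;

    ReadyQueueScheduler(int targetReadySize) {
        this.targetReadySize = targetReadySize;
    }

    // In this sketch a newly created queue starts out inactive.
    synchronized void enqueue(String classKey, String uri) {
        Deque<String> q = allQueues.get(classKey);
        if (q == null) {
            q = new ArrayDeque<>();
            allQueues.put(classKey, q);
            inactiveQueues.addLast(classKey);
        }
        q.addLast(uri);
    }

    String next(long waitMillis) throws InterruptedException {
        synchronized (this) {
            // activate inactive queues until the ready set reaches its target size
            int activationsNeeded = targetReadySize - readyClassQueues.size();
            while (activationsNeeded > 0 && !inactiveQueues.isEmpty()) {
                readyClassQueues.add(inactiveQueues.pollFirst());
                activationsNeeded--;
            }
        }
        // wait for a ready class key without holding the lock
        String key = readyClassQueues.poll(waitMillis, TimeUnit.MILLISECONDS);
        if (key == null) {
            return null;
        }
        synchronized (this) {
            Deque<String> q = allQueues.get(key);
            String uri = (q == null) ? null : q.pollFirst();
            if (uri != null) {
                inProcessQueues.add(key);   // queue stays busy until finished()
            }
            return uri;
        }
    }

    // Assumed simplification: a finished queue with work left becomes ready again.
    synchronized void finished(String classKey) {
        inProcessQueues.remove(classKey);
        Deque<String> q = allQueues.get(classKey);
        if (q != null && !q.isEmpty()) {
            readyClassQueues.add(classKey);
        }
    }

    synchronized boolean isInProcess(String classKey) {
        return inProcessQueues.contains(classKey);
    }
}
```

The sketch polls readyClassQueues outside the lock, as next() does. A thread can then wait for a ready queue without blocking threads that are enqueueing or finishing URIs.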

```java
// Add the URL to the pending queue (via the alreadyIncluded duplicate filter)
public void schedule(CandidateURI caUri) {
    // Canonicalization may set forceFetch flag. See
    // #canonicalization(CandidateURI) javadoc for circumstance.
    String canon = canonicalize(caUri);
    if (caUri.forceFetch()) {
        alreadyIncluded.addForce(canon, caUri);
    } else {
        alreadyIncluded.add(canon, caUri);
    }
}
```
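schedule() does not enqueue the URI directly. It canonicalizes the URI and hands it to the alreadyIncluded filter, using addForce() when forceFetch is set. The sketch below models that path with a HashSet. SeenFilter, ToySchedule and the two canonicalization rules are hypothetical. The sketch also assumes the filter passes accepted URIs to a receiver callback, which stands in for the frontier's actual queueing step.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical duplicate filter: add() forwards only unseen canonical forms,
// addForce() forwards regardless (a forced URI must be fetched again).
class SeenFilter {
    private final Set<String> seen = new HashSet<>();
    private final Consumer<String> receiver;

    SeenFilter(Consumer<String> receiver) {
        this.receiver = receiver;
    }

    void add(String canon, String uri) {
        if (seen.add(canon)) {
            receiver.accept(uri);
        }
    }

    void addForce(String canon, String uri) {
        seen.add(canon);
        receiver.accept(uri);
    }
}

class ToySchedule {
    final List<String> queued = new ArrayList<>();
    final SeenFilter alreadyIncluded = new SeenFilter(queued::add);

    // Toy canonicalization rules (assumptions, not Heritrix's rule set).
    static String canonicalize(String uri) {
        int hash = uri.indexOf('#');
        String s = (hash >= 0) ? uri.substring(0, hash) : uri;   // drop fragment
        if (s.startsWith("http://www.")) {                         // drop "www."
            s = "http://" + s.substring("http://www.".length());
        }
        return s;
    }

    void schedule(String uri, boolean forceFetch) {
        String canon = canonicalize(uri);
        if (forceFetch) {
            alreadyIncluded.addForce(canon, uri);
        } else {
            alreadyIncluded.add(canon, uri);
        }
    }
}
```

Filtering on the canonical form while queueing the original URI means trivially different spellings of the same page are scheduled only once.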

BdbFrontier:

It extends WorkQueueFrontier and is the only Frontier in Heritrix that is of real practical use.



```java
package org.archive.crawler.frontier;

public class BdbFrontier extends WorkQueueFrontier implements Serializable {
    // (excerpt: other fields and methods omitted)

    /** All links waiting to be crawled */
    protected transient BdbMultipleWorkQueues pendingUris;

    // Initialize pendingUris; declared abstract in the parent class
    protected void initQueue() throws IOException {
        try {
            this.pendingUris = createMultipleWorkQueues();
        } catch (DatabaseException e) {
            throw (IOException)new IOException(e.getMessage()).initCause(e);
        }
    }

    private BdbMultipleWorkQueues createMultipleWorkQueues()
    throws DatabaseException {
        return new BdbMultipleWorkQueues(this.controller.getBdbEnvironment(),
            this.controller.getBdbEnvironment().getClassCatalog(),
            this.controller.isCheckpointRecover());
    }

    protected BdbMultipleWorkQueues getWorkQueues() {
        return pendingUris;
    }
}
```
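BdbFrontier overrides the abstract initQueue(). It converts the DatabaseException thrown while building its BdbMultipleWorkQueues into the IOException the parent declares. Below is the same pattern in self-contained form. The class names are hypothetical, and StoreException stands in for DatabaseException.

```java
import java.io.IOException;

// Hypothetical stand-in for the storage layer's checked exception.
class StoreException extends Exception {
    StoreException(String message) { super(message); }
}

abstract class AbstractFrontierSketch {
    // The parent drives setup; the subclass decides how storage is built.
    protected abstract void initQueue() throws IOException;

    void initialize() throws IOException {
        initQueue();
    }
}

class StoreBackedFrontierSketch extends AbstractFrontierSketch {
    private final boolean storeAvailable;
    private Object pendingUris;   // would be a BdbMultipleWorkQueues in Heritrix

    StoreBackedFrontierSketch(boolean storeAvailable) {
        this.storeAvailable = storeAvailable;
    }

    private Object createStore() throws StoreException {
        if (!storeAvailable) {
            throw new StoreException("environment not open");
        }
        return new Object();
    }

    @Override
    protected void initQueue() throws IOException {
        try {
            this.pendingUris = createStore();
        } catch (StoreException e) {
            // keep the original exception as the cause, like initCause(e)
            throw new IOException(e.getMessage(), e);
        }
    }

    boolean isReady() { return pendingUris != null; }
}
```

Since Java 6, IOException has a (String, Throwable) constructor. It has the same effect as the initCause(e) call in BdbFrontier.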

BdbUriUniqFilter:

It is essentially a filter that checks whether a link about to enter the waiting queue has already been seen before.



```java
// Add a URL
protected boolean setAdd(CharSequence uri) {
    DatabaseEntry key = new DatabaseEntry();
    LongBinding.longToEntry(createKey(uri), key);
    long started = 0;
    OperationStatus status = null;
    try {
        if (logger.isLoggable(Level.INFO)) {
            started = System.currentTimeMillis();
        }
        // Add to the database (fails with KEYEXIST if already present)
        status = alreadySeen.putNoOverwrite(null, key, ZERO_LENGTH_ENTRY);
        if (logger.isLoggable(Level.INFO)) {
            aggregatedLookupTime += (System.currentTimeMillis() - started);
        }
    } catch (DatabaseException e) {
        logger.severe(e.getMessage());
    }
    if (status == OperationStatus.SUCCESS) {
        count++;
        if (logger.isLoggable(Level.INFO)) {
            final int logAt = 10000;
            if (count > 0 && ((count % logAt) == 0)) {
                logger.info("Average lookup " + (aggregatedLookupTime / logAt) + "ms.");
                aggregatedLookupTime = 0;
            }
        }
    }
    // If it already exists, return false
    if (status == OperationStatus.KEYEXIST) {
        return false; // not added
    } else {
        return true;
    }
}
```
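setAdd() can be modeled in memory as follows. Each URI is reduced to a 64-bit key, the key is inserted without overwriting, and an existing key means the URI was already seen. The class name is hypothetical. The hash is FNV-1a, chosen only as a stand-in for createKey(); it is not the fingerprint Heritrix uses.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory analogue of BdbUriUniqFilter.setAdd().
class LongKeySeenSet {
    private static final int LOG_AT = 10000;
    private final Set<Long> alreadySeen = new HashSet<>();
    private long count = 0;
    private long aggregatedLookupNanos = 0;

    // FNV-1a 64-bit hash over the URI's UTF-8 bytes (stand-in for createKey()).
    static long createKey(CharSequence uri) {
        long h = 0xcbf29ce484222325L;                 // FNV offset basis
        for (byte b : uri.toString().getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xff);
            h *= 0x100000001b3L;                      // FNV prime
        }
        return h;
    }

    /** Returns true if the URI was added, false if its key already existed. */
    boolean setAdd(CharSequence uri) {
        long started = System.nanoTime();
        boolean added = alreadySeen.add(createKey(uri));  // like putNoOverwrite
        aggregatedLookupNanos += System.nanoTime() - started;
        if (added) {
            count++;
            // periodically report the average lookup time, then reset the tally
            if (count % LOG_AT == 0) {
                System.out.println("Average lookup "
                        + (aggregatedLookupNanos / LOG_AT) + " ns.");
                aggregatedLookupNanos = 0;
            }
        }
        return added;
    }

    long count() { return count; }
}
```

Storing a fixed 64-bit key instead of the full URI keeps each entry small. The cost is that a rare key collision makes a new URI look as if it had already been seen.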