Heritrix in Depth: Dissecting the Frontier (Link Factory)
2009-07-30 16:54
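The Frontier is one of the core components of Heritrix, and also one of the most complex. Its main job is to supply URLs to the threads that process links, and to handle the follow-up scheduling once a link has been processed. For efficiency it uses Berkeley DB internally. This section dissects its inner workings in detail.
The official Heritrix documentation includes an example Frontier. It is very simple, but it illustrates the basic principle behind a Frontier implementation. It is not discussed here; interested readers can consult that documentation. Its three core methods, however, have to be mentioned:
(1) next(int timeout): supplies a link to a processing thread. All Heritrix processing threads (ToeThread) obtain their links by calling this method.
(2) schedule(CandidateURI caURI): schedules a link that is waiting to be processed.
(3) finished(CrawlURI cURI): marks a processed link as finished.
The overall structure is as follows: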
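To make the division of labor between these three methods concrete, here is a minimal in-memory sketch. It is not the Heritrix API: the class name SimpleFrontierSketch, the use of plain String URIs instead of CandidateURI/CrawlURI, and the inProcessCount helper are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Hypothetical, simplified stand-in for a Frontier; NOT the Heritrix API.
class SimpleFrontierSketch {
    private final Queue<String> pending = new ArrayDeque<>(); // URIs waiting to be fetched
    private final Set<String> seen = new HashSet<>();         // URIs scheduled at least once
    private final Set<String> inProcess = new HashSet<>();    // URIs handed out, not yet finished

    // schedule: accept a candidate URI unless it has been seen before.
    synchronized void schedule(String uri) {
        if (seen.add(uri)) {
            pending.add(uri);
            notifyAll(); // wake any worker blocked in next()
        }
    }

    // next: hand one URI to a worker, waiting up to timeoutMs; returns null on timeout.
    synchronized String next(long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (pending.isEmpty()) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return null;
            }
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        String uri = pending.remove();
        inProcess.add(uri);
        return uri;
    }

    // finished: the worker reports that processing of the URI is complete.
    synchronized void finished(String uri) {
        inProcess.remove(uri);
    }

    // Helper for the demo only (an assumption, not part of any real interface).
    synchronized int inProcessCount() {
        return inProcess.size();
    }

    public static void main(String[] args) {
        SimpleFrontierSketch frontier = new SimpleFrontierSketch();
        frontier.schedule("http://example.com/");
        frontier.schedule("http://example.com/"); // duplicate, ignored
        String uri = frontier.next(100);
        System.out.println("fetched: " + uri);
        frontier.finished(uri);
        System.out.println("after drain: " + frontier.next(10));
    }
}
```

A worker thread would loop over next, process the link, call schedule for every newly discovered link, and then call finished. The real Frontier adds per-host queues and persistence on top of this basic cycle.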
BdbMultipleWorkQueues:
A thin wrapper around Berkeley DB. Internally it holds a single Berkeley Database that stores all links waiting to be processed.
Code
package org.archive.crawler.frontier;

// Abridged excerpt: fields such as LOGGER and crawlUriBinding are declared
// in the full source and are not shown here.
public class BdbMultipleWorkQueues {
    // Database holding all URLs waiting to be processed
    private Database pendingUrisDB = null;

    // Fetch a link by key
    public CrawlURI get(DatabaseEntry headKey) throws DatabaseException {
        DatabaseEntry result = new DatabaseEntry();
        // From Linda Lee of sleepycat:
        // "You want to check the status returned from Cursor.getSearchKeyRange
        // to make sure that you have OperationStatus.SUCCESS. In that case,
        // you have found a valid data record, and result.getData()
        // (called by internally by the binding code, in this case) will be
        // non-null. The other possible status return is
        // OperationStatus.NOTFOUND, in which case no data record matched
        // the criteria. "

        // Look up the link that belongs to this key
        OperationStatus status = getNextNearestItem(headKey, result);
        CrawlURI retVal = null;
        if (status != OperationStatus.SUCCESS) {
            LOGGER.severe("See '1219854 NPE je-2.0 "
                    + "entryToObject'. OperationStatus "
                    + " was not SUCCESS: "
                    + status
                    + ", headKey "
                    + BdbWorkQueue.getPrefixClassKey(headKey.getData()));
            return null;
        }
        try {
            retVal = (CrawlURI) crawlUriBinding.entryToObject(result);
        } catch (RuntimeExceptionWrapper rw) {
            LOGGER.log(Level.SEVERE,
                    "expected object missing in queue "
                    + BdbWorkQueue.getPrefixClassKey(headKey.getData()),
                    rw);
            return null;
        }
        retVal.setHolderKey(headKey);
        return retVal; // return the link
    }

    // Fetch one link from the pending list
    protected OperationStatus getNextNearestItem(DatabaseEntry headKey,
            DatabaseEntry result) throws DatabaseException {
        Cursor cursor = null;
        OperationStatus status;
        try {
            // Open a cursor
            cursor = this.pendingUrisDB.openCursor(null, null);
            // get cap; headKey at this point should always point to
            // a queue-beginning cap entry (zero-length value)
            status = cursor.getSearchKey(headKey, result, null);
            if (status != OperationStatus.SUCCESS || result.getData().length > 0) {
                // cap missing
                throw new DatabaseException("bdb queue cap missing");
            }
            // get next item (real first item of queue)
            status = cursor.getNext(headKey, result, null);
        } finally {
            if (cursor != null) {
                cursor.close();
            }
        }
        return status;
    }

    /**
     * Put the given CrawlURI in at the appropriate place.
     * Adds the URL to the database.
     *
     * @param curi
     * @throws DatabaseException
     */
    public void put(CrawlURI curi, boolean overwriteIfPresent)
            throws DatabaseException {
        DatabaseEntry insertKey = (DatabaseEntry) curi.getHolderKey();
        if (insertKey == null) {
            insertKey = calculateInsertKey(curi);
            curi.setHolderKey(insertKey);
        }
        DatabaseEntry value = new DatabaseEntry();
        crawlUriBinding.objectToEntry(curi, value);
        // Output tally on avg. size if level is FINE or greater.
        if (LOGGER.isLoggable(Level.FINE)) {
            tallyAverageEntrySize(curi, value);
        }
        OperationStatus status;
        if (overwriteIfPresent) {
            // Insert, replacing any existing entry
            status = pendingUrisDB.put(null, insertKey, value);
        } else {
            status = pendingUrisDB.putNoOverwrite(null, insertKey, value);
        }
        if (status != OperationStatus.SUCCESS) {
            LOGGER.severe("failed; " + status + " " + curi);
        }
    }
}
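The getNextNearestItem method relies on the database keeping its keys sorted: it looks up a zero-length "cap" entry that marks the head of a queue, then steps the cursor to the next record. The sketch below mimics that idea with a java.util.TreeMap standing in for the Berkeley DB B-tree. The class MultiQueueSketch, the "|" key separator, the zero-padded counter, and the prefix check are assumptions for illustration and do not reproduce Heritrix's actual key layout.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory analogue: a sorted map stands in for the Berkeley DB database.
class MultiQueueSketch {
    private final TreeMap<String, String> pending = new TreeMap<>();
    private long sequence = 0;

    // Create the zero-length "cap" entry that marks the head of a queue.
    // Assumes queue keys never contain the '|' separator.
    void addCap(String queueKey) {
        pending.put(queueKey + "|", "");
    }

    // Append a URI to the named queue; the zero-padded counter preserves insertion order.
    void put(String queueKey, String uri) {
        pending.put(queueKey + "|" + String.format("%012d", sequence++), uri);
    }

    // Find the cap (like cursor.getSearchKey), then step to the entry that
    // follows it (like cursor.getNext). Returns null when the queue is empty.
    String peek(String queueKey) {
        String capKey = queueKey + "|";
        String cap = pending.get(capKey);
        if (cap == null || !cap.isEmpty()) {
            throw new IllegalStateException("queue cap missing");
        }
        Map.Entry<String, String> next = pending.higherEntry(capKey);
        // The following entry may belong to a different queue; check the prefix.
        if (next == null || !next.getKey().startsWith(capKey)) {
            return null;
        }
        return next.getValue();
    }

    public static void main(String[] args) {
        MultiQueueSketch queues = new MultiQueueSketch();
        queues.addCap("example.com");
        queues.addCap("example.org");
        queues.put("example.org", "http://example.org/a");
        queues.put("example.com", "http://example.com/a");
        queues.put("example.com", "http://example.com/b");
        System.out.println(queues.peek("example.com"));
        System.out.println(queues.peek("example.org"));
    }
}
```

Because all keys with a common prefix sit next to each other in sorted order, one database can hold many logical queues, and reading the head of any queue costs one key lookup plus one cursor step.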
BdbWorkQueue:
Represents a single queue of links; all links in the queue share the same key. It obtains a link from the pending-links database by calling the get method of BdbMultipleWorkQueues.
Code
package org.archive.crawler.frontier;

public class BdbWorkQueue extends WorkQueue implements Comparable, Serializable {
    // Fetch one URL
    protected CrawlURI peekItem(final WorkQueueFrontier frontier)
            throws IOException {
        /**
         * Key step: obtain pendingUris from the BdbFrontier
         */
        final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier)
                .getWorkQueues();
        DatabaseEntry key = new DatabaseEntry(origin);
        CrawlURI curi = null;
        int tries = 1;
        while (true) {
            try {
                // Fetch the link
                curi = queues.get(key);
            } catch (DatabaseException e) {
                LOGGER.log(Level.SEVERE, "peekItem failure; retrying", e);
            }
            // Abridged excerpt: the retry bookkeeping that would use
            // `tries` is not shown here.
            return curi;
        }
    }
}
WorkQueueFrontier:
Implements the three core methods. The excerpt below shows next() and schedule().
Code
public CrawlURI next() throws InterruptedException, EndedException {
    while (true) { // loop left only by explicit return or exception
        long now = System.currentTimeMillis();
        // Do common checks for pause, terminate, bandwidth-hold
        preNext(now);
        synchronized (readyClassQueues) {
            int activationsNeeded =
                    targetSizeForReadyQueues() - readyClassQueues.size();
            while (activationsNeeded > 0 && !inactiveQueues.isEmpty()) {
                activateInactiveQueue();
                activationsNeeded--;
            }
        }
        WorkQueue readyQ = null;
        Object key = readyClassQueues.poll(DEFAULT_WAIT, TimeUnit.MILLISECONDS);
        if (key != null) {
            readyQ = (WorkQueue) this.allQueues.get(key);
        }
        if (readyQ != null) {
            while (true) { // loop left by explicit return or break on empty
                CrawlURI curi = null;
                synchronized (readyQ) {
                    /**
                     * Take one URL; it ultimately comes from pendingUris
                     * in the subclass BdbFrontier
                     */
                    curi = readyQ.peek(this);
                    if (curi != null) {
                        // check if curi belongs in different queue
                        String currentQueueKey = getClassKey(curi);
                        if (currentQueueKey.equals(curi.getClassKey())) {
                            // curi was in right queue, emit
                            noteAboutToEmit(curi, readyQ);
                            // Add the queue to the in-process queues
                            inProcessQueues.add(readyQ);
                            return curi; // hand the link to the caller
                        }
                        // URI's assigned queue has changed since it
                        // was queued (eg because its IP has become
                        // known). Requeue to new queue.
                        curi.setClassKey(currentQueueKey);
                        readyQ.dequeue(this); // remove from this queue
                        decrementQueuedCount(1);
                        curi.setHolderKey(null);
                        // curi will be requeued to true queue after lock
                        // on readyQ is released, to prevent deadlock
                    } else {
                        // readyQ is empty and ready: it's exhausted
                        // release held status, allowing any subsequent
                        // enqueues to again put queue in ready
                        readyQ.clearHeld();
                        break;
                    }
                }
                if (curi != null) {
                    // complete the requeuing begun earlier
                    sendToQueue(curi);
                }
            }
        } else {
            // ReadyQ key wasn't in all queues: unexpected
            if (key != null) {
                logger.severe("Key " + key
                        + " in readyClassQueues but not allQueues");
            }
        }
        if (shouldTerminate) {
            // skip subsequent steps if already on last legs
            throw new EndedException("shouldTerminate is true");
        }
        if (inProcessQueues.size() == 0) {
            // Nothing was ready or in progress or imminent to wake; ensure
            // any piled-up pending-scheduled URIs are considered
            this.alreadyIncluded.requestFlush();
        }
    }
}

// Add a URL to the pending queue
public void schedule(CandidateURI caUri) {
    // Canonicalization may set forceFetch flag. See
    // #canonicalization(CandidateURI) javadoc for circumstance.
    String canon = canonicalize(caUri);
    if (caUri.forceFetch()) {
        alreadyIncluded.addForce(canon, caUri);
    } else {
        alreadyIncluded.add(canon, caUri);
    }
}
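The next() method above moves a queue from the ready set into the in-process set when it hands out a link. The sketch below shows that rotation in a minimal in-memory form, so that each queue has at most one link in flight at a time. It is a simplification under stated assumptions: the class QueueRotationSketch and its method signatures are hypothetical, inactive queues and requeuing are left out, and the head of the queue is assumed to be removed when finished is called.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical sketch of the ready -> in-process -> ready rotation; NOT Heritrix code.
class QueueRotationSketch {
    private final Map<String, Queue<String>> allQueues = new HashMap<>();
    private final Queue<String> readyKeys = new ArrayDeque<>(); // queues allowed to emit
    private final Set<String> inProcessKeys = new HashSet<>();  // queues with a link in flight

    // Add a URI to its queue; a previously empty queue becomes ready.
    void enqueue(String queueKey, String uri) {
        Queue<String> queue = allQueues.get(queueKey);
        if (queue == null) {
            queue = new ArrayDeque<>();
            allQueues.put(queueKey, queue);
        }
        boolean wasEmpty = queue.isEmpty();
        queue.add(uri);
        if (wasEmpty) {
            readyKeys.add(queueKey);
        }
    }

    // Take the next ready queue, peek at its head, and mark the queue in-process.
    // The head stays in the queue until finished() is called.
    String next() {
        String key;
        while ((key = readyKeys.poll()) != null) {
            Queue<String> queue = allQueues.get(key);
            if (queue != null && !queue.isEmpty()) {
                inProcessKeys.add(key);
                return queue.peek();
            }
        }
        return null; // nothing ready right now
    }

    // Remove the processed head; the queue becomes ready again if it still has links.
    void finished(String queueKey) {
        if (!inProcessKeys.remove(queueKey)) {
            return;
        }
        Queue<String> queue = allQueues.get(queueKey);
        queue.remove();
        if (!queue.isEmpty()) {
            readyKeys.add(queueKey);
        }
    }

    public static void main(String[] args) {
        QueueRotationSketch frontier = new QueueRotationSketch();
        frontier.enqueue("example.com", "http://example.com/1");
        frontier.enqueue("example.com", "http://example.com/2");
        System.out.println(frontier.next()); // first link of example.com
        System.out.println(frontier.next()); // null: example.com is in process
        frontier.finished("example.com");
        System.out.println(frontier.next()); // second link of example.com
    }
}
```

Keeping a queue out of the ready set while one of its links is in flight is what prevents several threads from working on the same queue at once.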
BdbFrontier:
Extends WorkQueueFrontier and is the only Frontier in Heritrix with a practical implementation.
Code
package org.archive.crawler.frontier;

public class BdbFrontier extends WorkQueueFrontier implements Serializable {
    /**
     * All links waiting to be crawled
     */
    protected transient BdbMultipleWorkQueues pendingUris;

    // Initialize pendingUris; this method is abstract in the parent class
    protected void initQueue() throws IOException {
        try {
            this.pendingUris = createMultipleWorkQueues();
        } catch (DatabaseException e) {
            throw (IOException) new IOException(e.getMessage()).initCause(e);
        }
    }

    private BdbMultipleWorkQueues createMultipleWorkQueues()
            throws DatabaseException {
        return new BdbMultipleWorkQueues(
                this.controller.getBdbEnvironment(),
                this.controller.getBdbEnvironment().getClassCatalog(),
                this.controller.isCheckpointRecover());
    }

    protected BdbMultipleWorkQueues getWorkQueues() {
        return pendingUris;
    }
}
BdbUriUniqFilter:
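Essentially a filter: it checks whether a link about to enter the pending queue has been seen before (the backing database is named alreadySeen in the code).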
Code
// Add a URL
protected boolean setAdd(CharSequence uri) {
    DatabaseEntry key = new DatabaseEntry();
    LongBinding.longToEntry(createKey(uri), key);
    long started = 0;
    OperationStatus status = null;
    try {
        if (logger.isLoggable(Level.INFO)) {
            started = System.currentTimeMillis();
        }
        // Insert into the database unless the key already exists
        status = alreadySeen.putNoOverwrite(null, key, ZERO_LENGTH_ENTRY);
        if (logger.isLoggable(Level.INFO)) {
            aggregatedLookupTime += (System.currentTimeMillis() - started);
        }
    } catch (DatabaseException e) {
        logger.severe(e.getMessage());
    }
    if (status == OperationStatus.SUCCESS) {
        count++;
        if (logger.isLoggable(Level.INFO)) {
            final int logAt = 10000;
            if (count > 0 && ((count % logAt) == 0)) {
                logger.info("Average lookup "
                        + (aggregatedLookupTime / logAt) + "ms.");
                aggregatedLookupTime = 0;
            }
        }
    }
    // If the key already exists, return false
    if (status == OperationStatus.KEYEXIST) {
        return false; // not added
    } else {
        return true;
    }
}
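The filter stores only a long key derived from each URL and relies on putNoOverwrite to report whether that key already exists. The same test-and-insert idea can be sketched in memory with a HashSet of fingerprints. The class SeenFilterSketch is hypothetical, and the 64-bit FNV-1a hash is a stand-in chosen for this example; the excerpt does not show how createKey computes its value.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory analogue of an "already seen" URL filter.
class SeenFilterSketch {
    private final Set<Long> alreadySeen = new HashSet<>();

    // 64-bit FNV-1a hash, used here only as a stand-in fingerprint function.
    static long fingerprint(CharSequence uri) {
        long hash = 0xcbf29ce484222325L; // FNV offset basis
        for (byte b : uri.toString().getBytes(StandardCharsets.UTF_8)) {
            hash ^= (b & 0xff);
            hash *= 0x100000001b3L;      // FNV prime
        }
        return hash;
    }

    // Returns true if the URI was newly added, false if its fingerprint was present.
    // Set.add plays the role of putNoOverwrite: it inserts only when absent.
    boolean setAdd(CharSequence uri) {
        return alreadySeen.add(fingerprint(uri));
    }

    public static void main(String[] args) {
        SeenFilterSketch filter = new SeenFilterSketch();
        System.out.println(filter.setAdd("http://example.com/")); // newly added
        System.out.println(filter.setAdd("http://example.com/")); // duplicate
    }
}
```

Storing fingerprints instead of full URLs keeps each entry small. The trade-off is that two different URLs with the same fingerprint would be treated as one, so the second would never be queued.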