您的位置:首页 > 其它

深入学习Heritrix---解析Frontier(链接工厂)

2009-11-15 13:57 489 查看
http://caixinbao1.blog.163.com/blog/static/161494162009730115520760/

Frontier是Heritrix最核心的组成部分之一,也是最复杂的组成部分.它主要功能是为处理链接的线程提供URL,并负责链接处理完成后的一些后续调度操作.并且为了提高效率,它在内部使用了Berkeley DB.本节将对它的内部机理进行详细解剖.

在Heritrix的官方文档上有一个Frontier的例子,虽然很简单,但是它却解释Frontier实现的基本原理.在这里就不讨论,有兴趣的读者可以参考相应文档.但是不得不提它的三个核心方法:
(1)next(int timeout):为处理线程提供一个链接.Heritrix的所有处理线程(ToeThread)都是通过调用该方法获取链接的.

(2)schedule(CandidateURI caURI):调度待处理的链接.
(3)finished(CrawlURI cURI):完成一个已处理的链接.
整体结构如下:



BdbMultipleWorkQueues:
它是对Berkeley DB的简单封装.在内部有一个Berkeley Database,存放所有待处理的链接.




Code


package org.archive.crawler.frontier;


public class BdbMultipleWorkQueues


{


//存放所有待处理的URL的数据库


private Database pendingUrisDB = null;






//由key获取一个链接


public CrawlURI get(DatabaseEntry headKey)


throws DatabaseException {


DatabaseEntry result = new DatabaseEntry();




// From Linda Lee of sleepycat:


// "You want to check the status returned from Cursor.getSearchKeyRange


// to make sure that you have OperationStatus.SUCCESS. In that case,


// you have found a valid data record, and result.getData()


// (called by internally by the binding code, in this case) will be


// non-null. The other possible status return is


// OperationStatus.NOTFOUND, in which case no data record matched


// the criteria. "


//由key获取相应的链接


OperationStatus status = getNextNearestItem(headKey, result);


CrawlURI retVal = null;


if (status != OperationStatus.SUCCESS) {


LOGGER.severe("See '1219854 NPE je-2.0 "


+ "entryToObject

'. OperationStatus "


+ " was not SUCCESS: "


+ status


+ ", headKey "


+ BdbWorkQueue.getPrefixClassKey(headKey.getData()));


return null;


}


try {


retVal = (CrawlURI)crawlUriBinding.entryToObject(result);


} catch (RuntimeExceptionWrapper rw) {


LOGGER.log(


Level.SEVERE,


"expected object missing in queue " +


BdbWorkQueue.getPrefixClassKey(headKey.getData()),


rw);


return null;


}


retVal.setHolderKey(headKey);


return retVal;//返回链接


}




//从等处理列表获取一个链接


protected OperationStatus getNextNearestItem(DatabaseEntry headKey,


DatabaseEntry result) throws DatabaseException {


Cursor cursor = null;


OperationStatus status;


try {


//打开游标


cursor = this.pendingUrisDB.openCursor(null, null);


// get cap; headKey at this point should always point to


// a queue-beginning cap entry (zero-length value)


status = cursor.getSearchKey(headKey, result, null);


if(status!=OperationStatus.SUCCESS || result.getData().length > 0) {


// cap missing


throw new DatabaseException("bdb queue cap missing");


}


// get next item (real first item of queue)


status = cursor.getNext(headKey,result,null);


} finally {


if(cursor!=null) {


cursor.close();


}


}


return status;


}




/**


* Put the given CrawlURI in at the appropriate place.


* 添加URL到数据库


* @param curi


* @throws DatabaseException


*/


public void put(CrawlURI curi, boolean overwriteIfPresent)


throws DatabaseException {


DatabaseEntry insertKey = (DatabaseEntry)curi.getHolderKey();


if (insertKey == null) {


insertKey = calculateInsertKey(curi);


curi.setHolderKey(insertKey);


}


DatabaseEntry value = new DatabaseEntry();


crawlUriBinding.objectToEntry(curi, value);


// Output tally on avg. size if level is FINE or greater.


if (LOGGER.isLoggable(Level.FINE)) {


tallyAverageEntrySize(curi, value);


}


OperationStatus status;


if(overwriteIfPresent) {


//添加


status = pendingUrisDB.put(null, insertKey, value);


} else {


status = pendingUrisDB.putNoOverwrite(null, insertKey, value);


}


if(status!=OperationStatus.SUCCESS) {


LOGGER.severe("failed; "+status+ " "+curi);


}


}




}

BdbWorkQueue:
代表一个链接队列,该队列中所有的链接都具有相同的键值.它实际上是通过调用BdbMultipleWorkQueues的get方法从等处理链接数据库中取得一个链接的.


Code
package org.archive.crawler.frontier;

public class BdbWorkQueue extends WorkQueue
implements Comparable, Serializabl
{
//获取一个URL
protected CrawlURI peekItem(final WorkQueueFrontier frontier)
throws IOException {
/**
* 关键:从BdbFrontier中返回pendingUris
*/
final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier)
.getWorkQueues();

DatabaseEntry key = new DatabaseEntry(origin);
CrawlURI curi = null;
int tries = 1;
while(true) {
try {
//获取链接
curi = queues.get(key);
} catch (DatabaseException e) {
LOGGER.log(Level.SEVERE,"peekItem failure; retrying",e);
}



return curi;
}
}
WorkQueueFrontier:
实现了最核心的三个方法.


Code
public CrawlURI next()
throws InterruptedException, EndedException {
while (true) { // loop left only by explicit return or exception
long now = System.currentTimeMillis();

// Do common checks for pause, terminate, bandwidth-hold
preNext(now);

synchronized(readyClassQueues) {
int activationsNeeded = targetSizeForReadyQueues() - readyClassQueues.size();
while(activationsNeeded > 0 && !inactiveQueues.isEmpty()) {
activateInactiveQueue();
activationsNeeded--;
}
}

WorkQueue readyQ = null;
Object key = readyClassQueues.poll(DEFAULT_WAIT,TimeUnit.MILLISECONDS);
if (key != null) {
readyQ = (WorkQueue)this.allQueues.get(key);
}
if (readyQ != null) {
while(true) { // loop left by explicit return or break on empty
CrawlURI curi = null;
synchronized(readyQ) {
/**取出一个URL,最终从子类BdbFrontier的
* pendingUris中取出一个链接
*/
curi = readyQ.peek(this);
if (curi != null) {
// check if curi belongs in different queue
String currentQueueKey = getClassKey(curi);
if (currentQueueKey.equals(curi.getClassKey())) {
// curi was in right queue, emit
noteAboutToEmit(curi, readyQ);
//加入正在处理队列中
inProcessQueues.add(readyQ);
return curi; //返回
}
// URI's assigned queue has changed since it
// was queued (eg because its IP has become
// known). Requeue to new queue.
curi.setClassKey(currentQueueKey);
readyQ.dequeue(this);//出队列
decrementQueuedCount(1);
curi.setHolderKey(null);
// curi will be requeued to true queue after lock
// on readyQ is released, to prevent deadlock
} else {
// readyQ is empty and ready: it's exhausted
// release held status, allowing any subsequent
// enqueues to again put queue in ready
readyQ.clearHeld();
break;
}
}
if(curi!=null) {
// complete the requeuing begun earlier
sendToQueue(curi);
}
}
} else {
// ReadyQ key wasn't in all queues: unexpected
if (key != null) {
logger.severe("Key "+ key +
" in readyClassQueues but not allQueues");
}
}

if(shouldTerminate) {
// skip subsequent steps if already on last legs
throw new EndedException("shouldTerminate is true");
}

if(inProcessQueues.size()==0) {
// Nothing was ready or in progress or imminent to wake; ensure
// any piled-up pending-scheduled URIs are considered
this.alreadyIncluded.requestFlush();
}
}
}

//将URL加入待处理队列
public void schedule(CandidateURI caUri) {
// Canonicalization may set forceFetch flag. See
// #canonicalization(CandidateURI) javadoc for circumstance.
String canon = canonicalize(caUri);
if (caUri.forceFetch()) {
alreadyIncluded.addForce(canon, caUri);
} else {
alreadyIncluded.add(canon, caUri);
}
}

BdbFrontier:
继承了WorkQueueFrontier,是Heritrix唯一个具有实际意义的链接工厂.


Code
package org.archive.crawler.frontier;
public class BdbFrontier extends WorkQueueFrontier implements Serializable
{
/** 所有待抓取的链接*/
protected transient BdbMultipleWorkQueues pendingUris;

//初始化pendingUris,父类为抽象方法
protected void initQueue() throws IOException {
try {
this.pendingUris = createMultipleWorkQueues();
} catch(DatabaseException e) {
throw (IOException)new IOException(e.getMessage()).initCause(e);
}
}

private BdbMultipleWorkQueues createMultipleWorkQueues()
throws DatabaseException {
return new BdbMultipleWorkQueues(this.controller.getBdbEnvironment(),
this.controller.getBdbEnvironment().getClassCatalog(),
this.controller.isCheckpointRecover());
}
protected BdbMultipleWorkQueues getWorkQueues() {
return pendingUris;
}



}

BdbUriUniqFilter:
实际上是一个过滤器,它用来检查一个要进入等待队列的链接是否已经被抓取过.


Code
//添加URL
protected boolean setAdd(CharSequence uri) {
DatabaseEntry key = new DatabaseEntry();
LongBinding.longToEntry(createKey(uri), key);
long started = 0;

OperationStatus status = null;
try {
if (logger.isLoggable(Level.INFO)) {
started = System.currentTimeMillis();
}
//添加到数据库
status = alreadySeen.putNoOverwrite(null, key, ZERO_LENGTH_ENTRY);
if (logger.isLoggable(Level.INFO)) {
aggregatedLookupTime +=
(System.currentTimeMillis() - started);
}
} catch (DatabaseException e) {
logger.severe(e.getMessage());
}
if (status == OperationStatus.SUCCESS) {
count++;
if (logger.isLoggable(Level.INFO)) {
final int logAt = 10000;
if (count > 0 && ((count % logAt) == 0)) {
logger.info("Average lookup " +
(aggregatedLookupTime / logAt) + "ms.");
aggregatedLookupTime = 0;
}
}
}
//如果存在,返回false
if(status == OperationStatus.KEYEXIST) {
return false; // not added
} else {
return true;
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: