您的位置:首页 > 大数据 > 人工智能

solr dataimport 数据导入源码分析(三)

2012-09-09 18:22 · 513 次查看
在介绍 DocBuilder 类之前,我们先来解读数据导入所使用的实体处理器 EntityProcessor;默认的实体处理器是 SqlEntityProcessor。

EntityProcessor 是一个抽象类:除 postTransform() 和 close() 提供了空的默认实现外,其余方法均为抽象方法,由子类实现。

package org.apache.solr.handler.dataimport;

import java.util.Map;

public abstract class EntityProcessor {

public abstract void init(Context context);

public abstract Map<String, Object> nextRow();

public abstract Map<String, Object> nextModifiedRowKey();

public abstract Map<String, Object> nextDeletedRowKey();

public abstract Map<String, Object> nextModifiedParentRowKey();

public abstract void destroy();

public void postTransform(Map<String, Object> r) {

}

public void close() {

//no-op

}

EntityProcessorBase 继承自 EntityProcessor,封装了公共逻辑。其中较为重要的是 getNext() 方法:它遍历数据迭代器 rowIterator,逐行返回数据,供子类调用;若配置了缓存(cacheSupport),则改为从缓存中读取。相关片段如下:

// Rows of the currently running query. getNext() returns null while this is null, and resets it
// to null once the iterator is exhausted or reading from it fails.
protected Iterator<Map<String, Object>> rowIterator;

// Optional row cache. When non-null, getNext() delegates to cacheSupport.getCacheData(...)
// instead of reading rowIterator directly.
protected DIHCacheSupport cacheSupport = null;
/**
 * Returns the next row from the active row source, or {@code null} if there is none.
 *
 * <p>If {@code cacheSupport} is set, the call is delegated entirely to
 * {@code cacheSupport.getCacheData(context, query, rowIterator)}.
 *
 * <p>Otherwise rows come from {@code rowIterator}:
 * <ul>
 *   <li>{@code null} is returned if no iterator is active.</li>
 *   <li>When the iterator is exhausted, {@code query} and {@code rowIterator} are cleared. A
 *       subclass that starts a query whenever {@code rowIterator == null} will then start a new
 *       one on its next call.</li>
 *   <li>If reading fails, the failure is logged together with the current query, the same state
 *       is cleared, and the exception is handed to {@code wrapAndThrow} at {@code WARN}
 *       severity.</li>
 * </ul>
 *
 * @return the next row as a field-name-to-value map, or {@code null}
 */
protected Map<String, Object> getNext() {
  if (cacheSupport != null) {
    return cacheSupport.getCacheData(context, query, rowIterator);
  }
  try {
    if (rowIterator == null) {
      // Nothing is being iterated right now.
      return null;
    }
    if (!rowIterator.hasNext()) {
      // Source exhausted: drop the query state so a fresh query can be started later.
      query = null;
      rowIterator = null;
      return null;
    }
    return rowIterator.next();
  } catch (Exception e) {
    // Log before clearing so the message still names the query that failed.
    SolrException.log(log, "getNext() failed for query '" + query + "'", e);
    query = null;
    rowIterator = null;
    wrapAndThrow(DataImportHandlerException.WARN, e);
    // NOTE(review): only reached if wrapAndThrow returns normally — confirm whether WARN throws.
    return null;
  }
}

实际负责读取数据的处理器是 SqlEntityProcessor,其简化代码如下(省略了 getQuery() 以及 DELTA_QUERY 等常量的定义):

package org.apache.solr.handler.dataimport;

import java.util.Iterator;

import java.util.Map;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.util.regex.Matcher;

import java.util.regex.Pattern;

/**
 * Entity processor that obtains rows by passing query strings (presumably SQL, per the class
 * name) to the entity's {@link DataSource}.
 *
 * <p>Each {@code next*} method lazily starts its query on the first call, that is, whenever
 * {@code rowIterator} is {@code null}. It then returns rows one at a time through
 * {@code getNext()}, which clears the iterator once it is exhausted.
 *
 * <p>NOTE(review): this is an abridged excerpt. {@code getQuery()} and the
 * {@code DELTA_QUERY}/{@code DEL_PK_QUERY}/{@code PARENT_DELTA_QUERY} constants are referenced
 * but not defined here; confirm where they are declared.
 */
public class SqlEntityProcessor extends EntityProcessorBase {

  /** Logger used by {@link #initQuery(String)} and {@link #nextModifiedParentRowKey()}. */
  private static final Logger LOG = LoggerFactory.getLogger(SqlEntityProcessor.class);

  /** Data source that queries are sent to; captured from the context in {@link #init}. */
  protected DataSource<Iterator<Map<String, Object>>> dataSource;

  /**
   * Initializes the base processor and captures the context's data source.
   *
   * @param context the processor context
   */
  @Override
  @SuppressWarnings("unchecked") // context.getDataSource() is not generically typed here.
  public void init(Context context) {
    super.init(context);
    dataSource = context.getDataSource();
  }

  /**
   * Runs {@code q} against the data source and makes its rows the current {@code rowIterator}.
   *
   * <p>Increments {@code DataImporter.QUERY_COUNT} first. {@code query} is updated only after the
   * data source returns successfully.
   *
   * @param q the query to run, with tokens already replaced
   * @throws DataImportHandlerException rethrown unchanged if the data source throws one. Any
   *     other exception is logged and wrapped with {@code SEVERE} severity.
   */
  protected void initQuery(String q) {
    try {
      DataImporter.QUERY_COUNT.get().incrementAndGet();
      rowIterator = dataSource.getData(q);
      this.query = q;
    } catch (DataImportHandlerException e) {
      throw e;
    } catch (Exception e) {
      LOG.error("The query failed '" + q + "'", e);
      throw new DataImportHandlerException(DataImportHandlerException.SEVERE, e);
    }
  }

  /**
   * Returns the next row of the entity. On the first call, starts the query returned by
   * {@code getQuery()}, after token replacement.
   *
   * @return the next row, or {@code null} once the query's rows are exhausted
   */
  @Override
  public Map<String, Object> nextRow() {
    if (rowIterator == null) {
      String q = getQuery();
      initQuery(context.replaceTokens(q));
    }
    return getNext();
  }

  /**
   * Returns the next key produced by the entity's {@code DELTA_QUERY} attribute.
   *
   * @return the next key row, or {@code null} if the attribute is absent or rows are exhausted
   */
  @Override
  public Map<String, Object> nextModifiedRowKey() {
    return nextRowForQueryAttribute(DELTA_QUERY);
  }

  /**
   * Returns the next key produced by the entity's {@code DEL_PK_QUERY} attribute.
   *
   * @return the next key row, or {@code null} if the attribute is absent or rows are exhausted
   */
  @Override
  public Map<String, Object> nextDeletedRowKey() {
    return nextRowForQueryAttribute(DEL_PK_QUERY);
  }

  /**
   * Returns the next key produced by the entity's {@code PARENT_DELTA_QUERY} attribute. Logs
   * the entity name when the query is started.
   *
   * @return the next key row, or {@code null} if the attribute is absent or rows are exhausted
   */
  @Override
  public Map<String, Object> nextModifiedParentRowKey() {
    if (rowIterator == null) {
      String parentDeltaQuery = context.getEntityAttribute(PARENT_DELTA_QUERY);
      if (parentDeltaQuery == null) {
        return null;
      }
      LOG.info("Running parentDeltaQuery for Entity: {}", context.getEntityAttribute("name"));
      initQuery(context.replaceTokens(parentDeltaQuery));
    }
    return getNext();
  }

  /**
   * Shared body of the key-producing methods. If no query is running, starts the query stored
   * in entity attribute {@code attributeName}, or returns {@code null} without querying if that
   * attribute is absent. Then returns the next row.
   */
  private Map<String, Object> nextRowForQueryAttribute(String attributeName) {
    if (rowIterator == null) {
      String queryText = context.getEntityAttribute(attributeName);
      if (queryText == null) {
        return null;
      }
      initQuery(context.replaceTokens(queryText));
    }
    return getNext();
  }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: