您的位置:首页 > 大数据 > 人工智能

solr dataimport 数据导入源码分析(六)

2012-09-09 23:17 573 查看
DocBuilder类的简要代码如下:先用EntityProcessor实例和DocBuilder自身构造EntityProcessorWrapper,再创建context(ContextImpl)并把它作为参数传给EntityProcessorWrapper的init()方法来初始化数据源;
然后调用EntityProcessorWrapper的相关方法获取数据(全部导入或增量导入数据)。

/**
 * Abridged sketch of DocBuilder showing how an EntityProcessorWrapper is
 * created and then initialised with a ContextImpl, for both the full-import
 * and the delta-import paths.
 *
 * NOTE(review): this excerpt is not self-contained. The names root, resolver,
 * vri, map, session and entitiesToDestroy are used below but never declared
 * here -- presumably fields or locals elided from the full source.
 */
public class DocBuilder {

/**
 * Full-import entry: calls buildDocument on root with a null pk.
 * A null pk makes buildDocument select Context.FULL_DUMP.
 */
private void doFullDump() {

buildDocument(getVariableResolver(), null, null, root, true, null);

}

/**
 * Delta-import entry: collects the modified keys through collectDelta and
 * then calls buildDocument.
 *
 * NOTE(review): allPks and deletedKeys are not read again in this excerpt,
 * and vri / map are undeclared -- presumably the loop over the collected
 * keys was elided; confirm against the full source.
 */
private void doDelta() {

Set<Map<String, Object>> deletedKeys = new HashSet<Map<String, Object>>();

Set<Map<String, Object>> allPks = collectDelta(root, resolver, deletedKeys);

buildDocument(vri, null, map, root, true, null);

}

/**
 * Convenience overload: forwards to the seven-argument buildDocument,
 * supplying entitiesToDestroy as the last argument.
 */
private void buildDocument(VariableResolverImpl vr, DocWrapper doc, Map<String, Object> pk, DataConfig.Entity entity, boolean isRoot, ContextImpl parentCtx) {

buildDocument(vr, doc, pk, entity, isRoot, parentCtx, entitiesToDestroy);

}

/**
 * Obtains the processor wrapper for the entity, builds a ContextImpl for it
 * and passes that context to the wrapper's init().
 *
 * In this excerpt doc, isRoot and entitiesToDestroy are not used; the
 * row-fetching code is commented out / omitted.
 *
 * @param pk when null the context is created with Context.FULL_DUMP,
 *           otherwise with Context.DELTA_DUMP
 */
private void buildDocument(VariableResolverImpl vr, DocWrapper doc, Map<String, Object> pk, DataConfig.Entity entity, boolean isRoot, ContextImpl parentCtx, List<EntityProcessorWrapper> entitiesToDestroy) {

EntityProcessorWrapper entityProcessor = getEntityProcessor(entity);

ContextImpl ctx = new ContextImpl(entity, vr, null,

pk == null ? Context.FULL_DUMP : Context.DELTA_DUMP,

session, parentCtx, this);

// The context is handed over via init(), not via the wrapper's constructor.
entityProcessor.init(ctx);

//Map<String, Object> arow = entityProcessor.nextRow();

// remaining code omitted

}

/**
 * Wraps a new SqlEntityProcessor in an EntityProcessorWrapper, stores the
 * wrapper in entity.processor and returns it. In this excerpt a fresh
 * wrapper is created on every call and the processor type is fixed to
 * SqlEntityProcessor.
 */
private EntityProcessorWrapper getEntityProcessor(DataConfig.Entity entity) {

EntityProcessor entityProcessor = new SqlEntityProcessor();

// Assignment expression: the same wrapper is both stored on the entity and returned.
return entity.processor = new EntityProcessorWrapper(entityProcessor, this);

}

DataImporter dataImporter;

// String keys; where they are read or written is not shown in this excerpt.
public static final String LAST_INDEX_TIME = "last_index_time";

public static final String INDEX_START_TIME = "index_start_time";

/**
 * Stores the given dataImporter. The solrWriter, propWriter and reqParams
 * arguments are not used in this excerpt.
 */
public DocBuilder(DataImporter dataImporter, SolrWriter solrWriter, DIHPropertiesWriter propWriter, DataImporter.RequestParams reqParams) {

this.dataImporter = dataImporter;

}

/**
 * Runs doDelta() followed by doFullDump().
 *
 * NOTE(review): as written both run unconditionally -- presumably the full
 * source chooses one based on the requested command; confirm.
 */
public void execute() {

doDelta();

doFullDump();

}

/**
 * Obtains the processor for the entity and initialises it with a context
 * created with Context.FIND_DELTA and a null parent context.
 *
 * In this excerpt the row-collection call is commented out, so the returned
 * set is always empty and deletedRows is never touched.
 *
 * @return the (here always empty) set of modified primary-key rows
 */
public Set<Map<String, Object>> collectDelta(DataConfig.Entity entity, VariableResolverImpl resolver, Set<Map<String, Object>> deletedRows) {

//someone called abort

EntityProcessor entityProcessor = getEntityProcessor(entity);

ContextImpl context1 = new ContextImpl(entity, resolver, null, Context.FIND_DELTA, session, null, this);

entityProcessor.init(context1);

Set<Map<String, Object>> myModifiedPks = new HashSet<Map<String, Object>>();

//Map<String, Object> row = entityProcessor.nextModifiedRowKey();

return myModifiedPks;

}

}
相关类图如下

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: