DocBuilder类的简要代码如下,通过传入context参数到EntityProcessorWrapper类的构造函数,然后执行EntityProcessorWrapper的init()方法初始化数据源;
然后调用EntityProcessorWrapper的相关方法获取数据(全部导入或增量导入数据)
public class DocBuilder {
private void doFullDump() {
buildDocument(getVariableResolver(), null, null, root, true, null);
}
private void doDelta() {
Set<Map<String, Object>> deletedKeys = new HashSet<Map<String, Object>>();
Set<Map<String, Object>> allPks = collectDelta(root, resolver, deletedKeys);
buildDocument(vri, null, map, root, true, null);
}
private void buildDocument(VariableResolverImpl vr, DocWrapper doc, Map<String, Object> pk, DataConfig.Entity entity, boolean isRoot, ContextImpl parentCtx) {
buildDocument(vr, doc, pk, entity, isRoot, parentCtx, entitiesToDestroy);
}
private void buildDocument(VariableResolverImpl vr, DocWrapper doc, Map<String, Object> pk, DataConfig.Entity entity, boolean isRoot, ContextImpl parentCtx, List<EntityProcessorWrapper> entitiesToDestroy) {
EntityProcessorWrapper entityProcessor = getEntityProcessor(entity);
ContextImpl ctx = new ContextImpl(entity, vr, null,
pk == null ? Context.FULL_DUMP : Context.DELTA_DUMP,
session, parentCtx, this);
entityProcessor.init(ctx);
//Map<String, Object> arow = entityProcessor.nextRow();
//其他代码略
}
private EntityProcessorWrapper getEntityProcessor(DataConfig.Entity entity) {
EntityProcessor entityProcessor = new SqlEntityProcessor();
return entity.processor = new EntityProcessorWrapper(entityProcessor, this);
}
DataImporter dataImporter;
public static final String LAST_INDEX_TIME = "last_index_time";
public static final String INDEX_START_TIME = "index_start_time";
public DocBuilder(DataImporter dataImporter, SolrWriter solrWriter, DIHPropertiesWriter propWriter, DataImporter.RequestParams reqParams) {
this.dataImporter = dataImporter;
}
public void execute() {
doDelta();
doFullDump();
}
public Set<Map<String, Object>> collectDelta(DataConfig.Entity entity, VariableResolverImpl resolver, Set<Map<String, Object>> deletedRows) {
//someone called abort
EntityProcessor entityProcessor = getEntityProcessor(entity);
ContextImpl context1 = new ContextImpl(entity, resolver, null, Context.FIND_DELTA, session, null, this);
entityProcessor.init(context1);
Set<Map<String, Object>> myModifiedPks = new HashSet<Map<String, Object>>();
//Map<String, Object> row = entityProcessor.nextModifiedRowKey();
return myModifiedPks;
}
}
private void doFullDump() {
buildDocument(getVariableResolver(), null, null, root, true, null);
}
private void doDelta() {
Set<Map<String, Object>> deletedKeys = new HashSet<Map<String, Object>>();
Set<Map<String, Object>> allPks = collectDelta(root, resolver, deletedKeys);
buildDocument(vri, null, map, root, true, null);
}
private void buildDocument(VariableResolverImpl vr, DocWrapper doc, Map<String, Object> pk, DataConfig.Entity entity, boolean isRoot, ContextImpl parentCtx) {
buildDocument(vr, doc, pk, entity, isRoot, parentCtx, entitiesToDestroy);
}
private void buildDocument(VariableResolverImpl vr, DocWrapper doc, Map<String, Object> pk, DataConfig.Entity entity, boolean isRoot, ContextImpl parentCtx, List<EntityProcessorWrapper> entitiesToDestroy) {
EntityProcessorWrapper entityProcessor = getEntityProcessor(entity);
ContextImpl ctx = new ContextImpl(entity, vr, null,
pk == null ? Context.FULL_DUMP : Context.DELTA_DUMP,
session, parentCtx, this);
entityProcessor.init(ctx);
//Map<String, Object> arow = entityProcessor.nextRow();
//其他代码略
}
private EntityProcessorWrapper getEntityProcessor(DataConfig.Entity entity) {
EntityProcessor entityProcessor = new SqlEntityProcessor();
return entity.processor = new EntityProcessorWrapper(entityProcessor, this);
}
DataImporter dataImporter;
public static final String LAST_INDEX_TIME = "last_index_time";
public static final String INDEX_START_TIME = "index_start_time";
public DocBuilder(DataImporter dataImporter, SolrWriter solrWriter, DIHPropertiesWriter propWriter, DataImporter.RequestParams reqParams) {
this.dataImporter = dataImporter;
}
public void execute() {
doDelta();
doFullDump();
}
public Set<Map<String, Object>> collectDelta(DataConfig.Entity entity, VariableResolverImpl resolver, Set<Map<String, Object>> deletedRows) {
//someone called abort
EntityProcessor entityProcessor = getEntityProcessor(entity);
ContextImpl context1 = new ContextImpl(entity, resolver, null, Context.FIND_DELTA, session, null, this);
entityProcessor.init(context1);
Set<Map<String, Object>> myModifiedPks = new HashSet<Map<String, Object>>();
//Map<String, Object> row = entityProcessor.nextModifiedRowKey();
return myModifiedPks;
}
}
相关类图如下