主要是一个简单的原理说明,具体的后续会从代码层面说明
参考图
poller 接口定义
public interface WarehouseDataPoller {
WarehousePollResult pollRecords(WarehousePollContext warehousePollContext);
WarehousePollResult resumePoll(WarehousePollContext warehousePollContext);
void cleanupPipelineRunResources(WarehousePollContext warehousePollContext);
void cleanupPipelineResources(String pipelineUUID, WarehouseConfig warehouseConfig);
default Path getPipelineRunUnloadDirectory(String pipelineUUID, Long pipelineRunId) {
return ConnectorExecutionConstants.WAREHOUSE_UNLOAD_DIR_PATH.resolve(pipelineUUID).resolve(String.valueOf(pipelineRunId));
}
default Path getPipelineUnloadDirectory(String pipelineUUID) {
return ConnectorExecutionConstants.WAREHOUSE_UNLOAD_DIR_PATH.resolve(pipelineUUID);
}
}
说明
castled 是在原dw 创建了几个不同的table (当然也会放到不同的schema 中)之后定时任务进行同步的时候使用table 的diff 处理(excep 或者类似的sql 实现)
task 部分castled自己写了一个简单的任务调度+quartz 具体处理在jarvis 模块中, 目前有几个问题
- 多app pipeline 使用了同一个dw 源数据处理机制
这个就类似一个典型的数据分发问题,数据的diff 我们是公用一份还是使用独立的,目前从源码上来说castled 是使用了不同的snapshot (对于数据量比较大以及
pipeline 比较多的会潜在造成dw 数据占用比较大),这点hightouch 以及census 都使用了s3 (毕竟是面向云的服务),基于s3进行存储,具体内部细节
diff 的实现暂时是看不到相关介绍的,毕竟属于商业核心了,实际上如果是自己搞一个共享服务的话,可以直接复用此代码块的,比较适合,变动之后大家数据
都进行分发处理的,但是潜在的问题也很突出,如何保证每个应用的数据是可以准确接受到的,这点我们可以基于kafka 类似中间件解决(使用偏移,castled 也使用了代码偏移机制),netflix 的dblog 也使用了类似的技术,反向etl 还目前国外的产品比较多,国内的更多偏向于系统集成,暂时没看到比较合适的产品,而且
大部分在设计的时候都是需要一个唯一主键的(主要是方便外部系统集成的时候如何进行数据操作)
- 默认实现snapshot 算法的性能
日常的dw 中大家一般都会没有主键(dw 中很常见)目前基于except 算的是会有一个很大的性能瓶颈问题(我在一个oracle 中使用类似diff 处理一条数据变动,200多万的数据使用了1m 多)而且如果看census 官当文档的话,他们的diff 算法性能是很不错的,而且这个也是census 吐槽hightouch的地方,官方文档有简单
介绍自己的处理,这点castled 技术上与census 有部分是类似的
参考资料
https://github.com/castledio/castled/blob/main/connectors/src/main/java/io/castled/warehouses/WarehouseDataPoller.java
https://docs.getcensus.com/basics/security-and-privacy