Download
Reference: https://www.cnblogs.com/LQBlog/p/12177295.html#autoid-3-0-0
Source-code modification reference: https://www.cnblogs.com/LQBlog/p/14661238.html
ZooKeeper distributed lock
get /canal-adapter/sync-switch/{canalDestination}: on means the lock can be acquired, off means syncing is blocked
Configuration file changes
```yaml
server:
  port: 8085
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # tcp / kafka / rocketMQ; with tcp the source is the binlog, otherwise incremental data comes from kafka or rocketMQ
  #canalServerHost: 192.168.20.5:11111 # address and port of a standalone canal-server
  zookeeperHosts: 192.168.20.4:2181 # zookeeper-based cluster
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  subscribe: merge_test.pro_brand,merge_test.soa_ord_order_summary,merge_test.soa_ord_order,merge_test.soa_ord_order_item # added by my own source change, to filter out messages we do not care about
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/merge_test?useUnicode=true # source database
      username: kuaihe
      password: Kuaihe0910Mysql
  canalAdapters:
  - instance: kuaihe_db_test # canal instance name or MQ topic name, matching the directory created on the canal side; in tcp mode this subscribes to the binlog of the canal-server instance with this name
    groups:
    - groupId: g1
      outerAdapters:
      - name: es7.4.0 # adapter name, an SPI extension point; the SPI key is com.alibaba.otter.canal.client.adapter.OuterAdapter
        hosts: 127.0.0.1:9200 # es cluster addresses, comma-separated
        properties:
          mode: rest # transport or rest
          security.auth: elastic:T3StdABAk # es username:password
```
Add the es directory
I build from source, so I make changes directly in the source and repackage; on startup the adapter automatically scans every yml file under the es directory.
pro_brand.yml example. Mine is a single-table sync; for more complex mappings see the official documentation.
```yaml
dataSourceKey: defaultDS # key of the source data source, matching an entry under srcDataSources in application.yml
#outerAdapterKey: exampleKey # key of the es adapter configured in application.yml
destination: kuaihe_db_test # when a canal binlog message arrives, the config is located by the message's destination + database + table; see com.alibaba.otter.canal.client.impl.SimpleCanalConnector.getWithoutAck(int, java.lang.Long, java.util.concurrent.TimeUnit)
groupId: g1 # groupId for MQ mode; only data for this groupId is synced
esMapping:
  _index: pro_brand
  _type: _doc
  _id: id
  pk: id
  sql: "select id,create_timestamp,update_timestamp,`code`,brand_code,brand_mall_idx,delete_flag,depot_id,description,idx,kuaihe_idx,logo,`name`,name_en,prefix,sale_channel,seo_description,seo_keywords,seo_title,`status`,vendor_id from pro_brand"
  commitBatch: 3000
```
Full import
While the import runs, watch the logs for errors.
POST http://192.168.20.5:8085/etl/es7.4.0/soa_ord_order_item.yml

```json
{ "succeeded": true, "resultMessage": "导入ES 数据:61513 条" }
```

(the resultMessage says 61,513 rows were imported into ES)
Incremental
Incremental sync starts automatically at startup via the binlog subscription; see the source:
com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader
-> starts the worker
com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterWorker
Conditional incremental import
I modified the source myself to make this more flexible.
POST http://127.0.0.1:8087/etl/es7.4.0/soa_ord_order.yml?params=683071930737168384;2021-04-29 00:00:12;2021-04-29 23:55:12

```json
{ "condition": "where user_id={} and create_timestamp>={} and create_timestamp<={}" }
```
Change 1: com.alibaba.otter.canal.adapter.launcher.rest.CommonRest#etl
```java
@PostMapping("/etl/{type}/{key}/{task}")
public EtlResult etl(@PathVariable String type, @PathVariable String key, @PathVariable String task,
                     @RequestParam(name = "params", required = false) String params,
                     @RequestBody Map<String, String> mapParameters) {
    // ... unchanged setup elided ...
    try {
        List<String> paramArray = null;
        if (params != null) {
            paramArray = Arrays.asList(params.trim().split(";"));
        }
        String condition = mapParameters == null ? null : mapParameters.get("condition");
        return adapter.etl(task, condition, paramArray);
    } finally {
        if (destination != null && oriSwitchStatus) {
            syncSwitch.on(destination);
        } else if (destination == null && oriSwitchStatus) {
            syncSwitch.on(task);
        }
    }
}
```
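With this signature the endpoint takes the positional values in the `params` query string and the condition in a JSON body. A minimal sketch of building such a request with Java 11's java.net.http (host, path, and payload copied from the example above; the request is only constructed here, not sent):

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class EtlRequestDemo {
    public static void main(String[] args) {
        // Positional values for the "{}" placeholders, separated by ';' (spaces URL-encoded)
        String params = "683071930737168384;2021-04-29 00:00:12;2021-04-29 23:55:12".replace(" ", "%20");
        // JSON body carrying the dynamic condition
        String body = "{\"condition\":\"where user_id={} and create_timestamp>={} and create_timestamp<={}\"}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://127.0.0.1:8087/etl/es7.4.0/soa_ord_order.yml?params=" + params))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        // Send with: HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
        System.out.println(request.method() + " " + request.uri());
    }
}
```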
Change 2: com.alibaba.otter.canal.client.adapter.es.ESAdapter#etl
```java
@Override
public EtlResult etl(String task, String condition, List<String> params) {
    EtlResult etlResult = new EtlResult();
    ESSyncConfig config = esSyncConfig.get(task);
    if (config != null) {
        // .........
        if (dataSource != null) {
            return esEtlService.importData(params, condition);
        }
        // .........
    } else {
        // .........
        EtlResult etlRes = esEtlService.importData(params, condition);
        // ..........
    }
    etlResult.setSucceeded(false);
    etlResult.setErrorMessage("Task not found");
    return etlResult;
}
```
Change 3: com.alibaba.otter.canal.client.adapter.support.AbstractEtlService#importData
```java
DruidDataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
List<Object> values = new ArrayList<>();
// liqiangtodo: change so etl supports a dynamically passed-in condition
condition = condition == null ? config.getMapping().getEtlCondition() : condition;
// build the WHERE condition
if (condition != null && params != null) {
    String etlCondition = condition;
    for (String param : params) {
        etlCondition = etlCondition.replace("{}", "?");
        values.add(param);
    }
    sql += " " + etlCondition;
}
if (logger.isDebugEnabled()) {
    logger.debug("etl sql : {}", sql);
}
// get the total row count
String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
```
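The substitution above can be checked in isolation. A standalone sketch of the same placeholder logic (the class and method names here are made up for illustration; in the adapter this runs inside importData):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EtlConditionDemo {
    /** Replaces every "{}" placeholder with "?" and collects the raw values,
     *  mirroring the modified importData logic above. */
    public static String buildSql(String sql, String condition, List<String> params, List<Object> values) {
        String etlCondition = condition;
        for (String param : params) {
            // String.replace swaps all "{}" at once on the first pass; later passes
            // are no-ops on the text but still collect the remaining values in order
            etlCondition = etlCondition.replace("{}", "?");
            values.add(param);
        }
        return sql + " " + etlCondition;
    }

    public static void main(String[] args) {
        List<Object> values = new ArrayList<>();
        String sql = buildSql("select * from soa_ord_order",
                "where user_id={} and create_timestamp>={} and create_timestamp<={}",
                Arrays.asList("683071930737168384", "2021-04-29 00:00:12", "2021-04-29 23:55:12"),
                values);
        System.out.println(sql);    // the condition with all placeholders turned into '?'
        System.out.println(values); // the three parameter values, in placeholder order
    }
}
```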
Dynamically updating the configuration files
This relies on two tables: canal_config (stores the application config) and canal_adapter_config (stores the configs under the es directory). The SQL scripts can be found here:
Reference: https://www.cnblogs.com/LQBlog/p/14598582.html
Configure application.properties
If you change the row, remember to also update modified_time.
```sql
INSERT INTO `merge_test`.`canal_config` (`id`, `cluster_id`, `server_id`, `name`, `status`, `content`, `content_md5`, `modified_time`)
VALUES (2, NULL, NULL, 'application.yml', NULL, 'server:
  port: 8085
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # kafka rocketMQ
  #canalServerHost: 192.168.20.5:11111 # canal address and port
  zookeeperHosts: 192.168.20.4:2181 # zookeeper-based cluster
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  subscribe: merge_test.pro_brand,merge_test.soa_ord_order_summary,merge_test.soa_ord_order,merge_test.soa_ord_order_item # added by my own source change, to filter out messages we do not care about
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://rm-2zeqc826391dt0lk2.mysql.rds.aliyuncs.com:3306/merge_test?useUnicode=true # source database
      username: kuaihe
      password: Kuaihe0910Mysql
  canalAdapters:
  - instance: kuaihe_db_test # canal instance name or MQ topic name, matching the directory created on the canal side
    groups:
    - groupId: g1
      outerAdapters:
      - name: es7.4.0 # adapter name
        hosts: es-cn-st21taqhf0003bw9m.elasticsearch.aliyuncs.com:9200 # es cluster addresses, comma-separated
        properties:
          mode: rest # transport or rest
          security.auth: elastic:T3StdABAk # es username:password
', '', '2021-04-14 15:09:07');
```
The id must be 2; for details see the source: com.alibaba.otter.canal.adapter.launcher.monitor.remote.DbRemoteConfigLoader#getRemoteAdapterConfig
```java
/**
 * Fetch the remote application.yml config
 *
 * @return the config item
 */
private ConfigItem getRemoteAdapterConfig() {
    String sql = "select name, content, modified_time from canal_config where id=2";
    try (Connection conn = dataSource.getConnection();
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(sql)) {
        if (rs.next()) {
            ConfigItem configItem = new ConfigItem();
            configItem.setId(2L);
            configItem.setName(rs.getString("name"));
            configItem.setContent(rs.getString("content"));
            configItem.setModifiedTime(rs.getTimestamp("modified_time").getTime());
            return configItem;
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
    return null;
}
```
Configure canal_adapter_config
category is the directory path and name is the file name; after changing a row, remember to also update modified_time.
Modify bootstrap.yml to add the database the config is read from

```yaml
canal: # config loaded from a database
  manager:
    jdbc:
      url: jdbc:mysql://127.0.0.1:3306/merge_test?useUnicode=true
      username: root
      password: 123456
```
How it works
On startup the adapter reads the configuration from these two tables and flushes it to the project's config files. The config beans are annotated with @RefreshScope, and a background thread watches for changes.
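That watcher can be pictured as a simple poll loop. A hypothetical sketch (not the actual DbRemoteConfigLoader code): periodically re-read modified_time and reload only when it advances.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class ConfigPollerSketch {
    private final AtomicLong lastSeen = new AtomicLong(0);
    private final LongSupplier remoteModifiedTime; // e.g. "select modified_time from canal_config where id=2"
    private final Runnable reload;                 // e.g. rewrite the local files and refresh the scope

    public ConfigPollerSketch(LongSupplier remoteModifiedTime, Runnable reload) {
        this.remoteModifiedTime = remoteModifiedTime;
        this.reload = reload;
    }

    /** One poll step: trigger the reload only when the remote timestamp moved forward. */
    public void pollOnce() {
        long remote = remoteModifiedTime.getAsLong();
        long previous = lastSeen.getAndAccumulate(remote, Math::max);
        if (remote > previous) {
            reload.run();
        }
    }

    /** Poll every five seconds on a single scheduler thread. */
    public ScheduledExecutorService start() {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ses.scheduleWithFixedDelay(this::pollOnce, 5, 5, TimeUnit.SECONDS);
        return ses;
    }
}
```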