1、deployer基本不需要改动,只需要修改你需要同步的数据库的库表,例如:
canal.instance.filter.regex=test.user
2、adapter中的application.yml配置
server: port: 8083 #可以自己修改监听端口 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: tcp canalServerHost: uat-datacenter2:11115 ##为deployer中的port batchSize: 500 syncBatchSize: 1000 retries: 0 timeout: accessKey: secretKey: srcDataSources: defaultDS: url: jdbc:mysql://localhost:3306/test?useUnicode=true #修改成自己的mysql地址 username: root #修改自己的账户 password: passwrd #修改自己的密码 canalAdapters: - instance: instance_test_user #需要和deployer中监听的文件夹名一致 groups: - groupId: g1 outerAdapters: - name: hbase properties: hbase.zookeeper.quorum: uat-datacenter2,uat-datacenter1,uat-datacenter3 #zk地址,如果只有一个就填写一个就行,多个,按照我这种填写。 hbase.zookeeper.property.clientPort: 2181 #端口 zookeeper.znode.parent: /hbase # zk中的路径 ,可以通过zk的客户端输入:ls / 查看是否有这个
3、hbase文件夹中的配置文件instance_test_user.yml
dataSourceKey: defaultDS destination: instance_test_user groupId: ##不要填写,如果有g1,请删除,否则启动没有报错,没有数据 hbaseMapping: mode: STRING database: test table: user hbaseTable: TEST.USER # HBase表名 family: CF # 默认统一Family名称 uppercaseQualifier: true # 字段名转大写, 默认为true #etlCondition: "where c_time>={}" commitBatch: 2 # 批量提交的大小 rowKey: id,name # 复合字段rowKey不能和columns中的rowKey重复 columns: # 数据库字段:HBase对应字段 id: ROWKEY LEN:15 name: CF:NAME age: AGE create_time: CREATE_TIME
4、在大数据机器输入Hbase shell
运行 命令:
list #查看有哪些表
然后执行命令:
create 'MYTEST.PERSON', {NAME=>'CF'}
5、启动deployer和adapter
6、在对应监听的mysql表中输入数据,如:
7、查看hbase数据,,如有表示成功了。
scan 'TEST.USER'
结果:
10|100 column=CF:AGE, timestamp=1609903480750, value=101
10|100 column=CF:CREATE_TIME, timestamp=1609903480750, value=2021-01-06 11:23:43
10|100 column=CF:ID, timestamp=1609903480750, value=10
10|100 column=CF:NAME, timestamp=1609903480750, value=100
=========================FAQ====================
1、按照官网配置,没有报错没有数据
解决方法:
那就是不够仔细,我是因为保留了groupId的值,即 groupId: g1
,实际是不需要保留,直接groupId: 就可以
2、将mysql的数据导入hbase,时间多了一个.0
解决方法:
1、下载源码,用idea打开
2、找到HbaseSyncService.java中的convertData2Row方法,修改成标红,即可
private static void convertData2Row(MappingConfig.HbaseMapping hbaseMapping, HRow hRow, Map<String, Object> data) { Map<String, MappingConfig.ColumnItem> columnItems = hbaseMapping.getColumnItems(); int i = 0; for (Map.Entry<String, Object> entry : data.entrySet()) { if (hbaseMapping.getExcludeColumns() != null && hbaseMapping.getExcludeColumns().contains(entry.getKey())) { continue; } if (entry.getValue() != null) { MappingConfig.ColumnItem columnItem = columnItems.get(entry.getKey()); byte[] bytes = typeConvert(columnItem, hbaseMapping, entry.getValue().toString().contains("-")?entry.getValue().toString().replace(".0",""):entry.getValue()); if (columnItem == null) { String familyName = hbaseMapping.getFamily(); String qualifier = entry.getKey(); if (hbaseMapping.isUppercaseQualifier()) { qualifier = qualifier.toUpperCase(); } if (hbaseMapping.getRowKey() == null && i == 0) { hRow.setRowKey(bytes); } else { hRow.addCell(familyName, qualifier, bytes); } } else { if (columnItem.isRowKey()) { if (columnItem.getRowKeyLen() != null && entry.getValue() != null) { if (entry.getValue() instanceof Number) { String v = String.format("%0" + columnItem.getRowKeyLen() + "d", ((Number) entry.getValue()).longValue()); bytes = Bytes.toBytes(v); } else { try { String v = String.format("%0" + columnItem.getRowKeyLen() + "d", Integer.parseInt((String) entry.getValue())); bytes = Bytes.toBytes(v); } catch (Exception e) { logger.error(e.getMessage(), e); } } } hRow.setRowKey(bytes); } else { hRow.addCell(columnItem.getFamily(), columnItem.getQualifier(), bytes); } } } i++; } }
3、打包,然后将 adapter中plugin中的jar备份,然后将打好得包client-adapter.hbase-1.1.4-jar-with-dependencies.jar 放入 adapter中的plugin即可。
3、mysql同步Hbase,获取数据错位
https://hub.fastgit.org/alibaba/canal/issues/2772https://hub.fastgit.org/alibaba/canal/issues/2772