zoukankan      html  css  js  c++  java
  • Canal同步Mysql数据至Hbase

    1、deployer基本不需要改动,只需要修改你需要同步的数据库的库表,例如:

    canal.instance.filter.regex=test.user

    2、adapter中的application.yml配置

    server:
      port: 8083  #可以自己修改监听端口
    spring:
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
        default-property-inclusion: non_null
    
    
    canal.conf:
      mode: tcp
      canalServerHost: uat-datacenter2:11115  ##为deployer中的port
      batchSize: 500
      syncBatchSize: 1000
      retries: 0
      timeout:
      accessKey:
      secretKey:
      srcDataSources:
        defaultDS:
          url: jdbc:mysql://localhost:3306/test?useUnicode=true  #修改成自己的mysql地址
          username: root   #修改自己的账户
          password: passwrd  #修改自己的密码
      canalAdapters:
      - instance: instance_test_user  #需要和deployer中监听的文件夹名一致
        groups:
        - groupId: g1
          outerAdapters:
          - name: hbase                                                                            
            properties:
              hbase.zookeeper.quorum: uat-datacenter2,uat-datacenter1,uat-datacenter3  #zk地址,如果只有一个就填写一个就行,多个,按照我这种填写。
              hbase.zookeeper.property.clientPort: 2181  #端口
              zookeeper.znode.parent: /hbase  # zk中的路径 ,可以通过zk的客户端输入:ls / 查看是否有这个

    3、hbase文件夹中的配置文件instance_test_user.yml

    dataSourceKey: defaultDS
    destination: instance_test_user
    groupId:    ##不要填写,如果有g1,请删除,否则启动没有报错,没有数据
    hbaseMapping:
      mode: STRING
      database: test
      table: user
      hbaseTable: TEST.USER   # HBase表名
      family: CF  # 默认统一Family名称
      uppercaseQualifier: true  # 字段名转大写, 默认为true
      #etlCondition: "where c_time>={}"
      commitBatch: 2 # 批量提交的大小
      rowKey: id,name  # 复合字段rowKey不能和columns中的rowKey重复
      columns:
        # 数据库字段:HBase对应字段
        id: ROWKEY LEN:15
        name: CF:NAME
        age: AGE
        create_time: CREATE_TIME

    4、在大数据机器输入Hbase shell 

    运行 命令:
            list #查看有哪些表
            
    然后执行命令:
            create 'MYTEST.PERSON', {NAME=>'CF'}

    5、启动deployer和adapter

    6、在对应监听的mysql表中输入数据,如:

     7、查看hbase数据,,如有表示成功了。

    scan 'TEST.USER'
    
    结果:
     10|100                                          column=CF:AGE, timestamp=1609903480750, value=101                                                                                           
     10|100                                          column=CF:CREATE_TIME, timestamp=1609903480750, value=2021-01-06 11:23:43                                                                   
     10|100                                          column=CF:ID, timestamp=1609903480750, value=10                                                                                             
     10|100                                          column=CF:NAME, timestamp=1609903480750, value=100    

    =========================FAQ====================

    1、按照官网配置,没有报错没有数据

    解决方法:
        那就是不够仔细,我是因为保留了groupId的值,即 groupId:  g1
        
        ,实际是不需要保留,直接groupId: 就可以

    2、将mysql的数据导入hbase,时间多了一个.0

    解决方法:

      1、下载源码,用idea打开

      2、找到HbaseSyncService.java中的convertData2Row方法,修改成标红,即可

        private static void convertData2Row(MappingConfig.HbaseMapping hbaseMapping, HRow hRow, Map<String, Object> data) {
            Map<String, MappingConfig.ColumnItem> columnItems = hbaseMapping.getColumnItems();
            int i = 0;
            for (Map.Entry<String, Object> entry : data.entrySet()) {
                if (hbaseMapping.getExcludeColumns() != null && hbaseMapping.getExcludeColumns().contains(entry.getKey())) {
                    continue;
                }
                if (entry.getValue() != null) {
                    MappingConfig.ColumnItem columnItem = columnItems.get(entry.getKey());
    
                    byte[] bytes = typeConvert(columnItem, hbaseMapping, entry.getValue().toString().contains("-")?entry.getValue().toString().replace(".0",""):entry.getValue());
                    if (columnItem == null) {
                        String familyName = hbaseMapping.getFamily();
                        String qualifier = entry.getKey();
                        if (hbaseMapping.isUppercaseQualifier()) {
                            qualifier = qualifier.toUpperCase();
                        }
    
                        if (hbaseMapping.getRowKey() == null && i == 0) {
                            hRow.setRowKey(bytes);
                        } else {
                            hRow.addCell(familyName, qualifier, bytes);
                        }
                    } else {
                        if (columnItem.isRowKey()) {
                            if (columnItem.getRowKeyLen() != null && entry.getValue() != null) {
                                if (entry.getValue() instanceof Number) {
                                    String v = String.format("%0" + columnItem.getRowKeyLen() + "d",
                                        ((Number) entry.getValue()).longValue());
                                    bytes = Bytes.toBytes(v);
                                } else {
                                    try {
                                        String v = String.format("%0" + columnItem.getRowKeyLen() + "d",
                                            Integer.parseInt((String) entry.getValue()));
                                        bytes = Bytes.toBytes(v);
                                    } catch (Exception e) {
                                        logger.error(e.getMessage(), e);
                                    }
                                }
                            }
                            hRow.setRowKey(bytes);
                        } else {
                            hRow.addCell(columnItem.getFamily(), columnItem.getQualifier(), bytes);
                        }
                    }
                }
                i++;
            }
        }

       3、打包,然后将 adapter中plugin中的jar备份,然后将打好得包client-adapter.hbase-1.1.4-jar-with-dependencies.jar 放入 adapter中的plugin即可。

    3、mysql同步Hbase,获取数据错位

    https://hub.fastgit.org/alibaba/canal/issues/2772https://hub.fastgit.org/alibaba/canal/issues/2772
  • 相关阅读:
    网页背景音乐
    CSS 实现背景半透明
    实战中总结出来的CSS常见问题及解决办法
    AsyncTask的简单使用
    ORMLiteDatabase的简单使用并且与其他的表相互联系
    传递消息--第三方开源--EventBus的简单使用
    通过messenger实现activity与service的相互通信
    通过Messenger与后台连接(单向操作,activity向service发送数据)
    怎么做QQ、微信等消息气泡
    通过bind实现activity与service的交互
  • 原文地址:https://www.cnblogs.com/ywjfx/p/14240323.html
Copyright © 2011-2022 走看看