zoukankan      html  css  js  c++  java
  • 通过canal实现把MySQL数据实时增量到kafka

    说明:我们有一个业务需要把mysql中一些表实时同步到大数据集群hbase上面,我们先通过sqoop把表中数据全量导入到hbase中,然后再通过canal定位的某个binlog的position,来实现增量同步,canal官网提供了java/go接口,直接写入到Kafka,然后通过sparkstreaming实时写入到hbase中

    一. 通过sqoop把mysql表中的数据全量导入到hbase中(需要安装sqoop)

    sqoop import 
    --connect jdbc:mysql://ip:port/database 
    --username username 
    --password password 
    --table user_info 
    --hbase-create-table 
    --hbase-table user_info 
    --hbase-row-key id 
    --column-family order_info

    二. 精确定位到binlog位点,进行启动

    1. 查看当前数据库的binlog日志,在数据库中通过show binary logs 查看

    mysql> show binary logs;
    +------------------+-----------+
    | Log_name         | File_size |
    +------------------+-----------+
    | mysql-bin.001112 |    375374 |
    | mysql-bin.001113 |    366569 |
    | mysql-bin.001114 |    360112 |
    | mysql-bin.001115 |    101198 

    2. 查看当前binlog日志的position(一般最大的binlog以及最大的position)

    show binlog events in 'mysql-bin.001115';

    3. 需要先重启canal服务(如果之前正运行canal)

    cd /usr/local/canal/bin && ./stop.sh && ./startup.sh

    4. 修改zookeeper对应destination里面的配置

    说明:我这边是在配置文件里面配置了zookeeper,如果没有配置,则会在你相应的destination目录下面生成meta.dat文件,也只需修改到对应的binlog和position即可

    1)连接zookeeper

    ./zkCli.sh -server zk-address:2181

    2)查看对应destination的配置(其中test为destination的名称)

    (CONNECTED) 2] get /otter/canal/destinations/test/1001/cursor
    "journalName":"mysqlbin.002908"
    "position":198601951

    3)把上面配置中journalName和position修改为自己需要的binlog日志和偏移量

     (CONNECTED) 3]set /otter/canal/destinations/d_aura_jike/1001/cursor {xxx}

    5. 修改一下对应destination的配置文件(主要是触发使其生效)

    vim /usr/local/canal/conf/test/instance.properties

    6. 通过canal提供的java/go接口,来测试数据的同步(官网有例子https://github.com/alibaba/canal)

  • 相关阅读:
    Linux工具-curl
    常用工具-Postman
    HTTP头部信息
    HTTP状态码
    HTTP/HTTP2协议
    HTTP协议
    常用的服务端口
    三次握手,四次挥手和抓包工具
    路由表
    TCP/IP协议详解
  • 原文地址:https://www.cnblogs.com/654wangzai321/p/10655257.html
Copyright © 2011-2022 走看看