zoukankan      html  css  js  c++  java
  • es根据mysql实时更新数据到组件中

    由于logstash更新不是实时而是采用了轮训的方式,去触法更新,对于实时性要求比较高的场景显然不能满足需求,于是衍生出用canal方案解决实时更新入库es的方案

    方法

    1.安装canal(阿里的开源组件,可以从https://github.com/alibaba/canal/releases找到比较新的版本下载),下载canal.deployer-1.1.4.tar.gz   canal.adapter-1.1.4.tar.gz两个包

    2.由于项目用了7.6.1的es,而阿里canal的比较新版本只支持到了6.5左右,这里解决方案可以,下载阿里canal的source包,D:camalSourcecanal-canal-1.1.4canal-canal-1.1.4client-adapter到这个目录下引入idea工具进行对pom进行调整jar依赖的版本(不同版本代码方法上会有点区别需要进行稍微调整到无错误),进行编译打包mvn package -DskipTests,一旦成功就会得到目录

     这个目录就是支持7.6.1的es的adapter。打包到服务器。

    将相同版本的canal.deployer-1.1.4.tar.gz   canal.adapter-1.1.4.tar.gz(支持新的版本es)上传到服务器解压进行配置,以下是配置过程

    1.mysql开启binlog模式,canal需要用到(具体为什么可以看下架构设计图)

    开启binlog
    server-id = 1
    binlog_format = ROW
    log_bin = mysql_bin
    mysql客户端连接后
    show variables like ‘log_bin’ 查看是否真正开启binlog

    2.创建一个新的cannl给cannal-adapter使用

    CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';

    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost' IDENTIFIED BY 'canal'

    FLUSH PRIVILEGES;

    3.解压canal.deployer用于部署canal
    conf/canal.properties是主配置文件,如其中canal.port用以指定服务端监听的端口
    conf/example/instance.properties是实例的配置文件,主要配置项
    修改instance.properties
    canal.instance.mysql.slaveId=2 不能和serverid冲突
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal

    启动bin下的startup.sh 到此canal.deployer启动成功(可以netstat看看11111端口是否成功被监听和log是否正常,观察logs/canal/canal.log以及logs/example/example.log日志

    到此canal.deployer启动完成,接下来启动canal.adapter

    1.调整/opt/canal7/conf/配置文件application.yml(记得缩进和换行要使用空格,变量前面要空格,cannal对这些的读取是非常敏感的,用的时候这个调了不少时间)

    server:
      port: 8081
    spring:
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
        default-property-inclusion: non_null
    
    canal.conf:
      mode: tcp # kafka rocketMQ
      canalServerHost: 127.0.0.1:11111
    #  zookeeperHosts: slave1:2181
    #  mqServers: 127.0.0.1:9092 #or rocketmq
    #  flatMessage: true
      batchSize: 500
      syncBatchSize: 1000
      retries: 0
      timeout:
      accessKey:
      secretKey:
      srcDataSources:
        defaultDS:
          url: jdbc:mysql://192.168.1.20:3306/dianping?useUnicode=true
          username: canal
          password: canal
      canalAdapters:
      - instance: example # canal instance Name or mq topic name
        groups:
        - groupId: g1
          outerAdapters:
          - name: logger
    #      - name: rdb
    #        key: mysql1
    #        properties:
    #          jdbc.driverClassName: com.mysql.jdbc.Driver
    #          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
    #          jdbc.username: root
    #          jdbc.password: 121212
    #      - name: rdb
    #        key: oracle1
    #        properties:
    #          jdbc.driverClassName: oracle.jdbc.OracleDriver
    #          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
    #          jdbc.username: mytest
    #          jdbc.password: m121212
    #      - name: rdb
    #        key: postgres1
    #        properties:
    #          jdbc.driverClassName: org.postgresql.Driver
    #          jdbc.url: jdbc:postgresql://localhost:5432/postgres
    #          jdbc.username: postgres
    #          jdbc.password: 121212
    #          threads: 1
    #          commitSize: 3000
    #      - name: hbase
    #        properties:
    #          hbase.zookeeper.quorum: 127.0.0.1
    #          hbase.zookeeper.property.clientPort: 2181
    #          zookeeper.znode.parent: /hbase
          - name: es
            hosts: slave1:9300 # 127.0.0.1:9200 for rest mode
            properties:
    #          mode: transport # or rest
    #          # security.auth: test:123456 #  only used for rest mode
              cluster.name: dianping-app
    

      

    接着进入es的文件夹中进行配置(新建自己的文件比如 shop.yml):

    dataSourceKey: defaultDS
    destination: example
    groupId:
    esMapping:
             _index: shop
             _type: _doc
             _upsert: true
             _id: id
             sql: "select a.id,a.name,a.tags,concat(a.latitude,',',a.longitude) as location,a.remark_score,a.price_per_man,a.category_id,b.name as category_name,a.seller_id,c.remark_score as seller_remark_score,c.disabled_flag as seller_disabled_flag from shop a inner join category b on a.category_id = b.id inner join seller c on c.id=a.seller_id"
             commitBatch: 3000
    

      启动canal.adapter bin下的startup.sh 到/opt/canal7/logs/adapter/adapter.log观察log是否正常完成,如果看到=====> Subscribe destination: example succeed <=============就是成功了

    到此只要你修改了mysql的值,cannal就会马上更新到es中,会自动根据更新条目的id和字段同步到es中(但是索引模型一个字段多处使用显然会有问题,这时候就需要自己用java代码写cannl入库es的过程,这个后面再记录了)

  • 相关阅读:
    QQ企业邮箱+Spring+Javamail+ActiveMQ(发送企业邮件)
    Notepad++使用图解
    Sublime Text 2安装图解
    IDE UltraEdit 图文激活+安装教程
    光猫与普通的家用猫
    通过Java Api与HBase交互(转)
    HBase配置性能调优(转)
    HBase Java API类介绍
    hbase shell基础和常用命令详解(转)
    HBase体系结构(转)
  • 原文地址:https://www.cnblogs.com/yaohaitao/p/12733114.html
Copyright © 2011-2022 走看看