zoukankan      html  css  js  c++  java
  • 使用logstash配合jdbc全量导出mysql数据到elasticsearch

    网站搜索要上elasticsearch,需要把mysql库中原有的数据全量导出到elaticsearch,再用canal消费msyqlbinlog把增量数据实时同步到elaticsearch即可

    用容器化的logstash配合jdbc全量导出mysql到elaticsearch

    参考链接1
    参考链接2
    参考链接3

    本例以docker-compose文件来运行,不想在服务器上安装任何logstash的东西,需要插件可以自己基于logstash官方镜像制作额外插件的自己镜像。

    参考elastic官方文档,jdbc input plugin 从logstash 5.X版本后现在已经内置为JDBC Integration Plugin的一部分,无需手动安装该插件,如果不确定是否安装该插件可以执行logstash-plugin list查看

    jdbc插件github
    官方使用文档

    下载jdbc驱动

    通过阿里镜像网站搜索特定版本的jdbc驱动上传至服务器即可。

    logstash配置文件如下

    $ cat jdbc.conf
    # 输入部分
    input {
      stdin {}
      jdbc {
        # mysql数据库驱动
        jdbc_driver_library => "/opt/mysql-connector-java-8.0.20.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        # mysql数据库链接,数据库名,tinyInt1isBit=false参数禁止jdbc将tinyint(1)转为boolean类型
        jdbc_connection_string => "jdbc:mysql://rm-***.mysql.rds.aliyuncs.com:3306/bbt?tinyInt1isBit=false"
        # mysql数据库用户名,密码
        jdbc_user => "***"
        jdbc_password => "***"
        # 设置监听间隔  各字段含义(分、时、天、月、年),全部为*默认含义为每分钟更新一次
        schedule => "* * * * *"
        # 分页
        jdbc_paging_enabled => true
        # 分页大小
        jdbc_page_size => "1000"
        # sql语句执行文件,也可直接使用 statement => 'select * from t_school_archives_fold create_time >= :sql_last_value order by create_time limit 200000'
        # statement => "SELECT * FROM tb1 "
        statement_filepath => "/opt/jdbc.sql"
        # elasticsearch索引类型名,logstash7.0+ deprecated
        #type => "student"
        use_column_value => true
        tracking_column => "tid"
        # record_last_run上次数据存放位置;
        last_run_metadata_path => "/tmp/last_id.txt"
      }
    }
    
    # 过滤部分(不是必须项)
    filter {
        mutate {
            #过滤不需要的字段值
            remove_field => ["@version", "@timestamp"]
        }
    }
    
    # 输出部分
    output {
        elasticsearch {
            # elasticsearch索引名
            index => "forum_post"
            # 使用input中的type作为elasticsearch索引下的类型名,logstash 7.0+ deprecated
            #document_type => "%{type}"   # <- use the type from each input
            # elasticsearch的ip和端口号
            hosts => "es-cn-***.elasticsearch.aliyuncs.com:9200"
    	user => "elastic"
    	password => "XXX"
            # 同步mysql中数据tid作为elasticsearch中文档id
            document_id => "%{tid}"
        }
        stdout {
            codec => json_lines
        }
    }
    

    jdbc.sql文件内容如下

    $ cat ./jdbc.sql
    select a.tid,a.fid,a.pid,a.first,a.invisible,a.authorid,a.dateline,a.replycredit as p_replycredit,a.new_position,a.reply_count,a.attachment as p_attachment,a.official_response as p_official_response,a.status as p_status,a.author,a.tags,a.message,t.subject,t.price,t.views,t.replies,t.displayorder,t.lastpost,t.digest,t.special,t.attachment as t_attachment,t.closed,t.stickreply,t.heats,t.status as t_status,t.favtimes,t.replycredit as t_replycredit,t.official_response as t_official_response,t.lastposter from pre_forum_post as a left join pre_forum_thread as t on a.tid = t.tid where a.first = 1 and a.tid> :sql_last_value and a.invisible in(-2,0)  order by a.tid
    

    docker-compose文件如下

    本例采用阿里云的elaticsearch和kibana,就不开启了

    $ cat docker-compose.yml
    version: '3'
    services:
      logstash:
        image: logstash:7.6.0
        container_name: logstash
        volumes:
          - ./mysql-connector-java-8.0.20.jar:/opt/mysql-connector-java-8.0.20.jar
          - ./jdbc.sql:/opt/jdbc.sql
          - ./jdbc.conf:/usr/share/logstash/pipeline/logstash.conf
        logging:
          driver: json-file
          options:
            max-size: '100m'
            max-file: '2'
      #elasticsearch:
      #  image: elasticsearch
      #  restart: always
      #  container_name: elasticsearch
      #  environment :
      #    - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      #  ports:
      #    - "9200:9200"
      #    - "9300:9300"
      #kibana:
      #  image: kibana
      #  environment:
      #    - ELASTICSEARCH_URL=http://elasticsearch:9200
      #    #- ELASTICSEARCH_URL=http://elastic:9200
      #  container_name: kibana
      #  hostname: kibana
      #  restart: always
      #  links:
      #    - elasticsearch
      #  depends_on:
      #    - elasticsearch
      #  ports:
      #    - "5601:5601"
    

    启动docker-compose

    docker-compose up -d
    

    等待1min左右,去kibana查看是否有新的index生成

  • 相关阅读:
    P1144 最短路计数 题解 最短路应用题
    C++高精度加减乘除模板
    HDU3746 Teacher YYF 题解 KMP算法
    POJ3080 Blue Jeans 题解 KMP算法
    POJ2185 Milking Grid 题解 KMP算法
    POJ2752 Seek the Name, Seek the Fame 题解 KMP算法
    POJ2406 Power Strings 题解 KMP算法
    HDU2087 剪花布条 题解 KMP算法
    eclipse创建maven项目(详细)
    maven的作用及优势
  • 原文地址:https://www.cnblogs.com/johnsonjie/p/12985626.html
Copyright © 2011-2022 走看看