  • Using Logstash with JDBC for a full export of MySQL data to Elasticsearch

    Our site search is moving to Elasticsearch, so the existing data in the MySQL database first needs a one-time full export into Elasticsearch; after that, canal consumes the MySQL binlog to keep incremental changes synced to Elasticsearch in real time.

    This post uses containerized Logstash with the JDBC input plugin to do the full export from MySQL to Elasticsearch.

    Reference link 1
    Reference link 2
    Reference link 3

    This example runs everything from a docker-compose file, so nothing Logstash-related has to be installed on the server; if you need extra plugins, you can build your own image with the additional plugins on top of the official Logstash image, as sketched below.
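
    A minimal Dockerfile sketch for such a custom image (the plugin named here is only a placeholder for whatever you actually need):

    FROM logstash:7.6.0
    # install whichever extra plugin you need; logstash-output-mongodb is just an example
    RUN bin/logstash-plugin install logstash-output-mongodb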

    Per the official Elastic docs, the jdbc input plugin has shipped with Logstash since 5.x and is now bundled as part of the JDBC Integration Plugin, so there is no need to install it manually; if you are not sure whether it is present, run logstash-plugin list to check.
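
    For example, against the same image used below (on recent 7.x the bundled integration appears as logstash-integration-jdbc):

    $ docker run --rm logstash:7.6.0 bin/logstash-plugin list | grep -i jdbc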

    jdbc plugin on GitHub
    official usage docs

    Download the JDBC driver

    Search the Aliyun mirror site for the specific version of the JDBC driver and upload it to the server.
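
    If the mirror site is inconvenient, the same jar is also published on Maven Central; a sketch (verify the version you need):

    $ wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar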

    The Logstash config file is as follows

    $ cat jdbc.conf
    # input section
    input {
      stdin {}
      jdbc {
        # MySQL JDBC driver jar
        jdbc_driver_library => "/opt/mysql-connector-java-8.0.20.jar"
        # Connector/J 8.x driver class (the old com.mysql.jdbc.Driver still loads but is deprecated)
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        # MySQL connection string with the database name; tinyInt1isBit=false stops JDBC from mapping tinyint(1) to boolean
        jdbc_connection_string => "jdbc:mysql://rm-***.mysql.rds.aliyuncs.com:3306/bbt?tinyInt1isBit=false"
        # MySQL user and password
        jdbc_user => "***"
        jdbc_password => "***"
        # cron-style schedule; the fields are (minute, hour, day of month, month, day of week), all * means run every minute
        schedule => "* * * * *"
        # enable paging
        jdbc_paging_enabled => true
        # page size
        jdbc_page_size => "1000"
        # file holding the SQL statement; an inline statement also works, e.g.
        # statement => "select * from t_school_archives_fold where create_time >= :sql_last_value order by create_time limit 200000"
        statement_filepath => "/opt/jdbc.sql"
        # Elasticsearch type name; deprecated in Logstash 7.0+
        #type => "student"
        # track a column's value rather than the last run time for :sql_last_value
        use_column_value => true
        tracking_column => "tid"
        # where the value from the last run is stored
        last_run_metadata_path => "/tmp/last_id.txt"
      }
    }
    
    # filter section (not required)
    filter {
        mutate {
            # drop fields we don't need
            remove_field => ["@version", "@timestamp"]
        }
    }
    
    # output section
    output {
        elasticsearch {
            # Elasticsearch index name
            index => "forum_post"
            # use the type from each input as the document type; deprecated in Logstash 7.0+
            #document_type => "%{type}"
            # Elasticsearch host and port
            hosts => "es-cn-***.elasticsearch.aliyuncs.com:9200"
            user => "elastic"
            password => "XXX"
            # use the tid column from MySQL as the Elasticsearch document id
            document_id => "%{tid}"
        }
        stdout {
            codec => json_lines
        }
    }
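
    Optionally, before wiring the config into docker-compose, you can have Logstash validate the pipeline syntax and exit (a sketch reusing the same bind mount the compose file below declares):

    $ docker run --rm \
        -v $(pwd)/jdbc.conf:/usr/share/logstash/pipeline/logstash.conf \
        logstash:7.6.0 -f /usr/share/logstash/pipeline/logstash.conf --config.test_and_exit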
    

    The contents of the jdbc.sql file are as follows

    $ cat ./jdbc.sql
    select a.tid, a.fid, a.pid, a.first, a.invisible, a.authorid, a.dateline,
           a.replycredit as p_replycredit, a.new_position, a.reply_count,
           a.attachment as p_attachment, a.official_response as p_official_response,
           a.status as p_status, a.author, a.tags, a.message,
           t.subject, t.price, t.views, t.replies, t.displayorder, t.lastpost,
           t.digest, t.special, t.attachment as t_attachment, t.closed, t.stickreply,
           t.heats, t.status as t_status, t.favtimes, t.replycredit as t_replycredit,
           t.official_response as t_official_response, t.lastposter
    from pre_forum_post as a
    left join pre_forum_thread as t on a.tid = t.tid
    where a.first = 1 and a.tid > :sql_last_value and a.invisible in (-2, 0)
    order by a.tid
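
    Because use_column_value and tracking_column are set in jdbc.conf, :sql_last_value in this query is the highest tid from the previous run, persisted in /tmp/last_id.txt between runs; on the very first run it defaults to 0, which is what makes this pass a full export. The state file is a single YAML scalar, e.g. (value illustrative):

    $ cat /tmp/last_id.txt
    --- 1876753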
    

    The docker-compose file is as follows

    This example uses Aliyun's hosted Elasticsearch and Kibana, so the local elasticsearch and kibana services below are left commented out

    $ cat docker-compose.yml
    version: '3'
    services:
      logstash:
        image: logstash:7.6.0
        container_name: logstash
        volumes:
          - ./mysql-connector-java-8.0.20.jar:/opt/mysql-connector-java-8.0.20.jar
          - ./jdbc.sql:/opt/jdbc.sql
          - ./jdbc.conf:/usr/share/logstash/pipeline/logstash.conf
        logging:
          driver: json-file
          options:
            max-size: '100m'
            max-file: '2'
      #elasticsearch:
      #  image: elasticsearch
      #  restart: always
      #  container_name: elasticsearch
      #  environment:
      #    - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      #  ports:
      #    - "9200:9200"
      #    - "9300:9300"
      #kibana:
      #  image: kibana
      #  environment:
      #    - ELASTICSEARCH_URL=http://elasticsearch:9200
      #    #- ELASTICSEARCH_URL=http://elastic:9200
      #  container_name: kibana
      #  hostname: kibana
      #  restart: always
      #  links:
      #    - elasticsearch
      #  depends_on:
      #    - elasticsearch
      #  ports:
      #    - "5601:5601"
    

    Start docker-compose

    docker-compose up -d
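
    Since the stdout output uses the json_lines codec, you can watch exported documents scroll by in the container logs:

    $ docker-compose logs -f logstash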
    

    Wait about a minute, then check in Kibana whether the new index has been created.
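
    Alternatively, check from the command line with the _cat API (same host and credentials as in jdbc.conf):

    $ curl -u elastic:XXX "http://es-cn-***.elasticsearch.aliyuncs.com:9200/_cat/indices/forum_post?v"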
