  • Using Logstash: syncing MySQL data to Elasticsearch

    1. Download

    wget https://artifacts.elastic.co/downloads/logstash/logstash-6.6.0.tar.gz
    
    Test (after extracting the archive, here under /qqc_data):
    cd /qqc_data/logstash-6.6.0/bin
    [root@localhost bin]# ./logstash -e 'input { stdin { } } output { stdout {} }'
    Sending Logstash logs to /qqc_data/logstash-6.6.0/logs which is now configured via log4j2.properties
    [2020-08-28T14:39:27,286][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"/qqc_data/logstash-6.6.0/data/queue"}
    [2020-08-28T14:39:27,297][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/qqc_data/logstash-6.6.0/data/dead_letter_queue"}
    [2020-08-28T14:39:27,715][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
    [2020-08-28T14:39:27,730][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.6.0"}
    [2020-08-28T14:39:27,761][INFO ][logstash.agent           ] No persistent UUID file found. Generating new UUID {:uuid=>"7da33a24-8145-499d-97ce-e7abac609b77", :path=>"/qqc_data/logstash-6.6.0/data/uuid"}
    [2020-08-28T14:39:33,755][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>3, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
    [2020-08-28T14:39:33,863][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0xb345437 run>"}
    The stdin plugin is now waiting for input:
    [2020-08-28T14:39:33,921][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
    [2020-08-28T14:39:34,175][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
    jjjj
    {
        "@timestamp" => 2020-08-28T06:41:27.621Z,
              "host" => "localhost.localdomain",
          "@version" => "1",
           "message" => "jjjj"
    }
    
    

    2. Syncing MySQL data to ES

    2.1 Installing dependencies
    Reference: https://www.jianshu.com/p/62433b9c5c96?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation
    
    Goal: sync MySQL data to ES
    
    Install the dependencies:
    cd /qqc_data/logstash-6.6.0/bin
    
    Install the jdbc and elasticsearch plugins
    ./logstash-plugin install logstash-input-jdbc
    ./logstash-plugin install logstash-output-elasticsearch
    
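    MySQL driver (download the archive, then unzip it so that the jar referenced by jdbc_driver_library exists; /qqc_data is the location used here):
    wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.49.zip
    unzip mysql-connector-java-5.1.49.zip -d /qqc_data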
    
    
    2.2 Configuration file
    1. vim test.conf
    input {
     jdbc {
       jdbc_driver_library => "/qqc_data/mysql-connector-java-5.1.49/mysql-connector-java-5.1.49-bin.jar"
       jdbc_driver_class => "com.mysql.jdbc.Driver"
       jdbc_connection_string => "jdbc:mysql://{IP address}:3306/test"
       jdbc_user => "****"
       jdbc_password => "****"
       schedule => "*/1 * * * *"
       statement => "SELECT * FROM ip_info WHERE modify_time >= :sql_last_value"
       use_column_value => true
       tracking_column_type => "timestamp"
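       tracking_column => "modify_time" # name of the increasing column; this uses the modify_time column, whose type is timestamp
       last_run_metadata_path => "syncpoint_table"  # sync point file: it records the last sync point, is read again on restart, and can be edited by hand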
     }
    }
    
    output {
     elasticsearch {
       hosts => ["172.30.4.129:9200"]
       # user => ""
       # password => ""
       # document_type => ""
       index => "index_one"
       document_id => "%{id}"  # document id in ES; set it to the primary key, otherwise an updated row shows up as a second document in ES. %{id} refers to the value of the id column in the MySQL table
     }
     stdout {
            # output as JSON lines
            codec => json_lines
        }
    }
    
    2. Start:
    bin/logstash -f test.conf
    Log output:
    [2020-08-28T16:10:29,001][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
    Fri Aug 28 16:11:01 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
    [2020-08-28T16:11:01,551][INFO ][logstash.inputs.jdbc     ] (0.014257s) SELECT version()
    [2020-08-28T16:11:01,611][INFO ][logstash.inputs.jdbc     ] (0.008063s) SELECT * FROM ip_info WHERE modify_time >= '2020-08-28 16:55:15'
    {"id":6,"create_time":"2020-08-28T08:55:06.000Z","modify_time":"2020-08-28T08:55:15.000Z","user_ip":"44.55.678.22","@version":"1","@timestamp":"2020-08-28T08:11:01.677Z","count":3}
    {"id":1,"create_time":"2020-08-25T02:10:03.805Z","modify_time":"2020-08-28T08:58:57.000Z","user_ip":"127.0.0.1.44","@version":"1","@timestamp":"2020-08-28T08:11:01.663Z","count":4}
    
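    3. How the configuration works
    Logstash runs the SQL statement on the schedule (every minute here) and uses the modify_time column to select only rows modified at or after the last sync point.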
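The flow can be sketched in Python. This is a rough simulation, not what Logstash actually runs: an in-memory SQLite table stands in for MySQL and a plain dict stands in for the ES index. The names `run_once` and `demo` and the sample rows are made up for illustration, and the sketch assumes that with use_column_value the plugin keeps the tracking column value of the last row returned.

```python
import sqlite3

# Same shape as the statement in test.conf; "?" plays the role of :sql_last_value.
SQL = ('SELECT id, "count", modify_time FROM ip_info '
       "WHERE modify_time >= ? ORDER BY modify_time")


def run_once(conn, index, last_value):
    """One scheduled run: query changed rows, upsert them by id, advance the sync point."""
    rows = conn.execute(SQL, (last_value,)).fetchall()
    for row_id, count, modify_time in rows:
        # The same id overwrites the existing document, like document_id => "%{id}".
        index[str(row_id)] = {"id": row_id, "count": count, "modify_time": modify_time}
    # Assumed behaviour: remember the tracking column value of the last row.
    new_last = rows[-1][2] if rows else last_value
    return [row[0] for row in rows], new_last


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute(
        'CREATE TABLE ip_info (id INTEGER PRIMARY KEY, "count" INTEGER, modify_time TEXT)'
    )
    conn.executemany(
        "INSERT INTO ip_info VALUES (?, ?, ?)",
        [(1, 3, "2020-08-25 10:10:03"), (2, 2, "2020-08-25 10:18:58")],
    )
    index, last, fetched = {}, "1970-01-01 00:00:00", []
    ids, last = run_once(conn, index, last)  # first run: full load
    fetched.append(ids)
    ids, last = run_once(conn, index, last)  # nothing changed in between
    fetched.append(ids)
    conn.execute(
        'UPDATE ip_info SET "count" = 4, modify_time = ? WHERE id = 1',
        ("2020-08-28 16:58:57",),
    )
    ids, last = run_once(conn, index, last)  # picks up the updated row
    fetched.append(ids)
    return fetched, last, index


if __name__ == "__main__":
    print(demo())
```

Because the statement uses >=, the row sitting exactly on the sync point (id 2 here) is fetched again on every run. Keying documents by id makes that repeat harmless: the index still ends up with one document per row.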
    
    2.3 Test
    1. Data in MySQL:
    mysql> SELECT * FROM ip_info;
    +----+--------------+-------+----------------------------+----------------------------+
    | id | user_ip      | count | create_time                | modify_time                |
    +----+--------------+-------+----------------------------+----------------------------+
    |  1 | 127.0.0.1.44 |     4 | 2020-08-25 10:10:03.805568 | 2020-08-28 16:58:57.000000 |
    |  2 | 58.33.77.66  |     2 | 2020-08-25 10:18:47.703727 | 2020-08-25 10:18:58.812388 |
    |  3 | 127.0.0.1    |     1 | 2020-08-26 01:25:53.632885 | 2020-08-26 01:25:53.632885 |
    |  4 | 58.33.77.66  |     3 | 2020-08-26 08:54:52.454664 | 2020-08-26 09:46:20.155353 |
    |  5 | 58.33.77.66  |     6 | 2020-08-27 07:54:49.592302 | 2020-08-27 07:56:21.768304 |
    |  6 | 44.55.678.22 |     3 | 2020-08-28 16:55:06.000000 | 2020-08-28 16:55:15.000000 |
    +----+--------------+-------+----------------------------+----------------------------+
    6 rows in set
    
    2. Data synced to ES
    GET http://172.30.4.129:9200/index_one/_search
    {
        "took": 5,
        "timed_out": false,
        "_shards": {
            "total": 5,
            "successful": 5,
            "skipped": 0,
            "failed": 0
        },
        "hits": {
            "total": 6,
            "max_score": 1.0,
            "hits": [
                {
                    "_index": "index_one",
                    "_type": "doc",
                    "_id": "5",
                    "_score": 1.0,
                    "_source": {
                        "@version": "1",
                        "user_ip": "58.33.77.66",
                        "@timestamp": "2020-08-28T07:54:00.393Z",
                        "id": 5,
                        "count": 6,
                        "create_time": "2020-08-26T23:54:49.592Z",
                        "modify_time": "2020-08-26T23:56:21.768Z"
                    }
                },
                {
                    "_index": "index_one",
                    "_type": "doc",
                    "_id": "2",
                    "_score": 1.0,
                    "_source": {
                        "@version": "1",
                        "user_ip": "58.33.77.66",
                        "@timestamp": "2020-08-28T07:49:01.654Z",
                        "id": 2,
                        "count": 2,
                        "create_time": "2020-08-25T02:18:47.703Z",
                        "modify_time": "2020-08-25T02:18:58.812Z"
                    }
                },
                {
                    "_index": "index_one",
                    "_type": "doc",
                    "_id": "4",
                    "_score": 1.0,
                    "_source": {
                        "@version": "1",
                        "user_ip": "58.33.77.66",
                        "@timestamp": "2020-08-28T07:49:01.656Z",
                        "id": 4,
                        "count": 3,
                        "create_time": "2020-08-26T00:54:52.454Z",
                        "modify_time": "2020-08-26T01:46:20.155Z"
                    }
                },
                {
                    "_index": "index_one",
                    "_type": "doc",
                    "_id": "6",
                    "_score": 1.0,
                    "_source": {
                        "@version": "1",
                        "user_ip": "44.55.678.22",
                        "@timestamp": "2020-08-28T08:02:00.176Z",
                        "id": 6,
                        "count": 3,
                        "create_time": "2020-08-28T08:55:06.000Z",
                        "modify_time": "2020-08-28T08:55:15.000Z"
                    }
                },
                {
                    "_index": "index_one",
                    "_type": "doc",
                    "_id": "1",
                    "_score": 1.0,
                    "_source": {
                        "@version": "1",
                        "user_ip": "127.0.0.1.44",
                        "@timestamp": "2020-08-28T08:02:00.175Z",
                        "id": 1,
                        "count": 4,
                        "create_time": "2020-08-25T02:10:03.805Z",
                        "modify_time": "2020-08-28T08:58:57.000Z"
                    }
                },
                {
                    "_index": "index_one",
                    "_type": "doc",
                    "_id": "3",
                    "_score": 1.0,
                    "_source": {
                        "@version": "1",
                        "user_ip": "127.0.0.1",
                        "@timestamp": "2020-08-28T07:49:01.655Z",
                        "id": 3,
                        "count": 1,
                        "create_time": "2020-08-25T17:25:53.632Z",
                        "modify_time": "2020-08-25T17:25:53.632Z"
                    }
                }
            ]
        }
    }
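Comparing the two outputs, every time in ES is 8 hours behind the value in MySQL. That is consistent with the MySQL values being local time in UTC+8 (the driver warning in the startup log prints CST) and the synced events carrying UTC. A minimal Python sketch of the conversion, assuming a fixed UTC+8 offset; the function name is hypothetical:

```python
from datetime import datetime, timedelta, timezone

# Assumption: the MySQL values are China Standard Time (UTC+8).
CST = timezone(timedelta(hours=8))


def mysql_local_to_es_utc(value):
    """Convert 'YYYY-MM-DD HH:MM:SS.ffffff' local time to the UTC form seen in ES."""
    local = datetime.strptime(value, "%Y-%m-%d %H:%M:%S.%f").replace(tzinfo=CST)
    utc = local.astimezone(timezone.utc)
    # ES shows millisecond precision, so the microseconds are truncated.
    return utc.strftime("%Y-%m-%dT%H:%M:%S.") + f"{utc.microsecond // 1000:03d}Z"


if __name__ == "__main__":
    # modify_time of row 5 in the MySQL table above
    print(mysql_local_to_es_utc("2020-08-27 07:56:21.768304"))
```

For row 5 this gives 2020-08-26T23:56:21.768Z, the modify_time stored in ES for _id 5.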
    
    
    2.4 Syncing multiple tables
    [root@localhost config]# vim /qqc_data/logstash-6.6.0/config/pipelines.yml
    
    Add one entry per configuration file
    - pipeline.id: ip_info
      path.config: "/qqc_data/logstash-6.6.0/test.conf"
    
    - pipeline.id: user_info
      path.config: "/qqc_data/logstash-6.6.0/test_two.conf"
      
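    Start (with no -f or -e option, otherwise pipelines.yml is ignored, as the warning in the test run in section 1 shows):
    [root@localhost config]# /qqc_data/logstash-6.6.0/bin/logstash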
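With more than a couple of tables, the pipelines.yml entries can be generated instead of typed by hand. A small Python sketch, assuming one config file per table; `render_pipelines` is a hypothetical helper, not part of Logstash:

```python
def render_pipelines(configs):
    """Render pipelines.yml text from a {pipeline_id: config_path} mapping."""
    blocks = []
    for pipeline_id, path in configs.items():
        blocks.append(f'- pipeline.id: {pipeline_id}\n  path.config: "{path}"\n')
    # A blank line between entries, matching the file shown above.
    return "\n".join(blocks)


if __name__ == "__main__":
    print(render_pipelines({
        "ip_info": "/qqc_data/logstash-6.6.0/test.conf",
        "user_info": "/qqc_data/logstash-6.6.0/test_two.conf",
    }))
```

Each config file should point last_run_metadata_path at its own file. If two jdbc inputs share the same sync point file, they overwrite each other's sync point.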
    
    
  • Original article: https://www.cnblogs.com/quqinchao/p/13579357.html