  • Using Logstash: syncing MySQL data to Elasticsearch

    1. Download

    wget https://artifacts.elastic.co/downloads/logstash/logstash-6.6.0.tar.gz
    
    Test (after extracting the archive under /qqc_data):
    cd /qqc_data/logstash-6.6.0/bin
    [root@localhost bin]# ./logstash -e 'input { stdin { } } output { stdout {} }'
    Sending Logstash logs to /qqc_data/logstash-6.6.0/logs which is now configured via log4j2.properties
    [2020-08-28T14:39:27,286][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"/qqc_data/logstash-6.6.0/data/queue"}
    [2020-08-28T14:39:27,297][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/qqc_data/logstash-6.6.0/data/dead_letter_queue"}
    [2020-08-28T14:39:27,715][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
    [2020-08-28T14:39:27,730][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.6.0"}
    [2020-08-28T14:39:27,761][INFO ][logstash.agent           ] No persistent UUID file found. Generating new UUID {:uuid=>"7da33a24-8145-499d-97ce-e7abac609b77", :path=>"/qqc_data/logstash-6.6.0/data/uuid"}
    [2020-08-28T14:39:33,755][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>3, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
    [2020-08-28T14:39:33,863][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0xb345437 run>"}
    The stdin plugin is now waiting for input:
    [2020-08-28T14:39:33,921][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
    [2020-08-28T14:39:34,175][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
    jjjj
    {
        "@timestamp" => 2020-08-28T06:41:27.621Z,
              "host" => "localhost.localdomain",
          "@version" => "1",
           "message" => "jjjj"
    }
    
    

    2. Syncing MySQL data to ES

    2.1 Installing dependencies
    Reference: https://www.jianshu.com/p/62433b9c5c96?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation
    
    Goal: sync MySQL data to ES
    
    Install the required dependencies:
    cd /qqc_data/logstash-6.6.0/bin
    
    Install the jdbc and elasticsearch plugins
    ./logstash-plugin install logstash-input-jdbc
    ./logstash-plugin install logstash-output-elasticsearch
    
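    MySQL driver (download it, then unzip it under /qqc_data so that the jar path used in the configuration file exists):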
    wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.49.zip
    
    
    2.2 Configuration file
    1. vim test.conf
    input {
     jdbc {
       jdbc_driver_library => "/qqc_data/mysql-connector-java-5.1.49/mysql-connector-java-5.1.49-bin.jar"
       jdbc_driver_class => "com.mysql.jdbc.Driver"
       jdbc_connection_string => "jdbc:mysql://{IP address}:3306/test"
       jdbc_user => "****"
       jdbc_password => "****"
       schedule => "*/1 * * * *"
       statement => "SELECT * FROM ip_info WHERE modify_time >= :sql_last_value"
       use_column_value => true
       tracking_column_type => "timestamp"
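       tracking_column => "modify_time" # name of the incrementing column; here the modify_time column is used, and its type is timestamp
       last_run_metadata_path => "syncpoint_table"  # sync-point file; it records the last sync point, is read on restart, and can be edited by hand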
     }
    }
    
    output {
     elasticsearch {
       hosts => ["172.30.4.129:9200"]
       # user => ""
       # password => ""
       # document_type => ""
       index => "index_one"
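       document_id => "%{id}"  # ID of the document written to ES; set it to the primary key, otherwise an updated row shows up as two documents in ES. %{id} refers to the value of the id column of the MySQL table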
     }
     stdout {
            # output in JSON format
            codec => json_lines
        }
    }
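
    To see why document_id matters, here is a toy Python model that treats an index as a dictionary keyed by document ID. It is not the Elasticsearch client and not how Logstash is implemented; index_event and the auto-generated IDs are hypothetical names used only for illustration.

```python
import itertools

# Hypothetical stand-in for the IDs ES would generate when none is given.
_auto_id = itertools.count(1)

def index_event(index, event, id_field=None):
    """Store event in index (a dict of doc_id -> document).

    With id_field set, the ID comes from the event itself, so indexing
    the same row again overwrites the earlier copy. Without it, a fresh
    ID is generated every time, so the same row is stored twice.
    """
    doc_id = str(event[id_field]) if id_field else f"auto-{next(_auto_id)}"
    index[doc_id] = event
    return doc_id

row_v1 = {"id": 1, "user_ip": "127.0.0.1", "count": 3}
row_v2 = {"id": 1, "user_ip": "127.0.0.1", "count": 4}  # same row after an UPDATE

with_id, without_id = {}, {}
for event in (row_v1, row_v2):
    index_event(with_id, event, id_field="id")
    index_event(without_id, event)
```

    After both events are indexed, with_id holds a single document for row 1 with the updated count, while without_id holds two copies of the same row.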
    
    2. Start:
    bin/logstash -f test.conf
    Log output:
    [2020-08-28T16:10:29,001][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
    Fri Aug 28 16:11:01 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
    [2020-08-28T16:11:01,551][INFO ][logstash.inputs.jdbc     ] (0.014257s) SELECT version()
    [2020-08-28T16:11:01,611][INFO ][logstash.inputs.jdbc     ] (0.008063s) SELECT * FROM ip_info WHERE modify_time >= '2020-08-28 16:55:15'
    {"id":6,"create_time":"2020-08-28T08:55:06.000Z","modify_time":"2020-08-28T08:55:15.000Z","user_ip":"44.55.678.22","@version":"1","@timestamp":"2020-08-28T08:11:01.677Z","count":3}
    {"id":1,"create_time":"2020-08-25T02:10:03.805Z","modify_time":"2020-08-28T08:58:57.000Z","user_ip":"127.0.0.1.44","@version":"1","@timestamp":"2020-08-28T08:11:01.663Z","count":4}
    
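    3. How the configuration works
    Logstash runs the SQL statement on the schedule (every minute here) and uses the modify_time column to pick up rows that changed since the last sync point.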
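
    The polling idea can be sketched in Python, with an in-memory SQLite table standing in for MySQL. This only illustrates the :sql_last_value mechanism under simplified assumptions; it is not the jdbc input plugin's code, and poll_once and the sample rows are made up for the sketch.

```python
import sqlite3

def poll_once(conn, last_value):
    """Run one scheduled poll: fetch rows changed since last_value.

    Returns the fetched rows and the new sync point. Because the query
    orders by modify_time, the last row carries the largest value,
    loosely mimicking use_column_value => true.
    """
    rows = conn.execute(
        "SELECT id, user_ip, modify_time FROM ip_info "
        "WHERE modify_time >= ? ORDER BY modify_time",
        (last_value,),
    ).fetchall()
    if rows:
        last_value = rows[-1][2]
    return rows, last_value

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ip_info (id INTEGER PRIMARY KEY, user_ip TEXT, modify_time TEXT)"
)
conn.executemany(
    "INSERT INTO ip_info VALUES (?, ?, ?)",
    [(1, "127.0.0.1", "2020-08-25 10:10:03"),
     (2, "58.33.77.66", "2020-08-25 10:18:58")],
)

# First poll: the sync point starts at the epoch, so every row is returned.
first, sync_point = poll_once(conn, "1970-01-01 00:00:00")

# A row is updated between two polls.
conn.execute("UPDATE ip_info SET modify_time = '2020-08-28 16:58:57' WHERE id = 1")

# Second poll: only rows at or after the saved sync point come back.
second, sync_point = poll_once(conn, sync_point)
```

    Because the comparison is >=, the row sitting exactly on the sync point (row 2 here) is fetched again on the second poll. With document_id set to the primary key, as in the output section, re-indexing it overwrites the same ES document instead of adding a new one.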
    
    2.3 Test
    1. Data in MySQL:
    mysql> SELECT * FROM ip_info;
    +----+--------------+-------+----------------------------+----------------------------+
    | id | user_ip      | count | create_time                | modify_time                |
    +----+--------------+-------+----------------------------+----------------------------+
    |  1 | 127.0.0.1.44 |     4 | 2020-08-25 10:10:03.805568 | 2020-08-28 16:58:57.000000 |
    |  2 | 58.33.77.66  |     2 | 2020-08-25 10:18:47.703727 | 2020-08-25 10:18:58.812388 |
    |  3 | 127.0.0.1    |     1 | 2020-08-26 01:25:53.632885 | 2020-08-26 01:25:53.632885 |
    |  4 | 58.33.77.66  |     3 | 2020-08-26 08:54:52.454664 | 2020-08-26 09:46:20.155353 |
    |  5 | 58.33.77.66  |     6 | 2020-08-27 07:54:49.592302 | 2020-08-27 07:56:21.768304 |
    |  6 | 44.55.678.22 |     3 | 2020-08-28 16:55:06.000000 | 2020-08-28 16:55:15.000000 |
    +----+--------------+-------+----------------------------+----------------------------+
    6 rows in set
    
    2. Data synced to ES
    GET http://172.30.4.129:9200/index_one/_search
    {
        "took": 5,
        "timed_out": false,
        "_shards": {
            "total": 5,
            "successful": 5,
            "skipped": 0,
            "failed": 0
        },
        "hits": {
            "total": 6,
            "max_score": 1.0,
            "hits": [
                {
                    "_index": "index_one",
                    "_type": "doc",
                    "_id": "5",
                    "_score": 1.0,
                    "_source": {
                        "@version": "1",
                        "user_ip": "58.33.77.66",
                        "@timestamp": "2020-08-28T07:54:00.393Z",
                        "id": 5,
                        "count": 6,
                        "create_time": "2020-08-26T23:54:49.592Z",
                        "modify_time": "2020-08-26T23:56:21.768Z"
                    }
                },
                {
                    "_index": "index_one",
                    "_type": "doc",
                    "_id": "2",
                    "_score": 1.0,
                    "_source": {
                        "@version": "1",
                        "user_ip": "58.33.77.66",
                        "@timestamp": "2020-08-28T07:49:01.654Z",
                        "id": 2,
                        "count": 2,
                        "create_time": "2020-08-25T02:18:47.703Z",
                        "modify_time": "2020-08-25T02:18:58.812Z"
                    }
                },
                {
                    "_index": "index_one",
                    "_type": "doc",
                    "_id": "4",
                    "_score": 1.0,
                    "_source": {
                        "@version": "1",
                        "user_ip": "58.33.77.66",
                        "@timestamp": "2020-08-28T07:49:01.656Z",
                        "id": 4,
                        "count": 3,
                        "create_time": "2020-08-26T00:54:52.454Z",
                        "modify_time": "2020-08-26T01:46:20.155Z"
                    }
                },
                {
                    "_index": "index_one",
                    "_type": "doc",
                    "_id": "6",
                    "_score": 1.0,
                    "_source": {
                        "@version": "1",
                        "user_ip": "44.55.678.22",
                        "@timestamp": "2020-08-28T08:02:00.176Z",
                        "id": 6,
                        "count": 3,
                        "create_time": "2020-08-28T08:55:06.000Z",
                        "modify_time": "2020-08-28T08:55:15.000Z"
                    }
                },
                {
                    "_index": "index_one",
                    "_type": "doc",
                    "_id": "1",
                    "_score": 1.0,
                    "_source": {
                        "@version": "1",
                        "user_ip": "127.0.0.1.44",
                        "@timestamp": "2020-08-28T08:02:00.175Z",
                        "id": 1,
                        "count": 4,
                        "create_time": "2020-08-25T02:10:03.805Z",
                        "modify_time": "2020-08-28T08:58:57.000Z"
                    }
                },
                {
                    "_index": "index_one",
                    "_type": "doc",
                    "_id": "3",
                    "_score": 1.0,
                    "_source": {
                        "@version": "1",
                        "user_ip": "127.0.0.1",
                        "@timestamp": "2020-08-28T07:49:01.655Z",
                        "id": 3,
                        "count": 1,
                        "create_time": "2020-08-25T17:25:53.632Z",
                        "modify_time": "2020-08-25T17:25:53.632Z"
                    }
                }
            ]
        }
    }
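
    Note that create_time and modify_time in ES are eight hours behind the MySQL output: the row with modify_time 2020-08-28 16:58:57 appears as 2020-08-28T08:58:57.000Z. The ES values are in UTC (the trailing Z), and the offset is consistent with the MySQL values being local times in UTC+8, which is an assumption inferred from the data rather than something the configuration states. A minimal Python sketch of the conversion, with the hypothetical helper mysql_local_to_es_utc:

```python
from datetime import datetime, timedelta, timezone

# Assumption: the MySQL values are local times in UTC+8.
LOCAL_TZ = timezone(timedelta(hours=8))

def mysql_local_to_es_utc(value):
    """Convert 'YYYY-MM-DD HH:MM:SS' local time to the UTC form seen in ES."""
    local = datetime.strptime(value, "%Y-%m-%d %H:%M:%S").replace(tzinfo=LOCAL_TZ)
    utc = local.astimezone(timezone.utc)
    # Fractional seconds are dropped in this sketch and printed as .000
    return utc.strftime("%Y-%m-%dT%H:%M:%S.000Z")

converted = mysql_local_to_es_utc("2020-08-28 16:58:57")
```

    This is worth keeping in mind when comparing rows by hand or when editing the sync-point file manually.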
    
    
    2.4 Syncing multiple tables
    [root@localhost config]# vim /qqc_data/logstash-6.6.0/config/pipelines.yml
    
    Add multiple configuration files, one pipeline per table
    - pipeline.id: ip_info
      path.config: "/qqc_data/logstash-6.6.0/test.conf"
    
    - pipeline.id: user_info
      path.config: "/qqc_data/logstash-6.6.0/test_two.conf"
      
    Start (with no -f or -e option, Logstash reads pipelines.yml):
    [root@localhost config]# /qqc_data/logstash-6.6.0/bin/logstash
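
    When many tables need their own pipeline, the entries can be generated instead of typed by hand. The following Python sketch renders entries in the same shape as the ones shown for pipelines.yml; render_pipelines_yml is a hypothetical helper, not part of Logstash, and it only builds the text without writing any file.

```python
def render_pipelines_yml(pipelines):
    """Render pipelines.yml text from a {pipeline_id: config_path} mapping."""
    blocks = []
    for pipeline_id, config_path in pipelines.items():
        blocks.append(
            f"- pipeline.id: {pipeline_id}\n"
            f'  path.config: "{config_path}"\n'
        )
    # A blank line between entries, as in the hand-written file.
    return "\n".join(blocks)

text = render_pipelines_yml({
    "ip_info": "/qqc_data/logstash-6.6.0/test.conf",
    "user_info": "/qqc_data/logstash-6.6.0/test_two.conf",
})
```

    Each pipeline.id should be unique, and each config file is expected to contain its own input and output sections, as test.conf does.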
    
    
  • Original article: https://www.cnblogs.com/quqinchao/p/13579357.html