zoukankan      html  css  js  c++  java
  • Logstash 安装并把mysql数据同步到elasticsearch

    1. 下载Logstash

    由于我的elasticsearch是6.4.3,索引我logstash下载相同版本

    https://artifacts.elastic.co/downloads/logstash/logstash-6.4.3.tar.gz
    

    下mysql java jar包

    https://dev.mysql.com/downloads/connector/j/5.1.html
    

    通过ftp把两个包上传到linux中

    2.安装Logstash

    2.1 把解压并且移动到对应安装目录

    tar -zxvf logstash-6.4.3.tar.gz
    mv logstash-6.4.3 /usr/local/
    

    2.2 在/usr/local/logstash-6.4.3 创建sync目录

    mkdir sync
    

    2.3 进入到sync目录,并创建同步配置文件

    vim logstash-db-sync.conf
    

    内容如下:

    input {
    	jdbc {
    		# 设置 MySql/MariaDB 数据库url以及数据库名称
    		jdbc_connection_string => "jdbc:mysql://192.168.1.6:3306/foodie-shop-dev?useUnicode=true&characterEncoding=UTF-8&autoReconnect&useSSL=false"
    		# 用户名和密码
    		jdbc_user => "root"
    		jdbc_password => "123456"
    		# 数据库驱动所在位置,可以是绝对路径或者相对路径
    		jdbc_driver_library => "/usr/local/logstash-6.4.3/sync/mysql-connector-java-5.1.49.jar"
    		# 驱动类名
    		jdbc_driver_class => "com.mysql.jdbc.Driver"
    		# 开启分页
    		jdbc_paging_enabled => "true"
    		# 分页每页数量,可以自定义
    		jdbc_page_size => "1000"
    		# 执行的sql文件路径
    		statement_filepath => "/usr/local/logstash-6.4.3/sync/foodie-items.sql"
    		# 设置定时任务间隔 含义:分、时、天、月、年 全部为*默认是一分钟跑一次
    		schedule => "* * * * *"
    		# 索引类型
    		type => "_doc"
    		# 是否开启记录上次追踪结果
    		use_column_value => true
    		# 记录上一次追踪的结果值
    		last_run_metadata_path => "/usr/local/logstash-6.4.3/sync/track_time"
    		#
            tracking_column => "updated_time"
            # tracking_column 对应字段的类型		
    		tracking_column_type => "timestamp"
    		# 是否清楚 last_run_metadata_path记录
    		clean_run => false
    		# 数据库字段名称大写转小写
    		lowercase_column_names => false
    		
    	}
    }	
    output {
    	elasticsearch {
    		# es 地址
    		hosts => ["192.168.174.128:9200"]
    		index => "foodie-items"
    		document_id => "%{itemId}"
    	}
    	# 日志输出
    	stdout {
    		codec => json_lines
    	}
    }
    

    2.4 把mysql jar文件复制到/usr/local/logstash-6.4.3/sync/
    2.5 配置sql文件 /usr/local/logstash-6.4.3/sync/foodie-items.sql

    SELECT                                                                                             
               i.id as itemId,                                                                                
               i.item_name as itemName,                                                                       
               i.sell_counts as sellCounts,                                                                   
               ii.url as imgUrl,                                                                              
               tempSpec.price_discount as price,
               i.updated_time
           FROM                                                                                               
               items i                                                                                        
           LEFT JOIN                                                                                          
               items_img ii                                                                                   
           ON                                                                                                 
               i.id = ii.item_id                                                                              
           LEFT JOIN                                                                                          
               (                                                                                              
                   SELECT                                                                                     
                       item_id, MIN(price_discount) as price_discount                                         
                   FROM                                                                                       
                       items_spec                                                                             
                   GROUP BY                                                                                   
                       item_id                                                                                
               ) tempSpec                                                                                     
           ON                                                                                                 
               i.id = tempSpec.item_id                                                                        
           WHERE                                                                                              
               ii.is_main = 1
               AND 
               i.updated_time >= :sql_last_value 
    

    3.启动Logstash

    3.1 启动Logstash进行同步

    ./logstash -f /usr/local/logstash-6.4.3/sync/logstash-db-sync.conf
    

    3.2 同步的时候有个问题,建立索引时候默认用的es默认的分词,不适合中文

    4.修改logstash同步默认分词插件

    4.1 通过postman请求es,获取默认模板

    GET
    http://192.168.174.128:9200/_template/logstash
    

    内容是

    {
        "logstash": {
            "order": 0,
            "version": 60001,
            "index_patterns": [
                "logstash-*"
            ],
            "settings": {
                "index": {
                    "refresh_interval": "5s"
                }
            },
            "mappings": {
                "_default_": {
                    "dynamic_templates": [
                        {
                            "message_field": {
                                "path_match": "message",
                                "match_mapping_type": "string",
                                "mapping": {
                                    "type": "text",
                                    "norms": false
                                }
                            }
                        },
                        {
                            "string_fields": {
                                "match": "*",
                                "match_mapping_type": "string",
                                "mapping": {
                                    "type": "text",
                                    "norms": false,
                                    "fields": {
                                        "keyword": {
                                            "type": "keyword",
                                            "ignore_above": 256
                                        }
                                    }
                                }
                            }
                        }
                    ],
                    "properties": {
                        "@timestamp": {
                            "type": "date"
                        },
                        "@version": {
                            "type": "keyword"
                        },
                        "geoip": {
                            "dynamic": true,
                            "properties": {
                                "ip": {
                                    "type": "ip"
                                },
                                "location": {
                                    "type": "geo_point"
                                },
                                "latitude": {
                                    "type": "half_float"
                                },
                                "longitude": {
                                    "type": "half_float"
                                }
                            }
                        }
                    }
                }
            },
            "aliases": {}
        }
    }
    

    4.2 把内容复制下来,修改成下面这样

        {
            "order": 0,
            "version": 1,
            "index_patterns": ["*"],
            "settings": {
                "index": {
                    "refresh_interval": "5s"
                }
            },
            "mappings": {
                "_default_": {
                    "dynamic_templates": [
                        {
                            "message_field": {
                                "path_match": "message",
                                "match_mapping_type": "string",
                                "mapping": {
                                    "type": "text",
                                    "norms": false
                                }
                            }
                        },
                        {
                            "string_fields": {
                                "match": "*",
                                "match_mapping_type": "string",
                                "mapping": {
                                    "type": "text",
                                    "norms": false,
    								"analyzer":"ik_max_word",
                                    "fields": {
                                        "keyword": {
                                            "type": "keyword",
                                            "ignore_above": 256
                                        }
                                    }
                                }
                            }
                        }
                    ],
                    "properties": {
                        "@timestamp": {
                            "type": "date"
                        },
                        "@version": {
                            "type": "keyword"
                        },
                        "geoip": {
                            "dynamic": true,
                            "properties": {
                                "ip": {
                                    "type": "ip"
                                },
                                "location": {
                                    "type": "geo_point"
                                },
                                "latitude": {
                                    "type": "half_float"
                                },
                                "longitude": {
                                    "type": "half_float"
                                }
                            }
                        }
                    }
                }
            },
            "aliases": {}
        }
    
    

    4.3 在Logstash目录创建logstash-ik.json文件,把上面内容复制进去

    vim /usr/local/logstash-6.4.3/sync/logstash-ik.json
    

    4.4 在修改Logstash的同步文件

    vim /usr/local/logstash-6.4.3/sync/logstash-db-sync.conf
    

    out修改成

    output {
            elasticsearch {
                    # es 地址
                    hosts => ["192.168.174.128:9200"]
                    index => "foodie-items"
                    document_id => "%{itemId}"
    
                    # 定义模板名称
                    template_name => "myik"
    
                    # 模板所在位置
                    template => "/usr/local/logstash-6.4.3/sync/logstash-ik.json"
                    template_overwrite => true
                    # 默认为true, false关闭logstash自动管理模板功能
                    manage_template => true
            }
            # 日志输出
            stdout {
                    codec => json_lines
            }
    }
    
    
  • 相关阅读:
    oracle数据库 SQL语句、内置函数大全
    oracle数据库常用关键字汇总!
    Java 统计一个项目中src下的所有 .java 文件的代码行数, 注释行数, 空行数
    java用流实现创建文件夹, 文件改名, 文件删除, 文件复制
    深度解析线程工作原理
    重要的几种流:文件流、缓冲流、转换流!
    Java中流的概念和递归算法
    针对集合类容器的归纳总结!
    Comparable、Iterator接口和Collections类的实现方法
    Map接口的一些常用方法
  • 原文地址:https://www.cnblogs.com/hardy-wang/p/13880369.html
Copyright © 2011-2022 走看看