zoukankan      html  css  js  c++  java
  • elasticsearch-logstash

    1、logstash介绍

    logstash 是ES 下的一款开源软件、用于数据采集,就是从Mysql等数据源采集数据、更新数据、然后将数据发送到ES中创建、更新索引

    2、安装

    演示环境是windows

    (1)下载logstash, 注意版本要和ES一致

    地址:https://www.elastic.co/downloads/past-releases/logstash-6-2-1

    下载zip包后解压

    (2)安装ruby

    由于logstash-input-jdbc是ruby开发的,所以先下载ruby并安装

    地址:https://rubyinstaller.org/downloads/    下载2.5版本即可,安装完成通过ruby -v 查看是否安装成功,注意添加环境变量、使用管理员打开cmd

    (3)安装logstash-input-jdbc

    logstash5.x是自带logstash-input-jdbc的,6.x版本不带了,需要手动安装

    在logstash-6.2.1in 目录下执行: .logstash-plugin.bat install logstash-input-jdbc

    安装完成可以在logstash根目录查看对应版本

    3、创建模版文件

    logstash从Mysql采取数据、向ES中索引,因此需要准备ES索引库的mapping模版。这里模版的filed对应数据库业务模型,通常如果相关业务字段分布在不同表,我们会将其整合到一张表,方便logstash做数据采集。

    在logstash的config目录course_template.json

    {
       "mappings" : {
          "doc" : {
             "properties" : {
                "charge" : {
                   "type" : "keyword"
                },
                "description" : {
                   "analyzer" : "ik_max_word",
                   "search_analyzer" : "ik_smart",
                   "type" : "text"
                },
                "end_time" : {
                   "format" : "yyyy-MM-dd HH:mm:ss",
                   "type" : "date"
                },
                "expires" : {
                   "format" : "yyyy-MM-dd HH:mm:ss",
                   "type" : "date"
                },
                "grade" : {
                   "type" : "keyword"
                },
                "id" : {
                   "type" : "keyword"
                },
                "mt" : {
                   "type" : "keyword"
                },
                "name" : {
                   "analyzer" : "ik_max_word",
                   "search_analyzer" : "ik_smart",
                   "type" : "text"
                },
                "pic" : {
                   "index" : false,
                   "type" : "keyword"
                },
                "price" : {
                   "type" : "float"
                },
                "price_old" : {
                   "type" : "float"
                },
                "pub_time" : {
                   "format" : "yyyy-MM-dd HH:mm:ss",
                   "type" : "date"
                },
                "qq" : {
                   "index" : false,
                   "type" : "keyword"
                },
                "st" : {
                   "type" : "keyword"
                },
                "start_time" : {
                   "format" : "yyyy-MM-dd HH:mm:ss",
                   "type" : "date"
                },
                "status" : {
                   "type" : "keyword"
                },
                "studymodel" : {
                   "type" : "keyword"
                },
                "teachmode" : {
                   "type" : "keyword"
                },
                "teachplan" : {
                   "analyzer" : "ik_max_word",
                   "search_analyzer" : "ik_smart",
                   "type" : "text"
                },
                "users" : {
                   "index" : false,
                   "type" : "text"
                },
                "valid" : {
                   "type" : "keyword"
                }
             }
          }
       },
       "template": "course"
    }

    4、配置mysql.conf

    logstash会根据mysql.conf文件的配置连接数据库和ES服务器,配置详细解释,见注释

    logstash/config/mysql.conf

    input {
      stdin {
      }
      jdbc {
          #数据源配置
          jdbc_connection_string => "jdbc:mysql://localhost:3306/course?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC"
          jdbc_user => "root"
          jdbc_password => mysql
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          jdbc_paging_enabled => "true"
          jdbc_page_size => "50000"
          #指定jdbc驱动包位置
          jdbc_driver_library => "F:/develop/maven/repository3/mysql/mysql-connector-java/5.1.41/mysql-connector-java-5.1.41.jar"
          
          #可以通过指定sql文件的方式工作,但是这里选用statement方式直接指定sql语句
          #statement_filepath => "/conf/course.sql"
          #由于ES采用UTS时区、比北京时间早8小时、所以读取最后更新时间加8小时
          statement => "select * from course_pub where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)"
          #定时配置,默认一分钟执行一次
          schedule => "* * * * *"
          record_last_run => "true"
          #配置最后更新时间保存位置,默认保存在config/logstash_metadata文件中,注意修改目录
          last_run_metadata_path => "D:/ElasticSearch3940/logstash-6.2.1/config/logstash_metadata"
      }
    }
    
    output {
      elasticsearch {
          #ES的ip地址和端口
          hosts => "localhost:9200"
          #hosts => ["localhost:9200","localhost:9202","localhost:9203"]
          #ES索引库名称
          index => "xc_course"
          #指定数据库模型中那个字段作为document的id
          document_id => "%{courseid}"
          document_type => "doc"
          template =>"D:/ElasticSearch3940/logstash-6.2.1/config/course_template.json"
          template_name =>"course"
          template_overwrite =>"true"
      }
      stdout {
          #日志输出
          codec => json_lines
      }
    }

    5、测试

    执行启动logstash

    .logstash.bat -f ..configmysql.conf

    业务代码中跟新数据模型数据,注意更新timestamptime,使用head登录查看索引库数据,是否更新

  • 相关阅读:
    Spark SQL概述
    Spark编程进阶篇
    数据的读取与保存
    键值对RDD数据分区器
    Spark master的HA实战案例
    生产环境中zookeeper分布式集群部署实战案例
    Spark的RDD编程实战案例
    部署Spark历史服务器
    Spark的Standalone运行模式部署实战案例
    将开发的程序打包到正式环境中运行实战篇
  • 原文地址:https://www.cnblogs.com/dehigher/p/10169480.html
Copyright © 2011-2022 走看看