1、logstash介绍
logstash 是ES 下的一款开源软件、用于数据采集,就是从Mysql等数据源采集数据、更新数据、然后将数据发送到ES中创建、更新索引
2、安装
演示环境是windows
(1)下载logstash, 注意版本要和ES一致
地址:https://www.elastic.co/downloads/past-releases/logstash-6-2-1
下载zip包后解压
(2)安装ruby
由于logstash-input-jdbc是ruby开发的,所以先下载ruby并安装
地址:https://rubyinstaller.org/downloads/ 下载2.5版本即可,安装完成通过ruby -v 查看是否安装成功,注意添加环境变量、使用管理员打开cmd
(3)安装logstash-input-jdbc
logstash5.x是自带logstash-input-jdbc的,6.x版本不带了,需要手动安装
在logstash-6.2.1in 目录下执行: .logstash-plugin.bat install logstash-input-jdbc
安装完成可以在logstash根目录查看对应版本
3、创建模版文件
logstash从Mysql采取数据、向ES中索引,因此需要准备ES索引库的mapping模版。这里模版的filed对应数据库业务模型,通常如果相关业务字段分布在不同表,我们会将其整合到一张表,方便logstash做数据采集。
在logstash的config目录course_template.json
{ "mappings" : { "doc" : { "properties" : { "charge" : { "type" : "keyword" }, "description" : { "analyzer" : "ik_max_word", "search_analyzer" : "ik_smart", "type" : "text" }, "end_time" : { "format" : "yyyy-MM-dd HH:mm:ss", "type" : "date" }, "expires" : { "format" : "yyyy-MM-dd HH:mm:ss", "type" : "date" }, "grade" : { "type" : "keyword" }, "id" : { "type" : "keyword" }, "mt" : { "type" : "keyword" }, "name" : { "analyzer" : "ik_max_word", "search_analyzer" : "ik_smart", "type" : "text" }, "pic" : { "index" : false, "type" : "keyword" }, "price" : { "type" : "float" }, "price_old" : { "type" : "float" }, "pub_time" : { "format" : "yyyy-MM-dd HH:mm:ss", "type" : "date" }, "qq" : { "index" : false, "type" : "keyword" }, "st" : { "type" : "keyword" }, "start_time" : { "format" : "yyyy-MM-dd HH:mm:ss", "type" : "date" }, "status" : { "type" : "keyword" }, "studymodel" : { "type" : "keyword" }, "teachmode" : { "type" : "keyword" }, "teachplan" : { "analyzer" : "ik_max_word", "search_analyzer" : "ik_smart", "type" : "text" }, "users" : { "index" : false, "type" : "text" }, "valid" : { "type" : "keyword" } } } }, "template": "course" }
4、配置mysql.conf
logstash会根据mysql.conf文件的配置连接数据库和ES服务器,配置详细解释,见注释
logstash/config/mysql.conf
input { stdin { } jdbc { #数据源配置 jdbc_connection_string => "jdbc:mysql://localhost:3306/course?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC" jdbc_user => "root" jdbc_password => mysql jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" #指定jdbc驱动包位置 jdbc_driver_library => "F:/develop/maven/repository3/mysql/mysql-connector-java/5.1.41/mysql-connector-java-5.1.41.jar" #可以通过指定sql文件的方式工作,但是这里选用statement方式直接指定sql语句 #statement_filepath => "/conf/course.sql" #由于ES采用UTS时区、比北京时间早8小时、所以读取最后更新时间加8小时 statement => "select * from course_pub where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)" #定时配置,默认一分钟执行一次 schedule => "* * * * *" record_last_run => "true" #配置最后更新时间保存位置,默认保存在config/logstash_metadata文件中,注意修改目录 last_run_metadata_path => "D:/ElasticSearch3940/logstash-6.2.1/config/logstash_metadata" } } output { elasticsearch { #ES的ip地址和端口 hosts => "localhost:9200" #hosts => ["localhost:9200","localhost:9202","localhost:9203"] #ES索引库名称 index => "xc_course" #指定数据库模型中那个字段作为document的id document_id => "%{courseid}" document_type => "doc" template =>"D:/ElasticSearch3940/logstash-6.2.1/config/course_template.json" template_name =>"course" template_overwrite =>"true" } stdout { #日志输出 codec => json_lines } }
5、测试
执行启动logstash
.logstash.bat -f ..configmysql.conf
业务代码中跟新数据模型数据,注意更新timestamptime,使用head登录查看索引库数据,是否更新