
课程索引
如何维护课程索引信息?
1、当课程向MySQL添加后同时将课程信息添加到索引库。采用Logstach实现,Logstach会从MySQL中将数据采集到ES索引库。
2、当课程在MySQL更新信息后同时更新该课程在索引库的信息。采用Logstach实现。
3、当课程在MySQL删除后同时将该课程从索引库删除。手工写程序实现,在删除课程后将索引库中该课程信息删除。
下载Logstash
下载版本和es版本需要一致,下载完毕后,直接解压
安装logstash-input-jdbc(可能不需要安装,可以略过)
logstash-input-jdbc是ruby开发的,先下载ruby并安装(测试中不需要安装)
Logstash5.x以上版本本身自带有logstash-input-jdbc,6.x版本本身不带logstash-input-jdbc插件,需要手动安装,安装成功后我们可以在logstash根目录下的以下目录查看对应的插件版本
创建模板文件
Logstash的工作是从MySQL中读取数据,向ES中创建索引,这里需要提前创建mapping的模板文件以便logstash使用。
示例:在logstach的config目录创建xc_course_template.json,内容如下
{
"mappings" : {
"doc" : {
"properties" : {
"charge" : {
"type" : "keyword"
},
"description" : {
"analyzer" : "ik_max_word",
"search_analyzer" : "ik_smart",
"type" : "text"
},
"end_time" : {
"format" : "yyyy-MM-dd HH:mm:ss",
"type" : "date"
},
"expires" : {
"format" : "yyyy-MM-dd HH:mm:ss",
"type" : "date"
},
"grade" : {
"type" : "keyword"
},
"id" : {
"type" : "keyword"
},
"mt" : {
"type" : "keyword"
},
"name" : {
"analyzer" : "ik_max_word",
"search_analyzer" : "ik_smart",
"type" : "text"
},
"pic" : {
"index" : false,
"type" : "keyword"
},
"price" : {
"type" : "float"
},
"price_old" : {
"type" : "float"
},
"pub_time" : {
"format" : "yyyy-MM-dd HH:mm:ss",
"type" : "date"
},
"qq" : {
"index" : false,
"type" : "keyword"
},
"st" : {
"type" : "keyword"
},
"start_time" : {
"format" : "yyyy-MM-dd HH:mm:ss",
"type" : "date"
},
"status" : {
"type" : "keyword"
},
"studymodel" : {
"type" : "keyword"
},
"teachmode" : {
"type" : "keyword"
},
"teachplan" : {
"analyzer" : "ik_max_word",
"search_analyzer" : "ik_smart",
"type" : "text"
},
"users" : {
"index" : false,
"type" : "text"
},
"valid" : {
"type" : "keyword"
}
}
}
},
"template" : "xc_course"
}
配置mysql.conf
数据表中,需要定义一个timestamp的字段,类型就是timestamp
input {
stdin {
}
jdbc {
jdbc_connection_string => "jdbc:mysql://localhost:3306/xc_course?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC"
# the user we wish to excute our statement as
jdbc_user => "root"
jdbc_password => "123456"
# the path to our downloaded jdbc driver
jdbc_driver_library => "D:/elasticsearch-6.2.2/mysql-connector-java-8.0.18.jar"
# the name of the driver class for mysql
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#要执行的sql文件
#statement_filepath => "/conf/course.sql"
statement => "select * from course_pub where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)"
#定时配置(每一分钟采集一次)
schedule => "* * * * *"
record_last_run => true
#记录了上次采集数据的时间
last_run_metadata_path => "D:/elasticsearch-6.2.2/logstash-6.2.2/config/logstash_metadata"
}
}
output {
elasticsearch {
#ES的ip地址和端口
hosts => "localhost:9200"
#hosts => ["localhost:9200","localhost:9202","localhost:9203"]
#ES索引库名称
index => "xc_course"
document_id => "%{id}"
document_type => "doc"
template =>"D:/elasticsearch-6.2.2/logstash-6.2.2/config/xc_course_template.json"
template_name =>"xc_course"
template_overwrite =>"true"
}
stdout {
#日志输出
codec => json_lines
}
}
启动,并测试
启动
.logstash.bat -f ..configmysql.conf
修改course_pub中的数据,并且修改timestamp为当前时间,观察Logstash日志是否读取到要索引的数据。
查看索引文档内容是否修改。
启动流程:logstash启动成功后,会去读取last_run_metadata_path中的时间,执行statement这条sql语句,进行数据的采集。
实际生产中使用logstash
我们需要在特定的条件中,将数据汇总到一张表中,logstash对这一张表建立索引,将数据存放到es中;就是logstash监测这一张表的变化;
使用
1、创建ES索引库
2、启动logstash