zoukankan      html  css  js  c++  java
  • ElasticSearch+Springboot实际应用:索引同步建设,搜索过程

    1、介绍

         springboot框架,众多自动化的部署和约定配置,造成了springboot的着手麻烦,熟练后可以快速快捷进行开发,常用作快捷开发的java底层框架。各位看官都是大神,自行体会。

         elasticsearch很受欢迎的的一款拥有活跃社区开源的搜索解决方案,底层用的是luence。
         elasticsearch具有很丰富的插件库,对于很多开源框架都支持使得ES很是受欢迎。
     
    2、安装配置过程
    安装elasticsearch参考本博客的另一篇文章:http://www.cnblogs.com/zhongshengzhen/p/elasticsearch_mysql.html
    mysql通过logstash同步数据到elasticsearch参考文章:http://www.cnblogs.com/zhongshengzhen/p/elasticsearch_logstash.html
    logstash同步数据写的较为简单,这里补充完成。
     
    采用增量的方式导入mysql新增、修改的数据,前提是数据库中的数据不进行删除,只修改数据库字段的状态。
     
    logstash jdbc的配置内容见下:
     
    [zsz@VS-zsz conf]$ cd /usr/local/logstash-2.4.0/conf 
    [zsz@VS-zsz conf]$ vi logstash-mysql-news.conf
     


    input {
      jdbc {
        jdbc_driver_library => "/usr/local/logstash-2.4.0/mysql-connector-java-5.1.39.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://******************:3306/******?characterEncoding=UTF-8&useSSL=false"
        jdbc_user => "*******"
        jdbc_password => "*******************"
        statement => "SELECT n.pk,n.media_pk as mediapk,n.user_pk as userpk,n.access_source_pk as accesssourcepk,updated_at   FROM tablename1  n LEFT JOIN v ON(n.pk=v.news_pk) LEFT JOIN tablename2   c ON(n.pk=c.news_pk) WHERE date_sub(n.updated_at,interval 8 hour)  > :sql_last_value"
        last_run_metadata_path => "/usr/local/logstash-2.4.0/conf/lastRun.news"
        use_column_value => true
        tracking_column => updated_at
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        schedule => "*/5 * * * *"
        type => "news"
      }
    jdbc {
        jdbc_driver_library => "/usr/local/logstash-2.4.0/mysql-connector-java-5.1.39.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://*************:3306/*******?characterEncoding=UTF-8&useSSL=false"
        jdbc_user => "touchtv"
        jdbc_password => "op@touchtv"
        statement => "SELECT pk,name,avatar_url as avatarurl,`desc`,status,remark,identity_type as identitytype,updated_at   FROM tablename WHERE status=1 AND date_sub(updated_at,interval 8 hour)  > :sql_last_value"
        last_run_metadata_path => "/usr/local/logstash-2.4.0/conf/lastRun.media"
        use_column_value => true
        tracking_column => updated_at
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        schedule => "*/20 * * * *"
        type => "media"
      }

    }

    filter {
       json {
            source => "message"
            remove_field => ["message"]
        }
    }

    output {
      stdout {
        codec => rubydebug
      }
      elasticsearch {
        hosts => "192.168.*******"
        index => "indexname"
        document_id => "%{pk}"
      }


    注意:
    last_run_metadata_path => "/usr/local/logstash-2.4.0/conf/lastRun.news" ,建议做这个配置,用来记录上次更新的时间或者ID,达到重启logstash而不会重新全量导入数据的目的。很重要的配置。
    use_column_value => true    ,必须配置,开启字段跟踪
    tracking_column => updated_at ,必须配置,指定跟踪的字段名,必须在返回的SQL结果集中存在的字段,要不然会有WARN。
    :sql_last_value 上次执行记录的点,只能是数字类型或者时间类型,具体可以参考官方文档。
    date_sub(n.updated_at,interval 8 hour) 这个需要做时间的转换,是个大坑。由于logstash取的时间@timestamp的时间比本地早8个小时,这个时间是UTC时间,日志应统一采用这个时间所以做的转换,也可以修改logstash的配置来处理,但是logstash的日志及很多插件都是用了UTC时间,修改后需要对周边的搭配的框架(如Kibana)也进行修改,很是麻烦,所以建议不要修改logstash的UTC时间配置。
    document_id => "%{pk}" 必须配置,这里是对index进行唯一性的命名,这个配置可以避免同一条数据的修改可以更新到相应的记录上,相当于关系型数据库中的主键。由此可以看到,数据表建模时,最好可以所有的表都有一个自增字段来唯一识别一条记录。
     
    3、启停logstash脚本
    restart.sh
     
    confFile="logstash-mysql-news.conf"
    basepath=$(cd `dirname $0`; pwd)

    directory=${basepath%/*}

    cd $directory


    pidString=`ps -ef | grep rg.jruby.Main | grep "${confFile}" | grep -v grep | awk '{print $2}'`


    if [ -n "$pidString" ]; then

            kill -9 $pidString        
            echo -e " STOP successfully! "
    else
            echo -e " No need to be stoped because it had already been stoped "

    fi
    folder="${directory}/logs"

    if [ ! -d "$folder" ]; then
      mkdir "$folder"
    fi
    nohup bin/logstash -w 4 -f conf/"${confFile}" -l logs/logstash.log.news.`date -d today +"%Y-%m-%d"` > logs/nohup.log.news.`date -d today +"%Y-%m-%d"` 2>&1 &

    echo -e "RESTARTING..."
    sleep 3

    pidString=`ps -ef | grep rg.jruby.Main | grep -v grep | awk '{print $2}'`

    if [ -n "$pidString" ]; then
      echo -e " RESTART SUCCESSFUL! "
    else
      echo -e " RESTART FAILED! "
    fi
     
    采用nohup启动logstash同步,同时对线程启动或者暂停,各位看官可以根据个人需要修改,记得点赞啊,如果觉得不错。
     
    解析:bin/logstash -w 4 这个指定是根据jdbc的进程来决定的,这里是允许同时运行四个线程
     
    4、springboot的配置
     
    springboot框架的使用这里就不说了,其实就是一个用spring管理elasticsearch连接和映射的东西,为了方便,你也可以直接用main函数作为客户端连接elasticsearch进行测试。连接方式可能不一样,但是检索过程可以相互借鉴。
     
    配置文件配置elasticsearc
    pom.xml
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    </dependency>
     
    application.yml
    spring.data.elasticsearch.cluster-name: mycluster
    spring.data.elasticsearch.cluster-nodes : 192.168.31.78:9300,192.168.31.79:9300
    spring.data.elasticsearch.repositories.enabled : true
    这里nodes配置了两部机器,相当于配置了一个双节点的ES的集群,当一部机器进程关闭,依旧保证服务。
     
     
    定义实体类,配置与elasticsearch的字段映射
    package cn.search.domain;

    import org.springframework.data.annotation.Id;
    import org.springframework.data.annotation.Version;
    import org.springframework.data.elasticsearch.annotations.Document;
    import java.sql.Timestamp;

    /**
    * Created by zhongshzh on 2016/10/17.
    */
    @Document(indexName = "zsz", type = "news", shards = 10, replicas = 0, refreshInterval = "-1")
    public class News {
    @Id
    private long pk;
    @Version
    private Long version;

    private int mediapk;
    private int userpk;
    ......}
    配置了indexName和typeName,News.java都是些getter和setter方法,这里不一一列举了。必须定义@Id的字段,要不然会报错。
     
    5、springboot的检索
         5.1:继承ElasticsearchRepository的方式检索
    package cn.search.domain.repository;

    import cn.search.domain.Media;
    import org.springframework.data.domain.Page;
    import org.springframework.data.domain.Pageable;
    import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

    /**
    * Created by zhongshzh on 2016/10/19.
    */
    public interface MediaRepository extends ElasticsearchRepository<Media, String> {

    public Media findByPk(int pk);

    }
    上面定义了一个方法findByPk(pk),通过pk进行检索,这种方法的使用详见springboot jpas,相同的使用方法。
     
         5.1:构造QueryBuilder的方式检索
     
    采用springdata的方式不太方便,对于含有and... (or...or......)的查询就无能为力了,所以更通用的是使用QueryBuilder。 
    public Page<News> searchNews(String keyword, int pageSize, int pageNum) {

    QueryBuilder queryBuilder = QueryBuilders.boolQuery()
    .must(QueryBuilders.
    termQuery("checkstatus", "0"))
    .must(QueryBuilders.
    multiMatchQuery(keyword, "title", "summary", "content"));

    Pageable pageable = new PageRequest(pageNum, pageSize);
    Page<News> pageNews = newsSearchRepository.search(queryBuilder, pageable);
    if(pageNews==null || pageNews.getSize() < 1)
    return null;
    return pageNews;
    }
     
    QueryBuilder queryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.termQuery("checkstatus", "0")).must(QueryBuilders.multiMatchQuery(keyword, "title", "summary"));
     
    跟SQL:select * from news where checkstatus=0 and (title like '%keyword%' or summary  like '%keyword%')的含义一样。
    获得的结果集是根据相关度来排序的,自定义的排序,暂时我也不清楚,有了解的大神还请不吝赐教。
     
    本文的原文地址:http://www.cnblogs.com/zhongshengzhen/p/elasticsearch_springboot.html
     
  • 相关阅读:
    instanceof 关键词
    类,类中成员变量,类中成员方法,方法中的局部变量,接口,接口中的方法的访问修饰符
    2.Object
    1API简介
    CMC 实例管理
    BW 转换字符空格问题
    BW ON HANA 业务模型关系与数据取数
    进步缓慢
    BO客户端安装更新,重新启动挂起。
    放弃看图,无差别筛选。
  • 原文地址:https://www.cnblogs.com/zhongshengzhen/p/elasticsearch_springboot.html
Copyright © 2011-2022 走看看