  • Full and incremental sync of a MySQL one-to-many relationship to Elasticsearch with Logstash

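    Preface

    Real-world projects often end up using Elasticsearch for search. This article describes how to sync data from MySQL to Elasticsearch in near real time with Logstash. For setting up Elasticsearch + Logstash + Kibana themselves, see the ELK setup steps.

    Approaches

    I have settled on two ways to sync MySQL to ES.

    1. Use Logstash, provided by Elastic, for both the full and the incremental sync from MySQL (tracked by a timestamp or an auto-increment id).
    2. Use Logstash for the full sync only, and handle later inserts, updates and deletes with canal, Alibaba's open-source framework (incremental sync). canal poses as a MySQL replica and reads the binlog; a Java program listens to it and writes the changes to Elasticsearch.

    This article covers the first approach: full and incremental sync from MySQL with Logstash.

    Full and incremental sync

    First, the relationship between the MySQL tables.
    The main table is news, the article table. Its contents:
    [Image: contents of the news table]
    The child table is custom_information, the custom information table. It has a one-to-many relationship with news: one article has many custom information rows. Its contents:
    [Image: contents of the custom_information table]
    Note: item_id in custom_information references id in news.

    The one-to-many relationship can be described in JSON as follows: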

    {
        "id":"15c7ee7a5dc411ea9bc2fa163e0c8256",
        "title":"“宅经济”进入数字化时代",
        "source":"人民日报",
        "customList":[
            {
                "secondLevel":"32552",
                "isRelEnterprise":"0",
                "secondLevelName":"济南",
                "moduleType":"1",
                "customName":"地区1",
                "firstLevel":"37200",
                "firstLevelName":"山东",
                "customId":"1",
                "detId":"1"
            },
            {
                "secondLevel":"222",
                "isRelEnterprise":"0",
                "secondLevelName":"林业1",
                "moduleType":"1",
                "customName":"行业1",
                "firstLevel":"11",
                "firstLevelName":"林业",
                "customId":"2",
                "detId":"3"
            }
        ]
    }
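    A JOIN between the two tables returns flat rows, one per article/custom pair, so something has to fold them into the nested shape above. The following is only a minimal sketch of that folding step in plain Java, not the pipeline used in this article. The class NestedDocBuilder, its helper methods and the sample rows are hypothetical, and only three of the custom fields are copied to keep it short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class NestedDocBuilder {

    /** Folds flat JOIN rows (one per news/custom pair) into one document per news id. */
    static List<Map<String, Object>> fold(List<Map<String, String>> rows) {
        Map<String, Map<String, Object>> docsById = new LinkedHashMap<>();
        for (Map<String, String> row : rows) {
            Map<String, Object> doc = docsById.computeIfAbsent(row.get("id"), id -> {
                Map<String, Object> d = new LinkedHashMap<>();
                d.put("id", id);
                d.put("title", row.get("title"));
                d.put("source", row.get("source"));
                d.put("customList", new ArrayList<Map<String, String>>());
                return d;
            });
            // A LEFT JOIN leaves detId null when the article has no custom rows.
            if (row.get("detId") != null) {
                Map<String, String> item = new LinkedHashMap<>();
                item.put("detId", row.get("detId"));
                item.put("customId", row.get("customId"));
                item.put("customName", row.get("customName"));
                customListOf(doc).add(item);
            }
        }
        return new ArrayList<>(docsById.values());
    }

    @SuppressWarnings("unchecked")
    static List<Map<String, String>> customListOf(Map<String, Object> doc) {
        return (List<Map<String, String>>) doc.get("customList");
    }

    /** Hypothetical sample rows: article "a" has two custom rows, article "b" has none. */
    static List<Map<String, String>> sampleRows() {
        List<Map<String, String>> rows = new ArrayList<>();
        rows.add(row("a", "1", "1", "地区1"));
        rows.add(row("a", "3", "2", "行业1"));
        rows.add(row("b", null, null, null));
        return rows;
    }

    static Map<String, String> row(String id, String detId, String customId, String customName) {
        Map<String, String> r = new HashMap<>();
        r.put("id", id);
        r.put("title", "title-" + id);
        r.put("source", "source-" + id);
        r.put("detId", detId);
        r.put("customId", customId);
        r.put("customName", customName);
        return r;
    }

    public static void main(String[] args) {
        for (Map<String, Object> doc : fold(sampleRows())) {
            System.out.println(doc);
        }
    }
}
```

    Keying the intermediate map by id means the rows of one article do not have to arrive next to each other, which is a simplification compared with a streaming pipeline.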
    

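    This structure has to be mapped in Elasticsearch, where it is also a one-to-many relationship. It is implemented with Elasticsearch's nested field type, and the structure looks roughly like this:
    [Image: structure of the nested document in Elasticsearch]

    Create the index (with an explicit, static mapping)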

    PUT app-article-link
    {
      "mappings" : {
          "properties" : {
            "address" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "customList" : {
              "type" : "nested",
              "properties" : {
                "customId" : {
                  "type" : "text",
                  "fields" : {
                    "keyword" : {
                      "type" : "keyword",
                      "ignore_above" : 256
                    }
                  }
                },
                "customName" : {
                  "type" : "text",
                  "fields" : {
                    "keyword" : {
                      "type" : "keyword",
                      "ignore_above" : 256
                    }
                  }
                },
                "detId" : {
                  "type" : "keyword"
                },
                "firstLevel" : {
                  "type" : "keyword"
                },
                "firstLevelName" : {
                  "type" : "text",
                  "fields" : {
                    "keyword" : {
                      "type" : "keyword",
                      "ignore_above" : 256
                    }
                  }
                },
                "isRelEnterprise" : {
                  "type" : "keyword"
                },
                "moduleType" : {
                  "type" : "keyword"
                },
                "secondLevel" : {
                  "type" : "keyword"
                },
                "secondLevelName" : {
                  "type" : "text",
                  "fields" : {
                    "keyword" : {
                      "type" : "keyword",
                      "ignore_above" : 256
                    }
                  }
                }
              }
            },
            "custom_list" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "detail" : {
              "type" : "text",
              "analyzer" : "ik_max_word",
              "search_analyzer" : "ik_smart"
            },
            "endTime" : {
              "type" : "keyword"
            },
            "id" : {
              "type" : "keyword"
            },
            "industryName" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "isDelete" : {
              "type" : "keyword"
            },
            "price" : {
              "type" : "keyword"
            },
            "publishDate" : {
              "type" : "keyword"
            },
            "relevanceType" : {
              "type" : "keyword"
            },
            "savePath" : {
              "type" : "keyword"
            },
            "source" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                },
                "suggest" : {
                  "type" : "completion",
                  "analyzer" : "simple",
                  "preserve_separators" : true,
                  "preserve_position_increments" : true,
                  "max_input_length" : 50
                }
              },
              "analyzer" : "ik_max_word",
              "search_analyzer" : "ik_smart"
            },
            "startTime" : {
              "type" : "keyword"
            },
            "summary" : {
              "type" : "text",
              "analyzer" : "ik_max_word",
              "search_analyzer" : "ik_smart"
            },
            "techFieldName" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "title" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                },
                "suggest" : {
                  "type" : "completion",
                  "analyzer" : "simple",
                  "preserve_separators" : true,
                  "preserve_position_increments" : true,
                  "max_input_length" : 50
                }
              },
              "analyzer" : "ik_max_word",
              "search_analyzer" : "ik_smart"
            },
            "update_time" : {
              "type" : "keyword"
            },
            "videoStatus" : {
              "type" : "keyword"
            }
          }
        }
    }
    

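    The Logstash configuration follows.
    Because the tables described above are in a one-to-many relationship, a view is created first. The sync uses the most recent update time as its cut-off, so the key point is that every row carries the later of the main table's and the child table's update time. The view is created with the following SQL: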

    CREATE VIEW news_view AS
    SELECT
            t.id,
            t.title,
            t.source,
            '8' AS relevanceType ,
            date_format( greatest( `t`.`update_time`, ifnull( `i`.`update_time`, '1970' )), '%Y-%m-%d %H:%i:%s' ) AS `update_time`
           
    FROM
            `news` t
            LEFT JOIN custom_information i
            ON t.id=i.item_id
            AND i.is_delete='0'
            AND i.module_type='8'
    WHERE
            t.state = '0'
            AND t.publish_status='3'
            AND t.relevance_type='2'
            
    

    Here update_time is the later of the two tables' update times.
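    The greatest(..., ifnull(...)) expression in the view can be read as the small function below. It is a sketch in plain Java, with the hypothetical class name LatestUpdate, of the same rule: take the later of the two timestamps, and fall back to a fixed early date when the article has no child row. The fallback matters because MySQL's GREATEST returns NULL as soon as one argument is NULL.

```java
import java.time.LocalDateTime;

class LatestUpdate {

    // Stand-in for the '1970' fallback used in the view.
    static final LocalDateTime EPOCH = LocalDateTime.of(1970, 1, 1, 0, 0, 0);

    /** Returns the later of the parent and child update times; a missing child counts as EPOCH. */
    static LocalDateTime latest(LocalDateTime parent, LocalDateTime child) {
        LocalDateTime c = (child == null) ? EPOCH : child;
        return parent.isAfter(c) ? parent : c;
    }

    public static void main(String[] args) {
        LocalDateTime news = LocalDateTime.of(2020, 3, 1, 10, 0, 0);
        LocalDateTime custom = LocalDateTime.of(2020, 3, 2, 9, 30, 0);
        System.out.println(latest(news, custom)); // the child row changed last
        System.out.println(latest(news, null));   // no child row: the article's own time is used
    }
}
```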

    In Logstash's config directory, create news.conf with the following content:

    input {
      jdbc {
        jdbc_driver_library => "/opt/apps/logstash/lib/mysql-connector-java-8.0.13.jar"
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://192.168.0.178:3306/test?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
        jdbc_user => "root"
        jdbc_password => "123456"
        connection_retry_attempts => "3"
        jdbc_validation_timeout => "3600"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "500"  
        statement_filepath => "/opt/apps/logstash/sql/news.sql"
        use_column_value => true
        lowercase_column_names => false
        tracking_column => "update_time"
        tracking_column_type => "timestamp"
        record_last_run => true
        last_run_metadata_path => "/opt/apps/logstash/station/news.txt"
        clean_run => false
        schedule => "*/5 * * * * *"
        type => "news"
      }
    }
     
    filter {
    
    	aggregate {
    		task_id => "%{id}"
    		code => "
    			map['id'] = event.get('id')
    			map['title'] = event.get('title')
                map['source'] = event.get('source')
    			map['custom_list'] ||=[]
    			map['customList'] ||=[]
    			if (event.get('detId') != nil)
    				if !(map['custom_list'].include? event.get('detId'))  
    					map['custom_list'] << event.get('detId')        
    					map['customList'] << {
    						'detId' => event.get('detId'),
    						'moduleType' => event.get('moduleType'),
    						'customId' => event.get('customId'),
                            'customName' => event.get('customName'),
                            'firstLevel' => event.get('firstLevel'),
                            'firstLevelName' => event.get('firstLevelName'),
                            'secondLevel' => event.get('secondLevel'),
                            'secondLevelName' => event.get('secondLevelName'),
                            'isRelEnterprise' => event.get('isRelEnterprise')
    					}
    				end
    			end
    			event.cancel()
    		"
    		
    		push_previous_map_as_event => true
    		timeout => 5
    	}
    
      mutate {
      }
      mutate {
        remove_field => ["@timestamp","@version"]
      }
    }
     
    output {
      elasticsearch {
        document_id => "%{id}"
        document_type => "_doc"
        index => "app-article-link"
        hosts => ["http://192.168.0.178:9200"]
      }
      stdout{
        codec => rubydebug
      }
    }
    
    

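    In the input {} section:
    statement_filepath is the path of the file that contains the SQL statement;
    last_run_metadata_path is the file in which the last tracked value is stored, and the next run resumes from that point;
    tracking_column is the column whose value is tracked, here the update time;
    schedule sets when the statement runs; the expression above runs it every five seconds.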
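    The idea behind tracking_column and last_run_metadata_path is a high-water mark: each run reads only rows newer than the stored value and then moves the value forward. The sketch below illustrates that idea in plain Java under simplifying assumptions. It is not Logstash's implementation; the class IncrementalCursor is hypothetical, rows are reduced to their update times, and the mark is kept in memory rather than in a file.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class IncrementalCursor {

    // The last tracked value; Logstash persists its equivalent at last_run_metadata_path.
    private LocalDateTime lastValue;

    IncrementalCursor(LocalDateTime start) {
        this.lastValue = start;
    }

    LocalDateTime lastValue() {
        return lastValue;
    }

    /** One scheduled run: return the rows strictly newer than the mark, then advance the mark. */
    List<LocalDateTime> poll(List<LocalDateTime> updateTimes) {
        List<LocalDateTime> fresh = new ArrayList<>();
        for (LocalDateTime t : updateTimes) {
            if (t.isAfter(lastValue)) {
                fresh.add(t);
            }
        }
        for (LocalDateTime t : fresh) {
            if (t.isAfter(lastValue)) {
                lastValue = t;
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        LocalDateTime t1 = LocalDateTime.of(2020, 3, 1, 10, 0, 0);
        LocalDateTime t2 = LocalDateTime.of(2020, 3, 1, 10, 0, 5);
        IncrementalCursor cursor = new IncrementalCursor(LocalDateTime.of(1970, 1, 1, 0, 0, 0));
        System.out.println(cursor.poll(Arrays.asList(t1, t2)).size()); // first run picks up both rows
        System.out.println(cursor.poll(Arrays.asList(t1, t2)).size()); // nothing has changed since
    }
}
```

    One consequence of the strict comparison is that a row written later with exactly the same timestamp as the mark would be skipped, which is worth keeping in mind when the tracked column has second precision.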

    The SQL that is executed (news.sql):

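    SELECT
    	n.id,
    	n.title,
    	n.source,
    	n.update_time
    	-- also select the custom_information columns that the aggregate filter reads (detId, customId, ...)
    FROM
    	news_view n
    WHERE
    	n.update_time > :sql_last_value
    ORDER BY
    	n.update_time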
    
    

    Edit config/pipelines.yml

    [root@localhost config]# vi pipelines.yml 
    
    # List of pipelines to be loaded by Logstash
    #
    # This document must be a list of dictionaries/hashes, where the keys/values are pipeline settings.
    # Default values for omitted settings are read from the `logstash.yml` file.
    # When declaring multiple pipelines, each MUST have its own `pipeline.id`.
    #
    # Example of two pipelines:
    #
    # - pipeline.id: test
    #   pipeline.workers: 1
    #   pipeline.batch.size: 1
    #   config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
    # - pipeline.id: another_test
    #   queue.type: persisted
    #   path.config: "/tmp/logstash/*.config"
    #
    #- pipeline.id: news_table
    #  path.config: /opt/apps/logstash/config/addmysql.conf
    #- pipeline.id: news_table3
    #  path.config: /opt/apps/logstash/config/addmysql3.conf
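    - pipeline.id: news
      # the aggregate filter needs a single worker so that the events of one id are processed in order
      pipeline.workers: 1
      path.config: /opt/apps/logstash/config/news.conf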
    
    

    Start Logstash with ./bin/logstash:
    [root@localhost logstash]# ./bin/logstash

    Common queries in Kibana
    Exact-match query

    GET /app-article-link/_search
    {
     "_source": ["id","title","source","customList","update_time","savePath","isDelete"], 
      "query": {
        "bool": {
          "must": [
          { "match": { "id": "15c7ee7a5dc411ea9bc2fa163e0c8256" }}
          ]
    }}}
    

    Nested query (the field must be mapped as nested)

    GET app-article-link/_search
    {
      "query": {
        "bool": {
          "must": [
            {
              "nested": {
                "path": "customList",
                "query": {
                  "bool": {
                    "must": [
                      { "match": { "customList.customId": "1" }},
                       { "match": { "customList.secondLevel": "5552" }}
                    ]
            }}}}
          ]
    }}}
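    The reason the nested type matters: arrays of plain objects are flattened into multi-valued fields when indexed, so two conditions can be satisfied by two different array elements, while a nested query requires a single element to satisfy all of them. The sketch below models the two behaviours in plain Java (Java 9+ for List.of and Map.of). It is an illustration only; the class NestedMatchDemo and its methods are hypothetical, and the sample items reuse values from the JSON example earlier in the article.

```java
import java.util.List;
import java.util.Map;

class NestedMatchDemo {

    /** Flattened matching: each condition may be satisfied by a different item. */
    static boolean flattenedMatch(List<Map<String, String>> items, Map<String, String> wanted) {
        for (Map.Entry<String, String> w : wanted.entrySet()) {
            boolean found = false;
            for (Map<String, String> item : items) {
                if (w.getValue().equals(item.get(w.getKey()))) {
                    found = true;
                    break;
                }
            }
            if (!found) {
                return false;
            }
        }
        return true;
    }

    /** Nested matching: all conditions must hold within one and the same item. */
    static boolean nestedMatch(List<Map<String, String>> items, Map<String, String> wanted) {
        for (Map<String, String> item : items) {
            boolean all = true;
            for (Map.Entry<String, String> w : wanted.entrySet()) {
                if (!w.getValue().equals(item.get(w.getKey()))) {
                    all = false;
                    break;
                }
            }
            if (all) {
                return true;
            }
        }
        return false;
    }

    /** Two customList items taken from the JSON example. */
    static List<Map<String, String>> sampleItems() {
        return List.of(
                Map.of("customId", "1", "secondLevel", "32552"),
                Map.of("customId", "2", "secondLevel", "222"));
    }

    public static void main(String[] args) {
        // customId comes from the first item, secondLevel from the second one.
        Map<String, String> crossed = Map.of("customId", "1", "secondLevel", "222");
        System.out.println(flattenedMatch(sampleItems(), crossed)); // matches across items
        System.out.println(nestedMatch(sampleItems(), crossed));    // no single item satisfies both
    }
}
```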
    

    Autocomplete query (the field must be of type completion)

    GET app-article-link/_search
    {
      "_source": ["source","title","detail"],
      "suggest": {
        "title_suggest": {
          "prefix": "国家知识产",
          "completion": {
            "field": "title.suggest",
            "size": 10,
            "skip_duplicates": true
          }
        }
      }
    }
    

    Highlight query

    GET app-article-link/_search
    {
     
      "query": {
        "multi_match": {
          "query": "安徽",
          "fields": ["title"]
        }
      },
      "highlight": {
        "pre_tags": "<span class='highLight'>",
        "post_tags": "</span>",
        "fields": {
          "title": {}
        }
      }
    }
    

    The data as finally imported through Logstash:
    [Image: a document imported through Logstash]

    Integrating Elasticsearch with Spring Boot

    The Elasticsearch instance used here is version 7.8.1.
    Add the dependencies:

    <!-- Elasticsearch client -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client</artifactId>
        <version>7.8.1</version>
    </dependency>
    

    Create the configuration

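    import java.util.ArrayList;

    import org.apache.http.HttpHost;
    import org.apache.http.client.config.RequestConfig.Builder;
    import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
    import org.elasticsearch.client.RestClientBuilder.RequestConfigCallback;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration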
    public class ESConfig {
    
        private static String hosts = "192.168.0.178"; // cluster addresses, comma-separated
        private static int port = 9200; // port
        private static String schema = "http"; // protocol
        private static ArrayList<HttpHost> hostList = null;
    
        private static int connectTimeOut = 1000; // connect timeout (ms)
        private static int socketTimeOut = 30000; // socket (read) timeout (ms)
        private static int connectionRequestTimeOut = 500; // timeout for obtaining a connection from the pool (ms)
    
        private static int maxConnectNum = 100; // maximum total connections
        private static int maxConnectPerRoute = 100; // maximum connections per route
        static {
            hostList = new ArrayList<>();
            String[] hostStrs = hosts.split(",");
            for (String host : hostStrs) {
                hostList.add(new HttpHost(host, port, schema));
            }
        }
        @Bean
        public RestHighLevelClient restHighLevelClient(){
            RestClientBuilder builder = RestClient.builder(hostList.toArray(new HttpHost[0]));
            // Timeouts of the async HTTP client
            builder.setRequestConfigCallback(new RequestConfigCallback() {
                @Override
                public Builder customizeRequestConfig(Builder requestConfigBuilder) {
                    requestConfigBuilder.setConnectTimeout(connectTimeOut);
                    requestConfigBuilder.setSocketTimeout(socketTimeOut);
                    requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut);
                    return requestConfigBuilder;
                }
            });
            // Connection limits of the async HTTP client
            builder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    httpClientBuilder.setMaxConnTotal(maxConnectNum);
                    httpClientBuilder.setMaxConnPerRoute(maxConnectPerRoute);
                    return httpClientBuilder;
                }
            });
            RestHighLevelClient client = new RestHighLevelClient(builder);
            return client;
        }
    }
    

    Write a test
    Inject the client

     @Autowired
        private RestHighLevelClient restHighLevelClient ;
    

    Highlighted search

    public ResultBody highlighted(@RequestParam(value = "key") String key,
                                      @RequestParam(value = "pageSize",defaultValue = "10") Integer pageSize,
                                      @RequestParam(value = "from",defaultValue = "1") Integer from) throws IOException {
    
    
            // Offset of the first hit (from is a 1-based page number)
            int offset = (from -1) * pageSize ;
            
    
            // Target index
            SearchRequest searchRequest = new SearchRequest("app-article-link");
    
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    
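            // Build the query. When a bool query also has must clauses, its should clauses become optional
            // and only affect scoring. To keep them required, put the should conditions in their own bool
            // query and add that query as one must clause.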
    
            BoolQueryBuilder shouldQuery = QueryBuilders.boolQuery();
            
    		// Industry
            /**if(industryList.size()>0) {
                for (Map<String,String> itemMap : industryList) {
                    String customId = itemMap.get("customId");
                    String firstLevel = itemMap.get("firstLevel");
                    String secondLevel = itemMap.get("secondLevel");
    
                    NestedQueryBuilder nestedQueryBuilder = QueryBuilders.nestedQuery("customList",
                            QueryBuilders.boolQuery().must(QueryBuilders.matchQuery("customList.customId.keyword", customId))
                                    .must(QueryBuilders.matchQuery("customList.firstLevel",firstLevel))
                                    .must(QueryBuilders.matchQuery("customList.secondLevel",secondLevel)),
                            ScoreMode.None);
    
                    shouldQuery.should(nestedQueryBuilder);
                }
            }**/
    
            // Region
            /**if(StringUtils.isNotBlank(areaCode)) {
                // nested object query
                NestedQueryBuilder nestedQueryBuilder = QueryBuilders.nestedQuery("customList",
                        QueryBuilders.boolQuery().must(QueryBuilders.matchQuery("customList.customId.keyword", "1"))
                                .must(QueryBuilders.matchQuery("customList.firstLevel",areaCode)),
                        ScoreMode.None);
    
                shouldQuery.should(nestedQueryBuilder);
            }**/
    
            BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()
                    .must(shouldQuery)
                    .must(QueryBuilders.matchQuery("isDelete","0"));
    
    
            //boolQueryBuilder.mustNot(QueryBuilders.termsQuery("id",articleItemId));
    
            
    		//List<String> customIdList = new ArrayList();
    		//if(customIdList!=null && customIdList.size()>0) {
    		//	boolQueryBuilder.mustNot(QueryBuilders.termsQuery("id",customIdList));
    		//}
    
            // Add the keyword condition when a keyword is given
            if(StringUtils.isNotBlank(key)) {
                boolQueryBuilder.must(QueryBuilders.multiMatchQuery(key,"title","summary","detail" ));
            }
    
            // Define the highlighter
            HighlightBuilder highlightBuilder = new HighlightBuilder();
            // Field to highlight
            highlightBuilder.field("title")
                    // Tags wrapped around each match
                    .preTags("<font color='#ee1a1a'>")
                    .postTags("</font>");
            searchSourceBuilder.query(boolQueryBuilder);
            searchSourceBuilder.highlighter(highlightBuilder);
            // Pagination
            searchSourceBuilder.from(offset);
            searchSourceBuilder.size(pageSize);
            // Sort by publish date, newest first
            searchSourceBuilder.sort("publishDate", SortOrder.DESC);
    
            searchRequest.source(searchSourceBuilder);
    
            SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            long total = searchResponse.getHits().getTotalHits().value;
            List<Map<String, Object>> list = Lists.newArrayList();
            
    
            // Walk the hits and apply the highlights
            for (SearchHit hit : searchResponse.getHits().getHits()) {
                Map<String, HighlightField> highlightFields = hit.getHighlightFields();
                HighlightField nameHighlight = highlightFields.get("title");
                Map<String, Object> sourceAsMap = hit.getSourceAsMap();
                // Join the fragments and overwrite the original title
                if (nameHighlight != null) {
                    Text[] fragments = nameHighlight.getFragments();
                    String title = "";
                    for (Text text : fragments) {
                        title += text;
                    }
                    sourceAsMap.put("title", title);
                }
                // Default value
                sourceAsMap.put("isRead","0");
                list.add(sourceAsMap);
            }
    
            // Build the response payload
            Map<String,Object> retMap = new HashMap<>();
            retMap.put("total",total);
            retMap.put("dataList",list);
    
            return ResultBody.ok().data(retMap);
        }
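    The search method turns a 1-based page number into an offset with (from - 1) * pageSize. Elasticsearch rejects a search whose from + size exceeds index.max_result_window, which defaults to 10,000, so it can help to validate the page before sending the request. The helper below is a sketch of such a check; the class Paging and the choice to throw IllegalArgumentException are assumptions, not part of the original code.

```java
class Paging {

    // Default value of the index.max_result_window setting.
    static final int MAX_RESULT_WINDOW = 10_000;

    /** Converts a 1-based page number into an offset, rejecting pages outside the result window. */
    static int offset(int page, int pageSize) {
        if (page < 1 || pageSize < 1) {
            throw new IllegalArgumentException("page and pageSize must be >= 1");
        }
        long offset = (long) (page - 1) * pageSize;
        if (offset + pageSize > MAX_RESULT_WINDOW) {
            throw new IllegalArgumentException("from + size exceeds " + MAX_RESULT_WINDOW);
        }
        return (int) offset;
    }

    public static void main(String[] args) {
        System.out.println(offset(1, 10)); // first page starts at the first hit
        System.out.println(offset(3, 10)); // third page skips two full pages
    }
}
```

    For deeper paging than the window allows, search_after is the usual alternative to from and size.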
    

    Autocomplete

    public ResultBody getSearchSuggest(@RequestParam(value = "key") String key) throws IOException {
    
            if(StringUtils.isBlank(key)) {
                return ResultBody.ok();
            }
    
            CompletionSuggestionBuilder suggestion = SuggestBuilders
                    .completionSuggestion("title.suggest").prefix(key).size(20).skipDuplicates(true);
            SuggestBuilder suggestBuilder = new SuggestBuilder();
            suggestBuilder.addSuggestion("suggest", suggestion);
    
            // source builder
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.suggest(suggestBuilder);
    
            SearchRequest searchRequest = new SearchRequest("app-article-link"); // index name
            searchRequest.source(sourceBuilder);
    
            SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            Suggest suggest = response.getSuggest();
    
            Set<String> keywords = null;
            if (suggest != null) {
                keywords = new HashSet<>();
                List<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>> entries = suggest.getSuggestion("suggest").getEntries();
    
                for (Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option> entry: entries) {
                    for (Suggest.Suggestion.Entry.Option option: entry.getOptions()) {
                        // Return at most 10 suggestions, each at most 50 characters long
                        String keyword = option.getText().string();
                        if (!StringUtils.isEmpty(keyword) && keyword.length() <= 50) {
                            // Skip a suggestion that is identical to the input
                            if (keyword.equals(key)) continue;
                            keywords.add(keyword);
                            if (keywords.size() >= 10) {
                                break;
                            }
                        }
                    }
                }
            }
            return ResultBody.ok().data(keywords);
        }
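    The post-processing of the suggestions (drop blanks, over-long entries and the input itself, remove duplicates, cap the count) does not depend on Elasticsearch, so it can be pulled into a small helper and tested on its own. The following is a sketch of that idea; the class SuggestFilter and its parameters are hypothetical, and unlike the method above it stops at the cap across all entries and keeps the original order.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class SuggestFilter {

    /** Keeps distinct, non-blank candidates that differ from the input, up to the given limit. */
    static Set<String> filter(List<String> candidates, String input, int maxLength, int limit) {
        Set<String> result = new LinkedHashSet<>();
        for (String candidate : candidates) {
            if (result.size() >= limit) {
                break;
            }
            if (candidate == null || candidate.trim().isEmpty()) {
                continue;
            }
            if (candidate.length() > maxLength || candidate.equals(input)) {
                continue;
            }
            result.add(candidate);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> raw = Arrays.asList("search", "search engine", "search", "", "search index");
        System.out.println(filter(raw, "search", 50, 10));
    }
}
```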
    

    Feedback and discussion are welcome!

  • Original article: https://www.cnblogs.com/aliases/p/15140759.html