  • Syncing Related MySQL Tables into Elasticsearch Nested Documents with Logstash

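    Preface:

    The previous post walked through syncing several related MySQL tables into Elasticsearch with Logstash, using parent-child documents. For the same business requirement, nested documents cost fewer resources and query faster than parent-child documents, at least when the amount of data is small. This post therefore covers the basic use of nested documents, and how Logstash can sync a one-to-many table relationship into Elasticsearch nested documents.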

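    RESTful walkthrough:

    Using blog posts and their comments as the example, this section demonstrates nested documents step by step: creating the mapping, then adding, deleting, updating, querying, and aggregating. The index is named "blog_new".

    1. Create the mapping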

    PUT blog_new
    {
      "mappings": {
        "properties": {
          "title": {
            "type": "text"
          },
          "body": {
            "type": "text"
          },
          "tags": {
            "type": "keyword"
          },
          "published_on": {
            "type": "keyword"
          },
          "comments": {
            "type": "nested",
            "properties": {
              "name": {
                "type": "text"
              },
              "comment": {
                "type": "text"
              },
              "age": {
                "type": "short"
              },
              "rating": {
                "type": "short"
              },
              "commented_on": {
                "type": "text"
              }
            }
          }
        }
      }
    }

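    2. Add a document

    The mapping above is typeless (Elasticsearch 7.x style), so the document is addressed through the _doc endpoint.

    PUT blog_new/_doc/2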
    {
      "title": "Hero",
      "body": "Hero test body...",
      "tags": ["Heros", "happy"],
      "published_on": "6 Oct 2018",
      "comments": [
        {
          "name": "steve",
          "age": 24,
          "rating": 18,
          "comment": "Nice article..",
          "commented_on": "3 Nov 2018"
        }
      ]
    }

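    3. Delete

    The script removes matching comments from the nested array; it does not delete the post itself. The example targets document 1 and a commenter named John, assuming such a document has been indexed.

    POST blog_new/_update/1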
    {
     "script": {
        "lang": "painless",
        "source": "ctx._source.comments.removeIf(it -> it.name == 'John');"
     }
    }
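
To make the effect of the script concrete, here is a minimal Ruby sketch of what removeIf does to the comments array in _source. It works on a plain in-memory hash, not against Elasticsearch; the helper name remove_comments_by and the sample document are hypothetical and only serve as illustration.

```ruby
# Sketch only: mimics the Painless removeIf call on an in-memory copy of _source.
# remove_comments_by and the sample document are hypothetical names and data.
def remove_comments_by(source, name)
  # Array#reject! drops every element for which the block returns true,
  # which is the same idea as removeIf on a list.
  source['comments'].reject! { |c| c['name'] == name }
  source
end

doc = {
  'title' => 'Hero',
  'comments' => [
    { 'name' => 'John',  'comment' => 'First!' },
    { 'name' => 'steve', 'comment' => 'Nice article..' }
  ]
}

remove_comments_by(doc, 'John')
puts doc['comments'].map { |c| c['name'] }.inspect
```

Only the comment by steve is left in the array; the rest of the document is untouched.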

    4. Update

    POST blog_new/_update/2
    {
      "script": {
        "source": "for(e in ctx._source.comments){if (e.name == 'steve') {e.age = 25; e.comment= 'very very good article...';}}" 
      }
    }
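
The loop in the update script can be read as the following Ruby sketch, again on an in-memory hash rather than a live index. The helper update_comment and its keyword arguments are hypothetical names chosen for this illustration.

```ruby
# Sketch only: mirrors the update script's loop on an in-memory copy of _source.
# update_comment and its keyword arguments are hypothetical names.
def update_comment(source, name, age:, comment:)
  source['comments'].each do |c|
    next unless c['name'] == name   # only touch comments by this author
    c['age'] = age
    c['comment'] = comment
  end
  source
end

doc = {
  'title' => 'Hero',
  'comments' => [
    { 'name' => 'steve', 'age' => 24, 'comment' => 'Nice article..' }
  ]
}

update_comment(doc, 'steve', age: 25, comment: 'very very good article...')
puts doc['comments'].first.inspect
```

Nested objects are stored together with their root document, so Elasticsearch reindexes the whole post when a single comment changes. This is one reason nested documents suit cases where the child records are few and change rarely.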

    5. Query

    GET /blog_new/_search?pretty
    {
      "query": {
        "bool": {
          "must": [
            {
              "nested": {
                "path": "comments",
                "query": {
                  "bool": {
                    "must": [
                      {
                        "match": {
                          "comments.name": "William"
                        }
                      },
                      {
                        "match": {
                          "comments.age": 34
                        }
                      }
                    ]
                  }
                }
              }
            }
          ]
        }
      }
    }
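
The point of the nested query is that both conditions must hold inside the same comment. The Ruby sketch below contrasts this with what happens when comments are mapped as plain objects, whose fields are flattened into multi-valued fields. Both helpers are hypothetical, and exact equality stands in for the match query, so treat it as an illustration of the semantics rather than of how Elasticsearch is implemented.

```ruby
# Sketch only: contrasts nested matching with flattened (plain object) matching.
# Both helpers are hypothetical; exact equality stands in for the match query.

# Nested semantics: all conditions must hold inside one and the same comment.
def nested_match?(doc, conditions)
  doc['comments'].any? do |c|
    conditions.all? { |field, value| c[field] == value }
  end
end

# Flattened semantics: each condition may be satisfied by a different comment,
# because the fields of all objects are merged into multi-valued fields.
def flattened_match?(doc, conditions)
  conditions.all? do |field, value|
    doc['comments'].any? { |c| c[field] == value }
  end
end

doc = {
  'comments' => [
    { 'name' => 'William', 'age' => 20 },
    { 'name' => 'steve',   'age' => 34 }
  ]
}
wanted = { 'name' => 'William', 'age' => 34 }

puts nested_match?(doc, wanted)     # false: no single comment has both values
puts flattened_match?(doc, wanted)  # true: the values come from different comments
```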

    6. Aggregation

    GET blog_new/_search
    {
      "size": 0,
      "aggs": {
        "comm_aggs": {
          "nested": {
            "path": "comments"
          },
          "aggs": {
            "min_age": {
              "min": {
                "field": "comments.age"
              }
            }
          }
        }
      }
    }
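
As a rough picture of what this request computes, the Ruby sketch below steps into every comment of every post and takes the smallest age. The helper min_comment_age and the sample data are hypothetical; the real aggregation runs inside Elasticsearch over the hidden nested documents.

```ruby
# Sketch only: what the nested min aggregation computes, on in-memory documents.
# min_comment_age and the sample data are hypothetical.
def min_comment_age(docs)
  # Step into the nested path: collect the comments of all posts.
  comments = docs.flat_map { |d| d['comments'] || [] }
  # Ignore comments that carry no age value.
  ages = comments.map { |c| c['age'] }.compact
  ages.min   # nil when there is no value at all
end

posts = [
  { 'title' => 'Hero',  'comments' => [{ 'name' => 'steve', 'age' => 24 }] },
  { 'title' => 'Other', 'comments' => [{ 'name' => 'William', 'age' => 34 },
                                       { 'name' => 'amy', 'age' => 19 }] }
]

puts min_comment_age(posts)
```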

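    Syncing with Logstash:

    Syncing into nested documents differs a little from the parent-child approach in the previous post: only one jdbc input is needed. The merge works by fetching the rows with a join query and then aggregating them before they are indexed into Elasticsearch. The example again uses posts and comments. Creating the index mapping and the MySQL tables is omitted here, so we go straight to the configuration and the run command.

    1. Create the nested index and mapping

    You can adapt the mapping from the RESTful section above; the key point is that the field holding the child records has type nested. Before running the configuration, run the SQL query on its own to check the joined result.

    [Screenshot: result of the SQL query]

    2. Write the sync configuration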

    
    input {
        # stdin input, handy for manual testing
        stdin {}

        jdbc {
            jdbc_driver_library => "E:/2setsoft/1dev/logstash-7.8.0/mysqletc/mysql-connector-java-5.1.7-bin.jar"
            jdbc_driver_class => "com.mysql.jdbc.Driver"
            jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/community?characterEncoding=UTF-8&useSSL=false"
            jdbc_user => "root"
            jdbc_password => "root"
            # run the query once a minute
            schedule => "*/1 * * * *"
            # One row per comment, joined with its post. Because comment is the left table,
            # posts without comments are not returned. ORDER BY keeps the rows of one post
            # adjacent, which push_previous_map_as_event below relies on.
            statement => "SELECT community.id AS community_id, community.content, community.location, community.images, comment.content AS comment_content , comment.id AS comment_id FROM yiqi_comment comment LEFT JOIN yiqi_community community ON community.id = comment.community_id ORDER BY community.id"
        }
    }

    filter {
        aggregate {
            # all rows with the same community_id are merged into one map
            task_id => "%{community_id}"
            code => "
                map['id'] = event.get('community_id')
                map['content'] = event.get('content')
                map['location'] = event.get('location')
                map['images'] = event.get('images')
                map['comment_list'] ||= []
                map['comment'] ||= []
                if (event.get('comment_id') != nil)
                    if !(map['comment_list'].include? event.get('comment_id'))
                        map['comment_list'] << event.get('comment_id')
                        map['comment'] << {
                            'comment_id' => event.get('comment_id'),
                            'content' => event.get('comment_content')
                        }
                    end
                end
                event.cancel()
            "
            # when a row with a new community_id arrives, emit the previous map as a new event
            push_previous_map_as_event => true
            # a map that receives no new rows for 5 seconds is flushed; this also emits the last post
            timeout => 5
        }

        json {
            source => "message"
            remove_field => ["message"]
            #remove_field => ["message", "type", "@timestamp", "@version"]
        }

        mutate {
            # drop fields that are not needed so they are not stored in ES
            remove_field => ["tags", "@timestamp", "@version"]
        }
    }

    output {
        stdout {
            #codec => json_lines
        }
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            index => "test_nested_community_content"
            document_id => "%{id}"
        }
    }
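
The aggregate logic is easier to follow outside Logstash. The Ruby sketch below replays it on plain hashes: rows with the same community_id are folded into one map, and a change of id pushes the previous map out as an event. The helper aggregate_rows and the sample rows are hypothetical; this is a simulation under those assumptions, not the plugin's actual implementation.

```ruby
# Sketch only: simulates the aggregate filter with push_previous_map_as_event
# on plain hashes. aggregate_rows is a hypothetical helper, not a Logstash API.
def aggregate_rows(rows)
  events = []
  current_id = nil
  map = nil
  rows.each do |row|
    # A new task id means the previous map is complete: push it as an event.
    if row['community_id'] != current_id
      events << map if map
      current_id = row['community_id']
      map = {}
    end
    map['id'] = row['community_id']
    map['content'] = row['content']
    map['comment_list'] ||= []
    map['comment'] ||= []
    next if row['comment_id'].nil?                            # post without comment
    next if map['comment_list'].include?(row['comment_id'])   # already collected
    map['comment_list'] << row['comment_id']
    map['comment'] << { 'comment_id' => row['comment_id'],
                        'content' => row['comment_content'] }
  end
  events << map if map   # in Logstash the last map is flushed by the timeout
  events
end

rows = [
  { 'community_id' => 1, 'content' => 'post A', 'comment_id' => 10, 'comment_content' => 'hi' },
  { 'community_id' => 1, 'content' => 'post A', 'comment_id' => 11, 'comment_content' => 'nice' },
  { 'community_id' => 2, 'content' => 'post B', 'comment_id' => 12, 'comment_content' => 'ok' }
]

aggregate_rows(rows).each { |e| puts "#{e['id']}: #{e['comment'].size} comment(s)" }
```

If the rows of one post are not adjacent, the same id is emitted more than once, and because document_id is the post id, the later partial event overwrites the earlier one in Elasticsearch. Ordering the SQL result by the post id avoids this.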

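    3. Run the sync

    The aggregate filter depends on events arriving in order, so run the pipeline with a single worker (-w 1).

    bin\logstash -f mysql\mysql.conf -w 1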

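    4. Query

    Once the sync has run, search the test_nested_community_content index to check the result; each post should carry its comments in the comment array.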

    Contact

    Personal site: www.zerofc.cn, WeChat official account: ZEROFC_DEV, QQ group: 515937120, QQ: 2652364582, Toutiao: 1637769351151619, Bilibili: 286666708, Dayu: 北桥苏
  • Original post: https://www.cnblogs.com/zerofc/p/15718235.html