zoukankan      html  css  js  c++  java
  • Logstash:使用 aggregate filter 处理 N:N 关系

    使用Logstash从mysql同步用户和用户所有的宠物到ES中。

    "register_name": "孟林洁",
        "id": 80469531,
        "pets": [
          {
            "breed_name": "万能梗",
            "birthday": null,
            "pet_id": 999044,
            "name": "一只狗",
            "images": "{"result":["https://petkit-img3.oss-cn-hangzhou.aliyuncs.com/img/tmp_6f4c8e92de0c53ab355fdb69214d4bf3.jpg"]}",
            "breed_id": 130
          },
          {
            "breed_name": "万能梗",
            "birthday": null,
            "pet_id": 999097,
            "name": "一只狗2",
            "images": "{"result":["https://petkit-img3.oss-cn-hangzhou.aliyuncs.com/img/tmp_6f4c8e92de0c53ab355fdb69214d4bf3.jpg"]}",
            "breed_id": 130
          }
        ],
        "mobile": "*******",
        "avatar": null,
        "pet_list": [
          999044,
          999097
        ]

    问题:

    1. logstash同步nested嵌套类型到ES中。
    2. logstash同步嵌套数组对象时,聚合过程中数据丢失(用户宠物会随机丢失,偶而数据不丢失)。
    3. logstash同步时少同步一条数据,在停止logstash服务时才进行同步
    4. (更新) mysql的多条数据同步到es只有一条

    解决:

    1、解决logstash同步nested嵌套类型到ES中

    先创建索引,并且修改索引类型为nested

    创建索引:PUT /user
    修改索引映射:
    PUT /user/_mapping/doc
    {
        "doc": {
          "properties": {
            "avatar": {
              "type": "text"
            },
            "id": {
              "type": "long"
            },
            "mobile": {
              "type": "text"
            },
            "pets": {
              "type": "nested",
              "properties": {
                "birthday": {
                  "type": "date"
                },
                "breed_id": {
                  "type": "long"
                },
                "breed_name": {
                  "type": "text",
                  "analyzer": "ik_max_word",
                  "search_analyzer": "ik_max_word"
                },
                "images": {
                  "type": "text"
                },
                "name": {
                  "type": "text",
                  "analyzer": "ik_max_word",
                  "search_analyzer": "ik_max_word"
                },
                "pet_id": {
                  "type": "long"
                }
              }
            },
            "register_name": {
              "type": "text",
              "analyzer": "ik_max_word",
              "search_analyzer": "ik_max_word"
            }
          }
        }
    }

    使用logstash的过滤器中aggregate插件进行数据聚合。

    配置文件jdbc.conf

    input {
        stdin {}
        jdbc {
            jdbc_driver_library => "../mysql-connector-java-6.0.6.jar"
            jdbc_driver_class => "com.mysql.jdbc.Driver"
            jdbc_connection_string => "jdbc:mysql://****.com:3306/food-dev"
            jdbc_user => "****"
            jdbc_password => "****"
            #jdbc_paging_enabled => "true"
            #jdbc_page_size => "50"
            clean_run => true
            use_column_value => true
            record_last_run => "true"
            tracking_column => "id"
            schedule => "*/1 * * * *"
            #last_run_metadata_path => "/Users/menglinjie/ES-node/testdata.text"
            statement => "select u.id,u.register_name,u.mobile,u.avatar,u.status,svp.id  as pet_id,svp.name,svp.images,svp.gender,svp.birthday,pb.id as breed_id,pb.name as breed_name from user u left join store_vip_pet svp on svp.user_id = u.id and svp.pet_status = 1 left join pet_breed pb on svp.breed_id = pb.id order by u.id desc"
       }
     
    }
     
    filter {
    #这里做聚合
         aggregate {
            task_id => "%{id}"
            code => "
                map['id'] = event.get('id')
                map['register_name'] = event.get('register_name')
                map['mobile'] = event.get('mobile')
                map['avatar'] = event.get('avatar')
                map['pet_list'] ||=[]
                map['pets'] ||=[]
                if (event.get('pet_id') != nil)
                    if !(map['pet_list'].include? event.get('pet_id'))  
                        map['pet_list'] << event.get('pet_id')        
                        map['pets'] << {
                            'pet_id' => event.get('pet_id'),
                            'name' => event.get('name'),
                            'images' => event.get('images'),
                            'breed_id' => event.get('breed_id'),
                            'breed_name' => event.get('breed_name'),
                            'birthday' => event.get('birthday')
                        }
                    end
                end
                event.cancel()
            "
            
            push_previous_map_as_event => true
            timeout => 5
        }
        json {
            source => "message"
            remove_field => ["message"]
            #remove_field => ["message", "type", "@timestamp", "@version"]
        }
        mutate  {
            #将不需要的JSON字段过滤,且不会被存入 ES 中
            remove_field => ["tags", "@timestamp", "@version"]
        }
    }
     
    output {
       stdout {
            #codec => json_lines
       }
            elasticsearch {
            hosts => ["127.0.0.1:9200"]
            index => "user"
            document_id => "%{id}"
       }
    }

    2、解决聚合过程中子数组对象丢失

    问题定位:多线程跑聚合过程中,同一个用户的多个宠物可能被分配到不通过的线程,分别做不同的聚合,导致一个用户存在多条数据,分别拥有不同的宠物,然后多线程的进行输出到ES,ES保存过程中会把存在的数据给更新掉,这就是我的宠物丢失的原因,多线程分配的随机性导致数据也随机丢失。

    验证后确认猜想正确。

    回想刚才把线程数设置为1,这样肯定会影响性能的吧,万一以后我有不需要聚合的的数据时完全可以多线程跑。Logstash提供的pipelines.yml可以配置多管道,使不同的同步任务绑定不同管道配置。

    这里pipeline.workers: 4,pipeline.output.workers: 3,那么执行聚合的filter就是1,这样可以单线程聚合,多线程输出。

    多个任务可以配置多个管道,pipeline.id标示管道唯一性。

    - pipeline.id: user_pipeline
      pipeline.workers: 4
      pipeline.batch.size: 1000
      # 输出
      pipeline.output.workers: 3
      # 配置文件位置
      path.config: "/Users/menglinjie/ES-node/logstash-6.3.1/conf.d/*.conf"
      # 对基于磁盘的排队进行“持久化”。默认值是内存
      queue.type: persisted

    更新:

    影响聚合结果的还有sql语句!! 

    sql语句必须根据聚合task_id排序,也就是需要聚合的数据必须排在一起。否则map['pets']会被覆盖掉,导致数据丢失。

    3、logstash同步时少同步一条数据,在停止logstash服务时才进行同步

    在filter 聚合配置中添加:

    timeout => 3

    filter aggregate 创建中 event map 并不知道我、这次事件是不是应该结束,也就是它也不知道到那一条才是最后一条, 因此设置一个 timeout 告诉它这个时间执行多少秒就结束继续执行第二个。但这样并不是很严谨,因为你也不确定你的 event map 到底要执行多久 。最好的方式是 我们应该给定一个 task end 的条件 ES官网关于 aggregate 的说明

    4、es 配置id的问题,必须有唯一性,否则被覆盖

    参考链接:

    https://segmentfault.com/a/1190000016592277

    https://segmentfault.com/q/1010000016861266

    https://blog.csdn.net/weixin_33910460/article/details/88719101

    https://elasticsearch.cn/question/6648

    https://elasticstack.blog.csdn.net/article/details/108221821

  • 相关阅读:
    jQuery火箭图标返回顶部代码
    jQuery火箭图标返回顶部代码
    jQuery火箭图标返回顶部代码
    jQuery火箭图标返回顶部代码
    jQuery火箭图标返回顶部代码
    jQuery火箭图标返回顶部代码
    jQuery火箭图标返回顶部代码
    jQuery火箭图标返回顶部代码
    jQuery火箭图标返回顶部代码
    jQuery火箭图标返回顶部代码
  • 原文地址:https://www.cnblogs.com/fat-girl-spring/p/14304238.html
Copyright © 2011-2022 走看看