zoukankan      html  css  js  c++  java
  • Filebeat和pipleline processor-不部署logstash,实现对数据的处理

    利用ingest node所提供的Pipeline帮我们对数据进行处理。

    在Elasticsearch中的配置文件elasticsearch.yml文件中配置:node.ingest: true
    ingest node提供了在对文档建立索引之前对其进行预处理的功能:

    • 接收节点拦截索引或批量API请求
    • 运用转换(transformation)
    • 将文档传递回索引或批量API

    什么是pipeline呢?

    • 一个pipleline就是一套处理器:
    • 一个processor就像是Logstash里的一个filter拥有对通过管道(pipeline)的文档的读写权限.

    那么Elastic到底提供了哪些processor呢?我们可以参阅Elastic的官方文档,我们可以看到许多的pocessors可以被利用。
    地址:https://www.elastic.co/guide/en/elasticsearch/reference/7.5/ingest-processors.html

    定义一个Pipleline

    使用PUT命令配合Ingest API来操作。它是存在于cluster state里的。

    PUT _ingest/pipeline/my-pipeline-id
    {
      "description": "DESCRIPTION",
      "processors": [
        {
          ...
        }
      ],
      "on_failure": [
        {
          ...
        }
      ]
    }
    

    这里my-pipleline-id是我们自己命令的在该cluster唯一标识是的pipleline ID。在里面,我们可以定义我们喜欢的processors数组。在处理
    失败后,我们也可以定义相应的processors来完成。

    例子

    来使用Filebeat来读取一个log文件,并使用processors对这个log的数据进行处理。
    log文件中每一条的数据是这样的格式:

    20.168.183.41 - - [11/Sep/2019:00:00:05 +0000] "GET /category/health HTTP/1.1" 200 132 "/item/software/623" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.7 (KHTML, like Gecko) Chrome/16.0.912.77 Safari/535.7"
    

    配置Filebeat

    创建一个叫做filebeat_processor.yml文件:

    filebeat.inputs:
    - type: log
      enabled: true
      fields:
        apache: true
      paths:
        - /data/nginx-access.log # 根据实际情况而定
     
    output.elasticsearch:
        hosts: ["localhost:9200"]
        pipeline: "my_pipeline_id" # 下一步创建的pipeline ID
    

    使用了一个叫做my_pipleline_id的pipeline。它的定义如下:

    PUT _ingest/pipeline/my_pipeline_id
    {
      "description": "Drop ECS field and add one new field",
      "processors": [
        {
          "remove": {
            "field": "ecs"
          },
          "set": {
            "field": "added_field",
            "value": 0
          }
        }
      ]
    }
    

    定义了两个processor: remove及set。一个是删除一个叫做ecs的项,另外一个是添加一个叫做added_field的项,并把它的值设置为0。

    在正常的情况下,如果在我们的配置文件中没有定义那个pipleline的情况下,那么他们的结果是:

    {
    "_index" : "filebeat-7.3.0-2019.09.11-000001",
    "_type" : "_doc",
    "_id" : "637VIG0BJD_DqHjgqvC5",
    "_score" : 1.0,
    "_source" : {
      "@timestamp" : "2019-09-11T14:58:55.902Z",
      "message" : """144.228.123.71 - - [11/Sep/2019:01:52:35 +0000] "GET /category/games HTTP/1.1" 200 117 "/search/?c=Books+Software" "Mozilla/5.0 (Windows NT 6.0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11"""",
      "input" : {
        "type" : "log"
      },
      "fields" : {
        "apache" : true
      },
      "ecs" : {
        "version" : "1.0.1"
      },
      "host" : {
        "name" : "localhost"
      },
      "agent" : {
        "hostname" : "localhost",
        "id" : "c88813ba-fdea-4a98-a0be-468fb53566f3",
        "version" : "7.3.0",
        "type" : "filebeat",
        "ephemeral_id" : "ec3328d6-f7f0-4134-a2b6-8ff0c5141cc5"
      },
      "log" : {
        "offset" : 300352,
        "file" : {
          "path" : "/data/nginx-access.log"
        }
      }
    }
    }
    

    运行Filebeat

    在Filebeat的安装目录,运行如下的命令:
    ./filebeat -c filebeat_processor.yml

    查看效果

    在Kibana中可以通过如下的命令来查看,
    GET _cat/indices?v
    看到了一个已经生产的以filebeat为开头的文件名。我们可以通过如下的命令来查看它的数据:
    GET filebeat-7.4.2/_search

    其中的一个文档的soure是这样的:

    "_source" : {
      "agent" : {
        "hostname" : "localhost",
        "id" : "45832d40-b664-466b-a523-3bc58890ea50",
        "type" : "filebeat",
        "ephemeral_id" : "dbbba131-9c33-4e82-a00a-9e8e09d3e799",
        "version" : "7.4.2"
      },
      "log" : {
        "file" : {
          "path" : "/data/nginx-access.log"
        },
        "offset" : 11497
      },
      "message" : """164.51.31.185 - - [11/Sep/2019:00:04:15 +0000] "GET /item/giftcards/232 HTTP/1.1" 200 130 "/category/electronics" "Mozilla/5.0 (Windows NT 6.0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11"""",
      "input" : {
        "type" : "log"
      },
      "@timestamp" : "2019-11-23T13:11:57.478Z",
      "host" : {
        "name" : "localhost"
      },
      "fields" : {
        "apache" : true
      },
      "added_field" : 0
    }
    

    显然ecs这个field已经不见了,而另外一个叫做added_field新的field被成功添加进来了。这个说明我们的pipleline是起作用的。

  • 相关阅读:
    Helpers Overview
    Validation
    Support Facades
    Session Store
    位运算(参考百科)
    开源项目_可能使用到的开源项目集合
    秒杀系统架构分析与实战(转)
    shell命令之根据字符串查询文件对应行记录
    MySQL做为手动开启事务用法
    spring 加载bean过程源码简易解剖(转载)
  • 原文地址:https://www.cnblogs.com/sanduzxcvbnm/p/12076305.html
Copyright © 2011-2022 走看看