zoukankan      html  css  js  c++  java
  • 小姐姐教你定制一个Logstash Java Filter

    Logstash是用来收集数据,解析处理数据,最终输出数据到存储组件的处理引擎。数据处理流程为:

    Logstash Java Filter 就是基于Logstash的Filter扩展API开发一个用Java语言实现的Filter,然后将Filter代码打包构建到自己服务器上的Logstash Filter lib中。就可以在数据流转配置文件中(也就是logstash -f 指定的配置文件)使用这个定制的Logstash Java Filter了。

    定制步骤包括以下五步:

    1.准备Logstash环境

    因为Logstash Java Filter需要依赖Logstash的API,我们需要将Logstash源码下载下来并构建
    1.1.下载logstash源码

    git clone --branch <branch_name> --single-branch https://github.com/elastic/logstash.git <target_folder>
    

    其中<branch_name>需替换为你想要使用的logstash版本,使用7.1之后的GA版本就可以。<target_folder>需替换为你想要下载到的logstash代码父目录,不指定的话就下载到当前目录的logstash文件夹下。我这里使用的是7.6版本:

    git clone --branch 7.6  --single-branch https://github.com/elastic/logstash.git 
    

    1.2.构建logstash源码
    进入到当前目录的logstash目录(也就是logstash源码目录,后续称为:$LS_HOME)下,执行

    ./gradlew assemble
    

    如果是Windows系统的话执行gradlew.bat assemble

    这一步要等很久很久,如果下载不下来可以试着添加gradle的国内镜像。
    vim $LS_HOME/build.gradle,然后在文件中添加

        repositories {
               maven { url 'https://maven.aliyun.com/repository/google/' }
               maven { url 'https://maven.aliyun.com/repository/jcenter/'}
               mavenCentral()
       
               maven {
                   url 'https://plugins.gradle.org/m2/'
               }
          }
    

    构建成功后检查在$LS_HOME/logstash-core/build/libs/目录下是否生成logstash-core-x.y.z.jar。其中x,y,z是你下载的logstash版本号。我的就是

    /Users/xx/corprepo/logstash/logstash-core/build/libs/logstash-core-7.6.3.jar
    

    2.编写Logstash Java Filter代码

    2.1.下载官方demo
    官方提供了一个demo,我们可以下载下来基于这个demo做修改。

    2.2.指定LOGSTASH_CORE_PATH
    下载下来demo后,在项目根目录创建gradle.properties文件,

    添加一行数据:

    LOGSTASH_CORE_PATH=<target_folder>/logstash-core
    

    2.3.开发Filter代码
    我们需要继承Logstash的Filter API实现我们自己的Java Filter功能。一个实现好的Filter如下:

    import co.elastic.logstash.api.Configuration;
    import co.elastic.logstash.api.Context;
    import co.elastic.logstash.api.Event;
    import co.elastic.logstash.api.Filter;
    import co.elastic.logstash.api.FilterMatchListener;
    import co.elastic.logstash.api.LogstashPlugin;
    import co.elastic.logstash.api.PluginConfigSpec;
    import org.apache.commons.lang3.StringUtils;
    
    import java.util.Collection;
    import java.util.Collections;
    //类名必须按照驼峰命名匹配这个下划线注解名,JavaFilterExample -> java_filter_example
    @LogstashPlugin(name = "java_filter_example")
    public class JavaFilterExample implements Filter {
        //定义一个该Filter支持的setting配置。名字是source,默认值为message
        //可从filter方法中看出是拿 SOURCE_CONFIG 的value值做field 的名称使用的
        public static final PluginConfigSpec<String> SOURCE_CONFIG =
                PluginConfigSpec.stringSetting("source", "message");
    
        private String id;
        private String sourceField;
    
        public JavaFilterExample(String id, Configuration config, Context context) {
            // constructors should validate configuration options
            this.id = id;
            this.sourceField = config.get(SOURCE_CONFIG);
        }
    
        /**
         * 该Filter的过滤逻辑,可以对输入的event数据做各种CRUD操作
         * @param events
         * @param matchListener
         * @return 最终流转到下一个pipeline的数据,如果有符合条件的event必须返回
         */
        @Override
        public Collection<Event> filter(Collection<Event> events, FilterMatchListener matchListener) {
            for (Event e : events) {
                Object f = e.getField(sourceField);
                if (f instanceof String) {
                    e.setField(sourceField, StringUtils.reverse((String)f));
                    matchListener.filterMatched(e);
                }
            }
            return events;
        }
        /**
         *
         * @return 返回该Filter支持的所有setting配置
         */
        @Override
        public Collection<PluginConfigSpec<?>> configSchema() {
            // should return a list of all configuration options for this plugin
            return Collections.singletonList(SOURCE_CONFIG);
        }
    
        /**
         *
         * @return 该Filter的ID,Logstash会帮我们赋值
         */
        @Override
        public String getId() {
            return this.id;
        }
    }
    

    其中需要注意两点:

    • @LogstashPlugin注解的name必须和类名高度保持一致。如java_filter_example-> JavaFilterExample(我特么反正是被坑了。。)
    • 需要实现co.elastic.logstash.api.Filter类,如果你import不成功,那就是gradle.properties配置不成功 或者 构建logstash源码不成功。重写其三个方法:

    getId方法
    返回该Filter的ID,Logstash会帮我们赋值。我们只需要定义一个成员变量构造方法中赋值进去就好了。

    configSchema方法
    返回该Filter支持的所有setting配置集合。PluginConfigSpec定义的setting配置就是我们在logstash的配置文件中使用该Filter时,可以传的参数,如在使用grok Filter时传进去的patterns_dirmatch

    filter {
          grok {
            patterns_dir => ["./patterns"]
            match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
          }
        }
    

    这个setting配置PluginConfigSpec支持的配置参数有name, type, deprecation status, required status, 和 default value

    在我们的Filter类中我们定义了PluginConfigSpec<String> SOURCE_CONFIG = PluginConfigSpec.stringSetting("source", "message");其中name=source, default value= message

    filter方法
    过滤器当然要干过滤逻辑的事了。其中入参Collection<Event> events是我们要处理的输入过来的数据,我们可以针对逻辑做一些CURD操作。入参FilterMatchListener matchListener是该 Filter将满足自己逻辑的event数据通知给matchListener. 如Logstash中matchListener的实现为DecoratingFilterMatchListener。它能做的操作比如有ADD_FIELD
    同样需要我们先定义PluginConfigSpec,然后在使用该Filter时配置add_field参数。如grok Filter就支持该参数和该DecoratingFilterMatchListener

     filter {
          grok {
            add_field => { "foo_%{somefield}" => "Hello world, from %{host}" }
          }
        }
    

    没有通知matchListener的需求时就不用调用matchListener.filterMatched(e)了。

    3.单元测试

    demo里面也有测试类,run一下就完了。。

    4.打包部署Filter

    我们需要使用gradle将我们的Filter项目达成ruby gem包,所以最好一定要基于demo项目中的gradle配置文件修改。

    4.1.配置gradle打包任务
    编辑项目根路径下的build.gradle文件

    plugin info部分是我们Filter的信息,其中需要修改的特别注意点我已经用TODO标示出来了。
    4.2.运行gradle打包任务
    在项目根目录下执行

    ./gradlew gem
    

    Windows系统执行gradlew.bat gem

    执行成功之后会看到在根目录下生成logstash-{plugintype}-<pluginName>-<version>.gem文件

    4.3.到Logstash中安装filter gem包
    到logstash目录($LS_HOME)下执行

    bin/logstash-plugin install --no-verify --local /path/to/javaPlugin.gem
    

    其中 /path/to/javaPlugin.gem就是我们4.2步骤中生成的gem绝对路径。成功可以看到

    5.使用我们的Java Filter运行Logstash

    5.1.在$LS_HOME/config目录下创建logstash运行配置文件java_filter.conf

    input {
      generator { message => "Hello world!" count => 1 }
    }
    filter {
    # java_filter_example:我们的filter中@LogstashPlugin注解的name
      java_filter_example {}
    }
    output {
      stdout { codec => rubydebug }
    }
    

    5.2.启动Logstash
    在$LS_HOME运行

    bin/logstash -f  config/java_filter.conf
    

    至此就成功啦~

    {
           "message" => "!dlrow olleH",
          "sequence" => 0,
          "@version" => "1",
              "host" => "xxdeMacBook-Pro.local",
        "@timestamp" => 2020-04-12T13:15:30.376Z
    }
    

    参考官方文档:https://www.elastic.co/guide/en/logstash/7.6/java-filter-plugin.html

    感谢您的阅读,我是Monica23334 || Monica2333 。立下每周写一篇原创文章flag的小姐姐,关注我并期待打脸吧~

  • 相关阅读:
    HDU 2639 Bone Collector II (01背包,第k解)
    POJ 2184 Cow Exhibition 奶牛展(01背包,变形)
    hihoCoder #1165 : 益智游戏 (挑战赛11 B题)
    UVA 562 Dividing coins 分硬币(01背包,简单变形)
    POJ Charm Bracelet 挑饰品 (常规01背包)
    hiho一下 第四十四周 博弈游戏·Nim游戏(直接公式解)
    UVA 624 CD(01背包,要记录路径)
    118 Pascal's Triangle 帕斯卡三角形 杨辉三角形
    117 Populating Next Right Pointers in Each Node II 每个节点的右向指针 II
    116 Populating Next Right Pointers in Each Node 每个节点的右向指针
  • 原文地址:https://www.cnblogs.com/gxm2333/p/12732340.html
Copyright © 2011-2022 走看看