zoukankan      html  css  js  c++  java
  • 【NIFI】 开发自定义Nifi Processor

      本例需要基础知识:【NIFI】 Apache NiFI 安装及简单的使用

      Nifi不光可以使用自带的Processor,还可以自定义Processor。本例简单介绍开发一个Processor

    开发

      1、新建一个Maven工程,这里采用的是eclipse的模板原型来创建。

        a、创建

        

        b、添加模板,内容:

        • Archetype Group Id:org.apache.nifi
        • Archetype Artifact Id:nifi-processor-bundle-archetype
        • Archetype Version:1.2.0

        

        c、根据模板创建,maven项目

        

      2、创建后,项目目录如下:

        

        其中3个pom文件如下

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <!--
     3   Licensed to the Apache Software Foundation (ASF) under one or more
     4   contributor license agreements. See the NOTICE file distributed with
     5   this work for additional information regarding copyright ownership.
     6   The ASF licenses this file to You under the Apache License, Version 2.0
     7   (the "License"); you may not use this file except in compliance with
     8   the License. You may obtain a copy of the License at
     9   http://www.apache.org/licenses/LICENSE-2.0
    10   Unless required by applicable law or agreed to in writing, software
    11   distributed under the License is distributed on an "AS IS" BASIS,
    12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13   See the License for the specific language governing permissions and
    14   limitations under the License.
    15 -->
    16 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    17     <modelVersion>4.0.0</modelVersion>
    18 
    19     <parent>
    20         <groupId>org.apache.nifi</groupId>
    21         <artifactId>nifi-nar-bundles</artifactId>
    22         <version>1.2.0</version>
    23     </parent>
    24 
    25     <groupId>com.test</groupId>
    26     <artifactId>my-processor</artifactId>
    27     <version>1.0-SNAPSHOT</version>
    28     <packaging>pom</packaging>
    29 
    30     <modules>
    31         <module>nifi-my-processor-processors</module>
    32         <module>nifi-my-processor-nar</module>
    33     </modules>
    34     
    35 </project>
    my-processor pom.xml 

      

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <!--
     3   Licensed to the Apache Software Foundation (ASF) under one or more
     4   contributor license agreements. See the NOTICE file distributed with
     5   this work for additional information regarding copyright ownership.
     6   The ASF licenses this file to You under the Apache License, Version 2.0
     7   (the "License"); you may not use this file except in compliance with
     8   the License. You may obtain a copy of the License at
     9   http://www.apache.org/licenses/LICENSE-2.0
    10   Unless required by applicable law or agreed to in writing, software
    11   distributed under the License is distributed on an "AS IS" BASIS,
    12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13   See the License for the specific language governing permissions and
    14   limitations under the License.
    15 -->
    16 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    17     <modelVersion>4.0.0</modelVersion>
    18 
    19     <parent>
    20         <groupId>com.test</groupId>
    21         <artifactId>my-processor</artifactId>
    22         <version>1.0-SNAPSHOT</version>
    23     </parent>
    24 
    25     <artifactId>nifi-my-processor-nar</artifactId>
    26     <packaging>nar</packaging>
    27     <properties>
    28         <maven.javadoc.skip>true</maven.javadoc.skip>
    29         <source.skip>true</source.skip>
    30     </properties>
    31 
    32     <dependencies>
    33         <dependency>
    34             <groupId>com.test</groupId>
    35             <artifactId>nifi-my-processor-processors</artifactId>
    36             <version>1.0-SNAPSHOT</version>
    37         </dependency>
    38     </dependencies>
    39 
    40 </project>
    nifi-my-processor-nar pom.xml
     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <!--
     3   Licensed to the Apache Software Foundation (ASF) under one or more
     4   contributor license agreements. See the NOTICE file distributed with
     5   this work for additional information regarding copyright ownership.
     6   The ASF licenses this file to You under the Apache License, Version 2.0
     7   (the "License"); you may not use this file except in compliance with
     8   the License. You may obtain a copy of the License at
     9   http://www.apache.org/licenses/LICENSE-2.0
    10   Unless required by applicable law or agreed to in writing, software
    11   distributed under the License is distributed on an "AS IS" BASIS,
    12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13   See the License for the specific language governing permissions and
    14   limitations under the License.
    15 -->
    16 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    17     <modelVersion>4.0.0</modelVersion>
    18 
    19     <parent>
    20         <groupId>com.test</groupId>
    21         <artifactId>my-processor</artifactId>
    22         <version>1.0-SNAPSHOT</version>
    23     </parent>
    24 
    25     <artifactId>nifi-my-processor-processors</artifactId>
    26     <packaging>jar</packaging>
    27 
    28     <dependencies>
    29         <dependency>
    30             <groupId>org.apache.nifi</groupId>
    31             <artifactId>nifi-api</artifactId>
    32         </dependency>
    33         <dependency>
    34             <groupId>org.apache.nifi</groupId>
    35             <artifactId>nifi-utils</artifactId>
    36         </dependency>
    37         <dependency>
    38             <groupId>org.apache.nifi</groupId>
    39             <artifactId>nifi-mock</artifactId>
    40             <scope>test</scope>
    41         </dependency>
    42         <dependency>
    43             <groupId>org.slf4j</groupId>
    44             <artifactId>slf4j-simple</artifactId>
    45             <scope>test</scope>
    46         </dependency>
    47         <dependency>
    48             <groupId>junit</groupId>
    49             <artifactId>junit</artifactId>
    50             <scope>test</scope>
    51         </dependency>
    52     </dependencies>
    53 </project>
    nifi-my-processor-processors pom.xml

      

      3、修改项目,因环境引起的错误

        a、删除nifi-my-processor-processors子项目中,src/test中的测试文件(打包可能出现错误)

        b、在org.apache.nifi.processor.Processor文件中配置自己的Porcessor

          

      4、代码编写,编辑MyProcessor.java文件,文件在项目创建的时候已经生成,做适当修改即可。其中有设置状态,属性,及处理方法(onTrigger)等

        

        内容:

      1 /*
      2  * Licensed to the Apache Software Foundation (ASF) under one or more
      3  * contributor license agreements.  See the NOTICE file distributed with
      4  * this work for additional information regarding copyright ownership.
      5  * The ASF licenses this file to You under the Apache License, Version 2.0
      6  * (the "License"); you may not use this file except in compliance with
      7  * the License.  You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  */
     17 package com.test.processors;
     18 
     19 import org.apache.nifi.components.PropertyDescriptor;
     20 import org.apache.nifi.flowfile.FlowFile;
     21 import org.apache.nifi.annotation.behavior.ReadsAttribute;
     22 import org.apache.nifi.annotation.behavior.ReadsAttributes;
     23 import org.apache.nifi.annotation.behavior.WritesAttribute;
     24 import org.apache.nifi.annotation.behavior.WritesAttributes;
     25 import org.apache.nifi.annotation.lifecycle.OnScheduled;
     26 import org.apache.nifi.annotation.documentation.CapabilityDescription;
     27 import org.apache.nifi.annotation.documentation.SeeAlso;
     28 import org.apache.nifi.annotation.documentation.Tags;
     29 import org.apache.nifi.processor.exception.ProcessException;
     30 import org.apache.nifi.processor.AbstractProcessor;
     31 import org.apache.nifi.processor.ProcessContext;
     32 import org.apache.nifi.processor.ProcessSession;
     33 import org.apache.nifi.processor.ProcessorInitializationContext;
     34 import org.apache.nifi.processor.Relationship;
     35 import org.apache.nifi.processor.util.StandardValidators;
     36 
     37 import java.io.InputStreamReader;
     38 import java.io.StringWriter;
     39 import java.util.ArrayList;
     40 import java.util.Collections;
     41 import java.util.HashSet;
     42 import java.util.List;
     43 import java.util.Set;
     44 import java.util.concurrent.atomic.AtomicReference;
     45 
     46 @Tags({"example"})
     47 @CapabilityDescription("Provide a description")
     48 @SeeAlso({})
     49 @ReadsAttributes({@ReadsAttribute(attribute="", description="")})
     50 @WritesAttributes({@WritesAttribute(attribute="", description="")})
     51 public class MyProcessor extends AbstractProcessor {
     52 
     53     public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
     54             .Builder().name("MY_PROPERTY")
     55             .displayName("My property")
     56             .description("Example Property")
     57             .required(true)
     58             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
     59             .build();
     60 
     61     public static final Relationship MY_RELATIONSHIP_SUCCESS = new Relationship.Builder()
     62             .name("sucess")
     63             .description("Example relationship Success")
     64             .build();
     65     
     66     public static final Relationship MY_RELATIONSHIP_FAILURE = new Relationship.Builder()
     67             .name("failure")
     68             .description("Example relationship Failure")
     69             .build();
     70 
     71     private List<PropertyDescriptor> descriptors;
     72 
     73     private Set<Relationship> relationships;
     74 
     75     @Override
     76     protected void init(final ProcessorInitializationContext context) {
     77         final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
     78         descriptors.add(MY_PROPERTY);
     79         this.descriptors = Collections.unmodifiableList(descriptors);
     80 
     81         final Set<Relationship> relationships = new HashSet<Relationship>();
     82         relationships.add(MY_RELATIONSHIP_SUCCESS);
     83         relationships.add(MY_RELATIONSHIP_FAILURE);
     84         this.relationships = Collections.unmodifiableSet(relationships);
     85     }
     86 
     87     @Override
     88     public Set<Relationship> getRelationships() {
     89         return this.relationships;
     90     }
     91 
     92     @Override
     93     public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
     94         return descriptors;
     95     }
     96 
     97     @OnScheduled
     98     public void onScheduled(final ProcessContext context) {
     99 
    100     }
    101 
    102     @Override
    103     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    104         FlowFile flowFile = session.get();
    105         if ( flowFile == null ) {
    106             return;
    107         }
    108         // TODO implement
    109         final AtomicReference<String> value = new AtomicReference<>();
    110         session.read(flowFile, in -> {
    111             try{
    112                 StringWriter sw = new StringWriter();
    113                 InputStreamReader inr = new InputStreamReader(in);
    114                 char[] buffer = new char[1024];
    115                 int n = 0;
    116                 while (-1 != (n = inr.read(buffer))) {
    117                     sw.write(buffer, 0, n);
    118                 }
    119                 String str = sw.toString();
    120                 
    121                 String result = "处理了:" + str + context.getProperty("MY_PROPERTY").getValue();
    122                 value.set(result);
    123             }catch(Exception ex){
    124                 ex.printStackTrace();
    125                 getLogger().error("Failed to read json string.");
    126             }
    127         });
    128 
    129         String results = value.get();
    130         if(results != null && !results.isEmpty()){
    131             flowFile = session.putAttribute(flowFile, "match", results);
    132         }
    133 
    134         flowFile = session.write(flowFile, out -> out.write(value.get().getBytes()));
    135 
    136         session.transfer(flowFile, MY_RELATIONSHIP_SUCCESS);
    137         
    138     }
    139 }

      5、打包,使用maven命令:mvn clean package

      6、将nifi-my-processor-nar工程target目录中的 nifi-my-processor-nar-1.0-SNAPSHOT.nar 文件,拷贝到 nifilib 目录中 

      7、启动 NIFI 项目,使用自定义的Process:MyProcessor

        配置如下:

          a、拉入三个Processor

            

          b、配置三个Processor,

            下图是GenerateFlowFile的配置,主要配置了执行的时间(10s)及产生的字符串(123)

              

            下图是MyProcessor配置

            

          c、启动三个Processor

          d、查看输出,可以看到字符串 123 经过处理成 : "处理了:123abc"

            

            

        

  • 相关阅读:
    ZTree id值太大,ZTree没有生成树,ZTree的id值过大
    Spring NamedParameterJdbcTemplate命名参数查询条件封装, NamedParameterJdbcTemplate查询封装
    Linux Redis 重启数据丢失解决方案,Linux重启后Redis数据丢失解决方
    Linux Redis自动启动,Redis开机启动,Linux Redis设置开机启动
    Linux Redis安装,Linux如何安装Redis,Linux Redis自动启动,Redis开机启动
    springJdbc in 查询,Spring namedParameterJdbcTemplate in查询
    Sublime的插件Color Highlighter的安装方法
    Sublime的Package Control安装方法
    JavaScript实现最简单的拖拽效果
    css抠图之background-position-背景定位
  • 原文地址:https://www.cnblogs.com/h--d/p/10090692.html
Copyright © 2011-2022 走看看