zoukankan      html  css  js  c++  java
  • 【NIFI】 Apache NiFI 之 ExecuteScript处理(一)

       本例介绍NiFI ExecuteScript处理器的使用,使用的脚本引擎ECMScript

    FlowFile I / O简介

      NiFi中的流文件由两个主要组件构成,即属性和内容。属性是关于内容/流文件的元数据,我们在本系列的第1部分中看到了如何使用ExecuteScript来操作它们流文件的内容本质上只是一个字节集合,没有固有的结构,模式,格式等。各种NiFi处理器假设传入的流文件具有特定的模式/格式(或者从属性中确定它作为“mime.type”或以其他方式推断它。然后,这些处理器可以基于文件确实具有该格式的假设来对内容起作用(并且如果它们不这样,则经常转移到“失败”关系)。处理器也可以输出指定格式的流文件,这在处理器中有描述。NiFi文档

    流文件内容的输入和输出(I / O)通过ProcessSession API提供,因此是ExecuteScript的“session”变量(有关更多信息,请参阅第1部分)。一种机制是将回调对象传递给对session.read()或session.write()的调用。将为FlowFile对象创建InputStream和/或OutputStream,并使用相应的回调接口调用回调对象,并传入InputStream和/或OutputStream引用以供回调使用。有三个主要的回调接口,每个接口都有自己的用例:

      InputStreamCallback

        session.read(flowFileinputStreamCallback)方法使用此接口 提供InputStream,从中读取流文件的内容。界面有一个方法:

    1 void process(InputStream in) throws IOException

       此接口提供托管输入流以供使用。虽然可以手动关闭流,但输入流会自动打开和关闭。如果您只是从特定的流文件中读取而不是写回来,那么这是您将使用的表单。

       例如,当您想要处理传入的流文件,但创建许多输出流文件时,如 SplitText处理器。

      OutputStreamCallback

        session.write(flowFileoutputStreamCallback)方法使用此接口 来提供要写入流文件内容的OutputStream。界面有一个方法:

    1 void process(OutputStream out) throws IOException

        此接口提供托管输出流以供使用。尽管可以手动关闭流,但输出流会自动打开和关闭 - 如果包含这些流的任何流打开应该清除的资源,则非常重要。

        例如,ExecuteScript将从内部或外部文件生成数据,但不生成流文件。然后你将使用session.create()创建一个新的FlowFile,然后使用session.write( flowFileoutputStreamCallback)来插入内容。

      StreamCallback

        session.write(flowFilestreamCallback)方法使用此接口 来提供InputStream和OutputStream,从中读取和/或写入流文件的内容。界面有一个方法:

    1 void process(InputStream in, OutputStream out) throws IOException

     

        此接口提供托管输入和输出流以供使用。虽然可以手动关闭流,但输入流会自动打开和关闭 - 如果包含这些流的任何流打开应该清除的资源,则非常重要。

        例如,当您想要处理传入的流文件并用新的东西覆盖其内容时,例如 EncryptContent处理器。

      由于这些回调是Java对象,因此脚本必须创建一个并将其传递给会话方法,还有其他读取和写入流文件的方法,包括:

      • 使用session.read(flowFile)返回一个InputStream。这减轻了对InputStreamCallback的需求,而是返回可以读取的InputStream。作为交换,您必须手动管理(关闭,例如)InputStream。
      • 使用session.importFrom(inputStreamflowFile)从InputStream写入FlowFile。这取代了传递了OutputStreamCallback的session.write()的需要。

     ExecuteScript介绍

      ExecuteScript是一个多功能处理器,允许用户使用编程语言编写自定义逻辑,每次触发ExecuteScript处理器时都会执行该编程语言。为脚本提供以下变量绑定以启用对NiFi组件的访问:

      session:这是对分配给处理器的ProcessSession的引用。该会话允许您对流文件执行操作,如create(),putAttribute()和transfer(),以及read()和write()。

      context:这是对处理器的ProcessContext的引用。它可用于检索处理器属性,关系,Controller Services和StateManager。

      log:这是对处理器的ComponentLog的引用。使用它将消息记录到NiFi,例如log.info('Hello world!')

      REL_SUCCESS:这是对为处理器定义的“成功”关系的引用。它也可以通过引用父类的静态成员(ExecuteScript)来继承,但是某些引擎(如Lua)不允许引用静态成员,因此这是一个便利变量。它还节省了必须使用关系的完全限定名称。

      REL_FAILURE:这是对为处理器定义的“失败”关系的引用。与REL_SUCCESS一样,它也可以通过引用父类的静态成员(ExecuteScript)来继承,但是某些引擎(如Lua)不允许引用静态成员,因此这是一个便利变量。它还节省了必须使用关系的完全限定名称。

      动态属性:ExecuteScript中定义的任何动态属性都将作为设置为与动态属性对应的PropertyValue对象的变量传递给脚本引擎。这允许您获取属性的String值,还可以根据NiFi表达式语言评估属性,将值转换为适当的数据类型(例如布尔值等)等。因为动态属性名称变为脚本的变量名,您必须知道所选脚本引擎的变量命名属性

    ExecuteScript使用

      1、从会话中获取传入的流文件

        目的:有到ExecuteScript的传入连接,并希望从队列中检索一个流文件以进行处理

        方法:使用会话对象中的get()方法。此方法返回要处理的下一个最高优先级FlowFile的FlowFile。如果没有要处理的FlowFile,则该方法将返回null。请注意,即使FlowFiles有稳定的流入处理器,也可能返回null。如果处理器有多个并发任务,并且其他任务已经检索到FlowFiles,则会发生这种情况。如果脚本需要FlowFile继续处理,那么如果从session.get()返回null,它应立即返回

        Examples

          Javascript

    1 var flowFile = session.get();
    2 if (flowFile != null) {
    3     // All processing code goes here
    4 }

      2、从会话中获取多个传入流文件

        目的:有到ExecuteScript的传入连接,并希望从队列中检索多个流文件以进行处理

        方法:使用会话对象中的get(maxResults)方法。此方法从工作队列返回maxResults FlowFiles。如果没有可用的FlowFiles,则返回一个空列表(该方法不返回null)。注意:如果存在多个传入队列,则根据是否在单个调用中轮询所有队列或仅轮询单个队列,未指定行为。话虽如此,这里描述了观察到的行为(对于NiFi 1.1.0+和之前)

        Examples

          Javascript

    1 flowFileList = session.get(100)
    2 if(!flowFileList.isEmpty()) {
    3   for each (var flowFile in flowFileList) { 
    4        // Process each FlowFile here
    5   }
    6 }

      3、创建一个新的FlowFile

        目的:生成一个新的FlowFile以发送到下一个处理器

        方法:使用会话对象中的create()方法。此方法返回一个新的FlowFile对象,您可以对其执行进一步处理

        Examples

          Javascript

    1 var flowFile = session.create();
    2 // Additional processing here

      4、从父FlowFile创建新的FlowFile

        目的:希望基于传入的FlowFile生成新的FlowFile

        方法:使用会话对象中的create(parentFlowFile)方法。此方法采用父FlowFile引用并返回新的子FlowFile对象。新创建的FlowFile将继承除UUID之外的所有父属性。此方法将自动生成Provenance FORK事件或Provenance JOIN事件,具体取决于在提交ProcessSession之前是否从同一父级生成其他FlowFiles

        Examples

          Javascript

    1 var flowFile = session.get();
    2 if (flowFile != null) {
    3     var newFlowFile = session.create(flowFile);
    4     // Additional processing here
    5 }

      5、向流文件添加属性

        目的:有一个要添加自定义属性的流文件

        方法:使用会话对象中的putAttribute(flowFileattributeKeyattributeValue)方法。此方法使用给定的键/值对更新给定的FlowFile属性。注意:“uuid”属性对于FlowFile是固定的,不能修改; 如果密钥名为“uuid”,则将被忽略。

        Examples

          Javascript

    1 var flowFile = session.get();
    2 if (flowFile != null) {
    3     flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
    4 }

      6、向流文件添加多个属性

        目的:有一个要添加自定义属性的流文件

        方法:使用会话对象中的putAllAttributes(flowFileattributeMap)方法。此方法使用给定Map中的键/值对更新给定的FlowFile属性。注意:“uuid”属性对于FlowFile是固定的,不能修改; 如果密钥名为“uuid”,则将被忽略。

        Examples

          Javascript

    1 var number2 = 2;
    2 var attrMap = {'myAttr1':'1', 'myAttr2': number2.toString()}
    3 var flowFile = session.get() 
    4 if (flowFile != null) {
    5     flowFile = session.putAllAttributes(flowFile, attrMap)
    6 }

      7、从流文件中获取属性

        目的:有一个流文件,您可以从中检查属性

        方法:使用FlowFile对象中的getAttribute(attributeKey)方法。此方法返回给定attributeKey的String值,如果未找到attributeKey,则返回null。这些示例显示了“filename”属性值的检索。

        Examples

          Javascript

    1 var flowFile = session.get() 
    2 if (flowFile != null) {
    3     var myAttr = flowFile.getAttribute('filename')
    4 }
    1 var flowFile = session.get() 
    2 if (flowFile != null) {
    3     var attrs = flowFile.getAttributes();
    4     for each (var attrKey in attrs.keySet()) { 
    5        // Do something with attrKey (the key) and/or attrs[attrKey] (the value)
    6   }
    7 }

      8、将流文件传输到关系

        目的:处理流文件(新文件或传入文件)后,您希望将流文件传输到关系(“成功”或“失败”)。在这个简单的情况下,让我们假设有一个名为“errorOccurred”的变量,它指示FlowFile应该传输到哪个关系。

        方法:使用会话对象中的transfer(flowFilerelationship)方法。从文档中:此方法根据给定的关系将给定的FlowFile传输到适当的目标处理器工作队列。如果关系导致多个目标,则复制FlowFile的状态,使得每个目标都接收FlowFile的精确副本,尽管每个目标都具有其自己的唯一标识。

        注意:ExecuteScript将在每次执行结束时执行session.commit()以确保已提交操作。您不需要(也不应该)在脚本中执行session.commit()。

        Examples

          Javascript

     1 var flowFile = session.get();
     2 if (flowFile != null) {
     3    // All processing code goes here
     4    if(errorOccurred) {
     5      session.transfer(flowFile, REL_FAILURE)
     6    }
     7    else {
     8      session.transfer(flowFile, REL_SUCCESS)
     9    }
    10 }

      9、以指定的日志记录级别向日志发送消息

        目的:将处理期间发生的某些事件报告给日志记录框架。

        方法:将log变量与warn(),trace(),debug(),info()或error()方法一起使用。这些方法可以使用单个String,或者后跟对象数组的String,或者后跟对象数组后跟Throwable的String。第一个用于简单消息。当您有一些要记录的动态对象/值时,将使用第二个。要在消息字符串中引用这些,请在消息中使用“{}”。这些是按照外观的顺序针对Object数组进行评估的,因此如果消息显示为“Found these things:{} {} {}”并且Object数组为['Hello',1,true],则记录的消息将为“找到这些东西:你好1真的”。这些日志记录方法的第三种形式也采用Throwable参数
        
    Examples

          Javascript

    1 var ObjectArrayType = Java.type("java.lang.Object[]");
    2 var objArray = new ObjectArrayType(3);
    3 objArray[0] = 'Hello';
    4 objArray[1] = 1;
    5 objArray[2] = true;
    6 log.info('Found these things: {} {} {}', objArray)

       10、使用回调读取传入流文件的内容

        目的:有到ExecuteScript的传入连接,并希望从队列中检索流文件的内容以进行处理

        方法:使用read(flowFileinputStreamCallback)来自会话对象的方法。传入read()方法需要一个InputStreamCallback对象。请注意,因为InputStreamCallback是一个对象,所以默认情况下内容只对该对象可见。如果需要使用read()方法之外的数据,请使用更全局范围的变量。这些示例将传入流文件的完整内容存储到String中(使用Apache Commons的IOUtils类)。注意:对于大流量文件,这不是最好的技术; 相反,您应该只读取您需要的数据,并根据需要进行处理。对于像SplitText这样的东西,你可以一次读取一行并在InputStreamCallback中处理它,或者使用前面提到的session.read(flowFile)方法来获得在回调之外使用的InputStream引用。

        Examples

          Javascript

     1 var InputStreamCallback =  Java.type("org.apache.nifi.processor.io.InputStreamCallback")
     2 var IOUtils = Java.type("org.apache.commons.io.IOUtils")
     3 var StandardCharsets = Java.type("java.nio.charset.StandardCharsets")
     4  
     5 var flowFile = session.get();
     6 if(flowFile != null) {
     7   // Create a new InputStreamCallback, passing in a function to define the interface method
     8   session.read(flowFile,
     9     new InputStreamCallback(function(inputStream) {
    10         var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
    11         // Do something with text here
    12     }));
    13 }

       11、使用回调将内容写入传出流文件

        目的:传出流文件生成内容

        方法:使用会话对象中的write(flowFileoutputStreamCallback)方法。传递给write()方法需要一个OutputStreamCallback对象。请注意,因为OutputStreamCallback是一个对象,所以默认情况下内容只对该对象可见。如果需要使用write()方法之外的数据,请使用更全局范围的变量。这些示例将示例String写入flowFile。

        Examples

          Javascript

     1 var OutputStreamCallback =  Java.type("org.apache.nifi.processor.io.OutputStreamCallback");
     2 var IOUtils = Java.type("org.apache.commons.io.IOUtils");
     3 var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
     4  
     5 var flowFile = session.get();
     6 if(flowFile != null) {
     7   // Create a new OutputStreamCallback, passing in a function to define the interface method
     8   flowFile = session.write(flowFile,
     9     new OutputStreamCallback(function(outputStream) {
    10         outputStream.write("Hello World!".getBytes(StandardCharsets.UTF_8))
    11     }));
    12 }

       12、使用回调覆盖带有更新内容的传入流文件

        目的:重用传入的流文件,但希望修改其传出流文件的内容。

        方法:使用write(flowFilestreamCallback)来自会话对象的方法。传递给write()方法需要StreamCallback对象。StreamCallback提供InputStream(来自传入流文件)和outputStream(用于该流文件的下一个版本),因此您可以使用InputStream获取流文件的当前内容,然后修改它们并将它们写回到流文件。这会覆盖流文件的内容,因此对于追加,您必须通过附加到读入内容来处理它,或者使用不同的方法(使用session.append()而不是session.write())。请注意,由于StreamCallback是一个对象,因此默认情况下内容仅对该对象可见。如果需要使用write()方法之外的数据,请使用更全局范围的变量

        Examples

          Javascript

    var StreamCallback =  Java.type("org.apache.nifi.processor.io.StreamCallback");
    var IOUtils = Java.type("org.apache.commons.io.IOUtils");
    var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
     
    var flowFile = session.get();
    if(flowFile != null) {
      // Create a new StreamCallback, passing in a function to define the interface method
      flowFile = session.write(flowFile,
        new StreamCallback(function(inputStream, outputStream) {
            var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            outputStream.write(text.split("").reverse().join("").getBytes(StandardCharsets.UTF_8))
        }));
    }

       13、处理脚本处理过程中的错误

        目的:脚本中发生错误(通过数据验证或抛出异常),并且您希望脚本正常处理它。

        方法:对于异常,使用脚本语言的异常处理机制(通常它们是try / catch块)。对于数据验证,您可以使用类似的方法,但定义一个布尔变量,如“valid”和if / else子句而不是try / catch子句。ExecuteScript定义“成功”和“失败”关系; 通常,您的处理将“好”流文件转移到成功,“坏”流文件转换为失败(在后一种情况下记录错误)

        Examples

          Javascript

     1 var flowFile = session.get();
     2 if(flowFile != null) {
     3   try {
     4     // Something that might throw an exception here
     5  
     6     // Last operation is transfer to success (failures handled in the catch block)
     7     session.transfer(flowFile, REL_SUCCESS)
     8 } catch(e) {
     9   log.error('Something went wrong', e)
    10   session.transfer(flowFile, REL_FAILURE)
    11 }
    12 }

     ExecuteScript-Demo

      1、页面如下图

      

      2、GenerateFlowFile

        

      2、ExecuteScript

        

        脚本内容:

     1 var InputStreamCallback =  Java.type("org.apache.nifi.processor.io.InputStreamCallback");
     2 var OutputStreamCallback =  Java.type("org.apache.nifi.processor.io.OutputStreamCallback");
     3 var IOUtils = Java.type("org.apache.commons.io.IOUtils");
     4 var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
     5  
     6 var flowFile = session.get();
     7 
     8 
     9 if(flowFile != null) {
    10 
    11     try {
    12 
    13         var text = "";
    14 
    15         // 读取flowFile中内容
    16         session.read(flowFile,new InputStreamCallback(function(inputStream) {
    17             var str = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
    18             
    19 
    20             //由JSON字符串转换为JSON对象
    21             var obj = JSON.parse(str); 
    22             obj.age = 18
    23 
    24             //将JSON对象转化为JSON字符
    25             text = JSON.stringify(obj); 
    26 
    27         }));
    28 
    29         // 向flowFile中写入内容
    30         flowFile = session.write(flowFile, new OutputStreamCallback(function(outputStream) {
    31 
    32             outputStream.write(text.getBytes(StandardCharsets.UTF_8))
    33 
    34         }));
    35 
    36         session.transfer(flowFile, REL_SUCCESS)
    37 
    38     } catch(e) {
    39         log.error('Something went wrong', e)
    40         session.transfer(flowFile, REL_FAILURE)
    41     }
    42     
    43 }

      3、PutFile

        

        输出文件内容:{"id":1,"name":"god","age":18}

      

      其他脚本引擎,参考以下地址 

      参考文档链接:https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html

  • 相关阅读:
    【NLP】UnicodeDecodeError: 'ascii' codec can't decode byte 0xd1 in position 74752: ordinal not in rang
    【Android】Android学习过程中的一些网站
    【Java】第10章 内部类
    【Java】第7章 复用类
    【Linux】Ubuntu下安装QQ
    【Java】第9章 接口
    【Java】第8章 多态
    【Coding】用筛法求素数的C++实现(附100000以内素数表)
    【Android】挺好用的chart engine,可用于Android画饼图,条形图等
    【Coding】Visual Studio中最常用的13个快捷键
  • 原文地址:https://www.cnblogs.com/h--d/p/10111850.html
Copyright © 2011-2022 走看看