zoukankan      html  css  js  c++  java
  • 【NIFI】 Apache NiFI 之 ExecuteScript处理(二)

      本例介绍NiFI ExecuteScript处理器的使用,使用的脚本引擎ECMScript

      接上一篇【NIFI】 Apache NiFI 之 ExecuteScript处理(一)

    ExecuteScript使用

      1、动态属性

        其中一个功能是动态属性的概念,也称为用户定义属性。这些是处理器的属性,用户可以为其设置属性名称和值。并非所有处理器都支持/使用动态属性,但ExecuteScript会将动态属性作为变量传递,这些变量引用与属性值对应的PropertyValue对象。这里有两件重要的事情需要注意:

      • 由于属性名称按原样绑定到变量名称,因此必须为指定的编程语言支持动态属性的命名约定。例如,Groovy不支持句点(。)作为有效的变量字符,因此动态属性(如“my.value”)将导致处理器失败。在这种情况下,有效的替代方案是“myValue”。
      • 使用PropertyValue对象(而不是值的String表示形式)以允许脚本在将属性值评估为String之前对属性的值执行各种操作。如果已知属性包含文字值,则可以对变量调用getValue()方法以获取其String表示形式。相反,如果值可能包含表达式语言,或者您希望将值转换为除String之外的值(例如对布尔对象的值为'true'),则还有这些操作的方法。这些示例在下面的配方中说明,假设我们有两个属性'myProperty1'和'myProperty2'定义如下:

        

        目的:用户输入了动态属性以供脚本使用(例如,配置参数)。

        方法:使用变量的PropertyValue对象中的getValue()方法。此方法返回动态属性值的String表示形式。请注意,如果值中存在表达式语言,则getValue()将不对其进行求值

        Examples

          Javascript

    1 var myValue1 = myProperty1.getValue()
    2 var myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).getValue()

       2、添加模块

        ExecuteScript的另一个功能是能够向类路径添加外部“模块”,这允许您利用各种第三方库,脚本等。但是每个脚本引擎都以不同方式处理模块的概念,因此我将讨论它们分别。一般来说,有两种类型的模块,Java库(JAR)和脚本(用与ExecuteScript中相同的语言编写

        配置如下:
          

        目的:脚本执行时,使用Java库

        方法:配置ExecuteScript,添加Module Directory目录(里面都是jar包),脚本中获取jar包中的类,使用jar包中的类方法

        Examples

          Javascript

     1 var ObjectMapper = Java.type("com.fasterxml.jackson.databind.ObjectMapper");
     2 var Map = Java.type("java.util.Map");
     3 
     4 
     5 var objectMapper = new ObjectMapper();
     6 
     7 
     8 // java对象转 json字符串
     9 var str = objectMapper.writeValueAsString(obj);
    10 
    11 // json字符串 转 java对象
    12 var map = objectMapper.readValue(str, Map.class);  

      3、状态存储

        NiFi(我相信0.5.0)为处理器和其他NiFi组件提供了持久保存一些信息的能力,以便在组件周围实现某些状态功能,例如,QueryDatabaseTable处理器跟踪它在指定列中看到的最大值,这样下次运行时,它只会获取其值大于目前已见过的行(即存储在州经理)。

        NiFi组件可以选择将其状态存储在集群级别或本地级别。请注意,在独立的NiFi实例中,“群集范围”与“本地范围”相同。范围的选择通常是关于在流中,每个节点上的相同处理器是否可以共享状态数据。如果群集中的实例不需要共享状态,则使用本地范围。在Java中,这些选项作为名为Scope的枚举提供,因此当我引用Scope.CLUSTER和Scope.LOCAL时,我分别表示集群和本地作用域。

        要在ExecuteScript中使用状态管理功能(下面是特定于语言的示例),您可以通过调用ProcessContext的getStateManager()方法获得对StateManager的引用(回想一下,每个引擎都获得一个名为“context”的变量,其中包含ProcessContext实例)。然后,您可以在StateManager对象上调用以下方法:

        void setState(Map <String,String> state,Scope scope - 更新组件在给定范围内的状态值,并将其设置为给定值。请注意,该值是一个Map; “组件状态”的概念是每个构成较低级别状态的所有键/值对的映射。Map会立即更新以提供原子性。

        StateMap getState(作用域范围 - 返回给定范围内组件的当前状态。此方法永远不会返回null; 相反,它是一个StateMap对象,如果尚未设置状态,StateMap的版本将为-1,值的映射将为空。通常会创建一个新的Map <String,String>来存储更新的值,然后调用setState()或replace()。

        boolean replace(StateMap oldValue,Map <String,String> newValue,Scope scope - 当且仅当值与给定的oldValue相同时,才更新组件状态(在给定范围内)的值到新值。如果状态已更新为新值,则返回true; 否则,如果状态的值不等于oldValue,则返回false。

        void clear(范围范围 - 清除给定范围内组件状态的所有键和值。

        目的1:脚本需要从状态管理器获取当前键/值对以供脚本使用(例如,更新)

        方法1:使用ProcessContext中的getStateManager()方法,然后使用StateManager中的getStateMap(),然后使用toMap()转换为键/值对的Map <String,String>。请注意,StateMap还有一个简单检索值的get(key)方法,但这种方法并不常用,因为Map通常会更新,一旦完成,就必须为StateManager设置。

        Examples

          Javascript

    1 var Scope = Java.type('org.apache.nifi.components.state.Scope');
    2 var oldMap = context.stateManager.getState(Scope.LOCAL).toMap();

        目的2:脚本希望使用新的键/值对映射更新状态映射。

        方法2:要获取当前StateMap对象,请再次使用ProcessContext中的getStateManager()方法,然后使用StateManager中的getStateMap()。这些示例假设一个新的Map,但使用上面的配方(使用toMap()方法),您可以使用现有值创建一个新的Map,然后只更新所需的条目。请注意,如果没有当前映射(即StateMap.getVersion()返回-1),则replace()将不起作用,因此示例将相应地检查并调用setState()或replace()。从新的ExecuteScript实例运行时,StateMap版本将为-1,因此在单次执行后,如果右键单击ExecuteScript处理器并选择View State,您应该看到如下内容:

          

        Examples

          Javascript

    1 var Scope = Java.type('org.apache.nifi.components.state.Scope');
    2 var stateManager = context.stateManager;
    3 var stateMap = stateManager.getState(Scope.CLUSTER);
    4 var newMap = {'myKey1': 'myValue1'};
    5 if (stateMap.version == -1) {
    6   stateManager.setState(newMap, Scope.CLUSTER);
    7 } else {
    8   stateManager.replace(stateMap, newMap, Scope.CLUSTER);
    9 }

    ExecuteScript-Demo

      Demo流程:先存入缓存myKey1 =》 第二次取出myKey1的值 =》 myKey1的值修改 =》 再次存入缓存中

      ExecuteScript内容如下:

     1 var Scope = Java.type('org.apache.nifi.components.state.Scope');
     2 var stateManager = context.stateManager;
     3 var stateMap = stateManager.getState(Scope.CLUSTER);
     4 
     5 if (stateMap.version == -1) {
     6   var newMap = {'myKey1': "1"};
     7   stateManager.setState(newMap, Scope.CLUSTER);
     8 } else {
     9     
    10 
    11    var myValue1 = stateMap.toMap().get("myKey1");
    12    myValue1 = myValue1*1 + 1;
    13    var newMap = {'myKey1': myValue1 + ""};
    14   
    15   // 替换
    16   stateManager.replace(stateMap, newMap, Scope.CLUSTER);
    17 }

      

      其他脚本引擎,参考以下地址 

      参考文档链接:https://community.hortonworks.com/articles/77739/executescript-cookbook-part-3.html

  • 相关阅读:
    Oracle SQL语句收集
    SqlParameter In 查询
    SQL 性能优化
    Entity Framework
    【XLL API 函数】 xlfSetName
    【XLL API 函数】xlfUnregister (Form 2)
    【XLL API 函数】xlfUnregister (Form 1)
    【Excel 4.0 函数】REGISTER 的两种形式以及VBA等效语句
    【Excel 4.0 函数】REGISTER
    【Bochs 官方手册翻译】 第一章 Bochs介绍
  • 原文地址:https://www.cnblogs.com/h--d/p/10148645.html
Copyright © 2011-2022 走看看