<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12
      Fork me on GitHub

      Nifi組件腳本開(kāi)發(fā)—ExecuteScript 使用指南(三)

      上一篇:Nifi組件腳本開(kāi)發(fā)—ExecuteScript 使用指南(二) 

      Part 3 - 高級(jí)特征

      本系列的前兩篇文章涵蓋了 flow file 的基本操作, 如讀寫(xiě)屬性和內(nèi)容, 以及使用"session" 變量 ( ProcessSession對(duì)象)獲取和轉(zhuǎn)移 flow files . ExecuteScript還有很多其他的能力,這里對(duì)一部分作簡(jiǎn)要介紹。

      動(dòng)態(tài)屬性-Dynamic Properties

      其中一個(gè)能力叫做 dynamic properties, 或者稱為用戶定義屬性.  processor 的一些屬性可以由用戶設(shè)置 property name 和 value. 不是所有的processors 都支持和使用動(dòng)態(tài)屬性, 在 ExecuteScript 將傳遞動(dòng)態(tài)屬性作為變量,改變了引用 PropertyValue 對(duì)象,對(duì)應(yīng)于property's value. 這里有兩個(gè)重要事需要了解:

      1. 因?yàn)?property 綁定為變量名, dynamic properties的命名規(guī)則必須滿足相應(yīng)的編程語(yǔ)言的規(guī)范。 例如, Groovy 不支持 (.) 作為變量名字符, 像 "my.value" 引起processor處理失敗. 有效的可選項(xiàng)是 "myValue"
      2. PropertyValue 對(duì)象用于 (rather than a String representation of the value) 腳本執(zhí)行多種操作,在轉(zhuǎn)換為String之前進(jìn)行。如果property已知包含合法的值, 你可以調(diào)用 該變量的 getValue() 方法得到其字符串表示. 如果值包含 Expression Language,或者希望轉(zhuǎn)為除字符串外的其它值(如 'true' 對(duì)于Boolean 對(duì)象), 這里也提供了操作方法. 這些例子在下面的示例中演示, 假定我們有兩個(gè)屬性 'myProperty1' 和 'myProperty2',像下面這樣被定義

      獲取 dynamic property的值

      需求:在腳本中得到 dynamic property(如, 配置參數(shù))。

      方法:使用變量的PropertyValue對(duì)象的getValue() 方法. 該方法返回其字符串代表 dynamic property. 注意,如果Expression Language包含在字符串中, getValue() 將不會(huì)對(duì)其求值(參加下一個(gè)方法實(shí)現(xiàn)求職功能)。

      例子

      Groovy

      def myValue1 = myProperty1.value

      Jython

      myValue1 = myProperty1.getValue()

      Javascript

      var myValue1 = myProperty1.getValue()

      JRuby

      myValue1 = myProperty1.getValue()

      添加模塊

      ExecuteScript 的另一個(gè)特征是具有添加外部模塊到 classpath 的能力, 這將允許使用大量的第三方庫(kù)、腳本等增強(qiáng)能力. 但是,每一個(gè)腳本引擎處理模塊的方法都是不一樣的, 因此需要分開(kāi)討論。總體上說(shuō), 主要有兩種類型的模塊, Java libraries (JARs) 和 scripts (以在 ExecuteScript中的同一種語(yǔ)言編寫(xiě). 這里將討論和顯示不同 script engines 如何進(jìn)行處理:

      Groovy

       Groovy script engine (至少在 ExecuteScript中) 不支持導(dǎo)入其他的 Groovy scripts, 但是允許 JARs 添加到 classpath. 因此,對(duì)于外部Groovy projects, 考慮編譯為bytecode,然后指向 classes 目錄或者包裝為 JAR.

      當(dāng)使用 Groovy, 這個(gè) Module Directory 屬性設(shè)為 comma-separated 的文件列表 (JARs) 和 folders. 如果folder 被指定, ExecuteScript 將發(fā)現(xiàn)該目錄所有的 JARs 并添加進(jìn)去. 這允許你添加第三方軟件,哪怕包含很多個(gè) JARs. 

      Jython

      Jython script engine (在 ExecuteScript) 目前僅支持導(dǎo)入純 Python 模塊, 不包含natively-compiled modules (如CPython),如 numpy 或 scipy. 目前也暫不支持 JARs, 這在將來(lái)版本中也許會(huì)考慮. 在Module Directory property在執(zhí)行前需要加載, 使用"import sys" 跟著 "sys.path.append" 對(duì)每一個(gè)指定的模塊位置進(jìn)行加載.

      如果 Python 已經(jīng)安裝, 可以將所有的安裝好的純 Python modules 添加進(jìn)來(lái),通過(guò)將 site-packages 目錄加到Module Directory 屬性即可, 如:

      /usr/local/lib/python2.7/site-packages

      然后,你的腳本就能 import 各種軟件包了,如:

      from user_agents import parse

      Javascript

      Javascript script engine (在 ExecuteScript), 允許同樣的 JARs/folders設(shè)置,與 Groovy engine一樣. 將查找JARs 以及指定的folder.

      JRuby

      JRuby script engine (在 ExecuteScript) 目前只允許單個(gè)的 JARs指定, 如果 folder 被指定,其中一定要有classes ( java compiler 需要能看見(jiàn)), 如果folder 包含 JARs將不會(huì)自動(dòng)加入。目前, 沒(méi)有pure Ruby 模塊能被導(dǎo)入

      狀態(tài)管理

      NiFi (如0.5.0 ) 提供了為 Processors 和其他 NiFi 組件持久化一些信息從而實(shí)現(xiàn)組件的狀態(tài)管理功能. 例如,  QueryDatabaseTable processor 保存對(duì)大數(shù)據(jù)集的跟蹤, 當(dāng)下次運(yùn)行時(shí), 將只會(huì)獲取哪些比原來(lái)(存儲(chǔ)在 State Manager)更大的行的數(shù)據(jù)。

      狀態(tài)管理的一個(gè)重要概念是Scope. NiFi 組件可以選擇存儲(chǔ)它的狀態(tài)在集群級(jí)別還是本地級(jí)別. 注意,在獨(dú)立的 NiFi 實(shí)例中, "cluster scope" 與 "local scope"是一樣的. 這個(gè) scope 選擇的區(qū)別在于在一個(gè)數(shù)據(jù)流中,每個(gè)結(jié)點(diǎn)的處理器是否需要共享狀態(tài)信息. 如果集群中的實(shí)例不需要共享狀態(tài),就使用local scope. 在 Java,這些選項(xiàng)作為一個(gè) enum變量 Scope提供, 因此,當(dāng)引用 Scope.CLUSTER 和 Scope.LOCAL, 就意味著是集群模式或本地模式.

      為了探究ExecuteScript (語(yǔ)言獨(dú)立的例子如下)狀態(tài)管理的特征 , 你可以獲得 StateManager的引用,通過(guò)調(diào)用 ProcessContext的 getStateManager() 方法實(shí)現(xiàn) (recall that each engine gets a variable named "context" with an instance of ProcessContext). 然后調(diào)用 StateManager 對(duì)象的下面方法:

      • void setState(Map<String, String> state, Scope scope) - 在給定的scope更新組件狀態(tài)的值, 設(shè)置為給定的值. 注意,這個(gè)值是 Map 數(shù)據(jù)結(jié)構(gòu); 概念 "component state" 所有的 key/value鍵值對(duì)的 Map. 該 Map被一次全部更新,從而提供原子性.
      • StateMap getState(Scope scope) - 返回組件在給定scope的當(dāng)前狀態(tài). 該方法永不會(huì)返回 null; 對(duì)于 StateMap 對(duì)象,如果 state沒(méi)有被設(shè)置, StateMap's 版本將是 -1, 而 map的值將是 empty. 經(jīng)常,一個(gè)新的 Map<String,String> 被創(chuàng)建來(lái)存儲(chǔ)更新的值,然后setState()或 replace() 被調(diào)用.
      • boolean replace(StateMap oldValue, Map<String, String> newValue, Scope scope) - 更新組件的狀態(tài)值 (在給定的 scope)為新的值,僅在當(dāng)前值與給定的 oldValue一樣時(shí)執(zhí)行. 如果 state 被更新為新的值, 返回true; 否則返回 false,如果state's value 不等于oldValue.
      • void clear(Scope scope- 從給定的scope下,清除組件狀態(tài)所有的鍵值

      得到當(dāng)前map的 key/value 對(duì)

      需求:腳本從狀態(tài)管理器得到當(dāng)前的 key/value 對(duì),然后在 script 中使用(如更新等)。

      方法:使用ProcessContext的getStateManager()方法, 然后從 StateManager調(diào)用 getStateMap() , 再 toMap() 轉(zhuǎn)換為Map<String,String>形式的key/value對(duì). 注意,StateMap 也有 get(key) 方法去簡(jiǎn)化獲得 value的方法, 但是不如 Map用的普遍。必須在 StateManager 一次性設(shè)置完畢。

      例子

      Groovy

      import org.apache.nifi.components.state.Scope
      def oldMap = context.stateManager.getState(Scope.LOCAL).toMap()

      Jython

      from org.apache.nifi.components.state import Scope
      oldMap = context.stateManager.getState(Scope.LOCAL).toMap()

      Javascript

      var Scope = Java.type('org.apache.nifi.components.state.Scope');
      var oldMap = context.stateManager.getState(Scope.LOCAL).toMap();

      JRuby

      java_import org.apache.nifi.components.state.Scope
      oldMap = context.stateManager.getState(Scope::LOCAL).toMap()

      更新 key/value 映射的值對(duì)

      需求:腳本希望通過(guò)新的包含key/value的映射值對(duì)來(lái)更新 state map。

      方法:為了得到當(dāng)前的 StateMap 對(duì)象, 再次用ProcessContext調(diào)用 getStateManager() 方法, 然后 StateManager調(diào)用getStateMap() . 例子中假定為新的 Map, 但是使用上面的配方 (通過(guò) toMap() 方法), 你可以使用存在的值創(chuàng)建新的 Map, 然后用于更新想要的記錄. 注意,如果沒(méi)有當(dāng)前map (i.e. the StateMap.getVersion() returns -1),replace() 將不會(huì)工作, 因此例子中檢查并相應(yīng)地調(diào)用 setState() 或 replace(). 當(dāng)從ExecuteScript的新實(shí)例運(yùn)行時(shí),該StateMap 版本將會(huì)是 -1, 當(dāng)單次執(zhí)行后, 如果鼠標(biāo)右鍵 ExecuteScript processor,然后選擇 View State, 將看到如下所示的信息:

       

      例子

      Groovy

      import org.apache.nifi.components.state.Scope
      def stateManager = context.stateManager
      def stateMap = stateManager.getState(Scope.CLUSTER)
      def newMap = ['myKey1': 'myValue1']
      
      if (stateMap.version == -1) {
          stateManager.setState(newMap, Scope.CLUSTER);
      } else {
          stateManager.replace(stateMap, newMap, Scope.CLUSTER);
      }

      Jython

      from org.apache.nifi.components.state import Scope
      stateManager = context.stateManager
      stateMap = stateManager.getState(Scope.CLUSTER)
      
      newMap = {'myKey1': 'myValue1'}
      
      if stateMap.version == -1:
          stateManager.setState(newMap, Scope.CLUSTER)
      else:
          stateManager.replace(stateMap, newMap, Scope.CLUSTER)

      Javascript

      var Scope = Java.type('org.apache.nifi.components.state.Scope');
      var stateManager = context.stateManager;
      var stateMap = stateManager.getState(Scope.CLUSTER);
      var newMap = {'myKey1': 'myValue1'};
      
      if (stateMap.version == -1) {
          stateManager.setState(newMap, Scope.CLUSTER);
      } else {
          stateManager.replace(stateMap, newMap, Scope.CLUSTER);
      }

      JRuby

      java_import org.apache.nifi.components.state.Scope
      stateManager = context.stateManager
      stateMap = stateManager.getState(Scope::CLUSTER)
      newMap = {'myKey1'=> 'myValue1'}
      
      if stateMap.version == -1
          stateManager.setState(newMap, Scope::CLUSTER)
      else
          stateManager.replace(stateMap, newMap, Scope::CLUSTER)
      end

      清空 state map

      需求:清空 state map所有的e key/value 值。

      方法:使用ProcessContext的getStateManager()方法, 然后調(diào)用StateManager的clear(scope)方法

      例子

      Groovy

      import org.apache.nifi.components.state.Scope
      context.stateManager.clear(Scope.LOCAL)

      Jython

      from org.apache.nifi.components.state import Scopecontext.state
      Manager.clear(Scope.LOCAL)

      Javascript

      var Scope = Java.type('org.apache.nifi.components.state.Scope');
      context.stateManager.clear(Scope.LOCAL);

      JRuby

      java_import org.apache.nifi.components.state.Scope
      context.stateManager.clear(Scope::LOCAL)

      存取控制器服務(wù)

      在 NiFi ARchive (NAR) 結(jié)構(gòu)中, Controller Services-控制器服務(wù)被暴露為 interfaces, 在 API JAR中. 例如 , DistributedCacheClient 是一個(gè)從 ControllerService擴(kuò)展來(lái)的接口, 位于 nifi-distributed-cache-client-service-api JAR中, 在 nifi-standard-services-api-nar NAR. 其他的 NARs 如果想要引用interfaces (去創(chuàng)建新的 client implementation, e.g.) 必須指定 nifi-standard-services-api-nar 作為父級(jí) NAR, 然后在processor的子模塊提供 API JARs 的實(shí)例。

      這是一些底層的細(xì)節(jié),可能需要的以提升 Controller Services的使用, 這里提及主要是為了:

      1. 在 NiFi 1.0.0前, scripting NAR (包括 ExecuteScript 和 InvokeScriptedProcessor) 不需要指定nifi-standard-services-api-nar 作為父級(jí). 這意味著只有明確的引用能被用于 ControllerServices 接口 (及其實(shí)現(xiàn)), 同樣的原因, 只有沒(méi)有要求其他不可用類的接口方法可以被使用. 這限制了 ExecuteScript 對(duì)Controller Services的使用
      2. NiFi 1.0.0, scripting processors 在nifi-standard-services-api-nar中存取 Controller Service interfaces (及其相關(guān)的classes) . 這包括DBCPService, DistributedMapCacheClient, DistributedSetCacheClient, HttpContextMap 和 SSLContextService. 但是我不相信nifi-standard-services-api-nar中其它的API 將會(huì)可用, 而且沒(méi)有定制化 ControllerService interfaces 將被識(shí)別

      Processors 總是傾向于使用 Controller Service 實(shí)例創(chuàng)建 property (如PropertyDescriptor 對(duì)象) 并且調(diào)用 identifiesControllerService(class) . 當(dāng) UI component被渲染時(shí), 將會(huì)發(fā)現(xiàn)所有的實(shí)現(xiàn)了期望接口的 Controller Services ,  component's ID 被使用, 友好顯示名稱被顯示給用戶。

      對(duì)于ExecuteScript, 我們可以讓用戶選擇Controller Service 實(shí)例,通過(guò)讓他指定名稱或者 ID 來(lái)實(shí)現(xiàn). 如果我們?cè)试S用戶指定name, 腳本將不得不執(zhí)行一個(gè)查詢Controller Service實(shí)例列表去找到匹配名稱的元素。  這在上面的博客中提到了, 這里不再重復(fù). 如果用戶輸入實(shí)例的ID, 然后 (在 NiFi 1.0.0) 將會(huì)更加容易滴匹配對(duì)象并存取,在下面將會(huì)看到. 這個(gè)例子將使用DistributedMapCacheClientService 實(shí)例為 "distMapClient", 連接到DistributedMapCacheServer 實(shí)例 (在標(biāo)準(zhǔn)的缺省配置下, localhost:4557), 這里 client instance 的ID為 93db6734-0159-1000-b46f-78a8af3b69ed:

      在ExecuteScript 配置中, dynamic property被創(chuàng)建, 名為 "clientServiceId" 并且設(shè)為 93db6734-0159-1000-b46f-78a8af3b69ed:

      然后我們使用clientServiceId.asControllerService(DistributedMapCacheClient), 這里參數(shù)是對(duì)DistributedMapCacheClient類對(duì)象的引用. 例如, 我有一個(gè)預(yù)先填充的緩存,字符串 key 'a' 設(shè)為字符串值  'hello'. 讓 Groovy script 使用 DistributedMapCacheServer進(jìn)行工作。

       

      一旦我們有了一個(gè) DistributedMapCacheClient 實(shí)例, 然后就可以調(diào)用get(keyserializerdeserializer)去獲取值. 在這個(gè)例子中,因?yàn)閗eys 和 values 都是Strings, 我們只需要一個(gè) Serializer<String> 和 Deserializer<String> 實(shí)例傳給 get() 方法. 該方法對(duì)于所有語(yǔ)言都是一樣的,通過(guò) StreamCallback 實(shí)例的創(chuàng)建(在本系列文章的 Part 2). 這個(gè)將從預(yù)先填充的服務(wù)器得到 key 'a' 的值,并且記錄值("Result = hello")。

       

      得到property(存儲(chǔ)在 DistributedMapCacheServer)

      需求:用戶發(fā)布值到 DistributedMapCacheServer (如配置數(shù)據(jù)),然后使用腳本進(jìn)行訪問(wèn)。

      方法:使用上面描述的方法,創(chuàng)建一個(gè)StringSerializer 和 StringDeserializer 對(duì)象, 然后通過(guò)ID得到DistributedMapCacheClientService 實(shí)例, 然后調(diào)用服務(wù)的 get() 方法. 記錄下結(jié)果到日志,方便后面查看

      例子

      Groovy

      import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
      import org.apache.nifi.distributed.cache.client.Serializer
      import org.apache.nifi.distributed.cache.client.Deserializer
      import java.nio.charset.StandardCharsets
      
      def StringSerializer = {value, out -> out.write(value.getBytes(StandardCharsets.UTF_8))} 
          as Serializer<String>
      def StringDeserializer = { bytes -> new String(bytes) } as Deserializer<String>
      def myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient)
      def result = myDistClient.get('a', StringSerializer, StringDeserializer)
      log.info("Result = $result")

      Jython

      from org.python.core.util import StringUtil
      from org.apache.nifi.distributed.cache.client 
          import DistributedMapCacheClient, Serializer, Deserializer
      
      # Define a subclass of Serializer for use in the client's get() method
      class StringSerializer(Serializer):
      def __init__(self):
          pass
      def serialize(self, value, out):
          out.write(value)
      
      # Define a subclass of Deserializer for use in the client's get() method
      class StringDeserializer(Deserializer):
      def __init__(self):
          pass
      def deserialize(self, bytes):
          return StringUtil.fromBytes(bytes)
      
      myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient)
      result = myDistClient.get('a', StringSerializer(), StringDeserializer())
      log.info('Result = ' + str(result))

      Javascript

      var DistributedMapCacheClient = 
          Java.type('org.apache.nifi.distributed.cache.client.DistributedMapCacheClient');
      var Serializer = Java.type('org.apache.nifi.distributed.cache.client.Serializer');
      var Deserializer = Java.type('org.apache.nifi.distributed.cache.client.Deserializer');
      var StandardCharsets = Java.type('java.nio.charset.StandardCharsets');
      var StringSerializer = new Serializer(function(value, out) {
              out.write(value.getBytes(StandardCharsets.UTF_8));
          })
      
      var StringDeserializer = new Deserializer(function(arr) {
          // For some reason I had to build a string from the character codes in the "arr" array
      var s = "";
      
      for(var i = 0; i < arr.length; i++) {
          s = s + String.fromCharCode(arr[i]);
      }
      
      return s;
      })
      
      var myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient.class);
      var result = myDistClient.get('a', StringSerializer, StringDeserializer);
      log.info("Result = "+ result);

      JRuby

      java_import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
      java_import org.apache.nifi.distributed.cache.client.Serializer
      java_import org.apache.nifi.distributed.cache.client.Deserializer
      java_import java.nio.charset.StandardCharsets
      
      # Define a subclass of Serializer for use in the client's get() method
      class StringSerializer
      
      include Serializer
      
      def serialize(value, out)
          out.write(value.to_java.getBytes(StandardCharsets::UTF_8))
      end
      end
      
      # Define a subclass of Deserializer for use in the client's get() method
      class StringDeserializer
      include Deserializer
      def deserialize(bytes)
          bytes.to_s
      end
      end
      
      myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient.java_class)
      result = myDistClient.get('a', StringSerializer.new, StringDeserializer.new)
      log.info('Result = ' + result)

       

       

       

      原文地址:https://my.oschina.net/u/2306127/blog/858943


       

      1、Nifi:基本認(rèn)識(shí)

      2、Nifi:基礎(chǔ)用法及頁(yè)面常識(shí)

      3、Nifi:ExcuseXXXScript組件的使用(一)

      4、Nifi:ExcuseXXXScript組件的使用(二)

      5、Nifi:ExcuseXXXScript組件的使用(三)

      6、Nifi:自定義處理器的開(kāi)發(fā)

      7、Nifi:Nifi的Controller Service

       

      posted @ 2021-02-24 09:50  糖拌西紅柿  閱讀(2304)  評(píng)論(0)    收藏  舉報(bào)
      主站蜘蛛池模板: 日韩高清在线亚洲专区不卡| 1024你懂的国产精品| 美女胸18下看禁止免费视频| 亚洲色最新高清AV网站| 好吊妞| 武城县| 亚洲国产大胸一区二区三区| 成人无码午夜在线观看| 国产乱子影视频上线免费观看| 日韩人妻无码一区二区三区| 熟妇人妻中文a∨无码| 青青热在线精品视频免费观看| jizzjizz日本高潮喷水| 韩国主播av福利一区二区| 国产精品成人午夜福利| 欧美va天堂在线电影| 妓女妓女一区二区三区在线观看 | 精品无码国产自产拍在线观看蜜 | 农村妇女野外一区二区视频| 日本三级理论久久人妻电影| 无码专区视频精品老司机 | 成人h动漫精品一区二区无码| av在线播放无码线| 日韩精品久久一区二区三| 免费无码一区无码东京热| 国产无遮挡又黄又爽不要vip软件| 99精品国产在热久久无| 成人亚洲国产精品一区不卡| 国产亚洲精品成人aa片新蒲金| 国产麻豆精品一区一区三区| 巨爆乳中文字幕爆乳区| 人妻熟女一二三区夜夜爱| 国产精品青青青高清在线| 久久婷婷五月综合97色直播| 国产无套精品一区二区三区| 亚洲一区二区三区激情在线| 亚洲嫩模喷白浆在线观看| 国产94在线 | 亚洲| 97精品亚成在人线免视频| 国产99视频精品免费视频36| 陆良县|