<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12
      Fork me on GitHub

      Nifi組件腳本開發(fā)—ExecuteScript 使用指南(一)

      Part 1 - 介紹 NiFi API 和 FlowFiles

        ExecuteScript 是一個萬能的處理器,允許用戶使用編程語言定義自己的數(shù)據(jù)處理功能, 在每一次 ExecuteScript processor 觸發(fā)時被調用。下面的變量綁定到腳本環(huán)境,以提供腳本中訪問 NiFi 組件環(huán)境:

        session: 是對processor的ProcessSession屬性的引用。session允許在 flow files 執(zhí)行下面的操作: create(), putAttribute(), transfer(), 像 read() 和 write()一樣。

        context: 是對 ProcessContext 的引用。可以用于檢索 processor 的屬性, 關系, Controller Services, 和 StateManager。

        log: 是對ComponentLog的引用。用于 log 消息到 NiFi系統(tǒng), 如 log.info('Hello world!')。

        REL_SUCCESS: 這是對 "success" relationship 的引用。這是從父類 (ExecuteScript)的靜態(tài)變量基礎來的, 但是一些引擎(如 Lua)不允許引用靜態(tài)成員, 只是一個為了方便的變量。

        REL_FAILURE: 這是對 "failure" relationship 的引用。與 REL_SUCCESS 一樣, 這是從父類 (ExecuteScript)的靜態(tài)變量基礎來的, 但是一些引擎(如 Lua)不允許引用靜態(tài)成員, 只是一個為了方便的變量。

        Dynamic Properties: 任何在ExecuteScript定義的動態(tài)屬性都作為變量集合到 PropertyValue 對象,對應于dynamic property。允許獲得property的 String 值 , 通過NiFi表達式進行求值,獲得相應的類型 (如 Boolean, 等等)。 因為動態(tài)屬性名稱成為腳本里的變量名, 你需要了解所選的腳本引擎的變量命名屬性。 例如, Groovy 不允許變量名中提供 (.) , 所以,如果 "my.property"作為動態(tài)屬性將會報錯。

      與這些變量名的交互通過 NiFi Java API進行, 下面的每一個案例將討論相應的API調用。 下面的案例執(zhí)行不同的函數(shù)操作 flow files, 如 reading/writing 屬性, 轉換為 relationship, logging, 等等。需要注意,這里的例子只是一些片段。舉例來說, 如果使用session.get()從隊列中獲取 flow file , 必須轉換為 relationship 或者移除, 否則將會引發(fā)錯誤。代碼片段應該是平面化的而且保持清晰,沒有容易引起混亂的代碼,僅用于演示概念,從而讓工作簡單。

      從incoming queue得到flow file

      從session中獲得flow file

      需求:從隊列中獲得輸入flow file,執(zhí)行 ExecuteScript 并進行處理。

      方法使用 session對象的get(). 該方法返回FlowFile,是下一個最高優(yōu)先級的FlowFile用于處理. 如果沒有 FlowFile 用于處理, 該方法將返回 null. 注意, 如果一個持續(xù)的FlowFiles流進入processor,也可能會返回null. 這在多個并發(fā)任務處理時會發(fā)生,此時其他任務已獲得了 FlowFiles. 如果腳本要求有一個 FlowFile才能繼續(xù)處理, 如果是session.get()得到null應該立即返回。

      例子

      Groovy:

      flowFile = session.get()
      if(!flowFile) return
      

      Jython:

      flowFile = session.get()
      if (flowFile != None):
      

      Javascript:

      var flowFile = session.get();
      if (flowFile != null) {
      // All processing code goes here
      }
      

      JRuby:

      flowFile = session.get()
      if flowFile != nil
      # All processing code goes here
      end
      

      得到多個 flow files

      需求:從queue(s)獲得多個flow files用于ExecuteScript處理

      方法使用session對象的get(maxResults) 方法. 該方法從工作隊列中返回最多 maxResults 個FlowFiles . 如果沒有 FlowFiles 可用, 一個空的list 將被返回 (而不是返回 null).。

      例子

      Groovy:

      flowFileList = session.get(100)
      if(!flowFileList.isEmpty()) {
          flowFileList.each { flowFile ->
          // Process each FlowFile here
          }
      }

      Jython:

      flowFileList = session.get(100)
      if not flowFileList.isEmpty():
          for flowFile in flowFileList:
              # Process each FlowFile here

      Javascript:

      flowFileList = session.get(100)
      if(!flowFileList.isEmpty()) {
          for each (var flowFile in flowFileList) {
              // Process each FlowFile here
          }
      }

      JRuby:

      flowFileList = session.get(100)
      if !(flowFileList.isEmpty())
          flowFileList.each { |flowFile|
              # Process each FlowFile here}
      end

      創(chuàng)建一個新的 flow files

      創(chuàng)建新Flow Files

      需求:創(chuàng)建一個新的 FlowFile 發(fā)送到下一步的 processor

      方法使用session的 create() 方法. 該方法返回 FlowFile 對象, 以用于后續(xù)的處理操作。

      例子

      Groovy:

      flowFile = session.create()
      // Additional processing here

      Jython:

      flowFile = session.create()
      # Additional processing here

      Javascript:

      var flowFile = session.create();
      // Additional processing here

      JRuby:

      flowFile = session.create()
      # Additional processing here

      從父級FlowFile創(chuàng)建新的 FlowFile

      需求:從已有的 FlowFile 創(chuàng)建新的flow file 發(fā)送到下一步的 processor

      方法使用session的 create(parentFlowFile) 方法,該方法獲得父級 FlowFile 的引用,然后返回新的派生 FlowFile 對象。新創(chuàng)建的 FlowFile 除UUID之外將繼承父級的所有屬性,同時該方法將自動創(chuàng)建一個 起源 FORK 事件或 起源 JOIN 事件,在 ProcessSession被提交的時候,取決于FlowFiles 是否從同一個parent創(chuàng)建。

      例子

      Groovy:

      flowFile = session.get()
      if(!flowFile) return
          newFlowFile = session.create(flowFile)
          // Additional processing here

      Jython:

      flowFile = session.get()
      if (flowFile != None):
          newFlowFile = session.create(flowFile)
          # Additional processing here

      Javascript:

      var flowFile = session.get();
      if (flowFile != null) {
          var newFlowFile = session.create(flowFile);
          // Additional processing here
      }

      JRuby:

      flowFile = session.get()
      if flowFile != nil
          newFlowFile = session.create(flowFile)
          # Additional processing here
      end

      flow file 的attributes操作

      從 flow file 得到屬性

      需求:獲得flow file 的屬性。

      方法使用FlowFile對象getAttribute(attributeKey) 。 該方法對于給定的attributeKey返回一個字符串值 , 如果沒有找到相應的key就返回null. 下面的例子演示返回FlowFile的 "filename" 屬性。

      例子

      Groovy:

      flowFile = session.get()
      
      if(!flowFile) return
      myAttr = flowFile.getAttribute('filename')

      Jython:

      flowFile = session.get()
      
      if (flowFile != None):
          myAttr = flowFile.getAttribute('filename')
          # implicit return at the end

      Javascript:

      var flowFile = session.get()
      
      if (flowFile != null) {
          var myAttr = flowFile.getAttribute('filename')
      }

      JRuby:

      flowFile = session.get()
      
      if flowFile != nil
      myAttr = flowFile.getAttribute('filename')
      end

      從 flow file得到所有的屬性

      需求:從flow file得到所有的屬性。

      方法使用FlowFile對象的getAttributes() 方法。 該方法返回 Map 數(shù)據(jù)結構,由字符串的 keys 和 values組成, 代表一個FlowFile的屬性的 key/value 值對。 下面的顯示如何遞歸顯示FlowFile的所有屬性的Map的值。

      例子

      Groovy:

      flowFile = session.get()
      
      if(!flowFile) return
      flowFile.getAttributes().each { key,value ->
          // Do something with the key/value pair
      }

      Jython:

      flowFile = session.get()
      
      if (flowFile != None):
          for key,value in flowFile.getAttributes().iteritems():
          # Do something with key and/or value
      
      # implicit return at the end

      Javascript:

      var flowFile = session.get()
      
      if (flowFile != null) {
          var attrs = flowFile.getAttributes();
          for each (var attrKey in attrs.keySet()) {
              // Do something with attrKey (the key) and/or attrs[attrKey] (the value)
          }
      }

      JRuby:

      flowFile = session.get()
      
      if flowFile != nil
          flowFile.getAttributes().each 
          { |key,value|
              # Do something with key and/or value
          }
      end

      添加屬性到 flow file

      需求:在已有的 flow file 上添加自己的屬性。

      方法使用session對象的 putAttribute(flowFileattributeKeyattributeValue) 方法。 該方法更新給定的 FlowFile's 屬性,使用給出的 key/value 對來進行。

      注意:對象的 "uuid" 屬性是固定的,并且不能修改; 如果key被命名為 "uuid", 將被忽略.

      這里的FlowFile 對象是不可改變的; 這意味著,如果通過API更新了 FlowFile 的屬性 (或其它的改變了) , 你將得到一個新版的FlowFile的新的引用。當轉換FlowFiles到relationships時這是非常重要的。你必須保持對FlowFile的最新版本的引用, 你必須轉換或者移除所有的FlowFiles的最后版本, 否則執(zhí)行時將會得到錯誤信息。經常情況下, 該用于存儲 FlowFile 引用變量將會被最后返回的版本覆蓋 (中間的 FlowFile 應用將會被自動拋棄). 在這個例子中,你可以看到當添加屬性時重用flowFile引用的技術。注意到當前的flowFile引用被傳遞給putAttribute() 方法. 這個結果FlowFile具有命名為 'myAttr'值為 'myValue'的屬性。如果你有一個對象,可以序列化為String. 最終, 請注意如果你添加了多個屬性, 最好創(chuàng)建一個Map,然后使用 putAllAttributes() 方法來進行賦值。

      例子

      Groovy:

      flowFile = session.get()
      if(!flowFile) return
      flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')

      Jython:

      flowFile = session.get()
      if (flowFile != None):
          flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
          # implicit return at the end

      Javascript:

      var flowFile = session.get();
      if (flowFile != null) {
          flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
      }

      JRuby:

      flowFile = session.get()
      if flowFile != nil
          flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
      end

      添加多個屬性到一個flow file

      需求:向 flow file 添加多個自定義屬性。。

      方法使用 session對象的putAllAttributes(flowFileattributeMap) 方法。該方法更新給定的FlowFile's 屬性,以 key/value 對的方式存儲在Map中返回。

      注意:對象的 "uuid" 屬性是固定的,并且不能修改; 如果key被命名為 "uuid", 將被忽略.

       

      該技術創(chuàng)建了一個 Map (aka dictionary in Jython, hash in JRuby) 用于更新,然后調用putAllAttributes() 。這比對putAttribute() 對每一個 key/value 遍歷效率更高, 這將導致對每一個屬性調用時 FlowFile 都需要創(chuàng)建一個副本 (查看上面 FlowFile 不變性的討論)。下面例子中的Map包含兩個條目: myAttr1 和 myAttr2, 設為 '1' 并且第二個為 String (附著到方法簽名,對key和value都要求 String)。 注意到session.transfer() 在這里并未指定 (因此下面的代碼片段并不能工作), 查看下面的方法。

       

      例子

      Groovy:

      attrMap = ['myAttr1': '1', 'myAttr2': Integer.toString(2)]
      flowFile = session.get()
      if(!flowFile) return
      flowFile = session.putAllAttributes(flowFile, attrMap)

      Jython:

      attrMap = {'myAttr1':'1', 'myAttr2':str(2)}
      flowFile = session.get()
      if (flowFile != None):
          flowFile = session.putAllAttributes(flowFile, attrMap)
          # implicit return at the end

      Javascript:

      var number2 = 2;
      var attrMap = {'myAttr1':'1', 'myAttr2': number2.toString()}
      var flowFile = session.get()
      
      if (flowFile != null) {
          flowFile = session.putAllAttributes(flowFile, attrMap)
      }

      JRuby:

      attrMap = {'myAttr1' => '1', 'myAttr2' => 2.to_s}
      flowFile = session.get()
      if flowFile != nil
      flowFile = session.putAllAttributes(flowFile, attrMap)
      end

      轉換 flow file

      轉移一個flow file 到 relationship

      需求:在處理完flow file (new or incoming)之后, 你希望將flow file轉移到 relationship ("success" or "failure"). 在這個簡單的例子中,讓我們假定有一個變量叫做 "errorOccurred", 用于指示在哪種 relationship下 FlowFile 將被轉移。

      方法使用session對象的transfer(flowFilerelationship) 方法。基于給定的relationship,該方法將給定的FlowFile發(fā)送到適合的目標處理器隊列。如果relationship通向不止一個目標,F(xiàn)lowFile的狀態(tài)將被復制 ,從而每一個目標都將收到一個 FlowFile的拷貝,因此也將具有唯一的標識符UUID。

      注意:最后,ExecuteScript將執(zhí)行session.commit() 以進行操作的提交。你不需要在腳本內部執(zhí)行session.commit() 來執(zhí)行提交操作。

      例子

      Groovy:

      flowFile = session.get()
      
      if(!flowFile) return
      
      // Processing occurs here
      if(errorOccurred) {
          session.transfer(flowFile, REL_FAILURE)
      }
      else {
          session.transfer(flowFile, REL_SUCCESS)
      }

      Jython:

      flowFile = session.get()
      
      if (flowFile != None):
          # All processing code starts at this indent
          if errorOccurred:
              session.transfer(flowFile, REL_FAILURE)
          else:
              session.transfer(flowFile, REL_SUCCESS)
      # implicit return at the end

      Javascript:

      var flowFile = session.get();
      
      if (flowFile != null) {
          // All processing code goes here
          if(errorOccurred) {
              session.transfer(flowFile, REL_FAILURE)
          }
          else {
              session.transfer(flowFile, REL_SUCCESS)
          }
      }

      JRuby:

      flowFile = session.get()
      if flowFile != nil
          # All processing code goes here
          if errorOccurred
              session.transfer(flowFile, REL_FAILURE)
          else
              session.transfer(flowFile, REL_SUCCESS)
          end
      end

      日志 Logging

      發(fā)送消息到 log并制定日志級別

      需求:希望報告一些事件、消息并通過日志框架寫入。

      方法 使用 log 的方法(), trace(), debug(), info(), 或 error() 完成。這些方法可以是單個的字符串或者字符串數(shù)組對象, 或字符串后面跟著Throwable的對象數(shù)組。第一個用于簡單消息. 第二個用于一些動態(tài)對象(值)的log。在消息字符串中使用 "{}" 進行引用。這些用于對對象數(shù)組進行求值,當消息讀到 "Found these things: {} {} {}" 并且 Object array 是 ['Hello',1,true], 那么logged 消息將是 "Found these things: Hello 1 true",第三種logging方法帶一個 Throwable 參數(shù), 這在例外被捕捉到并且希望日志記錄時使用。

      例子

      Groovy:

      log.info('Found these things: {} {} {}', ['Hello',1,true] as Object[])

      Jython:

      from java.lang import Object
      from jarray import array
      
      objArray = ['Hello',1,True]
      javaArray = array(objArray, Object)
      log.info('Found these things: {} {} {}', javaArray)

      Javascript:

      var ObjectArrayType = Java.type("java.lang.Object[]");
      var objArray = new ObjectArrayType(3);
      
      objArray[0] = 'Hello';
      objArray[1] = 1;
      objArray[2] = true;
      log.info('Found these things: {} {} {}', objArray)

      JRuby:

      log.info('Found these things: {} {} {}', ['Hello',1,true].to_java)
       
       
       
      源:https://www.shangmayuan.com/a/0ba9c44310b04d1dad461790.html
      參考:http://nifi.apache.org/developer-guide.html
       
       

       

      1、Nifi:基本認識

      2、Nifi:基礎用法及頁面常識

      3、Nifi:ExcuseXXXScript組件的使用(一)

      4、Nifi:ExcuseXXXScript組件的使用(二)

      5、Nifi:ExcuseXXXScript組件的使用(三)

      6、Nifi:自定義處理器的開發(fā)

      7、Nifi:Nifi的Controller Service

       

       
       
       
       
      posted @ 2021-01-26 16:36  糖拌西紅柿  閱讀(3867)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 国产麻豆成人传媒免费观看| 国产线播放免费人成视频播放| 国产精品v片在线观看不卡| 国产成人精品视频不卡| 日韩乱码卡一卡2卡三卡四| 亚洲国产成人av在线观看| 亚洲欧美日韩第一页| 亚洲夂夂婷婷色拍ww47 | 国产成人卡2卡3卡4乱码| 精品999日本久久久影院| 国内精品一区二区不卡| 日日碰狠狠添天天爽不卡| 人妻少妇偷人无码视频| 欧美色欧美亚洲另类二区| 性色av无码不卡中文字幕| 日本高清中文字幕免费一区二区| 亚洲人成色77777在线观看| 日韩一区二区黄色一级片| 麻豆国产AV剧情偷闻女邻居内裤| 喀喇沁旗| 欧美日韩国产图片区一区| 久久精品夜夜夜夜夜久久| 丁香婷婷色综合激情五月| 毛片在线播放网址| 免费AV片在线观看网址| 亚洲欧美人成人综合在线播放| 精品少妇av蜜臀av| 国产亚洲视频在线播放香蕉| 99网友自拍视频在线| 四虎在线播放亚洲成人| 欧美 亚洲 中文 国产 综合| 国产日韩精品一区在线不卡| 国产在线播放专区av| 亚洲天堂一区二区三区四区| 色九九视频| 亚洲无人区码一二三四区| 四虎国产精品永久在线下载| 麻豆亚洲精品一区二区| 韩国午夜福利片在线观看| 亚洲国产午夜精品福利| 日韩在线观看 一区二区|