<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12
      Fork me on GitHub

      Nifi組件腳本開發(fā)—ExecuteScript 使用指南(二)

      Part 2 - FlowFile I/O 和 Error Handling

      flow File的IO

      NiFi 的 Flow files 由兩個主要部件組成:attributes 和 content. Attributes 是關(guān)于 content / flow file的元數(shù)據(jù), 我們在Nifi組件腳本開發(fā)—ExecuteScript 使用指南(一) 看到了如何使用 ExecuteScript 來操縱這個屬性. flow file 的內(nèi)容, 核心是一個 bytes集合,沒有繼承的 structure, schema, format, 等等. 不同的 NiFi processors 假定輸入的 flow files 具有特定的 schema/format (或者從 attributes確定如 "mime.type" 或者通過其他的方法). 這些 processors 然后按照假定的格式對內(nèi)容進行處理 (將返回 "failure" 到relationship,如果不是的話). 經(jīng)常 processors 將輸出  flow files 以特定的格式, 這在 processors' 描述中有相應(yīng)的說明.

      flow files 的 Input 和 Output (I/O) 通過 ProcessSession API 提供,通過 ExecuteScript  的"session" 變量來訪問。一個機制是傳遞一個 callback 對象到session.read() 或 session.write()的調(diào)用。對于FlowFile將創(chuàng)建一個 InputStream 和/或 OutputStream, 這個callback 對象將被激活,使用相應(yīng)的 callback 接口, 然后這個InputStream 和/或 OutputStream 的引用被傳遞到 callback函數(shù)使用. 這里有三個 callback 接口, 每一個有自己的應(yīng)用環(huán)境:

      InputStreamCallback

      這個 interface 用在 session.read( flowFile, inputStreamCallback) 方法中,提供一個 InputStream,用于讀取 flow file的內(nèi)容. 該 interface 有一個單一方法:

      void process(InputStream in) throws IOException

      該 interface 提供一個被管理的 input stream. 這個input stream自動打開和關(guān)閉,也可以手動關(guān)閉. 這是從 flow file讀取的方法, 并且不能被寫回去。

      OutputStreamCallback

      該 interface 被用于session.write( flowFile, outputStreamCallback) 方法,提供 OutputStream寫入內(nèi)容到 flow file. 該 interface 具有單一的方法:

      void process(OutputStream out) throws IOException

      該 interface 提供被管理的 output stream. 這個output stream 被自動打開和關(guān)閉,也可以手動關(guān)閉。 - 重要的一點是,如果任何 streams 包裝了這個 streams,所有打開的資源應(yīng)該被清理.

      例如, 在ExecuteScript中被創(chuàng)建數(shù)據(jù) , 來自于外部文件, 而不是一個 flow file. 然后你可以使用 session.create() 去創(chuàng)建一個新的FlowFile, 然后 session.write( flowFile, outputStreamCallback) 用于添加內(nèi)容.

      StreamCallback

      該 interface 用于 session.write( flowFile, streamCallback) 方法,提供 InputStream 和 OutputStream,為 flow file提供內(nèi)容的讀取和寫入. 該 interface 有一個單一的方法:

      void process(InputStream in, OutputStream out) throws IOException

      該 interface 提供被管理的 output stream. 這個output stream 被自動打開和關(guān)閉,也可以手動關(guān)閉。 - 重要的一點是,如果任何 streams 包裝了這個 streams,所有打開的資源應(yīng)該被清理.

      因為這些  callbacks 是 Java objects, 腳本將創(chuàng)建一個并且傳入 session 方法, 下面的方法將使用不同的腳本語言進行演示. 并且,這里還有其他的讀寫 flow files方法, 包括:

      • 使用 session.read(flowFile) 返回 InputStream. 取代 InputStreamCallback, 將返回 InputStream 用于讀取. 你必須 (close, e.g.) 手動管理 InputStream.
      • 使用 session.importFrom(inputStreamflowFile) 從 InputStream 寫入到 FlowFile. 這將替代 借助OutputStreamCallback的session.write() 的使用.

      從 flow file 中讀取數(shù)據(jù)

      需求:傳入連接執(zhí)行 ExecuteScript ,并且從隊列中得到 flow file 的內(nèi)容進行處理.

      方法:使用session的read(flowFileinputStreamCallback) 方法。一個 InputStreamCallback 對象需要被傳入 read() 方法. 注意到,因為InputStreamCallback 是一個對象, 內(nèi)容只在該對象中可見。 如果你需要在 read() 方法之外訪問, 需要使用更為全局化的變量. 這里的例子講來自flow file的全部內(nèi)容存儲到 String (使用 Apache Commons' IOUtils class)。

      注意: 對于大的 flow files, 這并不是最好的技術(shù)方法; 應(yīng)該只讀取需要的數(shù)據(jù),并按照適應(yīng)的方法處理。比如 SplitText, 你應(yīng)該一次讀一行并且在 InputStreamCallback中處理, 或者 session.read(flowFile) 方法 得到 InputStream 的引用,從而在 callback之外處理.

      例子

      Groovy:

      import org.apache.commons.io.IOUtils
      import java.nio.charset.StandardCharsets
      
      flowFile = session.get()
      
      if(!flowFile)return
      
      def text = ''
      // Cast a closure with an inputStream parameter to InputStreamCallback
      session.read(flowFile, {inputStream ->
      text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
      
      // Do something with text here
      } as InputStreamCallback)

      Jython:

      from org.apache.commons.io import IOUtils
      from java.nio.charset import StandardCharsets
      from org.apache.nifi.processor.io import InputStreamCallback
      
      # Define a subclass of InputStreamCallback for use in session.read()
      class PyInputStreamCallback(InputStreamCallback):
      def __init__(self):
          pass
      def process(self, inputStream):
          text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
      
      # Do something with text here
      # end class
      
      flowFile = session.get()
      if(flowFile != None):
      session.read(flowFile, PyInputStreamCallback())
      # implicit return at the end

      Javascript:

      var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback")
      var IOUtils = Java.type("org.apache.commons.io.IOUtils")
      var StandardCharsets = Java.type("java.nio.charset.StandardCharsets")
      
      var flowFile = session.get();
      
      if(flowFile != null) {
      // Create a new InputStreamCallback, passing in a function to define the interface method
      session.read(flowFile,new InputStreamCallback(function(inputStream) {
              var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
              // Do something with text here
          }));
      }

      JRuby:

      java_import org.apache.commons.io.IOUtils
      java_import org.apache.nifi.processor.io.InputStreamCallback
      
      # Define a subclass of InputStreamCallback for use in session.read()
      class JRubyInputStreamCallback
      include InputStreamCallback
      
      def process(inputStream)
          text = IOUtils.toString(inputStream)
          # Do something with text here
          end
      end
      
      jrubyInputStreamCallback = JRubyInputStreamCallback.new
      flowFile = session.get()
      if flowFile != nil
          session.read(flowFile, jrubyInputStreamCallback)
      end

      寫入數(shù)據(jù)至 flow file

      需求:為輸出的 flow file創(chuàng)建內(nèi)容.

      方法:使用session的write(flowFileoutputStreamCallback) 方法。一個OutputStreamCallback 對象需要傳遞給 write() 方法. 注意,因為 OutputStreamCallback 是一個對象, 因此內(nèi)容之災(zāi)對象內(nèi)部可見. 如果你需要在 write() 方法之外訪問, 使用更為全局化變量. 西面的例子寫入 String 到 flowFile。

      例子

      Groovy:

      import org.apache.commons.io.IOUtils
      import java.nio.charset.StandardCharsets
      
      flowFile = session.get()
      if(!flowFile) return
      
      def text = 'Hello world!'
      // Cast a closure with an outputStream parameter to OutputStreamCallback
      flowFile = session.write(flowFile, {outputStream ->
              outputStream.write(text.getBytes(StandardCharsets.UTF_8))
          } as OutputStreamCallback)

      Jython:

      from org.apache.commons.io import IOUtils
      from java.nio.charset import StandardCharsets
      from org.apache.nifi.processor.io import OutputStreamCallback
      
      # Define a subclass of OutputStreamCallback for use in session.write()
      class PyOutputStreamCallback(OutputStreamCallback):
      def __init__(self):
          pass
      
      def process(self, outputStream):
          outputStream.write(bytearray('Hello World!'.encode('utf-8')))
      # end class
      
      flowFile = session.get()
      
      if(flowFile != None):
          flowFile = session.write(flowFile, PyOutputStreamCallback())
      # implicit return at the end

      Javascript:

      var OutputStreamCallback = Java.type("org.apache.nifi.processor.io.OutputStreamCallback");
      var IOUtils = Java.type("org.apache.commons.io.IOUtils");
      var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
      var flowFile = session.get();
      
      if(flowFile != null) {
      // Create a new OutputStreamCallback, passing in a function to define the interface method
      flowFile = session.write(flowFile,new OutputStreamCallback(function(outputStream) {
              outputStream.write("Hello World!".getBytes(StandardCharsets.UTF_8))
          }));
      }

      JRuby:

      java_import org.apache.commons.io.IOUtils
      java_import java.nio.charset.StandardCharsets
      java_import org.apache.nifi.processor.io.OutputStreamCallback
      
      # Define a subclass of OutputStreamCallback for use in session.write()
      class JRubyOutputStreamCallback
      include OutputStreamCallback
      
      def process(outputStream)
          outputStream.write("Hello World!".to_java.getBytes(StandardCharsets::UTF_8))
      end
      end
      
      jrubyOutputStreamCallback = JRubyOutputStreamCallback.new
      flowFile = session.get()
      if flowFile != nil
          flowFile = session.write(flowFile, jrubyOutputStreamCallback)
      end

      覆蓋 flow file內(nèi)容

      需求:重用輸入 flow file但是希望修改內(nèi)容并傳遞到輸出的 flow file.

      方法:使用session的write(flowFilestreamCallback) 方法。一個StreamCallback 對象需要傳遞給 write() 方法. StreamCallback 同時提供了InputStream (從輸入的 flow file) 和 outputStream (下一版本的 flow file), 因此你可以使用InputStream去取得 flow file的當前內(nèi)容, 然后修改他們并且寫會到 flow file. 這將覆蓋 flow file 的內(nèi)容, 因此對于追加內(nèi)容要采用讀入內(nèi)容添加的方式, 或者使用不同的方法 (使用 session.append() 而不是session.write() )。

      注意,因為 StreamCallback 是一個對象, 因此內(nèi)容之災(zāi)對象內(nèi)部可見. 如果你需要在 write() 方法之外訪問, 使用更為全局化變量.

      以下這個例子將反轉(zhuǎn)輸入flowFile (假定為 String) 的內(nèi)容,并將反轉(zhuǎn)后的字符串寫入到新版的 flowFile.

      例子

      Groovy:

      import org.apache.commons.io.IOUtils
      import java.nio.charset.StandardCharsets
      
      flowFile = session.get()
      if(!flowFile) return
      
      def text = 'Hello world!'
      // Cast a closure with an inputStream and outputStream parameter to StreamCallback
      
      flowFile = session.write(flowFile, {inputStream, outputStream ->
              text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
              outputStream.write(text.reverse().getBytes(StandardCharsets.UTF_8))
          } as StreamCallback)
      
      session.transfer(flowFile, REL_SUCCESS)

      Jython:

      from org.apache.commons.io import IOUtils
      from java.nio.charset import StandardCharsets
      from org.apache.nifi.processor.io import StreamCallback
      
      # Define a subclass of StreamCallback for use in session.write()
      class PyStreamCallback(StreamCallback):
      def __init__(self):
      pass
      
      def process(self, inputStream, outputStream):
      text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
      outputStream.write(bytearray('Hello World!'[::-1].encode('utf-8')))
      # end class
      
      flowFile = session.get()
      
      if(flowFile != None):
          flowFile = session.write(flowFile, PyStreamCallback())
      
      # implicit return at the end

      Javascript:

      var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");
      var IOUtils = Java.type("org.apache.commons.io.IOUtils");
      var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
      var flowFile = session.get();
      
      if(flowFile != null) {
      // Create a new StreamCallback, passing in a function to define the interface method
      flowFile = session.write(flowFile,new StreamCallback(function(inputStream, outputStream) {
          var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
          outputStream.write(text.split("").reverse().join("").getBytes(StandardCharsets.UTF_8))
          }));
      }

      JRuby:

      java_import org.apache.commons.io.IOUtils
      java_import java.nio.charset.StandardCharsets
      java_import org.apache.nifi.processor.io.StreamCallback
      
      # Define a subclass of StreamCallback for use in session.write()
      
      class JRubyStreamCallback
      include StreamCallback
      def process(inputStream, outputStream)
          text = IOUtils.toString(inputStream)
          outputStream.write((text.reverse!).to_java.getBytes(StandardCharsets::UTF_8))
      end
      end
      
      jrubyStreamCallback = JRubyStreamCallback.new
      flowFile = session.get()
      if flowFile != nil
          flowFile = session.write(flowFile, jrubyStreamCallback)
      end

      處理錯誤

       

      需求:在 script ( data validation 或者出現(xiàn)一個 exception)運行時出現(xiàn)錯誤, 處理和拋出錯誤。

      方法:對于exceptions, 使用腳本語言的exception-handling 機制  (一般是try/catch 代碼塊). 對于 data validation, 可以使用類似的方法, 但是定義一個boolean 變量,如 "valid" 以及 if/else 語句,而不是try/catch 語句. ExecuteScript 定義了 "success" and "failure" relationships; 一般情況下,你的處理將轉(zhuǎn)移 "good" flow files 到 success,而 "bad" flow files 到 failure (記錄錯誤在后續(xù)的操作中)。

      例子

      Groovy:

      flowFile = session.get()
      
      if(!flowFile) return
      try {
          // Something that might throw an exception here
          // Last operation is transfer to success (failures handled in the catch block)
          session.transfer(flowFile, REL_SUCCESS)
      } catch(e) {
          log.error('Something went wrong', e)
          session.transfer(flowFile, REL_FAILURE)
      }

      Jython:

      flowFile = session.get()
      
      if(flowFile != None):
      try:
          # Something that might throw an exception here
          # Last operation is transfer to success (failures handled in the catch block)
          session.transfer(flowFile, REL_SUCCESS)
      except:
          log.error('Something went wrong', e)
          session.transfer(flowFile, REL_FAILURE)
      
      # implicit return at the end

      Javascript:

      var flowFile = session.get();
      if(flowFile != null) {
      try {
          // Something that might throw an exception here
          // Last operation is transfer to success (failures handled in the catch block)
          session.transfer(flowFile, REL_SUCCESS)
      } catch(e) {
          log.error('Something went wrong', e)
          session.transfer(flowFile, REL_FAILURE)
      }
      }

      JRuby:

      flowFile = session.get()
      
      if flowFile != nil
      begin
          # Something that might raise an exception here
          # Last operation is transfer to success (failures handled in the rescue block)
          session.transfer(flowFile, REL_SUCCESS)
          rescue Exception => e
          log.error('Something went wrong', e)
          session.transfer(flowFile, REL_FAILURE)
      end
      end

      上一篇:Nifi組件腳本開發(fā)—ExecuteScript 使用指南(一) 
      源:https://www.shangmayuan.com/a/0ba9c44310b04d1dad461790.html

      參考:http://nifi.apache.org/developer-guide.html

       

      1、Nifi:基本認識

      2、Nifi:基礎(chǔ)用法及頁面常識

      3、Nifi:ExcuseXXXScript組件的使用(一)

      4、Nifi:ExcuseXXXScript組件的使用(二)

      5、Nifi:ExcuseXXXScript組件的使用(三)

      6、Nifi:自定義處理器的開發(fā)

      7、Nifi:Nifi的Controller Service

       

       

      posted @ 2021-02-23 11:35  糖拌西紅柿  閱讀(2214)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 强开小雪的嫩苞又嫩又紧| 亚洲狠狠爱一区二区三区| 张北县| 国产综合一区二区三区麻豆| 国产成人亚洲精品狼色在线| 午夜视频免费试看| 亚洲av成人无网码天堂| 在线高清理伦片a| 久久中文字幕日韩无码视频 | 日韩国产精品中文字幕| 在线观看潮喷失禁大喷水无码| 国产女人水真多18毛片18精品 | 99精产国品一二三产品香蕉| 九九热精品视频在线免费| 18禁国产一区二区三区| 久久综合久中文字幕青草| 国产普通话刺激视频在线播放| 婷婷色综合成人成人网小说| 真实单亲乱l仑对白视频| 日韩av片无码一区二区不卡| 性色av不卡一区二区三区| 中文字日产幕码三区国产| 在线看国产精品自拍内射| 国产精品视频免费一区二区三区| 少妇人妻偷人精品一区二| 国产成人亚洲无码淙合青草| 玩两个丰满老熟女久久网| 人妻丝袜无码专区视频网站| 午夜精品区| 亚洲精品国产免费av| 亚洲欧洲一区二区免费| 国产精品中文字幕自拍| 高台县| 久久人妻国产精品| 四虎库影成人在线播放| 白嫩少妇激情无码| 日韩中文字幕综合第二页| 天气| 日本一区不卡高清更新二区| 国产办公室秘书无码精品99| 久久精品国产亚洲av久|