Nifi組件腳本開發(fā)—ExecuteScript 使用指南(二)
Part 2 - FlowFile I/O 和 Error Handling
flow File的IO
NiFi 的 Flow files 由兩個主要部件組成:attributes 和 content. Attributes 是關(guān)于 content / flow file的元數(shù)據(jù), 我們在Nifi組件腳本開發(fā)—ExecuteScript 使用指南(一) 看到了如何使用 ExecuteScript 來操縱這個屬性. flow file 的內(nèi)容, 核心是一個 bytes集合,沒有繼承的 structure, schema, format, 等等. 不同的 NiFi processors 假定輸入的 flow files 具有特定的 schema/format (或者從 attributes確定如 "mime.type" 或者通過其他的方法). 這些 processors 然后按照假定的格式對內(nèi)容進行處理 (將返回 "failure" 到relationship,如果不是的話). 經(jīng)常 processors 將輸出 flow files 以特定的格式, 這在 processors' 描述中有相應(yīng)的說明.
flow files 的 Input 和 Output (I/O) 通過 ProcessSession API 提供,通過 ExecuteScript 的"session" 變量來訪問。一個機制是傳遞一個 callback 對象到session.read() 或 session.write()的調(diào)用。對于FlowFile將創(chuàng)建一個 InputStream 和/或 OutputStream, 這個callback 對象將被激活,使用相應(yīng)的 callback 接口, 然后這個InputStream 和/或 OutputStream 的引用被傳遞到 callback函數(shù)使用. 這里有三個 callback 接口, 每一個有自己的應(yīng)用環(huán)境:
InputStreamCallback
這個 interface 用在 session.read( flowFile, inputStreamCallback) 方法中,提供一個 InputStream,用于讀取 flow file的內(nèi)容. 該 interface 有一個單一方法:
void process(InputStream in) throws IOException
該 interface 提供一個被管理的 input stream. 這個input stream自動打開和關(guān)閉,也可以手動關(guān)閉. 這是從 flow file讀取的方法, 并且不能被寫回去。
OutputStreamCallback
該 interface 被用于session.write( flowFile, outputStreamCallback) 方法,提供 OutputStream寫入內(nèi)容到 flow file. 該 interface 具有單一的方法:
void process(OutputStream out) throws IOException
該 interface 提供被管理的 output stream. 這個output stream 被自動打開和關(guān)閉,也可以手動關(guān)閉。 - 重要的一點是,如果任何 streams 包裝了這個 streams,所有打開的資源應(yīng)該被清理.
例如, 在ExecuteScript中被創(chuàng)建數(shù)據(jù) , 來自于外部文件, 而不是一個 flow file. 然后你可以使用 session.create() 去創(chuàng)建一個新的FlowFile, 然后 session.write( flowFile, outputStreamCallback) 用于添加內(nèi)容.
StreamCallback
該 interface 用于 session.write( flowFile, streamCallback) 方法,提供 InputStream 和 OutputStream,為 flow file提供內(nèi)容的讀取和寫入. 該 interface 有一個單一的方法:
void process(InputStream in, OutputStream out) throws IOException
該 interface 提供被管理的 output stream. 這個output stream 被自動打開和關(guān)閉,也可以手動關(guān)閉。 - 重要的一點是,如果任何 streams 包裝了這個 streams,所有打開的資源應(yīng)該被清理.
因為這些 callbacks 是 Java objects, 腳本將創(chuàng)建一個并且傳入 session 方法, 下面的方法將使用不同的腳本語言進行演示. 并且,這里還有其他的讀寫 flow files方法, 包括:
- 使用 session.read(flowFile) 返回 InputStream. 取代 InputStreamCallback, 將返回 InputStream 用于讀取. 你必須 (close, e.g.) 手動管理 InputStream.
- 使用 session.importFrom(inputStream, flowFile) 從 InputStream 寫入到 FlowFile. 這將替代 借助OutputStreamCallback的session.write() 的使用.
從 flow file 中讀取數(shù)據(jù)
需求:傳入連接執(zhí)行 ExecuteScript ,并且從隊列中得到 flow file 的內(nèi)容進行處理.
方法:使用session的read(flowFile, inputStreamCallback) 方法。一個 InputStreamCallback 對象需要被傳入 read() 方法. 注意到,因為InputStreamCallback 是一個對象, 內(nèi)容只在該對象中可見。 如果你需要在 read() 方法之外訪問, 需要使用更為全局化的變量. 這里的例子講來自flow file的全部內(nèi)容存儲到 String (使用 Apache Commons' IOUtils class)。
注意: 對于大的 flow files, 這并不是最好的技術(shù)方法; 應(yīng)該只讀取需要的數(shù)據(jù),并按照適應(yīng)的方法處理。比如 SplitText, 你應(yīng)該一次讀一行并且在 InputStreamCallback中處理, 或者 session.read(flowFile) 方法 得到 InputStream 的引用,從而在 callback之外處理.
例子:
Groovy:
import org.apache.commons.io.IOUtils import java.nio.charset.StandardCharsets flowFile = session.get() if(!flowFile)return def text = '' // Cast a closure with an inputStream parameter to InputStreamCallback session.read(flowFile, {inputStream -> text = IOUtils.toString(inputStream, StandardCharsets.UTF_8) // Do something with text here } as InputStreamCallback)
Jython:
from org.apache.commons.io import IOUtils from java.nio.charset import StandardCharsets from org.apache.nifi.processor.io import InputStreamCallback # Define a subclass of InputStreamCallback for use in session.read() class PyInputStreamCallback(InputStreamCallback): def __init__(self): pass def process(self, inputStream): text = IOUtils.toString(inputStream, StandardCharsets.UTF_8) # Do something with text here # end class flowFile = session.get() if(flowFile != None): session.read(flowFile, PyInputStreamCallback()) # implicit return at the end
Javascript:
var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback") var IOUtils = Java.type("org.apache.commons.io.IOUtils") var StandardCharsets = Java.type("java.nio.charset.StandardCharsets") var flowFile = session.get(); if(flowFile != null) { // Create a new InputStreamCallback, passing in a function to define the interface method session.read(flowFile,new InputStreamCallback(function(inputStream) { var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8); // Do something with text here })); }
JRuby:
java_import org.apache.commons.io.IOUtils java_import org.apache.nifi.processor.io.InputStreamCallback # Define a subclass of InputStreamCallback for use in session.read() class JRubyInputStreamCallback include InputStreamCallback def process(inputStream) text = IOUtils.toString(inputStream) # Do something with text here end end jrubyInputStreamCallback = JRubyInputStreamCallback.new flowFile = session.get() if flowFile != nil session.read(flowFile, jrubyInputStreamCallback) end
寫入數(shù)據(jù)至 flow file
需求:為輸出的 flow file創(chuàng)建內(nèi)容.
方法:使用session的write(flowFile, outputStreamCallback) 方法。一個OutputStreamCallback 對象需要傳遞給 write() 方法. 注意,因為 OutputStreamCallback 是一個對象, 因此內(nèi)容之災(zāi)對象內(nèi)部可見. 如果你需要在 write() 方法之外訪問, 使用更為全局化變量. 西面的例子寫入 String 到 flowFile。
例子:
Groovy:
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
flowFile = session.get()
if(!flowFile) return
def text = 'Hello world!'
// Cast a closure with an outputStream parameter to OutputStreamCallback
flowFile = session.write(flowFile, {outputStream ->
outputStream.write(text.getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
Jython:
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import OutputStreamCallback
# Define a subclass of OutputStreamCallback for use in session.write()
class PyOutputStreamCallback(OutputStreamCallback):
def __init__(self):
pass
def process(self, outputStream):
outputStream.write(bytearray('Hello World!'.encode('utf-8')))
# end class
flowFile = session.get()
if(flowFile != None):
flowFile = session.write(flowFile, PyOutputStreamCallback())
# implicit return at the end
Javascript:
var OutputStreamCallback = Java.type("org.apache.nifi.processor.io.OutputStreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
var flowFile = session.get();
if(flowFile != null) {
// Create a new OutputStreamCallback, passing in a function to define the interface method
flowFile = session.write(flowFile,new OutputStreamCallback(function(outputStream) {
outputStream.write("Hello World!".getBytes(StandardCharsets.UTF_8))
}));
}
JRuby:
java_import org.apache.commons.io.IOUtils
java_import java.nio.charset.StandardCharsets
java_import org.apache.nifi.processor.io.OutputStreamCallback
# Define a subclass of OutputStreamCallback for use in session.write()
class JRubyOutputStreamCallback
include OutputStreamCallback
def process(outputStream)
outputStream.write("Hello World!".to_java.getBytes(StandardCharsets::UTF_8))
end
end
jrubyOutputStreamCallback = JRubyOutputStreamCallback.new
flowFile = session.get()
if flowFile != nil
flowFile = session.write(flowFile, jrubyOutputStreamCallback)
end
覆蓋 flow file內(nèi)容
需求:重用輸入 flow file但是希望修改內(nèi)容并傳遞到輸出的 flow file.
方法:使用session的write(flowFile, streamCallback) 方法。一個StreamCallback 對象需要傳遞給 write() 方法. StreamCallback 同時提供了InputStream (從輸入的 flow file) 和 outputStream (下一版本的 flow file), 因此你可以使用InputStream去取得 flow file的當前內(nèi)容, 然后修改他們并且寫會到 flow file. 這將覆蓋 flow file 的內(nèi)容, 因此對于追加內(nèi)容要采用讀入內(nèi)容添加的方式, 或者使用不同的方法 (使用 session.append() 而不是session.write() )。
注意,因為 StreamCallback 是一個對象, 因此內(nèi)容之災(zāi)對象內(nèi)部可見. 如果你需要在 write() 方法之外訪問, 使用更為全局化變量.
以下這個例子將反轉(zhuǎn)輸入flowFile (假定為 String) 的內(nèi)容,并將反轉(zhuǎn)后的字符串寫入到新版的 flowFile.
例子:
Groovy:
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
flowFile = session.get()
if(!flowFile) return
def text = 'Hello world!'
// Cast a closure with an inputStream and outputStream parameter to StreamCallback
flowFile = session.write(flowFile, {inputStream, outputStream ->
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
outputStream.write(text.reverse().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
Jython:
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
outputStream.write(bytearray('Hello World!'[::-1].encode('utf-8')))
# end class
flowFile = session.get()
if(flowFile != None):
flowFile = session.write(flowFile, PyStreamCallback())
# implicit return at the end
Javascript:
var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
var flowFile = session.get();
if(flowFile != null) {
// Create a new StreamCallback, passing in a function to define the interface method
flowFile = session.write(flowFile,new StreamCallback(function(inputStream, outputStream) {
var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
outputStream.write(text.split("").reverse().join("").getBytes(StandardCharsets.UTF_8))
}));
}
JRuby:
java_import org.apache.commons.io.IOUtils
java_import java.nio.charset.StandardCharsets
java_import org.apache.nifi.processor.io.StreamCallback
# Define a subclass of StreamCallback for use in session.write()
class JRubyStreamCallback
include StreamCallback
def process(inputStream, outputStream)
text = IOUtils.toString(inputStream)
outputStream.write((text.reverse!).to_java.getBytes(StandardCharsets::UTF_8))
end
end
jrubyStreamCallback = JRubyStreamCallback.new
flowFile = session.get()
if flowFile != nil
flowFile = session.write(flowFile, jrubyStreamCallback)
end
處理錯誤
需求:在 script ( data validation 或者出現(xiàn)一個 exception)運行時出現(xiàn)錯誤, 處理和拋出錯誤。
方法:對于exceptions, 使用腳本語言的exception-handling 機制 (一般是try/catch 代碼塊). 對于 data validation, 可以使用類似的方法, 但是定義一個boolean 變量,如 "valid" 以及 if/else 語句,而不是try/catch 語句. ExecuteScript 定義了 "success" and "failure" relationships; 一般情況下,你的處理將轉(zhuǎn)移 "good" flow files 到 success,而 "bad" flow files 到 failure (記錄錯誤在后續(xù)的操作中)。
例子:
Groovy:
flowFile = session.get()
if(!flowFile) return
try {
// Something that might throw an exception here
// Last operation is transfer to success (failures handled in the catch block)
session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
log.error('Something went wrong', e)
session.transfer(flowFile, REL_FAILURE)
}
Jython:
flowFile = session.get()
if(flowFile != None):
try:
# Something that might throw an exception here
# Last operation is transfer to success (failures handled in the catch block)
session.transfer(flowFile, REL_SUCCESS)
except:
log.error('Something went wrong', e)
session.transfer(flowFile, REL_FAILURE)
# implicit return at the end
Javascript:
var flowFile = session.get();
if(flowFile != null) {
try {
// Something that might throw an exception here
// Last operation is transfer to success (failures handled in the catch block)
session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
log.error('Something went wrong', e)
session.transfer(flowFile, REL_FAILURE)
}
}
JRuby:
flowFile = session.get()
if flowFile != nil
begin
# Something that might raise an exception here
# Last operation is transfer to success (failures handled in the rescue block)
session.transfer(flowFile, REL_SUCCESS)
rescue Exception => e
log.error('Something went wrong', e)
session.transfer(flowFile, REL_FAILURE)
end
end
上一篇:Nifi組件腳本開發(fā)—ExecuteScript 使用指南(一)
源:https://www.shangmayuan.com/a/0ba9c44310b04d1dad461790.html
1、Nifi:基本認識
2、Nifi:基礎(chǔ)用法及頁面常識
3、Nifi:ExcuseXXXScript組件的使用(一)
4、Nifi:ExcuseXXXScript組件的使用(二)
5、Nifi:ExcuseXXXScript組件的使用(三)
6、Nifi:自定義處理器的開發(fā)
7、Nifi:Nifi的Controller Service

浙公網(wǎng)安備 33010602011771號